Vector
OpenSearch implementation of vector search and CRUD capability.
References
NONE
OpenSearchVectorCapability(index_name, em_invoker, client, opensearch_url=None, query_field='text', vector_query_field='vector', retrieval_strategy=None, distance_strategy=None, connection_params=None, encryption=None)
OpenSearch implementation of VectorCapability protocol.
This class provides document CRUD operations and vector search using OpenSearch. Uses LangChain's OpenSearchVectorSearch for create and retrieve operations, and direct OpenSearch client for update and delete operations.
Attributes:
| Name | Type | Description |
|---|---|---|
index_name |
str
|
The name of the OpenSearch index. |
vector_store |
OpenSearchVectorSearch
|
The vector store instance. |
client |
AsyncOpenSearch
|
AsyncOpenSearch client for direct operations. |
em_invoker |
BaseEMInvoker
|
The embedding model to perform vectorization. |
Initialize the OpenSearch vector capability.
OpenSearchVectorSearch creates its own sync and async clients internally based on the provided connection parameters. The async client is used for operations like update, delete, and clear.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
index_name
|
str
|
The name of the OpenSearch index. |
required |
em_invoker
|
BaseEMInvoker
|
The embedding model to perform vectorization. |
required |
client
|
AsyncOpenSearch
|
The OpenSearch client for direct operations. |
required |
opensearch_url
|
str | None
|
The URL of the OpenSearch server. Used for LangChain's OpenSearchVectorSearch initialization. If None, will be extracted from client connection info. Defaults to None. |
None
|
query_field
|
str
|
The field name for text queries. Defaults to "text". |
'text'
|
vector_query_field
|
str
|
The field name for vector queries. Defaults to "vector". |
'vector'
|
retrieval_strategy
|
Any
|
Not used with OpenSearchVectorSearch (kept for API compatibility). |
None
|
distance_strategy
|
str | None
|
The distance strategy for retrieval. For example, "l2" for Euclidean distance, "l2squared" for squared Euclidean distance, "cosine" for cosine similarity, etc. Defaults to None. |
None
|
connection_params
|
dict[str, Any] | None
|
Additional connection parameters to override defaults. These will be merged with automatically detected parameters (authentication, SSL settings). User-provided params take precedence. Defaults to None. Available parameters include: 1. http_auth (tuple[str, str] | None): HTTP authentication tuple (username, password). 2. use_ssl (bool): Whether to use SSL/TLS. Defaults to True for HTTPS URLs. 3. verify_certs (bool): Whether to verify SSL certificates. Defaults to True for HTTPS URLs. 4. ssl_show_warn (bool): Whether to show SSL warnings. Defaults to True for HTTPS URLs. 5. ssl_assert_hostname (str | None): SSL hostname assertion. Defaults to None. 6. max_retries (int): Maximum number of retries for requests. Defaults to 3. 7. retry_on_timeout (bool): Whether to retry on timeouts. Defaults to True. 8. client_cert (str | None): Path to the client certificate file. Defaults to None. 9. client_key (str | None): Path to the client private key file. Defaults to None. 10. root_cert (str | None): Path to the root certificate file. Defaults to None. 11. Additional kwargs: Any other parameters accepted by OpenSearch client constructor. |
None
|
encryption
|
OpenSearchEncryptionCapability | None
|
Encryption capability for field-level encryption. Defaults to None. |
None
|
em_invoker
property
Returns the EM Invoker instance.
Returns:
| Name | Type | Description |
|---|---|---|
BaseEMInvoker |
BaseEMInvoker
|
The EM Invoker instance. |
clear(**kwargs)
async
Clear all records from the datastore.
Examples:
from gllm_datastore.core.filters import filter as F
# Clear all chunks
await vector_capability.clear()
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
create(data, **kwargs)
async
Create new records in the datastore.
This method will automatically encrypt the content and metadata of the chunks if encryption is enabled following the encryption configuration. When encryption is enabled, embeddings are generated from plaintext first, then chunks are encrypted, ensuring that embeddings represent the original content rather than encrypted ciphertext.
Examples:
from gllm_datastore.core.filters import filter as F
# Create a single chunk
await vector_capability.create(data=Chunk(content="Hello, world!", metadata={"source": "test"}))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
Chunk | list[Chunk]
|
Data to create (single item or collection). |
required |
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If data structure is invalid. |
create_from_vector(chunk_vectors, **kwargs)
async
Add pre-computed embeddings directly.
This method will automatically encrypt the content and metadata of the chunks if encryption is enabled following the encryption configuration.
Examples:
from gllm_datastore.core.filters import filter as F
# Create a single chunk
await vector_capability.create_from_vector(
chunk_vectors=[
(Chunk(content="Hello, world!", metadata={"source": "test"}), Vector([0.1, 0.2, 0.3])),
(Chunk(content="Hello, another world!", metadata={"source": "test"}), Vector([0.4, 0.5, 0.6])),
]
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chunk_vectors
|
list[tuple[Chunk, Vector]]
|
List of tuples containing chunks and their corresponding vectors. |
required |
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
Returns:
| Type | Description |
|---|---|
list[str]
|
list[str]: List of IDs of the added documents. |
delete(filters=None, **kwargs)
async
Delete records from the data store based on filters.
Warning
Filters cannot target encrypted fields. If you try to delete documents based on an encrypted metadata field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match because the filter value is not encrypted but the stored data is. Always use non-encrypted fields (like 'id') in filters when working with encrypted data.
Examples:
from gllm_datastore.core.filters import filter as F
# Delete a single chunk
await vector_capability.delete(filters=F.eq("id", "document_id"))
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filters
|
FilterClause | QueryFilter | None
|
Filters to select records for deletion. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None. |
None
|
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
delete_by_id(id, **kwargs)
async
Delete records from the data store based on IDs.
Examples:
from gllm_datastore.core.filters import filter as F
# Delete a single chunk
await vector_capability.delete_by_id(id="document_id")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
id
|
str | list[str]
|
ID or list of IDs to delete. |
required |
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
ensure_index(mapping=None, index_settings=None, dimension=None, distance_strategy=None)
async
Ensure OpenSearch index exists, creating it if necessary.
This method is idempotent - if the index already exists, it will skip creation and return early.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mapping
|
dict[str, Any] | None
|
Custom mapping dictionary to use for index creation. If provided, this mapping will be used directly. The mapping should follow OpenSearch mapping format. Defaults to None, in which default mapping will be used. |
None
|
index_settings
|
dict[str, Any] | None
|
Custom index settings. These settings will be merged with any default settings. Defaults to None. |
None
|
dimension
|
int | None
|
Vector dimension. If not provided and mapping is not provided, will be inferred from em_invoker by generating a test embedding. |
None
|
distance_strategy
|
str | None
|
Distance strategy for vector similarity. Supported values: "l2", "l2squared", "cosine", "innerproduct", etc. Only used when building default mapping. Defaults to "l2" if not specified. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If mapping is invalid or required parameters are missing. |
RuntimeError
|
If index creation fails. |
retrieve(query, filters=None, options=None, **kwargs)
async
Semantic search using text query converted to vector.
This method will automatically decrypt the content and metadata of the chunks if encryption is enabled following the encryption configuration.
Warning
Filters cannot target encrypted fields. If you try to filter by an encrypted metadata
field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match
because the filter value is not encrypted but the stored data is. Always use non-encrypted
fields in filters when working with encrypted data.
Examples:
from gllm_datastore.core.filters import filter as F
# Direct FilterClause usage - using non-encrypted field
await vector_capability.retrieve(
query="What is the capital of France?",
filters=F.eq("id", "document_id"),
options=QueryOptions(limit=10),
)
# Multiple filters - using non-encrypted fields
filters = F.and_(F.eq("id", "doc1"), F.eq("id", "doc2"))
await vector_capability.retrieve(query="What is the capital of France?", filters=filters)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
query
|
str
|
Text query to embed and search for. |
required |
filters
|
FilterClause | QueryFilter | None
|
Filters to apply to the search. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None. |
None
|
options
|
QueryOptions | None
|
Options to apply to the search. Defaults to None. |
None
|
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
Returns:
| Type | Description |
|---|---|
list[Chunk]
|
list[Chunk]: List of chunks ordered by relevance score. |
retrieve_by_vector(vector, filters=None, options=None, **kwargs)
async
Direct vector similarity search.
Warning
Filters cannot target encrypted fields. If you try to filter by an encrypted metadata
field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match
because the filter value is not encrypted but the stored data is. Always use non-encrypted
fields in filters when working with encrypted data.
Examples:
from gllm_datastore.core.filters import filter as F
# Direct FilterClause usage - using non-encrypted field
await vector_capability.retrieve_by_vector(
vector=[0.1, 0.2, 0.3],
filters=F.eq("id", "document_id"),
options=QueryOptions(limit=10),
)
# Multiple filters - using non-encrypted fields
filters = F.and_(F.eq("id", "doc1"), F.eq("id", "doc2"))
await vector_capability.retrieve_by_vector(vector=[0.1, 0.2, 0.3], filters=filters)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
vector
|
Vector
|
Query embedding vector. |
required |
filters
|
FilterClause | QueryFilter | None
|
Filters to apply to the search. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None. |
None
|
options
|
QueryOptions | None
|
Options to apply to the search. Defaults to None. |
None
|
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|
Returns:
| Type | Description |
|---|---|
list[Chunk]
|
list[Chunk]: List of chunks ordered by similarity score. |
update(update_values, filters=None, **kwargs)
async
Update existing records in the datastore.
This method will automatically encrypt the content and metadata in update_values if encryption is enabled following the encryption configuration.
Warning
Filters cannot target encrypted fields. While update_values are encrypted before
being written, the filters used to identify which documents to update are NOT encrypted.
If you try to update documents based on an encrypted metadata field (e.g.,
filters=F.eq("metadata.secret", "val")), the filter will fail to match because
the filter value is not encrypted but the stored data is. Always use non-encrypted
fields (like "id") in filters when working with encrypted data.
Examples:
from gllm_datastore.core.filters import filter as F
# Update content - using non-encrypted field for filter
await vector_capability.update(
update_values={"content": "new_content"},
filters=F.eq("id", "unique_id"),
)
# Update metadata - using non-encrypted field for filter
await vector_capability.update(
update_values={"metadata": {"status": "published"}},
filters=F.eq("id", "unique_id"),
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
update_values
|
dict[str, Any]
|
Values to update. |
required |
filters
|
FilterClause | QueryFilter | None
|
Filters to select records to update. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None. |
None
|
**kwargs
|
Any
|
Datastore-specific parameters. |
{}
|