Vector

OpenSearch implementation of vector search and CRUD capability.

Authors

Kadek Denaya (kadek.d.r.diana@gdplabs.id)

OpenSearchVectorCapability(index_name, em_invoker, client, opensearch_url=None, query_field='text', vector_query_field='vector', retrieval_strategy=None, distance_strategy=None, connection_params=None, encryption=None)

OpenSearch implementation of VectorCapability protocol.

This class provides document CRUD operations and vector search using OpenSearch. It uses LangChain's OpenSearchVectorSearch for create and retrieve operations, and a direct OpenSearch client for update and delete operations.

Attributes:

Name Type Description
index_name str

The name of the OpenSearch index.

vector_store OpenSearchVectorSearch

The vector store instance.

client AsyncOpenSearch

AsyncOpenSearch client for direct operations.

em_invoker BaseEMInvoker

The embedding model to perform vectorization.

Initialize the OpenSearch vector capability.

OpenSearchVectorSearch creates its own sync and async clients internally based on the provided connection parameters. The async client is used for operations like update, delete, and clear.

Parameters:

Name Type Description Default
index_name str

The name of the OpenSearch index.

required
em_invoker BaseEMInvoker

The embedding model to perform vectorization.

required
client AsyncOpenSearch

The OpenSearch client for direct operations.

required
opensearch_url str | None

The URL of the OpenSearch server, used to initialize LangChain's OpenSearchVectorSearch. If None, it is extracted from the client's connection info. Defaults to None.

None
query_field str

The field name for text queries. Defaults to "text".

'text'
vector_query_field str

The field name for vector queries. Defaults to "vector".

'vector'
retrieval_strategy Any

Not used with OpenSearchVectorSearch (kept for API compatibility).

None
distance_strategy str | None

The distance strategy for retrieval. For example, "l2" for Euclidean distance, "l2squared" for squared Euclidean distance, "cosine" for cosine similarity, etc. Defaults to None.

None
connection_params dict[str, Any] | None

Additional connection parameters to override defaults. These will be merged with automatically detected parameters (authentication, SSL settings). User-provided params take precedence. Defaults to None. Available parameters include:

1. http_auth (tuple[str, str] | None): HTTP authentication tuple (username, password).
2. use_ssl (bool): Whether to use SSL/TLS. Defaults to True for HTTPS URLs.
3. verify_certs (bool): Whether to verify SSL certificates. Defaults to True for HTTPS URLs.
4. ssl_show_warn (bool): Whether to show SSL warnings. Defaults to True for HTTPS URLs.
5. ssl_assert_hostname (str | None): SSL hostname assertion. Defaults to None.
6. max_retries (int): Maximum number of retries for requests. Defaults to 3.
7. retry_on_timeout (bool): Whether to retry on timeouts. Defaults to True.
8. client_cert (str | None): Path to the client certificate file. Defaults to None.
9. client_key (str | None): Path to the client private key file. Defaults to None.
10. root_cert (str | None): Path to the root certificate file. Defaults to None.
11. Additional kwargs: Any other parameters accepted by OpenSearch client constructor.

None
encryption OpenSearchEncryptionCapability | None

Encryption capability for field-level encryption. Defaults to None.

None
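The precedence rule for connection_params (detected defaults first, user-provided values on top) can be sketched in plain Python. This is only an illustration: build_connection_params is a hypothetical helper, not part of the library, and the detected defaults are limited to the ones listed in the parameter description above.

```python
from __future__ import annotations

from typing import Any
from urllib.parse import urlparse


def build_connection_params(
    opensearch_url: str,
    connection_params: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Merge URL-derived defaults with user overrides (hypothetical helper)."""
    is_https = urlparse(opensearch_url).scheme == "https"

    # Defaults as documented above: SSL flags follow the URL scheme.
    detected: dict[str, Any] = {
        "use_ssl": is_https,
        "verify_certs": is_https,
        "ssl_show_warn": is_https,
        "max_retries": 3,
        "retry_on_timeout": True,
    }

    # Later keys win, so user-provided values take precedence.
    return {**detected, **(connection_params or {})}


params = build_connection_params(
    "https://localhost:9200",
    {"http_auth": ("admin", "admin"), "verify_certs": False},
)
print(params["use_ssl"], params["verify_certs"], params["max_retries"])
```

Here verify_certs ends up False because the caller's value overrides the detected one, while use_ssl and max_retries keep their defaults.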

em_invoker property

Returns the EM Invoker instance.

Returns:

Name Type Description
BaseEMInvoker BaseEMInvoker

The EM Invoker instance.

clear(**kwargs) async

Clear all records from the datastore.

Examples:

# Clear all chunks
await vector_capability.clear()

Parameters:

Name Type Description Default
**kwargs Any

Datastore-specific parameters.

{}

create(data, **kwargs) async

Create new records in the datastore.

If encryption is enabled, this method automatically encrypts the content and metadata of the chunks according to the encryption configuration. Embeddings are generated from the plaintext first and the chunks are encrypted afterwards, so the embeddings represent the original content rather than the ciphertext.

Examples:

# Create a single chunk
await vector_capability.create(data=Chunk(content="Hello, world!", metadata={"source": "test"}))

Parameters:

Name Type Description Default
data Chunk | list[Chunk]

Data to create (single item or collection).

required
**kwargs Any

Datastore-specific parameters.

{}

Raises:

Type Description
ValueError

If data structure is invalid.
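The ordering described for create (embed the plaintext, then encrypt) can be illustrated with a toy sketch. Everything here is a stand-in: ToyChunk, toy_embed, and toy_encrypt are hypothetical names, the "embedding" is a trivial feature vector, and base64 is used only as a reversible placeholder, not as real encryption.

```python
from __future__ import annotations

import base64
from dataclasses import dataclass, field


@dataclass
class ToyChunk:
    """Minimal stand-in for a chunk; not the library's Chunk class."""

    content: str
    metadata: dict = field(default_factory=dict)


def toy_embed(text: str) -> list[float]:
    """Stand-in embedding: two simple text features, not a real model."""
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


def toy_encrypt(text: str) -> str:
    """Placeholder for encryption: reversible encoding, NOT secure."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def prepare_for_storage(chunk: ToyChunk) -> tuple[ToyChunk, list[float]]:
    # Step 1: embed while the content is still plaintext.
    vector = toy_embed(chunk.content)
    # Step 2: only then replace the content with its encrypted form.
    stored = ToyChunk(content=toy_encrypt(chunk.content), metadata=dict(chunk.metadata))
    return stored, vector


stored, vector = prepare_for_storage(ToyChunk("Hello, world!", {"source": "test"}))
print(vector == toy_embed("Hello, world!"))
```

Embedding the ciphertext instead would produce a vector unrelated to the original text, which is why the order matters for search quality.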

create_from_vector(chunk_vectors, **kwargs) async

Add pre-computed embeddings directly.

If encryption is enabled, this method automatically encrypts the content and metadata of the chunks according to the encryption configuration.

Examples:

# Create two chunks from pre-computed vectors
await vector_capability.create_from_vector(
    chunk_vectors=[
        (Chunk(content="Hello, world!", metadata={"source": "test"}), Vector([0.1, 0.2, 0.3])),
        (Chunk(content="Hello, another world!", metadata={"source": "test"}), Vector([0.4, 0.5, 0.6])),
    ]
)

Parameters:

Name Type Description Default
chunk_vectors list[tuple[Chunk, Vector]]

List of tuples containing chunks and their corresponding vectors.

required
**kwargs Any

Datastore-specific parameters.

{}

Returns:

Type Description
list[str]

List of IDs of the added documents.
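Because create_from_vector takes pre-computed embeddings, a caller may want to check the pairs before sending them. The sketch below is a pre-flight check under assumptions: check_pairs is a hypothetical helper, plain strings and lists stand in for Chunk and Vector, and the source does not say whether the library performs a similar check itself.

```python
from __future__ import annotations


def check_pairs(
    chunk_vectors: list[tuple[str, list[float]]],
    expected_dimension: int,
) -> tuple[list[str], list[list[float]]]:
    """Split (text, vector) pairs into parallel lists after a dimension check."""
    texts: list[str] = []
    vectors: list[list[float]] = []
    for position, (text, vector) in enumerate(chunk_vectors):
        if len(vector) != expected_dimension:
            raise ValueError(
                f"pair {position}: expected {expected_dimension} values, got {len(vector)}"
            )
        texts.append(text)
        vectors.append(list(vector))
    return texts, vectors


pairs = [
    ("Hello, world!", [0.1, 0.2, 0.3]),
    ("Hello, another world!", [0.4, 0.5, 0.6]),
]
texts, vectors = check_pairs(pairs, expected_dimension=3)
print(len(texts), len(vectors))
```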

delete(filters=None, **kwargs) async

Delete records from the datastore based on filters.

Warning

Filters cannot target encrypted fields. If you try to delete documents based on an encrypted metadata field (e.g., filters=F.eq("metadata.secret", "val")), the filter will not match, because the filter value is plaintext while the stored value is encrypted. Always use non-encrypted fields (like "id") in filters when working with encrypted data.

Examples:

from gllm_datastore.core.filters import filter as F

# Delete a single chunk
await vector_capability.delete(filters=F.eq("id", "document_id"))

Parameters:

Name Type Description Default
filters FilterClause | QueryFilter | None

Filters to select records for deletion. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None.

None
**kwargs Any

Datastore-specific parameters.

{}
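The warning about encrypted fields can be reproduced with a small in-memory sketch. It assumes nothing about the library: toy_encrypt is a base64 placeholder (not real encryption) and match_eq is a hypothetical equality matcher over dotted field paths.

```python
import base64


def toy_encrypt(value: str) -> str:
    """Placeholder for field-level encryption: reversible, NOT secure."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


# Stored documents: "id" is plaintext, "metadata.secret" is stored encrypted.
documents = [
    {"id": "doc1", "metadata": {"secret": toy_encrypt("val")}},
    {"id": "doc2", "metadata": {"secret": toy_encrypt("other")}},
]


def match_eq(docs: list, path: str, value: str) -> list:
    """Return the documents whose dotted field path equals the given value."""
    matched = []
    for doc in docs:
        current = doc
        for key in path.split("."):
            current = current.get(key) if isinstance(current, dict) else None
        if current == value:
            matched.append(doc)
    return matched


# The plaintext filter value never equals the stored ciphertext.
by_secret = match_eq(documents, "metadata.secret", "val")
# A filter on the non-encrypted "id" field matches as expected.
by_id = match_eq(documents, "id", "doc1")
print(len(by_secret), len(by_id))  # prints "0 1"
```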

delete_by_id(id, **kwargs) async

Delete records from the datastore based on IDs.

Examples:

# Delete a single chunk
await vector_capability.delete_by_id(id="document_id")

Parameters:

Name Type Description Default
id str | list[str]

ID or list of IDs to delete.

required
**kwargs Any

Datastore-specific parameters.

{}
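Since id accepts either a single string or a list of strings, the two shapes have to be normalized somewhere. A minimal sketch of that normalization, where normalize_ids is a hypothetical helper and not a documented library function:

```python
from __future__ import annotations


def normalize_ids(id: str | list[str]) -> list[str]:
    """Return a list of IDs whether one ID or several were passed."""
    # A bare string must be wrapped; iterating it would yield characters.
    return [id] if isinstance(id, str) else list(id)


print(normalize_ids("document_id"))
print(normalize_ids(["doc1", "doc2"]))
```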

ensure_index(mapping=None, index_settings=None, dimension=None, distance_strategy=None) async

Ensure OpenSearch index exists, creating it if necessary.

This method is idempotent: if the index already exists, it skips creation and returns early.

Parameters:

Name Type Description Default
mapping dict[str, Any] | None

Custom mapping dictionary to use for index creation. If provided, this mapping is used directly and should follow the OpenSearch mapping format. Defaults to None, in which case the default mapping is used.

None
index_settings dict[str, Any] | None

Custom index settings. These settings will be merged with any default settings. Defaults to None.

None
dimension int | None

Vector dimension. If neither dimension nor mapping is provided, the dimension is inferred from em_invoker by generating a test embedding. Defaults to None.

None
distance_strategy str | None

Distance strategy for vector similarity. Supported values: "l2", "l2squared", "cosine", "innerproduct", etc. Only used when building default mapping. Defaults to "l2" if not specified.

None

Raises:

Type Description
ValueError

If mapping is invalid or required parameters are missing.

RuntimeError

If index creation fails.
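The behaviour described for ensure_index (skip if the index exists, otherwise infer the dimension and build a default mapping) can be sketched against an in-memory stand-in. All names here are hypothetical: FakeIndices is not the OpenSearch client, toy_embed assumes an 8-dimensional model, and the mapping dictionary is only loosely modelled on a k-NN vector field rather than copied from the library.

```python
from __future__ import annotations

import asyncio


class FakeIndices:
    """In-memory stand-in for an index API; not the OpenSearch client."""

    def __init__(self) -> None:
        self.created: dict[str, dict] = {}
        self.create_calls = 0

    async def exists(self, index: str) -> bool:
        return index in self.created

    async def create(self, index: str, body: dict) -> None:
        self.create_calls += 1
        self.created[index] = body


async def toy_embed(text: str) -> list[float]:
    """Stand-in embedding model with an assumed dimension of 8."""
    return [0.0] * 8


async def ensure_index(indices, index_name, mapping=None, dimension=None, distance_strategy=None):
    # Idempotent: return early when the index is already there.
    if await indices.exists(index_name):
        return
    if mapping is None:
        if dimension is None:
            # Infer the dimension from the length of a test embedding.
            dimension = len(await toy_embed("test"))
        # Illustrative default mapping; "l2" is the documented fallback.
        mapping = {
            "properties": {
                "vector": {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "space_type": distance_strategy or "l2",
                }
            }
        }
    await indices.create(index_name, {"mappings": mapping})


async def main() -> FakeIndices:
    indices = FakeIndices()
    await ensure_index(indices, "docs")
    await ensure_index(indices, "docs")  # second call is a no-op
    return indices


indices = asyncio.run(main())
print(indices.create_calls)
```

Calling the function twice results in a single create call, which is the property that makes it safe to run at application start-up.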

retrieve(query, filters=None, options=None, **kwargs) async

Semantic search using text query converted to vector.

If encryption is enabled, this method automatically decrypts the content and metadata of the returned chunks according to the encryption configuration.

Warning

Filters cannot target encrypted fields. If you try to filter by an encrypted metadata field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match because the filter value is not encrypted but the stored data is. Always use non-encrypted fields in filters when working with encrypted data.

Examples:

from gllm_datastore.core.filters import filter as F

# Direct FilterClause usage - using non-encrypted field
await vector_capability.retrieve(
    query="What is the capital of France?",
    filters=F.eq("id", "document_id"),
    options=QueryOptions(limit=10),
)

# Multiple filters - using non-encrypted fields (assumes metadata.source is not encrypted)
filters = F.and_(F.eq("id", "doc1"), F.eq("metadata.source", "test"))
await vector_capability.retrieve(query="What is the capital of France?", filters=filters)

Parameters:

Name Type Description Default
query str

Text query to embed and search for.

required
filters FilterClause | QueryFilter | None

Filters to apply to the search. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None.

None
options QueryOptions | None

Options to apply to the search. Defaults to None.

None
**kwargs Any

Datastore-specific parameters.

{}

Returns:

Type Description
list[Chunk]

List of chunks ordered by relevance score.

retrieve_by_vector(vector, filters=None, options=None, **kwargs) async

Direct vector similarity search.

Warning

Filters cannot target encrypted fields. If you try to filter by an encrypted metadata field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match because the filter value is not encrypted but the stored data is. Always use non-encrypted fields in filters when working with encrypted data.

Examples:

from gllm_datastore.core.filters import filter as F

# Direct FilterClause usage - using non-encrypted field
await vector_capability.retrieve_by_vector(
    vector=[0.1, 0.2, 0.3],
    filters=F.eq("id", "document_id"),
    options=QueryOptions(limit=10),
)

# Multiple filters - using non-encrypted fields (assumes metadata.source is not encrypted)
filters = F.and_(F.eq("id", "doc1"), F.eq("metadata.source", "test"))
await vector_capability.retrieve_by_vector(vector=[0.1, 0.2, 0.3], filters=filters)

Parameters:

Name Type Description Default
vector Vector

Query embedding vector.

required
filters FilterClause | QueryFilter | None

Filters to apply to the search. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None.

None
options QueryOptions | None

Options to apply to the search. Defaults to None.

None
**kwargs Any

Datastore-specific parameters.

{}

Returns:

Type Description
list[Chunk]

List of chunks ordered by similarity score.
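How results are ordered depends on the distance strategy chosen for the index. The following sketch computes three of the named strategies ("l2", "l2squared", "cosine") in plain Python to show that they can rank the same candidates differently. The vectors and candidate names are made up for illustration, and the functions are textbook definitions rather than the library's or OpenSearch's scoring code.

```python
import math


def l2squared(a: list, b: list) -> float:
    """Squared Euclidean distance (smaller means closer)."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def l2(a: list, b: list) -> float:
    """Euclidean distance (smaller means closer)."""
    return math.sqrt(l2squared(a, b))


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between the vectors (larger means closer)."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norms


query = [1.0, 0.0]
candidates = {"a": [2.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 0.1]}

# Distances sort ascending; similarities sort descending.
by_l2 = sorted(candidates, key=lambda name: l2(query, candidates[name]))
by_l2squared = sorted(candidates, key=lambda name: l2squared(query, candidates[name]))
by_cosine = sorted(
    candidates,
    key=lambda name: cosine_similarity(query, candidates[name]),
    reverse=True,
)
print(by_l2, by_l2squared, by_cosine)
```

Candidate "a" points in exactly the same direction as the query but is twice as long, so it ranks first under cosine similarity and second under Euclidean distance. Squaring the distance is monotonic and leaves the order unchanged.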

update(update_values, filters=None, **kwargs) async

Update existing records in the datastore.

If encryption is enabled, this method automatically encrypts the content and metadata in update_values according to the encryption configuration.

Warning

Filters cannot target encrypted fields. While update_values are encrypted before being written, the filters used to identify which documents to update are NOT encrypted. If you try to update documents based on an encrypted metadata field (e.g., filters=F.eq("metadata.secret", "val")), the filter will fail to match because the filter value is not encrypted but the stored data is. Always use non-encrypted fields (like "id") in filters when working with encrypted data.

Examples:

from gllm_datastore.core.filters import filter as F

# Update content - using non-encrypted field for filter
await vector_capability.update(
    update_values={"content": "new_content"},
    filters=F.eq("id", "unique_id"),
)

# Update metadata - using non-encrypted field for filter
await vector_capability.update(
    update_values={"metadata": {"status": "published"}},
    filters=F.eq("id", "unique_id"),
)

Parameters:

Name Type Description Default
update_values dict[str, Any]

Values to update.

required
filters FilterClause | QueryFilter | None

Filters to select records to update. FilterClause objects are automatically converted to QueryFilter internally. Cannot use encrypted fields in filters. Defaults to None.

None
**kwargs Any

Datastore-specific parameters.

{}