Retrieval

Retrieval blocks for GLLM RAG.

Convenience functions and utilities for retrieval pipeline steps.

AgenticRetrievalPipeline(retriever, selector, evaluator, max_rounds=2, top_k=10, max_top_k=None, selected_chunk_limit=5, on_insufficient=OnInsufficientPolicy.BEST_EFFORT)

Bounded agentic retrieval pipeline block.

The pipeline follows the pattern: normalize_state -> while_do(retrieve -> select -> evaluate -> prepare_retry) -> finalize.

When on_insufficient is OnInsufficientPolicy.EMPTY, a loop that exhausts max_rounds without finding sufficient evidence returns an empty chunk list. When it is OnInsufficientPolicy.BEST_EFFORT, the chunks selected in the last round are kept. When max_top_k is set, top_k growth on each retry is capped at that value.
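The control flow described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: the retrieve, select, and evaluate callables are hypothetical stand-ins for the retriever and the two stages, the enum string values are invented, and doubling top_k on each retry is an assumed growth rule (the source only says that growth is capped by max_top_k).

```python
from enum import Enum


class OnInsufficientPolicy(str, Enum):
    # Stand-in for the library enum; the string values are assumptions.
    EMPTY = "empty"
    BEST_EFFORT = "best_effort"


def bounded_retrieval(
    query,
    retrieve,   # hypothetical: (query, top_k) -> list of candidate chunks
    select,     # hypothetical: (query, candidates) -> list of chosen chunks
    evaluate,   # hypothetical: (query, selected) -> True when evidence suffices
    max_rounds=2,
    top_k=10,
    max_top_k=None,
    selected_chunk_limit=5,
    on_insufficient=OnInsufficientPolicy.BEST_EFFORT,
):
    selected = []
    current_k = top_k
    for _ in range(max_rounds):
        candidates = retrieve(query, current_k)
        selected = select(query, candidates)[:selected_chunk_limit]
        if evaluate(query, selected):
            return selected  # sufficient evidence: stop early
        # prepare_retry: ASSUMED growth rule (doubling), capped by max_top_k.
        current_k *= 2
        if max_top_k is not None:
            current_k = min(current_k, max_top_k)
    # Loop exhausted without sufficient evidence: apply the terminal policy.
    if on_insufficient is OnInsufficientPolicy.EMPTY:
        return []
    return selected


# Toy components that never find sufficient evidence.
corpus = [f"doc-{i}" for i in range(50)]
seen_k = []


def toy_retrieve(query, k):
    seen_k.append(k)
    return corpus[:k]


def toy_select(query, candidates):
    return candidates


def never_sufficient(query, chunks):
    return False


best_effort = bounded_retrieval(
    "q", toy_retrieve, toy_select, never_sufficient, max_rounds=3, max_top_k=15
)
empty = bounded_retrieval(
    "q", toy_retrieve, toy_select, never_sufficient,
    on_insufficient=OnInsufficientPolicy.EMPTY,
)
```

In the first call the requested candidate sizes are 10, 15, 15, because the cap stops the assumed doubling; the second call returns an empty list because the policy is EMPTY.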

Attributes:

Name                  Type                  Description
retriever             BaseRetriever         Retriever used in each retrieval round.
selector              SelectorStage         Selector stage for chunk selection.
evaluator             EvaluatorStage        Evaluator stage for sufficiency checks.
max_rounds            int                   Maximum retrieval rounds.
top_k                 int                   Initial retrieval candidate size.
max_top_k             int | None            Optional upper bound for retry growth of top_k.
selected_chunk_limit  int                   Maximum selected chunk count per round.
on_insufficient       OnInsufficientPolicy  Terminal behavior when evidence is insufficient after loop exhaustion.

Initialize the bounded retrieval block configuration.

Parameters:

Name                  Type                        Description                                                  Default
retriever             BaseRetriever               Retriever used for each loop round.                          required
selector              SelectorStage               Selector stage used to choose final chunks from candidates.  required
evaluator             EvaluatorStage              Evaluator stage used to determine evidence sufficiency.      required
max_rounds            int                         Maximum retrieval rounds.                                    2
top_k                 int                         Initial retrieval candidate size.                            10
max_top_k             int | None                  Optional upper bound for top_k during retries.               None
selected_chunk_limit  int                         Maximum selected chunk count per round.                      5
on_insufficient       OnInsufficientPolicy | str  Terminal behavior when evidence remains insufficient.        BEST_EFFORT

Raises:

Type        Description
ValueError  If numeric limits or policy value are invalid.
TypeError   If selector or evaluator stage types are unsupported.
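The ValueError cases listed above might look roughly like the following check. This is a hypothetical sketch: validate_config is not a library function, the exact rules (positive integers, max_top_k not below top_k) and the enum string values are assumptions, and the TypeError check on stage types is left out because the supported types are not listed here.

```python
from enum import Enum


class OnInsufficientPolicy(str, Enum):
    # Stand-in for the library enum; the string values are assumptions.
    EMPTY = "empty"
    BEST_EFFORT = "best_effort"


def validate_config(max_rounds, top_k, max_top_k, selected_chunk_limit, on_insufficient):
    """Hypothetical validator: returns the coerced policy or raises ValueError."""
    limits = {
        "max_rounds": max_rounds,
        "top_k": top_k,
        "selected_chunk_limit": selected_chunk_limit,
    }
    for name, value in limits.items():
        # bool is a subclass of int, so it is rejected explicitly.
        if not isinstance(value, int) or isinstance(value, bool) or value < 1:
            raise ValueError(f"{name} must be a positive integer, got {value!r}")
    # ASSUMPTION: a cap smaller than the initial top_k is treated as invalid.
    if max_top_k is not None and max_top_k < top_k:
        raise ValueError("max_top_k must be greater than or equal to top_k")
    # Accept either an enum member or its string value.
    try:
        return OnInsufficientPolicy(on_insufficient)
    except ValueError:
        raise ValueError(f"unknown on_insufficient policy: {on_insufficient!r}") from None


policy = validate_config(2, 10, None, 5, "empty")
```

Coercing through the enum constructor lets one code path handle both the OnInsufficientPolicy and str forms that the parameter accepts.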

build()

Materialize a bounded retrieval loop as a pipeline.

Returns:

Name      Type      Description
Pipeline  Pipeline  Materialized pipeline implementing bounded retrieve-select-evaluate behavior.

QueryExpansionPipeline(transformer, fuse_fn='rrf')

Default QueryExpansionPipeline block.

Wires: Transform → Retrieve (per query, via BaseRetriever batch mode) → Fuse. The block is not a Pipeline at construction time; it becomes one via .build(retriever).

Initializes a new QueryExpansionPipeline.

Resolves fuse_fn at construction time, so an invalid value raises before any pipeline is built.

Parameters:

Name         Type                                              Description  Default
transformer  BaseQueryTransformer                              The query transformer component used to expand the user query into sub-queries.  required
fuse_fn      str | Callable[[list[list[Chunk]]], list[Chunk]]  Fusion function name ("rrf" or "concat") or a custom callable that accepts list[list[Chunk]] and returns list[Chunk]. Defaults to "rrf".  'rrf'

build(retriever)

Builds and returns a new Pipeline for query expansion retrieval.

Each call returns an independent Pipeline instance; this block itself performs no caching.

The pipeline executes three named steps composed with the | operator:
1. expand_query: step(transformer) → queries
2. retrieve_chunks: step(retriever) → chunk_lists (a list of queries is dispatched to concurrent batch retrieval internally)
3. fuse_chunks: fuse(self._fusion_fn) → chunks

Steps 1 and 2 both use step() (ComponentStep): the retriever because BaseRetriever.retrieve() accepts **kwargs, and the transformer for consistency. map_reduce is intentionally not used here because the cost-visibility threshold is met at the strategy level only in MapReduceRAG.
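The three-step composition can be illustrated with a toy model of pipe-operator chaining. The Step and Pipeline classes below are hypothetical stand-ins written for this sketch and are not the gllm-pipeline classes; the "query" input key is also an assumption, while "queries", "chunk_lists", and "chunks" follow the state keys named above.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Step:
    """Toy step: reads one state key, writes one state key."""
    name: str
    fn: Callable[[Any], Any]
    input_key: str
    output_key: str

    def __or__(self, other):
        return Pipeline([self]) | other

    def run(self, state):
        # Return a new state dict so earlier keys stay available downstream.
        return {**state, self.output_key: self.fn(state[self.input_key])}


@dataclass
class Pipeline:
    """Toy pipeline: runs its steps in order over a dict state."""
    steps: list

    def __or__(self, other):
        extra = other.steps if isinstance(other, Pipeline) else [other]
        return Pipeline(self.steps + extra)

    def run(self, state):
        for current in self.steps:
            state = current.run(state)
        return state


def expand(query):
    # Stand-in transformer: one query becomes several sub-queries.
    return [query, f"{query} overview"]


def retrieve_batch(queries):
    # Stand-in retriever in batch mode: one chunk list per sub-query.
    return [[f"{q}#1", f"{q}#2"] for q in queries]


def fuse_concat(chunk_lists):
    # Stand-in fusion: plain flattening.
    return [chunk for chunks in chunk_lists for chunk in chunks]


pipeline = (
    Step("expand_query", expand, "query", "queries")
    | Step("retrieve_chunks", retrieve_batch, "queries", "chunk_lists")
    | Step("fuse_chunks", fuse_concat, "chunk_lists", "chunks")
)
result = pipeline.run({"query": "rag"})
```

Because the intermediate chunk_lists key is written into the state between steps 2 and 3, the state type has to declare it, which matches the note that the real pipeline uses a state subclass adding that key.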

Parameters:

Name       Type           Description                                                       Default
retriever  BaseRetriever  The retriever component used to fetch chunks for each sub-query.  required

Returns:

Name      Type      Description
Pipeline  Pipeline  A new Pipeline with state_type=_QueryExpansionState (a StrategyState subclass that adds the intermediate chunk_lists key).

Raises:

Type          Description
RuntimeError  If retriever is None.

fuse(fn, input_state, output_state, name=None)

Returns a gllm-pipeline transform step wrapping the given fusion function.

This convenience function creates a StateOperatorStep that applies the specified fusion function to chunk lists from the pipeline state.

The fusion function receives a list of chunk lists and returns a single fused list of chunks. Named strategies "rrf" (Reciprocal Rank Fusion) and "concat" (concatenation with deduplication) are built-in.
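To make the two named strategies concrete, here is a minimal sketch of what Reciprocal Rank Fusion and deduplicating concatenation typically compute. It is not the library's code: the Chunk dataclass is a stand-in with only an id and content, deduplication by id is an assumption, and k=60 is the constant conventionally used for RRF rather than a value confirmed by this page.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Chunk:
    # Stand-in for the library's Chunk type; only the fields needed here.
    id: str
    content: str = ""


def rrf(chunk_lists, k=60):
    """Score each chunk by the sum of 1 / (k + rank) over the lists containing it."""
    scores = defaultdict(float)
    first_seen = {}
    for chunks in chunk_lists:
        for rank, chunk in enumerate(chunks, start=1):
            scores[chunk.id] += 1.0 / (k + rank)
            first_seen.setdefault(chunk.id, chunk)
    # Highest fused score first; sorted() is stable, so ties keep first-seen order.
    ordered_ids = sorted(scores, key=scores.get, reverse=True)
    return [first_seen[chunk_id] for chunk_id in ordered_ids]


def concat(chunk_lists):
    """Concatenate lists in order, keeping the first occurrence of each chunk id."""
    seen = set()
    fused = []
    for chunks in chunk_lists:
        for chunk in chunks:
            if chunk.id not in seen:
                seen.add(chunk.id)
                fused.append(chunk)
    return fused


a, b, c = Chunk("a"), Chunk("b"), Chunk("c")
chunk_lists = [[a, b, c], [b, a], [b, c]]
rrf_ids = [chunk.id for chunk in rrf(chunk_lists)]
concat_ids = [chunk.id for chunk in concat(chunk_lists)]
```

With these inputs, b ranks first under RRF because it appears near the top of all three lists, while concat simply preserves first-appearance order.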

Examples:

# Using named strategy
fuse_step = fuse(fn="rrf", input_state="chunk_lists", output_state="chunks")

# Using custom callable
fuse_step = fuse(
    fn=lambda chunk_lists: chunk_lists[0],
    input_state="chunk_lists",
    output_state="chunks",
)

Parameters:

Name          Type                                              Description  Default
fn            str | Callable[[list[list[Chunk]]], list[Chunk]]  Fusion function name ("rrf" or "concat") or a custom callable that accepts list[list[Chunk]] and returns list[Chunk].  required
input_state   str                                               State key containing the list of chunk lists to fuse.  required
output_state  str                                               State key to store the fused result under.  required
name          str | None                                        A unique identifier for this pipeline step. Defaults to None, in which case the name will be auto-generated.  None

Returns:

Name               Type               Description
StateOperatorStep  StateOperatorStep  A pipeline step configured to run the fusion operation.

Raises:

Type        Description
ValueError  If fn is an unknown string name.
TypeError   If fn is neither a string nor a callable.
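The two error cases suggest a name-or-callable resolver along the following lines. This is a hypothetical sketch: resolve_fusion_fn and the registry are invented names, and the registered functions are trivial placeholders rather than the real "rrf" and "concat" strategies.

```python
def _placeholder_concat(chunk_lists):
    # Placeholder strategy: plain flattening, used only to populate the registry.
    return [chunk for chunks in chunk_lists for chunk in chunks]


def _placeholder_rrf(chunk_lists):
    # Placeholder: a real implementation would rank by reciprocal rank scores.
    return _placeholder_concat(chunk_lists)


# Hypothetical registry of named strategies.
_FUSION_REGISTRY = {"rrf": _placeholder_rrf, "concat": _placeholder_concat}


def resolve_fusion_fn(fn):
    """Map a strategy name or a callable to a fusion callable."""
    if isinstance(fn, str):
        if fn not in _FUSION_REGISTRY:
            known = ", ".join(sorted(_FUSION_REGISTRY))
            raise ValueError(f"unknown fusion function {fn!r}; expected one of: {known}")
        return _FUSION_REGISTRY[fn]
    if callable(fn):
        return fn  # custom callables pass through unchanged
    raise TypeError(f"fn must be a str or a callable, got {type(fn).__name__}")


def take_first(chunk_lists):
    # Example custom callable: keep only the first list.
    return chunk_lists[0]


named = resolve_fusion_fn("concat")
custom = resolve_fusion_fn(take_first)
```

Checking for str before callable keeps the two failure modes distinct: a misspelled name is a ValueError, while a value such as an integer is a TypeError.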