Retrieval
Retrieval blocks for GLLM RAG.
Convenience functions and utilities for retrieval pipeline steps.
AgenticRetrievalPipeline(retriever, selector, evaluator, max_rounds=2, top_k=10, max_top_k=None, selected_chunk_limit=5, on_insufficient=OnInsufficientPolicy.BEST_EFFORT)
Bounded agentic retrieval pipeline block.
The pipeline follows the pattern:
normalize_state -> while_do(retrieve -> select -> evaluate -> prepare_retry) -> finalize.
When on_insufficient is OnInsufficientPolicy.EMPTY, a loop that exhausts
its rounds without sufficient evidence returns an empty chunk list. When it is
OnInsufficientPolicy.BEST_EFFORT, the last selected chunks are kept.
When max_top_k is set, top_k growth per retry is capped at that value.
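The loop described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the library's implementation: `Policy`, `bounded_retrieve`, and the three callables are hypothetical stand-ins, and doubling `top_k` on each retry is an assumed growth rule (the source only says that growth is capped by `max_top_k`).

```python
from enum import Enum
from typing import Callable, List, Optional


class Policy(Enum):
    """Hypothetical stand-in for OnInsufficientPolicy."""
    EMPTY = "empty"
    BEST_EFFORT = "best_effort"


def bounded_retrieve(
    query: str,
    retrieve: Callable[[str, int], List[str]],
    select: Callable[[str, List[str], int], List[str]],
    is_sufficient: Callable[[str, List[str]], bool],
    max_rounds: int = 2,
    top_k: int = 10,
    max_top_k: Optional[int] = None,
    selected_chunk_limit: int = 5,
    on_insufficient: Policy = Policy.BEST_EFFORT,
) -> List[str]:
    """Run retrieve -> select -> evaluate for at most max_rounds rounds."""
    selected: List[str] = []
    k = top_k
    for _ in range(max_rounds):
        candidates = retrieve(query, k)
        selected = select(query, candidates, selected_chunk_limit)
        if is_sufficient(query, selected):
            return selected
        # prepare_retry: widen the candidate pool (doubling is an assumption),
        # then cap it at max_top_k when a cap is configured.
        k = k * 2
        if max_top_k is not None:
            k = min(k, max_top_k)
    # Loop exhausted without sufficient evidence: apply the terminal policy.
    return [] if on_insufficient is Policy.EMPTY else selected
```

With `Policy.EMPTY` the caller can tell "nothing trustworthy was found" apart from a weak answer, while `Policy.BEST_EFFORT` always hands back the last round's selection.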
Attributes:
| Name | Type | Description |
|---|---|---|
| retriever | BaseRetriever | Retriever used in each retrieval round. |
| selector | SelectorStage | Selector stage for chunk selection. |
| evaluator | EvaluatorStage | Evaluator stage for sufficiency checks. |
| max_rounds | int | Maximum retrieval rounds. |
| top_k | int | Initial retrieval candidate size. |
| max_top_k | int \| None | Optional upper bound for retry growth of top_k. |
| selected_chunk_limit | int | Maximum selected chunk count per round. |
| on_insufficient | OnInsufficientPolicy | Terminal behavior when evidence is insufficient after loop exhaustion. |
Initialize the bounded retrieval block configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retriever | BaseRetriever | Retriever used for each loop round. | required |
| selector | SelectorStage | Selector stage used to choose final chunks from candidates. | required |
| evaluator | EvaluatorStage | Evaluator stage used to determine evidence sufficiency. | required |
| max_rounds | int | Maximum retrieval rounds. | 2 |
| top_k | int | Initial retrieval candidate size. | 10 |
| max_top_k | int \| None | Optional upper bound for top_k during retries. | None |
| selected_chunk_limit | int | Maximum selected chunk count per round. | 5 |
| on_insufficient | OnInsufficientPolicy \| str | Terminal behavior when evidence remains insufficient. | BEST_EFFORT |
Raises:
| Type | Description |
|---|---|
| ValueError | If numeric limits or policy value are invalid. |
| TypeError | If selector or evaluator stage types are unsupported. |
build()
Materialize a bounded retrieval loop as a pipeline.
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | Materialized pipeline implementing bounded retrieve-select-evaluate behavior. |
QueryExpansionPipeline(transformer, fuse_fn='rrf')
Default QueryExpansionPipeline block.
Wires: Transform → Retrieve (per query, via BaseRetriever batch mode) → Fuse. The block is not a Pipeline at construction time; it becomes one via .build(retriever).
Initializes a new QueryExpansionPipeline.
Resolves fuse_fn immediately at construction time so that invalid values raise before any pipeline is built.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| transformer | BaseQueryTransformer | The query transformer component used to expand the user query into sub-queries. | required |
| fuse_fn | str \| Callable[[list[list[Chunk]]], list[Chunk]] | Fusion function name ("rrf" or "concat") or a custom callable that accepts list[list[Chunk]] and returns list[Chunk]. Defaults to "rrf". | 'rrf' |
build(retriever)
Builds and returns a new Pipeline for query expansion retrieval.
Each call returns an independent Pipeline instance; the block itself performs no caching.
The pipeline executes three named steps composed with the | operator:
1. expand_query: step(transformer) → queries
2. retrieve_chunks: step(retriever) → chunk_lists (a list of queries dispatches to concurrent batch retrieval internally)
3. fuse_chunks: fuse(self._fusion_fn) → chunks
Steps 1 and 2 use step() (ComponentStep) because BaseRetriever.retrieve()
accepts **kwargs. The transformer is also wrapped in step() for consistency.
map_reduce is intentionally not used here because the cost-visibility threshold
is met only at the strategy level, in MapReduceRAG.
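To make the three-step composition concrete, here is a minimal self-contained sketch of chaining steps with the `|` operator. `ToyStep` and `build_toy_pipeline` are hypothetical names invented for illustration; they are not the gllm-pipeline classes, and retrieval here runs sequentially rather than through the concurrent batch mode mentioned above.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]


class ToyStep:
    """Hypothetical stand-in for a pipeline step that maps state to state."""

    def __init__(self, name: str, fn: Callable[[State], State]) -> None:
        self.name = name
        self.fn = fn

    def __or__(self, other: "ToyStep") -> "ToyStep":
        # a | b runs a first, then feeds its output state into b.
        return ToyStep(
            f"{self.name}|{other.name}",
            lambda state: other.fn(self.fn(state)),
        )

    def run(self, state: State) -> State:
        # Copy the input so the caller's dict is not mutated.
        return self.fn(dict(state))


def build_toy_pipeline(
    expand: Callable[[str], List[str]],
    retrieve: Callable[[str], List[str]],
    fuse_fn: Callable[[List[List[str]]], List[str]],
) -> ToyStep:
    """Return a fresh Transform -> Retrieve -> Fuse chain on every call."""
    expand_query = ToyStep(
        "expand_query",
        lambda s: {**s, "queries": expand(s["query"])},
    )
    retrieve_chunks = ToyStep(
        "retrieve_chunks",
        # One retrieval per expanded query (sequential in this sketch).
        lambda s: {**s, "chunk_lists": [retrieve(q) for q in s["queries"]]},
    )
    fuse_chunks = ToyStep(
        "fuse_chunks",
        lambda s: {**s, "chunks": fuse_fn(s["chunk_lists"])},
    )
    return expand_query | retrieve_chunks | fuse_chunks
```

Each step reads and writes named state keys (`queries`, `chunk_lists`, `chunks`), which is why the intermediate results remain inspectable after the run.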
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| retriever | BaseRetriever | The retriever component used to fetch chunks for each sub-query. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | Pipeline | A new Pipeline with state_type=_QueryExpansionState (a StrategyState subclass that adds the intermediate |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If retriever is None. |
fuse(fn, input_state, output_state, name=None)
Returns a gllm-pipeline transform step wrapping the given fusion function.
This convenience function creates a StateOperatorStep that applies the specified fusion function to chunk lists from the pipeline state.
The fusion function receives a list of chunk lists and returns a single fused list of chunks. Named strategies "rrf" (Reciprocal Rank Fusion) and "concat" (concatenation with deduplication) are built-in.
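For readers unfamiliar with the two named strategies, the sketch below shows what Reciprocal Rank Fusion and deduplicating concatenation generally look like. It is an illustration only: `ToyChunk`, `rrf_fuse`, and `concat_fuse` are hypothetical names, deduplication by `id` is an assumption, and the constant `k=60` is the value commonly used for RRF, not a confirmed default of this library.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class ToyChunk:
    """Hypothetical stand-in for Chunk; only an id and content are modeled."""
    id: str
    content: str = ""


def rrf_fuse(chunk_lists: List[List[ToyChunk]], k: int = 60) -> List[ToyChunk]:
    """Score each chunk by the sum of 1 / (k + rank) over all lists."""
    scores: Dict[str, float] = defaultdict(float)
    first_seen: Dict[str, ToyChunk] = {}
    for chunks in chunk_lists:
        for rank, chunk in enumerate(chunks, start=1):
            scores[chunk.id] += 1.0 / (k + rank)
            first_seen.setdefault(chunk.id, chunk)
    # sorted() is stable, so ties keep first-seen order.
    ordered = sorted(scores, key=lambda cid: scores[cid], reverse=True)
    return [first_seen[cid] for cid in ordered]


def concat_fuse(chunk_lists: List[List[ToyChunk]]) -> List[ToyChunk]:
    """Concatenate lists in order, keeping the first occurrence of each id."""
    seen = set()
    fused: List[ToyChunk] = []
    for chunks in chunk_lists:
        for chunk in chunks:
            if chunk.id not in seen:
                seen.add(chunk.id)
                fused.append(chunk)
    return fused
```

RRF rewards chunks that rank well in several lists, whereas concatenation simply preserves the order in which the lists arrive.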
Examples:
# Using a named strategy
fuse_step = fuse(fn="rrf", input_state="chunk_lists", output_state="chunks")
# Using a custom callable
fuse_step = fuse(
    fn=lambda chunk_lists: chunk_lists[0],
    input_state="chunk_lists",
    output_state="chunks",
)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fn | str \| Callable[[list[list[Chunk]]], list[Chunk]] | Fusion function name ("rrf" or "concat") or a custom callable that accepts list[list[Chunk]] and returns list[Chunk]. | required |
| input_state | str | State key containing the list of chunk lists to fuse. | required |
| output_state | str | State key to store the fused result under. | required |
| name | str \| None | A unique identifier for this pipeline step. Defaults to None, in which case the name will be auto-generated. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| StateOperatorStep | StateOperatorStep | A pipeline step configured to run the fusion operation. |
Raises:
| Type | Description |
|---|---|
| ValueError | If fn is an unknown string name. |
| TypeError | If fn is neither a string nor a callable. |