
Composer

Fluent composer for building pipelines.

Provides a chainable API to compose common step types and build a Pipeline instance.

Usage

The Composer can be used directly as a chainable builder for a Pipeline.

from gllm_pipeline.pipeline.composer import Composer

composer = (
    Composer()
    .step(MyRetriever(), {"query": "query"}, "retrieved")
    .log("Retrieved {retrieved}")
    .transform(lambda x: x["retrieved"], ["retrieved"], "kept")
    .no_op()
    .terminate()
)

pipeline = composer.done()

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id)


Composer(pipeline=None)

Fluent API for composing a pipeline out of simple steps.

The composer accumulates steps in order and can build a Pipeline with a specified state type, recursion limit, and optional name.

The Composer uses a manager-style API:

1. When initialized with an existing Pipeline instance, it manages the pipeline in place.
2. If none is provided, a new empty Pipeline with the default RAGState is created.

Examples:

# With existing pipeline
composer = Composer(existing_pipeline).no_op().terminate()
pipeline = composer.done()
# With new pipeline
composer = Composer().no_op().terminate()
pipeline = composer.done()
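
The manager-style behavior described above can be illustrated with a toy stand-in. `ToyPipeline` and `ToyComposer` are hypothetical classes written only for this sketch and are not part of gllm_pipeline; they show how chainable methods return the composer and how an existing pipeline is modified in place.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for Pipeline: an ordered list of step names."""

    steps: list[str] = field(default_factory=list)


class ToyComposer:
    """Hypothetical stand-in for Composer."""

    def __init__(self, pipeline: ToyPipeline | None = None) -> None:
        # Manage the given pipeline in place, or start from a new empty one.
        self._pipeline = pipeline if pipeline is not None else ToyPipeline()

    def no_op(self) -> ToyComposer:
        self._pipeline.steps.append("no_op")
        return self  # returning self is what makes the calls chainable

    def terminate(self) -> ToyComposer:
        self._pipeline.steps.append("terminate")
        return self

    def done(self) -> ToyPipeline:
        return self._pipeline


existing = ToyPipeline(steps=["retrieve"])
managed = ToyComposer(existing).no_op().terminate().done()
fresh = ToyComposer().no_op().done()
```

In this sketch `managed` is the same object as `existing`, which is the "in place" part of the contract.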

Create a composer that manages a pipeline instance.

Parameters:

Name Type Description Default
pipeline Pipeline | None

Pipeline to modify. Defaults to None, in which case a new empty pipeline with the default RAGState is created.

None

bundle(input_states, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a bundle step to combine multiple state keys.

Parameters:

Name Type Description Default
input_states list[str] | dict[str, str]

List of keys to bundle as-is or mapping of output field names to source state keys.

required
output_state str | list[str]

State key(s) to store the bundled result.

required
retry_config RetryConfig | None

Retry configuration for the step. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler for this step. Defaults to None.

None
cache_store BaseCache | None

Cache store for this step. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration for this step. Defaults to None.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
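
The two accepted forms of input_states can be sketched in plain Python. `bundle_state` is a hypothetical helper, and the assumption that the bundled result is a dict is ours; the real step may shape its output differently.

```python
from __future__ import annotations

from typing import Any


def bundle_state(
    state: dict[str, Any], input_states: list[str] | dict[str, str]
) -> dict[str, Any]:
    """Hypothetical sketch: collect several state keys into one dict."""
    if isinstance(input_states, dict):
        # Mapping form: output field name -> source state key.
        return {name: state[key] for name, key in input_states.items()}
    # List form: keep each key under its own name.
    return {key: state[key] for key in input_states}


state = {"query": "what is RAG?", "chunks": ["a", "b"], "user": "u1"}
as_is = bundle_state(state, ["query", "chunks"])
renamed = bundle_state(state, {"question": "query", "context": "chunks"})
```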

copy(input_state, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a copy step to copy input state(s) to output state(s).

This method creates a step that copies data from input state(s) to output state(s) without transformation. It handles four scenarios:

1. Single input to single output: direct copy.
2. Single input to multiple outputs: broadcast the input to all outputs.
3. Multiple inputs to single output: pack all inputs into a list.
4. Multiple inputs to multiple outputs: copy each input to the corresponding output (both lists must have the same length).
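
The four scenarios can be sketched as a plain function. `copy_state` is a hypothetical helper that returns the state update a copy step might produce; how the real step treats a one-element list is not stated in the source, so this sketch treats it like a single key.

```python
from __future__ import annotations

from typing import Any


def copy_state(
    state: dict[str, Any],
    input_state: str | list[str],
    output_state: str | list[str],
) -> dict[str, Any]:
    """Hypothetical sketch of the four copy scenarios."""
    inputs = [input_state] if isinstance(input_state, str) else list(input_state)
    outputs = [output_state] if isinstance(output_state, str) else list(output_state)
    if len(inputs) == 1:
        # Scenarios 1 and 2: direct copy, or broadcast to every output.
        return {out: state[inputs[0]] for out in outputs}
    if len(outputs) == 1:
        # Scenario 3: pack all inputs into a list.
        return {outputs[0]: [state[key] for key in inputs]}
    if len(inputs) != len(outputs):
        raise ValueError("input_state and output_state must have the same length")
    # Scenario 4: pairwise copy.
    return {out: state[key] for key, out in zip(inputs, outputs)}


state = {"a": 1, "b": 2}
direct = copy_state(state, "a", "x")
broadcast = copy_state(state, "a", ["x", "y"])
packed = copy_state(state, ["a", "b"], "both")
pairwise = copy_state(state, ["a", "b"], ["x", "y"])
```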

Parameters:

Name Type Description Default
input_state str | list[str]

Input state key(s) to copy from.

required
output_state str | list[str]

Output state key(s) to copy to.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "copy_". Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

done()

Return the composed Pipeline instance.

This does not build the execution graph. The graph is built when Pipeline.graph is accessed or Pipeline.build_graph() is called.

Returns:

Name Type Description
Pipeline Pipeline

The composed pipeline instance.
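
Deferred graph construction of this kind is often implemented with a lazily evaluated property. The sketch below uses a hypothetical `LazyPipeline` class to show the idea; whether the real Pipeline caches its graph, and how it builds it, is an assumption here.

```python
from functools import cached_property


class LazyPipeline:
    """Hypothetical pipeline whose graph is built only on first access."""

    def __init__(self, steps):
        self.steps = list(steps)
        self.build_count = 0

    @cached_property
    def graph(self):
        self.build_count += 1
        # Stand-in for real graph construction: link consecutive steps.
        return list(zip(self.steps, self.steps[1:]))


pipeline = LazyPipeline(["retrieve", "rerank", "answer"])
builds_before_access = pipeline.build_count  # nothing built yet
edges = pipeline.graph  # first access triggers the build
_ = pipeline.graph  # second access reuses the cached value
```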

guard(condition, *, success_branch=None, failure_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a guard conditional (builder-style or direct-style).

This method supports two usage patterns:

1) Builder-style (no success_branch provided):

(
    composer.guard(condition)
    .on_success(success_steps)
    .on_failure(failure_steps)
    .end()
)

2) Direct-style (provide success_branch):

composer.guard(
    condition,
    success_branch=success_steps,
    failure_branch=failure_steps,
)

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate. It may be a Component or a Callable; in both cases it must return a boolean value.

required
success_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if the condition is true. If omitted (builder-style), a GuardComposer is returned to define branches via .on_success() and .on_failure(); if provided (direct-style), the guard step is created and appended immediately. Defaults to None.

None
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if the condition is false. If None, the pipeline terminates immediately when the condition is false. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | GuardComposer

If success_branch is omitted, returns a GuardComposer builder; otherwise, returns the current composer after appending the constructed step.
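
The dual return type can be sketched with toy classes. `ToyComposer` and `ToyGuardBuilder` are hypothetical names for this illustration only; the point is that both styles end with the same step appended to the parent composer.

```python
class ToyGuardBuilder:
    """Hypothetical sub-builder returned in builder-style usage."""

    def __init__(self, parent, condition):
        self._parent = parent
        self._condition = condition
        self._success = None
        self._failure = None

    def on_success(self, steps):
        self._success = steps
        return self

    def on_failure(self, steps):
        self._failure = steps
        return self

    def end(self):
        # Finalize the guard and hand control back to the parent composer.
        return self._parent.guard(
            self._condition,
            success_branch=self._success,
            failure_branch=self._failure,
        )


class ToyComposer:
    """Hypothetical composer that records steps as tuples."""

    def __init__(self):
        self.steps = []

    def guard(self, condition, *, success_branch=None, failure_branch=None):
        if success_branch is None:
            return ToyGuardBuilder(self, condition)  # builder-style
        self.steps.append(("guard", condition, success_branch, failure_branch))
        return self  # direct-style


def has_docs(state):
    return bool(state.get("docs"))


builder_style = (
    ToyComposer().guard(has_docs).on_success(["answer"]).on_failure(["apologize"]).end()
)
direct_style = ToyComposer().guard(
    has_docs, success_branch=["answer"], failure_branch=["apologize"]
)
```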

if_else(condition, if_branch, else_branch, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a direct if/else conditional step.

This is the direct-style counterpart to the builder returned by when(...). Use this when you already have both branches available and prefer a single call.

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
else_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is false.

required
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
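
At run time, an if/else step evaluates the condition and executes one of the two branches. The sketch below models steps as plain callables that return state updates; `run_if_else` is a hypothetical helper and does not reflect how the library executes steps internally.

```python
def run_if_else(state, condition, if_branch, else_branch, output_state=None):
    """Hypothetical sketch: pick a branch and apply its steps in order."""
    result = bool(condition(state))
    new_state = dict(state)
    if output_state is not None:
        # Optionally record the condition result in the state.
        new_state[output_state] = result
    branch = if_branch if result else else_branch
    steps = branch if isinstance(branch, list) else [branch]
    for step in steps:
        new_state.update(step(new_state))
    return new_state


def has_docs(state):
    return len(state["docs"]) > 0


def answer(state):
    return {"reply": "found"}


def fallback(state):
    return {"reply": "none"}


outcome = run_if_else({"docs": []}, has_docs, answer, [fallback], output_state="has_docs")
```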

log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache_store=None, cache_config=None)

Append a log step.

Parameters:

Name Type Description Default
message str

The message to log.

required
is_template bool

Whether the message is a template. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Keyword arguments to pass to the emit function. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
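
The is_template flag can be pictured as follows, assuming templates use `str.format`-style placeholders filled from the pipeline state (as the "Retrieved {retrieved}" usage example suggests). `render_log_message` is a hypothetical helper; the actual rendering mechanism is not specified in the source.

```python
def render_log_message(message, state, is_template=True):
    """Hypothetical sketch of template versus literal log messages."""
    if not is_template:
        # Literal message: braces are left untouched.
        return message
    return message.format(**state)


state = {"retrieved": ["doc1"]}
rendered = render_log_message("Retrieved {retrieved}", state)
literal = render_log_message("Retrieved {retrieved}", state, is_template=False)
```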

map_reduce(input_map, output_state, map_func, reduce_func=lambda results: results, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a map-reduce step.

Parameters:

Name Type Description Default
input_map InputMapSpec

Unified input mapping for the map function.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
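
The roles of map_func and reduce_func can be sketched without the pipeline machinery. `run_map_reduce` is a hypothetical helper that takes the items directly; how the real step derives its items from input_map is not covered here.

```python
def run_map_reduce(items, map_func, reduce_func=lambda results: results):
    """Hypothetical sketch: map each item, then reduce the list of results."""
    return reduce_func([map_func(item) for item in items])


def text_length(item):
    return len(item["text"])


items = [{"text": "ab"}, {"text": "cde"}]
lengths = run_map_reduce(items, text_length)  # default reduce keeps the list
total = run_map_reduce(items, text_length, reduce_func=sum)
```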

no_op(name=None)

Append a no-op step.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "no_op_". Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

parallel(*, branches=None, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a parallel step (builder-style or direct-style).

This method supports two usage patterns:

1) Builder-style (no branches provided):

(
    composer.parallel(input_map=["query"], squash=True, name="p")
    .fork(step_a)
    .fork([step_b1, step_b2])
    .end()
)

2) Direct-style (provide branches list or dict):

composer.parallel(
    branches=[step_a, [step_b1, step_b2]],
    input_map=["query"],
    squash=True,
    name="p_direct",
)

Parameters:

Name Type Description Default
branches list[PipelineSteps] | dict[str, PipelineSteps] | None

Branches to execute in parallel. Each branch can be a single step or a list of steps to run sequentially. If omitted (builder-style), a ParallelComposer is returned to define forks via .fork(); if provided (direct-style), the parallel step is created and appended immediately. Defaults to None.

None
squash bool

Whether to squash execution into a single node (async gather). If True, the parallel execution is represented by a single node; if False, native graph structures are used. Defaults to True.

True
input_map InputMapSpec | None

Unified input mapping for all branches. Defaults to None.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for the parallel step. Defaults to None.

None

Returns:

Type Description
Self | ParallelComposer

If branches is omitted, returns a ParallelComposer builder; otherwise, returns the current composer after appending the constructed step.
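
The "async gather" behavior mentioned for squash=True can be sketched with `asyncio.gather`. The helpers `run_branch` and `run_parallel`, and the merge-by-update strategy, are assumptions made for this illustration rather than the library's implementation.

```python
import asyncio


async def run_branch(steps, state):
    """Run one branch: its steps execute sequentially on a private copy."""
    local = dict(state)
    for step in steps:
        local.update(await step(local))
    return local


async def run_parallel(branches, state):
    """Hypothetical sketch: run all branches concurrently, then merge."""
    results = await asyncio.gather(*(run_branch(b, state) for b in branches))
    merged = dict(state)
    for result in results:
        merged.update(result)
    return merged


async def step_a(state):
    return {"a": state["query"].upper()}


async def step_b1(state):
    return {"b": len(state["query"])}


async def step_b2(state):
    return {"b": state["b"] + 1}


merged = asyncio.run(run_parallel([[step_a], [step_b1, step_b2]], {"query": "hi"}))
```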

step(component, input_map, output_state=None, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)

Append a component step.

Parameters:

Name Type Description Default
component Component

The component to execute.

required
input_map InputMapSpec

Unified input mapping for the component.

required
output_state str | list[str] | None

Key to store the result in the pipeline state. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
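
One way to picture a component step is shown below. It assumes a dict-form input_map whose keys are the component's argument names and whose values are state keys; that reading of InputMapSpec is an assumption, and `run_step` and `toy_retriever` are hypothetical names.

```python
def run_step(component, state, input_map, output_state=None):
    """Hypothetical sketch: call a component with inputs drawn from state."""
    kwargs = {arg: state[key] for arg, key in input_map.items()}
    result = component(**kwargs)
    if output_state is None:
        # No output key: the result is not written back in this sketch.
        return dict(state)
    return {**state, output_state: result}


def toy_retriever(query):
    return [f"doc about {query}"]


new_state = run_step(toy_retriever, {"query": "cats"}, {"query": "query"}, "retrieved")
```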

subgraph(subgraph, input_map=None, output_state_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a subgraph step that executes another pipeline.

Parameters:

Name Type Description Default
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_map InputMapSpec | None

Unified input mapping for the subgraph. Defaults to None.

None
output_state_map dict[str, str] | None

Map parent state keys to subgraph output keys. Defaults to None, in which case all outputs are passed through.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
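
The effect of output_state_map can be sketched as a merge of the subgraph's output into the parent state. `merge_subgraph_output` is a hypothetical helper; it assumes the map reads as parent key to subgraph key, following the parameter description.

```python
def merge_subgraph_output(parent_state, subgraph_output, output_state_map=None):
    """Hypothetical sketch of writing subgraph results back to the parent."""
    merged = dict(parent_state)
    if output_state_map is None:
        # No map: pass every subgraph output through unchanged.
        merged.update(subgraph_output)
        return merged
    for parent_key, subgraph_key in output_state_map.items():
        merged[parent_key] = subgraph_output[subgraph_key]
    return merged


parent = {"query": "cats"}
sub_out = {"answer": "meow", "debug": "trace"}
passthrough = merge_subgraph_output(parent, sub_out)
mapped = merge_subgraph_output(parent, sub_out, {"final_answer": "answer"})
```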

switch(condition, *, branches=None, input_map=None, output_state=None, default=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a switch conditional (builder-style or direct-style).

This method supports two usage patterns:

1) Builder-style (no branches provided):

(
    composer.switch(cond)
    .case("A", step_a)
    .case("B", step_b)
    .default(step_default)
    .end()
)

2) Direct-style (provide branches mapping):

composer.switch(
    cond,
    branches={"A": step_a, "B": [step_b1, step_b2]},
    default=step_default,
)

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], str]

The condition to evaluate. It may be a Component or a Callable; in both cases it must return a string key used for branch selection.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]] | None

Mapping of case keys to step(s) to execute. If omitted or explicitly set to None (builder-style), a SwitchComposer is returned to define cases via .case() and .default(); if provided (direct-style), the switch step is created and appended immediately. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
default BasePipelineStep | list[BasePipelineStep] | None

Fallback branch to use when the condition result is not present in branches. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | SwitchComposer

If branches is omitted, returns a SwitchComposer builder; otherwise, returns the current composer after appending the constructed step.
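
Branch selection with a fallback can be sketched as a dictionary lookup. `select_branch` is a hypothetical helper, and returning an empty list when neither a case nor a default matches is an assumption of this sketch.

```python
def select_branch(key, branches, default=None):
    """Hypothetical sketch: pick the steps for a case key, or the default."""
    branch = branches.get(key, default)
    if branch is None:
        return []
    # Normalize a single step to a one-element list.
    return branch if isinstance(branch, list) else [branch]


branches = {"A": "step_a", "B": ["step_b1", "step_b2"]}
case_a = select_branch("A", branches, default="step_default")
case_b = select_branch("B", branches, default="step_default")
unknown = select_branch("Z", branches, default="step_default")
```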

terminate(name=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Append a terminator step.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "terminator_". Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

toggle(condition, *, if_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a toggle conditional (builder-style or direct-style).

This method supports two usage patterns:

1) Builder-style (no if_branch provided):

(
    composer.toggle(condition)
    .then(enabled_steps)
    .end()
)

2) Direct-style (provide if_branch):

composer.toggle(
    condition,
    if_branch=enabled_steps,
)

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool] | str

The condition to evaluate. If a Component or a Callable, it must return a boolean value. If a str, it is used as a key and looked up in the merged state data.

required
if_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if the condition is true. If omitted (builder-style), a ToggleComposer is returned to define the branch via .then(); if provided (direct-style), the toggle step is created and appended immediately. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | ToggleComposer

If if_branch is omitted, returns a ToggleComposer builder; otherwise, returns the current composer after appending the constructed step.
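
The difference between a callable condition and a string condition can be sketched as below. `resolve_toggle` is a hypothetical helper; treating a missing key as False is an assumption, since the source only says the string is looked up in the merged state data.

```python
def resolve_toggle(condition, state):
    """Hypothetical sketch: evaluate a toggle condition against the state."""
    if isinstance(condition, str):
        # String form: use the condition as a key into the state.
        return bool(state.get(condition, False))
    return bool(condition(state))


state = {"use_rerank": True, "top_k": 0}
by_key = resolve_toggle("use_rerank", state)
by_missing_key = resolve_toggle("use_cache", state)
by_callable = resolve_toggle(lambda s: s["top_k"] > 0, state)
```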

transform(operation, input_map, output_state, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)

Append a state operator step.

Parameters:

Name Type Description Default
operation Callable[[dict[str, Any]], Any]

The operation to apply to the state.

required
input_map InputMapSpec

Unified input mapping for the operation.

required
output_state str | list[str]

State key(s) to store the result in.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "transform_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.
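
A transform step can be pictured as a function applied to a slice of the state. The sketch assumes a list-form input_map, where each listed key is passed through under its own name, mirroring the usage example at the top of this page; `run_transform` is a hypothetical helper.

```python
def run_transform(operation, state, input_map, output_state):
    """Hypothetical sketch: apply an operation to selected state keys."""
    inputs = {key: state[key] for key in input_map}
    # The operation receives a dict and its result is stored under output_state.
    return {**state, output_state: operation(inputs)}


state = {"query": "cats", "retrieved": ["doc1", "doc2"]}
new_state = run_transform(lambda x: x["retrieved"][:1], state, ["retrieved"], "kept")
```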

when(condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Begin an if/else conditional using a fluent builder.

Returns an IfElseComposer bound to this composer. Use .then(...) on the returned builder to set the true-branch, .otherwise(...) to set the false-branch, and .end() to finalize and return to the parent composer.

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
input_map InputMapSpec | None

Unified input mapping for this conditional. Defaults to None.

None
output_state str | None

Key to store the outcome state. Defaults to None.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration. Defaults to None.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
IfElseComposer IfElseComposer

A builder to define the branches and finalize with .end().