Skip to content

Composer

Fluent pipeline composer package.

Exports the Composer class which provides a fluent API to build Pipeline instances using concise, readable chaining for common step types.

Composer(pipeline=None)

Fluent API for composing a pipeline out of simple steps.

The composer accumulates steps in order and can build a Pipeline with a specified state type, recursion limit, and optional name.

The Composer uses a manager-style API: 1. When initialized with an existing Pipeline instance, it manages the pipeline in place. 2. If none is provided, a new empty Pipeline with the default RAGState is created.

Examples:

# With existing pipeline
composer = Composer(existing_pipeline).no_op().terminate()
pipeline = composer.done()
# With new pipeline
composer = Composer().no_op().terminate()
pipeline = composer.done()

Create a composer that manages a pipeline instance.

Parameters:

Name Type Description Default
pipeline Pipeline | None

Pipeline to modify. Defaults to None, in which case a new empty pipeline with the default RAGState is created.

None

bundle(input_states, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a bundle step to combine multiple state keys.

Parameters:

Name Type Description Default
input_states list[str] | dict[str, str]

List of keys to bundle as-is or mapping of output field names to source state keys.

required
output_state str | list[str]

State key(s) to store the bundled result.

required
retry_config RetryConfig | None

Retry configuration for the step. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler for this step. Defaults to None.

None
cache_store BaseCache | None

Cache store for this step. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration for this step. Defaults to None.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

copy(input_state, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a copy step to copy input state(s) to output state(s).

This method creates a step that copies data from input state(s) to output state(s) without transformation. The function handles different scenarios: 1. Single input to single output: Direct copy 2. Single input to multiple outputs: Broadcast the input to all outputs 3. Multiple inputs to single output: Pack all inputs into a list 4. Multiple inputs to multiple outputs: Copy each input to corresponding output (must have same length)

Parameters:

Name Type Description Default
input_state str | list[str]

Input state key(s) to copy from.

required
output_state str | list[str]

Output state key(s) to copy to.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "copy_". Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

done()

Return the composed Pipeline instance.

This does not build the execution graph. The graph is built when Pipeline.graph or Pipeline.build_graph() is accessed/called.

Returns:

Name Type Description
Pipeline Pipeline

The composed pipeline instance.

guard(condition, *, success_branch=None, failure_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a guard conditional (builder-style or direct-style).

This method supports two usage patterns: 1) Builder-style (no success_branch provided): python composer.guard(condition) .on_success(success_steps) .on_failure(failure_steps) .end()

2) Direct-style (provide success_branch): python composer.guard( condition, success_branch=success_steps, failure_branch=failure_steps, )

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate. If a Component, it must return a boolean value. If a Callable, it must return a boolean value as well.

required
success_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is true. If omitted (builder-style), a GuardComposer is returned to define branches via .on_success() and .on_failure(); if provided (direct-style), the guard step is created and appended immediately. Defaults to None, in which case a GuardComposer is returned.

None
failure_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is false. If None, pipeline terminates immediately. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | GuardComposer

GuardComposer | Self: If success_branch is omitted, returns a GuardComposer builder; otherwise, returns the current composer after appending the constructed step.

if_else(condition, if_branch, else_branch, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a direct if/else conditional step.

This is the direct-style counterpart to the builder returned by when(...). Use this when you already have both branches available and prefer a single call.

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
if_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is true.

required
else_branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute if condition is false.

required
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache_store=None, cache_config=None)

Append a log step.

Parameters:

Name Type Description Default
message str

The message to log.

required
is_template bool

Whether the message is a template. Defaults to True.

True
emit_kwargs dict[str, Any] | None

Keyword arguments to pass to the emit function. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

map_reduce(input_map, output_state, map_func, reduce_func=lambda results: results, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a map-reduce step.

Parameters:

Name Type Description Default
input_map InputMapSpec

Unified input mapping for the map function.

required
output_state str

Key to store the reduced result in the pipeline state.

required
map_func Component | Callable[[dict[str, Any]], Any]

Function to apply to each input item.

required
reduce_func Callable[[list[Any]], Any]

Function to reduce mapped results. Defaults to a function that returns the list of results as is.

lambda results: results
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

no_op(name=None)

Append a no-op step.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "no_op_". Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

parallel(*, branches=None, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a parallel step (builder-style or direct-style).

This method supports two usage patterns: 1) Builder-style (no branches provided): python composer.parallel(input_states=["query"], squash=True, name="p") .fork(step_a) .fork([step_b1, step_b2]) .end()

2) Direct-style (provide branches list or dict): python composer.parallel( branches=[step_a, [step_b1, step_b2]], input_states=["query"], squash=True, name="p_direct", )

Parameters:

Name Type Description Default
branches list[PipelineSteps] | dict[str, PipelineSteps] | None

Branches to execute in parallel. Each branch can be a single step or a list of steps to run sequentially. If omitted (builder-style), a ParallelComposer is returned to define forks via .fork(); if provided (direct-style), the parallel step is created and appended immediately. Defaults to None.

None
squash bool

Whether to squash execution into a single node (async gather). If True, the parallel execution is represented by a single node; if False, native graph structures are used. Defaults to True.

True
input_map InputMapSpec | None

Unified input mapping for all branches. Defaults to None.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

A unique identifier for the parallel step. Defaults to None.

None

Returns:

Type Description
Self | ParallelComposer

ParallelComposer | Self: If branches is omitted, returns a ParallelComposer builder; otherwise,

Self | ParallelComposer

returns the current composer after appending the constructed step.

step(component, input_map, output_state=None, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)

Append a component step.

Parameters:

Name Type Description Default
component Component

The component to execute.

required
input_map InputMapSpec

Unified input mapping for the component.

required
output_state str | list[str] | None

Key to store the result in the pipeline state. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

subgraph(subgraph, input_map=None, output_state_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Append a subgraph step that executes another pipeline.

Parameters:

Name Type Description Default
subgraph Pipeline

The pipeline to be executed as a subgraph.

required
input_map InputMapSpec | None

Unified input mapping for the subgraph. Defaults to None.

None
output_state_map dict[str, str] | None

Map parent state keys to subgraph output keys. Defaults to None, in which case all outputs are passed through.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case the default error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

switch(condition, *, branches=None, input_map=None, output_state=None, default=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a switch conditional (builder-style or direct-style).

This method supports two usage patterns: 1) Builder-style (no branches provided): python composer.switch(cond) .case("A", step_a) .case("B", step_b) .default(step_default) .end()

2) Direct-style (provide branches mapping): python composer.switch( cond, branches={"A": step_a, "B": [step_b1, step_b2]}, default=step_default, )

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], str]

The condition to evaluate. If a Component, it must return a string key for branch selection. If a Callable, it must return a string key as well.

required
branches dict[str, BasePipelineStep | list[BasePipelineStep]] | None

Mapping of case keys to step(s) to execute. If omitted (builder-style), a SwitchComposer is returned to define cases via .case() and .default(); if provided (direct-style), the switch step is created and appended immediately. Defaults to None, in which case a SwitchComposer is returned. Note that manually setting branches to None also results in a SwitchComposer being returned.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
default BasePipelineStep | list[BasePipelineStep] | None

Fallback branch to use when the condition result is not present in branches. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | SwitchComposer

SwitchComposer | Self: If branches is omitted, returns a SwitchComposer builder; otherwise, returns the current composer after appending the constructed step.

terminate(name=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)

Append a terminator step.

Parameters:

Name Type Description Default
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "terminator_". Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

toggle(condition, *, if_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Create a toggle conditional (builder-style or direct-style).

This method supports two usage patterns: 1) Builder-style (no if_branch provided): python composer.toggle(condition) .then(enabled_steps) .end()

2) Direct-style (provide if_branch): python composer.toggle( condition, if_branch=enabled_steps, )

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool] | str

The condition to evaluate. If a Component, it must return a boolean value. If a Callable, it must return a boolean value. If a str, it will be looked up in the merged state data.

required
if_branch BasePipelineStep | list[BasePipelineStep] | None

Steps to execute if condition is true. If omitted (builder-style), a ToggleComposer is returned to define the branch via .then(); if provided (direct-style), the toggle step is created and appended immediately. Defaults to None, in which case a ToggleComposer is returned.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
output_state str | None

Optional state key to store the condition result. Defaults to None.

None
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Strategy to handle errors during execution. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None.

None
name str | None

A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None.

None

Returns:

Type Description
Self | ToggleComposer

ToggleComposer | Self: If if_branch is omitted, returns a ToggleComposer builder; otherwise, returns the current composer after appending the constructed step.

transform(operation, input_map, output_state, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)

Append a state operator step.

Parameters:

Name Type Description Default
operation Callable[[dict[str, Any]], Any]

The operation to apply to the state.

required
input_map InputMapSpec

Unified input mapping for the operation.

required
output_state str | list[str]

State key(s) to store the result in.

required
retry_config RetryConfig | None

Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied.

None
error_handler BaseStepErrorHandler | None

Error handler to use for this step. Defaults to None, in which case no error handler is used.

None
name str | None

A unique identifier for this step. If None, a name will be auto-generated with the prefix "transform_". Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None, in which case no cache store is used.

None
cache_config dict[str, Any] | None

Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used.

None

Returns:

Name Type Description
Self Self

The composer instance with this step appended.

when(condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Begin an if/else conditional using a fluent builder.

Returns an IfElseComposer bound to this composer. Use .then(...) on the returned builder to set the true-branch, .otherwise(...) to set the false-branch, and .end() to finalize and return to the parent composer.

Parameters:

Name Type Description Default
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
input_map InputMapSpec | None

Unified input mapping for this conditional. Defaults to None.

None
output_state str | None

Key to store the outcome state. Defaults to None.

None
retry_config RetryConfig | None

Retry behavior configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Cache store to be used for caching. Defaults to None.

None
cache_config dict[str, Any] | None

Cache configuration. Defaults to None.

None
name str | None

Optional name for the step. Defaults to None.

None

Returns:

Name Type Description
IfElseComposer IfElseComposer

A builder to define the branches and finalize with .end().

GuardComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for a guard conditional.

Usage

composer.guard(condition).on_success(success_branch).on_failure(failure_branch).end() composer.guard(condition).on_success(success_branch).end() # failure defaults to termination

After setting the branches, call .end() to append the guard step and return back to the parent Composer.

Initialize the GuardComposer.

Parameters:

Name Type Description Default
parent Composer

The parent composer instance.

required
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
output_state str | None

Optional state key to store condition result. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
retry_config RetryConfig | None

Retry configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Optional cache store. Defaults to None.

None
cache_config dict[str, Any] | None

Optional cache config. Defaults to None.

None
name str | None

Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Guard_".

None

end()

Finalize and append the guard step to the parent composer.

Returns:

Name Type Description
Composer Composer

The parent composer for continued chaining.

on_failure(branch)

Define the branch to execute when the condition is false (failed).

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

The step(s) for the failure branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

on_success(branch)

Define the branch to execute when the condition is true (successful).

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

The step(s) for the success branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

IfElseComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for an if/else conditional.

Usage

composer.when(cond).then(if_branch).otherwise(else_branch).end()

After setting both branches, call .end() to append the conditional step and return back to the parent Composer.

Initialize the IfElseComposer.

Parameters:

Name Type Description Default
parent Composer

The parent composer instance.

required
condition Component | Callable[[dict[str, Any]], bool]

The condition to evaluate.

required
output_state str | None

Optional state key to store condition result. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
retry_config RetryConfig | None

Retry configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Optional cache store. Defaults to None.

None
cache_config dict[str, Any] | None

Optional cache config. Defaults to None.

None
name str | None

Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "IfElse_".

None

end()

Finalize and append the conditional step to the parent composer.

Returns:

Name Type Description
Composer Composer

The parent composer for continued chaining.

otherwise(branch)

Define the branch to execute when the condition is false.

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

The step(s) for the false branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

then(branch)

Define the branch to execute when the condition is true.

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

The step(s) for the true branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

ParallelComposer(parent, *, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for a parallel step.

Usage

composer.parallel(name="p").fork(step_a).fork([step_b1, step_b2]).end()

The builder collects forks as branches and constructs a ParallelStep using the functional helper _func.parallel(...).

Initialize the ParallelComposer.

Parameters:

Name Type Description Default
parent Composer

The parent composer instance.

required
squash bool

Whether to squash execution into a single node (async gather). Defaults to True.

True
input_map InputMapSpec | None

Unified input mapping for all branches. Defaults to None.

None
retry_config RetryConfig | None

Retry configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Optional cache store. Defaults to None.

None
cache_config dict[str, Any] | None

Optional cache config. Defaults to None.

None
name str | None

Optional name for the resulting step. Defaults to None.

None

end()

Finalize and append the parallel step to the parent composer.

Returns:

Name Type Description
Composer Composer

The parent composer for continued chaining.

fork(branch)

Add a fork (branch) to execute in parallel.

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

Step(s) for this fork.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

SwitchComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for a switch conditional.

Usage

composer.switch(cond).case("A", step_a).case("B", step_b).end()

Optionally call .default(...) to set a fallback branch.

Initialize the SwitchComposer.

Parameters:

Name Type Description Default
parent Composer

The parent composer instance.

required
condition Component | Callable[[dict[str, Any]], str]

The condition to evaluate.

required
output_state str | None

Optional state key to store condition result. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
retry_config RetryConfig | None

Retry configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Optional cache store. Defaults to None.

None
cache_config dict[str, Any] | None

Optional cache config. Defaults to None.

None
name str | None

Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Switch_".

None

case(key, branch)

Add a case branch.

Parameters:

Name Type Description Default
key str

Case key to match against condition result.

required
branch BasePipelineStep | list[BasePipelineStep]

Step(s) to execute for this case.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

default(branch)

Set the default branch for unmatched cases.

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

Default fallback branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.

end()

Finalize and append the switch conditional to the parent composer.

Returns:

Name Type Description
Composer Composer

The parent composer for continued chaining.

ToggleComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for a toggle conditional.

Usage

composer.toggle(condition).then(enabled_branch).end()

After setting the enabled branch, call .end() to append the toggle step and return back to the parent Composer.

Initialize the ToggleComposer.

Parameters:

Name Type Description Default
parent Composer

The parent composer instance.

required
condition Component | Callable[[dict[str, Any]], bool] | str

The condition to evaluate.

required
output_state str | None

Optional state key to store condition result. Defaults to None.

None
input_map InputMapSpec | None

Unified input mapping for the condition. Defaults to None.

None
retry_config RetryConfig | None

Retry configuration. Defaults to None.

None
error_handler BaseStepErrorHandler | None

Error handler. Defaults to None.

None
cache_store BaseCache | None

Optional cache store. Defaults to None.

None
cache_config dict[str, Any] | None

Optional cache config. Defaults to None.

None
name str | None

Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Toggle_".

None

end()

Finalize and append the toggle step to the parent composer.

Returns:

Name Type Description
Composer Composer

The parent composer for continued chaining.

then(branch)

Define the branch to execute when the condition is true.

Parameters:

Name Type Description Default
branch BasePipelineStep | list[BasePipelineStep]

The step(s) for the enabled branch.

required

Returns:

Name Type Description
Self Self

The builder instance for chaining.