Composer
Fluent pipeline composer package.
Exports the Composer class which provides a fluent API to build Pipeline instances using concise, readable
chaining for common step types.
Composer(pipeline=None)
Fluent API for composing a pipeline out of simple steps.
The composer accumulates steps in order and can build a Pipeline with
a specified state type, recursion limit, and optional name.
The Composer uses a manager-style API:
1. When initialized with an existing Pipeline instance, it manages the pipeline in place.
2. If none is provided, a new empty Pipeline with the default RAGState is created.
Examples:
```python
# With an existing pipeline
composer = Composer(existing_pipeline).no_op().terminate()
pipeline = composer.done()

# With a new pipeline
composer = Composer().no_op().terminate()
pipeline = composer.done()
```
Create a composer that manages a pipeline instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pipeline | `Pipeline \| None` | Pipeline to modify. Defaults to None, in which case a new empty pipeline with the default RAGState is created. | `None` |
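The manager-style behavior described above can be sketched with stand-in classes. `MiniPipeline` and `MiniComposer` are hypothetical names used only for illustration; they mimic the shape of the API (each method appends a step and returns the composer) and are not the library's implementation.

```python
from typing import Optional


class MiniPipeline:
    """Hypothetical stand-in for Pipeline: an ordered list of step names."""

    def __init__(self) -> None:
        self.steps: list[str] = []


class MiniComposer:
    """Hypothetical stand-in for Composer illustrating the manager-style API."""

    def __init__(self, pipeline: Optional[MiniPipeline] = None) -> None:
        # Manage the given pipeline in place, or start from a new empty one.
        self._pipeline = pipeline if pipeline is not None else MiniPipeline()

    def no_op(self) -> "MiniComposer":
        self._pipeline.steps.append("no_op")
        return self  # returning self is what enables chaining

    def terminate(self) -> "MiniComposer":
        self._pipeline.steps.append("terminate")
        return self

    def done(self) -> MiniPipeline:
        # Hand back the managed pipeline object itself.
        return self._pipeline


existing = MiniPipeline()
managed = MiniComposer(existing).no_op().terminate().done()
fresh = MiniComposer().no_op().done()
```

In this sketch `managed` is the same object as `existing`, which is what "manages the pipeline in place" is taken to mean here.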
bundle(input_states, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Append a bundle step to combine multiple state keys.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_states | `list[str] \| dict[str, str]` | List of keys to bundle as-is, or a mapping of output field names to source state keys. | required |
| output_state | `str \| list[str]` | State key(s) to store the bundled result. | required |
| retry_config | `RetryConfig \| None` | Retry configuration for the step. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler for this step. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store for this step. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration for this step. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the step. Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
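The two accepted forms of `input_states` can be illustrated with a small stand-alone function. `bundle_state` is a hypothetical helper, and the shape of the bundled value (a dict stored under a single `output_state` key) is an assumption made for this sketch rather than documented behavior.

```python
from typing import Any, Union


def bundle_state(
    state: dict[str, Any],
    input_states: Union[list[str], dict[str, str]],
    output_state: str,
) -> dict[str, Any]:
    """Hypothetical helper: the state update a bundle step might produce."""
    if isinstance(input_states, dict):
        # Mapping form: output field name -> source state key.
        bundled = {field: state[source] for field, source in input_states.items()}
    else:
        # List form: keys are bundled under their own names.
        bundled = {key: state[key] for key in input_states}
    return {output_state: bundled}


state = {"query": "q", "docs": ["d1", "d2"]}
as_is = bundle_state(state, ["query", "docs"], "context")
renamed = bundle_state(state, {"question": "query"}, "payload")
```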
copy(input_state, output_state, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Append a copy step to copy input state(s) to output state(s).
This method creates a step that copies data from input state(s) to output state(s) without transformation. It handles four scenarios:
1. Single input to single output: direct copy.
2. Single input to multiple outputs: the input is broadcast to all outputs.
3. Multiple inputs to single output: all inputs are packed into a list.
4. Multiple inputs to multiple outputs: each input is copied to the corresponding output (both lists must have the same length).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_state | `str \| list[str]` | Input state key(s) to copy from. | required |
| output_state | `str \| list[str]` | Output state key(s) to copy to. | required |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case no error handler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "copy_". Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
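The four copy scenarios can be written out as a plain function over a state dict. `copy_state` is a hypothetical helper for illustration; the error type raised on a length mismatch is an assumption.

```python
from typing import Any, Union

Keys = Union[str, list[str]]


def copy_state(state: dict[str, Any], input_state: Keys, output_state: Keys) -> dict[str, Any]:
    """Hypothetical helper mirroring the four copy scenarios."""
    many_in = isinstance(input_state, list)
    many_out = isinstance(output_state, list)
    if not many_in and not many_out:
        return {output_state: state[input_state]}  # 1. direct copy
    if not many_in:
        return {key: state[input_state] for key in output_state}  # 2. broadcast
    if not many_out:
        return {output_state: [state[key] for key in input_state]}  # 3. pack into a list
    if len(input_state) != len(output_state):
        # Assumed error type; the source only says the lengths must match.
        raise ValueError("input_state and output_state must have the same length")
    return {dst: state[src] for src, dst in zip(input_state, output_state)}  # 4. pairwise


state = {"a": 1, "b": 2}
direct = copy_state(state, "a", "x")
broadcast = copy_state(state, "a", ["x", "y"])
packed = copy_state(state, ["a", "b"], "x")
pairwise = copy_state(state, ["a", "b"], ["x", "y"])
```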
done()
Return the composed Pipeline instance.
This does not build the execution graph. The graph is built when
Pipeline.graph or Pipeline.build_graph() is accessed/called.
Returns:
| Name | Type | Description |
|---|---|---|
| Pipeline | `Pipeline` | The composed pipeline instance. |
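The deferred graph construction described above can be pictured with a lazily evaluated property. `LazyPipeline` is a hypothetical stand-in, and caching the graph after the first access is an assumption of this sketch, not something the source states.

```python
class LazyPipeline:
    """Hypothetical stand-in showing deferred graph construction."""

    def __init__(self, steps: list[str]) -> None:
        self.steps = steps
        self._graph = None
        self.build_count = 0

    def build_graph(self) -> list[tuple[str, str]]:
        # Pretend the "graph" is the list of edges between consecutive steps.
        self.build_count += 1
        self._graph = list(zip(self.steps, self.steps[1:]))
        return self._graph

    @property
    def graph(self) -> list[tuple[str, str]]:
        # Build on first access only (assumed caching behavior).
        if self._graph is None:
            self.build_graph()
        return self._graph


pipeline = LazyPipeline(["a", "b", "c"])
built_before_access = pipeline.build_count  # nothing has been built yet
edges = pipeline.graph  # first access triggers the build
```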
guard(condition, *, success_branch=None, failure_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Create a guard conditional (builder-style or direct-style).
This method supports two usage patterns:
1) Builder-style (no success_branch provided):
```python
(
    composer.guard(condition)
    .on_success(success_steps)
    .on_failure(failure_steps)
    .end()
)
```
2) Direct-style (provide success_branch):
```python
composer.guard(
    condition,
    success_branch=success_steps,
    failure_branch=failure_steps,
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | `Component \| Callable[[dict[str, Any]], bool]` | The condition to evaluate. If a … | required |
| success_branch | `BasePipelineStep \| list[BasePipelineStep] \| None` | Steps to execute if condition is true. If omitted (builder-style), a … | `None` |
| failure_branch | `BasePipelineStep \| list[BasePipelineStep] \| None` | Steps to execute if condition is false. If None, the pipeline terminates immediately. Defaults to None. | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| output_state | `str \| None` | Optional state key to store the condition result. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None. | `None` |
| name | `str \| None` | A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Self \| GuardComposer` | GuardComposer \| Self: If … |
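The branch selection a guard performs, including termination when no failure branch is given, can be sketched as follows. `select_guard_branch` and the `TERMINATE` sentinel are hypothetical names; branches are represented as plain strings only to keep the sketch self-contained.

```python
from typing import Any, Callable, Optional, Union

TERMINATE = "terminate"  # hypothetical sentinel meaning "stop the pipeline"


def select_guard_branch(
    state: dict[str, Any],
    condition: Callable[[dict[str, Any]], bool],
    success_branch: list[str],
    failure_branch: Optional[list[str]] = None,
) -> Union[list[str], str]:
    """Hypothetical helper: pick what a guard would run next."""
    if condition(state):
        return success_branch
    # With no failure branch, a failed guard terminates the pipeline.
    return failure_branch if failure_branch is not None else TERMINATE


def is_admin(state: dict[str, Any]) -> bool:
    return state["role"] == "admin"


allowed = select_guard_branch({"role": "admin"}, is_admin, ["serve"])
rejected = select_guard_branch({"role": "guest"}, is_admin, ["serve"], ["deny"])
stopped = select_guard_branch({"role": "guest"}, is_admin, ["serve"])
```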
if_else(condition, if_branch, else_branch, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Append a direct if/else conditional step.
This is the direct-style counterpart to the builder returned by when(...).
Use this when you already have both branches available and prefer a single call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | `Component \| Callable[[dict[str, Any]], bool]` | The condition to evaluate. | required |
| if_branch | `BasePipelineStep \| list[BasePipelineStep]` | Step(s) to execute if condition is true. | required |
| else_branch | `BasePipelineStep \| list[BasePipelineStep]` | Step(s) to execute if condition is false. | required |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| output_state | `str \| None` | Optional state key to store the condition result. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None. | `None` |
| name | `str \| None` | A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
log(message, is_template=True, emit_kwargs=None, retry_config=None, name=None, cache_store=None, cache_config=None)
Append a log step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | `str` | The message to log. | required |
| is_template | `bool` | Whether the message is a template. Defaults to True. | `True` |
| emit_kwargs | `dict[str, Any] \| None` | Keyword arguments to pass to the emit function. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
map_reduce(input_map, output_state, map_func, reduce_func=lambda results: results, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Append a map-reduce step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| input_map | `InputMapSpec` | Unified input mapping for the map function. | required |
| output_state | `str` | Key to store the reduced result in the pipeline state. | required |
| map_func | `Component \| Callable[[dict[str, Any]], Any]` | Function to apply to each input item. | required |
| reduce_func | `Callable[[list[Any]], Any]` | Function to reduce mapped results. Defaults to a function that returns the list of results as is. | `lambda results: results` |
| retry_config | `RetryConfig \| None` | Retry behavior configuration. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case the default error handler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
| name | `str \| None` | Optional name for the step. Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
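The roles of `map_func` and the default `reduce_func` can be seen in a stand-alone sketch. `run_map_reduce` is a hypothetical helper, and how the real step derives the list of items from `input_map` is not covered by this reference, so the items are passed in directly.

```python
from typing import Any, Callable


def run_map_reduce(
    items: list[dict[str, Any]],
    map_func: Callable[[dict[str, Any]], Any],
    reduce_func: Callable[[list[Any]], Any] = lambda results: results,
) -> Any:
    """Hypothetical helper: apply map_func to each item, then reduce the results."""
    results = [map_func(item) for item in items]
    return reduce_func(results)


docs = [{"text": "alpha"}, {"text": "be"}]

# Default reducer: the mapped results come back unchanged as a list.
lengths = run_map_reduce(docs, lambda item: len(item["text"]))

# Custom reducer: collapse the mapped results to a single value.
total = run_map_reduce(docs, lambda item: len(item["text"]), reduce_func=sum)
```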
no_op(name=None)
Append a no-op step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "no_op_". Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
parallel(*, branches=None, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Create a parallel step (builder-style or direct-style).
This method supports two usage patterns:
1) Builder-style (no branches provided):
```python
(
    composer.parallel(squash=True, name="p")
    .fork(step_a)
    .fork([step_b1, step_b2])
    .end()
)
```
2) Direct-style (provide branches list or dict):
```python
composer.parallel(
    branches=[step_a, [step_b1, step_b2]],
    squash=True,
    name="p_direct",
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branches | `list[PipelineSteps] \| dict[str, PipelineSteps] \| None` | Branches to execute in parallel. Each branch can be a single step or a list of steps to run sequentially. If omitted (builder-style), a … | `None` |
| squash | `bool` | Whether to squash execution into a single node (async gather). If True, the parallel execution is represented by a single node; if False, native graph structures are used. Defaults to True. | `True` |
| input_map | `InputMapSpec \| None` | Unified input mapping for all branches. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry behavior configuration. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None, in which case the default error handler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
| name | `str \| None` | A unique identifier for the parallel step. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Self \| ParallelComposer` | ParallelComposer \| Self: If … returns the current composer after appending the constructed step. |
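A rough picture of "async gather" execution, where each branch runs its steps in order and all branches run concurrently, is sketched below with `asyncio.gather`. The functions `run_branch` and `run_parallel` are hypothetical, and merging branch results with `dict.update` is an assumption; the library's actual merge semantics are not described here.

```python
import asyncio
from typing import Any, Awaitable, Callable

AsyncStep = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_branch(branch: list[AsyncStep], state: dict[str, Any]) -> dict[str, Any]:
    """Run one branch's steps sequentially and collect that branch's updates."""
    local = dict(state)
    updates: dict[str, Any] = {}
    for step in branch:
        result = await step(local)
        local.update(result)  # later steps in the branch see earlier results
        updates.update(result)
    return updates


async def run_parallel(branches: list[list[AsyncStep]], state: dict[str, Any]) -> dict[str, Any]:
    """Run all branches concurrently, then merge their updates into one state."""
    all_updates = await asyncio.gather(*(run_branch(b, state) for b in branches))
    merged = dict(state)
    for updates in all_updates:
        merged.update(updates)
    return merged


async def upper(state: dict[str, Any]) -> dict[str, Any]:
    return {"upper": state["query"].upper()}


async def length(state: dict[str, Any]) -> dict[str, Any]:
    return {"length": len(state["query"])}


async def doubled(state: dict[str, Any]) -> dict[str, Any]:
    return {"doubled": state["length"] * 2}


result = asyncio.run(run_parallel([[upper], [length, doubled]], {"query": "hi"}))
```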
step(component, input_map, output_state=None, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)
Append a component step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | `Component` | The component to execute. | required |
| input_map | `InputMapSpec` | Unified input mapping for the component. | required |
| output_state | `str \| list[str] \| None` | Key to store the result in the pipeline state. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case no error handler is used. | `None` |
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "step_". Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
subgraph(subgraph, input_map=None, output_state_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Append a subgraph step that executes another pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subgraph | `Pipeline` | The pipeline to be executed as a subgraph. | required |
| input_map | `InputMapSpec \| None` | Unified input mapping for the subgraph. Defaults to None. | `None` |
| output_state_map | `dict[str, str] \| None` | Map parent state keys to subgraph output keys. Defaults to None, in which case all outputs are passed through. | `None` |
| retry_config | `RetryConfig \| None` | Retry behavior configuration. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case the default error handler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
| name | `str \| None` | Optional name for the step. Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
switch(condition, *, branches=None, input_map=None, output_state=None, default=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Create a switch conditional (builder-style or direct-style).
This method supports two usage patterns:
1) Builder-style (no branches provided):
```python
(
    composer.switch(cond)
    .case("A", step_a)
    .case("B", step_b)
    .default(step_default)
    .end()
)
```
2) Direct-style (provide branches mapping):
```python
composer.switch(
    cond,
    branches={"A": step_a, "B": [step_b1, step_b2]},
    default=step_default,
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | `Component \| Callable[[dict[str, Any]], str]` | The condition to evaluate. If a … | required |
| branches | `dict[str, BasePipelineStep \| list[BasePipelineStep]] \| None` | Mapping of case keys to step(s) to execute. If omitted (builder-style), a … | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| output_state | `str \| None` | Optional state key to store the condition result. Defaults to None. | `None` |
| default | `BasePipelineStep \| list[BasePipelineStep] \| None` | Fallback branch to use when the condition result is not present in … | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None. | `None` |
| name | `str \| None` | A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Self \| SwitchComposer` | SwitchComposer \| Self: If … |
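Conceptually, a switch evaluates the condition to a string key and looks that key up among the cases, falling back to the default branch. The helper `select_switch_branch` below is hypothetical, and branches are plain strings here only to keep the sketch runnable on its own.

```python
from typing import Any, Callable, Optional, Union

Branch = Union[str, list[str]]


def select_switch_branch(
    state: dict[str, Any],
    condition: Callable[[dict[str, Any]], str],
    branches: dict[str, Branch],
    default: Optional[Branch] = None,
) -> Optional[Branch]:
    """Hypothetical helper: choose the branch whose key matches the condition result."""
    key = condition(state)
    # Unmatched keys fall back to the default branch (None if none was set).
    return branches.get(key, default)


def by_kind(state: dict[str, Any]) -> str:
    return state["kind"]


routes = {"A": "step_a", "B": ["step_b1", "step_b2"]}
picked = select_switch_branch({"kind": "B"}, by_kind, routes, default="step_default")
fallback = select_switch_branch({"kind": "Z"}, by_kind, routes, default="step_default")
```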
terminate(name=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None)
Append a terminator step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "terminator_". Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case no error handler is used. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
toggle(condition, *, if_branch=None, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Create a toggle conditional (builder-style or direct-style).
This method supports two usage patterns:
1) Builder-style (no if_branch provided):
```python
composer.toggle(condition).then(enabled_steps).end()
```
2) Direct-style (provide if_branch):
```python
composer.toggle(
    condition,
    if_branch=enabled_steps,
)
```
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | `Component \| Callable[[dict[str, Any]], bool] \| str` | The condition to evaluate. If a … | required |
| if_branch | `BasePipelineStep \| list[BasePipelineStep] \| None` | Steps to execute if condition is true. If omitted (builder-style), a … | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| output_state | `str \| None` | Optional state key to store the condition result. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Strategy to handle errors during execution. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None. | `None` |
| name | `str \| None` | A unique identifier for the conditional step. If None, a name will be auto-generated. Defaults to None. | `None` |
Returns:
| Type | Description |
|---|---|
| `Self \| ToggleComposer` | ToggleComposer \| Self: If … |
transform(operation, input_map, output_state, retry_config=None, error_handler=None, name=None, cache_store=None, cache_config=None)
Append a state operator step.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| operation | `Callable[[dict[str, Any]], Any]` | The operation to apply to the state. | required |
| input_map | `InputMapSpec` | Unified input mapping for the operation. | required |
| output_state | `str \| list[str]` | State key(s) to store the result in. | required |
| retry_config | `RetryConfig \| None` | Configuration for retry behavior using GLLM Core's RetryConfig. Defaults to None, in which case no retry config is applied. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler to use for this step. Defaults to None, in which case no error handler is used. | `None` |
| name | `str \| None` | A unique identifier for this step. If None, a name will be auto-generated with the prefix "transform_". Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None, in which case no cache store is used. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration to be used for caching. Defaults to None, in which case no cache configuration is used. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The composer instance with this step appended. |
when(condition, input_map=None, output_state=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Begin an if/else conditional using a fluent builder.
Returns an IfElseComposer bound to this composer. Use .then(...) on the returned
builder to set the true-branch, .otherwise(...) to set the false-branch, and .end()
to finalize and return to the parent composer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| condition | `Component \| Callable[[dict[str, Any]], bool]` | The condition to evaluate. | required |
| input_map | `InputMapSpec \| None` | Unified input mapping for this conditional. Defaults to None. | `None` |
| output_state | `str \| None` | Key to store the outcome state. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry behavior configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Cache store to be used for caching. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Cache configuration. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the step. Defaults to None. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| IfElseComposer | `IfElseComposer` | A builder to define the branches and finalize with `.end()`. |
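The hand-off between a composer and its nested builder (enter with `when`, configure with `then` and `otherwise`, return with `end`) can be sketched with two small classes. `ParentComposer` and `MiniIfElseBuilder` are hypothetical stand-ins, and steps are recorded as plain tuples purely for illustration.

```python
from typing import Any, Callable, Optional


class MiniIfElseBuilder:
    """Hypothetical stand-in for a nested if/else builder."""

    def __init__(self, parent: "ParentComposer", condition: Callable[[dict[str, Any]], bool]) -> None:
        self._parent = parent
        self._condition = condition
        self._if_branch: Optional[str] = None
        self._else_branch: Optional[str] = None

    def then(self, branch: str) -> "MiniIfElseBuilder":
        self._if_branch = branch
        return self  # stay on the builder for further chaining

    def otherwise(self, branch: str) -> "MiniIfElseBuilder":
        self._else_branch = branch
        return self

    def end(self) -> "ParentComposer":
        # Append the finished conditional and hand control back to the parent.
        self._parent.steps.append(("if_else", self._if_branch, self._else_branch))
        return self._parent


class ParentComposer:
    """Hypothetical stand-in for the parent composer."""

    def __init__(self) -> None:
        self.steps: list[tuple] = []

    def when(self, condition: Callable[[dict[str, Any]], bool]) -> MiniIfElseBuilder:
        return MiniIfElseBuilder(self, condition)

    def no_op(self) -> "ParentComposer":
        self.steps.append(("no_op",))
        return self


composer = ParentComposer()
returned = (
    composer.when(lambda state: bool(state.get("docs")))
    .then("answer")
    .otherwise("fallback")
    .end()
    .no_op()
)
```

Nothing is appended to the parent until `end()` is called, which is why the chain can continue with ordinary composer methods afterwards.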
GuardComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Fluent builder for a guard conditional.
Usage
```python
composer.guard(condition).on_success(success_branch).on_failure(failure_branch).end()
composer.guard(condition).on_success(success_branch).end()  # failure defaults to termination
```
After setting the branches, call .end() to append the guard step and
return back to the parent Composer.
Initialize the GuardComposer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent | `Composer` | The parent composer instance. | required |
| condition | `Component \| Callable[[dict[str, Any]], bool]` | The condition to evaluate. | required |
| output_state | `str \| None` | Optional state key to store condition result. Defaults to None. | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Optional cache store. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Optional cache config. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Guard_". | `None` |
end()
Finalize and append the guard step to the parent composer.
Returns:
| Name | Type | Description |
|---|---|---|
| Composer | `Composer` | The parent composer for continued chaining. |
on_failure(branch)
Define the branch to execute when the condition is false (failed).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | The step(s) for the failure branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
on_success(branch)
Define the branch to execute when the condition is true (successful).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | The step(s) for the success branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
IfElseComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Fluent builder for an if/else conditional.
Usage
```python
composer.when(cond).then(if_branch).otherwise(else_branch).end()
```
After setting both branches, call .end() to append the conditional step and
return back to the parent Composer.
Initialize the IfElseComposer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent | `Composer` | The parent composer instance. | required |
| condition | `Component \| Callable[[dict[str, Any]], bool]` | The condition to evaluate. | required |
| output_state | `str \| None` | Optional state key to store condition result. Defaults to None. | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Optional cache store. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Optional cache config. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "IfElse_". | `None` |
end()
Finalize and append the conditional step to the parent composer.
Returns:
| Name | Type | Description |
|---|---|---|
| Composer | `Composer` | The parent composer for continued chaining. |
otherwise(branch)
Define the branch to execute when the condition is false.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | The step(s) for the false branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
then(branch)
Define the branch to execute when the condition is true.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | The step(s) for the true branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
ParallelComposer(parent, *, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Fluent builder for a parallel step.
Usage
```python
composer.parallel(name="p").fork(step_a).fork([step_b1, step_b2]).end()
```
The builder collects forks as branches and constructs a ParallelStep
using the functional helper _func.parallel(...).
Initialize the ParallelComposer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent | `Composer` | The parent composer instance. | required |
| squash | `bool` | Whether to squash execution into a single node (async gather). Defaults to True. | `True` |
| input_map | `InputMapSpec \| None` | Unified input mapping for all branches. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Optional cache store. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Optional cache config. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the resulting step. Defaults to None. | `None` |
end()
Finalize and append the parallel step to the parent composer.
Returns:
| Name | Type | Description |
|---|---|---|
| Composer | `Composer` | The parent composer for continued chaining. |
fork(branch)
Add a fork (branch) to execute in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | Step(s) for this fork. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
SwitchComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Fluent builder for a switch conditional.
Usage
```python
composer.switch(cond).case("A", step_a).case("B", step_b).end()
```
Optionally call .default(...) to set a fallback branch.
Initialize the SwitchComposer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent | `Composer` | The parent composer instance. | required |
| condition | `Component \| Callable[[dict[str, Any]], str]` | The condition to evaluate. | required |
| output_state | `str \| None` | Optional state key to store condition result. Defaults to None. | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Optional cache store. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Optional cache config. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Switch_". | `None` |
case(key, branch)
Add a case branch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | `str` | Case key to match against condition result. | required |
| branch | `BasePipelineStep \| list[BasePipelineStep]` | Step(s) to execute for this case. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
default(branch)
Set the default branch for unmatched cases.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | Default fallback branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |
end()
Finalize and append the switch conditional to the parent composer.
Returns:
| Name | Type | Description |
|---|---|---|
| Composer | `Composer` | The parent composer for continued chaining. |
ToggleComposer(parent, condition, output_state=None, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)
Fluent builder for a toggle conditional.
Usage
```python
composer.toggle(condition).then(enabled_branch).end()
```
After setting the enabled branch, call .end() to append the toggle step and
return back to the parent Composer.
Initialize the ToggleComposer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parent | `Composer` | The parent composer instance. | required |
| condition | `Component \| Callable[[dict[str, Any]], bool] \| str` | The condition to evaluate. | required |
| output_state | `str \| None` | Optional state key to store condition result. Defaults to None. | `None` |
| input_map | `InputMapSpec \| None` | Unified input mapping for the condition. Defaults to None. | `None` |
| retry_config | `RetryConfig \| None` | Retry configuration. Defaults to None. | `None` |
| error_handler | `BaseStepErrorHandler \| None` | Error handler. Defaults to None. | `None` |
| cache_store | `BaseCache \| None` | Optional cache store. Defaults to None. | `None` |
| cache_config | `dict[str, Any] \| None` | Optional cache config. Defaults to None. | `None` |
| name | `str \| None` | Optional name for the resulting step. Defaults to None, in which case a name will be auto-generated with the prefix "Toggle_". | `None` |
end()
Finalize and append the toggle step to the parent composer.
Returns:
| Name | Type | Description |
|---|---|---|
| Composer | `Composer` | The parent composer for continued chaining. |
then(branch)
Define the branch to execute when the condition is true.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| branch | `BasePipelineStep \| list[BasePipelineStep]` | The step(s) for the enabled branch. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Self | `Self` | The builder instance for chaining. |