Parallel composer

Fluent Parallel builder for Composer.

This module defines ParallelComposer, a small builder used by Composer to add parallel branches fluently.

Note

This builder is not meant to be instantiated directly by users. Use Composer.parallel(...) to obtain an instance, add forks via .fork(...), and call .end() to finalize.

Authors

Dimitrij Ray (dimitrij.ray@gdplabs.id)

ParallelComposer(parent, *, squash=True, input_map=None, retry_config=None, error_handler=None, cache_store=None, cache_config=None, name=None)

Fluent builder for a parallel step.

Usage

composer.parallel(name="p").fork(step_a).fork([step_b1, step_b2]).end()

The builder collects forks as branches and constructs a ParallelStep using the functional helper _func.parallel(...).
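
The chaining behaviour described above can be sketched with stand-in classes. This is a minimal illustration of the fluent-builder pattern, not the library's implementation: `SketchComposer` and `SketchParallelBuilder` are hypothetical names, steps are plain strings, and the resulting "parallel step" is a plain dict standing in for a ParallelStep.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchComposer:
    """Hypothetical stand-in for Composer; it only records appended steps."""

    steps: list[Any] = field(default_factory=list)

    def parallel(self, *, name: str | None = None) -> SketchParallelBuilder:
        # Hand out a builder bound to this composer.
        return SketchParallelBuilder(parent=self, name=name)


@dataclass
class SketchParallelBuilder:
    """Hypothetical stand-in for ParallelComposer."""

    parent: SketchComposer
    name: str | None = None
    branches: list[Any] = field(default_factory=list)

    def fork(self, branch: Any) -> SketchParallelBuilder:
        # Collect the branch and return self so calls can be chained.
        self.branches.append(branch)
        return self

    def end(self) -> SketchComposer:
        # A plain dict stands in for the real ParallelStep (assumption).
        self.parent.steps.append({"name": self.name, "branches": list(self.branches)})
        # Returning the parent lets the caller keep chaining on the composer.
        return self.parent


composer = SketchComposer()
result = composer.parallel(name="p").fork("step_a").fork(["step_b1", "step_b2"]).end()
```

In this sketch `.fork(...)` returns the builder and `.end()` returns the parent, which is what allows a parallel block to sit in the middle of a longer chain.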

Initialize the ParallelComposer.

Parameters:

parent (Composer): The parent composer instance. Required.
squash (bool): Whether to squash execution into a single node (async gather). Defaults to True.
input_map (InputMapSpec | None): Unified input mapping for all branches. Defaults to None.
retry_config (RetryConfig | None): Retry configuration. Defaults to None.
error_handler (BaseStepErrorHandler | None): Error handler. Defaults to None.
cache_store (BaseCache | None): Optional cache store. Defaults to None.
cache_config (dict[str, Any] | None): Optional cache config. Defaults to None.
name (str | None): Optional name for the resulting step. Defaults to None.
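
To illustrate what "a single node (async gather)" could look like for squash, here is a minimal sketch using the standard library's `asyncio.gather`. It assumes that each branch is an async callable taking a state dict and returning a dict of updates, and that updates are merged in branch order. The names `run_squashed`, `branch_a`, and `branch_b` are hypothetical, and the library's actual state handling may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Assumed branch shape: an async callable from state to a dict of updates.
Branch = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


async def run_squashed(branches: list[Branch], state: dict[str, Any]) -> dict[str, Any]:
    # Give each branch its own copy of the state, then await them all together.
    results = await asyncio.gather(*(branch(dict(state)) for branch in branches))
    # gather returns results in the order the branches were passed in.
    merged = dict(state)
    for update in results:
        merged.update(update)
    return merged


async def branch_a(state: dict[str, Any]) -> dict[str, Any]:
    await asyncio.sleep(0.01)  # simulate I/O
    return {"a": state["x"] + 1}


async def branch_b(state: dict[str, Any]) -> dict[str, Any]:
    await asyncio.sleep(0.01)  # simulate I/O
    return {"b": state["x"] * 2}


final_state = asyncio.run(run_squashed([branch_a, branch_b], {"x": 3}))
```

Both branches wait concurrently inside one coroutine, so the caller sees a single unit of work rather than one node per branch.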

end()

Finalize and append the parallel step to the parent composer.

Returns:

Composer: The parent composer for continued chaining.

fork(branch)

Add a fork (branch) to execute in parallel.

Parameters:

branch (BasePipelineStep | list[BasePipelineStep]): A single step, or a list of steps, for this fork. Required.

Returns:

Self: The builder instance for chaining.
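
Because fork(...) accepts either one step or a list of steps, a builder of this kind typically normalizes both shapes before storing them. The helper below is a hypothetical sketch of that normalization, not the library's code: `normalize_branch` is an invented name, and rejecting an empty list is an assumption rather than documented behaviour.

```python
from typing import Any


def normalize_branch(branch: Any) -> list[Any]:
    """Return the fork as a list of steps, copying any list input."""
    if isinstance(branch, list):
        if not branch:
            # Assumption: an empty fork is treated as a caller error.
            raise ValueError("a fork needs at least one step")
        # Copy so later changes to the caller's list do not affect the fork.
        return list(branch)
    # A single step becomes a one-element fork.
    return [branch]


single = normalize_branch("step_a")
multi = normalize_branch(["step_b1", "step_b2"])
```

Normalizing at the point of collection means the code that builds the final step only has to handle one shape.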