
Pipeline

Modules concerning the pipeline orchestration of Gen AI applications.

Pipeline(steps=None, state_type=RAGState, input_type=None, output_type=None, context_schema=None, recursion_limit=30, name=None, cache=None, checkpointer=None, interrupt_before=None, interrupt_after=None)

Represents a sequence of steps executed in order, forming a pipeline.

A pipeline can have zero or more steps. When a pipeline has no steps (an empty list), it acts as a pass-through pipeline that returns the input state unchanged. This is useful with the pipe (|) operator, because an empty pipeline can define the state type (such as RAGState) without requiring explicit steps.

Examples:

Basic pipeline with steps

pipeline = Pipeline([retrieval_step, generation_step, terminator_step])

Empty pipeline (pass-through)

pipeline = Pipeline([])
pipeline = Pipeline(None)

Pipeline with custom state type

class CustomState(TypedDict):
    user_query: str
    context: str
    response: str

pipeline = Pipeline([retrieval_step, generation_step], state_type=CustomState)

Named pipeline for subgraph usage

pipeline = Pipeline([retrieval_step, generation_step], name="rag_pipeline")

Pipeline with caching

pipeline = Pipeline(
    [retrieval_step, generation_step],
    cache=CacheConfig(store=cache_store, key_func=lambda x: x["input"]["user_query"])
)

Using pipe (|) operator to combine steps

pipeline = retrieval_step | generation_step | terminator_step

Using pipe (|) operator to combine step with pipeline

pipeline = Pipeline([retrieval_step, generation_step]) | terminator_step

Using pipe (|) operator to combine pipelines

pipeline1 = Pipeline([retrieval_step])
pipeline2 = Pipeline([generation_step, terminator_step])
combined_pipeline = pipeline1 | pipeline2

Configure step exclusion after initialization (set-only)

log_step = log(name="log_step", ...)
retrieval_step = step(name="retrieval_step", ...)
generation_step = step(name="generation_step", ...)
pipeline = Pipeline([log_step, retrieval_step, generation_step])
pipeline.exclusions.exclude("log_step")  # Skip logging step

Configure composite step exclusion

log_step = log(name="log_step", ...)
retrieval_a_step = step(name="retrieval_a_step", ...)
retrieval_b_step = step(name="retrieval_b_step", ...)
parallel_step = parallel(
    {"retrieval_a": retrieval_a_step, "retrieval_b": retrieval_b_step},
    name="parallel_step",
)
pipeline = Pipeline([log_step, parallel_step])
pipeline.exclusions.exclude("parallel_step")  # Skip the entire parallel step
pipeline.exclusions.exclude("parallel_step.retrieval_a")  # Skip retrieval_a step
pipeline.exclusions.exclude("parallel_step.retrieval_b")  # Skip retrieval_b step
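The dotted names above address steps nested inside a composite step. The following sketch models that lookup with a hypothetical `ToyExclusions` class, assuming (as the "Skip the entire parallel step" comment suggests) that excluding a parent path also excludes every child path. It is not the real ExclusionManager, whose resolution rules may differ.

```python
class ToyExclusions:
    """Hypothetical stand-in for ExclusionManager; models only dotted-path matching."""

    def __init__(self) -> None:
        self._excluded: set[str] = set()

    def exclude(self, *paths: str) -> None:
        self._excluded.update(paths)

    def is_excluded(self, path: str) -> bool:
        # "parallel_step.retrieval_a" is skipped if it, or any parent prefix
        # such as "parallel_step", has been excluded.
        parts = path.split(".")
        return any(".".join(parts[:i]) in self._excluded for i in range(1, len(parts) + 1))


exclusions = ToyExclusions()
exclusions.exclude("parallel_step.retrieval_a")
print(exclusions.is_excluded("parallel_step.retrieval_a"))
print(exclusions.is_excluded("parallel_step.retrieval_b"))
```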

Attributes:

Name Type Description
steps list[BasePipelineStep]

List of steps to be executed in the pipeline. Can be empty for a pass-through pipeline.

state_type type

The type of state used in the pipeline. Defaults to RAGState.

recursion_limit int

The maximum number of steps allowed.

name str | None

A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier.

exclusions ExclusionManager

The exclusion manager for this pipeline.

Initializes the Pipeline with the given steps and state type.

Parameters:

Name Type Description Default
steps list[BasePipelineStep] | None

List of steps to be executed in the pipeline. Defaults to None, in which case the steps are set to [] and the pipeline returns the input state unchanged.

None
state_type type

The type of the pipeline's overall internal state. Could be a TypedDict or a Pydantic BaseModel. Defaults to RAGState.

RAGState
input_type type | None

The type of the pipeline's input state. This state should be compatible with the pipeline's state_type. Could be a TypedDict or a Pydantic BaseModel. Defaults to None, in which case the input state type will be the same as the pipeline's state_type.

None
output_type type | None

The type of the pipeline's output state. This state should be compatible with the pipeline's state_type. Could be a TypedDict or a Pydantic BaseModel. Defaults to None, in which case the output state type will be the same as the pipeline's state_type.

None
context_schema type | None

The type of the pipeline's runtime context. Defaults to None, in which case no context schema is used.

None
recursion_limit int

The maximum number of steps allowed. Defaults to 30.

30
name str | None

A name for this pipeline. Used when this pipeline is included as a subgraph. Defaults to None, in which case the name will be "Subgraph" followed by a unique identifier.

None
cache CacheConfig | None

Configuration for the cache store including the cache store instance and its settings. Defaults to None, in which case no caching will be used.

None
checkpointer Any

A LangGraph checkpoint saver instance (e.g., MemorySaver) to persist state and enable resuming from interruptions. Defaults to None.

None
interrupt_before list[str] | str | None

Name(s) of steps to pause before executing. Requires a checkpointer to save state. Defaults to None.

None
interrupt_after list[str] | str | None

Name(s) of steps to pause after executing. Requires a checkpointer to save state. Defaults to None.

None
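input_type and output_type only need to be compatible with state_type. A common way to picture that relationship, assumed here rather than taken from the implementation, is that each schema declares a subset of the state's keys and the state is narrowed to those keys at the boundary. `project` and the three TypedDicts are hypothetical.

```python
from typing import Any, TypedDict, get_type_hints


class PipelineState(TypedDict):
    user_query: str
    chunks: list[str]
    response: str


class InputState(TypedDict):
    user_query: str


class OutputState(TypedDict):
    response: str


def project(state: dict[str, Any], schema: type) -> dict[str, Any]:
    """Keep only the keys declared on the TypedDict `schema`."""
    keys = get_type_hints(schema).keys()
    return {k: v for k, v in state.items() if k in keys}


full = {"user_query": "What is AI?", "chunks": ["..."], "response": "AI is ..."}
print(project(full, InputState))
print(project(full, OutputState))
```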

composer property

Get a Composer instance that manages this pipeline.

The Composer provides a fluent API for building pipelines by chaining step-adding methods. It allows for easy composition of pipeline steps in a readable, chainable manner.

Returns:

Name Type Description
Composer Composer

A composer instance that manages this pipeline.

exclusions property

Get the exclusion manager for this pipeline.

Returns:

Name Type Description
ExclusionManager 'ExclusionManager'

The exclusion manager for this pipeline.

graph property

The graph representation of the pipeline.

If the graph doesn't exist yet, it will be built automatically.

Returns:

Type Description
StateGraph | None

StateGraph | None: The graph representation of the pipeline, or None if the pipeline has no steps.

state_type property writable

The current state type of the pipeline.

Returns:

Type Description
type[Any]

type[Any]: The current state type.

__lshift__(other)

Includes another pipeline or step using the '<<' operator.

This allows for easy composition where:
- If 'other' is a Pipeline, it becomes a subgraph within this pipeline.
- If 'other' is a BasePipelineStep, it is added directly to this pipeline's steps.

The syntax pipeline1 << pipeline2 visually indicates pipeline2 being inserted into pipeline1. The syntax pipeline << step adds the step to the pipeline.

Parameters:

Name Type Description Default
other Pipeline | BasePipelineStep

The pipeline to include as a subgraph or step to add.

required

Returns:

Name Type Description
Pipeline 'Pipeline'

A new pipeline with the other pipeline included as a subgraph step or with the step added.

__or__(other)

Combines the current pipeline with another pipeline or step using the '|' operator.

When combining two pipelines, the state types must match.

Parameters:

Name Type Description Default
other Pipeline | BasePipelineStep

The other pipeline or step to combine with.

required

Returns:

Name Type Description
Pipeline 'Pipeline'

A new pipeline consisting of the combined steps.

Raises:

Type Description
ValueError

If the state types of the pipelines do not match.
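The combination rule for `|` can be sketched with a hypothetical `ToyPipeline` dataclass whose steps are just names. It assumes that a non-pipeline operand is appended as a single step and that state types are compared by identity; the real method may compare them differently.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Hypothetical model of the '|' combination rule, not the real Pipeline class."""

    steps: list[str] = field(default_factory=list)
    state_type: type = dict

    def __or__(self, other: ToyPipeline | str) -> ToyPipeline:
        if isinstance(other, ToyPipeline):
            if other.state_type is not self.state_type:
                raise ValueError("State types of the pipelines do not match.")
            # Both operands are left untouched; a new pipeline is returned.
            return ToyPipeline(self.steps + other.steps, self.state_type)
        # Any other operand is treated as a single step appended at the end.
        return ToyPipeline(self.steps + [other], self.state_type)


combined = ToyPipeline(["retrieval"]) | ToyPipeline(["generation", "terminator"])
print(combined.steps)
```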

__rshift__(other)

Includes this pipeline as a subgraph in another context using the '>>' operator.

This allows for easy composition where:
- If 'other' is a Pipeline, this pipeline becomes a subgraph within the other pipeline.
- If 'other' is a BasePipelineStep, a new pipeline is created with the step, and this pipeline is included as a subgraph within that new pipeline.

The syntax pipeline1 >> pipeline2 embeds pipeline1 as a subgraph within pipeline2 (equivalent to pipeline2 << pipeline1). The syntax pipeline >> step creates a new pipeline with the step, and includes this pipeline as a subgraph within that pipeline.

Parameters:

Name Type Description Default
other Pipeline | BasePipelineStep

The pipeline to include this pipeline in as a subgraph, or a step to create a new pipeline with.

required

Returns:

Name Type Description
Pipeline 'Pipeline'

A new pipeline with this pipeline included as a subgraph.
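The nesting behavior of `<<` and `>>` can be sketched in the same way. In this hypothetical model a nested pipeline is wrapped in a `Subgraph` placeholder, and `pipeline >> step` places the step before the subgraph; that ordering is an assumption, since the description above does not state it.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Subgraph:
    """Placeholder for a pipeline embedded as a single step."""

    pipeline: ToyPipeline


@dataclass
class ToyPipeline:
    """Hypothetical model of the '<<' and '>>' rules, not the real Pipeline class."""

    steps: list[object] = field(default_factory=list)

    def __lshift__(self, other: ToyPipeline | str) -> ToyPipeline:
        # A pipeline is nested as one subgraph step; a plain step is appended as-is.
        item = Subgraph(other) if isinstance(other, ToyPipeline) else other
        return ToyPipeline(self.steps + [item])

    def __rshift__(self, other: ToyPipeline | str) -> ToyPipeline:
        # a >> b is b << a; a step operand is first wrapped in a new pipeline.
        host = other if isinstance(other, ToyPipeline) else ToyPipeline([other])
        return host << self


retrieval = ToyPipeline(["retrieval"])
generation = ToyPipeline(["generation"])
print(generation << retrieval)   # generation step, then retrieval nested as a subgraph
print(retrieval >> "terminator")  # new pipeline: terminator step, then retrieval as a subgraph
```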

add_step(step)

Append a step and invalidate cached graph artifacts.

Parameters:

Name Type Description Default
step BasePipelineStep

The pipeline step to append.

required

Raises:

Type Description
TypeError

If step is not a BasePipelineStep.
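add_step appends a step and then invalidates cached graph artifacts, while the graph property builds the graph on demand. The sketch below shows that lazy-build-and-invalidate pattern with hypothetical `BaseStep` and `ToyPipeline` classes; the "graph" is only a tuple of step names standing in for the real StateGraph.

```python
from typing import Any


class BaseStep:
    """Hypothetical stand-in for BasePipelineStep."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyPipeline:
    def __init__(self) -> None:
        self.steps: list[BaseStep] = []
        self._graph: Any = None  # cached build artifact

    @property
    def graph(self) -> Any:
        # Built lazily on first access and reused until invalidated.
        if self._graph is None:
            self._graph = tuple(step.name for step in self.steps)
        return self._graph

    def add_step(self, step: BaseStep) -> None:
        if not isinstance(step, BaseStep):
            raise TypeError(f"Expected BaseStep, got {type(step).__name__}")
        self.steps.append(step)
        self._graph = None  # force a rebuild on the next access


pipeline = ToyPipeline()
pipeline.add_step(BaseStep("retrieval"))
print(pipeline.graph)
pipeline.add_step(BaseStep("generation"))
print(pipeline.graph)  # rebuilt, now includes the new step
```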

as_tool(description=None, input_schema=None, output_schema=None, input_transform=None, output_transform=None)

Convert the pipeline to a Tool instance.

This method allows a Pipeline instance to be used as a tool, with flexible input/output transformation support. The pipeline must have an input_type defined unless a custom input_schema is provided with input_transform.

Note that if the input_schema is not provided, the input schema will fall back to the pipeline's input_type and context_schema. In this case, the input_transform is ignored.

Examples:

from typing import TypedDict

# Default behavior with context schema
class InputState(TypedDict):
    user_query: str

class ContextSchema(TypedDict):
    session_id: str

pipeline = Pipeline(
    [retrieval_step, generation_step],
    input_type=InputState,
    context_schema=ContextSchema,
    name="rag_pipeline"
)
tool = pipeline.as_tool()
result = await tool.invoke(
    input={"user_query": "What is AI?"},
    context={"session_id": "abc123"}
)

# Custom input schema with transforms
class ToolInput(TypedDict):
    message: str
    limit: int

def pre_transform(data):
    return {
        "input": {"user_query": data["message"]},
        "config": {"max_results": data["limit"]}
    }

def post_transform(data):
    return {"processed": True, "result": data}

tool = pipeline.as_tool(
    input_schema=ToolInput,
    input_transform=pre_transform,
    output_transform=post_transform
)
result = await tool.invoke(message="What is AI?", limit=5)

Parameters:

Name Type Description Default
description str | None

Optional description to associate with the tool. Defaults to None, in which case a description will be generated automatically.

None
input_schema type[Any] | None

Custom input schema for the tool (TypedDict or Pydantic BaseModel). If provided, input_transform must also be provided. Defaults to None.

None
output_schema type[Any] | None

Custom output schema for the tool (TypedDict or Pydantic BaseModel). If provided, the output will be filtered to match this schema after applying output_transform. If not provided, uses the pipeline's output_type. Defaults to None.

None
input_transform Callable | None

Function to transform tool input into the pipeline invocation format. Must return a dict with an 'input' key and an optional 'config' key. Required if input_schema is provided. Defaults to None.

None
output_transform Callable | None

Function to transform pipeline output before returning from tool. Can change the output type. Defaults to None.

None

Returns:

Name Type Description
Tool Tool

A Tool instance that wraps the pipeline.

Raises:

Type Description
ValueError

If the pipeline does not have an input schema defined and no custom input_schema is provided, or if input_schema is provided without input_transform, or if input_schema/output_schema is not a TypedDict or BaseModel.

build_graph()

Builds the graph representation of the pipeline by connecting the steps.

clear()

Clears the pipeline by resetting steps, graph, and app to their initial state.

This method resets the pipeline to an empty state, clearing all steps and invalidating any built graph or compiled app. Useful for reusing a pipeline instance with different configurations.

disable_debug_tracing()

Disable per-node console tracing for subsequent invoke() calls.

Reverses the effect of enable_debug_tracing(). Safe to call even if tracing was never enabled.

enable_debug_tracing()

Enable per-node console tracing for subsequent invoke() calls.

Examples:

pipeline = Pipeline([step_a, step_b])
pipeline.enable_debug_tracing()
await pipeline.invoke({"user_query": "hi"})

Once called, every node executed during a subsequent invoke() will emit a plain-text summary to stdout showing the node name, truncated state snapshot, and the step duration in milliseconds.

The output is fully compatible with existing gl-observability OpenTelemetry traces; enabling this tracer does not affect, replace, or conflict with any OTel spans already being emitted.
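A per-node tracer of this kind can be approximated by wrapping each node function. The sketch below prints the node name, a truncated repr of the state the node received, and its duration. The `traced` wrapper, its output format, and the 60-character truncation are assumptions, not the format enable_debug_tracing() actually emits.

```python
import time
from typing import Any, Callable

State = dict[str, Any]


def traced(name: str, fn: Callable[[State], State], max_len: int = 60) -> Callable[[State], State]:
    """Wrap a node so each call prints its name, a truncated state, and its duration."""

    def wrapper(state: State) -> State:
        start = time.perf_counter()
        update = fn(state)
        elapsed_ms = (time.perf_counter() - start) * 1000
        snapshot = repr(state)
        if len(snapshot) > max_len:
            snapshot = snapshot[: max_len - 3] + "..."
        print(f"[trace] {name} state={snapshot} took {elapsed_ms:.1f} ms")
        return update

    return wrapper


node = traced("step_a", lambda s: {"response": s["user_query"].upper()})
print(node({"user_query": "hi"}))
```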

fork_from(thread_id, checkpoint_id, values, checkpoint_ns='', as_node=None)

Fork the pipeline state from a historical checkpoint.

Examples:

from langgraph.checkpoint.memory import InMemorySaver

pipeline = Pipeline([step_a, step_b], checkpointer=InMemorySaver())
await pipeline.invoke({"user_query": "hello"}, thread_id="t1")

# Retrieve history
history = [snap async for snap in pipeline.get_state_history("t1")]
checkpoint_id = history[0].config["configurable"]["checkpoint_id"]
checkpoint_ns = history[0].config["configurable"].get("checkpoint_ns", "")

# Fork pipeline from specific checkpoint with new state
new_config = pipeline.fork_from(
    "t1",
    checkpoint_id,
    {"user_query": "hello again"},
    checkpoint_ns=checkpoint_ns,
)

# Run pipeline using the new configuration
await pipeline.invoke(None, config=new_config)

Parameters:

Name Type Description Default
thread_id str

The thread ID.

required
checkpoint_id str

The checkpoint ID to fork from.

required
values dict[str, Any]

The values to update the state with.

required
checkpoint_ns str

The checkpoint namespace. Defaults to "".

''
as_node str | None

The node to simulate the update as. Defaults to None.

None

Returns:

Name Type Description
RunnableConfig RunnableConfig

The configuration to use for the subsequent invocation.

Raises:

Type Description
ValueError

If the checkpointer is None.
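Conceptually, forking leaves the existing history intact: it takes the values stored at an earlier checkpoint, applies the new values on top, saves the result as a new checkpoint, and returns a config pointing at it. The hypothetical `ToyCheckpointer` below assumes a plain key-by-key overwrite; in LangGraph, state channels with reducers may combine values instead.

```python
import itertools
from typing import Any


class ToyCheckpointer:
    """Hypothetical in-memory store: thread_id -> list of (checkpoint_id, values)."""

    def __init__(self) -> None:
        self._threads: dict[str, list[tuple[str, dict[str, Any]]]] = {}
        self._ids = itertools.count(1)

    def put(self, thread_id: str, values: dict[str, Any]) -> str:
        checkpoint_id = f"cp-{next(self._ids)}"
        self._threads.setdefault(thread_id, []).append((checkpoint_id, dict(values)))
        return checkpoint_id

    def fork_from(
        self, thread_id: str, checkpoint_id: str, values: dict[str, Any], checkpoint_ns: str = ""
    ) -> dict[str, Any]:
        base = dict(self._threads[thread_id])[checkpoint_id]  # KeyError if unknown
        new_id = self.put(thread_id, {**base, **values})  # earlier state plus overrides
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": new_id,
                "checkpoint_ns": checkpoint_ns,
            }
        }


saver = ToyCheckpointer()
first = saver.put("t1", {"user_query": "hello", "response": "hi"})
saver.put("t1", {"user_query": "hello", "response": "hi there"})
config = saver.fork_from("t1", first, {"user_query": "hello again"})
print(config)
```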

get_mermaid_diagram()

Generate a Mermaid diagram representation of the pipeline.

Returns:

Name Type Description
str str

The complete Mermaid diagram representation.
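For a strictly linear pipeline, a Mermaid flowchart is just a chain of edges. This hypothetical `linear_mermaid` helper assumes LangGraph-style `__start__`/`__end__` node names; the real get_mermaid_diagram() output also covers branches and subgraphs and may use different node labels and styling.

```python
def linear_mermaid(step_names: list[str]) -> str:
    """Render a linear pipeline as a Mermaid flowchart (hypothetical format)."""
    nodes = ["__start__", *step_names, "__end__"]
    lines = ["graph TD"]
    # One edge per consecutive pair of nodes.
    lines += [f"    {a} --> {b}" for a, b in zip(nodes, nodes[1:])]
    return "\n".join(lines)


print(linear_mermaid(["retrieval_step", "generation_step"]))
```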

get_state(thread_id) async

Return the latest checkpoint snapshot for a thread.

Examples:

snapshot = await pipeline.get_state("ticket-123")
print(snapshot.values)

Parameters:

Name Type Description Default
thread_id str

Thread identifier used for the checkpointed run.

required

Returns:

Name Type Description
StateSnapshot StateSnapshot

LangGraph's latest checkpoint snapshot for the thread.

Raises:

Type Description
TypeError

If thread_id is not a string.

ValueError

If thread_id is empty or no checkpointer is attached.
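get_state, resume, and update_state document the same error contract for thread_id. A hypothetical helper that enforces it might look like the following; the order in which the checks run is an assumption.

```python
from typing import Any


def validate_thread_id(thread_id: Any, checkpointer: Any) -> str:
    """Apply the documented checks: TypeError for non-strings, ValueError otherwise."""
    if not isinstance(thread_id, str):
        raise TypeError(f"thread_id must be a string, got {type(thread_id).__name__}")
    if not thread_id:
        raise ValueError("thread_id must not be empty")
    if checkpointer is None:
        raise ValueError("a checkpointer is required for checkpointed operations")
    return thread_id


print(validate_thread_id("ticket-123", checkpointer=object()))
```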

get_state_history(thread_id, limit=None, history_filter=None, before=None)

Return an async iterator over past StateSnapshots for the given thread.

Time-travel debugging: iterates through all checkpointed states that were saved for thread_id, from the most-recent snapshot backwards. Each StateSnapshot carries the full graph state at that point in time so callers can inspect intermediate values or fork from a previous checkpoint.

Examples:

from langgraph.checkpoint.memory import InMemorySaver

pipeline = Pipeline([step_a, step_b], checkpointer=InMemorySaver())
await pipeline.invoke({"user_query": "hello"}, thread_id="t1")

# Iterate through all snapshots
async for snapshot in pipeline.get_state_history("t1"):
    print(snapshot.values)

# Iterate with a filter (e.g. only get snapshots where the source is 'step_a')
async for snapshot in pipeline.get_state_history(
    "t1", history_filter={"source": "step_a"}
):
    print(snapshot.values)

Parameters:

Name Type Description Default
thread_id str

The thread identifier whose history should be retrieved. Must match the thread_id that was passed to invoke.

required
limit int | None

Maximum number of snapshots to return. None means no limit (return the full history). Defaults to None.

None
history_filter dict[str, Any] | None

Optional filter for snapshots. Defaults to None.

None
before RunnableConfig | None

Optional config identifying a checkpoint; only snapshots created before that checkpoint are returned. Defaults to None.

None

Returns:

Type Description
AsyncIterator[StateSnapshot]

AsyncIterator[StateSnapshot]: A lazy async iterator of StateSnapshot objects in reverse-chronological order (newest first).

Raises:

Type Description
TypeError

If thread_id is not a string.

ValueError

If no checkpointer is attached, thread_id is empty, or limit is not a positive integer.

invoke(initial_state=None, config=None, thread_id=None, context=None, interrupt_before=None, interrupt_after=None, include_outputs_from=None) async

Runs the pipeline asynchronously with the given initial state and configuration.

Parameters:

Name Type Description Default
initial_state PipelineState | None

The initial state to start the pipeline with. This initial state should comply with the state type of the pipeline. Can be None or a Command to resume.

None
config dict[str, Any]

Additional configuration for the pipeline. Should only contain configuration flags such as 'debug_state'. User-defined config keys should not use the "langgraph_" prefix, which is reserved for internal use. Runtime context should be passed via the 'context' parameter instead. Defaults to None.

None
thread_id str | None

The thread ID for this specific pipeline invocation. This will be passed in the invocation_config.configurable when invoking the pipeline. Useful for checkpointing and tracking related invocations. Defaults to None.

None
context dict[str, Any]

Runtime context for the pipeline execution. This contains user-defined data that steps may need during execution. When not provided, the pipeline falls back to 'config' for backward compatibility. Defaults to None.

None
interrupt_before list[str] | str | None

Name(s) of step nodes to pause execution before. Passed to LangGraph at runtime — no graph recompilation needed. Requires a checkpointer. Defaults to None.

None
interrupt_after list[str] | str | None

Name(s) of step nodes to pause execution after. Passed to LangGraph at runtime — no graph recompilation needed. Requires a checkpointer. Defaults to None.

None
include_outputs_from set[str] | None

Nodes to extract step outputs from. Defaults to None.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The final state after the pipeline execution. If 'debug_state' is set to True in the config, the state logs will be included in the final state with the key 'state_logs'.

Raises:

Type Description
BaseInvokerError

If an error occurs during LM invocation.

CancelledError

If the execution is cancelled, preserved with added context.

TimeoutError

If the execution times out, preserved with added context.

RuntimeError

If an error occurs during pipeline execution. If the error is due to a step execution, the step name will be included in the error message.
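The error contract above can be sketched as a loop over named async steps: cancellations and timeouts keep their original type and gain a note naming the step, while other failures are re-raised as RuntimeError with the step name and the original exception chained as the cause. `run_steps` is hypothetical, it does not model BaseInvokerError separately, and exception notes require Python 3.11 or later (the helper skips them on older versions).

```python
import asyncio
from typing import Any, Awaitable, Callable

State = dict[str, Any]
AsyncStep = Callable[[State], Awaitable[State]]


async def run_steps(steps: list[tuple[str, AsyncStep]], state: State) -> State:
    """Run named async steps in order, adding the failing step's name to errors."""
    for name, step in steps:
        try:
            update = await step(state)
        except (asyncio.CancelledError, TimeoutError) as exc:
            # Keep the original exception type; attach context as a note (Python 3.11+).
            if hasattr(exc, "add_note"):
                exc.add_note(f"while running step {name!r}")
            raise
        except Exception as exc:
            raise RuntimeError(f"Error in step {name!r}: {exc}") from exc
        state = {**state, **update}
    return state


async def ok(state: State) -> State:
    return {"chunks": ["c1"]}


async def broken(state: State) -> State:
    raise KeyError("response")


try:
    asyncio.run(run_steps([("retrieval", ok), ("generation", broken)], {"user_query": "q"}))
except RuntimeError as err:
    print(err)  # the message names the 'generation' step
```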

resume(thread_id, value, config=None, context=None, interrupt_before=None, interrupt_after=None) async

Resume an interrupted checkpointed run.

Examples:

from langgraph.checkpoint.memory import InMemorySaver

from gllm_pipeline.pipeline import Pipeline
from gllm_pipeline.steps import interrupt

pipeline = Pipeline(
    steps=[
        interrupt(
            name="human_approval",
            message="Approve this run?",
            resume_value_map="approval",
        ),
    ],
    checkpointer=InMemorySaver(),
)

await pipeline.invoke({}, thread_id="ticket-123")
result = await pipeline.resume("ticket-123", "approved")
assert result["approval"] == "approved"

Parameters:

Name Type Description Default
thread_id str

Thread identifier used for the checkpointed run.

required
value Any

Resume payload to wrap in Command(resume=value).

required
config dict[str, Any] | None

Optional pipeline invocation config. Defaults to None.

None
context dict[str, Any] | None

Optional runtime context for pipeline steps. Defaults to None.

None
interrupt_before list[str] | str | None

Optional runtime interrupt-before nodes. Defaults to None.

None
interrupt_after list[str] | str | None

Optional runtime interrupt-after nodes. Defaults to None.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: Final pipeline state returned by invoke.

Raises:

Type Description
TypeError

If thread_id is not a string.

ValueError

If thread_id is empty or no checkpointer is attached.

update_state(thread_id, values, as_node=None, task_id=None) async

Patch checkpoint state for a thread.

Examples:

updated_config = await pipeline.update_state(
    "ticket-123",
    {"draft": "Reviewer-edited draft answer."},
    as_node="human_approval",
)

Parameters:

Name Type Description Default
thread_id str

Thread identifier used for the checkpointed run.

required
values dict[str, Any] | Any | None

State values to pass through to LangGraph.

required
as_node str | None

Optional graph node name to treat as the writer. Defaults to None.

None
task_id str | None

Optional LangGraph task ID to target. Defaults to None.

None

Returns:

Name Type Description
RunnableConfig RunnableConfig

LangGraph's updated runnable config for the patched checkpoint state.

Raises:

Type Description
TypeError

If thread_id is not a string.

ValueError

If thread_id is empty or no checkpointer is attached.

RAGState

Bases: TypedDict

A TypedDict representing the state of a Retrieval-Augmented Generation (RAG) pipeline.

This docstring documents the original intention of each of the attributes in the TypedDict. However, in practice, the attributes may be modified or extended to suit the specific requirements of the application. The TypedDict is used to enforce the structure of the state object.

Attributes:

Name Type Description
user_query str

The original query from the user.

queries list[str]

A list of queries generated for retrieval.

retrieval_params dict[str, Any]

Parameters used for the retrieval process.

chunks list

A list of chunks retrieved from the knowledge base.

history list[Any]

The history of the conversation or interaction.

context str

The context information used for generating responses.

response_synthesis_bundle dict[str, Any]

Data used for synthesizing the final response.

response str

The generated response to the user's query.

references str | list[str]

References or sources used in generating the response.

event_emitter EventEmitter

An event emitter instance for logging purposes.

Example
state = {
    "user_query": "What is machine learning?",
    "queries": ["machine learning definition", "ML basics"],
    "retrieval_params": {"top_k": 5, "threshold": 0.8},
    "chunks": [
        {"content": "Machine learning is...", "score": 0.95},
        {"content": "ML algorithms include...", "score": 0.87}
    ],
    "history": [
        {"role": "user", "contents": ["What is machine learning?"]},
        {"role": "assistant", "contents": ["Machine learning is a subset of artificial intelligence..."]}
    ],
    "context": "Retrieved information about ML",
    "response_synthesis_bundle": {"template": "informative"},
    "response": "Machine learning is a subset of artificial intelligence...",
    "references": ["source1.pdf", "article2.html"],
    "event_emitter": EventEmitter()
}
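Because RAGState is a TypedDict, its structure is checked only by static type checkers; at runtime a state is an ordinary dict and mismatched values are not rejected. The sketch below demonstrates this with a hypothetical `MiniRAGState` that holds a subset of the fields. RAGStateModel is the validated alternative when runtime checks matter.

```python
from typing import Any, TypedDict, get_type_hints


class MiniRAGState(TypedDict):
    """Hypothetical subset of RAGState's fields, for illustration only."""

    user_query: str
    chunks: list[Any]
    response: str


state: MiniRAGState = {"user_query": "What is machine learning?", "chunks": [], "response": ""}
print(type(state) is dict)  # a TypedDict instance is a plain dict at runtime
print(sorted(get_type_hints(MiniRAGState)))

# No runtime validation: a wrong value type is accepted; only a static checker flags it.
bad: MiniRAGState = {"user_query": 42, "chunks": [], "response": ""}  # type: ignore[typeddict-item]
print(bad["user_query"])
```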

RAGStateModel

Bases: BaseModel

A Pydantic BaseModel representing the state of a Retrieval-Augmented Generation (RAG) pipeline.

This implementation provides runtime validation, default values, and enhanced type safety compared to the TypedDict version. It maintains compatibility with LangGraph while offering improved developer experience through automatic validation and sensible defaults.

Attributes:

Name Type Description
user_query str

The original query from the user.

queries list[str]

A list of queries generated for retrieval. Defaults to empty list.

retrieval_params dict[str, Any]

Parameters used for the retrieval process. Defaults to empty dict.

chunks list[Any]

A list of chunks retrieved from the knowledge base. Defaults to empty list.

history list[Any]

The history of the conversation or interaction. Defaults to empty list.

context str

The context information used for generating responses. Defaults to empty string.

response_synthesis_bundle dict[str, Any]

Data used for synthesizing the final response. Defaults to empty dict.

response str

The generated response to the user's query. Defaults to empty string.

references str | list[str]

References or sources used in generating the response. Defaults to empty list.

event_emitter EventEmitter | None

An event emitter instance for logging purposes. Defaults to None.

Example
# Basic usage with minimal required fields
state = RAGStateModel(user_query="What is machine learning?")

# Full usage with all fields
state = RAGStateModel(
    user_query="What is machine learning?",
    queries=["machine learning definition", "ML basics"],
    retrieval_params={"top_k": 5, "threshold": 0.8},
    chunks=[
        {"content": "Machine learning is...", "score": 0.95},
        {"content": "ML algorithms include...", "score": 0.87}
    ],
    history=[
        {"role": "user", "contents": ["What is machine learning?"]},
        {"role": "assistant", "contents": ["Machine learning is a subset of artificial intelligence..."]}
    ],
    context="Retrieved information about ML",
    response_synthesis_bundle={"template": "informative"},
    response="Machine learning is a subset of artificial intelligence...",
    references=["source1.pdf", "article2.html"],
    event_emitter=EventEmitter()
)

# Convert to dictionary for pipeline processing
state_dict = state.model_dump()

# Serialize to a JSON string (non-standard field types may require custom encoders)
state_json = state.model_dump_json()
Example with custom JSON encoders
from datetime import datetime
from pydantic import BaseModel, ConfigDict, Field

class CustomStateModel(BaseModel):
    timestamp: datetime = Field(default_factory=datetime.now)

    model_config = ConfigDict(
        json_encoders={
            datetime: lambda v: v.isoformat(),
            EventEmitter: lambda v: str(v) if v else None
        }
    )