Multi-Step Pipelines

Multi-step pipelines chain processing steps together with the pipe operator (|), so you can build sophisticated processing flows out of simple, reusable components.

Basic Concept

The pipe operator (|) combines multiple steps into a single pipeline:

full_pipeline = step1 | step2 | step3

Each step processes the output of the previous step, creating a chain of transformations.
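To make the chaining idea concrete, here is a minimal sketch of how a pipe operator can be built in plain Python by overloading `__or__`. The `Step` and `Pipeline` classes below are hypothetical stand-ins written for this illustration; they are not the gllm_pipeline classes, and the real library may compose steps differently.

```python
from typing import Any, Callable


class Step:
    """Hypothetical stand-in for a pipeline step; not the gllm_pipeline class."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __or__(self, other: "Step") -> "Pipeline":
        # step_a | step_b starts a new pipeline holding both steps in order.
        return Pipeline([self, other])


class Pipeline:
    """Hypothetical stand-in that runs its steps from left to right."""

    def __init__(self, steps: list[Step]) -> None:
        self.steps = steps

    def __or__(self, other: Step) -> "Pipeline":
        # pipeline | step appends the step and returns a new pipeline.
        return Pipeline([*self.steps, other])

    def run(self, value: Any) -> Any:
        for step in self.steps:
            value = step.func(value)  # each step receives the previous output
        return value


step1 = Step(str.strip)
step2 = Step(str.upper)
step3 = Step(lambda text: f"<{text}>")

full_pipeline = step1 | step2 | step3
result = full_pipeline.run("  hello  ")
print(result)  # <HELLO>
```

Because `|` is left-associative, `step1 | step2 | step3` is evaluated as `(step1 | step2) | step3`, which is why the sketch defines `__or__` on both classes.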

Simple Example

Let's create a simple pipeline with three steps: preprocessing, agent processing, and postprocessing.

Step 1: Define Custom Components

First, create simple components for preprocessing and postprocessing:

from typing import Any
from gllm_core.schema import Component
from gllm_pipeline.steps import step

class InputValidatorComponent(Component):
    """Validates and cleans input text."""

    async def _run(self, message: str, **kwargs: Any) -> str:
        # Trim the ends and collapse runs of whitespace into single spaces.
        cleaned = " ".join(message.strip().split())
        return f"[VALIDATED] {cleaned}"

class ResponseFormatterComponent(Component):
    """Formats the agent's response."""

    async def _run(self, agent_response: str, **kwargs: Any) -> dict[str, Any]:
        # The timestamp is a fixed placeholder value for this example.
        return {
            "response": agent_response,
            "formatted": True,
            "timestamp": "2026-03-06",
        }

Step 2: Create the Agent

Create a simple Digital Employee agent:

Step 3: Build the Pipeline with Pipe Operator

Now, combine all steps using the pipe operator:

Step 4: Run the Pipeline

How It Works

  1. Input Validation Step: The InputValidatorComponent trims the input message, collapses repeated whitespace, and prefixes it with [VALIDATED].

  2. Agent Step: The Digital Employee processes the validated input and generates a response.

  3. Response Formatting Step: The ResponseFormatterComponent wraps the agent's response in a dictionary with formatted and timestamp fields.

The pipe operator (|) ensures that each step receives the output of the previous step as its input.
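The data flow described above can be traced with a small, library-free sketch. The two component functions reuse the logic from Step 1, while `fake_agent` and `run_chain` are hypothetical placeholders invented for this illustration: the first stands in for the Digital Employee and the second for the pipeline runner. Neither reflects the real API.

```python
import asyncio
from typing import Any


async def validate_input(message: str) -> str:
    # Same logic as InputValidatorComponent._run: collapse whitespace, add a tag.
    cleaned = " ".join(message.strip().split())
    return f"[VALIDATED] {cleaned}"


async def fake_agent(message: str) -> str:
    # Hypothetical placeholder for the Digital Employee; it only echoes its input.
    return f"Agent received: {message}"


async def format_response(agent_response: str) -> dict[str, Any]:
    # Same output shape as ResponseFormatterComponent._run.
    return {"response": agent_response, "formatted": True, "timestamp": "2026-03-06"}


async def run_chain(message: str) -> dict[str, Any]:
    # Feed each stage the output of the previous one, in order.
    value: Any = message
    for stage in (validate_input, fake_agent, format_response):
        value = await stage(value)
    return value


output = asyncio.run(run_chain("  What   is the   status? "))
print(output["response"])  # Agent received: [VALIDATED] What is the status?
```

Note that the output type changes along the chain (str, then str, then dict), so each step must accept whatever the step before it returns.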

Important: Setting state_type

When using the pipe operator, the resulting Pipeline object has a default state_type. For the pipeline to work with DigitalEmployee, you must set it to DigitalEmployeeState:

This is necessary because DigitalEmployee creates a DigitalEmployeeState object and passes it to the pipeline. If the pipeline's state_type doesn't match, the pipeline won't recognize the state keys.
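As a rough mental model of why a mismatched state type loses keys, consider a toy pipeline that only keeps the keys its state schema declares. Everything in this sketch is an assumption made for illustration: `DefaultState`, `EmployeeState`, their keys, and `recognized_keys` are hypothetical, and the actual DigitalEmployeeState fields and the library's handling of state may differ.

```python
from typing import Any, TypedDict


class DefaultState(TypedDict):
    # Hypothetical default schema with a single key.
    input: str


class EmployeeState(TypedDict):
    # Hypothetical stand-in for DigitalEmployeeState; the real keys may differ.
    message: str
    agent_response: str


def recognized_keys(state: dict[str, Any], state_type: type) -> dict[str, Any]:
    """Keep only the keys that the given state schema declares."""
    declared = state_type.__annotations__
    return {key: value for key, value in state.items() if key in declared}


incoming = {"message": "hello", "agent_response": ""}

mismatched = recognized_keys(incoming, DefaultState)
matched = recognized_keys(incoming, EmployeeState)
print(mismatched)  # {}
print(matched)     # {'message': 'hello', 'agent_response': ''}
```

In this toy model, the mismatched schema silently drops every key the agent supplies, which is the kind of failure that setting the correct state_type is meant to avoid.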
