Multi-Step Pipelines
Basic Concept
full_pipeline = step1 | step2 | step3Simple Example
Step 1: Define Custom Components
from typing import Any
from gllm_core.schema import Component
from gllm_pipeline.steps import step
class InputValidatorComponent(Component):
"""Validates and cleans input text."""
async def _run(self, message: str, **kwargs: Any) -> str:
cleaned = " ".join(message.strip().split())
return f"[VALIDATED] {cleaned}"
class ResponseFormatterComponent(Component):
"""Formats the agent's response."""
async def _run(self, agent_response: str, **kwargs: Any) -> dict:
return {
"response": agent_response,
"formatted": True,
"timestamp": "2026-03-06",
}Step 2: Create the Agent
Step 3: Build the Pipeline with Pipe Operator
Step 4: Run the Pipeline
How It Works
Important: Setting state_type
state_typeRelated Documentation
Last updated