Pipeline
The Pipeline is the core orchestration component that sequences and manages the execution of the components in our SDK.
Installation
Linux / macOS:
# you can use a Conda environment
pip install --extra-index-url https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/ "gllm-pipeline"

Windows PowerShell:
# you can use a Conda environment
$token = (gcloud auth print-access-token)
pip install --extra-index-url "https://oauth2accesstoken:$token@glsdk.gdplabs.id/gen-ai-internal/simple/" "gllm-pipeline"

Windows Command Prompt:
REM you can use a Conda environment
FOR /F "tokens=*" %T IN ('gcloud auth print-access-token') DO pip install --extra-index-url "https://oauth2accesstoken:%T@glsdk.gdplabs.id/gen-ai-internal/simple/" "gllm-pipeline"

Quickstart
We will create a simple Pipeline to illustrate the general workflow of building Pipelines.
Import the Pipeline and the steps
from typing import TypedDict # Optional, only for custom states
from gllm_pipeline.pipeline.pipeline import Pipeline
from gllm_pipeline.steps._func import step, transform, bundle, log, subgraph

Define your state
We will use a simplified state for this quickstart. Alternatively, if you are dealing with a RAG pipeline, you can use the default RAGState instead.
class MiniState(TypedDict):
    text: str
    text_upper: str
    text_len: int
    summary: dict  # summary bundle

Define your steps
Here we use simple transform and bundle steps to illustrate how the Pipeline works. You can always use the other Steps; see the How-to guide for comprehensive coverage.
def to_upper(data: dict) -> str:
    return data["text"].upper()

def count_chars(data: dict) -> int:
    return len(data["text_upper"])

pipe = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        transform(count_chars, input_map=["text_upper"], output_state="text_len"),
        bundle(["text", "text_upper", "text_len"], output_state="summary"),
    ],
    state_type=MiniState,
)

Invoke the pipeline
Our Pipeline is asynchronous by default. Therefore, to invoke it from synchronous code, use asyncio.run (inside an existing event loop, simply await pipe.invoke(...)).
import asyncio
initial: MiniState = {
    "text": "hello world",
    "text_upper": "",
    "text_len": 0,
    "summary": {},
}
final = asyncio.run(pipe.invoke(initial))
print(final)

After invoking the pipeline, you should get an output similar to this:
{'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11,
'summary': {'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11}}

That's it! You have created your first Pipeline! Every Pipeline you build from here on follows the same general steps.
The Pipe Operator
You can also use the pipe (|) operator to compose your Pipeline.
When composing with |, the Pipeline's state will be RAGState by default. Make sure to use the state_type setter to specify the correct state type before invoking.
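As a minimal sketch reusing the quickstart helpers (this assumes two bare steps can be piped directly into a Pipeline; the setter line only matters if the composed state defaulted to RAGState):

composed = transform(to_upper, input_map=["text"], output_state="text_upper") | transform(
    count_chars, input_map=["text_upper"], output_state="text_len"
)
composed.state_type = MiniState  # override the RAGState default before invoking
final = asyncio.run(composed.invoke(initial))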
Appending a Step
You can use the | operator to append a step to a Pipeline.
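For instance, extending the quickstart pipeline with one more step (a minimal sketch; shout is just an illustrative helper, and you can re-assert the state type with the setter afterwards if needed, per the note above):

def shout(data: dict) -> str:
    return data["text_upper"] + "!"

extended = pipe | transform(shout, input_map=["text_upper"], output_state="text_upper")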
Merge Two Pipelines
You can also use the | operator to merge two pipelines of the same State schema.
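For example, two Pipelines that both use MiniState can be merged into one (a minimal sketch based on the quickstart steps):

upper_pipe = Pipeline(steps=[transform(to_upper, input_map=["text"], output_state="text_upper")], state_type=MiniState)
count_pipe = Pipeline(steps=[transform(count_chars, input_map=["text_upper"], output_state="text_len")], state_type=MiniState)
merged = upper_pipe | count_pipe  # allowed because both share the MiniState schema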
Merging two pipelines with different State schemata will cause a ValidationError.
Placeholder Pipelines
Finally, you can initialize a Pipeline with an empty step to serve as a placeholder (e.g., to set the state_type up front), then use the | operator to compose the rest of the pipeline with the correct state type.
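A minimal sketch (here the placeholder is created with an empty step list; if the library provides a dedicated no-op step, use that instead):

placeholder = Pipeline(steps=[], state_type=MiniState)  # placeholder that only fixes the state type
full = placeholder | transform(to_upper, input_map=["text"], output_state="text_upper") | transform(
    count_chars, input_map=["text_upper"], output_state="text_len"
)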
Visualizing the Pipeline
Our Pipelines come with the get_mermaid_diagram() method, which returns Mermaid diagram code. This is useful for documentation and reviews.
To obtain the Mermaid diagram of a Pipeline, simply call the method.
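For example, with the quickstart pipeline:

print(pipe.get_mermaid_diagram())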
The output is Mermaid code, which you can then copy and paste into any Mermaid renderer.
Runtime Configuration
Some steps support dynamic runtime configuration, which allows you to change a step's behavior at runtime.
To use these runtime configurations, supply a dictionary via the config parameter when invoking the pipeline, as illustrated in the sketch below.
For step, transform, parallel, and map_reduce, the Runtime Config is automatically accessible alongside the State when using input_map. When you specify a key in input_map as a string, the system first tries to resolve it from the State, and if not found, falls back to the Runtime Config.
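A minimal sketch reusing the quickstart state: the "suffix" key is not part of MiniState, so the transform resolves it from the Runtime Config (the key name is purely illustrative):

def decorate(data: dict) -> str:
    # "suffix" is absent from MiniState, so it falls back to the Runtime Config
    return data["text_upper"] + data["suffix"]

configured = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        transform(decorate, input_map=["text_upper", "suffix"], output_state="text_upper"),
    ],
    state_type=MiniState,
)
final = asyncio.run(configured.invoke(initial, config={"suffix": "!!!"}))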
For conditionals if_else, switch, toggle, and guard, the Runtime Config is always available and can be accessed using its original keys when using a Callable.
However, when using a Component, you must explicitly map the inputs using input_map.
Input and Output Schema
The Pipeline supports defining explicit input and output schemata to validate data entering and leaving the pipeline. This is particularly useful when converting a Pipeline to a tool or when you need strict type checking.
You should define input and output schemata when:
Converting a Pipeline to a Tool (for Agent integration)
You need validation of data entering/leaving the pipeline
You want to document the expected structure of inputs and outputs
Internally, LangGraph uses input and output schemata to filter data at the boundaries of your pipeline:
Input Schema Filtering: When you provide an input_type, LangGraph filters the initial state before it enters the graph. Only fields defined in the input schema are passed through to the pipeline's internal state. This ensures your pipeline only receives the exact fields you expect.
Output Schema Filtering: When you provide an output_type, LangGraph filters the final state before returning it. Only fields defined in the output schema are included in the result. This is useful for hiding internal state fields from the caller.
Important: Input and output schemas perform filtering, not validation. If you need to validate inputs (e.g., checking ranges, formats), you must either:
Use a Pydantic BaseModel for your state_type (validates during execution)
Manually validate using your input_type schema before calling invoke()
Using TypedDict for Simple Schemas
For straightforward use cases, you can use TypedDict to define your schemas:
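A minimal sketch reusing the quickstart steps, assuming the Pipeline constructor accepts the input_type and output_type arguments referenced above (MiniInput and MiniOutput are illustrative names):

class MiniInput(TypedDict):
    text: str

class MiniOutput(TypedDict):
    summary: dict

schema_pipe = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        transform(count_chars, input_map=["text_upper"], output_state="text_len"),
        bundle(["text", "text_upper", "text_len"], output_state="summary"),
    ],
    state_type=MiniState,
    input_type=MiniInput,    # only "text" enters the graph
    output_type=MiniOutput,  # only "summary" is returned to the caller
)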
Using Pydantic for Validation
For automatic validation, you must use a Pydantic BaseModel as your state schema. LangGraph validates the state during pipeline execution based on the state schema.
The input and output schemas may be used to validate the state before and after the pipeline execution.
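A minimal sketch of a Pydantic-backed state whose fields mirror MiniState (exact validation behavior depends on how LangGraph handles your model):

from pydantic import BaseModel

class MiniModelState(BaseModel):
    text: str
    text_upper: str = ""
    text_len: int = 0
    summary: dict = {}

validated_pipe = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        transform(count_chars, input_map=["text_upper"], output_state="text_len"),
        bundle(["text", "text_upper", "text_len"], output_state="summary"),
    ],
    state_type=MiniModelState,
)
# A non-string "text" (e.g. text=123) now raises a pydantic ValidationError during execution.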
Converting to a Tool
When you define input/output schemas, the Pipeline can be easily converted to a Tool for use with AI Agents:
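The exact conversion helper lives in the API reference; the sketch below uses a hypothetical to_tool() name purely for illustration:

# Hypothetical method name; check the API reference for the real conversion helper.
mini_tool = schema_pipe.to_tool(
    name="mini_text_stats",  # illustrative tool name
    description="Uppercases text and reports its length",
)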
Using the Debug State
Our Pipeline comes with a utility that provides a trace of the Pipeline execution. To use it, pass config={"debug_state": True}. The trace is available under the __state_logs__ key in the final output.
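For example, with the quickstart pipeline:

final = asyncio.run(pipe.invoke(initial, config={"debug_state": True}))
print(final["__state_logs__"])  # per-step trace of the execution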
Using a Pipeline as a Subgraph
To use a Pipeline as a Subgraph, one can wrap the Pipeline inside a subgraph step and map the input states and configs as necessary.
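A minimal sketch: the inner Pipeline reuses MiniState, and the keyword names passed to subgraph below are illustrative, so check the subgraph signature in the API reference:

inner = Pipeline(
    steps=[transform(count_chars, input_map=["text_upper"], output_state="text_len")],
    state_type=MiniState,
)
outer = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        subgraph(inner, input_map=["text_upper"], output_state="text_len"),  # illustrative argument names
    ],
    state_type=MiniState,
)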
Using the Leftshift (<<) Operator
Alternatively, you can use the leftshift operator (<<) to embed a Pipeline as a subgraph in another Pipeline. Subgraphs created this way will have overlapping State keys automatically mapped.
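A minimal sketch, assuming outer << inner embeds inner as a subgraph of outer and auto-maps the overlapping MiniState keys:

combined = pipe << inner  # "inner" from the subgraph example; shared keys such as "text_upper" are mapped automatically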