playPipeline

gllm-pipelinearrow-up-right | Tutorial: Pipeline| Use Case: Build End-to-End RAG PipelineExecute a Pipeline| API Referencearrow-up-right

The Pipeline is the core orchestration component that sequences and manages the execution of the components in our SDK.

chevron-rightPrerequisiteshashtag

This example specifically requires completion of all setup steps listed on the Prerequisites page.

You should be familiar with these concepts:

  1. Basic Conceptsof orchestration components

Installation

# you can use a Conda environment
pip install --extra-index-url https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/ "gllm-pipeline"

Quickstart

We will create a simple general Pipeline to illustrate the general workflow around building Pipelines.

1

Import the Pipeline and the steps

from typing import TypedDict  # Optional, only for custom states
from gllm_pipeline.pipeline.pipeline import Pipeline
from gllm_pipeline.steps._func import step, transform, bundle, log, subgraph
2

Define your state

We will use a simplified state for ths quickstart. Alternatively, if you are dealing with an RAG pipeline, you can use the default RAGState instead.

class MiniState(TypedDict):
    text: str
    text_upper: str
    text_len: int
    summary: dict  # summary bundle
3

Define your steps

Here we use simple transform and bundle steps to illustrate how the Pipeline works. You can always use the other Steps, or follow the How-to guide for a comprehensive guide.

def to_upper(data: dict) -> str:
    return data["text"].upper()

def count_chars(data: dict) -> int:
    return len(data["text_upper"])

pipe = Pipeline(
    steps=[
        transform(to_upper, input_map=["text"], output_state="text_upper"),
        transform(count_chars, input_map=["text_upper"], output_state="text_len"),
        bundle(["text", "text_upper", "text_len"], output_state="summary"),
    ],
    state_type=MiniState,
)
4

Invoke the pipeline

Our pipeline is asynchronous by default. Therefore, to invoke it, you must use asyncio.run.

import asyncio

initial: MiniState = {
    "text": "hello world",
    "text_upper": "",
    "text_len": 0,
    "summary": {},
}
final = asyncio.run(pipe.invoke(initial))

print(final)

After invoking the pipeline, you should get an output similar to this:

{'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11,
 'summary': {'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11}}
circle-check

The Pipe Operator

You can also utilize the pipe (|) operator to compose your Pipeline.

circle-exclamation

Appending a Step

You can use the | operator to append a step to a Pipeline.

Merge Two Pipelines

You can also use the | operator to merge two pipelines of the same State schema.

triangle-exclamation

Placeholder Pipelines

Finally, you can initialize a Pipeline with an empty step to use as a placeholder, e.g. to set the state_type, then use the | operator to compose the pipeline with the correct state_type.

Visualizing the Pipeline

Our Pipelines come with the get_mermaid_diagram() method, which gives you a Mermaid code. This is useful for docs and reviews.

To obtain the Mermaid diagram of a Pipeline, simply call the method.

The output should look something like this:

Which you could then copy and paste to any Mermaid renderer.

Runtime Configuration

Some steps support dynamic runtime configuration, which allows us to change the step's behavior at runtime.

To use these runtime configurations, during invocation, supply a dictionary using the config parameter.

For step, transform, parallel, and map_reduce, the Runtime Config is automatically accessible alongside the State when using input_map. When you specify a key in input_map as a string, the system first tries to resolve it from the State, and if not found, falls back to the Runtime Config.

For conditionals if_else, switch, toggle, and guard, the Runtime Config is always available and can be accessed using its original keys when using a Callable.

However, when using a Component, you must explicitly map the inputs using input_map.

Input and Output Schema

The Pipeline supports defining explicit input and output schemata to validate data entering and leaving the pipeline. This is particularly useful when converting a Pipeline to a tool or when you need strict type checking.

You should define input and output schemata when:

  1. Converting a Pipeline to a Tool (for Agent integration)

  2. You need validation of data entering/leaving the pipeline

  3. You want to document the expected structure of inputs and outputs

Internally, LangGraph uses input and output schemata to filter data at the boundaries of your pipeline:

  1. Input Schema Filtering: When you provide an input_type, LangGraph filters the initial state before it enters the graph. Only fields defined in the input schema are passed through to the pipeline's internal state. This ensures your pipeline only receives the exact fields you expect.

  2. Output Schema Filtering: When you provide an output_type, LangGraph filters the final state before returning it. Only fields defined in the output schema are included in the result. This is useful for hiding internal state fields from the caller.

circle-exclamation

Using TypedDict for Simple Schemas

For straightforward use cases, you can use TypedDict to define your schemas:

Using Pydantic for Validation

For automatic validation, you must use a Pydantic BaseModel as your state schema. LangGraph validates the state during pipeline execution based on the state schema.

The input and output schemas may be used to validate the state before and after the pipeline execution.

Converting to a Tool

When you define input/output schemas, the Pipeline can be easily converted to a Tool for use with AI Agents:

Using the Debug State

Our Pipeline comes with a utility to provide a trace of the Pipeline execution. To do so, pass config={"debug_state": True} . The trace is available as __state_logs__ in the final output.

Using a Pipeline as a Subgraph

To use a Pipeline as a Subgraph, one can wrap the Pipeline inside a subgraph step and map the input states and configs as necessary.

Using the Leftshift (<<) Operator

Alternatively, you can use the leftshift operator (<<) to embed a Pipeline as a subgraph in another Pipeline. Subgraphs created this way will have overlapping State keys automatically mapped.

Last updated