Pipeline

The Pipeline is the core orchestration component that sequences and manages the execution of the components in our SDK.

Prerequisites

This example requires completing all setup steps listed on the Prerequisites page.

You should be familiar with these concepts:

  1. Basic Concepts of orchestration components

Installation

# you can use a Conda environment
pip install --extra-index-url https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/ "gllm-pipeline"

Quickstart

We will create a simple Pipeline to illustrate the general workflow of building and running Pipelines.

1. Import the Pipeline and the steps

from typing import TypedDict  # Optional, only for custom states
from gllm_pipeline.pipeline.pipeline import Pipeline
from gllm_pipeline.steps._func import step, transform, bundle, log, subgraph
2. Define your state

We will use a simplified state for this quickstart. Alternatively, if you are building a RAG pipeline, you can use the default RAGState instead.

class MiniState(TypedDict):
    text: str
    text_upper: str
    text_len: int
    summary: dict  # summary bundle
3. Define your steps

Here we use simple transform and bundle steps to illustrate how the Pipeline works. You can always use the other Steps, or consult the How-to guide for comprehensive coverage.

def to_upper(data: dict) -> str:
    return data["text"].upper()

def count_chars(data: dict) -> int:
    return len(data["text_upper"])

pipe = Pipeline(
    steps=[
        transform(to_upper, input_states=["text"], output_state="text_upper"),
        transform(count_chars, input_states=["text_upper"], output_state="text_len"),
        bundle(["text", "text_upper", "text_len"], output_state="summary"),
    ],
    state_type=MiniState,
)
4. Invoke the pipeline

Our pipeline is asynchronous by default. To invoke it from synchronous code, use asyncio.run; inside an existing event loop, await pipe.invoke(...) directly.

import asyncio

initial: MiniState = {
    "text": "hello world",
    "text_upper": "",
    "text_len": 0,
    "summary": {},
}
final = asyncio.run(pipe.invoke(initial))

print(final)

After invoking the pipeline, you should get an output similar to this:

{'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11,
 'summary': {'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11}}

The Pipe Operator

You can also use the pipe (|) operator to compose your Pipeline.

Appending a Step

You can use the | operator to append a step to a Pipeline.
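For example, reusing the Quickstart's pipe and MiniState, appending one more step might look like this (a sketch only; the appended step and its state key are hypothetical, and MiniState would need a matching key in practice):

```python
from gllm_pipeline.steps._func import transform

# Hypothetical extra step: shout the uppercased text.
shout = transform(
    lambda data: data["text_upper"] + "!",
    input_states=["text_upper"],
    output_state="text_shout",  # hypothetical state key
)

# Appending via the pipe operator yields a Pipeline with the extra step.
extended = pipe | shout
```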

Merge Two Pipelines

You can also use the | operator to merge two pipelines that share the same State schema.
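As a sketch, assuming pipe_a and pipe_b are two hypothetical pipelines built over the same MiniState as in the Quickstart:

```python
# Sketch only: pipe_a and pipe_b are assumed to share the same state_type.
# Merging with | produces a Pipeline whose steps run pipe_a's steps first,
# then pipe_b's.
merged = pipe_a | pipe_b
```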

Placeholder Pipelines

Finally, you can initialize a Pipeline with an empty step list to act as a placeholder, e.g. to fix the state_type up front, then use the | operator to compose the full pipeline onto it.
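A sketch of this pattern, assuming Pipeline accepts an empty step list and reusing MiniState and transform from the Quickstart:

```python
from gllm_pipeline.pipeline.pipeline import Pipeline
from gllm_pipeline.steps._func import transform

# Placeholder pipeline: no steps yet, but the state_type is already set.
placeholder = Pipeline(steps=[], state_type=MiniState)

# Compose the actual steps onto it later with the pipe operator.
built = placeholder | transform(
    lambda data: data["text"].upper(),
    input_states=["text"],
    output_state="text_upper",
)
```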

Visualizing the Pipeline

Our Pipelines come with the get_mermaid_diagram() method, which returns Mermaid code describing the pipeline. This is useful for docs and reviews.

To obtain the Mermaid diagram of a Pipeline, simply call the method and paste the resulting code into any Mermaid renderer.
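For instance, with the Quickstart's pipe:

```python
# get_mermaid_diagram() returns Mermaid code as a string; print it and paste
# the result into a Mermaid renderer to visualize the pipeline.
print(pipe.get_mermaid_diagram())
```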

Runtime Configuration

Some steps support dynamic runtime configuration, which lets you change a step's behavior at invocation time.

To use these runtime configurations, supply a dictionary via the config parameter when invoking the pipeline.

For step, transform, parallel, and map_reduce, the Runtime Config gets merged with the inputs from the State, and can be used as an extra input to the Callable or Component inside these steps. For these steps, the Runtime Config is only available when defined via a runtime_config_map.

For the conditionals if_else, switch, toggle, and guard, the Runtime Config is always available and can be accessed using its original keys when using a Callable. When using a Component, however, the runtime_config_map must be provided.
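Conceptually, the merge behaves like the following plain-Python sketch. This is not the library's implementation, and the mapping direction of runtime_config_map (config key to input name) is an assumption:

```python
# Conceptual sketch only: state inputs are read first, then keys named in a
# hypothetical runtime_config_map are pulled from the runtime config and
# merged in as extra inputs for the step's Callable or Component.
def build_step_inputs(state, runtime_config, input_states, runtime_config_map):
    inputs = {key: state[key] for key in input_states}
    for config_key, input_name in runtime_config_map.items():
        if config_key in runtime_config:
            inputs[input_name] = runtime_config[config_key]
    return inputs

state = {"text": "hello"}
config = {"style": "formal"}
inputs = build_step_inputs(state, config, ["text"], {"style": "tone"})
# inputs == {"text": "hello", "tone": "formal"}
```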

Using the Debug State

Our Pipeline comes with a utility that traces Pipeline execution. To enable it, pass config={"debug_state": True}. The trace is available under the __state_logs__ key in the final output.
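Reusing the Quickstart's pipe and initial state, enabling the trace looks like this:

```python
import asyncio

# Pass debug_state via the config parameter to record an execution trace.
final = asyncio.run(pipe.invoke(initial, config={"debug_state": True}))

# The trace is stored under __state_logs__ in the final output.
print(final["__state_logs__"])
```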

Using a Pipeline as a Subgraph

To use a Pipeline as a subgraph, wrap it inside a subgraph step and map the input states and configs as necessary.

Using the Leftshift (<<) Operator

Alternatively, you can use the leftshift operator (<<) to embed a Pipeline as a subgraph in another Pipeline. Subgraphs created this way will have overlapping State keys automatically mapped.
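As a hedged sketch of the operator form, where the operand order (parent on the left, subgraph on the right) is an assumption to verify against the API Reference:

```python
# Hypothetical sketch: `parent_pipeline` is assumed to exist and to share
# State keys with the Quickstart's `pipe`. Embedding via "<<" maps the
# overlapping State keys automatically.
composed = parent_pipeline << pipe
```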
