Pipeline
gllm-pipeline | Tutorial: Pipeline| Use Case: Build End-to-End RAG PipelineExecute a Pipeline| API Reference
The Pipeline is the core orchestration component that sequences and manages the execution of the components in our SDK.
Installation
# you can use a Conda environment
pip install --extra-index-url https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/ "gllm-pipeline"# you can use a Conda environment
$token = (gcloud auth print-access-token)
pip install --extra-index-url "https://oauth2accesstoken:$token@glsdk.gdplabs.id/gen-ai-internal/simple/" "gllm-pipeline"# you can use a Conda environment
FOR /F "tokens=*" %T IN ('gcloud auth print-access-token') DO pip install --extra-index-url "gllm-pipeline"Quickstart
We will create a simple general Pipeline to illustrate the general workflow around building Pipelines.
Define your state
We will use a simplified state for ths quickstart. Alternatively, if you are dealing with an RAG pipeline, you can use the default RAGState instead.
class MiniState(TypedDict):
text: str
text_upper: str
text_len: int
summary: dict # summary bundleDefine your steps
Here we use simple transform and bundle steps to illustrate how the Pipeline works. You can always use the other Steps, or follow the How-to guide for a comprehensive guide.
def to_upper(data: dict) -> str:
return data["text"].upper()
def count_chars(data: dict) -> int:
return len(data["text_upper"])
pipe = Pipeline(
steps=[
transform(to_upper, input_states=["text"], output_state="text_upper"),
transform(count_chars, input_states=["text_upper"], output_state="text_len"),
bundle(["text", "text_upper", "text_len"], output_state="summary"),
],
state_type=MiniState,
)Invoke the pipeline
Our pipeline is asynchronous by default. Therefore, to invoke it, you must use asyncio.run.
import asyncio
initial: MiniState = {
"text": "hello world",
"text_upper": "",
"text_len": 0,
"summary": {},
}
final = asyncio.run(pipe.invoke(initial))
print(final)After invoking the pipeline, you should get an output similar to this:
{'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11,
'summary': {'text': 'hello world', 'text_upper': 'HELLO WORLD', 'text_len': 11}}That's it! You have created your first Pipeline! All future Pipelines that you will ever create will follow the above general steps.
The Pipe Operator
You can also utilize the pipe (|) operator to compose your Pipeline.
When composing using | , the Pipeline's state will be RAGState. Make sure that you use the state_type setter to specify the correct state type before invoking.
Appending a Step
You can use the | operator to append a step to a Pipeline.
Merge Two Pipelines
You can also use the | operator to merge two pipelines of the same State schema.
Merging two pipelines with different State schemata will cause a ValidationError.
Placeholder Pipelines
Finally, you can initialize a Pipeline with an empty step to use as a placeholder, e.g. to set the state_type, then use the | operator to compose the pipeline with the correct state_type.
Visualizing the Pipeline
Our Pipelines come with the get_mermaid_diagram() method, which gives you a Mermaid code. This is useful for docs and reviews.
To obtain the Mermaid diagram of a Pipeline, simply call the method.
The output should look something like this:
Which you could then copy and paste to any Mermaid renderer.
Runtime Configuration
Some steps support dynamic runtime configuration, which allows us to change the step's behavior at runtime.
To use these runtime configurations, during invocation, supply a dictionary using the config parameter.
For step, transform, parallel, and map_reduce , the Runtime Config gets merged with the inputs from the State, and can be used as an extra input to the Callable or Component inside these steps. For these steps, the Runtime Config is only available when defined via a runtime_config_map.
For conditionals if_else, switch, toggle, and guard, the Runtime Config is always available and can be accessed using its original keys when using a Callable.
However, when using a Component, the runtime_config_map must be provided.
Using the Debug State
Our Pipeline comes with a utility to provide a trace of the Pipeline execution. To do so, pass config={"debug_state": True} . The trace is available as __state_logs__ in the final output.
Using a Pipeline as a Subgraph
To use a Pipeline as a Subgraph, one can wrap the Pipeline inside a subgraph step and map the input states and configs as necessary.
Using the Leftshift (<<) Operator
Alternatively, you can use the leftshift operator (<<) to embed a Pipeline as a subgraph in another Pipeline. Subgraphs created this way will have overlapping State keys automatically mapped.
Last updated