Your First RAG Pipeline

This guide will walk you through setting up your custom RAG pipeline.

Prerequisites

This example specifically requires the SDK packages installed below, a local Elasticsearch instance (run via Docker Compose in this guide), and an OpenAI API key.

Installation

# you can use a Conda environment
pip install --extra-index-url "https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/" gllm-rag gllm-core gllm-generation gllm-inference gllm-pipeline gllm-retrieval gllm-misc gllm-datastore
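
If you want an isolated environment, you can create one with Conda before running the install command above. This is only a sketch; the environment name and Python version are assumptions:

# optional: create and activate a dedicated Conda environment first
# (the environment name and Python version below are only examples)
conda create -n my-rag-pipeline python=3.11 -y
conda activate my-rag-pipeline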

Set Up Your Project

Prepare your repository

Let's prepare your workspace step by step.

1

Create a new project folder:

mkdir my-rag-pipeline
cd my-rag-pipeline
2

Download these base files into your folder:

  • imaginary_animals.csv

You may use another knowledge base file and adjust accordingly.

  • indexer.py

3

Prepare your .env file:

Create a file named .env in your project directory with the following content:

CSV_DATA_PATH="data/imaginary_animals.csv"
ELASTICSEARCH_URL="http://localhost:9200/"
EMBEDDING_MODEL="text-embedding-3-small"
LANGUAGE_MODEL="gpt-4o-mini"
INDEX_NAME="first-quest"
OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

This is an example .env file. You may adjust the variables according to your needs.

Adjust Folder Structure

Start by organizing your files. Inside your folder, create this structure:

my-rag-pipeline/
├── data/
│   └── imaginary_animals.csv
├── modules/
│   ├── __init__.py
│   ├── retriever.py
│   ├── repacker.py
│   └── response_synthesizer.py
├── indexer.py
├── pipeline.py
└── main.py
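
If you prefer the command line, the following commands sketch one way to create that layout (they assume the downloaded files currently sit in the project root):

# create the folders and empty module/pipeline files
mkdir -p data modules
mv imaginary_animals.csv data/
touch modules/__init__.py modules/retriever.py modules/repacker.py modules/response_synthesizer.py
touch pipeline.py main.py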

Now, let's start building each part.


Index Your Data

1

Run this script

python indexer.py

This indexer is a script that prepares your data so it can be searched and used by your AI pipeline.

Here's what it does, step by step:

  1. Reads your data from a CSV file (imaginary_animals.csv).

  2. Converts each row into a "chunk" (a small piece of text + metadata).

  3. Sends those chunks to your local Elasticsearch database.

  4. Uses OpenAI's embedding model to turn text into vectors (numerical representations) so your AI can understand and match meanings.

  5. Replaces any old data in the database by deleting the existing index before re-indexing.
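
To make steps 1 and 2 concrete, here is a minimal sketch of how rows can be turned into chunks using only the standard library. The downloaded indexer.py handles steps 3 to 5 through the SDK, and the exact way it builds chunk text and metadata may differ:

# a sketch of steps 1-2 only; how indexer.py actually shapes its chunks may differ
import csv
import os

from dotenv import load_dotenv

load_dotenv()

chunks = []
with open(os.getenv("CSV_DATA_PATH", "data/imaginary_animals.csv"), newline="") as file:
    for row in csv.DictReader(file):
        chunks.append({
            "content": " ".join(str(value) for value in row.values()),  # the text that will be embedded
            "metadata": dict(row),                                      # keep the raw row as metadata
        })

print(f"Prepared {len(chunks)} chunks")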

2

Verify your index:

Check your Elasticsearch URL again. You should see a new index named first-quest containing 50 documents.
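
If you prefer the command line, you can also verify the index directly against Elasticsearch (assuming the default URL from your .env):

curl "http://localhost:9200/_cat/indices?v"
curl "http://localhost:9200/first-quest/_count?pretty"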

3

Shut down the container (when done):

docker compose down

Your data will still be there the next time you run docker compose up -d.
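
If you don't already have a docker-compose.yml for the local Elasticsearch container, a minimal single-node setup for development might look like the sketch below. The image version and the disabled security are assumptions; adjust them to whatever your environment expects:

# docker-compose.yml (sketch): single-node Elasticsearch for local development only
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"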

Build Core Components of Your Pipeline

Now that your development environment is ready, it's time to build the three main components of your RAG (Retrieval-Augmented Generation) pipeline:

  1. Retriever: Finds relevant information from your data.

  2. Repacker: Combines retrieved information into a clean, readable format.

  3. Response Synthesizer: Generates the final answer based on the user's question and the combined context.

You'll also prepare two extra files:

  1. pipeline.py: Connects all components together.

  2. main.py: Runs the complete program.

Create the Retriever

The retriever finds and pulls useful information from your Elasticsearch database. To do this, we'll follow these steps:

1

Load your environment settings

These settings (like your API key, model name, etc.) are stored in the .env file.

In retriever.py, start by loading them:

import os
from dotenv import load_dotenv

load_dotenv()
2

Set up the embedding model

This helps the retriever understand the meaning of your data and match it with the question being asked. In our case, we'll use OpenAIEMInvoker, which is a wrapper around OpenAI's embedding API built into our SDK.

Add this to retriever.py:

from gllm_inference.em_invoker import OpenAIEMInvoker

embedding_model = OpenAIEMInvoker(
    model_name=os.getenv("EMBEDDING_MODEL"),
    api_key=os.getenv("OPENAI_API_KEY"),
)

Make sure the embedding model used here matches the one used when you indexed the data earlier; this is essential for retrieval to work properly.

3

Connect to your Elasticsearch database

We'll now connect your project to the local Elasticsearch container:

from gllm_datastore.vector_data_store import ElasticsearchVectorDataStore

data_store = ElasticsearchVectorDataStore(
    index_name=os.getenv("INDEX_NAME"),
    url=os.getenv("ELASTICSEARCH_URL"),
    embedding_model=embedding_model
)

4

Set up the retriever

This final step creates the retriever using the data store:

from gllm_retrieval.retriever import BasicVectorRetriever

retriever = BasicVectorRetriever(data_store=data_store)
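
Putting the snippets above together, your complete modules/retriever.py ends up looking roughly like this:

import os

from dotenv import load_dotenv
from gllm_datastore.vector_data_store import ElasticsearchVectorDataStore
from gllm_inference.em_invoker import OpenAIEMInvoker
from gllm_retrieval.retriever import BasicVectorRetriever

load_dotenv()

# Embedding model: must match the model used by indexer.py
embedding_model = OpenAIEMInvoker(
    model_name=os.getenv("EMBEDDING_MODEL"),
    api_key=os.getenv("OPENAI_API_KEY"),
)

# Vector data store backed by the local Elasticsearch index
data_store = ElasticsearchVectorDataStore(
    index_name=os.getenv("INDEX_NAME"),
    url=os.getenv("ELASTICSEARCH_URL"),
    embedding_model=embedding_model,
)

# The retriever object imported by pipeline.py
retriever = BasicVectorRetriever(data_store=data_store)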

Create the Repacker

Once information is retrieved, it may be messy or split across several parts. The repacker combines it into a clean format the model can understand.

In repacker.py, add:

from gllm_misc.context_manipulator import Repacker

repacker = Repacker(
    mode="context",             # tells it to prepare a combined context
    delimiter="\n\n"            # optional: separates each chunk of info
)

And that's it: simple and done!

Create the Response Synthesizer

This component takes the user's question and the repacked context, then sends them to a language model (like GPT-4o-mini) to generate a helpful response.

1

Load environment settings

Start your response_synthesizer.py file the same way:

import os
from dotenv import load_dotenv

load_dotenv()
2

Create the prompt

Prompts tell the model how to behave. You'll use both a system prompt (background instructions) and a user prompt (the actual question). Below is a sample prompt that you can use as a reference:

SYSTEM_PROMPT = """
You are a helpful assistant.
Use only the information provided in the context below to answer the user's question.

Context:
{context}
"""

USER_PROMPT = "Question: {query}"

🧠 The {context} and {query} placeholders will be filled in automatically when the program runs.
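
For illustration only, here is roughly what that substitution produces. The values below are hypothetical, and in practice the SDK's request processor performs the substitution for you:

# hypothetical values, just to show how the placeholders get filled
context = "Luminfox: a small nocturnal fox whose fur glows faintly in the dark."
query = "Which animal glows in the dark?"

print(SYSTEM_PROMPT.format(context=context))
print(USER_PROMPT.format(query=query))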

3

Set up the request processor

The SDK provides a helper called build_lm_request_processor to keep things simple. In your response_synthesizer.py, bring it all together:

from gllm_inference.builder import build_lm_request_processor

lm_request_processor = build_lm_request_processor(
    model_id=os.environ["LANGUAGE_MODEL"],
    credentials=os.environ["OPENAI_API_KEY"],
    system_template=SYSTEM_PROMPT,
    user_template=USER_PROMPT,
)
4

Build the response synthesizer

Now, create the final synthesizer object that ties everything together:

from gllm_generation.response_synthesizer import StuffResponseSynthesizer

response_synthesizer = StuffResponseSynthesizer(
    lm_request_processor=lm_request_processor
)

You now have all three components: a retriever, a repacker, and a response synthesizer.

Define State Variables

In a pipeline, each component passes data to the next using a shared state. By default, the SDK uses a predefined state type called RAGState. However, you can define your own to add or remove variables based on your use case.

What is RAGState?

RAGState is a TypedDict defined in gllm_pipeline.pipeline.states. It defines all the default variables passed between the steps of a Retrieval-Augmented Generation (RAG) pipeline:

from typing import Any, TypedDict

from gllm_core.event.event_emitter import EventEmitter

class RAGState(TypedDict):
    user_query: str
    queries: list[str]
    retrieval_params: dict[str, Any]
    chunks: list
    history: str
    context: str
    response_synthesis_bundle: dict[str, Any]
    response: str
    references: str | list[str]
    event_emitter: EventEmitter

Pipelines use RAGState as their internal state unless you say otherwise. If the default fields don't fit your use case, define your own state type by following the steps below.

Defining Custom State Variables

1. Create state class

You can define your own state class in a new file like my_state.py:

from typing import TypedDict, Any
from gllm_core.event.event_emitter import EventEmitter

class MyCustomState(TypedDict):
    user_query: str
    chunks: list
    context: str
    response_synthesis_bundle: dict[str, Any]
    response: str
    document_scores: list[float]  # 👈 custom field
    debug_info: dict[str, Any]    # 👈 custom field
    event_emitter: EventEmitter

🧠 This TypedDict structure ensures the pipeline only uses and produces valid fields.

2. Apply to pipeline

When constructing your pipeline in the next step, pass your custom state type explicitly:

from my_state import MyCustomState
from gllm_pipeline.pipeline import Pipeline

pipeline = Pipeline(
    steps=[
        retriever_step,
        repacker_step,
        bundler_step,
        response_synthesizer_step
    ],
    state_type=MyCustomState
)

Build the Pipeline

Now that all the components are ready, it's time to connect them into a working AI pipeline! 🎉

To do this, we'll use something called a pipeline step, which wraps your component (like the retriever or repacker) and tells the pipeline how to use it.

We'll build the full process step by step in your pipeline.py file.

Types of Pipeline Steps

In our pipeline system, each action (like retrieving or generating) needs to be wrapped as a step. There are two types of steps we'll use:

Component Step

Whenever you want to use a component (like the retriever), you turn it into a component step. To create one, you'll use the helper function step(...).

You can see the details of the parameters here:

  • Name (optional): just used for identification (we'll skip it here for now).

  • Component: the actual logic you want to run, like the retriever or synthesizer.

  • Input state map: maps variables from the pipeline's state into the component's expected inputs.

  • Output state: where to store the result, so the next step can use it.

  • Runtime config map (optional): for dynamic configs like top_k values.

  • Fixed args (optional): for hardcoded/static inputs (we won't use this for now).

Bundler Step

Some components (like the response synthesizer) expect their input as a dictionary of values. To prepare that, we use a special step that just groups values from the pipeline state. You'll use the bundle(...) helper function.

You can see the details of the parameters here:

  • Name (optional): again, for identification (not needed here).

  • Input states: a list of keys you want to group.

  • Output state: the new key to save the bundled dictionary under.

Create the pipeline

Open your pipeline.py file and follow these steps:

1

Import the helpers and components

from gllm_pipeline.steps import step, bundle
from modules.retriever import retriever
from modules.repacker import repacker
from modules.response_synthesizer import response_synthesizer
2

Create the Retriever Step

This searches for relevant chunks based on the user's query.

retriever_step = step(
    retriever,                            # The component
    {"query": "user_query"},              # input state map: state["user_query"] → retriever.query
    "chunks",                             # output: save the result in state["chunks"]
    {"top_k": "top_k"}                    # runtime config: state["top_k"] → retriever.top_k
)
3

Create the Repacker Step

This formats the retrieved chunks into a clean context string.

repacker_step = step(
    repacker,
    {"chunks": "chunks"},                # input state map: take from state["chunks"]
    "context"                            # output: save the result as state["context"]
)
4

Create the Bundler Step

This groups "context" into a dictionary for the response synthesizer.

bundler_step = bundle(
    {"context": "context"},              # Bundle state["context"]
    "response_synthesis_bundle"          # Save output as state["response_synthesis_bundle"]
)
5

Create the Response Synthesizer Step

This sends the context and user query to the language model to get the final answer.

response_synthesizer_step = step(
    response_synthesizer,
    {
        "query": "user_query",                          # Pass state["user_query"]
        "state_variables": "response_synthesis_bundle"  # Pass state["response_synthesis_bundle"]
    },
    "response"                                          # Save the final answer in state["response"]
)
6

Connect Everything into a Pipeline

Finally, use the | operator to chain all steps in order:

e2e_pipeline = retriever_step | repacker_step | bundler_step | response_synthesizer_step

Run the Application

We'll use FastAPI, a lightweight web framework for building APIs quickly and easily in Python. Now that the pipeline is built, we'll create a fully functional FastAPI application that exposes it via an API endpoint.

1

Create the API in main.py

Download the provided script and save it as main.py in your project root.
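
To give you a sense of what such a script contains, here is a minimal sketch, not the provided main.py. The endpoint path, request model, and the invoke call are assumptions; follow the downloaded script and the SDK docs for the actual API:

# a minimal sketch, assuming the pipeline can be invoked asynchronously with an
# initial state dict; the /ask endpoint and AskRequest model are hypothetical
from fastapi import FastAPI
from pydantic import BaseModel

from pipeline import e2e_pipeline

app = FastAPI()

class AskRequest(BaseModel):  # hypothetical request schema
    query: str

@app.post("/ask")  # hypothetical endpoint
async def ask(request: AskRequest):
    initial_state = {"user_query": request.query}       # seed the pipeline state
    # add any other state keys your steps expect (e.g. top_k for the retriever step)
    result = await e2e_pipeline.invoke(initial_state)   # assumed invocation method
    return {"response": result.get("response")}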

2

Start the Server

Make sure the environment where you installed the packages is active, then run this command:

uvicorn main:app --reload

If you manage the project with Poetry instead, run poetry run uvicorn main:app --reload.

You should see something like:

INFO:     Uvicorn running on http://127.0.0.1:8000
3

Test Your RAG Pipeline via API

To test your app, download and run this run.py file.

If everything works correctly, you should be able to see the logs of your RAG pipeline processes in real time!
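
If you just want a quick manual check instead of run.py, you can also call the endpoint from the command line. The path and payload below are assumptions and must match whatever main.py actually exposes:

curl -X POST "http://127.0.0.1:8000/ask" \
     -H "Content-Type: application/json" \
     -d '{"query": "Which imaginary animal can fly?"}'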

📂 Complete Tutorial Files

Coming soon!
