Your First RAG Pipeline
This guide will walk you through setting up your custom RAG pipeline.
Installation
# you can use a Conda environment
pip install --extra-index-url "https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/" gllm-rag gllm-core gllm-generation gllm-inference gllm-pipeline gllm-retrieval gllm-misc gllm-datastore
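For example, if you prefer an isolated Conda environment, you could create one before installing (the environment name and Python version are just examples; check the SDK's supported Python versions):
conda create -n my-rag-pipeline python=3.11 -y
conda activate my-rag-pipeline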
Set Up Your Project
Prepare your repository
Let's prepare your workspace step by step.
Create a new project folder:
mkdir my-rag-pipeline
cd my-rag-pipeline
Download these base files into your folder:
imaginary_animals.csv
indexer.py
Prepare your .env file:
Create a file named .env in your project directory with the following content:
CSV_DATA_PATH="data/imaginary_animals.csv"
ELASTICSEARCH_URL="http://localhost:9200/"
EMBEDDING_MODEL="text-embedding-3-small"
LANGUAGE_MODEL="gpt-4o-mini"
INDEX_NAME="first-quest"
OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
Adjust Folder Structure
Start by organizing your files. Inside your folder, create this structure:
my-rag-pipeline/
├── data/
│   └── imaginary_animals.csv
├── modules/
│   ├── __init__.py
│   ├── retriever.py
│   ├── repacker.py
│   └── response_synthesizer.py
├── indexer.py
├── pipeline.py
└── main.py
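You can create the empty folders and files from your terminal, for example (adjust the paths if the downloaded files are not in the project root):
mkdir -p data modules
mv imaginary_animals.csv data/
touch modules/__init__.py modules/retriever.py modules/repacker.py modules/response_synthesizer.py
touch pipeline.py main.py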
Now, let's start building each part.
Index Your Data
Run this script:
python indexer.py
This indexer is a script that prepares your data so it can be searched and used by your AI pipeline.
Here's what it does, step by step:
Reads your data from a CSV file (imaginary_animals.csv).
Converts each row into a "chunk" (a small piece of text + metadata).
Sends those chunks to your local Elasticsearch database.
Uses OpenAI's embedding model to turn text into vectors (numerical representations) so your AI can understand and match meanings.
Replaces any old data in the database by deleting the existing index before re-indexing.
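The downloaded indexer.py already implements all of this, so you don't need to write it yourself. For orientation only, the flow looks roughly like the sketch below; the commented-out delete/index calls at the end use assumed method names rather than verified SDK calls, so rely on the downloaded script for the real implementation.
# Conceptual sketch only -- the real logic lives in the downloaded indexer.py.
import csv
import os

from dotenv import load_dotenv
from gllm_datastore.vector_data_store import ElasticsearchVectorDataStore
from gllm_inference.em_invoker import OpenAIEMInvoker

load_dotenv()

# 1. Read the CSV and turn each row into a chunk (text + metadata).
with open(os.getenv("CSV_DATA_PATH"), newline="") as f:
    rows = list(csv.DictReader(f))
chunks = [{"content": " ".join(row.values()), "metadata": row} for row in rows]

# 2. Connect to Elasticsearch with the same embedding model used at query time.
embedding_model = OpenAIEMInvoker(
    model_name=os.getenv("EMBEDDING_MODEL"),
    api_key=os.getenv("OPENAI_API_KEY"),
)
data_store = ElasticsearchVectorDataStore(
    index_name=os.getenv("INDEX_NAME"),
    url=os.getenv("ELASTICSEARCH_URL"),
    embedding_model=embedding_model,
)

# 3. Replace any old data, then index the new chunks.
#    (delete_index / add_chunks are assumed names -- see the downloaded indexer.py.)
# data_store.delete_index()
# data_store.add_chunks(chunks)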
Verify your index:
Check your database URL again. You should see a new index named first-quest with 50 documents.
Shut down the container (when done):
docker compose down
Your data will still be there the next time you run
docker compose up -d
Build Core Components of Your Pipeline
Now that your development environment is ready, it's time to build the three main components of your RAG (Retrieval-Augmented Generation) pipeline:
Retriever: Finds relevant information from your data.
Repacker: Combines retrieved information into a clean, readable format.
Response Synthesizer: Generates the final answer based on the user's question and the combined context.
You'll also prepare two extra files:
pipeline.py: Connects all components together.
main.py: Runs the complete program.
Create the Retriever
The retriever finds and pulls useful information from your Elasticsearch database. To do this, we'll follow three steps:
Load your environment settings
These settings (like your API key, model name, etc.) are stored in the .env file.
In retriever.py, start by loading them:
import os
from dotenv import load_dotenv
load_dotenv()
Set up the embedding model
This helps the retriever understand the meaning of your data and match it with the question being asked. In our case, we'll use OpenAIEMInvoker, which is a wrapper around OpenAI's embedding API built into our SDK.
Add this to retriever.py:
from gllm_inference.em_invoker import OpenAIEMInvoker
embedding_model = OpenAIEMInvoker(
    model_name=os.getenv("EMBEDDING_MODEL"),
    api_key=os.getenv("OPENAI_API_KEY"),
)
This ensures the embedding model used here matches the one used when you indexed the data earlier, which is essential for retrieval to work properly.
Connect to your Elasticsearch database
We'll now connect your project to the local Elasticsearch container:
from gllm_datastore.vector_data_store import ElasticsearchVectorDataStore
data_store = ElasticsearchVectorDataStore(
    index_name=os.getenv("INDEX_NAME"),
    url=os.getenv("ELASTICSEARCH_URL"),
    embedding_model=embedding_model,
)
Set up the retriever
This final step creates the retriever using the data store:
from gllm_retrieval.retriever import BasicVectorRetriever
retriever = BasicVectorRetriever(data_store=data_store)
Create the Repacker
Once information is retrieved, it may be messy or split across several parts. The repacker combines it into a clean format the model can understand.
In repacker.py, add:
from gllm_misc.context_manipulator import Repacker
repacker = Repacker(
    mode="context",     # tells it to prepare a combined context
    delimiter="\n\n"    # optional: separates each chunk of info
)
And that's it: simple and done!
Create the Response Synthesizer
This component takes the user's question and the repacked context, then sends them to a language model (like GPT-4o-mini) to generate a helpful response.
Load environment settings
Start your response_synthesizer.py file the same way:
import os
from dotenv import load_dotenv
load_dotenv()
Create the prompt
Prompts tell the model how to behave. You'll use both a system prompt (background instructions) and a user prompt (the actual question). Below is a sample prompt you can use as a reference:
SYSTEM_PROMPT = """
You are a helpful assistant
Use only the information provided in the context below to answer the user's question
Context:
{context}
"""
USER_PROMPT = "Question: {query}"
Note: The {context} and {query} placeholders will be filled in automatically when the program runs.
Set up the request processor
This SDK provides a helper called build_lm_request_processor to keep things simple. In your response_synthesizer.py, bring it all together:
from gllm_inference.builder import build_lm_request_processor
lm_request_processor = build_lm_request_processor(
    model_id=os.environ["LANGUAGE_MODEL"],
    credentials=os.environ["OPENAI_API_KEY"],
    system_template=SYSTEM_PROMPT,
    user_template=USER_PROMPT,
)
Build the response synthesizer
Now, create the final synthesizer object that ties everything together:
from gllm_generation.response_synthesizer import StuffResponseSynthesizer
response_synthesizer = StuffResponseSynthesizer(
    lm_request_processor=lm_request_processor
)
You now have all three components: a retriever, a repacker, and a response synthesizer.
Define State Variables
In a pipeline, each component passes data to the next using a shared state. By default, the SDK uses a predefined state type called RAGState. However, you can define your own to add or remove variables based on your use case.
What is RAGState?
RAGState is a TypedDict defined in gllm_pipeline.pipeline.states. It defines all the default variables passed between steps in the Retrieval-Augmented Generation (RAG) pipeline:
from typing import Any, TypedDict
# (The SDK module also imports EventEmitter; its exact import path is omitted here.)

class RAGState(TypedDict):
    user_query: str
    queries: list[str]
    retrieval_params: dict[str, Any]
    chunks: list
    history: str
    context: str
    response_synthesis_bundle: dict[str, Any]
    response: str
    references: str | list[str]
    event_emitter: EventEmitter
By default, pipelines use RAGState as their internal state, but you can define your own by following the tutorial below.
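For illustration, a pared-down custom state could be a TypedDict that keeps only the variables this guide actually reads and writes (a sketch; how a custom state is passed to the pipeline depends on the SDK, so treat that wiring as out of scope here):
from typing import TypedDict

# Illustrative custom state containing only the variables used in this guide.
class MyRAGState(TypedDict):
    user_query: str
    top_k: int
    chunks: list
    context: str
    response_synthesis_bundle: dict
    response: str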
Build the Pipeline
Now that all the components are ready, it's time to connect them into a working AI pipeline!
To do this, we'll use something called a pipeline step, which wraps your component (like the retriever or repacker) and tells the pipeline how to use it.
We'll build the full process step by step in your pipeline.py file.
Types of Pipeline Steps
In our pipeline system, each action (like retrieving or generating) needs to be wrapped as a step. There are two types of steps we'll use:
step: Wraps a component and maps state variables to its inputs and outputs.
bundle: Groups existing state variables into a dictionary stored under a new state key.
Create the pipeline
Open your pipeline.py file and follow these steps:
Import the helpers and components
from gllm_pipeline.steps import step, bundle
from modules.retriever import retriever
from modules.repacker import repacker
from modules.response_synthesizer import response_synthesizer
Create the Retriever Step
This searches for relevant chunks based on the user's query, using the retriever imported from modules/retriever.py.
retriever_step = step(
    retriever,                  # The component
    {"query": "user_query"},    # initial state: from state["user_query"] → retriever.query
    "chunks",                   # output: save output in state["chunks"]
    {"top_k": "top_k"}          # config: from state["top_k"] → retriever.top_k
)
Create the Repacker Step
This formats the retrieved chunks into a clean context string.
repacker_step = step(
    repacker,
    {"chunks": "chunks"},    # initial state: take from state["chunks"]
    "context"                # output: save output as state["context"]
)
Create the Bundler Step
This groups "context"
into a dictionary for the response synthesizer.
bundler_step = bundle(
    {"context": "context"},        # Bundle state["context"]
    "response_synthesis_bundle"    # Save output as state["response_synthesis_bundle"]
)
Create the Response Synthesizer Step
This sends the context and user query to the language model to get the final answer.
response_synthesizer_step = step(
    response_synthesizer,
    {
        "query": "user_query",                          # Pass state["user_query"]
        "state_variables": "response_synthesis_bundle"  # Pass state["response_synthesis_bundle"]
    },
    "response"    # Save the final answer in state["response"]
)
Connect Everything into a Pipeline
Finally, use the | operator to chain all steps in order:
e2e_pipeline = retriever_step | repacker_step | bundler_step | response_synthesizer_step
Run the Application
We'll use FastAPI, a lightweight web framework for building APIs quickly and easily in Python. Now that the pipeline is built, we'll wrap it in a FastAPI application that we can call through an API endpoint.
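As a starting point, a minimal main.py might look like the sketch below. The /ask route, the request model, and especially the e2e_pipeline.invoke(...) call with its state keys are assumptions about the pipeline's interface, so adjust them to the actual SDK API and to the complete tutorial files.
# main.py -- a minimal sketch; the invoke() call and the state keys are assumptions.
from fastapi import FastAPI
from pydantic import BaseModel

from pipeline import e2e_pipeline

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/ask")
async def ask(request: QueryRequest):
    # Build the initial pipeline state from the incoming question.
    state = {"user_query": request.query, "top_k": 5}
    result = await e2e_pipeline.invoke(state)  # assumed method name; check the SDK docs
    return {"response": result["response"]}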
Start the Server
Make sure your environment is active, then run this command (prefix it with poetry run if you manage the project with Poetry):
uvicorn main:app --reload
You should see something like:
INFO: Uvicorn running on http://127.0.0.1:8000
If everything works correctly, you should be able to see the logs of your RAG pipeline processes in real time!
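Once the server is running, you can send a quick test request, for example (the /ask path and JSON body match the main.py sketch above, so adjust them to your own main.py):
curl -X POST http://127.0.0.1:8000/ask \
  -H "Content-Type: application/json" \
  -d '{"query": "Which imaginary animal can fly?"}'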
Complete Tutorial Files