Simple Guardrail

This guide will walk you through adding a Guardrail component to your RAG pipeline that validates inputs and terminates execution when conditions are not met, ensuring your pipeline only processes valid requests.

This tutorial extends the Your First RAG Pipeline tutorial. Ensure you have followed the instructions to set up your repository and index your data.

Prerequisites

This tutorial specifically requires:

  • Completion of the Your First RAG Pipeline tutorial.

  • All setup steps listed on the Prerequisites page.

  • An Elastic Search vector data store that is already set up and available for use. Refer to supported-vector-data-store for tutorial.

Why Do We Need Guardrail?

The Guardrail component provides input validation and safety checks for your RAG pipeline. It ensures that only valid, properly formatted requests are processed, preventing errors and protecting your system from malicious or malformed inputs.

Benefits of using guardrail:

  • Input Validation: Prevents processing of invalid or malicious inputs

  • Resource Protection: Stops expensive operations on invalid requests

  • Error Prevention: Reduces pipeline failures and errors

  • Security: Adds a layer of protection against malformed requests

What is a Guard Step?

A Guard Step is an extension of conditional steps that includes automatic pipeline termination. Unlike regular conditional steps that route to different branches, guard steps ensure that if the validation check fails, the pipeline is terminated with a terminator step.

Installation

# you can use a Conda environment
pip install --extra-index-url "https://oauth2accesstoken:$(gcloud auth print-access-token)@glsdk.gdplabs.id/gen-ai-internal/simple/" gllm-rag gllm-core gllm-generation gllm-inference gllm-pipeline gllm-retrieval gllm-misc gllm-datastore

Set Up Your Project

We'll build upon the pipeline you created in the Your First RAG Pipeline tutorial. Make sure you have that working before proceeding.

Prepare your repository

Let’s prepare your workspace step by step.

1

Go to the repository you use for Your First RAG Pipeline:

cd my-rag-pipeline
2

Prepare your .env file:

Ensure you have a file named .env in your project directory with the following content:


CSV_DATA_PATH="data/imaginary_animals.csv"
ELASTICSEARCH_URL="http://localhost:9200/"
EMBEDDING_MODEL="text-embedding-3-small"
LANGUAGE_MODEL="gpt-4o-mini"
INDEX_NAME="first-quest"
OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

This is an example .env file. You may adjust the variables according to your need.

Adjust Folder Structure

Extend your existing project structure to include the guardrail:

my-rag-pipeline/
├── data/
│   ├── imaginary_animals.csv
├── modules/
│   ├── __init__.py
│   ├── retriever.py
│   ├── repacker.py
│   ├── response_synthesizer.py
│   ├── validators.py        # 👈 New
├── indexer.py
├── pipeline.py
├── guardrail_pipeline.py    # 👈 New (alternatively you can modify the pipeline.py)
├── main.py                  # 👈 Will be modified
└── run.py                   # 👈 New

Index Your Data

Ensure you have your data indexed. If not, you should follow steps in Index Your Data before proceeding.

Build Core Components of Your Pipeline

Create the Validator Function

Validator functions are the core of guardrail functionality. When creating validator functions, follow these rules:

  1. Function signature: Must accept inputs: dict[str, Any] parameter

  2. Return type: Must return bool (True/False)

  3. Input access: Access state variables through inputs["key_name"]

  4. Validation logic: Implement your validation rules

  5. Clear naming: Use descriptive function names

Here is how you can create modules/validators.py with length validator functions:

from typing import Any

def validate_message_length(inputs: dict[str, Any]) -> bool:
    """Validate the length of the user query.

    Args:
        inputs (dict[str, Any]): The inputs to the function.

    Returns:
        bool: True if the user query is valid, False otherwise.
    """
    user_query = inputs["user_query"]
    max_query_length = inputs["max_query_length"]
    min_query_length = inputs["min_query_length"]
    
    if len(user_query) > max_query_length or len(user_query) < min_query_length:
        return False
    return True

Key features:

  1. Boolean return: Returns True for valid input length, False for invalid

  2. Inputs dictionary: Receives all input parameters in a single dictionary

Build the Pipeline

Now let's create the pipeline that includes the guardrail functionality.

1

Create the guardrail pipeline file

Create guardrail_pipeline.py with the necessary imports:

from gllm_pipeline.pipeline.states import RAGState
from gllm_pipeline.pipeline.pipeline import Pipeline
from gllm_pipeline.steps import bundle, step, log, guard

from modules import (
    repacker_component,
    response_synthesizer_component,
    retriever_component,
    validate_message_length,
)
2

Define the extended state

Create a custom state that includes validation parameters:

class GuardrailState(RAGState):
    max_query_length: int
    min_query_length: int

This extends the default RAGState to include validation parameters that control the guardrail behavior.

3

Create component instances

Instantiate your existing components:

retriever = retriever_component()
repacker = repacker_component(mode="context")
response_synthesizer = response_synthesizer_component()

These are the same components from your original pipeline, ensuring consistency.

4

Define the individual pipeline steps

Create the standard pipeline steps:

retriever_step = step(
    retriever,
    {"query": "user_query"},
    "chunks",
    {"top_k": "top_k"},
)

repacker_step = step(
    repacker,
    {"chunks": "chunks"},
    "context",
)

bundler_step = bundle(
    ["context"],
    "response_synthesis_bundle",
)

response_synthesizer_step = step(
    response_synthesizer,
    {"query": "user_query", "state_variables": "response_synthesis_bundle"},
    "response",
)

Key points:

  • Same steps as the original pipeline

  • These will be executed only if guardrail validation passes

5

Create the error logging step

Define a step to log validation failures:

error_step = log(
    message="User query length is not valid: '{user_query}'",
)

This step will log the validation failure with details about the invalid input.

6

Create the guardrail step

This is where the magic happens - the guard step validates inputs and controls execution:

guardrail_step = guard(
    validate_message_length,
    success_branch=retriever_step,
    failure_branch=error_step,
    input_state_map={
        "user_query": "user_query",
        "max_query_length": "max_query_length",
        "min_query_length": "min_query_length",
    },
)

How it works:

  • Condition: Uses validate_message_length function

  • Success branch: Executes the retriever step (continues normal pipeline)

  • Failure branch: Executes the error logging step and terminates

  • Input mapping: Maps state variables to validator function parameters

  • Automatic termination: Guard step includes terminator in failure branch

7

Compose the final pipeline

Connect all steps into the complete guardrail pipeline:

e2e_pipeline = guardrail_step | repacker_step | bundler_step | response_synthesizer_step

e2e_pipeline.state_type = GuardrailState

Pipeline flow:

  1. Guardrail Step: Validates user query length

  2. If valid: Continues with retriever, repacker, bundler, and response synthesizer

  3. If invalid: Logs error and terminates pipeline

Modify the Application Code

Here we will update the main.py file to use the guardrail pipeline.

Update Pipeline Import

1

Update the import in main.py

Add the import for the new guardrail pipeline:

from guardrail_pipeline import e2e_pipeline
2

Update the pipeline execution

Modify the run_pipeline function to use the guardrail pipeline:

async def run_pipeline(state: dict, config: dict):
    ...
    try:
        await event_emitter.emit("Starting pipeline")
        await e2e_pipeline.invoke(state, config)  # Change to new pipeline
    ...

Add Validation Parameters

The guardrail requires validation parameters to be included in the state.

1

Update the request handler

Modify the /stream endpoint to include validation parameters:

async def add_message(request: Request):
    ...
    state = {
        "user_query": user_query,
        "event_emitter": event_emitter,
        "max_query_length": 100,
        "min_query_length": 1,
    }
    config = {"top_k": top_k, "debug": debug}
    ...

Key changes:

  • Validation parameters: Added max_query_length and min_query_length

  • Configurable limits: Easy to adjust validation rules

  • State enhancement: Includes all required parameters for guardrail

Run Your Application

Now let's test the guardrail functionality with different scenarios.

1

Start your server

Run your FastAPI server as before:

poetry run uvicorn main:app --reload
2

Create run.py

Create a script to test the guardrail pipeline and run it.

import json
import requests

def run() -> None:
    """Runs the RAG pipeline and streams responses from the FastAPI server."""
    render = True
    body = {
        # "user_query": "this is a very long message that should be rejected by the guardrail, with length 100 characters, and once again, it should be rejected by the guardrail",
        "user_query": "What animal lives in the forest?",
        "top_k": 5,
        "debug": True,
    }
    response = requests.post("http://127.0.0.1:8000/stream", json=body, stream=True)

    if response.status_code == 200:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                event = chunk.decode("utf-8", errors="ignore")
                event = json.loads(event)
                event_type = event["type"]
                if render:
                    if event_type == "status":
                        print(f"[{event['level']}][{event['timestamp']}] {event['value']}")
                    else:
                        print(event["value"], end="", flush=True)
                else:
                    print(event)
        print()
    else:
        print(f"Error status code: {response.status_code}")
        print(response.text)

if __name__ == "__main__":
    run()
3

Test with valid query

Try this query with a valid length:

{
  "user_query": "What animal lives in the forest?",
  "top_k": 5,
  "debug": true
}

Expected behavior:

  • The pipeline will validate the query length (1-100 characters)

  • Validation will pass (query is 30 characters)

  • Normal RAG pipeline execution will continue

  • You'll see retrieval, repacking, and response generation

4

Test with invalid query (too long)

Try this query with an invalid length:

{
  "user_query": "this is a very long message that should be rejected by the guardrail, with length 100 characters, and once again, it should be rejected by the guardrail",
  "top_k": 5,
  "debug": true
}

Expected behavior:

  • The pipeline will validate the query length

  • Validation will fail (query is over 100 characters)

  • Pipeline will log the error and terminate

  • No expensive operations (retrieval, LLM calls) will be performed

5

Analyze the debug output

You should see different behaviors:

Valid Query Output:

[DEBUG] Starting pipeline
[DEBUG] [Start 'BasicVectorRetriever'] Processing input:
    - query: 'What animal lives in the forest?'
    - top_k: 5
[DEBUG] [Finished 'BasicVectorRetriever'] Successfully retrieved 5 chunks.
[DEBUG] [Start 'Repacker'] Repacking 5 chunks.
[DEBUG] [Finished 'Repacker'] Successfully repacked chunks: ...
[DEBUG] [Start 'StuffResponseSynthesizer'] Processing query: 'What animal lives in the forest?'
[DEBUG] [Finished 'StuffResponseSynthesizer'] Successfully synthesized response: ...
[DEBUG] Finished pipeline

Invalid Query Output:

[DEBUG] Starting pipeline
[DEBUG] [Start 'Messenger'] Processing input:
    - event_emitter: ...
    - state_variables: {'user_query': 'this is a very long message...'}
[DEBUG] User query length is not valid: 'this is a very long message...'
[DEBUG] [Finished 'Messenger'] Successfully produced output: None
[DEBUG] Finished pipeline

Understanding the Flow

Here's what happens in the guardrail pipeline:

Valid Input Flow

  1. Guardrail Check: Validates query length (1-100 characters)

  2. Validation Passes: Query meets length requirements

  3. Success Branch: Executes retriever step

  4. Normal Pipeline: Continues with repacker, bundler, response synthesizer

  5. Response: Returns normal RAG response

Invalid Input Flow

  1. Guardrail Check: Validates query length

  2. Validation Fails: Query doesn't meet requirements

  3. Failure Branch: Executes error logging step

  4. Termination: Pipeline terminates automatically

  5. No Processing: No expensive operations performed

Extending the Guardrail System

Multiple Validation Rules

You can add multiple validation functions:

def validate_content(inputs: dict[str, Any]) -> bool:
    """Validate query content for inappropriate language."""
    user_query = inputs["user_query"]
    forbidden_words = inputs["forbidden_words"]
    return not any(word in user_query.lower() for word in forbidden_words)

def validate_format(inputs: dict[str, Any]) -> bool:
    """Validate query format."""
    user_query = inputs["user_query"]
    return user_query.strip() != ""

Chained Guardrails

Create multiple guard steps for different validations:

content_guard = guard(
    validate_content,
    success_branch=length_guard,
    failure_branch=content_error_step,
    input_state_map={
        "user_query": "user_query",
        "forbidden_words": "forbidden_words",
    },
)

length_guard = guard(
    validate_message_length,
    success_branch=retriever_step,
    failure_branch=length_error_step,
    input_state_map={
        "user_query": "user_query",
        "max_query_length": "max_query_length",
        "min_query_length": "min_query_length",
    },
)

Troubleshooting

Common Issues

  1. Validation not working:

    • Ensure validator function returns boolean

    • Check input_state_map keys match state variables

    • Verify validator function signature is correct

  2. Pipeline not terminating:

    • Confirm guard step is properly configured

    • Check that failure_branch includes logging or error handling

    • Verify terminator step is automatically added

  3. State mapping errors:

    • Ensure all required state variables are initialized

    • Check that input_state_map keys exist in state

    • Verify validator function accesses inputs correctly

Debug Tips

  1. Enable debug mode: Set debug: true in your request to see detailed logs

  2. Check validator logic: Add print statements to validator functions

  3. Verify state variables: Ensure all required parameters are in state

  4. Test validation rules: Try different input lengths and formats

📂 Complete Tutorial Files

Coming soon!


Congratulations! You've successfully implemented a Guardrail component in your RAG pipeline. This enhancement provides robust input validation and ensures your pipeline only processes valid requests, protecting your system from errors and improving overall reliability and security.

Last updated