How to build a RAG pipeline using Google ADK with code examples?

#google-adk #rag #retrieval #vector-search #pipeline #chromadb

Answer

Building a RAG Pipeline with Google ADK

Google ADK provides built-in RAG support and the flexibility to build custom RAG pipelines using agents, tools, and workflow orchestration.


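RAG Pipeline Architecture

A RAG pipeline has two phases. During ingestion, documents are embedded and stored in a vector database. At query time, the user's question is used to retrieve the most relevant documents, and the model generates an answer grounded in that retrieved context.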


Approach 1: Built-in RAG Tool

python
from google.adk.agents import Agent
from google.adk.tools import google_search

# Simplest form of grounding: let the model call the google_search tool.
_SEARCH_INSTRUCTION = (
    "Answer questions using web search for grounding.\n"
    "    Always cite your sources."
)

agent = Agent(
    name="rag_agent",
    model="gemini-2.5-flash",
    instruction=_SEARCH_INSTRUCTION,
    tools=[google_search],
)

Approach 2: Custom RAG with Function Tools

python
from google.adk.agents import Agent
import chromadb

# Open the on-disk Chroma store under ./chroma_db and fetch (or create on
# first use) the "documents" collection that both tool functions share.
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection(name="documents")

def ingest_document(content: str, doc_id: str) -> str:
    """Ingest a document into the vector database.

    Re-ingesting an existing ``doc_id`` replaces the stored text, so the
    success message is always truthful.

    Args:
        content: The document text to store. Blank text is rejected.
        doc_id: Unique identifier for the document.

    Returns:
        str: Confirmation message, or an explanation of why nothing was stored.
    """
    if not content or not content.strip():
        # Report the problem as text so the calling agent can relay it.
        return f"Document '{doc_id}' was not ingested: content is empty."

    # upsert rather than add: add() does not overwrite a record whose ID
    # already exists, which would make the confirmation below misleading.
    collection.upsert(
        documents=[content],
        ids=[doc_id],
    )
    return f"Document '{doc_id}' ingested successfully."

def search_documents(query: str, top_k: int = 5) -> list[dict]:
    """Search the vector database for relevant documents.

    Args:
        query: The search query. A blank query returns no results.
        top_k: Maximum number of results to return. Values below 1 are
            treated as 1.

    Returns:
        list[dict]: One entry per hit with the keys "id", "content" and
        "distance". The id lets the agent cite which documents it used.
    """
    if not query or not query.strip():
        return []

    results = collection.query(
        query_texts=[query],
        # The vector store rejects non-positive result counts.
        n_results=max(1, top_k),
    )
    # Results are grouped per query text; only one query was sent, so the
    # hits for it sit at index 0 of each field.
    return [
        {"id": doc_id, "content": doc, "distance": dist}
        for doc_id, doc, dist in zip(
            results["ids"][0],
            results["documents"][0],
            results["distances"][0],
        )
    ]

# System prompt for the retrieval agent. Continuation lines keep their
# four-space indent so the text sent to the model is unchanged.
_RAG_INSTRUCTION = "\n    ".join(
    [
        "You are a RAG-powered assistant.",
        "1. Use search_documents to find relevant information",
        "2. Use the retrieved context to answer accurately",
        "3. If no relevant docs found, say so honestly",
        "4. Always cite which documents you used",
    ]
)

# The agent can both query the collection and add new documents to it.
rag_agent = Agent(
    name="rag_agent",
    model="gemini-2.5-flash",
    instruction=_RAG_INSTRUCTION,
    tools=[search_documents, ingest_document],
)

Approach 3: Multi-Agent RAG Pipeline

python
from google.adk.agents import Agent, SequentialAgent, LoopAgent

# Agent 1: Retriever
# Calls search_documents (defined in the Approach 2 snippet); its output is
# published under "retrieved_docs" for the later agents' prompts.
retriever = Agent(
    name="retriever",
    model="gemini-2.5-flash",
    instruction="Retrieve relevant documents for the user query.",
    tools=[search_documents],
    output_key="retrieved_docs",
)

# Agent 2: Grader (checks relevance)
# Has no tools: it only judges the {retrieved_docs} text substituted into its
# prompt and publishes the verdict under "grade_result".
# NOTE(review): the prompt mentions triggering a rewrite on NOT_RELEVANT, but
# the pipeline below has no rewrite step or loop (LoopAgent is imported but
# unused), so the verdict is only passed along to the generator.
grader = Agent(
    name="grader",
    model="gemini-2.5-flash",
    instruction="""Grade the retrieved documents for relevance.
    Retrieved docs: {retrieved_docs}
    If relevant, set output to 'RELEVANT: <docs>'.
    If not, set output to 'NOT_RELEVANT' to trigger rewrite.""",
    output_key="grade_result",
)

# Agent 3: Answer Generator
# Uses a different model (gemini-2.5-pro) than the two earlier agents and
# receives both the retrieved documents and the grader's verdict.
generator = Agent(
    name="generator",
    model="gemini-2.5-pro",
    instruction="""Generate a comprehensive answer using:
    Context: {retrieved_docs}
    Grade: {grade_result}
    Be accurate and cite sources.""",
    output_key="final_answer",
)

# Full RAG Pipeline
# Sub-agents are listed in the order retrieve -> grade -> generate.
rag_pipeline = SequentialAgent(
    name="rag_pipeline",
    sub_agents=[retriever, grader, generator],
)

Approach 4: Vertex AI Search (Managed Retrieval)

python
from google.adk.agents import Agent
from google.adk.tools import VertexAiSearchTool

# Full resource name of the Vertex AI Search data store to query.
# Replace the placeholder segments with your own project and data store.
DATASTORE_ID = (
    "projects/YOUR_PROJECT_ID/locations/global/collections/default_collection"
    "/dataStores/YOUR_DATASTORE_ID"
)

# google.adk.tools exports the VertexAiSearchTool class rather than a
# ready-made instance, and the tool has to be bound to a specific data store.
vertex_ai_search = VertexAiSearchTool(data_store_id=DATASTORE_ID)

# Use Vertex AI's managed retrieval
agent = Agent(
    name="enterprise_rag",
    model="gemini-2.5-flash",
    instruction="Answer questions using the company knowledge base.",
    tools=[vertex_ai_search],
)

Running the RAG Agent

python
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Session storage backing the runner; setup() reaches it again through
# runner.session_service.
session_service = InMemorySessionService()

# Runner that drives rag_agent (from the Approach 2 snippet) as "rag_app".
runner = Runner(
    app_name="rag_app",
    agent=rag_agent,
    session_service=session_service,
)

# Ingest documents first
async def setup():
    """Ingest one sample document, then ask a question about it.

    Creates a session for ``user-1`` in ``rag_app`` and sends two turns
    through the module-level ``runner`` in that same session, printing every
    event the runner yields. Run it with ``asyncio.run(setup())``.
    """
    # Local import keeps this snippet self-contained.
    from google.genai import types

    user_id = "user-1"
    session = await runner.session_service.create_session(
        app_name="rag_app", user_id=user_id
    )

    # Order matters: the document must be ingested before it is queried.
    prompts = (
        "Ingest this: ADK is an open-source framework for AI agents.",
        "What is ADK?",
    )
    for prompt in prompts:
        # run_async expects the user turn as a Content object, not a bare str.
        message = types.Content(role="user", parts=[types.Part(text=prompt)])
        async for event in runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=message,
        ):
            print(event)

Learn more in the ADK Tools documentation and the ADK Samples repository.