A deep dive into Multi-Agent Text-to-SQL Architectures, Dynamic Schema Linking, and Self-Correction workflows to balance high accuracy with low latency.

Remember the early days of business intelligence? You’d ask a simple question about last quarter’s sales, and your data team would get back to you in a week. Maybe two. Fast forward to today, and we’ve got data pouring into platforms like Snowflake, BigQuery, and Databricks-so much data, in fact, that storing it is no longer the real challenge. The real bottleneck now is getting answers out of it before the questions themselves become irrelevant.
We’re overwhelmed with data but lack insights. Business intelligence tea...
A deep dive into Multi-Agent Text-to-SQL Architectures, Dynamic Schema Linking, and Self-Correction workflows to balance high accuracy with low latency.

Remember the early days of business intelligence? You’d ask a simple question about last quarter’s sales, and your data team would get back to you in a week. Maybe two. Fast forward to today, and we’ve got data pouring into platforms like Snowflake, BigQuery, and Databricks-so much data, in fact, that storing it is no longer the real challenge. The real bottleneck now is getting answers out of it before the questions themselves become irrelevant.
We’re overwhelmed with data but lack insights. Business intelligence teams are buried under constant ad-hoc requests, creating what we can call knowledge latency- the frustrating delay between asking a question and receiving an answer, while the decision it could have influenced slips by.
For a while, it seemed like Large Language Models might be the solution. Tools like GPT-4 handle simple questions effectively, converting plain English into SQL for basic queries. However, when faced with real-world business questions — those involving nested logic, time-based conditions, or company-specific jargon — they often struggle to produce the correct result. The last mile of complexity is where most generic models tend to fall short.
So, how do we bridge that gap? How do we move from cool demos to a system that can reliably handle the messy, complicated, and critical questions that actually matter to a business?
In this article, I want to walk you through a practical, production-ready architecture for building a Text-to-SQL agent that’s built for complexity. Instead of relying on a single, monolithic AI call, we break the process down, much like a senior data engineer would, into distinct stages: planning the query, retrieving relevant context, generating the SQL, and then correcting it.
1. The Paradigm Shift: From Monolithic Models to Agentic Workflows
1.1 The Limitations of Monolithic Text-to-SQL
In the early days of Generative AI, Text-to-SQL was often seen as a simple machine translation task, similar to translating English to French. A single, large architecture would feed an LLM with the user’s question and the entire database schema, expecting a valid SQL query as the answer. While the Zero-Shot approach produces acceptable results for simple questions on small databases (e.g., “How many users signed up yesterday?”), It fails in real-world enterprise scenarios.
The failure modes of the monolithic approach are structural and distinct. First, the Context Window Saturation problem arises when dealing with real-world databases that may contain thousands of tables and tens of thousands of columns. Injecting a complete Data Definition Language (DDL) dump into the context window not only incurs prohibitive latency and cost but also degrades the model’s reasoning capabilities due to the lost-in-the-middle phenomenon, in which LLMs struggle to prioritize information buried in the middle of a large prompt.
Second, Reasoning Drift occurs in complex queries. Questions such as “Calculate the rolling 3-month retention rate for enterprise customers in EMEA, excluding those with beta feature flags, and compare it to the same period last year” require multi-step reasoning. A single generation pass forces the model to perform intent understanding, logic planning, schema mapping, and syntax generation simultaneously. This cognitive overload frequently results in logic conflation, missed filter conditions, or “hallucinated” joins, where the model invents relationships between tables that do not exist.
Third, the monolithic system is fundamentally Open-Loop. It operates as a fire-and-forget mechanism. If the generated SQL contains a syntax error or a logical flaw, such as querying a column that was renamed three months ago, the system fails without recourse. There is no opportunity for the model to see the error message, reflect on its mistake, and iterate.
1.2 The Agentic Solution: PExA and MAC-SQL

To address these limitations, the industry has shifted toward Agentic Workflows. An agentic system does not simply predict the next token; it perceives, reasons, acts, and critiques. In the context of Text-to-SQL, this means decomposing the monolithic generation process into discrete, manageable sub-tasks handled by specialized agents.
Recent breakthroughs validate this approach. Bloomberg’s PExA (Planning-Execution-Agent) framework, for instance, achieved 70.2% execution accuracy on the rigorous Spider 2.0 benchmark- a dataset specifically designed to test complex, real-world SQL generation.PExA’s success lies in its separation of high-level planning from low-level execution. By first articulating a logical plan, the agent creates a blueprint that guides the subsequent code generation, reducing the likelihood of logical errors.
Similarly, the MAC-SQL (Multi-Agent Collaborative SQL) framework introduces a tripartite structure comprising a Decomposer (to break down intents), a Selector (to identify relevant data structures), and a Refiner (to critique and fix errors). This collaborative approach allows the use of specialized models for different tasks-a smaller, faster model might handle schema selection. In contrast, a reasoning-heavy model like GPT-4 or Claude 3.5 Sonnet handles complex logic generation.

2. Architectural Blueprint: The DRGC Framework
To construct an agent capable of navigating the intricacies of enterprise data, we can take a Decomposition-Retrieval-Generation-Correction (DRGC) architecture. This framework assigns specific responsibilities to four distinct agent personas, orchestrated via a stateful graph.
2.1 The Planner (Decomposer Agent)
The Planner is the system’s strategic cortex. Its role is not to write code, but to understand intent. When a user asks a complex question, the Planner’s job is to break the natural-language request into a structured, logical plan or a series of sub-questions.

For example, consider the query: “Which sales regions have shown a decline in gross margin over the last four quarters despite an increase in total revenue?”
A monolithic model might attempt to write this SQL in one go, likely missing the nuances of “gross margin” calculation or the specific windowing required for “last four quarters.” The Planner, however, would output a structured plan:
1. Identify Timeframe: Define the exact date range for the “last four quarters” relative to the current date.
2. Calculate Metrics: Define the formula for “Gross Margin” (Revenue — Cost of Goods Sold) and “Total Revenue.”
3. Aggregation: Group these metrics by Region and Quarter.
4. Trend Analysis: Compare the metrics sequentially to identify regions where Margin is decreasing, AND Revenue is increasing.
This intermediate representation serves as a stable foundation for the subsequent agents. It resolves ambiguity early-if the Planner cannot define “Gross Margin,” it can ask the user for clarification before attempting any SQL.
"""Planner Agent: Breaks down complex questions into logical steps.""" from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from loguru import logger from core.state import AgentState from config import settings class PlannerAgent: """Decomposes natural language questions into structured logical plans.""" def __init__(self): self.llm = ChatGroq( model=settings.groq_model_reasoning, temperature=settings.groq_temperature, groq_api_key=settings.groq_api_key ) # System prompt for logical planning self.prompt = ChatPromptTemplate.from_messages([ ("system", """You are a data architect specializing in SQL query planning. Your task: Decompose the user's question into clear logical steps. Guidelines: 1. Identify the core intent (aggregation, comparison, trend analysis, joins) 2. Break down into atomic logical steps 3. Define metrics and formulas explicitly 4. Specify filters, groupings, and ordering needed Output: A clear, numbered plan. Do NOT write SQL code. Example: Question: "What is the average order value by customer segment?" Plan: 1. Join orders table with customers table 2. Calculate AVG(order_total) for each customer 3. Group by customer.segment 4. Order by average value descending"""), ("user", "{question}") ]) self.chain = self.prompt | self.llm def plan(self, state: AgentState) -> dict: """Generate a logical plan for the question.""" logger.info("PLANNER: Decomposing question into logical steps") question = state["question"] try: response = self.chain.invoke({"question": question}) plan = response.content # Extract numbered steps from the plan import re steps = re.findall(r'^\d+\..*$', plan, re.MULTILINE) logger.info(f"Generated plan with {len(steps)} steps") return { "plan": plan, "plan_steps": steps, "iterations": 0, "should_retry": True } except Exception as e: logger.error(f"Planner error: {e}") return {"error": f"Planning failed: {str(e)}", "should_retry": False} # Node function for LangGraph def planner_node(state: AgentState) -> dict: """LangGraph node wrapper for PlannerAgent.""" agent = PlannerAgent() return agent.plan(state)2.2 The Schema Linker (Selector Agent)
The Schema Linker addresses the limitations of the context window and the noise problem. Its sole objective is Schema Pruning: identifying only the relevant tables and columns required for the Planner-generated query plan.
This is critical because accurate schema linking is the single most significant predictor of Text-to-SQL success. If the generator is not aware of a specific column, it cannot query it. Conversely, if it is distracted by 500 irrelevant columns, it is prone to hallucination. Research indicates that limiting the context to only relevant schema elements can improve accuracy by over 15–20% on benchmarks like BIRD-SQL.
The Linker typically employs a two-stage process:
1. Coarse Retrieval: Using vector search (semantic similarity) to retrieve the top-N (e.g., 20) tables based on the user’s query keywords.
2. Fine-Grained Selection: Using a fast LLM to analyze the schemas of those 20 tables against the Planner’s requirements and select only the strictly necessary tables and columns.
""" Schema Linker Agent (Selector): Identifies relevant tables and columns. """ from typing import List from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from loguru import logger from core.state import AgentState from core.database import db_manager from config import settings class SchemaLinkerAgent: """ Performs schema pruning to reduce context noise. Identifies only the relevant tables and columns needed for the query. """ def __init__(self): # Use faster model for schema selection self.llm = ChatGroq( model=settings.groq_model_fast, temperature=0, groq_api_key=settings.groq_api_key ) # Prompt for table selection self.table_selection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a database schema expert. Identify which tables are relevant for the user's question. Instructions: 1. Analyze the question and logical plan 2. Select ONLY tables that are strictly necessary 3. Be conservative - include a table only if clearly needed 4. Return ONLY a comma-separated list of table names (no explanations) Example: Question: "What is the average order value by customer segment?" Available Tables: customers, orders, products, invoices, shipments, employees Response: customers, orders"""), ("user", """Question: {question} Plan: {plan} Available Tables: {all_tables} Return comma-separated table names:""") ]) # Prompt for column selection (optional, currently not used) self.column_selection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a database schema expert. Identify which columns are needed for the query. Return a JSON object mapping table names to lists of required columns. Example: {"customers": ["customer_id", "segment"], "orders": ["order_id", "customer_id", "total_amount"]} Include only columns used in: - SELECT clause - WHERE/HAVING conditions - JOIN conditions - GROUP BY or ORDER BY"""), ("user", """Plan: {plan} Schema: {schema} Return JSON with required columns:""") ]) def select_tables(self, question: str, plan: str, all_tables: List[str]) -> List[str]: """ Select relevant tables using LLM reasoning. Args: question: User's question plan: Logical plan all_tables: All available table names Returns: List of relevant table names """ try: chain = self.table_selection_prompt | self.llm response = chain.invoke({ "question": question, "plan": plan, "all_tables": ", ".join(all_tables) }) # Parse comma-separated table names selected = [t.strip() for t in response.content.split(",")] # Filter out any invalid table names selected = [t for t in selected if t in all_tables] logger.info(f"Selected {len(selected)} tables from {len(all_tables)} available") return selected except Exception as e: logger.error(f"Table selection error: {e}") # Fallback: return top 5 tables (simple heuristic) return all_tables[:5] def retrieve_schema(self, state: AgentState) -> dict: """ Retrieve and prune schema information to only relevant tables. Args: state: Current agent state Returns: Updated state with schema context """ logger.info("SCHEMA LINKER: Retrieving relevant tables and schema") question = state["question"] plan = state.get("plan", "") try: # Step 1: Get all available tables all_tables = db_manager.get_all_table_names() logger.info(f"Database has {len(all_tables)} tables") # Step 2: Select relevant tables using LLM if plan: selected_tables = self.select_tables(question, plan, all_tables) else: # Fallback: use first 10 tables if no plan available selected_tables = all_tables[:10] # Step 3: Retrieve DDL schema for selected tables schema_context = db_manager.get_schema_for_tables(selected_tables) # Step 4: Get metadata (keys, indexes, etc.) schema_metadata = {} for table in selected_tables: metadata = db_manager.get_table_metadata(table) schema_metadata[table] = metadata logger.info(f"Selected {len(selected_tables)} tables: {', '.join(selected_tables)}") return { "relevant_tables": selected_tables, "schema_context": schema_context, "schema_metadata": schema_metadata } except Exception as e: logger.error(f"Schema retrieval error: {e}") return { "error": f"Schema retrieval failed: {str(e)}", "should_retry": False } # Node function for LangGraph def schema_linker_node(state: AgentState) -> dict: """LangGraph node wrapper for SchemaLinkerAgent.""" agent = SchemaLinkerAgent() return agent.retrieve_schema(state)2.3 The SQL Generator (Writer Agent)
The SQL Generator is a specialized coder. It receives the sanitized schema from the Linker and the logical plan from the Planner. Its focus is translating that logic into valid SQL syntax for the target dialect (Postgres, Snowflake, T-SQL).
To handle complex logic, this agent is primed with “SQL-of-Thought” or “Chain-of-Thought” (CoT) prompts. Rather than immediately outputting SELECT, the agent is instructed to explain its thinking: “To calculate the moving average, I will use a window function partitioned by region…” This forced articulation helps the model organize its logic and reduces syntax errors.
""" SQL Generator Agent: Translates logical plans into SQL queries. """ from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate from loguru import logger from core.state import AgentState from config import settings class SQLGeneratorAgent: """ Translates logical plans into valid SQL queries using Chain-of-Thought reasoning. """ def __init__(self): self.llm = ChatGroq( model=settings.groq_model_reasoning, temperature=settings.groq_temperature, groq_api_key=settings.groq_api_key ) # System prompt with Chain-of-Thought guidance self.system_prompt = """You are an expert SQL engineer. Write correct, efficient SQL queries. CRITICAL RULES: 1. Use ONLY the provided schema - Never hallucinate table or column names 2. Follow the logical plan exactly - Each plan step should map to SQL logic 3. Think before coding - Explain your approach first (Chain-of-Thought) 4. Be dialect-aware - Adjust syntax for the target database 5. Return ONLY the SQL - No markdown formatting, no extra text Chain-of-Thought Process: Before writing SQL, briefly explain: - What tables will you join and how? - What filters will you apply? - What aggregations are needed? - What is the logical flow? Then write the SQL with inline comments. SCHEMA: {schema_context} LOGICAL PLAN: {plan} USER QUESTION: {question} Now think through the solution, then write the SQL:""" self.generation_prompt = ChatPromptTemplate.from_messages([ ("system", self.system_prompt), ("user", "Generate the SQL query:") ]) def generate(self, state: AgentState, few_shot_examples=None) -> dict: """ Generate SQL query from plan and schema. Args: state: Current agent state few_shot_examples: Optional list of example queries for few-shot learning Returns: Updated state with generated SQL """ logger.info("SQL GENERATOR: Creating SQL query from plan") question = state["question"] plan = state.get("plan", "") schema_context = state.get("schema_context", "") if not schema_context: logger.error("No schema context available") return { "error": "Cannot generate SQL without schema context", "should_retry": False } try: # Build prompt with or without few-shot examples if few_shot_examples and settings.enable_dynamic_few_shot: # Create few-shot prompt with examples examples = [ {"question": ex.get("question", ""), "sql": ex.get("sql", "")} for ex in few_shot_examples ] example_prompt = ChatPromptTemplate.from_messages([ ("human", "{question}"), ("ai", "{sql}") ]) few_shot_prompt = FewShotChatMessagePromptTemplate( example_prompt=example_prompt, examples=examples ) # Combine with main prompt full_prompt = ChatPromptTemplate.from_messages([ ("system", self.system_prompt), few_shot_prompt, ("user", "Generate the SQL query:") ]) chain = full_prompt | self.llm else: chain = self.generation_prompt | self.llm # Generate SQL response = chain.invoke({ "question": question, "plan": plan, "schema_context": schema_context }) # Clean the SQL output sql = self._clean_sql(response.content) logger.info(f"Generated SQL ({len(sql)} characters)") logger.debug(f"SQL: {sql}") return { "sql_query": sql, "sql_explanation": response.content # Keep full response with reasoning } except Exception as e: logger.error(f"SQL generation error: {e}") return { "error": f"SQL generation failed: {str(e)}", "should_retry": False } def _clean_sql(self, raw_sql: str) -> str: """ Clean SQL output from LLM response. Removes markdown formatting and extracts SQL query. Args: raw_sql: Raw SQL from LLM Returns: Cleaned SQL string """ # Remove markdown code blocks sql = raw_sql.replace("```sql", "").replace("```", "").strip() # Extract SQL from response (if it contains reasoning + SQL) # Look for SQL keywords: SELECT, WITH, INSERT, UPDATE, DELETE lines = sql.split("\n") sql_start_idx = None for i, line in enumerate(lines): if any(keyword in line.upper() for keyword in ["SELECT", "WITH", "INSERT", "UPDATE", "DELETE"]): sql_start_idx = i break if sql_start_idx is not None: sql = "\n".join(lines[sql_start_idx:]) return sql.strip() # Node function for LangGraph def generator_node(state: AgentState) -> dict: """LangGraph node wrapper for SQLGeneratorAgent.""" agent = SQLGeneratorAgent() # Get few-shot examples from state (retrieved in previous step) few_shot = state.get("few_shot_examples", None) return agent.generate(state, few_shot_examples=few_shot)2.4 The Critic (Refiner Agent)
The Critic introduces a “closed-loop” system that transforms the agent from a linear pipeline into a resilient cycle. The Critic validates the generated SQL against syntax rules and, crucially, executes the query (in a sandbox or dry-run mode) to catch runtime errors.

If the database returns an error (e.g., OperationalError: no such column ‘churn_date’), the Critic does not crash. Instead, it captures the error message, analyses it, and instructs the Generator to repair the query. This Execution-Guided Error Correction allows the system to recover from hallucinations and typos, significantly boosting performance on nested queries where logic errors are common.
""" Critic Agent (Refiner): Validates, executes, and corrects SQL queries. """ from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from loguru import logger from core.state import AgentState from core.database import db_manager from config import settings class CriticAgent: """ Validates, executes, and corrects SQL queries. Implements closed-loop error correction with execution feedback. """ def __init__(self): self.llm = ChatGroq( model=settings.groq_model_reasoning, temperature=settings.groq_temperature, groq_api_key=settings.groq_api_key ) # Prompt for error correction self.reflection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a SQL debugging expert. A query failed and you must fix it. Your Task: 1. Analyze the error message carefully 2. Review the schema to understand what went wrong 3. Identify the specific issue (wrong column, incorrect join, syntax error, etc.) 4. Generate a CORRECTED SQL query Common Error Patterns: - Column does not exist → Check schema for correct column names - Table does not exist → Verify table name spelling - Syntax error → Check SQL dialect requirements - Ambiguous column → Add table aliases - Join error → Verify foreign key relationships IMPORTANT: Return ONLY the fixed SQL query (no explanations, no markdown) SCHEMA: {schema_context} ORIGINAL QUESTION: {question} FAILED SQL: {sql_query} ERROR MESSAGE: {error} LOGICAL PLAN (reference): {plan} Generate the CORRECTED SQL:"""), ("user", "Fix the query:") ]) def execute_and_validate(self, state: AgentState) -> dict: """ Execute SQL query and handle results/errors. Args: state: Current agent state Returns: Updated state with results or error information """ logger.info("CRITIC: Executing and validating SQL query") sql_query = state.get("sql_query") if not sql_query: return { "error": "No SQL query to execute", "should_retry": False } try: # Execute the query result, error, exec_time = db_manager.execute_query( sql_query, timeout=settings.query_timeout_seconds ) if error: # Query failed - prepare for reflection logger.warning(f"Query execution failed: {error}") error_type = self._classify_error(error) return { "error": error, "error_type": error_type, "query_result": None, "execution_time_ms": exec_time, "should_retry": True } else: # Query succeeded logger.info(f"Query executed successfully in {exec_time:.2f}ms") result_preview = self._format_result_preview(result) return { "query_result": result, "result_preview": result_preview, "execution_time_ms": exec_time, "error": None, "should_retry": False } except Exception as e: logger.error(f"Execution error: {e}") return { "error": str(e), "error_type": "runtime", "should_retry": True } def reflect_and_fix(self, state: AgentState) -> dict: """ Analyze error and generate corrected SQL. Args: state: Current agent state with error information Returns: Updated state with corrected SQL """ logger.info("CRITIC: Reflecting on error and fixing SQL") iterations = state.get("iterations", 0) # Check if we've exceeded max iterations if iterations >= settings.max_iterations: logger.error(f"Max iterations ({settings.max_iterations}) reached") return { "should_retry": False, "error": f"Failed to generate valid SQL after {settings.max_iterations} attempts" } question = state["question"] plan = state.get("plan", "") schema_context = state.get("schema_context", "") sql_query = state.get("sql_query", "") error = state.get("error", "") try: chain = self.reflection_prompt | self.llm response = chain.invoke({ "question": question, "plan": plan, "schema_context": schema_context, "sql_query": sql_query, "error": error }) # Clean the fixed SQL from agents.generator import SQLGeneratorAgent generator = SQLGeneratorAgent() fixed_sql = generator._clean_sql(response.content) logger.info(f"Generated corrected SQL (iteration {iterations + 1})") logger.debug(f"Fixed SQL: {fixed_sql}") return { "sql_query": fixed_sql, "iterations": iterations + 1, "should_retry": True } except Exception as e: logger.error(f"Reflection error: {e}") return { "error": f"Failed to correct SQL: {str(e)}", "should_retry": False } def _classify_error(self, error_msg: str) -> str: """ Classify error type for better handling. Args: error_msg: Error message from database Returns: Error category """ error_lower = error_msg.lower() if "column" in error_lower and ("does not exist" in error_lower or "not found" in error_lower): return "column_not_found" elif "table" in error_lower and ("does not exist" in error_lower or "not found" in error_lower): return "table_not_found" elif "syntax" in error_lower: return "syntax_error" elif "ambiguous" in error_lower: return "ambiguous_column" elif "timeout" in error_lower: return "timeout" else: return "runtime_error" def _format_result_preview(self, result, max_rows: int = 5) -> str: """ Format query result for display. Args: result: Query result (list of rows or message) max_rows: Maximum rows to include in preview Returns: Formatted string preview """ if isinstance(result, str): return result if not result: return "Query returned no results" try: # If result is a list of Row objects if hasattr(result[0], '_mapping'): rows = [dict(row._mapping) for row in result[:max_rows]] preview = f"Returned {len(result)} row(s). Preview:\n" for i, row in enumerate(rows, 1): preview += f"Row {i}: {row}\n" if len(result) > max_rows: preview += f"... ({len(result) - max_rows} more rows)" return preview else: return str(result[:max_rows]) except Exception as e: logger.warning(f"Could not format result: {e}") return str(result)[:500] # Truncate to 500 chars # Node functions for LangGraph def executor_node(state: AgentState) -> dict: """LangGraph node wrapper for execution.""" agent = CriticAgent() return agent.execute_and_validate(state) def reflector_node(state: AgentState) -> dict: """LangGraph node wrapper for reflection/correction.""" agent = CriticAgent() return agent.reflect_and_fix(state)3. Deep Dive: Optimisation for Accuracy
Accuracy in Text-to-SQL is defined not by string matching (BLEU score), but by Execution Accuracy (EX)-does the query return the correct data? To maximise EX, particularly on complex queries, we can use advanced techniques such as Dynamic Few-Shot Learning, rigorous Schema Linking, and Self-Reflection loops.
3.1 Dynamic Few-Shot Learning with RAG
Standard few-shot prompting involves hard-coding a static set of example pairs (Question, SQL) into the prompt. However, a static set cannot cover the immense diversity of complex enterprise queries. A question about “Customer Churn” requires different examples than a question about “Inventory Turnover.”
Dynamic Few-Shot Learning solves this by utilizing Retrieval Augmented Generation (RAG) to inject the most relevant examples into the prompt at runtime.
The Mechanism:
1. Example Bank: We maintain a vector database (e.g., Chroma, Milvus, PGVector) containing high-quality, verified SQL queries paired with their natural language questions. This “Golden Corpus” acts as the agent’s long-term memory.
2. Semantic Retrieval: When a new query arrives, an embedding model (e.g., OpenAI’s text-embedding-3-small or a domain-specific model) converts the question into a vector.
3. Similarity Search: The system retrieves the top-k (usually 3–5) most semantically similar questions from the Example Bank.
4. Prompt Injection: These retrieved examples are dynamically inserted into the system prompt of the SQL Generator.
This technique is particularly effective for domain-specific logic. If a user asks for “Revenue,” and the database requires a complex calculation ((quantity * price) — discount + tax), the schema alone provides no clue. However, if the Example Bank contains a previous query calculating “Revenue,” the agent can retrieve it and mimic the formula. This effectively bridges the gap between the LLM’s generic knowledge and the enterprise’s proprietary business logic.
""" Vector store for dynamic few-shot example retrieval. """ from typing import List, Dict from langchain_chroma import Chroma from langchain_huggingface import HuggingFaceEmbeddings from langchain_core.documents import Document from loguru import logger from config import settings import os class FewShotRetriever: """ Manages a vector store of SQL examples for dynamic few-shot learning. """ def __init__(self): self.enabled = settings.enable_dynamic_few_shot if not self.enabled: logger.info("Dynamic few-shot learning disabled") return # Initialize embeddings with HuggingFace model (local) self.embeddings = HuggingFaceEmbeddings( model_name=settings.embedding_model ) # Initialize vector store persist_directory = settings.vector_store_path os.makedirs(persist_directory, exist_ok=True) self.vectorstore = Chroma( collection_name=settings.chroma_collection_name, embedding_function=self.embeddings, persist_directory=persist_directory ) logger.info(f"Few-shot retriever initialized with ChromaDB") def add_example(self, question: str, sql: str, explanation: str = None, schema_context: str = None, complexity: str = "medium"): """ Add a new SQL example to the vector store. Args: question: Natural language question sql: Corresponding SQL query explanation: Optional explanation schema_context: Optional schema info complexity: Difficulty level (simple, medium, complex) """ if not self.enabled: return try: doc = Document( page_content=question, metadata={ "sql": sql, "explanation": explanation or "", "schema_context": schema_context or "", "complexity": complexity } ) self.vectorstore.add_documents([doc]) logger.info(f"Added example: {question[:50]}...") except Exception as e: logger.error(f"Error adding example: {e}") def add_examples_batch(self, examples: List[Dict]): """ Add multiple examples at once. Args: examples: List of example dicts with 'question', 'sql', etc. """ if not self.enabled: return try: docs = [] for ex in examples: doc = Document( page_content=ex["question"], metadata={ "sql": ex.get("sql", ""), "explanation": ex.get("explanation", ""), "schema_context": ex.get("schema_context", ""), "complexity": ex.get("complexity", "medium") } ) docs.append(doc) self.vectorstore.add_documents(docs) logger.info(f"Added {len(docs)} examples to vector store") except Exception as e: logger.error(f"Error adding batch examples: {e}") def retrieve(self, question: str, k: int = None) -> List[Dict]: """ Retrieve most relevant examples for a question. Args: question: User's question k: Number of examples to retrieve Returns: List of example dicts """ if not self.enabled: return [] k = k or settings.few_shot_examples_count try: # Similarity search results = self.vectorstore.similarity_search(question, k=k) examples = [] for doc in results: examples.append({ "question": doc.page_content, "sql": doc.metadata.get("sql", ""), "explanation": doc.metadata.get("explanation", ""), "schema_context": doc.metadata.get("schema_context", ""), "complexity": doc.metadata.get("complexity", "medium") }) logger.info(f"Retrieved {len(examples)} similar examples") return examples except Exception as e: logger.error(f"Error retrieving examples: {e}") return [] def clear(self): """Clear all examples from vector store.""" if self.enabled: # Delete and recreate collection self.vectorstore.delete_collection() logger.info("Vector store cleared") # Global retriever instance few_shot_retriever = FewShotRetriever() def seed_examples(): """ Seed the vector store with common SQL patterns. This should be called during setup with your domain-specific examples. """ if not settings.enable_dynamic_few_shot: return default_examples = [ { "question": "What is the total revenue for each product category?", "sql": """SELECT category, SUM(price * quantity) as total_revenue FROM products p JOIN sales s ON p.product_id = s.product_id GROUP BY category ORDER BY total_revenue DESC""", "explanation": "Join products with sales and aggregate by category", "complexity": "simple" }, { "question": "Find customers who made purchases in the last 30 days but not in the previous 30 days", "sql": """SELECT DISTINCT c.customer_id, c.name FROM customers c JOIN orders o ON c.customer_id = o.customer_id WHERE o.order_date >= CURRENT_DATE - INTERVAL '30 days' AND c.customer_id NOT IN ( SELECT customer_id FROM orders WHERE order_date >= CURRENT_DATE - INTERVAL '60 days' AND order_date < CURRENT_DATE - INTERVAL '30 days' )""", "explanation": "Use subquery to exclude customers from previous period", "complexity": "complex" }, { "question": "Calculate the 3-month rolling average of sales", "sql": """SELECT DATE_TRUNC('month', sale_date) as month, AVG(sale_amount) OVER ( ORDER BY DATE_TRUNC('month', sale_date) ROWS BETWEEN 2 PRECEDING AND CURRENT ROW ) as rolling_3month_avg FROM sales ORDER BY month""", "explanation": "Use window function with ROWS BETWEEN for rolling average", "complexity": "complex" } ] few_shot_retriever.add_examples_batch(default_examples) logger.info(f"Seeded {len(default_examples)} default examples")3.2 Advanced Schema Linking and Pruning
For databases with hundreds of tables, passing the whole schema is impossible. We must implement a multi-stage filtering process to maximise signal and minimise noise.
The “Funnel” Approach:
1. Keyword Extraction: Use NLP to extract potential entity names (e.g., “customer”, “invoice”, “campaign”) from the query.
2. Metadata Search: Search the schema descriptions (metadata) for these entities. This requires a well-documented Data Catalogue with descriptive comments for tables and columns.
3. LLM Selection: Pass the candidate tables (e.g., top 20) to a lightweight LLM (e.g., GPT-4o-mini or Claude Haiku) and ask it to select the strictly necessary tables.
4. Column Pruning: Once tables are selected, perform a second pass to select only relevant columns.
Research on the BIRD-SQL benchmark demonstrates that effective schema linking is often more valuable than using a larger model. A smaller model with a perfectly pruned schema usually outperforms a larger model with a noisy, complete schema.
""" Schema Linker Agent (Selector): Identifies relevant tables and columns. """ from typing import List from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from loguru import logger from core.state import AgentState from core.database import db_manager from config import settings class SchemaLinkerAgent: """ Performs schema pruning to reduce context noise. Identifies only the relevant tables and columns needed for the query. """ def __init__(self): # Use faster model for schema selection self.llm = ChatGroq( model=settings.groq_model_fast, temperature=0, groq_api_key=settings.groq_api_key ) # Prompt for table selection self.table_selection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a database schema expert. Identify which tables are relevant for the user's question. Instructions: 1. Analyze the question and logical plan 2. Select ONLY tables that are strictly necessary 3. Be conservative - include a table only if clearly needed 4. Return ONLY a comma-separated list of table names (no explanations) Example: Question: "What is the average order value by customer segment?" Available Tables: customers, orders, products, invoices, shipments, employees Response: customers, orders"""), ("user", """Question: {question} Plan: {plan} Available Tables: {all_tables} Return comma-separated table names:""") ]) # Prompt for column selection (optional, currently not used) self.column_selection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a database schema expert. Identify which columns are needed for the query. Return a JSON object mapping table names to lists of required columns. Example: {"customers": ["customer_id", "segment"], "orders": ["order_id", "customer_id", "total_amount"]} Include only columns used in: - SELECT clause - WHERE/HAVING conditions - JOIN conditions - GROUP BY or ORDER BY"""), ("user", """Plan: {plan} Schema: {schema} Return JSON with required columns:""") ]) def select_tables(self, question: str, plan: str, all_tables: List[str]) -> List[str]: """ Select relevant tables using LLM reasoning. Args: question: User's question plan: Logical plan all_tables: All available table names Returns: List of relevant table names """ try: chain = self.table_selection_prompt | self.llm response = chain.invoke({ "question": question, "plan": plan, "all_tables": ", ".join(all_tables) }) # Parse comma-separated table names selected = [t.strip() for t in response.content.split(",")] # Filter out any invalid table names selected = [t for t in selected if t in all_tables] logger.info(f"Selected {len(selected)} tables from {len(all_tables)} available") return selected except Exception as e: logger.error(f"Table selection error: {e}") # Fallback: return top 5 tables (simple heuristic) return all_tables[:5] def retrieve_schema(self, state: AgentState) -> dict: """ Retrieve and prune schema information to only relevant tables. Args: state: Current agent state Returns: Updated state with schema context """ logger.info("SCHEMA LINKER: Retrieving relevant tables and schema") question = state["question"] plan = state.get("plan", "") try: # Step 1: Get all available tables all_tables = db_manager.get_all_table_names() logger.info(f"Database has {len(all_tables)} tables") # Step 2: Select relevant tables using LLM if plan: selected_tables = self.select_tables(question, plan, all_tables) else: # Fallback: use first 10 tables if no plan available selected_tables = all_tables[:10] # Step 3: Retrieve DDL schema for selected tables schema_context = db_manager.get_schema_for_tables(selected_tables) # Step 4: Get metadata (keys, indexes, etc.) schema_metadata = {} for table in selected_tables: metadata = db_manager.get_table_metadata(table) schema_metadata[table] = metadata logger.info(f"Selected {len(selected_tables)} tables: {', '.join(selected_tables)}") return { "relevant_tables": selected_tables, "schema_context": schema_context, "schema_metadata": schema_metadata } except Exception as e: logger.error(f"Schema retrieval error: {e}") return { "error": f"Schema retrieval failed: {str(e)}", "should_retry": False } # Node function for LangGraph def schema_linker_node(state: AgentState) -> dict: """LangGraph node wrapper for SchemaLinkerAgent.""" agent = SchemaLinkerAgent() return agent.retrieve_schema(state)3.3 The Reflexion Loop (Self-Correction)
Complex queries often fail on the first attempt due to minor syntax errors or logical misalignments. A robust agent must be able to “debug” itself. This concept, known as Reflexion, mimics human debugging.

The Loop:
1. Generate: The SQL Generator produces Draft 1.
2. Validate: A syntactic checker (using a parser like sqlglot) checks for validity.
3. Execute (Dry Run): The agent attempts to EXPLAIN or execute the query.
4. Catch Error: If execution fails (e.g., OperationalError: no such column), the error message is captured.
5. Reflect: The Critic agent receives the original query, Draft 1, and the Error Message. It reasons about why it failed.
6. Regenerate: The Generator produces Draft 2 based on the Critic’s feedback.
This Execution-Guided Error Correction is central to frameworks like SQL-of-Thought and significantly boosts performance on nested queries where logic errors are common. By treating the error message as a signal rather than a failure state, the agent iteratively converges on the correct solution.
""" Critic Agent (Refiner): Validates, executes, and corrects SQL queries. """ from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from loguru import logger from core.state import AgentState from core.database import db_manager from config import settings class CriticAgent: """ Validates, executes, and corrects SQL queries. Implements closed-loop error correction with execution feedback. """ def __init__(self): self.llm = ChatGroq( model=settings.groq_model_reasoning, temperature=settings.groq_temperature, groq_api_key=settings.groq_api_key ) # Prompt for error correction self.reflection_prompt = ChatPromptTemplate.from_messages([ ("system", """You are a SQL debugging expert. A query failed and you must fix it. Your Task: 1. Analyze the error message carefully 2. Review the schema to understand what went wrong 3. Identify the specific issue (wrong column, incorrect join, syntax error, etc.) 4. Generate a CORRECTED SQL query Common Error Patterns: - Column does not exist → Check schema for correct column names - Table does not exist → Verify table name spelling - Syntax error → Check SQL dialect requirements - Ambiguous column → Add table aliases - Join error → Verify foreign key relationships IMPORTANT: Return ONLY the fixed SQL query (no explanations, no markdown) SCHEMA: {schema_context} ORIGINAL QUESTION: {question} FAILED SQL: {sql_query} ERROR MESSAGE: {error} LOGICAL PLAN (reference): {plan} Generate the CORRECTED SQL:"""), ("user", "Fix the query:") ]) def execute_and_validate(self, state: AgentState) -> dict: """ Execute SQL query and handle results/errors. Args: state: Current agent state Returns: Updated state with results or error information """ logger.info("CRITIC: Executing and validating SQL query") sql_query = state.get("sql_query") if not sql_query: return { "error": "No SQL query to execute", "should_retry": False } try: # Execute the query result, error, exec_time = db_manager.execute_query( sql_query, timeout=settings.query_timeout_seconds ) if error: # Query failed - prepare for reflection logger.warning(f"Query execution failed: {error}") error_type = self._classify_error(error) return { "error": error, "error_type": error_type, "query_result": None, "execution_time_ms": exec_time, "should_retry": True } else: # Query succeeded logger.info(f"Query executed successfully in {exec_time:.2f}ms") result_preview = self._format_result_preview(result) return { "query_result": result, "result_preview": result_preview, "execution_time_ms": exec_time, "error": None, "should_retry": False } except Exception as e: logger.error(f"Execution error: {e}") return { "error": str(e), "error_type": "runtime", "should_retry": True } def reflect_and_fix(self, state: AgentState) -> dict: """ Analyze error and generate corrected SQL. Args: state: Current agent state with error information Returns: Updated state with corrected SQL """ logger.info("CRITIC: Reflecting on error and fixing SQL") iterations = state.get("iterations", 0) # Check if we've exceeded max iteratio