Building an Intelligent Text-to-SQL Multi-Agent Chatbot with LangGraph and Chainlit, Leveraging the Power of Agentic AI for an E-Commerce Use Case [Code Included]
Introduction
Not very long ago, only a handful of people in an organization (data engineers, analysts) could query its SQL databases. The people who needed the data most, the sales and marketing teams, didn't have the skills to write queries and had to rely on technical teams for even basic information.
This multi-agent text-to-SQL chatbot removes that dependency. It empowers everyone in the organization to ask questions in plain English.
This project demonstrates that we can democratize data access by converting natural language questions into SQL queries, executing them, and presenting results with intelligent visualizations.
Built using LangGraph’s state machine architecture and powered by OpenAI’s gpt-4o-mini, this system orchestrates multiple specialized AI agents that work together to provide accurate, context-aware responses to e-commerce database queries.
Multi Agent Chatbot Flow
Problem Statement
The Challenge
Organizations maintain vast amounts of data in relational databases, but accessing this data presents several challenges:
1. Technical Barrier: Non-technical stakeholders cannot query databases directly
2. SQL Complexity: Writing complex JOINs and aggregations requires expertise
3. Time Consumption: Back-and-forth with data teams slows decision-making
4. Visualization Gap: Raw query results don’t provide immediate insights
5. Error Prone: Manual SQL queries are susceptible to syntax and logical errors
The Solution
A conversational AI system that:
- Accepts natural language questions in plain English
- Validates scope to prevent irrelevant queries
- Generates optimized SQL queries automatically
- Handles errors intelligently with retry mechanisms
- Provides natural language explanations of results
- Creates visualizations when beneficial
- Streams execution process for transparency
Tech Stack & Architecture
- Core Framework → LangGraph
- Front End → Chainlit
- AI & LLM → OpenAI API (gpt-4o-mini)
- Data & Database → SQLite3, Pandas, Brazilian E-Commerce Dataset (Olist)
- Visualization → Plotly, LLM-Generated Visualization Code
- Utilities → python-dotenv
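Before diving in, install the dependencies. A minimal sketch of the installs this stack needs (package names as published on PyPI; pin versions as appropriate for your environment):

```
# Terminal
pip install langgraph chainlit openai pandas plotly python-dotenv kagglehub
```

sqlite3 ships with Python's standard library, so the database needs no extra install.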
About Data
We use the Brazilian E-Commerce Public Dataset by Olist, downloaded from Kaggle.
The dataset contains roughly 100k orders placed at the Olist store in Brazil between 2016 and 2018.
It is split across 9 tables covering order details, customer details, payment details, and more.
The data is anonymized:
- An order might have multiple items.
- Each item might be fulfilled by a distinct seller.
- All text identifying stores and partners was replaced with the names of Game of Thrones great houses.
Below is the schema of the data:
E-Commerce Data Schema [Source: Kaggle]
Download the Data:
We download the data from Kaggle using the kagglehub library and save it to the data folder in the repo.
```python
import kagglehub
import shutil
import os

# Download latest version
path = kagglehub.dataset_download("olistbr/brazilian-ecommerce")

# Create data folder if it doesn't exist
data_folder = "data"
os.makedirs(data_folder, exist_ok=True)

# Copy all files from the downloaded path to the data folder
for item in os.listdir(path):
    source = os.path.join(path, item)
    destination = os.path.join(data_folder, item)
    if os.path.isfile(source):
        shutil.copy2(source, destination)
        print(f"Copied: {item}")

print(f"\nAll files saved to '{data_folder}' folder")
```
Exploring the Dataset Files
We now inspect each file one by one.
1. Customer Data
```python
import pandas as pd

# Customers Dataset
customers_df = pd.read_csv('data/olist_customers_dataset.csv')
print(f"Shape: {customers_df.shape}")
customers_df.head()
```
First 5 rows of the Customer Data
2. Geo Location Data
```python
# Geolocation Dataset
geolocation_df = pd.read_csv('data/olist_geolocation_dataset.csv')
print(f"Shape: {geolocation_df.shape}")
geolocation_df.head()
```
First 5 rows of the Geo Location Data
3. Order Items Dataset
```python
# Order Items Dataset
order_items_df = pd.read_csv('data/olist_order_items_dataset.csv')
print(f"Shape: {order_items_df.shape}")
order_items_df.head()
```
First 5 rows of the Order Items Data
4. Order Payments Dataset
```python
# Order Payments Dataset
order_payments_df = pd.read_csv('data/olist_order_payments_dataset.csv')
print(f"Shape: {order_payments_df.shape}")
order_payments_df.head()
```
First 5 rows of the Order Payments Data
5. Order Reviews Dataset
```python
# Order Reviews Dataset
order_reviews_df = pd.read_csv('data/olist_order_reviews_dataset.csv')
print(f"Shape: {order_reviews_df.shape}")
order_reviews_df.head()
```
First 5 rows of the Order Reviews Data
6. Orders Dataset
```python
# Orders Dataset
orders_df = pd.read_csv('data/olist_orders_dataset.csv')
print(f"Shape: {orders_df.shape}")
orders_df.head()
```
First 5 rows of the Orders Data
7. Products Dataset
```python
# Products Dataset
products_df = pd.read_csv('data/olist_products_dataset.csv')
print(f"Shape: {products_df.shape}")
products_df.head()
```
First 5 rows of the Products Data
8. Sellers Dataset
```python
# Sellers Dataset
sellers_df = pd.read_csv('data/olist_sellers_dataset.csv')
print(f"Shape: {sellers_df.shape}")
sellers_df.head()
```
First 5 rows of the Sellers Data
9. Product Category Translation Dataset
```python
# Product Category Translation Dataset
category_translation_df = pd.read_csv('data/product_category_name_translation.csv')
print(f"Shape: {category_translation_df.shape}")
category_translation_df.head()
```
First 5 rows of the Product Category Translation Data
Create the SQLite Database
We create db_init.py, which performs the following tasks:
• Checks if ecommerce.db exists and removes it to start fresh
• Creates a new SQLite database connection
• Loads each CSV file from the /data folder into a database table using pandas
• Prints a summary of rows and columns for each table, then closes the connection
```python
# db_init.py
import pandas as pd
import sqlite3
import os

# Path to local data folder
data_path = "data"
db = "ecommerce.db"

if os.path.exists(db):
    os.remove(db)

conn = sqlite3.connect(db)

def load(name):
    df = pd.read_csv(f"{data_path}/{name}")
    table_name = name.replace("olist_", "").replace("_dataset.csv", "").replace(".csv", "")
    df.to_sql(table_name, conn, index=False, if_exists="replace")
    print(f"Loaded: {name} -> {table_name} table ({df.shape[0]} rows, {df.shape[1]} columns)")

# Load all datasets
load("olist_customers_dataset.csv")
load("olist_orders_dataset.csv")
load("olist_order_items_dataset.csv")
load("olist_order_payments_dataset.csv")
load("olist_order_reviews_dataset.csv")
load("olist_products_dataset.csv")
load("olist_sellers_dataset.csv")
load("olist_geolocation_dataset.csv")
load("product_category_name_translation.csv")
conn.close()

print(f"\nDatabase created: {db}")
print("Tables: customers, orders, order_items, order_payments, order_reviews, products, sellers, geolocation, product_category_name_translation")
```
To execute the above file, run the following command in the terminal:
```
# Terminal
python db_init.py
```
```
# Output
Loaded: olist_customers_dataset.csv -> customers table (99441 rows, 5 columns)
Loaded: olist_orders_dataset.csv -> orders table (99441 rows, 8 columns)
Loaded: olist_order_items_dataset.csv -> order_items table (112650 rows, 7 columns)
Loaded: olist_order_payments_dataset.csv -> order_payments table (103886 rows, 5 columns)
Loaded: olist_order_reviews_dataset.csv -> order_reviews table (99224 rows, 7 columns)
Loaded: olist_products_dataset.csv -> products table (32951 rows, 9 columns)
Loaded: olist_sellers_dataset.csv -> sellers table (3095 rows, 4 columns)
Loaded: olist_geolocation_dataset.csv -> geolocation table (1000163 rows, 5 columns)
Loaded: product_category_name_translation.csv -> product_category_name_translation table (71 rows, 2 columns)
Database created: ecommerce.db
Tables: customers, orders, order_items, order_payments, order_reviews, products, sellers, geolocation, product_category_name_translation
```
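As an optional sanity check (a helper sketch, not part of db_init.py), you can list every table and its row count straight from the new database:

```python
import sqlite3

# Optional sanity check: list tables and row counts in the freshly built database
conn = sqlite3.connect("ecommerce.db")
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
for (table,) in cursor.fetchall():
    count = cursor.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")
conn.close()
```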
Defining the Agentic Flow
We now write text2sql_agent.py, which defines the various agents and supporting functions:
- This module builds the full Text-to-SQL agent using LangGraph, converting natural-language questions into SQL, executing them, and generating natural-language answers.
- It also includes automatic error correction, scope checking, and optional Plotly graph generation, forming a complete end-to-end e-commerce analytics assistant.
1. Importing the Libraries:
We import the required libraries and load the OPENAI_API_KEY from the .env file.
```python
# text2sql_agent.py
import os
import sqlite3
from typing import TypedDict

from langgraph.graph import StateGraph, END
from openai import OpenAI
from dotenv import load_dotenv
import json
import pandas as pd

# Load OPENAI_API_KEY from the .env file (python-dotenv)
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Database configuration
DB_PATH = "ecommerce.db"
```
2. Defining the Schema
We define the database schema as follows:
```python
# text2sql_agent.py
# Database schema information
SCHEMA_INFO = """Database Schema for E-commerce System:

1. customers
   - customer_id (TEXT): Unique customer identifier
   - customer_unique_id (TEXT): Unique customer identifier across datasets
   - customer_zip_code_prefix (INTEGER): Customer zip code
   - customer_city (TEXT): Customer city
   - customer_state (TEXT): Customer state

2. orders
   - order_id (TEXT): Unique order identifier
   - customer_id (TEXT): Foreign key to customers
   - order_status (TEXT): Order status (delivered, shipped, etc.)
   - order_purchase_timestamp (TEXT): When the order was placed
   - order_approved_at (TEXT): When payment was approved
   - order_delivered_carrier_date (TEXT): When order was handed to carrier
   - order_delivered_customer_date (TEXT): When customer received the order
   - order_estimated_delivery_date (TEXT): Estimated delivery date

3. order_items
   - order_id (TEXT): Foreign key to orders
   - order_item_id (INTEGER): Item sequence number within order
   - product_id (TEXT): Foreign key to products
   - seller_id (TEXT): Foreign key to sellers
   - shipping_limit_date (TEXT): Shipping deadline
   - price (REAL): Item price
   - freight_value (REAL): Shipping cost

4. order_payments
   - order_id (TEXT): Foreign key to orders
   - payment_sequential (INTEGER): Payment sequence number
   - payment_type (TEXT): Payment method (credit_card, boleto, etc.)
   - payment_installments (INTEGER): Number of installments
   - payment_value (REAL): Payment amount

5. order_reviews
   - review_id (TEXT): Unique review identifier
   - order_id (TEXT): Foreign key to orders
   - review_score (INTEGER): Review score (1-5)
   - review_comment_title (TEXT): Review title
   - review_comment_message (TEXT): Review message
   - review_creation_date (TEXT): When review was created
   - review_answer_timestamp (TEXT): When review was answered

6. products
   - product_id (TEXT): Unique product identifier
   - product_category_name (TEXT): Product category (in Portuguese)
   - product_name_lenght (REAL): Product name length
   - product_description_lenght (REAL): Product description length
   - product_photos_qty (REAL): Number of product photos
   - product_weight_g (REAL): Product weight in grams
   - product_length_cm (REAL): Product length in cm
   - product_height_cm (REAL): Product height in cm
   - product_width_cm (REAL): Product width in cm

7. sellers
   - seller_id (TEXT): Unique seller identifier
   - seller_zip_code_prefix (INTEGER): Seller zip code
   - seller_city (TEXT): Seller city
   - seller_state (TEXT): Seller state

8. geolocation
   - geolocation_zip_code_prefix (INTEGER): Zip code prefix
   - geolocation_lat (REAL): Latitude
   - geolocation_lng (REAL): Longitude
   - geolocation_city (TEXT): City name
   - geolocation_state (TEXT): State code

9. product_category_name_translation
   - product_category_name (TEXT): Category name in Portuguese
   - product_category_name_english (TEXT): Category name in English
"""
```
3. Agent State
We now define the agent state using the TypedDict format.
Using TypedDict instead of a normal Dict in LangGraph is helpful because:
- **You can't accidentally use the wrong data:** Python tooling will warn you if you put the wrong kind of value in the state.
- **Your editor knows what keys exist:** you get autocomplete and fewer mistakes when typing.
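For example, here is a toy sketch (not part of the project code) of the kind of mistake a static type checker such as mypy catches with a TypedDict but not with a plain dict:

```python
from typing import TypedDict

class OrderState(TypedDict):
    question: str
    iteration: int

state: OrderState = {"question": "Top sellers?", "iteration": 0}
state["iteration"] = "three"  # mypy error: expected "int", got "str"
state["iterations"] = 1       # mypy error: OrderState has no key "iterations"
```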
```python
# text2sql_agent.py
class AgentState(TypedDict):
    """State of the agent workflow"""
    question: str
    sql_query: str
    query_result: str
    final_answer: str
    error: str
    iteration: int
    needs_graph: bool
    graph_type: str
    graph_json: str  # Plotly figure JSON for Chainlit
    is_in_scope: bool  # Whether the question is about e-commerce data
```
4. Agent Config
We now define AGENT_CONFIGS where the role and system prompt of each agent is described.
```python
# text2sql_agent.py
# Agent configurations with different roles and personalities
AGENT_CONFIGS = {
    "guardrails_agent": {
        "role": "Security and Scope Manager",
        "system_prompt": "You are a strict guardrails system that filters questions to ensure they are relevant to e-commerce data analysis or identifies greetings.",
    },
    "sql_agent": {
        "role": "SQL Expert",
        "system_prompt": "You are a senior SQL developer specializing in e-commerce databases. Generate only valid SQLite queries without any formatting or explanation.",
    },
    "analysis_agent": {
        "role": "Data Analyst",
        "system_prompt": "You are a helpful data analyst that explains database query results in natural language with clear insights.",
    },
    "viz_agent": {
        "role": "Visualization Specialist",
        "system_prompt": "You are a data visualization expert. Generate clean, executable Plotly code without any markdown formatting or explanations.",
    },
    "error_agent": {
        "role": "Error Recovery Specialist",
        "system_prompt": "You diagnose and fix SQL errors with expert knowledge of database schemas and query optimization.",
    },
}
```
5. Guardrail Agent:
Purpose
- Determines whether the question is about the e-commerce dataset.
- Distinguishes greetings, valid analytics questions, and out-of-scope queries.
Input
- question
Output
- is_in_scope (True/False)
- final_answer (if greeting or out of scope)
- Passes state unchanged if in scope.
Notes
- If greeting → sends friendly welcome message and ends.
- If out-of-scope → returns predefined rejection message.
- Only in-scope questions continue to SQL generation.
```python
# text2sql_agent.py
def guardrails_agent(state: AgentState) -> AgentState:
    """Check if the question is within scope (e-commerce related)"""
    question = state["question"]

    prompt = f"""You are a guardrails system for an e-commerce database chatbot. Your job is to determine if a user's question is related to e-commerce data, if it's a greeting, or if it's out of scope.

The chatbot has access to an e-commerce database with information about:
- Customers and their locations
- Orders and order status (data from 2016-2018)
- Products and categories
- Sellers
- Payments
- Reviews
- Shipping and delivery information

Examples of GREETING messages:
- "Hi", "Hello", "Hey"
- "Good morning", "Good afternoon"
- "How are you?"
- Any casual greeting or introduction

Examples of IN-SCOPE questions:
- "How many orders were placed last month?"
- "What are the top selling products?"
- "Show me customer distribution by state"
- "What is the average order value?"
- "Which sellers have the highest ratings?"

Examples of OUT-OF-SCOPE questions:
- Personal questions (e.g., "What is my wife's name?", "Where do I live?")
- Political questions (e.g., "Who should I vote for?", "What do you think about the president?")
- General knowledge (e.g., "What is the capital of France?", "How does photosynthesis work?")
- Unrelated topics (e.g., "Tell me a joke", "What's the weather like?")

User Question: {question}

Analyze the question and respond in JSON format:
{{
    "is_in_scope": true/false,
    "is_greeting": true/false,
    "reason": "brief explanation of why it is or isn't in scope or if it's a greeting"
}}

If the question is a greeting, mark is_greeting as true and is_in_scope as false.
If the question is ambiguous but could potentially relate to the e-commerce data, mark it as in_scope."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": AGENT_CONFIGS["guardrails_agent"]["system_prompt"]},
            {"role": "user", "content": prompt}
        ],
        temperature=0,
        response_format={"type": "json_object"}
    )

    result = json.loads(response.choices[0].message.content)
    state["is_in_scope"] = result.get("is_in_scope", False)
    is_greeting = result.get("is_greeting", False)

    # If it's a greeting, provide a welcome message
    if is_greeting:
        state["final_answer"] = "Hi! I am your e-commerce assistant. I can answer all the queries related to orders, customers, products, sellers, payments, and reviews between 2016-2018. How can I help you today?"
        return state

    # If out of scope, set the final answer immediately
    if not state["is_in_scope"]:
        state["final_answer"] = "I apologize, but your question appears to be out of scope. I can only answer questions about the e-commerce data, including:\n\n- Customer information and locations\n- Orders and order status\n- Products and categories\n- Sellers and their performance\n- Payment information\n- Reviews and ratings\n- Shipping and delivery data\n\nPlease ask a question related to the e-commerce database."

    return state
```
6. SQL Generation Agent:
Purpose
- Converts natural language into syntactically correct SQLite SQL.
- Generates only SQL — no markdown, no explanation.
Input
- question
- Full database schema
Output
- sql_query
- Increments the iteration count
Notes
- Adds LIMIT 10 when query could be large.
- Supports multi-query output separated by semicolons.
- Must respect schema exactly.
````python
# text2sql_agent.py
def sql_agent(state: AgentState) -> AgentState:
    """Generate SQL query from natural language question"""
    question = state["question"]
    iteration = state.get("iteration", 0)

    prompt = f"""You are a SQL expert. Convert the following natural language question into a valid SQLite query.

{SCHEMA_INFO}

Question: {question}

Important Guidelines:
1. Use only the tables and columns mentioned in the schema
2. Use proper JOIN clauses when querying multiple tables
3. Return ONLY the SQL query without any explanation or markdown formatting
4. If the question contains multiple sub-questions, generate separate SQL queries separated by semicolons
5. Use aggregate functions (COUNT, SUM, AVG, etc.) appropriately
6. Add LIMIT clauses for queries that might return many rows (default LIMIT 10 unless user specifies)
7. Use proper WHERE clauses to filter data
8. For date comparisons, remember the dates are stored as TEXT in ISO format
9. Each SQL statement should be on its own line for clarity when multiple queries are needed

Generate the SQL query:"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": AGENT_CONFIGS["sql_agent"]["system_prompt"]},
            {"role": "user", "content": prompt}
        ],
        temperature=0
    )

    sql_query = response.choices[0].message.content.strip()
    # Remove markdown code blocks if present
    sql_query = sql_query.replace("```sql", "").replace("```", "").strip()

    state["sql_query"] = sql_query
    state["iteration"] = iteration + 1
    return state
````
7. Execute SQL:
Purpose
- Runs the SQL safely against SQLite.
- Supports multi-statement SQL.
Input
- sql_query
Output
- query_result (JSON string)
- Or sets error on failure
Notes
- Converts rows → list of dicts.
- Limits result sets to 100 rows.
- Handles multiple queries by labeling them query_1, query_2, etc.
```python
# text2sql_agent.py
def execute_sql(state: AgentState) -> AgentState:
    """Execute the generated SQL query (handles multiple queries if present)"""
    sql_query = state["sql_query"]

    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()

        # Split multiple SQL statements (separated by semicolons)
        # Remove empty statements and strip whitespace
        sql_statements = [stmt.strip() for stmt in sql_query.split(';') if stmt.strip()]

        all_results = []

        # Execute each statement separately
        for i, statement in enumerate(sql_statements):
            cursor.execute(statement)

            # Fetch results for this statement
            results = cursor.fetchall()
            if results:
                column_names = [description[0] for description in cursor.description]

                # Convert to list of dictionaries
                formatted_results = []
                for row in results[:100]:  # Limit to 100 rows per query
                    formatted_results.append(dict(zip(column_names, row)))

                # If multiple queries, label them
                if len(sql_statements) > 1:
                    all_results.append({
                        f"query_{i+1}": formatted_results,
                        f"query_{i+1}_sql": statement
                    })
                else:
                    all_results = formatted_results

        conn.close()

        # Format results
        if not all_results:
            state["query_result"] = "No results found."
        else:
            state["query_result"] = json.dumps(all_results, indent=2)
        state["error"] = ""

    except Exception as e:
        state["error"] = f"SQL Execution Error: {str(e)}"
        state["query_result"] = ""

    return state
```
8. Error Agent:
Purpose
- Attempts to automatically fix bad SQL.
Input
- error
- sql_query
- question
Output
- Repaired sql_query
- Clears error
- Increments the retry iteration
Retry Logic
- Retries up to 3 times.
- If still failing after 3 attempts → returns an apology in final_answer.
````python
# text2sql_agent.py
def error_agent(state: AgentState) -> AgentState:
    """Handle errors and attempt to fix the SQL query"""
    error = state["error"]
    sql_query = state["sql_query"]
    question = state["question"]
    iteration = state.get("iteration", 0)

    # If we've tried too many times, give up
    if iteration > 3:
        state["final_answer"] = f"I apologize, but I'm having trouble generating a correct SQL query for your question. Error: {error}"
        return state

    prompt = f"""The following SQL query failed with an error. Please fix it.

{SCHEMA_INFO}

Original Question: {question}

Failed SQL Query: {sql_query}

Error: {error}

Generate a corrected SQL query that will work. Return ONLY the SQL query without any explanation or markdown formatting:"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": AGENT_CONFIGS["error_agent"]["system_prompt"]},
            {"role": "user", "content": prompt}
        ],
        temperature=0
    )

    corrected_query = response.choices[0].message.content.strip()
    corrected_query = corrected_query.replace("```sql", "").replace("```", "").strip()

    state["sql_query"] = corrected_query
    state["error"] = ""  # Clear the error for retry
    state["iteration"] = iteration + 1  # Increment iteration counter
    return state
````
9. Analysis Agent
Purpose
- Converts raw SQL JSON output into a readable explanation.
Input
- question
- sql_query
- query_result
Output
- final_answer
Notes
- Supports multi-query responses.
- Produces user-friendly numbered lists, insights, and summaries.
```python
# text2sql_agent.py
def analysis_agent(state: AgentState) -> AgentState:
    """Generate natural language answer from query results"""
    question = state["question"]
    sql_query = state["sql_query"]
    query_result = state["query_result"]

    prompt = f"""You are a helpful assistant that explains database query results in natural language.

Original Question: {question}

SQL Query Used: {sql_query}

Query Results:
{query_result}

Please provide a clear, concise answer to the original question based on the query results.
Format the answer in a user-friendly way. If the results contain numbers, present them clearly.
If there are multiple queries/results (for multi-part questions), address each part of the question separately.
Use bullet points or numbered lists for multiple answers.

Answer:"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": AGENT_CONFIGS["analysis_agent"]["system_prompt"]},
            {"role": "user", "content": prompt}
        ],
        temperature=0
    )

    final_answer = response.choices[0].message.content.strip()
    state["final_answer"] = final_answer
    return state
```
10. Decide Graph Need:
Purpose
- Determines whether visualization adds value.
Input
- question
- query_result
Output
- needs_graph: True/False
- graph_type: "bar", "line", "pie", or "scatter"
Rules
- Time trends → line
- Category comparison → bar
- Proportion → pie
- Correlation → scatter
- Single numbers → no graph
```python
# text2sql_agent.py
def decide_graph_need(state: AgentState) -> AgentState:
    """Decide if a graph visualization would be helpful for the query"""
    question = state["question"]
    query_result = state["query_result"]

    # If no results or error, no graph needed
    if not query_result or query_result == "No results found." or state.get("error"):
        state["needs_graph"] = False
        state["graph_type"] = ""
        return state

    prompt = f"""Analyze the following question and query results to determine if a graph visualization would be helpful.

Question: {question}

Query Results Sample:
{query_result[:500]}...

Determine:
1. Would a graph be helpful for this data? (YES/NO)
2. If yes, what type of graph? (bar, line, pie, scatter)

Consider:
- Trends over time → line chart
- Comparisons between categories → bar chart
- Proportions/percentages → pie chart
- Correlations → scatter plot
- Simple counts or single values → NO graph needed

Respond in JSON format:
{{"needs_graph": true/false, "graph_type": "bar/line/pie/scatter/none", "reason": "brief explanation"}}"""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a data visualization expert. Analyze queries and determine if visualization would add value."},
            {"role": "user", "content": prompt}
        ],
        temperature=0,
        response_format={"type": "json_object"}
    )

    decision = json.loads(response.choices[0].message.content)
    state["needs_graph"] = decision.get("needs_graph", False)
    state["graph_type"] = decision.get("graph_type", "none")
    return state
```
11. Visualization Agent
Purpose
- Generates Plotly code using the LLM and executes it.
- Converts the resulting figure into JSON for frontend rendering.
Input
- query_result
- graph_type
- Internal dataframe df
Output
- graph_json
Notes
- LLM returns code only, no markdown.
- Code is executed in a restricted namespace.
- The figure must be named fig.
````python
# text2sql_agent.py
def viz_agent(state: AgentState) -> AgentState:
    """Generate a graph visualization from query results using LLM-generated Plotly code"""
    query_result = state["query_result"]
    graph_type = state["graph_type"]
    question = state["question"]

    try:
        # Parse query results
        results = json.loads(query_result)
        if not results or len(results) == 0:
            state["graph_json"] = ""
            return state

        # Convert to DataFrame for context
        df = pd.DataFrame(results)
        columns = df.columns.tolist()
        sample_data = df.head(5).to_dict('records')

        # Generate Plotly code using LLM
        prompt = f"""Generate Python code using Plotly to visualize the following data.

Question: {question}
Graph Type: {graph_type}
Columns: {columns}
Sample Data (first 5 rows): {json.dumps(sample_data, indent=2)}
Total Rows: {len(df)}

Requirements:
1. Use plotly.graph_objects or plotly.express
2. The data is already loaded as 'df' (a pandas DataFrame)
3. Create an appropriate {graph_type} chart
4. Limit data to top 20 rows if there are many rows
5. Add proper titles, labels, and formatting
6. The figure variable must be named 'fig'
7. Return ONLY the Python code, no explanations or markdown
8. Do NOT include any import statements
9. Do NOT include code to show the figure (no fig.show())
10. Make the visualization visually appealing with appropriate colors and layout
11. Update the layout for better interactivity (hover info, responsive sizing)

Generate the Plotly code:"""

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": AGENT_CONFIGS["viz_agent"]["system_prompt"]},
                {"role": "user", "content": prompt}
            ],
            temperature=0
        )

        plotly_code = response.choices[0].message.content.strip()
        # Remove markdown code blocks if present
        plotly_code = plotly_code.replace("```python", "").replace("```", "").strip()

        # Prepare execution environment
        exec_globals = {
            'df': df,
            'pd': pd,
            'json': json
        }

        # Import plotly dynamically
        try:
            import plotly.graph_objects as go
            import plotly.express as px
            exec_globals['go'] = go
            exec_globals['px'] = px
        except ImportError:
            print("Plotly not installed. Installing...")
            import subprocess
            subprocess.check_call(['pip', 'install', 'plotly'])
            import plotly.graph_objects as go
            import plotly.express as px
            exec_globals['go'] = go
            exec_globals['px'] = px

        # Execute the generated code
        exec(plotly_code, exec_globals)

        # Get the figure object
        fig = exec_globals.get('fig')
        if fig is None:
            raise ValueError("Generated code did not create a 'fig' variable")

        # Export figure as JSON for Chainlit's Plotly element
        graph_json = fig.to_json()
        state["graph_json"] = graph_json

    except Exception as e:
        print(f"Graph generation error: {e}")
        print(f"Generated code:\n{plotly_code if 'plotly_code' in locals() else 'No code generated'}")
        state["graph_json"] = ""

    return state
````
12. Helper Functions
We now define a couple of helper functions that route the flow based on various conditions.
**should_retry**
- Determines whether SQL execution should be retried based on the presence of an error.
- Allows up to 3 retry attempts using the iteration counter.
- Returns "retry", "end", or "success" to control the LangGraph flow.
**should_generate_graph**
- Checks the needs_graph flag set earlier by the visualization decision agent.
- Returns "viz_agent" if a graph should be generated; otherwise returns "skip_graph".
- Controls whether the graph-generating node is executed in the workflow.
**check_scope**
- Verifies whether the user's question is allowed using is_in_scope.
- Returns "in_scope" for valid analytics questions and "out_of_scope" otherwise.
- Determines early termination of the graph when a query is irrelevant or a greeting.
```python
# text2sql_agent.py
def should_retry(state: AgentState) -> str:
    """Decide whether to retry after an error"""
    if state.get("error"):
        iteration = state.get("iteration", 0)
        if iteration <= 3:
            return "retry"
        else:
            return "end"
    return "success"


def should_generate_graph(state: AgentState) -> str:
    """Decide whether to generate a graph"""
    if state.get("needs_graph", False):
        return "viz_agent"
    return "skip_graph"


def check_scope(state: AgentState) -> str:
    """Check if question is in scope to continue processing"""
    if state.get("is_in_scope", True):
        return "in_scope"
    return "out_of_scope"
```
13. Build the LangGraph Agentic Flow
We now define the create_text2sql_graph function, which:
- Builds and registers all nodes in the pipeline, including guardrails_agent, sql_agent, execute_sql, analysis_agent, error_agent, decide_graph_need, and viz_agent.
- Connects nodes using both linear and conditional edges, powered by the helper functions check_scope, should_retry, and should_generate_graph.
- Compiles the graph into an executable state machine that LangGraph can run efficiently and deterministically.
```python
# text2sql_agent.py
# Build the LangGraph workflow
def create_text2sql_graph():
    """Create the LangGraph state graph for Text2SQL with graph generation"""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("guardrails_agent", guardrails_agent)
    workflow.add_node("sql_agent", sql_agent)
    workflow.add_node("execute_sql", execute_sql)
    workflow.add_node("analysis_agent", analysis_agent)
    workflow.add_node("error_agent", error_agent)
    workflow.add_node("decide_graph_need", decide_graph_need)
    workflow.add_node("viz_agent", viz_agent)

    # Add edges - start with guardrails check
    workflow.set_entry_point("guardrails_agent")

    # Conditional edge from guardrails - only proceed if in scope
    workflow.add_conditional_edges(
        "guardrails_agent",
        check_scope,
        {
            "in_scope": "sql_agent",
            "out_of_scope": END
        }
    )

    workflow.add_edge("sql_agent", "execute_sql")

    # Conditional edge based on execution success
    workflow.add_conditional_edges(
        "execute_sql",
        should_retry,
        {
            "success": "analysis_agent",
            "retry": "error_agent",
            "end": "analysis_agent"
        }
    )

    workflow.add_edge("error_agent", "execute_sql")
    workflow.add_edge("analysis_agent", "decide_graph_need")

    # Conditional edge for graph generation
    workflow.add_conditional_edges(
        "decide_graph_need",
        should_generate_graph,
        {
            "viz_agent": "viz_agent",
            "skip_graph": END
        }
    )

    workflow.add_edge("viz_agent", END)

    return workflow.compile()


# Create the compiled graph
text2sql_graph = create_text2sql_graph()


# Defining the function to generate LangGraph flow visualization
def generate_graph_visualization(output_path: str = "text2sql_workflow.png") -> str:
    """
    Generate a PNG visualization of the LangGraph workflow.

    Args:
        output_path: Path where the PNG file will be saved (default: "text2sql_workflow.png")

    Returns:
        str: Path to the generated PNG file
    """
    try:
        # Get the graph visualization
        graph_image = text2sql_graph.get_graph().draw_mermaid_png()

        # Save to file
        with open(output_path, "wb") as f:
            f.write(graph_image)

        print(f"Graph visualization saved to: {output_path}")
        return output_path

    except Exception as e:
        print(f"Error generating graph visualization: {e}")
        print("Make sure you have 'pygraphviz' or 'grandalf' installed:")
        print("  pip install pygraphviz")
        print("  or")
        print("  pip install grandalf")
        return None
```
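With the compiled graph in place, rendering the workflow diagram shown below is a one-liner:

```python
generate_graph_visualization("text2sql_workflow.png")
```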
LangGraph Flow
14. Stream the output:
We now define process_question_stream, which:
- Streams node-by-node execution events (node_start, node_end, error, final) to enable real-time debugging, UI visualization, or timeline playback.
- Initializes a full AgentState, then updates and re-yields the state incrementally as each node completes, using text2sql_graph.astream_events().
- Provides a safe async wrapper with structured error handling, ensuring any exceptions are emitted as "error" events rather than crashing the pipeline.
This is the most important step.
We could have used the traditional text2sql_graph.invoke() method, but it is synchronous and opaque. The process_question_stream() method instead provides full transparency and async, event-by-event introspection for debugging and monitoring, so the UI can show a real-time update for each node.
```python
# text2sql_agent.py
async def process_question_stream(question: str):
    """
    Process a natural language question and stream node execution events.
    This is an async generator that yields node events for debugging visualization.

    Yields:
        dict: Event with type ('node_start', 'node_end', 'error', 'final') and data
    """
    initial_state = AgentState(
        question=question,
        sql_query="",
        query_result="",
        final_answer="",
        error="",
        iteration=0,
        needs_graph=False,
        graph_type="",
        graph_json="",
        is_in_scope=True
    )

    current_state = initial_state.copy()

    # These must match the node names registered in create_text2sql_graph
    node_names = ["guardrails_agent", "sql_agent", "execute_sql",
                  "analysis_agent", "error_agent", "decide_graph_need", "viz_agent"]

    try:
        # Stream events from the graph
        async for event in text2sql_graph.astream_events(
            initial_state,
            config={"recursion_limit": 50},
            version="v1"
        ):
            event_type = event.get("event")

            # Node start event
            if event_type == "on_chain_start":
                node_name = event.get("name", "")
                if node_name in node_names:
                    yield {
                        "type": "node_start",
                        "node": node_name,
                        "input": current_state
                    }

            # Node end event
            elif event_type == "on_chain_end":
                node_name = event.get("name", "")
                if node_name in node_names:
                    output = event.get("data", {}).get("output", {})
                    if output:
                        current_state.update(output)
                    yield {
                        "type": "node_end",
                        "node": node_name,
                        "output": output,
                        "state": current_state.copy()
                    }

        # Send final result
        yield {
            "type": "final",
            "result": current_state
        }

    except Exception as e:
        yield {
            "type": "error",
            "error": str(e)
        }


if __name__ == "__main__":
    # Test the agent
    print("=" * 80)
    print("Text2SQL Agent - Use 'chainlit run app.py' to start the web interface")
    print("=" * 80)
    print("\nThis module is meant to be imported and used via the Chainlit app.")
    print("Run: chainlit run app.py")
```
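To see what the stream yields, here is a minimal consumer sketch (illustrative, not part of text2sql_agent.py; the question text is hypothetical, and in a notebook you could await the loop directly instead of using asyncio.run):

```python
import asyncio

# Illustrative consumer for process_question_stream
async def main():
    async for event in process_question_stream("What are the top 5 product categories by revenue?"):
        if event["type"] == "node_start":
            print(f"-> entering node: {event['node']}")
        elif event["type"] == "node_end":
            print(f"<- finished node: {event['node']}")
        elif event["type"] == "final":
            print("\nFinal answer:\n", event["result"].get("final_answer", ""))
        elif event["type"] == "error":
            print("Error:", event["error"])

asyncio.run(main())
```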
You can skip the next section (Front End) if you are a beginner. Simply write all the above functions in a notebook and test your queries there.
The notebook is also kept in the repo here.
Testing in Notebook:
Let's define the helper functions for testing in the notebook:
```python
# Define the process_question function
def process_question(question: str) -> dict:
    """
    Process a natural language question and return the final result.
    This is a simple synchronous function for notebook usage.

    Args:
        question: Natural language question about the e-commerce data

    Returns:
        dict: Final state with answer, SQL query, and graph data if applicable
    """
    initial_state = AgentState(
        question=question,
        sql_query="",
        query_result="",
        final_answer="",
        error="",
        iteration=0,
        needs_graph=False,
        graph_type="",
        graph_json="",
        is_in_scope=True
    )

    try:
        # Invoke the graph
        final_state = text2sql_graph.invoke(
            initial_state,
            config={"recursion_limit": 50}
        )
        return final_state
    except Exception as e:
        return {
            "error": str(e),
            "final_answer": f"An error occurred while processing your question: {str(e)}"
        }


# Define the test function
def test_process_question(question: str):
    result = process_question(question)

    print("Final Answer:")
    print(result.get("final_answer", "No answer generated."))

    if result.get("sql_query"):
        print("\nGenerated SQL Query:")
        print(result.get("sql_query", "No SQL query generated."))

    # Use .get so the error path (which returns a minimal dict) doesn't raise a KeyError
    needs_graph = result.get("needs_graph", False)
    print(f"\nNeeds Graph: {needs_graph}")
    if needs_graph:
        print(f"Graph Type: {result.get('graph_type')}")
        if result.get("graph_json"):
            import plotly.io as pio

            # Load the figure from JSON
            fig = pio.from_json(result['graph_json'])
            # Display in notebook
            fig.show()
    else:
        print("No graph was generated for this question.")
```
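With these helpers defined, each example below is produced by a single call, for instance:

```python
test_process_question("What are the top 5 states by number of customers?")
```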
Example 1: Hello?
Example 2: What are the top 5 states by number of customers?
Example 3: What are the most popular payment methods?