52 SQL Injection Code Audit Exercises: From Beginner to Expert 🔐
Master SQL injection detection through hands-on code review — no exploit development required.
Whether you’re preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python’s most popular database libraries.
🎯 What You’ll Learn
- Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
- Recognize when input validation IS sufficient vs. when parameterization is required
- Understand the difference between identifiers (column/table names) and values
- Write secure fixes using proper parameterization patterns
##…
52 SQL Injection Code Audit Exercises: From Beginner to Expert 🔐
Master SQL injection detection through hands-on code review — no exploit development required.
Whether you’re preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python’s most popular database libraries.
🎯 What You’ll Learn
- Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
- Recognize when input validation IS sufficient vs. when parameterization is required
- Understand the difference between identifiers (column/table names) and values
- Write secure fixes using proper parameterization patterns
📋 Prerequisites
- Basic Python knowledge
- Basic SQL syntax understanding
- Familiarity with web application concepts
🏆 Challenge Format
For each snippet, determine:
- Vulnerable? (Yes/No)
- If yes, write the fix
Scoring Guide:
- Correct identification: 1 point
- Correct fix (if vulnerable): 1 point
🔑 Key Concepts Before You Start
Identifiers vs Values
| Type | Examples | Can Parameterize? | Protection |
|---|---|---|---|
| Identifier | Column names, table names, aliases, ASC/DESC | ❌ No | Allowlist validation |
| Value | Data in WHERE, LIMIT, function arguments | ✅ Yes | Parameterization |
Placeholder Syntax by Library
| Library | Placeholder | Example |
|---|---|---|
| sqlite3 | ? | cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) |
| psycopg2 | %s | cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| mysql.connector | %s | cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| SQLAlchemy | :param | conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) |
Round 1: Fundamentals (Snippets 1-5)
Difficulty: Beginner
Snippet 1: Basic Authentication
import sqlite3
def authenticate_user(username, password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
user = cursor.fetchone()
conn.close()
if user:
return {"status": "success", "user_id": user[0]}
else:
return {"status": "failed"}
Snippet 2: Product Search
import psycopg2
def search_products(search_term):
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
query = "SELECT name, price FROM products WHERE name ILIKE %s"
cursor.execute(query, (f"%{search_term}%",))
results = cursor.fetchall()
conn.close()
return results
Snippet 3: Account Check
import mysql.connector
def check_account_exists(email):
conn = mysql.connector.connect(host="localhost", database="accounts")
cursor = conn.cursor()
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)
exists = cursor.fetchone() is not None
conn.close()
return {"exists": exists}
Snippet 4: Order Status Lookup
from sqlalchemy import create_engine, text
def get_order_status(order_id):
engine = create_engine('postgresql://localhost/orders')
with engine.connect() as conn:
query = text("SELECT status FROM orders WHERE id = :order_id")
result = conn.execute(query, {"order_id": order_id})
row = result.fetchone()
if row:
return {"status": row[0]}
return {"status": "not_found"}
Snippet 5: Report Generator
import sqlite3
def generate_report(table_name, date_filter):
conn = sqlite3.connect('reports.db')
cursor = conn.cursor()
allowed_tables = ['sales', 'inventory', 'customers']
if table_name not in allowed_tables:
return {"error": "Invalid table"}
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"data": results}
Round 2: Mixed Patterns (Snippets 6-14)
Difficulty: Intermediate
Snippet 6: User Profile with ID Check
import sqlite3
def get_user_profile(user_id):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
if not user_id.isdigit():
return {"error": "Invalid user ID"}
query = f"SELECT username, email, bio FROM users WHERE id = {user_id}"
cursor.execute(query)
profile = cursor.fetchone()
conn.close()
if profile:
return {"username": profile[0], "email": profile[1], "bio": profile[2]}
return {"error": "User not found"}
Snippet 7: Flask Search with Sort
from flask import Flask, request
import psycopg2
app = Flask(__name__)
@app.route('/search')
def search():
query_param = request.args.get('q', '')
sort_by = request.args.get('sort', 'name')
conn = psycopg2.connect("dbname=products")
cursor = conn.cursor()
allowed_sort = ['name', 'price', 'date_added']
if sort_by not in allowed_sort:
sort_by = 'name'
query = f"SELECT * FROM products WHERE name ILIKE %s ORDER BY {sort_by}"
cursor.execute(query, (f"%{query_param}%",))
results = cursor.fetchall()
conn.close()
return {"results": results}
Snippet 8: Bulk Delete
import mysql.connector
def delete_users(user_ids: list):
conn = mysql.connector.connect(host="localhost", database="app")
cursor = conn.cursor()
placeholders = ','.join(['%s'] * len(user_ids))
query = f"DELETE FROM users WHERE id IN ({placeholders})"
cursor.execute(query, tuple(user_ids))
conn.commit()
deleted_count = cursor.rowcount
conn.close()
return {"deleted": deleted_count}
Snippet 9: Login with Remember Token
import sqlite3
import hashlib
def login(username, password, remember_token=None):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
if remember_token:
query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
cursor.execute(query)
else:
password_hash = hashlib.sha256(password.encode()).hexdigest()
query = "SELECT * FROM users WHERE username = ? AND password_hash = ?"
cursor.execute(query, (username, password_hash))
user = cursor.fetchone()
conn.close()
if user:
return {"status": "success", "user_id": user[0]}
return {"status": "failed"}
Snippet 10: Django Connection Query
from django.db import connection
def get_orders_by_status(status, customer_id):
with connection.cursor() as cursor:
cursor.execute(
"SELECT * FROM orders WHERE status = %s AND customer_id = %s",
[status, customer_id]
)
rows = cursor.fetchall()
return [{"id": row[0], "total": row[1], "status": row[2]} for row in rows]
Snippet 11: Audit Log Search
import psycopg2
from datetime import datetime
def search_audit_logs(action_type, start_date, end_date, limit=100):
conn = psycopg2.connect("dbname=audit")
cursor = conn.cursor()
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))
logs = cursor.fetchall()
conn.close()
return {"logs": logs}
Snippet 12: User Registration
import sqlite3
import hashlib
def register_user(username, email, password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute("SELECT 1 FROM users WHERE username = ?", (username,))
if cursor.fetchone():
conn.close()
return {"error": "Username taken"}
cursor.execute("SELECT 1 FROM users WHERE email = ?", (email,))
if cursor.fetchone():
conn.close()
return {"error": "Email already registered"}
password_hash = hashlib.sha256(password.encode()).hexdigest()
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash)
)
conn.commit()
user_id = cursor.lastrowid
conn.close()
return {"success": True, "user_id": user_id}
Snippet 13: Dynamic Report Builder
import psycopg2
def build_report(table, columns, filters):
"""
table: string - table name
columns: list - columns to select
filters: dict - {column: value} pairs for WHERE clause
"""
conn = psycopg2.connect("dbname=reports")
cursor = conn.cursor()
allowed_tables = ['sales', 'inventory', 'employees']
allowed_columns = ['id', 'name', 'amount', 'date', 'department']
if table not in allowed_tables:
return {"error": "Invalid table"}
safe_columns = [c for c in columns if c in allowed_columns]
if not safe_columns:
return {"error": "No valid columns"}
column_str = ', '.join(safe_columns)
where_clauses = []
values = []
for col, val in filters.items():
if col in allowed_columns:
where_clauses.append(f"{col} = %s")
values.append(val)
query = f"SELECT {column_str} FROM {table}"
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
cursor.execute(query, tuple(values))
results = cursor.fetchall()
conn.close()
return {"data": results}
Snippet 14: Password Reset
import sqlite3
import secrets
def request_password_reset(email):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
user = cursor.fetchone()
if not user:
conn.close()
return {"message": "If email exists, reset link sent"}
reset_token = secrets.token_urlsafe(32)
cursor.execute(
"UPDATE users SET reset_token = ?, reset_expires = datetime('now', '+1 hour') WHERE id = ?",
(reset_token, user[0])
)
conn.commit()
conn.close()
return {"message": "If email exists, reset link sent"}
def reset_password(token, new_password):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)
user = cursor.fetchone()
if not user:
conn.close()
return {"error": "Invalid or expired token"}
cursor.execute(
"UPDATE users SET password_hash = ?, reset_token = NULL WHERE id = ?",
(new_password, user[0])
)
conn.commit()
conn.close()
return {"success": True}
Round 3: Identifier Patterns (Snippets 15-26)
Difficulty: Intermediate-Advanced
Snippet 15: Dynamic Column Filter
import psycopg2
def filter_products(column, value):
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
allowed_columns = ['category', 'brand', 'status']
if column not in allowed_columns:
return {"error": "Invalid filter column"}
query = f"SELECT * FROM products WHERE {column} = %s"
cursor.execute(query, (value,))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 16: Sort Direction
import sqlite3
def get_users_sorted(sort_dir):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 17: Numeric ID Check
import mysql.connector
def get_order(order_id):
conn = mysql.connector.connect(host="localhost", database="shop")
cursor = conn.cursor()
try:
order_id = int(order_id)
except ValueError:
return {"error": "Invalid order ID"}
query = f"SELECT * FROM orders WHERE id = {order_id}"
cursor.execute(query)
order = cursor.fetchone()
conn.close()
return {"order": order}
Snippet 18: Multiple Column Sort
import psycopg2
def search_inventory(search_term, sort_columns: list):
conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()
allowed_sorts = ['name', 'quantity', 'price', 'updated_at']
safe_sorts = [col for col in sort_columns if col in allowed_sorts]
if not safe_sorts:
safe_sorts = ['name']
sort_str = ', '.join(safe_sorts)
query = f"SELECT * FROM inventory WHERE name ILIKE %s ORDER BY {sort_str}"
cursor.execute(query, (f"%{search_term}%",))
results = cursor.fetchall()
conn.close()
return {"items": results}
Snippet 19: Pagination
import sqlite3
def get_posts(page, per_page):
conn = sqlite3.connect('blog.db')
cursor = conn.cursor()
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)
posts = cursor.fetchall()
conn.close()
return {"posts": posts}
Snippet 20: Schema Browser
import psycopg2
def get_table_columns(table_name):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""
cursor.execute(query)
columns = cursor.fetchall()
conn.close()
return {"columns": columns}
Snippet 21: Aggregate Query
import mysql.connector
def get_sales_summary(group_by_col, year):
conn = mysql.connector.connect(host="localhost", database="sales")
cursor = conn.cursor()
allowed_groups = ['product_id', 'category', 'region', 'salesperson']
if group_by_col not in allowed_groups:
group_by_col = 'category'
if not isinstance(year, int) or year < 2000 or year > 2100:
return {"error": "Invalid year"}
query = f"""
SELECT {group_by_col}, SUM(amount) as total
FROM sales
WHERE YEAR(sale_date) = {year}
GROUP BY {group_by_col}
"""
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"summary": results}
Snippet 22: User Preferences Update
import sqlite3
def update_preference(user_id, pref_key, pref_value):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
allowed_prefs = ['theme', 'language', 'timezone', 'notifications']
if pref_key not in allowed_prefs:
return {"error": "Invalid preference"}
query = f"UPDATE user_preferences SET {pref_key} = ? WHERE user_id = ?"
cursor.execute(query, (pref_value, user_id))
conn.commit()
conn.close()
return {"success": True}
Snippet 23: Log Search with Date Range
import psycopg2
def search_logs(start_date, end_date, log_level):
conn = psycopg2.connect("dbname=logging")
cursor = conn.cursor()
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))
logs = cursor.fetchall()
conn.close()
return {"logs": logs}
Snippet 24: Dynamic IN Clause
import sqlite3
def get_users_by_role(roles: list):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
allowed_roles = ['admin', 'editor', 'viewer', 'guest']
safe_roles = [r for r in roles if r in allowed_roles]
if not safe_roles:
return {"error": "No valid roles provided"}
placeholders = ','.join(['?'] * len(safe_roles))
query = f"SELECT id, username, role FROM users WHERE role IN ({placeholders})"
cursor.execute(query, tuple(safe_roles))
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 25: Conditional Column Selection
import mysql.connector
def export_users(include_email, include_phone):
conn = mysql.connector.connect(host="localhost", database="app")
cursor = conn.cursor()
columns = ['id', 'username']
if include_email:
columns.append('email')
if include_phone:
columns.append('phone')
column_str = ', '.join(columns)
query = f"SELECT {column_str} FROM users"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 26: Boolean Filter
import psycopg2
def get_active_items(category, is_active):
conn = psycopg2.connect("dbname=inventory")
cursor = conn.cursor()
if not isinstance(is_active, bool):
return {"error": "is_active must be boolean"}
query = f"SELECT * FROM items WHERE category = %s AND active = {is_active}"
cursor.execute(query, (category,))
items = cursor.fetchall()
conn.close()
return {"items": items}
Round 4: Complex Patterns (Snippets 27-38)
Difficulty: Advanced
Snippet 27: Search with Multiple Filters
import psycopg2
def advanced_search(filters: dict):
"""
filters: {"name": "widget", "min_price": 10, "max_price": 100}
"""
conn = psycopg2.connect("dbname=store")
cursor = conn.cursor()
allowed_filters = ['name', 'min_price', 'max_price', 'category', 'brand']
where_clauses = []
values = []
for key, val in filters.items():
if key not in allowed_filters:
continue
if key == 'name':
where_clauses.append("name ILIKE %s")
values.append(f"%{val}%")
elif key == 'min_price':
where_clauses.append("price >= %s")
values.append(val)
elif key == 'max_price':
where_clauses.append("price <= %s")
values.append(val)
elif key in ('category', 'brand'):
where_clauses.append(f"{key} = %s")
values.append(val)
query = "SELECT * FROM products"
if where_clauses:
query += " WHERE " + " AND ".join(where_clauses)
cursor.execute(query, tuple(values))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 28: JSON Field Query
import sqlite3
def search_metadata(field_name, field_value):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
results = cursor.fetchall()
conn.close()
return {"documents": results}
Snippet 29: Bulk Status Update
import mysql.connector
def update_order_status(order_ids: list, new_status):
conn = mysql.connector.connect(host="localhost", database="shop")
cursor = conn.cursor()
allowed_statuses = ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
if new_status not in allowed_statuses:
return {"error": "Invalid status"}
if not all(isinstance(oid, int) for oid in order_ids):
return {"error": "Invalid order IDs"}
placeholders = ','.join(['%s'] * len(order_ids))
query = f"UPDATE orders SET status = %s WHERE id IN ({placeholders})"
cursor.execute(query, (new_status, *order_ids))
conn.commit()
conn.close()
return {"updated": cursor.rowcount}
Snippet 30: Column Alias
import psycopg2
def get_report(value_column, label):
conn = psycopg2.connect("dbname=reports")
cursor = conn.cursor()
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
if value_column not in allowed_columns:
return {"error": "Invalid column"}
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"report": results}
Snippet 31: SQLAlchemy Text Query
from sqlalchemy import create_engine, text
def find_user_by_field(field, value):
engine = create_engine('postgresql://localhost/app')
allowed_fields = ['username', 'email', 'phone']
if field not in allowed_fields:
return {"error": "Invalid field"}
with engine.connect() as conn:
query = text(f"SELECT * FROM users WHERE {field} = :value")
result = conn.execute(query, {"value": value})
user = result.fetchone()
return {"user": user}
Snippet 32: Date Range with Regex
import sqlite3
import re
def get_events(start_date, end_date):
conn = sqlite3.connect('events.db')
cursor = conn.cursor()
date_pattern = r'^\d{4}-\d{2}-\d{2}$'
if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
return {"error": "Invalid date format. Use YYYY-MM-DD"}
query = f"SELECT * FROM events WHERE event_date BETWEEN '{start_date}' AND '{end_date}'"
cursor.execute(query)
events = cursor.fetchall()
conn.close()
return {"events": events}
Snippet 33: Subquery Filter
import psycopg2
def get_top_customers(min_orders):
conn = psycopg2.connect("dbname=shop")
cursor = conn.cursor()
try:
min_orders = int(min_orders)
except (ValueError, TypeError):
return {"error": "Invalid minimum orders value"}
query = """
SELECT c.id, c.name, c.email
FROM customers c
WHERE (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.id) >= %s
"""
cursor.execute(query, (min_orders,))
customers = cursor.fetchall()
conn.close()
return {"customers": customers}
Snippet 34: LIKE with Match Types
import mysql.connector
def search_products(search_term, match_type):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
if match_type == 'starts_with':
pattern = f"{search_term}%"
elif match_type == 'ends_with':
pattern = f"%{search_term}"
elif match_type == 'contains':
pattern = f"%{search_term}%"
else:
pattern = search_term
query = "SELECT * FROM products WHERE name LIKE %s"
cursor.execute(query, (pattern,))
results = cursor.fetchall()
conn.close()
return {"products": results}
Snippet 35: Django ORM
from django.contrib.auth.models import User
def search_users(search_term, order_by):
allowed_ordering = ['username', 'email', 'date_joined', '-username', '-email', '-date_joined']
if order_by not in allowed_ordering:
order_by = 'username'
users = User.objects.filter(
username__icontains=search_term
).order_by(order_by)
return list(users.values('id', 'username', 'email'))
Snippet 36: Table Join
import sqlite3
def get_order_details(order_id, include_customer):
conn = sqlite3.connect('shop.db')
cursor = conn.cursor()
if include_customer:
query = """
SELECT o.*, c.name, c.email
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.id = ?
"""
else:
query = "SELECT * FROM orders WHERE id = ?"
cursor.execute(query, (order_id,))
order = cursor.fetchone()
conn.close()
return {"order": order}
Snippet 37: Regex Column Validation
import psycopg2
import re
def get_column_stats(column_name):
conn = psycopg2.connect("dbname=analytics")
cursor = conn.cursor()
if not re.match(r'^[a-z_]+$', column_name):
return {"error": "Invalid column name"}
query = f"SELECT MIN({column_name}), MAX({column_name}), AVG({column_name}) FROM metrics"
cursor.execute(query)
stats = cursor.fetchone()
conn.close()
return {"min": stats[0], "max": stats[1], "avg": stats[2]}
Snippet 38: Cursor Pagination
import psycopg2
def get_paginated_items(cursor_id, limit, direction):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
if not isinstance(cursor_id, int):
cursor_id = 0
if not isinstance(limit, int) or limit < 1 or limit > 100:
limit = 20
if direction == 'next':
operator = '>'
order = 'ASC'
elif direction == 'prev':
operator = '<'
order = 'DESC'
else:
return {"error": "Invalid direction"}
query = f"SELECT * FROM items WHERE id {operator} %s ORDER BY id {order} LIMIT %s"
cursor.execute(query, (cursor_id, limit))
items = cursor.fetchall()
conn.close()
return {"items": items}
Round 5: Expert Patterns (Snippets 39-52)
Difficulty: Expert
Snippet 39: UUID Validation
import sqlite3
import re
def get_session(session_id):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
uuid_pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
if not re.match(uuid_pattern, session_id):
return {"error": "Invalid session ID"}
query = f"SELECT * FROM sessions WHERE id = '{session_id}'"
cursor.execute(query)
session = cursor.fetchone()
conn.close()
return {"session": session}
Snippet 40: PostgreSQL JSON Operator
import psycopg2
def search_by_tag(tag_name):
conn = psycopg2.connect("dbname=content")
cursor = conn.cursor()
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"articles": results}
Snippet 41: Numeric String Check
import mysql.connector
def get_product(product_id):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
if not str(product_id).isnumeric():
return {"error": "Invalid product ID"}
query = f"SELECT * FROM products WHERE id = {product_id}"
cursor.execute(query)
product = cursor.fetchone()
conn.close()
return {"product": product}
Snippet 42: Window Function
import psycopg2
def get_ranked_sales(partition_col):
conn = psycopg2.connect("dbname=sales")
cursor = conn.cursor()
allowed_partitions = ['region', 'category', 'salesperson', 'quarter']
if partition_col not in allowed_partitions:
return {"error": "Invalid partition column"}
query = f"""
SELECT *, RANK() OVER (PARTITION BY {partition_col} ORDER BY amount DESC) as rank
FROM sales
"""
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"sales": results}
Snippet 43: COALESCE Function
import sqlite3
def get_user_display_name(user_id, default_name):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))
result = cursor.fetchone()
conn.close()
return {"display_name": result[0] if result else None}
Snippet 44: Offset Pagination with Validation
import psycopg2
def get_comments(post_id, page):
conn = psycopg2.connect("dbname=blog")
cursor = conn.cursor()
try:
page = int(page)
if page < 1:
page = 1
except (ValueError, TypeError):
page = 1
offset = (page - 1) * 20
query = f"SELECT * FROM comments WHERE post_id = %s ORDER BY created_at LIMIT 20 OFFSET {offset}"
cursor.execute(query, (post_id,))
comments = cursor.fetchall()
conn.close()
return {"comments": comments}
Snippet 45: CAST Function
import mysql.connector
def search_by_year(year_input):
conn = mysql.connector.connect(host="localhost", database="archive")
cursor = conn.cursor()
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return {"documents": results}
Snippet 46: PostgreSQL ANY Operator
import psycopg2
def find_users_with_role(role):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
allowed_roles = ['admin', 'editor', 'viewer', 'moderator']
if role not in allowed_roles:
return {"error": "Invalid role"}
query = f"SELECT * FROM users WHERE '{role}' = ANY(roles)"
cursor.execute(query)
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 47: INTERVAL Expression
import psycopg2
def get_recent_activity(hours_ago):
conn = psycopg2.connect("dbname=activity")
cursor = conn.cursor()
if not isinstance(hours_ago, int) or hours_ago < 1 or hours_ago > 168:
hours_ago = 24
query = f"SELECT * FROM activity_log WHERE timestamp > NOW() - INTERVAL '{hours_ago} hours'"
cursor.execute(query)
activities = cursor.fetchall()
conn.close()
return {"activities": activities}
Snippet 48: Simple Parameterization
import sqlite3
def search_full_name(first_name, last_name):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = "SELECT * FROM users WHERE first_name = ? AND last_name = ?"
cursor.execute(query, (first_name, last_name))
users = cursor.fetchall()
conn.close()
return {"users": users}
Snippet 49: Table Function with isidentifier
import psycopg2
def get_table_size(table_name):
conn = psycopg2.connect("dbname=app")
cursor = conn.cursor()
if not table_name.isidentifier():
return {"error": "Invalid table name"}
query = f"SELECT pg_size_pretty(pg_total_relation_size('{table_name}'))"
cursor.execute(query)
size = cursor.fetchone()
conn.close()
return {"size": size[0]}
Snippet 50: Float Validation
import mysql.connector
def get_products_under_price(max_price):
conn = mysql.connector.connect(host="localhost", database="store")
cursor = conn.cursor()
try:
max_price = float(max_price)
except (ValueError, TypeError):
return {"error": "Invalid price"}
query = f"SELECT * FROM products WHERE price <= {max_price}"
cursor.execute(query)
products = cursor.fetchall()
conn.close()
return {"products": products}
Snippet 51: EXISTS Subquery
import sqlite3
def get_authors_with_posts(min_posts):
conn = sqlite3.connect('blog.db')
cursor = conn.cursor()
try:
min_posts = int(min_posts)
except ValueError:
return {"error": "Invalid minimum posts"}
query = """
SELECT * FROM authors a
WHERE EXISTS (
SELECT 1 FROM posts p
WHERE p.author_id = a.id
GROUP BY p.author_id
HAVING COUNT(*) >= ?
)
"""
cursor.execute(query, (min_posts,))
authors = cursor.fetchall()
conn.close()
return {"authors": authors}
Snippet 52: Schema-Qualified Table
import psycopg2
def get_data(schema_name, limit):
conn = psycopg2.connect("dbname=warehouse")
cursor = conn.cursor()
allowed_schemas = ['public', 'staging', 'archive']
if schema_name not in allowed_schemas:
return {"error": "Invalid schema"}
if not isinstance(limit, int) or limit < 1:
limit = 100
query = f"SELECT * FROM {schema_name}.reports ORDER BY created_at DESC LIMIT {limit}"
cursor.execute(query)
data = cursor.fetchall()
conn.close()
return {"data": data}
📝 Answer Key
Click to reveal answers
Round 1 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 1 | ✅ Yes | f-string interpolation of username and password |
| 2 | ❌ No | Properly parameterized with %s |
| 3 | ✅ Yes | String concatenation of email |
| 4 | ❌ No | SQLAlchemy :param parameterization |
| 5 | ✅ Yes | date_filter interpolated via f-string (table is allowlisted) |
Snippet 1 Fix
# Vulnerable
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
Snippet 3 Fix
# Vulnerable
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)
# Fixed
query = "SELECT 1 FROM users WHERE email = %s LIMIT 1"
cursor.execute(query, (email,))
Snippet 5 Fix
# Vulnerable
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)
# Fixed (table_name already allowlisted, just parameterize date_filter)
query = f"SELECT * FROM {table_name} WHERE created_at > ?"
cursor.execute(query, (date_filter,))
Round 2 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 6 | ❌ No | .isdigit() prevents non-numeric input |
| 7 | ❌ No | sort_by allowlisted, query_param parameterized |
| 8 | ❌ No | Placeholders generated safely, values parameterized |
| 9 | ✅ Yes | remember_token branch uses f-string |
| 10 | ❌ No | Django parameterization with %s |
| 11 | ✅ Yes | limit concatenated via str() |
| 12 | ❌ No | All queries use ? parameterization |
| 13 | ❌ No | All identifiers allowlisted, values parameterized |
| 14 | ✅ Yes | reset_password function uses f-string for token |
Snippet 9 Fix
# Vulnerable
if remember_token:
query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
cursor.execute(query)
# Fixed
if remember_token:
query = "SELECT * FROM users WHERE remember_token = ?"
cursor.execute(query, (remember_token,))
Snippet 11 Fix
# Vulnerable
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))
# Fixed
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT %s
"""
cursor.execute(query, (action_type, start_date, end_date, limit))
Snippet 14 Fix (reset_password function)
# Vulnerable
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)
# Fixed
query = "SELECT id FROM users WHERE reset_token = ? AND reset_expires > datetime('now')"
cursor.execute(query, (token,))
Round 3 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 15 | ❌ No | column allowlisted, value parameterized |
| 16 | ✅ Yes | sort_dir not validated |
| 17 | ❌ No | int() conversion prevents injection |
| 18 | ❌ No | Columns filtered against allowlist |
| 19 | ✅ Yes | page and per_page not validated |
| 20 | ✅ Yes | table_name interpolated without validation |
| 21 | ❌ No | group_by_col allowlisted, year validated with isinstance(int) |
| 22 | ❌ No | pref_key allowlisted, values parameterized |
| 23 | ✅ Yes | log_level concatenated |
| 24 | ❌ No | Roles filtered against allowlist, then parameterized |
| 25 | ❌ No | Boolean flags control hardcoded columns |
| 26 | ❌ No | isinstance(bool) only allows True/False |
Snippet 16 Fix
# Vulnerable
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
# Fixed (allowlist validation — cannot parameterize ASC/DESC)
allowed_directions = ['ASC', 'DESC']
if sort_dir.upper() not in allowed_directions:
sort_dir = 'ASC'
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir.upper()}"
cursor.execute(query)
Snippet 19 Fix
# Vulnerable
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)
# Fixed (add validation + parameterization)
try:
page = int(page)
per_page = int(per_page)
if page < 1:
page = 1
if per_page < 1 or per_page > 100:
per_page = 20
except (ValueError, TypeError):
page, per_page = 1, 20
offset = (page - 1) * per_page
query = "SELECT * FROM posts ORDER BY created_at DESC LIMIT ? OFFSET ?"
cursor.execute(query, (per_page, offset))
Snippet 20 Fix
# Vulnerable
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""
cursor.execute(query)
# Fixed
query = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = %s
"""
cursor.execute(query, (table_name,))
Snippet 23 Fix
# Vulnerable
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))
# Fixed
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = %s
ORDER BY timestamp DESC
"""
cursor.execute(query, (start_date, end_date, log_level))
Round 4 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 27 | ❌ No | Keys allowlisted, values parameterized |
| 28 | ✅ Yes | field_name in JSON path not validated |
| 29 | ❌ No | Status allowlisted, IDs type-checked |
| 30 | ✅ Yes | label (alias) not validated |
| 31 | ❌ No | field allowlisted, value parameterized |
| 32 | ❌ No | Strict regex allows only YYYY-MM-DD |
| 33 | ❌ No | int() + parameterization |
| 34 | ❌ No | Pattern built safely, parameterized |
| 35 | ❌ No | Django ORM + allowlisted ordering |
| 36 | ❌ No | Boolean selects hardcoded queries |
| 37 | ❌ No | Strict regex ^[a-z_]+$ |
| 38 | ❌ No | Type checks + hardcoded operators |
Snippet 28 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
# Fixed (allowlist validation — cannot parameterize JSON paths)
allowed_fields = ['author', 'category', 'status', 'priority']
if field_name not in allowed_fields:
return {"error": "Invalid field name"}
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
Snippet 30 Fix
# Vulnerable
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
# Fixed (allowlist validation — cannot parameterize aliases)
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
allowed_labels = ['total', 'amount', 'value', 'metric', 'result']
if value_column not in allowed_columns:
return {"error": "Invalid column"}
if label not in allowed_labels:
return {"error": "Invalid label"}
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
Round 5 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 39 | ❌ No | UUID regex only allows hex + hyphens |
| 40 | ✅ Yes | tag_name in ILIKE not parameterized |
| 41 | ❌ No | .isnumeric() prevents non-numeric |
| 42 | ❌ No | partition_col allowlisted |
| 43 | ✅ Yes | default_name in COALESCE not parameterized |
| 44 | ❌ No | int() with error handling |
| 45 | ✅ Yes | year_input in CAST not parameterized |
| 46 | ❌ No | role allowlisted |
| 47 | ❌ No | isinstance(int) + range check |
| 48 | ❌ No | Proper parameterization |
| 49 | ❌ No | .isidentifier() allows only valid identifiers |
| 50 | ❌ No | float() with error handling |
| 51 | ❌ No | int() + parameterization |
| 52 | ❌ No | Schema allowlisted, limit type-checked |
Snippet 40 Fix
# Vulnerable
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)
# Fixed (parameterize with wildcards in parameter value)
query = "SELECT * FROM articles WHERE metadata->>'tags' ILIKE %s"
cursor.execute(query, (f"%{tag_name}%",))
Snippet 43 Fix
# Vulnerable
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))
# Fixed (parameterize both values)
query = "SELECT COALESCE(display_name, ?) FROM users WHERE id = ?"
cursor.execute(query, (default_name, user_id))
Snippet 45 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM documents WHERE CAST(created_year AS CHAR) = %s"
cursor.execute(query, (year_input,))
📊 Score Yourself
| Score | Level |
|---|---|
| 0-20 | Beginner — Review parameterization basics |
| 21-35 | Intermediate — Practice identifier vs value distinction |
| 36-45 | Advanced — Focus on subtle patterns |
| 46-52 | Expert — Ready for production code review! |
🚀 Want More Exercises Like These?
I’m building an open-source repository of LeetCode-style secure coding exercises to help train the next generation of Security Engineers — and to curate high-quality secure code datasets for AI training.
⭐ Star the repo: github.com/fosres/SecEng-Exercises
What’s inside:
- 🔐 SQL Injection exercises (like these!)
- 🛡️ XSS prevention patterns
- 🔑 Authentication security
- 📡 API security challenges
- 🧪 60+ test cases per exercise
Contributing
Found a bug? Want to add exercises? PRs welcome!
📚 References
These exercises were inspired by real-world patterns documented in:
- Full Stack Python Security by Dennis Byrne (Manning, 2021) — pp. 205-207
- Secure by Design by Johnsson, Deogun, Sawano (Manning, 2019) — Ch. 5
- API Security in Action by Neil Madden (Manning, 2020) — Ch. 2
- Hacking APIs by Corey Ball (No Starch Press, 2022) — Ch. 9
- OWASP SQL Injection Prevention Cheat Sheet — owasp.org
- PortSwigger Web Security Academy — portswigger.net
🏷️ Tags
#security #python #sql #webdev #beginners #tutorial #opensource #appsec
Did these exercises help you? Drop a comment with your score! And don’t forget to ⭐ the GitHub repo for more security content.