The Problem: Pythonโs Task Queues Are Stuck in 2009
If youโre building async Python applications with FastAPI or aiohttp, youโve hit this wall: every major task queue was designed before async/await even existed.
Celery? Built in 2009. RQ? 2011. Sure, theyโve bolted on async support, but thatโs like putting a Tesla battery in a Model Tโthe foundation is still synchronous.
So youโre stuck choosing between:
- Celery โ Fighting async/sync impedance mismatch
- ARQ โ Locked into Redis forever
- Rolling your own โ (Please. Donโt.)
After releasing AsyncTasQ, Iโm here to show you a much better way.
What Makes AsyncTasQ Different
AsyncTasQ is a modern, async-first, type-safe task queue built from scratch for Pythโฆ
The Problem: Pythonโs Task Queues Are Stuck in 2009
If youโre building async Python applications with FastAPI or aiohttp, youโve hit this wall: every major task queue was designed before async/await even existed.
Celery? Built in 2009. RQ? 2011. Sure, theyโve bolted on async support, but thatโs like putting a Tesla battery in a Model Tโthe foundation is still synchronous.
So youโre stuck choosing between:
- Celery โ Fighting async/sync impedance mismatch
- ARQ โ Locked into Redis forever
- Rolling your own โ (Please. Donโt.)
After releasing AsyncTasQ, Iโm here to show you a much better way.
What Makes AsyncTasQ Different
AsyncTasQ is a modern, async-first, type-safe task queue built from scratch for Pythonโs asyncio ecosystem. Think Laravelโs elegant queue API, rebuilt for async Python in 2025.
The differentiators:
๐ True async-first architecture โ Built with asyncio from day one, not retrofitted
๐ง Intelligent ORM serialization โ Pass SQLAlchemy/Django/Tortoise models directly (90%+ smaller payloads)
๐ Multi-backend flexibility โ 5 production drivers (Redis, PostgreSQL, MySQL, RabbitMQ, AWS SQS), identical API
โจ Type-safe everything โ Full Generic[T] support with IDE autocomplete
โก Performance that matters โ 2-3x faster than Celery (benchmarks below)
The Numbers: AsyncTasQ vs Celery
Letโs cut to the chase. I ran comprehensive benchmarks comparing AsyncTasQ to Celery across three real-world scenarios.
Test Setup: Same hardware, same configuration
- NOOP: 20,000 tasks with minimal work (pure framework overhead)
- I/O: 10,000 I/O-bound tasks (API calls, database queries)
- CPU: 5,000 CPU-intensive tasks (data processing, ML inference)
Benchmark 1: Pure Framework Overhead (NOOP)
20,000 tasks are doing nothing essentially. This measures the amount of overhead the framework itself adds.
AsyncTasQ: 3,429 tasks/sec, completed in 0.10 seconds
Celery: 1,121 tasks/sec, completed in 9.33 seconds
๐ AsyncTasQ is 3.1x faster throughput, 93x faster completion
The difference is stark. While Celery takes over 9 seconds to process 20K minimal tasks, AsyncTasQ finishes in 0.1 seconds. The async-first architecture eliminates blocking operations in the critical path.
Benchmark 2: I/O-Bound Tasks (The Async Sweet Spot)
10,000 tasks making async I/O callsโsimulating API requests, database queries, and file operations.
AsyncTasQ: 3,357 tasks/sec, completed in 0.10 seconds
Celery: 1,194 tasks/sec, completed in 4.30 seconds
๐ AsyncTasQ is 2.81x faster throughput, 43x faster completion
This is where native async/await architecture dominates. While Celery relies on threading or multiprocessing, AsyncTasQ uses the event loop for true asynchronous I/O.
Perfect for: Web scraping, API calls, database queries, webhooks, and email sending.
Benchmark 3: CPU-Intensive Tasks
5,000 CPU-bound tasks. Async typically doesnโt help here, but AsyncTasQ still wins.
AsyncTasQ: 1,473 tasks/sec, 279 MB memory, 2.02s completion
Celery: 972 tasks/sec, 335 MB memory, 2.87s completion
๐ AsyncTasQ is 1.51x faster + uses 16.7% less memory
Even with process pools (which both use for CPU work), AsyncTasQโs architecture is more efficient with lower overhead.
The Verdict
| Workload | AsyncTasQ | Celery | Speedup |
|---|---|---|---|
| Framework overhead | 3,429 t/s | 1,121 t/s | 3.1x |
| I/O-bound | 3,357 t/s | 1,194 t/s | 2.81x |
| CPU-bound | 1,473 t/s | 972 t/s | 1.51x |
Across every scenario, AsyncTasQ is 1.5-3.1x faster. The async architecture gives the biggest win on I/O workloads (where most real-world tasks live), but even CPU tasks benefit from reduced overhead.
Benchmark Setup: Tests conducted on dedicated hardware with identical configurations for fair comparison. Both frameworks configured with Redis backend, 10 concurrent workers, and default settings. Results represent average throughput across multiple runs. Your results may vary based on workload characteristics and infrastructure.
Game-Changer #1: ORM Auto-Serialization
This feature alone is worth switching for. It eliminates so much boilerplate.
The Old Way (Every Other Task Queue)
# The painful manual approach with Celery
@celery_task
def send_welcome_email(user_id: int):
# Manually re-fetch from database
user = User.query.get(user_id)
print(f"Sending email to {user.email}")
# Dispatch - manually extract ID first
send_welcome_email.delay(user.id)
Problems:
- โ Verbose: Extract ID โ pass ID โ re-fetch model
- โ Error-prone: Forget the re-fetch? Runtime error
- โ Large payloads: Full objects serialize everything
The AsyncTasQ Way
from asynctasq import task
@task
async def send_welcome_email(user: User):
# user is automatically re-fetched with fresh data
print(f"Sending email to {user.email}")
# Just pass the model directly
await send_welcome_email(user).dispatch()
Thatโs it. No ID extraction. No manual re-fetching. AsyncTasQ handles it.
How It Works
- On dispatch: AsyncTasQ detects the ORM model and stores only the primary key
- In queue: Lightweight reference goes into Redis/Postgres/SQS (4 bytes vs 400+)
- On execution: Worker automatically re-fetches from database with fresh data
- Parallel optimization: Multiple models?
asyncio.gather()fetches them in parallel
The Impact
User model with 20 fields:
Without AsyncTasQ (standard serialization):
{
"id": 123,
"email": "user@example.com",
"name": "John Doe",
"created_at": "2025-01-01T00:00:00Z",
"address": "123 Main St",
"phone": "+1-555-0123",
# ... 14 more fields
}
# Total: ~450 bytes when serialized with msgpack
With AsyncTasQ (ORM reference):
{"__orm:sqlalchemy__": 123, "__orm_class__": "app.models.User"}
# Total: ~45 bytes (90% reduction!)
Real impact: 10,000 queued tasks = 4.5MB vs 450KB. Thatโs faster queue operations, lower memory usage, and cheaper infrastructure costs.
Supports: SQLAlchemy (async/sync), Django ORM, Tortoise ORM. Handles composite PKs, UUIDs, and foreign keys.
Game-Changer #2: Zero Vendor Lock-In
Most task queues chain you to one backend. ARQ? Redis only. RQ? Redis only. Celery? Three backends, but with different feature sets.
AsyncTasQ gives you 5 production drivers, one API:
- Redis โ Fast, simple, great default
- PostgreSQL โ ACID guarantees, dead-letter queues
- MySQL โ ACID with InnoDB row-level locking
- RabbitMQ โ AMQP protocol, advanced routing
- AWS SQS โ Fully managed, serverless-ready
Switch Backends in One Line
from asynctasq import init
# Dev: Redis
init({'driver': 'redis', 'redis': {'url': 'redis://localhost:6379'}})
# Prod: PostgreSQL (ACID guarantees)
init({'driver': 'postgres', 'postgres': {'dsn': 'postgresql://...'}})
# Serverless: AWS SQS
init({'driver': 'sqs', 'sqs': {'region': 'us-east-1'}})
# Your task code? Unchanged.
Why This Matters
- Start simple: Redis in dev, PostgreSQL in prod
- Use existing infrastructure: No need for new services
- ACID when needed: PostgreSQL/MySQL for critical workflows
- Go serverless: SQS for AWS Lambda
- Experiment freely: Try backends without code changes
Each Driver Has Superpowers
PostgreSQL/MySQL: ACID transactions, dead-letter queues, visibility timeouts Redis: Highest throughput, Pub/Sub events, sorted sets for delays AWS SQS: Fully managed, auto-scaling, IAM roles RabbitMQ: Advanced routing, exchange types, message acks
Game-Changer #3: Type Safety & Developer Experience
Modern Python is typed. AsyncTasQ was built for it.
Full Generic[T] Support
from asynctasq import AsyncTask
class ProcessPayment(AsyncTask[bool]):
async def execute(self) -> bool:
# IDE knows return type
# mypy validates at build time
return True
result: bool = await ProcessPayment(...).dispatch_sync()
Full IDE autocomplete. Full type checking. Zero runtime surprises.
Four Execution Modes for Every Workload
| Mode | Concurrency | Best For |
|---|---|---|
AsyncTask | 1000s concurrent | API calls, async DB, webhooks |
SyncTask | 100s concurrent | requests, sync DB drivers |
AsyncProcessTask | # CPU cores | Async + heavy compute |
SyncProcessTask | # CPU cores | NumPy, Pandas, ML inference |
from asynctasq import task
# Async I/O (handles 1000s concurrently)
@task
async def fetch_data(url: str):
async with httpx.AsyncClient() as client:
return await client.get(url)
# CPU-bound (bypasses GIL with process pool)
@task(process=True)
def crunch_numbers(matrix: list[list[float]]):
import numpy as np
return np.linalg.inv(np.array(matrix))
Laravel-Style Method Chaining
await send_email(to="user@example.com", subject="Welcome") \
.on_queue("high-priority") \
.delay(60) \
.max_attempts(5) \
.timeout(30) \
.dispatch()
Override any parameter at dispatch time. Zero need for separate task functions.
FastAPI: First-Class Integration
from fastapi import FastAPI
from asynctasq import AsyncTasQIntegration, task
asynctasq = AsyncTasQIntegration()
app = FastAPI(lifespan=asynctasq.lifespan)
@task
async def send_welcome_email(user_id: int):
print(f"Sending welcome email to user {user_id}")
@app.post("/users")
async def create_user(email: str):
user_id = 123 # Created user
task_id = await send_welcome_email(user_id).dispatch()
return {"user_id": user_id, "task_id": task_id}
Native lifespan integration = proper cleanup on shutdown.
Enterprise-Ready Out of the Box
ACID Guarantees (PostgreSQL/MySQL) โ Transactional processing, exactly-once delivery, zero lost tasks
Dead-Letter Queues โ Failed tasks auto-moved to DLQ for inspection and manual retry
Crash Recovery โ Visibility timeouts ensure stuck tasks reappear if workers die
Graceful Shutdown โ SIGTERM/SIGINT handlers let in-flight tasks complete
Real-time Monitoring (Redis Pub/Sub)
โ Stream events: task_started, task_completed, task_failed, worker_online
Built-in Metrics:
from asynctasq import MonitoringService
stats = await MonitoringService().get_queue_stats("emails")
# depth, processing, completed, failed
Beautiful CLI (powered by Rich):
$ asynctasq worker --queues default --concurrency 20
โญโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฎ
โ AsyncTasQ Worker โ
โ Queues: default | Concurrency: 20 โ
โฐโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโฏ
โ Worker started โข โก Waiting for tasks...
When to Choose AsyncTasQ
Perfect for:
- โ Modern async apps (FastAPI, aiohttp)
- โ Type-safe teams (full IDE support)
- โ High-throughput systems (millions of tasks)
- โ ORM-heavy apps (SQLAlchemy, Django, Tortoise)
- โ Enterprise needs (ACID, DLQs, monitoring)
- โ Avoiding vendor lock-in (5 backends)
vs The Competition
| Feature | AsyncTasQ | Celery | ARQ |
|---|---|---|---|
| Async-first | โ Native | โ | โ |
| Type safety | โ Generic[T] | โ ๏ธ External | โ |
| Backends | 5 | 3 | 1 |
| ORM auto-serialization | โ | โ | โ |
| ACID guarantees | โ | โ | โ |
| Dead-letter queues | โ Built-in | โ ๏ธ Manual | โ |
| FastAPI integration | โ Native | โ ๏ธ Manual | โ ๏ธ Manual |
| Performance vs Celery | 1.5-3x faster | 1x | โ |
Still choose Celery for: Mature plugin ecosystem, existing large codebases Still choose ARQ for: Simple Redis-only needs with cron
Getting Started in 30 Seconds
# Install
pip install asynctasq[redis]
# Generate .env template
asynctasq publish
# Edit .env with your settings
# ASYNCTASQ_DRIVER=redis
# ASYNCTASQ_REDIS_URL=redis://localhost:6379
from asynctasq import init, task, run
init() # Load from .env
@task(queue='emails')
async def send_email(to: str, subject: str):
print(f"Sending to {to}: {subject}")
return f"Sent!"
async def main():
# Dispatch
task_id = await send_email(
to="user@example.com",
subject="Welcome!"
).dispatch()
# With delay
await send_email(to="...", subject="Reminder") \
.delay(60) \
.dispatch()
if __name__ == "__main__":
run(main())
# Run worker
asynctasq worker --queues emails --concurrency 20
Done. Tasks are processing.
Real-World Example: FastAPI + SQLAlchemy
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from asynctasq import AsyncTasQIntegration, task
# DB setup
engine = create_async_engine('postgresql+asyncpg://...')
async_session = async_sessionmaker(engine)
Base._asynctasq_session_factory = async_session
# FastAPI + AsyncTasQ
asynctasq = AsyncTasQIntegration()
app = FastAPI(lifespan=asynctasq.lifespan)
# Task - pass ORM models directly!
@task(queue='emails')
async def send_welcome_email(user: User):
print(f"Welcome {user.email}!")
# Endpoint
@app.post("/users")
async def create_user(email: str, name: str):
async with async_session() as session:
user = User(email=email, name=name)
session.add(user)
await session.commit()
# Pass the model directly
task_id = await send_welcome_email(user).dispatch()
return {"user_id": user.id, "task_id": task_id}
Magic happening here:
- Pass
Usermodel directly (notuser.id) - AsyncTasQ serializes only the PK (4 bytes vs 400+)
- Worker re-fetches with fresh data
- FastAPI lifespan ensures a clean shutdown
Whatโs Next
AsyncTasQ v1.6 is production-ready.
Coming soon:
- SQLite & Oracle drivers
- Task chaining & workflows (DAG-based)
- Rate limiting & priority queues
- Cron/scheduled tasks
The Bottom Line
After 8 weeks of building and testing, AsyncTasQ v1.6 is what modern Python task queues should be:
๐ Fast โ 1.5-3x faster than Celery ๐ง Smart โ ORM auto-serialization ๐ Flexible โ 5 backends, one API โจ Type-safe โ Full Generic[T] support ๐ข Production-ready โ ACID, DLQs, monitoring
The Python async ecosystem deserved a task queue built for async/await from day one. Not retrofitted. Not bolted on. Native async, all the way down.
Try It
- GitHub: github.com/adamrefaey/asynctasq (โญ if youโre excited!)
- Docs: Full documentation
- PyPI: pypi.org/project/asynctasq
Which feature excites you most? ORM auto-serialization? The 2-3x speedup? Multi-backend flexibility? Drop a comment!
If this was useful, star the repoโit helps other developers discover AsyncTasQ.
Built with โค๏ธ by Adam Refaey for the Python community.