Introduction
In the rapidly evolving landscape of AI agents and autonomous systems, enabling seamless communication between different agents has become crucial. The Agent-to-Agent (A2A) protocol provides a standardized way for AI agents to interact, exchange messages, and coordinate tasks. In this article, I'll walk you through our implementation of an A2A gateway built on AWS serverless architecture.
What is Agent-to-Agent (A2A) Communication?
Agent-to-Agent communication is a protocol that allows different AI agents to interact with each other in a standardized way. Think of it as an API contract specifically designed for agent interactions. Rather than having agents communicate through proprietary interfaces, A2A provides:
- Standardized message formats using JSON-RPC
- Task lifecycle management (submit, track, cancel)
- Context preservation across multi-turn conversations
- Asynchronous processing with polling-based status checks
- Secure authentication between agents
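To make the message format concrete, here is a minimal JSON-RPC 2.0 envelope for a `message/send` call. The `params` layout below is a simplified sketch for illustration, not the full A2A schema:

```python
import json
import uuid

def build_message_send_request(text: str, session_id: str) -> dict:
    """Build a minimal JSON-RPC 2.0 envelope for an A2A message/send call.

    The params shape here is a simplified illustration of the A2A schema.
    """
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # correlates the response with this request
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": text}],
            },
            "metadata": {"session_id": session_id},
        },
    }

req = build_message_send_request("Summarize today's tickets", "sess-123")
print(json.dumps(req, indent=2))
```

Every request carries its own `id`, which the gateway echoes back in the response so clients can match replies to requests even over asynchronous transports.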
Architecture Overview
Our A2A implementation leverages AWS serverless services to create a scalable, cost-effective solution:
```
        ┌─────────────┐
        │   Client    │
        │   Agent     │
        └──────┬──────┘
               │ HTTPS + JWT
               ▼
      ┌─────────────────────┐
      │     API Gateway     │
      │  Custom Authorizer  │
      └──────────┬──────────┘
                 │
                 ▼
      ┌─────────────────────┐
      │  Lambda (FastAPI)   │
      │     A2A Gateway     │
      └──────────┬──────────┘
                 │
      ┌──────────┼──────────┐
      ▼          ▼          ▼
┌──────────┐ ┌───────┐ ┌──────────┐
│ DynamoDB │ │  SQS  │ │ Secrets  │
│  Tasks   │ │ Queue │ │ Manager  │
└──────────┘ └───────┘ └──────────┘
```
Key Components
- API Gateway with Custom Authorizer: Validates JWT tokens and enforces scope-based access control
- FastAPI Lambda Function: Handles JSON-RPC requests and implements the A2A protocol
- DynamoDB: Stores task state and history with efficient querying via GSI
- SQS: Decouples message submission from agent processing
- AWS Secrets Manager: Securely manages signing keys for inter-service communication
Core Implementation Details
1. JSON-RPC Handler
The gateway implements the A2A protocol using JSON-RPC 2.0, supporting three primary operations:
```python
@app.post("/a2a", tags=["A2A"])
async def a2a_rpc(request: Request) -> JSONResponse:
    body = await request.json()
    a2a_req = A2ARequest.model_validate(body)
    req = a2a_req.root

    if isinstance(req, SendMessageRequest):
        ctx = _authorize(request, "message.send")
        req = _inject_user_id(req, ctx["user_id"])
        resp = await rpc.on_message_send(req)
    elif isinstance(req, GetTaskRequest):
        ctx = _authorize(request, "task.get")
        resp = await rpc.on_get_task(req)
    elif isinstance(req, CancelTaskRequest):
        ctx = _authorize(request, "task.cancel")
        resp = await rpc.on_cancel_task(req)

    return JSONResponse(resp.model_dump(mode="json"))
```
2. Authentication and Authorization
Security is implemented through a multi-layered approach:
Custom JWT Authorizer
The authorizer validates access tokens from an OAuth provider, checking:
- Token signature using JWKS (JSON Web Key Set)
- Token expiration and validity
- Issuer and audience claims
- Required scopes for each operation
```python
def _verify_access_token(token: str) -> dict[str, Any]:
    header = jwt.get_unverified_header(token)
    kid = header.get("kid")
    alg = header.get("alg")

    jwks = _get_jwks()  # Cached JWKS
    jwk = next((k for k in jwks if k.get("kid") == kid), None)
    if jwk is None:
        raise ValueError(f"No JWKS key matches kid={kid!r}")

    claims = jwt.decode(
        token, jwk,
        algorithms=[alg],
        audience=API_AUDIENCE,
        issuer=ISSUER,
    )
    return claims
```
Scope-Based Access Control
Different operations require specific scopes:
| Operation | Required Scope |
|---|---|
| Send Message | tasks:submit |
| Get Task Status | tasks:read |
| Cancel Task | tasks:cancel |
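The scope check itself reduces to a set-membership test on the token's `scope` claim (space-delimited, per OAuth 2.0 convention). A sketch of the idea; the method-to-scope mapping and names here are illustrative, and our `_authorize` helper additionally extracts the request context:

```python
class AuthorizationError(Exception):
    """Raised when the token lacks a required scope."""

# Illustrative mapping from JSON-RPC method to required scope
REQUIRED_SCOPES = {
    "message/send": "tasks:submit",
    "tasks/get": "tasks:read",
    "tasks/cancel": "tasks:cancel",
}

def check_scope(claims: dict, method: str) -> None:
    """Verify that validated JWT claims grant the scope required by `method`."""
    required = REQUIRED_SCOPES[method]
    granted = set(claims.get("scope", "").split())  # space-delimited per OAuth 2.0
    if required not in granted:
        raise AuthorizationError(f"missing scope {required!r} for {method}")

check_scope({"scope": "tasks:submit tasks:read"}, "message/send")  # passes
```

Keeping the mapping in one table makes it trivial to audit which operations each scope unlocks.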
Inter-Service Authentication
For communication between the A2A gateway and backend agents, we mint short-lived JWTs with a 60-second TTL:
```python
def mint_agent_token(user_id: str, ttl_seconds: int = 60) -> str:
    now = int(time.time())
    payload = {
        "iss": "a2a-gateway",
        "aud": "agent",
        "sub": user_id,
        "iat": now,
        "exp": now + ttl_seconds,
    }
    return jwt.encode(payload, secret, algorithm="HS256")
```
3. Task Management and State Machine
Tasks follow a clear lifecycle:
```
submitted ──► working ──► completed
                   ├────► failed
                   └────► canceled
```
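The lifecycle above can be enforced with a small transition table, so an out-of-order event can never resurrect a terminal task. This is a sketch of the idea rather than our exact implementation:

```python
# Allowed transitions in the task lifecycle; terminal states have no successors.
TRANSITIONS = {
    "submitted": {"working", "failed", "canceled"},
    "working": {"completed", "failed", "canceled"},
    "completed": set(),
    "failed": set(),
    "canceled": set(),
}

def can_transition(current: str, target: str) -> bool:
    """Return True if moving from `current` to `target` is a legal transition."""
    return target in TRANSITIONS.get(current, set())

assert can_transition("submitted", "working")
assert not can_transition("completed", "canceled")  # terminal states are final
```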
DynamoDB Schema
We use a single-table design with a Global Secondary Index for efficient querying:
```python
class A2ADatabase(BaseModel):
    # Primary Key: Task-centric access
    pk: str      # Task#{task_id}
    sk: str      # Event#{timestamp}#{uuid}

    # GSI: Session-centric access
    gsi1_pk: str  # session_id
    gsi1_sk: int  # created_at_ms

    # Domain fields
    session_id: str
    task_id: str
    state: TaskState
    message: str
    metadata: dict
```
This schema enables:
- Fast task status lookups using the primary key
- Chronological event history per task
- Session-based queries via GSI
- Optimistic concurrency through monotonic timestamps
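Key construction is worth centralizing so the pk/sk formats stay consistent between writers and readers. A minimal sketch of helpers matching the schema above (the zero-padding width is our choice):

```python
import uuid

def task_pk(task_id: str) -> str:
    """Partition key: all events for one task share this prefix."""
    return f"Task#{task_id}"

def event_sk(timestamp_ms: int) -> str:
    """Sort key: zero-padding keeps lexicographic order == chronological order;
    a UUID suffix breaks ties within the same millisecond."""
    return f"Event#{timestamp_ms:015d}#{uuid.uuid4()}"

# Events sort chronologically because the padded timestamp compares as text
assert event_sk(1_700_000_000_000) < event_sk(1_700_000_000_001)
```

Without the padding, `Event#999#...` would sort after `Event#1000#...` as a string, silently scrambling the event history.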
4. Asynchronous Processing with Polling
Rather than implementing real-time streaming (complex in API Gateway + Lambda), we use a polling model:
Message Send Flow
- Client sends message: the initial request creates a task in `submitted` state
- Immediate response: returns the task ID with `submitted` status
- Background processing: the message is queued to SQS for agent processing
- Client polls: periodically calls `tasks/get` to check status (recommended: 15-second intervals)
- Task completion: the agent updates the task to `completed` with a response artifact
```python
async def on_message_send(
    self, params: MessageSendParams
) -> Task:
    # Check if polling existing task
    if params.message.task_id:
        return await self.on_get_task(
            TaskQueryParams(id=params.message.task_id)
        )

    # Create new task
    task_id = str(uuid.uuid4())
    session_id = get_session_id(params)

    # Store in DynamoDB
    _table.put_item(Item={
        "pk": f"Task#{task_id}",
        "session_id": session_id,
        "state": "submitted",
        "message": get_message_text(params),
        # ... other fields
    })

    # Queue for processing
    sqs_client.send_message(
        QueueUrl=SQS,
        MessageBody=message_body
    )

    return Task(
        id=task_id,
        context_id=session_id,
        status=TaskStatus(state=TaskState.submitted)
    )
```
5. Task Cancellation
Clients can cancel tasks that haven't completed:
```python
async def on_cancel_task(
    self, params: TaskIdParams
) -> Task:
    task = await self.on_get_task(
        TaskQueryParams(id=params.id)
    )

    # Terminal states (completed, failed, canceled) cannot be canceled
    if task.status.state in (
        TaskState.completed, TaskState.failed, TaskState.canceled
    ):
        return task

    # Update state to canceled
    _table.put_item(Item={
        "pk": f"Task#{params.id}",
        "state": "canceled",
        "message": "Task cancelled by user.",
        # ... other fields
    })

    return Task(
        id=params.id,
        status=TaskStatus(state=TaskState.canceled)
    )
```
Best Practices and Lessons Learned
1. Monotonic Timestamps for Event Ordering
We encountered a subtle bug where rapid successive events could have the same millisecond timestamp, causing sort key collisions. Solution: maintain a module-level counter to ensure monotonically increasing timestamps.
```python
_LAST_EVENT_MS: int = 0

def _next_event_ms() -> int:
    global _LAST_EVENT_MS
    now_ms = int(time.time() * 1000)
    if now_ms <= _LAST_EVENT_MS:
        now_ms = _LAST_EVENT_MS + 1
    _LAST_EVENT_MS = now_ms
    return now_ms
```
2. JWKS Caching with Fallback
Fetching JWKS on every request is inefficient. We implemented a cache with graceful degradation:
```python
def _get_jwks() -> list[dict]:
    now = time.time()
    if _JWKS["keys"] and now - _JWKS["at"] < CACHE_TTL:
        return _JWKS["keys"]
    try:
        keys = _fetch_jwks()
        _JWKS.update({"keys": keys, "at": now})
        return keys
    except Exception:
        if _JWKS["keys"]:  # Use stale cache
            return _JWKS["keys"]
        raise
```
3. Comprehensive Logging
Structured logging is crucial for debugging distributed systems:
```python
logger.info(
    "on_message_send created | task_id=%s session_id=%s",
    task_id,
    session_id,
)
```
4. Consistent Read for Status Checks
DynamoDB's eventual consistency can cause race conditions. Always use `ConsistentRead=True` when checking task status:
```python
r = _table.query(
    KeyConditionExpression=...,
    ConsistentRead=True,  # Critical!
    Limit=1
)
```
5. Proper Error Handling
Implement JSON-RPC compliant error responses:
```python
except ValidationError as e:
    return JSONResponse(
        JSONRPCErrorResponse(
            id=request_id,
            error=InternalError(
                message="Invalid request",
                data=json.loads(e.json())
            )
        ).model_dump(mode="json"),
        status_code=200  # JSON-RPC errors still return 200
    )
```
Performance Considerations
Scalability
- Lambda concurrency: Automatically scales to handle traffic spikes
- DynamoDB on-demand: Scales with request volume without capacity planning
- SQS buffering: Smooths traffic bursts to backend agents
- Stateless design: No sticky sessions or state management overhead
Cost Optimization
- Lambda cold starts: Mitigated by keeping functions warm during business hours
- DynamoDB query efficiency: Single-table design with targeted queries
- Secret caching: Reduces Secrets Manager API calls by 99%
- JWKS caching: Avoids repeated HTTPS calls to OAuth provider
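The secret cache follows the same pattern as the JWKS cache: fetch once, reuse until a TTL expires, and fall back to the stale value if the refresh fails. A sketch with the fetcher injected, so the caching logic stays independent of boto3 (names here are illustrative):

```python
import time
from typing import Callable

class TTLSecretCache:
    """Cache a secret value, refreshing at most once per `ttl` seconds."""

    def __init__(self, fetch: Callable[[], str], ttl: float = 300.0):
        self._fetch = fetch  # e.g. a Secrets Manager get_secret_value call
        self._ttl = ttl
        self._value: str | None = None
        self._at = 0.0

    def get(self) -> str:
        now = time.time()
        if self._value is not None and now - self._at < self._ttl:
            return self._value  # fresh cache hit: no API call
        try:
            self._value = self._fetch()
            self._at = now
        except Exception:
            if self._value is None:  # nothing stale to fall back to
                raise
        return self._value
```

In the Lambda runtime the instance lives at module level, so warm invocations reuse the cached value across requests, which is where the reduction in Secrets Manager calls comes from.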
Security Highlights
- Defense in depth: Multiple authentication layers (OAuth, scopes, inter-service JWTs)
- Least privilege: Scope-based access control ensures clients can only perform authorized operations
- Short-lived tokens: Agent tokens expire in 60 seconds
- HTTPS only: All communication encrypted in transit
- No sensitive data in logs: User IDs and task IDs only, no message content
Future Enhancements
While our current implementation is production-ready, several enhancements could be valuable:
- WebSocket support: For real-time updates instead of polling
- Event-driven notifications: SNS/EventBridge for push notifications
- GraphQL interface: Alternative to JSON-RPC for more flexible queries
- Multi-region deployment: For global low-latency access
- Enhanced observability: X-Ray tracing and CloudWatch Insights dashboards
Conclusion
Building a robust A2A gateway on AWS requires careful consideration of security, scalability, and reliability. By leveraging serverless services and implementing best practices like:
- Proper authentication and authorization
- Asynchronous processing with polling
- Monotonic event ordering
- Comprehensive error handling
- Strategic caching
We've created a system that can handle enterprise-scale agent interactions while maintaining security and performance.
The A2A protocol represents an important step toward interoperable AI agents. As the ecosystem matures, standardized communication protocols will enable rich agent ecosystems where specialized agents can collaborate to solve complex problems.
Key Takeaways
✅ Use JSON-RPC for standardized agent communication
✅ Implement multi-layered security with OAuth and scope-based access control
✅ Design for asynchronous processing with polling-based status checks
✅ Leverage DynamoDB single-table design for efficient task management
✅ Cache aggressively but with graceful fallbacks
✅ Use monotonic timestamps to prevent race conditions
✅ Log structured data for observability
About the Implementation: This A2A gateway is built with Python, FastAPI, AWS Lambda, DynamoDB, SQS, and integrates with OAuth 2.0 providers. It follows the A2A specification and provides a scalable foundation for agent-to-agent communication.
Have you implemented agent communication protocols? What challenges did you face? Share your experiences in the comments below!
About the Author
Written by Suraj Khaitan, Gen AI Architect, working on serverless AI & cloud platforms.