Content
# A2A MQTT
A Python implementation of the [Agent-to-Agent (A2A) Protocol](https://github.com/a2aproject/A2A) using **MQTT** as the transport layer.
This library enables AI agents to discover each other, negotiate capabilities, and collaborate on tasks using an event-driven architecture suitable for scalable, distributed systems.
## Features
- **MQTT Transport**: Replaces HTTP/JSON-RPC with lightweight, event-driven MQTT messaging.
- **Protocol Compliance**: Implements core A2A concepts including Agent Cards, Task Requests, and Task Results.
- **Automatic Discovery**: Agents automatically publish retained "Agent Cards" to a common discovery topic.
- **Graceful Shutdown**: Automatically removes Agent Cards (clears retained message) upon stopping.
- **Type Safety**: Uses Pydantic models for strict schema validation.
## Installation
```bash
pip install -r requirements.txt
```
## Quick Start
### 1. Start an Agent
Create an agent that exposes a simple tool.
```python
from a2a_mqtt import A2AAgent
def echo(params: dict):
return f"Echo: {params.get('message')}"
agent = A2AAgent(
org_id="my-org",
agent_id="agent-001",
name="Echo Agent",
description="Echoes messages",
broker_address="test.mosquitto.org"
)
agent.register_tool(echo)
agent.start()
```
### 2. Send a Task
Use the `MQTTTransport` directly or another Agent to send a task.
## Topic Schema
This implementation follows a strict topic hierarchy to organize traffic:
| Purpose | Topic Pattern | Description |
|---------|---------------|-------------|
| **Discovery** | `discovery/{org_id}/{agent_id}` | Retained **Agent Cards**. Subscribers listen here to find available agents. |
| **Tasks** | `{org_id}/{agent_id}/tasks` | Incoming **Task Requests**. Agents subscribe here to receive work. |
| **Results** | `{org_id}/{agent_id}/results` | Outgoing **Task Results**. Clients subscribe here to get responses. |
## Message Formats
Messages use JSON payloads compatible with the A2A spec.
### Agent Card (Discovery)
Published to `discovery/...` with `retain=True`.
```json
{
"protocolVersion": "1.0",
"name": "Supply Chain Agent",
"url": "mqtt://global-industries/supply-chain-01",
"preferredTransport": "MQTT",
"skills": [
{
"id": "inventory-check",
"name": "Check Inventory",
"description": "Returns stock levels for a product ID."
}
]
}
```
### Task Request
Published to `.../tasks`.
```json
{
"id": "req-123",
"taskId": "task-abc-789",
"operation": "inventory-check",
"timestamp": "2024-01-01T12:00:00Z",
"params": {
"message": {
"role": "user",
"content": {
"parts": [{ "data": { "product_id": "WIDGET-A" } }]
}
}
}
}
```
### Task Result
Published to `.../results`.
```json
{
"id": "req-123",
"taskId": "task-abc-789",
"status": "completed",
"result": {
"status": "completed",
"result": { "stock": 500 }
}
}
```
## Security & Best Practices
For production deployments (non-demo), follow these guidelines:
1. **TLS Encryption**: Always use MQTTS (typically port 8883) to encrypt traffic between agents and the broker.
2. **Authentication**: Configure the broker to require username/password or client certificate authentication.
3. **Access Control Lists (ACLs)**: Restrict agents so they can only:
- Publish to their own `tasks/results` topics.
- Subscribe to their own `tasks` topic.
- Read `discovery/#` (if they need to find others).
4. **Graceful Shutdown**: The `A2AAgent` class attempts to clear its retained Agent Card on `stop()`. Ensure your application handles signals (SIGINT/SIGTERM) to call `agent.stop()`.
## Development
Running tests:
```bash
pip install pytest
pytest tests/
```