# a2a-amqp
AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.
## Why?
A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:
- **Timeout issues**: HTTP connections time out on long tasks
- **Scaling problems**: A single server becomes the bottleneck
- **Resource waste**: Servers blocked waiting for tasks to complete
This library solves these problems by:
1. **Queuing tasks** via AMQP instead of processing inline
2. **Distributing work** across multiple worker processes
3. **Event sourcing** all task events for replay and recovery
4. **Streaming results** back via SSE while workers process in the background
## Architecture
```
HTTP Request → Server (enqueues task) → Returns immediately
↓
AMQP Queue
↓
Worker Pool (scales horizontally)
↓
Process task & publish events
↓
AMQP Stream (event sourcing)
↓
Client streams results via SSE
```
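The flow in the diagram can be sketched with in-memory stand-ins, which may help when reasoning about it without a broker. This is only an illustration: `workQueue`, `eventLog`, `enqueueTask`, and `workOnce` are hypothetical names, plain arrays replace the AMQP queue and stream, and none of it is the library's API.

```typescript
// In-memory illustration only; plain arrays stand in for the AMQP queue and stream.
type TaskMessage = { taskId: string; text: string };
type TaskEvent = {
  taskId: string;
  state: "submitted" | "working" | "completed";
  worker?: string;
};

const workQueue: TaskMessage[] = []; // stand-in for the AMQP work queue
const eventLog: TaskEvent[] = []; // stand-in for the append-only AMQP stream

// Server side: record the task, enqueue it, and return without doing the work.
function enqueueTask(taskId: string, text: string): TaskEvent {
  const submitted: TaskEvent = { taskId, state: "submitted" };
  workQueue.push({ taskId, text });
  eventLog.push(submitted);
  return submitted;
}

// Worker side: take one message off the queue and publish progress events.
// Returns false when the queue is empty.
function workOnce(worker: string): boolean {
  const message = workQueue.shift();
  if (message === undefined) return false;
  eventLog.push({ taskId: message.taskId, state: "working", worker });
  // ... the long-running work would happen here ...
  eventLog.push({ taskId: message.taskId, state: "completed", worker });
  return true;
}

// Two tasks are accepted up front, then two workers each take one.
enqueueTask("task-1", "Hello");
enqueueTask("task-2", "World");
workOnce("worker-a");
workOnce("worker-b");
```

The point of the split is that `enqueueTask` never waits for `workOnce`: the HTTP side only appends to the queue, and any number of workers can drain it independently.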
## Installation
```bash
# Install with bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client
# Or with npm
npm install @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client
```
**Requires**: LavinMQ or RabbitMQ with stream support
```bash
docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest
```
## Quick Start
### 1. HTTP Server (enqueues tasks)
```typescript
import { AMQPAgentBackend, QueuingRequestHandler } from "@cloudamqp/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";
// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});
// Create request handler (handles task queuing + event projection)
// `agentCard` is your agent's AgentCard object, defined elsewhere
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();
// Set up Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);
```
### 2. Worker Process (processes tasks)
```typescript
import { AMQPAgentBackend, WorkerEventBus } from "@cloudamqp/a2a-amqp";
import {
  RequestContext,
  type AgentExecutor,
  type ExecutionEventBus,
} from "@a2a-js/sdk/server";
// Create backend with same agent name as server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});
// Initialize work queue
await backend.workQueue.initialize();
class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });
    // ... do work ...
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
    eventBus.finished();
  }
  // AgentExecutor also declares cancelTask; a no-op is enough for this example
  async cancelTask(): Promise<void> {}
}
const executor = new MyExecutor();
// Start consuming with async generator pattern
const messages = backend.workQueue.start();
for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;
  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );
  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);
  // Execute task
  await executor.execute(context, eventBus);
}
```
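The worker loop above awaits `executor.execute(...)` directly, so an exception inside the executor would leave the task without a final event. One possible way to handle that is to wrap the call and publish a `failed` status on error. The sketch below is an assumption-laden illustration: `executeSafely`, `EventBusLike`, and `StatusUpdate` are hypothetical local names, not part of this library or the SDK.

```typescript
// Minimal local shapes so the sketch is self-contained; the real types come from @a2a-js/sdk.
interface StatusUpdate {
  kind: "status-update";
  taskId: string;
  contextId: string;
  status: { state: "working" | "completed" | "failed"; timestamp: string };
  final: boolean;
}
interface EventBusLike {
  publish(update: StatusUpdate): void;
  finished(): void;
}

// Hypothetical helper: run one task and report a failed state if the executor throws.
// Returns true when the executor finished normally, false when it threw.
async function executeSafely(
  run: (bus: EventBusLike) => Promise<void>,
  bus: EventBusLike,
  taskId: string,
  contextId: string
): Promise<boolean> {
  try {
    await run(bus);
    return true;
  } catch {
    // The executor threw before finishing: publish the failure as the final state.
    bus.publish({
      kind: "status-update",
      taskId,
      contextId,
      status: { state: "failed", timestamp: new Date().toISOString() },
      final: true,
    });
    bus.finished();
    return false;
  }
}
```

With the types adapted to the SDK's, the loop could call something like `executeSafely((bus) => executor.execute(context, bus), eventBus, taskId, contextId)` so that one failing task does not stop the worker.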
### 3. Scale horizontally
Run multiple workers to process tasks in parallel:
```bash
# Terminal 1: HTTP Server
bun run server
# Terminal 2-N: Workers (scale as needed)
bun run worker
bun run worker # Add more workers for higher throughput
```
## Features
- **Work Queue**: Distribute tasks across multiple worker processes
- **Event Sourcing**: All task events stored in AMQP streams for replay
- **In-Memory Projection**: Fast task lookups with automatic recovery from streams
- **SSE Streaming**: Automatic streaming of task events back to clients
- **Horizontal Scaling**: Add more workers to increase throughput
- **Graceful Shutdown**: Clean consumer and connection handling
- **Type-Safe**: Full TypeScript support with Zod validation
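To make the event sourcing and in-memory projection ideas concrete, here is a minimal sketch of rebuilding task state by replaying status events in order. It assumes the event shape used in the worker example; `projectTasks`, `TaskView`, and the sample events are hypothetical, and the library's real projection may work differently.

```typescript
// Local event shape mirroring the status-update events in the worker example.
type TaskState = "submitted" | "working" | "completed" | "failed";
interface StatusUpdateEvent {
  kind: "status-update";
  taskId: string;
  contextId: string;
  status: { state: TaskState; timestamp: string };
  final: boolean;
}
interface TaskView {
  taskId: string;
  contextId: string;
  state: TaskState;
  final: boolean;
  updatedAt: string;
}

// Replay events in stream order; the last event seen for each task wins.
function projectTasks(updates: readonly StatusUpdateEvent[]): Record<string, TaskView> {
  const tasks: Record<string, TaskView> = {};
  for (const update of updates) {
    tasks[update.taskId] = {
      taskId: update.taskId,
      contextId: update.contextId,
      state: update.status.state,
      final: update.final,
      updatedAt: update.status.timestamp,
    };
  }
  return tasks;
}

// Sample stream contents (made-up data): task-1 has completed, task-2 is still running.
const replayed = projectTasks([
  { kind: "status-update", taskId: "task-1", contextId: "ctx-1", status: { state: "working", timestamp: "2025-01-01T00:00:00.000Z" }, final: false },
  { kind: "status-update", taskId: "task-2", contextId: "ctx-1", status: { state: "working", timestamp: "2025-01-01T00:00:01.000Z" }, final: false },
  { kind: "status-update", taskId: "task-1", contextId: "ctx-1", status: { state: "completed", timestamp: "2025-01-01T00:00:02.000Z" }, final: true },
]);
```

Because the view is derived only from the events, a restarted server can rebuild the same state by replaying the stream from the beginning.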
## Configuration
```typescript
interface AMQPAgentBackendConfig {
  url: string;                     // AMQP broker URL
  agentName: string;               // Agent identifier
  streamRetention?: string;        // Event retention (default: "7d")
  streamMaxBytes?: number;         // Max stream size (default: 1GB)
  workQueueName?: string;          // Custom work queue name
  exchangeName?: string;           // Custom exchange name
  logger?: Logger;                 // Custom logger
  connection?: {
    heartbeat?: number;            // Heartbeat interval in seconds
    reconnectDelay?: number;       // Reconnection delay in ms
    maxReconnectAttempts?: number; // Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;          // Persistent messages (default: true)
    confirmMode?: boolean;         // Publisher confirms (default: true)
    messageTtl?: number;           // Message TTL in ms (0 = no expiration)
  };
}
```
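As an illustration of filling in this config, the sketch below builds one from environment-style variables. Everything beyond the field names is an assumption: `configFromEnv`, `BackendConfigSketch`, the `AMQP_URL` and `AGENT_NAME` variable names, and the connection values are hypothetical and only chosen for the example.

```typescript
// Local subset of the documented config shape, repeated so the sketch is self-contained.
interface BackendConfigSketch {
  url: string;
  agentName: string;
  streamRetention?: string;
  connection?: { heartbeat?: number; reconnectDelay?: number; maxReconnectAttempts?: number };
  publishing?: { persistent?: boolean; confirmMode?: boolean; messageTtl?: number };
}

// Hypothetical helper: build a config from environment-style variables.
// AMQP_URL and AGENT_NAME are assumed names, not something the library reads itself.
function configFromEnv(env: Record<string, string | undefined>): BackendConfigSketch {
  return {
    url: env["AMQP_URL"] ?? "amqp://localhost:5672",
    agentName: env["AGENT_NAME"] ?? "my-agent",
    streamRetention: "7d", // same as the documented default, written out explicitly
    // Illustrative values only; tune them for your broker and network.
    connection: { heartbeat: 30, reconnectDelay: 1000, maxReconnectAttempts: 10 },
    publishing: { persistent: true, confirmMode: true, messageTtl: 0 },
  };
}

// Local development falls back to the defaults; production overrides both variables.
const localConfig = configFromEnv({});
const prodConfig = configFromEnv({
  AMQP_URL: "amqps://broker.example.com",
  AGENT_NAME: "billing-agent",
});
```

An object of this shape is what the Quick Start passes to `AMQPAgentBackend.create(...)`; the server and its workers need the same `agentName`.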
## Examples
See complete working examples:
- `src/examples/http-server.ts` - HTTP server with queuing
- `src/examples/worker.ts` - Worker process
```bash
# Run the example
bun run server # Terminal 1
bun run worker # Terminal 2
# Send a request
curl -X POST http://localhost:3000/ \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"message/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'
```
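For clients written in TypeScript, the same request body can be built with a typed helper instead of a hand-written JSON string. This is a sketch, assuming the A2A JSON-RPC method name `message/send` and the message shape from the curl example; `buildSendMessage` and the interface names are hypothetical.

```typescript
// Local types mirroring the JSON payload in the curl example.
interface TextPart {
  kind: "text";
  text: string;
}
interface UserMessage {
  kind: "message";
  role: "user";
  messageId: string;
  contextId: string;
  parts: TextPart[];
}
interface SendMessageRpc {
  jsonrpc: "2.0";
  id: number;
  method: "message/send";
  params: { message: UserMessage };
}

// Hypothetical helper: wrap a single text part in a JSON-RPC envelope.
function buildSendMessage(
  id: number,
  messageId: string,
  contextId: string,
  text: string
): SendMessageRpc {
  return {
    jsonrpc: "2.0",
    id,
    method: "message/send",
    params: {
      message: {
        kind: "message",
        role: "user",
        messageId,
        contextId,
        parts: [{ kind: "text", text }],
      },
    },
  };
}

// Serialized body equivalent to the curl payload.
const requestBody = JSON.stringify(buildSendMessage(1, "1", "ctx-1", "Hello"));
```

The string in `requestBody` could then be sent with `fetch("http://localhost:3000/", { method: "POST", headers: { "Content-Type": "application/json" }, body: requestBody })`.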
## Testing
```bash
bun run test # Run all tests (unit + integration)
bun run test:unit # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch # Watch mode
bun run test:coverage # With coverage
```
## License
MIT