Content
# a2a-redis
Redis integrations for the Agent-to-Agent (A2A) JavaScript/TypeScript SDK.
This package provides Redis-backed implementations of core A2A components for persistent task storage, reliable event queue management, and push notification configuration using Redis.
> This package is the JavaScript/TypeScript implementation, inspired by the Python [a2a-redis](https://github.com/redis-developer/a2a-redis) package.
## Features
- **RedisTaskStore & RedisJSONTaskStore**: Redis-backed task storage using hashes or JSON
- **RedisStreamsQueueManager & RedisStreamsEventQueue**: Persistent, reliable event queues with consumer groups
- **RedisPubSubQueueManager & RedisPubSubEventQueue**: Real-time, low-latency event broadcasting
- **RedisPushNotificationConfigStore**: Task-based push notification configuration storage
- **Consumer Group Strategies for Streams**: Flexible load balancing and instance isolation patterns
- **Next.js Serverless Ready**: Built-in helpers for singleton Redis clients and route handlers
## Language Support
This package is the **JavaScript/TypeScript** implementation. For Python, see the [a2a-redis Python package](https://github.com/redis-developer/a2a-redis).
**Why separate implementations?**
- Each language has its own ecosystem and best practices
- Python version: Designed for AsyncIO, Starlette, FastAPI
- JavaScript version: Designed for Node.js, Next.js, serverless environments
**Identical API patterns** make it easy to use both versions interchangeably or migrate between languages.
### Key JavaScript/TypeScript Advantages
- **Next.js Integration**: Built-in helpers for serverless Redis clients and route handlers (`nextjs-helpers`)
- **TypeScript Strict Mode**: Full type safety with strict type checking
- **ESM Support**: Native ES modules, tree-shakeable imports
- **Singleton Pattern**: Optimized Redis client pooling for serverless functions
- **No runtime overhead**: Compile-time type checking catches errors early
### Parallel API Between Languages
Both implementations share identical component names and patterns:
| Component | Python | TypeScript |
|-----------|--------|-----------|
| Task Storage | `RedisTaskStore` | `RedisTaskStore` |
| JSON Storage | `RedisJSONTaskStore` | `RedisJSONTaskStore` |
| Streams Queue | `RedisStreamsQueueManager` | `RedisStreamsQueueManager` |
| Pub/Sub Queue | `RedisPubSubQueueManager` | `RedisPubSubQueueManager` |
| Push Notifications | `RedisPushNotificationConfigStore` | `RedisPushNotificationConfigStore` |
| Consumer Strategy | `ConsumerGroupStrategy` | `ConsumerGroupStrategy` |
## Installation
```bash
npm install a2a-redis
# or with pnpm
pnpm add a2a-redis
```
Both `redis` and `@a2a-js/sdk` are peer dependencies and must be installed in your project.
### Next.js Users
If you're integrating a2a-redis with Next.js, see the [**Next.js Integration Guide**](docs/NEXTJS_INTEGRATION.md) for:
- Singleton Redis client setup for serverless
- Route handler examples
- Multi-turn conversation patterns
- Production deployment tips
## Quick Start
```typescript
import { RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore } from 'a2a-redis';
import { createRedisClient } from 'a2a-redis/utils';
import { DefaultRequestHandler } from 'a2a-sdk/server';
import { A2AExpressApplication } from 'a2a-sdk/server/express';
// Create Redis client with connection management
const redisClient = createRedisClient({
url: 'redis://localhost:6379/0',
maxConnections: 50,
});
// Initialize Redis components
const taskStore = new RedisTaskStore(redisClient, { prefix: 'myapp:tasks:' });
const queueManager = new RedisStreamsQueueManager(redisClient, { prefix: 'myapp:queues:' });
const pushConfigStore = new RedisPushNotificationConfigStore(redisClient, { prefix: 'myapp:push:' });
// Use with A2A request handler
const requestHandler = new DefaultRequestHandler({
agentExecutor: yourAgentExecutor,
taskStore,
queueManager,
pushConfigStore,
});
// Create A2A server
const server = new A2AExpressApplication({
agentCard: yourAgentCard,
httpHandler: requestHandler,
});
```
## Connecting to Hosted Redis
For production deployments, you can connect to hosted Redis services like Redis Cloud, AWS ElastiCache, or Heroku Redis:
### Using Redis URL
```typescript
import { createRedisClient } from 'a2a-redis/utils';
// From environment variable
const redisClient = createRedisClient({
url: process.env.REDIS_URL, // e.g., redis://user:password@host:port/db
maxConnections: 50,
});
```
### Using Host, Port, and Password
```typescript
import { createRedisClient } from 'a2a-redis/utils';
// For Redis Cloud or similar hosted services
const redisClient = createRedisClient({
host: 'your-redis-host.redis.cloud',
port: 19XXX,
password: process.env.REDIS_PASSWORD,
tls: true, // Required for most hosted services
maxConnections: 50,
});
```
### Environment Variables
```bash
# .env file
REDIS_URL=redis://:your-password@your-host:19XXX/0
# OR
REDIS_HOST=your-redis-host.redis.cloud
REDIS_PORT=19XXX
REDIS_PASSWORD=your-password
```
## Queue Components
The package provides both high-level queue managers and direct queue implementations:
### Queue Managers
- `RedisStreamsQueueManager` - Manages Redis Streams-based queues
- `RedisPubSubQueueManager` - Manages Redis Pub/Sub-based queues
- Both implement the A2A SDK's `QueueManager` interface
### Event Queues
- `RedisStreamsEventQueue` - Direct Redis Streams queue implementation
- `RedisPubSubEventQueue` - Direct Redis Pub/Sub queue implementation
- Both implement the `EventQueue` interface
## Queue Types: Streams vs Pub/Sub
### RedisStreamsQueueManager
**Key Features:**
- **Persistent storage**: Events remain in streams until explicitly trimmed
- **Guaranteed delivery**: Consumer groups with acknowledgments prevent message loss
- **Load balancing**: Multiple consumers can share work via consumer groups
- **Failure recovery**: Unacknowledged messages can be reclaimed by other consumers
- **Event replay**: Historical events can be re-read from any point in time
- **Ordering**: Maintains strict insertion order with unique message IDs
**Use Cases:**
- Task event queues requiring reliability
- Audit trails and event history
- Work distribution systems
- Systems requiring failure recovery
- Multi-consumer load balancing
**Trade-offs:**
- Higher memory usage (events persist)
- More complex setup (consumer groups)
- Slightly higher latency than pub/sub
### RedisPubSubQueueManager
**Key Features:**
- **Real-time delivery**: Events delivered immediately to active subscribers
- **No persistence**: Events not stored, only delivered to active consumers
- **Fire-and-forget**: No acknowledgments or delivery guarantees
- **Broadcasting**: All subscribers receive all events
- **Low latency**: Minimal overhead for immediate delivery
- **Minimal memory usage**: No storage of events
**Use Cases:**
- Live status updates and notifications
- Real-time dashboard updates
- System event broadcasting
- Non-critical event distribution
- Low-latency requirements
- Simple fan-out scenarios
**Not suitable for:**
- Critical event processing requiring guarantees
- Systems requiring event replay or audit trails
- Offline-capable applications
- Work queues requiring load balancing
## Components
### Task Storage
#### RedisTaskStore
Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.
```typescript
import { RedisTaskStore } from 'a2a-redis';
const taskStore = new RedisTaskStore(redisClient, { prefix: 'mytasks:' });
// A2A TaskStore interface methods
await taskStore.save('task123', { status: 'pending', data: { key: 'value' } });
const task = await taskStore.get('task123');
const success = await taskStore.delete('task123');
// List all task IDs (utility method)
const taskIds = await taskStore.listTaskIds();
```
#### RedisJSONTaskStore
Stores task data using Redis's JSON module for native JSON operations and complex nested data.
```typescript
import { RedisJSONTaskStore } from 'a2a-redis';
// Requires Redis 8 or RedisJSON module
const jsonTaskStore = new RedisJSONTaskStore(redisClient, { prefix: 'mytasks:' });
// Same interface as RedisTaskStore but with native JSON support
await jsonTaskStore.save('task123', { complex: { nested: { data: 'value' } } });
```
#### Task TTL/Expiration
Both `RedisTaskStore` and `RedisJSONTaskStore` support automatic expiration of task keys via Redis TTL. This is useful for ephemeral tasks that should not accumulate in Redis indefinitely.
```typescript
import { RedisTaskStore } from 'a2a-redis';
// Create a task store with 30-minute task expiration
const taskStore = new RedisTaskStore(redisClient, {
prefix: 'tasks:',
ttl: 1800, // seconds (30 minutes)
});
// Each time save() is called, the TTL is reset
// Example: Task expires if not updated for 30 minutes
await taskStore.save({
id: 'task123',
status: 'pending',
createdAt: new Date().toISOString(),
});
// Update task status after 10 minutes - TTL is reset, now expires in another 30 minutes
setTimeout(() => {
taskStore.save({
id: 'task123',
status: 'completed',
completedAt: new Date().toISOString(),
});
}, 10 * 60 * 1000);
// Without TTL (default), tasks persist until manually deleted
const persistentStore = new RedisTaskStore(redisClient, { prefix: 'permanent:' });
```
**TTL Benefits:**
- Prevents unbounded memory growth from accumulated completed tasks
- Automatic cleanup without manual intervention
- Configurable per task store
- TTL refreshes on each task update (moving expiration window)
**Default:** No TTL (tasks persist forever) - TTL is opt-in
#### Task Store Return Types
Both `RedisTaskStore` and `RedisJSONTaskStore` implement async methods that return optional types. Type aliases are exported for convenience:
```typescript
import type { TaskLoadResult } from 'a2a-redis';
// TaskLoadResult = Task | undefined
const task: TaskLoadResult = await taskStore.load('task-123');
if (task) {
console.log(`Task status: ${task.status}`);
} else {
console.log('Task not found or has expired');
}
```
**Return types:**
- `load(taskId)` returns `Promise<TaskLoadResult>` (Task | undefined)
- `save(taskId, task)` returns `Promise<void>`
- `delete(taskId)` returns `Promise<void>`
- `listTaskIds()` returns `Promise<string[]>`
### Queue Managers
Both queue managers implement the A2A QueueManager interface with full async support:
```typescript
import { RedisStreamsQueueManager, RedisPubSubQueueManager } from 'a2a-redis';
import { ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';
// For reliable, persistent processing
const streamsManager = new RedisStreamsQueueManager(redisClient, {
prefix: 'myapp:streams:',
});
// For real-time, low-latency broadcasting
const pubsubManager = new RedisPubSubQueueManager(redisClient, {
prefix: 'myapp:pubsub:',
});
// With custom consumer group configuration (streams only)
const config = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
const streamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: config });
async function main() {
// Same interface for both managers
const queue = await streamsManager.createOrTap('task123');
// Enqueue events
await queue.enqueueEvent({ type: 'progress', message: 'Task started' });
await queue.enqueueEvent({ type: 'progress', message: '50% complete' });
// Dequeue events
try {
const event = await queue.dequeueEvent({ noWait: true }); // Non-blocking
console.log(`Got event: ${event}`);
await queue.taskDone(); // Acknowledge the message (streams only)
} catch (error) {
console.log('No events available');
}
// Close queue when done
await queue.close();
}
main();
```
### Consumer Group Strategies
The Streams queue manager supports different consumer group strategies:
```typescript
import { ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';
// Multiple instances share work across a single consumer group
const config = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
// Each instance gets its own consumer group
const config = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.INSTANCE_ISOLATED,
});
// Custom consumer group name
const config = new ConsumerGroupConfig({
strategy: ConsumerGroupStrategy.CUSTOM,
groupName: 'my_group',
});
const streamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: config });
```
### RedisPushNotificationConfigStore
Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.
```typescript
import { RedisPushNotificationConfigStore } from 'a2a-redis';
import { PushNotificationConfig } from 'a2a-sdk';
const configStore = new RedisPushNotificationConfigStore(redisClient, {
prefix: 'myapp:push:',
});
// Create push notification config
const config = new PushNotificationConfig({
url: 'https://webhook.example.com/notify',
token: 'secret_token',
id: 'webhook_1',
});
// A2A interface methods
await configStore.setInfo('task123', config);
// Get all configs for a task
const configs = await configStore.getInfo('task123');
for (const config of configs) {
console.log(`Config ${config.id}: ${config.url}`);
}
// Delete specific config or all configs for a task
await configStore.deleteInfo('task123', 'webhook_1'); // Delete specific
await configStore.deleteInfo('task123'); // Delete all
```
### RedisContextStore
Manages conversation contexts for multi-turn interactions. Groups related tasks and provides history retrieval.
```typescript
import { RedisContextStore } from 'a2a-redis';
// Create a context store with 1-hour expiration
const contextStore = new RedisContextStore(redisClient, {
prefix: 'contexts:',
ttl: 3600, // optional: auto-cleanup after 1 hour
});
// Create a context for a new conversation
await contextStore.createContext('conv-user-123', {
userId: 'user-123',
agent: 'travel-agent',
sessionId: 'session-abc',
});
// Add tasks as they're created in the conversation
await contextStore.addTaskToContext('conv-user-123', 'task-1');
await contextStore.addTaskToContext('conv-user-123', 'task-2');
await contextStore.addTaskToContext('conv-user-123', 'task-3');
// Retrieve context to see all conversation tasks
const context = await contextStore.getContext('conv-user-123');
console.log(`Conversation has ${context?.taskIds.length} tasks`);
console.log(`Started at: ${context?.createdAt}`);
console.log(`User: ${context?.metadata?.userId}`);
// Get just the task IDs
const taskIds = await contextStore.getContextTasks('conv-user-123');
for (const taskId of taskIds) {
const task = await taskStore.load(taskId);
console.log(`Task ${taskId} status: ${task?.status}`);
}
// Clean up when conversation ends
await contextStore.deleteContext('conv-user-123');
```
**Context Features:**
- **Multi-turn support**: Group related tasks from a single conversation
- **History retrieval**: Get all tasks in a conversation
- **Metadata storage**: Store user/session info with the context
- **TTL-based cleanup**: Automatic expiration of old conversations
- **Task isolation**: Tasks added to context without duplicates
**Default:** No TTL (contexts persist) - TTL is opt-in
#### Context Store Return Types
The `RedisContextStore` implements async methods with optional return types. Type aliases are exported for convenience:
```typescript
import type { ContextLookupResult } from 'a2a-redis';
// ContextLookupResult = Context | undefined
const context: ContextLookupResult = await contextStore.getContext('conv-user-123');
if (context) {
console.log(`Context created at: ${context.createdAt}`);
console.log(`Tasks in context: ${context.taskIds.length}`);
} else {
console.log('Context not found or has expired');
}
```
**Return types:**
- `getContext(contextId)` returns `Promise<ContextLookupResult>` (Context | undefined)
- `createContext(contextId, metadata?)` returns `Promise<void>`
- `addTaskToContext(contextId, taskId)` returns `Promise<void>`
- `getContextTasks(contextId)` returns `Promise<string[]>`
- `deleteContext(contextId)` returns `Promise<void>`
## Requirements
### Required
- **Node.js** 18+
- **TypeScript** 5.0+ (if using TypeScript)
- **redis** (peer dependency) - [redis.io](https://github.com/redis/node-redis) >= 4.0.0 (the official Redis client)
- **@a2a-js/sdk** (peer dependency) >= 0.3.4
Both `redis` and `@a2a-js/sdk` must be installed in your project:
```bash
npm install redis @a2a-js/sdk a2a-redis
# or with pnpm
pnpm add redis @a2a-js/sdk a2a-redis
```
### Optional
- **RedisJSON module** for `RedisJSONTaskStore` (enhanced nested data support)
- **Redis Stack** or Redis with modules for full feature support
## Development
```bash
# Install dependencies
npm install
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Type checking
npm run type-check
# Linting and formatting
npm run lint
npm run format
# Run examples
npm run example:basic
npm run example:agent
```
## Testing
Tests use Redis database 15 for isolation and include both unit and integration tests:
```bash
# Run all tests
npm test
# Run specific test file
npm test -- task-store.test.ts
# Run with coverage
npm run test:coverage
```
### Test Architecture
Tests are designed to run efficiently with **limited Redis connections**, particularly important for free-tier hosted Redis (e.g., Redis Cloud free tier which allows only 1 concurrent connection):
- **Shared Redis Client**: All tests use a single Redis client initialized in `tests/setup.ts`, reducing connection overhead
- **Single-threaded Execution**: Tests run serially (`maxWorkers: 1`) to prevent connection conflicts
- **Database Isolation**: Tests use Redis database 15, separate from development/production databases
- **Automatic Cleanup**: `cleanupTestRedis()` flushes database between tests
### Pub/Sub Tests (Skipped)
Pub/Sub integration tests are skipped by default because they require duplicate connections:
- Redis Pub/Sub requires a separate connection for subscriptions (`client.duplicate()`)
- Free-tier Redis (1 connection limit) cannot support both main + pub/sub connections
- Unit tests for Pub/Sub still run with mock clients
- To run pub/sub integration tests, use a Redis instance with higher connection limits
### Running Tests with Redis Cloud Free Tier
The test suite is optimized for Redis Cloud free tier:
1. **Environment Setup**
```bash
# Add to .env
REDIS_URL=redis://user:password@host:port/db
```
2. **Run Tests**
```bash
npm test
```
3. **Expected Output**
- ~200+ tests passing (148+ integration tests)
- ~30 pub/sub tests skipped (due to connection limits)
- ~17 failing tests (pre-existing, unrelated to connection limit)
The shared client pattern allows efficient testing even with connection constraints.
## License
MIT