Content
# MSK A2A Demo - Game Balance Automation System
A Hub-Spoke architecture demo utilizing Google A2A protocol and Kafka
## 🎯 Project Overview
This is a system where AI agents communicate via Kafka for game balance adjustments.
- **Balance Agent**: Acts as a coordinator, calling other agents for comprehensive analysis
- **Data Agent**: Analyzes game statistics data (win rates, game duration, etc.)
- **CS Agent**: Collects and analyzes complaints from the bulletin board (upcoming)
## ✨ Key Features
### ✅ Completed Features
1. **Agent Communication Based on Kafka**
- Full operation between Balance Agent and Data Agent
- Asynchronous messaging through Request/Response topics
- Dynamic agent discovery via Agent Registry
2. **Multi-turn Conversation Support**
- Requests additional information in `input-required` state
- Maintains context for continuous conversation
- Example: "Win rate?" → "Which race?" → "Zerg" → "Zerg win rate 50%"
3. **Implementation of A2A Protocol**
- Task-based state management (completed, input-required, failed)
- Response delivery through artifacts
- Asynchronous processing via Event Queue
## 🏗️ Architecture
### Hub-Spoke Structure
```
┌─────────────────┐
│ Kafka Hub │
│ (localhost) │
└────────┬────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Balance Agent │ │ Data Agent │ │ CS Agent │
│ (port 9001) │ │ (port 9003) │ │ (port 9002) │
│ │ │ │ │ │
│ - Coordinator │ │ - Win Rate Analysis │ │ - Complaints │
│ - Tool Calls │ │ - Game Duration │ │ (upcoming) │
└───────────────┘ └───────────────┘ └───────────────┘
```
### Kafka Topics
```
agent.balance.requests → Requests to Balance Agent
agent.balance.responses → Responses from Balance Agent
agent.data.requests → Requests to Data Agent
agent.data.responses → Responses from Data Agent
agent.registry → Agent registration information
```
## 🚀 Quick Start
### 1. Run Kafka
```bash
docker-compose up -d
```
### 2. Create Topics
```bash
python scripts/create_topics.py
```
### 3. Run Agents
```bash
# Terminal 1: Data Agent
python agents/data_analysis_agent.py
# Terminal 2: Balance Agent
python agents/game_balance_agent.py
```
### 4. Testing
```bash
# Simple question
python test_kafka_a2a.py
# Multi-turn conversation test
python -c "
import asyncio
from kafka.kafka_transport import KafkaTransport
from a2a.types import Message, Part, TextPart, Role, MessageSendParams
from uuid import uuid4
import json
async def test():
transport = KafkaTransport(target_agent_name='balance')
# Turn 1: Ambiguous question
msg1 = Message(kind='message', role=Role.user,
parts=[Part(TextPart(kind='text', text='Win rate?'))],
message_id=uuid4().hex)
result1 = await transport.send_message(MessageSendParams(message=msg1))
print(f'Turn 1 - State: {result1.status.state}')
data1 = json.loads(result1.artifacts[0].parts[0].root.text)
print(f'Message: {data1[\"message\"]}')
# Turn 2: Provide race
if result1.status.state == 'input-required':
msg2 = Message(kind='message', role=Role.user,
parts=[Part(TextPart(kind='text', text='Zerg'))],
message_id=uuid4().hex,
context_id=result1.context_id)
result2 = await transport.send_message(MessageSendParams(message=msg2))
print(f'Turn 2 - State: {result2.status.state}')
data2 = json.loads(result2.artifacts[0].parts[0].root.text)
print(f'Message: {data2[\"message\"]}')
await transport.close()
asyncio.run(test())
"
```
## 📁 Project Structure
```
game-balance-a2a/
├── agents/
│ ├── game_balance_agent.py # Balance Agent (Coordinator)
│ ├── game_balance_agent_executor.py # Balance Agent execution logic
│ ├── data_analysis_agent.py # Data Agent
│ └── data_analysis_agent_executor.py # Data Agent execution logic
├── kafka/
│ ├── kafka_transport.py # Kafka-based A2A Transport
│ ├── kafka_consumer_handler.py # Kafka Consumer handler
│ └── agent_registry.py # Agent registration/discovery
├── scripts/
│ └── create_topics.py # Kafka topic creation
├── docker-compose.yml # Local Kafka environment
└── test_kafka_a2a.py # Test script
```
## 🔄 Message Flow
### Simple Question (Single-turn)
```
Client
↓ "Tell me the Terran win rate"
Balance Agent (Kafka Consumer)
↓ Tool call
Data Agent (Kafka Consumer)
↓ Win rate analysis
Balance Agent
↓ Response generation
Client
✅ "The Terran win rate is 100.0%"
```
### Complex Question (Multi-turn)
```
Client
↓ "Win rate?"
Balance Agent → Data Agent
↓ input_required
Client
✅ "Which race's win rate would you like to know?"
Client
↓ "Zerg" (same context)
Balance Agent → Data Agent
↓ Win rate analysis
Client
✅ "The Zerg win rate is 50.0%"
```
## 🛠️ Tech Stack
- **Language**: Python 3.13
- **Agent Framework**: Strands
- **LLM**: Amazon Bedrock (Nova Lite)
- **Message Broker**: Apache Kafka (Docker)
- **A2A Protocol**: Google A2A
- **Async**: aiokafka, asyncio
## 📊 Test Results
✅ Balance Agent Kafka communication
✅ Data Agent Kafka communication
✅ Tool calls between agents
✅ Multi-turn conversation (input-required)
✅ Artifact transmission
✅ Context maintenance
## 🔜 Future Plans
- [ ] Implement CS Agent
- [ ] Deploy on AWS MSK
- [ ] Improve GUI
- [ ] Enhance error handling
- [ ] Monitoring dashboard
## 📝 License
MIT License