# RAGFlow A2A Integration
This project wraps the RAGFlow knowledge base assistant in the A2A (Agent2Agent) protocol, so that any A2A-compatible client can use RAGFlow's retrieval and question-answering capabilities.
## Project Architecture
```
┌──────────────────────┐    A2A Protocol     ┌──────────────────────┐     RAGFlow API     ┌──────────────────────┐
│      A2A Client      │<------------------->│      A2A Server      │<------------------->│   RAGFlow Backend    │
│                      │     (HTTP/JSON)     │                      │                     │    chat assistant    │
└──────────────────────┘                     └──────────────────────┘                     └──────────────────────┘
           |                                            |                                            |
           |                                            |                                            |
┌──────────V───────────┐                     ┌──────────V───────────┐                     ┌──────────V───────────┐
│      Client App      │                     │    Flask Service     │                     │ Large Language Model │
│     Interaction      │                     │A2A Protocol Handling │                     │   Embedding Model    │
│      Management      │                     │ RAGFlow Integration  │                     │    Vector Storage    │
└──────────────────────┘                     └──────────────────────┘                     └──────────────────────┘
```
The project consists of two main components:
- **ragflow_a2a_server**: A Flask-based A2A server that wraps your configured RAGFlow chat assistant into an agent compatible with the A2A protocol, providing an interface that complies with A2A specifications.
- **ragflow_a2a_client**: A simple Python client example demonstrating how to interact with the A2A server.
## System Features
- **Complete Logging System**: Supports multi-level logging, outputting to both console and files, with automatic log rotation.
- **Intelligent Retry Mechanism**: Automatically retries critical API calls upon failure, using exponential backoff and jitter algorithms.
- **Security Rate Limiting**:
  - IP-level rate limiting: Each IP may send up to 10 requests per minute to the `/tasks/send` endpoint and up to 20 requests per minute to the `/tasks/get` endpoint.
  - Global rate limiting: Prevents resource overconsumption.
  - Intelligent error handling: Avoids exposing sensitive information.
- **Comprehensive Testing Framework**: Includes unit tests and integration tests to ensure system stability and reliability.
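
The per-IP limits listed above could be enforced with a sliding-window counter. The following is a minimal sketch, not the project's actual implementation: the class name `SlidingWindowLimiter`, the `LIMITS` table, and the injectable `clock` parameter are all assumptions made for illustration, and the 60-second window applies the "per minute" wording to both endpoints.

```python
import time
from collections import defaultdict, deque

# Hypothetical limits table, using the numbers from the feature list.
LIMITS = {"/tasks/send": 10, "/tasks/get": 20}
WINDOW_SECONDS = 60.0


class SlidingWindowLimiter:
    """Tracks request timestamps per (ip, endpoint) pair."""

    def __init__(self, limits=LIMITS, window=WINDOW_SECONDS, clock=time.monotonic):
        self.limits = limits
        self.window = window
        self.clock = clock
        self.hits = defaultdict(deque)

    def check(self, ip, endpoint):
        """Return (allowed, retry_after_seconds) for one incoming request."""
        now = self.clock()
        queue = self.hits[(ip, endpoint)]
        # Drop timestamps that have left the window.
        while queue and now - queue[0] >= self.window:
            queue.popleft()
        if len(queue) >= self.limits[endpoint]:
            # The oldest request leaves the window after this many seconds.
            return False, self.window - (now - queue[0])
        queue.append(now)
        return True, 0.0
```

A Flask handler could call `check(request.remote_addr, request.path)` before doing any work and return the wait time in its error body when the request is refused.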
## Installation
### Server Installation
1. Navigate to the server directory:
```
cd ragflow_a2a_server
```
2. Install dependencies:
```
pip install -r requirements.txt
```
3. Configure the `.env` file (refer to `.env.example`):
```
RAGFLOW_API_KEY=your_api_key_here
RAGFLOW_ENDPOINT=http://your_ragflow_instance_url:port
chat_assistants=your_assistant_id
```
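
A server can fail fast when one of these settings is missing. The sketch below is illustrative only: `load_settings` and `REQUIRED_KEYS` are hypothetical names, and it assumes the `.env` values have already been loaded into the process environment (how the project does that is not shown here).

```python
import os

# The three keys shown in the .env example above.
REQUIRED_KEYS = ("RAGFLOW_API_KEY", "RAGFLOW_ENDPOINT", "chat_assistants")


def load_settings(env=None):
    """Return the required settings as a dict, or raise if any is missing."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    settings = {key: env[key] for key in REQUIRED_KEYS}
    # Strip trailing slashes so later URL joins do not produce "//".
    settings["RAGFLOW_ENDPOINT"] = settings["RAGFLOW_ENDPOINT"].rstrip("/")
    return settings
```

Passing a plain dict as `env` makes the function easy to exercise in unit tests without touching the real environment.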
### Client Installation
1. Navigate to the client directory:
```
cd ragflow_a2a_client
```
2. Install dependencies:
```
pip install -r requirements.txt
```
## Usage
### Starting the Server
Run the following command in the `ragflow_a2a_server` directory:
```
python run_production.py
```
The server will start at http://localhost:5000.
### Using the Client Example
Run the following command in the `ragflow_a2a_client` directory:
```
python client.py
```
This will execute a preset dialogue flow, demonstrating how to interact with the A2A server.
## Integration with Other A2A Clients
You can connect to this server using any client that complies with the A2A protocol. Key endpoints include:
- **Agent Card**: `GET /.well-known/agent.json` - Retrieve agent information and capabilities.
- **Send Task**: `POST /tasks/send` - Send a message to the agent (rate-limited).
- **Get Task**: `GET /tasks/get?taskId={taskId}` - Query task status and results (rate-limited).
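
As a rough illustration of calling these endpoints from Python's standard library, the sketch below builds the two task requests. The helper names are hypothetical, and the request body layout (top-level `message` and `taskId` keys) is an assumption; only the paths, the `taskId` query parameter, and the `parts` message format come from this README.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:5000"  # default address from "Starting the Server"


def build_send_request(text, task_id=None, base_url=BASE_URL):
    """Build a POST /tasks/send request (body layout is assumed)."""
    payload = {"message": {"role": "user", "parts": [{"text": {"value": text}}]}}
    if task_id is not None:
        payload["taskId"] = task_id
    return urllib.request.Request(
        base_url + "/tasks/send",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_get_request(task_id, base_url=BASE_URL):
    """Build a GET /tasks/get?taskId=... request."""
    query = urllib.parse.urlencode({"taskId": task_id})
    return urllib.request.Request(base_url + "/tasks/get?" + query, method="GET")


def call(request, timeout=30):
    """Send a prepared request and decode the JSON response body."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `call(build_send_request("What is RAGFlow?"))` would send a first message, and `call(build_get_request(task_id))` would poll the resulting task.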
## A2A Protocol Implementation Details
This project strictly adheres to the A2A (Agent2Agent) protocol specifications and implements the following core functionalities:
### 1. Agent Card Discovery Mechanism
Provides agent metadata through the `/.well-known/agent.json` endpoint, including:
- Agent name, description, and version information.
- Agent capabilities and API references.
- Agent icon and other visual resources.
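
To make the metadata concrete, here is a hedged sketch of how such a card might be assembled. The field names follow the AgentCard description in the "Core Terms" section of this README; every value, the nested shapes of `authentication`, `capabilities`, and `skills`, and the helper names are placeholders rather than the project's real card.

```python
# Field names as listed in this README's AgentCard description.
EXPECTED_FIELDS = (
    "agentName", "description", "version", "contactEmail",
    "endpointUrl", "authentication", "capabilities", "skills",
)


def build_agent_card(endpoint_url, version="0.1.0"):
    """Return a placeholder agent card for the given endpoint URL."""
    return {
        "agentName": "RAGFlow Agent",
        "description": "Knowledge base retrieval, Q&A, and information querying.",
        "version": version,                        # placeholder version
        "contactEmail": "maintainer@example.com",  # placeholder address
        "endpointUrl": endpoint_url,
        "authentication": {"type": "none"},        # hypothetical shape
        "capabilities": {"text": True, "streaming": True},  # hypothetical keys
        "skills": [],                              # hypothetical: left empty
    }


def missing_fields(card):
    """List expected fields that a card does not define."""
    return [name for name in EXPECTED_FIELDS if name not in card]
```

A client could run `missing_fields` on the JSON fetched from `/.well-known/agent.json` to check a card before relying on it.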
### 2. Task Management and Status Tracking
- **Task Creation and Identification**: Each dialogue has a unique `taskId`, which can be provided by the client or automatically generated by the server.
- **Task Status Maintenance**: Maintains a mapping of task statuses in memory (`tasks` dictionary), including:
```python
{
    "taskId": "unique_task_id",
    "state": "completed",           # or "failed"
    "createTime": "creation_time",
    "updateTime": "update_time",
    "messages": [],                 # list of message objects
    "artifacts": [],                # list of artifacts
}
```
- **Session Continuity**: Maps A2A tasks to RAGFlow sessions through `task_to_session` mapping, ensuring contextual coherence in multi-turn dialogues.
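
The two in-memory mappings described above can be sketched as follows. The names `tasks` and `task_to_session` come from this README; the helper functions, the initial `submitted` state, the ISO timestamp format, and the `create_session` callback are assumptions made for illustration.

```python
import uuid
from datetime import datetime, timezone

tasks = {}            # taskId -> task record
task_to_session = {}  # taskId -> RAGFlow session id


def _now():
    """Current UTC time as an ISO 8601 string (format is an assumption)."""
    return datetime.now(timezone.utc).isoformat()


def get_or_create_task(task_id=None):
    """Return the record for task_id, generating an id if none is given."""
    task_id = task_id or uuid.uuid4().hex
    if task_id not in tasks:
        now = _now()
        tasks[task_id] = {
            "taskId": task_id,
            "state": "submitted",  # assumed initial state
            "createTime": now,
            "updateTime": now,
            "messages": [],
            "artifacts": [],
        }
    return tasks[task_id]


def session_for(task_id, create_session):
    """Reuse the session bound to a task, or create one via the callback."""
    if task_id not in task_to_session:
        task_to_session[task_id] = create_session()
    return task_to_session[task_id]
```

Because the session id is looked up by `taskId`, a follow-up message that reuses the same `taskId` lands in the same RAGFlow session. State kept this way is lost when the process restarts.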
### 3. Message Format and Interaction Model
Follows the A2A message format specifications, including:
- **Message Roles**: Distinguishes between `user` and `agent` messages.
- **Message Parts**: Supports a `parts` array structure for text content:
```json
{
"role": "user",
"parts": [{"text": {"value": "user message content"}}]
}
```
- **Complete Error Handling**: Returns standardized error responses upon processing failures.
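
Two small helpers show how the message shape above can be produced and read back. This is a sketch under the assumption that text parts always use the `{"text": {"value": ...}}` layout shown above; `make_message` and `extract_text` are hypothetical names.

```python
VALID_ROLES = ("user", "agent")


def make_message(role, text):
    """Build a message with a single text part."""
    if role not in VALID_ROLES:
        raise ValueError("role must be 'user' or 'agent'")
    return {"role": role, "parts": [{"text": {"value": text}}]}


def extract_text(message):
    """Join the values of all text parts, skipping anything malformed."""
    chunks = []
    for part in message.get("parts", []):
        text = part.get("text") if isinstance(part, dict) else None
        if isinstance(text, dict) and isinstance(text.get("value"), str):
            chunks.append(text["value"])
    return "\n".join(chunks)
```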
### 4. RAGFlow Integration Layer
- **Transparent Proxy**: Seamlessly converts A2A requests into RAGFlow API calls.
- **Session Management**: Automatically creates and manages RAGFlow sessions, mapping A2A tasks to RAGFlow sessions.
- **Response Adaptation**: Handles various RAGFlow response formats (including SSE streaming responses) and adapts them to A2A response formats.
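
Response adaptation for a streamed reply might look like the sketch below. It assumes each SSE `data:` line carries a JSON object whose `data.answer` field holds the answer text accumulated so far, and that a non-object `data` value marks the end of the stream. That payload shape is an assumption for illustration and should be checked against the RAGFlow version in use; the function names are hypothetical.

```python
import json


def parse_sse_answer(lines):
    """Return the last non-empty answer found in an iterable of SSE lines."""
    answer = ""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # ignore comments, blank lines, and other SSE fields
        try:
            event = json.loads(line[len("data:"):].strip())
        except json.JSONDecodeError:
            continue  # skip fragments that are not valid JSON
        data = event.get("data") if isinstance(event, dict) else None
        if isinstance(data, dict) and data.get("answer"):
            answer = data["answer"]
    return answer


def to_agent_message(answer):
    """Wrap answer text in the message format used by this project."""
    return {"role": "agent", "parts": [{"text": {"value": answer}}]}
```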
## A2A Protocol Overview
The A2A (Agent2Agent) protocol is an open protocol launched by Google Cloud to make different AI agents interoperable. Its primary goal is to let agents communicate and collaborate in dynamic, multi-agent ecosystems, even when they are built by different vendors or on different frameworks.
### Core Terms of the A2A Protocol
1. **Agent**: An AI service that provides specific functionalities and communicates with other agents via the A2A protocol.
2. **AgentCard**: A JSON metadata file describing the agent's capabilities, including name, description, version, skills, etc.
   Key fields in this project's AgentCard:
   - **agentName**: Agent name - "RAGFlow Agent"
   - **description**: Detailed description of the agent's capabilities, including knowledge base retrieval, Q&A, and information querying.
   - **version**: Agent version number.
   - **contactEmail**: Contact email.
   - **endpointUrl**: Agent service endpoint URL.
   - **authentication**: Authentication method configuration.
   - **capabilities**: Capability configuration, including text input/output, streaming responses, etc.
   - **skills**: Core skills configuration of the agent, defining input/output patterns.
3. **Task**: A stateful unit of work representing a specific request being processed by the agent.
   - **TaskState**: A task can be in the following states:
     - `submitted`: Submitted but not yet started.
     - `working`: Currently being processed.
     - `input-required`: Requires additional input.
     - `completed`: Successfully completed.
     - `canceled`: Canceled.
     - `failed`: Processing failed.
     - `unknown`: Status unknown.
4. **Message**: A unit of communication exchanged between the user and the agent, containing:
   - `role`: Sender's role (`user` or `agent`).
   - `parts`: List of message content parts.
5. **Part**: A content fragment within a message or artifact, supporting various types:
   - `TextPart`: Text content.
   - `FilePart`: File content.
   - `DataPart`: Structured data.
6. **Artifact**: An output generated by the agent as a result of a task, which is immutable and can contain multiple parts.
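
The terms above map naturally onto a few Python types. The sketch below is one possible modeling, not code from this project: the state values are the ones listed above, while the class layout and the choice of `completed`, `canceled`, and `failed` as terminal states are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class TaskState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    CANCELED = "canceled"
    FAILED = "failed"
    UNKNOWN = "unknown"


# Assumption: these states mean no further processing will happen.
TERMINAL_STATES = {TaskState.COMPLETED, TaskState.CANCELED, TaskState.FAILED}


@dataclass
class Message:
    role: str  # "user" or "agent"
    parts: List[dict] = field(default_factory=list)


@dataclass
class Task:
    task_id: str
    state: TaskState = TaskState.SUBMITTED
    messages: List[Message] = field(default_factory=list)
    artifacts: List[dict] = field(default_factory=list)

    def is_finished(self):
        """True once the task has reached a terminal state."""
        return self.state in TERMINAL_STATES
```

Deriving `TaskState` from `str` lets a value read from JSON be converted with `TaskState("input-required")` and compared against plain strings.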
### Design Principles of the A2A Protocol
1. **Capability Interoperability**: Allows agents to collaborate naturally in an unstructured manner.
2. **Compatibility with Existing Standards**: Based on widely accepted technical standards such as HTTP, SSE, and JSON-RPC.
3. **Secure by Default**: Supports enterprise-level authentication and authorization mechanisms.
4. **Long-Running Task Support**: Flexibly supports scenarios ranging from quick tasks to complex long-running operations.
5. **Modality Independence**: Supports various interaction forms, including text, audio, and video.
### A2A Endpoint Implementation in This Project
This project implements the following standard A2A endpoints:
1. **Agent Card Discovery**
   - `GET /.well-known/agent.json`: Returns agent metadata.
2. **Task Operations**
   - `POST /tasks/send`: Sends a task request, creating a new task or continuing an existing one.
   - `GET /tasks/get?taskId={taskId}`: Retrieves the status and results of a specific task.
3. **Error Handling and Status Codes**
   - Implements standard JSON-RPC error codes.
   - Provides detailed error messages and statuses.
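
For reference, the JSON-RPC 2.0 specification predefines five error codes. The sketch below builds an error object from them; the codes and messages are the ones the specification defines, while the helper name, the lookup keys, and the assumption that this server returns this exact envelope are illustrative only.

```python
# Predefined error codes from the JSON-RPC 2.0 specification.
JSONRPC_ERRORS = {
    "parse_error": (-32700, "Parse error"),
    "invalid_request": (-32600, "Invalid Request"),
    "method_not_found": (-32601, "Method not found"),
    "invalid_params": (-32602, "Invalid params"),
    "internal_error": (-32603, "Internal error"),
}


def error_response(kind, request_id=None, detail=None):
    """Build a JSON-RPC 2.0 error response for one of the predefined kinds."""
    code, message = JSONRPC_ERRORS[kind]
    error = {"code": code, "message": message}
    if detail is not None:
        # "data" is the optional member for extra, non-sensitive detail.
        error["data"] = detail
    return {"jsonrpc": "2.0", "id": request_id, "error": error}
```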
## Error Handling and Rate Limiting
When a client exceeds a rate limit, the server returns a response similar to the following:
```json
{
"error": "IP xxx.xxx.xxx.xxx rate limit: please try again in x.xx seconds"
}
```
Clients should implement appropriate backoff and retry logic to handle such situations.
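
One way a client might do this is sketched below. It first looks for the wait time in the error text and otherwise falls back to exponential backoff with full jitter. The regular expression assumes the message format shown above, and `retry_delay`, `call_with_retry`, and the `send` callable are hypothetical names, not part of the bundled client.

```python
import random
import re
import time

# Matches the "try again in x.xx seconds" hint from the sample error above.
_HINT = re.compile(r"try again in (\d+(?:\.\d+)?) seconds")


def retry_delay(attempt, error_text="", base=0.5, cap=30.0, rng=random.random):
    """Seconds to wait before retry number `attempt` (0-based)."""
    match = _HINT.search(error_text)
    if match:
        return float(match.group(1))  # trust the server's own estimate
    # Exponential backoff with full jitter, capped at `cap` seconds.
    return rng() * min(cap, base * 2 ** attempt)


def call_with_retry(send, max_attempts=5, sleep=time.sleep):
    """Call `send()` until it returns something other than a rate-limit error."""
    response = None
    for attempt in range(max_attempts):
        response = send()
        error = response.get("error") if isinstance(response, dict) else None
        if not (isinstance(error, str) and "rate limit" in error):
            return response
        if attempt < max_attempts - 1:
            sleep(retry_delay(attempt, error))
    return response  # still rate-limited after the final attempt
```

Here `send` stands for any zero-argument function that performs one request and returns the decoded JSON body.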
## License
[Apache License 2.0](LICENSE)
## Contribution
Contributions are welcome through Issues and Pull Requests. For details, please refer to the [Contribution Guidelines](CONTRIBUTING.md).