# MXP Agents Runtime SDK
[crates.io](https://crates.io/crates/mxp-agents) · [docs.rs](https://docs.rs/mxp-agents) · [MIT license](https://github.com/yafatek/mxpnexus/blob/main/LICENSE-MIT) · [Rust](https://www.rust-lang.org)
<img src="assets/mxp-runtime-social.png" alt="MXP Agents Runtime overview" width="1200" />
Production-grade Rust SDK for building autonomous AI agents over the [MXP protocol](https://github.com/yafatek/mxpnexus).
## Why MXP Agents Runtime
MXP Agents Runtime is the production runtime for agents that speak MXP natively. It focuses on predictable behavior, governance, and interoperability.
- Protocol-native: handles MXP `Call`, `Response`, `Event`, `Stream*`, `AgentRegister`, and `AgentHeartbeat`.
- Deterministic lifecycle and concurrency: explicit `Lifecycle` transitions plus bounded `TaskScheduler`.
- Governance-first: policy checks for tools, inference, and memory with audit emission hooks.
- Capability-scoped tools: a typed registry with metadata and versioning.
- Operations-ready: graceful shutdown and checkpoint-ready recovery.
### Compared to chain frameworks
LangChain/LangGraph are great for Python-first prototyping and graph orchestration. MXP Agents Runtime targets protocol-native, production-grade agent systems.
| Need | MXP Agents Runtime | Chain frameworks |
| --- | --- | --- |
| Protocol-native A2A | MXP message types and transport | Typically HTTP/JSON adapters |
| Runtime lifecycle | Explicit lifecycle + bounded scheduler | Orchestration focused |
| Governance | Policy engine + audit emission | Usually external or custom |
| Operations | Graceful shutdown + recovery hooks | Often add-on |
| Language strategy | Rust core; JS/Py SDKs planned | Python-first |
## MXP-native real-world examples
- Customer support agent with tools and MXP responses: `examples/customer-support-agent`
- Enterprise setup with policy + telemetry: `examples/enterprise-agent`
- Multi-agent pipeline and real-world demos: `examples/real-world`
- MXP agent mesh over UDP (coordinator + agents + test client): `examples/RUN_AGENTS.md`
- Aviation Flight Pulse (DXB live feed + weather): `examples/aviation-flight-pulse`
## Community and roadmap
- Positioning and why we built this: `docs/positioning.md`
- Comparison and when to use MXP vs chain frameworks: `docs/comparison.md`
- Demo guide with real MXP flows: `docs/demos.md`
- Project updates and release notes: `docs/updates.md`
- Community channels: `docs/community.md`
- Roadmap and milestones: `ROADMAP.md`
- Contributing guidelines: `CONTRIBUTING.md`
---
## Installation
```toml
[dependencies]
mxp-agents = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
futures = "0.3"
```
## MXP-native quickstart
Small end-to-end flow: build a kernel, send an MXP `Call`, and read the outcome.
```rust
use mxp::{Message, MessageType};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::kernel::{AgentKernel, CollectingSink, KernelMessageHandler, TaskScheduler};
use mxp_agents::primitives::AgentId;
use mxp_agents::tools::registry::{ToolMetadata, ToolRegistry};
use serde_json::{json, Value};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the Gemini adapter from GEMINI_API_KEY.
    let adapter = Arc::new(GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash").with_stream(false),
    )?);

    // Register a tool the agent can call while answering.
    let tools = Arc::new(ToolRegistry::new());
    tools.register_tool(
        ToolMetadata::new("lookup_order", "1.0.0")?,
        |input: Value| async move { Ok(json!({ "status": "shipped", "input": input })) },
    )?;

    // Wire the adapter, tools, and an outcome sink into a kernel.
    let sink = CollectingSink::new();
    let handler = Arc::new(KernelMessageHandler::new(adapter, tools, sink.clone()));
    let kernel = AgentKernel::new(AgentId::random(), handler, TaskScheduler::default());

    let payload = json!({
        "messages": [
            { "role": "user", "content": "Look up order ORD-12345" }
        ],
        "tools": [
            { "name": "lookup_order", "input": { "order_id": "ORD-12345" } }
        ]
    });

    // Send an MXP Call and read the collected outcome.
    kernel
        .handle_message(Message::new(MessageType::Call, serde_json::to_vec(&payload)?))
        .await?;

    let outcome = sink.drain().pop().expect("call outcome");
    println!("Response: {}", outcome.response());
    Ok(())
}
```
---
## Cookbook
### 1. Basic LLM Inference (Non-Streaming)
Get a complete response in one shot:
```rust
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create adapter (streaming disabled by default)
    let adapter = GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash")
    )?;

    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, "What is Rust?"),
    ])?;

    let mut stream = adapter.infer(request).await?;

    // Single chunk with complete response
    while let Some(chunk) = stream.next().await {
        println!("{}", chunk?.delta);
    }
    Ok(())
}
```
---
### 2. Streaming LLM Responses (Token-by-Token)
Stream tokens as they're generated:
```rust
use std::io::{Write, stdout};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Enable streaming with .with_stream(true)
    let adapter = GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash")
            .with_stream(true)
    )?;

    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, "Write a haiku about Rust."),
    ])?;

    let mut stream = adapter.infer(request).await?;

    // Tokens arrive incrementally
    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?.delta);
        stdout().flush()?; // IMPORTANT: flush to see tokens immediately
    }
    Ok(())
}
```
---
### 3. All Supported LLM Providers
```rust
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::anthropic::{AnthropicAdapter, AnthropicConfig};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
// OpenAI (requires OPENAI_API_KEY env var)
let openai = OpenAiAdapter::new(
    OpenAiConfig::from_env("gpt-4o")
        .with_stream(true)
)?;

// Anthropic (requires ANTHROPIC_API_KEY env var)
let anthropic = AnthropicAdapter::new(
    AnthropicConfig::from_env("claude-sonnet-4-20250514")
        .with_stream(true)
)?;

// Google Gemini (requires GEMINI_API_KEY env var)
let gemini = GeminiAdapter::new(
    GeminiConfig::from_env("gemini-2.0-flash")
        .with_stream(true)
)?;

// Ollama (local, no API key needed)
let ollama = OllamaAdapter::new(
    OllamaConfig::new("llama3.2")
        .with_stream(true)
)?;
```
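Every provider implements the same `ModelAdapter` trait, so application code can stay provider-agnostic. A minimal sketch of a generic helper (the `ask` function and prompt are illustrative, not part of the SDK):
```rust
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;

// Works with any of the adapters above: OpenAI, Anthropic, Gemini, or Ollama.
async fn ask<A: ModelAdapter>(adapter: &A, prompt: &str) -> Result<String, Box<dyn std::error::Error>> {
    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, prompt),
    ])?;
    let mut stream = adapter.infer(request).await?;

    // Collect the streamed deltas into one response string.
    let mut answer = String::new();
    while let Some(chunk) = stream.next().await {
        answer.push_str(&chunk?.delta);
    }
    Ok(answer)
}
```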
---
### 4. System Prompts & Temperature
```rust
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
let adapter = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;
let request = InferenceRequest::new(vec![
    PromptMessage::new(MessageRole::User, "Explain quantum computing"),
])?
.with_system_prompt("You are a physics professor. Explain concepts simply.")
.with_temperature(0.7) // 0.0 = deterministic, 1.0 = creative
.with_max_output_tokens(500); // Limit response length

let mut stream = adapter.infer(request).await?;
```
---
### 5. Multi-Turn Conversations
```rust
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
let adapter = OllamaAdapter::new(OllamaConfig::new("llama3.2"))?;
// Build conversation history
let request = InferenceRequest::new(vec![
    PromptMessage::new(MessageRole::User, "My name is Alice."),
    PromptMessage::new(MessageRole::Assistant, "Hello Alice! How can I help you today?"),
    PromptMessage::new(MessageRole::User, "What's my name?"),
])?;

let mut stream = adapter.infer(request).await?;
// Response: "Your name is Alice."
```
---
### 6. Resilient Adapter (Circuit Breaker + Retry)
Production-grade error handling:
```rust
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::resilience::{
    ResilientAdapter, CircuitBreakerConfig, RetryConfig, BackoffStrategy,
};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use std::time::Duration;

let base = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;

let resilient = ResilientAdapter::builder(base)
    // Circuit breaker: stop calling after 5 failures, wait 30s before retry
    .with_circuit_breaker(CircuitBreakerConfig {
        failure_threshold: 5,
        cooldown: Duration::from_secs(30),
        success_threshold: 2,
    })
    // Retry: exponential backoff with jitter
    .with_retry(RetryConfig {
        max_attempts: 3,
        backoff: BackoffStrategy::Exponential {
            base: Duration::from_millis(100),
            max: Duration::from_secs(10),
            jitter: true,
        },
        ..Default::default()
    })
    // Timeout: fail if request takes > 30s
    .with_timeout_duration(Duration::from_secs(30))
    .build();

// Use exactly like any other adapter
let request = InferenceRequest::new(vec![
    PromptMessage::new(MessageRole::User, "What is Rust?"),
])?;
let mut stream = resilient.infer(request).await?;
```
---
### 7. Prometheus Metrics
```rust
use mxp_agents::telemetry::PrometheusExporter;
let exporter = PrometheusExporter::new();
// Register metrics for runtime and adapters
exporter.register_runtime();
exporter.register_adapter("openai");
exporter.register_adapter("ollama");
// Export Prometheus-format metrics
let metrics = exporter.export();
println!("{}", metrics);
// Metrics include:
// - mxp_adapter_requests_total{adapter="openai"}
// - mxp_adapter_request_duration_seconds{adapter="openai"}
// - mxp_adapter_errors_total{adapter="openai", error_type="timeout"}
// - mxp_circuit_breaker_state{adapter="openai"}
```
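`export()` returns text in the Prometheus exposition format, so any HTTP server can expose it for scraping. A sketch using axum (axum is our assumption here, not an SDK dependency; add it to `Cargo.toml` first):
```rust
use axum::{routing::get, Router};
use mxp_agents::telemetry::PrometheusExporter;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let exporter = Arc::new(PrometheusExporter::new());
    exporter.register_runtime();
    exporter.register_adapter("openai");

    // Serve /metrics for the Prometheus scraper.
    let app = Router::new().route("/metrics", get(move || {
        let exporter = exporter.clone();
        async move { exporter.export() }
    }));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```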
---
### 8. Health Checks (Kubernetes Ready)
```rust
use mxp_agents::telemetry::{HealthReporter, HealthStatus, ComponentHealth};
use std::time::Duration;
let reporter = HealthReporter::new();
// Register health checks
reporter.register("database", || async {
// Your health check logic
ComponentHealth::healthy("connected")
});
reporter.register("llm_adapter", || async {
ComponentHealth::healthy("openai responding")
});
// Kubernetes endpoints
let readiness = reporter.readiness().await; // /readyz
let liveness = reporter.liveness().await; // /healthz
match readiness.status {
HealthStatus::Healthy => println!("Ready to serve traffic"),
HealthStatus::Degraded => println!("Partially available"),
HealthStatus::Unhealthy => println!("Not ready"),
}
```
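In a probe handler you would map `HealthStatus` onto HTTP status codes: `Healthy` serves 200, `Unhealthy` serves 503, and where `Degraded` lands depends on whether the probe is readiness or liveness. A small illustrative mapping (the helper functions are ours, not SDK API):
```rust
use mxp_agents::telemetry::HealthStatus;

// Readiness probe: only fully healthy pods should receive traffic.
fn readiness_code(status: &HealthStatus) -> u16 {
    match status {
        HealthStatus::Healthy => 200,
        HealthStatus::Degraded | HealthStatus::Unhealthy => 503,
    }
}

// Liveness probe: degraded pods stay alive; only unhealthy ones restart.
fn liveness_code(status: &HealthStatus) -> u16 {
    match status {
        HealthStatus::Healthy | HealthStatus::Degraded => 200,
        HealthStatus::Unhealthy => 503,
    }
}
```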
---
### 9. Secrets Management
```rust
use mxp_agents::config::{Secret, EnvSecretProvider, FileSecretProvider, ChainedSecretProvider};
// Load from environment (redacted in Debug output)
let provider = EnvSecretProvider::new();
let api_key: Secret<String> = provider.get("OPENAI_API_KEY")?;
// Safe to log - shows "[REDACTED]"
println!("API Key: {:?}", api_key);
// Access the actual value when needed
let actual_key = api_key.expose();
// Chain multiple providers (env -> file -> vault)
let chain = ChainedSecretProvider::new(vec![
    Box::new(EnvSecretProvider::new()),
    Box::new(FileSecretProvider::new("/run/secrets")),
]);
```
---
### 10. Rate Limiting
```rust
use mxp_agents::primitives::{RateLimiter, AgentRateLimiter, AgentId};
use std::time::Duration;
// Per-adapter rate limiter
let limiter = RateLimiter::new(
    100.0, // 100 requests per second
    200,   // burst capacity
);

if limiter.try_acquire() {
    // Proceed with request
} else {
    // Rate limited, back off
}

// Per-agent rate limiting
let agent_limiter = AgentRateLimiter::new(10.0, 20); // 10 req/s per agent
let agent_id = AgentId::random();

if agent_limiter.try_acquire(&agent_id) {
    // This agent can proceed
}
```
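`try_acquire` never blocks, so the caller chooses the waiting strategy. A sketch of a simple async backoff loop, assuming the token bucket refills continuously as the rate/burst constructor suggests (needs tokio's `time` feature):
```rust
use mxp_agents::primitives::RateLimiter;
use std::time::Duration;

// Poll until a permit is available instead of dropping the request.
async fn acquire_or_wait(limiter: &RateLimiter) {
    while !limiter.try_acquire() {
        // At 100 req/s a new token arrives roughly every 10ms,
        // so a short sleep between attempts is enough.
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
```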
---
### 11. MXP Agent Kernel
Build agents that communicate over the MXP protocol:
```rust
use mxp_agents::kernel::{AgentKernel, AgentMessageHandler, HandlerContext, TaskScheduler, LifecycleEvent};
use mxp_agents::primitives::AgentId;
use async_trait::async_trait;
use std::sync::Arc;
struct MyHandler;
#[async_trait]
impl AgentMessageHandler for MyHandler {
    async fn handle_call(&self, ctx: HandlerContext) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("Received: {:?}", ctx.message());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let agent_id = AgentId::random();
    let handler = Arc::new(MyHandler);
    let scheduler = TaskScheduler::default();
    let mut kernel = AgentKernel::new(agent_id.clone(), handler, scheduler);

    // Lifecycle: Created -> Ready -> Active
    kernel.transition(LifecycleEvent::Boot)?;
    kernel.transition(LifecycleEvent::Activate)?;

    println!("Agent {} is active", agent_id);
    Ok(())
}
```
---
### 12. Graceful Shutdown
```rust
use mxp_agents::kernel::{ShutdownCoordinator, WorkGuard};
use std::sync::Arc;
use std::time::Duration;
let coordinator = Arc::new(ShutdownCoordinator::new(Duration::from_secs(30)));
// Register in-flight work
let guard: WorkGuard = coordinator.register_work()?;

// Hand the guard to a worker task; it is released when dropped.
tokio::spawn(async move {
    // Do work... the guard drops automatically when the task completes.
    drop(guard);
});

// Initiate shutdown (waits for in-flight work)
coordinator.shutdown().await?;
```
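In a service binary you would usually trigger the drain from a signal handler. A sketch wiring `shutdown()` to Ctrl+C via `tokio::signal` (needs tokio's `signal` feature):
```rust
use mxp_agents::kernel::ShutdownCoordinator;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let coordinator = Arc::new(ShutdownCoordinator::new(Duration::from_secs(30)));

    // ... spawn workers that each hold a WorkGuard ...

    // Block until Ctrl+C, then drain in-flight work before exiting.
    tokio::signal::ctrl_c().await?;
    coordinator.shutdown().await?;
    Ok(())
}
```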
---
### 13. Tool Registration
Define tools that LLMs can call:
```rust
use mxp_agents::tools::{ToolRegistry, ToolMetadata, ToolBinding};
use serde_json::{json, Value};
let mut registry = ToolRegistry::new();
// Register a tool
registry.register(ToolBinding {
    metadata: ToolMetadata {
        name: "get_weather".into(),
        description: "Get current weather for a city".into(),
        parameters: json!({
            "type": "object",
            "properties": {
                "city": { "type": "string" }
            },
            "required": ["city"]
        }),
    },
    executor: Box::new(|args: Value| {
        Box::pin(async move {
            let city = args["city"].as_str().unwrap_or("unknown");
            Ok(json!({ "temp": 72, "city": city }))
        })
    }),
})?;
// Invoke tool
let result = registry.invoke("get_weather", json!({"city": "Seattle"})).await?;
```
---
### 14. Policy Enforcement
```rust
use mxp_agents::policy::{PolicyEngine, PolicyRule, PolicyDecision, PolicyRequest};
let mut engine = PolicyEngine::new();
// Add rules
engine.add_rule(PolicyRule {
    name: "block_dangerous_tools".into(),
    condition: |req: &PolicyRequest| req.tool_name == "delete_all",
    decision: PolicyDecision::Deny("Dangerous operation blocked".into()),
});

// Check policy before tool execution
let request = PolicyRequest { tool_name: "delete_all".into(), ..Default::default() };

match engine.evaluate(&request) {
    PolicyDecision::Allow => { /* proceed */ },
    PolicyDecision::Deny(reason) => println!("Blocked: {}", reason),
    PolicyDecision::Escalate => { /* require human approval */ },
}
```
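Combining this with the tool registry from the previous recipe gives a guarded invocation path: evaluate first, execute only on `Allow`. A sketch of the glue (the `guarded_invoke` helper is illustrative, not SDK API):
```rust
use mxp_agents::policy::{PolicyDecision, PolicyEngine, PolicyRequest};
use mxp_agents::tools::ToolRegistry;
use serde_json::Value;

// Run the policy check before dispatching to the tool executor.
async fn guarded_invoke(
    engine: &PolicyEngine,
    registry: &ToolRegistry,
    tool: &str,
    args: Value,
) -> Result<Value, Box<dyn std::error::Error>> {
    let request = PolicyRequest { tool_name: tool.into(), ..Default::default() };
    match engine.evaluate(&request) {
        PolicyDecision::Allow => Ok(registry.invoke(tool, args).await?),
        PolicyDecision::Deny(reason) => Err(format!("blocked by policy: {reason}").into()),
        PolicyDecision::Escalate => Err("escalation required: human approval needed".into()),
    }
}
```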
---
## Environment Variables
| Variable | Provider | Required |
|----------|----------|----------|
| `OPENAI_API_KEY` | OpenAI | Yes |
| `ANTHROPIC_API_KEY` | Anthropic | Yes |
| `GEMINI_API_KEY` | Google Gemini | Yes |
| (none) | Ollama | No (local) |
---
## Crate Structure
```
mxp-agents (facade)
├── agent-adapters # LLM providers (OpenAI, Anthropic, Gemini, Ollama)
├── agent-kernel # Agent lifecycle, MXP handlers, scheduler
├── agent-primitives # Core types (AgentId, Capability, RateLimiter)
├── agent-tools # Tool registry and execution
├── agent-policy # Governance and policy engine
├── agent-memory # Memory bus, journal, embeddings
├── agent-prompts # System prompts, context window management
├── agent-config # Configuration loading, secrets
└── agent-telemetry # Metrics, health checks, tracing
```
---
## Examples
| Example | Description |
|---------|-------------|
| `examples/real-world` | **Real-world agents** - Customer support, data analysis, code review, multi-agent pipelines |
| `examples/RUN_AGENTS.md` | **MXP agent mesh** - Coordinator plus agents over MXP transport |
| `examples/aviation-flight-pulse` | **Aviation Flight Pulse** - DXB region feed + weather with MXP events |
| `examples/cookbook` | **SDK features demo** - Resilience, metrics, health checks, rate limiting |
| `examples/streaming-test` | Token-by-token streaming demo |
| `examples/basic-agent` | Simple agent with LLM adapter |
| `examples/enterprise-agent` | Production setup with resilience & metrics |
### Real-World Examples
Run the interactive real-world examples:
```bash
GEMINI_API_KEY=your_key cargo run -p real-world
```
1. **Customer Support Agent** - Handles queries, looks up orders, processes refunds with policy checks
2. **Data Analysis Agent** - Queries structured data, generates insights, creates reports
3. **Code Review Agent** - Reviews code for bugs, security issues, performance problems
4. **Multi-Agent Pipeline** - Research → Write → Edit workflow with 3 agents collaborating
Sample output (Multi-Agent Pipeline):
```
STAGE 1: Research Agent
🔬 Research Agent working...
⏱️ Completed in 4.1s
STAGE 2: Writer Agent
✍️ Writer Agent working...
⏱️ Completed in 2.5s
STAGE 3: Editor Agent
📝 Editor Agent working...
⏱️ Completed in 1.7s
📊 Pipeline Summary
Total pipeline time: 8.4s
Agents involved: 3 (Research, Writer, Editor)
Data passed between agents: 3216 bytes
```
### SDK Features Demo
Run the cookbook demo:
```bash
GEMINI_API_KEY=your_key cargo run -p cookbook
```
Sample output:
```
[1/6] SECRETS MANAGEMENT
API Key (debug): [REDACTED]
✓ Secrets are automatically redacted in logs
[2/6] RATE LIMITING
Attempted 25 requests, 20 allowed (burst limit)
✓ Rate limiting prevents resource exhaustion
[3/6] HEALTH CHECKS (Kubernetes Ready)
Readiness: Healthy
Liveness: Healthy
✓ Health checks ready for Kubernetes probes
[4/6] PROMETHEUS METRICS
Metrics exported (2258 bytes)
✓ Metrics ready for Prometheus scraping
[5/6] RESILIENT STREAMING (Circuit Breaker + Retry)
Response (streaming): Rust achieves memory safety through...
Chunks received: 5
Time to first token: 1.3s
✓ Resilient streaming completed successfully
[6/6] GRACEFUL SHUTDOWN
Shutdown completed gracefully
✓ Graceful shutdown with work draining
```
---
## Requirements
- Rust 1.85+ (MSRV)
- Tokio async runtime
- API keys for cloud providers (or Ollama for local)
---
## License
MIT OR Apache-2.0
---
## Links
- [API Documentation](https://docs.rs/mxp-agents)
- [MXP Protocol](https://github.com/yafatek/mxpnexus)
- [Examples](./examples)
- [Changelog](./CHANGELOG.md)