Agent2Agent Protocol (A2A)
A2A 是 Google 推出的 Agent 间通信协议
核心概念
A2A 协议专注于 AI Agent 之间的通信与协作,实现跨平台、跨框架的 Agent 互操作。
Client Agent ─── A2A Protocol ─── Remote Agent
│ │
│ 1. 发现 (Agent Card) │
│ 2. 发起任务 (Task) │
│ 3. 状态更新 (SSE Stream) │
│ 4. 交换消息 (Message) │
│ 5. 返回结果 (Artifact) │设计原则
| 原则 | 描述 |
|---|---|
| 拥抱 Agent 能力 | 允许 Agent 以自然方式协作 |
| 基于现有标准 | 使用 HTTP、SSE、JSON-RPC |
| 默认安全 | 支持 OAuth 2.0、W3C DID |
| 支持长任务 | 从秒级到数天的任务 |
| 模态无关 | 支持文本、音频、视频等 |
核心实体
Agent Card
typescript
interface AgentCard {
name: string;
description: string;
url: string;
version: string;
capabilities: {
streaming?: boolean;
pushNotifications?: boolean;
};
skills: Skill[];
authentication: {
schemes: string[];
};
}Task
typescript
interface Task {
id: string;
state: 'pending' | 'working' | 'input_required' | 'completed' | 'failed';
input: Message;
output?: Artifact;
metadata?: Record<string, any>;
}Message
typescript
interface Message {
role: 'user' | 'agent';
parts: MessagePart[];
}
type MessagePart = TextPart | FilePart | DataPart;Artifact
typescript
interface Artifact {
parts: ArtifactPart[];
}
type ArtifactPart = TextPart | FilePart | DataPart | FormPart | IframePart;API 端点
1. 获取 Agent Card
http
GET /.well-known/agent.json2. 创建任务
http
POST /tasks
Content-Type: application/json
{
"input": {
"role": "user",
"parts": [{ "text": "分析这个数据集" }]
}
}3. 获取任务状态
http
GET /tasks/{taskId}4. 流式更新
http
GET /tasks/{taskId}/stream
Accept: text/event-stream5. 发送消息
http
POST /tasks/{taskId}/messages实现示例
服务端实现
typescript
import express from 'express';
const app = express();
// Agent Card
app.get('/.well-known/agent.json', (req, res) => {
res.json({
name: 'Data Analyzer',
description: '数据分析 Agent',
url: 'https://analyzer.example.com',
version: '1.0',
skills: [{
type: 'analysis',
description: '分析数据并生成报告'
}]
});
});
// 创建任务
app.post('/tasks', async (req, res) => {
const task = {
id: generateId(),
state: 'working',
input: req.body.input
};
// 异步处理
processTask(task);
res.json(task);
});
// 获取任务
app.get('/tasks/:taskId', (req, res) => {
const task = getTask(req.params.taskId);
res.json(task);
});
// 流式更新
app.get('/tasks/:taskId/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
const stream = getTaskStream(req.params.taskId);
stream.on('update', (data) => {
res.write(`data: ${JSON.stringify(data)}\n\n`);
});
});客户端实现
typescript
class A2AClient {
constructor(private agentUrl: string) {}
async getAgentCard(): Promise<AgentCard> {
const res = await fetch(`${this.agentUrl}/.well-known/agent.json`);
return await res.json();
}
async createTask(input: Message): Promise<Task> {
const res = await fetch(`${this.agentUrl}/tasks`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ input })
});
return await res.json();
}
async getTask(taskId: string): Promise<Task> {
const res = await fetch(`${this.agentUrl}/tasks/${taskId}`);
return await res.json();
}
async *streamTask(taskId: string): AsyncGenerator<TaskUpdate> {
const res = await fetch(`${this.agentUrl}/tasks/${taskId}/stream`);
const reader = res.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
yield JSON.parse(line.slice(6));
}
}
}
}
}使用示例
typescript
const client = new A2AClient('https://analyzer.example.com');
// 1. 获取 Agent 信息
const card = await client.getAgentCard();
console.log(card.name, card.skills);
// 2. 创建任务
const task = await client.createTask({
role: 'user',
parts: [{ text: '分析销售数据' }]
});
// 3. 监听更新
for await (const update of client.streamTask(task.id)) {
console.log(update.state, update.message);
}
// 4. 获取结果
const result = await client.getTask(task.id);
console.log(result.output);多 Agent 协作
typescript
class OrchestratorAgent {
private agents: Map<string, A2AClient> = new Map();
async addAgent(skillType: string, url: string) {
this.agents.set(skillType, new A2AClient(url));
}
async processRequest(request: string) {
// 1. 数据清洗
const cleanTask = await this.agents.get('data-cleaning')!.createTask({
role: 'user',
parts: [{ text: request }]
});
await this.waitForCompletion(cleanTask);
// 2. 分析
const analysisTask = await this.agents.get('analysis')!.createTask({
role: 'user',
parts: [{ text: '分析清洗后的数据', data: cleanTask.output }]
});
await this.waitForCompletion(analysisTask);
// 3. 可视化
const vizTask = await this.agents.get('visualization')!.createTask({
role: 'user',
parts: [{ text: '生成图表', data: analysisTask.output }]
});
return await this.waitForCompletion(vizTask);
}
private async waitForCompletion(task: Task) {
while (task.state !== 'completed') {
await sleep(1000);
task = await this.agents.get(task.agentType)!.getTask(task.id);
}
return task;
}
}关键要点
- Agent Card 是发现机制: 标准化的 Agent 能力描述
- 基于 HTTP/SSE: 易于集成和部署
- 支持长任务: 适合复杂的 Agent 工作流
- 流式更新: 实时反馈任务进度
- 多模态支持: 不限于文本交互