Skip to content

Agent2Agent Protocol (A2A)

A2A 是 Google 推出的 Agent 间通信协议

核心概念

A2A 协议专注于 AI Agent 之间的通信与协作,实现跨平台、跨框架的 Agent 互操作。

Client Agent ─── A2A Protocol ─── Remote Agent
     │                                  │
     │  1. 发现 (Agent Card)             │
     │  2. 发起任务 (Task)               │
     │  3. 状态更新 (SSE Stream)         │
     │  4. 交换消息 (Message)            │
     │  5. 返回结果 (Artifact)           │

设计原则

原则描述
拥抱 Agent 能力允许 Agent 以自然方式协作
基于现有标准使用 HTTP、SSE、JSON-RPC
默认安全支持 OAuth 2.0、W3C DID
支持长任务从秒级到数天的任务
模态无关支持文本、音频、视频等

核心实体

Agent Card

typescript
interface AgentCard {
  name: string;
  description: string;
  url: string;
  version: string;
  capabilities: {
    streaming?: boolean;
    pushNotifications?: boolean;
  };
  skills: Skill[];
  authentication: {
    schemes: string[];
  };
}

Task

typescript
interface Task {
  id: string;
  state: 'pending' | 'working' | 'input_required' | 'completed' | 'failed';
  input: Message;
  output?: Artifact;
  metadata?: Record<string, any>;
}

Message

typescript
interface Message {
  role: 'user' | 'agent';
  parts: MessagePart[];
}

type MessagePart = TextPart | FilePart | DataPart;

Artifact

typescript
interface Artifact {
  parts: ArtifactPart[];
}

type ArtifactPart = TextPart | FilePart | DataPart | FormPart | IframePart;

API 端点

1. 获取 Agent Card

http
GET /.well-known/agent.json

2. 创建任务

http
POST /tasks
Content-Type: application/json

{
  "input": {
    "role": "user",
    "parts": [{ "text": "分析这个数据集" }]
  }
}

3. 获取任务状态

http
GET /tasks/{taskId}

4. 流式更新

http
GET /tasks/{taskId}/stream
Accept: text/event-stream

5. 发送消息

http
POST /tasks/{taskId}/messages

实现示例

服务端实现

typescript
import express from 'express';

const app = express();

// Agent Card
app.get('/.well-known/agent.json', (req, res) => {
  res.json({
    name: 'Data Analyzer',
    description: '数据分析 Agent',
    url: 'https://analyzer.example.com',
    version: '1.0',
    skills: [{
      type: 'analysis',
      description: '分析数据并生成报告'
    }]
  });
});

// 创建任务
app.post('/tasks', async (req, res) => {
  const task = {
    id: generateId(),
    state: 'working',
    input: req.body.input
  };

  // 异步处理
  processTask(task);

  res.json(task);
});

// 获取任务
app.get('/tasks/:taskId', (req, res) => {
  const task = getTask(req.params.taskId);
  res.json(task);
});

// 流式更新
app.get('/tasks/:taskId/stream', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');

  const stream = getTaskStream(req.params.taskId);
  stream.on('update', (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  });
});

客户端实现

typescript
class A2AClient {
  constructor(private agentUrl: string) {}

  async getAgentCard(): Promise<AgentCard> {
    const res = await fetch(`${this.agentUrl}/.well-known/agent.json`);
    return await res.json();
  }

  async createTask(input: Message): Promise<Task> {
    const res = await fetch(`${this.agentUrl}/tasks`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ input })
    });
    return await res.json();
  }

  async getTask(taskId: string): Promise<Task> {
    const res = await fetch(`${this.agentUrl}/tasks/${taskId}`);
    return await res.json();
  }

  async *streamTask(taskId: string): AsyncGenerator<TaskUpdate> {
    const res = await fetch(`${this.agentUrl}/tasks/${taskId}/stream`);
    const reader = res.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const text = decoder.decode(value);
      const lines = text.split('\n');

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          yield JSON.parse(line.slice(6));
        }
      }
    }
  }
}

使用示例

typescript
const client = new A2AClient('https://analyzer.example.com');

// 1. 获取 Agent 信息
const card = await client.getAgentCard();
console.log(card.name, card.skills);

// 2. 创建任务
const task = await client.createTask({
  role: 'user',
  parts: [{ text: '分析销售数据' }]
});

// 3. 监听更新
for await (const update of client.streamTask(task.id)) {
  console.log(update.state, update.message);
}

// 4. 获取结果
const result = await client.getTask(task.id);
console.log(result.output);

多 Agent 协作

typescript
class OrchestratorAgent {
  private agents: Map<string, A2AClient> = new Map();

  async addAgent(skillType: string, url: string) {
    this.agents.set(skillType, new A2AClient(url));
  }

  async processRequest(request: string) {
    // 1. 数据清洗
    const cleanTask = await this.agents.get('data-cleaning')!.createTask({
      role: 'user',
      parts: [{ text: request }]
    });
    await this.waitForCompletion(cleanTask);

    // 2. 分析
    const analysisTask = await this.agents.get('analysis')!.createTask({
      role: 'user',
      parts: [{ text: '分析清洗后的数据', data: cleanTask.output }]
    });
    await this.waitForCompletion(analysisTask);

    // 3. 可视化
    const vizTask = await this.agents.get('visualization')!.createTask({
      role: 'user',
      parts: [{ text: '生成图表', data: analysisTask.output }]
    });

    return await this.waitForCompletion(vizTask);
  }

  private async waitForCompletion(task: Task) {
    while (task.state !== 'completed') {
      await sleep(1000);
      task = await this.agents.get(task.agentType)!.getTask(task.id);
    }
    return task;
  }
}

关键要点

  1. Agent Card 是发现机制: 标准化的 Agent 能力描述
  2. 基于 HTTP/SSE: 易于集成和部署
  3. 支持长任务: 适合复杂的 Agent 工作流
  4. 流式更新: 实时反馈任务进度
  5. 多模态支持: 不限于文本交互

参考资源

前端面试知识库