Mastra Agent Advanced Features
Agent Networks, Processors, and Voice Interaction
🌐 Agent Networks
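Agent Networks let multiple agents, workflows, and tools work together on complex tasks.
When to use
- The task needs several specialized agents to collaborate
- Requests must be routed dynamically to different execution paths
- The order of execution cannot be determined in advance
- LLM reasoning is needed to decide the next step
Core principles
- Memory is required: networks use memory to store task history and to judge when a task is complete
- Description-based routing: the routing agent chooses an execution path from the descriptions of its primitives
- Automatic prioritization: when capabilities overlap, the more specific primitive is preferred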
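The routing principles can be illustrated with a small sketch in plain TypeScript. This is not Mastra's routing implementation: in a real network an LLM reads the descriptions and decides. The keyword-overlap score below is only a stand-in for that reasoning, and `Primitive`, `tokenize`, and `pickPrimitive` are hypothetical names introduced for illustration.

```typescript
// Hypothetical shape for a routable primitive (not a Mastra type).
type Primitive = {
  type: 'agent' | 'workflow' | 'tool';
  id: string;
  description: string;
};

// Lowercase the text and return its distinct words.
function tokenize(text: string): string[] {
  const words: string[] = text.toLowerCase().match(/[a-z0-9]+/g) || [];
  return words.filter((w, i) => words.indexOf(w) === i);
}

// Stand-in for LLM routing: score each primitive by how many task words
// appear in its description and return the best match, or undefined if
// nothing overlaps. Earlier entries win ties.
function pickPrimitive(task: string, candidates: Primitive[]): Primitive | undefined {
  const taskWords = tokenize(task);
  let best: Primitive | undefined;
  let bestScore = 0;
  for (const p of candidates) {
    const descWords = tokenize(p.description);
    const score = taskWords.filter((w) => descWords.indexOf(w) !== -1).length;
    if (score > bestScore) {
      best = p;
      bestScore = score;
    }
  }
  return best;
}

const primitives: Primitive[] = [
  { type: 'agent', id: 'research-agent', description: 'Gathers research insights' },
  { type: 'workflow', id: 'city-workflow', description: 'Handles city-specific research' },
  { type: 'tool', id: 'calculator', description: 'Performs calculations' },
];

// Both the agent and the workflow mention "research", but the workflow's
// more specific description also matches "city", so it is chosen.
const choice = pickPrimitive('Do city research on Paris', primitives);
console.log(choice?.id); // city-workflow
```

The point of the sketch is the tie-break: a description that names the concrete situation beats a generic one, which is why specific descriptions matter for routing.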
Creating an Agent Network
typescript
import { Agent } from '@mastra/core/agent';
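import { Memory } from '@mastra/memory';
import { LibSQLStore } from '@mastra/libsql';
// searchTool, documentTool, cityWorkflow, and calculatorTool are assumed to be defined elsewhere
// 1. Define the specialized agents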
const researchAgent = new Agent({
id: 'research-agent',
name: 'Research Agent',
description: 'Gathers research insights in bullet-point form.',
instructions: 'Provide bullet-point summaries.',
model: 'openai/gpt-4o',
tools: { webSearch: searchTool, readDocument: documentTool }
});
const writingAgent = new Agent({
id: 'writing-agent',
name: 'Writing Agent',
description: 'Turns research into well-structured written content.',
instructions: 'Write in full paragraphs.',
model: 'openai/gpt-4o'
});
// 2. Create the routing agent; sub-agents, workflows, and tools are registered as top-level options
const routingAgent = new Agent({
id: 'routing-agent',
name: 'Routing Agent',
instructions: 'You are a network of writers and researchers.',
model: 'openai/gpt-4o',
agents: { researchAgent, writingAgent },
workflows: { cityWorkflow },
tools: { calculator: calculatorTool },
memory: new Memory({
storage: new LibSQLStore({ url: 'file:memory.db' })
})
});
// 3. Run the network; network() streams events as the routing agent delegates work
const stream = await routingAgent.network('Write a report about Paris');
for await (const chunk of stream) {
console.log(chunk.type);
}
Routing logic
typescript
// The agent picks a primitive automatically based on its description
const primitives = [
{
type: 'agent',
id: 'research-agent',
description: 'Gathers research insights'
},
{
type: 'workflow',
id: 'city-workflow',
description: 'Handles city-specific research'
},
{
type: 'tool',
id: 'calculator',
description: 'Performs calculations'
}
];
// The LLM picks the primitive that best fits the user's input
Best practices
typescript
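// ✅ Good: a specific description that tells the router when to pick this agent
const goodAgent = new Agent({
description: `
This agent specializes in financial analysis.
Use it for tasks involving stock prices, market trends, or financial calculations.
`
});
// ❌ Bad: a vague description that gives the router nothing to match on
const badAgent = new Agent({
description: 'A helpful agent'
});
🔄 Processors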
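Processors run custom logic before and after an Agent generates a response.
Use cases
- Pre-processors: modify input, add context, validate requests
- Post-processors: format output, filter sensitive information, write logs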
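The flow described above (pre-processors, then the model, then post-processors) can be sketched without any framework. This is a minimal illustration, not Mastra's implementation: `Message`, `PreProcessor`, `PostProcessor`, `runWithProcessors`, and the stubbed `fakeModel` are all hypothetical, and the functions are synchronous for brevity, whereas the examples in this document use `async` functions.

```typescript
// Hypothetical message and processor shapes, for illustration only.
type Message = { role: 'system' | 'user' | 'assistant'; content: string };
type PreProcessor = (messages: Message[]) => Message[];
type PostProcessor = (text: string) => string;

// Run pre-processors in order, call the model, then run post-processors in order.
function runWithProcessors(
  input: Message[],
  pre: PreProcessor[],
  model: (messages: Message[]) => string,
  post: PostProcessor[],
): string {
  const messages = pre.reduce((msgs, p) => p(msgs), input);
  const raw = model(messages);
  return post.reduce((text, p) => p(text), raw);
}

// Pre-processor: add context as a leading system message.
const addContext: PreProcessor = (messages) => [
  { role: 'system', content: 'Answer briefly.' },
  ...messages,
];

// Pre-processor: validate the request by rejecting empty input.
const validateInput: PreProcessor = (messages) => {
  const last = messages[messages.length - 1];
  if (!last || last.content.trim() === '') throw new Error('Empty input');
  return messages;
};

// Post-processor: mask anything shaped like a US Social Security number.
const redactSsn: PostProcessor = (text) =>
  text.replace(/\b\d{3}-\d{2}-\d{4}\b/g, '[REDACTED]');

// Stub model: echoes the last message and reports how many messages it saw.
const fakeModel = (messages: Message[]): string =>
  `(${messages.length} messages) ${messages[messages.length - 1].content}`;

const output = runWithProcessors(
  [{ role: 'user', content: 'My SSN is 123-45-6789' }],
  [addContext, validateInput],
  fakeModel,
  [redactSsn],
);
console.log(output); // (2 messages) My SSN is [REDACTED]
```

Because each processor takes a value and returns a value of the same type, processors compose in list order and can be unit-tested on their own.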
Pre-processor example
typescript
import { Agent } from '@mastra/core/agent';
const agent = new Agent({
id: 'translation-agent',
model: 'openai/gpt-4o',
preProcessors: [
async ({ messages, context }) => {
// 1. Detect the language (detectLanguage is an assumed helper defined elsewhere)
const language = await detectLanguage(messages[0].content);
// 2. Prepend a translation instruction
if (language !== 'en') {
messages.unshift({
role: 'system',
content: `Translate the following from ${language} to English first.`
});
}
return { messages, context };
}
]
});
Post-processor example
typescript
const agent = new Agent({
id: 'secure-agent',
model: 'openai/gpt-4o',
postProcessors: [
async ({ text, context }) => {
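// 1. Redact sensitive data (here, strings shaped like US Social Security numbers)
text = text.replace(/\b\d{3}-\d{2}-\d{4}\b/g, '[REDACTED]');
// 2. Append a disclaimer
text += '\n\n*This is AI-generated content.*';
// 3. Log the response (logResponse is an assumed helper defined elsewhere)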
await logResponse(context.userId, text);
return { text, context };
}
]
});
Multiple processors
typescript
const agent = new Agent({
id: 'multi-processor-agent',
preProcessors: [
addUserContext,
validateInput,
enrichWithHistory
],
postProcessors: [
formatMarkdown,
filterSensitiveData,
logMetrics
]
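});
Case study: content moderation
typescript
import OpenAI from 'openai';
// Moderation client; reads OPENAI_API_KEY from the environment by default
const openai = new OpenAI();
const moderationAgent = new Agent({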
id: 'moderation-agent',
model: 'openai/gpt-4o',
preProcessors: [
async ({ messages }) => {
const content = messages[messages.length - 1].content;
// Check whether the input violates the content policy
const moderation = await openai.moderations.create({ input: content });
if (moderation.results[0].flagged) {
throw new Error('Content violates policy');
}
return { messages };
}
],
postProcessors: [
async ({ text }) => {
// Check the output as well
const moderation = await openai.moderations.create({ input: text });
if (moderation.results[0].flagged) {
return { text: 'I cannot provide that information.' };
}
return { text };
}
]
});
🎙️ Voice interaction
Mastra supports multiple voice providers for text-to-speech (TTS) and speech-to-text (STT).
Basic configuration
typescript
import { Agent } from '@mastra/core/agent';
import { OpenAIVoice } from '@mastra/voice-openai';
const voice = new OpenAIVoice({
apiKey: process.env.OPENAI_API_KEY,
ttsModel: 'tts-1',
ttsVoice: 'alloy',
sttModel: 'whisper-1'
});
const agent = new Agent({
id: 'voice-agent',
model: 'openai/gpt-4o',
voice
});
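// Text-to-speech: voice methods are called through agent.voice
const audio = await agent.voice.speak('Hello, how can I help you?');
// Speech-to-text (audioBuffer is assumed to hold recorded audio)
const text = await agent.voice.listen(audioBuffer);
Real-time voice conversation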
typescript
import { OpenAIRealtimeVoice } from '@mastra/voice-openai-realtime';
const realtimeVoice = new OpenAIRealtimeVoice({
apiKey: process.env.OPENAI_API_KEY,
model: 'gpt-4o-realtime-preview',
voice: 'alloy',
instructions: 'You are a helpful assistant.'
});
const agent = new Agent({
id: 'realtime-agent',
voice: realtimeVoice
});
// Open the connection
await agent.voice.connect();
// Send an audio chunk
agent.voice.sendAudio(audioChunk);
// Receive audio responses
agent.voice.on('audio', (audioChunk) => {
playAudio(audioChunk);
});
// Receive transcripts
agent.voice.on('transcript', (text) => {
console.log('User said:', text);
});
Multi-provider setup
typescript
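import { Agent } from '@mastra/core/agent';
import { CompositeVoice } from '@mastra/core/voice';
import { OpenAIVoice } from '@mastra/voice-openai';
import { ElevenLabsVoice } from '@mastra/voice-elevenlabs';
// Use ElevenLabs for high-quality TTS
const ttsVoice = new ElevenLabsVoice({
apiKey: process.env.ELEVENLABS_API_KEY,
voiceId: 'premium-voice-id'
});
// Use OpenAI for STT
const sttVoice = new OpenAIVoice({
apiKey: process.env.OPENAI_API_KEY
});
// CompositeVoice routes listen() to the input provider and speak() to the output provider
const agent = new Agent({
id: 'hybrid-voice-agent',
voice: new CompositeVoice({
input: sttVoice,
output: ttsVoice
})
});
Voice provider comparison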
| Provider | Package | Capabilities |
|---|---|---|
| OpenAI | @mastra/voice-openai | TTS, STT |
| OpenAI Realtime | @mastra/voice-openai-realtime | Speech-to-speech, real-time conversation |
| ElevenLabs | @mastra/voice-elevenlabs | TTS (high-quality voices) |
| Deepgram | @mastra/voice-deepgram | STT (high-accuracy transcription) |
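When picking providers programmatically, the comparison table can be encoded as data and filtered by capability. The sketch below only restates the table; `Capability`, `VoiceProvider`, and `packagesFor` are hypothetical names, not part of any Mastra package.

```typescript
// Capabilities and package names copied from the comparison table.
type Capability = 'TTS' | 'STT' | 'Speech-to-Speech';

type VoiceProvider = { name: string; pkg: string; capabilities: Capability[] };

const providers: VoiceProvider[] = [
  { name: 'OpenAI', pkg: '@mastra/voice-openai', capabilities: ['TTS', 'STT'] },
  { name: 'OpenAI Realtime', pkg: '@mastra/voice-openai-realtime', capabilities: ['Speech-to-Speech'] },
  { name: 'ElevenLabs', pkg: '@mastra/voice-elevenlabs', capabilities: ['TTS'] },
  { name: 'Deepgram', pkg: '@mastra/voice-deepgram', capabilities: ['STT'] },
];

// Hypothetical helper: list the packages that offer a given capability.
function packagesFor(capability: Capability): string[] {
  return providers
    .filter((p) => p.capabilities.indexOf(capability) !== -1)
    .map((p) => p.pkg);
}

console.log(packagesFor('STT')); // [ '@mastra/voice-openai', '@mastra/voice-deepgram' ]
```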
🔧 Advanced configuration
Custom memory provider
typescript
import { Memory } from '@mastra/memory';
import { PostgresStore } from '@mastra/pg';
const memory = new Memory({
storage: new PostgresStore({
connectionString: process.env.DATABASE_URL
}),
config: {
maxMessages: 50,
ttl: 86400 // 24 hours
}
});
const agent = new Agent({
id: 'persistent-agent',
memory
});
Tool call configuration
typescript
const agent = new Agent({
id: 'tool-agent',
tools: {
search: searchTool,
calculator: calculatorTool
},
toolChoice: 'auto', // 'auto' | 'required' | 'none'
parallelToolCalls: true
});
Streaming responses
typescript
const stream = await agent.stream('Tell me a story');
for await (const chunk of stream) {
if (chunk.type === 'text') {
process.stdout.write(chunk.content);
} else if (chunk.type === 'tool_call') {
console.log('Calling tool:', chunk.tool);
}
}
📊 Monitoring and debugging
Logging
typescript
const agent = new Agent({
id: 'logged-agent',
postProcessors: [
async ({ text, context }) => {
await logToDatabase({
agentId: 'logged-agent',
userId: context.userId,
input: context.messages[0].content,
output: text,
timestamp: new Date()
});
return { text, context };
}
]
});
Performance monitoring
typescript
const agent = new Agent({
id: 'monitored-agent',
preProcessors: [
async ({ messages, context }) => {
context.startTime = Date.now();
return { messages, context };
}
],
postProcessors: [
async ({ text, context }) => {
const duration = Date.now() - context.startTime;
await recordMetric('agent.duration', duration);
return { text, context };
}
]
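});
🎯 Best practices
- Networks: write a clear description for every agent so the router can make good decisions
- Processors: keep each processor small and avoid complex logic
- Voice: choose a provider that fits the scenario, weighing quality against cost
- Memory: clean up expired memories regularly to keep the context from bloating
- Monitoring: record key metrics and keep tuning performance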
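The memory advice can be made concrete with a small pruning function. This is a sketch under stated assumptions, not Mastra's memory API: `StoredMessage` and `pruneMessages` are hypothetical, the messages are assumed to be ordered oldest to newest, and the `maxMessages` and `ttlSeconds` options simply mirror the `maxMessages` and `ttl` values from the custom memory provider example.

```typescript
// Hypothetical stored-message shape; createdAt is milliseconds since the epoch.
type StoredMessage = { content: string; createdAt: number };

// Drop messages older than ttlSeconds, then keep only the newest maxMessages.
// Assumes `messages` is ordered from oldest to newest.
function pruneMessages(
  messages: StoredMessage[],
  opts: { maxMessages: number; ttlSeconds: number },
  now: number,
): StoredMessage[] {
  const cutoff = now - opts.ttlSeconds * 1000;
  const fresh = messages.filter((m) => m.createdAt >= cutoff);
  // slice(-n) keeps the last n items; guard n = 0 because slice(-0) keeps everything.
  return opts.maxMessages > 0 ? fresh.slice(-opts.maxMessages) : [];
}

const HOUR = 60 * 60 * 1000;
const now = 100 * HOUR; // fixed clock so the example is deterministic

const stored: StoredMessage[] = [
  { content: 'two days old', createdAt: now - 48 * HOUR },
  { content: 'three hours old', createdAt: now - 3 * HOUR },
  { content: 'two hours old', createdAt: now - 2 * HOUR },
  { content: 'one hour old', createdAt: now - 1 * HOUR },
];

// A 24-hour TTL removes the two-day-old message; the cap of 2 keeps the newest two.
const kept = pruneMessages(stored, { maxMessages: 2, ttlSeconds: 86400 }, now);
console.log(kept.map((m) => m.content)); // [ 'two hours old', 'one hour old' ]
```

Passing the clock in as `now` keeps the function pure, which makes the expiry rule easy to test.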