Workflow 编排:构建可靠的 AI 工作流 🔄
"Agent 适合探索,Workflow 适合生产。"
1. Workflow vs Agent
1.1 核心区别
| 特性 | Agent | Workflow |
|---|---|---|
| 控制流 | LLM 动态决定 | 代码预定义 |
| 可预测性 | 低 | 高 |
| 可调试性 | 难 | 易 |
| 适用场景 | 探索性、开放性任务 | 重复性、结构化任务 |
| 成本 | 高 (多次 LLM 调用) | 可控 |
1.2 何时用 Workflow
✅ 适合 Workflow:
- 代码审查 (固定步骤)
- 文档生成 (模板化)
- 数据处理 (ETL)
- 测试生成 (结构化输出)
✅ 适合 Agent:
- 自由对话
- 探索性任务
- 需要创造性的工作
✅ 混合使用:
- Workflow 作为骨架
- 某些步骤内部使用 Agent 处理复杂逻辑
2. Workflow 基础模式
2.1 Prompt Chain (提示链)
最简单的 Workflow:多个 LLM 调用串联。
javascript
async function generateAPIEndpoint(spec) {
// Step 1: 生成类型定义
const types = await llm.chat({
messages: [{
role: "user",
content: `根据以下 API 规格生成 TypeScript 类型定义:\n${spec}`
}]
});
// Step 2: 生成验证 Schema
const validation = await llm.chat({
messages: [{
role: "user",
content: `根据以下类型定义生成 Zod Schema:\n${types}`
}]
});
// Step 3: 生成 API Handler
const handler = await llm.chat({
messages: [{
role: "user",
content: `生成 Next.js API Route Handler:
类型: ${types}
验证: ${validation}
规格: ${spec}`
}]
});
return { types, validation, handler };
}2.2 并行执行 (Parallelization)
独立任务并行处理:
javascript
async function reviewCode(files) {
// 并行审查所有文件
const reviews = await Promise.all(
files.map(file => reviewSingleFile(file))
);
// 合并结果
const summary = await llm.chat({
messages: [{
role: "user",
content: `综合以下代码审查结果,生成总结报告:\n${JSON.stringify(reviews)}`
}]
});
return { reviews, summary };
}
async function reviewSingleFile(file) {
return await llm.chat({
messages: [{
role: "system",
content: "你是代码审查专家。指出问题和改进建议。"
}, {
role: "user",
content: `审查这个文件:\n${file.content}`
}]
});
} ┌─────────────┐
│ 输入文件 │
└──────┬──────┘
│
┌────────┼────────┐
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│File1│ │File2│ │File3│ (并行)
└──┬──┘ └──┬──┘ └──┬──┘
│ │ │
└────────┼────────┘
▼
┌──────────────┐
│ 综合报告 │
└──────────────┘2.3 路由 (Routing)
根据输入类型选择不同处理路径:
javascript
async function handleUserRequest(request) {
// Step 1: 分类请求
const classification = await llm.chat({
messages: [{
role: "user",
content: `分类这个请求: "${request}"
类别:
1. bug_report - 报告 Bug
2. feature_request - 功能请求
3. question - 技术问题
4. other - 其他
只输出类别名称。`
}]
});
// Step 2: 路由到对应处理器
const handlers = {
bug_report: handleBugReport,
feature_request: handleFeatureRequest,
question: handleQuestion,
other: handleOther
};
const handler = handlers[classification.trim()];
return await handler(request);
}
async function handleBugReport(request) {
// 提取 Bug 信息、搜索相关代码、生成修复建议
}
async function handleQuestion(request) {
// 搜索文档、生成回答
}3. 高级 Workflow 模式
3.1 编排器模式 (Orchestrator-Workers)
一个 "指挥者" 分配任务给多个 "工作者":
javascript
class OrchestratorWorkflow {
async run(task) {
// Orchestrator: 分解任务
const subtasks = await this.decompose(task);
// Workers: 并行执行子任务
const results = await Promise.all(
subtasks.map(subtask => this.worker(subtask))
);
// Orchestrator: 综合结果
const final = await this.synthesize(task, results);
return final;
}
async decompose(task) {
const response = await llm.chat({
messages: [{
role: "system",
content: "将任务分解为独立的子任务。输出 JSON 数组。"
}, {
role: "user",
content: task
}],
response_format: { type: "json_object" }
});
return JSON.parse(response.content).subtasks;
}
async worker(subtask) {
// 每个 worker 可以是简单 LLM 调用或复杂的子 workflow
return await llm.chat({
messages: [{
role: "user",
content: `完成这个子任务: ${subtask}`
}]
});
}
async synthesize(task, results) {
return await llm.chat({
messages: [{
role: "user",
content: `原始任务: ${task}
子任务结果:
${results.map((r, i) => `${i+1}. ${r}`).join('\n')}
请综合这些结果,生成最终输出。`
}]
});
}
}3.2 评估器-优化器循环 (Evaluator-Optimizer)
生成 → 评估 → 优化 的循环:
javascript
async function generateWithQuality(task, minScore = 0.8) {
let result = await generate(task);
let iteration = 0;
const maxIterations = 3;
while (iteration < maxIterations) {
// 评估
const evaluation = await evaluate(task, result);
if (evaluation.score >= minScore) {
return result;
}
// 优化
result = await optimize(task, result, evaluation.feedback);
iteration++;
}
return result; // 返回最后一次结果
}
async function evaluate(task, result) {
const response = await llm.chat({
messages: [{
role: "system",
content: `评估以下输出是否满足任务要求。
输出 JSON: { score: 0-1, issues: [], feedback: "" }`
}, {
role: "user",
content: `任务: ${task}\n\n输出: ${result}`
}],
response_format: { type: "json_object" }
});
return JSON.parse(response.content);
}
async function optimize(task, result, feedback) {
return await llm.chat({
messages: [{
role: "user",
content: `任务: ${task}
当前输出: ${result}
反馈: ${feedback}
请根据反馈改进输出。`
}]
});
}3.3 DAG 工作流
复杂依赖关系的有向无环图:
javascript
const workflowDefinition = {
nodes: {
'parse_spec': {
fn: parseSpec,
inputs: ['spec']
},
'gen_types': {
fn: generateTypes,
inputs: ['parse_spec']
},
'gen_schema': {
fn: generateSchema,
inputs: ['gen_types']
},
'gen_handler': {
fn: generateHandler,
inputs: ['gen_types', 'gen_schema']
},
'gen_tests': {
fn: generateTests,
inputs: ['gen_handler']
},
'run_tests': {
fn: runTests,
inputs: ['gen_handler', 'gen_tests']
}
}
};
/*
parse_spec
│
gen_types
╱ ╲
gen_schema ─┐
╲ │
gen_handler
│
gen_tests
│
run_tests
*/
class DAGWorkflow {
async run(definition, inputs) {
const results = { ...inputs };
const completed = new Set(Object.keys(inputs));
const pending = new Set(Object.keys(definition.nodes));
while (pending.size > 0) {
// 找出所有依赖已满足的节点
const ready = [...pending].filter(nodeId => {
const node = definition.nodes[nodeId];
return node.inputs.every(dep => completed.has(dep));
});
if (ready.length === 0) {
throw new Error("检测到循环依赖或缺少输入");
}
// 并行执行所有就绪节点
await Promise.all(ready.map(async (nodeId) => {
const node = definition.nodes[nodeId];
const nodeInputs = node.inputs.map(dep => results[dep]);
results[nodeId] = await node.fn(...nodeInputs);
completed.add(nodeId);
pending.delete(nodeId);
}));
}
return results;
}
}4. 人机协作
4.1 Human-in-the-Loop
关键步骤需要人工确认:
javascript
class HumanInLoopWorkflow {
constructor(confirmFn) {
this.confirm = confirmFn;
}
async refactorWithApproval(file) {
// Step 1: 分析 (自动)
const analysis = await this.analyze(file);
// Step 2: 生成重构计划 (自动)
const plan = await this.generatePlan(analysis);
// Step 3: 人工审批计划
const approved = await this.confirm({
type: 'plan_approval',
message: '请审核重构计划',
plan
});
if (!approved) {
return { status: 'cancelled', reason: 'User rejected plan' };
}
// Step 4: 执行重构 (自动)
const result = await this.executeRefactor(plan);
// Step 5: 人工审核结果
const accepted = await this.confirm({
type: 'result_review',
message: '请审核重构结果',
diff: result.diff
});
if (!accepted) {
await this.rollback(result);
return { status: 'rolled_back' };
}
return { status: 'completed', result };
}
}
// 使用示例 - 在 UI 中实现确认函数
const workflow = new HumanInLoopWorkflow(async (request) => {
return new Promise((resolve) => {
showConfirmDialog({
...request,
onConfirm: () => resolve(true),
onCancel: () => resolve(false)
});
});
});4.2 中断与恢复
javascript
class InterruptibleWorkflow {
constructor() {
this.state = {
currentStep: 0,
results: {},
status: 'idle'
};
}
async run() {
this.state.status = 'running';
const steps = [
{ name: 'analyze', fn: this.analyze },
{ name: 'plan', fn: this.plan },
{ name: 'execute', fn: this.execute },
{ name: 'verify', fn: this.verify }
];
while (this.state.currentStep < steps.length) {
// 检查是否被中断
if (this.state.status === 'paused') {
await this.saveState();
return { status: 'paused', canResume: true };
}
const step = steps[this.state.currentStep];
try {
this.state.results[step.name] = await step.fn.call(this);
this.state.currentStep++;
} catch (error) {
this.state.status = 'error';
this.state.error = error.message;
return { status: 'error', error, canRetry: true };
}
}
this.state.status = 'completed';
return { status: 'completed', results: this.state.results };
}
pause() {
this.state.status = 'paused';
}
async resume() {
if (this.state.status !== 'paused') {
throw new Error('Workflow is not paused');
}
return await this.run();
}
async retry() {
if (this.state.status !== 'error') {
throw new Error('Workflow is not in error state');
}
this.state.status = 'running';
return await this.run();
}
}5. 错误处理与恢复
5.1 重试策略
javascript
async function withRetry(fn, options = {}) {
const {
maxRetries = 3,
backoff = 'exponential', // 'linear' | 'exponential' | 'fixed'
baseDelay = 1000,
shouldRetry = (error) => true
} = options;
let lastError;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
if (attempt === maxRetries || !shouldRetry(error)) {
throw error;
}
const delay = calculateDelay(attempt, backoff, baseDelay);
console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms`);
await sleep(delay);
}
}
throw lastError;
}
function calculateDelay(attempt, backoff, baseDelay) {
switch (backoff) {
case 'fixed':
return baseDelay;
case 'linear':
return baseDelay * (attempt + 1);
case 'exponential':
return baseDelay * Math.pow(2, attempt);
default:
return baseDelay;
}
}5.2 Fallback 策略
javascript
async function generateWithFallback(prompt) {
const strategies = [
{ model: 'claude-sonnet-4-20250514', temperature: 0.2 },
{ model: 'gpt-4o', temperature: 0.2 }, // fallback 到其他模型
{ model: 'claude-sonnet-4-20250514', temperature: 0.5 }, // 提高温度重试
];
for (const strategy of strategies) {
try {
return await llm.chat({
model: strategy.model,
temperature: strategy.temperature,
messages: [{ role: 'user', content: prompt }]
});
} catch (error) {
console.log(`Strategy failed: ${JSON.stringify(strategy)}`);
continue;
}
}
throw new Error('All strategies failed');
}5.3 补偿操作 (Compensation)
javascript
class CompensatingWorkflow {
constructor() {
this.completedActions = [];
}
async run() {
try {
// 执行操作并记录
await this.executeWithCompensation(
() => this.createFile('temp.ts', content),
() => this.deleteFile('temp.ts')
);
await this.executeWithCompensation(
() => this.updateConfig('config.json', newConfig),
() => this.updateConfig('config.json', oldConfig)
);
// ... 更多操作
} catch (error) {
// 执行所有补偿操作
await this.compensate();
throw error;
}
}
async executeWithCompensation(action, compensation) {
await action();
this.completedActions.push(compensation);
}
async compensate() {
// 反向执行所有补偿操作
for (const compensation of this.completedActions.reverse()) {
try {
await compensation();
} catch (e) {
console.error('Compensation failed:', e);
}
}
this.completedActions = [];
}
}6. 可观测性
6.1 Workflow 追踪
javascript
class TracedWorkflow {
constructor(name) {
this.name = name;
this.traceId = crypto.randomUUID();
this.spans = [];
}
async runStep(stepName, fn) {
const span = {
stepName,
startTime: Date.now(),
traceId: this.traceId
};
try {
const result = await fn();
span.endTime = Date.now();
span.duration = span.endTime - span.startTime;
span.status = 'success';
span.result = this.summarize(result);
return result;
} catch (error) {
span.endTime = Date.now();
span.duration = span.endTime - span.startTime;
span.status = 'error';
span.error = error.message;
throw error;
} finally {
this.spans.push(span);
}
}
getTrace() {
return {
traceId: this.traceId,
workflow: this.name,
totalDuration: this.spans.reduce((sum, s) => sum + s.duration, 0),
steps: this.spans
};
}
}
// 使用
const workflow = new TracedWorkflow('code-review');
const types = await workflow.runStep('generate-types', () => generateTypes(spec));
const schema = await workflow.runStep('generate-schema', () => generateSchema(types));
console.log(workflow.getTrace());6.2 成本追踪
javascript
class CostTracker {
constructor() {
this.calls = [];
}
// 包装 LLM 调用
wrap(llm) {
const original = llm.chat.bind(llm);
llm.chat = async (options) => {
const start = Date.now();
const response = await original(options);
this.calls.push({
model: options.model,
inputTokens: response.usage.prompt_tokens,
outputTokens: response.usage.completion_tokens,
cost: this.calculateCost(options.model, response.usage),
latency: Date.now() - start
});
return response;
};
return llm;
}
calculateCost(model, usage) {
const pricing = {
'claude-sonnet-4-20250514': { input: 0.003, output: 0.015 },
'gpt-4o': { input: 0.005, output: 0.015 },
'gpt-4o-mini': { input: 0.00015, output: 0.0006 }
};
const price = pricing[model] || { input: 0, output: 0 };
return (
(usage.prompt_tokens / 1000) * price.input +
(usage.completion_tokens / 1000) * price.output
);
}
getSummary() {
return {
totalCalls: this.calls.length,
totalCost: this.calls.reduce((sum, c) => sum + c.cost, 0),
totalTokens: this.calls.reduce((sum, c) => sum + c.inputTokens + c.outputTokens, 0),
avgLatency: this.calls.reduce((sum, c) => sum + c.latency, 0) / this.calls.length,
byModel: this.groupBy('model')
};
}
}7. 实战:代码审查 Workflow
javascript
class CodeReviewWorkflow {
constructor() {
this.tracker = new CostTracker();
this.llm = this.tracker.wrap(anthropic);
}
async review(prFiles) {
const workflow = new TracedWorkflow('code-review');
// 1. 并行分析每个文件
const fileAnalyses = await workflow.runStep('analyze-files', async () => {
return Promise.all(
prFiles.map(file => this.analyzeFile(file))
);
});
// 2. 识别跨文件问题
const crossFileIssues = await workflow.runStep('cross-file-analysis', async () => {
return this.analyzeCrossFile(fileAnalyses);
});
// 3. 生成建议
const suggestions = await workflow.runStep('generate-suggestions', async () => {
return this.generateSuggestions([...fileAnalyses, crossFileIssues]);
});
// 4. 格式化输出
const report = await workflow.runStep('format-report', async () => {
return this.formatReport({
files: fileAnalyses,
crossFile: crossFileIssues,
suggestions
});
});
return {
report,
trace: workflow.getTrace(),
cost: this.tracker.getSummary()
};
}
async analyzeFile(file) {
return this.llm.chat({
model: 'claude-sonnet-4-20250514',
messages: [{
role: 'system',
content: CODE_REVIEW_SYSTEM_PROMPT
}, {
role: 'user',
content: `审查这个文件:\n\n文件名: ${file.name}\n\n\`\`\`\n${file.content}\n\`\`\``
}],
response_format: { type: 'json_object' }
});
}
}
const CODE_REVIEW_SYSTEM_PROMPT = `你是代码审查专家。分析代码并识别:
- 安全问题
- 性能问题
- 可维护性问题
- 最佳实践违规
输出 JSON:
{
"issues": [
{
"severity": "critical|major|minor",
"type": "security|performance|maintainability|style",
"line": number,
"description": string,
"suggestion": string
}
],
"summary": string
}`;8. 关键要点
- Workflow 比 Agent 更可控: 适合生产环境的重复性任务
- 选择合适的模式: Chain、Parallel、Router、Orchestrator
- 人机协作很重要: 关键步骤需要人工确认
- 健壮的错误处理: 重试、Fallback、补偿
- 可观测性: 追踪每步执行,监控成本
- 增量优化: 从简单 Chain 开始,按需添加复杂性