Skip to content

Workflow 编排:构建可靠的 AI 工作流 🔄

"Agent 适合探索,Workflow 适合生产。"

1. Workflow vs Agent

1.1 核心区别

特性AgentWorkflow
控制流LLM 动态决定代码预定义
可预测性
可调试性
适用场景探索性、开放性任务重复性、结构化任务
成本高 (多次 LLM 调用)可控

1.2 何时用 Workflow

适合 Workflow:

  • 代码审查 (固定步骤)
  • 文档生成 (模板化)
  • 数据处理 (ETL)
  • 测试生成 (结构化输出)

适合 Agent:

  • 自由对话
  • 探索性任务
  • 需要创造性的工作

混合使用:

  • Workflow 作为骨架
  • 某些步骤内部使用 Agent 处理复杂逻辑

2. Workflow 基础模式

2.1 Prompt Chain (提示链)

最简单的 Workflow:多个 LLM 调用串联。

javascript
async function generateAPIEndpoint(spec) {
  // Step 1: 生成类型定义
  const types = await llm.chat({
    messages: [{
      role: "user",
      content: `根据以下 API 规格生成 TypeScript 类型定义:\n${spec}`
    }]
  });
  
  // Step 2: 生成验证 Schema
  const validation = await llm.chat({
    messages: [{
      role: "user",
      content: `根据以下类型定义生成 Zod Schema:\n${types}`
    }]
  });
  
  // Step 3: 生成 API Handler
  const handler = await llm.chat({
    messages: [{
      role: "user",
      content: `生成 Next.js API Route Handler:
类型: ${types}
验证: ${validation}
规格: ${spec}`
    }]
  });
  
  return { types, validation, handler };
}

2.2 并行执行 (Parallelization)

独立任务并行处理:

javascript
async function reviewCode(files) {
  // 并行审查所有文件
  const reviews = await Promise.all(
    files.map(file => reviewSingleFile(file))
  );
  
  // 合并结果
  const summary = await llm.chat({
    messages: [{
      role: "user",
      content: `综合以下代码审查结果,生成总结报告:\n${JSON.stringify(reviews)}`
    }]
  });
  
  return { reviews, summary };
}

async function reviewSingleFile(file) {
  return await llm.chat({
    messages: [{
      role: "system",
      content: "你是代码审查专家。指出问题和改进建议。"
    }, {
      role: "user",
      content: `审查这个文件:\n${file.content}`
    }]
  });
}
     ┌─────────────┐
     │   输入文件   │
     └──────┬──────┘

   ┌────────┼────────┐
   ▼        ▼        ▼
┌─────┐  ┌─────┐  ┌─────┐
│File1│  │File2│  │File3│  (并行)
└──┬──┘  └──┬──┘  └──┬──┘
   │        │        │
   └────────┼────────┘

     ┌──────────────┐
     │   综合报告    │
     └──────────────┘

2.3 路由 (Routing)

根据输入类型选择不同处理路径:

javascript
async function handleUserRequest(request) {
  // Step 1: 分类请求
  const classification = await llm.chat({
    messages: [{
      role: "user",
      content: `分类这个请求: "${request}"
      
类别:
1. bug_report - 报告 Bug
2. feature_request - 功能请求  
3. question - 技术问题
4. other - 其他

只输出类别名称。`
    }]
  });
  
  // Step 2: 路由到对应处理器
  const handlers = {
    bug_report: handleBugReport,
    feature_request: handleFeatureRequest,
    question: handleQuestion,
    other: handleOther
  };
  
  const handler = handlers[classification.trim()];
  return await handler(request);
}

async function handleBugReport(request) {
  // 提取 Bug 信息、搜索相关代码、生成修复建议
}

async function handleQuestion(request) {
  // 搜索文档、生成回答
}

3. 高级 Workflow 模式

3.1 编排器模式 (Orchestrator-Workers)

一个 "指挥者" 分配任务给多个 "工作者":

javascript
class OrchestratorWorkflow {
  async run(task) {
    // Orchestrator: 分解任务
    const subtasks = await this.decompose(task);
    
    // Workers: 并行执行子任务
    const results = await Promise.all(
      subtasks.map(subtask => this.worker(subtask))
    );
    
    // Orchestrator: 综合结果
    const final = await this.synthesize(task, results);
    
    return final;
  }
  
  async decompose(task) {
    const response = await llm.chat({
      messages: [{
        role: "system",
        content: "将任务分解为独立的子任务。输出 JSON 数组。"
      }, {
        role: "user",
        content: task
      }],
      response_format: { type: "json_object" }
    });
    
    return JSON.parse(response.content).subtasks;
  }
  
  async worker(subtask) {
    // 每个 worker 可以是简单 LLM 调用或复杂的子 workflow
    return await llm.chat({
      messages: [{
        role: "user",
        content: `完成这个子任务: ${subtask}`
      }]
    });
  }
  
  async synthesize(task, results) {
    return await llm.chat({
      messages: [{
        role: "user",
        content: `原始任务: ${task}
        
子任务结果:
${results.map((r, i) => `${i+1}. ${r}`).join('\n')}

请综合这些结果,生成最终输出。`
      }]
    });
  }
}

3.2 评估器-优化器循环 (Evaluator-Optimizer)

生成 → 评估 → 优化 的循环:

javascript
async function generateWithQuality(task, minScore = 0.8) {
  let result = await generate(task);
  let iteration = 0;
  const maxIterations = 3;
  
  while (iteration < maxIterations) {
    // 评估
    const evaluation = await evaluate(task, result);
    
    if (evaluation.score >= minScore) {
      return result;
    }
    
    // 优化
    result = await optimize(task, result, evaluation.feedback);
    iteration++;
  }
  
  return result;  // 返回最后一次结果
}

async function evaluate(task, result) {
  const response = await llm.chat({
    messages: [{
      role: "system",
      content: `评估以下输出是否满足任务要求。
输出 JSON: { score: 0-1, issues: [], feedback: "" }`
    }, {
      role: "user",
      content: `任务: ${task}\n\n输出: ${result}`
    }],
    response_format: { type: "json_object" }
  });
  
  return JSON.parse(response.content);
}

async function optimize(task, result, feedback) {
  return await llm.chat({
    messages: [{
      role: "user",
      content: `任务: ${task}
      
当前输出: ${result}

反馈: ${feedback}

请根据反馈改进输出。`
    }]
  });
}

3.3 DAG 工作流

复杂依赖关系的有向无环图:

javascript
const workflowDefinition = {
  nodes: {
    'parse_spec': { 
      fn: parseSpec, 
      inputs: ['spec'] 
    },
    'gen_types': { 
      fn: generateTypes, 
      inputs: ['parse_spec'] 
    },
    'gen_schema': { 
      fn: generateSchema, 
      inputs: ['gen_types'] 
    },
    'gen_handler': { 
      fn: generateHandler, 
      inputs: ['gen_types', 'gen_schema'] 
    },
    'gen_tests': { 
      fn: generateTests, 
      inputs: ['gen_handler'] 
    },
    'run_tests': { 
      fn: runTests, 
      inputs: ['gen_handler', 'gen_tests'] 
    }
  }
};

/*
              parse_spec

              gen_types
                 ╱ ╲
        gen_schema  ─┐
                 ╲   │
              gen_handler

              gen_tests

              run_tests
*/

class DAGWorkflow {
  async run(definition, inputs) {
    const results = { ...inputs };
    const completed = new Set(Object.keys(inputs));
    const pending = new Set(Object.keys(definition.nodes));
    
    while (pending.size > 0) {
      // 找出所有依赖已满足的节点
      const ready = [...pending].filter(nodeId => {
        const node = definition.nodes[nodeId];
        return node.inputs.every(dep => completed.has(dep));
      });
      
      if (ready.length === 0) {
        throw new Error("检测到循环依赖或缺少输入");
      }
      
      // 并行执行所有就绪节点
      await Promise.all(ready.map(async (nodeId) => {
        const node = definition.nodes[nodeId];
        const nodeInputs = node.inputs.map(dep => results[dep]);
        
        results[nodeId] = await node.fn(...nodeInputs);
        completed.add(nodeId);
        pending.delete(nodeId);
      }));
    }
    
    return results;
  }
}

4. 人机协作

4.1 Human-in-the-Loop

关键步骤需要人工确认:

javascript
class HumanInLoopWorkflow {
  constructor(confirmFn) {
    this.confirm = confirmFn;
  }
  
  async refactorWithApproval(file) {
    // Step 1: 分析 (自动)
    const analysis = await this.analyze(file);
    
    // Step 2: 生成重构计划 (自动)
    const plan = await this.generatePlan(analysis);
    
    // Step 3: 人工审批计划
    const approved = await this.confirm({
      type: 'plan_approval',
      message: '请审核重构计划',
      plan
    });
    
    if (!approved) {
      return { status: 'cancelled', reason: 'User rejected plan' };
    }
    
    // Step 4: 执行重构 (自动)
    const result = await this.executeRefactor(plan);
    
    // Step 5: 人工审核结果
    const accepted = await this.confirm({
      type: 'result_review',
      message: '请审核重构结果',
      diff: result.diff
    });
    
    if (!accepted) {
      await this.rollback(result);
      return { status: 'rolled_back' };
    }
    
    return { status: 'completed', result };
  }
}

// 使用示例 - 在 UI 中实现确认函数
const workflow = new HumanInLoopWorkflow(async (request) => {
  return new Promise((resolve) => {
    showConfirmDialog({
      ...request,
      onConfirm: () => resolve(true),
      onCancel: () => resolve(false)
    });
  });
});

4.2 中断与恢复

javascript
class InterruptibleWorkflow {
  constructor() {
    this.state = {
      currentStep: 0,
      results: {},
      status: 'idle'
    };
  }
  
  async run() {
    this.state.status = 'running';
    
    const steps = [
      { name: 'analyze', fn: this.analyze },
      { name: 'plan', fn: this.plan },
      { name: 'execute', fn: this.execute },
      { name: 'verify', fn: this.verify }
    ];
    
    while (this.state.currentStep < steps.length) {
      // 检查是否被中断
      if (this.state.status === 'paused') {
        await this.saveState();
        return { status: 'paused', canResume: true };
      }
      
      const step = steps[this.state.currentStep];
      
      try {
        this.state.results[step.name] = await step.fn.call(this);
        this.state.currentStep++;
      } catch (error) {
        this.state.status = 'error';
        this.state.error = error.message;
        return { status: 'error', error, canRetry: true };
      }
    }
    
    this.state.status = 'completed';
    return { status: 'completed', results: this.state.results };
  }
  
  pause() {
    this.state.status = 'paused';
  }
  
  async resume() {
    if (this.state.status !== 'paused') {
      throw new Error('Workflow is not paused');
    }
    return await this.run();
  }
  
  async retry() {
    if (this.state.status !== 'error') {
      throw new Error('Workflow is not in error state');
    }
    this.state.status = 'running';
    return await this.run();
  }
}

5. 错误处理与恢复

5.1 重试策略

javascript
async function withRetry(fn, options = {}) {
  const {
    maxRetries = 3,
    backoff = 'exponential',  // 'linear' | 'exponential' | 'fixed'
    baseDelay = 1000,
    shouldRetry = (error) => true
  } = options;
  
  let lastError;
  
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      
      if (attempt === maxRetries || !shouldRetry(error)) {
        throw error;
      }
      
      const delay = calculateDelay(attempt, backoff, baseDelay);
      console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms`);
      await sleep(delay);
    }
  }
  
  throw lastError;
}

function calculateDelay(attempt, backoff, baseDelay) {
  switch (backoff) {
    case 'fixed':
      return baseDelay;
    case 'linear':
      return baseDelay * (attempt + 1);
    case 'exponential':
      return baseDelay * Math.pow(2, attempt);
    default:
      return baseDelay;
  }
}

5.2 Fallback 策略

javascript
async function generateWithFallback(prompt) {
  const strategies = [
    { model: 'claude-sonnet-4-20250514', temperature: 0.2 },
    { model: 'gpt-4o', temperature: 0.2 },  // fallback 到其他模型
    { model: 'claude-sonnet-4-20250514', temperature: 0.5 },  // 提高温度重试
  ];
  
  for (const strategy of strategies) {
    try {
      return await llm.chat({
        model: strategy.model,
        temperature: strategy.temperature,
        messages: [{ role: 'user', content: prompt }]
      });
    } catch (error) {
      console.log(`Strategy failed: ${JSON.stringify(strategy)}`);
      continue;
    }
  }
  
  throw new Error('All strategies failed');
}

5.3 补偿操作 (Compensation)

javascript
class CompensatingWorkflow {
  constructor() {
    this.completedActions = [];
  }
  
  async run() {
    try {
      // 执行操作并记录
      await this.executeWithCompensation(
        () => this.createFile('temp.ts', content),
        () => this.deleteFile('temp.ts')
      );
      
      await this.executeWithCompensation(
        () => this.updateConfig('config.json', newConfig),
        () => this.updateConfig('config.json', oldConfig)
      );
      
      // ... 更多操作
      
    } catch (error) {
      // 执行所有补偿操作
      await this.compensate();
      throw error;
    }
  }
  
  async executeWithCompensation(action, compensation) {
    await action();
    this.completedActions.push(compensation);
  }
  
  async compensate() {
    // 反向执行所有补偿操作
    for (const compensation of this.completedActions.reverse()) {
      try {
        await compensation();
      } catch (e) {
        console.error('Compensation failed:', e);
      }
    }
    this.completedActions = [];
  }
}

6. 可观测性

6.1 Workflow 追踪

javascript
class TracedWorkflow {
  constructor(name) {
    this.name = name;
    this.traceId = crypto.randomUUID();
    this.spans = [];
  }
  
  async runStep(stepName, fn) {
    const span = {
      stepName,
      startTime: Date.now(),
      traceId: this.traceId
    };
    
    try {
      const result = await fn();
      span.endTime = Date.now();
      span.duration = span.endTime - span.startTime;
      span.status = 'success';
      span.result = this.summarize(result);
      return result;
    } catch (error) {
      span.endTime = Date.now();
      span.duration = span.endTime - span.startTime;
      span.status = 'error';
      span.error = error.message;
      throw error;
    } finally {
      this.spans.push(span);
    }
  }
  
  getTrace() {
    return {
      traceId: this.traceId,
      workflow: this.name,
      totalDuration: this.spans.reduce((sum, s) => sum + s.duration, 0),
      steps: this.spans
    };
  }
}

// 使用
const workflow = new TracedWorkflow('code-review');

const types = await workflow.runStep('generate-types', () => generateTypes(spec));
const schema = await workflow.runStep('generate-schema', () => generateSchema(types));

console.log(workflow.getTrace());

6.2 成本追踪

javascript
class CostTracker {
  constructor() {
    this.calls = [];
  }
  
  // 包装 LLM 调用
  wrap(llm) {
    const original = llm.chat.bind(llm);
    
    llm.chat = async (options) => {
      const start = Date.now();
      const response = await original(options);
      
      this.calls.push({
        model: options.model,
        inputTokens: response.usage.prompt_tokens,
        outputTokens: response.usage.completion_tokens,
        cost: this.calculateCost(options.model, response.usage),
        latency: Date.now() - start
      });
      
      return response;
    };
    
    return llm;
  }
  
  calculateCost(model, usage) {
    const pricing = {
      'claude-sonnet-4-20250514': { input: 0.003, output: 0.015 },
      'gpt-4o': { input: 0.005, output: 0.015 },
      'gpt-4o-mini': { input: 0.00015, output: 0.0006 }
    };
    
    const price = pricing[model] || { input: 0, output: 0 };
    return (
      (usage.prompt_tokens / 1000) * price.input +
      (usage.completion_tokens / 1000) * price.output
    );
  }
  
  getSummary() {
    return {
      totalCalls: this.calls.length,
      totalCost: this.calls.reduce((sum, c) => sum + c.cost, 0),
      totalTokens: this.calls.reduce((sum, c) => sum + c.inputTokens + c.outputTokens, 0),
      avgLatency: this.calls.reduce((sum, c) => sum + c.latency, 0) / this.calls.length,
      byModel: this.groupBy('model')
    };
  }
}

7. 实战:代码审查 Workflow

javascript
class CodeReviewWorkflow {
  constructor() {
    this.tracker = new CostTracker();
    this.llm = this.tracker.wrap(anthropic);
  }
  
  async review(prFiles) {
    const workflow = new TracedWorkflow('code-review');
    
    // 1. 并行分析每个文件
    const fileAnalyses = await workflow.runStep('analyze-files', async () => {
      return Promise.all(
        prFiles.map(file => this.analyzeFile(file))
      );
    });
    
    // 2. 识别跨文件问题
    const crossFileIssues = await workflow.runStep('cross-file-analysis', async () => {
      return this.analyzeCrossFile(fileAnalyses);
    });
    
    // 3. 生成建议
    const suggestions = await workflow.runStep('generate-suggestions', async () => {
      return this.generateSuggestions([...fileAnalyses, crossFileIssues]);
    });
    
    // 4. 格式化输出
    const report = await workflow.runStep('format-report', async () => {
      return this.formatReport({
        files: fileAnalyses,
        crossFile: crossFileIssues,
        suggestions
      });
    });
    
    return {
      report,
      trace: workflow.getTrace(),
      cost: this.tracker.getSummary()
    };
  }
  
  async analyzeFile(file) {
    return this.llm.chat({
      model: 'claude-sonnet-4-20250514',
      messages: [{
        role: 'system',
        content: CODE_REVIEW_SYSTEM_PROMPT
      }, {
        role: 'user',
        content: `审查这个文件:\n\n文件名: ${file.name}\n\n\`\`\`\n${file.content}\n\`\`\``
      }],
      response_format: { type: 'json_object' }
    });
  }
}

const CODE_REVIEW_SYSTEM_PROMPT = `你是代码审查专家。分析代码并识别:
- 安全问题
- 性能问题
- 可维护性问题
- 最佳实践违规

输出 JSON:
{
  "issues": [
    {
      "severity": "critical|major|minor",
      "type": "security|performance|maintainability|style",
      "line": number,
      "description": string,
      "suggestion": string
    }
  ],
  "summary": string
}`;

8. 关键要点

  1. Workflow 比 Agent 更可控: 适合生产环境的重复性任务
  2. 选择合适的模式: Chain、Parallel、Router、Orchestrator
  3. 人机协作很重要: 关键步骤需要人工确认
  4. 健壮的错误处理: 重试、Fallback、补偿
  5. 可观测性: 追踪每步执行,监控成本
  6. 增量优化: 从简单 Chain 开始,按需添加复杂性

延伸阅读

前端面试知识库