服务端实现
Node.js、Edge Function
6.1 核心理解:服务端的视角
从服务端角度看,SSE 流式响应可以拆解为 "入口(Input)" 和 "出口(Output)" 两部分:
┌─────────────────────────────────────────────────────────────────────────┐
│ 服务端 SSE 处理流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 入口 (Input) ─────────┐ │
│ • 接收请求 │ │
│ • 解析参数 │ ✅ GET 和 POST 的唯一区别 │
│ • 身份验证 │ │
│ ──────────────────────┘ │
│ │
│ 核心逻辑 ──────────────┐ │
│ • 调用 LLM API │ │
│ • 处理业务逻辑 │ ✅ 完全相同 │
│ ──────────────────────┘ │
│ │
│ 出口 (Output) ─────────┐ │
│ • 设置 SSE 响应头 │ │
│ • 输出流式数据 │ ✅ 完全相同 │
│ • 发送 [DONE] 标记 │ │
│ ──────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘关键结论:无论客户端用 GET 还是 POST,服务端的"出口"(推流逻辑)完全一样,只需调整"入口"(参数获取方式)。
6.2 Node.js 实现对比
场景 A:标准 SSE (支持 GET)
javascript
// 适用于:简单通知、公开数据流
app.get('/api/sse-stream', (req, res) => {
// ===== 入口:从 Query String 获取参数 =====
const userMessage = req.query.msg;
// ===== 出口:设置 SSE 响应头(与 POST 完全相同)=====
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
// ===== 推流逻辑(与 POST 完全相同)=====
sendStreamData(res, userMessage);
});场景 B:Fetch Streaming (支持 POST)
javascript
// 适用于:LLM 对话、需要认证的场景
app.post('/api/chat', async (req, res) => {
// ===== 入口:从 Request Body 获取参数(唯一区别)=====
const { messages } = req.body;
// ===== 出口:设置 SSE 响应头(与 GET 完全相同)=====
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Nginx 专用
res.flushHeaders();
// ===== 推流逻辑(与 GET 完全相同)=====
try {
// 调用 OpenAI API
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// 转发流数据
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
} finally {
res.end();
}
// 客户端断开时清理资源
req.on('close', () => {
console.log('Client disconnected');
});
});6.3 通用推流函数(复用核心逻辑)
javascript
// 提取通用的推流逻辑
async function sendStreamData(res, userMessage) {
try {
const stream = await callLLMAPI(userMessage);
for await (const chunk of stream) {
// 这部分逻辑在 GET 和 POST 中完全一样
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.write('data: [DONE]\n\n');
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
} finally {
res.end();
}
}
// 同时支持 GET 和 POST
app.get('/api/stream', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
const message = req.query.msg;
sendStreamData(res, message);
});
app.post('/api/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
const { message } = req.body;
sendStreamData(res, message);
});6.4 Edge Function (Vercel/Cloudflare)
typescript
// app/api/chat/route.ts (Next.js App Router)
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';
export const runtime = 'edge';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
export async function POST(req: Request) {
const { messages } = await req.json();
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// 使用 Vercel AI SDK 简化流处理
const stream = OpenAIStream(response);
return new StreamingTextResponse(stream);
}