Skip to content

服务端实现

Node.js、Edge Function

6.1 核心理解:服务端的视角

从服务端角度看,SSE 流式响应可以拆解为 "入口(Input)""出口(Output)" 两部分:

┌─────────────────────────────────────────────────────────────────────────┐
│                        服务端 SSE 处理流程                               │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  入口 (Input) ─────────┐                                               │
│  • 接收请求           │                                               │
│  • 解析参数           │  ✅ GET 和 POST 的唯一区别                     │
│  • 身份验证           │                                               │
│  ──────────────────────┘                                               │
│                                                                         │
│  核心逻辑 ──────────────┐                                              │
│  • 调用 LLM API        │                                              │
│  • 处理业务逻辑        │  ✅ 完全相同                                  │
│  ──────────────────────┘                                               │
│                                                                         │
│  出口 (Output) ─────────┐                                              │
│  • 设置 SSE 响应头      │                                              │
│  • 输出流式数据         │  ✅ 完全相同                                  │
│  • 发送 [DONE] 标记    │                                              │
│  ──────────────────────┘                                               │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

关键结论:无论客户端用 GET 还是 POST,服务端的"出口"(推流逻辑)完全一样,只需调整"入口"(参数获取方式)。

6.2 Node.js 实现对比

场景 A:标准 SSE (支持 GET)

javascript
// 适用于:简单通知、公开数据流
app.get('/api/sse-stream', (req, res) => {
    // ===== 入口:从 Query String 获取参数 =====
    const userMessage = req.query.msg;

    // ===== 出口:设置 SSE 响应头(与 POST 完全相同)=====
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    // ===== 推流逻辑(与 POST 完全相同)=====
    sendStreamData(res, userMessage);
});

场景 B:Fetch Streaming (支持 POST)

javascript
// 适用于:LLM 对话、需要认证的场景
app.post('/api/chat', async (req, res) => {
    // ===== 入口:从 Request Body 获取参数(唯一区别)=====
    const { messages } = req.body;

    // ===== 出口:设置 SSE 响应头(与 GET 完全相同)=====
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no'); // Nginx 专用
    res.flushHeaders();

    // ===== 推流逻辑(与 GET 完全相同)=====
    try {
        // 调用 OpenAI API
        const stream = await openai.chat.completions.create({
            model: 'gpt-4',
            messages,
            stream: true,
        });

        // 转发流数据
        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
            }
        }

        res.write('data: [DONE]\n\n');
    } catch (error) {
        res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    } finally {
        res.end();
    }

    // 客户端断开时清理资源
    req.on('close', () => {
        console.log('Client disconnected');
    });
});

6.3 通用推流函数(复用核心逻辑)

javascript
// 提取通用的推流逻辑
async function sendStreamData(res, userMessage) {
    try {
        const stream = await callLLMAPI(userMessage);

        for await (const chunk of stream) {
            // 这部分逻辑在 GET 和 POST 中完全一样
            res.write(`data: ${JSON.stringify(chunk)}\n\n`);
        }

        res.write('data: [DONE]\n\n');
    } catch (error) {
        res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    } finally {
        res.end();
    }
}

// 同时支持 GET 和 POST
app.get('/api/stream', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    const message = req.query.msg;
    sendStreamData(res, message);
});

app.post('/api/stream', async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    const { message } = req.body;
    sendStreamData(res, message);
});

6.4 Edge Function (Vercel/Cloudflare)

typescript
// app/api/chat/route.ts (Next.js App Router)
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';

export const runtime = 'edge';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function POST(req: Request) {
    const { messages } = await req.json();

    const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages,
        stream: true,
    });

    // 使用 Vercel AI SDK 简化流处理
    const stream = OpenAIStream(response);
    return new StreamingTextResponse(stream);
}

前端面试知识库