Skip to content

前端流式通信完整指南

🎯 SSE + Fetch Streaming + WebSocket 三种方案对比,LLM 打字机效果实战


1. 流式通信架构总览

1.1 三种方案对比

┌─────────────────────────────────────────────────────────────────────────┐
│                    前端流式通信方案对比                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   特性           │ SSE (EventSource)  │ Fetch Streaming │ WebSocket    │
│  ────────────────┼───────────────────┼─────────────────┼─────────────  │
│   方向           │ 服务端→客户端      │ 服务端→客户端   │ 双向         │
│   请求方法       │ 仅 GET             │ 任意 (POST)     │ 独立协议     │
│   自定义 Header  │ ❌                 │ ✅              │ ❌           │
│   取消请求       │ close()            │ AbortController │ close()      │
│   自动重连       │ ✅ 内置            │ ❌ 需手动       │ ❌ 需手动    │
│   二进制数据     │ ❌                 │ ✅              │ ✅           │
│   浏览器支持     │ IE 不支持          │ 全面支持        │ 全面支持     │
│   复杂度         │ 低                 │ 中              │ 中           │
│                                                                         │
│   推荐场景:                                                             │
│   • SSE: 简单通知推送、无需认证的公开流                                 │
│   • Fetch Streaming: LLM 对话 (需 POST + Authorization)                │
│   • WebSocket: 实时双向通信 (聊天室、协作编辑)                          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

1.2 LLM 流式架构

┌─────────────────────────────────────────────────────────────────────────┐
│                    LLM 流式响应完整架构                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌──────────────┐      POST /chat       ┌──────────────┐               │
│  │   React UI   │ ────────────────────▶ │   BFF/Edge   │               │
│  │              │   Authorization:      │   Function   │               │
│  │  ┌────────┐  │   Bearer xxx          └──────┬───────┘               │
│  │  │ChatBox │  │                              │                        │
│  │  └────────┘  │                              │ stream: true           │
│  │       │      │                              ▼                        │
│  │       │      │                       ┌──────────────┐               │
│  │       ▼      │   ◀────────────────── │  OpenAI API  │               │
│  │  ┌────────┐  │   SSE: data: {...}   │   GPT-4      │               │
│  │  │Markdown│  │        data: {...}   └──────────────┘               │
│  │  │Render  │  │        data: [DONE]                                 │
│  │  └────────┘  │                                                      │
│  └──────────────┘                                                      │
│                                                                         │
│  数据流转:                                                             │
│  ─────────                                                             │
│  fetch(POST) → response.body (ReadableStream)                         │
│              → reader.read() (Uint8Array)                             │
│              → TextDecoder.decode() (string)                          │
│              → 解析 SSE "data: {...}"                                 │
│              → JSON.parse() → delta.content                           │
│              → setState() → UI 更新                                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

2. SSE (Server-Sent Events)

2.1 SSE 协议格式

http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no    # Nginx 禁用缓冲
┌─────────────────────────────────────────────────────────────────────────┐
│                          SSE 消息格式                                    │
├─────────────────────────────────────────────────────────────────────────┤
│  字段       │ 格式              │ 说明                                  │
├─────────────────────────────────────────────────────────────────────────┤
│  data       │ data: 内容\n      │ 消息内容 (必须)                       │
│  event      │ event: 事件名\n   │ 自定义事件名 (默认 message)           │
│  id         │ id: 123\n         │ 消息 ID, 用于断线重连                 │
│  retry      │ retry: 3000\n     │ 重连间隔 (毫秒)                       │
│  注释       │ : 这是注释\n      │ 心跳保活常用                          │
├─────────────────────────────────────────────────────────────────────────┤
│  ⚠️ 每条消息必须以 \n\n (空行) 结尾                                      │
└─────────────────────────────────────────────────────────────────────────┘

2.2 EventSource 客户端

javascript
// 基础用法 (仅支持 GET)
const eventSource = new EventSource('/api/stream');

eventSource.onopen = () => {
    console.log('✅ 连接已建立');
};

eventSource.onmessage = (event) => {
    if (event.data === '[DONE]') {
        eventSource.close();
        return;
    }
    
    const { content } = JSON.parse(event.data);
    document.getElementById('output').textContent += content;
};

eventSource.onerror = (error) => {
    if (eventSource.readyState === EventSource.CLOSED) {
        console.log('❌ 连接已关闭');
    } else {
        console.log('⚠️ 连接错误, 正在重连...');
        // EventSource 会自动重连
    }
};

// 主动关闭
function stopGeneration() {
    eventSource.close();
}

2.3 EventSource 局限性

❌ 只支持 GET 请求 → 无法 POST prompt
❌ 无法自定义 Header → 无法携带 Authorization
❌ HTTP/1.1 同域最多 6 连接 → 多 SSE 会阻塞请求

🔧 解决方案: 使用 Fetch API + ReadableStream

3. Fetch Streaming (LLM 标准方案)

3.1 为什么选择 Fetch Streaming

┌─────────────────────────────────────────────────────────────────────────┐
│                    EventSource vs Fetch Streaming                        │
├─────────────────────────────────────────────────────────────────────────┤
│  需求                      │ EventSource │ Fetch Streaming              │
├─────────────────────────────────────────────────────────────────────────┤
│  POST 请求发送 prompt      │      ❌      │        ✅                    │
│  自定义 Headers            │      ❌      │        ✅                    │
│  Authorization 认证        │      ❌      │        ✅                    │
│  取消请求 (AbortController)│      ❌      │        ✅                    │
│  精细流控制                │      ❌      │        ✅                    │
│  自动重连                  │      ✅      │        ❌ (需手动)           │
└─────────────────────────────────────────────────────────────────────────┘

3.2 核心 API 详解

javascript
const response = await fetch('/api/chat', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer sk-xxx'
    },
    body: JSON.stringify({ 
        model: 'gpt-4',
        messages: [{ role: 'user', content: 'Hello' }],
        stream: true
    })
});

// response.body 是 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');

while (true) {
    const { done, value } = await reader.read();
    
    if (done) {
        console.log('Stream finished');
        break;
    }
    
    // Uint8Array → string
    const chunk = decoder.decode(value, { stream: true });
    console.log('Chunk:', chunk);
}

3.3 数据流转过程

┌─────────────────────────────────────────────────────────────────────────┐
│                        数据流转过程                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Server         Network          fetch()         User Code              │
│    │               │                │                │                  │
│    │──"Hello"────►│──Uint8Array──►│──.read()─────►│                  │
│    │               │   [72,101,    │               │                  │
│    │               │    108,108,   │               │                  │
│    │               │    111]       │               │                  │
│    │               │                │                │                  │
│    │                                                │                  │
│    │                             TextDecoder        │                  │
│    │                             .decode()          │                  │
│    │                                │               │                  │
│    │                                └─►"Hello"────►│                  │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

3.4 TextDecoder 的 stream: true

javascript
// 问题: 多字节字符 (如中文) 可能被拆分到不同 chunk

// Chunk 1: [228, 184]      (中的前 2 字节)
// Chunk 2: [173, 230, 150, 135]  (中的后 1 字节 + 文)

// ❌ 错误: 每次独立解码
const decoder = new TextDecoder();
decoder.decode([228, 184]);           // 乱码 "�"
decoder.decode([173, 230, 150, 135]); // 乱码 "��文"

// ✅ 正确: stream: true 保留不完整序列
const decoder = new TextDecoder();
decoder.decode(chunk1, { stream: true }); // "" (等待)
decoder.decode(chunk2, { stream: true }); // "中文"

3.5 完整 SSE 格式解析

javascript
// OpenAI 响应格式:
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" World"}}]}
// data: [DONE]

async function* streamChat(messages) {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
            'Authorization': `Bearer ${API_KEY}`
        },
        body: JSON.stringify({
            model: 'gpt-4',
            messages,
            stream: true
        })
    });

    if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${await response.text()}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    try {
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;

            buffer += decoder.decode(value, { stream: true });

            // 按行分割
            const lines = buffer.split('\n');
            buffer = lines.pop() || ''; // 保留不完整的行

            for (const line of lines) {
                const trimmed = line.trim();
                
                // 跳过空行和注释
                if (!trimmed || trimmed.startsWith(':')) continue;
                
                // 解析 data: 前缀
                if (trimmed.startsWith('data:')) {
                    const data = trimmed.slice(5).trim();
                    
                    // 结束标记
                    if (data === '[DONE]') return;
                    
                    try {
                        const json = JSON.parse(data);
                        const content = json.choices?.[0]?.delta?.content;
                        if (content) yield content;
                    } catch (e) {
                        console.warn('Parse error:', data);
                    }
                }
            }
        }
    } finally {
        reader.releaseLock();
    }
}

// 使用
const output = document.getElementById('output');
output.textContent = '';

for await (const token of streamChat([
    { role: 'user', content: '写一首关于春天的诗' }
])) {
    output.textContent += token;
}

4. AbortController 取消请求

4.1 基础用法

javascript
const controller = new AbortController();

// 传入 signal
const response = await fetch('/api/chat', {
    method: 'POST',
    signal: controller.signal,
    body: JSON.stringify({ prompt: 'Hello' })
});

// 用户点击 "停止生成"
document.getElementById('stop').onclick = () => {
    controller.abort();
};

4.2 完整 ChatStream 类

javascript
class ChatStream {
    constructor() {
        this.controller = null;
    }

    async stream(messages, onToken) {
        // 取消之前的请求
        this.abort();
        
        this.controller = new AbortController();
        const signal = this.controller.signal;

        try {
            const response = await fetch('/api/chat', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ messages }),
                signal
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';

            while (true) {
                if (signal.aborted) break;

                const { done, value } = await reader.read();
                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop() || '';

                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        const data = line.slice(6);
                        if (data === '[DONE]') return;
                        
                        const json = JSON.parse(data);
                        const content = json.choices?.[0]?.delta?.content;
                        if (content) onToken(content);
                    }
                }
            }
        } catch (error) {
            if (error.name === 'AbortError') {
                console.log('Stream aborted by user');
            } else {
                throw error;
            }
        } finally {
            this.controller = null;
        }
    }

    abort() {
        this.controller?.abort();
    }
}

// 使用
const chat = new ChatStream();

document.getElementById('send').onclick = () => {
    const output = document.getElementById('output');
    output.textContent = '';
    
    chat.stream(
        [{ role: 'user', content: 'Hello' }],
        (token) => { output.textContent += token; }
    );
};

document.getElementById('stop').onclick = () => {
    chat.abort();
};

4.3 超时控制

javascript
// 方法 1: AbortSignal.timeout() (现代浏览器)
const response = await fetch(url, {
    signal: AbortSignal.timeout(30000) // 30秒超时
});

// 方法 2: 手动实现
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 30000);

try {
    const response = await fetch(url, { signal: controller.signal });
    // ...
} finally {
    clearTimeout(timeoutId);
}

// 方法 3: 合并多个 signal
const userController = new AbortController();
const timeoutSignal = AbortSignal.timeout(30000);

const response = await fetch(url, {
    signal: AbortSignal.any([userController.signal, timeoutSignal])
});

5. 服务端实现

5.1 Node.js + Express

javascript
app.post('/api/chat', async (req, res) => {
    const { messages } = req.body;
    
    // 设置 SSE 响应头
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no'); // Nginx
    res.flushHeaders();
    
    try {
        // 调用 OpenAI
        const stream = await openai.chat.completions.create({
            model: 'gpt-4',
            messages,
            stream: true,
        });
        
        // 转发流
        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
            }
        }
        
        res.write('data: [DONE]\n\n');
    } catch (error) {
        res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    } finally {
        res.end();
    }
    
    // 客户端断开时清理
    req.on('close', () => {
        // 清理资源
    });
});

5.2 Edge Function (Vercel/Cloudflare)

typescript
// app/api/chat/route.ts (Next.js App Router)
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';

export const runtime = 'edge';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function POST(req: Request) {
    const { messages } = await req.json();
    
    const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages,
        stream: true,
    });
    
    // 使用 Vercel AI SDK 简化流处理
    const stream = OpenAIStream(response);
    return new StreamingTextResponse(stream);
}

6. React 集成

6.1 useChatStream Hook

typescript
import { useState, useCallback, useRef } from 'react';

interface Message {
    role: 'user' | 'assistant';
    content: string;
}

export function useChatStream() {
    const [messages, setMessages] = useState<Message[]>([]);
    const [isStreaming, setIsStreaming] = useState(false);
    const controllerRef = useRef<AbortController | null>(null);

    const sendMessage = useCallback(async (content: string) => {
        const userMessage: Message = { role: 'user', content };
        const newMessages = [...messages, userMessage];
        setMessages(newMessages);
        setIsStreaming(true);
        
        // 添加空的 assistant 消息
        let assistantContent = '';
        setMessages([...newMessages, { role: 'assistant', content: '' }]);

        controllerRef.current = new AbortController();

        try {
            const response = await fetch('/api/chat', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ messages: newMessages }),
                signal: controllerRef.current.signal
            });

            const reader = response.body!.getReader();
            const decoder = new TextDecoder();
            let buffer = '';

            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop() || '';

                for (const line of lines) {
                    if (line.startsWith('data: ') && line !== 'data: [DONE]') {
                        const json = JSON.parse(line.slice(6));
                        const token = json.choices?.[0]?.delta?.content || '';
                        assistantContent += token;
                        
                        setMessages(prev => [
                            ...prev.slice(0, -1),
                            { role: 'assistant', content: assistantContent }
                        ]);
                    }
                }
            }
        } catch (error: any) {
            if (error.name !== 'AbortError') {
                console.error(error);
            }
        } finally {
            setIsStreaming(false);
            controllerRef.current = null;
        }
    }, [messages]);

    const stopStream = useCallback(() => {
        controllerRef.current?.abort();
    }, []);

    return { messages, isStreaming, sendMessage, stopStream };
}

6.2 ChatUI 组件

tsx
import { useState } from 'react';
import { useChatStream } from './useChatStream';
import ReactMarkdown from 'react-markdown';

export function ChatUI() {
    const { messages, isStreaming, sendMessage, stopStream } = useChatStream();
    const [input, setInput] = useState('');

    const handleSubmit = (e: React.FormEvent) => {
        e.preventDefault();
        if (input.trim() && !isStreaming) {
            sendMessage(input);
            setInput('');
        }
    };

    return (
        <div className="chat-container">
            <div className="messages">
                {messages.map((msg, i) => (
                    <div key={i} className={`message ${msg.role}`}>
                        <ReactMarkdown>{msg.content}</ReactMarkdown>
                        {msg.role === 'assistant' && isStreaming && i === messages.length - 1 && (
                            <span className="cursor">▋</span>
                        )}
                    </div>
                ))}
            </div>
            
            <form onSubmit={handleSubmit}>
                <input
                    value={input}
                    onChange={e => setInput(e.target.value)}
                    disabled={isStreaming}
                    placeholder="输入消息..."
                />
                {isStreaming ? (
                    <button type="button" onClick={stopStream}>停止</button>
                ) : (
                    <button type="submit">发送</button>
                )}
            </form>
        </div>
    );
}

6.3 打字机光标动画

css
.cursor {
    display: inline-block;
    animation: blink 1s steps(1) infinite;
}

@keyframes blink {
    0%, 50% { opacity: 1; }
    51%, 100% { opacity: 0; }
}

.message.assistant {
    white-space: pre-wrap;
}

7. 高级用法

7.1 TransformStream 管道

javascript
function createSSEParser() {
    let buffer = '';
    
    return new TransformStream({
        transform(chunk, controller) {
            buffer += chunk;
            const lines = buffer.split('\n');
            buffer = lines.pop() || '';
            
            for (const line of lines) {
                if (line.startsWith('data: ')) {
                    const data = line.slice(6).trim();
                    if (data && data !== '[DONE]') {
                        try {
                            const json = JSON.parse(data);
                            const content = json.choices?.[0]?.delta?.content;
                            if (content) controller.enqueue(content);
                        } catch {}
                    }
                }
            }
        }
    });
}

// 使用管道
const response = await fetch('/api/chat', { method: 'POST', body: '...' });

const textStream = response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(createSSEParser());

for await (const token of textStream) {
    console.log(token);
}

7.2 流中断恢复

javascript
async function streamWithResume(messages, onToken, lastContent = '') {
    const resumeMessages = [...messages];
    
    if (lastContent) {
        resumeMessages.push({ role: 'assistant', content: lastContent });
        resumeMessages.push({ role: 'user', content: '请继续' });
    }

    try {
        for await (const token of streamChat(resumeMessages)) {
            lastContent += token;
            onToken(token);
        }
    } catch (error) {
        if (confirm('连接中断,是否继续?')) {
            return streamWithResume(messages, onToken, lastContent);
        }
    }
}

8. 面试高频问题

Q1: SSE 和 WebSocket 的区别?

维度SSEWebSocket
方向单向 (服务端→客户端)双向
协议HTTP独立协议 (ws://)
自动重连✅ 内置❌ 需手动
二进制
适用场景LLM、通知聊天、协作

Q2: 为什么 LLM 用 Fetch Streaming 而不是 EventSource?

  1. EventSource 只支持 GET,无法 POST prompt
  2. EventSource 无法自定义 Header (Authorization)
  3. Fetch Streaming 支持 AbortController 取消
  4. 更灵活的流处理控制

Q3: TextDecoder 的 stream: true 有什么作用?

处理多字节字符 (如 UTF-8 中文) 被拆分到不同 chunk 的情况。设置 stream: true 让 decoder 保留不完整的字节序列,等待下一个 chunk 正确解码。

Q4: 如何实现"停止生成"功能?

  1. 创建 AbortController
  2. 将 signal 传给 fetch
  3. 用户点击停止时调用 controller.abort()
  4. 捕获 AbortError,不作为错误处理

Q5: ReadableStream 的背压机制是什么?

当消费者 (reader.read()) 处理速度慢于生产者时,ReadableStream 自动暂停数据流入,等待消费者准备好,防止内存无限增长。

Q6: 如何在 Nginx 后正确代理 SSE?

nginx
location /api/chat/stream {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    
    # 关键:关闭缓冲
    proxy_buffering off;
    proxy_cache off;
    
    # SSE 特殊处理
    chunked_transfer_encoding on;
}

前端面试知识库