前端流式通信完整指南
🎯 SSE + Fetch Streaming + WebSocket 三种方案对比,LLM 打字机效果实战
1. 流式通信架构总览
1.1 三种方案对比
┌─────────────────────────────────────────────────────────────────────────┐
│ 前端流式通信方案对比 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 特性 │ SSE (EventSource) │ Fetch Streaming │ WebSocket │
│ ────────────────┼───────────────────┼─────────────────┼───────────── │
│ 方向 │ 服务端→客户端 │ 服务端→客户端 │ 双向 │
│ 请求方法 │ 仅 GET │ 任意 (POST) │ 独立协议 │
│ 自定义 Header │ ❌ │ ✅ │ ❌ │
│ 取消请求 │ close() │ AbortController │ close() │
│ 自动重连 │ ✅ 内置 │ ❌ 需手动 │ ❌ 需手动 │
│ 二进制数据 │ ❌ │ ✅ │ ✅ │
│ 浏览器支持 │ IE 不支持 │ 全面支持 │ 全面支持 │
│ 复杂度 │ 低 │ 中 │ 中 │
│ │
│ 推荐场景: │
│ • SSE: 简单通知推送、无需认证的公开流 │
│ • Fetch Streaming: LLM 对话 (需 POST + Authorization) │
│ • WebSocket: 实时双向通信 (聊天室、协作编辑) │
│ │
└─────────────────────────────────────────────────────────────────────────┘1.2 LLM 流式架构
┌─────────────────────────────────────────────────────────────────────────┐
│ LLM 流式响应完整架构 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ POST /chat ┌──────────────┐ │
│ │ React UI │ ────────────────────▶ │ BFF/Edge │ │
│ │ │ Authorization: │ Function │ │
│ │ ┌────────┐ │ Bearer xxx └──────┬───────┘ │
│ │ │ChatBox │ │ │ │
│ │ └────────┘ │ │ stream: true │
│ │ │ │ ▼ │
│ │ │ │ ┌──────────────┐ │
│ │ ▼ │ ◀────────────────── │ OpenAI API │ │
│ │ ┌────────┐ │ SSE: data: {...} │ GPT-4 │ │
│ │ │Markdown│ │ data: {...} └──────────────┘ │
│ │ │Render │ │ data: [DONE] │
│ │ └────────┘ │ │
│ └──────────────┘ │
│ │
│ 数据流转: │
│ ───────── │
│ fetch(POST) → response.body (ReadableStream) │
│ → reader.read() (Uint8Array) │
│ → TextDecoder.decode() (string) │
│ → 解析 SSE "data: {...}" │
│ → JSON.parse() → delta.content │
│ → setState() → UI 更新 │
│ │
└─────────────────────────────────────────────────────────────────────────┘2. SSE (Server-Sent Events)
2.1 SSE 协议格式
http
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
X-Accel-Buffering: no # Nginx 禁用缓冲┌─────────────────────────────────────────────────────────────────────────┐
│ SSE 消息格式 │
├─────────────────────────────────────────────────────────────────────────┤
│ 字段 │ 格式 │ 说明 │
├─────────────────────────────────────────────────────────────────────────┤
│ data │ data: 内容\n │ 消息内容 (必须) │
│ event │ event: 事件名\n │ 自定义事件名 (默认 message) │
│ id │ id: 123\n │ 消息 ID, 用于断线重连 │
│ retry │ retry: 3000\n │ 重连间隔 (毫秒) │
│ 注释 │ : 这是注释\n │ 心跳保活常用 │
├─────────────────────────────────────────────────────────────────────────┤
│ ⚠️ 每条消息必须以 \n\n (空行) 结尾 │
└─────────────────────────────────────────────────────────────────────────┘2.2 EventSource 客户端
javascript
// 基础用法 (仅支持 GET)
const eventSource = new EventSource('/api/stream');
eventSource.onopen = () => {
console.log('✅ 连接已建立');
};
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const { content } = JSON.parse(event.data);
document.getElementById('output').textContent += content;
};
eventSource.onerror = (error) => {
if (eventSource.readyState === EventSource.CLOSED) {
console.log('❌ 连接已关闭');
} else {
console.log('⚠️ 连接错误, 正在重连...');
// EventSource 会自动重连
}
};
// 主动关闭
function stopGeneration() {
eventSource.close();
}2.3 EventSource 局限性
❌ 只支持 GET 请求 → 无法 POST prompt
❌ 无法自定义 Header → 无法携带 Authorization
❌ HTTP/1.1 同域最多 6 连接 → 多 SSE 会阻塞请求
🔧 解决方案: 使用 Fetch API + ReadableStream3. Fetch Streaming (LLM 标准方案)
3.1 为什么选择 Fetch Streaming
┌─────────────────────────────────────────────────────────────────────────┐
│ EventSource vs Fetch Streaming │
├─────────────────────────────────────────────────────────────────────────┤
│ 需求 │ EventSource │ Fetch Streaming │
├─────────────────────────────────────────────────────────────────────────┤
│ POST 请求发送 prompt │ ❌ │ ✅ │
│ 自定义 Headers │ ❌ │ ✅ │
│ Authorization 认证 │ ❌ │ ✅ │
│ 取消请求 (AbortController)│ ❌ │ ✅ │
│ 精细流控制 │ ❌ │ ✅ │
│ 自动重连 │ ✅ │ ❌ (需手动) │
└─────────────────────────────────────────────────────────────────────────┘3.2 核心 API 详解
javascript
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer sk-xxx'
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello' }],
stream: true
})
});
// response.body 是 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Stream finished');
break;
}
// Uint8Array → string
const chunk = decoder.decode(value, { stream: true });
console.log('Chunk:', chunk);
}3.3 数据流转过程
┌─────────────────────────────────────────────────────────────────────────┐
│ 数据流转过程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Server Network fetch() User Code │
│ │ │ │ │ │
│ │──"Hello"────►│──Uint8Array──►│──.read()─────►│ │
│ │ │ [72,101, │ │ │
│ │ │ 108,108, │ │ │
│ │ │ 111] │ │ │
│ │ │ │ │ │
│ │ │ │
│ │ TextDecoder │ │
│ │ .decode() │ │
│ │ │ │ │
│ │ └─►"Hello"────►│ │
│ │
└─────────────────────────────────────────────────────────────────────────┘3.4 TextDecoder 的 stream: true
javascript
// 问题: 多字节字符 (如中文) 可能被拆分到不同 chunk
// Chunk 1: [228, 184] (中的前 2 字节)
// Chunk 2: [173, 230, 150, 135] (中的后 1 字节 + 文)
// ❌ 错误: 每次独立解码
const decoder = new TextDecoder();
decoder.decode([228, 184]); // 乱码 "�"
decoder.decode([173, 230, 150, 135]); // 乱码 "��文"
// ✅ 正确: stream: true 保留不完整序列
const decoder = new TextDecoder();
decoder.decode(chunk1, { stream: true }); // "" (等待)
decoder.decode(chunk2, { stream: true }); // "中文"3.5 完整 SSE 格式解析
javascript
// OpenAI 响应格式:
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" World"}}]}
// data: [DONE]
async function* streamChat(messages) {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${API_KEY}`
},
body: JSON.stringify({
model: 'gpt-4',
messages,
stream: true
})
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${await response.text()}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按行分割
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 保留不完整的行
for (const line of lines) {
const trimmed = line.trim();
// 跳过空行和注释
if (!trimmed || trimmed.startsWith(':')) continue;
// 解析 data: 前缀
if (trimmed.startsWith('data:')) {
const data = trimmed.slice(5).trim();
// 结束标记
if (data === '[DONE]') return;
try {
const json = JSON.parse(data);
const content = json.choices?.[0]?.delta?.content;
if (content) yield content;
} catch (e) {
console.warn('Parse error:', data);
}
}
}
}
} finally {
reader.releaseLock();
}
}
// 使用
const output = document.getElementById('output');
output.textContent = '';
for await (const token of streamChat([
{ role: 'user', content: '写一首关于春天的诗' }
])) {
output.textContent += token;
}4. AbortController 取消请求
4.1 基础用法
javascript
const controller = new AbortController();
// 传入 signal
const response = await fetch('/api/chat', {
method: 'POST',
signal: controller.signal,
body: JSON.stringify({ prompt: 'Hello' })
});
// 用户点击 "停止生成"
document.getElementById('stop').onclick = () => {
controller.abort();
};4.2 完整 ChatStream 类
javascript
class ChatStream {
constructor() {
this.controller = null;
}
async stream(messages, onToken) {
// 取消之前的请求
this.abort();
this.controller = new AbortController();
const signal = this.controller.signal;
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
signal
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
if (signal.aborted) break;
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
const json = JSON.parse(data);
const content = json.choices?.[0]?.delta?.content;
if (content) onToken(content);
}
}
}
} catch (error) {
if (error.name === 'AbortError') {
console.log('Stream aborted by user');
} else {
throw error;
}
} finally {
this.controller = null;
}
}
abort() {
this.controller?.abort();
}
}
// 使用
const chat = new ChatStream();
document.getElementById('send').onclick = () => {
const output = document.getElementById('output');
output.textContent = '';
chat.stream(
[{ role: 'user', content: 'Hello' }],
(token) => { output.textContent += token; }
);
};
document.getElementById('stop').onclick = () => {
chat.abort();
};4.3 超时控制
javascript
// 方法 1: AbortSignal.timeout() (现代浏览器)
const response = await fetch(url, {
signal: AbortSignal.timeout(30000) // 30秒超时
});
// 方法 2: 手动实现
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 30000);
try {
const response = await fetch(url, { signal: controller.signal });
// ...
} finally {
clearTimeout(timeoutId);
}
// 方法 3: 合并多个 signal
const userController = new AbortController();
const timeoutSignal = AbortSignal.timeout(30000);
const response = await fetch(url, {
signal: AbortSignal.any([userController.signal, timeoutSignal])
});5. 服务端实现
5.1 Node.js + Express
javascript
app.post('/api/chat', async (req, res) => {
const { messages } = req.body;
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no'); // Nginx
res.flushHeaders();
try {
// 调用 OpenAI
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// 转发流
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
} finally {
res.end();
}
// 客户端断开时清理
req.on('close', () => {
// 清理资源
});
});5.2 Edge Function (Vercel/Cloudflare)
typescript
// app/api/chat/route.ts (Next.js App Router)
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';
export const runtime = 'edge';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
export async function POST(req: Request) {
const { messages } = await req.json();
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// 使用 Vercel AI SDK 简化流处理
const stream = OpenAIStream(response);
return new StreamingTextResponse(stream);
}6. React 集成
6.1 useChatStream Hook
typescript
import { useState, useCallback, useRef } from 'react';
interface Message {
role: 'user' | 'assistant';
content: string;
}
export function useChatStream() {
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const controllerRef = useRef<AbortController | null>(null);
const sendMessage = useCallback(async (content: string) => {
const userMessage: Message = { role: 'user', content };
const newMessages = [...messages, userMessage];
setMessages(newMessages);
setIsStreaming(true);
// 添加空的 assistant 消息
let assistantContent = '';
setMessages([...newMessages, { role: 'assistant', content: '' }]);
controllerRef.current = new AbortController();
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages: newMessages }),
signal: controllerRef.current.signal
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ') && line !== 'data: [DONE]') {
const json = JSON.parse(line.slice(6));
const token = json.choices?.[0]?.delta?.content || '';
assistantContent += token;
setMessages(prev => [
...prev.slice(0, -1),
{ role: 'assistant', content: assistantContent }
]);
}
}
}
} catch (error: any) {
if (error.name !== 'AbortError') {
console.error(error);
}
} finally {
setIsStreaming(false);
controllerRef.current = null;
}
}, [messages]);
const stopStream = useCallback(() => {
controllerRef.current?.abort();
}, []);
return { messages, isStreaming, sendMessage, stopStream };
}6.2 ChatUI 组件
tsx
import { useState } from 'react';
import { useChatStream } from './useChatStream';
import ReactMarkdown from 'react-markdown';
export function ChatUI() {
const { messages, isStreaming, sendMessage, stopStream } = useChatStream();
const [input, setInput] = useState('');
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (input.trim() && !isStreaming) {
sendMessage(input);
setInput('');
}
};
return (
<div className="chat-container">
<div className="messages">
{messages.map((msg, i) => (
<div key={i} className={`message ${msg.role}`}>
<ReactMarkdown>{msg.content}</ReactMarkdown>
{msg.role === 'assistant' && isStreaming && i === messages.length - 1 && (
<span className="cursor">▋</span>
)}
</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={e => setInput(e.target.value)}
disabled={isStreaming}
placeholder="输入消息..."
/>
{isStreaming ? (
<button type="button" onClick={stopStream}>停止</button>
) : (
<button type="submit">发送</button>
)}
</form>
</div>
);
}6.3 打字机光标动画
css
.cursor {
display: inline-block;
animation: blink 1s steps(1) infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
.message.assistant {
white-space: pre-wrap;
}7. 高级用法
7.1 TransformStream 管道
javascript
function createSSEParser() {
let buffer = '';
return new TransformStream({
transform(chunk, controller) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim();
if (data && data !== '[DONE]') {
try {
const json = JSON.parse(data);
const content = json.choices?.[0]?.delta?.content;
if (content) controller.enqueue(content);
} catch {}
}
}
}
}
});
}
// 使用管道
const response = await fetch('/api/chat', { method: 'POST', body: '...' });
const textStream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(createSSEParser());
for await (const token of textStream) {
console.log(token);
}7.2 流中断恢复
javascript
async function streamWithResume(messages, onToken, lastContent = '') {
const resumeMessages = [...messages];
if (lastContent) {
resumeMessages.push({ role: 'assistant', content: lastContent });
resumeMessages.push({ role: 'user', content: '请继续' });
}
try {
for await (const token of streamChat(resumeMessages)) {
lastContent += token;
onToken(token);
}
} catch (error) {
if (confirm('连接中断,是否继续?')) {
return streamWithResume(messages, onToken, lastContent);
}
}
}8. 面试高频问题
Q1: SSE 和 WebSocket 的区别?
| 维度 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向 (服务端→客户端) | 双向 |
| 协议 | HTTP | 独立协议 (ws://) |
| 自动重连 | ✅ 内置 | ❌ 需手动 |
| 二进制 | ❌ | ✅ |
| 适用场景 | LLM、通知 | 聊天、协作 |
Q2: 为什么 LLM 用 Fetch Streaming 而不是 EventSource?
- EventSource 只支持 GET,无法 POST prompt
- EventSource 无法自定义 Header (Authorization)
- Fetch Streaming 支持 AbortController 取消
- 更灵活的流处理控制
Q3: TextDecoder 的 stream: true 有什么作用?
处理多字节字符 (如 UTF-8 中文) 被拆分到不同 chunk 的情况。设置 stream: true 让 decoder 保留不完整的字节序列,等待下一个 chunk 正确解码。
Q4: 如何实现"停止生成"功能?
- 创建 AbortController
- 将 signal 传给 fetch
- 用户点击停止时调用 controller.abort()
- 捕获 AbortError,不作为错误处理
Q5: ReadableStream 的背压机制是什么?
当消费者 (reader.read()) 处理速度慢于生产者时,ReadableStream 自动暂停数据流入,等待消费者准备好,防止内存无限增长。
Q6: 如何在 Nginx 后正确代理 SSE?
nginx
location /api/chat/stream {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
# 关键:关闭缓冲
proxy_buffering off;
proxy_cache off;
# SSE 特殊处理
chunked_transfer_encoding on;
}