Fetch Streaming
基于 Fetch API 的流式数据处理,LLM 标准方案
3.1 为什么选择 Fetch Streaming
┌─────────────────────────────────────────────────────────────────────────┐
│ EventSource vs Fetch Streaming │
├─────────────────────────────────────────────────────────────────────────┤
│ 需求 │ EventSource │ Fetch Streaming │
├─────────────────────────────────────────────────────────────────────────┤
│ POST 请求发送 prompt │ ❌ │ ✅ │
│ 自定义 Headers │ ❌ │ ✅ │
│ Authorization 认证 │ ❌ │ ✅ │
│ 取消请求 (AbortController)│ ❌ │ ✅ │
│ 精细流控制 │ ❌ │ ✅ │
│ 自动重连 │ ✅ │ ❌ (需手动) │
└─────────────────────────────────────────────────────────────────────────┘3.2 核心 API 详解
javascript
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer sk-xxx'
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello' }],
stream: true
})
});
// response.body 是 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Stream finished');
break;
}
// Uint8Array → string
const chunk = decoder.decode(value, { stream: true });
console.log('Chunk:', chunk);
}3.3 数据流转过程
┌─────────────────────────────────────────────────────────────────────────┐
│ 数据流转过程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Server Network fetch() User Code │
│ │ │ │ │ │
│ │──"Hello"────►│──Uint8Array──►│──.read()─────►│ │
│ │ │ [72,101, │ │ │
│ │ │ 108,108, │ │ │
│ │ │ 111] │ │ │
│ │ │ │ │ │
│ │ │ │
│ │ TextDecoder │ │
│ │ .decode() │ │
│ │ │ │ │
│ │ └─►"Hello"────►│ │
│ │
└─────────────────────────────────────────────────────────────────────────┘3.4 TextDecoder 的 stream: true
javascript
// 问题: 多字节字符 (如中文) 可能被拆分到不同 chunk
// Chunk 1: [228, 184] (中的前 2 字节)
// Chunk 2: [173, 230, 150, 135] (中的后 1 字节 + 文)
// ❌ 错误: 每次独立解码
const decoder = new TextDecoder();
decoder.decode([228, 184]); // 乱码 "�"
decoder.decode([173, 230, 150, 135]); // 乱码 "��文"
// ✅ 正确: stream: true 保留不完整序列
const decoder = new TextDecoder();
decoder.decode(chunk1, { stream: true }); // "" (等待)
decoder.decode(chunk2, { stream: true }); // "中文"3.5 完整 SSE 格式解析
javascript
// OpenAI 响应格式:
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}
// data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" World"}}]}
// data: [DONE]
async function* streamChat(messages) {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${API_KEY}`
},
body: JSON.stringify({
model: 'gpt-4',
messages,
stream: true
})
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${await response.text()}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// 按行分割
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 保留不完整的行
for (const line of lines) {
const trimmed = line.trim();
// 跳过空行和注释
if (!trimmed || trimmed.startsWith(':')) continue;
// 解析 data: 前缀
if (trimmed.startsWith('data:')) {
const data = trimmed.slice(5).trim();
// 结束标记
if (data === '[DONE]') return;
try {
const json = JSON.parse(data);
const content = json.choices?.[0]?.delta?.content;
if (content) yield content;
} catch (e) {
console.warn('Parse error:', data);
}
}
}
}
} finally {
reader.releaseLock();
}
}
// 使用
const output = document.getElementById('output');
output.textContent = '';
for await (const token of streamChat([
{ role: 'user', content: '写一首关于春天的诗' }
])) {
output.textContent += token;
}