生态工具与库
🌟 推荐:不想手动解析看这里
4.1 为什么需要第三方库
虽然原生 fetch() + ReadableStream 可以实现 POST SSE,但手动解析存在以下痛点:
javascript
// 手动解析的复杂度
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 处理粘包
for (const line of lines) {
if (line.startsWith('data: ')) {
// 解析 JSON、处理错误、维护状态...
}
}
}痛点:
- 需要处理 TCP 粘包/分包
- 手动管理 buffer
- 解析 SSE 多种字段(data/event/id/retry)
- 错误处理、重连逻辑复杂
4.2 通用 SSE 库
4.2.1 fetch-event-source
🌟 最流行的 POST SSE 客户端库,专为 Fetch API 设计
bash
npm install @microsoft/fetch-event-sourcetypescript
import { fetchEventSource } from '@microsoft/fetch-event-source';
await fetchEventSource('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer xxx'
},
body: JSON.stringify({
messages: [{ role: 'user', content: 'Hello' }]
}),
// ✅ 自动解析 SSE 格式
onmessage(event) {
const data = JSON.parse(event.data);
console.log('Received:', data);
},
// ✅ 自动处理错误
onerror(err) {
console.error('Stream error:', err);
throw err; // 停止重连
},
// ✅ 内置重连机制
onopen(response) {
if (response.ok && response.headers.get('content-type')?.includes('text/event-stream')) {
return; // 正常连接
}
throw new Error('Invalid response');
}
});特性:
- ✅ 支持 POST/自定义 Headers
- ✅ 自动解析 SSE 格式
- ✅ 内置重连机制
- ✅ TypeScript 支持
- ✅ 处理粘包/分包
4.2.2 sse.js / eventsource-parser
bash
npm install eventsource-parsertypescript
import { createParser, ParsedEvent } from 'eventsource-parser';
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ prompt: 'Hello' })
});
const parser = createParser((event: ParsedEvent) => {
if (event.type === 'event') {
console.log('Event:', event.data);
}
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
// ✅ 只需喂数据给 parser,它自动处理分包
parser.feed(decoder.decode(value));
}适用场景:
- 只需要解析器,不需要重连等高级特性
- 轻量级(~2KB)
4.3 LLM 专用库
4.3.1 Vercel AI SDK
🚀 最强大的 LLM 流式处理库,开箱即用
bash
npm install ai客户端 (React):
tsx
import { useChat } from 'ai/react';
export function ChatComponent() {
const { messages, input, handleInputChange, handleSubmit, isLoading, stop } = useChat({
api: '/api/chat' // ✅ 自动处理 POST + SSE 解析
});
return (
<div>
{messages.map(msg => (
<div key={msg.id}>{msg.content}</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
{isLoading ? (
<button onClick={stop}>停止</button>
) : (
<button type="submit">发送</button>
)}
</form>
</div>
);
}服务端 (Next.js):
typescript
// app/api/chat/route.ts
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';
export const runtime = 'edge';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
export async function POST(req: Request) {
const { messages } = await req.json();
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// ✅ 自动转换 OpenAI 流为标准 SSE
const stream = OpenAIStream(response);
return new StreamingTextResponse(stream);
}特性:
- ✅ React/Vue/Svelte Hooks
- ✅ 自动处理消息状态
- ✅ 内置 AbortController
- ✅ 支持流式 UI 更新
- ✅ 适配多种 LLM(OpenAI/Anthropic/Cohere)
4.3.2 OpenAI 官方 SDK
bash
npm install openaitypescript
import OpenAI from 'openai';
const openai = new OpenAI({ apiKey: 'xxx' });
// ✅ 自动处理流式响应
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
process.stdout.write(content); // 打字机效果
}浏览器端使用:
typescript
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: 'xxx',
dangerouslyAllowBrowser: true // ⚠️ 仅开发环境
});
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello' }],
stream: true,
});
// ✅ 使用 async iterator
for await (const part of stream) {
console.log(part.choices[0]?.delta?.content || '');
}4.4 框架集成方案
4.4.1 React Query + Streaming
typescript
import { useMutation } from '@tanstack/react-query';
function useStreamChat() {
return useMutation({
mutationFn: async (prompt: string) => {
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ prompt })
});
return response.body!.getReader();
},
onSuccess: (reader) => {
// 流式读取
readStream(reader);
}
});
}4.4.2 SWR + Streaming
typescript
import useSWRMutation from 'swr/mutation';
async function sendMessage(url: string, { arg }: { arg: string }) {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify({ message: arg })
});
return response.body!.getReader();
}
function ChatComponent() {
const { trigger, data } = useSWRMutation('/api/chat', sendMessage);
// 触发流式请求
const handleSend = () => trigger('Hello');
return <button onClick={handleSend}>发送</button>;
}4.5 工具库对比
┌─────────────────────────────────────────────────────────────────────────┐
│ SSE 库对比选型 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 库名 │ 大小 │ POST │ 重连 │ 解析 │ 推荐场景 │
│ ────────────────────┼──────┼─────┼─────┼─────┼───────────────────── │
│ EventSource (原生) │ - │ ❌ │ ✅ │ ✅ │ 简单 GET 通知 │
│ fetch (原生) │ - │ ✅ │ ❌ │ ❌ │ 需完全自定义 │
│ @microsoft/fetch- │ 4KB │ ✅ │ ✅ │ ✅ │ 通用 POST SSE │
│ event-source │ │ │ │ │ │
│ eventsource-parser │ 2KB │ ✅ │ ❌ │ ✅ │ 只需解析器 │
│ Vercel AI SDK │ 50KB │ ✅ │ ✅ │ ✅ │ LLM 全栈方案 │
│ OpenAI SDK │ 80KB │ ✅ │ ✅ │ ✅ │ OpenAI 专用 │
│ │
│ 选择建议: │
│ • 简单通知推送 → EventSource (原生) │
│ • 通用 POST SSE → @microsoft/fetch-event-source │
│ • LLM 应用 → Vercel AI SDK (推荐) 或 OpenAI SDK │
│ • 极致轻量 → eventsource-parser │
│ │
└─────────────────────────────────────────────────────────────────────────┘4.6 生产级最佳实践
typescript
// 使用 @microsoft/fetch-event-source 的完整示例
import { fetchEventSource, EventSourceMessage } from '@microsoft/fetch-event-source';
class ChatService {
private controller: AbortController | null = null;
async streamChat(
messages: Message[],
onToken: (token: string) => void,
onError?: (error: Error) => void
) {
this.abort(); // 取消之前的请求
this.controller = new AbortController();
try {
await fetchEventSource('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getToken()}`
},
body: JSON.stringify({ messages }),
signal: this.controller.signal,
// ✅ 连接建立
async onopen(response) {
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${await response.text()}`);
}
},
// ✅ 接收消息(自动解析 SSE)
onmessage(event: EventSourceMessage) {
if (event.data === '[DONE]') {
return;
}
try {
const data = JSON.parse(event.data);
const token = data.choices?.[0]?.delta?.content || '';
if (token) onToken(token);
} catch (e) {
console.warn('Parse error:', event.data);
}
},
// ✅ 错误处理
onerror(err) {
onError?.(err);
throw err; // 停止重连
},
// ✅ 关闭连接
onclose() {
console.log('Stream closed');
}
});
} catch (error: any) {
if (error.name !== 'AbortError') {
throw error;
}
} finally {
this.controller = null;
}
}
abort() {
this.controller?.abort();
}
}
// 使用
const chat = new ChatService();
await chat.streamChat(
[{ role: 'user', content: 'Hello' }],
(token) => {
document.getElementById('output')!.textContent += token;
},
(error) => {
console.error('Chat error:', error);
}
);