Skip to content

生态工具与库

🌟 推荐:不想手动解析看这里

4.1 为什么需要第三方库

虽然原生 fetch() + ReadableStream 可以实现 POST SSE,但手动解析存在以下痛点:

javascript
// 手动解析的复杂度
while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';  // 处理粘包

    for (const line of lines) {
        if (line.startsWith('data: ')) {
            // 解析 JSON、处理错误、维护状态...
        }
    }
}

痛点:

  • 需要处理 TCP 粘包/分包
  • 手动管理 buffer
  • 解析 SSE 多种字段(data/event/id/retry)
  • 错误处理、重连逻辑复杂

4.2 通用 SSE 库

4.2.1 fetch-event-source

🌟 最流行的 POST SSE 客户端库,专为 Fetch API 设计

bash
npm install @microsoft/fetch-event-source
typescript
import { fetchEventSource } from '@microsoft/fetch-event-source';

await fetchEventSource('/api/chat', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer xxx'
    },
    body: JSON.stringify({
        messages: [{ role: 'user', content: 'Hello' }]
    }),

    // ✅ 自动解析 SSE 格式
    onmessage(event) {
        const data = JSON.parse(event.data);
        console.log('Received:', data);
    },

    // ✅ 自动处理错误
    onerror(err) {
        console.error('Stream error:', err);
        throw err; // 停止重连
    },

    // ✅ 内置重连机制
    onopen(response) {
        if (response.ok && response.headers.get('content-type')?.includes('text/event-stream')) {
            return; // 正常连接
        }
        throw new Error('Invalid response');
    }
});

特性:

  • ✅ 支持 POST/自定义 Headers
  • ✅ 自动解析 SSE 格式
  • ✅ 内置重连机制
  • ✅ TypeScript 支持
  • ✅ 处理粘包/分包

4.2.2 sse.js / eventsource-parser

bash
npm install eventsource-parser
typescript
import { createParser, ParsedEvent } from 'eventsource-parser';

const response = await fetch('/api/chat', {
    method: 'POST',
    body: JSON.stringify({ prompt: 'Hello' })
});

const parser = createParser((event: ParsedEvent) => {
    if (event.type === 'event') {
        console.log('Event:', event.data);
    }
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // ✅ 只需喂数据给 parser,它自动处理分包
    parser.feed(decoder.decode(value));
}

适用场景:

  • 只需要解析器,不需要重连等高级特性
  • 轻量级(~2KB)

4.3 LLM 专用库

4.3.1 Vercel AI SDK

🚀 最强大的 LLM 流式处理库,开箱即用

bash
npm install ai

客户端 (React):

tsx
import { useChat } from 'ai/react';

export function ChatComponent() {
    const { messages, input, handleInputChange, handleSubmit, isLoading, stop } = useChat({
        api: '/api/chat'  // ✅ 自动处理 POST + SSE 解析
    });

    return (
        <div>
            {messages.map(msg => (
                <div key={msg.id}>{msg.content}</div>
            ))}

            <form onSubmit={handleSubmit}>
                <input value={input} onChange={handleInputChange} />
                {isLoading ? (
                    <button onClick={stop}>停止</button>
                ) : (
                    <button type="submit">发送</button>
                )}
            </form>
        </div>
    );
}

服务端 (Next.js):

typescript
// app/api/chat/route.ts
import { OpenAIStream, StreamingTextResponse } from 'ai';
import OpenAI from 'openai';

export const runtime = 'edge';

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function POST(req: Request) {
    const { messages } = await req.json();

    const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages,
        stream: true,
    });

    // ✅ 自动转换 OpenAI 流为标准 SSE
    const stream = OpenAIStream(response);
    return new StreamingTextResponse(stream);
}

特性:

  • ✅ React/Vue/Svelte Hooks
  • ✅ 自动处理消息状态
  • ✅ 内置 AbortController
  • ✅ 支持流式 UI 更新
  • ✅ 适配多种 LLM(OpenAI/Anthropic/Cohere)

4.3.2 OpenAI 官方 SDK

bash
npm install openai
typescript
import OpenAI from 'openai';

const openai = new OpenAI({ apiKey: 'xxx' });

// ✅ 自动处理流式响应
const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Hello' }],
    stream: true,
});

for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);  // 打字机效果
}

浏览器端使用:

typescript
import OpenAI from 'openai';

const openai = new OpenAI({
    apiKey: 'xxx',
    dangerouslyAllowBrowser: true  // ⚠️ 仅开发环境
});

const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'Hello' }],
    stream: true,
});

// ✅ 使用 async iterator
for await (const part of stream) {
    console.log(part.choices[0]?.delta?.content || '');
}

4.4 框架集成方案

4.4.1 React Query + Streaming

typescript
import { useMutation } from '@tanstack/react-query';

function useStreamChat() {
    return useMutation({
        mutationFn: async (prompt: string) => {
            const response = await fetch('/api/chat', {
                method: 'POST',
                body: JSON.stringify({ prompt })
            });

            return response.body!.getReader();
        },
        onSuccess: (reader) => {
            // 流式读取
            readStream(reader);
        }
    });
}

4.4.2 SWR + Streaming

typescript
import useSWRMutation from 'swr/mutation';

async function sendMessage(url: string, { arg }: { arg: string }) {
    const response = await fetch(url, {
        method: 'POST',
        body: JSON.stringify({ message: arg })
    });

    return response.body!.getReader();
}

function ChatComponent() {
    const { trigger, data } = useSWRMutation('/api/chat', sendMessage);

    // 触发流式请求
    const handleSend = () => trigger('Hello');

    return <button onClick={handleSend}>发送</button>;
}

4.5 工具库对比

┌─────────────────────────────────────────────────────────────────────────┐
│                        SSE 库对比选型                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  库名                 │ 大小  │ POST │ 重连 │ 解析 │ 推荐场景           │
│  ────────────────────┼──────┼─────┼─────┼─────┼─────────────────────  │
│  EventSource (原生)   │  -   │  ❌  │  ✅  │  ✅  │ 简单 GET 通知      │
│  fetch (原生)         │  -   │  ✅  │  ❌  │  ❌  │ 需完全自定义       │
│  @microsoft/fetch-    │ 4KB  │  ✅  │  ✅  │  ✅  │ 通用 POST SSE      │
│    event-source       │      │      │      │      │                    │
│  eventsource-parser   │ 2KB  │  ✅  │  ❌  │  ✅  │ 只需解析器         │
│  Vercel AI SDK        │ 50KB │  ✅  │  ✅  │  ✅  │ LLM 全栈方案       │
│  OpenAI SDK           │ 80KB │  ✅  │  ✅  │  ✅  │ OpenAI 专用        │
│                                                                         │
│  选择建议:                                                              │
│  • 简单通知推送  → EventSource (原生)                                   │
│  • 通用 POST SSE → @microsoft/fetch-event-source                       │
│  • LLM 应用      → Vercel AI SDK (推荐) 或 OpenAI SDK                  │
│  • 极致轻量     → eventsource-parser                                    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

4.6 生产级最佳实践

typescript
// 使用 @microsoft/fetch-event-source 的完整示例
import { fetchEventSource, EventSourceMessage } from '@microsoft/fetch-event-source';

class ChatService {
    private controller: AbortController | null = null;

    async streamChat(
        messages: Message[],
        onToken: (token: string) => void,
        onError?: (error: Error) => void
    ) {
        this.abort(); // 取消之前的请求
        this.controller = new AbortController();

        try {
            await fetchEventSource('/api/chat', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Authorization': `Bearer ${getToken()}`
                },
                body: JSON.stringify({ messages }),
                signal: this.controller.signal,

                // ✅ 连接建立
                async onopen(response) {
                    if (!response.ok) {
                        throw new Error(`HTTP ${response.status}: ${await response.text()}`);
                    }
                },

                // ✅ 接收消息(自动解析 SSE)
                onmessage(event: EventSourceMessage) {
                    if (event.data === '[DONE]') {
                        return;
                    }

                    try {
                        const data = JSON.parse(event.data);
                        const token = data.choices?.[0]?.delta?.content || '';
                        if (token) onToken(token);
                    } catch (e) {
                        console.warn('Parse error:', event.data);
                    }
                },

                // ✅ 错误处理
                onerror(err) {
                    onError?.(err);
                    throw err; // 停止重连
                },

                // ✅ 关闭连接
                onclose() {
                    console.log('Stream closed');
                }
            });
        } catch (error: any) {
            if (error.name !== 'AbortError') {
                throw error;
            }
        } finally {
            this.controller = null;
        }
    }

    abort() {
        this.controller?.abort();
    }
}

// 使用
const chat = new ChatService();

await chat.streamChat(
    [{ role: 'user', content: 'Hello' }],
    (token) => {
        document.getElementById('output')!.textContent += token;
    },
    (error) => {
        console.error('Chat error:', error);
    }
);

前端面试知识库