Stream 与 Buffer
1. Buffer 详解
1.1 什么是 Buffer?
Buffer 是 Node.js 中用于处理二进制数据的类,在处理 TCP 流、文件系统操作等场景必不可少。
javascript
// 创建 Buffer
const buf1 = Buffer.from('Hello'); // 从字符串创建
const buf2 = Buffer.from([0x48, 0x69]); // 从数组创建
const buf3 = Buffer.alloc(10); // 分配 10 字节 (填充 0)
const buf4 = Buffer.allocUnsafe(10); // 分配 10 字节 (不初始化, 更快)
// 基本操作
buf1.toString('utf8'); // 'Hello'
buf1.length; // 5
buf1[0]; // 72 (H 的 ASCII)
Buffer.concat([buf1, buf2]); // 合并
buf1.compare(buf2); // 比较 (-1, 0, 1)1.2 编码支持
| 编码 | 说明 |
|---|---|
utf8 | 默认编码,多字节 Unicode |
ascii | 7-bit ASCII |
base64 | Base64 编码 |
hex | 十六进制 |
binary / latin1 | ISO-8859-1 |
javascript
const buf = Buffer.from('Hello');
buf.toString('hex'); // '48656c6c6f'
buf.toString('base64'); // 'SGVsbG8='
// Base64 解码
Buffer.from('SGVsbG8=', 'base64').toString(); // 'Hello'1.3 内存分配机制 🔥
堆外内存 (Off-heap)
Buffer 的内存不在 V8 堆内存中,而是 C++ 层分配的堆外内存。
┌────────────────────────────────────────────────────────────┐
│ 进程内存空间 │
├─────────────────────────┬──────────────────────────────────┤
│ V8 堆内存 │ 堆外内存 │
│ (JS 对象, ~1.4GB) │ (Buffer, 物理内存限制) │
│ ┌─────────────────┐ │ ┌─────────────────────────┐ │
│ │ 新生代 │ 老生代 │ │ │ Buffer 内存池 (Slab) │ │
│ └─────────────────┘ │ └─────────────────────────┘ │
└─────────────────────────┴──────────────────────────────────┘| 内存特性 | V8 堆 | Buffer (堆外) |
|---|---|---|
| 上限 | ~1.4GB/2GB | 物理内存限制 |
| GC 管理 | V8 GC 直接管理 | V8 GC 触发释放 C++ 内存 |
| 适用场景 | JS 对象 | 二进制数据、大文件 |
Slab 分配机制
针对小块 Buffer (< 8KB),Node.js 使用预分配的 Slab 内存池 (8KB):
┌───────────────────────────────────────┐
│ 8KB Slab │
├─────────┬─────────┬─────────┬─────────┤
│Buffer 1 │Buffer 2 │Buffer 3 │ 空闲 │
│ 100B │ 256B │ 512B │ │
└─────────┴─────────┴─────────┴─────────┘javascript
// Slab 共享示例
const buf1 = Buffer.allocUnsafe(100); // 从 Slab 分配
const buf2 = Buffer.allocUnsafe(200); // 从同一 Slab 分配
// 大 Buffer (>= 8KB) 单独分配
const bigBuf = Buffer.allocUnsafe(10 * 1024); // 独立内存WARNING
Buffer.allocUnsafe() 不会初始化内存,可能包含敏感旧数据!生产环境处理敏感数据时使用 Buffer.alloc()。
2. Stream 四种类型
┌─────────────────────────────────────────────────────────────────────┐
│ Stream 类型 │
├──────────────┬──────────────┬──────────────┬────────────────────────┤
│ Readable │ Writable │ Duplex │ Transform │
│ 可读流 │ 可写流 │ 双工流 │ 转换流 │
├──────────────┼──────────────┼──────────────┼────────────────────────┤
│ fs.readStream│fs.writeStream│ net.Socket │ zlib.createGzip() │
│ http 请求体 │ http 响应体 │ TCP Socket │ crypto.createCipher │
│ process.stdin│process.stdout│ WebSocket │ 自定义数据处理 │
└──────────────┴──────────────┴──────────────┴────────────────────────┘
│ │ │ │
│ │ │ │
└──────────────┴──────────────┴───────────────────┘
│
所有流都继承 EventEmitter2.1 Readable Stream (可读流)
javascript
const fs = require('fs');
// 创建可读流
const readable = fs.createReadStream('file.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB 缓冲区
});
// 方式一: 事件监听 (流动模式)
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes`);
});
readable.on('end', () => {
console.log('No more data');
});
// 方式二: 暂停模式 (手动读取)
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(`Read ${chunk.length} bytes`);
}
});两种模式
| 模式 | 触发方式 | 特点 |
|---|---|---|
| 流动模式 (Flowing) | data 事件 / pipe() | 数据自动推送 |
| 暂停模式 (Paused) | read() 手动拉取 | 按需消费 |
2.2 Writable Stream (可写流)
javascript
const writable = fs.createWriteStream('output.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024
});
// 写入数据
writable.write('Hello ');
writable.write('World');
// 结束写入
writable.end('!\n');
// 完成事件
writable.on('finish', () => {
console.log('Write complete');
});
// 错误处理
writable.on('error', (err) => {
console.error('Write error:', err);
});2.3 Duplex Stream (双工流)
javascript
const { Duplex } = require('stream');
class MyDuplex extends Duplex {
constructor() {
super();
this.data = ['a', 'b', 'c'];
}
_read() {
const item = this.data.shift();
this.push(item || null); // null 表示结束
}
_write(chunk, encoding, callback) {
console.log('Received:', chunk.toString());
callback();
}
}
const duplex = new MyDuplex();
duplex.on('data', (chunk) => console.log('Read:', chunk.toString()));
duplex.write('Hello');2.4 Transform Stream (转换流)
javascript
const { Transform } = require('stream');
// 自定义转换流: 转大写
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}
// 使用
const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);
// 内置转换流
const zlib = require('zlib');
const gzip = zlib.createGzip();
fs.createReadStream('input.txt')
.pipe(gzip)
.pipe(fs.createWriteStream('input.txt.gz'));3. 背压 (Backpressure) 深度解析 🔥
3.1 问题场景
生产者 (Fast) 消费者 (Slow)
│ │
│ 100 MB/s → │ 10 MB/s
│ │
└────────────────────────┘
内存堆积 → OOM!3.2 核心机制
| 方法 | 返回 false 的含义 |
|---|---|
writable.write(chunk) | 写入缓冲区已达 highWaterMark |
readable.push(chunk) | 读取缓冲区已满 |
CAUTION
highWaterMark 不是内存硬限制,只是"警告线"!超过后仍可写入直到 OOM。
3.3 手动流控
javascript
const readable = fs.createReadStream('large-file.bin');
const writable = fs.createWriteStream('output.bin');
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
console.log('Backpressure! Pausing...');
readable.pause();
}
});
writable.on('drain', () => {
console.log('Drained! Resuming...');
readable.resume();
});
readable.on('end', () => writable.end());3.4 Pipe 内部原理
4. Pipeline API (推荐方式)
4.1 为什么用 pipeline?
| 问题 | pipe() | pipeline() |
|---|---|---|
| 错误处理 | 需要每个流单独监听 | 统一回调 |
| 流清理 | 不自动销毁 | 自动销毁所有流 |
| Promise 支持 | ❌ | ✅ |
4.2 基本用法
javascript
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
// 回调方式
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
// Promise 方式
async function compress() {
await pipelineAsync(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
console.log('Compression complete');
}4.3 stream/promises (Node.js 15+)
javascript
const { pipeline } = require('stream/promises');
const { createGzip } = require('zlib');
async function gzipFile(src, dest) {
await pipeline(
fs.createReadStream(src),
createGzip(),
fs.createWriteStream(dest)
);
}5. 异步迭代 (for await...of)
Node.js 10+ 的 Readable 流实现了 Symbol.asyncIterator:
javascript
const fs = require('fs');
const readline = require('readline');
// 方式一: 直接迭代流
async function processFile(filePath) {
const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
for await (const chunk of stream) {
console.log(`Chunk: ${chunk.length} bytes`);
}
}
// 方式二: 逐行读取
async function processLines(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity
});
for await (const line of rl) {
console.log(`Line: ${line}`);
}
}6. 实战场景
6.1 HTTP 响应流式处理
javascript
const http = require('http');
http.createServer((req, res) => {
// 大文件流式返回
const stream = fs.createReadStream('large-video.mp4');
stream.on('error', (err) => {
res.statusCode = 500;
res.end('Error loading file');
});
res.setHeader('Content-Type', 'video/mp4');
stream.pipe(res);
}).listen(3000);6.2 上传文件处理
javascript
const http = require('http');
const { pipeline } = require('stream/promises');
http.createServer(async (req, res) => {
if (req.method === 'POST') {
try {
await pipeline(
req,
fs.createWriteStream('upload.bin')
);
res.end('Upload complete');
} catch (err) {
res.statusCode = 500;
res.end('Upload failed');
}
}
}).listen(3000);6.3 CSV 大文件处理
javascript
const { Transform } = require('stream');
class CSVParser extends Transform {
constructor() {
super({ objectMode: true });
this.headers = null;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',');
if (!this.headers) {
this.headers = values;
} else {
const obj = {};
this.headers.forEach((h, i) => obj[h] = values[i]);
this.push(obj); // objectMode: 推送对象
}
}
callback();
}
}
// 使用
fs.createReadStream('data.csv')
.pipe(new CSVParser())
.on('data', (row) => console.log(row));6.4 实时日志处理
javascript
const { Transform } = require('stream');
class LogFilter extends Transform {
constructor(level) {
super();
this.level = level;
}
_transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const filtered = lines
.filter(line => line.includes(`[${this.level}]`))
.join('\n');
if (filtered) {
this.push(filtered + '\n');
}
callback();
}
}
// 过滤 ERROR 级别日志
fs.createReadStream('app.log')
.pipe(new LogFilter('ERROR'))
.pipe(process.stdout);7. 性能优化技巧
7.1 调整 highWaterMark
javascript
// 大文件传输: 增大缓冲区
const readable = fs.createReadStream('huge.bin', {
highWaterMark: 64 * 1024 // 64KB (默认 16KB)
});
// 内存敏感场景: 减小缓冲区
const readable = fs.createReadStream('data.txt', {
highWaterMark: 4 * 1024 // 4KB
});7.2 objectMode 注意事项
javascript
// Object 模式: 每次 push/write 一个对象
const { Transform } = require('stream');
const objStream = new Transform({
objectMode: true, // highWaterMark = 16 (对象数量)
transform(obj, encoding, callback) {
this.push({ ...obj, processed: true });
callback();
}
});7.3 流的销毁
javascript
const stream = fs.createReadStream('file.txt');
// 手动销毁 (释放资源)
stream.destroy();
// 带错误销毁
stream.destroy(new Error('Something went wrong'));
// 监听销毁
stream.on('close', () => console.log('Stream closed'));8. 面试高频问题
Q1: Stream 相比于一次性读入的优势?
- 内存效率: 不需要将整个文件加载到内存
- 时间效率: 数据到达即可处理,无需等待完整数据
- 组合性: 可通过
pipe()链式组合
Q2: pipe() 和 pipeline() 的区别?
| 特性 | pipe() | pipeline() |
|---|---|---|
| 错误传播 | ❌ 需手动处理 | ✅ 自动传播 |
| 流清理 | ❌ 不自动销毁 | ✅ 自动销毁 |
| Promise | ❌ | ✅ (可 promisify) |
Q3: 什么是背压? 如何处理?
背压: 当消费者速度慢于生产者时,数据在缓冲区堆积。
处理方式:
- 使用
pipe()/pipeline()自动处理 - 监听
drain事件手动控制pause()/resume() - 调整
highWaterMark参数
Q4: Buffer.alloc() vs Buffer.allocUnsafe()?
| 方法 | 初始化 | 性能 | 安全性 |
|---|---|---|---|
alloc() | 填充 0 | 较慢 | 安全 |
allocUnsafe() | 不初始化 | 较快 | 可能泄露旧数据 |
Q5: highWaterMark 是硬限制吗?
不是。它只是一个"建议值",超过后 write() 返回 false 作为信号,但仍允许写入直到内存耗尽。
Q6: 如何实现自定义转换流?
javascript
const { Transform } = require('stream');
class MyTransform extends Transform {
_transform(chunk, encoding, callback) {
// 处理数据并推送
this.push(processedData);
callback(); // 或 callback(error)
}
_flush(callback) {
// 流结束前的最后处理
this.push(finalData);
callback();
}
}Q7: 流的错误处理最佳实践?
javascript
// ❌ 错误做法: 只监听第一个流
readStream
.pipe(transform)
.pipe(writeStream)
.on('error', handleError);
// ✅ 正确做法: 使用 pipeline
const { pipeline } = require('stream/promises');
try {
await pipeline(readStream, transform, writeStream);
} catch (err) {
// 统一错误处理,所有流自动销毁
}