Skip to content

Stream 与 Buffer

1. Buffer 详解

1.1 什么是 Buffer?

Buffer 是 Node.js 中用于处理二进制数据的类,在处理 TCP 流、文件系统操作等场景必不可少。

javascript
// 创建 Buffer
const buf1 = Buffer.from('Hello');            // 从字符串创建
const buf2 = Buffer.from([0x48, 0x69]);       // 从数组创建
const buf3 = Buffer.alloc(10);                // 分配 10 字节 (填充 0)
const buf4 = Buffer.allocUnsafe(10);          // 分配 10 字节 (不初始化, 更快)

// 基本操作
buf1.toString('utf8');      // 'Hello'
buf1.length;                // 5
buf1[0];                    // 72 (H 的 ASCII)
Buffer.concat([buf1, buf2]); // 合并
buf1.compare(buf2);         // 比较 (-1, 0, 1)

1.2 编码支持

编码说明
utf8默认编码,多字节 Unicode
ascii7-bit ASCII
base64Base64 编码
hex十六进制
binary / latin1ISO-8859-1
javascript
const buf = Buffer.from('Hello');
buf.toString('hex');     // '48656c6c6f'
buf.toString('base64');  // 'SGVsbG8='

// Base64 解码
Buffer.from('SGVsbG8=', 'base64').toString();  // 'Hello'

1.3 内存分配机制 🔥

堆外内存 (Off-heap)

Buffer 的内存不在 V8 堆内存中,而是 C++ 层分配的堆外内存。

┌────────────────────────────────────────────────────────────┐
│                        进程内存空间                          │
├─────────────────────────┬──────────────────────────────────┤
│     V8 堆内存            │          堆外内存                 │
│   (JS 对象, ~1.4GB)      │    (Buffer, 物理内存限制)         │
│   ┌─────────────────┐   │   ┌─────────────────────────┐    │
│   │ 新生代 │ 老生代  │   │   │  Buffer 内存池 (Slab)   │    │
│   └─────────────────┘   │   └─────────────────────────┘    │
└─────────────────────────┴──────────────────────────────────┘
内存特性V8 堆Buffer (堆外)
上限~1.4GB/2GB物理内存限制
GC 管理V8 GC 直接管理V8 GC 触发释放 C++ 内存
适用场景JS 对象二进制数据、大文件

Slab 分配机制

针对小块 Buffer (< 8KB),Node.js 使用预分配的 Slab 内存池 (8KB):

┌───────────────────────────────────────┐
│              8KB Slab                 │
├─────────┬─────────┬─────────┬─────────┤
│Buffer 1 │Buffer 2 │Buffer 3 │  空闲   │
│  100B   │  256B   │  512B   │         │
└─────────┴─────────┴─────────┴─────────┘
javascript
// Slab 共享示例
const buf1 = Buffer.allocUnsafe(100);  // 从 Slab 分配
const buf2 = Buffer.allocUnsafe(200);  // 从同一 Slab 分配

// 大 Buffer (>= 8KB) 单独分配
const bigBuf = Buffer.allocUnsafe(10 * 1024);  // 独立内存

WARNING

Buffer.allocUnsafe() 不会初始化内存,可能包含敏感旧数据!生产环境处理敏感数据时使用 Buffer.alloc()


2. Stream 四种类型

┌─────────────────────────────────────────────────────────────────────┐
│                          Stream 类型                                 │
├──────────────┬──────────────┬──────────────┬────────────────────────┤
│   Readable   │   Writable   │    Duplex    │      Transform         │
│   可读流      │    可写流     │    双工流     │       转换流           │
├──────────────┼──────────────┼──────────────┼────────────────────────┤
│ fs.readStream│fs.writeStream│  net.Socket  │   zlib.createGzip()    │
│ http 请求体  │ http 响应体   │  TCP Socket  │   crypto.createCipher  │
│ process.stdin│process.stdout│ WebSocket    │   自定义数据处理        │
└──────────────┴──────────────┴──────────────┴────────────────────────┘
        │              │              │                   │
        │              │              │                   │
        └──────────────┴──────────────┴───────────────────┘

                    所有流都继承 EventEmitter

2.1 Readable Stream (可读流)

javascript
const fs = require('fs');

// 创建可读流
const readable = fs.createReadStream('file.txt', {
    encoding: 'utf8',
    highWaterMark: 16 * 1024  // 16KB 缓冲区
});

// 方式一: 事件监听 (流动模式)
readable.on('data', (chunk) => {
    console.log(`Received ${chunk.length} bytes`);
});

readable.on('end', () => {
    console.log('No more data');
});

// 方式二: 暂停模式 (手动读取)
readable.on('readable', () => {
    let chunk;
    while ((chunk = readable.read()) !== null) {
        console.log(`Read ${chunk.length} bytes`);
    }
});

两种模式

模式触发方式特点
流动模式 (Flowing)data 事件 / pipe()数据自动推送
暂停模式 (Paused)read() 手动拉取按需消费

2.2 Writable Stream (可写流)

javascript
const writable = fs.createWriteStream('output.txt', {
    encoding: 'utf8',
    highWaterMark: 16 * 1024
});

// 写入数据
writable.write('Hello ');
writable.write('World');

// 结束写入
writable.end('!\n');

// 完成事件
writable.on('finish', () => {
    console.log('Write complete');
});

// 错误处理
writable.on('error', (err) => {
    console.error('Write error:', err);
});

2.3 Duplex Stream (双工流)

javascript
const { Duplex } = require('stream');

class MyDuplex extends Duplex {
    constructor() {
        super();
        this.data = ['a', 'b', 'c'];
    }
    
    _read() {
        const item = this.data.shift();
        this.push(item || null);  // null 表示结束
    }
    
    _write(chunk, encoding, callback) {
        console.log('Received:', chunk.toString());
        callback();
    }
}

const duplex = new MyDuplex();
duplex.on('data', (chunk) => console.log('Read:', chunk.toString()));
duplex.write('Hello');

2.4 Transform Stream (转换流)

javascript
const { Transform } = require('stream');

// 自定义转换流: 转大写
class UpperCaseTransform extends Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}

// 使用
const upper = new UpperCaseTransform();
process.stdin.pipe(upper).pipe(process.stdout);

// 内置转换流
const zlib = require('zlib');
const gzip = zlib.createGzip();

fs.createReadStream('input.txt')
    .pipe(gzip)
    .pipe(fs.createWriteStream('input.txt.gz'));

3. 背压 (Backpressure) 深度解析 🔥

3.1 问题场景

生产者 (Fast)           消费者 (Slow)
   │                        │
   │  100 MB/s     →        │  10 MB/s
   │                        │
   └────────────────────────┘
           内存堆积 → OOM!

3.2 核心机制

方法返回 false 的含义
writable.write(chunk)写入缓冲区已达 highWaterMark
readable.push(chunk)读取缓冲区已满

CAUTION

highWaterMark 不是内存硬限制,只是"警告线"!超过后仍可写入直到 OOM。

3.3 手动流控

javascript
const readable = fs.createReadStream('large-file.bin');
const writable = fs.createWriteStream('output.bin');

readable.on('data', (chunk) => {
    const canContinue = writable.write(chunk);
    
    if (!canContinue) {
        console.log('Backpressure! Pausing...');
        readable.pause();
    }
});

writable.on('drain', () => {
    console.log('Drained! Resuming...');
    readable.resume();
});

readable.on('end', () => writable.end());

3.4 Pipe 内部原理


4. Pipeline API (推荐方式)

4.1 为什么用 pipeline?

问题pipe()pipeline()
错误处理需要每个流单独监听统一回调
流清理不自动销毁自动销毁所有流
Promise 支持

4.2 基本用法

javascript
const { pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// 回调方式
pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('input.txt.gz'),
    (err) => {
        if (err) {
            console.error('Pipeline failed:', err);
        } else {
            console.log('Pipeline succeeded');
        }
    }
);

// Promise 方式
async function compress() {
    await pipelineAsync(
        fs.createReadStream('input.txt'),
        zlib.createGzip(),
        fs.createWriteStream('input.txt.gz')
    );
    console.log('Compression complete');
}

4.3 stream/promises (Node.js 15+)

javascript
const { pipeline } = require('stream/promises');
const { createGzip } = require('zlib');

async function gzipFile(src, dest) {
    await pipeline(
        fs.createReadStream(src),
        createGzip(),
        fs.createWriteStream(dest)
    );
}

5. 异步迭代 (for await...of)

Node.js 10+ 的 Readable 流实现了 Symbol.asyncIterator

javascript
const fs = require('fs');
const readline = require('readline');

// 方式一: 直接迭代流
async function processFile(filePath) {
    const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
    
    for await (const chunk of stream) {
        console.log(`Chunk: ${chunk.length} bytes`);
    }
}

// 方式二: 逐行读取
async function processLines(filePath) {
    const rl = readline.createInterface({
        input: fs.createReadStream(filePath),
        crlfDelay: Infinity
    });
    
    for await (const line of rl) {
        console.log(`Line: ${line}`);
    }
}

6. 实战场景

6.1 HTTP 响应流式处理

javascript
const http = require('http');

http.createServer((req, res) => {
    // 大文件流式返回
    const stream = fs.createReadStream('large-video.mp4');
    
    stream.on('error', (err) => {
        res.statusCode = 500;
        res.end('Error loading file');
    });
    
    res.setHeader('Content-Type', 'video/mp4');
    stream.pipe(res);
}).listen(3000);

6.2 上传文件处理

javascript
const http = require('http');
const { pipeline } = require('stream/promises');

http.createServer(async (req, res) => {
    if (req.method === 'POST') {
        try {
            await pipeline(
                req,
                fs.createWriteStream('upload.bin')
            );
            res.end('Upload complete');
        } catch (err) {
            res.statusCode = 500;
            res.end('Upload failed');
        }
    }
}).listen(3000);

6.3 CSV 大文件处理

javascript
const { Transform } = require('stream');

class CSVParser extends Transform {
    constructor() {
        super({ objectMode: true });
        this.headers = null;
    }
    
    _transform(chunk, encoding, callback) {
        const lines = chunk.toString().split('\n');
        
        for (const line of lines) {
            if (!line.trim()) continue;
            
            const values = line.split(',');
            
            if (!this.headers) {
                this.headers = values;
            } else {
                const obj = {};
                this.headers.forEach((h, i) => obj[h] = values[i]);
                this.push(obj);  // objectMode: 推送对象
            }
        }
        
        callback();
    }
}

// 使用
fs.createReadStream('data.csv')
    .pipe(new CSVParser())
    .on('data', (row) => console.log(row));

6.4 实时日志处理

javascript
const { Transform } = require('stream');

class LogFilter extends Transform {
    constructor(level) {
        super();
        this.level = level;
    }
    
    _transform(chunk, encoding, callback) {
        const lines = chunk.toString().split('\n');
        const filtered = lines
            .filter(line => line.includes(`[${this.level}]`))
            .join('\n');
        
        if (filtered) {
            this.push(filtered + '\n');
        }
        callback();
    }
}

// 过滤 ERROR 级别日志
fs.createReadStream('app.log')
    .pipe(new LogFilter('ERROR'))
    .pipe(process.stdout);

7. 性能优化技巧

7.1 调整 highWaterMark

javascript
// 大文件传输: 增大缓冲区
const readable = fs.createReadStream('huge.bin', {
    highWaterMark: 64 * 1024  // 64KB (默认 16KB)
});

// 内存敏感场景: 减小缓冲区
const readable = fs.createReadStream('data.txt', {
    highWaterMark: 4 * 1024  // 4KB
});

7.2 objectMode 注意事项

javascript
// Object 模式: 每次 push/write 一个对象
const { Transform } = require('stream');

const objStream = new Transform({
    objectMode: true,  // highWaterMark = 16 (对象数量)
    transform(obj, encoding, callback) {
        this.push({ ...obj, processed: true });
        callback();
    }
});

7.3 流的销毁

javascript
const stream = fs.createReadStream('file.txt');

// 手动销毁 (释放资源)
stream.destroy();

// 带错误销毁
stream.destroy(new Error('Something went wrong'));

// 监听销毁
stream.on('close', () => console.log('Stream closed'));

8. 面试高频问题

Q1: Stream 相比于一次性读入的优势?

  1. 内存效率: 不需要将整个文件加载到内存
  2. 时间效率: 数据到达即可处理,无需等待完整数据
  3. 组合性: 可通过 pipe() 链式组合

Q2: pipe() 和 pipeline() 的区别?

特性pipe()pipeline()
错误传播❌ 需手动处理✅ 自动传播
流清理❌ 不自动销毁✅ 自动销毁
Promise✅ (可 promisify)

Q3: 什么是背压? 如何处理?

背压: 当消费者速度慢于生产者时,数据在缓冲区堆积。

处理方式:

  1. 使用 pipe() / pipeline() 自动处理
  2. 监听 drain 事件手动控制 pause() / resume()
  3. 调整 highWaterMark 参数

Q4: Buffer.alloc() vs Buffer.allocUnsafe()?

方法初始化性能安全性
alloc()填充 0较慢安全
allocUnsafe()不初始化较快可能泄露旧数据

Q5: highWaterMark 是硬限制吗?

不是。它只是一个"建议值",超过后 write() 返回 false 作为信号,但仍允许写入直到内存耗尽。

Q6: 如何实现自定义转换流?

javascript
const { Transform } = require('stream');

class MyTransform extends Transform {
    _transform(chunk, encoding, callback) {
        // 处理数据并推送
        this.push(processedData);
        callback();  // 或 callback(error)
    }
    
    _flush(callback) {
        // 流结束前的最后处理
        this.push(finalData);
        callback();
    }
}

Q7: 流的错误处理最佳实践?

javascript
// ❌ 错误做法: 只监听第一个流
readStream
    .pipe(transform)
    .pipe(writeStream)
    .on('error', handleError);

// ✅ 正确做法: 使用 pipeline
const { pipeline } = require('stream/promises');

try {
    await pipeline(readStream, transform, writeStream);
} catch (err) {
    // 统一错误处理,所有流自动销毁
}

前端面试知识库