Skip to content

Node.js 多进程与 Worker Threads

1. Node.js 单线程的局限

Node.js 主线程是单线程的,无法利用多核 CPU。

解决方案

方案适用场景特点
clusterWeb Server 多实例多进程,共享端口
child_process执行外部命令/脚本fork/exec/spawn
worker_threadsCPU 密集型任务真正的多线程,共享内存

2. Cluster 模块

原理

  • Master 进程 fork 多个 Worker 进程
  • Worker 共享同一个 TCP 端口 (通过 IPC + 负载均衡)
  • 默认使用 轮询 (Round-Robin) 分发请求

示例

javascript
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
    console.log(`Master ${process.pid} is running`);
    
    // Fork workers
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    
    cluster.on('exit', (worker, code, signal) => {
        console.log(`Worker ${worker.process.pid} died`);
        cluster.fork(); // 重新拉起
    });
} else {
    http.createServer((req, res) => {
        res.end(`Hello from ${process.pid}`);
    }).listen(8000);
    
    console.log(`Worker ${process.pid} started`);
}

PM2

生产环境推荐使用 PM2 管理 Cluster:

bash
pm2 start app.js -i max  # 自动根据 CPU 核数启动

3. Worker Threads

与 Cluster 的区别

ClusterWorker Threads
模型多进程多线程
内存隔离可共享 (SharedArrayBuffer)
通信IPC (序列化)postMessage 或共享内存
开销大 (进程切换)小 (线程切换)
适用Web Server 扩展CPU 密集计算

基本用法

javascript
// main.js
const { Worker } = require('worker_threads');

const worker = new Worker('./worker.js', {
    workerData: { input: 1000000 }
});

worker.on('message', (result) => {
    console.log('Result:', result);
});

worker.on('error', (err) => console.error(err));
worker.on('exit', (code) => console.log(`Worker exited: ${code}`));
javascript
// worker.js
const { workerData, parentPort } = require('worker_threads');

function heavyComputation(n) {
    let sum = 0;
    for (let i = 0; i < n; i++) sum += i;
    return sum;
}

const result = heavyComputation(workerData.input);
parentPort.postMessage(result);

SharedArrayBuffer 共享内存

javascript
// main.js
const { Worker } = require('worker_threads');

const sharedBuffer = new SharedArrayBuffer(4);
const sharedArray = new Int32Array(sharedBuffer);
sharedArray[0] = 0;

const worker = new Worker('./worker.js', {
    workerData: { sharedBuffer }
});

// worker.js
const { workerData } = require('worker_threads');
const sharedArray = new Int32Array(workerData.sharedBuffer);
Atomics.add(sharedArray, 0, 1); // 原子操作

4. IPC 通信

进程间通信方式

  1. message: process.send() / process.on('message')
  2. 管道 (Pipe): spawn 的 stdio 选项
  3. 共享内存: SharedArrayBuffer (Worker Threads)
  4. Socket: Unix Domain Socket / TCP

避免阻塞 Event Loop

javascript
// 错误: 在主线程做 CPU 密集操作
app.get('/heavy', (req, res) => {
    const result = heavySync(); // 阻塞!
    res.json(result);
});

// 正确: 使用 Worker
app.get('/heavy', async (req, res) => {
    const result = await runInWorker(heavySync);
    res.json(result);
});

前端面试知识库