Skip to content

Node.js 稳定性与高可用

生产环境 Node.js 应用的稳定性保障体系

目录


优雅退出

问题背景

Kubernetes 滚动更新时向 Pod 发送 SIGTERM 信号。如果直接 process.exit(),正在处理的请求会失败 (502 错误)。

标准实现流程 🔥

javascript
const http = require('http');

const server = http.createServer((req, res) => {
    // 业务逻辑
    res.end('Hello');
});

server.listen(3000);

// 优雅退出
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

async function shutdown() {
    console.log('Received shutdown signal, gracefully shutting down...');
    
    // Step 1: 停止接收新请求
    server.close(() => {
        console.log('All connections closed, exiting.');
        process.exit(0);
    });
    
    // Step 2: 兜底强制退出
    setTimeout(() => {
        console.error('Forcing shutdown after timeout');
        process.exit(1);
    }, 10000);
}

Keep-Alive 长连接问题 🔥

问题: server.close() 只停止监听端口,不会切断已建立的连接

Keep-Alive 连接可能长时间保持,导致:

  • server.close() 回调永不执行
  • Pod 无法正常销毁
  • K8s 最终发送 SIGKILL 强杀

解决方案 A: 手动追踪连接

javascript
const connections = new Set();

server.on('connection', (socket) => {
    connections.add(socket);
    socket.on('close', () => connections.delete(socket));
});

async function shutdown() {
    console.log('Shutting down...');
    
    // 停止接收新请求
    server.close();
    
    // 处理现有连接
    for (const socket of connections) {
        // 标记关闭,让客户端知道不要复用连接
        socket.end();
        
        // 设置超时强制销毁
        setTimeout(() => {
            if (!socket.destroyed) {
                socket.destroy();
            }
        }, 5000);
    }
}

解决方案 B: 设置响应头

javascript
server.on('request', (req, res) => {
    if (isShuttingDown) {
        res.setHeader('Connection', 'close');
    }
    // 正常处理请求...
});

解决方案 C: 使用现成库 (推荐)

javascript
// http-terminator (推荐)
const { createHttpTerminator } = require('http-terminator');

const terminator = createHttpTerminator({ server });

process.on('SIGTERM', async () => {
    await terminator.terminate();
    process.exit(0);
});

完整最佳实践 🔥

javascript
const http = require('http');
const { createHttpTerminator } = require('http-terminator');

// 创建服务
const server = http.createServer(app);
const terminator = createHttpTerminator({
    server,
    gracefulTerminationTimeout: 10000  // 10秒超时
});

server.listen(3000);

// 健康检查状态
let isShuttingDown = false;

// 健康检查端点 (K8s readinessProbe)
app.get('/health', (req, res) => {
    if (isShuttingDown) {
        return res.status(503).json({ status: 'shutting down' });
    }
    res.json({ status: 'healthy' });
});

// 优雅退出
async function gracefulShutdown(signal) {
    console.log(`Received ${signal}, starting graceful shutdown...`);
    
    // 1. 标记为关闭中 (健康检查返回 503)
    isShuttingDown = true;
    
    // 2. 等待 K8s 从负载均衡中移除 (preStop hook 延迟)
    await sleep(5000);
    
    // 3. 关闭服务器连接
    console.log('Closing HTTP connections...');
    await terminator.terminate();
    
    // 4. 关闭数据库连接等资源
    console.log('Closing database connections...');
    await db.close();
    
    // 5. 退出
    console.log('Graceful shutdown complete');
    process.exit(0);
}

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

Kubernetes 配置

yaml
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: app
      lifecycle:
        preStop:
          exec:
            command: ["sleep", "5"]  # 等待从 Service 移除
      terminationGracePeriodSeconds: 30
配置项说明
preStopSIGTERM 前执行,给 LB 时间移除 Pod
terminationGracePeriodSeconds最长等待时间,超时发 SIGKILL

进程管理

PM2 集群模式

javascript
// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'api-server',
    script: './dist/main.js',
    instances: 'max',           // 根据 CPU 核心数启动
    exec_mode: 'cluster',       // 集群模式
    
    // 自动重启
    watch: false,
    max_memory_restart: '1G',   // 内存超限重启
    restart_delay: 3000,        // 重启间隔
    max_restarts: 10,           // 最大重启次数
    min_uptime: '5s',           // 最小运行时间
    
    // 环境变量
    env: {
      NODE_ENV: 'production',
      PORT: 3000
    }
  }]
};

Cluster 模块原生实现

typescript
import cluster from 'cluster';
import os from 'os';

if (cluster.isPrimary) {
  const numCPUs = os.cpus().length;
  
  console.log(`Primary ${process.pid} is running`);
  
  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  
  // 监听 worker 退出,自动重启
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died (${signal || code})`);
    
    // 延迟重启,避免频繁崩溃
    setTimeout(() => {
      cluster.fork();
    }, 1000);
  });
  
  // 优雅重启
  process.on('SIGUSR2', () => {
    const workers = Object.values(cluster.workers || {});
    
    const restartWorker = (index: number) => {
      if (index >= workers.length) return;
      
      const worker = workers[index];
      worker?.disconnect();
      
      worker?.on('disconnect', () => {
        const newWorker = cluster.fork();
        newWorker.on('listening', () => {
          restartWorker(index + 1);
        });
      });
    };
    
    restartWorker(0);
  });
  
} else {
  // Worker 进程运行服务
  startServer();
}

健康检查

多层次健康检查

typescript
interface HealthStatus {
  status: 'healthy' | 'degraded' | 'unhealthy';
  checks: Record<string, {
    status: 'ok' | 'error';
    latency?: number;
    message?: string;
  }>;
}

async function healthCheck(): Promise<HealthStatus> {
  const checks: HealthStatus['checks'] = {};
  let hasError = false;
  
  // 数据库检查
  const dbStart = Date.now();
  try {
    await prisma.$queryRaw`SELECT 1`;
    checks.database = { status: 'ok', latency: Date.now() - dbStart };
  } catch (error: any) {
    checks.database = { status: 'error', message: error.message };
    hasError = true;
  }
  
  // Redis 检查
  const redisStart = Date.now();
  try {
    await redis.ping();
    checks.redis = { status: 'ok', latency: Date.now() - redisStart };
  } catch (error: any) {
    checks.redis = { status: 'error', message: error.message };
    hasError = true;
  }
  
  // 内存检查
  const memUsage = process.memoryUsage();
  const heapUsedPct = memUsage.heapUsed / memUsage.heapTotal;
  checks.memory = {
    status: heapUsedPct < 0.9 ? 'ok' : 'error',
    message: `Heap: ${(heapUsedPct * 100).toFixed(1)}%`
  };
  
  return {
    status: hasError ? 'unhealthy' : 'healthy',
    checks
  };
}

// 健康检查端点
app.get('/health', async (req, res) => {
  const health = await healthCheck();
  const statusCode = health.status === 'healthy' ? 200 : 503;
  res.status(statusCode).json(health);
});

// 存活探针(轻量)
app.get('/health/live', (req, res) => {
  res.sendStatus(200);
});

// 就绪探针(完整检查)
app.get('/health/ready', async (req, res) => {
  const health = await healthCheck();
  res.status(health.status === 'healthy' ? 200 : 503).json(health);
});

Kubernetes 配置

yaml
livenessProbe:
  httpGet:
    path: /health/live
    port: 3000
  initialDelaySeconds: 10
  periodSeconds: 10
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /health/ready
    port: 3000
  initialDelaySeconds: 5
  periodSeconds: 5
  failureThreshold: 3

内存泄漏防治

常见泄漏场景

typescript
// ❌ 全局缓存无限增长
const cache = new Map();
app.get('/data/:id', (req, res) => {
  const data = fetchData(req.params.id);
  cache.set(req.params.id, data); // 永不清理
  res.json(data);
});

// ✅ 使用 LRU 缓存
import LRU from 'lru-cache';
const cache = new LRU({ max: 500, ttl: 1000 * 60 * 5 });

// ❌ 事件监听器未移除
class MyEmitter extends EventEmitter {}
const emitter = new MyEmitter();

function handleRequest(req, res) {
  const handler = (data) => res.json(data);
  emitter.on('data', handler); // 每次请求都添加,从不移除
}

// ✅ 及时移除监听器
function handleRequest(req, res) {
  const handler = (data) => {
    res.json(data);
    emitter.off('data', handler); // 用完移除
  };
  emitter.once('data', handler); // 或使用 once
}

// ❌ 闭包引用大对象
function processData() {
  const largeData = loadHugeFile();
  
  setInterval(() => {
    console.log(largeData.length); // 闭包持有引用
  }, 1000);
}

内存监控

typescript
import v8 from 'v8';

// 定期打印内存状态
setInterval(() => {
  const heapStats = v8.getHeapStatistics();
  const memUsage = process.memoryUsage();
  
  console.log({
    heapUsed: `${(memUsage.heapUsed / 1024 / 1024).toFixed(2)} MB`,
    heapTotal: `${(memUsage.heapTotal / 1024 / 1024).toFixed(2)} MB`,
    external: `${(memUsage.external / 1024 / 1024).toFixed(2)} MB`,
    heapUsedPct: `${(heapStats.used_heap_size / heapStats.heap_size_limit * 100).toFixed(1)}%`
  });
  
  // 内存超限告警
  if (memUsage.heapUsed / memUsage.heapTotal > 0.9) {
    console.error('ALERT: Memory usage exceeds 90%');
    // 触发堆快照
    v8.writeHeapSnapshot();
  }
}, 30000);

生产环境内存分析

bash
# 生成堆快照
kill -USR2 <pid>

# 或使用 inspector
node --inspect dist/main.js

# Chrome DevTools 分析
chrome://inspect

限流与熔断

请求限流

typescript
import rateLimit from 'express-rate-limit';
import RedisStore from 'rate-limit-redis';

// 基于 IP 限流
const limiter = rateLimit({
  store: new RedisStore({
    sendCommand: (...args: string[]) => redis.call(...args)
  }),
  windowMs: 60 * 1000,       // 1 分钟
  max: 100,                   // 最多 100 请求
  message: { error: 'Too many requests' },
  standardHeaders: true,
  legacyHeaders: false
});

app.use('/api', limiter);

// 基于用户限流
const userLimiter = rateLimit({
  keyGenerator: (req) => req.user?.id || req.ip,
  windowMs: 60 * 1000,
  max: 50
});

熔断器模式

typescript
enum CircuitState {
  CLOSED = 'CLOSED',     // 正常
  OPEN = 'OPEN',         // 熔断
  HALF_OPEN = 'HALF_OPEN' // 半开(尝试恢复)
}

class CircuitBreaker {
  private state = CircuitState.CLOSED;
  private failures = 0;
  private successes = 0;
  private lastFailureTime = 0;
  
  constructor(
    private readonly threshold = 5,      // 失败阈值
    private readonly timeout = 30000,    // 熔断时间
    private readonly halfOpenMax = 3     // 半开最大尝试
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = CircuitState.HALF_OPEN;
        this.successes = 0;
      } else {
        throw new Error('Circuit is OPEN');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    if (this.state === CircuitState.HALF_OPEN) {
      this.successes++;
      if (this.successes >= this.halfOpenMax) {
        this.state = CircuitState.CLOSED;
        this.failures = 0;
      }
    }
    this.failures = 0;
  }

  private onFailure() {
    this.failures++;
    this.lastFailureTime = Date.now();
    
    if (this.failures >= this.threshold) {
      this.state = CircuitState.OPEN;
    }
  }
}

// 使用
const dbCircuit = new CircuitBreaker();

app.get('/data', async (req, res) => {
  try {
    const data = await dbCircuit.execute(() => prisma.data.findMany());
    res.json(data);
  } catch (error) {
    res.status(503).json({ error: 'Service temporarily unavailable' });
  }
});

日志与追踪

结构化日志

typescript
import pino from 'pino';

const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label })
  },
  timestamp: () => `,"time":"${new Date().toISOString()}"`
});

// 请求日志中间件
app.use((req, res, next) => {
  const requestId = req.headers['x-request-id'] || crypto.randomUUID();
  const startTime = Date.now();
  
  // 绑定到请求上下文
  req.log = logger.child({ requestId });
  
  res.on('finish', () => {
    req.log.info({
      method: req.method,
      url: req.url,
      statusCode: res.statusCode,
      duration: Date.now() - startTime
    });
  });
  
  next();
});

分布式追踪 (OpenTelemetry)

typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({
    url: 'http://jaeger:4318/v1/traces'
  }),
  instrumentations: [getNodeAutoInstrumentations()]
});

sdk.start();

// 手动创建 Span
import { trace } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function handleRequest(req: Request) {
  const span = tracer.startSpan('handle-request');
  
  try {
    span.setAttribute('user.id', req.userId);
    const result = await processRequest(req);
    span.setStatus({ code: SpanStatusCode.OK });
    return result;
  } catch (error) {
    span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
    throw error;
  } finally {
    span.end();
  }
}

高频面试题

Q1: Node.js 如何实现零停机部署?

  1. PM2 Cluster 模式pm2 reload 滚动重启
  2. K8s Rolling Update:逐步替换 Pod
  3. 优雅退出:处理完现有请求再关闭

Q2: 如何检测和定位内存泄漏?

  1. 监控堆内存趋势:持续增长是泄漏信号
  2. 生成堆快照v8.writeHeapSnapshot()
  3. 对比分析:Chrome DevTools 比较两次快照
  4. 审查代码:全局变量、事件监听、闭包

Q3: 限流和熔断有什么区别?

特性限流熔断
目的控制入流量保护下游依赖
触发条件请求超过阈值错误率超过阈值
作用范围入口流量特定依赖调用
恢复方式时间窗口重置半开尝试恢复

Q4: 生产环境如何做健康检查?

/health/live   → 存活探针(进程是否运行)
/health/ready  → 就绪探针(是否可接收流量)
/health        → 详细健康状态(依赖检查)

Q5: server.close() 为什么不能立即关闭服务?

server.close() 只停止监听新连接,不会主动关闭已建立的 Keep-Alive 连接。需要:

  1. 追踪活跃连接并手动关闭
  2. 使用 http-terminator 等库处理

稳定性检查清单

  • [ ] 使用 PM2 或 K8s 管理进程
  • [ ] 实现优雅退出(处理 SIGTERM)
  • [ ] 配置健康检查端点
  • [ ] 设置内存上限和自动重启
  • [ ] 使用 LRU 缓存替代无限 Map
  • [ ] 实现请求限流
  • [ ] 对外部依赖添加熔断
  • [ ] 结构化日志输出
  • [ ] 配置分布式追踪

前端面试知识库