
Kafka Message Queue

Kafka core concepts, partitions, replication, and best practices

Kafka Overview

Features

  • High throughput: on the order of a million messages per second on a single node
  • Distributed, with horizontal scaling
  • Messages are persisted to disk
  • Messages are retained after consumption (replayable)
  • Stream processing support (Kafka Streams)

Core Concepts

┌─────────────────────────────────────────────────────────────┐
│                     Kafka Architecture                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Producer ──┬──▶ Partition 0 ──┬──▶ Consumer Group A       │
│              │                  │     (Consumer 1, 2)       │
│              │                  │                           │
│              ├──▶ Partition 1 ──┼──▶ Consumer Group B       │
│              │                  │     (Consumer 1)          │
│              │                  │                           │
│              └──▶ Partition 2 ──┴──▶ Consumer Group C       │
│                                       (Consumer 1, 2)       │
│                                                             │
│   ZooKeeper / KRaft ─── cluster management                  │
└─────────────────────────────────────────────────────────────┘
Concept          Description
Topic            Message topic; a logical category
Partition        Partition; the physical storage unit
Replica          Replica; provides high availability
Producer         Message producer
Consumer         Message consumer
Consumer Group   Consumer group; consumers in a group share the load
Offset           Consumption offset; records a consumer's position

Basic Operations

Creating a Topic

bash
# Create a topic
kafka-topics.sh --create --topic user-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 2

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics.sh --describe --topic user-events \
  --bootstrap-server localhost:9092
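
The same operations are available programmatically through the kafkajs admin client. A minimal sketch (broker address and topic name reused from above):

javascript
import { Kafka } from 'kafkajs'

const admin = new Kafka({ clientId: 'admin-cli', brokers: ['localhost:9092'] }).admin()
await admin.connect()

// Create, list, and describe, mirroring the CLI commands above
await admin.createTopics({
  topics: [{ topic: 'user-events', numPartitions: 3, replicationFactor: 2 }]
})
console.log(await admin.listTopics())
console.log(await admin.fetchTopicMetadata({ topics: ['user-events'] }))

await admin.disconnect()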

Producing Messages

javascript
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()

await producer.connect()

// Send a single message
await producer.send({
  topic: 'user-events',
  messages: [
    {
      key: 'user-1',
      value: JSON.stringify({
        event: 'user.created',
        userId: 1,
        timestamp: new Date()
      })
    }
  ]
})

// Send a batch
await producer.send({
  topic: 'user-events',
  messages: [
    { key: 'user-1', value: 'message 1' },
    { key: 'user-2', value: 'message 2' },
    { key: 'user-3', value: 'message 3' }
  ]
})

await producer.disconnect()
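
send() also accepts delivery-level options. A brief sketch of the common ones (the values shown are the kafkajs defaults):

javascript
await producer.send({
  topic: 'user-events',
  acks: -1,        // -1 = wait for all in-sync replicas (default); 1 = leader only; 0 = no ack
  timeout: 30000,  // broker acknowledgment timeout in ms
  messages: [{ key: 'user-1', value: 'hello' }]
})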

Consuming Messages

javascript
const consumer = kafka.consumer({ groupId: 'my-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'user-events', fromBeginning: false })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      key: message.key?.toString(),
      value: message.value?.toString(),
      partition,
      offset: message.offset
    })
  }
})

// Or batch processing with eachBatch
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    console.log(`Received batch with ${batch.messages.length} messages`)

    for (const message of batch.messages) {
      const data = JSON.parse(message.value.toString())
      await processMessage(data)
      resolveOffset(message.offset)
    }

    await heartbeat()
    await commitOffsetsIfNecessary()
  }
})
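
When a downstream dependency is struggling, fetching can be paused per topic and resumed later. A brief sketch using the consumer from above:

javascript
// Apply backpressure: stop fetching from this topic for a while
consumer.pause([{ topic: 'user-events' }])

// ...then pick it up again once downstream recovers
setTimeout(() => consumer.resume([{ topic: 'user-events' }]), 5000)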

Consumer Groups

javascript
const consumer = kafka.consumer({
  groupId: 'order-processor',
  sessionTimeout: 30000,
  heartbeatInterval: 3000
})

// Within one consumer group, partitions are divided among the consumers;
// different consumer groups each consume the topic independently
await consumer.subscribe({
  topic: 'orders',
  fromBeginning: true
})

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // kafkajs does not expose the group id on the consumer instance
    console.log(`order-processor received from partition ${partition}`)
  }
})

Manual Partition Assignment

Unlike the Java client, kafkajs has no consumer.assign() API. Static assignment is instead expressed through a custom partition assigner passed via the partitionAssigners consumer option, as sketched below.
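
A minimal sketch of such an assigner, assuming the first group member should own partitions 0 and 2 of orders (the assigner name is illustrative):

javascript
const { AssignerProtocol } = require('kafkajs')

const ManualAssigner = ({ cluster }) => ({
  name: 'ManualAssigner',
  version: 1,
  // Give partitions 0 and 2 of 'orders' to the first member; others get nothing
  async assign({ members }) {
    return members.map((member, i) => ({
      memberId: member.memberId,
      memberAssignment: AssignerProtocol.MemberAssignment.encode({
        version: 1,
        assignment: i === 0 ? { orders: [0, 2] } : {},
        userData: Buffer.alloc(0)
      })
    }))
  },
  protocol({ topics }) {
    return {
      name: 'ManualAssigner',
      metadata: AssignerProtocol.MemberMetadata.encode({
        version: 1,
        topics,
        userData: Buffer.alloc(0)
      })
    }
  }
})

const consumer = kafka.consumer({
  groupId: 'orders-group',
  partitionAssigners: [ManualAssigner]
})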

Consumer Rebalancing

javascript
const consumer = kafka.consumer({
  groupId: 'dynamic-group',
  sessionTimeout: 30000
})

await consumer.subscribe({ topic: 'events' })

// Listen for the rebalance instrumentation event
// (kafkajs emits consumer.events.REBALANCING, not a 'rebalance' string)
consumer.on(consumer.events.REBALANCING, async (payload) => {
  console.log('Rebalancing:', payload)
  // a good place to save consumption progress
})

Partitions and Ordering

Partitioning Strategies

javascript
// 1. Hash by key (the default)
await producer.send({
  topic: 'orders',
  messages: [
    { key: 'user-1', value: 'order-1' },  // the same key maps to the same partition
    { key: 'user-1', value: 'order-2' }   // so ordering is preserved
  ]
})

// 2. No key: messages are spread across partitions
await producer.send({
  topic: 'orders',
  messages: [
    { value: 'message-1' },
    { value: 'message-2' },
    { value: 'message-3' }
  ]
})

// 3. Custom partitioner (kafkajs partitioners are factories that
// return a ({ topic, partitionMetadata, message }) => partition function)
const { Partitioners } = require('kafkajs')

const vipProducer = kafka.producer({
  createPartitioner: () => {
    const defaultPartitioner = Partitioners.DefaultPartitioner()
    return ({ topic, partitionMetadata, message }) => {
      const key = message.key?.toString()
      if (key?.startsWith('vip-')) {
        return 0  // route VIP users to partition 0
      }
      return defaultPartitioner({ topic, partitionMetadata, message })
    }
  }
})

Guaranteeing Message Order

javascript
// Messages within a single partition are ordered
// Messages with the same key are sent to the same partition
const messages = [
  { key: 'order-1', value: 'created' },
  { key: 'order-1', value: 'paid' },
  { key: 'order-1', value: 'shipped' }
]

await producer.send({
  topic: 'order-events',
  messages  // same key, same partition, same order
})
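
One caveat: with retries enabled, a failed batch can be retried after a later batch has already succeeded, reordering messages. The usual guard is an idempotent producer limited to a single in-flight request, sketched here:

javascript
const orderedProducer = kafka.producer({
  idempotent: true,        // the broker de-duplicates retried batches
  maxInFlightRequests: 1   // only one outstanding request, so retries cannot reorder
})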

Advanced Features

Message Compression

javascript
const { CompressionTypes } = require('kafkajs')

await producer.send({
  topic: 'events',
  messages: [
    { value: JSON.stringify({ event: 'click', data: {} }) }
  ],
  compression: CompressionTypes.GZIP  // kafkajs ships GZIP; Snappy/LZ4/ZSTD need extra codec packages
})

Transactions

javascript
// kafkajs uses transactionalId (not transactionId) together with an
// idempotent producer limited to one in-flight request
const producer = kafka.producer({
  transactionalId: 'order-transaction',
  maxInFlightRequests: 1,
  idempotent: true
})
await producer.connect()

// Produce inside a transaction
const transaction = await producer.transaction()
try {
  await transaction.send({
    topic: 'order-events',
    messages: [{ key: 'order-1', value: 'created' }]
  })
  await transaction.commit()
} catch (err) {
  await transaction.abort()
}

// Consume-transform-produce (exactly-once)
const consumer = kafka.consumer({ groupId: 'transactional-group' })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const txn = await producer.transaction()
    try {
      await txn.send({
        topic: 'output-topic',
        messages: [{ value: message.value }]
      })
      // commit the consumed offset inside the same transaction
      await txn.sendOffsets({
        consumerGroupId: 'transactional-group',
        topics: [{
          topic,
          partitions: [{ partition, offset: (Number(message.offset) + 1).toString() }]
        }]
      })
      await txn.commit()
    } catch (err) {
      await txn.abort()
    }
  }
})

Consumer Offset Management

javascript
await consumer.subscribe({ topic: 'events' })

// Manual offset control (turn auto-commit off in run())
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    await processMessage(message)

    // Commit the offset of the next message to be consumed
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (Number(message.offset) + 1).toString()
      }
    ])

    await heartbeat()
  }
})

// Or rely on auto-commit (the default)
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message)
  }
})

// Auto-commit is tuned on run(), not on kafka.consumer()
await consumer.run({
  autoCommit: true,
  autoCommitInterval: 5000,   // commit every 5 seconds, and/or
  autoCommitThreshold: 100,   // after every 100 resolved messages
  eachMessage: async ({ message }) => {
    await processMessage(message)
  }
})
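
Offsets can also be inspected and rewound from the outside with the admin client, which is useful for replays. A sketch (group and topic names reused from above):

javascript
const admin = kafka.admin()
await admin.connect()

// Where is the group currently at?
const offsets = await admin.fetchOffsets({ groupId: 'my-group', topics: ['events'] })
console.log(JSON.stringify(offsets, null, 2))

// Rewind to the beginning (every consumer in the group must be stopped first)
await admin.resetOffsets({ groupId: 'my-group', topic: 'events', earliest: true })

await admin.disconnect()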

Delay Queues

javascript
// Kafka has no native delayed delivery; a common pattern is a
// dedicated delay topic plus a forwarding consumer.
await producer.send({
  topic: '_delay_order_check',
  messages: [{
    key: 'order-1',
    value: JSON.stringify({ orderId: 'order-1' }),
    headers: {
      'x-delay': '60000'  // delay 60 s (header values must be strings or Buffers)
    }
  }]
})

// The delay consumer waits out the delay, then forwards to the real topic
const delayConsumer = kafka.consumer({ groupId: 'delay-worker' })
await delayConsumer.subscribe({ topic: '_delay_order_check' })

await delayConsumer.run({
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    const delay = message.headers['x-delay']
    const delayMs = parseInt(delay?.toString() || '0', 10)

    // Sleep in slices and heartbeat so the broker does not evict the
    // consumer while it waits (for long delays prefer consumer.pause())
    const deadline = Date.now() + delayMs
    while (Date.now() < deadline) {
      await new Promise(resolve => setTimeout(resolve, Math.min(1000, deadline - Date.now())))
      await heartbeat()
    }

    await producer.send({
      topic: 'order-check',
      messages: [{ value: message.value }]
    })
  }
})

Stream Processing

javascript
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'stream-app',
  brokers: ['localhost:9092']
})

const admin = kafka.admin()
await admin.connect()

// Create the input and output topics
await admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'orders', numPartitions: 3, replicationFactor: 1 },
    { topic: 'orders-summary', numPartitions: 1, replicationFactor: 1 }
  ]
})

await admin.disconnect()

// kafkajs does not implement the Kafka Streams API (Kafka Streams is a
// Java library); the Node.js equivalent is a consume-filter-map-produce
// pipeline built from a plain consumer and producer.
const consumer = kafka.consumer({ groupId: 'orders-stream' })
const producer = kafka.producer()

await consumer.connect()
await producer.connect()
await consumer.subscribe({ topic: 'orders' })

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString())

    // filter: only order-created events
    if (event.type !== 'order.created') return

    // map: project to the summary shape and re-key by user
    await producer.send({
      topic: 'orders-summary',
      messages: [{
        key: String(event.userId),
        value: JSON.stringify({
          userId: event.userId,
          amount: event.amount,
          timestamp: event.timestamp
        })
      }]
    })
  }
})

Node.js Integration

kafkajs Wrapper

javascript
import { Kafka, logLevel, CompressionTypes } from 'kafkajs'

class KafkaService {
  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_ID || 'my-app',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 100,
        retries: 8
      }
    })

    this.producer = null
    this.consumer = null
    this.admin = this.kafka.admin()
  }

  async connect() {
    await this.admin.connect()
    console.log('Kafka admin connected')
  }

  async createTopic(topic, partitions = 3, replication = 1) {
    const exists = (await this.admin.listTopics()).includes(topic)
    if (!exists) {
      await this.admin.createTopics({
        waitForLeaders: true,
        topics: [{
          topic,
          numPartitions: partitions,
          replicationFactor: replication
        }]
      })
      console.log(`Topic ${topic} created`)
    }
  }

  async getProducer() {
    if (!this.producer) {
      this.producer = this.kafka.producer({
        allowAutoTopicCreation: true,
        transactionTimeout: 30000
      })
      await this.producer.connect()
    }
    return this.producer
  }

  async publish(topic, messages, key = null) {
    const producer = await this.getProducer()

    // Accept a single message or an array of messages
    const list = Array.isArray(messages) ? messages : [messages]
    const formattedMessages = list.map(msg => ({
      key: key || msg.key,
      value: typeof msg === 'string' ? msg : JSON.stringify(msg)
    }))

    await producer.send({
      topic,
      messages: formattedMessages,
      compression: CompressionTypes.GZIP
    })
  }

  async getConsumer(groupId) {
    return this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    })
  }

  async consume(topic, groupId, handler) {
    const consumer = await this.getConsumer(groupId)
    this.consumer = consumer  // tracked so close() can disconnect it

    await consumer.connect()
    await consumer.subscribe({ topic, fromBeginning: false })

    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const key = message.key?.toString()
          const value = message.value?.toString()
          const data = JSON.parse(value)

          await handler(data, { key, partition, offset: message.offset })
        } catch (err) {
          console.error('Message processing error:', err)
        }
      }
    })
  }

  async close() {
    await this.producer?.disconnect()
    await this.consumer?.disconnect()
    await this.admin.disconnect()
  }
}

export const kafkaService = new KafkaService()

Usage Example

javascript
import { kafkaService } from './kafkaService.js'

// Initialize
await kafkaService.connect()
await kafkaService.createTopic('user-events')

// Produce a message
await kafkaService.publish('user-events', {
  userId: 1,
  event: 'user.created',
  timestamp: new Date()
})

// Consume messages
await kafkaService.consume('user-events', 'user-processor', async (data) => {
  console.log('Received:', data)
  await sendWelcomeEmail(data.userId)
})

// Graceful shutdown
process.on('SIGTERM', async () => {
  await kafkaService.close()
})

Common Interview Questions

Q1: How do Kafka's partition and replication mechanisms work?

Answer:

  • Partitions: a topic's data is split across multiple partitions on different brokers, enabling parallel processing and horizontal scaling
  • Replicas: each partition has several replicas; one leader handles reads and writes while the followers replicate from it
  • ISR (In-Sync Replicas): the set of replicas that are currently caught up with the leader
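
These structures can be observed directly. A sketch with the kafkajs admin client, assuming a connected admin and the user-events topic from earlier:

javascript
const metadata = await admin.fetchTopicMetadata({ topics: ['user-events'] })
for (const { partitionId, leader, replicas, isr } of metadata.topics[0].partitions) {
  // leader is a broker id; isr lists the replicas currently in sync with it
  console.log({ partitionId, leader, replicas, isr })
}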

Q2: How does Kafka guarantee message order?

Answer:

  • Messages within a single partition are ordered
  • Messages with the same key go to the same partition
  • There is no global ordering guarantee across partitions

Q3: How do Kafka and RabbitMQ differ?

Feature             Kafka                 RabbitMQ
Protocol            Custom binary         AMQP
Model               Log-structured        Queues + exchanges
Message retention   By time/size          Per-queue policy
Performance         Very high             High
Message ordering    Within a partition    Within a queue
Typical use cases   Big data streams      Complex routing

Q4: How are consumer offsets managed in Kafka?

Answer:

  • Auto-commit: offsets are committed on a timer
  • Manual commit: call consumer.commitOffsets()
  • Manual assignment: consumer.assign() in the Java client (custom partition assigners in kafkajs)
  • Offset storage: kept in the internal __consumer_offsets topic

Q5: How do you keep Kafka highly available?

Answer:

  1. Replication: set replication-factor > 1
  2. ISR mechanism: keep replicas in sync
  3. Controller election: ZooKeeper/KRaft handles leader election
  4. Sensible partitioning: match the partition count to the consumer count
  5. Monitoring and alerting: watch consumer lag and ISR status
