# Kafka Message Queue

Kafka core concepts, partitions, replicas, and best practices.

## Kafka Overview

### Features

- High throughput: a single broker can reach on the order of a million messages per second
- Distributed, with horizontal scaling
- Messages are persisted to disk
- Messages are retained after consumption (and can be replayed)
- Stream processing support (Kafka Streams)
### Core Concepts

```text
┌─────────────────────────────────────────────────────────────┐
│                     Kafka Architecture                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Producer ──┬──▶ Partition 0 ──┬──▶ Consumer Group A        │
│             │                  │    (Consumer 1, 2)         │
│             │                  │                            │
│             ├──▶ Partition 1 ──┼──▶ Consumer Group B        │
│             │                  │    (Consumer 1)            │
│             │                  │                            │
│             └──▶ Partition 2 ──┴──▶ Consumer Group C        │
│                                     (Consumer 1, 2)         │
│                                                             │
│  Zookeeper / KRaft ─── cluster management                   │
└─────────────────────────────────────────────────────────────┘
```

| Concept | Description |
|---|---|
| Topic | A message topic; a logical category |
| Partition | A partition; the physical storage unit |
| Replica | A replica, for high availability |
| Producer | The message producer |
| Consumer | The message consumer |
| Consumer Group | A consumer group; load is balanced within a group |
| Offset | The consumer offset, recording the consumption position |
## Basic Operations

### Creating a Topic

```bash
# Create a topic
kafka-topics.sh --create --topic user-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 2

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics.sh --describe --topic user-events \
  --bootstrap-server localhost:9092
```

### Producing Messages
```javascript
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
})

const producer = kafka.producer()
await producer.connect()

// Send a single message
await producer.send({
  topic: 'user-events',
  messages: [
    {
      key: 'user-1',
      value: JSON.stringify({
        event: 'user.created',
        userId: 1,
        timestamp: new Date()
      })
    }
  ]
})

// Send a batch
await producer.send({
  topic: 'user-events',
  messages: [
    { key: 'user-1', value: 'message 1' },
    { key: 'user-2', value: 'message 2' },
    { key: 'user-3', value: 'message 3' }
  ]
})

await producer.disconnect()
```

### Consuming Messages
```javascript
const consumer = kafka.consumer({ groupId: 'my-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'user-events', fromBeginning: false })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      key: message.key?.toString(),
      value: message.value?.toString(),
      partition,
      offset: message.offset
    })
  }
})

// Or process per batch with eachBatch
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    console.log(`Received batch with ${batch.messages.length} messages`)
    for (const message of batch.messages) {
      const data = JSON.parse(message.value.toString())
      await processMessage(data) // processMessage: your own handler
      resolveOffset(message.offset)
    }
    await heartbeat()
    await commitOffsetsIfNecessary()
  }
})
```

### Consumer Groups
```javascript
const groupId = 'order-processor'
const consumer = kafka.consumer({
  groupId,
  sessionTimeout: 30000,
  heartbeatInterval: 3000
})

// Within a consumer group, partitions are distributed across consumers;
// different consumer groups each consume the topic independently.
await consumer.subscribe({
  topic: 'orders',
  fromBeginning: true
})

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`Group ${groupId} received from partition ${partition}`)
  }
})
```

### Manual Partition Assignment
```javascript
// kafkajs has no Java-style consumer.assign(); static assignment is done
// by supplying a custom partition assigner in the consumer config.
// What kafkajs does offer is seek(), to reposition within an assigned
// partition once run() has started:
consumer.seek({ topic: 'orders', partition: 0, offset: '0' })
consumer.seek({ topic: 'orders', partition: 2, offset: '0' })
```

### Consumer Rebalancing
```javascript
const consumer = kafka.consumer({
  groupId: 'dynamic-group',
  sessionTimeout: 30000
})

await consumer.connect()
await consumer.subscribe({ topic: 'events' })

// kafkajs has no 'rebalance' event; GROUP_JOIN fires after each
// (re)join, i.e. whenever a rebalance completes
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log('Joined group, assignment:', payload.memberAssignment)
  // A good place to restore or checkpoint consumption progress
})
```

## Partitions and Ordering
### Partitioning Strategies

```javascript
// 1. Hash partitioning by key (the default)
await producer.send({
  topic: 'orders',
  messages: [
    { key: 'user-1', value: 'order-1' }, // same key -> same partition
    { key: 'user-1', value: 'order-2' }  // so ordering is preserved
  ]
})

// 2. Keyless messages are spread across partitions
await producer.send({
  topic: 'orders',
  messages: [
    { value: 'message-1' },
    { value: 'message-2' },
    { value: 'message-3' }
  ]
})

// 3. A custom partitioner
import { Partitioners } from 'kafkajs'

const vipPartitioner = () => {
  const defaultPartitioner = Partitioners.DefaultPartitioner()
  return ({ topic, partitionMetadata, message }) => {
    const key = message.key?.toString()
    if (key?.startsWith('vip-')) {
      return 0 // route VIP users to partition 0
    }
    return defaultPartitioner({ topic, partitionMetadata, message })
  }
}

const vipProducer = kafka.producer({ createPartitioner: vipPartitioner })
```

### Guaranteeing Message Order
```javascript
// Messages within a single partition are ordered, and messages with
// the same key always land in the same partition.
const messages = [
  { key: 'order-1', value: 'created' },
  { key: 'order-1', value: 'paid' },
  { key: 'order-1', value: 'shipped' }
]

await producer.send({
  topic: 'order-events',
  messages // same key -> same partition -> consumed in order
})
```

## Advanced Features

### Message Compression
```javascript
import { CompressionTypes } from 'kafkajs'

await producer.send({
  topic: 'events',
  messages: [
    { value: JSON.stringify({ event: 'click', data: {} }) }
  ],
  // GZIP is built in; Snappy, ZSTD, and LZ4 need external codec packages
  compression: CompressionTypes.GZIP
})
```

### Transactions
```javascript
const producer = kafka.producer({
  transactionalId: 'order-transaction', // note: transactionalId, not transactionId
  maxInFlightRequests: 1,
  idempotent: true
})
await producer.connect()

const transaction = await producer.transaction()
try {
  await transaction.send({
    topic: 'order-events',
    messages: [{ key: 'order-1', value: 'created' }]
  })
  await transaction.commit()
} catch (err) {
  await transaction.abort()
  throw err
}

// Consume-transform-produce (exactly-once): commit the consumer
// offsets inside the producer transaction via sendOffsets()
const consumer = kafka.consumer({ groupId: 'transactional-group' })
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    const txn = await producer.transaction()
    try {
      await txn.send({
        topic: 'output-topic',
        messages: [{ value: message.value }]
      })
      await txn.sendOffsets({
        consumerGroupId: 'transactional-group',
        topics: [{
          topic,
          partitions: [{ partition, offset: (Number(message.offset) + 1).toString() }]
        }]
      })
      await txn.commit()
    } catch (err) {
      await txn.abort()
    }
  }
})
```

### Consumer Offset Management
```javascript
await consumer.subscribe({ topic: 'events' })

// Manual offset control
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    await processMessage(message)
    // Commit by hand: the committed offset is last processed + 1
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (Number(message.offset) + 1).toString()
      }
    ])
    await heartbeat()
  }
})

// Or rely on auto-commit
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message)
  }
})

// Auto-commit is configured on run(), not on kafka.consumer()
await consumer.run({
  autoCommit: true,
  autoCommitInterval: 5000,  // commit every 5 s...
  autoCommitThreshold: 100,  // ...or every 100 messages
  eachMessage: async ({ message }) => {
    await processMessage(message)
  }
})
```

### Delay Queues
```javascript
// A simple delayed-message scheme: send to a "_delay_"-prefixed topic and
// have a relay consumer forward to the real topic after the delay.
// (A production version would use a timing wheel or pause()/resume()
// rather than sleeping inside the handler.)
await producer.send({
  topic: '_delay_order_check',
  messages: [{
    key: 'order-1',
    value: JSON.stringify({ orderId: 'order-1' }),
    headers: {
      'x-delay': '60000' // delay in ms; header values must be strings or Buffers
    }
  }]
})

// The relay consumer forwards to the actual topic
const delayConsumer = kafka.consumer({ groupId: 'delay-worker' })
await delayConsumer.subscribe({ topic: '_delay_order_check' })
await delayConsumer.run({
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    const delayMs = parseInt(message.headers['x-delay']?.toString() || '0', 10)
    if (delayMs > 0) {
      // NOTE: a long sleep will get this consumer kicked out of the
      // group unless heartbeat() is called periodically
      await new Promise(resolve => setTimeout(resolve, delayMs))
      await heartbeat()
    }
    await producer.send({
      topic: 'order-check',
      messages: [{ value: message.value }]
    })
  }
})
```

### Stream Processing
```javascript
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'stream-app',
  brokers: ['localhost:9092']
})

const admin = kafka.admin()
await admin.connect()

// Create the input and output topics
await admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'orders', numPartitions: 3, replicationFactor: 1 },
    { topic: 'orders-summary', numPartitions: 1, replicationFactor: 1 }
  ]
})
await admin.disconnect()

// kafkajs has no Kafka Streams API (Kafka Streams is a Java library).
// The usual Node.js substitute is a consume-transform-produce loop:
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'orders-group' })
await producer.connect()
await consumer.connect()
await consumer.subscribe({ topic: 'orders', fromBeginning: true })

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString())
    if (event.type !== 'order.created') return // filter

    await producer.send({ // map + forward
      topic: 'orders-summary',
      messages: [{
        key: String(event.userId),
        value: JSON.stringify({
          userId: event.userId,
          amount: event.amount,
          timestamp: event.timestamp
        })
      }]
    })
  }
})
```

## Node.js Integration

### A kafkajs Wrapper
```javascript
import { Kafka, logLevel, CompressionTypes } from 'kafkajs'

class KafkaService {
  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.KAFKA_CLIENT_ID || 'my-app',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
      logLevel: logLevel.WARN,
      retry: {
        initialRetryTime: 100,
        retries: 8
      }
    })
    this.producer = null
    this.consumer = null
    this.admin = this.kafka.admin()
  }

  async connect() {
    await this.admin.connect()
    console.log('Kafka admin connected')
  }

  async createTopic(topic, partitions = 3, replication = 1) {
    const exists = (await this.admin.listTopics()).includes(topic)
    if (!exists) {
      await this.admin.createTopics({
        waitForLeaders: true,
        topics: [{
          topic,
          numPartitions: partitions,
          replicationFactor: replication
        }]
      })
      console.log(`Topic ${topic} created`)
    }
  }

  async getProducer() {
    if (!this.producer) {
      this.producer = this.kafka.producer({
        allowAutoTopicCreation: true,
        transactionTimeout: 30000
      })
      await this.producer.connect()
    }
    return this.producer
  }

  async publish(topic, messages, key = null) {
    const producer = await this.getProducer()
    // Accept a single message or an array
    const list = Array.isArray(messages) ? messages : [messages]
    const formattedMessages = list.map(msg => ({
      key: key || msg.key,
      value: typeof msg === 'string' ? msg : JSON.stringify(msg)
    }))
    await producer.send({
      topic,
      messages: formattedMessages,
      compression: CompressionTypes.GZIP
    })
  }

  async getConsumer(groupId) {
    return this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    })
  }

  async consume(topic, groupId, handler) {
    const consumer = await this.getConsumer(groupId)
    this.consumer = consumer // keep a reference so close() can disconnect it
    await consumer.connect()
    await consumer.subscribe({ topic, fromBeginning: false })
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        try {
          const key = message.key?.toString()
          const value = message.value?.toString()
          const data = JSON.parse(value)
          await handler(data, { key, partition, offset: message.offset })
        } catch (err) {
          console.error('Message processing error:', err)
        }
      }
    })
  }

  async close() {
    await this.producer?.disconnect()
    await this.consumer?.disconnect()
    await this.admin.disconnect()
  }
}

export const kafkaService = new KafkaService()
```

### Usage Example
```javascript
import { kafkaService } from './kafkaService.js'

// Initialize
await kafkaService.connect()
await kafkaService.createTopic('user-events')

// Produce a message
await kafkaService.publish('user-events', {
  userId: 1,
  event: 'user.created',
  timestamp: new Date()
})

// Consume messages
await kafkaService.consume('user-events', 'user-processor', async (data) => {
  console.log('Received:', data)
  await sendWelcomeEmail(data.userId) // sendWelcomeEmail: your own handler
})

// Graceful shutdown
process.on('SIGTERM', async () => {
  await kafkaService.close()
})
```

## Frequently Asked Interview Questions
### Q1: How do Kafka's partition and replica mechanisms work?

Answer:

- Partitioning: a topic's data is split into multiple partitions spread across brokers, enabling parallel processing and horizontal scaling
- Replication: each partition has multiple replicas; one leader handles reads and writes while the followers replicate from it
- ISR (In-Sync Replicas): the set of replicas that are caught up with the leader (inspectable via the sketch below)
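All three concepts are visible from the kafkajs admin client. A minimal sketch, reusing the `user-events` topic from the earlier examples and assuming a `kafka` instance configured as above:

```javascript
// Inspect leader, replicas, and ISR per partition via the admin API
const admin = kafka.admin()
await admin.connect()

const metadata = await admin.fetchTopicMetadata({ topics: ['user-events'] })
for (const { name, partitions } of metadata.topics) {
  for (const p of partitions) {
    // p.leader is a broker id; p.isr shrinking below p.replicas
    // means some followers have fallen behind
    console.log(name, p.partitionId, {
      leader: p.leader,
      replicas: p.replicas,
      isr: p.isr
    })
  }
}
await admin.disconnect()
```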
### Q2: How does Kafka guarantee message ordering?

Answer:

- Messages within a single partition are ordered
- Messages with the same key are routed to the same partition
- There is no global ordering across partitions
### Q3: How does Kafka differ from RabbitMQ?

| Feature | Kafka | RabbitMQ |
|---|---|---|
| Protocol | Custom binary | AMQP |
| Model | Log-structured | Queues + exchanges |
| Message retention | By time/size | Per-queue policy |
| Throughput | Very high | High |
| Ordering | Within a partition | Within a queue |
| Best fit | Large-scale data streams | Complex routing |
### Q4: How are consumer offsets managed in Kafka?

Answer:

- Auto-commit: offsets are committed periodically
- Manual commit: call `consumer.commitOffsets()` in kafkajs (`commitSync()` in the Java client)
- Manual assignment: `consumer.assign()` in the Java client (kafkajs uses custom partition assigners instead)
- Offset storage: offsets live in the internal `__consumer_offsets` topic (see the lag sketch below)
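Committed offsets can also be read back programmatically to compute consumer lag. A minimal sketch using the kafkajs admin API (v2 `topics` array signature of `fetchOffsets`), with the `my-group` / `user-events` names from earlier examples:

```javascript
// Compare the group's committed offsets against the log end offsets
const admin = kafka.admin()
await admin.connect()

const [committed] = await admin.fetchOffsets({
  groupId: 'my-group',
  topics: ['user-events']
})
const latest = await admin.fetchTopicOffsets('user-events')

for (const { partition, offset } of committed.partitions) {
  const end = latest.find(p => p.partition === partition)
  // offset is '-1' when the group has no committed offset yet
  const lag = Number(end.high) - Math.max(Number(offset), 0)
  console.log(`partition ${partition}: lag ${lag}`)
}
await admin.disconnect()
```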
### Q5: How do you keep Kafka highly available?

Answer:

- Replication: set replication-factor > 1
- ISR mechanism: make sure replicas stay in sync (e.g. via `min.insync.replicas`)
- Controller election: ZooKeeper/KRaft handles leader election
- Sensible partitioning: match partition count to consumer count
- Monitoring and alerting: watch consumer lag and ISR status (a client-side durability sketch follows)
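On the client side, most of this comes down to topic and producer settings. A minimal sketch, assuming a cluster with at least 3 brokers; the topic name `orders-ha` is illustrative:

```javascript
// Durable-write setup: 3 replicas, at least 2 in sync, acks from all ISRs
const admin = kafka.admin()
await admin.connect()
await admin.createTopics({
  waitForLeaders: true,
  topics: [{
    topic: 'orders-ha',
    numPartitions: 3,
    replicationFactor: 3, // requires >= 3 brokers
    configEntries: [
      // writes are rejected if fewer than 2 replicas are in sync
      { name: 'min.insync.replicas', value: '2' }
    ]
  }]
})
await admin.disconnect()

// acks: -1 ("all") makes the leader wait for every in-sync replica
await producer.send({
  acks: -1,
  topic: 'orders-ha',
  messages: [{ key: 'order-1', value: 'created' }]
})
```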