RabbitMQ 消息队列
RabbitMQ 核心概念、消息模式、AMQP 协议与最佳实践
RabbitMQ 概述
特点
- 基于 AMQP 协议
- 支持多种消息模式
- 成熟的路由功能
- 丰富的插件生态
- 管理界面友好
核心概念
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ 结构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer ──▶ Exchange ──▶ Queue ──▶ Consumer │
│ │ │
│ └── Binding (routing key) │
│ │
└─────────────────────────────────────────────────────────────┘| 概念 | 说明 |
|---|---|
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Exchange | 交换机,接收消息并路由 |
| Queue | 队列,存储消息 |
| Binding | 绑定,连接 Exchange 和 Queue |
| Routing Key | 路由键,路由规则 |
消息模式
1. 简单队列(Hello World)
javascript
import amqp from 'amqplib'
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const queue = 'hello'
// 确保队列存在
await channel.assertQueue(queue, { durable: false })
// 生产者
channel.sendToQueue(queue, Buffer.from('Hello World'))
// 消费者
channel.consume(queue, msg => {
console.log('Received:', msg.content.toString())
channel.ack(msg)
})2. 工作队列(Work Queue)
javascript
import amqp from 'amqplib'
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const taskQueue = 'tasks'
await channel.assertQueue(taskQueue, { durable: true })
// 公平分发
channel.prefetch(1)
// 生产者 - 发送多个任务
for (let i = 0; i < 10; i++) {
channel.sendToQueue(taskQueue, Buffer.from(`Task ${i}`), {
persistent: true // 消息持久化
})
}
// 消费者
channel.consume(taskQueue, msg => {
const task = msg.content.toString()
console.log('Processing:', task)
// 模拟处理时间
setTimeout(() => {
console.log('Done:', task)
channel.ack(msg)
}, 1000)
}, { noAck: false })3. 发布订阅(Publish/Subscribe)
javascript
import amqp from 'amqplib'
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
// 创建 fanout 交换机
const exchange = 'logs'
await channel.assertExchange(exchange, 'fanout', { durable: false })
// 临时队列
const { queue } = await channel.assertQueue('', { exclusive: true })
await channel.bindQueue(queue, exchange, '')
// 生产者 - 广播消息
channel.publish(exchange, '', Buffer.from('Broadcast message'))
// 消费者 - 接收所有消息
channel.consume(queue, msg => {
console.log('Received:', msg.content.toString())
})4. 路由(Routing)
javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const exchange = 'direct_logs'
// 创建 direct 交换机
await channel.assertExchange(exchange, 'direct', { durable: false })
// 绑定不同队列到不同路由键
await channel.assertQueue('error_queue', { durable: false })
await channel.assertQueue('info_queue', { durable: false })
channel.bindQueue('error_queue', exchange, 'error')
channel.bindQueue('info_queue', exchange, 'info')
channel.bindQueue('error_queue', exchange, 'warning') // warning 也到 error 队列
// 生产者 - 发送不同级别的日志
channel.publish(exchange, 'error', Buffer.from('Error message'))
channel.publish(exchange, 'info', Buffer.from('Info message'))
channel.publish(exchange, 'warning', Buffer.from('Warning message'))
// 消费者
channel.consume('error_queue', msg => {
console.log('Error:', msg.content.toString())
})
channel.consume('info_queue', msg => {
console.log('Info:', msg.content.toString())
})5. 主题(Topics)
javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
const exchange = 'topic_logs'
// 创建 topic 交换机
await channel.assertExchange(exchange, 'topic', { durable: false })
// 绑定规则
// * 匹配一个词
// # 匹配零个或多个词
await channel.assertQueue('kern_queue', { durable: false })
await channel.assertQueue('critical_queue', { durable: false })
channel.bindQueue('kern_queue', exchange, 'kern.*') // kern.* 匹配 kern.critical
channel.bindQueue('critical_queue', exchange, '*.critical') // *.critical 匹配 kern.critical
channel.bindQueue('critical_queue', exchange, '#.critical') // # 匹配任意前缀
// 生产者
channel.publish(exchange, 'kern.critical', Buffer.from('Kern critical'))
channel.publish(exchange, 'info.critical', Buffer.from('Info critical'))
channel.publish(exchange, 'user.log', Buffer.from('User log'))6. RPC(远程过程调用)
javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()
await channel.assertQueue('rpc_queue', { durable: false })
// 生成唯一 correlationId
const generateUuid = () => Math.random().toString(36).substring(2)
// 服务器
channel.consume('rpc_queue', async msg => {
const { replyTo, correlationId } = msg.properties
const n = parseInt(msg.content.toString())
console.log('Received:', n)
// 计算斐波那契
const fib = (x) => {
if (x === 0) return 0
if (x === 1) return 1
return fib(x - 1) + fib(x - 2)
}
const result = fib(n)
// 发送回结果
channel.sendToQueue(replyTo, Buffer.from(result.toString()), {
correlationId
})
})
// 客户端
const fibonacci = (n) => {
return new Promise((resolve) => {
const correlationId = generateUuid()
channel.consume('rpc_queue', function reply(msg) {
if (msg.properties.correlationId === correlationId) {
resolve(parseInt(msg.content.toString()))
channel.cancel(this.fields.consumerTag)
}
}, { noAck: true })
channel.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
replyTo: 'rpc_queue',
correlationId
})
})
}
console.log(await fibonacci(10))高级特性
消息确认
javascript
// 自动确认(可能丢失消息)
channel.consume(queue, msg => {
console.log(msg.content.toString())
}, { noAck: true })
// 手动确认(推荐)
channel.consume(queue, msg => {
const data = JSON.parse(msg.content.toString())
try {
processData(data)
channel.ack(msg) // 确认处理成功
} catch (err) {
if (msg.fields.redelivered) {
// 重新入队失败,丢弃或记录
console.error('Processing failed:', err)
channel.nack(msg, false, false)
} else {
// 重新入队
channel.nack(msg, false, true)
}
}
}, { noAck: false })消息持久化
javascript
// 1. 队列持久化
await channel.assertQueue('myqueue', {
durable: true // 重启后队列仍然存在
})
// 2. 消息持久化
channel.sendToQueue(queue, Buffer.from('message'), {
persistent: true // 消息持久化到磁盘
})
// 3. 发布者确认
channel.confirmDelivery(() => {
console.log('Message confirmed')
})死信队列
javascript
const mainQueue = 'main_queue'
const deadQueue = 'dead_queue'
const deadExchange = 'dead_exchange'
// 创建死信交换机和队列
await channel.assertExchange(deadExchange, 'direct', { durable: true })
await channel.assertQueue(deadQueue, { durable: true })
await channel.bindQueue(deadQueue, deadExchange, 'dead')
// 主队列配置死信
await channel.assertQueue(mainQueue, {
durable: true,
arguments: {
'x-dead-letter-exchange': deadExchange,
'x-dead-letter-routing-key': 'dead',
'x-message-ttl': 60000 // 60秒过期
}
})优先级队列
javascript
await channel.assertQueue('priority_queue', {
arguments: {
'x-max-priority': 10 // 最大优先级 0-10
}
})
// 发送优先级消息
channel.sendToQueue('priority_queue', Buffer.from('urgent'), {
priority: 10 // 高优先级
})
channel.sendToQueue('priority_queue', Buffer.from('normal'), {
priority: 1 // 低优先级
})限流
javascript
// 预取限制
channel.prefetch(10) // 最多10条未确认消息
// 或基于大小
channel.prefetch(100000) // 100KBNode.js 集成
amqplib 封装
javascript
import amqp from 'amqplib'
class RabbitMQ {
constructor(url = 'amqp://localhost') {
this.url = url
this.connection = null
this.channel = null
}
async connect() {
this.connection = await amqp.connect(this.url)
this.channel = await this.connection.createChannel()
// 断线重连
this.connection.on('close', () => {
console.log('RabbitMQ disconnected, reconnecting...')
setTimeout(() => this.connect(), 5000)
})
return this
}
async assertQueue(queue, options = {}) {
return this.channel.assertQueue(queue, {
durable: true,
...options
})
}
async assertExchange(exchange, type = 'direct', options = {}) {
return this.channel.assertExchange(exchange, type, {
durable: true,
...options
})
}
async bindQueue(queue, exchange, routingKey = '') {
return this.channel.bindQueue(queue, exchange, routingKey)
}
async publish(exchange, routingKey, message, options = {}) {
const ok = this.channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(message)),
{
persistent: true,
contentType: 'application/json',
...options
}
)
return ok
}
async consume(queue, handler, options = {}) {
return this.channel.consume(queue, async (msg) => {
if (!msg) return
try {
const content = JSON.parse(msg.content.toString())
await handler(content, msg)
this.channel.ack(msg)
} catch (err) {
console.error('Message processing failed:', err)
// 判断是否重新入队
const requeue = !msg.fields.redelivered
this.channel.nack(msg, false, requeue)
}
}, { noAck: false, ...options })
}
async close() {
await this.channel?.close()
await this.connection?.close()
}
}
export const rabbitmq = new RabbitMQ()使用示例
javascript
import { rabbitmq } from './rabbitmq.js'
// 初始化
await rabbitmq.connect()
// 发布消息
await rabbitmq.assertExchange('user.events', 'topic')
await rabbitmq.publish('user.events', 'user.created', {
userId: 1,
name: 'Alice',
timestamp: new Date()
})
// 消费消息
await rabbitmq.assertQueue('user.created.queue')
await rabbitmq.bindQueue('user.created.queue', 'user.events', 'user.created')
await rabbitmq.consume('user.created.queue', async (data) => {
console.log('User created:', data)
await sendWelcomeEmail(data)
})
// RPC 调用
await rabbitmq.assertQueue('fibonacci.queue')
const result = await rpcCall('fibonacci.queue', 20)面试高频题
Q1: RabbitMQ 有哪些消息模式?
答案:
- 简单队列:最基础的生产者-消费者模式
- 工作队列:多个消费者公平分发任务
- 发布订阅:fanout 交换机广播到所有队列
- 路由:direct 交换机根据路由键精确匹配
- 主题:topic 交换机支持通配符匹配
- RPC:请求-响应模式
Q2: RabbitMQ 的消息确认机制?
答案:
- 自动确认:
noAck: true,消息一旦发送就确认,可能丢失 - 手动确认:
channel.ack(msg)确认成功,channel.nack(msg)拒绝 - 拒绝:可选择是否重新入队
Q3: 如何保证消息不丢失?
答案:
- 队列持久化:
durable: true - 消息持久化:
persistent: true - 发布者确认:
confirmDelivery - 消费者手动确认
- 使用死信队列处理失败消息
Q4: RabbitMQ 和 Kafka 的区别?
| 特性 | RabbitMQ | Kafka |
|---|---|---|
| 协议 | AMQP | 自定义协议 |
| 模型 | 队列 + 交换机 | 日志结构 |
| 消息顺序 | 队列内有序 | 分区内有序 |
| 性能 | 较低 | 极高 |
| 适用场景 | 复杂路由 | 大数据流处理 |