Skip to content

RabbitMQ 消息队列

RabbitMQ 核心概念、消息模式、AMQP 协议与最佳实践

RabbitMQ 概述

特点

  • 基于 AMQP 协议
  • 支持多种消息模式
  • 成熟的路由功能
  • 丰富的插件生态
  • 管理界面友好

核心概念

┌─────────────────────────────────────────────────────────────┐
│                        RabbitMQ 结构                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Producer ──▶ Exchange ──▶ Queue ──▶ Consumer              │
│                │                                            │
│                └── Binding (routing key)                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
概念说明
Producer消息生产者
Consumer消息消费者
Exchange交换机,接收消息并路由
Queue队列,存储消息
Binding绑定,连接 Exchange 和 Queue
Routing Key路由键,路由规则

消息模式

1. 简单队列(Hello World)

javascript
import amqp from 'amqplib'

const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

const queue = 'hello'

// 确保队列存在
await channel.assertQueue(queue, { durable: false })

// 生产者
channel.sendToQueue(queue, Buffer.from('Hello World'))

// 消费者
channel.consume(queue, msg => {
  console.log('Received:', msg.content.toString())
  channel.ack(msg)
})

2. 工作队列(Work Queue)

javascript
import amqp from 'amqplib'

const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

const taskQueue = 'tasks'

await channel.assertQueue(taskQueue, { durable: true })

// 公平分发
channel.prefetch(1)

// 生产者 - 发送多个任务
for (let i = 0; i < 10; i++) {
  channel.sendToQueue(taskQueue, Buffer.from(`Task ${i}`), {
    persistent: true  // 消息持久化
  })
}

// 消费者
channel.consume(taskQueue, msg => {
  const task = msg.content.toString()
  console.log('Processing:', task)

  // 模拟处理时间
  setTimeout(() => {
    console.log('Done:', task)
    channel.ack(msg)
  }, 1000)
}, { noAck: false })

3. 发布订阅(Publish/Subscribe)

javascript
import amqp from 'amqplib'

const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

// 创建 fanout 交换机
const exchange = 'logs'
await channel.assertExchange(exchange, 'fanout', { durable: false })

// 临时队列
const { queue } = await channel.assertQueue('', { exclusive: true })
await channel.bindQueue(queue, exchange, '')

// 生产者 - 广播消息
channel.publish(exchange, '', Buffer.from('Broadcast message'))

// 消费者 - 接收所有消息
channel.consume(queue, msg => {
  console.log('Received:', msg.content.toString())
})

4. 路由(Routing)

javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

const exchange = 'direct_logs'

// 创建 direct 交换机
await channel.assertExchange(exchange, 'direct', { durable: false })

// 绑定不同队列到不同路由键
await channel.assertQueue('error_queue', { durable: false })
await channel.assertQueue('info_queue', { durable: false })

channel.bindQueue('error_queue', exchange, 'error')
channel.bindQueue('info_queue', exchange, 'info')
channel.bindQueue('error_queue', exchange, 'warning')  // warning 也到 error 队列

// 生产者 - 发送不同级别的日志
channel.publish(exchange, 'error', Buffer.from('Error message'))
channel.publish(exchange, 'info', Buffer.from('Info message'))
channel.publish(exchange, 'warning', Buffer.from('Warning message'))

// 消费者
channel.consume('error_queue', msg => {
  console.log('Error:', msg.content.toString())
})

channel.consume('info_queue', msg => {
  console.log('Info:', msg.content.toString())
})

5. 主题(Topics)

javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

const exchange = 'topic_logs'

// 创建 topic 交换机
await channel.assertExchange(exchange, 'topic', { durable: false })

// 绑定规则
// * 匹配一个词
// # 匹配零个或多个词
await channel.assertQueue('kern_queue', { durable: false })
await channel.assertQueue('critical_queue', { durable: false })

channel.bindQueue('kern_queue', exchange, 'kern.*')      // kern.* 匹配 kern.critical
channel.bindQueue('critical_queue', exchange, '*.critical')  // *.critical 匹配 kern.critical
channel.bindQueue('critical_queue', exchange, '#.critical')  // # 匹配任意前缀

// 生产者
channel.publish(exchange, 'kern.critical', Buffer.from('Kern critical'))
channel.publish(exchange, 'info.critical', Buffer.from('Info critical'))
channel.publish(exchange, 'user.log', Buffer.from('User log'))

6. RPC(远程过程调用)

javascript
const connection = await amqp.connect('amqp://localhost')
const channel = await connection.createChannel()

await channel.assertQueue('rpc_queue', { durable: false })

// 生成唯一 correlationId
const generateUuid = () => Math.random().toString(36).substring(2)

// 服务器
channel.consume('rpc_queue', async msg => {
  const { replyTo, correlationId } = msg.properties
  const n = parseInt(msg.content.toString())

  console.log('Received:', n)

  // 计算斐波那契
  const fib = (x) => {
    if (x === 0) return 0
    if (x === 1) return 1
    return fib(x - 1) + fib(x - 2)
  }

  const result = fib(n)

  // 发送回结果
  channel.sendToQueue(replyTo, Buffer.from(result.toString()), {
    correlationId
  })
})

// 客户端
const fibonacci = (n) => {
  return new Promise((resolve) => {
    const correlationId = generateUuid()

    channel.consume('rpc_queue', function reply(msg) {
      if (msg.properties.correlationId === correlationId) {
        resolve(parseInt(msg.content.toString()))
        channel.cancel(this.fields.consumerTag)
      }
    }, { noAck: true })

    channel.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
      replyTo: 'rpc_queue',
      correlationId
    })
  })
}

console.log(await fibonacci(10))

高级特性

消息确认

javascript
// 自动确认(可能丢失消息)
channel.consume(queue, msg => {
  console.log(msg.content.toString())
}, { noAck: true })

// 手动确认(推荐)
channel.consume(queue, msg => {
  const data = JSON.parse(msg.content.toString())

  try {
    processData(data)
    channel.ack(msg)  // 确认处理成功
  } catch (err) {
    if (msg.fields.redelivered) {
      // 重新入队失败,丢弃或记录
      console.error('Processing failed:', err)
      channel.nack(msg, false, false)
    } else {
      // 重新入队
      channel.nack(msg, false, true)
    }
  }
}, { noAck: false })

消息持久化

javascript
// 1. 队列持久化
await channel.assertQueue('myqueue', {
  durable: true  // 重启后队列仍然存在
})

// 2. 消息持久化
channel.sendToQueue(queue, Buffer.from('message'), {
  persistent: true  // 消息持久化到磁盘
})

// 3. 发布者确认
channel.confirmDelivery(() => {
  console.log('Message confirmed')
})

死信队列

javascript
const mainQueue = 'main_queue'
const deadQueue = 'dead_queue'
const deadExchange = 'dead_exchange'

// 创建死信交换机和队列
await channel.assertExchange(deadExchange, 'direct', { durable: true })
await channel.assertQueue(deadQueue, { durable: true })
await channel.bindQueue(deadQueue, deadExchange, 'dead')

// 主队列配置死信
await channel.assertQueue(mainQueue, {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': deadExchange,
    'x-dead-letter-routing-key': 'dead',
    'x-message-ttl': 60000  // 60秒过期
  }
})

优先级队列

javascript
await channel.assertQueue('priority_queue', {
  arguments: {
    'x-max-priority': 10  // 最大优先级 0-10
  }
})

// 发送优先级消息
channel.sendToQueue('priority_queue', Buffer.from('urgent'), {
  priority: 10  // 高优先级
})

channel.sendToQueue('priority_queue', Buffer.from('normal'), {
  priority: 1  // 低优先级
})

限流

javascript
// 预取限制
channel.prefetch(10)  // 最多10条未确认消息

// 或基于大小
channel.prefetch(100000)  // 100KB

Node.js 集成

amqplib 封装

javascript
import amqp from 'amqplib'

class RabbitMQ {
  constructor(url = 'amqp://localhost') {
    this.url = url
    this.connection = null
    this.channel = null
  }

  async connect() {
    this.connection = await amqp.connect(this.url)
    this.channel = await this.connection.createChannel()

    // 断线重连
    this.connection.on('close', () => {
      console.log('RabbitMQ disconnected, reconnecting...')
      setTimeout(() => this.connect(), 5000)
    })

    return this
  }

  async assertQueue(queue, options = {}) {
    return this.channel.assertQueue(queue, {
      durable: true,
      ...options
    })
  }

  async assertExchange(exchange, type = 'direct', options = {}) {
    return this.channel.assertExchange(exchange, type, {
      durable: true,
      ...options
    })
  }

  async bindQueue(queue, exchange, routingKey = '') {
    return this.channel.bindQueue(queue, exchange, routingKey)
  }

  async publish(exchange, routingKey, message, options = {}) {
    const ok = this.channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,
        contentType: 'application/json',
        ...options
      }
    )
    return ok
  }

  async consume(queue, handler, options = {}) {
    return this.channel.consume(queue, async (msg) => {
      if (!msg) return

      try {
        const content = JSON.parse(msg.content.toString())
        await handler(content, msg)
        this.channel.ack(msg)
      } catch (err) {
        console.error('Message processing failed:', err)
        // 判断是否重新入队
        const requeue = !msg.fields.redelivered
        this.channel.nack(msg, false, requeue)
      }
    }, { noAck: false, ...options })
  }

  async close() {
    await this.channel?.close()
    await this.connection?.close()
  }
}

export const rabbitmq = new RabbitMQ()

使用示例

javascript
import { rabbitmq } from './rabbitmq.js'

// 初始化
await rabbitmq.connect()

// 发布消息
await rabbitmq.assertExchange('user.events', 'topic')
await rabbitmq.publish('user.events', 'user.created', {
  userId: 1,
  name: 'Alice',
  timestamp: new Date()
})

// 消费消息
await rabbitmq.assertQueue('user.created.queue')
await rabbitmq.bindQueue('user.created.queue', 'user.events', 'user.created')

await rabbitmq.consume('user.created.queue', async (data) => {
  console.log('User created:', data)
  await sendWelcomeEmail(data)
})

// RPC 调用
await rabbitmq.assertQueue('fibonacci.queue')
const result = await rpcCall('fibonacci.queue', 20)

面试高频题

Q1: RabbitMQ 有哪些消息模式?

答案:

  1. 简单队列:最基础的生产者-消费者模式
  2. 工作队列:多个消费者公平分发任务
  3. 发布订阅:fanout 交换机广播到所有队列
  4. 路由:direct 交换机根据路由键精确匹配
  5. 主题:topic 交换机支持通配符匹配
  6. RPC:请求-响应模式

Q2: RabbitMQ 的消息确认机制?

答案:

  • 自动确认noAck: true,消息一旦发送就确认,可能丢失
  • 手动确认channel.ack(msg) 确认成功,channel.nack(msg) 拒绝
  • 拒绝:可选择是否重新入队

Q3: 如何保证消息不丢失?

答案:

  1. 队列持久化:durable: true
  2. 消息持久化:persistent: true
  3. 发布者确认:confirmDelivery
  4. 消费者手动确认
  5. 使用死信队列处理失败消息

Q4: RabbitMQ 和 Kafka 的区别?

特性RabbitMQKafka
协议AMQP自定义协议
模型队列 + 交换机日志结构
消息顺序队列内有序分区内有序
性能较低极高
适用场景复杂路由大数据流处理

前端面试知识库