RabbitMQ 教程 - 使用发布者确认实现可靠发布
发布者确认
(使用 Bunny Ruby 客户端)
先决条件
本教程假设 RabbitMQ 已 安装 并在 localhost 上的 标准端口 (5672) 上运行。如果您使用不同的主机、端口或凭据,则需要调整连接设置。
哪里寻求帮助
如果您在学习本教程时遇到困难,可以通过 GitHub Discussions 或 RabbitMQ 社区 Discord 联系我们。
发布者确认 是 RabbitMQ 的一项扩展,用于实现可靠发布。当通道上启用了发布者确认时,客户端发布的这些消息将由代理异步确认,这意味着这些消息在服务器端已被处理。
概述
在本教程中,我们将使用发布者确认来确保已发布的消息已安全到达代理。我们将介绍几种使用发布者确认的策略,并解释它们的优缺点。
在通道上启用发布者确认
发布者确认(Publisher confirms)是 RabbitMQ 对 AMQP 0.9.1 协议的一种扩展,因此默认情况下是不启用的。可以通过 channel(通道)上的 confirm_select 方法启用发布者确认。
channel = connection.create_channel
channel.confirm_select(tracking: true)
必须在你希望使用发布者确认的每个通道上调用此方法。确认机制只需启用一次,而不是针对每条发布的消息都启用。Bunny 3.0 及更高版本提供了 tracking: true 选项,它能自动追踪发布者确认并提供背压(backpressure)支持。
策略 #1:单独发布消息
让我们从最简单的发布确认方式开始,即发布一条消息并等待其确认。
def publish_messages_individually(connection)
channel = connection.create_channel
queue = channel.queue('', exclusive: true)
channel.confirm_select(tracking: true)
start_time = Time.now
MESSAGE_COUNT.times do |i|
body = i.to_s
channel.basic_publish(body, '', queue.name)
channel.wait_for_confirms
end
end_time = Time.now
puts "Published #{MESSAGE_COUNT} messages individually in #{((end_time - start_time) * 1000).to_i} ms"
ensure
channel.close if channel && channel.open?
end
在前面的示例中,我们像往常一样发布一条消息,并使用 channel.wait_for_confirms 等待确认。该方法在消息被确认后立即返回。如果所有消息都成功确认,该方法返回 true;如果任何消息被拒绝(nack-ed,意味着代理由于某种原因无法处理它),则返回 false。处理 false 返回值通常包括记录错误日志和/或重试发送消息。
不同的客户端库处理同步发布者确认的方式各不相同,因此请务必仔细阅读你所使用的客户端文档。
这种技术非常直观,但也有一个主要缺点:它**显著降低了发布速度**,因为确认一条消息会阻塞后续所有消息的发布。这种方法每秒无法实现超过几百条消息的吞吐量。尽管如此,这对某些应用来说已经足够了。
使用 tracking: true 时,Bunny 会自动限制未确认消息的数量(默认为 1000 条),并在达到此限制时阻塞发布线程。这提供了自然的背压机制。
发布者确认是异步的吗?
我们在开头提到过,代理是异步确认发布的消息的,但在第一个示例中,代码是同步等待消息被确认的。客户端实际上是异步接收确认并相应地解除对
wait_for_confirms的调用阻塞。可以将wait_for_confirms视为一种同步助手,其底层依赖于异步通知。
策略 #2:批量发布消息
为了改进我们之前的示例,我们可以批量发布消息并等待整个批次被确认。为了获得最佳吞吐量,我们可以使用 Bunny 3.0 提供的 basic_publish_batch 方法。
def publish_messages_in_batch(connection)
channel = connection.create_channel
queue = channel.queue('', exclusive: true)
channel.confirm_select(tracking: true)
batch_size = 1000
start_time = Time.now
(0...MESSAGE_COUNT).each_slice(batch_size) do |batch|
messages = batch.map { |i| i.to_s }
channel.basic_publish_batch(messages, '', queue.name)
end
# Wait for any remaining confirmations
channel.wait_for_confirms
end_time = Time.now
puts "Published #{MESSAGE_COUNT} messages in batch in #{((end_time - start_time) * 1000).to_i} ms"
ensure
channel.close if channel && channel.open?
end
相比等待单个消息,等待一批消息被确认可以显著提高吞吐量(最高可提升 3-4 倍)。如果批次中的任何消息被拒绝(nack-ed),channel.wait_for_confirms 将返回 false。
一个缺点是,在失败的情况下,我们无法确切知道哪里出了问题,因此我们可能必须将整个批次保存在内存中,以便记录有意义的日志或重新发布消息。而且此解决方案仍然是同步的,因此它会阻塞消息的发布。
策略 3:异步处理发布者确认
代理是异步确认已发布消息的,用户只需要在客户端注册一个回调函数来接收这些确认通知。在 Bunny 中,我们可以向 confirm_select 传递一个代码块(block)来处理这些回调。
channel = connection.create_channel
channel.confirm_select do |delivery_tag, multiple, nack|
# code when message is confirmed or nack-ed
end
传递给代码块的参数有 3 个:
delivery_tag:标识已确认或被拒绝消息的编号。我们稍后会看到如何将其与已发布的消息关联起来。multiple:这是一个布尔值。如果为 false,则仅确认/拒绝一条消息;如果为 true,则序列号小于或等于该值的所有消息都将被确认/拒绝。nack:这是一个布尔值。如果为 true,则表示消息被拒绝(可以认为是被代理丢弃了)。
可以在发布前使用 channel.next_publish_seq_no 获取序列号。
seq_no = channel.next_publish_seq_no
channel.basic_publish(body, '', queue.name)
将消息与序列号关联的一种简单方法是使用哈希表(Hash)。假设我们要发布字符串,因为它们很容易转换为字节数组进行发布。以下代码示例展示了如何使用哈希表将发布序列号与消息体字符串关联起来。
outstanding_confirms = {}
# ... code for confirm callbacks will come later
body = "..."
outstanding_confirms[channel.next_publish_seq_no] = body
channel.basic_publish(body, '', queue.name)
发布代码现在使用哈希表来追踪发出的消息。当确认到达时,我们需要清理这个哈希表,并在消息被拒绝时执行一些操作,比如记录警告日志。
def handle_publish_confirms_asynchronously(connection)
channel = connection.create_channel
queue = channel.queue('', exclusive: true)
outstanding_confirms = {}
# A mutex is necessary because the confirm callbacks are executed in a separate thread
confirms_mutex = Mutex.new
channel.confirm_select do |delivery_tag, multiple, nack|
confirms_mutex.synchronize do
if multiple
outstanding_confirms.reject! { |k, _| k <= delivery_tag }
else
outstanding_confirms.delete(delivery_tag)
end
end
if nack
puts "Message with delivery tag #{delivery_tag} was nacked!"
end
end
start_time = Time.now
MESSAGE_COUNT.times do |i|
body = i.to_s
seq_no = channel.next_publish_seq_no
confirms_mutex.synchronize do
outstanding_confirms[seq_no] = body
end
channel.basic_publish(body, '', queue.name)
end
# Wait for any remaining confirmations
channel.wait_for_confirms
end_time = Time.now
puts "Published #{MESSAGE_COUNT} messages and handled confirms asynchronously in #{((end_time - start_time) * 1000).to_i} ms"
ensure
channel.close if channel && channel.open?
end
前面的示例包含一个在确认到达时清理哈希表的回调。注意,此回调同时处理单个和多个确认。回调会检查 nack 标志,如果消息被拒绝,则发出警告。
如何追踪未决的确认?
我们的示例使用标准的 Ruby
Hash来追踪未决的确认。由于确认回调是在客户端库拥有的线程中调用的(该线程与发布线程不同),我们必须使用Mutex来同步对哈希表的访问。
总之,异步处理发布者确认通常需要以下步骤:
- 提供一种将发布序列号与消息关联的方法。
- 在通道上注册确认监听器,以便在收到发布者确认/拒绝时得到通知,从而执行适当的操作,例如记录日志或重新发布被拒绝的消息。序列号与消息的关联机制在此步骤中可能也需要进行清理。
- 在发布消息前追踪发布序列号。
重新发布被拒绝的消息?
直接在对应的回调函数中重新发布被拒绝的消息可能很有诱惑力,但这应该避免,因为确认回调是在 I/O 线程中分发的,而该线程不应执行此类操作。更好的解决方案是将消息放入内存队列中,由发布线程轮询该队列。
thread标准库中的Queue类是实现回调函数和发布线程之间消息传输的理想选择。
总结
确保已发布的消息确实到达了代理,这在某些应用中至关重要。发布者确认是 RabbitMQ 中有助于满足此要求的功能。
- 单个发布消息:简单,但吞吐量较低。
- 批量发布消息:简单,吞吐量合理,但在出错时难以追溯原因。
- 异步处理:性能和资源利用率最佳,在出错时控制力强,但实现起来可能比较复杂。
Bunny 3.0 的 tracking: true 选项配合 basic_publish_batch,使得使用发布者确认变得非常简单,同时能够实现自动背压和高吞吐量。
整合
publisher_confirms.rb 脚本包含了我们所介绍技术的代码。我们可以直接运行它并观察它们的表现。
bundle exec ruby publisher_confirms.rb
如果客户端和服务器位于同一台机器上,你计算机上的输出应该看起来相似。
Published 50000 messages individually in 34929 ms
Published 50000 messages in batch in 1226 ms
Published 50000 messages and handled confirms asynchronously in 4926 ms
发布者确认非常依赖网络,所以最好使用远程节点进行尝试,因为在生产环境中,客户端和服务器通常不在同一台机器上,这样更贴近实际。可以轻松修改 publisher_confirms.rb 以使用非本地节点。
connection = Bunny.new(hostname: 'remote-host', username: 'remote-user', password: 'remote-password')
connection.start
再次执行脚本,并等待结果。
Published 50000 messages individually in 231541 ms
Published 50000 messages in batch in 7232 ms
Published 50000 messages and handled confirms asynchronously in 6332 ms
我们可以看到单个发布现在的表现非常差。但是,在客户端和服务器之间存在网络的情况下,批量发布和异步处理的表现变得相似,其中异步处理发布者确认略有优势。
请记住,批量发布易于实现,但无法轻易得知在收到否定确认时到底是哪些消息未能到达代理。异步处理发布者确认的实现较为复杂,但提供了更好的粒度和对消息被拒绝时所需执行操作的更好控制。