跳到主要内容

使用 RabbitMQ Streams 的消息去重

·9 分钟阅读

RabbitMQ Streams 概览 介绍了 streams,这是 RabbitMQ 3.9 中的一项新功能,RabbitMQ Streams 首个应用 概述了使用 stream Java 客户端的编程模型。这篇文章介绍了如何在 RabbitMQ Streams 中去重已发布的消息。

由于去重是一个关键且复杂概念,这篇文章将逐步引导您了解此机制,从一个幼稚且有些损坏的发布应用程序到一个优化且可靠的实现。

重复消息的问题

应用程序很容易多次发布相同的消息:应用程序以错误的方式重启并重新发布所有数据,网络故障使应用程序重新连接并重新发送几条消息等等。

即使消费应用程序应该使其处理具有幂等性,也应尽可能避免重复发布的 сообщения,因为它们会减慢处理速度并占用额外的空间。

这篇文章将从一个生成大量重复消息的简单应用程序开始(以帮助掌握问题),并将逐步改进它,最终得到一个健壮的解决方案。

不进行去重发布

发布程序模拟一个应用程序,该应用程序从数据源读取记录并为每个记录发布消息

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

我们假设应用程序读取所有可用的记录,并且第一次运行时该数字为 10。如果您想回顾一下 stream Java 客户端 API,可以阅读 RabbitMQ Streams 首个应用

如果您想在阅读时运行代码,可以继续下一节。请注意,即使您不想尝试代码,也可以继续阅读本文的其余部分。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动 broker

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

然后您需要启用 stream 插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是如何克隆存储库并创建示例中使用的 stream

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'

好的,一切就绪,让我们运行发布应用程序。

第一天运行发布者

使用以下命令运行发布应用程序

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishFirstDay'
Connecting...
Connected.
Publishing 10 messages.
Messages confirmed? yes

在第一次运行时,应用程序从数据源读取所有记录(对于本次运行,总共 10 条记录),并为每条记录发送一条消息。我们可以使用以下命令检查 stream 的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'        
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
message 3
message 4
message 5
message 6
message 7
message 8
message 9

到目前为止,一切顺利,我们发布了 10 条消息,并且可以在 stream 中看到 10 条消息。现在让我们看看我们的应用程序是否可行,并在第二次运行时保持正常工作。

第二天运行发布者

现在我们可以想象在第二天运行应用程序,数据源包含另外 10 条记录,因此总共有 20 条记录。我们的发布应用程序很笨:它将从数据源读取所有内容并发布消息。让我们试试

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSecondDay'
Connecting...
Connected.
Publishing 20 messages.
Messages confirmed? yes

现在 stream 的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 0
message 1
message 2
...
message 9
message 10
message 11
...
message 19

我们看到 30 条消息:第一次运行的 10 条消息和第二次运行的 20 条消息。前 10 条消息出现了两次,因此我们的 stream 包含重复项。按照我们实现应用程序的方式,这是预期的,但我们必须修复这个问题,因为我们只希望在新记录在第二次运行时发布。

这就是 RabbitMQ Streams 中的去重发挥作用的地方。

进行去重发布

我们需要 2 件事才能在发布时启用去重

  • 生产者的名称
  • 每条记录的严格递增序列值,即发布 ID

stream Java 客户端文档 提供了有关生产者名称和发布 ID 的更多详细信息。请注意,消息去重并非 stream Java 客户端特有,任何客户端只要符合语义都可以实现。

我们只需要为我们的发布应用程序选择一个名称,并在不同的运行中保留此名称。对于发布 ID,我们可以使用记录的 ID:它恰好是唯一的,并且记录按 ID 排序返回(例如,就像数据库中具有数字主键和适当查询的记录一样)。

这是我们现在具有生产者名称和发布 ID 更改的发布应用程序

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

broker 将跟踪此生产者的最后一个发布 ID。我们将看到这如何允许去重消息。

第一天运行发布者

让我们首先重新创建我们的 stream

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以第一次运行我们改进的发布应用程序

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

好的,第一天数据源中有 10 条消息。

第二天运行发布者

现在我们在第二天运行我们的应用程序,并添加了额外的 10 条记录。我们的应用程序没有第一次那么笨:它使用生产者名称和发布 ID 进行去重。但它仍然读取数据源中的所有记录

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupSecondDay'
Connecting...
Connected.
Publishing 20 messages with deduplication enabled.
Messages confirmed? yes

以及 stream 的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

这次没有重复项,很好!即使我们重新发布了前 10 条消息,broker 还是设法过滤掉了它们。它知道应该忽略所有发布 ID 小于 9 的消息(第一次运行中的最后一个值)。请注意,即使它过滤掉了这些重复项,它仍然向客户端确认了它们。

这比我们第一个最终得到重复项的应用程序好得多,但仍然存在一个问题:应用程序每次都会重新发送所有消息。如果数据持续增长,应用程序每次运行将花费越来越多的时间。幸运的是,有可能找出应用程序上次运行的停止位置。

了解您的停止位置:使发布者更智能

在本节中,我们将了解如何通过不仅使用去重,还通过查询 broker 以获取其发送的最后一个发布 ID,使发布应用程序更加智能。

第一天运行发布者

我们必须重新创建我们的空 stream

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

我们可以重新使用我们的发布应用程序来发送前 10 条消息

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

此版本的应用程序不是最智能的,但对于“第一天”来说已经足够了。

第一天运行(智能)发布者

对于第二天,发布应用程序需要做得更好,此时数据源现在包含 20 条消息。它可以使用 Producer#getLastPublishingId 方法,该方法向 broker 查询此生产者针对此 stream 的最后一个发布 ID。应用程序可以将此值加 1,它将获得其起点。然后,它只需从此点到最后一个可用记录中选择记录即可。这样,它就不会从头开始重新发布。以下代码显示了如何执行此操作

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
long start = producer.getLastPublishingId() + 1; // get last publishing ID and add 1
int messageCount = 20;
records(start, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

现在让我们运行这个智能发布者

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'
Connecting...
Connected.
Starting publishing at 10
Publishing 10 message with deduplication enabled.
Messages confirmed? yes

因此,发布者从 10 开始(9,第一次运行的最后一个发布 ID,+ 1),并发布 10 条(20,总共,- 10 条已发布)新消息。我们可以检查 stream 的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

我们在 stream 中获得了预期数量的消息,但这次使用的是优化的发布应用程序。

总结

这篇博客介绍了 RabbitMQ Streams 的去重功能。

  • broker 可以检测并过滤掉重复消息
  • 需要发布应用程序的名称和发布 ID 才能启用去重
  • 生产者名称必须是唯一的,并在应用程序重启之间重复使用
  • 发布 ID 是一个严格递增的序列,它通常是给定消息的标识符(例如,数据库记录的主键、文件中的行)
  • 应用程序应向 broker 查询他们使用的最后一个发布 ID,以便从他们停止的位置重新开始
© . All rights reserved.