跳至主要内容

使用 RabbitMQ Streams 进行消息去重

·阅读时长:9 分钟

RabbitMQ Streams 概述 介绍了流,RabbitMQ 3.9 中的新功能,以及 RabbitMQ Streams 首个应用程序 提供了使用流 Java 客户端的编程模型概述。本文介绍了如何在 RabbitMQ Streams 中对发布的消息进行去重。

由于去重是一个至关重要且复杂的概念,本文将逐步引导您完成此机制,从一个天真且有些错误的发布应用程序到一个经过优化且可靠的实现。

重复消息的问题

对于应用程序来说,多次发布相同的消息非常容易:应用程序以错误的方式重新启动并从头开始重新发布所有数据,网络故障使应用程序重新连接并重新发送几个消息,等等。

即使消费应用程序应使其处理幂等,也应尽可能避免重复发布的消息,因为它们会减慢处理速度并占用额外的空间。

本文将从一个简单的应用程序开始,该应用程序生成大量重复消息(以帮助理解问题),并将逐步改进它,最终获得一个稳健的解决方案。

无去重发布

发布程序模拟了一个应用程序,该应用程序从数据源读取记录并为每个记录发布一条消息。

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

我们假设应用程序读取所有可用的记录,并且该数量对于第一次运行为 10。如果您需要关于流 Java 客户端 API 的提醒,可以阅读 RabbitMQ Streams 首个应用程序

如果您想在阅读时运行代码,可以继续到下一节。请注意,您可以在不运行任何代码的情况下继续阅读本文的其余部分,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

然后需要启用流插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是如何克隆存储库并创建示例中使用的流

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'

好的,您已经设置好了,让我们运行发布应用程序。

第一天运行发布程序

使用以下命令运行发布应用程序

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishFirstDay'
Connecting...
Connected.
Publishing 10 messages.
Messages confirmed? yes

在第一次运行中,应用程序从数据源读取所有记录(对于此运行,总共有 10 个记录)并为每个记录发送一条消息。我们可以使用以下命令检查流的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'        
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
message 3
message 4
message 5
message 6
message 7
message 8
message 9

到目前为止,一切都很好,我们发布了 10 条消息,并且可以在流中看到 10 条消息。现在让我们检查一下我们的应用程序是否可行,并且在第二次运行时也能正常工作。

第二天运行发布程序

现在我们可以想象一下,我们在第二天运行应用程序,数据源包含 10 个额外的记录,总共 20 个记录。我们的发布应用程序很笨拙:它将从数据源读取所有内容并发布消息。让我们试试

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSecondDay'
Connecting...
Connected.
Publishing 20 messages.
Messages confirmed? yes

现在流的内容为

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 0
message 1
message 2
...
message 9
message 10
message 11
...
message 19

我们看到 30 条消息:第一次运行的 10 条和第二次运行的 20 条。前 10 条出现了两次,因此我们的流包含重复的消息。按照我们实现应用程序的方式,这是预期的结果,但我们必须修复它,因为我们只希望在第二次运行时发布新记录。

这就是 RabbitMQ Streams 中去重的作用。

带有去重功能的发布

我们需要 2 个东西来在发布时启用去重

  • 生产者的名称
  • 每个记录的严格递增序列值,即发布 ID

流 Java 客户端文档 提供了有关生产者名称和发布 ID 的更多详细信息。请注意,消息去重不特定于流 Java 客户端,只要符合语义,任何客户端都可以实现它。

我们只需要为我们的发布应用程序选择一个名称,并在不同的运行中保持这个名称。对于发布 ID,我们可以使用记录的 ID:它恰好是唯一的,并且记录按 ID 排序返回(例如,就像来自数据库的记录一样,具有数字主键和适当的查询)。

以下是现在我们的发布应用程序,其中进行了生产者名称和发布 ID 的更改

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

代理将跟踪此生产者的最后一个发布 ID。我们将看到这如何允许对消息进行去重。

第一天运行发布程序

首先让我们重新创建流

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以第一次运行改进后的发布应用程序

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

好的,第一天数据源中有 10 条消息。

第二天运行发布程序

我们现在在第二天运行应用程序,其中包含额外的 10 条记录。我们的应用程序不像第一次运行那样笨拙:它使用生产者名称和发布 ID 进行去重。但它仍然读取数据源中的所有记录

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupSecondDay'
Connecting...
Connected.
Publishing 20 messages with deduplication enabled.
Messages confirmed? yes

以及流的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

这次没有重复,不错!即使我们重新发布了前 10 条消息,代理也成功地过滤掉了它们。它知道应该忽略所有发布 ID 小于 9(第一次运行的最后一个值)的消息。请注意,即使它过滤掉了这些重复的消息,它仍然向客户端确认了它们。

这比我们第一次运行的应用程序要好得多,在第一次运行中我们最终得到了重复的消息,但仍然存在一个问题:应用程序每次都会重新发送所有消息。如果数据不断增长,应用程序每次运行所需的时间会越来越长。幸运的是,可以找出应用程序在上次运行时停止的位置。

了解您的停止位置:使发布程序更智能

在本节中,我们将看到如何通过不仅使用去重,还查询代理以获取其发送的最后一个发布 ID 来使发布应用程序更智能。

第一天运行发布程序

我们必须重新创建空的流

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

我们可以重新使用我们的发布应用程序来发送前 10 条消息

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

这个版本的应用程序不是最智能的,但对于“第一天”来说已经足够了。

第一天运行(智能)发布程序

发布应用程序需要在第二天做得更好,因为数据源现在包含 20 条消息。它可以使用 Producer#getLastPublishingId 方法,该方法查询代理以获取此生产者在此流上的最后一个发布 ID。应用程序可以为此值加 1,它将获得其起点。然后它只需要从这个点选择记录到最后一个可用的记录。这样它就不会从头开始重新发布。以下代码展示了如何做到这一点

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
long start = producer.getLastPublishingId() + 1; // get last publishing ID and add 1
int messageCount = 20;
records(start, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

现在让我们运行这个智能发布程序

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'
Connecting...
Connected.
Starting publishing at 10
Publishing 10 message with deduplication enabled.
Messages confirmed? yes

因此发布程序从 10(9,第一次运行的最后一个发布 ID,+ 1)开始,并发布 10(20,总数,- 10 已经发布)条新消息。我们可以检查流的内容

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

我们得到了流中预期数量的消息,但这次发布应用程序得到了优化。

总结

这篇博文介绍了 RabbitMQ Streams 的去重功能。

  • 代理可以检测并过滤掉重复的消息
  • 需要生产应用程序的名称和发布 ID 来启用去重
  • 生产者名称必须是唯一的,并且在应用程序重启之间重复使用
  • 发布 ID 是严格递增的序列,它通常是给定消息的标识符(例如,数据库记录的主键,文件中的行)
  • 应用程序应查询代理以获取其使用的最后一个发布 ID,以便从其停止的地方重新开始
© 2024 RabbitMQ. All rights reserved.