RabbitMQ Streams 消息去重
《RabbitMQ Streams 概览》介绍了 RabbitMQ 3.9 的新功能 Streams,而《RabbitMQ Streams 第一个应用》则概述了 Streams Java 客户端的编程模型。本文将介绍如何在 RabbitMQ Streams 中去重已发布的消息。
由于去重是一个关键且复杂的问题,本文将一步步引导您了解这一机制,从一个简单且存在问题的发布应用程序,到一个优化且可靠的实现。
重复消息的问题
应用程序很容易导致同一条消息被发布多次:应用程序异常重启并从头开始重新发布所有数据,网络故障导致应用程序重新连接并重新发送少量消息,等等。
即使消费应用程序应该使处理具有幂等性,也应尽可能避免重复发布消息,因为它们会减慢处理速度并占用额外空间。
本文将从一个生成大量重复消息的简单应用程序(以帮助理解问题)开始,并逐步改进它,最终得到一个健壮的解决方案。
不带去重的发布
发布程序模拟了一个应用程序,该应用程序从数据源读取记录,并为每条记录发布一条消息。
Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});
我们假设应用程序读取所有可用的记录,并且第一次运行时有 10 条记录。如果您需要回顾 Streams Java 客户端 API,可以阅读《RabbitMQ Streams 第一个应用》。
如果您想在阅读文章的同时运行代码,可以继续阅读下一节。请注意,您可以不运行任何代码而继续阅读本文的其余部分,因此如果您不想尝试代码,可以跳过下一节。
设置示例项目
运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理:
docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9
然后您需要启用 Streams 插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream
代码托管在 GitHub 上。以下是如何克隆存储库并创建示例中使用的 stream:
git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
好的,一切就绪,让我们运行发布应用程序。
第一天运行发布程序
使用以下命令运行发布应用程序:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishFirstDay'
Connecting...
Connected.
Publishing 10 messages.
Messages confirmed? yes
第一次运行时,应用程序读取了数据源中的所有记录(本次运行共 10 条记录),并为每条记录发送一条消息。我们可以使用以下命令检查 stream 的内容:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
message 3
message 4
message 5
message 6
message 7
message 8
message 9
到目前为止一切顺利,我们发布了 10 条消息,并且可以在 stream 中看到 10 条消息。现在让我们看看我们的应用程序是否可行,并在第二次运行时继续正常工作。
第二天运行发布程序
我们可以想象现在我们在第二天运行应用程序,数据源中包含另外 10 条记录,总共 20 条记录。我们的发布应用程序很“笨”:它会读取数据源中的所有内容并发布消息。让我们试试:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSecondDay'
Connecting...
Connected.
Publishing 20 messages.
Messages confirmed? yes
现在 stream 的内容是:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 0
message 1
message 2
...
message 9
message 10
message 11
...
message 19
我们看到了 30 条消息:第一次运行的 10 条和第二次运行的 20 条。前 10 条消息出现了两次,所以我们的 stream 包含重复。按照我们实现应用程序的方式,这是预期的,但我们必须解决这个问题,因为我们只想在第二次运行时发布新记录。
这就是 RabbitMQ Streams 中的去重发挥作用的地方。
带去重的发布
要启用发布去重,我们需要两样东西:
- 为生产者指定一个名称
- 为每条记录指定一个严格递增的序列值,即发布 ID
Stream Java 客户端文档 提供了关于生产者名称和发布 ID 的更多详细信息。请注意,消息去重并非仅限于 Streams Java 客户端,只要遵循语义,任何客户端都可以实现。
我们只需为我们的发布应用程序选择一个名称,并在不同的运行中保持该名称。对于发布 ID,我们可以使用记录的 ID:它的 ID 恰好是唯一的,并且记录是按 ID 排序返回的(例如,就像数据库记录具有数字主键和适当的查询一样)。
现在,这是我们改进了生产者名称和发布 ID 的发布应用程序:
Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});
代理将跟踪此生产者上次的发布 ID。我们将看到这如何实现消息去重。
第一天运行发布程序
让我们先重新创建 stream:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.
然后我们可以运行我们改进后的发布应用程序第一次:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes
好的,第一天数据源中有 10 条消息。
第二天运行发布程序
现在我们在第二天运行应用程序,增加了 10 条记录。我们的应用程序比第一次“聪明”:它使用生产者名称和发布 ID 进行去重。但它仍然读取数据源中的所有记录:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupSecondDay'
Connecting...
Connected.
Publishing 20 messages with deduplication enabled.
Messages confirmed? yes
Stream 的内容如下:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19
这次没有重复,很好!尽管我们重新发布了前 10 条消息,但代理还是设法过滤掉了它们。它知道应该忽略所有发布 ID 小于 9(第一次运行的最后一个值)的消息。请注意,尽管它过滤掉了这些重复项,但它仍然向客户端确认了它们。
这比我们第一次导致重复消息的应用程序要好得多,但仍然存在一个问题:应用程序每次都会重新发送所有消息。如果数据不断增长,每次运行所需的时间将越来越长。幸运的是,可以找到应用程序在上次运行时停止的位置。
知道您在哪里停止:让发布程序更智能
在本节中,我们将通过不仅使用去重,还通过查询代理来获取它发送的最后一个发布 ID,来使发布应用程序更加智能。
第一天运行发布程序
我们必须重新创建空的 stream:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.
然后我们可以重用我们的发布应用程序来发送前 10 条消息:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes
这个版本的应用程序不够智能,但对于“第一天”来说足够了。
第二天运行(智能)发布程序
第二天,数据源包含 20 条消息,发布应用程序需要做得更好。它可以利用 `Producer#getLastPublishingId` 方法,该方法向代理查询此生产者在此 stream 上的最后一个发布 ID。应用程序可以将此值加 1,这将得到其起始点。然后,它只需从该点选择记录直到最后一个可用记录。这样,它就不会从头开始重新发布。以下代码显示了如何实现这一点:
Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
long start = producer.getLastPublishingId() + 1; // get last publishing ID and add 1
int messageCount = 20;
records(start, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});
现在让我们运行这个智能发布程序:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'
Connecting...
Connected.
Starting publishing at 10
Publishing 10 message with deduplication enabled.
Messages confirmed? yes
因此,发布程序从 10(上一次运行的最后一个发布 ID 9 + 1)开始,并发布 10 条(总共 20 条,已发布 10 条)新消息。我们可以检查 stream 的内容:
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19
我们在 stream 中得到了预期的消息数量,但这次是一个经过优化的发布应用程序。
总结
本文介绍了 RabbitMQ Streams 的去重功能。
- 代理可以检测和过滤重复消息
- 要启用去重,需要为发布应用程序指定一个名称和一个发布 ID
- 生产者名称必须是唯一的,并在应用程序重启之间保持不变
- 发布 ID 是一个严格递增的序列,通常是给定消息的标识符(例如,数据库记录的主键,文件的行号)
- 应用程序应查询代理以获取它们上次使用的发布 ID,以便从上次停止的地方继续
