跳至主内容

RabbitMQ Streams 消息去重

·10 分钟阅读

RabbitMQ Streams 概览》介绍了 RabbitMQ 3.9 的新功能 Streams,而《RabbitMQ Streams 第一个应用》则概述了 Streams Java 客户端的编程模型。本文将介绍如何在 RabbitMQ Streams 中去重已发布的消息。

由于去重是一个关键且复杂的问题,本文将一步步引导您了解这一机制,从一个简单且存在问题的发布应用程序,到一个优化且可靠的实现。

重复消息的问题

应用程序很容易导致同一条消息被发布多次:应用程序异常重启并从头开始重新发布所有数据,网络故障导致应用程序重新连接并重新发送少量消息,等等。

即使消费应用程序应该使处理具有幂等性,也应尽可能避免重复发布消息,因为它们会减慢处理速度并占用额外空间。

本文将从一个生成大量重复消息的简单应用程序(以帮助理解问题)开始,并逐步改进它,最终得到一个健壮的解决方案。

不带去重的发布

发布程序模拟了一个应用程序,该应用程序从数据源读取记录,并为每条记录发布一条消息。

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

我们假设应用程序读取所有可用的记录,并且第一次运行时有 10 条记录。如果您需要回顾 Streams Java 客户端 API,可以阅读《RabbitMQ Streams 第一个应用》。

如果您想在阅读文章的同时运行代码,可以继续阅读下一节。请注意,您可以不运行任何代码而继续阅读本文的其余部分,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理:

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

然后您需要启用 Streams 插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是如何克隆存储库并创建示例中使用的 stream:

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'

好的,一切就绪,让我们运行发布应用程序。

第一天运行发布程序

使用以下命令运行发布应用程序:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishFirstDay'
Connecting...
Connected.
Publishing 10 messages.
Messages confirmed? yes

第一次运行时,应用程序读取了数据源中的所有记录(本次运行共 10 条记录),并为每条记录发送一条消息。我们可以使用以下命令检查 stream 的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'        
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
message 3
message 4
message 5
message 6
message 7
message 8
message 9

到目前为止一切顺利,我们发布了 10 条消息,并且可以在 stream 中看到 10 条消息。现在让我们看看我们的应用程序是否可行,并在第二次运行时继续正常工作。

第二天运行发布程序

我们可以想象现在我们在第二天运行应用程序,数据源中包含另外 10 条记录,总共 20 条记录。我们的发布应用程序很“笨”:它会读取数据源中的所有内容并发布消息。让我们试试:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSecondDay'
Connecting...
Connected.
Publishing 20 messages.
Messages confirmed? yes

现在 stream 的内容是:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 0
message 1
message 2
...
message 9
message 10
message 11
...
message 19

我们看到了 30 条消息:第一次运行的 10 条和第二次运行的 20 条。前 10 条消息出现了两次,所以我们的 stream 包含重复。按照我们实现应用程序的方式,这是预期的,但我们必须解决这个问题,因为我们只想在第二次运行时发布新记录。

这就是 RabbitMQ Streams 中的去重发挥作用的地方。

带去重的发布

要启用发布去重,我们需要两样东西:

  • 为生产者指定一个名称
  • 为每条记录指定一个严格递增的序列值,即发布 ID

Stream Java 客户端文档 提供了关于生产者名称和发布 ID 的更多详细信息。请注意,消息去重并非仅限于 Streams Java 客户端,只要遵循语义,任何客户端都可以实现。

我们只需为我们的发布应用程序选择一个名称,并在不同的运行中保持该名称。对于发布 ID,我们可以使用记录的 ID:它的 ID 恰好是唯一的,并且记录是按 ID 排序返回的(例如,就像数据库记录具有数字主键和适当的查询一样)。

现在,这是我们改进了生产者名称和发布 ID 的发布应用程序:

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

代理将跟踪此生产者上次的发布 ID。我们将看到这如何实现消息去重。

第一天运行发布程序

让我们先重新创建 stream:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以运行我们改进后的发布应用程序第一次:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

好的,第一天数据源中有 10 条消息。

第二天运行发布程序

现在我们在第二天运行应用程序,增加了 10 条记录。我们的应用程序比第一次“聪明”:它使用生产者名称和发布 ID 进行去重。但它仍然读取数据源中的所有记录:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupSecondDay'
Connecting...
Connected.
Publishing 20 messages with deduplication enabled.
Messages confirmed? yes

Stream 的内容如下:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

这次没有重复,很好!尽管我们重新发布了前 10 条消息,但代理还是设法过滤掉了它们。它知道应该忽略所有发布 ID 小于 9(第一次运行的最后一个值)的消息。请注意,尽管它过滤掉了这些重复项,但它仍然向客户端确认了它们。

这比我们第一次导致重复消息的应用程序要好得多,但仍然存在一个问题:应用程序每次都会重新发送所有消息。如果数据不断增长,每次运行所需的时间将越来越长。幸运的是,可以找到应用程序在上次运行时停止的位置。

知道您在哪里停止:让发布程序更智能

在本节中,我们将通过不仅使用去重,还通过查询代理来获取它发送的最后一个发布 ID,来使发布应用程序更加智能。

第一天运行发布程序

我们必须重新创建空的 stream:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以重用我们的发布应用程序来发送前 10 条消息:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

这个版本的应用程序不够智能,但对于“第一天”来说足够了。

第二天运行(智能)发布程序

第二天,数据源包含 20 条消息,发布应用程序需要做得更好。它可以利用 `Producer#getLastPublishingId` 方法,该方法向代理查询此生产者在此 stream 上的最后一个发布 ID。应用程序可以将此值加 1,这将得到其起始点。然后,它只需从该点选择记录直到最后一个可用记录。这样,它就不会从头开始重新发布。以下代码显示了如何实现这一点:

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
long start = producer.getLastPublishingId() + 1; // get last publishing ID and add 1
int messageCount = 20;
records(start, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

现在让我们运行这个智能发布程序:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'
Connecting...
Connected.
Starting publishing at 10
Publishing 10 message with deduplication enabled.
Messages confirmed? yes

因此,发布程序从 10(上一次运行的最后一个发布 ID 9 + 1)开始,并发布 10 条(总共 20 条,已发布 10 条)新消息。我们可以检查 stream 的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

我们在 stream 中得到了预期的消息数量,但这次是一个经过优化的发布应用程序。

总结

本文介绍了 RabbitMQ Streams 的去重功能。

  • 代理可以检测和过滤重复消息
  • 要启用去重,需要为发布应用程序指定一个名称和一个发布 ID
  • 生产者名称必须是唯一的,并在应用程序重启之间保持不变
  • 发布 ID 是一个严格递增的序列,通常是给定消息的标识符(例如,数据库记录的主键,文件的行号)
  • 应用程序应查询代理以获取它们上次使用的发布 ID,以便从上次停止的地方继续
© . This site is unofficial and not affiliated with VMware.