跳至主内容

RabbitMQ Streams 消息去重

·10 分钟阅读

RabbitMQ Streams 概览》介绍了 RabbitMQ 3.9 的新功能 Streams,而《RabbitMQ Streams 第一个应用》则概述了 Streams Java 客户端的编程模型。本文将介绍如何在 RabbitMQ Streams 中去重已发布的消息。

由于去重是一个关键且复杂的问题,本文将一步步引导您了解这一机制,从一个简单且存在问题的发布应用程序,到一个优化且可靠的实现。

重复消息的问题

应用程序多次发布同一条消息非常容易发生:应用程序以错误的方式重启并从头开始重新发布所有数据、网络故障导致应用程序重新连接并重新发送几条消息,等等。

尽管消费应用程序应该确保其处理过程是幂等的,但应尽可能避免重复发布消息,因为它们会减慢处理速度并占用额外空间。

本文将从一个会产生大量重复消息的简单应用程序入手(以帮助理解该问题),并逐步对其进行改进,最终得出一个稳健的解决方案。

不进行去重的发布

该发布程序模仿了一个从数据源读取记录并为每条记录发布一条消息的应用程序。

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

假设该应用程序读取了所有可用的记录,第一次运行时总共为 10 条。如果您需要回顾 Stream Java 客户端 API,可以阅读 RabbitMQ Streams 首个应用程序

如果您想在阅读时运行代码,可以跳转到下一节。请注意,您无需运行任何内容即可阅读本文的其余部分,因此如果您不想尝试代码,可以跳过下一节。

设置示例项目

运行示例需要安装 Docker、Git 和 Java 8 或更高版本。您可以使用以下命令启动代理:

docker run -it --rm --name rabbitmq -p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.9

然后您需要启用 Streams 插件:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

代码托管在 GitHub 上。以下是克隆仓库并创建示例所用流的方法:

git clone https://github.com/acogoluegnes/rabbitmq-streams-blog-posts.git
cd rabbitmq-streams-blog-posts
./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'

好了,一切准备就绪,让我们运行发布应用程序。

第一天运行发布程序

使用以下命令运行发布应用程序:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishFirstDay'
Connecting...
Connected.
Publishing 10 messages.
Messages confirmed? yes

在第一次运行中,应用程序从数据源读取了所有记录(本次运行总共 10 条记录),并为每条记录发送了一条消息。我们可以通过以下命令检查流的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
message 3
message 4
message 5
message 6
message 7
message 8
message 9

目前为止一切顺利,我们发布了 10 条消息,流中也能看到 10 条消息。现在让我们看看我们的应用程序是否可行,以及在第二次运行时能否继续正常工作。

第二天运行发布程序

现在设想一下,我们在第二天运行该应用程序,此时数据源包含了 10 条额外记录,总共 20 条记录。我们的发布应用程序很“笨”:它会读取数据源中的所有内容并发布消息。让我们试试:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSecondDay'
Connecting...
Connected.
Publishing 20 messages.
Messages confirmed? yes

现在查看流的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 0
message 1
message 2
...
message 9
message 10
message 11
...
message 19

我们看到了 30 条消息:来自第一次运行的 10 条和来自第二次运行的 20 条。前 10 条出现了两次,因此我们的流中包含了重复项。按照我们实现应用程序的方式,这是预料之中的,但我们必须解决这个问题,因为我们只想在第二次运行时发布新记录。

这就是 RabbitMQ Streams 中去重功能的用武之地。

使用去重功能进行发布

我们需要两样东西来启用发布去重功能:

  • 生产者(producer)的名称
  • 每条记录的严格递增序列值,即发布 ID (publishing ID)

Stream Java 客户端文档提供了有关生产者名称和发布 ID 的更多详细信息。请注意,消息去重并非 Stream Java 客户端特有,只要符合语义,任何客户端都可以实现。

我们只需要为发布应用程序选择一个名称,并在不同的运行过程中保持该名称不变。对于发布 ID,我们可以使用记录的 ID:它恰好是唯一的,并且记录是按 ID 排序返回的(例如,就像具有数值主键和适当查询的数据库记录一样)。

以下是修改了生产者名称和发布 ID 后的发布应用程序:

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
int messageCount = 10;
records(0, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

Broker 将跟踪该生产者的最后一个发布 ID。我们将看到这如何实现消息去重。

第一天运行发布程序

首先让我们重新创建流:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以第一次运行改进后的发布应用程序:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

好的,第一天数据源中有 10 条消息。

第二天运行发布程序

现在我们在第二天运行应用程序,并增加了 10 条记录。我们的应用程序比第一次时聪明了一些:它使用生产者名称和发布 ID 进行去重。但它仍然读取数据源中的所有记录:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupSecondDay'
Connecting...
Connected.
Publishing 20 messages with deduplication enabled.
Messages confirmed? yes

查看流的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

这次没有重复了,太棒了!尽管我们重新发布了前 10 条消息,但 Broker 设法过滤掉了它们。它知道应该忽略所有发布 ID 小于 9(第一次运行中的最后一个值)的消息。请注意,即使它过滤掉了这些重复项,它仍然向客户端确认了这些消息。

这比我们之前导致重复的第一个应用程序要好得多,但仍然有一个问题:应用程序每次都会重新发送所有消息。如果数据持续增长,应用程序每次运行所需的时间会越来越长。幸运的是,我们可以找出应用程序上次运行停止的位置。

了解停止位置:让发布者更智能

在本节中,我们将看到如何通过不仅使用去重,还向 Broker 查询其发送的最后一个发布 ID,来使发布应用程序变得更加智能。

第一天运行发布程序

我们必须重新创建空的流:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$CreateEmptyStream'
Connection...
Connected. Trying to delete stream if it exists.
Stream deleted.
Creating 'deduplication-stream' stream.
Stream created.

然后我们可以重新使用发布应用程序来发送前 10 条消息:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishDedupFirstDay'
Connecting...
Connected.
Publishing 10 messages with deduplication enabled.
Messages confirmed? yes

此版本的应用程序虽不够聪明,但足以应付“第一天”的情况。

第一天运行(智能)发布程序

发布应用程序需要在第二天(此时数据源包含 20 条消息)做得更好。它可以使用 Producer#getLastPublishingId 方法,该方法会向 Broker 查询此流中该生产者的最后一个发布 ID。应用程序可以将此值加 1,从而获得起始点。然后,它只需选择从该点到最后一条可用记录之间的记录即可。这样,它就不必从头开始重新发布。以下代码展示了如何实现这一点:

Producer producer = environment.producerBuilder()
.stream("deduplication-stream")
.name("app-1") // provide a name for the producer
.confirmTimeout(Duration.ZERO) // to never stop retrying
.build();
long start = producer.getLastPublishingId() + 1; // get last publishing ID and add 1
int messageCount = 20;
records(start, messageCount).forEach(record -> {
Message message = producer.messageBuilder()
.publishingId(record.id()) // set the publishing ID
.addData(record.content().getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, confirmationStatus -> latch.countDown());
});

现在运行这个智能发布者:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$PublishSmartDedupSecondDay'
Connecting...
Connected.
Starting publishing at 10
Publishing 10 message with deduplication enabled.
Messages confirmed? yes

因此,发布者从 10 开始(第一次运行的最后一个发布 ID 为 9,加 1),并发布了 10 条新消息(总共 20 条,减去已发布的 10 条)。我们可以检查流的内容:

./mvnw -q compile exec:java -Dexec.mainClass='com.rabbitmq.stream.Deduplication$Consume'
Connecting...
Connected.
Starting consuming, press Enter to exit...
message 0
message 1
message 2
...
message 9
message 10
message 11
message 12
...
message 19

我们在流中得到了预期的消息数量,但这次使用的是经过优化的发布应用程序。

总结

本篇博客涵盖了 RabbitMQ Streams 的去重功能:

  • Broker 可以检测并过滤掉重复的消息
  • 需要生产者名称和发布 ID 来启用去重
  • 生产者名称必须唯一,并在应用程序重启之间复用
  • 发布 ID 是严格递增的序列,通常是给定消息的标识符(例如数据库记录的主键、文件中的行号)
  • 应用程序应查询 Broker 获取它们使用的最后一个发布 ID,以便从停止的地方重新开始
© . This site is unofficial and not affiliated with VMware.