跳至主内容
版本:4.2

拦截消息

概述

RabbitMQ 提供了一种通用的机制来拦截 Broker 上的消息。拦截可以在两个阶段发生:

  1. 入站消息 – 当消息进入 RabbitMQ 时,在路由到 队列 之前进行拦截。
  2. 出站消息 – 当 RabbitMQ 将消息传递给客户端时,在将其 转换 为目标协议之前进行拦截。

拦截器由以下 Erlang 进程之一执行:

  • AMQP 1.0 会话
  • AMQP 0.9.1 通道
  • MQTT 连接

通过 RabbitMQ Streams 协议发送的消息不会被拦截。

消息拦截器是一个实现了 rabbit_msg_interceptor行为的 Erlang 模块。拦截器做什么完全取决于其实现——它可以验证消息元数据、添加注解或执行任意的副作用。

可以通过 插件 开发和集成自定义拦截器。

RabbitMQ 附带了几个内置的消息拦截器。以下是使用 rabbitmq.conf 文件配置它们的示例。

入站消息拦截器

时间戳

此拦截器为每条入站消息添加时间戳。

message_interceptors.incoming.set_header_timestamp.overwrite = true
  • AMQP 1.0 和 Streams 客户端会收到消息注解:x-opt-rabbitmq-received-time(自 1970 年 1 月 1 日 UTC 起的毫秒时间戳)。
  • AMQP 0.9.1 客户端会收到:
    • timestamp_in_ms 头部(毫秒),以兼容以前的 消息时间戳插件
    • timestamp 属性(秒)。

要保留现有的 timestamp_in_ms 头部,请将 overwrite 设置为 false

message_interceptors.incoming.set_header_timestamp.overwrite = false

路由节点

此拦截器添加一个 x-routed-by 消息注解,指示是哪个 RabbitMQ 节点接收并路由了该消息。

message_interceptors.incoming.set_header_routing_node.overwrite = true

overwrite 设置为 false 以保留现有值。

message_interceptors.incoming.set_header_routing_node.overwrite = false

MQTT 客户端 ID

如果启用了 MQTT 插件,RabbitMQ 可以用发布 MQTT 客户端的 客户端 ID 来注解入站消息。这是通过添加一个键为 x-opt-mqtt-client-id 的消息注解来完成的。

mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true

此注解对 AMQP 1.0、AMQP 0.9.1 和 Streams 消费者可见。但是,MQTT 客户端不会收到此注解,因为 MQTT 规范不允许任意的 Broker 添加的注解。

出站消息拦截器

时间戳

此拦截器在消息发送给客户端时对其进行时间戳标记。

message_interceptors.outgoing.timestamp.enabled = true

注解键为 x-opt-rabbitmq-sent-time,其值为自 1970 年 1 月 1 日 UTC 起的毫秒时间戳。

© . This site is unofficial and not affiliated with VMware.