拦截消息
概述
RabbitMQ 提供了一种通用的机制来拦截 Broker 上的消息。拦截可以在两个阶段发生:
拦截器由以下 Erlang 进程之一执行:
- AMQP 1.0 会话
- AMQP 0.9.1 通道
- MQTT 连接
通过 RabbitMQ Streams 协议发送的消息不会被拦截。
消息拦截器是一个实现了 rabbit_msg_interceptor行为的 Erlang 模块。拦截器做什么完全取决于其实现——它可以验证消息元数据、添加注解或执行任意的副作用。
可以通过 插件 开发和集成自定义拦截器。
RabbitMQ 附带了几个内置的消息拦截器。以下是使用 rabbitmq.conf 文件配置它们的示例。
入站消息拦截器
时间戳
此拦截器为每条入站消息添加时间戳。
message_interceptors.incoming.set_header_timestamp.overwrite = true
- AMQP 1.0 和 Streams 客户端会收到消息注解:
x-opt-rabbitmq-received-time(自 1970 年 1 月 1 日 UTC 起的毫秒时间戳)。 - AMQP 0.9.1 客户端会收到:
timestamp_in_ms头部(毫秒),以兼容以前的 消息时间戳插件。timestamp属性(秒)。
要保留现有的 timestamp_in_ms 头部,请将 overwrite 设置为 false。
message_interceptors.incoming.set_header_timestamp.overwrite = false
路由节点
此拦截器添加一个 x-routed-by 消息注解,指示是哪个 RabbitMQ 节点接收并路由了该消息。
message_interceptors.incoming.set_header_routing_node.overwrite = true
将 overwrite 设置为 false 以保留现有值。
message_interceptors.incoming.set_header_routing_node.overwrite = false
MQTT 客户端 ID
如果启用了 MQTT 插件,RabbitMQ 可以用发布 MQTT 客户端的 客户端 ID 来注解入站消息。这是通过添加一个键为 x-opt-mqtt-client-id 的消息注解来完成的。
mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true
此注解对 AMQP 1.0、AMQP 0.9.1 和 Streams 消费者可见。但是,MQTT 客户端不会收到此注解,因为 MQTT 规范不允许任意的 Broker 添加的注解。
出站消息拦截器
时间戳
此拦截器在消息发送给客户端时对其进行时间戳标记。
message_interceptors.outgoing.timestamp.enabled = true
注解键为 x-opt-rabbitmq-sent-time,其值为自 1970 年 1 月 1 日 UTC 起的毫秒时间戳。