使用 RabbitMQ 消息拓扑 Kubernetes Operator
使用此信息了解如何部署由消息拓扑 Operator 管理的自定义资源对象。
如何使用 RabbitMQ 消息拓扑 Operator
如果尚未安装 RabbitMQ 消息拓扑 Operator,请参阅快速入门信息进行部署。
此信息包含以下部分
- 集群 Operator 要求
- 跨多个命名空间范围
- 非 Operator 管理的 RabbitMQ
- 自定义连接 URI
- 队列和策略
- 用户和用户权限
- 交换器和绑定
- 虚拟主机
- 联邦
- Shovel
- 更新资源
- 删除资源
- 限制
- TLS
- 使用 HashiCorp Vault
- 配置 Operator 的日志级别
- 基于时间的协调
有关在 Openshift 上使用 Operator 的更多信息,请参阅在 Openshift 上使用 RabbitMQ Kubernetes Operators。
RabbitMQ 集群 Operator 要求
- 消息拓扑 Operator 可以与使用 Kubernetes 集群 Operator 部署的 RabbitMQ 集群一起使用。集群 Operator 的最低要求版本是
2.0.0。
跨多个命名空间范围
消息拓扑 Operator 可以在任何命名空间中协调其对象。但是,默认情况下,它会将交互限制在与拓扑对象(例如 Queue)相同命名空间内的 RabbitmqCluster 对象。换句话说,消息拓扑 Operator 将仅在 RabbitmqCluster 对象也位于 default 命名空间时,才协调 default 命名空间中的 Queue 对象。
这是默认行为,可以自定义以允许特定命名空间列表进行协调。要创建允许的命名空间列表,必须使用键 rabbitmq.com/topology-allowed-namespaces 和值(无空格的逗号分隔的命名空间名称列表,例如 default,ns1,my-namespace)来注释 RabbitmqCluster 对象。星号 * 值可用于允许所有命名空间。
任何拓扑对象都可以声明为使用 .spec.rabbitmqClusterReference.namespace 来定位不同命名空间中的 RabbitmqCluster。与 RabbitmqCluster 位于同一命名空间内的拓扑对象始终允许。
以下 YAML 声明了一个 RabbitmqCluster 对象,该对象允许来自 my-app 命名空间的拓扑对象
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: example-rabbit
namespace: rabbitmq-service
annotations:
rabbitmq.com/topology-allowed-namespaces: "my-app"
spec:
replicas: 1
请注意,上面的 YAML 指定了 rabbitmq-service 命名空间。然后,以下 YAML 将从 my-app 命名空间定位上述 RabbitMQ。
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: test # name of this custom resource; does not have to the same as the actual queue name
namespace: my-app
spec:
name: test-queue # name of the queue
rabbitmqClusterReference:
name: example-rabbit
namespace: rabbitmq-service
关于禁止的跨命名空间对象的 重要信息
如果拓扑对象(例如 Queue)定位了一个 RabbitmqCluster,而该 RabbitmqCluster 不允许来自拓扑对象命名空间的请求,则会创建一个带有错误消息的状态条件,并且该对象不会被协调,直到它被更新为止。
例如,ns1 命名空间中的 RabbitmqCluster 允许来自 my-app 的拓扑,而 not-allowed 命名空间中的拓扑对象 Queue 定位该 RabbitmqCluster,消息拓扑 Operator 将记录一条错误消息,更新 Queue 对象中的状态条件,并放弃协调 Queue 对象。如果 RabbitmqCluster 对象被更新为允许来自 not-allowed 命名空间的拓扑对象,则 Queue 对象必须手动更新以触发协调;例如,通过向 Queue 对象添加标签。
非 Operator 管理的 RabbitMQ
- 对于任何非由 集群 Operator 部署的 RabbitMQ,都可以提供连接 secret 来创建 RabbitMQ 拓扑对象。
- 此功能自消息拓扑 Operator
1.4.0起发布。
请注意,spec.rabbitmqClusterReference 是一个不可变字段。例如,创建后无法更新 connectionSecret 名称。
以下清单创建一个队列,并使用 kubernetes secret my-rabbit-creds 中的凭据连接到 RabbitMQ 服务器
---
apiVersion: v1
kind: Secret
metadata:
name: my-rabbit-creds
type: Opaque
stringData:
# has to be an existing user
username: a-user
password: a-secure-password
# uri for the management api; when no scheme is provided in the uri, 'http' will be used by default
uri: https://my.rabbit:15672
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: qq-example
spec:
name: my-queue
rabbitmqClusterReference:
connectionSecret:
# has to be an existing secret in the same namespace as this Queue object
name: my-rabbit-creds
- 如果设置了
rabbitmqClusterReference.namespace,将使用该命名空间中的 secret。
---
apiVersion: v1
kind: Secret
metadata:
name: my-rabbit-creds
namespace: rabbitmq-system
annotations:
# has to be "*" or match namespace name(s) where RabbitMQ objects are deployed
rabbitmq.com/topology-allowed-namespaces: "qq-namespace"
type: Opaque
stringData:
# has to be an existing user
username: a-user
password: a-secure-password
# uri for the management api; when no scheme is provided in the uri, 'http' will be used by default
uri: https://my.rabbit:15672
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: qq-example
namespace: qq-namespace
spec:
name: my-queue
rabbitmqClusterReference:
namespace: rabbitmq-system
connectionSecret:
# has to be an existing secret in the namespace specified in rabbitmqClusterReference.namespace
name: my-rabbit-creds
请注意,spec.rabbitmqClusterReference 是一个不可变字段。例如,创建后无法更新 connectionSecret 名称。
跨命名空间连接 secret
从消息拓扑 Operator 1.13 开始,可以在 connectionSecret 对象中设置 namespace。但是,Secret必须使用 rabbitmq.com/topology-allowed-namespaces 注释,并包含允许的命名空间列表。例如,一个位于 central-vault 命名空间的 Secret,使用 rabbitmq.com/topology-allowed-namespaces: rabbitmq-service 注释,只有当拓扑对象(例如 Queue)位于 rabbitmq-service 命名空间中时,拓扑 Operator 才能使用它来读取 RabbitMQ 凭据。
---
apiVersion: v1
kind: Secret
metadata:
name: rabbitmq-service-credentials
namespace: central-vault
annotations:
rabbitmq.com/topology-allowed-namespaces: rabbitmq-service
type: Opaque
stringData:
username: a-user # user must already exist in RabbitMQ
password: a-secure-password
uri: https://my.rabbit:15672 # (optional) uri for the management api; when scheme is not provided in uri, operator defaults to 'http'
---
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: my-queue
namespace: rabbitmq-service
spec:
name: my-queue
rabbitmqClusterReference:
connectionSecret:
name: rabbitmq-service-credentials
namespace: central-vault
自定义连接 URI
- 对于无法通过其 Kubernetes 服务对象连接的 RabbitmqClusters(例如,如果 TLS 证书是为自定义域而不是 Kubernetes 服务生成的),您可以使用自定义连接 URI 来注释 Rabbitmqclusters。消息拓扑 Operator 将使用提供的信息进行连接,而不是 Kubernetes DNS。
- 此功能自消息拓扑 Operator
1.12.0起发布。
要注释 RabbitmqClusters,
kubectl annotate rmq RMQ-NAME rabbitmq.com/operator-connection-uri=https://test:1234
队列和策略
消息拓扑 Operator 可以声明 RabbitMQ 集群中的队列和策略。
以下清单将创建一个名为“test”的队列,位于默认 vhost 中
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: test # name of this custom resource; does not have to the same as the actual queue name
namespace: rabbitmq-system
spec:
name: test # name of the queue
autoDelete: false
durable: true
rabbitmqClusterReference:
name: example-rabbit
以下清单将在默认虚拟主机中创建一个名为“lazy-queue”的策略
apiVersion: rabbitmq.com/v1beta1
kind: Policy
metadata:
name: policy-example # name of this custom resource
namespace: rabbitmq-system
spec:
name: lazy-queue
pattern: "^lazy-queue-" # matches any queue begins with "lazy-queue-"
applyTo: "queues"
definition:
queue-mode: lazy
rabbitmqClusterReference:
name: example-rabbit
请注意,不建议直接在队列上设置可选队列参数。一旦设置,队列属性将无法更改。请改用策略。
交换器和绑定
消息拓扑 Operator 可以管理交换器和绑定。以下清单将创建一个 fanout 交换器
apiVersion: rabbitmq.com/v1beta1
kind: Exchange
metadata:
name: fanout
namespace: rabbitmq-system
spec:
name: fanout-exchange # name of the exchange
type: fanout # default to 'direct' if not provided; can be set to 'direct', 'fanout', 'headers', and 'topic'
autoDelete: false
durable: true
rabbitmqClusterReference:
name: example-rabbit
以下清单将创建一个交换器和队列之间的绑定
apiVersion: rabbitmq.com/v1beta1
kind: Binding
metadata:
name: binding
namespace: rabbitmq-system
spec:
source: test # an existing exchange
destination: test # an existing queue
destinationType: queue # can be 'queue' or 'exchange'
rabbitmqClusterReference:
name: example-rabbit
用户和用户权限
您可以使用消息拓扑 Operator 来创建 RabbitMQ 用户并分配用户权限。在访问控制指南中了解更多关于用户管理的信息。
具有自动生成用户名和密码的用户
默认情况下,消息拓扑 Operator 会创建带有生成凭据的用户。
以下清单将创建一个带有生成用户名和密码的用户,并且可以通过 Kubernetes secret 对象访问生成的用户名和密码
---
apiVersion: rabbitmq.com/v1beta1
kind: User
metadata:
name: user-example
namespace: rabbitmq-system
spec:
tags:
- policymaker
rabbitmqClusterReference:
name: example-rabbitmq
生成的用户名和密码将存储在名称为 User 对象后缀为“-user-credentials”的 secret 中。
$ kubectl -n rabbitmq-system get secret user-example-user-credentials -o yaml
apiVersion: v1
data:
password: <base64>
username: <base64>
kind: Secret
metadata:
creationTimestamp: "2025-08-29T10:31:03Z"
name: user-example-user-credentials
namespace: rabbimq-system
ownerReferences:
- apiVersion: rabbitmq.com/v1beta1
blockOwnerDeletion: false
controller: true
kind: User
name: user-example
uid: d9fbf25f-ddf2-4576-a4e6-7574ca6a660c
resourceVersion: "4806431680"
uid: c03013f5-b7a5-4f02-b0be-f8ab1e17971c
type: Opaque
请注意,Operator 不会监控生成的 secret 对象,更新 secret 对象也不会更新凭据。作为一种解决方法,请向 users.rabbitmq.com 对象添加标签或注释以触发 Operator 进行协调。
具有提供用户名和密码的用户
Operator 还支持使用提供的凭据创建 RabbitMQ 用户。在创建带有提供用户名和密码的用户时,在 Data 字段中创建一个包含 username 和 password 键的 kubernetes secret 对象。Operator 不会监控提供的 secret 对象,更新 secret 对象也不会更新凭据。作为一种解决方法,请向 users.rabbitmq.com 对象添加标签或注释以触发 Operator 进行协调。
以下清单将从 secret 'my-rabbit-user' 中提供用户名和密码来创建一个用户
---
apiVersion: rabbitmq.com/v1beta1
kind: User
metadata:
name: import-user-sample
spec:
tags:
- policymaker
- monitoring # other available tags are 'management' and 'administrator'
rabbitmqClusterReference:
name: rabbit-example
importCredentialsSecret:
name: my-rabbit-user # name of the secret
具有密码哈希和无密码用户
从 Topology Operator v1.15.0 开始,可以使用 SHA-512 哈希提供用户密码。Topology Operator 资源不支持其他哈希算法。要创建带有密码哈希的用户,请在凭据 Secret 中使用 passwordHash 字段。
---
apiVersion: v1
kind: Secret
metadata:
name: my-user-credentials # IMPORTANT: this Secret name must match .spec.importCredentialsSecret.name field in User object
type: Opaque
stringData:
username: my-user
passwordHash: tLXSw/gQg2QVPehN+O5wpRx6o22OKMX2OuO5nb0mrametb29utktQXy6caRkxk9QwNxxwgPC7qZ2psKy48GPI0sC6TQ= # SHA-512 hash of "foobarbaz"
---
apiVersion: rabbitmq.com/v1beta1
kind: User
metadata:
name: my-admin-user
spec:
tags:
- administrator
rabbitmqClusterReference:
name: test # rabbitmqCluster must exist in the same namespace as this resource
importCredentialsSecret:
name: my-user-credentials # must match the name of the Secret
如果存在 passwordHash 字段,则会忽略 password 字段,并且生成的凭据 Secret 将仅包含哈希。如果哈希为空字符串,则会创建一个无密码用户。例如
---
apiVersion: v1
kind: Secret
metadata:
name: my-user-credentials
type: Opaque
stringData:
username: my-user
passwordHash: ""
重要的是要注意,空字符串 passwordHash不等同于根本不提供该字段。为了生成无密码用户,必须提供一个空字符串 "" 作为 passwordHash。
用户权限对象
要为现有用户设置用户权限,请创建 permissions.rabbitmq.com 资源。以下示例将为用户 rabbit-user-1 分配权限
apiVersion: rabbitmq.com/v1beta1
kind: Permission
metadata:
name: rabbit-user-1-permission
namespace: rabbitmq-system
spec:
vhost: "/"
user: "rabbit-user-1" # name of the RabbitMQ user
permissions:
write: ".*"
configure: ".*"
read: ".*"
rabbitmqClusterReference:
name: sample
虚拟主机
消息拓扑 Operator 可以创建虚拟主机。
以下清单将在名为 'example-rabbit' 的 RabbitmqCluster 中创建一个名为 'test' 的 vhost
apiVersion: rabbitmq.com/v1beta1
kind: Vhost
metadata:
name: test-vhost # name of this custom resource
namespace: rabbitmq-system
spec:
name: test # name of the vhost
rabbitmqClusterReference:
name: example-rabbit
Federation
消息拓扑 Operator 可以定义Federation 上游。
由于 Federation 上游 URI 包含凭据,因此它通过 Kubernetes Secret 对象提供。'uri' 键对于 Secret 对象是必需的。其值可以是单个 URI,也可以是以逗号分隔的 URI 列表。
以下清单将在名为 'example-rabbit' 的 RabbitmqCluster 中定义一个名为 'origin' 的上游
apiVersion: rabbitmq.com/v1beta1
kind: Federation
metadata:
name: federation-example
namespace: rabbitmq-system
spec:
name: "origin"
uriSecret:
# secret must be created in the same namespace as this Federation object; in this case 'rabbitmq-system'
name: {secret-name}
ackMode: "on-confirm"
rabbitmqClusterReference:
name: example-rabbit
Shovel
消息拓扑 Operator 可以声明动态 Shovel。
Shovel 源 URI 和目标 URI 通过 Kubernetes Secret 对象提供。Secret 对象必须包含两个键,“srcUri”和“destUri”,每个键的值可以是单个 URI,也可以是以逗号分隔的 URI 列表。
以下清单将在名为 'example-rabbit' 的 RabbitmqCluster 中创建一个名为 'my-shovel' 的 Shovel
apiVersion: rabbitmq.com/v1beta1
kind: Shovel
metadata:
name: shovel-example
namespace: rabbitmq-system
spec:
name: "my-shovel"
uriSecret:
# secret must be created in the same namespace as this Shovel object; in this case 'rabbitmq-system'
name: {secret-name}
srcQueue: "the-source-queue"
destQueue: "the-destination-queue"
rabbitmqClusterReference:
name: example-rabbit
更多shovels 示例。
更新资源
某些自定义资源属性是不可变的。消息拓扑 Operator 实现验证 Webhook 以防止对不可变字段的更新。不允许的更新将被拒绝。例如
kubectl apply -f test-queue.yaml
Error from server (Forbidden):
...
Resource: "rabbitmq.com/v1beta1, Resource=queues", GroupVersionKind: "rabbitmq.com/v1beta1, Kind=Queue"
Name: "example", Namespace: "rabbitmq-system"
for: "test-queue.yaml": admission webhook "vqueue.kb.io" denied the request: Queue.rabbitmq.com "example" is forbidden: spec.name: Forbidden: updates on name, vhost, and rabbitmqClusterReference are all forbidden
无法更新的属性在消息拓扑 Operator API 文档中有说明。
删除资源
删除自定义资源将删除 RabbitMQ 集群中相应的资源。消息拓扑 Operator 在所有自定义资源上设置 Kubernetes finalizers。
从消息拓扑 Operator 版本 1.17 开始,可以使用 deletionPolicy 属性控制 Federation、Queue、Shovel 和 Vhost 资源的删除行为。
此属性决定在删除自定义资源时,集群中相应资源会发生什么。
deletionPolicy 支持以下值
delete: 删除自定义资源时,资源将从集群中移除(这是默认值)retain: 删除自定义资源时,资源将保留在 RabbitMQ 集群中
以下使用了 Queue 资源
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
name: example-queue
spec:
name: example-queue
rabbitmqClusterReference:
name: example-rabbitmq
deletionPolicy: retain # Queue will remain in RabbitMQ cluster when this k8s resource is deleted
如果对象已在 RabbitMQ 集群中删除,或者 RabbitMQ 集群已被删除,消息拓扑 Operator 将删除自定义资源,而不尝试删除 RabbitMQ 对象。
限制
消息拓扑 Operator 不适用于导入定义的 RabbitmqCluster。Operator 读取 RabbitmqCluster 状态 status.binding 中设置的默认用户 secret,如果 RabbitmqCluster 使用导入的定义部署,定义将覆盖默认用户凭据,并可能导致消息拓扑 Operator 无法访问 RabbitmqCluster。要解决此问题,您可以创建一个不导入定义的 RabbitmqCluster,或者手动更新默认用户 kubernetes secret 以匹配定义中设置的实际用户凭据。
TLS
如果消息拓扑 Operator 管理的 RabbitmqClusters 配置为通过 HTTPS 提供管理服务,则需要额外的步骤来配置消息拓扑 Operator。请遵循此TLS 专用指南来配置 Operator。
(可选) 使用 HashiCorp Vault
如果消息拓扑 Operator 管理的 RabbitmqClusters 配置为将默认用户凭据存储在 Vault 中,则需要额外的步骤来配置消息拓扑 Operator。请遵循此Vault 专用指南来配置 Operator。
配置 Operator 的日志级别
消息拓扑 Operator 会记录协调结果和错误。可以通过 kubectl -n rabbitmq-system logs -l app.kubernetes.io/name=messaging-topology-operator 来查看 Operator 日志。它使用 zap logger,可以通过在 Operator 部署清单中传递命令行标志来配置。
例如,要将日志级别配置为“debug”
apiVersion: apps/v1
kind: Deployment
metadata:
name: messaging-topology-operator
namespace: rabbitmq-system
spec:
template:
spec:
containers:
- args:
- --zap-log-level=debug
command:
- /manager
zap logger 的其他可用命令行标志可以在controller runtime 中找到。
基于时间的协调
默认情况下,当特定自定义资源发生创建/更新/删除事件时,消息拓扑 Operator 会协调拓扑对象。从版本 1.6.0 开始,您可以通过在 Operator 部署中设置环境变量来配置 Operator 以特定频率执行所有拓扑对象的协调。
apiVersion: apps/v1
kind: Deployment
metadata:
[...]
name: messaging-topology-operator
namespace: rabbitmq-system
spec:
template:
spec:
containers:
- command:
- /manager
env:
- name: SYNC_PERIOD
value: 5m # needs to be in a format that's readable by golang time.ParseDuration(), e.g. “1000s”, “5.3h” or “20h35m”
...
重新创建已删除的队列不会恢复任何消息。
请注意,此频率适用于 Operator 管理的所有拓扑对象。根据您使用 Topology Operator 创建的对象数量,以特定频率协调所有对象可能会给 Operator 和 RabbitMQ 服务器带来不必要的负载。仅当用例需要基于时间的协调时,才使用此功能。