如何设计一个nats消息队列负载均衡系统
试着回答一下这个有趣的消费-生产问题吧:
- 这是一个使用消息队列解耦多个生产者和多个消费者的系统。
- 生产者生产的消息是有分区的(按
project_id
分区),分区也会动态分配。 - 消费者需要竞争消费消息,每个消费者可以消费多个分区。
- 对于一个分区,最多有不超过
1/n
个消费者在消费。 - 生产者和消费者根据性能和数据量,会在kubernetes中进行动态的扩缩容。
消息需要尽可能的被均匀地分配到消费者上,以实现负载均衡。
消息需要尽可能的N等分,这N份消息将分配给消费者服务A(A的数量N>=2A)
e.g: 1/10 情景: 消息需要尽可能的10等分,这10份消息将分配给消费者服务A(A的数量>=5),对于不满足5的倍数情况,如23个消费者,则余下3个随机分配
在这种动态输入输出的情况下,我们该如何设计一个消息队列负载均衡系统呢,如何保证6个9的SLA呢?
流量路径
客户的IoT Devices -> StreamGateway(nginx)四层负载均衡 -> DataGateway(生产者集群) -> NATS JetStream(消息队列集群) -> DataHandler(消费者数据存储集群)
背景
上述问题就是我在工作中遇到的一个实际场景。我们需要设计一个系统来处理多个生产者和消费者之间的消息传递,同时确保负载均衡和动态扩缩容。
这套系统使用mermaid图来描述其架构和工作流程如下:
graph TD subgraph Producers["生产者"] P1[生产者 1] P2[生产者 2] P3[生产者 ...] end subgraph MessageQueue["消息队列 (按 project_id 分区)"] Partition1[分区 1] Partition2[分区 2] Partition3[分区 ...] end subgraph Consumers["消费者"] C1[消费者 1] C2[消费者 2] C3[消费者 ...] end subgraph Kubernetes["Kubernetes 集群"] ScaleUpDown[动态扩缩容
根据性能和数据量] end P1 -->|生产消息| Partition1 P2 -->|生产消息| Partition2 P3 -->|生产消息| Partition3 Partition1 -->|竞争消费
最多1/n消费者| C1 Partition1 -->|竞争消费
最多1/n消费者| C2 Partition2 -->|竞争消费
最多1/n消费者| C2 Partition2 -->|竞争消费
最多1/n消费者| C3 Partition3 -->|竞争消费
最多1/n消费者| C1 Partition3 -->|竞争消费
最多1/n消费者| C3 Producers -->|动态扩缩容| Kubernetes Consumers -->|动态扩缩容| Kubernetes
你可能会注意到,在这个系统中,生产者和消费者都是动态的,并且消息队列是按project_id
分区的。这意味着我们需要一个灵活的机制来处理消息的分发和消费。
那么,为什么他们是动态的呢?
在实际应用中,生产者和消费者的数量会根据业务需求和系统负载进行调整。例如,在高峰期可能需要增加更多的消费者来处理大量的消息,而在低峰期则可以减少消费者以节省资源。
扩缩容策略:
消费者leader会根据性能(Cpu>=90%)或数据量(单个消费者消费的消息条数超过消费者数量的50倍(这个数据也可以动态配置),如100个消费者消费超过5000条),leader会调用kubernetes api进行动态的扩容。
当每个消费者平均消费的消息小于2条时缩容。
工作背景:
系统是一个物联网系统,需要处理不同客户上报的数据,并对其进行处理.
所有客户上报的数据(消息)会遵循一定的规律.在这里,它是一个project
, 因此会有唯一的project_id
所有的消息以projectid
进行分区,
生产者(数据网关)会将projectid
投送至DATASTORE_PROJECT
,
消费者在通过订阅获取到DATASTORE_PROJECT
后,
发送成功请求至DATASTORE_PROJECT_REG_RESP
,并订阅DATASTORE_PROJECT_DATA
数据开始消费
1 | const ( |
Language And Tools
- Golang
- NATS(JetStream)
- Kubernetes
- Etcd
Solution
我们的解决目标是构建一个能够动态伸缩、自动负载均衡的消费者集群。
我们将采用 分区(Partitioning)和服务协调(Coordination) 相结合的策略。
核心思路
我们将放弃 “所有消费者都去订阅同一个发现Topic” 的模式,因为这会导致所有消费者产生 惊群效应 ,并需要复杂的分布式锁来争抢和校验projectId
取而代之的核心思想是:
固定分区:
我们将projectid的空间预先划分成一个固定的、数量较多的分区,例如 P = 128 个。每个projectid通过哈希计算后,将永远属于同一个分区。
动态分配:
通过一个中心化的协调服务(etcd),将这 P 个分区动态地、均衡地分配给当前所有存活的 N 个消费者。
领导者选举:
在所有消费者中,选举出一个 Leader。这个 Leader 负责监控消费者的增减,并执行分区的重新分配和动态扩缩容决策。
这个设计类似于 Kafka 的 Consumer Group 机制,具有高可用性和水平扩展性。
生产者逻辑(分区划分)
当一个projectid
需要被处理时,生产者会:
- 计算该
projectid
的哈希值。 - 对哈希值取模,得到分区号
partition_id = hash(projectid) % P
。 - 将该projectid投递到对应的分区Topic,例如
/handler/data/partitions/p42
。 - 生产者不需要关心消费者的数量和状态,只需将消息投递到正确的分区。
1 | import ( |
服务协调和leader选举
etcd 终于登场了。在这个部分,它扮演了项目里至关重要的协调角色。在项目里,我利用它的几个特性:
- 键值存储:存储分区分配状态和消费者信息。
- Watch机制:实时监控消费者变化。
- Lease机制(TTL):实现消费者心跳和故障检测。
- 分布式锁:实现领导选举。
数据结构
/consumers/
/{consumer-id} -> {“cpu”: 25.5, “project_count”: 30, “last_message_id”: “uuid” } (Lease attached)
每个消费者启动后,都会在这个目录下创建一个以自己唯一ID为名的key,并关联一个Lease(租约,带TTL)。消费者需要定期续约(”心跳”),如果Lease过期,key会自动删除,代表该消费者已下线。值用来存放该消费者的性能指标。
/partitions/
/{partition_id} -> {consumer-id}
这个目录存储了每个分区当前被哪个消费者占有。例如
/partitions/42 -> consumer-abc
。/leader
一个用于分布式锁的key,成功写入该key的消费者成为Leader,同样需要关联Lease。
消费者逻辑(Leader)
Leader消费者负责以下任务:
赢得选举:启动后,所有消费者实例尝试获取
/leader
这个key上的锁。成功的成为Leader
。监控消费者变化:Leader WATCH etcd中的
/consumers/
目录,获取事件变化。新消费者加入 (PUT):触发再均衡 (Rebalance)。
消费者下线 (DELETE):触发再均衡。
?什么时候消费者数量会变化?
- 一般是消费者崩溃时(数据处理有可能导致崩溃,不过崩溃并不会导致数据丢失,数据会因为没有ACK重新回到分区)
执行再均衡:
获取当前所有存活的消费者列表 C (from /consumers/)。
获取当前所有的分区列表 P (0 to 127)。
执行分配算法:为每个消费者 c in C 分配 P/N 个分区。
最简单的分配方式是 partition_id % len(C),将分区 i 分配给 C[i % len(C)]。
为了最小化分区迁移,可以采用更优化的算法(如一致性哈希环的变种),不过,取模方式已足够均衡。
更新etcd:将新的分配结果 map[partition_id]consumer_id 写入到
/partitions/
目录。
执行扩缩容:Leader 定期(在代码里,项目里设置了30s)执行一次监控和扩缩容决策。
消费者逻辑(Worker【include leader】)
注册和发送心跳:启动后,生成唯一ID,在etcd
/consumers/{consumer-id}
创建自己的key,并持续续约Lease。监控任务分配:WATCH etcd中的
/partitions/
目录。响应任务变更:
- 当发现某个分区 p 的负责人变成了自己 (/partitions/p -> my-id),并且自己之前未订阅该分区:
- a. 立刻通过NATS订阅对应的分区Topic: /handler/data/partitions/p{p}。
- 当发现某个分区 p 的负责人不再是自己 (/partitions/p 被删除或值变为其他id),并且自己正在订阅该分区:
- a. 立刻取消NATS对该分区Topic的订阅,并完成正在处理的消息后,停止处理来自该分区的任何新消息。
消费数据:从已订阅的分区Topic中接收到 projectid 后,执行原有的业务逻辑:
发送成功请求至
DATASTORE_PROJECT_REG_RESP
。订阅
DATASTORE_PROJECT_DATA
。
上报状态:定期更新自己在etcd中 /consumers/{consumer-id} 的值,报告信息,供Leader决策。
动态扩缩容策略(Leader)
数据定义:
- cpu: cpu使用率
- cpu_max_usage: 节点触发扩容的CPU使用率
- project_count: 消费者当前处理的项目数量
- max_project_count: 触发扩容的项目数量阈值
- N: 消费者数量
Leader会定期(30秒)检查所有消费者的状态,并根据以下规则决定是否扩缩容:
- 超过CPU上限: 如果有任何消费者的 cpu 使用率超过 cpu_max_usage(90%),则触发扩容。
- 超过负载上限:
- 计算N.
- 获取所有消费者的 project_count,并计算平均值
avg_project_count = sum(project_count) / N
。 - 如果
avg_project_count > max_project_count(50)
- 触发扩容:
- Leader调用Go的Kubernetes客户端,更新对应Deployment的 replicas 字段,使其 replicas = replicas + 1
- 冷静期: 为防止频繁扩缩容,一次操作后,进入一个冷却期(10分钟),在此期间不执行扩缩容轮询操作。
- 平均负载过低:
- 计算N。
- 计算总项目数
avg_project_count = sum(project_counts) / N
。 if N > min_replicas (e.g., 2) && avg_project_count / N < 2
。- 触发缩容:
- Leader调用Go的Kubernetes客户端,更新对应Deployment的 replicas 字段,使其 replicas = replicas - 1
- 冷静期:同样,为防止频繁扩缩容,一次操作后,进入一个冷却期(10分钟),在此期间不执行扩缩容轮询操作。
工作流程:
K8s启动消费Pod。
该Pod内的服务实例启动,生成唯一ID
consumer-hostname
。consumer-hostname 连接etcd,在
/consumers/consumer-hostname
创建了一个带10秒Lease的key,并开始每5秒续约一次。Leader正在
WATCH /consumers/
,收到consumer-hostname
的 PUT 事件。Leader触发再均衡逻辑。假设之前有3个消费者 c1, c2, c3,现在有4个了。分区总数是128。
旧分配:c1 (0,3,6,…), c2 (1,4,7,…), c3 (2,5,8,…)。
新分配:c1 (0,4,8,…), c2 (1,5,9,…), c3 (2,6,10,…), consumer-hostname (3,7,11,…)。
Leader将新的分配结果批量更新到etcd的 /partitions/ 目录下。
所有消费者(包括c1,c2,c3,consumer-hostname)都在WATCH /partitions/。
c1, c2, c3 发现自己的一些分区被剥夺了,它们会取消对这些分区Topic的NATS订阅。
consumer-hostname
发现自己被分配了一批新的分区,它会去订阅这些分区对应的NATS Topic。系统在没有停机的情况下,自动将负载均衡到了新的消费者上。
数据一致性保障:
你可能会担心:在消费者宕机时(这系统在消费者因为外部逻辑的问题,存在处理数据时崩溃的可能),数据的一致性和完整性如何保障。
在这套系统里,我们采用以下策略来处理这些情况:
JetStream
NATS JetStream支持一种数据应答机制,确保消息在被消费者处理后,才会从队列中删除。
消费者在处理完数据后,必须发送一个确认消息(ACK)给JetStream,只有收到ACK后,JetStream才会将该消息从队列中移除。
Stream定义:
- Subjects: “/handler/data/partitions/*”
- Storage: “Memory” (Quickly)
- Retention Policy: WorkQueuePolicy (消息被成功消费后即可删除)
- Replicas: 3
- Ack Policy: AckExplicit
- Ack Wait Timeout: 5s(我们经过统计,5秒足够处理客户的数据了,数据SLA已达到6个9要求[99.9999%],再长的等待时间会导致资源浪费)
消费者宕机时的数据处理
如果消费者在处理数据(已拉取,发送uuid响应,但没写入最终结果)时宕机,系统会自动将其数据重新投入队列,其触发流程如下:
- 消费者在处理数据时,发送了成功请求至 DATASTORE_PROJECT_REG_RESP,但未完成最终数据处理。
- NATS JetStream在 5秒 后未收到ACK时,将该消息重新放回队列。
总结
通过上述设计,我们项目组实现了一个高效、动态的消息队列负载均衡系统。
它能够在生产者和消费者数量变化时,自动调整分区分配,确保负载均衡,并且在消费者宕机时,能够保证数据的一致性和完整性。