试着回答一下这个有趣的消费-生产问题吧:

  1. 这是一个使用消息队列解耦多个生产者和多个消费者的系统。
  2. 生产者生产的消息是有分区的(按project_id分区),分区也会动态分配。
  3. 消费者需要竞争消费消息,每个消费者可以消费多个分区。
  4. 对于一个分区,最多有不超过1/n个消费者在消费。
  5. 生产者和消费者根据性能和数据量,会在kubernetes中进行动态的扩缩容。

消息需要尽可能的被均匀地分配到消费者上,以实现负载均衡。

消息需要尽可能的N等分,这N份消息将分配给消费者服务A(A的数量N>=2A)

e.g: 1/10 情景: 消息需要尽可能的10等分,这10份消息将分配给消费者服务A(A的数量>=5),对于不满足5的倍数情况,如23个消费者,则余下3个随机分配

在这种动态输入输出的情况下,我们该如何设计一个消息队列负载均衡系统呢,如何保证6个9的SLA呢?

流量路径

客户的IoT Devices -> StreamGateway(nginx)四层负载均衡 -> DataGateway(生产者集群) -> NATS JetStream(消息队列集群) -> DataHandler(消费者数据存储集群)

背景

上述问题就是我在工作中遇到的一个实际场景。我们需要设计一个系统来处理多个生产者和消费者之间的消息传递,同时确保负载均衡和动态扩缩容。

这套系统使用mermaid图来描述其架构和工作流程如下:

graph TD
    subgraph Producers["生产者"]
        P1[生产者 1]
        P2[生产者 2]
        P3[生产者 ...]
    end

    subgraph MessageQueue["消息队列 (按 project_id 分区)"]
        Partition1[分区 1]
        Partition2[分区 2]
        Partition3[分区 ...]
    end

    subgraph Consumers["消费者"]
        C1[消费者 1]
        C2[消费者 2]
        C3[消费者 ...]
    end

    subgraph Kubernetes["Kubernetes 集群"]
        ScaleUpDown[动态扩缩容
根据性能和数据量] end P1 -->|生产消息| Partition1 P2 -->|生产消息| Partition2 P3 -->|生产消息| Partition3 Partition1 -->|竞争消费
最多1/n消费者| C1 Partition1 -->|竞争消费
最多1/n消费者| C2 Partition2 -->|竞争消费
最多1/n消费者| C2 Partition2 -->|竞争消费
最多1/n消费者| C3 Partition3 -->|竞争消费
最多1/n消费者| C1 Partition3 -->|竞争消费
最多1/n消费者| C3 Producers -->|动态扩缩容| Kubernetes Consumers -->|动态扩缩容| Kubernetes

你可能会注意到,在这个系统中,生产者和消费者都是动态的,并且消息队列是按project_id分区的。这意味着我们需要一个灵活的机制来处理消息的分发和消费。

那么,为什么他们是动态的呢?

在实际应用中,生产者和消费者的数量会根据业务需求和系统负载进行调整。例如,在高峰期可能需要增加更多的消费者来处理大量的消息,而在低峰期则可以减少消费者以节省资源。

扩缩容策略:

消费者leader会根据性能(Cpu>=90%)或数据量(单个消费者消费的消息条数超过消费者数量的50倍(这个数据也可以动态配置),如100个消费者消费超过5000条),leader会调用kubernetes api进行动态的扩容。

当每个消费者平均消费的消息小于2条时缩容。

工作背景:

系统是一个物联网系统,需要处理不同客户上报的数据,并对其进行处理.

所有客户上报的数据(消息)会遵循一定的规律.在这里,它是一个project, 因此会有唯一的project_id

所有的消息以projectid进行分区,
生产者(数据网关)会将projectid投送至DATASTORE_PROJECT
消费者在通过订阅获取到DATASTORE_PROJECT后,
发送成功请求至DATASTORE_PROJECT_REG_RESP,并订阅DATASTORE_PROJECT_DATA数据开始消费

1
2
3
4
5
6
const (
DATASTORE_PROJECT = "/handler/data/partitions/p%d" // e.g., /handler/data/partitions/p0, ... p127
DATASTORE_PROJECT_REG_RESP = DATASTORE_PROJECT + "/%s/reg/response"
DATASTORE_PROJECT_CLOSE = DATASTORE_PROJECT + "/%s/close"
DATASTORE_PROJECT_DATA= DATASTORE_PROJECT + "/%s/data"
)

Language And Tools

  • Golang
  • NATS(JetStream)
  • Kubernetes
  • Etcd

Solution

我们的解决目标是构建一个能够动态伸缩、自动负载均衡的消费者集群。

我们将采用 分区(Partitioning)和服务协调(Coordination) 相结合的策略。

核心思路

我们将放弃 “所有消费者都去订阅同一个发现Topic” 的模式,因为这会导致所有消费者产生 惊群效应 ,并需要复杂的分布式锁来争抢和校验projectId

取而代之的核心思想是:

  1. 固定分区:

    我们将projectid的空间预先划分成一个固定的、数量较多的分区,例如 P = 128 个。每个projectid通过哈希计算后,将永远属于同一个分区。

  2. 动态分配:

    通过一个中心化的协调服务(etcd),将这 P 个分区动态地、均衡地分配给当前所有存活的 N 个消费者。

  3. 领导者选举:

    在所有消费者中,选举出一个 Leader。这个 Leader 负责监控消费者的增减,并执行分区的重新分配和动态扩缩容决策。

这个设计类似于 Kafka 的 Consumer Group 机制,具有高可用性和水平扩展性

生产者逻辑(分区划分)

当一个projectid需要被处理时,生产者会:

  1. 计算该projectid的哈希值。
  2. 对哈希值取模,得到分区号 partition_id = hash(projectid) % P
  3. 将该projectid投递到对应的分区Topic,例如 /handler/data/partitions/p42
  4. 生产者不需要关心消费者的数量和状态,只需将消息投递到正确的分区。
1
2
3
4
5
6
7
8
9
10
import (
"crypto/sha256"
"math/big"
)

func GetPartition(id string, totalPartitions int) int {
hash := sha256.Sum256([]byte(id))
hashInt := new(big.Int).SetBytes(hash[:])
return int(hashInt.Mod(hashInt, big.NewInt(int64(totalPartitions))).Int64())
}

服务协调和leader选举

etcd 终于登场了。在这个部分,它扮演了项目里至关重要的协调角色。在项目里,我利用它的几个特性:

  • 键值存储:存储分区分配状态和消费者信息。
  • Watch机制:实时监控消费者变化。
  • Lease机制(TTL):实现消费者心跳和故障检测。
  • 分布式锁:实现领导选举

数据结构

  • /consumers/

    /{consumer-id} -> {“cpu”: 25.5, “project_count”: 30, “last_message_id”: “uuid” } (Lease attached)

    每个消费者启动后,都会在这个目录下创建一个以自己唯一ID为名的key,并关联一个Lease(租约,带TTL)。消费者需要定期续约(”心跳”),如果Lease过期,key会自动删除,代表该消费者已下线。值用来存放该消费者的性能指标。

  • /partitions/

    /{partition_id} -> {consumer-id}

    这个目录存储了每个分区当前被哪个消费者占有。例如 /partitions/42 -> consumer-abc

  • /leader

    一个用于分布式锁的key,成功写入该key的消费者成为Leader,同样需要关联Lease。

消费者逻辑(Leader)

Leader消费者负责以下任务:

  1. 赢得选举:启动后,所有消费者实例尝试获取 /leader 这个key上的锁。成功的成为Leader

  2. 监控消费者变化:Leader WATCH etcd中的 /consumers/ 目录,获取事件变化。

    • 新消费者加入 (PUT):触发再均衡 (Rebalance)。

    • 消费者下线 (DELETE):触发再均衡。

    • ?什么时候消费者数量会变化?

      • 一般是消费者崩溃时(数据处理有可能导致崩溃,不过崩溃并不会导致数据丢失,数据会因为没有ACK重新回到分区)
  3. 执行再均衡:

    • 获取当前所有存活的消费者列表 C (from /consumers/)。

    • 获取当前所有的分区列表 P (0 to 127)。

    • 执行分配算法:为每个消费者 c in C 分配 P/N 个分区。

      • 最简单的分配方式是 partition_id % len(C),将分区 i 分配给 C[i % len(C)]。

      • 为了最小化分区迁移,可以采用更优化的算法(如一致性哈希环的变种),不过,取模方式已足够均衡。

    • 更新etcd:将新的分配结果 map[partition_id]consumer_id 写入到 /partitions/ 目录。

  4. 执行扩缩容:Leader 定期(在代码里,项目里设置了30s)执行一次监控和扩缩容决策。

消费者逻辑(Worker【include leader】)

  1. 注册和发送心跳:启动后,生成唯一ID,在etcd /consumers/{consumer-id} 创建自己的key,并持续续约Lease。

  2. 监控任务分配:WATCH etcd中的 /partitions/ 目录。

  3. 响应任务变更:

    • 当发现某个分区 p 的负责人变成了自己 (/partitions/p -> my-id),并且自己之前未订阅该分区:
    • a. 立刻通过NATS订阅对应的分区Topic: /handler/data/partitions/p{p}。
    • 当发现某个分区 p 的负责人不再是自己 (/partitions/p 被删除或值变为其他id),并且自己正在订阅该分区:
    • a. 立刻取消NATS对该分区Topic的订阅,并完成正在处理的消息后,停止处理来自该分区的任何新消息。
  4. 消费数据:从已订阅的分区Topic中接收到 projectid 后,执行原有的业务逻辑:

    • 发送成功请求至 DATASTORE_PROJECT_REG_RESP

    • 订阅 DATASTORE_PROJECT_DATA

  5. 上报状态:定期更新自己在etcd中 /consumers/{consumer-id} 的值,报告信息,供Leader决策。

动态扩缩容策略(Leader)

数据定义:

  • cpu: cpu使用率
  • cpu_max_usage: 节点触发扩容的CPU使用率
  • project_count: 消费者当前处理的项目数量
  • max_project_count: 触发扩容的项目数量阈值
  • N: 消费者数量

Leader会定期(30秒)检查所有消费者的状态,并根据以下规则决定是否扩缩容:

  • 超过CPU上限: 如果有任何消费者的 cpu 使用率超过 cpu_max_usage(90%),则触发扩容。
  • 超过负载上限:
    1. 计算N.
    2. 获取所有消费者的 project_count,并计算平均值 avg_project_count = sum(project_count) / N
    3. 如果 avg_project_count > max_project_count(50)
    4. 触发扩容:
      • Leader调用Go的Kubernetes客户端,更新对应Deployment的 replicas 字段,使其 replicas = replicas + 1
      • 冷静期: 为防止频繁扩缩容,一次操作后,进入一个冷却期(10分钟),在此期间不执行扩缩容轮询操作。
  • 平均负载过低:
    1. 计算N。
    2. 计算总项目数 avg_project_count = sum(project_counts) / N
    3. if N > min_replicas (e.g., 2) && avg_project_count / N < 2
    4. 触发缩容:
      • Leader调用Go的Kubernetes客户端,更新对应Deployment的 replicas 字段,使其 replicas = replicas - 1
      • 冷静期:同样,为防止频繁扩缩容,一次操作后,进入一个冷却期(10分钟),在此期间不执行扩缩容轮询操作。

工作流程:

  1. K8s启动消费Pod。

  2. 该Pod内的服务实例启动,生成唯一ID consumer-hostname

  3. consumer-hostname 连接etcd,在 /consumers/consumer-hostname 创建了一个带10秒Lease的key,并开始每5秒续约一次。

  4. Leader正在 WATCH /consumers/,收到 consumer-hostname 的 PUT 事件。

  5. Leader触发再均衡逻辑。假设之前有3个消费者 c1, c2, c3,现在有4个了。分区总数是128。

    • 旧分配:c1 (0,3,6,…), c2 (1,4,7,…), c3 (2,5,8,…)。

    • 新分配:c1 (0,4,8,…), c2 (1,5,9,…), c3 (2,6,10,…), consumer-hostname (3,7,11,…)。

  6. Leader将新的分配结果批量更新到etcd的 /partitions/ 目录下。

  7. 所有消费者(包括c1,c2,c3,consumer-hostname)都在WATCH /partitions/。

  8. c1, c2, c3 发现自己的一些分区被剥夺了,它们会取消对这些分区Topic的NATS订阅。

  9. consumer-hostname 发现自己被分配了一批新的分区,它会去订阅这些分区对应的NATS Topic。

  10. 系统在没有停机的情况下,自动将负载均衡到了新的消费者上。


数据一致性保障:

你可能会担心:在消费者宕机时(这系统在消费者因为外部逻辑的问题,存在处理数据时崩溃的可能),数据的一致性和完整性如何保障。
在这套系统里,我们采用以下策略来处理这些情况:

JetStream

NATS JetStream支持一种数据应答机制,确保消息在被消费者处理后,才会从队列中删除。
消费者在处理完数据后,必须发送一个确认消息(ACK)给JetStream,只有收到ACK后,JetStream才会将该消息从队列中移除。

Stream定义:

  • Subjects: “/handler/data/partitions/*”
  • Storage: “Memory” (Quickly)
  • Retention Policy: WorkQueuePolicy (消息被成功消费后即可删除)
  • Replicas: 3
  • Ack Policy: AckExplicit
  • Ack Wait Timeout: 5s(我们经过统计,5秒足够处理客户的数据了,数据SLA已达到6个9要求[99.9999%],再长的等待时间会导致资源浪费)

消费者宕机时的数据处理

如果消费者在处理数据(已拉取,发送uuid响应,但没写入最终结果)时宕机,系统会自动将其数据重新投入队列,其触发流程如下:

  1. 消费者在处理数据时,发送了成功请求至 DATASTORE_PROJECT_REG_RESP,但未完成最终数据处理。
  2. NATS JetStream在 5秒 后未收到ACK时,将该消息重新放回队列。

总结

通过上述设计,我们项目组实现了一个高效、动态的消息队列负载均衡系统。

它能够在生产者和消费者数量变化时,自动调整分区分配,确保负载均衡,并且在消费者宕机时,能够保证数据的一致性和完整性。