分布式消息通知套件架构及解决方案
分布式环境下,一个系统内的多个服务部署在不同的进程上,这些服务的通信方式多种多样,这里来介绍下消息订阅通知模式下的服务通知方式实现方案。
首先需要明晰一个分布式消息通知套件需要具备的能力:
- 订阅广播能力
- 订阅单播能力
对于组播能力,本身订阅就是组播。
接下来就是对这个套件选择底层框架,分布式订阅通知中间件通常使用消息队列,一般使用kafka,rabbitmq,nats等。
不论是订阅广播还是订阅单播,订阅能力都需要两个部分组成:
- subject:订阅主题
- callback:订阅回调
以下是对这篇技术博客的补充内容,重点扩展架构设计、关键实现细节及具体解决方案:
核心架构设计
1. 分层架构
graph TD
A[业务服务] -->|发布消息| B(通知套件SDK)
B --> C{消息路由层}
C -->|广播| D[消息队列集群]
C -->|单播| D
D --> E[通知套件Worker]
E -->|回调| F[订阅服务]
2. 关键组件说明
| 组件 | 职责 | 实现要点 | |—————–|—————————–|—————————-| | SDK | 提供统一API接口 | 封装序列化/重试/降级逻辑 | | 消息路由层 | 识别广播/单播模式 | 基于消息头message_type
字段 | | 元数据管理 | 存储主题-订阅者映射关系 | 使用ETCD/ZooKeeper持久化 | | 死信处理器 | 处理失败消息 | 自定义重试策略+人工干预接口 |
订阅模式实现方案
1. 广播订阅(Broadcast)
适用场景:配置更新、全局状态同步
实现方案:
1
2
3
4
5
6
7
// RabbitMQ 实现示例
FanoutExchange exchange = new FanoutExchange("config_update");
binding = BindingBuilder.bind(queue).to(exchange);
// Kafka 实现
Properties props = new Properties();
props.put("group.id", "ALL_CONSUMERS"); // 所有消费者相同Group
2. 单播订阅(Unicast)
适用场景:订单处理、任务分发
实现方案:
1
2
3
4
5
6
# RabbitMQ (Direct Exchange + 唯一队列)
channel.queue_declare(queue='order_service', exclusive=True)
channel.queue_bind(exchange='orders', queue='order_service', routing_key='SERVICE_A')
# NATS (Queue Groups)
nc.subscribe("orders.*", "ORDER_GROUP", callback)
3. 组播订阅(Multicast)
实现核心:动态路由绑定
1
2
3
4
// 创建订阅组
func JoinGroup(group, subject string) {
nats.QueueSubscribe(subject, group, handler)
}
消息timeout
分布式环境下消息的发放和接受依赖于网络环境,网络环境是不稳定的,在高并发下,消息丢失是非常可能发生的事情。那么消息重发机制就非常必要。
首先我们要明确消息丢失发生的时机,消息丢失可能发生在消息发送过程和订阅者处理完消息返回ack的过程,简单来说就是一去一返两个阶段。对于消息队列来说,这两个阶段发生消息丢失都是消息发送超时错误,但是能否都通过重发来解决呢?显然不能,对于send阶段的丢失,重发是可以解决的,但是对于ack阶段,重发会导致消息的重复消费,也就是一个消息被多次处理。
如何解决这个问题呢?下面提供两种解决方案:
消息幂等 / message idempotency
对于每个消息对应一个分布式唯一的unique_id,所有消息超时触发重发,但是订阅者处理消息时先检查消息unique_id是否被处理, 如果被处理直接返回成功ack;反之处理消息并记录unique_id。
unique_id的缓存可以使用redis,redis是分布式缓存。
消息状态追踪 / message state tracking
消息幂等虽然能够解决重复消费的问题,但是处理的消息错误状态非常有限,仅仅是处理过的消息和未处理的消息两种状态。对于复杂的场景下(比如消息处理逻辑涉及第三方接口,第三方接口可能不支持幂等,导致消息失效),消息幂等可能无法处理,实时记录消息状态想来是一个更好的选择。
消息状态的持久化可以选择redis持久化或者数据库持久化;消息数量较大时,保存在redis内可能带来昂贵的内存开销;消息持久化在数据库中查询速度不如redis。需要读者根据实际的消息量和业务场景来选择。
消息生产端可以查询发送的消息的状态来决定是否需要重发消息,消息消费端也可以查询消息的状态来决定是否需要处理消息。这样就双重保证了消息不会被重复消费并且消息的可达性大大提高。