- 指产生消息的业务动作与消息发送的一致。(也就是说,如果业务操作成功,那么由这个业务操作所产生的消息一定要成功投递出去,否则就丢消息)
- 处理方式1
- 如果业务操作成功,执行消息发送前应用故障,消息发不出去,导致消息丢失(订单系统与会计系统的数据不一致);
- 如果业务操作成功,应用正常,但消息系统故障或网络故障,也会导致消息发不出去(订单系统与会计系统的数据不一致);
/** 支付订单处理 **/
public void completeOrder() {
// 订单处理(业务操作)
orderBiz.process();
// 发送会记原始凭证消息(发送消息)
sendAccountingVoucherMsg ();
}
- 处理方式2
- 这种情况下,更不可控,消息发出去了,但业务可能会失败(订单系统与会计系统的数据不一致)
/** 支付订单处理 **/
public void completeOrder() {
// 发送会记原始凭证消息(发送消息)
sendAccountingVoucherMsg ();
// 订单处理(业务操作)
orderBiz.process();
}
- 主动方应用先把消息发给消息中间件,消息状态标记为“待确认”;
- 消息中间件收到消息后,把消息持久化到消息存储中,但并不向被动方应用投递消息;
- 消息中间件返回消息持久化结果(成功/失败),主动方应用根据返回结果进行判断如何进行业务操作处理:
- 失败:放弃业务操作处理,结束(必要时向上层返回失败结果);
- 成功:执行业务操作处理;
- 业务操作完成后,把业务操作结果(成功/失败)发送给消息中间件;
- 消息中间件收到业务操作结果后,根据业务结果进行处理;
- 失败:删除消息存储中的消息,结束;
- 成功:更新消息存储中的消息状态为“待发送(可发送)”,紧接着执行消息投递;
- 前面的正向流程都成功后,向被动方应用投递消息
- 异常点分析(任何一个环节都可能会出问题)
- 从主动方应用的角度Fenix
异常状况 | 可能的状态 | 一致性 |
---|---|---|
预发送消息失败 | 消息未进存储,业务操作未执行(可能的原因:主动方应用、网络、消息中间件、消息存储) | 一致 |
预发送消息后,主动方应用没有收到返回消息存储结果 | (1)消息未进存储,业务操作未执行 | 一致 |
预发送消息后,主动方应用没有收到返回消息存储结果 | (2)消息已进存储(待确认),业务操作未执行 | 不一致 |
收到消息存储成功的返回结果,但未执行业务操作就失败 | 消息已进存储(待确认),业务操作未执行 | 不一致 |
- 从消息中间件的角度来分析
异常状况 | 可能的状态 | 一致性 |
---|---|---|
消息中间件没有收到主动方应用的业务操作处理结果 | (1)消息已进存储(待确认),业务操作未执行(或业务操作出错回滚了) | 不一致 |
消息中间件没有收到主动方应用的业务操作处理结果 | (2)消息已进存储(待确认),业务操作成功 | 不一致 |
消息中间件收到业务操作结果(成功/失败),但处理消息存储中的消息状态失败 | (1)消息已进存储(待确认),业务操作未执行(或业务操作出错回滚了) | 不一致 |
消息中间件收到业务操作结果(成功/失败),但处理消息存储中的消息状态失败 | (2)消息已进存储(待确认),业务操作成功 | 不一致 |
- 总结
异常状况 | 一致性 | 异常处理方法 |
---|---|---|
消息未进存储,业务操作未执行 | 一致 | 无需处理 |
消息已进存储(状态待确认),业务操作未执行 | 不一致 | 确认业务操作结果,处理消息(删除消息) |
消息已进存储(状态待确认),业务操作成功,但未通知发送 | 不一致 | 确认业务操作结果,处理消息(更新消息状态,执行消息投递) |
- 解决方法:消息中间件进行对主动业务方进行一次查询确认消息的状态
- 消息消费流程
- 消息的消费确认流程中,任何一个环节都可能会出问题!
- Producer生成消息并发送给MQ(同步、异步);
- MQ接收消息并将消息数据持久化到消息存储(持久化操作为可选配置);
- MQ向Producer返回消息的接收结果(返回值、异常);
- Consumer监听并消费MQ中的消息;
- Consumer获取到消息后执行业务处理;
- Consumer对已成功消费的消息向MQ进行ACK确认(确认后的消息将从MQ中删除)。
-
队列消息模型的特点:
- 消息生产者将消息发送到Queue中,然后消息消费者监听Queue并接收消息;
- 消息被确认消费以后,就会从Queue中删除,所以消息消费者不会消费到已经被消费的消息;
- Queue支持存在多个消费者,但是对某一个消息而言,只会有一个消费者成功消费。
-
常规MQ队列消息的处理流程无法实现消息发送一致性;
-
投递消息的流程其实就是消息的消费流程,可细化。
-
解决方案如下
常规MQ队列消息的处理流程无法实现消息发送一致性,因此直接使用现成的MQ中间件产品无法实现可靠消息最终一致性的分布式事务解决方案。
- 被动方应用接收到消息,业务处理完成后应用出问题,消息中间件不知道消息处理结果,会重新投递消息。
- 被动方应用接收到消息,业务处理完成后网络出问题,消息中间件收不到消息处理结果,会重新投递消息。
- 被动方应用接收到消息,业务处理时间过长,消息中间件因消息超时未确认,会再次投递消息。
- 被动方应用接收到消息,业务处理完成,消息中间件问题导致收不到消息处理结果,消息会重新投递。
- 被动方应用接收到消息,业务处理完成,消息中间件收到了消息处理结果,但由于消息存储故障导致消息没能成功确认,消息会再次投递。
约束:被动方应用对于消息的业务处理要实现幂等
- 对于存在同一请求数据会发生重复调用的业务接口,接口的业务逻辑要实现幂等性设计。
- 在实际的业务应用场景中,业务接口的幂等性设计,常结合可查询操作一起使用。
- 支付订单创建:商户编号 + 商户订单号 + 订单状态
- 订单更新处理:平台订单号 + 订单状态
- 会计系统记账:系统来源 + 请求号
- 现成MQ中间件不支持消息发送的一致性
- 直接改造MQ中间件难度很大
- 有什么变通的实现方式?
- 消息存储与业务存储在同一个本地事务中进行,消息存储后设置为待确认状态,并异步将消息发送(注意需要异步发送消息,不要影响主流程).
- 通过一定策略不断将待确认的消息重新发送.
- 业务方回收到消息,成功处理业务,并持久化完成后调主动方接口,通知主动方此消息已经处理完成,主动方将数据库中消息状态改为已发送.
- 实现一个消息管理系统,手动处理多次重发失败已死亡的消息.
- 消息实时性较高
- 从应用设计开发的角度实现了消息数据的可靠性,消息数据的可靠性不依赖于MQ中间件,弱化了对MQ中间件的依赖
- 方案轻量容易实现
- 业务绑定,耦合性强,不通用(如果不想直接被动方应用调用主动方应用接口,也可以使用另外一条队列来通知主动方应用)
- 消息数据与业务数据同库,占用业务系统资源
- 业务系统在使用关系型数据库的情况下,消息服务性能会受到关系型数据库并发性能的局限
- 现成MQ中间件不支持消息发送的一致性
- 直接改造MQ中间件难度很大
- 有什么变通的实现方式?
- 存储预发送消息(主动方业务执行之前进行,预发送的消息存储后状态为待确认)
- 确认并发送消息(主动方业务完成之后,主动方或消息状态确认系统通过此接口将消息变为取消或发送中)
- 查询状态确认超时的消息(消息状态确认系统使用)
- 确认消息已被成功消费(被动方业务执行完成之后调用,消息队列的ACK可以在业务处理之前返回)
- 查询消费确认超时的信息
- 消息服务独立部署,独立维护,独立伸缩
- 消息存储可以按需选择不同的数据库来集成实现
- 消息服务可以被相同的使用场景共用,降低消息重复建设消息服务的成本
- 从应用设计开发的角度实现了消息数据的可靠性,消息数据的可靠性不依赖于MQ中间件,弱化了MQ中间件特性的依赖
- 降低了业务系统与系统间的耦合,有利于系统的扩展维护
- 一次消息需要发送两次请求
- 主动方应用系统需要实现业务操作状态校验查询接口