IT 老齐的架构 300 讲-7【消息队列】

2021年10月09日 17:18 · 阅读(610) ·

来源

《IT 老齐的架构 300 讲》

课件地址:https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/it300/download/ppt_all_in_one.zip

【014】巧用阿里 Canal 实现 MySQL 异构数据同步

来源:https://www.bilibili.com/video/BV1zy4y1L7HF?spm_id_from=333.999.0.0

啥是异构数据?

在 Java 代码中新增 MySQL 数据时向调用团队 B 同步接口新建 ES 商品数据

团队 A 与团队 B 的协作产生代码强耦合
团队 A 必须了解团队 B 提供的接口才可以实现
但本身这并不属于团队 A 的工作范畴

扩展困难
团队 C 维护的 MongoDB 也要同步 MySQL 数据库
难道要团队 A 又要改代码?烦死啦,等排期吧!!!

要保证数据做到准实时同步
还要团队之间解耦,团队 A 不再背锅

Canal 是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了 MySQL(也支持 mariaDB)。

通过 Canal 解决了数据监听的问题
下面要解决解耦的问题

【038】MQ 中间件是如何实现可靠性投递的?

来源:https://www.bilibili.com/video/BV12v411w7Ap?spm_id_from=333.999.0.0

MQ 中间件的通用消息投递过程,有哪些情况可能会造成丢数

● 发送阶段,遇到高延迟,Producer 会多次重发消息,直到 Broker ack 确认,过程中 Broker 会自动去重,超时 Producer 产生异常,应用进行捕获提示。
● 存储阶段,Broker 先刷盘再 ack 确认,即便 ack 失败消息不会丢失,多次重试直到 Producer 接收,会导致消息积压。
● 消费阶段,Broker 向 Consumer 发数据,一段时间未接收,自动重发,直到 Consumer Ack 确认,Consumer 注意幂等处理

丢数问题解决方案:
1.异步刷盘(NSYNC_FLUSH),改同步刷盘
2.存储介质损坏,建议采用 RAID10 或分布式存储
3.不要启用自动 Ack,RabbitMQ 存在此问题
4.避开都市传说 ActiveMQ

【087】先写库还是先发消息?RocketMQ如何保证消息与事务一致性?

来源:https://www.bilibili.com/video/BV1Fr4y1v7NU?spm_id_from=333.999.0.0

先写库还是先发消息?

首先,咱们来看一下工作场景, 订单 ID 1030被创建后要保存到数据库,同时该 1030 订单通过 MQ 投递给其他系统进行消费。

如果要保证订单数据入库与消息投递状态要保证最终一致, 要怎么做?

这里有两种常见做法:

第一种,先写库,再发送数据

  1. //插入 1030 号订单
  2. orderDao.insert(1030, order);
  3. //向1030号订单新增3条订单明细,10081-10083,
  4. orderDetailDao.insert(10081, 1030, orderDetai11);
  5. orderDetailDao.insert (10082, 1030, orderDetail2);
  6. orderDetailDao.insert(10083, 1030, orderDetail3);
  7. //向 MQ 发送数据,如果数据发送失败
  8. SendResult result = producer.send(orderMessage);
  9. if(result.getstate().equals("SEND_0K")){
  10. connection.commit();
  11. }else{
  12. connection.rollback();
  13. }

如果生产者发送消息时,因为网络原因导致 10 秒消息才返回 SendResult 结果,这就意味这 10 秒内数据库事务无法提交,大量并发下,数据库连接资源会在这 10 秒内迅速耗尽,后续请求进入连接池等待状态,最终导致系统停止响应。

第二种,先发消息,再写库

  1. //伪代码
  2. //向 MQ 发送数据,如果数据发送失败
  3. SendResult result = producer.send(orderMessage);
  4. if(result.getstate().equals("SEND_0K")){
  5. //插入 1030 号订单
  6. orderDao.insert(1030, order);
  7. //向1030号订单新增3条订单明细,10081-10083,
  8. orderDetailDao.insert(10081, 1030, orderDetai11);
  9. orderDetailDao.insert (10082, 1030, orderDetail2);
  10. orderDetailDao.insert(10083, 1030, orderDetail3);
  11. connection.commit();
  12. }else{
  13. connection.rollback();
  14. }

问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为 1030 订单自动设置了“快递信息”,可是如果后续 orderDao 向数据库插入数据产生异常导致业务失败。我们还需要再次发送取消 1030 订单”的消息把下游 1030 订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。

那有没有什么方式可以既不阻塞数据库事务,也能保证最终一致性呢? 有,RocketMQ 提供了事务消息可以保障应用本地事务与 MQ 最终一致性。

案例实践

架构拓扑

代码分析

MessageType4-发出事务消息代码

  1. public class MessageType4 {
  2. public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
  3. //事务消息一定要使用TransactionMQProducer事务生产者创建
  4. TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_grou");
  5. //从NameServer获取配置数据
  6. producer.setNamesrvAddr("192.168.31.103:9876");
  7. //CachedThreadPool1 线程池用于回查本地事务状态
  8. ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
  9. @Override
  10. public Thread new Thread(Runnable r){
  11. Thread thread = new Thread(r);
  12. thread. setName("check-transaction-thread");
  13. return thread;
  14. }
  15. });
  16. //将生产者与线程池绑定
  17. producer.setExecutorService(cachedThreadPool);
  18. //绑定事务监听器,用于执行代码
  19. TransactionListener transactionListener = new OrderTransactionListenerImpl();
  20. producer.setTransactionListener(transactionListener);
  21. //启动生产者
  22. producer.start();
  23. //创建消息对象
  24. Message msg = new Message ("order", "order-1030", "1030", "1030订单与明细的完整JSON数据(略) ".getBytes());
  25. //- - 定要调用sendMessageInTransaction发送事务消息
  26. //参数1: 消息对象
  27. //参数2: 其他参数,目前用不到
  28. producer.sendMessageInTransaction(msgnull);
  29. }
  30. }

TransactionListenerImpl-处理本地业务事务代码

  1. public class OrderTransactionListenerImpl implements TransactionListener {
  2. @Override
  3. //执行本地事务代码
  4. public LocalTransactionState executeLocalTransaction(Message msgobject arg) {
  5. log.info("正在执行本地事务,订单编号:" + msg.getKeys());
  6. /*伪代码
  7. try{
  8. //插入1030号订单
  9. orderDao.insert (1030, order);
  10. //向1030号订单新增3条订单明细,10081- 10083,
  11. orderDetailDao.insert(10081 , 1030,orderDetail1);
  12. orderDetailDao.insert(10082 , 1030, orderDetail2);
  13. orderDetailDao.insert(10083, 1030, orderDetail3);
  14. connection.commit( );
  15. //返回Commit,消费者可以消费1030订单消息
  16. return LocalTransactionState.COMMIT_MESSAGE;
  17. }catch(Exception e){
  18. //返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
  19. connection.rollback();
  20. return LocalTransactionState.ROLLBACK_MESSAGE ;
  21. }
  22. */
  23. log.info("模拟网络中断,Broker 并未收到生产者本地事务状态回执,返回UNKNOW");
  24. return LocalTransactionState.UNKNOW;
  25. }
  26. @Override
  27. //回查本地事务处理状态
  28. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  29. String keys = msg.getKeys();
  30. log.info("触发回查,正在检查”+ keys +“订单状态");
  31. /* 伪代码
  32. Order order = orderDao.selectById(1030);
  33. if(order != null){
  34. //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
  35. return LocalTransactionState.COMMIT_MESSAGE;
  36. }else{
  37. //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直 接将数据删除,消费者不能收到1030订单消息
  38. return LocalTransactionState.ROLLBACK_MESSAGE;
  39. }
  40. */
  41. log.info("回查结果,”+ keys + "订单已入库,发送Commit指令");
  42. return LocalTransactionState.COMMIT_MESSAGE;
  43. }
  44. }

实验了解 RocketMQ 事务执行过程

标准流程

1.producer.sendMessagelnTransaction(msg, nul);执行成功
此时 1030 订单消息已被发送到MQ服务器(Broker) ,不过该消息在 Broker 此时状态为”half-message” ,相当于存储在 MQ 中的”临时消息”,此状态下消息无法被投递给消费者。

2.生产者发送消息成功后自动触发 OrderTransactionListenerlmpl.executelocalTransaction() 执行本地事务。

当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交 commit,同时 executel ocalTransaction 方法返回 COMMIT_ MESSAGE, 生产者会再次向 MQ 服务器发送一个 commit 提交消息, 此前在 Broker 中保存 1030 订单消息状态就从 “half-message” 变为 “已提交”, broker 将消息发给下游的消费者处理。

  1. public LocalTransactionState executeLocalTransaction(Message msgobject arg) {
  2. log.info("正在执行本地事务,订单编号:" + msg.getKeys());
  3. /*伪代码
  4. try{
  5. //插入1030号订单
  6. orderDao.insert (1030, order);
  7. //向1030号订单新增3条订单明细,10081- 10083,
  8. orderDetailDao.insert(10081 , 1030,orderDetail1);
  9. orderDetailDao.insert(10082 , 1030, orderDetail2);
  10. orderDetailDao.insert(10083, 1030, orderDetail3);
  11. connection.commit( );
  12. //返回Commit,消费者可以消费1030订单消息
  13. return LocalTransactionState.COMMIT_MESSAGE;
  14. }catch(Exception e){
  15. //返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
  16. connection.rollback();
  17. return LocalTransactionState.ROLLBACK_MESSAGE;
  18. }
  19. */
  20. }

异常流程

异常流程1: producer.sendMessageInTransaction(msg, null); 执行失败,抛出异常此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。

异常流程2: producer.sendMessagelnTransaction(msg, null); 执行成功,本地事务执行失败

OrderTransactionListenerlmpl:

  1. public LocalTransactionState executeLocalTransaction(Message msgobject arg) {
  2. log.info("正在执行本地事务,订单编号:" + msg.getKeys());
  3. /*伪代码
  4. try{
  5. //插入1030号订单
  6. }catch(Exception e){
  7. //返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
  8. connection.rollback();
  9. return LocalTransactionState.ROLLBACK_MESSAGE;
  10. }
  11. */
  12. }

此时本地事务执行 rollback回滚,数据库数据被撤销,同时 executeLocalTransaction 方法返 ROLLBACK_MESSAGE 代表回滚,生产者会再次向 MQ 服务器发送一个 rollback 回滚消息,此前在Broker 中保存 1030 订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与 MQ 消息一致。

异常流程3: producer.sendMessagelnTransaction(msg, null); 执行成功,本地事务执行成功,但给 Broker 返回 Commit 消息时断网了,导致 broker 无法收到提交指令。

  1. public LocalTransactionState executeLocalTransaction(Message msgobject arg) {
  2. log.info("正在执行本地事务,订单编号:" + msg.getKeys());
  3. /*伪代码
  4. try{
  5. //插入1030号订单
  6. orderDao.insert (1030, order);
  7. //向1030号订单新增3条订单明细,10081- 10083,
  8. orderDetailDao.insert(10081 , 1030,orderDetail1);
  9. orderDetailDao.insert(10082 , 1030, orderDetail2);
  10. orderDetailDao.insert(10083, 1030, orderDetail3);
  11. connection.commit( );
  12. //返回 Commit 时网络中断
  13. return LocalTransactionState.COMMIT_MESSAGE;
  14. }catch(Exception e){
  15. //返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
  16. connection.rollback();
  17. return LocalTransactionState.ROLLBACK_MESSAGE;
  18. }
  19. */
  20. }

此时本地数据库订单数据入库,但 MQ 因为断网无法收到生产者的发来的 “commit” 消息,1030 订单数据一直处于 “half message”的状态,消息无法被投递到消费者,本地事务与 MQ 消息的一致性被破坏。

RocketMQ 为了解决这个问题,设计了回查机制,对于 broker 中的 half message,每过一小段时间就自动尝试与生产者通信,试图调用通 OrderTransactionListenerlmpl.checkLocalTransaction() 方法确认之前的本地事务是否成功。

  1. @Override
  2. //回查本地事务处理状态
  3. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  4. String keys = msg.getKeys();
  5. log.info("触发回查,正在检查”+ keys +“订单状态");
  6. /* 伪代码
  7. Order order = orderDao.selectById(1030);
  8. if(order != null){
  9. //查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
  10. return LocalTransactionState.COMMIT_MESSAGE;
  11. }else{
  12. //未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直 接将数据删除,消费者不能收到1030订单消息
  13. return LocalTransactionState.ROLLBACK_MESSAGE;
  14. }
  15. */
  16. log.info("回查结果,”+ keys + "订单已入库,发送Commit指令");
  17. return LocalTransactionState.COMMIT_MESSAGE;
  18. }

由 MQ 服务器主动发起,生产者调用 OrderTransactionListenerlmpl.checklocalTransaction() 检查之前数据库事务是否完成。

checkLocalTransaction() 查询到订单数据,说明之前的数据库事务已经完成,返回 COMMIT_MESSAGE,这样 Broker 中的 1030 订单消息就可以被发送给消费者进行处理。

运行结果:

checkLocalTransaction() 未查询到订单数据,说明之前的数据库事务没有处理成功,返回 ROLLBACK_MESSAGE,这样 Broker 中的 1030 订单消息就会被删除。

【045】聊一聊 RabbitMQ 六种队列模式与应用场景

来源:https://www.bilibili.com/video/BV1Fb4y1m72h?spm_id_from=333.999.0.0

● Producer:生产者,消息的提供者
● Consumer:消费者,消息的使用者
● Broker:MQ 服务器,管理队列、消息及相关信息
● Message:消息,程序间的通信的数据
● Queue:队列,消息存放的容器,消息先进先出
● Exchange:交换机,用于分发消息

简单模式

工作队列

● 本讲将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
● 在多个消息的情况下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同的消息,并且可以根据处理消息的速度来接收消息的数量,进而让消费者程序发挥最大性能。
● Work Queue 特别适合在集群环境中做异步处理,能最大程序发挥每一台服务器的性能。

案例:短信通知服务

发布订阅

发布(Publish)/订阅(Subscribe)模式

● 发布/订阅模式中,生产者不再直接与队列绑定,而是将数据发送至“交换机 Exchange”
● 交换机 Exchange 用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。
● 发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同。

发布/订阅模式使用场景

● 发布订阅模式因为所有消费者获得相同的消息,所以特别适合“数据提供商与应用商“。
● 例如:中国气象局提供“天气预报”送入交换机,网易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取气象局推送的气象数据。

路由模式

● 路由(Routing)模式是在发布订阅模式基础上的变种。
● 发布订阅模式是无条件将所有消息分发给所有消费者队列。
● 路由模式则是 Exchange 根据 Routing Key 有条件的将数据筛选后发给消费者队列。

主题模式

主题 Topic 模式

● 主题 Topic 模式是在 Routing 模式基础上,提供了对 RouteKey 模糊匹配的功能,可以简化程序的编写。
● 主题模式下,模糊匹配表达式规则为
· * 匹配单个关键字
· # 匹配所有关键字

RPC同步通信

该过程线层阻塞。虽然 MQ 是异步的,一般生产者将消息送至消息队列候不关心消费者什么时候消费,
但有的情况下需要得到消费者的反馈。该模式需要两个队列,没有固定的生产者、消费者。实际用得很少,因为有 dubbo。

【046】项目案例分享,宜信如何利用 RabbitMQ 队列解决消息积压问题?

来源:https://www.bilibili.com/video/BV1Wq4y1f79c?spm_id_from=333.999.0.0

问题描述:每天上午 10 点,全国 2 万多名客户经理集中录入上一日 JK 单据到 BorrowSale 系统,峰值能达到 500单/s

但信审系统任务重,最多只支持到 60单/s,每日会导致大量消息积压

最简单办法:RabbitMQ 改为工作队列模式,将消息送给 9 个信审系统实例并行完成。但信审系统不愿配合,以预算不足为由拒绝,奶奶个腿!
作为 BS 前台就得自己想办法解决消息积压问题

BS 前台解决办法:依赖 RabbitMQ 的“死信队列”特性,将死信消息自动送达死信队列中,BS 前台接收到死信消息,1 小时后重新发送,等待闲时由信审系统进行处理。这样便实现了在不增加资源的前提下,对信审系统资源进行“削峰填谷”。

什么是死信?
即过期或无法处理的消息。

死信是怎么产生的?
● 消费者拒绝接受,且没有重新入列的消息
● 队列满了,无法入列的消息
● 消息设置了 TTL 过期时间,超过有效时间后的消息
● 队列设置了 TTL 过期时间,超过有效时间后的消息

如果消息跟队列均设置 TTL,以 TTL 短的优先。

死信交换机跟死信队列均为标准的 MQ 交换机和队列,MQ 配置时作用在死信上。

前台—RabbitMQ,若产生死信消息,则—死信交换机—死信队列(过一段时间再)—前台。
无死信消息则后台处理成功。

【048】为什么 Kafka 这么快,解密 Kafka 高性能背后的秘密

来源:https://www.bilibili.com/video/BV1sq4y1f7ri?spm_id_from=333.999.0.0

秘密:
● 磁盘顺序读写
● 页缓存
● 零拷贝
● 批量操作

Kafka 和 RabbitMQ 区别

BATJ 每小时产生数TB数据
对于 Kafka 这种大吞吐量设计自然是最佳选择

磁盘顺序读写

The Pathologies of Big Data

Kafka 基于顺序读写实现高性能

Kafka 避免使用JVM
直接使用操作系统的页缓存特性提高处理速度
进而避免了JVM GC 带来的性能损耗
Kafka 采用字节紧密存储,避免产生对象,这样可以进一步提高空间利用率

页缓存

Linux 操作系统

零拷贝

Linux 操作系统

批量操作

批量操作

【051】你是微博的架构师,大 V 更新动态,动态通知采用推 Push 还是拉 Pull 更合适?

来源:https://www.bilibili.com/video/BV1of4y1E7Jv?spm_id_from=333.999.0.0

推模式(Push)与拉模式(Pull)有什么不同

Push 模式 Pull 拉取模式
实时性 较好,通过网络管道准实时发送 较差,取决于定时轮询时间
服务器状态 有状态,需持久化粉丝动态队列 无状态,根据请求实时查询
风险项 大 V 动态的并发“写扩散”问题
大量动态队列持久化造成磁盘高 IO
大量粉丝准点“读扩散”问题
大V粉丝准点并发查询搞垮服务器
应用场景 微信 微博(早期)

写扩散与读扩散该如何优化应对

写扩散(Push)优化
● 设置上限,微信好友 5000 个
● 限流策略,X 分钟内完成消息发布
● 优化存储策略,采用 NoSQL 或大数据方案

读扩散(Pull)优化
● MQ 削峰填谷,超长队列直接拒绝
● 增加轮询间隔,减少请求次数
● 服务端增加缓存,优化查询效率
● 增加验证码,分散时间,减少机器人刷票

案例
推特的混合模式
● 粉丝量小于 X,Push 模式
● 粉丝量大于 X,Pull 模式

【052】抽 Push 或拉 Pull,一字之差,差之千里。一个真实架构案例引发的思考

来源:https://www.bilibili.com/video/BV1zh411n7Dw?spm_id_from=333.999.0.0

你要问我设计架构时脑子里面在想什么
我会告诉你怎么“解耦”
业务解耦、技术解耦、团队解耦、数据解耦…

问题描述,所有业务系统都使用账户系统的用户数据,现在允许修改用户名,怎么把数据同步到业务系统?

当前的现状:账户系统调用各个业务系统的接口,更新用户信息

问题:耦合太严重了,账户系统做了不是自己职责范围内的事情,每增加一个系统,就需要增加一个调用逻辑
所以让各个业务系统来调用账户系统接口获取最近更新的用户信息

更好的方法是使用 MQ

【077】几张图讲明白RocketMQ高可用方案,进来瞅瞅,工作面试都用的上

来源:https://www.bilibili.com/video/BV1p44y1Y7AR?spm_id_from=333.999.0.0

RocketMQ 有哪些角色

场景:

RocketMQ 消息生产消费流程

Broker 主挂了 RocketMQ 怎么办

Broker 主从都挂了 RocketMQ 怎么办

NameServer 挂了 RocketMQ 怎么办

同步复制与异步复制之间的区别与应用场景