- 来源
- 【014】巧用阿里 Canal 实现 MySQL 异构数据同步
- 【038】MQ 中间件是如何实现可靠性投递的?
- 【087】先写库还是先发消息?RocketMQ如何保证消息与事务一致性?
- 【045】聊一聊 RabbitMQ 六种队列模式与应用场景
- 【046】项目案例分享,宜信如何利用 RabbitMQ 队列解决消息积压问题?
- 【048】为什么 Kafka 这么快,解密 Kafka 高性能背后的秘密
- 【051】你是微博的架构师,大 V 更新动态,动态通知采用推 Push 还是拉 Pull 更合适?
- 【052】抽 Push 或拉 Pull,一字之差,差之千里。一个真实架构案例引发的思考
- 【077】几张图讲明白RocketMQ高可用方案,进来瞅瞅,工作面试都用的上
来源
课件地址:https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/it300/download/ppt_all_in_one.zip
【014】巧用阿里 Canal 实现 MySQL 异构数据同步
来源:https://www.bilibili.com/video/BV1zy4y1L7HF?spm_id_from=333.999.0.0
啥是异构数据?
在 Java 代码中新增 MySQL 数据时向调用团队 B 同步接口新建 ES 商品数据
团队 A 与团队 B 的协作产生代码强耦合
团队 A 必须了解团队 B 提供的接口才可以实现
但本身这并不属于团队 A 的工作范畴
扩展困难
团队 C 维护的 MongoDB 也要同步 MySQL 数据库
难道要团队 A 又要改代码?烦死啦,等排期吧!!!
要保证数据做到准实时同步
还要团队之间解耦,团队 A 不再背锅
Canal
是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了 MySQL(也支持 mariaDB)。
通过 Canal 解决了数据监听的问题
下面要解决解耦的问题
【038】MQ 中间件是如何实现可靠性投递的?
来源:https://www.bilibili.com/video/BV12v411w7Ap?spm_id_from=333.999.0.0
MQ 中间件的通用消息投递过程,有哪些情况可能会造成丢数
● 发送阶段,遇到高延迟,Producer 会多次重发消息,直到 Broker ack 确认,过程中 Broker 会自动去重,超时 Producer 产生异常,应用进行捕获提示。
● 存储阶段,Broker 先刷盘再 ack 确认,即便 ack 失败消息不会丢失,多次重试直到 Producer 接收,会导致消息积压。
● 消费阶段,Broker 向 Consumer 发数据,一段时间未接收,自动重发,直到 Consumer Ack 确认,Consumer 注意幂等处理。
丢数问题解决方案:
1.异步刷盘(NSYNC_FLUSH),改同步刷盘
2.存储介质损坏,建议采用 RAID10 或分布式存储
3.不要启用自动 Ack,RabbitMQ 存在此问题
4.避开都市传说 ActiveMQ
【087】先写库还是先发消息?RocketMQ如何保证消息与事务一致性?
来源:https://www.bilibili.com/video/BV1Fr4y1v7NU?spm_id_from=333.999.0.0
先写库还是先发消息?
首先,咱们来看一下工作场景, 订单 ID 1030被创建后要保存到数据库,同时该 1030 订单通过 MQ 投递给其他系统进行消费。
如果要保证订单数据入库与消息投递状态要保证最终一致, 要怎么做?
这里有两种常见做法:
第一种,先写库,再发送数据
//插入 1030 号订单
orderDao.insert(1030, order);
//向1030号订单新增3条订单明细,10081-10083,
orderDetailDao.insert(10081, 1030, orderDetai11);
orderDetailDao.insert (10082, 1030, orderDetail2);
orderDetailDao.insert(10083, 1030, orderDetail3);
//向 MQ 发送数据,如果数据发送失败
SendResult result = producer.send(orderMessage);
if(result.getstate().equals("SEND_0K")){
connection.commit();
}else{
connection.rollback();
}
如果生产者发送消息时,因为网络原因导致 10 秒消息才返回 SendResult 结果,这就意味这 10 秒内数据库事务无法提交,大量并发下,数据库连接资源会在这 10 秒内迅速耗尽,后续请求进入连接池等待状态,最终导致系统停止响应。
第二种,先发消息,再写库
//伪代码
//向 MQ 发送数据,如果数据发送失败
SendResult result = producer.send(orderMessage);
if(result.getstate().equals("SEND_0K")){
//插入 1030 号订单
orderDao.insert(1030, order);
//向1030号订单新增3条订单明细,10081-10083,
orderDetailDao.insert(10081, 1030, orderDetai11);
orderDetailDao.insert (10082, 1030, orderDetail2);
orderDetailDao.insert(10083, 1030, orderDetail3);
connection.commit();
}else{
connection.rollback();
}
问题更严重,因为消息已经被发送了,消费者可以立即消费,比如下游消费者为 1030 订单自动设置了“快递信息”,可是如果后续 orderDao 向数据库插入数据产生异常导致业务失败。我们还需要再次发送取消 1030 订单”的消息把下游 1030 订单分配的“快递信息”给撤销,这些都是在业务层面上的额外处理,这无疑提高了对程序员的要求与处理的难度。
那有没有什么方式可以既不阻塞数据库事务,也能保证最终一致性呢? 有,RocketMQ 提供了事务消息可以保障应用本地事务与 MQ 最终一致性。
案例实践
架构拓扑
代码分析
MessageType4-发出事务消息代码
public class MessageType4 {
public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
//事务消息一定要使用TransactionMQProducer事务生产者创建
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_grou");
//从NameServer获取配置数据
producer.setNamesrvAddr("192.168.31.103:9876");
//CachedThreadPool1 线程池用于回查本地事务状态
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread new Thread(Runnable r){
Thread thread = new Thread(r);
thread. setName("check-transaction-thread");
return thread;
}
});
//将生产者与线程池绑定
producer.setExecutorService(cachedThreadPool);
//绑定事务监听器,用于执行代码
TransactionListener transactionListener = new OrderTransactionListenerImpl();
producer.setTransactionListener(transactionListener);
//启动生产者
producer.start();
//创建消息对象
Message msg = new Message ("order", "order-1030", "1030", "1030订单与明细的完整JSON数据(略) ".getBytes());
//- - 定要调用sendMessageInTransaction发送事务消息
//参数1: 消息对象
//参数2: 其他参数,目前用不到
producer.sendMessageInTransaction(msg,null);
}
}
TransactionListenerImpl-处理本地业务事务代码
public class OrderTransactionListenerImpl implements TransactionListener {
@Override
//执行本地事务代码
public LocalTransactionState executeLocalTransaction(Message msg,object arg) {
log.info("正在执行本地事务,订单编号:" + msg.getKeys());
/*伪代码
try{
//插入1030号订单
orderDao.insert (1030, order);
//向1030号订单新增3条订单明细,10081- 10083,
orderDetailDao.insert(10081 , 1030,orderDetail1);
orderDetailDao.insert(10082 , 1030, orderDetail2);
orderDetailDao.insert(10083, 1030, orderDetail3);
connection.commit( );
//返回Commit,消费者可以消费1030订单消息
return LocalTransactionState.COMMIT_MESSAGE;
}catch(Exception e){
//返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
connection.rollback();
return LocalTransactionState.ROLLBACK_MESSAGE ;
}
*/
log.info("模拟网络中断,Broker 并未收到生产者本地事务状态回执,返回UNKNOW");
return LocalTransactionState.UNKNOW;
}
@Override
//回查本地事务处理状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String keys = msg.getKeys();
log.info("触发回查,正在检查”+ keys +“订单状态");
/* 伪代码
Order order = orderDao.selectById(1030);
if(order != null){
//查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
return LocalTransactionState.COMMIT_MESSAGE;
}else{
//未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直 接将数据删除,消费者不能收到1030订单消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
*/
log.info("回查结果,”+ keys + "订单已入库,发送Commit指令");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
实验了解 RocketMQ 事务执行过程
标准流程
1.producer.sendMessagelnTransaction(msg, nul);
执行成功
此时 1030 订单消息已被发送到MQ服务器(Broker) ,不过该消息在 Broker 此时状态为”half-message” ,相当于存储在 MQ 中的”临时消息”,此状态下消息无法被投递给消费者。
2.生产者发送消息成功后自动触发 OrderTransactionListenerlmpl.executelocalTransaction()
执行本地事务。
当消息发送成功,紧接着生产者向本地数据库写数据,数据库写入后提交 commit
,同时 executel ocalTransaction
方法返回 COMMIT_ MESSAGE
, 生产者会再次向 MQ 服务器发送一个 commit 提交消息, 此前在 Broker 中保存 1030 订单消息状态就从 “half-message” 变为 “已提交”, broker 将消息发给下游的消费者处理。
public LocalTransactionState executeLocalTransaction(Message msg,object arg) {
log.info("正在执行本地事务,订单编号:" + msg.getKeys());
/*伪代码
try{
//插入1030号订单
orderDao.insert (1030, order);
//向1030号订单新增3条订单明细,10081- 10083,
orderDetailDao.insert(10081 , 1030,orderDetail1);
orderDetailDao.insert(10082 , 1030, orderDetail2);
orderDetailDao.insert(10083, 1030, orderDetail3);
connection.commit( );
//返回Commit,消费者可以消费1030订单消息
return LocalTransactionState.COMMIT_MESSAGE;
}catch(Exception e){
//返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
connection.rollback();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
*/
}
异常流程
异常流程1: producer.sendMessageInTransaction(msg, null);
执行失败,抛出异常此时没有任何消息被发出,本地事务也不会执行,除了报错外不会产生任何不一致。
异常流程2: producer.sendMessagelnTransaction(msg, null);
执行成功,本地事务执行失败
OrderTransactionListenerlmpl:
public LocalTransactionState executeLocalTransaction(Message msg,object arg) {
log.info("正在执行本地事务,订单编号:" + msg.getKeys());
/*伪代码
try{
//插入1030号订单
}catch(Exception e){
//返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
connection.rollback();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
*/
}
此时本地事务执行 rollback
回滚,数据库数据被撤销,同时 executeLocalTransaction
方法返 ROLLBACK_MESSAGE
代表回滚,生产者会再次向 MQ 服务器发送一个 rollback
回滚消息,此前在Broker
中保存 1030 订单消息就会被直接删除,不会发送给消费者,本地事务也可以保证与 MQ 消息一致。
异常流程3: producer.sendMessagelnTransaction(msg, null);
执行成功,本地事务执行成功,但给 Broker 返回 Commit 消息时断网了,导致 broker 无法收到提交指令。
public LocalTransactionState executeLocalTransaction(Message msg,object arg) {
log.info("正在执行本地事务,订单编号:" + msg.getKeys());
/*伪代码
try{
//插入1030号订单
orderDao.insert (1030, order);
//向1030号订单新增3条订单明细,10081- 10083,
orderDetailDao.insert(10081 , 1030,orderDetail1);
orderDetailDao.insert(10082 , 1030, orderDetail2);
orderDetailDao.insert(10083, 1030, orderDetail3);
connection.commit( );
//返回 Commit 时网络中断
return LocalTransactionState.COMMIT_MESSAGE;
}catch(Exception e){
//返回Rollback, Broker 直接将数据删除,消费者不能收到1030订单消息
connection.rollback();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
*/
}
此时本地数据库订单数据入库,但 MQ 因为断网无法收到生产者的发来的 “commit” 消息,1030 订单数据一直处于 “half message”的状态,消息无法被投递到消费者,本地事务与 MQ 消息的一致性被破坏。
RocketMQ 为了解决这个问题,设计了回查机制,对于 broker 中的 half message,每过一小段时间就自动尝试与生产者通信,试图调用通 OrderTransactionListenerlmpl.checkLocalTransaction()
方法确认之前的本地事务是否成功。
@Override
//回查本地事务处理状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String keys = msg.getKeys();
log.info("触发回查,正在检查”+ keys +“订单状态");
/* 伪代码
Order order = orderDao.selectById(1030);
if(order != null){
//查询到记录,代表数据库已处理成功,回查返回Commit,消费者可以消费1030订单消息
return LocalTransactionState.COMMIT_MESSAGE;
}else{
//未查询到记录,代表数据库处理失败,回查返回Rollback,Broker直 接将数据删除,消费者不能收到1030订单消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
*/
log.info("回查结果,”+ keys + "订单已入库,发送Commit指令");
return LocalTransactionState.COMMIT_MESSAGE;
}
由 MQ 服务器主动发起,生产者调用 OrderTransactionListenerlmpl.checklocalTransaction()
检查之前数据库事务是否完成。
checkLocalTransaction()
查询到订单数据,说明之前的数据库事务已经完成,返回 COMMIT_MESSAGE
,这样 Broker 中的 1030 订单消息就可以被发送给消费者进行处理。
运行结果:
checkLocalTransaction()
未查询到订单数据,说明之前的数据库事务没有处理成功,返回 ROLLBACK_MESSAGE
,这样 Broker 中的 1030 订单消息就会被删除。
【045】聊一聊 RabbitMQ 六种队列模式与应用场景
来源:https://www.bilibili.com/video/BV1Fb4y1m72h?spm_id_from=333.999.0.0
● Producer:生产者,消息的提供者
● Consumer:消费者,消息的使用者
● Broker:MQ 服务器,管理队列、消息及相关信息
● Message:消息,程序间的通信的数据
● Queue:队列,消息存放的容器,消息先进先出
● Exchange:交换机,用于分发消息
简单模式
工作队列
● 本讲将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
● 在多个消息的情况下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同的消息,并且可以根据处理消息的速度来接收消息的数量,进而让消费者程序发挥最大性能。
● Work Queue 特别适合在集群环境中做异步处理,能最大程序发挥每一台服务器的性能。
案例:短信通知服务
发布订阅
发布(Publish)/订阅(Subscribe)模式
● 发布/订阅模式中,生产者不再直接与队列绑定,而是将数据发送至“交换机 Exchange”
● 交换机 Exchange 用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。
● 发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同。
发布/订阅模式使用场景
● 发布订阅模式因为所有消费者获得相同的消息,所以特别适合“数据提供商与应用商“。
● 例如:中国气象局提供“天气预报”送入交换机,网易、新浪、百度、搜狐等门户接入通过队列绑定到该交换机,自动获取气象局推送的气象数据。
路由模式
● 路由(Routing)模式是在发布订阅模式基础上的变种。
● 发布订阅模式是无条件将所有消息分发给所有消费者队列。
● 路由模式则是 Exchange 根据 Routing Key 有条件的将数据筛选后发给消费者队列。
主题模式
主题 Topic 模式
● 主题 Topic 模式是在 Routing 模式基础上,提供了对 RouteKey 模糊匹配的功能,可以简化程序的编写。
● 主题模式下,模糊匹配表达式规则为
· * 匹配单个关键字
· # 匹配所有关键字
RPC同步通信
该过程线层阻塞。虽然 MQ 是异步的,一般生产者将消息送至消息队列候不关心消费者什么时候消费,
但有的情况下需要得到消费者的反馈。该模式需要两个队列,没有固定的生产者、消费者。实际用得很少,因为有 dubbo。
【046】项目案例分享,宜信如何利用 RabbitMQ 队列解决消息积压问题?
来源:https://www.bilibili.com/video/BV1Wq4y1f79c?spm_id_from=333.999.0.0
问题描述:每天上午 10 点,全国 2 万多名客户经理集中录入上一日 JK 单据到 BorrowSale 系统,峰值能达到 500单/s
但信审系统任务重,最多只支持到 60单/s,每日会导致大量消息积压
最简单办法:RabbitMQ 改为工作队列模式,将消息送给 9 个信审系统实例并行完成。但信审系统不愿配合,以预算不足为由拒绝,奶奶个腿!
作为 BS 前台就得自己想办法解决消息积压问题
BS 前台解决办法:依赖 RabbitMQ 的“死信队列”特性,将死信消息自动送达死信队列中,BS 前台接收到死信消息,1 小时后重新发送,等待闲时由信审系统进行处理。这样便实现了在不增加资源的前提下,对信审系统资源进行“削峰填谷”。
什么是死信?
即过期或无法处理的消息。
死信是怎么产生的?
● 消费者拒绝接受,且没有重新入列的消息
● 队列满了,无法入列的消息
● 消息设置了 TTL 过期时间,超过有效时间后的消息
● 队列设置了 TTL 过期时间,超过有效时间后的消息
如果消息跟队列均设置 TTL,以 TTL 短的优先。
死信交换机跟死信队列均为标准的 MQ 交换机和队列,MQ 配置时作用在死信上。
前台—RabbitMQ,若产生死信消息,则—死信交换机—死信队列(过一段时间再)—前台。
无死信消息则后台处理成功。
【048】为什么 Kafka 这么快,解密 Kafka 高性能背后的秘密
来源:https://www.bilibili.com/video/BV1sq4y1f7ri?spm_id_from=333.999.0.0
秘密:
● 磁盘顺序读写
● 页缓存
● 零拷贝
● 批量操作
Kafka 和 RabbitMQ 区别
BATJ 每小时产生数TB数据
对于 Kafka 这种大吞吐量设计自然是最佳选择
磁盘顺序读写
The Pathologies of Big Data
Kafka 基于顺序读写实现高性能
Kafka 避免使用JVM
直接使用操作系统的页缓存特性提高处理速度
进而避免了JVM GC 带来的性能损耗
Kafka 采用字节紧密存储,避免产生对象,这样可以进一步提高空间利用率
页缓存
Linux 操作系统
![]()
零拷贝
Linux 操作系统
![]()
批量操作
批量操作
【051】你是微博的架构师,大 V 更新动态,动态通知采用推 Push 还是拉 Pull 更合适?
来源:https://www.bilibili.com/video/BV1of4y1E7Jv?spm_id_from=333.999.0.0
推模式(Push)与拉模式(Pull)有什么不同
Push 模式 | Pull 拉取模式 | |
---|---|---|
实时性 | 较好,通过网络管道准实时发送 | 较差,取决于定时轮询时间 |
服务器状态 | 有状态,需持久化粉丝动态队列 | 无状态,根据请求实时查询 |
风险项 | 大 V 动态的并发“写扩散”问题 大量动态队列持久化造成磁盘高 IO | 大量粉丝准点“读扩散”问题 大V粉丝准点并发查询搞垮服务器 |
应用场景 | 微信 | 微博(早期) |
写扩散与读扩散该如何优化应对
写扩散(Push)优化
● 设置上限,微信好友 5000 个
● 限流策略,X 分钟内完成消息发布
● 优化存储策略,采用 NoSQL 或大数据方案
读扩散(Pull)优化
● MQ 削峰填谷,超长队列直接拒绝
● 增加轮询间隔,减少请求次数
● 服务端增加缓存,优化查询效率
● 增加验证码,分散时间,减少机器人刷票
案例
推特的混合模式
● 粉丝量小于 X,Push 模式
● 粉丝量大于 X,Pull 模式
【052】抽 Push 或拉 Pull,一字之差,差之千里。一个真实架构案例引发的思考
来源:https://www.bilibili.com/video/BV1zh411n7Dw?spm_id_from=333.999.0.0
你要问我设计架构时脑子里面在想什么
我会告诉你怎么“解耦”
业务解耦、技术解耦、团队解耦、数据解耦…
问题描述,所有业务系统都使用账户系统的用户数据,现在允许修改用户名,怎么把数据同步到业务系统?
当前的现状:账户系统调用各个业务系统的接口,更新用户信息
问题:耦合太严重了,账户系统做了不是自己职责范围内的事情,每增加一个系统,就需要增加一个调用逻辑
所以让各个业务系统来调用账户系统接口获取最近更新的用户信息
更好的方法是使用 MQ
【077】几张图讲明白RocketMQ高可用方案,进来瞅瞅,工作面试都用的上
来源:https://www.bilibili.com/video/BV1p44y1Y7AR?spm_id_from=333.999.0.0
RocketMQ 有哪些角色
场景:
RocketMQ 消息生产消费流程
Broker 主挂了 RocketMQ 怎么办
Broker 主从都挂了 RocketMQ 怎么办
NameServer 挂了 RocketMQ 怎么办
同步复制与异步复制之间的区别与应用场景