MQ
00 分钟
2022-9-16

各个mq的区别

ActiveMQ、RabbitMQ、RocketMQ、Kafka
 

mq的优点:

在特殊场景下可以解耦、异步、削峰

mq的缺点:

可用性降低:系统引入的外部依赖越多,越容易挂掉,万一mq要是挂了更麻烦
复杂性提高:如何保证消息没有被重复消费?咱们处理消息丢失的情况?怎么保证消息传递的顺序性?
一致性问题:A系统处理完了直接返回了成功,要是BCD三个系统里,BD成功了,C写库事变了,怎么办那?数据就不一致了
 

如何保证消息队列的高可用?

消息生产的高可用:创建topic时,把topic的多个message queue创建在多个broker组上。这样当一个broker组的master不可用后,producer仍然可以给其他组的master发送消息。 rocketmq目前还不支持主从切换,需要手动切换
消息消费的高可用:consumer并不能配置从master读还是slave读。当master不可用或者繁忙的时候consumer会被自动切换到从slave读。这样当master出现故障后,consumer仍然可以从slave读,保证了消息消费的高可用
RocketMQ的Broker有三种集群部署方式:
  • 1.单台Master部署;
  • 2.多台Master部署;
  • 3.多Master多Slave部署;

如何保证消息不被重复消费?

通过幂等性来保证消息不被重复消费,添加全局唯一id,通过id来判断是否消费过

如何保证消息的顺序性?

自定义负载均衡模式,把这一批顺序消息有共同的唯一ID,把唯一ID与队列的数量进行hash取余运算,保证这批消息进入到同一个队列

如何保证消息不丢失?

有五个场景可能会丢失消息:
  1. 生产者发消息到Broker时
    1. Produce有三种发消息的方式
      1. 同步发送
      2. 异步发送
      3. 单向发送
      4. 由于同步和异步方式均需要Broker返回确认信息,单向发送只管发,不需要Broker返回确认信息,所以单向发送并不知道消息是不是发送成功,单向发送不能保证消息不丢失。
        produce要想发消息时保证消息不丢失,可以采用同步发送的方式去发消息,send消息方法只要不抛出异常,就代表发送成功。发送成功会有多个状态,以下对每个状态进行说明:
      5. SEND_OK:消息发送成功,Broker刷盘、主从同步成功
      6. FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器同步刷盘(默认为异步刷盘)超时(默认超时时间5秒)
      7. FlUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步复制(默认为异步复制)到Slave时超时(默认超时时间5秒)
      8. SLAVE_NOT_AVAILABLE:Broker从节点不存在
      9. 采用事务机制
  1. Broker主从同步
  1. Broker存储消息时
    1. 使用同步刷盘机制
      同步刷盘机制,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应,保证了消息可靠性,但影响了性能。异步刷盘则能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端,消息刷盘采用后台异步线程提交的方式进行,提高了MQ的性能和吞吐量,但是可能会丢消息。点击查看配置方式
      使用同步复制机制
      同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。速度快,同样可能丢消息!
  1. 消费者消费消息时
    1. 如果出现消费超时(默认15分钟)、拉取消息后消费者服务宕机等消费失败的情况,此时的Broker由于没有等到消费者返回的ACK,会向同一个消费者组中的其他消费者间隔性的重发消息,直到消息返回成功(默认是重复发送16次,若16次还是没有消费成功,那么该消息会转移到死信队列,人工处理或是单独写服务处理这些死信消息),正常同步消费是不会出现这些问题的
  1. 整个MQ服务宕机
    1. 当NameServer全部挂了(注意是全部,只要有一个存活就是正常),或者Broker宕机了。在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。
总结:
  1. 生产者使用同步发送,或者发送事务消息
  1. Broker配置同步刷盘 + 同步复制
  1. 消费者不要使用异步消费
  1. 整个MQ挂了之后准备降级方案

如何快速处理积压消息?

使用web控制台,就能直接看到消息的积压情况。在Web控制台的主题页面,可以通过 Consumer管理按钮实时看到消息的积压情况。其中差值一栏就是队列中积压的消息!
也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况
如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢?
这时如果要快速处理积压的消息,可以创建一个新的Topic,并配置足够多的MessageQueue。并紧急上线一组新的消费者,只负责搬运积压的消息,转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。
 
 
哪些场景用到了mq,解决了什么问题,有遇到什么问题,如何优化的,多少tps,是否遇到过生产环境的故障和问题
 

使用场景:

电商营销-》营销系统-》优惠券
发券
push APP通知
 
牛交所APP
购买限量理财产品,买入以后发放券(代金券、加息券),扣减库存,更新状态,通知用户购买成功、付款前,付款中,付款后、第三方对接
 
使用xxl-job来执行定时任务,分布式任务调度
为了防止重复执行,Executors执行的时候会带有shardIndex、shardNums两个参数,通过id取模数量【hash(id)%shardNums == shardIndex】,来判断是否是自己来执行
 
发券:
场景:
全量push消息,所有用户都发送推送,如果用户数量是1000w,那么应该怎么做?
数据库查询都困难,无法一次查询出所有的数据,即使使用limit,也需要查询很多,如果查询次数过多,会出现fullgc,仅仅是查询用户数据就会造成内存不足,MySQL的缓存区也会被刷
全量发送优惠券
短时间插入所有用户的优惠券,短高峰,对MySQL写入压力较大
指定人群发放优惠券
营销系统 调用 推送系统 调用 会员系统 调用 mq
爆款推荐
以上场景统一的问题是大数据查询,会使数据库读写压力变大
方案:
全量push消息
千万级用户分片
运营人员创建活动-》mq异步操作任务-》查询push总人数-》batch为1000个任务,将1000个任务batch为100个任务,将100个任务推送到mq,推送系统集群去消费,分解任务,使用线程池去查询用户信息并推送到第三方推送平台
基于MQ做一个异步化,batch拆分,1000w用户,1w个batch,每个batch1000个用户,基于1w个消息在次batch化,将100条消息合并为一个batch,一次性发送到mq,使用mq实现任务异步化。
推送系统集群部署
每台机器推送部分任务
削峰填谷:使用mq将大任务分解为小任务,减轻压力
全量发送优惠券(基于MQ分发事件提高扩展性)
惰性发券:用户登录以后调用登录事件,营销系统进行消费,查询redis是否需要发券
 

爆款理财商品分布式调度推送方案

通过用户行为数据和核心数据-》大数据团队通过算法-》分析出每个用户感兴趣的商品-》用户画像标签爆款推荐-》分布式调度推送
通过用户画像数据计算出总数,比如说是100w,将100w分为100个batch,将100个batch发送到MQ进行消费
xxl-job:使用executors分组发送请求shard,任务bean进行分片,通过hash(数据id) % shards=shard取模
notion image
 
lombok:
基于builder构造器模式来进行数据对象的构造
 
 

评论