什么是confirm消息确认机制?顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
我们都知道,消息从生产端到消费端消费要经过3个步骤:
生产端发送消息到RabbitMQ;RabbitMQ发送消息到消费端;消费端消费这条消息;【资料图】
这3个步骤中的每一步都有可能导致消息丢失,消息丢失不可怕,可怕的是丢失了我们还不知道,所以要有一些措施来保证系统的可靠性。
这里的可靠并不是一定就100%不丢失了,磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。下面来具体分析一下问题以及解决方案。
生产端可靠性投递
生产端可靠性投递,即生产端要确保将消息正确投递到RabbitMQ中。
生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失,而我们根本不知道发生了什么。
针对以上情况,RabbitMQ本身提供了一些机制。
事务消息机制
事务消息机制由于会严重降低性能,所以一般不采用这种方法,我就不介绍了,而采用另一种轻量级的解决方案:confirm消息确认机制。
confirm消息确认机制
什么是confirm消息确认机制?顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
通过下面这句代码来开启确认模式:
channel.confirmSelect();// 开启发送方确认模式
然后异步监听确认和未确认的消息:
channel.addConfirmListener(new ConfirmListener() { //消息正确到达broker @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("已收到消息"); //做一些其他处理 } //RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag); //做一些其他处理,比如消息重发等 }});
这样就可以让生产端感知到消息是否投递到RabbitMQ中了,当然这样还不够,稍后我会说一下极端情况。
消息持久化
那消息持久化呢?我们知道,RabbitMQ收到消息后将这个消息暂时存在了内存中,那这就会有个问题,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢?
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。
所有需要给exchange、queue和message都进行持久化:
exchange持久化:
//第三个参数true表示这个exchange持久化channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
queue持久化:
//第二个参数true表示这个queue持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);
message持久化:
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
到此,RabbitMQ提供的几种机制都介绍完了,但这样还不足以保证消息可靠性投递RabbitMQ中,上面我也提到了会有极端情况,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。
所以除了RabbitMQ提供的一些机制外,我们自己也要做一些消息补偿机制,以应对一些极端情况。接下来我就介绍其中的一种解决方案——消息入库。
消息入库
消息入库,顾名思义就是将要发送的消息保存到数据库中。
首先发送消息前先将消息保存到数据库中,有一个状态字段status=0
,表示生产端将消息发送给了RabbitMQ但还没收到确认。在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。
这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。
这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。
消费端消息不丢失
既然已经可以让生产端100%可靠性投递到RabbitMQ了,那接下来就改看看消费端的了,如何让消费端不丢失消息。
默认情况下,以下3种情况会导致消息丢失:
在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。其实,上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。
所以就需要将自动ack机制改为手动ack机制。
消费端手动确认消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { //接收到消息,做处理 //手动确认 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { //出错处理,这里可以让消息重回队列重新发送或直接丢弃消息 }};//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
这样,当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:
一部分是等待投递给消费端的消息;一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息。如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性。
标签:
-
全球速看:字节二面:引入RabbitMQ后,你如何保证全链路数据100%不丢失?
什么是confirm消息确认机制?顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个
-
联通查询话费余额怎么查_查别人话费余额怎么查_全球最新
你们好,最近小活发现有诸多的小伙伴们对于联通查询话费余额怎么查,查别人话费余额怎么查这个问题都颇为感
-
熊猫课堂 这个幼儿园的小朋友们收到一份特别的“六一”礼物
“为什么大熊猫喜欢爬树?除了竹子外,大熊猫还吃些什么东西?大熊猫的可以活多少岁?大熊猫有几个手指?”
-
男子自称我爸是人大代表 官方:失实详细内容
1、想必大家现在对于男子自称我爸是人大代表官方:失实方面的信息都是比较想了解的吧,那么针对于男子自称我
-
D9订单量超8万台,N7即将上市,腾势首个品牌日有何干货?-环球时讯
D9订单量超8万台,N7即将上市,腾势首个品牌日有何干货?
-
全球资讯:检察机关“禁噪护考” 做夜幕下的“守护者”
“天天吵,天天吵,一直到凌晨,我也就算了,可是我女儿马上就要高考了,这还怎么休息?就没有人能管管吗?
-
全球头条:学校宁愿拿几十万买一块蠢笨的石头,为什么就是不愿给老师发福利
教育教学是学校工作的核心工作。一所学校要想办好,你就必须要提高教育教学质量。只有教育教学质量提高了,
-
今日热文:百亿级规模“投资未来” 沪市公司业绩有底气发展有信心
5月以来,多家沪市公司抛出百亿元级规模投资计划,反映市场回暖的温度,折射行业向好的景气度,彰显“投资
-
天山电子:5月31日融资买入79.95万元,融资融券余额1621.69万元
5月31日,天山电子(301379)融资买入79 95万元,融资偿还171 84万元,融资净卖出91 9万元,融资余额1621 69万元。
-
环球热文:局地突遭“烂场雨”,麦收如何雨中“抢”粮
当前,全国小麦陆续进入集中收获期。5月下旬,北方冬麦区出现大范围持续降雨天气过程,局地出现短时强降雨
-
凯赛生物:5月31日融资买入251.68万元,融资融券余额4.05亿元|环球视点
5月31日,凯赛生物(688065)融资买入251 68万元,融资偿还173 31万元,融资净买入78 37万元,融资余额3 75亿元。
-
全球短讯!妙趣游园促成长
5月31日,市第十九小学举办庆“六一”学科趣味游园暨科技节活动(如图,记者林石湛摄)。此次游园活动集综
-
如何计算机器转速
RPM是表面速度的函数。在机械加工中,表面速度表示刀具外刃移动的速度。使用表面速度和刀具的尺寸来计算以
-
董其昌选集_关于董其昌选集的简介|每日快播
音频解说1、《董其昌选集》是浙江人民美术出版社出版的图书,作者是董其昌本文关于董其昌选集的简介就讲解
-
安全泄放装置
1、安全泄放装置是指紧急或异常状况时,能自动开启以防止因内部流体介质超压导致承压设备失效的装置。2、以
-
当前热文:中国女排取得世联赛开门红
中国女排取得世联赛开门红
-
每日信息:几月几号秋分(几月几号秋分?)
1、今年秋分是2020年9月22日,21:30:32,星期二。2、秋分在每年公历的9月22日,23日或24日,为中国传统节
-
焦点速讯:起泡白葡萄酒怎么喝_白葡萄酒怎么喝
1、如果真的要喝干白,是不能添加任何东西进去的,如果加了的话就浪费一瓶好的干白了,一定要把白葡萄酒冻
-
尖锐疣会自己消失吗会传染吗_尖锐疣会自己消失吗 简讯
1、一般来说,尖锐湿疣这种性病是不会自行消退的。2、如果不治疗,可能导致数量越来越多,面积越来越大,最
-
牛吴辉在1999年演唱歌曲
当前大家对于牛年伍晖演唱歌曲都是颇为感兴趣的,大家都想要了解一下牛年伍晖演唱歌曲,那么小美也是在网络
-
LOL:不听话就取消S赛名额,拳头官方给LCS赛区下达最后通牒
在说LCS赛区的事情之前先吐槽一下LGD这个队伍,他们在和平精英项目中引进33岁高龄的华晨宇当青训就够离谱了
-
天天观焦点:5月十大牛股出炉!最牛股票大涨176%
5月A股行情收官。在持续震荡的5月,上证指数、深证成指、创业板指分别累计下跌3 57%、4 80%、5 65%。5月涨
-
满油满电续航1370公里 加速7秒内!吉利银河L7上市:13.87万起 全球热门
满油满电续航1370公里加速7秒内!吉利银河L7上市:13 87万起
-
济南地铁8号线最新公示!
近日,济南市自然资源和规划局发布地铁8号线青年政治学院站主体(地下)和围子山站主体(地下)建设用地规划许
-
聚焦新兴“网红”玩具 加强儿童和学生用品质量安全监管
央视网消息:市场监管总局5月31日召开专题新闻发布会,介绍市场监管部门加强儿童和学生用品质量安全监管情
-
结构力学
这个问题不够专业哦,虚功原理包括虚位移原理和虚力原理。结构力学中的很多问题是用虚位移原理解决的,尤其
-
通用织物阻燃剂商品报价动态(2023-05-31)|当前聚焦
交易商品牌 产地交货地最新报价通用织物阻燃剂 含量≥80%,密度1 090-1 140g cm3,PH值5—7河南森蒂环保科
-
【独家焦点】欧洲央行
欧洲央行:欧元区金融稳定性依然脆弱。经济、通胀以及利率不确定性对企业构成压力。金融市场容易受到无序调
-
如果面试的岗位,自己没经验,如何表达可以让面试官可能选择自己|滚动
在面试中,如果你没有实操经验,可能会让面试官对你的能力产生疑虑。但是,即使你没有直接相关的实操经验,
-
2016世界女排大奖赛中国对美国(2016世界女排大奖赛)
来为大家解答以上的问题。2016世界女排大奖赛中国对美国,2016世界女排大奖赛这个很多人还不知道,现在让我