消息积压了怎么办?

2021/09/15 429点热度 0人点赞 0条评论

引入消息中间件以后,系统交互的复杂性提升,一旦出了问题以后,要想各种各样的解决方案。

如果用消息中间件是为了削峰填谷,那在高峰期积压了慢慢处理即可。

如果用消息中间件只是为了解耦、异步处理,业务对时效性有一定的要求,那么就要求必须尽快处理。

消息积压,一般分为两大类:

  • 生产者生产消息过快

  • 消费者消费过慢 这两个问题,又可能引出别的问题,如中间件的磁盘有限,无法导致

这两大类又可以细分为三种:

生产者消费过快

  • 运营活动(导致单位时间内流量激增)

  • 微信推文

  • push营销

  • 短信触达

  • 跑批处理,一下子导入过多的数据到消费者

  • 异常数据处理,一下子把所有的异常数据推入到了队列,导致正常业务延迟严重;

  • 数据跑批

  • 新需求上线未评估业务量

  • 消费端能满足正常需求的2倍处理,但是生产端上线了个需求,导致生产速度翻了n倍,导致队列快速膨胀

消费者消费不过来

  • 一种是系统处理确实慢,下游系统有瓶颈;

  • 一种是消费端处理异常或超时,导致无法提交offset,

  • 一种是失败又重新放入队列进行了重试;

消息队列混合使用

  • 有时效性的和无时效性的业务处理都放入到了一个队列里(比如风控授信决策、准入决策、交易决策)

解决方案又分为几种

  1. 消费者瓶颈,但是能通过扩容快速解决的

  2. 消费者有瓶颈,但是能满足正常的业务需求的

  3. 消费者异常导致的无法提交

  4. 消费者异常导致的重试机制

  5. 多种类型的消息在队列中堵塞

消费者瓶颈,但是能通过扩容快速解决的

  • 比如我们默认建的kafka,partition 默认是2个,能满足大部分业务

  • 分析情况是消费者处理不过来?还是消费者能处理,就是queue的数量太少?

  • 消费者处理不过来

  • 消费者是否多线程消费(有的项目只有一个线程处理,处理速度优先,评估能否改成多线程消费)

  • 评估增加消费者的可能性(别增加了消费者,数据库等又承受不了,导致崩溃)

  • 消费者能处理,就是queue的数量太少(扩容queue的数量)

  • 消费者不是纯粹的消费者,可能还承担了别的业务,消费者节点多,但是queue只有两个,只有两个消费者处理,通过增加queue让别的消费者也能处理;

  • kafka可以通过kafka管理后台,调整partition的数量(只能增加,不能减少),需评估是临时性的,还是以后会经常性这样

  • 其他的消息中间件不知道能不能动态调整queue的数量,如果不能动态调整,只能新建topic,然后将消息都导过去,但是消费者得改代码

消费者有瓶颈(消费者不能扩容)但是能满足正常的业务需求的

  • 这种情况就比较麻烦,如果是削峰填谷,那就慢慢等着处理即可;

  • 如果是异步、解耦又有时效性(这时候需要将积压的消息先转移,优先恢复正常业务)

  • 临时申请一个新的topic

  • 先暂停下所有的消费者(也可以不暂停)

  • 通过工具,将原队列中积压的消息,抽取到新的队列里(将积压的消息转发出去,可以根据消息的发送时间判定);

  • 启动消费者消费新的消息,确保正在操作的用户的流程

  • 转移出的消息如何处理?得具体业务场景具体分析

消费者异常

  • 代码bug,导致抛异常,未提交成功(重复消费),一直是那几条消息在处理,后面的消息都无法处理

  • 处理过程:改代码上线,将异常数据写入日志文件,分析如何修复数据

  • 执行过程过慢(这就有点危险了,如果开启了自动提交,呵呵,就看你对一致性的要求)

  • 消费过程中间有慢sql(优化sql),自动提交后,如果执行异常,则会丢失

  • 消费过程中调用外部接口,外部接口响应过慢(读取超时,抛异常,和代码bug处理一致,响应时间过长,自动提交,可能导致数据不一致)

  • 手动提交,或者控制消费者整体执行时间,不超过自动提交时间

  • 通过定时任务补偿异常数据

  • 代码bug,导致的数据质量不符合要求,无法处理

  • 如:我们的延迟消息,有些系统会失败后继续延迟,导致循环执行

  • 异常后,又将消息推入到了队列

多种类型的消息在队列中堵塞

  • 例如:跑批和正常业务搅合到了一起,跑批可以慢慢处理,毕竟用户没在app那操作,正常业务的用户可等不得;

  • 临时额度大批量到期,到期时间没有随机,一下子几百万数据丢入了队列里,堵塞了正常业务

  • 授信(最长10分钟)、准入(准实时)、交易审核(异步慢慢跑),混合到一个队列,交易可能压单后批量处理

  • 解决方案:

  • 正常业务和跑批任务的队列尽可能隔离开;

  • 对实时性有要求的和对实时性无要求的也分隔开;

  • 跑批任务也尽可能的散开,比如临时额度的到期时间内存int自增,在到期时间上加上对应的int的毫秒数,100万条最多+100万毫秒,也就1000秒,也就16分钟,如果还是密集,可以设置步长(最终根据系统的吞吐量来决定),比如我们的额度系统单机qps能达到5000~8000,1秒钟1000条基本上不会影响业务

如果觉得对你有帮助,请关注公众号:5ycode,后续会不断更新哦

公众号图片

yxkong

这个人很懒,什么都没留下

Watch Series Online watch movies online free Watch AlRawabi School for Girls 2021 Free Online Watch West Side Story 2021 Free Online Watch Theodosia 2022 Free Online