使用消息队列的三个常见问题

使用消息队列的三个常见问题

1.如何保证消息不丢

一个消息队列立足之根本,就是要保证消息不丢失,那么具体是如何实现的呢?哪些地方可能会导致消息丢失呢?

无非就是以下三个阶段可能会导致消息丢失:

  • 生产阶段
  • 存储阶段
  • 消费阶段

现在市面上的主流消息队列都可以保证消息不丢,不管是网络出现问题,还是节点出现宕机,都可以保证消息不丢,很多情况下都是开发者使用不当导致的

下面分别看一下这三个阶段,是如何做到不丢消息的:

  • 生产阶段:

生产阶段普遍采用的是请求确认机制,生产者发出消息之后,当Broker收到消息之后,会返回一个确认响应,如果生产者收到这个响应,那么说明发送成功, 如果没收到,就会进行超时重试,这里面会涉及到一个问题,为什么生产者没有收到响应?可能是两个原因:1.因为网络问题,消息确实在传输过程中丢了,这种情况下进行重试,可以保证Broker收到消息,2.Broker收到了消息,但是响应因为网络问题丢了,这个时候生产者还是会进行重发,这就会导致主题上面可能会有重复消息,这也是我们接下来需要考虑的一个问题,消息重复了怎么办?

  • 存储阶段:

    存储阶段还是分两种情况,如果Broker是单机的,那么需要保证消息落盘之后,再返回确认响应,这时候消息已经在磁盘中进行了持久化,就不会丢,如果Broker是集群形式的,那么可以通过配置,当消息在其他Broker节点上进行复制成功之后,再返回确认响应,当一个Broker宕机之后,其他的就可以进行补上

  • 消费阶段:

消费阶段和生产类似,也是通过确认机制来保证消息的不丢,当消费者客户端拿到消息之后,不要立即返回确认响应,而是在完成所有消费业务逻辑之后,再发送确认响应,这样如果在消费消息阶段失败了,那么下一次消费时可以继续消费上一次消费位置的消息,这里还有一种情况:如果消费者发出的消费确认响应丢了怎么办?这里还会涉及到重复消息的问题,后面我们一起讨论

一个合格的消息队列,上面的三个阶段都是可以严格保证消息不丢的,实际上我们在使用消息队列的时候,丢失消息的原因大多数是消费位置的不当导致的,因为目前大多数消息队列都是基于发布订阅模型的,每个消息都可以被多个消费者组消费,每个消费者组是互不影响的,而一个消费者组中的每个消费者是存在竞争关系的,当主题中的一个消息被一个消费者组消费完成之后,不能立即将消息删除,因为其他消费者组后面可能还会进行消费,这是我们需要为每个消费者组维持一个消费位置的变量

2.消息重复怎么办

首先明确一个立场:不管是在哪一种消息队列中,都可能存在重复消息的情况。

之前的问题中已经埋下了一个伏笔,也就是在生产者如果可能会将重复的消息发送到消息队列中,所有的消息队列都是At least Once级别的,之前就听说过Kafka可以保证Exactly Once级别,但是玥哥也是说了,这主要是为了配合流计算领域的实现,在消息的生产消费关系中,消息队列中 也是可能存在重复消息的!后面会单独写一篇文章,说一下事务和Exactly Once在流计算中是怎么搞的,既然消息队列中存在重复消息这件事情已经成为了既定的事实,那么我们只能通过在消费端的业务中实现幂等性来保证了,这样就可以满足:At least once + 幂等消费 = Exactly once

下面是几种实现幂等性的方法:

  • 利用数据库的唯一性实现

大概的思路就是利用数据库的主键的特性,当消息重复时,消费者在数据库表中可以检测到主键冲突异常,这样就可以不用进行接下来的重复消费

  • 设置相关判定条件

比如,“将账户 X 的余额增加 100 元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500 元,将余额加 100 元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。

还有一种比较通用的方法,借助了版本号这种思想,我们可以给数据加上一个版本号属性,每次更新数据前,比较当前数据的版本号和消息中数据的版本号是否匹配,如果不匹配就拒绝更新,更新数据的同时将版本号+1,可以实现幂等操作

  • 记录并检查

具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

这种方法看起来比较理想,但是实现起来还是有一定难度的,因为这个方法一共涉及到三个操作,分别是:1.检查消息消费状态,2.更新数据,3.设置消费状态,这三种操作我们很难保证其原子性,可能会出现这样的问题:

比如说,对于同一条消息:“全局 ID 为 8,操作为:给 ID 为 666 账户增加 100 元”,有可能出现这样的情况:t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加 100 元”;t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A 还未来得及更新消息执行状态。

虽然说可以通过事务或者锁的方式实现,但是分布式系统中,无论是分布式事务,还是分布式锁都是比较难以解决的问题

为什么消息队列中不实现Exactly Once,还是使用At least Once+幂等这种方式实现呢?我想大概有两点原因:1.虽然实现了Exactly Once,但是当消费者的确认响应丢失之后,还是需要在原来的位置进行消费,这样一来,就失去了Exactly Once的意义,2.代价很大,要想让消息队列实现严格意义上的Exactly Once,那么就需要在内部实现很多的业务逻辑进行判断,增加服务端的压力,这样就会导致整个消息队列的性能下降,这是必然的,得不偿失。

3.消息积压该如何处理

消息积压其实在消息队列中是一个比较正常的现象,但是问题是当消息大量积压的时候,时间长了系统可能就就会产生问题,例如消息队列的存储被填满无法提供服务,这样就会丢失消息

处理消息积压,最根本的还是要要从生产者和消费者的协调度上解决,因为消息队列的性能不是瓶颈问题所在,对于绝大多数使用消息队列的业务系统来说,消息队列的处理能力是远远大于业务系统的,所以基本不会产生生产者生产消息过快,消息队列来不及接收的情况,那么当消息积压时,最根本的原因还是消费者消费的速度一直小于生产者的速度,所以我们在设计系统时,一定要保证消费端的消费性能要高于生产端的生产性能,这样的系统才能健康持续的运行,那么,当我们真的遇到了消息积压的情况,应该怎么办?

监控:消息队列基本都是自带监控的,我想这也是JMQ没有完全开源的原因,因为JMQ目前使用的监控也是京东自研的UMP,但是UMP那边还不想进行开源,所以整个JMQ就不能做到完全开源,说的有点远,出现问题的第一做法还是要查看监控日志定位问题,通过监控的数据,就比较容易定位问题,如果是生产者生产速度太快,例如双十一期间,那么我们的做法只能是下面的提高消费端消费能力,或者还有一种做法就是降流:减少不重要业务的发送

  • 消费端水平扩容

在条件允许的情况下,可以对消费端实例进行水平扩容,提高消费端的并行消费能力,但是一定要注意的一点是:必须同步扩容主题中的分区数量,这是很多人都容易忽视的地方,因为每个分区实际上只能支持单线程消费,如果仅仅是增加的消费端实例,没有进行分区的扩容,那么就不会起到效果

还有一种非常错误的做法,这里引以为鉴,不要使用

image-20200903221511984

这种方法看起来没什么问题,在OnMessage中只是进行消息的接收,不进行具体的处理,将消息全部放在内存队列中,然后真正负责业务逻辑处理的业务线程拿来进行处理,看上去进行了解耦,并且提高了并发能力,但是,如果节点发生宕机,内存队列中还没来得及进行处理的消息就会丢失!

  • 消费端代码优化

这也是可行的,可能是消费端整体的代码设计消费性能比较差,这种情况下可以通过优化代码的方式来提高消费能力

还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。


上面大概介绍了三个使用消息队列过程中经常遇到的问题,不管是哪一款消息队列产品,在上面的三个问题上都是具有共性的,当然这些问题也都是使用层面上的,接下来的文章中,我们深入到消息队列的实现层面上,来总结一下,实现一个优秀的消息队列都需要哪些方面的知识,我们可不可以自己动手,通过这些通用的知识,实现一个简单的RPC通信框架呢?敬请期待~