古道长亭

Contact me with ixiaoqiang0011@gmail.com


  • 首页

  • 归档

  • 分类

  • 关于

  • Book

  • 搜索

RabbitMQ高级特性

时间: 2023-06-12   |   分类: RabbitMQ   消息队列   | 字数: 7444 字 | 阅读约: 15分钟 | 阅读次数:

RabbitMQ高级特性

1. 消息可靠性

例: 我们平时使用的支付等, 支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性

平台会通过以下方式:

  • 分布式锁

  • 消息队列

    消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息

保证消息可靠性, 可以通过以下方式

1.1 异常捕获机制

先执行行业务操作,业务操作成功后执行行消息发送,消息发送过程通过trycatch方式捕获异常,在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。

另外,可以通过spring.rabbitmq.template.retry.enabled=true配置开启发送端的重试

1.2 AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。

一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用。

try {
  //将channel设置为事务模式
  channel.txSelect();
  //发布消息到交换器,routingKey为空
  channel.basicPublish(exchange_name, routingKey, props, charSetName);
  //提交事务,只有消息成功被Broker接收才能提交成功
  channel.txCommit();
} catch (Exception e) {
  //事务回滚
  channel.txRollback();
}

1.3 发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisherconfirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。

  • 同步等待mq确认(confirm)示例

    RabbitMQ回传给生产者的确认消息中的deliveryTag字段包含了确认消息的序号,另外,通过设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。如果RabbitMQ因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理理该nack命令。

    示例代码: demo20_publisherconfirm#PublisherConfirmsProducer:https://gitee.com/ixinglan/rabbitmq-demo.git

    waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛TimeoutException。

    类似的有几个waitForConfirmsOrDie方法,Broker端在返回nack(Basic.Nack)之后该方法会抛出java.io.IOException。需要根据异常类型来做区别处理理, TimeoutException超时是属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。上面的代码主要只是演示confirm机制,实际上还是同步阻塞模式的,性能并不不是太好。

  • 批处理方式改善性能

    我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),如此看来,批量重发消息肯定会造成部分消息重复

    示例代码: demo20_publisherconfirm#PublisherConfirmsProducer2:https://gitee.com/ixinglan/rabbitmq-demo.git

  • 异步回调处理mq响应

    addConfirmListener方法可以添加ConfirmListener这个回调接口,这个ConfirmListener接口包含两个方法:handleAck和handleNack,分别用来处理RabbitMQ回传的Basic.Ack和Basic.Nack。

    示例代码: demo20_publisherconfirm#PublisherConfirmsProducer3:https://gitee.com/ixinglan/rabbitmq-demo.git

  • springboot示例

    示例代码: demo14_springboot_rabbit#MessageController2:https://gitee.com/ixinglan/rabbitmq-demo.git

1.4 持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

  • Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失。

  • Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。

  • 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2即可实现消息的持久化,保证消息自身不丢失。

示例代码: demo21_persistent:https://gitee.com/ixinglan/rabbitmq-demo.git

RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:

  • 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应的rabbit_queue_index。

RabbitMQ通过配置queue_index_embed_msgs_below可以根据消息大小决定存储位置,默认queue_index_embed_msgs_below是4096字节(包含消息体、属性及headers),小于该值的消息存在rabbit_queue_index中。

  • 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列列共享,在每个节点中有且只有一个。

1.5 Consumer ACK

RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。

一般而言,我们有如下处理手段:

  • 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险

  • 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期

  • 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack

示例代码: demo22_consumer_ack:https://gitee.com/ixinglan/rabbitmq-demo.git

springboot示例: demo15_springboot_consumer:https://gitee.com/ixinglan/rabbitmq-demo.git

    /**
     * 演示手动ack 确认或拒绝
     * 对应配置: spring.rabbitmq.listener.simple.acknowledge-mode=manual
     */
    @RabbitListener(queues = "queue.boot", ackMode = "MANUAL")
    public void getMyMessage(Message message, Channel channel) throws IOException {
        String value = message.getMessageProperties().getHeader("hello");
        System.out.println(message);
        System.out.println("hello = " + value);

        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (index % 2 == 0) {
            // 确认消息
            channel.basicAck(deliveryTag, false);
        } else {
            // 拒收消息
            channel.basicReject(deliveryTag, false);
        }
        index++;
    }

1.6 消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?

当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧

  • RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,或意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:

  • RabbitMQ 还默认提供了一种基于credit flow 的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。

  • RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是QoS机制仅对于消费端推模式有效,对拉模式无效.而且不支持NONE Ack模式

    执行channel.basicConsume 方法之前通过channel.basicQoS方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个.

    如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了

提升下游应用的吞吐量和缩短消费过程的耗时, 优化方式如下:

  • 优化应用程序性能,缩短响应时间
  • 增加消费者节点实例
  • 调整并发消费的线程数

示例代码: demo23_consumer_qos:https://gitee.com/ixinglan/rabbitmq-demo.git

1.7 消息可靠性保障

即前面讲到的:

  • 消息传输保障
  • 限流,应急手段
  • 业务层面的一些容错,补偿,异常重试等

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输

    方式就是生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失

  • At least once:最少一次。消息绝不会丢失,但可能会重复传输

    • 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到RabbitMQ 中。

    • 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。

    • 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。

    • 消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

    rabbitmq 目前无法保障

1.8 消息幂等性处理

追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费.

RabbitMQ层面有实现“去重机制”来保证“恰好一次”吗?答案是并没有。而且这个在目前主流的消息中间件都没有实现。

借用淘宝沈洵的一句话:最好的解决办法就是不去解决。当为了在基础的分布式中间件中实现某种相对不太通用的功能,需要牺牲到性能、可靠性、扩展性时,并且会额外增加很多复杂度,最简单的办法就是交给业务自己去处理。事实证明,很多业务场景下是可以容忍重复消息的。例如:操作日志收集,而对一些金融类的业务则要求比较严苛。

一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。

幂等(Idempotence)是一个数学上的概念,它是这样定义的:

如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务。

一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。

业界对于幂等性的一些常见处理方法:

  • 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证;

  • 前置检查机制。这个很容易理解,并且有几种实现办法。还是引用上面转账的例子,当我在执行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录了, select * from xxx where accountNumber=xxx and orderId=yyy ,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。

    为了防止并发问题,我们通常需要借助**“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制**,乐观锁一般会使用扩展一个版本号字段做判断条件

  • 唯一Id机制,比较通用的方式。对于每条消息我们都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费了。

2. 可靠性分析

在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。

在RabbitMQ中可以使用Firehose功能来实现消息追踪,Firehose可以记录每一次发送或者消费消息的记录,方便RabbitMQ的使用者进行调试、排错等。

Firehose的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为amq.rabbitmq.trace,它是一个topic类型的交换器。发送到这个交换器上的消息的路由键为publish.{exchangename}和deliver.{queuename}。其中exchangename和queuename为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

#开启firehose命令
rabbitmqctl trace_on [-p vhost]

#关闭命令
rabbitmqctl trace_off [-p vhost]

#Firehose默认情况下处于关闭状态,并且Firehose的状态是非持久化的,
#会在RabbitMQ服务重启的时候还原成默认的状态。
#Firehose开启之后多少会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储。

#启动rabbitmq_tracing插件
rabbitmq-plugins enable rabbitmq_tracing

#关闭插件
rabbitmq-plugins disable rabbitmq_tracing

开启后我们发送消息看一下效果:

示例代码: demo24_tracing

  • 代码里我们用到的exchange和queue分别为: ex.tc.demo, queue.tc.demo

  • 管理后台创建两个队列

  • 将exchangeamq.rabbitmq.trace分别与上述两个队列绑定

    注意

    amq.rabbitmq.trace 与publishertrace绑定key为 publish.ex.tc.demo

    amq.rabbitmq.trace 与delivertrac绑定key为 deliver.queue.tc.demo

  • 添加两个trace

  • 分别生产和消费消息,即可看到记录

3. TTL机制

在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消, 如何实现?

  • 定期轮询

    设计简单,但需要对数据库进行大量的IO操作

  • Timer

    没有持久化机制, 不灵活, 不能利用线程池, 没有管理计划

  • ScheduledExecutorService

    可以多线程执行, 但在高并发下, 浪费服务器性能, 不建议

  • RabbitMQ

    使用TTL

  • Quartz, Redis Zset, JCronTab, SchedulerX……

TTL,Time to Live 的简称,即过期时间。

RabbitMQ 可以对消息和队列两个维度来设置TTL, 如下:

  • 通过Queue属性设置,队列中所有消息都有相同的过期时间。

  • 对消息自身进行单独设置,每条消息的TTL 可以不同。

    不设置, 则默认不会过期, 如果设置为0,表示除非此时可以直接将消息投递到消费者, 否则会被立即丢弃

如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。

示例代码: demo17_ttl

此外,还可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

springboot示例: demo14_springboot_rabbit#TTLController

4. 死信队列

DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。

有以下几种情况会变成死信队列:

  • 消息被拒绝, 并且设置requeue参数为false
  • 消息过期
  • 队列达到最大长度

对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

示例代码: demo18_dlx

5. 延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。

可以借助死信队列来变相实现: 使用rabbitmq_delayed_message_exchange插件实现

  • 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上

  • 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它

  • 队列(queue)再把消息发送给监听它的消费者(customer)

实现步骤:

  • 下载插件

    下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 安装插件

    将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins

  • 启用插件

    rabbitmq-plugins list 
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange 1
    
  • 重启rabbitmq-server

    systemctl restart rabbitmq-server
    

示例代码: demo19_delayed_exchange

#RabbitMQ# #消息队列#
QQ扫一扫交流

标题:RabbitMQ高级特性

作者:古道长亭

声明: 欢迎加群交流!

如有帮助,欢迎多多交流 ^_^

微信打赏

支付宝打赏

集群与运维
RabbitMQ架构与实战
  • 文章目录
  • 站点概览
古道长亭

古道长亭

Always remember that your present situation is not your final destination. The best is yet to come.

226 日志
57 分类
104 标签
GitHub Gitee
友情链接
  • 古道长亭的BOOK
  • JAVA学习
标签云
  • Mysql
  • 搜索引擎
  • Mybatis
  • 容器
  • 架构
  • 消息队列
  • Flink
  • Sharding sphere
  • 流处理
  • 缓存
  • 1. 消息可靠性
    • 1.1 异常捕获机制
    • 1.2 AMQP/RabbitMQ的事务机制
    • 1.3 发送端确认机制
    • 1.4 持久化存储机制
    • 1.5 Consumer ACK
    • 1.6 消费端限流
    • 1.7 消息可靠性保障
    • 1.8 消息幂等性处理
  • 2. 可靠性分析
  • 3. TTL机制
  • 4. 死信队列
  • 5. 延迟队列
© 2019 - 2024 京ICP备19012088号-1
0%