古道长亭

Contact me with ixiaoqiang0011@gmail.com


  • 首页

  • 归档

  • 分类

  • 关于

  • Book

  • 搜索

RocketMQ高级实战

时间: 2023-06-19   |   分类: RocketMQ   消息队列   | 字数: 7671 字 | 阅读约: 16分钟 | 阅读次数:

RocketMQ高级实战

1. 生产者

  • tags的使用

    一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。

  • keys的使用

    每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

    String orderId = "20034568923546";
    message.setKeys(orderId);
    
  • 日志的打印

    消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义

    SENC_OK: 消息发送成功.要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH

    FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度

    FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失

    SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。

  • 消息发送失败处理方式

    Producer的send方法本身支持内部重试,重试逻辑如下

    至多重试2次(同步发送为2次,异步发送为0次)。
    如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
    如果本身向broker发送消息产生超时异常,就不会再重试
    

    以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。

  • 选择oneway形式发送

    对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway****形式只发送请求不等待应答**,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级

2. 消费者

  • 消费过程幂等

    RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理

  • 消费速度慢的处理方式

    • 提高消费并行度

      合理设置消费并行度的方法:

      1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超
      过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的
      方式。
      2. 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、
      consumeThreadMax实现。
      3. 丢弃部分不重要的消息
      
    • 批量方式消费

      例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N

    • 跳过非重要消息

  • 优化每条消息消费过程

  • 消费打印日志

  • 其他建议

    • 确保同一组内的每个消费者订阅信息保持一致

    • 消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。

    • 消费者将并发消费消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代

    • 对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻

    • 不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程

    • 当建立一个新的消费组时,需要决定是否需要消费已经存在于 Broker 中的历史消息

      CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。

      CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。

      也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。

      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
      

3. Broker

  • broker角色

    ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)

  • FlushDiskType

    SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡

  • Broker配置

    上一节已提到过

4. NameServer

NameServer设计:

  • NameServer互相独立,彼此没有通信关系,单台NameServer挂掉,不影响其他NameServer。

  • NameServer不去连接别的机器,不主动推消息。

  • 单个Broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自己还活着。

  • Consumer随机与一个NameServer建立长连接,如果该NameServer断开,则从NameServer列表中查找下一个进行连接

  • Producer随机与一个NameServer建立长连接,每隔30秒(此处时间可配置)从NameServer获取Topic的最新队列情况,如果某个Broker Master宕机,Producer最多30秒才能感知,在这个期间,发往该broker master的消息失败。Producer向提供Topic服务的Master建立长连接,且定时向Master发送心跳

NameServer的作用:

1. NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
2. NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
3. NameServer 用来保存所有 broker 的 Filter 列表。
4. 命名服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。

RocketMQ为什么不使用zookeeper?

在粗粒度分布式锁,分布式选主,主备高可用切换等不需要高TPS支持的场景下有不可替代的作
用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据
集,并且大部分时间分任务多进程/线程并行处理这些数据集,但是总是有一些点上需要将这些任务和
进程统一协调,这时候就是ZooKeeper发挥巨大作用的用武之地。

但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然
的短板,应该竭力避免在这些场景下引入ZooKeeper,在阿里巴巴的生产实践中,应用对ZooKeeper申
请使用的时候要进行严格的场景、容量、SLA需求的评估。

对于ZooKeeper,大数据使用,服务发现不用

5. 客户端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类

  • 客户端寻址方式

    RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级

    • 代码中指定NameServer地址

    • java启动参数中指定Name Server地址

    • 环境变量指定NameServer地址

      export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
      
    • HTTP静态服务器寻址(默认)

      该静态地址,客户端第一次会10s后调用,然后每个2分钟调用一次。

      客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://ip:port/rocketmq/nsaddr

  • 客户端公共配置

    参数名默认值说明
    namesrvAddrnameServer地址列表,多个",“隔开
    clientIP本机ip客户端本机ip
    instanceName客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
    clientCallbackExecutorThreads4通信层异步回调线程数
    pollNameServerInteval30000轮询Name Server间隔时间,单位毫秒
    heartbeatBrokerInterval30000向Broker发送心跳间隔时间,单位毫秒
    persistConsumerOffsetInterval5000持久化Consumer消费进度间隔时间,单位毫秒
  • Producer配置

    参数名默认值说明
    producerGroupDEFAULT_PRODUCERProducer组名,多个Producer如果属于一个应用,发送同样的消 息,则应该将它们归为同一组
    createTopicKeyTBW102在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。
    defaultTopicQueueNums4在发送消息,自动创建服务器不存在的topic时,默认创建的队列数
    sendMsgTimeout10000发送消息超时时间,单位毫秒
    compressMsgBodyOverHowmuch4096消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
    retryAnotherBrokerWhenNotStoreOKFALSE如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重 试发送
    retryTimesWhenSendFailed2如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用
    maxMessageSize4MB客户端限制的消息大小,超过报错,同时服务端也会限制,所以需 要跟服务端配合使用
    transactionCheckListener事务消息回查监听器,如果发送事 务消息,必须设置
    checkThreadPoolMinSize1Broker回查Producer事务状态时,线程池最小线程数
    checkThreadPoolMaxSize1Broker回查Producer事务状态时,线程池最大线程数
    checkRequestHoldMax2000Broker回查Producer事务状态时, Producer本地缓冲请求队列大小
    RPCHooknull该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作
  • PushConsumer配置

    参数名默认值说明
    consumerGroupConsumer组名,多个Consumer如果属于 一个应用,订阅同样的消息,且消费逻辑 一致,则应该将它们归为同一组
    messageModelCLUSTERING消费模型支持集群消费和广播消费两种
    consumeFromWhereCONSUME_FROM_LAST_OFFSETConsumer启动后,默认从上次消费的位置 开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的 位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费
    consumeTimestamp只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作 用。
    allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance算法实现策略
    subscription订阅关系
    messageListener消息监听器
    offsetStore消费进度存储
    consumeThreadMin10消费线程池最小线程数
    consumeThreadMax20消费线程池最大线程数
    consumeConcurrentlyMaxSpan2000单队列并行消费允许的最大跨度
    pullThresholdForQueue1000拉消息本地队列缓存消息最大数
    pullInterval0拉消息间隔,由于是长轮询,所以为0,但 是如果应用为了流控,也可以设置大于0的 值,单位毫秒
    consumeMessageBatchMaxSize1批量消费,一次消费多少条消息
    pullBatchSize32批量拉消息,一次最多拉多少条
  • PullConsumer配置

    参数名默认值说明
    consumerGroup
    brokerSuspendMaxTimeMillis20000长轮询,Consumer 拉消息请求在 Broker挂起最长时间,单位毫秒
    consumerTimeoutMillisWhenSuspend30000长轮询,Consumer拉消息请求在 Broker挂起超过指 定时间,客户端认为超时,单位毫秒
    consumerPullTimeoutMillis10000非长轮询,拉消息超时时间,单位毫秒
    messageModelBROADCASTING消息支持两种模式:集群消费和广播消费
    messageQueueListener监听队列变化
    offsetStore消费进度存储
    registerTopics注册的topic集合
    allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance算法实现策略
  • Message数据结构

    参数名默认值说明
    Topicnull必填,消息所属topic的名称
    Bodynull必填,消息体
    Tagsnull选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag
    Keysnull选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。
    Flag0选填,完全由应用来设置,RocketMQ不做干预
    DelayTimeLevel0选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费
    WaitStoreMsgOKTRUE选填,表示消息是否在服务器落盘后才返回应答

6. 系统配置

  • JVM选项

    #设置Xms和Xmx一样大,防止JVM重新调整堆空间大小影响性能。
    -server -Xms8g -Xmx8g -Xmn4g
    #设置DirectByteBuffer内存大小。当DirectByteBuffer占用达到这个值,就会触发Full GC。
    -XX:MaxDirectMemorySize=15g
    #如果不太关心RocketMQ的启动时间,可以设置pre-touch,这样在JVM启动的时候就会分配完整的页空间。
    -XX:+AlwaysPreTouch
    #禁用偏向锁可能减少JVM的停顿,因为偏向锁在线程需要获取锁之前会判断当前线程是否拥有锁
    #如果拥有,就不用再去获取锁了。
    #在并发小的时候使用偏向锁有利于提升JVM效率,在高并发场合禁用掉。
    -XX:-UseBiasedLocking
    

    推荐使用JDK1.8的G1垃圾回收器:当在G1的GC日志中看到 to-space overflow 或者 to-space exhausted 的时候,表示G1没有足够的内存使用的(可能是 survivor 区不够了,可能是老年代不够了,也可能是两者都不够了),这时候表示Java堆占用大小已经达到了最大值

    对G1而言,大小超过region大小50%的对象将被认为是大对象,这种大对象将直接被分配到老年代的humongous regions中,humongous regions是连续的region集合,StartsHumongous表记集合从那里开始,ContinuesHumongous标记连续集合

    在分配大对象之前,将会检查标记阈值,如果有必要的话,还会启动并发周期。

    死亡的大对象会在标记周期的清理阶段和发生Full GC的时候被清理。

    为了减少复制开销,任何转移阶段都不包含大对象的复制。在Full GC时,G1在原地压缩大对象。因为每个独立的humongous regions只包含一个大对象,因此从大对象的结尾到它占用的最后一个region的结尾的那部分空间时没有被使用的,对于那些大小略大于region整数倍的对象,这些没有被使用的内存将导致内存碎片化。

    如果你看到因为大对象的分配导致不断的启动并发收集,并且这种分配使得老年代碎片化不断加剧,那么请增加-XX:G1HeapRegionSize参数的值,这样的话,大对象将不再被G1认为是大对象,它会走普通对象的分配流程。

    # G1回收器将堆空间划分为1024个region,此选项指定堆空间region的大小 
    -XX:+UseG1GC -XX:G1HeapRegionSize=16m 
    -XX:G1ReservePercent=25 
    -XX:InitiatingHeapOccupancyPercent=30
    

    上述设置可能有点儿激进,但是对于生产环境,性能很好。

    -XX:MaxGCPauseMillis不要设置的太小,否则JVM会使用小的年轻代空间以达到此设置的值,同时引起很频繁的minor GC。

    #推荐使用GC log文件:
    -XX:+UseGCLogFileRotation
    -XX:NumberOfGCLogFiles=5 
    -XX:GCLogFileSize=30m
    
    #如果写GC文件增加了Broker的延迟,可以考虑将GC log文件写到内存文件系统:
    -Xloggc:/dev/shm/mq_gc_%p.log123
    
  • linux内核参数

    os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途

    # 获取内核参数值 
    sysctl vm.extra_free_kbytes 
    # 设置内核参数值 
    sudo sysctl -w vm.overcommit_memory=1
    

    vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)

    vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。

    vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。

    vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。

    File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。

    Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟

7. 动态扩缩容

  • 动态增减NameServer机器

    通过HTTP服务来设置方式看似繁琐,但它是唯一支持动态增加NameServer,无须重启其他组件的方式。使用这种方式后其他组件会每隔2分钟请求一次该URL,获取最新的NameServer地址

  • 动态增减Broker机器

    集群扩容后,一是可以把新建的Topic指定到新的Broker机器上,均衡利用资源;另一种方式是通过updateTopic命令更改现有的Topic配置,在新加的Broker上创建新的队列

    如果因为业务变动或者置换机器需要减少Broker,此时该如何操作呢?减少Broker要看是否有持续运行的Producer,当一个Topic只有一个Master Broker,停掉这个Broker后,消息的发送肯定会受到影响,需要在停止这个Broker前,停止发送消息

    当某个Topic有多个Master Broker,停了其中一个,这时候是否会丢失消息呢?答案和Producer使用的发送消息的方式有关,如果使用同步方式send(msg)发送,在DefaultMQProducer内部有个自动重试逻辑,其中一个Broker停了,会自动向另一个Broker发消息,不会发生丢消息现象。如果使用异步方式发送send(msg,callback),或者用sendOneWay方式,会丢失切换过程中的消息。因为在异步和sendOneWay这两种发送方式下,Producer.setRetryTimesWhenSendFailed设置不起作用,发送失败不会重试。DefaultMQProducer默认每30秒到NameServer请求最新的路由消息,Producer如果获取不到已停止的Broker下的队列信息,后续就自动不再向这些队列发送消息

    如果Producer程序能够暂停,在有一个Master和一个Slave的情况下也可以顺利切换。可以关闭Producer后关闭Master Broker,这个时候所有的读取都会被定向到Slave机器,消费消息不受影响。把Master Broker机器置换完后,基于原来的数据启动这个Master Broker,然后再启动Producer程序正常发送消息

8. 各种故障对消息的影响

  • Broker正常关闭,启动;

    属于可控的软件问题,内存中的数据不会丢失

  • Broker异常Crash,然后启动;

    属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,

  • OS Crash,重启;

    属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,

  • 机器断电,但能马上恢复供电;

    属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,

  • 磁盘损坏;

    属于硬件故障,发生第5、6种情况的故障,原有机器的磁盘数据可能会丢失

  • CPU、主板、内存等关键设备损坏。

    属于硬件故障,发生第5、6种情况的故障,原有机器的磁盘数据可能会丢失

#RocketMQ# #消息队列#
QQ扫一扫交流

标题:RocketMQ高级实战

作者:古道长亭

声明: 欢迎加群交流!

如有帮助,欢迎多多交流 ^_^

微信打赏

支付宝打赏

RocketMQ集群与运维
RocketMQ高级特性及原理
  • 文章目录
  • 站点概览
古道长亭

古道长亭

Always remember that your present situation is not your final destination. The best is yet to come.

226 日志
57 分类
104 标签
GitHub Gitee
友情链接
  • 古道长亭的BOOK
  • JAVA学习
标签云
  • Mysql
  • 搜索引擎
  • Mybatis
  • 容器
  • 架构
  • 消息队列
  • Flink
  • Sharding sphere
  • 流处理
  • 缓存
  • 1. 生产者
  • 2. 消费者
  • 3. Broker
  • 4. NameServer
  • 5. 客户端配置
  • 6. 系统配置
  • 7. 动态扩缩容
  • 8. 各种故障对消息的影响
© 2019 - 2024 京ICP备19012088号-1
0%