RocketMQ高级实战
1. 生产者
-
tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
-
keys的使用
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
String orderId = "20034568923546"; message.setKeys(orderId);
-
日志的打印
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义
SENC_OK: 消息发送成功.要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH
FLUSH_DISK_TIMEOUT:消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时Slave不可用。
-
消息发送失败处理方式
Producer的send方法本身支持内部重试,重试逻辑如下
至多重试2次(同步发送为2次,异步发送为0次)。 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。 如果本身向broker发送消息产生超时异常,就不会再重试
以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
-
选择oneway形式发送
对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用,oneway****形式只发送请求不等待应答**,而发送请求在客户端实现层面仅仅是一个操作系统系统调用的开销,即将数据写入客户端的socket缓冲区,此过程耗时通常在微秒级
2. 消费者
-
消费过程幂等
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理
-
消费速度慢的处理方式
-
提高消费并行度
合理设置消费并行度的方法:
1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超 过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的 方式。 2. 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、 consumeThreadMax实现。 3. 丢弃部分不重要的消息
-
批量方式消费
例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N
-
跳过非重要消息
-
-
优化每条消息消费过程
-
消费打印日志
-
其他建议
-
确保同一组内的每个消费者订阅信息保持一致
-
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。
-
消费者将并发消费消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代
-
对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻
-
不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程
-
当建立一个新的消费组时,需要决定是否需要消费已经存在于 Broker 中的历史消息
CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。
CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。
也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-
3. Broker
-
broker角色
ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)
-
FlushDiskType
SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡
-
Broker配置
上一节已提到过
4. NameServer
NameServer设计:
-
NameServer互相独立,彼此没有通信关系,单台NameServer挂掉,不影响其他NameServer。
-
NameServer不去连接别的机器,不主动推消息。
-
单个Broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自己还活着。
-
Consumer随机与一个NameServer建立长连接,如果该NameServer断开,则从NameServer列表中查找下一个进行连接
-
Producer随机与一个NameServer建立长连接,每隔30秒(此处时间可配置)从NameServer获取Topic的最新队列情况,如果某个Broker Master宕机,Producer最多30秒才能感知,在这个期间,发往该broker master的消息失败。Producer向提供Topic服务的Master建立长连接,且定时向Master发送心跳
NameServer的作用:
1. NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
2. NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
3. NameServer 用来保存所有 broker 的 Filter 列表。
4. 命名服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。
RocketMQ为什么不使用zookeeper?
在粗粒度分布式锁,分布式选主,主备高可用切换等不需要高TPS支持的场景下有不可替代的作
用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据
集,并且大部分时间分任务多进程/线程并行处理这些数据集,但是总是有一些点上需要将这些任务和
进程统一协调,这时候就是ZooKeeper发挥巨大作用的用武之地。
但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然
的短板,应该竭力避免在这些场景下引入ZooKeeper,在阿里巴巴的生产实践中,应用对ZooKeeper申
请使用的时候要进行严格的场景、容量、SLA需求的评估。
对于ZooKeeper,大数据使用,服务发现不用
5. 客户端配置
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类
-
客户端寻址方式
RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级
-
代码中指定NameServer地址
-
java启动参数中指定Name Server地址
-
环境变量指定NameServer地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
-
HTTP静态服务器寻址(默认)
该静态地址,客户端第一次会10s后调用,然后每个2分钟调用一次。
客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://ip:port/rocketmq/nsaddr
-
-
客户端公共配置
参数名 默认值 说明 namesrvAddr nameServer地址列表,多个",“隔开 clientIP 本机ip 客户端本机ip instanceName 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等) clientCallbackExecutorThreads 4 通信层异步回调线程数 pollNameServerInteval 30000 轮询Name Server间隔时间,单位毫秒 heartbeatBrokerInterval 30000 向Broker发送心跳间隔时间,单位毫秒 persistConsumerOffsetInterval 5000 持久化Consumer消费进度间隔时间,单位毫秒 -
Producer配置
参数名 默认值 说明 producerGroup DEFAULT_PRODUCER Producer组名,多个Producer如果属于一个应用,发送同样的消 息,则应该将它们归为同一组 createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 defaultTopicQueueNums 4 在发送消息,自动创建服务器不存在的topic时,默认创建的队列数 sendMsgTimeout 10000 发送消息超时时间,单位毫秒 compressMsgBodyOverHowmuch 4096 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重 试发送 retryTimesWhenSendFailed 2 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用 maxMessageSize 4MB 客户端限制的消息大小,超过报错,同时服务端也会限制,所以需 要跟服务端配合使用 transactionCheckListener 事务消息回查监听器,如果发送事 务消息,必须设置 checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池最小线程数 checkThreadPoolMaxSize 1 Broker回查Producer事务状态时,线程池最大线程数 checkRequestHoldMax 2000 Broker回查Producer事务状态时, Producer本地缓冲请求队列大小 RPCHook null 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作 -
PushConsumer配置
参数名 默认值 说明 consumerGroup Consumer组名,多个Consumer如果属于 一个应用,订阅同样的消息,且消费逻辑 一致,则应该将它们归为同一组 messageModel CLUSTERING 消费模型支持集群消费和广播消费两种 consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从上次消费的位置 开始消费,这包含两种情况:一种是上次消费的位置未过期,则消费从上次中止的 位置进行;一种是上次消费位置已经过期,则从当前队列第一条消息开始消费 consumeTimestamp 只有当consumeFromWhere值为CONSUME_FROM_TIMESTAMP时才起作 用。 allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略 subscription 订阅关系 messageListener 消息监听器 offsetStore 消费进度存储 consumeThreadMin 10 消费线程池最小线程数 consumeThreadMax 20 消费线程池最大线程数 consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度 pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数 pullInterval 0 拉消息间隔,由于是长轮询,所以为0,但 是如果应用为了流控,也可以设置大于0的 值,单位毫秒 consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息 pullBatchSize 32 批量拉消息,一次最多拉多少条 -
PullConsumer配置
参数名 默认值 说明 consumerGroup brokerSuspendMaxTimeMillis 20000 长轮询,Consumer 拉消息请求在 Broker挂起最长时间,单位毫秒 consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer拉消息请求在 Broker挂起超过指 定时间,客户端认为超时,单位毫秒 consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒 messageModel BROADCASTING 消息支持两种模式:集群消费和广播消费 messageQueueListener 监听队列变化 offsetStore 消费进度存储 registerTopics 注册的topic集合 allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略 -
Message数据结构
参数名 默认值 说明 Topic null 必填,消息所属topic的名称 Body null 必填,消息体 Tags null 选填,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个tag Keys null 选填,代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以在Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品Id等。 Flag 0 选填,完全由应用来设置,RocketMQ不做干预 DelayTimeLevel 0 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 WaitStoreMsgOK TRUE 选填,表示消息是否在服务器落盘后才返回应答
6. 系统配置
-
JVM选项
#设置Xms和Xmx一样大,防止JVM重新调整堆空间大小影响性能。 -server -Xms8g -Xmx8g -Xmn4g #设置DirectByteBuffer内存大小。当DirectByteBuffer占用达到这个值,就会触发Full GC。 -XX:MaxDirectMemorySize=15g #如果不太关心RocketMQ的启动时间,可以设置pre-touch,这样在JVM启动的时候就会分配完整的页空间。 -XX:+AlwaysPreTouch #禁用偏向锁可能减少JVM的停顿,因为偏向锁在线程需要获取锁之前会判断当前线程是否拥有锁 #如果拥有,就不用再去获取锁了。 #在并发小的时候使用偏向锁有利于提升JVM效率,在高并发场合禁用掉。 -XX:-UseBiasedLocking
推荐使用JDK1.8的G1垃圾回收器:当在G1的GC日志中看到 to-space overflow 或者 to-space exhausted 的时候,表示G1没有足够的内存使用的(可能是 survivor 区不够了,可能是老年代不够了,也可能是两者都不够了),这时候表示Java堆占用大小已经达到了最大值
对G1而言,大小超过region大小50%的对象将被认为是大对象,这种大对象将直接被分配到老年代的humongous regions中,humongous regions是连续的region集合,StartsHumongous表记集合从那里开始,ContinuesHumongous标记连续集合
在分配大对象之前,将会检查标记阈值,如果有必要的话,还会启动并发周期。
死亡的大对象会在标记周期的清理阶段和发生Full GC的时候被清理。
为了减少复制开销,任何转移阶段都不包含大对象的复制。在Full GC时,G1在原地压缩大对象。因为每个独立的humongous regions只包含一个大对象,因此从大对象的结尾到它占用的最后一个region的结尾的那部分空间时没有被使用的,对于那些大小略大于region整数倍的对象,这些没有被使用的内存将导致内存碎片化。
如果你看到因为大对象的分配导致不断的启动并发收集,并且这种分配使得老年代碎片化不断加剧,那么请增加-XX:G1HeapRegionSize参数的值,这样的话,大对象将不再被G1认为是大对象,它会走普通对象的分配流程。
# G1回收器将堆空间划分为1024个region,此选项指定堆空间region的大小 -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
上述设置可能有点儿激进,但是对于生产环境,性能很好。
-XX:MaxGCPauseMillis不要设置的太小,否则JVM会使用小的年轻代空间以达到此设置的值,同时引起很频繁的minor GC。
#推荐使用GC log文件: -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m #如果写GC文件增加了Broker的延迟,可以考虑将GC log文件写到内存文件系统: -Xloggc:/dev/shm/mq_gc_%p.log123
-
linux内核参数
os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途
# 获取内核参数值 sysctl vm.extra_free_kbytes # 设置内核参数值 sudo sysctl -w vm.overcommit_memory=1
vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。
vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟
7. 动态扩缩容
-
动态增减NameServer机器
通过HTTP服务来设置方式看似繁琐,但它是唯一支持动态增加NameServer,无须重启其他组件的方式。使用这种方式后其他组件会每隔2分钟请求一次该URL,获取最新的NameServer地址
-
动态增减Broker机器
集群扩容后,一是可以把新建的Topic指定到新的Broker机器上,均衡利用资源;另一种方式是通过updateTopic命令更改现有的Topic配置,在新加的Broker上创建新的队列
如果因为业务变动或者置换机器需要减少Broker,此时该如何操作呢?减少Broker要看是否有持续运行的Producer,当一个Topic只有一个Master Broker,停掉这个Broker后,消息的发送肯定会受到影响,需要在停止这个Broker前,停止发送消息
当某个Topic有多个Master Broker,停了其中一个,这时候是否会丢失消息呢?答案和Producer使用的发送消息的方式有关,如果使用同步方式send(msg)发送,在DefaultMQProducer内部有个自动重试逻辑,其中一个Broker停了,会自动向另一个Broker发消息,不会发生丢消息现象。如果使用异步方式发送send(msg,callback),或者用sendOneWay方式,会丢失切换过程中的消息。因为在异步和sendOneWay这两种发送方式下,Producer.setRetryTimesWhenSendFailed设置不起作用,发送失败不会重试。DefaultMQProducer默认每30秒到NameServer请求最新的路由消息,Producer如果获取不到已停止的Broker下的队列信息,后续就自动不再向这些队列发送消息
如果Producer程序能够暂停,在有一个Master和一个Slave的情况下也可以顺利切换。可以关闭Producer后关闭Master Broker,这个时候所有的读取都会被定向到Slave机器,消费消息不受影响。把Master Broker机器置换完后,基于原来的数据启动这个Master Broker,然后再启动Producer程序正常发送消息
8. 各种故障对消息的影响
-
Broker正常关闭,启动;
属于可控的软件问题,内存中的数据不会丢失
-
Broker异常Crash,然后启动;
属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,
-
OS Crash,重启;
属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,
-
机器断电,但能马上恢复供电;
属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,
-
磁盘损坏;
属于硬件故障,发生第5、6种情况的故障,原有机器的磁盘数据可能会丢失
-
CPU、主板、内存等关键设备损坏。
属于硬件故障,发生第5、6种情况的故障,原有机器的磁盘数据可能会丢失