RocketMQ高级特性及原理
示例代码: demo04-rocketmq
1. 消息发送
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等
发送消息5个步骤:
1)设置Producer的GroupName。
2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的
InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。
3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。
想保证不丢消息,可以设置多重试几次。
4)设置NameServer地址
5)组装消息并发送。
消息返回4种状态:
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
- FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步
- SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
- SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方式发送
Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送
我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能
2. 消息消费
要点:
- 消息消费方式:pull/push
- 消费模式: 广播模式/集群模式
- 流量控制
- 并发线程设置
- 消息过滤(tag/key)
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力
-
提高消费并行度
同一个consumerGroup 下, 增加consumer实例的数量 ,但不要超过Read Queue的数量
-
以批量方式进行消费
实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
-
检测延时情况,路过非重要消息
3. 消息存储
3.1 存储介质
-
关系型数据库
-
文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)
3.2 性能对比
文件系统>DB
3.3 消息的存储和发送
-
消息存储
目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。 但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍! 因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。 RocketMQ的消息用顺序写,保证了消息存储的速度
-
存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
-
commitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消 息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移 量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏 移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文 件
-
ConsumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能 RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行 如果要遍历commitlog文件根据topic检索消息是非常低效。 Consumer即可根据ConsumeQueue来查找待消费的消息
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引
1. 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset 2. 消息大小size 3. 消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组
织方式如下:topic/queue/file三层组织结构
consumequeue文件采取定长设计,每个条目共20个字节,分别为: 1. 8字节的commitlog物理偏移量 2. 4字节的消息长度 3. 8字节tag hashcode
-
indexFile
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法
1. Index文件的存储位置是: $HOME/store/index/${fileName} 2. 文件名fileName是以创建时的时间戳命名的 3. 固定的单个IndexFile文件大小约为400M 4. 一个IndexFile可以保存 2000W个索引 5. IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底 层实现为hash索引。
-
4. 过滤消息
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。
RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构
其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
两种过滤方式:
-
Tag过滤方式: 消费端在订阅消息时除了指定topic 还指定tag,多个用||分隔
1. Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给 Broker端。 2. Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给Store。 3. Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤。 4. 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉 取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消 息消费。
-
SQL92的过滤方式
仅对push的消费者起作用。Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样
真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性
开启SQL92:
#broker.conf配置如下, 然后重启 enablePropertyFileter=true
基本语法:
1. 数字比较: >, >=, <, <=, BETWEEN, = 2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL; 3. 逻辑比较: AND, OR, NOT; 4. Constant types are: 数字如:123, 3.1415; 字符串如:'abc',必须是单引号引起来 NULL,特 殊常量 布尔型如:TRUE or FALSE;
-
Filter Server方式
这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤
要使用Filter Server,首先要在启动Broker前在配置文件里加上 filterServer-Nums=3 这样的配置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consume
这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用
见示例代码
5. 零拷贝原理
5.1 PageCache
- 由内存中的物理page组成,其内容对应磁盘上的block。
- page cache的大小是动态变化的。
- backing store: cache缓存的存储设备
- 一个page通常包含多个block, 而block不一定是连续的。
-
读cache
-
当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。
-
如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中)
-
如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中, 如此
-
-
后续的读请求就可以命中缓存了。
page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。
-
-
写cache
-
当内核发起一个写请求时, 也是直接往cache中写入, 后备存储中的内容不会直接更新。
-
内核会将被写入的page标记为dirty, 并将其加入到dirty list中。
-
内核会周期性地将dirty list中的page写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据一致。
-
-
cache回收
-
Page cache的另一个重要工作是释放page, 从而释放内存空间。
-
cache回收的任务是选择合适的page释放
如果page是dirty的, 需要将page写回到磁盘中再释放。
-
5.2 cache与buffer的区别
-
cache
缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了CPU等待的时间,提高了系统的性能
Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元);Cache一般会用在I/O请求上,如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此文件直接从Cache读取,提高系统性能
-
buffer
缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此期间存储快的设备CPU可以干其他的事情
5.3 HeapByteBuffer和DirectByteBuffer
HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity等)。优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收
DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维护了一个引用address指向数据,进而操作数据. 优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时,不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝)
外设之所以要把jvm堆里的数据copy出来再操作,不是因为操作系统不能直接操作jvm内存,而是因为jvm在进行gc(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的情况
所有的通过allocate方法创建的buffer都是HeapByteBuffer.
堆外内存实现堆拷贝
1. 前者分配在JVM堆上(ByteBuffer.allocate()),后者分配在操作系统物理内存上
(ByteBuffer.allocateDirect(),JVM使用C库中的malloc()方法分配堆外内存);
2. DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,fullgc发生时也会回
收直接内存,也可以通过system.gc主动通知JVM回收,或者通过 cleaner.clean主动清理。
Cleaner.create()方法需要传入一个DirectByteBuffer对象和一个Deallocator(一个堆外内存
回收线程)。GC发生时发现堆中的DirectByteBuffer对象没有强引用了,则调用Deallocator
的run()方法回收直接内存,并释放堆中DirectByteBuffer的对象引用;
3. 底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时
需要将HeapByteBuffer数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外拷贝。而DirectByteBuffer则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在
netty中广泛使用;
4. MappedByteBuffer底层使用了操作系统的mmap机制,FileChannel#map()方法就会返回
MappedByteBuffer。DirectByteBuffer虽然实现了MappedByteBuffer,不过DirectByteBuffer默认并没有直接使用mmap机制。
5.4 缓存io和直接io
-
缓存io
缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。
读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。 写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。
优点:
1. 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全; 2. 可以减少读盘的次数,从而提高性能。
缺点:
在缓存 I/O 机制中,DMA 方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直 接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。
-
直接io
直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种更加有效的缓存机制来提高数据库中数据的存取性能
直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。
5.5 内存映射文件MMAP
在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。
-
映射关系可以分为两种
1. 文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。 2. 匿名映射 初始化全为0的内存空间。
-
对于映射关系是否共享又分为
1. 私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-onwrite(写时复制)的映射方式。 2. 共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。
-
因此总结起来有4种组合
1.私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改 不会共享,也不会反应到物理文件中 2. 私有匿名映射 mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存 (malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间, 这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write。 3. 共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反 应到实际物理文件中,他也是进程间通信(IPC)的一种机制。 4. 共享匿名映射 这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理 内存页,这也就实现了父子进程通信(IPC).
mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存
5.6 直接内存读取发送文件的过程
5.7 Mmap读取并发送文件的过程
5.8 sendfile零拷贝读取和发送文件的过程
零拷贝总结:
-
虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区;
-
之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
-
Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、FileChannel.map()等;
-
Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操作系统零拷贝特性。
6. 同步复制和异步复制
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式
-
同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量
-
异步复制
异步复制方式是只要Master写成功即可反馈给客户端写成功状态
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
-
配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER(异步)、 SYNC_MASTER、SLAVE三个值中的一个。
broker.conf参数
参数 默认 说明 listenPort 10911 接受客户端连接的监听端口 namesrvAddr null nameSvr地址 brokerIP1 网卡的InetAddress 当前broker监听的Ip brokerIP2 与brokerIP1一样 存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 brokerName broker 的名称 brokerClusterName DefaultCluster 本 broker 所属的 Cluser 名称 brokerId 0 broker id, 0 表示 master, 其他的正整 数表示 slave storePathCommitLog $HOME/store/commitlog/ 存储 commit log 的路径 storePathConsumerQueue $HOME/store/consumequeue/ 存储 consume queue 的路径 mapedFileSizeCommitLog 1024 * 1024 * 1024(1G) commit log 的映射文件大小 deleteWhen 04 在每天的什么时间删除已经超过文件保留时间的 commit log fileReserverdTime 72 brokerRole ASYNC_MASTER ASYNC_MASTER(异步)、 SYNC_MASTER(同步)、SLAVE flushDiskType ASYNC_FLUSH SYNC_FLUSH/ASYNC_FLUSH
SYNC_FLUSH 模式下的 broker 保证在 收到确认生产者之前将消息刷盘。
ASYNC_FLUSH 模式下的 broker 则利 用刷盘一组消息的模式,可以取得更好 的性能 -
总结
实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
7. 高可用配置
-
消息消费高可用
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读, 这就达到了消费端的高可用性
-
消息发送高可用
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样既可以在性能方面具有扩展性,也可以降低主节点故障对整体上带来的影响,而且当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息的
RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足,需要把Slave转成Master
RocketMQ 在 2018 年底迎来了一次重大的更新,引入 Dledger,增加了一种全新的复制方式。
例:
假如有3个节点,当主节点宕机的时候,2 个从节点会通过投票选出一个新的主节点来继续提供服务,相比主从的复制模式,解决了可用性的问题。
由于消息要至少复制到 2 个节点上才会返回写入成功,即使主节点宕机了,也至少有一个节点上的消息是和主节点一样的。
Dledger在选举时,总会把数据和主节点一样的从节点选为新的主节点,这样就保证了数据的一致性,既不会丢消息,还可以保证严格顺序。
当然,Dledger的复制方式也不是完美的,依然存在一些不足:
-
比如,选举过程中不能提供服务。
-
最少需要 3 个节点才能保证数据一致性,3 节点时,只能保证 1 个节点宕机时可用,如果 2个节点同时宕机,即使还有 1 个节点存活也无法提供服务,资源的利用率比较低。
-
另外,由于至少要复制到半数以上的节点才返回写入成功,性能上也不如主从异步复制的方式快。
-
8. 刷盘机制
RocketMQ 的所有消息都是持久化的,先写入系统 PageCache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,直接从内存读取。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘
-
同步刷盘
同步刷盘与异步刷盘的唯一区别是异步刷盘写完 PageCache直接返回,而同步刷盘需要等待刷盘完成才返回,同步刷盘流程如下:
(1). 写入 PageCache后,线程等待,通知刷盘线程刷盘。 (2). 刷盘线程刷盘后,唤醒前端等待线程,可能是一批线程。 (3). 前端等待线程向用户返回成功
-
异步刷盘
在有 RAID 卡,SAS 15000 转磁盘测试顺序写文件,速度可以达到 300M 每秒左右,而线上的网卡一般都为千兆网卡,写磁盘速度明显快于数据网络入口速度,那么是否可以做到写完内存就向用户返回,由后台线程刷盘呢?
1. 由于磁盘速度大于网卡速度,那么刷盘的进度肯定可以跟上消息的写入速度。 2. 万一由于此时系统压力过大,可能堆积消息,除了写入 IO,还有读取 IO,万一出现磁盘读取 落后情况,会不会导致系统内存溢出,答案是否定的,原因如下: - 写入消息到 PageCache时,如果内存不足,则尝试丢弃干净的 PAGE,腾出内存供新消息使用,策略是LRU 方式。 - 如果干净页不足,此时写入 PageCache会被阻塞,系统尝试刷盘部分数据,大约每次尝试 32 个 PAGE , 来找出更多干净 PAGE。
综上,内存溢出的情况不会出现
9. 负载均衡
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
-
Producer的负载均衡
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。另外也可以自定义方式选择发往哪个队列。
-
consumer负载均衡
如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
pull模式:
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)
// 指定从哪个MQ拉取数据即可 PullResult result = consumer.pull(messageQueue, "*", 0L, 10);
Push模式:
DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance****动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发
doRebalance动作。
// 设置负载均衡算法 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
10. 消息重试
-
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
-
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息
-
重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递
-
配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐) 返回 Null 抛出异常
//方式1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试 // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 方式2:返回 null,消息将重试 // return null; // 方式3:直接抛出异常, 消息将重试 // throw new RuntimeException("Consumer Message exceotion");
自定义消息最大重试次数
最大重试次数小于等于 16 次,则重试时间间隔同上表描述。 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_04_01"); // 设置重新消费的次数 // 共16个级别,大于16的一律按照2小时重试 consumer.setMaxReconsumeTimes(20);
-
11. 死信队列
RocketMQ中消息重试超过一定次数后(默认16次)就会被放到死信队列中,在消息队列RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。可以在控制台Topic列表中看到“DLQ”相关的Topic,默认命名是
- %RETRY%消费组名称(重试Topic)
- %DLQ%消费组名称(死信Topic)
- 死信队列也可以被订阅和消费,并且也会过期
死信消息特性:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3天内及时处理。
死信队列特性:
-
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
-
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
-
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
12. 延迟消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高
13. 顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这3个消息必须按顺序处理才行
顺序消息分为全局顺序消息和部分顺序消息:
-
全局顺序消息指某个Topic下的所有消息都要保证顺序;
-
部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单ID的三个消息能按顺序消费即可
RocketMQ在默认情况下不保证顺序,比如创建一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写入的顺序是否一致是不确定的
要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理
MessageListenerOrderly并不是简单地禁止并发处理。在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理
14. 事务消息
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。
客户端有三个类来支持用户实现事务消息
第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE状态。
第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。
第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE
-
事务消息流程概要
分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程
-
事务消息的发送及提交
(1) 发送消息(half消息)。 (2) 服务端响应消息写入结果。 (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。 (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可 见)
-
补偿流程
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查” (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback 其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
-
-
事务消息设计
-
事务消息在一阶段对用户不可见
-
.Commit和Rollback操作以及Op消息的引入
-
Op消息的存储和对应关系
-
Half消息的索引构建
-
如何处理二阶段失败的消息?
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致 Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制, 称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group 的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。 Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的 状态是确定的)
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息
-
15. 消息查询
RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询
-
按照MessageId查询消息
在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后,通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。Broker使用QueryMessageProcessor,使用请求中的 commitLog offset和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。
-
按照Message Key查询消息
1.根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包 含的最大槽的数目, 例如图中所示 slotNum=5000000)。 2.根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是 指向最新的一个索引项)。 3.遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录) 4.Hash 冲突; 第一种,key 的 hash 值不同但模数相同,此时查询的时候会再比较一次 key 的 hash 值(每个索 引项保存了 key 的 hash 值),过滤掉 hash 值不相等的项。 第二种,hash 值相等但 key 不等, 出于性能的考虑冲突的检测放到客户端处理(key 的原始值是 存储在消息文件中的,避免对数据文件的解析), 客户端比较一次消息体的 key 是否相同。 5.存储;为了节省空间索引项中存储的时间是时间差值(存储时间-开始时间,开始时间存储在索引 文件头中), 整个索引文件是定长的,结构也是固定的。
MessageExt message = consumer.viewMessage("tp_demo_08","0A4E00A7178878308DB150A780BB0000");
16. 消息优先级
有些场景,需要应用程序处理几种类型的消息,不同消息的优先级不同。RocketMQ是个先入先出的队列,不支持消息级别或者Topic级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决,下面列举三种优先级相关需求的具体处理方法
-
多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了
-
情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收 从 100家快递门店过来的请求,把这些请求通过 Producer 写入RocketMQ;订单处理程序通过 Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订 单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被 迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订 单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用 循环的方式逐个读取一个 Topic 的所有 MessageQueue,这样如果某家门店订单量大增,这家门店对 应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店等待时间增长。 DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读 取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成1
-
强制优先级
TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第一优先级,要确保只要有TypeA消息,必须优 先处理; TypeB处于第二优先 级; TypeC 处于第三优先级 。 对这种要求,或者逻辑更复杂的要求,就要 用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自 主控制 MessageQueue 的遍历,以及消息的读取;如果上述三类消息在三个 Topic下,需要启动三个 Consumer, 实现逻辑控制三个 Consumer 的消费 。
17. 底层网络通信-netty
RocketMQ底层通信的实现是在Remoting模块里,因为借助了Netty而没有重复造轮子.RocketMQ的通信部分没有很多的代码,就是用Netty实现了一个自定义协议的客户端/服务器程序。
-
Remoting通信类结构
-
协议设计与缓解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作
可见传输内容主要可以分为以下4部分: (1) 消息长度:总长度,四个字节存储,占用一个int类型; (2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节 表示消息头长度; (3) 消息头数据:经过序列化后的消息头数据; (4) 消息主体数据:消息主体的二进制字节数据内容;
-
消息的通信方式和流程
在RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。
-
Reactor主从多线程模型
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。
一个 Reactor 主线程(eventLoopGroupBoss)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上
18. 限流
RocketMQ消费端中我们可以:
-
设置最大消费线程数
-
每次拉取消息条数等
同时:
- PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,
- 任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。
在 Apache RocketMQ 中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果
-
sentinel介绍
是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制产品,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助用户保护服务的稳定性
-
sentinel原理
Sentinel 专门为这种场景提供了匀速器的特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求
比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间为 5 s,预计排队时长超过 5s 的处理任务将会直接被拒绝。示意图如下图所示:
RocketMQ 用户可以根据不同的 group 和不同的 topic 分别设置限流规则,限流控制模式设置为匀速器模式(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)