Kafka高级特性
1. 生产者
1.1 消息发送
-
数据生产流程解析
1.Producer创建时,会创建⼀个Sender线程并设置为守护线程。 2.⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器, 然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。 3.批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。 4.批次发送后,发往指定分区,然后落盘到broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重 试,那么客户端内部会对该消息进⾏重试。 5.落盘到broker成功,返回⽣产元数据给⽣产者。 6.元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回。
-
必要参数配置
bootstrap.servers: broker地址列表,集群用 ","隔开, host1:port1,host2:port2,host3:port3 key.serializer: 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的key序列化类。 value.serializer: 实现了接⼝ org.apache.kafka.common.serialization.Serializer 的value序列化类。 acks: 该选项控制着已发送消息的持久性。 acks=0 :⽣产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息, retries 设置也不起作⽤,因为客户端不关⼼消息是否发送失败。客户端收到的消息偏移量永远是-1。 acks=1 :leader将记录写到它本地⽇志,就响应客户端确认消息,⽽不等待follower副本的确认。如果leader确认了消息就宕机,则可能 会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all :leader等待所有同步的副本确认该消息。保证了只要有⼀个同步副本存在,消息就不会丢失。这是最强的可⽤性保证。等价于 acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1] compression.type:⽣产者⽣成数据的压缩格式。默认是none(没有压缩)。允许的值: none , gzip , snappy 和 lz4 。压缩是对整 个消息批次来讲的。消息批的效率也影响压缩的⽐例。消息批越⼤,压缩效率越好。字符串类型的值。默认是none。 retries:设置该属性为⼀个⼤于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并⽆⼆⾄。允许重试但是不设 置 max.in.flight.requests.per.connection 为1,存在消息乱序的可能,因为如果两个批次发送到同⼀个分区,第⼀个失败了 重试,第⼆个成功了,则第⼀个消息批在第⼆个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647]
-
序列化器
由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作⽤就是⽤于序列化要发送的消息的。
Kafka使⽤
org.apache.kafka.common.serialization.Serializer
接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组, 实现类有以下这些:自定义序列化器: kafka-demo#kafka-client-demo#demo02**(https://gitee.com/ixinglan/kafka-demo.git)
-
分区器
-
会⾸先在可⽤的分区中分配分区号
-
如果没有可⽤的分区,则在该主题所有分区中分配分区号。
默认(DefaultPartitioner)分区计算:
- 如果record提供了分区号,则使⽤record提供的分区号
- 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
- 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号
如果要自定义分区器, 则需要:
-
⾸先开发Partitioner接⼝的实现类
-
在KafkaProducer中进⾏设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)
自定义分区器: kafka-demo#kafka-client-demo#demo03**(https://gitee.com/ixinglan/kafka-demo.git)
-
-
拦截器
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka0.10版本被引⼊的,主要⽤于实现Client 端的定制化控制逻辑。
对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(interceptorchain)。Intercetpor的实现接⼝是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:
- onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。Producer 确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。
- onAcknowledgement(RecordMetadata,Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。
- close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。
自定义拦截器:
1.实现ProducerInterceptor接⼝
2.在KafkaProducer的设置中设置⾃定义的拦截器
自定义拦截器: kafka-demo#kafka-client-demo#demo04**(https://gitee.com/ixinglan/kafka-demo.git)
1.2 原理剖析
由上图可以看出:KafkaProducer有两个基本线程:
- 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
- 消息收集器RecoderAccumulator为每个分区都维护了⼀个Deque
类型的双端队列。 - ProducerBatch可以理解为是ProducerRecord的集合,批量发送有利于提升吞吐量,降低⽹络影响;
- 由于⽣产者客户端使⽤java.io.ByteBuffer在发送消息之前进⾏消息保存,并维护了⼀个BufferPool实现ByteBuffer的复⽤;该缓存池只针对特定⼤⼩(batch.size指定)的ByteBuffer 进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。
- 每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch,判断该消息⼤⼩是否超过客户端参数配置batch.size的值,不超过,则以batch.size建⽴新的ProducerBatch,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的ProducerBatch,缺点就是该内存不能被复⽤了。
- 消息收集器RecoderAccumulator为每个分区都维护了⼀个Deque
- Sender线程:
- 该线程从消息收集器获取缓存的消息,将其处理为<Node,List
的形式,Node表示集群的broker节点。 - 进⼀步将<Node,List
转化为<Node,Request>形式,此时才可以向服务端发送数据。 - 在发送之前,Sender线程将消息以Map<NodeId,Deque
>的形式保存到InFlightRequests中进⾏缓存,可以通过其获取leastLoadedNode,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
- 该线程从消息收集器获取缓存的消息,将其处理为<Node,List
1.3 生产者参数配置补充
retry.backoff.ms: 在向一个指定的主题分区重发消息的时候,重试之间的等待时间, 默认100
retries: 重试次数. 如果设置了重试,还想保证消息的有序性,需要设置 max_in_flight_requests_per_connection=1
request.timeout.ms: 客户端等待请求响应的最大时长.如果服务端响应超时,则会重发请求,除非达到重试次数.该设置应该比replica.lag.time.max.ms要大,以免在服务器延迟时间内重发消息.默认300000
interceptor.classes: 拦截器
compression.type: 生产者发送的所有数据的压缩方式.默认None, 支持gzip, snappy, lz4
send.buffer.tytes: tcp发送数据的时候使用的缓冲区大小.如果设置为0, 则使用操作系统默认
..........
可参考: https://kafka.apachecn.org/documentation.html#producerconfigs
2. 消费者
2.1 概念
-
消费者,消费组
消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。
消费者还可以将⾃⼰的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper, 推荐使用kafka
多个从同⼀个主题消费的消费者可以加⼊到⼀个消费组中。消费组中的消费者共享group_id。
configs.put("group.id", "xxx");
消费组均衡的给消费者分配分区,每个分区只由消费组中的一个消费者消费.
- ⼀个拥有四个分区的主题,包含⼀个消费者的消费组。此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。
- 如果在消费组中添加⼀个消费者2,则每个消费者分别从两个分区接收消息
- 如果消费组有四个消费者,则每个消费者可以分配到⼀个分区。
- 如果向消费组中添加更多的消费者,超过主题分区数量,则有⼀部分消费者就会闲置,不会接收任何消息。
向消费组添加消费者是横向扩展消费能⼒的主要⽅式。
必要时,需要为主题创建⼤量分区,在负载增⻓时可以加⼊更多的消费者。但是不要让消费者的数量超过主题分区的数量。
除了通过增加消费者来横向扩展单个应⽤的消费能⼒之外,经常出现多个应⽤程序从同⼀个主题消费的情况。此时,每个应⽤都可以获取到所有的消息。只要保证每个应⽤都有⾃⼰的消费组,就可以让它们获取到主题所有的消息。
横向扩展消费者和消费组不会对性能造成负⾯影响。
为每个需要获取⼀个或多个主题全部消息的应⽤创建⼀个消费组,然后向消费组添加消费者来横向扩展消费能⼒和应⽤的处理能⼒,则每个消费者只处理⼀部分消息。
-
心跳机制
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区
Kafka 的⼼跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer才会发送⼼跳
Consumer 和 Rebalance 相关的 2 个配置参数:
session.timeout.ms: 字段 MemberMetadata.sessionTimeoutMs max.poll.interval.ms: 字段 MemberMetadata.rebalanceTimeoutMs
broker 处理⼼跳的逻辑在 GroupCoordinator 类中:如果⼼跳超期, broker coordinator 会把消费者从 group中移除,并触发 rebalance。
如果客户端发现⼼跳超期,客户端会标记 coordinator 为不可⽤,并阻塞⼼跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance
2.2 消息接收
2.2.1 必要参数配置
参数 | 说明 |
---|---|
bootstrap.servers | 向Kafka集群建⽴初始连接⽤到的host/port列表。 客户端会使⽤这⾥列出的所有服务器进⾏集群其他服务器的发现,⽽不管是否指定了哪个服务器⽤作引导。这个列表仅影响⽤来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,…由于这组服务器仅⽤于建⽴初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这⾥。⼀般最好两台,以防其中⼀台宕掉。 |
key.deserializer | key的反序列化类,该类需要实现org.apache.kafka.common.serialization.Deserializer |
value.deserializer | ⽤于对消息的value进⾏反序列化。实现了org.apache.kafka.common.serialization.Deserializer接⼝的反序列化器, |
client.id | 当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上提供应⽤的逻辑名称,记录在服务端的请求⽇志中,⽤于追踪请求的源。 |
group.id | ⽤于唯⼀标志当前消费者所属的消费组的字符串。 如果消费者使⽤组管理功能如subscribe(topic)或使⽤基于Kafka的偏移量管理策略,该项必须设置。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?earliest:⾃动重置偏移量到最早的偏移量 latest:⾃动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常 anything:向消费者抛异常 |
enable.auto.commit | 如果设置为true,消费者会⾃动周期性地向服务器提交偏移量。 |
2.2.2 订阅
主题和分区
- Topic,Kafka⽤于分类管理消息的逻辑单元,类似与MySQL的数据库。
- Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同⼀个topic的数据,会被分散的存储到多个partition中,这些partition可以在同⼀台机器上,也可以是在多台机器上。优势在于:有利于⽔平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提⾼容灾能⼒。为了做到均匀分布,通常partition的数量通常是BrokerServer数量的整数倍。
- ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和⼴播两种消息模型的⼿段。保证⼀个消费组获取到特定主题的全部的消息。在消费组内部,若⼲个消费者消费主题分区的消息,消费组可以保证⼀个主题的每个分区只被消费组中的⼀个消费者消费。
consumer采⽤pull模式从broker中读取数据。
采⽤pull模式,consumer可⾃主控制消费消息的速率,可以⾃⼰控制消费⽅式(批量消费/逐条消费),还可以选择不同的提交⽅式从⽽实现不同的传输语义。
2.2.3 反序列化
Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进⾏反序列化处理,然后才能交给⽤户程序消费处理。消费者的反序列化器包括key的和value的反序列化器。
需要实现org.apache.kafka.common.serialization.Deserializer
Kafka默认提供了⼏个反序列化的实现:
自定义反序列化类: 需要实现org.apache.kafka.common.serialization.Deserializer
自定义反序列化器: kafka-demo#kafka-client-demo#demo05**(https://gitee.com/ixinglan/kafka-demo.git)
2.2.4 位移提交
Consumer需要向Kafka记录⾃⼰的位移数据,这个汇报过程称为提交位移(CommittingOffsets)
Consumer需要为分配给它的每个分区提交各⾃的位移数据
位移提交由Consumer端负责的,Kafka只负责保管。__consumer_offsets
位移提交分为⾃动提交和⼿动提交
位移提交分为同步提交和异步提交
-
自动提交
开启⾃动提交: enable.auto.commit=true
配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s
⾃动提交位移的顺序 - 配置 enable.auto.commit = true - Kafka会保证在开始调⽤poll⽅法时,提交上次poll返回的所有消息 - 因此⾃动提交不会出现消息丢失,但会重复消费
重复消费举例 - Consumer 每 5s 提交 offset - 假设提交 offset 后的 3s 发⽣了 Rebalance - Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费 - 因此 Rebalance 发⽣前 3s 的消息会被重复消费
-
异步提交
同步提交:
-
使⽤ KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset
-
该⽅法为同步操作,等待直到 offset 被成功提交才返回
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 } }
-
commitSync 在处理完所有消息之后
-
⼿动同步提交可以控制offset提交的时机和频率
⼿动同步提交会: - 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果 - 会影响 TPS - 可以选择拉⻓提交间隔,但有以下问题 - 会导致 Consumer 的提交频率下降 - Consumer 重启后,会有更多的消息被消费
异步提交:
-
KafkaConsumer#commitAsync()
while (true) { ConsumerRecords<String, String> records = consumer.poll(3_000); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) { handle(exception); } }); }
-
commitAsync出现问题不会⾃动重试
-
2.2.5 消费者位移管理
Kafka中,消费者根据消息的位移顺序消费消息。消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。Kafka提供了消费者API,让消费者可以管理⾃⼰的位移。
细节 | |
---|---|
API | public void assign(Collection |
说明 | 给当前消费者⼿动分配⼀系列主题分区。 ⼿动分配分区不⽀持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。 如果给出的主题分区是空的,则等价于调⽤unsubscribe⽅法。 ⼿动分配主题分区的⽅法不使⽤消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。 ⼿动分区分配assign(Collection)不能和⾃动分区分配subscribe(Collection,ConsumerRebalanceListener)⼀起使⽤。 如果启⽤了⾃动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进⾏异步提交。 |
API | public Set |
说明 | 获取给当前消费者分配的分区集合。如果订阅是通过调⽤assign⽅法直接分配主题分区,则返回相同的集合。如果使⽤了主题订阅,该⽅法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进⾏分区分配,或者正在重新分配分区,则会返回none。 |
API | public Map<String,List |
说明 | 获取对⽤户授权的所有主题分区元数据。该⽅法会对服务器发起远程调⽤。 |
API | public List |
说明 | 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调⽤。 |
API | public Map<TopicPartition,Long> beginningOffsets(Collection |
说明 | 对于给定的主题分区,列出它们第⼀个消息的偏移量。 注意,如果指定的分区不存在,该⽅法可能会永远阻塞。该⽅法不改变分区的当前消费者偏移量。 |
API | public void seekToEnd(Collection |
说明 | 将偏移量移动到每个给定分区的最后⼀个。 该⽅法延迟执⾏,只有当调⽤过poll⽅法或position⽅法之后才可以使⽤。 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后⼀个稳定的偏移量,即下⼀个要消费的消息现在还是未提交状态的事务消息。 |
API | public void seek(TopicPartitionpartition,longoffset) |
说明 | 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下⼀条要消费的消息偏移量。若该⽅法多次调⽤,则最后⼀次的覆盖前⾯的。 如果在消费中间随意使⽤,可能会丢失数据。 |
API | public long position(TopicPartitionpartition) |
说明 | 检查指定主题分区的消费偏移量 |
API | public void seekToBeginning(Collection |
说明 | 将给定每个分区的消费者偏移量移动到它们的起始偏移量。该⽅法懒执⾏,只有当调⽤过poll⽅法或 position⽅法之后才会执⾏。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。 |
消费者位移管理操作示例: kafka-demo#kafka-client-demo#demo06**(https://gitee.com/ixinglan/kafka-demo.git)
2.2.6 再均衡
重平衡可以说是kafka为⼈诟病最多的⼀个点了。
重平衡其实就是⼀个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每⼀个分区。⽐如⼀个topic 有100个分区,⼀个消费者组内有20个消费者,在协调者的控制下让组内每⼀个消费者分配到5个分区,这个分配的过程就是重平衡。
重平衡的触发条件主要有三个:
- 消费者组内成员发⽣变更,这个变更包括了增加和减少消费者,⽐如消费者宕机退出消费组。
- 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发重平衡
- 订阅的主题发⽣变化,当消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发重平衡
为什么说重平衡为⼈诟病呢?因为重平衡过程中,消费者⽆法从kafka消费消息,这对kafka的TPS影响极⼤,⽽如果kafka集群内节点较多,⽐如数百个,那重平衡可能会耗时极多。数分钟到数⼩时都有可能,⽽这段时间kafka基本处于不可⽤状态。所以在实际环境中,应该尽量避免重平衡发⽣。
避免重平衡:
-
要说完全避免重平衡,是不可能,因为你⽆法完全保证消费者不会故障。⽽消费者故障其实也是最常⻅的引发重平衡的地⽅,所以我们需要保证尽⼒避免消费者故障。
-
⽽其他⼏种触发重平衡的⽅式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。
-
如果消费者真正挂掉了,就没办法了,但实际中,会有⼀些情况,kafka错误地认为⼀个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。
⾸先要知道哪些情况会出现错误判断挂掉的情况。
在分布式系统中,通常是通过⼼跳来维持分布式系统的,kafka也不例外。
在分布式系统中,由于⽹络问题你不清楚没接收到⼼跳,是因为对⽅真正挂了还是只是因为负载过重没来得及发⽣⼼跳或是⽹络堵塞。所以⼀般会约定⼀个时间,超时即判定对⽅挂了。⽽在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。
还有⼀个参数,heartbeat.interval.ms,这个参数控制发送⼼跳的频率,频率越⾼越不容易被误判,但也会消耗更多资源。
此外,还有最后⼀个参数,max.poll.interval.ms,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过max.poll.interval.ms这个参数的值。这个参数的默认值是5分钟,⽽如果消费者接收到数据后会执⾏耗时的操作,则应该将其设置得⼤⼀些。
三个参数,
- session.timout.ms 控制⼼跳超时时间,
- heartbeat.interval.ms 控制⼼跳发送频率
- max.poll.interval.ms控制poll的间隔。
这⾥给出⼀个相对较为合理的配置,如下:
- session.timout.ms:设置为6s
- heartbeat.interval.ms:设置2s
- max.poll.interval.ms:推荐为消费者处理消息最⻓耗时再加1分钟
2.2.7 消费者拦截器
消费者在拉取了分区消息之后,要⾸先经过反序列化器对key和value进⾏反序列化处理。
处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应⽤程序进⾏处理。
消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K,V>
- ⼀个可插拔接⼝,允许拦截甚⾄更改消费者接收到的消息。⾸要的⽤例在于将第三⽅组件引⼊消费者应⽤程序,⽤于定制的监控、⽇志处理等。
- 该接⼝的实现类通过configre⽅法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer⽣成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产⽣冲突。
- ConsumerInterceptor⽅法抛出的异常会被捕获、记录,但是不会向下传播。如果⽤户配置了错误的key或
- value类型参数,消费者不会抛出异常,⽽仅仅是记录下来。
- ConsumerInterceptor回调发⽣在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)⽅法同⼀个线程。
消费者拦截器示例: kafka-demo#kafka-client-demo#demo07**(https://gitee.com/ixinglan/kafka-demo.git)
2.2.8 消费者参数配置补充
bootstrap.servers: 建⽴到Kafka集群的初始连接⽤到的host/port列表。客户端会使⽤这⾥指定的所有的host/port来建⽴初始连接。这个配置仅会影响发现集群所有节点的初始连接。形式:host1:port1,host2:port2...这个配置中不需要包含集群中所有的节点信息。最好不要配置⼀个,以免配置的这个节点宕机的时候连不上。
group.id: ⽤于定义当前消费者所属的消费组的唯⼀字符串。如果使⽤了消费组的功能 subscribe(topic) ,或使⽤了基于Kafka的偏移量管理机制,则应该配置group.id。
...
参考官方文档: https://kafka.apachecn.org/documentation.html#consumerconfigs
2.3 消费组管理
2.3.1 概念
consumer group是kafka提供的可扩展且具有容错性的消费者机制。
-
消费组有⼀个或多个消费者,消费者可以是⼀个进程,也可以是⼀个线程
-
group.id是⼀个字符串,唯⼀标识⼀个消费组
-
消费组订阅的主题每个分区只能分配给消费组⼀个消费者。
2.3.2 消费者位移
每个消费组保存⾃⼰的位移信息,那么只需要简单的⼀个整数表示位置就够了;同时可以引⼊checkpoint机制定期持久化
2.3.3 位移管理
-
自动/手动
默认自动,也可手动.另外kafka会定期把group消费情况保存起来,做成⼀个offset map,如下图所示:
-
位移提交
位移是提交到Kafka中的 __consumer_offsets 主题。 __consumer_offsets 中的消息保存了每个消费组某⼀时刻提交的offset信息。
__consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的⽇志容量,也能实现保存最新offset的⽬的
2.3.4 再谈再均衡
-
概念
再均衡(Rebalance)本质上是⼀种协议,规定了⼀个消费组中所有消费者如何达成⼀致来分配订阅主题的每个分区
⽐如某个消费组有20个消费组,订阅了⼀个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。
-
再均衡时机
-
组成员发⽣变更(新消费者加⼊消费组组、已有消费者主动离开或崩溃了)
-
订阅主题数发⽣变更。如果正则表达式进⾏订阅,则新建匹配正则表达式的主题触发再均衡。
-
订阅主题的分区数发⽣变更
-
-
如何进行组内分区分配
三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor
-
谁来执行再均衡和消费组管理
Kafka提供了⼀个⻆⾊:Group Coordinator来执⾏对于消费组的管理。
Group Coordinator——每个消费组分配⼀个消费组协调器⽤于组管理和位移管理。当消费组的第⼀个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信
-
如何确定coordinator
1. 确定消费组位移信息写⼊ __consumers_offsets 的哪个分区。具体计算公式: __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认是50个分区。 2. 该分区leader所在的broker就是组协调器。
-
Rebalance Generation
它表示Rebalance之后主题分区到消费组中消费者映射关系的⼀个版本,主要是⽤于保护消费组,隔离⽆效偏移量提交的。如上⼀个版本的消费者⽆法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进⾏Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了⼀个新版本,如下图所示:Generation1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进⼊Generation2,之后成员4加⼊,再次触发Rebalance,消费组进⼊Generation3.
-
协议
kafka提供了5个协议来处理与消费组协调相关的问题:组协调器在再均衡的时候主要⽤到了前⾯4种请求。
Heartbeat请求:consumer需要定期给组协调器发送⼼跳来表明⾃⼰还活着 LeaveGroup请求:主动告诉组协调器我要离开消费组 SyncGroup请求:消费组Leader把分配⽅案告诉组内所有成员 JoinGroup请求:成员请求加⼊组 DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配⽅案,订阅信息等。 通常该请求是给管理员使⽤
-
liveness
消费者如何向消费组协调器证明⾃⼰还活着?通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。⼀旦协调器认为某个消费者挂了,那么它就会开启新⼀轮再均衡,并且在当前其他消费者的⼼跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。
-
再均衡过程
再均衡分为2步:Join和Sync
1. Join, 加⼊组。所有成员都向消费组协调器发送JoinGroup请求,请求加⼊消费组。⼀旦所有成员都发送了 JoinGroup请求,协调器从中选择⼀个消费者担任Leader的⻆⾊,并把组成员信息以及订阅信息发给 Leader。 2. Sync,Leader开始分配消费⽅案,即哪个消费者负责消费哪些主题的哪些分区。⼀旦完成分配,Leader会 将这个⽅案封装进SyncGroup请求中发给消费组协调器,⾮Leader也会发SyncGroup请求,只是内容为空。
消费组协调器接收到分配⽅案之后会把⽅案塞进SyncGroup的response中发给各个消费者。
-
消费组状态机
消费组组协调器根据状态机对消费组做不同的处理:
1.Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都 是⼀个response:UNKNOWN_MEMBER_ID 2.Empty:组内⽆成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求3.PreparingRebalance: 组准备开启新的rebalance,等待成员加⼊ 4.AwaitingSync:正在等待leaderconsumer将分配⽅案传给各个成员 5.Stable:再均衡完成,可以开始消费。
3. 主题
参数配置参照官方文档: https://kafka.apachecn.org/documentation.html#topicconfigs
3.1 主题操作示例
-
创建主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x -- partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 -- partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
-
查看主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --list kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
-
修改主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 -- partitions 2 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 -- config max.message.bytes=1048576 kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 -- config segment.bytes=10485760 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01
-
删除主题
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
3.2 增加分区
通过命令⾏⼯具操作,主题的分区只能增加,不能减少。否则报错
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 --partitions 2
3.3 分区副本的分配
副本分配的三个目标:
1. 均衡地将副本分散于各个broker上
2. 对于某个broker上分配的分区,它的其他副本在其他broker上
3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。
在不考虑架构信息的情况下:
1. 第⼀个副本分区通过轮询的⽅式挑选⼀个broker,进⾏分配。该轮询从broker列表的随机位置进⾏轮询。
2. 其余副本通过增加偏移进⾏分配。
3.4 必要参数配置
参考: https://kafka.apachecn.org/documentation.html#topicconfigs
3.5 KafkaAdminClient应用
除了使⽤Kafka的bin⽬录下的脚本⼯具来管理Kafka,还可以使⽤管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采⽤Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,⼜多了⼀个AdminClient,在kafka-client包下,⼀个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient
。
包含以下方法等:
其内部原理是使⽤Kafka⾃定义的⼀套⼆进制协议来实现,详细可以参⻅Kafka协议。
用到的参数:
属性 | 说明 |
---|---|
bootstrap.servers | 向Kafka集群建⽴初始连接⽤到的host/port列表。 客户端会使⽤这⾥列出的所有服务器进⾏集群其他服务器的发现,⽽不管是否指定了哪个服务器⽤作引导。 这个列表仅影响⽤来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,… 由于这组服务器仅⽤于建⽴初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这⾥。 ⼀般最好两台,以防其中⼀台宕掉。 |
client.id | ⽣产者发送请求的时候传递给broker的id字符串。 ⽤于在broker的请求⽇志中追踪什么应⽤发送了什么消息。⼀般该id是跟业务有关的字符串。 |
connections.max.idle.ms | 当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000 |
receive.buffer.bytes | TCP接收缓存(SO_RCVBUF),如果设置为-1,则使⽤操作系统默认的值。int类型值,默认65536,可选值:[-1,…] |
request.timeout.ms | 客户端等待服务端响应的最⼤时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000 |
security.protocol | 跟broker通信的协议:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL.string类型值,默认:PLAINTEXT |
send.buffer.bytes | ⽤于TCP发送数据时使⽤的缓冲⼤⼩(SO_SNDBUF),-1表示使⽤OS默认的缓冲区⼤⼩。 int类型值,默认值:131072 |
reconnect.backoff.max.ms | 对于每个连续的连接失败,每台主机的退避将成倍增加,直⾄达到此最⼤值。在计算退避增量之后,添加20%的随机抖动以避免连接⻛暴。 long型值,默认1000,可选值:[0,…] |
reconnect.backoff.ms | 重新连接主机的等待时间。避免了重连的密集循环。该等待时间应⽤于该客户端到broker的所有连接。 long型值,默认:50 |
retries | Themaximumnumberoftimestoretryacallbeforefailingit.重试的次数,达到此值,失败。 int类型值,默认5。 |
retry.backoff.ms | 在发⽣失败的时候如果需要重试,则该配置表示客户端等待多⻓时间再发起重试。该时间的存在避免了密集循环。 long型值,默认值:100。 |
示例代码: kafka-demo#kafka-client-demo#demo08**(https://gitee.com/ixinglan/kafka-demo.git)
3.6 偏移量管理
-
通过 kafka-consumer-groups.sh 查询(可通过help查看说明)
-
查看有哪些group id正在消费
kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
-
这⾥⾯是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。
-
注意: 重名的 group.id 只会显示⼀次
-
-
查看指定group.id的消费者消费情况
kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group group --to-earliest 偏移量设置为最早的 --to-latest 偏移量设置为最新的 .....可参考官网文档
-
-
通过代码查看
示例代码: demo09, 可通过在broker端配合修改偏移量来查看消费效果
4. 分区
4.1 副本机制
当集群中的⼀个broker宕机后系统可以⾃动故障转移到其他可⽤的副本上,不会造成数据丢失
--replication-factor 3 1leader+2follower
1. 将复制因⼦为1的未复制主题称为复制主题。
2. 主题的分区是复制的最⼩单元。
3. 在⾮故障情况下,Kafka中的每个分区都有⼀个Leader副本和零个或多个Follower副本。
4. 包括Leader副本在内的副本总数构成复制因⼦。
5. 所有读取和写⼊都由Leader副本负责。
6. 通常,分区⽐broker多,并且Leader分区在broker之间平均分配。
Follower分区像普通的Kafka消费者⼀样,消费来⾃Leader分区的消息,并将其持久化到⾃⼰的⽇志中。允许Follower对⽇志条⽬拉取进⾏批处理
Kafka提供的保证是,只要有⾄少⼀个同步副本处于活动状态,提交的消息就不会丢失。
宕机如何恢复:
-
少部分副本宕机
当leader宕机了,会从follower选择⼀个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader⾥pull数据。
-
全部副本宕机
1、等待ISR中的⼀个恢复后,并选它作为leader。(等待时间较⻓,降低可⽤性) 2、选择第⼀个恢复的副本作为新的leader,⽆论是否在ISR中。(并未包含之前leader commit的数据,因此造成 数据丢失)
4.2 Leader选举
⽣产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。
Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发⽣故障的时候,就需要进⾏分区的Leader副本和Follower副本之间的切换,需要选举Leader副本
-
如何选举
如果某个分区所在的服务器除了问题,不可⽤,kafka会从该分区的其他的副本中选择⼀个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可⽤,Kafka就会从ISR集合中选择⼀个副本作为新的Leader。显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数⽐较⾼。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可⽤。
-
为什么不用少数服从多数的方法
少数服从多数是⼀种⽐较常⻅的⼀致性算发和Leader选举法。它的含义是只有超过半数的副本同步了,系统才会认为数据已同步; 选择Leader时也是从超过半数的同步的副本中选择。这种算法需要较⾼的冗余度,跟Kafka⽐起来,浪费资源。
譬如只允许⼀台机器失败,需要有三个副本;⽽如果只容忍两台机器失败,则需要五个副本。⽽kafka的ISR集合⽅法,分别只需要两个和三个副本。
-
如果所有的ISR副本都失败了怎么办
1.等待ISR集合中的副本复活, 2.选择任何⼀个⽴即可⽤的副本,⽽这个副本不⼀定是在ISR集合中。 需设置 unclean.leader.election.enable=true
这两种⽅法各有利弊,实际⽣产中按需选择。
如果要等待ISR副本复活,虽然可以保证⼀致性,但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本,则很可能该副本并不⼀致。
总结:
Kafka中Leader分区选举,通过维护⼀个动态变化的ISR集合来实现,⼀旦Leader分区丢掉,则从ISR中随机挑选⼀个副本做新的Leader分区。
4.3 分区重新分配
向已经部署好的Kafka集群⾥⾯添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置⽂件,然后把⾥⾯的broker id修改成全局唯⼀的,最后启动这个节点即可将它加⼊到现有Kafka集群中。
问题:新添加的Kafka节点并不会⾃动地分配数据,⽆法分担集群的负载,除⾮我们新建⼀个topic。
需要⼿动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的⼯具来重新分布某个topic的分区。
使用过程介绍: 这里简单介绍下 kafka-reassign-partitions.sh的使用
-
新建一个节点,安装kafka(后面的集群化会讲)
连接同一个zookeeper
-
使用kafka-reassign-partitions.sh工具,其有3种使用模式
1、generate模式,给定需要重新分配的Topic,⾃动⽣成reassign plan(并不执⾏) 2、execute模式,根据指定的reassign plan重新分配Partition 3、verify模式,验证重新分配Partition是否成功
假设原 node1有5个分区: 0, 1, 2, 3, 4
现添加新broker: node2
-
借助kafka-reassign-partitions.sh⼯具⽣成reassign plan, 不过我们先得按照要求定义⼀个⽂件,⾥⾯说明哪些topic需要重新分区,⽂件内容如下:
# topics-to-move.json { "topics":[ { "topic":"tp_re_01" } ], "version":1 }
生成reassign plan:
kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --topicsto-move-json-file topics-to-move.json --broker-list "0,1" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs": ["any"]},{"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs": ["any"]},{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration下⾯⽣成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为result.json⽂件⾥⾯(⽂件名不重要,⽂件格式也不⼀定要以json为结尾,只要保证内容是json即可),然后执⾏这些reassign plan
-
执行计划
kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka -- reassignment-json-file topics-to-execute.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs": ["any"]},{"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]}, {"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}
-
校验验reassign plan是否执⾏完成
kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka -- reassignment-json-file topics-to-execute.json --verify Status of partition reassignment: Reassignment of partition tp_re_01-1 completed successfully Reassignment of partition tp_re_01-4 completed successfully Reassignment of partition tp_re_01-2 completed successfully Reassignment of partition tp_re_01-3 completed successfully Reassignment of partition tp_re_01-0 completed successfully
-
使⽤ kafka-reassignpartitions.sh ⼯具⽣成的reassign plan只是⼀个建议,⽅便⼤家⽽已。其实我们⾃⼰完全可以编辑⼀个reassign plan,然后执⾏它
{ "version":1, "partitions":[ { "topic":"tp_re_01", "partition":4, "replicas":[ 1 ], "log_dirs":[ "any" ] }, { "topic":"tp_re_01", "partition":1, "replicas":[ 0 ], "log_dirs":[ "any" ] }, { "topic":"tp_re_01", "partition":2, "replicas":[ 0 ], "log_dirs":[ "any" ] }, { "topic":"tp_re_01", "partition":3, "replicas":[ 1 ], "log_dirs":[ "any" ] }, { "topic":"tp_re_01", "partition":0, "replicas":[ 0 ], "log_dirs":[ "any" ] } ] }
-
4.4 自动再均衡
我们可以在新建主题的时候,⼿动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪个broker节点上。
随着系统的运⾏,broker的宕机重启,会引发Leader分区和Follower分区的⻆⾊转换,最后可能Leader⼤部分都集中在少数⼏台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区的少数⼏台服务器的⽹络I/O,CPU,以及内存都会很紧张。
Leader和Follower的⻆⾊转换会引起Leader副本在集群中分布的不均衡,此时我们需要⼀种⼿段,让Leader的分布重新恢复到⼀个均衡的状态。
例: 创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第⼀个指定的brokerId上,Follower副本在随后指定的brokerId上。
kafka-topics.sh--zookeepernode1:2181/myKafka--create--topic tp_demo_03--replica-assignment"0:1,1:0,0:1"
然后模拟broker0宕机的情况:杀掉kafka进程, 查看分区信息
重新启动kafka,查看分区信息
结论: broker恢复了,但是Leader的分配并没有变化,还是处于Leader切换后的分配情况。
是否有⼀种⽅式,可以让Kafka⾃动帮我们进⾏修改?改为初始的副本分配?
此时,⽤到了Kafka提供的⾃动再均衡脚本: kafka-preferred-replica-election.sh
该⼯具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配
如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进⾏操作,⾃动再平衡
# 1.创建preferred-replica.json
{
"partitions":[
{
"topic":"tp_demo_03",
"partition":0
},
{
"topic":"tp_demo_03",
"partition":1
},
{
"topic":"tp_demo_03",
"partition":2
}
]
}
# 2.执行操作
kafka-preferred-replica-election.sh --zookeeper
node1:2181/myKafka --path-to-json-file preferred-replicas.json
# 3.查看操作结果
kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03
# 恢复到最初的分配情况
4.5 修改分区副本
实际项⽬中,我们可能由于主题的副本因⼦设置的问题,需要重新设置副本因⼦或者由于集群的扩展,需要重新设置副本因⼦。topic⼀旦使⽤⼜不能轻易删除重建,因此动态增加副本因⼦就成为最终的选择。
说明:kafka1.0版本配置⽂件默认没有default.replication.factor=x,因此如果创建topic时,不指定–replication-factor值,默认副本因⼦为1.
我们可以在⾃⼰的server.properties中配置上常⽤的副本因⼦,省去⼿动调整。例如设置default.replication.factor=3,详细内容可参考官⽅⽂档https://kafka.apache.org/documentation/#r eplication
例:
创建主题
kafka-topics.sh --zookeepernode1:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
查看主题细节
kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02 Topic:tp_re_02 PartitionCount:3 ReplicationFactor:1 Configs: Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
使⽤ kafka-reassign-partitions.sh 修改副本因⼦
# 创建建increment-replication-factor.json { "version":1, "partitions":[ { "topic":"tp_re_02", "partition":0, "replicas":[ 0, 1 ] }, { "topic":"tp_re_02", "partition":1, "replicas":[ 0, 1 ] }, { "topic":"tp_re_02", "partition":2, "replicas":[ 1, 0 ] } ] }
执行分配
kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file increase-replication-factor.json --execute {"version":1,"partitions":[{"topic":"tp_re_02","partition":2,"replicas": [1,0],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":1,"replicas": [0,1],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":0,"replicas": [0,1],"log_dirs":["any","any"]}]}
查看主题细节
kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02 Topic:tp_re_02 PartitionCount:3 ReplicationFactor:2 Configs: Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0 Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0
4.6 分区分配策略
在Kafka中,每个Topic会包含多个分区,默认情况下⼀个分区只能被⼀个消费组下⾯的⼀个消费者消费,这⾥就产⽣了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。
- RangeAssignor
PartitionAssignor接⼝⽤于⽤户定义实现分区分配算法,以实现Consumer之间的分区分配。
消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的⼀个消费者来执⾏这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采⽤RangeAssignor 的分配算法。
RangeAssignor对每个Topic进⾏独⽴的分区分配。对于每⼀个Topic,⾸先对分区按照分区ID进⾏数值排序,然后订阅这个Topic的消费组的消费者再进⾏字典排序,之后尽量均衡的将分区分配给消费者。这⾥只能是尽量均衡,因为分区数可能⽆法被消费者数量整除,那么有⼀些消费者就会多分配到⼀些分区。
大致算法如下:
assign(topic, consumers) {
// 对分区和Consumer进⾏排序
List<Partition> partitions = topic.getPartitions();
sort(partitions);
sort(consumers);
// 计算每个Consumer分配的分区数
int numPartitionsPerConsumer = partition.size() / consumers.size();
// 额外有⼀些Consumer会多分配到分区
int consumersWithExtraPartition = partition.size() % consumers.size();
// 计算分配结果
for (int i = 0, n = consumers.size(); i < n; i++) {
// 第i个Consumer分配到的分区的index
int start = numPartitionsPerConsumer * i + Math.min(i,
consumersWithExtraPartition);
// 第i个Consumer分配到的分区数
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0
: 1);
// 分装分配结果
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start
+ length));
}
}
RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配⼀个分区。
这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,⽐如上图中4个分区3个消费者的场景,C0会多分配⼀个分区。如果此时再订阅⼀个分区数为4的Topic,那么C0⼜会⽐C1、C2多分配⼀个分区,这样C0总共就⽐C1、C2多分配两个分区了,⽽且随着Topic的增加,这个情况会越来越严重。
字典序靠前的消费组中的消费者⽐较“贪婪”:
-
RoundRobinAssignor
RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与⼀些Topic的分配。
相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的⽅式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越⼤)。
对于消费组内消费者订阅Topic不⼀致的情况:假设有两个个消费者分别为C0和C1,有2个TopicT1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结果如下:
看上去分配已经尽量的保证均衡了,不过可以发现C0承担了4个分区的消费⽽C1订阅了T2⼀个分区
-
StickyAssignor
尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会产⽣严重的分配偏差,⽐如消费组中订阅的Topic列表不相同的情况下。
更核⼼的问题是⽆论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上⼀次的分配结果。显然,在执⾏⼀次新的分配之前,如果能考虑到上⼀次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。
从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”:
1.分区的分配尽量的均衡 2.每⼀次重分配的结果尽量与上⼀次分配结果保持⼀致 当这两个⽬标发⽣冲突时,优先保证第⼀个⽬标。第⼀个⽬标是每个分配算法都尽量尝试去完成的, ⽽第⼆个⽬标才真正体现出StickyAssignor特性的。
例1:
有3个Consumer:C0、C1、C2
有4个Topic:T0、T1、T2、T3,每个Topic有2个分区
所有Consumer都订阅了这4个分区
-
StickyAssignor的分配结果如下图所示:
-
如果消费者1宕机
则按照RoundRobin的⽅式分配结果如下:
按照Sticky的⽅式:仅对消费者1分配的分区进⾏重分配,红线部分。最终达到均衡的⽬的
例2:
有3个Consumer:C0、C1、C2
3个Topic:T0、T1、T2,它们分别有1、2、3个分区
C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2
-
分配结果如图
-
消费者0下线,则按照轮询的⽅式分配:
-
按照Sticky⽅式分配分区,仅仅需要动的就是红线部分,其他部分不动:
-
-
自定义分配策略
⾃定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接⼝。
PartitionAssignor接⼝中定义了两个内部类:Subscription和Assignment。
Subscription类⽤来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic 列表和⽤户⾃定义信息。PartitionAssignor接⼝通过subscription()⽅法来设置消费者⾃身相关的Subscription信息,注意到此⽅法中只有⼀个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体现。为了增强⽤户对分配结果的控制,可以在subscription()⽅法内部添加⼀些影响分配的⽤户⾃定义信息赋予userData,⽐如:权重、ip地址、host或者机架(rack)等等。
Assignment类,它是⽤来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和⽤户⾃定义的数据。可以通过PartitionAssignor接⼝中的onAssignment()⽅法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个⽅法保存当前的分配⽅案,以备在下次消费组再平衡(rebalance)时可以提供分配参考依据
接⼝中的name()⽅法⽤来提供分配策略的名称,对于Kafka提供的3种分配策略⽽⾔,RangeAssignor对应的 protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的 protocol_name为“sticky”,所以⾃定义的分配策略中要注意命名的时候不要与已存在的分配策略发⽣冲突。这个命名⽤来标识分配策略的名称,在后⾯所描述的加⼊消费组以及选举消费组leader的时候会有涉及。
真正的分区分配⽅案的实现是在assign()⽅法中,⽅法中的参数metadata表示集群的元数据信息,⽽subscriptions表示消费组内各个消费者成员的订阅信息,最终⽅法返回各个消费者的分配信息。
Kafka中还提供了⼀个抽象类org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化PartitionAssignor接⼝的实现,对assign()⽅法进⾏了实现,其中会将Subscription中的userData信息去掉后,在进⾏分配。Kafka提供的3种分配策略都是继承⾃这个抽象类。如果开发⼈员在⾃定义分区分配策略时需要使⽤ userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,⽽需要直接实现PartitionAssignor接⼝。
public class MyAssignor extends AbstractPartitionAssignor { }
5. 物理存储
5.1 日志存储概述
Kafka 消息是以主题为单位进⾏归类,各个主题之间是彼此独⽴的,互不影响。每个主题⼜可以分为⼀个或多个分区。每个分区各⾃存在⼀个记录消息数据的⽇志⽂件
对应的每个Parition下存在⼀个 [Topic Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区⽇志⽂件中,你会发现很多类型的⽂件,⽐如: .index、.timestamp、.log、.snapshot 等。
-
LogSegment
1. 分区⽇志⽂件中包含很多的 LogSegment 2. Kafka ⽇志追加是顺序写⼊的 3. LogSegment 可以减⼩⽇志⽂件的⼤⼩ 4. 进⾏⽇志删除的时候和数据查找的时候可以快速定位。 5. ActiveLogSegment 是活跃的⽇志分段,拥有⽂件拥有写⼊权限,其余的 LogSegment 只有只读的权限。
⽇志⽂件存在多种后缀⽂件,重点需要关注 .index、.timestamp、.log 三种类型
每个 LogSegment 都有⼀个基准偏移量,表示当前 LogSegment 中第⼀条消息的 offset。
偏移量是⼀个 64 位的⻓整形数,固定是20位数字,⻓度未达到,⽤ 0 进⾏填补,索引⽂件和⽇志⽂件都由该作为⽂件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。
如果⽇志⽂件名为 00000000000000000121.log ,则当前⽇志⽂件的⼀条数据偏移量就是 121(偏移量从 0 开始)
日志与索引文件配置项:
偏移量索引⽂件⽤于记录消息偏移量与物理地址之间的映射关系。
时间戳索引⽂件则根据时间戳查找对应的偏移量。
Kafka 中的索引⽂件是以稀疏索引的⽅式构造消息的索引,并不保证每⼀个消息在索引⽂件中都有对应的索引项。
每当写⼊⼀定量的消息时,偏移量索引⽂件和时间戳索引⽂件分别增加⼀个偏移量索引项和时间戳索引项。
通过修改 log.index.interval.bytes 的值,改变索引项的密度。
-
切分文件
当满⾜如下⼏个条件中的其中之⼀,就会触发⽂件的切分:
-
当前⽇志分段⽂件的⼤⼩超过了 broker 端参数 log.segment.bytes 配置的值。 log.segment.bytes 参数的默认值为 1073741824,即 1GB。
-
当前⽇志分段中消息的最⼤时间戳与当前系统的时间戳的差值⼤于 log.roll.ms 或 log.roll.hours 参数配置的值。如果同时配置了 log.roll.ms 和 log.roll.hours 参数,那么 log.roll.ms 的优先级⾼。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
-
偏移量索引⽂件或时间戳索引⽂件的⼤⼩达到 broker 端参数 log.index.size.max.bytes 配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
-
追加的消息的偏移量与当前⽇志分段的偏移量之间的差值⼤于 Integer.MAX_VALUE ,即要追加的消息的偏移量不能转变为相对偏移量。
为什么是 Integer.MAX_VALUE
1024 * 1024 * 1024=1073741824
在偏移量索引⽂件中,每个索引项共占⽤ 8 个字节,并分为两部分。
相对偏移量和物理地址
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在⽇志分段⽂件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE ,如果⼤于 Integer.MAX_VALUE ,则不能⽤ 4 个字节进⾏表示了
-
-
索引⽂件切分过程
索引⽂件会根据 log.index.size.max.bytes 值进⾏预先分配空间,即⽂件创建的时候就是最⼤值
当真正的进⾏索引⽂件切分的时候,才会将其裁剪到实际数据⼤⼩的⽂件。
这⼀点是跟⽇志⽂件有所区别的地⽅。其意义降低了代码逻辑的复杂性。
5.2 日志存储
-
索引
偏移量索引⽂件⽤于记录消息偏移量与物理地址之间的映射关系。时间戳索引⽂件则根据时间戳查找对应的偏移量。
消息存储:
1. 消息内容保存在log⽇志⽂件中。 2. 消息封装为Record,追加到log⽇志⽂件末尾,采⽤的是顺序写模式。 3. ⼀个topic的不同分区,可认为是queue,顺序写⼊接收到的消息。
-
偏移量
1. 位置索引保存在index⽂件中 2. log⽇志默认每写⼊4K(log.index.interval.bytes设定的),会写⼊⼀条索引信息到index⽂件中,因此索引 ⽂件是稀疏索引,它不会为每条⽇志都建⽴索引信息。 3. log⽂件中的⽇志,是顺序写⼊的,由message+实际offset+position组成 4. 索引⽂件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相对第⼀个消息的 相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计算回实际的offset,这对⽤户是 透明的。
稀疏索引,索引密度不⾼,但是offset有序,⼆分查找的时间复杂度为O(lgN),如果从头遍历时间复杂度是O(N)。
-
时间戳
在偏移量索引⽂件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须⼤于之前 追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若⼲的时间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是 CreateTime 则⽆法保证顺序。
通过时间戳⽅式进⾏查找消息,需要通过查找时间戳索引和偏移量索引两个⽂件。
时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量。
-
-
清理
Kafka 提供两种⽇志清理策略:
-
⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除
-
⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。
Kafka 提供 log.cleanup.policy 参数进⾏相应配置,默认值: delete ,还可以选择 compact
简单了解吧, 卷不动了………..
-
5.3 磁盘存储
-
零拷贝
kafka⾼性能,是多⽅⾯协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“⽆所不⽤其极”的⾼效利⽤磁盘/操作系统特性。
零拷⻉并不是不需要拷⻉,⽽是减少不必要的拷⻉次数。通常是说在IO读写过程中
传统⽅式实现:先读取、再发送,实际经过1~4四次copy。
1、第⼀次:将磁盘⽂件,读取到操作系统内核缓冲区; 2、第⼆次:将内核缓冲区的数据,copy到application应⽤程序的buffer; 3、第三步:将application应⽤程序buffer中的数据,copy到socket⽹络发送缓冲区(属于操作系统内核的缓冲区 4、第四次:将socketbuffer的数据,copy到⽹络协议栈,由⽹卡进⾏⽹络传输。
实际IO读写,需要进⾏IO中断,需要CPU响应中断(内核态到⽤户态转换),尽管引⼊DMA(Direct Memory Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷⻉”的。
实际上并不需要第⼆个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。
kafka的两个过程:
- 1、⽹络数据持久化到磁盘(Producer到Broker)
- 2、磁盘⽂件通过⽹络发送(Broker到Consumer)
数据落盘通常都是⾮实时的,Kafka的数据并不是实时的写⼊硬盘,它充分利⽤了现代操作系统分⻚存储来利⽤内存提⾼I/O效率。
磁盘数据通过DMA(DirectMemoryAccess,直接存储器访问)拷⻉到内核态Buffer
直接通过DMA拷⻉到NICBuffer(socketbuffer),⽆需CPU拷⻉。
除了减少数据拷⻉外,整个读⽂件==>⽹络发送由⼀个sendfile调⽤完成,整个过程只有两次上下⽂切换,因此⼤⼤提⾼了性能。
-
页缓存
⻚缓存是操作系统实现的⼀种主要的磁盘缓存,以此⽤来减少对磁盘I/O的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
Kafka接收来⾃socketbuffer的⽹络数据,应⽤进程不需要中间处理、直接进⾏持久化时。可以使⽤mmap内存⽂件映射。
MemoryMappedFiles
简称mmap,简单描述其作⽤就是:将磁盘⽂件映射到内存,⽤户通过修改内存就能修改磁盘⽂件。 它的⼯作原理是直接利⽤操作系统的Page来实现磁盘⽂件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘⼀样读写内存(当然是虚拟机内存)。使⽤这种⽅式可以获取很⼤的I/O提升,省去了⽤户空间到内核空间复制的开销。
mmap也有⼀个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调⽤flush的时候才把数据真正的写到硬盘。
Kafka提供了⼀个参数 producer.type 来控制是不是主动flush;
如果Kafka写⼊到mmap之后就⽴即flush然后再返回Producer叫同步(sync);写⼊mmap之后⽴即返回Producer不调⽤flush叫异步(async)。
-
顺序写入
操作系统可以针对线性读写做深层次的优化,⽐如预读(read-ahead,提前将⼀个⽐较⼤的磁盘块读⼊内存)和后写(write-behind,将很多⼩的逻辑写操作合并起来组成⼀个⼤的物理写操作)技术。
Kafka在设计时采⽤了⽂件追加的⽅式来写⼊消息,即只能在⽇志⽂件的尾部追加新的消息,并且也不允许修改已写⼊的消息,这种⽅式属于典型的顺序写盘的操作,所以就算Kafka使⽤磁盘作为存储介质,也能承载⾮常⼤的吞吐量。
mmap和sendfile:
1.Linux内核提供、实现零拷⻉的API; 2.sendfile是将读到内核空间的数据,转到socketbuffer,进⾏⽹络发送; 3.mmap将磁盘⽂件映射到内存,⽀持读和写,对内存的操作会反映在磁盘⽂件上。4.RocketMQ在消费消息时,使⽤了mmap。kafka使⽤了sendFile。
Kafka速度快是因为:
1.partition顺序读写,充分利⽤磁盘特性,这是基础; 2.Producer⽣产的数据持久化到broker,采⽤mmap⽂件映射,实现顺序的快速写⼊; 3.Customer从broker读取数据,采⽤sendfile,将磁盘⽂件读到OS内核缓冲区后,直接转到socketbuffer进⾏⽹络发送。
6. 稳定性
6.1 事务
事务场景:
1. 如producer发的多条消息组成⼀个事务这些消息需要对consumer同时可⻅或者同时不可⻅ 。
2. producer可能会给多个topic,多个partition发消息,这些消息也需要能放在⼀个事务⾥⾯,这就形成了⼀
个典型的分布式事务。
3. kafka的应⽤场景经常是应⽤先消费⼀个topic,然后做处理再发到另⼀个topic,这个
consume-transform-produce过程需要放到⼀个事务⾥⾯,⽐如在消息处理或者发送的过程中如果失败了,消费偏移量也不能提
交。
4. producer或者producer所在的应⽤可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事
务 。
关键概念和推导
1. 因为producer发送消息可能是分布式事务,所以引⼊了常⽤的2PC,所以有事务协调者(Transaction
Coordinator)。Transaction Coordinator和之前为了解决脑裂和惊群问题引⼊的Group Coordinator在选举
和failover上⾯类似。
2. 事务管理中事务⽇志是必不可少的,kafka使⽤⼀个内部topic来保存事务⽇志,这个设计和之前使⽤内部
topic保存偏移量的设计保持⼀致。事务⽇志是Transaction Coordinator管理的状态的持久化,因为不需要
回溯事务的历史状态,所以事务⽇志只⽤保存最近的事务状态。
3. 因为事务存在commit和abort两种操作,⽽客户端⼜有read committed和read uncommitted两种隔离级
别,所以消息队列必须能标识事务状态,这个被称作Control Message。
4. producer挂掉重启或者漂移到其它机器需要能关联的之前的未完成事务所以需要有⼀个唯⼀标识符来进⾏关
联,这个就是TransactionalId,⼀个producer挂了,另⼀个有相同TransactionalId的producer能够接着处
理这个事务未完成的状态。kafka⽬前没有引⼊全局序,所以也没有transaction id,这个TransactionalId是
⽤户提前配置的。
5. TransactionalId能关联producer,也需要避免两个使⽤相同TransactionalId的producer同时存在,所以引
⼊了producer epoch来保证对应⼀个TransactionalId只有⼀个活跃的producer epoch
事务语义
-
多分区原子写入
事务能够保证Kafka topic下每个分区的原⼦写⼊。事务中所有的消息都将被成功写⼊或者丢弃。
⾸先,我们来考虑⼀下原⼦读取-处理-写⼊周期是什么意思。简⽽⾔之,这意味着如果某个应⽤程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进⾏了⼀些处理(如B=F(A))之后将消息B写⼊topic tp1,则只有当消息A和B被认为被成功地消费并⼀起发布,或者完全不发布时,整个读取过程写⼊操作是原⼦的。
现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写⼊⼀个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
由于offset commit只是对Kafka topic的另⼀次写⼊,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原⼦写⼊也启⽤原⼦读取-处理-写⼊循环:提交偏移量X到offset topic和消息B到tp1的写⼊将是单个事务的⼀部分,所以整个步骤都是原⼦的。
-
粉碎"僵尸实例"
我们通过为每个事务Producer分配⼀个称为transactional.id的唯⼀标识符来解决僵⼫实例的问题。在进程重新启动时能够识别相同的Producer实例
API要求事务性Producer的第⼀个操作应该是在Kafka集群中显示注册transactional.id。当注册的时候,Kafka broker⽤给定的transactional.id检查打开的事务并且完成处理。Kafka也增加了⼀个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。
⼀旦epoch被触发,任何具有相同的transactional.id和旧的epoch的⽣产者被视为僵⼫,Kafka拒绝来⾃这些⽣产者的后续事务性写⼊。
简⽽⾔之:Kafka可以保证Consumer最终只能消费⾮事务性消息或已提交事务性消息。它将保留来⾃未完成事务的消息,并过滤掉已中⽌事务的消息。
-
事务消息定义
1. 原子性:消费者的应用程序不应暴露于==未提交事务==的消息中。 2. 持久性:Broker不能丢失任何已提交的事务。 3. 排序:事务消费者应在每个分区中以原始顺序查看事务消息。 4. 交织:每个分区都应该能够接收来自事务性生产者和非事务生产者的消息 5. 事务中不应有重复的消息。
如果允许事务性和非事务性消息的交织,则非事务性和事务性消息的相对顺序将基于附加(对于非事务性消息)和最终提交(对于事务性消息)的相对顺序
事务配置:
1、创建消费者代码,需要
将配置中的⾃动提交属性(auto.commit)进⾏关闭
⽽且在代码⾥⾯也不能使⽤⼿动提交commitSync()或者commitAsync()
设置isolation.level
2、创建⽣成者,代码如下,需要:
配置transactional.id属性
配置enable.idempotence属性
事务工作原理:
-
事务协调器和事务⽇志
事务协调器是每个Kafka内部运⾏的⼀个模块。事务⽇志是⼀个内部的主题。每个协调器拥有事务⽇志所在分区的⼦集,即,这些borker中的分区都是Leader.
每个transactional.id都通过⼀个简单的哈希函数映射到事务⽇志的特定分区,事务⽇志⽂件__transaction_state-0。这意味着只有⼀个Broker拥有给定的transactional.id。
通过这种⽅式,我们利⽤Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可⽤,并且所有事务状态都能够持久化。
值得注意的是,事务⽇志只保存事务的最新状态⽽不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”,“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务⽇志中。
-
事务数据流
数据流在抽象层⾯上有四种不同的类型。
-
producer和事务coordinator的交互
执⾏事务时,Producer向事务协调员发出如下请求 1. initTransactions API向coordinator注册⼀个transactional.id。 此时,coordinator使⽤该transactional.id 关闭所有待处理的事务,并且会避免遇到僵⼫实例,由具有相同的transactional.id的Producer的另⼀个实 例启动的任何事务将被关闭和隔离。每个Producer会话只发⽣⼀次。 2. 当Producer在事务中第⼀次将数据发送到分区时,⾸先向coordinator注册分区。 3. 当应⽤程序调⽤commitTransaction或abortTransaction时,会向coordinator发送⼀个请求以开始两阶段提交协议。
-
Coordinator和事务⽇志交互
随着事务的进⾏,Producer发送上⾯的请求来更新Coordinator上事务的状态。事务Coordinator会在内存中保存 每个事务的状态,并且把这个状态写到事务⽇志中(这是以三种⽅式复制的,因此是持久保存的)。 事务Coordinator是读写事务⽇志的唯⼀组件。如果⼀个给定的Borker故障了,⼀个新的Coordinator会被选为新 的事务⽇志的Leader,这个事务⽇志分割了这个失效的代理,它从传⼊的分区中读取消息并在内存中重建状态。
-
Producer将数据写⼊⽬标Topic所在分区
在Coordinator的事务中注册新的分区后,Producer将数据正常地发送到真实数据所在分区。这与producer.send 流程完全相同,但有⼀些额外的验证,以确保Producer不被隔离。
-
Topic分区和Coordinator的交互
1. 在Producer发起提交(或中⽌)之后,协调器开始两阶段提交协议。 2. 在第⼀阶段,Coordinator将其内部状态更新为“prepare_commit”并在事务⽇志中更新此状态。⼀旦完成了 这个事务,⽆论发⽣什么事,都能保证事务完成。 3. Coordinator然后开始阶段2,在那⾥它将事务提交标记写⼊作为事务⼀部分的Topic分区。 4. 这些事务标记不会暴露给应⽤程序,但是在read_committed模式下被Consumer使⽤来过滤掉被中⽌事务 的消息,并且不返回属于开放事务的消息(即那些在⽇志中但没有事务标记与他们相关联)。 5. ⼀旦标记被写⼊,事务协调器将事务标记为“完成”,并且Producer可以开始下⼀个事务。
-
事务相关配置:
-
broker configs
配置项 说明 transactional.id.timeout.ms 在ms中,事务协调器在⽣产者TransactionalId提前过期之前等待的最⻓时间,并且没有从该⽣产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周⼀次的⽣产者作业维护它们的id max.transaction.timeout.ms 事务允许的最⼤超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防⽌客户机超时过⼤,从⽽导致⽤户⽆法从事务中包含的主题读取内容。
默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。transaction.state.log.replication.factor 事务状态topic的副本数量。默认值:3 transaction.state.log.num.partitions 事务状态主题的分区数。默认值:50 transaction.state.log.min.isr 事务状态主题的每个分区ISR最⼩数量。默认值:2 transaction.state.log.segment.bytes 事务状态主题的segment⼤⼩。默认值:104857600字节 -
producer configs
配置项 说明 enable.idempotence 开启幂等 transaction.timeout.ms 事务超时时间
事务协调器在主动中⽌正在进⾏的事务之前等待⽣产者更新事务状态的最⻓时间。这个配置值将与InitPidRequest⼀起发送到事务协调器。如果该值⼤于
max.transaction.timeout。在broke中设置ms时,请求将失败,并出现
InvalidTransactionTimeout错误。
默认是60000。这使得交易不会阻塞下游消费超过⼀分钟,这在实时应⽤程序中通常是允许的。transactional.id ⽤于事务性交付的TransactionalId。这⽀持跨多个⽣产者会话的可靠性语义,因为它允许客户端确保使⽤相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则⽣产者仅限于幂等交付。 -
consumer configs
配置项 说明 isolation.level -read_uncommitted:以偏移顺序使⽤已提交和未提交的消息。
-read_committed:仅以偏移量顺序使⽤⾮事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使⽤者中缓冲消息,直到看到给定事务中的所有消息。
幂等性
Kafka在引⼊幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。⽣产中,会出现各种不确定的因素
上图这种情况,当Producer第⼀次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(⽐如⽹络异常)。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给 Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
幂等性就是保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的⼀致性。
数学概念就是:f(f(x))=f(x)。f函数表示对消息的处理。
幂等性实现
添加唯⼀ID,类似于数据库的主键,⽤于唯⼀标记⼀个消息。Kafka为了实现幂等性,它在底层设计架构中引⼊了ProducerID和SequenceNumber
- ProducerID:在每个新的Producer初始化时,会被分配⼀个唯⼀的ProducerID,这个ProducerID对客户端使⽤者是不可⻅的。
- SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应⼀个从0开始单调递增的SequenceNumber值。
同样,这是⼀种理想状态下的发送流程。实际情况下,会有很多不确定的因素,⽐如Broker在发送Ack信号给Producer时出现⽹络异常,导致发送失败。异常情况如下图所示:
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发⽣异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引⼊了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和 SequenceNumber发送给Broker,⽽之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有⼀条 (x2,y2),不会出现重复发送的情况。
在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有⼀个maybeWaitForPid()⽅法,⽤来⽣成⼀个ProducerID
事务操作:
在Kafka事务中,⼀个原⼦性操作,根据操作类型可以分为3种情况。情况如下:
- 只有Producer⽣产消息,这种场景需要事务的介⼊;
- 消费消息和⽣产消息并存,⽐如Consumer&Producer模式,这种场景是⼀般Kafka项⽬中⽐较常⻅的模式,需要事务介⼊;
- 只有Consumer消费消息,这种操作在实际项⽬中意义不⼤,和⼿动CommitOffsets的结果⼀样,⽽且这种场景不是事务的引⼊⽬的。
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
示例代码: demo10
6.2 控制器
Kafka集群包含若⼲个broker,broker.id指定broker的编号,编号不要重复。Kafka集群上创建的主题,包含若⼲个分区。每个分区包含若⼲个副本,副本因⼦包括了Follower副本和Leader副本。副本⼜分为ISR(同步副本分区)和OSR(⾮同步副本分区)。
控制器就是⼀个broker。控制器除了⼀般broker的功能,还负责Leader分区的选举。
broker选举
集群⾥第⼀个启动的broker在Zookeeper中创建临时节点
其他broker在该控制器节点创建Zookeeper watch对象,使⽤Zookeeper的监听机制接收该节点的变更
即:Kafka通过Zookeeper的分布式锁特性选举集群控制器
每个新选出的控制器通过 Zookeeper 的条件递增操作获得⼀个全新的、数值更⼤的 controller epoch。其他broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防⽌“脑裂”。
⽐如当⼀个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不⼀样,听谁的?脑裂了。有了纪元数字,直接使⽤纪元数字最新的控制器结果。
结论:
1.Kafka使⽤Zookeeper的分布式锁选举控制器,并在节点加⼊集群或退出集群时通知控制器。
2.控制器负责在节点加⼊或离开集群时进⾏分区Leader选举。
3.控制器使⽤epoch来避免“脑裂”。“脑裂”是指两个节点同时认为⾃⼰是当前的控制器。
6.3 可靠性保证
1. 创建Topic的时候可以指定 --replication-factor 3 ,表示分区的副本数,不要超过broker的数量。
2. Leader是负责读写的节点,⽽其他副本则是Follower。Producer只把消息发送到Leader,Follower定期地
到Leader上Pull数据。
3. ISR是Leader负责维护的与其保持同步的Replica列表,即当前活跃的副本列表。如果⼀个Follow落后太多,
Leader会将它从ISR中移除。落后太多意思是该Follow复制的消息落后于Leader的条数超过预定值(参
数: replica.lag.max.messages 默认值:4000)或者Follow⻓时间没有向Leader发送fetch请求(参
数: replica.lag.time.max.ms 默认值:10000)。
4. 为了保证可靠性,可以设置 acks=all 。Follower收到消息后,会像Leader发送ACK。⼀旦Leader收到了
ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK。
副本的分配
当某个topic的 –replication-factor 为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配,为了更好的负载均衡。
Leader的选举
-
基于zk的选举方式
缺点: split-brain。这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保 证同⼀时刻所有Replica“看”到的状态是⼀样的,这就可能造成不同Replica的响应不⼀致 herd effect。如果宕机的那个Broker上的Partition⽐较多,会造成多个Watch被触发,造成集群内⼤量的调 整 ZooKeeper负载过重。每个Replica都要为此在ZooKeeper上注册⼀个Watch,当集群规模增加到⼏千个 Partition时ZooKeeper负载会过重。
-
基于controller的选举方式
Kafka0.8后的LeaderElection⽅案解决了上述问题,它在所有broker中选出⼀个controller,所有Partition的 Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的⽅式(⽐ZooKeeperQueue的⽅式更⾼效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
优点:极⼤缓解了HerdEffect问题、减轻了ZK的负载,Controller与Leader/Follower之间通过RPC通信,⾼效且实时
缺点:引⼊Controller增加了复杂度,且需要考虑Controller的Failover
失效副本:
当ISR中的一个Follower副本滞后Leader副本的时间超过参数 replica.lag.time.max.ms 指定的值时即判定为副本失效,需要将此Follower副本剔出除ISR。
副本复制:
日志复制算法(logreplicationalgorithm)必须提供的基本保证是,如果它告诉客户端消息已被提交,而当前Leader出现故障,新选出的Leader也必须具有该消息。在出现故障时,Kafka会从挂掉Leader的ISR里面选择一个Follower作为这个分区新的Leader。
每个分区的leader会维护一个in-syncreplica(同步副本列表,又称ISR)。当Producer向broker 发送消息,消息先写入到对应Leader分区,然后复制到这个分区的所有副本中。ACKS=ALL时,只有将消息成功复制到所有同步副本(ISR)后,这条消息才算被提交。
6.4 一致性保证
概念
-
水位标记
水位或水印(watermark)一词,表示位置信息,即位移(offset)。Kafka源码中使用的名字是高水位,HW(highwatermark)。
-
副本角色
Kafka分区使用多个副本(replica)提供高可用。
-
LEO和HW
每个分区副本对象都有两个重要的属性:LEO和HW。
-
LEO:即日志末端位移(log end offset),记录了该副本日志中下一条消息的位移值。如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0,9]。另外,Leader LEO和Follower LEO的更新是有区别的。
-
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值。小于等于HW值的所有消息都被认为是“已备份”的(replicated)。Leader副本和Follower副本的HW更新不同。
-
上图中,HW值是7,表示位移是0~7的所有消息都已经处于“已提交状态”(committed),而LEO 值是14,8~13的消息就是未完全备份(fullyreplicated)——为什么没有14?LEO指向的是下一条消息到来时的位移。
-
Follower副本何时更新LEO
Follower副本不停地向Leader副本所在的broker发送FETCH请求,一旦获取消息后写入自己的日志中进行备份。那么Follower副本的LEO是何时更新的呢?首先我必须言明,Kafka有两套Follower副本LEO:
1.一套LEO保存在Follower副本所在Broker的副本管理机中;
2.另一套LEO保存在Leader副本所在Broker的副本管理机中。Leader副本机器上保存了所有的follower副本的LEO。
Kafka使用前者帮助Follower副本更新其HW值;利用后者帮助Leader副本更新其HW
Follower副本何时更新HW
Follower更新HW发生在其更新LEO之后,一旦Follower向Log写完数据,尝试更新自己的HW值。比较当前LEO值与FETCH响应中Leader的HW值,取两者的小者作为新的HW值。
即:如果Follower的LEO大于Leader的HW,FollowerHW值不会大于Leader的HW值
Leader副本何时更新LEO
和Follower更新LEO相同,Leader写Log时自动更新自己的LEO值。
Leader副本何时更新HW值
Leader的HW值就是分区HW值,直接影响分区数据对消费者的可见性。
1. Follower副本成为Leader副本时:Kafka会尝试去更新分区HW。
2. Broker崩溃导致副本被踢出ISR时:检查下分区HW值是否需要更新是有必要的。
3. 生产者向Leader副本写消息时:因为写入消息会更新Leader的LEO,有必要检查HW值是否需
要更新
4. Leader处理Follower FETCH请求时:首先从Log读取数据,之后尝试更新分区HW值
**结论:**当Kafka broker都正常工作时,分区HW值的更新时机有两个:
1. Leader处理PRODUCE请求时
2. Leader处理FETCH请求时。
Leader Epoch使用
Kafka从0.11引入了leaderepoch来取代HW值。Leader端使用内存保存Leader的epoch信息.
所谓Leader epoch实际上是一对值:<epoch,offset>:
1. epoch表示Leader的版本号,从0开始,Leader变更过1次,epoch+1
2. offset对应于该epoch版本的Leader写入第一条消息的offset。
<0, 0>
<1, 120>
则表示第一个Leader从位移0开始写入消息;共写了120条[0, 119];而第二个Leader版本号是1,从位移120处开始写入消息。
6.5 消息重复的场景解决方案
生产者阶段重复场景
生产发送的消息没有收到正确的broke响应,导致生产者重试
异常是RetriableException类型或者TransactionManager允许重试
记录顺序问题:
如果设置 max.in.flight.requests.per.connection 大于1(默认5,单个连接上发送的未确认
请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺
序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成
功,那么第二个batch处理中的记录可能先出现被消费
设置 max.in.flight.requests.per.connection 为1,可能会影响吞吐量,可以解决单个生产者
发送顺序问题。如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可
恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费
生产者发送重复解决方案
要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1
ack=0 不重试,可能会丢消息, 适用于吞吐量指标重要性高于数据丢失,例如:日志收集
生产者和broker阶段消息丢失场景
ack=0 不重试,消息丢失
ack=1,leader crash, 生产者发送消息完,只等待Leader写入成功就返回了,Leader分区丢失了,此时Follower没来及同步,消息丢失
unclean.leader.election.enable 配置true, 允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。生产者发送异步消息,只等待Leader写入成功就返回,Leader分区丢失,此时ISR中没有Follower,Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。
解决生产者和broke阶段消息丢失
禁用unclean选举,ack=all
配置:min.insync.replicas>1
当一起使用时, min.insync.replicas 和 ack 允许执行更大的持久性保证。一个典型的场景是创建一个复制因子为3的主题,设置min.insync复制到2个,用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常
失败的offset单独记录
消费者数据重复场景及解决方案
原因: 据消费完没有及时提交offset到broker
场景: 消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset 开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。
解决方案: 取消自动提交, 下游做幂等
6.6 __consumer_offsets
Zookeeper不适合大批量的频繁写入操作。
Kafka1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
脚本不再做演示………
7. 延时队列
两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给 follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建一个延时拉取操作(DelayedFetch)以等待拉取到足够数量的消息。当延时拉取操作执行时,会再读取一次日志文件,然后将拉取结果返回给follower副本。
延迟操作不只是拉取消息时的特有操作,在Kafka中有多种延时操作,比如延时数据删除、延时生产等。
对于延时生产(消息)而言,如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。
假设某个分区有3个副本:leader、follower1和follower2,它们都在分区的ISR集合中。不考虑ISR 变动的情况,Kafka在收到客户端的生产请求后,将消息3和消息4写入leader副本的本地日志文件。
由于客户端设置了acks为-1,那么需要等到follower1和follower2两个副本都收到消息3和消息4后才能告知客户端正确地接收了所发送的消息。如果在一定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。
那么这里等待消息3和消息4写入follower1副本和follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给客户端。
延时操作需要延时返回响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同于定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。
8. 重试队列
kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。
1. 创建一个topic作为重试topic,用于接收等待重试的消息。
2. 普通topic消费者设置待重试消息的下一个重试topic。
3. 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序
4. 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic
5. 同一个消息重试次数过多则不再重试
示例代码: springboot-retry-queue-demo