消费方式
在消息队列中,消费者有两种方式消费消息:
PULL(拉取)模式
PUSH(推送)模式
在Kafka中,是PULL(拉取)模式,即消费者主动从Broker中拉取数据;而没有采取PUSH(推送)模式(Broker主动推送);因为由Broker决定消息发送速率,很难适应所有消费者的消费速率;PULL(拉取)模式的不足之处是,即使Broker中没有数据,消费者也可能会一直循环,轮询拉取数据。
整体过程
通过我们之前章节的讨论,其实对于整体的工作过程,已经非常清楚了。
生产者向每一个分区的Leader发送数据。
Follower主动和Leader同步数据。
消费者消费数据。
一个消费者可以消费一个或多个分区的数据。
每一个分区的数据,只能由消费者组中的一个消费者消费。即,不允许一个消费者组中的多个消费者,同时消费一个分区。
但可以由不同的消费者组进行消费。
消费者消费数据的offset
,维护在系统的主题__consumer_offsets
中。
0.9之前的版本,维护在Zookeeper中。即我们在《Kafka-3.Broker》 提到的Zookeeper的consumer
节点。这么设计的缺陷在于,消费者不但要和Broker进行通信,还要频繁的和Zookeeper进行通信。
消费者组
接下来,我们重点讨论一下消费者组。
概述
消费者组,Consumer Group(CG),由多个消费者组成,这些消费者的group.id
相同。
消费者组内每个消费者负责消费不同分区的数据。
一个分区只能由一个消费者组内的消费者进行消费。
消费者组之间互不影响。
例子:
例一 消费者组中只有一个消费者,这个消费者消费了所有的分区。 例二 消费者组中有两个消费者,第一个消费者消费了0和1,第二个消费者消费了2和3。 (可不可以第一个消费者消费0个2,第二个消费者消费1和3呢?可以,我们在下文讨论分区分配的时候会讨论怎么实现。) 例三 消费者组中有四个消费者,每个消费者消费一个分区。 例四 消费者组中有五个消费者,多于分区数,这时候会有一个闲置的消费者。 例五 有两个消费者组,消费者组之间的互不影响。
初始化
选择coordinator。
根据group.id
的哈希值对50(__consumer_offsets
的分区数量)求余。
例如,group.id
的哈希值= 1 =1 = 1 ,1 % 50 = 1 1\%50 = 1 1 % 5 0 = 1 ,那么就是__consumer_offsets
主题的1号分区,这个分区的Leader所在的机器,就是coordinator的地址,即就是被选择的coordinator。
(这个过程,和我们在《Kafka-2.Producer》 所讨论的生产者事务,有些类似。)
每个Consumer都发送join group
请求。
coordinator选择一个Consumer作为Leader。
coordinator把消费者组中的所有消费者要消费的topic情况,发送给Leader消费者。
Leader消费者制定消费方案(每个消费者到底消费哪个分区的方案)。
Leader消费者把其制定的消费方案发送给coordinator。
coordinator把消费方案发给每一个Consumer,然后Consumer开始消费。
我们还可以看看Kafka的log.dirs
的内容,可以看到__consumer_offsets
有多个分区。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 drwxr-xr-x 40 root root 4096 3月 6 22:23 ./ drwxr-xr-x 10 root root 4096 3月 3 10:43 ../ -rw-r--r-- 1 root root 0 3月 3 10:54 cleaner-offset-checkpoint drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-0/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-12/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-15/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-18/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-21/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-24/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-27/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-3/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-30/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-33/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-36/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-39/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-42/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-45/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-48/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-6/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-9/ drwxr-xr-x 2 root root 4096 3月 6 18:28 first-0/ drwxr-xr-x 2 root root 4096 3月 3 19:13 first-1/ drwxr-xr-x 2 root root 4096 3月 6 18:14 first-2/ -rw-r--r-- 1 root root 0 3月 3 10:54 .lock -rw-r--r-- 1 root root 4 3月 6 22:23 log-start-offset-checkpoint -rw-r--r-- 1 root root 88 3月 6 18:14 meta.properties drwxr-xr-x 2 root root 4096 3月 6 22:16 myTopic-0/ -rw-r--r-- 1 root root 873 3月 6 22:23 recovery-point-offset-checkpoint -rw-r--r-- 1 root root 880 3月 6 22:23 replication-offset-checkpoint drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-1/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-10/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-13/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-16/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-19/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-22/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-25/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-28/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-31/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-34/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-37/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-4/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-40/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-43/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-46/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-49/ drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-7/
消费过程
关于具体的消费过程,其实已经和消费者组的关系不大了,基本上都在消费者内部进行处理。
消费者内部会有一个对象fetcher
,这个对象会调用其成员方法sendFetches
,发送消费请求。
sendFetches
方法会创建FetchRequest
类型的对象。
在创建FetchRequest
类型的对象的过程中有三个重要参数:
fetch.min.bytes
:每次拉取的最小字节数,默认1个字节。
fetch.max.wait.ms
:每次拉取的最大等待时间,默认300ms。
fetch.max.bytes
:每次拉取的最大字节数。
之后会调用ConsumerNetworkClient的send(fetchRequest)
方法,该方法返回一个RequestFuture类型的对象。
RequestFuture会监听onSuccess
和onFailure
方法。
onSuccess
方法,会把完成的拉取(completedFetch
)保存在队列completedFetches
。
fetcher
调用fetchedRecords
,期间有一个相关参数max.poll.records
,每次最多拉取的条数。
经过parseRecord
(反序列化)、interceptors
(拦截器),最后进行数据处理。
重要参数
bootstrap.servers
向Kafka集群建立初始连接用到的(host/port)列表。
key.deserializer
和value.deserializer
指定接收消息的key和value的反序列化类型,一定要写全类名。
enable.auto.commit
消费者自动周期性地向服务器提交偏移量。 默认true
。
auto.commit.interval.ms
消费者向Kafka提交偏移量的频率,默认5s。
auto.offset.reset
当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(例如,数据被删除了)。 发生这种情况的处理方法:
earliest
:自动重置偏移量到最早的偏移量。latest
:默认,自动重置偏移量为最新的偏移量。none
:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。
offsets.topic.num.partitions
__consumer_offsets
的分区数。 默认50个分区。
heartbeat.interval.ms
消费者和coordinator之间的心跳时间,默认3s。 根据一般的理解,该值肯定要小于session.timeout.ms
,实际上不应该高于session.timeout.ms
的1 3 \frac{1}{3} 3 1 。
session.timeout.ms
消费者和coordinator之间连接超时时间,如果超过该值,该消费者将被移除,消费者组执行再平衡。 默认45s。
max.poll.interval.ms
消费者处理消息的最大时长,如果超过该值,该消费者被移除,消费者组执行再平衡。 默认5分钟。
partition.assignment.strategy
Kafka有四种分区分配策略:
Range
RoundRobin
Sticky
CooperativeSticky
可以同时使用多个分区分配策略。 默认策略Range + CooperativeSticky
。
fetch.min.bytes
消费者获取服务器端一批消息最小的字节数。 默认1个字节。
fetch.max.wait.ms
消费者从服务端获取到数据的最大等待时间 默认500ms。
fetch.max.bytes
消费者从服务端获取一批消息最大的字节数。 但并不完全由这个决定,如果一条消息的数据量也大于该值,该条消息仍然能被拉去回去。 默认50MB。
max.poll.records
一次拉取数据的最大条数。 默认500条。
isolation.level
如果设置为read_committed
,那么消费者就会忽略生产者事务未提交的消息,即只能消费到LSO(Last Stable Offset)的位置; 默认为read_uncommitted
,可以消费到HW(High Watermark)处的位置。
消费消息
订阅主题
假设存在一个主题first
,有三个分区。
示例代码:
1 ./bin/kafka-topics.sh --bootstrap-server kafka-01:9092 --describe --topic first
运行结果:
1 2 3 4 Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: first Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3 Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
我们现在创建一个消费者组,这个消费者组中只有一个消费者,由这个消费者来消费主题first
。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.kakawanyifan;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class MyConsumer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-01:9092" ); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class .getName ()) ; properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class .getName ()) ; properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test" ); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); ArrayList<String> topics = new ArrayList<>(); topics.add("first" ); kafkaConsumer.subscribe(topics); while (true ) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1 )); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
我们创建生产者,并发送数据qwert
。
运行结果:
1 ConsumerRecord(topic = first, partition = 2, leaderEpoch = 15, offset = 25, CreateTime = 1678101533106, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = qwert)
在这个例子中,我们指定了消费者组的ID,而且这个是必须指定的,否者会报错。 但是在《Kafka-1.初步认识》 的命令行中又没有指定,因为通过命令行启动消费者不填写消费者组ID的话,会被自动填写随机一个消费者组ID。
指定分区
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.kakawanyifan;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.ArrayList;import java.util.Properties;public class MyConsumer { public static void main (String[] args) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-01:9092" ); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class .getName ()) ; properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class .getName ()) ; properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test" ); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); ArrayList topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first" , 2 )); kafkaConsumer.assign(topicPartitions); while (true ) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1 )); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
消费者组
多个消费者组成一个消费者组,要求他们的group.id
相同。
所以,我们需要, 创建多个消费者,配置相同的group.id
。
一个分区的数据,只能由消费者组中的一个消费者进行消费。
所以?我们还需要为每个消费者指定分区?一定不能重复?
不用这么麻烦,这部分是自动完成的。
这就是我们接下来要讨论的内容,分区分配。
分区分配
四种策略
Kafka有四种分区分配策略:
Range
RoundRobin
Sticky
CooperativeSticky
通过配置参数partition.assignment.strategy
,即可以修改分区的分配策略。
四种分区分配策略,实现的是同一个接口ConsumerPartitionAssignor
。
重平衡
需要注意的是,Kafka中,除了在启动的时候会进行分区的分配,在运行过程中,也会进行分区的分配,这个被称为重平衡。
触发重平衡的情况有:
消费者超时
每个消费者都会和coordinator保持心跳(默认3S),一旦超时(session.timeout.ms=45S),该消费者会被移除,触发重平衡。
消费者处理消息的时间过长(max.poll.interval.ms=5分钟),该消费者会被移除,触发重平衡。
消费者主动退出集群。
消费者主动取消订阅某个主题
新的消费者加入集群
分区数被动态扩充
消费者动态监听(消费)了新的主题
Range
分区策略
Range是对每一个topic而言的。
对同一个topic中的分区按照序号进行排序,并对消费者按照字母顺序(Kafka内部会对消费者组中每一个消费者进行随机的编号)进行排序。
例如,有7个分区,3个消费者,排序后的分区是0
,1
,2
,3
,4
,5
,6
;消费者排序完之后将会是C0
、C1
、C2
。
通过partitions数 consumer数 \frac{\text{partitions数}}{\text{consumer数}} consumer 数 partitions 数 来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区。
例如,7 3 = 2 , 余 1 \frac{7}{3}=2,\text{余}1 3 7 = 2 , 余 1 ,那么排在前面的C0会多消费1个分区。如果有8个分区,8 3 = 2 , 余 2 \frac{8}{3}=2,\text{余}2 3 8 = 2 , 余 2 ,那么C0和C1分别多消费一个。
针对一个主题,排在前面的消费者,尤其是C0,都将多消费1个分区;如果主题特别特别多,C0消费的分区会比明显比其他消费者多
这种现象被称为 数据倾斜 。
设置方法
1 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class .getName ()) ;
RoundRobin
分区策略
RoundRobin针对集群中所有Topic而言。
把所有的partition和所有的消费者都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。
需要注意的是,是基于消费者组和组内成员的监听情况进行分配的。
例如,有三个成员c1
,c2
,c3
;c1
监听了t1
,c2
监听了t2
和t1
,c3
监听了t1
、t2
和t3
;t1
有1个分区,t2
有两个分区,t3
有两个分区;分配结果是:
c1
:t1p0
c2
:t2p0
c3
:t2p1
、t3p0
、t3p1
这样不是很均衡,如果我们将t2p1
分配给c2
显然会更均衡。
设置方法
1 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class .getName ()) ;
Sticky
分区策略
粘性分区,可以理解为分配的结果带有"粘性的",其分区目标是:
第一个目标:分区尽可能和上一次分配结果保持一致。
第二个目标:分配尽可能均匀,每个消费者消费的分区数最多相差1。
当两个目标发生冲突的时候,第一个目标优于第二个。
有些资料说,是 粘上就不动 ,这种说法肯定是欠妥的,是尽量不动。
例如,我们有一个消费者组,有三个成员c1
、c2
、c3
;其中c1
监听了t1
,c2
监听了t2
和t1
,c3
监听了t1
、t2
和t3
;t1
有1个分区,t2
有两个分区,t3
有两个分区。分区结果是:
c1
:t1p0
c2
:t2p0
、t2p1
c3
:t3p0
、t3p1
再举一个例子,假如有四个主题,t1
、t2
、t3
、t4
各有两个分区,有三个消费者c1
、c2
、c3
监听。分区结果:
c1
:t1p0
、t2p1
、t4p0
c2
:t1p1
、t3p0
、t4p1
c3
:t2p0
、t3p1
这个分配结果与RoundRobin的一致,不过当消费者c1
退出消费者组的时候,RoundRobin的分配结果是:
c2
:t1p0
、t2p0
、t3p0
、t4p0
c3
:t1p1
、t2p1
、t3p1
、t4p1
而使用Sticky的分配结果是:
c2
:t1p1
、t3p0
、t4p1
、t2p1
c3
:t2p0
、t3p1
、t1p0
、t4p0
即:在粘性的分配机制下,会默认保留原来的分配方案,将需要变化的分区进行分配。
设置方法
1 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class .getName ()) ;
CooperativeSticky
分区策略
CooperativeSticky
,分区策略是和Sticky一致的,重点在于"Cooperative",这是一种分区过程的协议。
在2.4之前实现的都是Eager协议,它的整个分配过程是一次的,存在消费者不消费的情况(stop-the-world),为了解决这种情况,在2.4之后,增加了Cooperative协议,是一个渐进重平衡过程,它是在一个不中断当前消费者消费的情况下逐步增加分区的过程。
如图,是Sticky分区策略,注意图的中间部分,存在一个全停(stop-the-world)。
如图,是CooperativeSticky分区策略,注意图的中间部分,是一个渐进重平衡过程。
设置方法
1 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class .getName ()) ;
默认策略
默认策略Range + CooperativeSticky
。
对此官网的描述是:
The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.
翻译成中文是:
默认分配器是[RangeAssignor, CooperativeStickyAssignor],它将默认使用RangeAssignor,但允许升级到 CooperativeStickyAssignor,只需一次从列表中删除RangeAssignor的滚动弹跳。
疑惑点在于:
什么是滚动弹跳(rolling bounce
)?
如何从列表中删除RangeAssignor?
我查阅了许多资料,请教过许多人,始终没有弄清楚Kafka的默认分区策略。
所以,最稳妥的实践方案:不采用默认的分区策略,指定分区策略。
offset位移
自动提交
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
相关参数有:
enable.auto.commit
消费者自动周期性地向服务器提交偏移量。
默认true
。
auto.commit.interval.ms
消费者向Kafka提交偏移量的频率,默认5s。
示例代码:
1 2 3 4 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true ); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000 );
手动提交
Kafka还提供了手动提交offset的方法。
手动提交offset的方法有两种:
commitSync
,同步提交。
必须等待offset提交完毕,再去消费下一批数据。
阻塞当前线程,一直到提交成功,并且会自动失败重试。
commitAsync
,异步提交。
发送完提交offset请求后,就开始消费下一批数据了。
没有失败重试机制。
同步提交offset,虽然更可靠,但是会阻塞当前线程,直到提交成功,因此吞吐量会受到很大的影响。
更多的情况下,会选用异步提交offset的方式。
指定Offset消费
auto.offset.reset
当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(例如,数据被删除了)。
发生这种情况的处理方法:
earliest
:自动重置偏移量到最早的偏移量。
latest
:默认,自动重置偏移量为最新的偏移量。
none
:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。
指定分区的方法如下:
1 kafkaConsumerseek(TopicPartition partition, long offset)
TopicPartition
:主题的分区
offset
:指定的offset
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0 ){ assignment = kafkaConsumer.assignment(); } for (TopicPartition tp : assignment){ kafkaConsumer.seek( tp ,10000 ) }
指定时间
相比指定offset,在实际工作中,我们更多的是指定时间。
例如,最近一天的数据异常,要重新消费。
在《Kafka-3.Broker》 ,我们讨论.timeindex
时间戳索引文件时,说在查询指定时间戳,需要根据.index
偏移量索引文件,进行二次定位。
这里的思路类型,首先根据时间,找到对应的offset,然后再指定offset。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0 ){ assignment = kafkaConsumer.assignment(); } HashMap< TopicPartition, Long> timestampToSearch = new HashMap<>(); long oneDayBefore = System.currentTimeMillis() - 1 * 24 * 3600 * 1000 ;for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, oneDayBefore); } Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); for (TopicPartition tp : assignment){ OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); kafkaConsumer.seek( tp , offsetAndTimestamp.offset()); }
重点关注如下的代码,根据时间,找到对应的offset。
1 2 3 4 5 6 7 8 9 HashMap< TopicPartition, Long> timestampToSearch = new HashMap<>(); long oneDayBefore = System.currentTimeMillis() - 1 * 24 * 3600 * 1000 ;for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, oneDayBefore); } Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
消费者事务
重复消费
重复消费的例子:
Consumer每5s自动提交offset。
已经消费了2s中,但是在第2秒的时候,Consumer挂了。
再次重启Consumer,会从上一次提交的offset处继续消费,导致重复消费。
漏消费
漏消费的例子:
offset为手动提交。
offset已经被提交后,Consumer消费的数据还未处理,Consumer就挂了
再次重启Consumer,会从上一次提交的offset处继续消费,未处理的那一部分数据就漏消费了。
手动管理offset
为了不重复消费,不漏消费。
我们需要设置手动提交offset,并将Kafka的offset保存到支持事务的其他介质。例如:
isolation.level
还有一个需要考虑的地方。
我们知道,生产者有事务的,我们一般希望消费者只读生产者已经提交的,这个由参数isolation.level
控制。
如果设置为read_committed
,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(Last Stable Offset)的位置;默认为read_uncommitted
,可以消费到HW(High Watermark)处的位置。
提高吞吐量
如果发生了数据积压怎么办?即如何提高吞吐量?
可以考虑增加主题的分区数,并且同时提升消费组的消费者数量,当然消费者数量最大只能等于分区数。
如果,消费者数量已经等于了分区数,再增加消费者数量没有任何作用,在增加的消费者会被闲置。
还可以考虑修改拉去数据的频率,每次拉取数据的数据量。
和如下的4个参数有关:
fetch.min.bytes
消费者获取服务器端一批消息最小的字节数。
默认1个字节。
fetch.max.wait.ms
消费者从服务端获取到数据的最大等待时间
默认500ms。
fetch.max.bytes
消费者从服务端获取一批消息最大的字节数。
但并不完全由这个决定,如果一条消息的数据量也大于该值,该条消息仍然能被拉去回去。
默认50MB。
max.poll.records
一次拉取数据的最大条数。
默认500条。