avatar


4.Consumer

消费方式

在消息队列中,消费者有两种方式消费消息:

  1. PULL(拉取)模式
  2. PUSH(推送)模式

在Kafka中,是PULL(拉取)模式,即消费者主动从Broker中拉取数据;而没有采取PUSH(推送)模式(Broker主动推送);因为由Broker决定消息发送速率,很难适应所有消费者的消费速率;PULL(拉取)模式的不足之处是,即使Broker中没有数据,消费者也可能会一直循环,轮询拉取数据。

整体过程

整体过程

通过我们之前章节的讨论,其实对于整体的工作过程,已经非常清楚了。

  1. 生产者向每一个分区的Leader发送数据。
  2. Follower主动和Leader同步数据。
  3. 消费者消费数据。
    1. 一个消费者可以消费一个或多个分区的数据。
    2. 每一个分区的数据,只能由消费者组中的一个消费者消费。即,不允许一个消费者组中的多个消费者,同时消费一个分区。
      但可以由不同的消费者组进行消费。
  4. 消费者消费数据的offset,维护在系统的主题__consumer_offsets中。
    0.9之前的版本,维护在Zookeeper中。即我们在《3.Broker》提到的Zookeeper的consumer节点。这么设计的缺陷在于,消费者不但要和Broker进行通信,还要频繁的和Zookeeper进行通信。

消费者组

接下来,我们重点讨论一下消费者组。

概述

消费者组,Consumer Group(CG),由多个消费者组成,这些消费者的group.id相同。

  • 消费者组内每个消费者负责消费不同分区的数据。
  • 一个分区只能由一个消费者组内的消费者进行消费。
  • 消费者组之间互不影响。

例子:

例图

  • 例一
    消费者组中只有一个消费者,这个消费者消费了所有的分区。
  • 例二
    消费者组中有两个消费者,第一个消费者消费了0和1,第二个消费者消费了2和3。
    (可不可以第一个消费者消费0个2,第二个消费者消费1和3呢?可以,我们在下文讨论分区分配的时候会讨论怎么实现。)
  • 例三
    消费者组中有四个消费者,每个消费者消费一个分区。
  • 例四
    消费者组中有五个消费者,多于分区数,这时候会有一个闲置的消费者。
  • 例五
    有两个消费者组,消费者组之间的互不影响。

初始化

初始化

  1. 选择coordinator。
    根据group.id的哈希值对50(__consumer_offsets的分区数量)求余。
    例如,group.id的哈希值=1=11%50=11\%50 = 1,那么就是__consumer_offsets主题的1号分区,这个分区的Leader所在的机器,就是coordinator的地址,即就是被选择的coordinator。
    (这个过程,和我们在《2.Producer》所讨论的生产者事务,有些类似。)
  2. 每个Consumer都发送join group请求。
  3. coordinator选择一个Consumer作为Leader。
  4. coordinator把消费者组中的所有消费者要消费的topic情况,发送给Leader消费者。
  5. Leader消费者制定消费方案(每个消费者到底消费哪个分区的方案)。
  6. Leader消费者把其制定的消费方案发送给coordinator。
  7. coordinator把消费方案发给每一个Consumer,然后Consumer开始消费。

我们还可以看看Kafka的log.dirs的内容,可以看到__consumer_offsets有多个分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
drwxr-xr-x 40 root root 4096  3月  6 22:23 ./
drwxr-xr-x 10 root root 4096 3月 3 10:43 ../
-rw-r--r-- 1 root root 0 3月 3 10:54 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-0/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-12/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-15/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-18/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-21/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-24/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-27/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-3/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-30/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-33/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-36/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-39/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-42/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-45/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-48/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-6/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __consumer_offsets-9/
drwxr-xr-x 2 root root 4096 3月 6 18:28 first-0/
drwxr-xr-x 2 root root 4096 3月 3 19:13 first-1/
drwxr-xr-x 2 root root 4096 3月 6 18:14 first-2/
-rw-r--r-- 1 root root 0 3月 3 10:54 .lock
-rw-r--r-- 1 root root 4 3月 6 22:23 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 3月 6 18:14 meta.properties
drwxr-xr-x 2 root root 4096 3月 6 22:16 myTopic-0/
-rw-r--r-- 1 root root 873 3月 6 22:23 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 880 3月 6 22:23 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-1/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-10/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-13/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-16/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-19/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-22/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-25/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-28/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-31/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-34/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-37/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-4/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-40/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-43/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-46/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-49/
drwxr-xr-x 2 root root 4096 3月 6 22:16 __transaction_state-7/
  • 我们看到,还有一个主题__transaction_state,也有多个分区。关于这个主题,我们在《2.Producer》的生产者事务部分,有过讨论。

消费过程

关于具体的消费过程,其实已经和消费者组的关系不大了,基本上都在消费者内部进行处理。

消费过程

  1. 消费者内部会有一个对象fetcher,这个对象会调用其成员方法sendFetches,发送消费请求。
  2. sendFetches方法会创建FetchRequest类型的对象。
    在创建FetchRequest类型的对象的过程中有三个重要参数:
    • fetch.min.bytes:每次拉取的最小字节数,默认1个字节。
    • fetch.max.wait.ms:每次拉取的最大等待时间,默认300ms。
    • fetch.max.bytes:每次拉取的最大字节数。
  3. 之后会调用ConsumerNetworkClient的send(fetchRequest)方法,该方法返回一个RequestFuture类型的对象。
  4. RequestFuture会监听onSuccessonFailure方法。
  5. onSuccess方法,会把完成的拉取(completedFetch)保存在队列completedFetches
  6. fetcher调用fetchedRecords,期间有一个相关参数max.poll.records,每次最多拉取的条数。
  7. 经过parseRecord(反序列化)、interceptors(拦截器),最后进行数据处理。

重要参数

bootstrap.servers

向Kafka集群建立初始连接用到的(host/port)列表。

key.deserializervalue.deserializer

指定接收消息的key和value的反序列化类型,一定要写全类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

消费者自动周期性地向服务器提交偏移量。
默认true

auto.commit.interval.ms

消费者向Kafka提交偏移量的频率,默认5s。

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(例如,数据被删除了)。
发生这种情况的处理方法:

  • earliest:自动重置偏移量到最早的偏移量。
  • latest:默认,自动重置偏移量为最新的偏移量。
  • none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。

offsets.topic.num.partitions

__consumer_offsets的分区数。
默认50个分区。

heartbeat.interval.ms

消费者和coordinator之间的心跳时间,默认3s。
根据一般的理解,该值肯定要小于session.timeout.ms,实际上不应该高于session.timeout.ms13\frac{1}{3}

session.timeout.ms

消费者和coordinator之间连接超时时间,如果超过该值,该消费者将被移除,消费者组执行再平衡。
默认45s。

max.poll.interval.ms

消费者处理消息的最大时长,如果超过该值,该消费者被移除,消费者组执行再平衡。
默认5分钟。

partition.assignment.strategy

Kafka有四种分区分配策略:

  • Range
  • RoundRobin
  • Sticky
  • CooperativeSticky

可以同时使用多个分区分配策略。
默认策略Range + CooperativeSticky

fetch.min.bytes

消费者获取服务器端一批消息最小的字节数。
默认1个字节。

fetch.max.wait.ms

消费者从服务端获取到数据的最大等待时间
默认500ms。

fetch.max.bytes

消费者从服务端获取一批消息最大的字节数。
但并不完全由这个决定,如果一条消息的数据量也大于该值,该条消息仍然能被拉去回去。
默认50MB。

max.poll.records

一次拉取数据的最大条数。
默认500条。

isolation.level

如果设置为read_committed,那么消费者就会忽略生产者事务未提交的消息,即只能消费到LSO(Last Stable Offset)的位置;
默认为read_uncommitted,可以消费到HW(High Watermark)处的位置。

消费消息

订阅主题

假设存在一个主题first,有三个分区。
示例代码:

1
./bin/kafka-topics.sh --bootstrap-server kafka-01:9092 --describe --topic first

运行结果:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3

我们现在创建一个消费者组,这个消费者组中只有一个消费者,由这个消费者来消费主题first
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.kakawanyifan;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
public static void main(String[] args) {
// 创建消费者的配置对象
Properties properties = new Properties();
// 给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-01:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class.getName());
// 配置消费者组 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

// 注册主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 拉取数据打印
while (true) {
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

我们创建生产者,并发送数据qwert
运行结果:

1
ConsumerRecord(topic = first, partition = 2, leaderEpoch = 15, offset = 25, CreateTime = 1678101533106, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = qwert)

在这个例子中,我们指定了消费者组的ID,而且这个是必须指定的,否者会报错。
但是在《1.初步认识》的命令行中又没有指定,因为通过命令行启动消费者不填写消费者组ID的话,会被自动填写随机一个消费者组ID。

指定分区

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.kakawanyifan;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
public static void main(String[] args) {
// 创建消费者的配置对象
Properties properties = new Properties();
// 给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-01:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class.getName());
// 配置消费者组 必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

// 消费某个主题的某个分区数据
ArrayList topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 2));
kafkaConsumer.assign(topicPartitions);

// 拉取数据打印
while (true) {
// 设置1s中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}

消费者组

多个消费者组成一个消费者组,要求他们的group.id相同。
所以,我们需要, 创建多个消费者,配置相同的group.id

一个分区的数据,只能由消费者组中的一个消费者进行消费。
所以?我们还需要为每个消费者指定分区?一定不能重复?
不用这么麻烦,这部分是自动完成的。
这就是我们接下来要讨论的内容,分区分配。

分区分配

四种策略

Kafka有四种分区分配策略:

  • Range
  • RoundRobin
  • Sticky
  • CooperativeSticky

通过配置参数partition.assignment.strategy,即可以修改分区的分配策略。

四种分区分配策略,实现的是同一个接口ConsumerPartitionAssignor
分区分配

重平衡

需要注意的是,Kafka中,除了在启动的时候会进行分区的分配,在运行过程中,也会进行分区的分配,这个被称为重平衡。

触发重平衡的情况有:

  • 消费者超时
    每个消费者都会和coordinator保持心跳(默认3S),一旦超时(session.timeout.ms=45S),该消费者会被移除,触发重平衡。
  • 消费者处理消息的时间过长(max.poll.interval.ms=5分钟),该消费者会被移除,触发重平衡。
  • 消费者主动退出集群。
  • 消费者主动取消订阅某个主题
  • 新的消费者加入集群
  • 分区数被动态扩充
  • 消费者动态监听(消费)了新的主题

Range

分区策略

Range

Range是对每一个topic而言的。

对同一个topic中的分区按照序号进行排序,并对消费者按照字母顺序(Kafka内部会对消费者组中每一个消费者进行随机的编号)进行排序。
例如,有7个分区,3个消费者,排序后的分区是0,1,2,3,4,5,6;消费者排序完之后将会是C0C1C2

通过partitions数consumer数\frac{\text{partitions数}}{\text{consumer数}}来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区。
例如,73=2,1\frac{7}{3}=2,\text{余}1,那么排在前面的C0会多消费1个分区。如果有8个分区,83=2,2\frac{8}{3}=2,\text{余}2,那么C0和C1分别多消费一个。

针对一个主题,排在前面的消费者,尤其是C0,都将多消费1个分区;如果主题特别特别多,C0消费的分区会比明显比其他消费者多
这种现象被称为 数据倾斜

设置方法

1
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

RoundRobin

分区策略

RoundRobin

RoundRobin针对集群中所有Topic而言。

把所有的partition和所有的消费者都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。

需要注意的是,是基于消费者组和组内成员的监听情况进行分配的。
例如,有三个成员c1c2c3c1监听了t1c2监听了t2t1c3监听了t1t2t3t1有1个分区,t2有两个分区,t3有两个分区;分配结果是:

  • c1t1p0
  • c2t2p0
  • c3t2p1t3p0t3p1

这样不是很均衡,如果我们将t2p1分配给c2显然会更均衡。

设置方法

1
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

Sticky

分区策略

粘性分区,可以理解为分配的结果带有"粘性的",其分区目标是:

  • 第一个目标:分区尽可能和上一次分配结果保持一致。
  • 第二个目标:分配尽可能均匀,每个消费者消费的分区数最多相差1。
  • 当两个目标发生冲突的时候,第一个目标优于第二个。

有些资料说,是 粘上就不动 ,这种说法肯定是欠妥的,是尽量不动。

例如,我们有一个消费者组,有三个成员c1c2c3;其中c1监听了t1c2监听了t2t1c3监听了t1t2t3t1有1个分区,t2有两个分区,t3有两个分区。分区结果是:

  • c1t1p0
  • c2t2p0t2p1
  • c3t3p0t3p1

再举一个例子,假如有四个主题,t1t2t3t4各有两个分区,有三个消费者c1c2c3监听。分区结果:

  • c1t1p0t2p1t4p0
  • c2t1p1t3p0t4p1
  • c3t2p0t3p1

这个分配结果与RoundRobin的一致,不过当消费者c1退出消费者组的时候,RoundRobin的分配结果是:

  • c2t1p0t2p0t3p0t4p0
  • c3t1p1t2p1t3p1t4p1

而使用Sticky的分配结果是:

  • c2t1p1t3p0t4p1t2p1
  • c3t2p0t3p1t1p0t4p0

即:在粘性的分配机制下,会默认保留原来的分配方案,将需要变化的分区进行分配。

设置方法

1
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

CooperativeSticky

分区策略

CooperativeSticky,分区策略是和Sticky一致的,重点在于"Cooperative",这是一种分区过程的协议。
在2.4之前实现的都是Eager协议,它的整个分配过程是一次的,存在消费者不消费的情况(stop-the-world),为了解决这种情况,在2.4之后,增加了Cooperative协议,是一个渐进重平衡过程,它是在一个不中断当前消费者消费的情况下逐步增加分区的过程。

Sticky

如图,是Sticky分区策略,注意图的中间部分,存在一个全停(stop-the-world)。

CooperativeSticky

如图,是CooperativeSticky分区策略,注意图的中间部分,是一个渐进重平衡过程。

设置方法

1
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

默认策略

默认策略Range + CooperativeSticky

对此官网的描述是:

The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.

翻译成中文是:

默认分配器是[RangeAssignor, CooperativeStickyAssignor],它将默认使用RangeAssignor,但允许升级到 CooperativeStickyAssignor,只需一次从列表中删除RangeAssignor的滚动弹跳。

疑惑点在于:

  • 什么是滚动弹跳(rolling bounce)?
  • 如何从列表中删除RangeAssignor?

我查阅了许多资料,请教过许多人,始终没有弄清楚Kafka的默认分区策略。

所以,最稳妥的实践方案:
不采用默认的分区策略,指定分区策略。

offset位移

自动提交

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。
相关参数有:

  • enable.auto.commit
    消费者自动周期性地向服务器提交偏移量。
    默认true
  • auto.commit.interval.ms
    消费者向Kafka提交偏移量的频率,默认5s。

示例代码:

1
2
3
4
// 是否自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交offset的时间周期1000ms ,默认5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

手动提交

Kafka还提供了手动提交offset的方法。
手动提交offset的方法有两种:

  • commitSync,同步提交。
    必须等待offset提交完毕,再去消费下一批数据。
    阻塞当前线程,一直到提交成功,并且会自动失败重试。
  • commitAsync,异步提交。
    发送完提交offset请求后,就开始消费下一批数据了。
    没有失败重试机制。

同步提交offset,虽然更可靠,但是会阻塞当前线程,直到提交成功,因此吞吐量会受到很大的影响。
更多的情况下,会选用异步提交offset的方式。

指定Offset消费

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(例如,数据被删除了)。
发生这种情况的处理方法:

  • earliest:自动重置偏移量到最早的偏移量。
  • latest:默认,自动重置偏移量为最新的偏移量。
  • none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。

指定分区的方法如下:

1
kafkaConsumerseek(TopicPartition partition, long offset)
  • TopicPartition:主题的分区
  • offset:指定的offset

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 分区分配方案
Set<TopicPartition> assignment = new HashSet<>();

// 确保分区分配已经完成,已经拿到了分区分配方案
while (assignment.size() == 0){
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}

// 遍历所有分区,并指定 offset 从 10000 的位置开始 消费
for (TopicPartition tp : assignment){
kafkaConsumer.seek( tp ,10000)
}

指定时间

相比指定offset,在实际工作中,我们更多的是指定时间。
例如,最近一天的数据异常,要重新消费。

《3.Broker》,我们讨论.timeindex时间戳索引文件时,说在查询指定时间戳,需要根据.index偏移量索引文件,进行二次定位。

这里的思路类型,首先根据时间,找到对应的offset,然后再指定offset。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 分区分配方案
Set<TopicPartition> assignment = new HashSet<>();

// 确保分区分配已经完成,已经拿到了分区分配方案
while (assignment.size() == 0){
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}

HashMap< TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
long oneDayBefore = System.currentTimeMillis() - 1 * 24 * 3600 * 1000;
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, oneDayBefore);
}

// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

// 遍历所有分区,并指定 offset 从 10000 的位置开始 消费
for (TopicPartition tp : assignment){
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
kafkaConsumer.seek( tp , offsetAndTimestamp.offset());
}

重点关注如下的代码,根据时间,找到对应的offset。

1
2
3
4
5
6
7
8
9
HashMap< TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
long oneDayBefore = System.currentTimeMillis() - 1 * 24 * 3600 * 1000;
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, oneDayBefore);
}

// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

消费者事务

重复消费

重复消费的例子:

  1. Consumer每5s自动提交offset。
  2. 已经消费了2s中,但是在第2秒的时候,Consumer挂了。
  3. 再次重启Consumer,会从上一次提交的offset处继续消费,导致重复消费。

漏消费

漏消费的例子:

  1. offset为手动提交。
  2. offset已经被提交后,Consumer消费的数据还未处理,Consumer就挂了
  3. 再次重启Consumer,会从上一次提交的offset处继续消费,未处理的那一部分数据就漏消费了。

手动管理offset

为了不重复消费,不漏消费。
我们需要设置手动提交offset,并将Kafka的offset保存到支持事务的其他介质。例如:

isolation.level

还有一个需要考虑的地方。
我们知道,生产者有事务的,我们一般希望消费者只读生产者已经提交的,这个由参数isolation.level控制。
如果设置为read_committed,那么消费者就会忽略事务未提交的消息,即只能消费到LSO(Last Stable Offset)的位置;默认为read_uncommitted,可以消费到HW(High Watermark)处的位置。

提高吞吐量

如果发生了数据积压怎么办?即如何提高吞吐量?

可以考虑增加主题的分区数,并且同时提升消费组的消费者数量,当然消费者数量最大只能等于分区数。
如果,消费者数量已经等于了分区数,再增加消费者数量没有任何作用,在增加的消费者会被闲置。

还可以考虑修改拉去数据的频率,每次拉取数据的数据量。
和如下的4个参数有关:

  • fetch.min.bytes
    消费者获取服务器端一批消息最小的字节数。
    默认1个字节。
  • fetch.max.wait.ms
    消费者从服务端获取到数据的最大等待时间
    默认500ms。
  • fetch.max.bytes
    消费者从服务端获取一批消息最大的字节数。
    但并不完全由这个决定,如果一条消息的数据量也大于该值,该条消息仍然能被拉去回去。
    默认50MB。
  • max.poll.records
    一次拉取数据的最大条数。
    默认500条。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11904
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板