avatar


3.Broker

ZK存储的信息

信息目录

如图,是Zookeeper中存储的部分和Kafka有关的信息。

Zookeeper存储的Kafka信息

具体,我们可以通过Zookeeper命令看一下。

启动Zookeeper的Shell客户端。
在Kafka安装目录的bin目录下,找到zookeeper-shell.sh

1
./zookeeper-shell.sh 127.0.0.1:2181

如果我们没有用Kafka自带的Zookeeper,而是用的外部的Zookeeper,该Sh文件应该zkCli.sh

通过ls命令可以查看Zookeeper的节点信息相关信息。
示例代码:

1
ls /

运行结果:

1
[kafka, zookeeper]

查看其中和Kafka相关的。
示例代码:

1
ls /kafka

运行结果:

1
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

查看/kafka/brokers
示例代码:

1
ls /kafka/brokers

运行结果:

1
[ids, seqid, topics]

查看/kafka/brokers/ids
示例代码:

1
ls /kafka/brokers/ids

运行结果:

1
[1, 2, 3]

对应我们有的三个Broker。

重要信息

/kafka/brokers/ids

记录有哪些节点。

示例代码:

1
ls /kafka/brokers/ids
运行结果:
1
[1, 2, 3]

/kafka/brokers/topics/first/partitions/0/state

记录Leader以及ISR队列。

例如,查看first主题的分区0的信息。示例代码:

1
get /kafka/brokers/topics/first/partitions/0/state
运行结果:
1
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,3]}

查看first主题的分区1的信息。示例代码:

1
get /kafka/brokers/topics/first/partitions/1/state
运行结果:
1
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3,1]}

/kafka/controller

记录辅助选举Leader。

示例代码:

1
get /kafka/controller
运行结果:
1
{"version":2,"brokerid":1,"timestamp":"1677812094647","kraftControllerEpoch":-1}

整体过程

Leader选举过程

Leader选举过程

  • 上方的是Zookeeper集群。
  • 下方的是Kafka集群。

选举过程:

  1. broker启动,向Kafka进行注册。
  2. 三个broker开始抢占Controller,先注册上的作为Controller。
  3. Controller监听brokers的节点变化。
  4. Controller决定Leader的选举
    规则:在ISR中存活为前提,按照AR中排在前面的优先。
    例如:AR[2,1,3],ISR[1,2,3],那么Controller会选举2作为Leader。
    AR:Kafka中的所有副本。
  5. Controller将节点信息上传到Zookeeper。
  6. 其他Controller节点会从Zookeeper拉取数据。
  7. 接下来,生产者发送消息。但是,如果broker-2中的Leader这时候挂了呢?
  8. Controller监听到节点(/brokers/ids/)的变化。
  9. Controller从Zookeeper中重新获取ISR信息。
  10. 选举新的Leader,之后更新Leader信息和ISR信息等。

我们可以进行实验,手动下线一些broker,以验证上述过程,具体过程略。

故障处理细节

几个概念

  • 副本
    • 作用:提高数据可靠性。
    • 数量:默认1个副本,生产环境一般配置为2个;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
    • 分类:Kafka中副本分为Leader和Follower。生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
  • AR:所有副本统称为AR(Assigned Repllicas)。
    AR = ISR + OSR
  • ISR:和Leader保持同步的Follower集合,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
  • OSR:没有和Leader保持同步(延迟过多)的Follower集合。

那么,OSR中的副本,什么情况下会恢复到ISR中呢?
这就是我们接下来要讨论的问题的。

LEO和HW

  • LEO(Log End Offset):每个副本的最后一个offset+1+1
  • HW(High Watermark):所有副本中最小的LEO。

LEO和HW

为什么Follower的数据会和Leader的没对齐呢?
生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。所以,可能会在某一个瞬间,数据没对齐。

Follower故障

  • 发生故障的Follower会被踢出ISR。
  • Leader和其他的Follower继续接收数据。

Follower故障-1

  • 在发生故障的Follower恢复后,该Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分删除。

Follower故障-2

  • 然后从HW开始向Leader进行同步,直到该Follower的LEO大于等于该Partition的HW,即Follower追上HW之后,可以重新加入ISR。

Follower故障-3

Leader故障

  • Leader发生故障之后,会从ISR中选出一个新的Leader。

Leader故障-1

  • 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分删除,然后从新的Leader同步数据。

Leader故障-2

这样做有没有问题?
这样消息不就丢了吗?
就是会丢消息,因为Kafka的这个处理机制,只是做数据一致性。
如果消息不丢失,参考《2.Producer》关于"消息不丢失"的讨论。

如果Controller挂了

通过上文的讨论,我们知道,只有一个Controller。如果Controller挂了呢?

Kafka为Controller提供了故障转移(Failover)功能,当运行中的Controller突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用Controller来代替之前失败的Controller。

重新选举Controller

  1. 当Broker-0宕机后,Zookeeper通过Watch机制感知到并删除了Controller临时节点。
    所有存活的Broker开始竞选新的Controller身份。
  2. Broker-3最终赢得了选举,成功地在ZooKeeper上重建了Controller节点。
  3. Broker-3会从Zookeeper中读取集群元数据信息,并初始化到自己的缓存中。
    至此Controller的Failover完成,可以行使正常的工作职责了。

脑裂问题

如果之前的Controller只是因为某些原因(例如GC),而被认为是挂掉了呢,现在又恢复了,这时候集群中存在两个Controller,那怎么办?

Kafka通过使用epoch number(纪元编号,也称为隔离令牌)来克服脑裂问题。
epoch number是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的Controller,通过Zookeeper的条件递增操作都会获得一个全新的、数值更大的epoch number
其他Broker在知道当前epoch number后,如果收到由Controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的Controller。

尚方宝剑

重要参数

replica.lag.time.max.ms

在ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
默认30s。

auto.leader.rebalance.enable

是否开启自动Leader分区自动平衡。
默认true

leader.imbalance.per.broker.percentage
每个broker允许的不平衡的Leader的比率。
如果每个broker超过了这个值,Controller会触发Leader的平衡。

默认10%

leader.imbalance.check.interval.seconds

检查Leader负载是否平衡的间隔时间。

默认300秒。

log.segment.bytes

Kafka中log日志是分成一块块存储的,此配置是指log日志划分成块的大小。

默认1G。

log.index.interval.bytes

Kafka每当写入了该值大小的日志(.log),就往(.index)文件里面记录一个索引。

默认4KB。

log.retention.hours

Kafka中数据保存的时间。

默认7天。

log.retention.minutes

Kafka中数据保存的时间,分钟级别。
默认关闭。

log.retention.ms

Kafka中数据保存的时间,毫秒级别。
默认关闭。

log.retention.check.interval.ms

检查数据是否保存超时的间隔。
默认5分钟。
该间隔时间一定要小于我们设置的数据保存时间。一个极端的例子,我们设置了数据保存时间是100毫秒,但这个值没有修改,那么100毫秒的数据保存,将不会真正的发挥作用。

log.retention.bytes

超过设置的所有日志总大小,删除最早的Segment。

默认-1,表示无穷大。

log.cleanup.policy

表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。

默认delete

num.io.threads

负责写磁盘的线程数,整个参数值要占总核数的50%。

默认8。

num.replica.fetchers

副本拉取线程数,这个参数占总核数的50%的13\frac{1}{3}
默认1。

num.network.threads

数据传输线程数,这个参数占总核数的50%的23\frac{2}{3}
默认3。

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,一般不建议修改,交给系统自己管理。

节点管理

包括节点的退役和服役在内的节点管理,其实操作是一样的,是同一种方法的不同应用场景。

新增节点

讨论了怎么服役新节点,也就讨论了怎么退役旧节点。
现在是我们的Kafka集群有三个节点,如果我们想再增加一个节点,应该怎么操作?

新增节点的方法很简单,再安装一个Kakfa,修改server.properties,重点关注如下三项:

  • broker.id
  • zookeeper.connect
  • log.dirs

然后直接启动即可。

启动完成之后,我们可以在Zookeeper上看一下节点信息。
示例代码:

1
ls /kafka/brokers/ids

运行结果:

1
[1, 2, 3, 4]

会发现新的节点已经加进来了。

Partition迁移

有些资料会称Partition迁移负载均衡操作。我个人观点,负载均衡操作,这个名字太笼统了。

假如有这么一个场景,我们有一个主题first,分布在之前的三个节点上。示例代码:

1
./bin/kafka-topics.sh --bootstrap-server kafka-01:9092 --describe --topic first

运行结果:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: first Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

现在,我们想让其也分布在新服役的第四个节点上。

创建迁移文件

新建一个JSON格式的文件,文件名任意,在这里的文件名是topics-to-move.json
在文件内容中,我们按如下的格式配置需要被均衡的主题。

1
2
3
4
5
6
{
"topics": [{
"topic": "【需要被均衡的主题】"
}],
"version": 1
}

生成迁移计划

示例代码:

1
2
3
4
5
./bin/kafka-reassign-partitions.sh \
--bootstrap-server kafka-01:9092 \
--topics-to-move-json-file ./json/topics-to-move.json \
--broker-list "1,2,3,4" \
--generate

运行结果:

1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,2,4],"log_dirs":["any","any","any"]}]}
  • 如果我们想退役旧节点,例如,不想用2来存储first主题的信息,在--broker-list的参数中,不写2即可。
  • Current partition replica assignment是当前first主题的存储情况。
  • Proposed partition reassignment configuration是计划first主题的存储情况,我们注意到有了4

创建副本存储计划

新建一个JSON格式的文件,文件名任意,在这里的文件名是increase-replication-factor.json
在文件内容中,我们填入上文的Proposed partition reassignment configuration的内容。
我们也可以自己写一个结构相同的JSON文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"version": 1,
"partitions": [{
"topic": "first",
"partition": 0,
"replicas": [1, 4, 2],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 1,
"replicas": [2, 1, 3],
"log_dirs": ["any", "any", "any"]
}, {
"topic": "first",
"partition": 2,
"replicas": [3, 2, 4],
"log_dirs": ["any", "any", "any"]
}]
}

执行副本存储计划

示例代码:

1
2
3
4
./bin/kafka-reassign-partitions.sh \
--bootstrap-server kafka-01:9092 \
--reassignment-json-file ./json/increase-replication-factor.json \
--execute

运行结果:

1
2
3
4
5
6
Current partition replica assignment

{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
  • 在"生成迁移计划"中,配置JSON文件的参数是--topics-to-move-json-file
  • 在"执行副本存储计划"中,配置JSON文件的参数是--reassignment-json-file
  • 在"生成迁移计划"中,生成参数是--generate
  • 在"执行副本存储计划"中,创建参数是--execute

验证副本存储计划

示例代码:

1
2
3
4
./bin/kafka-reassign-partitions.sh \
--bootstrap-server kafka-01:9092 \
--reassignment-json-file ./json/increase-replication-factor.json \
--verify

运行结果:

1
2
3
4
5
6
7
Status of partition reassignment:
Reassignment of partition first-0 is completed.
Reassignment of partition first-1 is completed.
Reassignment of partition first-2 is completed.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic first
  • 注意,最后一个参数是--verify

问题排查

如果我们验证副本存储计划,发现有些任务一直处于still in progress的状态。

1
2
3
4
Status of partition reassignment:
Reassignment of partition first-0 is still in progress.
Reassignment of partition first-1 is completed.
Reassignment of partition first-2 is still in progress.

查看主题信息,发现也一直处于一个进行时的状态。

进行时的状态

可以查看对应节点的日志,位于logs目录下,在kafkaServer.out可能会发现如下的日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[2023-03-03 16:16:25,512] WARN [Controller id=1, targetBrokerId=4] Error connecting to node kafka-04:9092 (id: 4 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-04
at java.net.InetAddress.getAllByName0(InetAddress.java:1292)
at java.net.InetAddress.getAllByName(InetAddress.java:1203)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:64)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:292)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:245)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

找不到主机kafka-04,因为没有修改其他主机的/etc/hosts文件。

退役旧节点

在上文讨论"生成迁移计划"的时候,我们提到过:

如果我们想退役旧节点,例如,不想用2来存储first主题的信息,在--broker-list的参数中,不写2即可。

这就是退役旧节点的关键步骤。

上文的"Partition迁移"重新来一遍,注意把要退役的节点去除。
最后,在要退役的节点上执行./bin/kafka-server-stop.sh,关闭Kafka即可。

调整分区副本

如果,我们不计划退役旧节点,只是想让其少承担一些负载(比如,因为机器的性能较其他机器低),怎么处理?
或者,我们除了服役新节点,还想让其多承担一些负载(比如,因为机器的性能较其他机器高),怎么处理?

在上文讨论"创建副本存储计划"的时候,我们提到过:

当然我们也可以自己写一个结构相同的JSON文件。

这就是关键步骤。

Leader分区自动平衡

现象

正常情况下,Kafka本身会自动把Leader分区均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些broker故障,可能会导致Leader分区过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他故障的broker重启之后都默认是Follower分区,读写请求很低,造成集群负载不均衡。

例如,我们手动把broker-2broker-3故障,然后再恢复,此时first主题的分布如下,Leader分区都是broker-1上。
示例代码:

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first

运行结果:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 2 Leader: 1 Replicas: 2,3,1 Isr: 1,2,3

但是,如果我们5分钟之后再看,会发现自动平衡了。
示例代码:

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first

运行结果:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3

这是因为Kafka有Leader分区自动平衡机制

参数

和Leader分区自动平衡机制的相关参数:

  • auto.leader.rebalance.enable
    是否开启自动Leader分区自动平衡。
    默认true
  • leader.imbalance.per.broker.percentage
    每个broker允许的不平衡的Leader的比率。
    如果每个broker超过了这个值,Controller会触发Leader的平衡。
    默认10%
  • leader.imbalance.check.interval.seconds
    检查Leader负载是否平衡的间隔时间。
    默认300秒。

不平衡率的计算

例如,针对如下的情况:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,2,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 2 Leader: 1 Replicas: 2,3,1 Isr: 1,2,3
  • broker-1是分区1的AR(Replicas)的优先副本,实际上也是Leader,所以broker-1是平衡的。
  • broker-2是分区2的AR(Replicas)的优先副本,但实际上不是Leader,所以不平衡数+1+1,AR总数是33,不平衡率是13\frac{1}{3},所以需要再平衡。
  • broker-3是分区0的AR(Replicas)的优先副本,但实际上不是Leader,所以不平衡数+1+1,AR总数是33,不平衡率是13\frac{1}{3},所以需要再平衡。

那么,对于如下的情况呢:

1
2
3
4
Topic: first    TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3       ReplicationFactor: 3    Configs: 
Topic: first Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
  • 三个节点都是平衡的,计算过程略。

存储机制

存储结构

topic是逻辑上的概念,partition是物理上的概念,每个partition分为多个segment
每个segment包括:.index文件.log文件.timeindex等文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号

Topic

  • 一个topic由多个partition组成。
  • 一个partition分为多个segment
  • .log:日志文件。
    以当前segment的第一条消息的offset命名。
  • .index:偏移量索引文件。
    以当前segment的第一条消息的offset命名。
  • .timeindex:时间戳索引文件

存储内容

具体,我们可以在log.dirs定义的目录,看看内容。

1
2
3
4
5
6
7
8
9
-rw-r--r--  1 root root    4  3月  3 16:17 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 3月 3 19:10 first-0/
drwxr-xr-x 2 root root 4096 3月 3 19:10 first-1/
drwxr-xr-x 2 root root 4096 3月 3 19:10 first-2/
-rw-r--r-- 1 root root 0 3月 3 10:54 .lock
-rw-r--r-- 1 root root 4 3月 3 19:55 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 3月 3 10:54 meta.properties
-rw-r--r-- 1 root root 34 3月 3 19:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 34 3月 3 19:55 replication-offset-checkpoint

我们还看看first-0/这个文件夹里都有什么。

1
2
3
4
5
-rw-r--r-- 1 root root 10485760  3月  3 10:55 00000000000000000000.index
-rw-r--r-- 1 root root 216 3月 3 19:12 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 3月 3 10:55 00000000000000000000.timeindex
-rw-r--r-- 1 root root 9 3月 3 19:10 leader-epoch-checkpoint
-rw-r--r-- 1 root root 43 3月 3 10:55 partition.metadata

来吧,看看00000000000000000000.log都有啥。示例代码:

1
less 00000000000000000000.log

运行结果:

1
"00000000000000000000.log" may be a binary file.  See it anyway?

如果想看该文件的内容,方法如下。

示例代码:

1
../../bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log

运行结果:

1
2
3
4
5
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: 0 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 10 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1677841933361 size: 78 magic: 2 compresscodec: none crc: 1068055326 isvalid: true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 2 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 10 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 78 CreateTime: 1677841935292 size: 69 magic: 2 compresscodec: none crc: 3952584416 isvalid: true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: 3 lastSequence: 3 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 10 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 147 CreateTime: 1677841936499 size: 69 magic: 2 compresscodec: none crc: 3258773019 isvalid: true

示例代码:

1
../../bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index

运行结果:

1
2
3
4
Dumping 00000000000000000000.index
offset: 0 position: 0
Mismatches in :/usr/local/kafka/kafka_2.13-3.4.0/kafka-logs/first-0/00000000000000000000.index
Index offset: 0, log offset: 1

清理策略

Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

  • log.retention.hours
    Kafka中数据保存的时间。
    默认7天。
  • log.retention.minutes
    Kafka中数据保存的时间,分钟级别。
    默认关闭。
  • log.retention.ms
    Kafka中数据保存的时间,毫秒级别。
    默认关闭。
  • log.retention.check.interval.ms
    检查数据是否保存超时的间隔。
    默认5分钟。
    该间隔时间一定要小于我们设置的数据保存时间。一个极端的例子,我们设置了数据保存时间是100毫秒,但这个值没有修改,那么100毫秒的数据保存,将不会真正的发挥作用。
  • log.retention.bytes
    超过设置的所有日志总大小,删除最早的Segment。
    默认-1,表示无穷大。
  • log.cleanup.policy
    表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。
    默认delete

基于时间的删除策略:

  • 通俗理解,根据.log文件的时间来判断。
  • 严格意义上是,以segment中所有记录中的最大时间戳作为该文件时间戳。
    虽然,一般情况下,所有记录中的最大时间戳就是.log文件的时间。
    但是不排除有恶意软件,修改.log文件的时间。

基于大小的删除策略:
默认关闭,超过设置的所有日志总大小,删除最早的segment。

其实,这里的清理策略,和我们平时后端应用日志的清理策略类似。
也都是超过了时间的日志,删除或压缩;
总日志的大小超过了指定的大小,删除或压缩。

高效读写

Kafka是如何做到高效的读写数据的?

  1. Kafka本身是分布式集群,可以采用分区技术,并行度高。
  2. 读数据采用稀疏索引,可以快速定位要消费的数据
  3. 写数据时候,是顺序写磁盘。
    写的过程是一直追加到文件末端,为顺序写。
  4. 零拷贝
    数据加工处理操作交由Kafka生产者和Kafka消费者处理(生产者和消费者的拦截器,或者在调用生产者和消费者之前,由我们编写的应用进行处理。),Broker应用层不关心存储的数据,所以就不用走应用层,直接交给操作系统,传输效率高。
  5. 页缓存
    页缓存,也就是所谓的page chche。我们在《MySQL从入门到实践:6.事务》讨论"redo log"的"刷盘策略"的时候,讨论过这个,并且还做过实验比较。
    写数据的时候,先保存在内存中,满足一定的要求才刷到磁盘中。读数据的时候,也会先看看内存中的page chche有没有,没有的话才去磁盘读。
    是尽可能多的把尽可能多的空闲内存都当做了磁盘缓存来使用。

索引机制

过程

Kafka中的索引文件(.index.timeindex),以稀疏索引(Sparse Index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。
当写入一定量(由broker端参数log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,.index偏移量索引文件 和.timeindex时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。

稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。

偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,根据二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。

时间戳索引文件中的时间戳也是单调递增的,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。

关于"二分法",可以参考我们在《算法入门经典(Java与Python描述):6.二分法》的讨论。

偏移量索引文件

我们以偏移量索引文件为例,详细讨论。

偏移量索引文件的格式如下图所示:

偏移量索引

每个索引项占用8个字节,分为两个部分:
relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,占用4个字节。
一个日志片段的baseOffset为32,那么其文件名就是00000000000000000032.log,3指的是offset=35的消息。(32+3=35)

position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
如下图所示:

消息在日志分段文件中对应的物理位置

例子

如何通过offset=3的Message?

如何通过offset=3找到对应的消息?

  1. 先找到offset=3的Message所在的segment文件(利用二分法查找)
  2. 读取对应的segment中的.index文件,根据找到的相对offset的索引,确定message存储的物理偏移地址为756。
  3. 根据物理偏移地址,去.log文件找相应的Message。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11903
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区