ZK存储的信息
信息目录
如图,是Zookeeper中存储的部分和Kafka有关的信息。
具体,我们可以通过Zookeeper命令看一下。
启动Zookeeper的Shell客户端。
在Kafka安装目录的bin
目录下,找到zookeeper-shell.sh
。
1 | ./zookeeper-shell.sh 127.0.0.1:2181 |
如果我们没有用Kafka自带的Zookeeper,而是用的外部的Zookeeper,该Sh文件应该zkCli.sh
。
通过ls
命令可以查看Zookeeper的节点信息相关信息。
示例代码:
1 | ls / |
运行结果:
1 | [kafka, zookeeper] |
查看其中和Kafka相关的。
示例代码:
1 | ls /kafka |
运行结果:
1 | [admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification] |
查看/kafka/brokers
示例代码:
1 | ls /kafka/brokers |
运行结果:
1 | [ids, seqid, topics] |
查看/kafka/brokers/ids
示例代码:
1 | ls /kafka/brokers/ids |
运行结果:
1 | [1, 2, 3] |
对应我们有的三个Broker。
重要信息
整体过程
Leader选举过程
- 上方的是Zookeeper集群。
- 下方的是Kafka集群。
选举过程:
- broker启动,向Kafka进行注册。
- 三个broker开始抢占Controller,先注册上的作为Controller。
- Controller监听brokers的节点变化。
- Controller决定Leader的选举
规则:在ISR中存活为前提,按照AR中排在前面的优先。
例如:AR[2,1,3]
,ISR[1,2,3]
,那么Controller会选举2
作为Leader。
AR:Kafka中的所有副本。 - Controller将节点信息上传到Zookeeper。
- 其他Controller节点会从Zookeeper拉取数据。
- 接下来,生产者发送消息。但是,如果broker-2中的Leader这时候挂了呢?
- Controller监听到节点(
/brokers/ids/
)的变化。 - Controller从Zookeeper中重新获取ISR信息。
- 选举新的Leader,之后更新Leader信息和ISR信息等。
我们可以进行实验,手动下线一些broker,以验证上述过程,具体过程略。
故障处理细节
几个概念
- 副本
- 作用:提高数据可靠性。
- 数量:默认1个副本,生产环境一般配置为2个;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- 分类:Kafka中副本分为Leader和Follower。生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
- AR:所有副本统称为AR(Assigned Repllicas)。
AR = ISR + OSR - ISR:和Leader保持同步的Follower集合,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
- OSR:没有和Leader保持同步(延迟过多)的Follower集合。
那么,OSR中的副本,什么情况下会恢复到ISR中呢?
这就是我们接下来要讨论的问题的。
LEO和HW
LEO
(Log End Offset
):每个副本的最后一个offset
。HW
(High Watermark
):所有副本中最小的LEO。
为什么Follower的数据会和Leader的没对齐呢?
生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。所以,可能会在某一个瞬间,数据没对齐。
Follower故障
Leader故障
如果Controller挂了
通过上文的讨论,我们知道,只有一个Controller。如果Controller挂了呢?
Kafka为Controller提供了故障转移(Failover)功能,当运行中的Controller突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用Controller来代替之前失败的Controller。
- 当Broker-0宕机后,Zookeeper通过Watch机制感知到并删除了Controller临时节点。
所有存活的Broker开始竞选新的Controller身份。 - Broker-3最终赢得了选举,成功地在ZooKeeper上重建了Controller节点。
- Broker-3会从Zookeeper中读取集群元数据信息,并初始化到自己的缓存中。
至此Controller的Failover完成,可以行使正常的工作职责了。
脑裂问题
如果之前的Controller只是因为某些原因(例如GC),而被认为是挂掉了呢,现在又恢复了,这时候集群中存在两个Controller,那怎么办?
Kafka通过使用epoch number
(纪元编号,也称为隔离令牌)来克服脑裂问题。
epoch number
是单调递增的数字,第一次选出Controller时,epoch number
值为1,如果再次选出新的Controller,则epoch number
将为2,依次单调递增。
每个新选出的Controller,通过Zookeeper的条件递增操作都会获得一个全新的、数值更大的epoch number
。
其他Broker在知道当前epoch number
后,如果收到由Controller发出的包含较旧(较小)epoch number
的消息,就会忽略它们,即Broker根据最大的epoch number
来区分当前最新的Controller。
重要参数
节点管理
包括节点的退役和服役在内的节点管理,其实操作是一样的,是同一种方法的不同应用场景。
新增节点
讨论了怎么服役新节点,也就讨论了怎么退役旧节点。
现在是我们的Kafka集群有三个节点,如果我们想再增加一个节点,应该怎么操作?
新增节点的方法很简单,再安装一个Kakfa,修改server.properties
,重点关注如下三项:
broker.id
zookeeper.connect
log.dirs
然后直接启动即可。
启动完成之后,我们可以在Zookeeper上看一下节点信息。
示例代码:
1 | ls /kafka/brokers/ids |
运行结果:
1 | [1, 2, 3, 4] |
会发现新的节点已经加进来了。
Partition迁移
有些资料会称Partition迁移
为负载均衡操作
。我个人观点,负载均衡操作
,这个名字太笼统了。
假如有这么一个场景,我们有一个主题first
,分布在之前的三个节点上。示例代码:
1 | ./bin/kafka-topics.sh --bootstrap-server kafka-01:9092 --describe --topic first |
运行结果:
1 | Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: |
现在,我们想让其也分布在新服役的第四个节点上。
创建迁移文件
新建一个JSON格式的文件,文件名任意,在这里的文件名是topics-to-move.json
。
在文件内容中,我们按如下的格式配置需要被均衡的主题。
1 | { |
生成迁移计划
示例代码:
1 | ./bin/kafka-reassign-partitions.sh \ |
运行结果:
1 | Current partition replica assignment |
- 如果我们想退役旧节点,例如,不想用
2
来存储first
主题的信息,在--broker-list
的参数中,不写2
即可。 Current partition replica assignment
是当前first主题的存储情况。Proposed partition reassignment configuration
是计划first主题的存储情况,我们注意到有了4
。
创建副本存储计划
新建一个JSON格式的文件,文件名任意,在这里的文件名是increase-replication-factor.json
。
在文件内容中,我们填入上文的Proposed partition reassignment configuration
的内容。
我们也可以自己写一个结构相同的JSON文件。
1 | { |
执行副本存储计划
示例代码:
1 | ./bin/kafka-reassign-partitions.sh \ |
运行结果:
1 | Current partition replica assignment |
- 在"生成迁移计划"中,配置JSON文件的参数是
--topics-to-move-json-file
。 - 在"执行副本存储计划"中,配置JSON文件的参数是
--reassignment-json-file
。
- 在"生成迁移计划"中,生成参数是
--generate
。 - 在"执行副本存储计划"中,创建参数是
--execute
。
验证副本存储计划
示例代码:
1 | ./bin/kafka-reassign-partitions.sh \ |
运行结果:
1 | Status of partition reassignment: |
- 注意,最后一个参数是
--verify
。
问题排查
如果我们验证副本存储计划,发现有些任务一直处于still in progress
的状态。
1 | Status of partition reassignment: |
查看主题信息,发现也一直处于一个进行时的状态。
可以查看对应节点的日志,位于logs
目录下,在kafkaServer.out
可能会发现如下的日志。
1 | [2023-03-03 16:16:25,512] WARN [Controller id=1, targetBrokerId=4] Error connecting to node kafka-04:9092 (id: 4 rack: null) (org.apache.kafka.clients.NetworkClient) |
找不到主机kafka-04
,因为没有修改其他主机的/etc/hosts
文件。
退役旧节点
在上文讨论"生成迁移计划"的时候,我们提到过:
如果我们想退役旧节点,例如,不想用
2
来存储first主题的信息,在--broker-list
的参数中,不写2
即可。
这就是退役旧节点的关键步骤。
上文的"Partition迁移"重新来一遍,注意把要退役的节点去除。
最后,在要退役的节点上执行./bin/kafka-server-stop.sh
,关闭Kafka即可。
调整分区副本
如果,我们不计划退役旧节点,只是想让其少承担一些负载(比如,因为机器的性能较其他机器低),怎么处理?
或者,我们除了服役新节点,还想让其多承担一些负载(比如,因为机器的性能较其他机器高),怎么处理?
在上文讨论"创建副本存储计划"的时候,我们提到过:
当然我们也可以自己写一个结构相同的JSON文件。
这就是关键步骤。
Leader分区自动平衡
现象
正常情况下,Kafka本身会自动把Leader分区均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。
但是如果某些broker故障,可能会导致Leader分区过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他故障的broker重启之后都默认是Follower分区,读写请求很低,造成集群负载不均衡。
例如,我们手动把broker-2
和broker-3
故障,然后再恢复,此时first
主题的分布如下,Leader分区都是broker-1
上。
示例代码:
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first |
运行结果:
1 | Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: |
但是,如果我们5分钟之后再看,会发现自动平衡了。
示例代码:
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first |
运行结果:
1 | Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: |
这是因为Kafka有Leader分区自动平衡机制
参数
和Leader分区自动平衡机制的相关参数:
auto.leader.rebalance.enable
是否开启自动Leader分区自动平衡。
默认true
。leader.imbalance.per.broker.percentage
每个broker允许的不平衡的Leader的比率。
如果每个broker超过了这个值,Controller会触发Leader的平衡。
默认10%
。leader.imbalance.check.interval.seconds
检查Leader负载是否平衡的间隔时间。
默认300秒。
不平衡率的计算
例如,针对如下的情况:
1 | Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: |
broker-1
是分区1的AR(Replicas)的优先副本,实际上也是Leader,所以broker-1
是平衡的。broker-2
是分区2的AR(Replicas)的优先副本,但实际上不是Leader,所以不平衡数,AR总数是,不平衡率是,所以需要再平衡。broker-3
是分区0的AR(Replicas)的优先副本,但实际上不是Leader,所以不平衡数,AR总数是,不平衡率是,所以需要再平衡。
那么,对于如下的情况呢:
1 | Topic: first TopicId: ultePqLST-yGb88tTRarwA PartitionCount: 3 ReplicationFactor: 3 Configs: |
- 三个节点都是平衡的,计算过程略。
存储机制
存储结构
topic
是逻辑上的概念,partition
是物理上的概念,每个partition
分为多个segment
。
每个segment包括:.index文件
、.log文件
和.timeindex
等文件。
这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号
- 一个
topic
由多个partition
组成。 - 一个
partition
分为多个segment
。 .log
:日志文件。
以当前segment
的第一条消息的offset命名。.index
:偏移量索引文件。
以当前segment
的第一条消息的offset命名。.timeindex
:时间戳索引文件
存储内容
具体,我们可以在log.dirs
定义的目录,看看内容。
1 | -rw-r--r-- 1 root root 4 3月 3 16:17 cleaner-offset-checkpoint |
我们还看看first-0/
这个文件夹里都有什么。
1 | -rw-r--r-- 1 root root 10485760 3月 3 10:55 00000000000000000000.index |
来吧,看看00000000000000000000.log
都有啥。示例代码:
1 | less 00000000000000000000.log |
运行结果:
1 | "00000000000000000000.log" may be a binary file. See it anyway? |
如果想看该文件的内容,方法如下。
示例代码:
1 | ../../bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log |
运行结果:
1 | Dumping 00000000000000000000.log |
示例代码:
1 | ../../bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index |
运行结果:
1 | Dumping 00000000000000000000.index |
清理策略
Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。
log.retention.hours
Kafka中数据保存的时间。
默认7天。log.retention.minutes
Kafka中数据保存的时间,分钟级别。
默认关闭。log.retention.ms
Kafka中数据保存的时间,毫秒级别。
默认关闭。log.retention.check.interval.ms
检查数据是否保存超时的间隔。
默认5分钟。
该间隔时间一定要小于我们设置的数据保存时间。一个极端的例子,我们设置了数据保存时间是100毫秒,但这个值没有修改,那么100毫秒的数据保存,将不会真正的发挥作用。log.retention.bytes
超过设置的所有日志总大小,删除最早的Segment。
默认-1
,表示无穷大。log.cleanup.policy
表示所有数据启用删除策略;如果设置值为compact
,表示所有数据启用压缩策略。
默认delete
。
基于时间的删除策略:
- 通俗理解,根据
.log
文件的时间来判断。 - 严格意义上是,以segment中所有记录中的最大时间戳作为该文件时间戳。
虽然,一般情况下,所有记录中的最大时间戳就是.log
文件的时间。
但是不排除有恶意软件,修改.log
文件的时间。
基于大小的删除策略:
默认关闭,超过设置的所有日志总大小,删除最早的segment。
其实,这里的清理策略,和我们平时后端应用日志的清理策略类似。
也都是超过了时间的日志,删除或压缩;
总日志的大小超过了指定的大小,删除或压缩。
高效读写
Kafka是如何做到高效的读写数据的?
- Kafka本身是分布式集群,可以采用分区技术,并行度高。
- 读数据采用稀疏索引,可以快速定位要消费的数据
- 写数据时候,是顺序写磁盘。
写的过程是一直追加到文件末端,为顺序写。 - 零拷贝
数据加工处理操作交由Kafka生产者和Kafka消费者处理(生产者和消费者的拦截器,或者在调用生产者和消费者之前,由我们编写的应用进行处理。),Broker应用层不关心存储的数据,所以就不用走应用层,直接交给操作系统,传输效率高。 - 页缓存
页缓存,也就是所谓的page chche
。我们在《MySQL从入门到实践:6.事务》讨论"redo log"的"刷盘策略"的时候,讨论过这个,并且还做过实验比较。
写数据的时候,先保存在内存中,满足一定的要求才刷到磁盘中。读数据的时候,也会先看看内存中的page chche
有没有,没有的话才去磁盘读。
是尽可能多的把尽可能多的空闲内存都当做了磁盘缓存来使用。
索引机制
过程
Kafka中的索引文件(.index
和.timeindex
),以稀疏索引(Sparse Index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。
当写入一定量(由broker端参数log.index.interval.bytes
指定,默认值为4096,即4KB)的消息时,.index
偏移量索引文件 和.timeindex
时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。
稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,根据二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
时间戳索引文件中的时间戳也是单调递增的,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
偏移量索引文件
我们以偏移量索引文件为例,详细讨论。
偏移量索引文件的格式如下图所示:
每个索引项占用8个字节,分为两个部分:
relativeOffset
:相对偏移量,表示消息相对于baseOffset的偏移量,占用4个字节。
一个日志片段的baseOffset
为32,那么其文件名就是00000000000000000032.log
,3指的是offset=35的消息。(32+3=35)
position
:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节。
如下图所示:
例子
如何通过offset=3的Message?
- 先找到
offset=3
的Message所在的segment文件(利用二分法查找) - 读取对应的segment中的
.index
文件,根据找到的相对offset的索引,确定message存储的物理偏移地址为756。 - 根据物理偏移地址,去
.log
文件找相应的Message。