定义
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
官网:https://kafka.apache.org
官方文档:https://kafka.apache.org/documentation
传统的定义是这样的,当然Kafka这个组织的野心,不只有这么一点。
Kafka官方的最新定义是:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
例如,我们可以将Kafka用于日志归档。
消息队列
概述
关于消息队列,我们在《基于Java的后端开发入门:22.SpringBoot [2/3]》,有过简单的讨论。
常见的消息队列协议有三种:JMS
、AMQP
、MQTT
。
常见的消息队列产品有:Kafka
、ActiveMQ
(JMS
)、RabbitMQ
(AMQP
)和RocketMQ
(AMQP
)等。
应用场景
消息队列的应用场景有:
消峰(缓冲)
解耦
异步通信
消峰(缓冲)
例如,我们的秒杀系统,每秒只能处理一千万人次的请求,但现在的访问量每秒有十亿人次。针对这种情况,我们可以先把所有的请求都保存在消息队列,然后由秒杀系统逐步进行处理。
解耦
允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
(在本文开头定义部分说的"日志归档",其实就是解耦,业务系统和日志存储系统是解耦的。)
异步通信
允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。
两种模式
消息队列有两种模式:
- 点对点模式:
消费者主动拉取数据,消息收到后清除消息。 - 发布/订阅模式:
消费者消费数据之后,不删除数据。
每个消费者相互独立,都可以消费到数据。
可以有多个topic主题(浏览、点赞、收藏、评论等)。
Kafka基于的就是发布订阅模式。
架构模型
整体架构
整体架构,如图所示。
接下来,我们会一步一步解析,这么设计的原因。
分区
从最简单的架构开始。
producer
:消息生产者,向Kafka broker
发消息的客户端。consumer
:消息消费者,向Kafka broker
收消息的客户端。topic
:可以理解为一个队列,生产者和消费者面向的都是一个topic
。
如果我们的数据有100T呢?
为了方便扩展、提高吞吐量,我们将一个topic
分为多个partition
。
同时,为了配合分区的设计,设计了消费者组,组内每个消费者并行消费。
一个分区的数据,只能由一个消费者进行消费。
consumer group
:消费者组,由多个consumer
组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由消费者组内的一个消费者消费;消费者组之间互不影响。消费者组是逻辑上的一个订阅者。broker
:一台Kafka服务器就是一个broker
,一个集群由多个broker
组成,一个broker
可以容纳多个topic
。partition
:为了实现扩展性,将一个非常大的topic
可以分布到多个broker
上。一个topic
可以分为多个partition
,每个partition
是一个有序的队列。
分区的好处:
- 分区可以 合理使用存储资源 ,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。
- 可以 提高并行度 ,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
- 如果我们合理控制分区,还可以实现 负载均衡 。
副本
为提高可用性,为每个partition
增加若干副本。
副本分为leader
和follower
。生产和消费只针对leader
,在leader
挂了之后,follower
有条件成为leader
。
replica
:副本。一个topic
的每个分区都有若干个副本,分为一个leader
和若干个follower
。leader
:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是leader
。follower
:每个分区多个副本中的"从",实时从leader
中同步数据,保持和leader
数据的同步。leader
发生故障时,某个follower
会成为新的leader
。
Zookeeper
Kafka中,还有一部分数据存储在Zookeeper,Zookeeper记录哪些服务器上线了,记录每一个分区中的leader
、follower
。
Kraft
在2.8.0以后也可以配置不采用Zookeeper,这个被称为Kraft模式。
Kraft模式,是未来的趋势,Zookeeper模式,是现在的主流。
在本文,我们对于两种模式都会讨论,但还是以Zookeeper模式为主。
安装部署
在《基于Java的后端开发入门:22.SpringBoot [2/3]》,我们已经讨论了Kafka的安装部署。
在这里我们更详细的讨论。
集群规划
01 | 02 | 03 |
---|---|---|
Zookeeper | Zookeeper | Zookeeper |
Kafka | Kafka | Kafka |
在本文的示例中,我们所有的服务器(三台),都安装了Zookeeper和Kafka。
这并不是说都要这么规划(所有的服务器都安装Zookeeper和Kafka),比如我们有10台服务器,不一定需要10台服务器都安装Zookeeper和Kafka。
集群部署
下载地址
下载地址:http://kafka.apache.org/downloads.html
注意红框标出的部分。解释一下,Kafka是用Scala语言编写的;kafka_2.12-3.4.0.tgz
的含义是,用Scala 2.12
编写的,Kafka的版本是3.4.0
;kafka_2.13-3.4.0.tgz
的含义是,用Scala 2.13
编写的,Kafka的版本是3.4.0
;推荐用Scala 2.13
编写的。
JDK
注意,虽然Kafka是用Scala编写的,但是部分模块(Provider和Consumer相关的),是基于Java的,所以我们还是需要安装JDK。关于JDK的安装,可以参考《ElasticSearch实战入门(6.X):1.工具、概念、集群和倒排索引》。
如果没有安装JDK,在运行的过程中,可能会有如下的报错:
1 | /usr/kafka/kafka_2.13-3.4.0/bin/kafka-run-class.sh: line 346: exec: java: not found |
解压安装包
示例代码:
1 | tar -zxvf kafka_2.13-3.4.0.tgz |
server.properties
修改config
目录下的server.properties
。
zookeeper.properties
zookeeper.properties
的内容如下:
1 | # the directory where the snapshot is stored. |
我们修改dataDir
,改为/usr/kafka/kafka_2.13-3.4.0/zk/data
然后在dataDir
定义的目录(/usr/kafka/kafka_2.13-3.4.0/zk/data
)下创建一个myid
文件,内容分别是1
、2
、3
,记录每个服务器的ID。
Zookeeper集群配置,需要添加如下内容:
1 | 172.23.2.82:2287:3387 = |
Zookeeper集群配置,还需要添加如下内容:
1 | initLimit=10 |
initLimit
leader
和follow
初始连接时能容忍的最多心跳数。syncLimit=2
leader
和follower
之间发送消息, 请求和应答的最大心跳数。
Zookeeper的一次心跳的时间由tickTime
配置,默认该值为2000
。
启动集群
先启动Zookeeper:
1 | ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties |
再启动Kafka:
1 | ./kafka-server-start.sh -daemon ../config/server.properties |
关闭集群
先关闭Kafka:
1 | ./bin/kafka-server-stop.sh |
再关闭Zookeeper:
1 | ./bin/zookeeper-server-stop.sh |
问题排查
Kraft
关于下载、JDK以及解压部分,没有任何区别。
server.properties
注意,这里的server.properties
,位于config/kraft/server.properties
。
初始化集群数据目录
启动集群
不需要启动Zookeeper,只需要启动Kafka。
注意server.properties
的地址。
1 | ./bin/kafka-server-start.sh -daemon config/kraft/server.properties |
命令行
主题命令行
查看参数
1 | ./bin/kafka-topics.sh |
运行结果:
1 | Create, delete, describe, or change a topic. |
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> |
连接的KafkaBroker主机名称和端口号 |
--topic <String: topic> |
操作的topic名称 |
--create |
创建主题 |
--delete |
删除主题 |
--alter |
修改主题 |
--list |
查看所有主题 |
--describe |
查看主题详细描述 |
--partitions <Integer: # of partitions> |
设置分区数 |
--replication-factor <Integer: replication factor> |
设置分区副本 |
--config <String: name=value> |
更新系统默认的配置。 |
查看所有的主题
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list |
创建主题
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --partitions 1 --replication-factor 3 --topic first |
选项说明:
--topic
:定义topic名称--replication-factor
:定义副本数--partitions
:定义分区数
查看主题详情
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first |
1 | Topic: first TopicId: Z4rDyAllS3OBmrZaVddP-Q PartitionCount: 1 ReplicationFactor: 3 Configs: |
修改分区数
注意,分区只能增加,不能减少。
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3 |
修改之后,我们再查看一下:
1 | Topic: first TopicId: Z4rDyAllS3OBmrZaVddP-Q PartitionCount: 3 ReplicationFactor: 3 Configs: |
删除主题
1 | ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic first |
生产者命令行
查看参数
1 | ./bin/kafka-console-producer.sh |
运行结果:
1 | Missing required option(s) [bootstrap-server] |
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> |
连接的KafkaBroker主机名称和端口号 |
--topic <String: topic> |
操作的topic名称 |
发送消息
1 | ./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic first |
消费者命令行
查看参数
1 | ./bin/kafka-console-consumer.sh |
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> |
连接的KafkaBroker主机名称和端口号 |
--topic <String: topic> |
操作的topic名称 |
--from-beginning |
从头开始消费 |
--group <String: consumer group id> |
指定消费者组名称 |
消费消息
消费first主题中的数据:
1 | ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first |
把主题中所有的数据都读取出来(包括历史数据):
1 | ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first |