avatar


Kafka-1.初步认识

定义

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

官网:https://kafka.apache.org
官方文档:https://kafka.apache.org/documentation

传统的定义是这样的,当然Kafka这个组织的野心,不只有这么一点。
Kafka官方的最新定义是:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

例如,我们可以将Kafka用于日志归档。

Kafka用于日志采集和归档

关于上图中的Filebeat,可以参考《未分类【计算机】:日志采集工具Filebeat》

消息队列

概述

关于消息队列,我们在《基于Java的后端开发入门:22.SpringBoot [2/3]》,有过简单的讨论。

常见的消息队列协议有三种:JMSAMQPMQTT
常见的消息队列产品有:KafkaActiveMQ(JMS)、RabbitMQ(AMQP)和RocketMQ(AMQP)等。

应用场景

消息队列的应用场景有:

  • 消峰(缓冲)
  • 解耦
  • 异步通信

消峰(缓冲)

消峰(缓冲)

例如,我们的秒杀系统,每秒只能处理一千万人次的请求,但现在的访问量每秒有十亿人次。针对这种情况,我们可以先把所有的请求都保存在消息队列,然后由秒杀系统逐步进行处理。

解耦

允许独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

解耦

(在本文开头定义部分说的"日志归档",其实就是解耦,业务系统和日志存储系统是解耦的。)

异步通信

允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

异步通信

两种模式

消息队列有两种模式:

  1. 点对点模式:
    消费者主动拉取数据,消息收到后清除消息。
  2. 发布/订阅模式:
    消费者消费数据之后,不删除数据。
    每个消费者相互独立,都可以消费到数据。
    可以有多个topic主题(浏览、点赞、收藏、评论等)。

Kafka基于的就是发布订阅模式。

两种模式

架构模型

整体架构

整体架构

整体架构,如图所示。
接下来,我们会一步一步解析,这么设计的原因。

分区

从最简单的架构开始。

基础架构

  • producer:消息生产者,向Kafka broker发消息的客户端。
  • consumer:消息消费者,向Kafka broker收消息的客户端。
  • topic:可以理解为一个队列,生产者和消费者面向的都是一个topic

如果我们的数据有100T呢?
为了方便扩展、提高吞吐量,我们将一个topic分为多个partition
同时,为了配合分区的设计,设计了消费者组,组内每个消费者并行消费。
一个分区的数据,只能由一个消费者进行消费。

一个topic分为多个partition。

  • consumer group:消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由消费者组内的一个消费者消费;消费者组之间互不影响。消费者组是逻辑上的一个订阅者。
  • broker:一台Kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
  • partition:为了实现扩展性,将一个非常大的topic可以分布到多个broker上。一个topic可以分为多个partition,每个partition是一个有序的队列。

分区的好处:

  • 分区可以 合理使用存储资源 ,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。
  • 可以 提高并行度 ,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
  • 如果我们合理控制分区,还可以实现 负载均衡

副本

为提高可用性,为每个partition增加若干副本。
副本分为leaderfollower。生产和消费只针对leader,在leader挂了之后,follower有条件成为leader

为每个增加若干副本

  • replica:副本。一个topic的每个分区都有若干个副本,分为一个leader和若干个follower
  • leader:每个分区多个副本的"主",生产者发送数据的对象,以及消费者消费数据的对象都是leader
  • follower:每个分区多个副本中的"从",实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader

Zookeeper

Kafka中,还有一部分数据存储在Zookeeper,Zookeeper记录哪些服务器上线了,记录每一个分区中的leaderfollower

ZK

Kraft

在2.8.0以后也可以配置不采用Zookeeper,这个被称为Kraft模式。
Kraft模式,是未来的趋势,Zookeeper模式,是现在的主流。
在本文,我们对于两种模式都会讨论,但还是以Zookeeper模式为主。

安装部署

《基于Java的后端开发入门:22.SpringBoot [2/3]》,我们已经讨论了Kafka的安装部署。
在这里我们更详细的讨论。

集群规划

01 02 03
Zookeeper Zookeeper Zookeeper
Kafka Kafka Kafka

在本文的示例中,我们所有的服务器(三台),都安装了Zookeeper和Kafka。
这并不是说都要这么规划(所有的服务器都安装Zookeeper和Kafka),比如我们有10台服务器,不一定需要10台服务器都安装Zookeeper和Kafka。

集群部署

下载地址

下载地址:http://kafka.apache.org/downloads.html

版本

注意红框标出的部分。解释一下,Kafka是用Scala语言编写的;kafka_2.12-3.4.0.tgz的含义是,用Scala 2.12编写的,Kafka的版本是3.4.0kafka_2.13-3.4.0.tgz的含义是,用Scala 2.13编写的,Kafka的版本是3.4.0;推荐用Scala 2.13编写的。

我们可以再关注一下2.8.0版本,有这么一句话:Early access of replace Zookeeper with a self-managed quorum
这就是Kraft模式。

Early access of replace Zookeeper with a self-managed quorum

JDK

注意,虽然Kafka是用Scala编写的,但是部分模块(Provider和Consumer相关的),是基于Java的,所以我们还是需要安装JDK。关于JDK的安装,可以参考《ElasticSearch实战入门(6.X):1.工具、概念、集群和倒排索引》
如果没有安装JDK,在运行的过程中,可能会有如下的报错:

1
/usr/kafka/kafka_2.13-3.4.0/bin/kafka-run-class.sh: line 346: exec: java: not found

解压安装包

示例代码:

1
tar -zxvf kafka_2.13-3.4.0.tgz

server.properties

修改config目录下的server.properties

broker.id

1
2
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

broker.id在Kafka集群的每个broker节点上,不能重复。在这里,我们的三台机器,分别配置为123

log.dirs

1
2
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

将该目录修改为Kafka安装目录下,这样便于我们管理,在本文,我们修改为

1
log.dirs=/usr/kafka/kafka_2.13-3.4.0/kafka-logs

zookeeper.connect

1
2
3
4
5
6
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

修改Zookeeper的地址,改为:

1
zookeeper.connect=172.23.2.82:2181,172.23.2.83:2181,172.23.2.84:2181/kafka

注意,我们在地址的结尾加上了/kafka
《基于Java的后端开发入门:20.Dubbo和Zookeeper》,我们讨论过,Zookeeper是一种树型的目录服务,如果不加上/kafka,Kafka集群的信息会分散在整个Zookeeper的目录树中。加上/kafka,则只会存在于目录树的/kafka分支下,这样有利于我们维护。

更多配置:

  • num.network.threads=3,处理网络请求的线程数量。
  • num.replica.fetchers=1,副本拉取数据的线程数。
  • num.io.threads=8,处理磁盘IO的线程数量。
  • socket.send.buffer.bytes=102400,发送套接字的缓冲区大小。
  • socket.receive.buffer.bytes=102400,接收套接字的缓冲区大小。
  • socket.request.max.bytes=104857600,请求套接字的缓冲区大小。
  • num.partitions=1,每个topic在当前broker上的分区个数。
  • num.recovery.threads.per.data.dir=1,用来恢复和清理data下数据的线程数量。
  • offsets.topic.replication.factor=1,每个topic创建时的副本数。
  • log.retention.hours=168,segment文件保留的最长时间,超时将被删除。
  • log.segment.bytes=1073741824,segment文件的最大的大小,默认最大1G。
  • log.retention.check.interval.ms,检查过期数据的时间。

zookeeper.properties

zookeeper.properties的内容如下:

1
2
3
4
5
6
7
8
9
10
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

我们修改dataDir,改为/usr/kafka/kafka_2.13-3.4.0/zk/data
然后在dataDir定义的目录(/usr/kafka/kafka_2.13-3.4.0/zk/data)下创建一个myid文件,内容分别是123,记录每个服务器的ID。

Zookeeper集群配置,需要添加如下内容:

1
2
3
server.1=172.23.2.82:2287:3387
server.2=172.23.2.83:2287:3387
server.3=172.23.2.84:2287:3387

Zookeeper集群配置,还需要添加如下内容:

1
2
initLimit=10
syncLimit=5
  • initLimit
    leaderfollow初始连接时能容忍的最多心跳数。
  • syncLimit=2
    leaderfollower之间发送消息, 请求和应答的最大心跳数。

Zookeeper的一次心跳的时间由tickTime配置,默认该值为2000

启动集群

先启动Zookeeper:

1
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

再启动Kafka:

1
./kafka-server-start.sh -daemon ../config/server.properties

关闭集群

先关闭Kafka:

1
./bin/kafka-server-stop.sh

再关闭Zookeeper:

1
./bin/zookeeper-server-stop.sh

问题排查

日志文件

Kafka以及Kafka自带的Zookeeper的日志文件位于logs目录下,例如/usr/kafka/kafka_2.13-3.4.0/logs

initLimit is not set

如果在zookeeper.out有类似如下的报错,需要在配置文件中新增内容。

1
2
3
4
5
6
7
8
9
10
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing ../config/zookeeper.properties
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:198)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:124)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
Caused by: java.lang.IllegalArgumentException: initLimit is not set
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.checkValidity(QuorumPeerConfig.java:806)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.setupQuorumPeerConfig(QuorumPeerConfig.java:683)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:507)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:194)
... 2 more

KeeperErrorCode = NodeExists

如果在server.log有类似如下的报错,检查server.properties中的broker.id是否冲突。

1
2
3
4
5
[2023-02-10 11:34:43,499] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)

【部分运行结果略】

如果还是存在问题,尝试删除Zookeeper的dataDir目录下的version-2。在本文是/usr/kafka/kafka_2.13-3.4.0/zk/data/version-2

Configured broker.id 1 doesn't match stored broker.id

如果在server.log有类似如下的报错,检查server.properties中的broker.id是否冲突。

1
2
3
4
5
6
[2023-02-10 11:30:06,506] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentBrokerIdException: Configured broker.id 1 doesn't match stored broker.id Some(2) in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
at kafka.server.KafkaServer.getOrGenerateBrokerId(KafkaServer.scala:980)
at kafka.server.KafkaServer.startup(KafkaServer.scala:237)
at kafka.Kafka$.main(Kafka.scala:115)
at kafka.Kafka.main(Kafka.scala)

如果还是存在问题,尝试删除Kafka的log.dirs目录下的内容。在本文是/usr/kafka/kafka_2.13-3.4.0/kafka-logs

Kraft

关于下载、JDK以及解压部分,没有任何区别。

server.properties

注意,这里的server.properties,位于config/kraft/server.properties

process.role

1
2
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
  • controller,相当于主机,主机有类似Zookeeper的功能。
  • broker,相当于从机。

我们可以为一个节点同时配置两个角色。
也与我们上文的集群规划一样,没必要所有的节点都同时配置两个角色。

node.id

1
2
# The node id associated with this instance's roles
node.id=1

node.id,类似于我们上文的broker.id,不能重复。

controller.quorum.voters

1
2
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093

投票人集合ID和地址端口映射({id}@{host}:{port}的形式),以逗号分隔。

我们将其修改为:

1
controller.quorum.voters=1@172.31.71.1:9093,2@172.31.70.255:9093,3@172.31.71.0:9093

advertised.listeners

1
2
3
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092

需要改为advertised.listeners=PLAINTEXT://【服务器IP】:9092

log.dirs

1
2
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

这个参数,我们在上文也配置过。我们将其修改为Kafka安装目录下,这样便于我们管理。

初始化集群数据目录

在一台机器上,生成存储目录唯一ID。

示例代码:

1
./bin/kafka-storage.sh random-uuid
运行结果:
1
Unyb9gg4T_uxMvkjIjM8YA

在所有机器上,用用该ID格式化Kafka存储目录。

1
./bin/kafka-storage.sh format -t Unyb9gg4T_uxMvkjIjM8YA -c /usr/local/kafka/kafka_2.13-3.4.0/config/kraft/server.properties

启动集群

不需要启动Zookeeper,只需要启动Kafka。
注意server.properties的地址。

1
./bin/kafka-server-start.sh -daemon config/kraft/server.properties

命令行

主题命令行

查看参数

1
./bin/kafka-topics.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--at-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
equal to the configured minimum.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.

【部分运行结果略】
参数 描述
--bootstrap-server <String: server toconnect to> 连接的KafkaBroker主机名称和端口号
--topic <String: topic> 操作的topic名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions <Integer: # of partitions> 设置分区数
--replication-factor <Integer: replication factor> 设置分区副本
--config <String: name=value> 更新系统默认的配置。

查看所有的主题

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

创建主题

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

  • --topic:定义topic名称
  • --replication-factor:定义副本数
  • --partitions:定义分区数

查看主题详情

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first
1
2
Topic: first	TopicId: Z4rDyAllS3OBmrZaVddP-Q	PartitionCount: 1	ReplicationFactor: 3	Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

修改分区数

注意,分区只能增加,不能减少。

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

修改之后,我们再查看一下:

1
2
3
4
Topic: first	TopicId: Z4rDyAllS3OBmrZaVddP-Q	PartitionCount: 3	ReplicationFactor: 3	Configs: 
Topic: first Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: first Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: first Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

删除主题

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic first

生产者命令行

查看参数

1
./bin/kafka-console-producer.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Missing required option(s) [bootstrap-server]
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. please note that this
option will be replaced if max-
partition-memory-bytes is also set
(default: 16384)
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to. The broker list
string in the form HOST1:PORT1,HOST2:
PORT2.

【部分运行结果略】
参数 描述
--bootstrap-server <String: server toconnect to> 连接的KafkaBroker主机名称和端口号
--topic <String: topic> 操作的topic名称

发送消息

1
./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic first

消费者命令行

查看参数

1
./bin/kafka-console-consumer.sh
参数 描述
--bootstrap-server <String: server toconnect to> 连接的KafkaBroker主机名称和端口号
--topic <String: topic> 操作的topic名称
--from-beginning 从头开始消费
--group <String: consumer group id> 指定消费者组名称

消费消息

消费first主题中的数据:

1
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first

把主题中所有的数据都读取出来(包括历史数据):

1
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11901
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板