avatar


5.监控和调优

Kafka-Eagle

Kafka-Eagle,监控Kafka集群的整体运行情况的工具。

环境准备

Kafka-Eagle,依赖MySQL或者sqlite,用以存储可视化展示的数据。
这里,我们采用MySQL,关于MySQL的安装,可以参考《MySQL从入门到实践:1.概述和工具准备》
这里,我们重点讨论在Kafka上要进行的修改。

关闭Kafka集群

1
./bin/kafka-server-stop.sh

修改./bin/kafka-server-start.sh

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
修改为
1
2
3
4
5
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi

解释说明:

  • Kafka默认的内存是1G,使用Kafka-Eagle,建议将内存增加到2G。
  • Kafka自身提供的监控指标可以通过JMX(Java Managent Extension)来进行获取,我们暴露一个JMX的端口号export JMX_PORT="9999"

启动Kafka集群

1
./kafka-server-start.sh -daemon ../config/server.properties

监控原理

上述的步骤完成之后,其实就可以监控Kafka了。

JConsole

JConsole,是一个图形化的Java监控和管理工具,在我们安装JDK的时候,会自带这个工具,和javajavac等处于同一个目录。

JConsole

我们在Remote Process中填入地址kafka-01:9999

在最后一个Tab页,我们可以看到需要和Kafka有关的。

许多和Kafka有关的

Java代码

除了通过JConsole,我们也可以通过编程,写Java代码的方式,获取上述指标。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;

public class JmxConnectionDemo {
private MBeanServerConnection conn;
private String jmxURL;
private String ipAndPort;

public JmxConnectionDemo(String ipAndPort) {
this.ipAndPort = ipAndPort;
}

public boolean init() {
jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
conn = connector.getMBeanServerConnection();
if (conn == null) {
return false;
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}

public Object getObjectNameVal() {
String objName = "kafka.cluster:type=Partition,name=AtMinIsr,topic=first,partition=0";
String objAttr = "Value";
Object val = getAttribute(objName, objAttr);
return val;
}

private Object getAttribute(String objName, String objAttr) {
ObjectName objectName;
try {
objectName = new ObjectName(objName);
return conn.getAttribute(objectName, objAttr);
} catch (MalformedObjectNameException | IOException |
ReflectionException | InstanceNotFoundException |
AttributeNotFoundException | MBeanException e) {
e.printStackTrace();
}
return null;
}

public static void main(String[] args) {
JmxConnectionDemo jmxConnectionDemo = new JmxConnectionDemo("kafka-01:9999");
jmxConnectionDemo.init();
System.out.println(jmxConnectionDemo.getObjectNameVal());
}
}

getAttribute()方法的两个形参objNameobjAttr,分别对应下图的12两处。

getAttribute()

Kafka-Eagle的监控原理,就是通过JMX获取各种监控指标,并做了一个非常好的展示。

安装

通过官网下载安装包,下载完成后,上传至服务器,直接解压即可。

示例代码:

1
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

运行结果:

1
2
kafka-eagle-bin-3.0.1/
kafka-eagle-bin-3.0.1/efak-web-3.0.1-bin.tar.gz

我们需要再解压efak-web-3.0.1-bin.tar.gz

1
tar -zxvf efak-web-3.0.1-bin.tar.gz

配置

配置文件

配置文件:./conf/system-config.properties

集群配置

1
2
3
4
5
6
7
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

默认的配置文件有两个集群,我们需要修改改部分。在本文改为:

1
2
efak.zk.cluster.alias=cluster1,
cluster1.zk.list=kafka-01:2181,kafka-02:2181,kafka-03:2181/kafka
  • cluster1.zk.list,配置的Zookeeper的地址。

数据库连接配置

1
2
3
4
5
6
7
######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

将该部分修改为我们自己的数据库连接信息。

添加环境变量

修改/etc/profile

在文件结尾添加如下内容:

1
2
3
# Kafka-Eagle
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin

刷新:source /etc/profile

如果没有添加环境变量,在启动的时候,会有如下的报错

1
2
3
[2023-03-07 16:42:15] INFO: Starting  EFAK( Eagle For Apache Kafka ) environment check ...
[2023-03-07 16:42:15] Error: The KE_HOME environment variable is not defined correctly.
[2023-03-07 16:42:15] Error: This environment variable is needed to run this program.

启动

示例代码:

1
./bin/ke.sh start

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

【部分运行结果略】

inflated: media/css/public/account/hfd.ttf
inflated: media/css/public/account/hfc.ttf
inflated: media/css/public/fonts/boxicons.woff
inflated: media/css/public/fonts/boxicons.woff2
inflated: media/css/public/fonts/boxicons.svg
inflated: media/js/public/plus/all.min.js
[2023-03-07 16:48:22] INFO: Port Progress: [##################################################] | 100%
[2023-03-07 16:48:26] INFO: Config Progress: [##################################################] | 100%
[2023-03-07 16:48:29] INFO: Startup Progress: [##################################################] | 100%
[2023-03-07 16:48:19] INFO: Status Code[0]
[2023-03-07 16:48:19] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )

Version v3.0.1 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.211.55.20:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
  • 需要先启动Zookeeper和Kafka。
  • Welcome, Now you can visit 'http://10.211.55.20:8048',即访问地址。
  • Account:admin ,Password:123456,即用户名和密码

监控页面

页面

具体的Kafka-Eagle的使用方法略,我们自己点一点就能明白。
还可以参考官方文档:https://docs.kafka-eagle.org

硬件的选择

服务器台数

服务器台数=2×生产者峰值生产速率×副本数100+1\text{服务器台数} = 2 \times \frac{\text{生产者峰值生产速率} \times \text{副本数}}{100} + 1

  • 生产者峰值生产速率的单位是MB/S。
  • 生产者峰值生产速率指的是总的生产者峰值生产速率。
    例如,有两个生产者,生产者A在时刻1的速率是5MB/S,在时刻2的速率是2MB/S;生产者B在时刻1没有生产,在时刻2的速率是10MB/S,那么生产者峰值生产速率是12MB/S
  • 生产者峰值生产速率×副本数100\frac{\text{生产者峰值生产速率} \times \text{副本数}}{100},如果除不尽,建议向上取整,而不是四舍五入。

例如,生产者峰值生产速率是20MB/S,副本数是2,则有:

服务器台数=2×20×2100+1=3\begin{aligned} \text{服务器台数} & = 2 \times \frac{20 \times 2}{100} + 1 & = 3 \end{aligned}

磁盘

《3.Broker》的"高效读写"部分,我们讨论过,Kafka写数据时候,是顺序写磁盘。

所以,固态硬盘和机械硬盘速度其实差不多。

如果Kafka集群出现了性能问题,除非是机械硬盘有故障,否则将机械硬盘换成固态硬盘,不会有太大的性能提升。

内存

Kafka内存分为两部分:

  • 堆内存
    由Kafka进行配置。
  • 页缓存

堆内存

修改方法

./bin/kafka-server-start.sh中修改

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

判断方法

查看Kafka进程号

示例代码:

1
jps
运行结果:
1
2
3
4
736799 KafkaEagle
875881 Jps
621199 Kafka
13884 QuorumPeerMain

查看使用情况
命令:jmap -heap 【Kafka进程号】

示例代码:

1
jmap -heap 621199
运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Attaching to process ID 621199, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.361-b09

using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)

Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 2147483648 (2048.0MB)
NewSize = 1363144 (1.2999954223632812MB)
MaxNewSize = 1287651328 (1228.0MB)
OldSize = 5452592 (5.1999969482421875MB)
NewRatio = 2
SurvivorRatio = 8
MetaspaceSize = 21807104 (20.796875MB)
CompressedClassSpaceSize = 1073741824 (1024.0MB)
MaxMetaspaceSize = 17592186044415 MB
G1HeapRegionSize = 1048576 (1.0MB)

Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 169624064 (161.76611328125MB)
free = 1977859584 (1886.23388671875MB)
7.898736000061035% used
G1 Young Generation:
Eden Space:
regions = 7
capacity = 103809024 (99.0MB)
used = 7340032 (7.0MB)
free = 96468992 (92.0MB)
7.070707070707071% used
Survivor Space:
regions = 9
capacity = 9437184 (9.0MB)
used = 9437184 (9.0MB)
free = 0 (0.0MB)
100.0% used
G1 Old Generation:
regions = 147
capacity = 2034237440 (1940.0MB)
used = 152846848 (145.76611328125MB)
free = 1881390592 (1794.23388671875MB)
7.513717179445877% used

13714 interned Strings occupying 1475928 bytes.

主要关注如下部分:

1
2
3
4
5
6
7
Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 169624064 (161.76611328125MB)
free = 1977859584 (1886.23388671875MB)
7.898736000061035% used

我们看到使用率只有7.89%,一般使用率超过了34\frac{3}{4},需要关注,增大堆内存。

另一个关注点:查看GC情况

命令:jstat -gc 【Kafka进程号】 1s 10

示例代码:

1
jstat -gc 621199 1s 10
运行结果:
1
2
3
4
5
6
7
8
9
10
11
S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT     GCT   
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869

主要关注YGC(年轻代垃圾回收次数)。
在例子中,次数没有超过100,完全不用关注。

页缓存

页缓存是Linux系统服务器的内存。

每个节点页缓存大小=集群的总分区数×0.25节点数\text{每个节点页缓存大小} = \frac{\text{集群的总分区数} \times 0.25}{\text{节点数}}

  • 单位是GB。
  • 计算结果进行四舍五入。
  • 集群的总分区数,特指Leader分区,并且不包含系统分区。
  • 0.250.25是一个经验值,一个分区的数据全都读进内存,当然是最好的,但是一般没有必要(而且不一定一个分区的数就是1G)。

例如,集群中一共有10个Leader分区(不含系统主题的分区),3个节点,则:

每个节点页缓存大小=10×0.253=1\begin{aligned} \text{每个节点页缓存大小} & = \frac{10 \times 0.25}{3} & = 1 \end{aligned}

为什么不考虑系统分区

根据我们在之前章节的讨论,我们知道Kafka中有默认有100个系统分区(50个__consumer_offsets和50个__transaction_state)。

我查阅了许多资料,这些资料虽然在计算页缓存方面存在差异,但是都没有考虑到系统分区数。
而且,都没有解释不考虑系统分区的原因,甚至都没有提不考虑系统,只是在举例子的时候,体现了不考虑系统分区。

只能理解为,整个计算公式,其实就是经验,甚至是一种惯例。

CPU

Kafka是I/O密集型而非计算密集型的框架,所以对CPU的需求并不高,消耗CPU的点主要在于消息的压缩和解压缩。
此外,一个Broker节点往往要承载许多个TopicPartition并与许多个Producer/Consumer交互,所以并行度(核心/线程数)要比单核性能(频率)更重要。

注意根据CPU规格的不同,Broker如下三个参数要修改

  • num.io.threads
    负责写磁盘的线程数,整个参数值要占总核数的50%。
    默认8。
  • num.replica.fetchers
    副本拉取线程数,这个参数占总核数的50%的13\frac{1}{3}
    默认1。
  • num.network.threads
    数据传输线程数,这个参数占总核数的50%的23\frac{2}{3}
    默认3。

有些资料建议32核的CPU,我的观点是Kafka是I/O密集型而非计算密集型的框架,不需要那么大的CPU,具体我们可以根据生产环境的实际负载进行调整,通过top命令可以查看。

网络

网络带宽 = 峰值吞吐量

需要注意的是:

  • 网络带宽的但是为Mbps,b的含义是比特(bit);
  • 峰值吞吐量,可能用MB/s表示,B的含义是字节(Byte),1Byte=8bit。

生产者调优

调优方向有:

  • 提高吞吐量
  • 消息不丢失
  • 消息不重复
  • 消息有序

关于该部分,我们在《2.Producer》已经讨论了。

Broker调优

调优方向

调优方向有:

  • 新增节点
  • 调整分区副本
  • Leader分区自动平衡
  • 关闭自动创建主题
  • 动态配置

除了"关闭自动创建主题"和"动态配置",我们没有讨论。其他的,我们在《3.Broker》已经讨论了。

关闭自动创建主题

参数auto.create.topics.enable,自动创建副本。
当生产者向一个未创建的主题发送消息时,或者当一个消费者开始从未创建的主题中读取消息时,Kafka会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。
这种创建主题的方式是非预期的,增加了主题管理和维护的难度。
生产环境建议将该参数设置为false

动态配置

操作方法

参数

示例代码:

1
./bin/kafka-configs.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
--add-config <String>                  Key Value pairs of configs to add.     
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':

【部分运行结果略】

follower.replication.throttled.replicas
leader.replication.throttled.replicas

【部分运行结果略】

For entity-type 'brokers':

【部分运行结果略】

follower.replication.throttled.rate
leader.replication.throttled.rate

【部分运行结果略】

For entity-type 'users':

【部分运行结果略】

consumer_byte_rate
producer_byte_rate

【部分运行结果略】

For entity-type 'clients':

【部分运行结果略】

consumer_byte_rate
producer_byte_rate

【部分运行结果略】
For entity-type 'ips':
【部分运行结果略】

--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given topic,
broker, or broker-logger entity
(includes static configuration when
the entity type is brokers)
--alter Alter the configuration for the entity.
--bootstrap-server <String: server to The Kafka server to connect to. This
connect to> is required for describing and
altering broker configs.
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type in
command line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker id/ip)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips)
--force Suppress console prompts
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.clientCnxnSocket,
zookeeper.ssl.cipher.suites,
zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
endpoint.identification.algorithm,
zookeeper.ssl.keystore.location,
zookeeper.ssl.keystore.password,
zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.
ssl.protocol, zookeeper.ssl.
truststore.location, zookeeper.ssl.
truststore.password, zookeeper.ssl.
truststore.type are ignored.
--zookeeper <String: urls> DEPRECATED. The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over. Required
when configuring SCRAM credentials
for users or dynamic broker configs
when the relevant broker(s) are
down. Not allowed otherwise.

我们主要需要关注的参数有:

  • --add-config,增加配置,以'k1=v1,k2=v2,k3=v3'的形式,对于不同的被操作的实体的类型,可以增加的配置也不同。
    • producer_byte_rate,生产者最大速率,单位是字节每秒
    • consumer_byte_rate,消费者最大速率,单位是字节每秒
    • follower.replication.throttled.replicasleader.replication.throttled.replicas,副本列表
    • follower.replication.throttled.rateleader.replication.throttled.rate,同步速率
  • --delete-config,删除配置,以'k1,k2,k3'的形式。
  • --entity-type,被操作的实体的类型,有topicsclientsusersbrokersbroker-loggersips
  • --entity-name,和--entity-type是配套的,被操作的实体的名称,有topic nameclient iduser principal namebroker idip
  • --entity-default,和--entity-type是配套的,表示默认的--entity-name,一般情况下,默认的含义是所有。

使用注意:

  • 可以同时制定多个实体类型,例如:
    1
    2
    3
    4
    --entity-type users \
    --entity-name user1 \
    --entity-type clients \
    --entity-name clientA
    表示操作对象是某个客户端的某个用户。
  • clientid是每个接入kafka集群的client的一个身份标志,users只有在开启了身份认证的kafka集群才有。
  • 生产者的clientid默认值为producer-自增序号
  • 消费者的clientid默认值为groupid

副本列表和同步速率的详细介绍:

Brokers配额限流

  • Follower向Leader发起复制请求,用于follower.replication.throttled.rate限流。
  • Leader向Follower返回复制响应,用于leader.replication.throttled.rate限流。
  • XXX.replication.throttled.replicas只有配合XXX.replication.throttled.rate,才能生效。
  • xxx.replication.throttled.rate,必须设置具体的brokerId,设置--entity-default无效。

例子

增加配置项

对某个topic增加配置:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter \
--entity-type topics \
--entity-name 【主题名称】 \
--add-config 'k1=v1,k2=v2,k3=v3'
  • --alter,表示要做修改操作。

对所有client增加配置:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter
--entity-type clients
--entity-default
--add-config 'k1=v1,k2=v2,k3=v3'

删除配置项

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter \
--entity-type topics \
--entity-name 【主题名称】\
--delete-config ‘k1,k2,k3’

修改配置项

修改配置项与"增加配置项"相同,对于已有的参数,会直接覆盖。

查看配置项

1
2
3
4
5
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--entity-type topics \
--entity-name 【主题名称】\
--describe

注意!查看的只是动态配置项,不是相关的所有配置。
示例代码:

1
2
3
4
5
./bin/kafka-configs.sh \ \
--bootstrap-server kafka-01:9092 \
--entity-type topics \
--entity-name first \
--describe
运行结果:
1
Dynamic configs for topic first are:

客户端配额限流

命令

通过上文的讨论,其实客户端配额限流已经很简单了,是动态配置的一个具体应用。

对生产者客户端进行限流:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--add-config 'producer_byte_rate=1048576' \
--entity-type clients \
--entity-default

对消费者客户端进行限流:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--add-config 'consumer_byte_rate=20971520' \
--entity-type clients \
--entity-default

原理过程

Kakfa处理限流的原理过程:

  • 对于生产者
    Kafka先把数据append到log文件,再计算延时,并在等待ThrottleTime时间后响应给Producer。
    如果没有客户端反馈机制,生产者可能会认为自己写入超时了,这时候会重发,写入消息会重复。
  • 对于消费者
    Kafka先计算延时时间,并在等待ThrottleTime时间后,Kafka从log读取数据并响应Consumer。
    如果消费者的QequestTimeout小于ThrottleTime,消费者在ThrottleTime时间内会不断重发fetch请求,kafka会堆积大量无效请求,占用资源。

这种通过"Sleep"进行速度控制的方法,还在一个地方有,DataX。
具体可以参考《离线异构数据同步工具DataX:2.源码概览》的"数据传输"的"限速方法"。

Brokers配额限流

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--entity-type brokers \
--entity-name 【brokerId】 \
--add-config 'leader.replication.throttled.rate=10485760'

消费者调优

调优方向有:

  • 避免触发消费者再平衡
  • 提高吞吐量

关于该部分,我们在《4.Consumer》已经讨论了。

压力测试

Kafka自带了压力测试的脚本:

  • 生产者压测:kafka-producer-perf-test.sh
  • 消费者压测:kafka-consumer-perf-test.sh

生产者压测

生产者压测:kafka-producer-perf-test.sh

查看参数

直接执行kafka-producer-perf-test.sh,即可查看参数。
示例代码:

1
./bin/kafka-producer-perf-test.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
[--producer.config CONFIG-FILE] [--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE |
--payload-file PAYLOAD-FILE)

This tool is used to verify the producer performance.

optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--payload-delimiter PAYLOAD-DELIMITER
provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided. (default: \n)
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.servers,client.id etc. These configs take precedence over those passed via --producer.config.
--producer.config CONFIG-FILE
producer config properties file.
--print-metrics print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-
id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)

either --record-size or --payload-file must be specified but not both.

--record-size RECORD-SIZE
message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
--payload-file PAYLOAD-FILE
file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending
messages. Note that you must provide exactly one of --record-size or --payload-file.

常用参数

  • --record-size
    一条信息的数据量,单位是字节。
    本次测试设置为1KB
  • num-records
    一共发送多少条信息。
    本次测试设置为100万条。
  • --throughput
    是每秒多少条信息,-1,表示不限流,尽可能快的生产数据,用这个可以测出生产者最大吞吐量。
    本次测试设置为每秒1万条。
  • --producer-props
    后面跟配置生产者相关参数,batch.sizelinger.ms等。
    我们可以通过这个,寻找生产者中的一个较优配置。

例子

示例代码:

1
2
3
4
5
6
./bin/kafka-producer-perf-test.sh \
--topic first \
--record-size 1024 \
--num-records 1000000 \
--throughput 10000 \
--producer-props bootstrap.servers=kafka-01:9092,kafka-02:9092,kafka-03:9092 batch.size=16384 linger.ms=500

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
41266 records sent, 8235.1 records/sec (8.04 MB/sec), 789.0 ms avg latency, 1091.0 ms max latency.
57287 records sent, 11450.5 records/sec (11.18 MB/sec), 402.0 ms avg latency, 1084.0 ms max latency.
51287 records sent, 10257.4 records/sec (10.02 MB/sec), 50.7 ms avg latency, 222.0 ms max latency.
40944 records sent, 8188.8 records/sec (8.00 MB/sec), 318.7 ms avg latency, 966.0 ms max latency.
51930 records sent, 10383.9 records/sec (10.14 MB/sec), 1149.5 ms avg latency, 1476.0 ms max latency.
57583 records sent, 11512.0 records/sec (11.24 MB/sec), 181.0 ms avg latency, 770.0 ms max latency.
47850 records sent, 9568.1 records/sec (9.34 MB/sec), 35.2 ms avg latency, 219.0 ms max latency.
48015 records sent, 9603.0 records/sec (9.38 MB/sec), 372.4 ms avg latency, 600.0 ms max latency.
50303 records sent, 10052.6 records/sec (9.82 MB/sec), 481.5 ms avg latency, 1015.0 ms max latency.
52117 records sent, 10421.3 records/sec (10.18 MB/sec), 273.3 ms avg latency, 647.0 ms max latency.
47205 records sent, 9435.3 records/sec (9.21 MB/sec), 355.9 ms avg latency, 543.0 ms max latency.
53550 records sent, 10707.9 records/sec (10.46 MB/sec), 229.9 ms avg latency, 483.0 ms max latency.
51242 records sent, 10248.4 records/sec (10.01 MB/sec), 29.5 ms avg latency, 133.0 ms max latency.
49613 records sent, 9922.6 records/sec (9.69 MB/sec), 39.8 ms avg latency, 138.0 ms max latency.
49287 records sent, 9857.4 records/sec (9.63 MB/sec), 102.0 ms avg latency, 317.0 ms max latency.
41748 records sent, 8349.6 records/sec (8.15 MB/sec), 292.2 ms avg latency, 956.0 ms max latency.
44730 records sent, 8946.0 records/sec (8.74 MB/sec), 1363.8 ms avg latency, 1542.0 ms max latency.
51390 records sent, 10278.0 records/sec (10.04 MB/sec), 1771.4 ms avg latency, 2096.0 ms max latency.
63375 records sent, 12675.0 records/sec (12.38 MB/sec), 600.1 ms avg latency, 1485.0 ms max latency.
47456 records sent, 9491.2 records/sec (9.27 MB/sec), 66.7 ms avg latency, 273.0 ms max latency.
1000000 records sent, 9964.923469 records/sec (9.73 MB/sec), 442.95 ms avg latency, 2096.00 ms max latency, 227 ms 50th, 1505 ms 95th, 1927 ms 99th, 2058 ms 99.9th.

消费者压测

消费者压测:kafka-consumer-perf-test.sh

查看参数

示例代码:

1
./bin/kafka-consumer-perf-test.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
Missing required option(s) [bootstrap-server]
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--consumer.config <String: config file> Consumer config properties file.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
(default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size> The amount of data to fetch in a
single request. (default: 1048576)
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest message
present in the log rather than the
earliest message.
--group <String: gid> The group id to consume on. (default:
perf-consumer-71970)
--help Print usage information.
--hide-header If set, skips printing the header for
the stats
--messages <Long: count> REQUIRED: The number of messages to
send or consume
--num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of
fetcher threads. (default: 1)
--print-metrics Print out the metrics.
--reporting-interval <Integer: Interval in milliseconds at which to
interval_ms> print progress info. (default: 5000)
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 2097152)
--threads <Integer: count> DEPRECATED AND IGNORED: Number of
processing threads. (default: 10)
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. (default: 10000)
--topic <String: topic> REQUIRED: The topic to consume from.
--version Display Kafka version.

常用参数

  • --bootstrap-server,指定Kafka集群地址。
  • --topic,指定topic的名称。
  • --messages,一共需要消费的消息条数。
    本次测试100万条。
  • --consumer.config
    消费者配置文件地址。
    我们可以通过这个,寻找消费者中的一个较优配置。

例子

示例代码:

1
2
3
4
5
./bin/kafka-consumer-perf-test.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--topic first \
--messages 1000000 \
--consumer.config ./config/consumer.properties \

运行结果:

1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-07 20:26:06:838, 2023-03-07 20:26:27:029, 976.8214, 48.3791, 1000324, 49543.0637, 2130, 18061, 54.0846, 55385.8590
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11905
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区