avatar


Kafka-5.监控、调优和避免消息丢失

Kafka-Eagle

Kafka-Eagle,监控Kafka集群的整体运行情况的工具。

环境准备

Kafka-Eagle,依赖MySQL或者sqlite,用以存储可视化展示的数据。
这里,我们采用MySQL,关于MySQL的安装,可以参考《MySQL从入门到实践:1.概述和工具准备》
这里,我们重点讨论在Kafka上要进行的修改。

关闭Kafka集群

1
./bin/kafka-server-stop.sh

修改./bin/kafka-server-start.sh

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
修改为
1
2
3
4
5
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
# export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi

解释说明:

  • Kafka默认的内存是1G,使用Kafka-Eagle,建议将内存增加到2G。
  • Kafka自身提供的监控指标可以通过JMX(Java Managent Extension)来进行获取,我们暴露一个JMX的端口号export JMX_PORT="9999"

启动Kafka集群

1
./kafka-server-start.sh -daemon ../config/server.properties

监控原理

上述的步骤完成之后,其实就可以监控Kafka了。

JConsole

JConsole,是一个图形化的Java监控和管理工具,在我们安装JDK的时候,会自带这个工具,和javajavac等处于同一个目录。

JConsole

我们在Remote Process中填入地址kafka-01:9999

在最后一个Tab页,我们可以看到需要和Kafka有关的。

许多和Kafka有关的

Java代码

除了通过JConsole,我们也可以通过编程,写Java代码的方式,获取上述指标。
示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;

public class JmxConnectionDemo {
private MBeanServerConnection conn;
private String jmxURL;
private String ipAndPort;

public JmxConnectionDemo(String ipAndPort) {
this.ipAndPort = ipAndPort;
}

public boolean init() {
jmxURL = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
try {
JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
JMXConnector connector = JMXConnectorFactory.connect(serviceURL, null);
conn = connector.getMBeanServerConnection();
if (conn == null) {
return false;
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}

public Object getObjectNameVal() {
String objName = "kafka.cluster:type=Partition,name=AtMinIsr,topic=first,partition=0";
String objAttr = "Value";
Object val = getAttribute(objName, objAttr);
return val;
}

private Object getAttribute(String objName, String objAttr) {
ObjectName objectName;
try {
objectName = new ObjectName(objName);
return conn.getAttribute(objectName, objAttr);
} catch (MalformedObjectNameException | IOException |
ReflectionException | InstanceNotFoundException |
AttributeNotFoundException | MBeanException e) {
e.printStackTrace();
}
return null;
}

public static void main(String[] args) {
JmxConnectionDemo jmxConnectionDemo = new JmxConnectionDemo("kafka-01:9999");
jmxConnectionDemo.init();
System.out.println(jmxConnectionDemo.getObjectNameVal());
}
}

getAttribute()方法的两个形参objNameobjAttr,分别对应下图的12两处。

getAttribute()

Kafka-Eagle的监控原理,就是通过JMX获取各种监控指标,并做了一个非常好的展示。

安装

通过官网下载安装包,下载完成后,上传至服务器,直接解压即可。

示例代码:

1
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

运行结果:

1
2
kafka-eagle-bin-3.0.1/
kafka-eagle-bin-3.0.1/efak-web-3.0.1-bin.tar.gz

我们需要再解压efak-web-3.0.1-bin.tar.gz

1
tar -zxvf efak-web-3.0.1-bin.tar.gz

配置

配置文件

配置文件:./conf/system-config.properties

集群配置

1
2
3
4
5
6
7
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

默认的配置文件有两个集群,我们需要修改改部分。在本文改为:

1
2
efak.zk.cluster.alias=cluster1,
cluster1.zk.list=kafka-01:2181,kafka-02:2181,kafka-03:2181/kafka
  • cluster1.zk.list,配置的Zookeeper的地址。

数据库连接配置

1
2
3
4
5
6
7
######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456

将该部分修改为我们自己的数据库连接信息。

添加环境变量

修改/etc/profile

在文件结尾添加如下内容:

1
2
3
# Kafka-Eagle
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-bin-3.0.1/efak-web-3.0.1
export PATH=$PATH:$KE_HOME/bin

刷新:source /etc/profile

如果没有添加环境变量,在启动的时候,会有如下的报错

1
2
3
[2023-03-07 16:42:15] INFO: Starting  EFAK( Eagle For Apache Kafka ) environment check ...
[2023-03-07 16:42:15] Error: The KE_HOME environment variable is not defined correctly.
[2023-03-07 16:42:15] Error: This environment variable is needed to run this program.

启动

示例代码:

1
./bin/ke.sh start

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

【部分运行结果略】

inflated: media/css/public/account/hfd.ttf
inflated: media/css/public/account/hfc.ttf
inflated: media/css/public/fonts/boxicons.woff
inflated: media/css/public/fonts/boxicons.woff2
inflated: media/css/public/fonts/boxicons.svg
inflated: media/js/public/plus/all.min.js
[2023-03-07 16:48:22] INFO: Port Progress: [##################################################] | 100%
[2023-03-07 16:48:26] INFO: Config Progress: [##################################################] | 100%
[2023-03-07 16:48:29] INFO: Startup Progress: [##################################################] | 100%
[2023-03-07 16:48:19] INFO: Status Code[0]
[2023-03-07 16:48:19] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )

Version v3.0.1 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://10.211.55.20:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
  • 需要先启动Zookeeper和Kafka。
  • Welcome, Now you can visit 'http://10.211.55.20:8048',即访问地址。
  • Account:admin ,Password:123456,即用户名和密码

监控页面

页面

具体的Kafka-Eagle的使用方法略,我们自己点一点就能明白。
还可以参考官方文档:https://docs.kafka-eagle.org

硬件的选择

服务器台数

服务器台数=2×生产者峰值生产速率×副本数100+1\text{服务器台数} = 2 \times \frac{\text{生产者峰值生产速率} \times \text{副本数}}{100} + 1

  • 生产者峰值生产速率的单位是MB/S。
  • 生产者峰值生产速率指的是总的生产者峰值生产速率。
    例如,有两个生产者,生产者A在时刻1的速率是5MB/S,在时刻2的速率是2MB/S;生产者B在时刻1没有生产,在时刻2的速率是10MB/S,那么生产者峰值生产速率是12MB/S
  • 生产者峰值生产速率×副本数100\frac{\text{生产者峰值生产速率} \times \text{副本数}}{100},如果除不尽,建议向上取整,而不是四舍五入。

例如,生产者峰值生产速率是20MB/S,副本数是2,则有:

服务器台数=2×20×2100+1=3\begin{aligned} \text{服务器台数} & = 2 \times \frac{20 \times 2}{100} + 1 & = 3 \end{aligned}

磁盘

《Kafka-3.Broker》的"高效读写"部分,我们讨论过,Kafka写数据时候,是顺序写磁盘。

所以,固态硬盘和机械硬盘速度其实差不多。

如果Kafka集群出现了性能问题,除非是机械硬盘有故障,否则将机械硬盘换成固态硬盘,不会有太大的性能提升。

内存

Kafka内存分为两部分:

  • 堆内存
    由Kafka进行配置。
  • 页缓存

堆内存

修改方法

./bin/kafka-server-start.sh中修改

1
2
3
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

判断方法

查看Kafka进程号

示例代码:

1
jps
运行结果:
1
2
3
4
736799 KafkaEagle
875881 Jps
621199 Kafka
13884 QuorumPeerMain

查看使用情况
命令:jmap -heap 【Kafka进程号】

示例代码:

1
jmap -heap 621199
运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Attaching to process ID 621199, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.361-b09

using thread-local object allocation.
Garbage-First (G1) GC with 8 thread(s)

Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 2147483648 (2048.0MB)
NewSize = 1363144 (1.2999954223632812MB)
MaxNewSize = 1287651328 (1228.0MB)
OldSize = 5452592 (5.1999969482421875MB)
NewRatio = 2
SurvivorRatio = 8
MetaspaceSize = 21807104 (20.796875MB)
CompressedClassSpaceSize = 1073741824 (1024.0MB)
MaxMetaspaceSize = 17592186044415 MB
G1HeapRegionSize = 1048576 (1.0MB)

Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 169624064 (161.76611328125MB)
free = 1977859584 (1886.23388671875MB)
7.898736000061035% used
G1 Young Generation:
Eden Space:
regions = 7
capacity = 103809024 (99.0MB)
used = 7340032 (7.0MB)
free = 96468992 (92.0MB)
7.070707070707071% used
Survivor Space:
regions = 9
capacity = 9437184 (9.0MB)
used = 9437184 (9.0MB)
free = 0 (0.0MB)
100.0% used
G1 Old Generation:
regions = 147
capacity = 2034237440 (1940.0MB)
used = 152846848 (145.76611328125MB)
free = 1881390592 (1794.23388671875MB)
7.513717179445877% used

13714 interned Strings occupying 1475928 bytes.

主要关注如下部分:

1
2
3
4
5
6
7
Heap Usage:
G1 Heap:
regions = 2048
capacity = 2147483648 (2048.0MB)
used = 169624064 (161.76611328125MB)
free = 1977859584 (1886.23388671875MB)
7.898736000061035% used

我们看到使用率只有7.89%,一般使用率超过了34\frac{3}{4},需要关注,增大堆内存。

另一个关注点:查看GC情况

命令:jstat -gc 【Kafka进程号】 1s 10

示例代码:

1
jstat -gc 621199 1s 10
运行结果:
1
2
3
4
5
6
7
8
9
10
11
S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT     GCT   
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869
0.0 9216.0 0.0 9216.0 101376.0 26624.0 1986560.0 149264.5 51200.0 46317.2 6144.0 5578.8 14 6.869 0 0.000 6.869

主要关注YGC(年轻代垃圾回收次数)。
在例子中,次数没有超过100,完全不用关注。

页缓存

页缓存是Linux系统服务器的内存。

每个节点页缓存大小=集群的总分区数×0.25节点数\text{每个节点页缓存大小} = \frac{\text{集群的总分区数} \times 0.25}{\text{节点数}}

  • 单位是GB。
  • 计算结果进行四舍五入。
  • 集群的总分区数,特指Leader分区,并且不包含系统分区。
  • 0.250.25是一个经验值,一个分区的数据全都读进内存,当然是最好的,但是一般没有必要(而且不一定一个分区的数就是1G)。

例如,集群中一共有10个Leader分区(不含系统主题的分区),3个节点,则:

每个节点页缓存大小=10×0.253=1\begin{aligned} \text{每个节点页缓存大小} & = \frac{10 \times 0.25}{3} & = 1 \end{aligned}

为什么不考虑系统分区

根据我们在之前章节的讨论,我们知道Kafka中有默认有100个系统分区(50个__consumer_offsets和50个__transaction_state)。

我查阅了许多资料,这些资料虽然在计算页缓存方面存在差异,但是都没有考虑到系统分区数。
而且,都没有解释不考虑系统分区的原因,甚至都没有提不考虑系统,只是在举例子的时候,体现了不考虑系统分区。

只能理解为,整个计算公式,其实就是经验,甚至是一种惯例。

CPU

Kafka是I/O密集型而非计算密集型的框架,所以对CPU的需求并不高,消耗CPU的点主要在于消息的压缩和解压缩。
此外,一个Broker节点往往要承载许多个TopicPartition并与许多个Producer/Consumer交互,所以并行度(核心/线程数)要比单核性能(频率)更重要。

注意根据CPU规格的不同,Broker如下三个参数要修改

  • num.io.threads
    负责写磁盘的线程数,整个参数值要占总核数的50%。
    默认8。
  • num.replica.fetchers
    副本拉取线程数,这个参数占总核数的50%的13\frac{1}{3}
    默认1。
  • num.network.threads
    数据传输线程数,这个参数占总核数的50%的23\frac{2}{3}
    默认3。

有些资料建议32核的CPU,我的观点是Kafka是I/O密集型而非计算密集型的框架,不需要那么大的CPU,具体我们可以根据生产环境的实际负载进行调整,通过top命令可以查看。

网络

网络带宽 = 峰值吞吐量

需要注意的是:

  • 网络带宽的但是为Mbps,b的含义是比特(bit);
  • 峰值吞吐量,可能用MB/s表示,B的含义是字节(Byte),1Byte=8bit。

生产者调优

调优方向有:

  • 提高吞吐量
  • 消息不丢失
  • 消息不重复
  • 消息有序

关于该部分,我们在《Kafka-2.Producer》已经讨论了。

Broker调优

调优方向

调优方向有:

  • 新增节点
  • 调整分区副本
  • Leader分区自动平衡
  • 关闭自动创建主题
  • 动态配置

除了"关闭自动创建主题"和"动态配置",我们没有讨论。其他的,我们在《Kafka-3.Broker》已经讨论了。

关闭自动创建主题

参数auto.create.topics.enable,自动创建副本。
当生产者向一个未创建的主题发送消息时,或者当一个消费者开始从未创建的主题中读取消息时,Kafka会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。
这种创建主题的方式是非预期的,增加了主题管理和维护的难度。
生产环境建议将该参数设置为false

动态配置

操作方法

参数

示例代码:

1
./bin/kafka-configs.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
--add-config <String>                  Key Value pairs of configs to add.     
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics':

【部分运行结果略】

follower.replication.throttled.replicas
leader.replication.throttled.replicas

【部分运行结果略】

For entity-type 'brokers':

【部分运行结果略】

follower.replication.throttled.rate
leader.replication.throttled.rate

【部分运行结果略】

For entity-type 'users':

【部分运行结果略】

consumer_byte_rate
producer_byte_rate

【部分运行结果略】

For entity-type 'clients':

【部分运行结果略】

consumer_byte_rate
producer_byte_rate

【部分运行结果略】
For entity-type 'ips':
【部分运行结果略】

--add-config-file <String> Path to a properties file with configs
to add. See add-config for a list of
valid configurations.
--all List all configs for the given topic,
broker, or broker-logger entity
(includes static configuration when
the entity type is brokers)
--alter Alter the configuration for the entity.
--bootstrap-server <String: server to The Kafka server to connect to. This
connect to> is required for describing and
altering broker configs.
--broker <String> The broker's ID.
--broker-defaults The config defaults for all brokers.
--broker-logger <String> The broker's ID for its logger config.
--client <String> The client's ID.
--client-defaults The config defaults for all clients.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2'
--describe List configs for the given entity.
--entity-default Default entity name for
clients/users/brokers/ips (applies
to corresponding entity type in
command line)
--entity-name <String> Name of entity (topic name/client
id/user principal name/broker id/ip)
--entity-type <String> Type of entity
(topics/clients/users/brokers/broker-
loggers/ips)
--force Suppress console prompts
--help Print usage information.
--ip <String> The IP address.
--ip-defaults The config defaults for all IPs.
--topic <String> The topic's name.
--user <String> The user's principal name.
--user-defaults The config defaults for all users.
--version Display Kafka version.
--zk-tls-config-file <String: Identifies the file where ZooKeeper
ZooKeeper TLS configuration> client TLS connectivity properties
are defined. Any properties other
than zookeeper.clientCnxnSocket,
zookeeper.ssl.cipher.suites,
zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.
ssl.enabled.protocols, zookeeper.ssl.
endpoint.identification.algorithm,
zookeeper.ssl.keystore.location,
zookeeper.ssl.keystore.password,
zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.
ssl.protocol, zookeeper.ssl.
truststore.location, zookeeper.ssl.
truststore.password, zookeeper.ssl.
truststore.type are ignored.
--zookeeper <String: urls> DEPRECATED. The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over. Required
when configuring SCRAM credentials
for users or dynamic broker configs
when the relevant broker(s) are
down. Not allowed otherwise.

我们主要需要关注的参数有:

  • --add-config,增加配置,以'k1=v1,k2=v2,k3=v3'的形式,对于不同的被操作的实体的类型,可以增加的配置也不同。
    • producer_byte_rate,生产者最大速率,单位是字节每秒
    • consumer_byte_rate,消费者最大速率,单位是字节每秒
    • follower.replication.throttled.replicasleader.replication.throttled.replicas,副本列表
    • follower.replication.throttled.rateleader.replication.throttled.rate,同步速率
  • --delete-config,删除配置,以'k1,k2,k3'的形式。
  • --entity-type,被操作的实体的类型,有topicsclientsusersbrokersbroker-loggersips
  • --entity-name,和--entity-type是配套的,被操作的实体的名称,有topic nameclient iduser principal namebroker idip
  • --entity-default,和--entity-type是配套的,表示默认的--entity-name,一般情况下,默认的含义是所有。

使用注意:

  • 可以同时制定多个实体类型,例如:
    1
    2
    3
    4
    --entity-type users \
    --entity-name user1 \
    --entity-type clients \
    --entity-name clientA
    表示操作对象是某个客户端的某个用户。
  • clientid是每个接入kafka集群的client的一个身份标志,users只有在开启了身份认证的kafka集群才有。
  • 生产者的clientid默认值为producer-自增序号
  • 消费者的clientid默认值为groupid

副本列表和同步速率的详细介绍:

Brokers配额限流

  • Follower向Leader发起复制请求,用于follower.replication.throttled.rate限流。
  • Leader向Follower返回复制响应,用于leader.replication.throttled.rate限流。
  • XXX.replication.throttled.replicas只有配合XXX.replication.throttled.rate,才能生效。
  • xxx.replication.throttled.rate,必须设置具体的brokerId,设置--entity-default无效。

例子

增加配置项

对某个topic增加配置:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter \
--entity-type topics \
--entity-name 【主题名称】 \
--add-config 'k1=v1,k2=v2,k3=v3'
  • --alter,表示要做修改操作。

对所有client增加配置:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter
--entity-type clients
--entity-default
--add-config 'k1=v1,k2=v2,k3=v3'

删除配置项

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--alter \
--entity-type topics \
--entity-name 【主题名称】\
--delete-config ‘k1,k2,k3’

修改配置项

修改配置项与"增加配置项"相同,对于已有的参数,会直接覆盖。

查看配置项

1
2
3
4
5
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092 \
--entity-type topics \
--entity-name 【主题名称】\
--describe

注意!查看的只是动态配置项,不是相关的所有配置。
示例代码:

1
2
3
4
5
./bin/kafka-configs.sh \ \
--bootstrap-server kafka-01:9092 \
--entity-type topics \
--entity-name first \
--describe
运行结果:
1
Dynamic configs for topic first are:

客户端配额限流

命令

通过上文的讨论,其实客户端配额限流已经很简单了,是动态配置的一个具体应用。

对生产者客户端进行限流:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--add-config 'producer_byte_rate=1048576' \
--entity-type clients \
--entity-default

对消费者客户端进行限流:

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--add-config 'consumer_byte_rate=20971520' \
--entity-type clients \
--entity-default

原理过程

Kakfa处理限流的原理过程:

  • 对于生产者
    Kafka先把数据append到log文件,再计算延时,并在等待ThrottleTime时间后响应给Producer。
    如果没有客户端反馈机制,生产者可能会认为自己写入超时了,这时候会重发,写入消息会重复。
  • 对于消费者
    Kafka先计算延时时间,并在等待ThrottleTime时间后,Kafka从log读取数据并响应Consumer。
    如果消费者的QequestTimeout小于ThrottleTime,消费者在ThrottleTime时间内会不断重发fetch请求,kafka会堆积大量无效请求,占用资源。

这种通过"Sleep"进行速度控制的方法,还在一个地方有,DataX。
具体可以参考《离线异构数据同步工具DataX:2.源码概览》的"数据传输"的"限速方法"。

Brokers配额限流

1
2
3
4
5
6
./bin/kafka-configs.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--alter \
--entity-type brokers \
--entity-name 【brokerId】 \
--add-config 'leader.replication.throttled.rate=10485760'

消费者调优

调优方向有:

  • 避免触发消费者再平衡
  • 提高吞吐量

关于该部分,我们在《Kafka-4.Consumer》已经讨论了。

压力测试

Kafka自带了压力测试的脚本:

  • 生产者压测:kafka-producer-perf-test.sh
  • 消费者压测:kafka-consumer-perf-test.sh

生产者压测

生产者压测:kafka-producer-perf-test.sh

查看参数

直接执行kafka-producer-perf-test.sh,即可查看参数。
示例代码:

1
./bin/kafka-producer-perf-test.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER] --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
[--producer.config CONFIG-FILE] [--print-metrics] [--transactional-id TRANSACTIONAL-ID] [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE |
--payload-file PAYLOAD-FILE)

This tool is used to verify the producer performance.

optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--payload-delimiter PAYLOAD-DELIMITER
provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this parameter will be ignored if --payload-file is not provided. (default: \n)
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.servers,client.id etc. These configs take precedence over those passed via --producer.config.
--producer.config CONFIG-FILE
producer config properties file.
--print-metrics print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of concurrent transactions. (default: performance-producer-default-transactional-
id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive. (default: 0)

either --record-size or --payload-file must be specified but not both.

--record-size RECORD-SIZE
message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
--payload-file PAYLOAD-FILE
file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be read from this file and a payload will be randomly selected when sending
messages. Note that you must provide exactly one of --record-size or --payload-file.

常用参数

  • --record-size
    一条信息的数据量,单位是字节。
    本次测试设置为1KB
  • num-records
    一共发送多少条信息。
    本次测试设置为100万条。
  • --throughput
    是每秒多少条信息,-1,表示不限流,尽可能快的生产数据,用这个可以测出生产者最大吞吐量。
    本次测试设置为每秒1万条。
  • --producer-props
    后面跟配置生产者相关参数,batch.sizelinger.ms等。
    我们可以通过这个,寻找生产者中的一个较优配置。

例子

示例代码:

1
2
3
4
5
6
./bin/kafka-producer-perf-test.sh \
--topic first \
--record-size 1024 \
--num-records 1000000 \
--throughput 10000 \
--producer-props bootstrap.servers=kafka-01:9092,kafka-02:9092,kafka-03:9092 batch.size=16384 linger.ms=500

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
41266 records sent, 8235.1 records/sec (8.04 MB/sec), 789.0 ms avg latency, 1091.0 ms max latency.
57287 records sent, 11450.5 records/sec (11.18 MB/sec), 402.0 ms avg latency, 1084.0 ms max latency.
51287 records sent, 10257.4 records/sec (10.02 MB/sec), 50.7 ms avg latency, 222.0 ms max latency.
40944 records sent, 8188.8 records/sec (8.00 MB/sec), 318.7 ms avg latency, 966.0 ms max latency.
51930 records sent, 10383.9 records/sec (10.14 MB/sec), 1149.5 ms avg latency, 1476.0 ms max latency.
57583 records sent, 11512.0 records/sec (11.24 MB/sec), 181.0 ms avg latency, 770.0 ms max latency.
47850 records sent, 9568.1 records/sec (9.34 MB/sec), 35.2 ms avg latency, 219.0 ms max latency.
48015 records sent, 9603.0 records/sec (9.38 MB/sec), 372.4 ms avg latency, 600.0 ms max latency.
50303 records sent, 10052.6 records/sec (9.82 MB/sec), 481.5 ms avg latency, 1015.0 ms max latency.
52117 records sent, 10421.3 records/sec (10.18 MB/sec), 273.3 ms avg latency, 647.0 ms max latency.
47205 records sent, 9435.3 records/sec (9.21 MB/sec), 355.9 ms avg latency, 543.0 ms max latency.
53550 records sent, 10707.9 records/sec (10.46 MB/sec), 229.9 ms avg latency, 483.0 ms max latency.
51242 records sent, 10248.4 records/sec (10.01 MB/sec), 29.5 ms avg latency, 133.0 ms max latency.
49613 records sent, 9922.6 records/sec (9.69 MB/sec), 39.8 ms avg latency, 138.0 ms max latency.
49287 records sent, 9857.4 records/sec (9.63 MB/sec), 102.0 ms avg latency, 317.0 ms max latency.
41748 records sent, 8349.6 records/sec (8.15 MB/sec), 292.2 ms avg latency, 956.0 ms max latency.
44730 records sent, 8946.0 records/sec (8.74 MB/sec), 1363.8 ms avg latency, 1542.0 ms max latency.
51390 records sent, 10278.0 records/sec (10.04 MB/sec), 1771.4 ms avg latency, 2096.0 ms max latency.
63375 records sent, 12675.0 records/sec (12.38 MB/sec), 600.1 ms avg latency, 1485.0 ms max latency.
47456 records sent, 9491.2 records/sec (9.27 MB/sec), 66.7 ms avg latency, 273.0 ms max latency.
1000000 records sent, 9964.923469 records/sec (9.73 MB/sec), 442.95 ms avg latency, 2096.00 ms max latency, 227 ms 50th, 1505 ms 95th, 1927 ms 99th, 2058 ms 99.9th.

消费者压测

消费者压测:kafka-consumer-perf-test.sh

查看参数

示例代码:

1
./bin/kafka-consumer-perf-test.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
Missing required option(s) [bootstrap-server]
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--consumer.config <String: config file> Consumer config properties file.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
(default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size> The amount of data to fetch in a
single request. (default: 1048576)
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest message
present in the log rather than the
earliest message.
--group <String: gid> The group id to consume on. (default:
perf-consumer-71970)
--help Print usage information.
--hide-header If set, skips printing the header for
the stats
--messages <Long: count> REQUIRED: The number of messages to
send or consume
--num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of
fetcher threads. (default: 1)
--print-metrics Print out the metrics.
--reporting-interval <Integer: Interval in milliseconds at which to
interval_ms> print progress info. (default: 5000)
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 2097152)
--threads <Integer: count> DEPRECATED AND IGNORED: Number of
processing threads. (default: 10)
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. (default: 10000)
--topic <String: topic> REQUIRED: The topic to consume from.
--version Display Kafka version.

常用参数

  • --bootstrap-server,指定Kafka集群地址。
  • --topic,指定topic的名称。
  • --messages,一共需要消费的消息条数。
    本次测试100万条。
  • --consumer.config
    消费者配置文件地址。
    我们可以通过这个,寻找消费者中的一个较优配置。

例子

示例代码:

1
2
3
4
5
./bin/kafka-consumer-perf-test.sh \
--bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 \
--topic first \
--messages 1000000 \
--consumer.config ./config/consumer.properties \

运行结果:

1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-03-07 20:26:06:838, 2023-03-07 20:26:27:029, 976.8214, 48.3791, 1000324, 49543.0637, 2130, 18061, 54.0846, 55385.8590

避免消息丢失

主要从Producer、Broker、Consumer等3个方面分析了Kafka应该如何配置才能避免消息丢失。

概述

在使用MQ的时候最大的问题就是消息丢失,常见的丢失情况如下:

  1. Producer 端丢失
  2. Broker 端丢失
  3. Consumer 端丢失

一条消息从生产到消费一共要经过以下3个流程:

  1. Producer 发送到 Broker
  2. Broker 保存消息(持久化)
  3. Consumer 消费消息

3个步骤分别对应了上述的3种消息丢失场景。

消息持久化保障

Kafka只对"已提交"的消息(committed message)做有限度的持久化保证,其他MQ也类似。

第一个核心要素是已提交的消息。
什么是已提交的消息?当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在Kafka看来就正式变为"已提交"消息了。
为什么是若干个Broker呢?这取决于我们对"已提交"的定义。
我们可以选择只要有一个Broker成功保存该消息就算是已提交,也可以是令所有Broker都成功保存该消息才算是已提交。
不论哪种情况,Kafka只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是有限度的持久化保证。
也就是说Kafka不可能保证在任何情况下都做到不丢失消息。
有限度其实就是说Kafka不丢消息是有前提条件的,假如我们的消息保存在N个Kafka-Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。
只要这个条件成立,Kafka就能保证你的这条消息永远不会丢失。

具体场景分析

Producer端丢失

Producer端丢消息更多是因为消息根本没有提交到Kafka。

目前Kafka-Producer是异步发送消息的,也就是说如果我们调用的是producer.send(msg)这个API,会立即返回,但我们不能认为消息发送已成功完成。
这种发送方式有个有趣的名字,叫"fire and forget",翻译一下就是"发射后不管"。如果出现消息丢失,我们是无法知晓的。这个发送方式挺不靠谱,非常不建议使用。

导致消息没有发送成功的因素也有很多:网络抖动,导致消息压根就没有发送到Broker端;消息本身不合格导致Broker拒绝接收(比如消息太大了,超过了Broker的承受能力)等。

解决方案也很简单:Producer永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用producer.send(msg, callback)

通过回调,一旦出现消息提交失败的情况,我们可以有针对性地进行处理。

如果是因为那些瞬时错误,那么仅仅让Producer重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。

Broker端丢失

Broker丢失消息是由Kafka自身原因造成的。

Kafka为了提高吞吐量和性能,采用异步批量的刷盘策略,也就是按照一定的消息量和间隔时间进行刷盘。Kafka收到消息后会先存储在也缓存中(Page Cache)中,之后由操作系统根据自己的策略进行刷盘或者通过fsync命令强制刷盘。如果系统挂掉,在PageCache中的数据就会丢失。

Kafka没有提供同步刷盘的方式,也就是说单个Broker丢失消息是必定会出现的。

为了解决单个Broker数据丢失问题,Kafka通过Producer和Broker协同处理单个Broker丢失参数的情况:

  • acks=0,Producer不等待Broker的响应,效率最高,但是消息很可能会丢。
  • acks=1,Leader-Broker收到消息后,不等待其他Follower的响应,即返回ack,也可以理解为ack数为1。此时,如果Follower还没有收到Leader同步的消息,Leader 就挂了,那么消息会丢失。
  • acks=-1(-1等效于all),Leader-Broker收到消息后,挂起,等待所有ISR列表中的Follower返回结果后,再返回ack。这种配置下,如果Leader刚收到消息就断电,Producer可以知道消息没有被发送成功,将会重新发送。如果在Follower收到数据以后,成功返回ack,Leader断电,数据将存在于原来的Follower中。在重新选举以后,新的Leader会持有该部分数据。
    在配置为all或者-1的时候,只要Producer收到Broker的响应就可以理解为消息已经持久化了。虽然可能只是刚写入了PageCache,但是刷盘也就是迟早的事,除非刚好刷盘之前多个Broker同时挂了,那确实是没办法了。

建议根据实际情况设置:

  • 如果要严格保证消息不丢失,请设置为all或-1;
  • 如果允许存在丢失,建议设置为1;
  • 一般不建议设为0,除非无所谓消息丢不丢失。

Consumer端丢失

Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。

出现该情况的唯一原因就是:Consumer没有正确消费消息,就把位移提交了,导致Kafka认为该消息已经被消费了,从而导致消息丢失。

例如:

  1. 获取到消息后直接提交位移了,然后再处理消息。
    这样在提交位移后,处理完消息前,如果程序挂掉,这部分消息就算是丢失了。
  2. 多线程并发消费消息,且开启了自动提交,导致消费完成之前程序就自动提交了位移,如果程序挂掉也会出现消息丢失。

解决方案:确定消费完成后才提交消息,如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移。

最佳实践

避免Producer端丢失

  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  • 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

避免Broker端丢失

  • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  • 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

避免Consumer端丢失

  • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11905
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板