avatar


Kafka-2.Producer

整体架构

发送原理

整个生产者客户端由两个线程协调运行。

  • 主线程
    在主线程中由KafkaProducer创建消息,然后通过拦截器(或有)、序列化器和分区器的作用之后缓存到RecordAccumulator(消息累加器,也称为消息收集器)中。
  • Sender线程(发送线程)
    Sender线程负责从RecordAccumulator获取消息并将其发送到Kafka中。

Kafka中的拦截器和我们在《基于Java的后端开发入门:17.SpringMVC》讨论的拦截器,作用是一样的,都是对数据进行一些加工操作,也都是一个可选项。
(在实际生产中,很少用这个拦截器。)

Kafka有专门的序列化器,没有用Java的序列化器(java.io.Serializable),因为Java的序列化器相对太重了。
(其实不仅仅是Kafka,在Hadoop和Spark中,也都有专门的序列化器。)

分区器会根据一定的规则,将消息发到对应的分区,一个分区创建一个队列。

RecordAccumulator主要用来缓存消息,以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

RecordAccumulator的缓存的大小可以通过生产者客户端参数buffer.memory配置,默认值为32MB。

如果生产者发送消息到RecordAccumulator速度持续的超过发送到服务器的速度,可能会导致生产者空间不足,这个时候KafkaProducer的send()方法会被阻塞或抛出异常。如果超过了最大阻塞时间则抛异常,最大阻塞时间由参数max.block.ms配置,默认值为60000,即60秒。

RecordAccumulator的内部为每个分区都维护了一个双端队列(Deque),主线程发送过来的消息都会被追加到某个双端队列的尾部,Sender会从双端队列的头部读取消息。队列中的内容是ProducerBatch,即Deque<ProducerBatch>

注意,是ProducerBatch不是ProducerRecord,ProducerBatch中包含一个或多个ProducerRecord。
ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中。

如果生产者客户端需要向很多分区发送消息,可以修改buffer.memory参数,适当调大以增加整体的吞吐量。

在Kafka生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。

不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。

但是BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16KB,我们可以适当地调大batch.size参数,以便多缓存一些消息。

ProducerRecord的大小和batch.size密切相关,batch.size的默认值为16KB。
(注意,是密切相关,不是说每一个ProducerRecord的大小都是batch.size,可能会大于batch.size。)

当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。

在新建ProducerBatch时评估这条消息的大小是否超过batch.size参数的大小,如果不超过,那么就以batch.size参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender线程会创建一个Sender对象,把RecordAccumulator中的消息发往Kafka。
具体会根据两个规则,当数据积累到batch.size或者已经等待了linger.ms设置的时间。

linger.ms的时间默认是0ms。
所以,默认情况下,每生产一条消息,就立即发送一条消息。
这样效率其实并不高,在实际生产中,这个值一般都会调整。

Sender对象从RecordAccumulator中获取缓存的消息之后,会将原本<分区,Deque<ProducerBatch>>的保存形式转变成<Node, List<ProducerBatch>>的形式,其中Node表示Kafka集群的broker节点。

对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区;但是对于KafkaProducer的应用逻辑而言,只关注向哪个分区中发送哪些消息。
所以在这里需要做一个应用逻辑层面到网络IO层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后,Sender线程还将其进一步封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是ProduceRequest。

请求在从Sender线程发往Kafka之前还会保存到InFlightRequests(飞行中的请求)中,InFlightRequests保存对象的具体形式为Map<NodeId, Deque<Request>>,主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId是一个String类型,表示节点的ID)。

InFlightRequests提供了许多管理类的方法,并且通过配置参数max.in.flight.requests.per.connection还可以限制每个连接(客户端与Node之间的连接)最多缓存的请求数,max.in.flight.requests.per.connection的默认值是5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连按发送更多的请求了,除非有缓存的请求收到了响应(Response)。

InFlightRequests的作用仅仅是缓存那些已经发出去但还没有收到响应的请求。
最后,发送还需要Selector对象。
Selector将消息发送给Kafka,Kafka收到消息后,有一个ACK应答机制,指定分区中必须要有多少个副本收到这条消息。

  • 0:生产者发送过来的消息,不需要等落盘即应答。
  • 1:生产者发送过来的消息,Leader收到后应答。
  • -1(all):生产者发送过来的消息,Leader和ISR队列里面的所有副本收到后应答。

如果Selector收到了成功,生产者会清除InFlightRequestsRecordAccumulator中的消息。
如果Selector收到了失败,会进行重试,重试的次数,默认是int的最大值,即"死磕"。

重要参数

bootstrap.servers

生产者连接集群所需的broker地址清单,可以设置一个或者多个,中间用逗号隔开。

例如:

1
node1:9092,node2:9092,node3:9092

注意这里不需要所有的broker地址,生产者会从给定的broker信息查找到其他broker信息。

key.serializervalue.serializer

指定发送消息的 key 和 value 的序列化类型。一定要写全类名。

buffer.memory

RecordAccumulator缓冲区总大小,默认32MB。

batch.size

默认16KB。

有些资料说这个是ProducerBatch的最大值,其实是不准确的。根据我们上文关于生产者整体的架构的讨论,我们知道ProducerBatch的大小可能会超过这个值。

我们可以适当增加batch.size的值,以提高吞吐量。注意,是适当,如果设置太大,可能会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,Sender对象在等待linger.ms之后就会发送数据,单位ms,默认值是0ms,没有延迟。

acks

  • 0:生产者发送过来的消息,不需要等落盘即应答。
  • 1:生产者发送过来的消息,Leader收到后应答。
  • -1(all):生产者发送过来的消息,Leader和ISR队列里面的所有副本收到后应答。

上文,我们说的是"ACK",这里说的又是acks
一般,这种应答机制,称之为ACK机制,但是具体的参数是acks

max.in.flight.requests.per.connection

默认值是5,即每个连接最多只能缓存5个未响应的请求

retries

如果Selector收到了失败,会进行重试,重试的次数由该参数设置。默认是int的最大值,2147483647。

有些资料说,如果设置了重试,但想保证消息的有序性,需要设置max.in.flight.requests.per.connection=1,否则在重试此失败消息的时候,其他的消息可能发送成功了,导致消息的顺序错乱。
这个确实很有道理,但是不够完整。
在本文"消息有序"部分,会讨论。

retry.backoff.ms

两次重试之间的时间间隔,默认是 100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。
支持压缩类型有:nonegzipsnappylz4zstd

发送消息

普通异步发送

pom.xml

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.2</version>
</dependency>

MyProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer {
public static void main(String[] args) {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("myTopic","myMessage " + i));
}

// 关闭资源
kafkaProducer.close();
}
}
运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20:40:25.993 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
acks = -1
batch.size = 16384
bootstrap.servers = [10.211.55.14:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips

【部分运行结果略】

20:40:26.421 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - [Producer clientId=producer-1] Assigned producerId 2029 and producerEpoch 0 to batch with base sequence 0 being sent to partition myTopic-2
20:40:26.426 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4) and timeout 30000 to node 0: {acks=-1,timeout=30000,partitionSizes=[myTopic-2=151]}
20:40:26.427 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 0 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=4): ProduceResponseData(responses=[TopicProduceResponse(name='myTopic', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=5486, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
20:40:26.430 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId: 2029; Set last ack'd sequence number for topic-partition myTopic-2 to 4
20:40:26.436 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
20:40:26.436 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
20:40:26.436 [main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
20:40:26.436 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
20:40:26.437 [main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
20:40:26.437 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer has been closed

logback.xml

我们看到,打印了很多日志,而且有些日志还是DEBUG级别的。
如果不想打印那么多日志,在resource目录下添加logback.xml配置文件,内容如下:

1
2
3
4
5
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="1 seconds">
<logger name="org.apache.kafka.clients" level="ERROR" />
<logger name="org.apache.kafka.server" level="ERROR" />
</configuration>

通过提高日志级别,不打印DEGUB日志。
关于日志级别,可以参考《基于Java的后端开发入门:21.SpringBoot [1/3]》

带回调函数的异步发送

发送方法

利用(.send(ProducerRecord, new Callback())进行发送。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer {
public static void main(String[] args) {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 调用send方法,发送消息
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("myTopic","myMessage " + i),new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
// 没有异常,输出信息到控制台
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition()); }
else {
// 出现异常打印
e.printStackTrace();
}
}
});
}

// 关闭资源
kafkaProducer.close();
}
}

在上文我们还讨论了Selector的重试。这个和我们的回调函数是独立的。
重试机制依旧按照设定来,不会因为我们用了回调方法,就没有重试了,即我们不需要在回调函数中手动重试。

回调机制

这里其实有一个问题,什么情况下进行回调?
有些资料说,发送到了RecordAccumulator,就会进行回调。
其实,根据源码的注释和官方文档的描述,都是需要Kafka服务器响应后进行回调。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package org.apache.kafka.clients.producer;

/**
* A callback interface that the user can implement to allow code to execute when the request is complete. This callback
* will generally execute in the background I/O thread so it should be fast.
*/
public interface Callback {

/**
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
* be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
* metadata will contain the special -1 value for all fields. If topicPartition cannot be
* choosen, a -1 value will be assigned.
*
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
* with -1 value for all fields will be returned if an error occurred.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* Possible thrown exceptions include:
*
* Non-Retriable exceptions (fatal, the message will never be sent):
*
* InvalidTopicException
* OffsetMetadataTooLargeException
* RecordBatchTooLargeException
* RecordTooLargeException
* UnknownServerException
* UnknownProducerIdException
* InvalidProducerEpochException
*
* Retriable exceptions (transient, may be covered by increasing #.retries):
*
* CorruptRecordException
* InvalidMetadataException
* NotEnoughReplicasAfterAppendException
* NotEnoughReplicasException
* OffsetOutOfRangeException
* TimeoutException
* UnknownTopicOrPartitionException
*/
void onCompletion(RecordMetadata metadata, Exception exception);
}
  • This method will be called when the record sent to the server has been acknowledged.

官方文档:
Kafka响应后进行回调

官方文档的地址:https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord,org.apache.kafka.clients.producer.Callback)

同步发送

只需在普通异步发送的基础上,再调用一下get()方法即可。
这样底层就会利用同步方法。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
// 同步发送
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("myTopic","myMessage " + i)).get();
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}

// 关闭资源
kafkaProducer.close();
}
}

分区器

分区源码解读

现象

假设现在存在一个myTopic,有三个分区。

示例代码:

1
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic myTopic

运行结果:

1
2
3
4
Topic: myTopic  TopicId: YxuLKXgKQMemEaqfyAPcpg PartitionCount: 3       ReplicationFactor: 1    Configs: 
Topic: myTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: myTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0

我们向其中发送10条消息,示例代码:

1
2
3
4
5
// 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("myTopic", "myMessage" + i)).get();
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}

运行结果:

1
2
3
4
5
6
7
8
9
10
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2
主题: myTopic->分区:2

看起来不是随机的。

如果我们指定key呢?示例代码:

1
2
3
4
5
// 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("myTopic", UUID.randomUUID().toString(),"myMessage" + i)).get();
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}

运行结果:

1
2
3
4
5
6
7
8
9
10
主题: myTopic->分区:0
主题: myTopic->分区:1
主题: myTopic->分区:1
主题: myTopic->分区:2
主题: myTopic->分区:1
主题: myTopic->分区:0
主题: myTopic->分区:1
主题: myTopic->分区:1
主题: myTopic->分区:2
主题: myTopic->分区:2

这个看起来就随机了。

源码

我们翻阅源码,查看原因。

doSend

点进kafkaProducersend方法:

1
2
3
4
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}

再点进send方法:

1
2
3
4
5
6
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

点进doSend方法,该方法的代码很多,本文只截取和获取分区号有关的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

【部分代码略】

try {

【部分代码略】

byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}

【部分代码略】

// Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
// which means that the RecordAccumulator would pick a partition using built-in logic (which may
// take into account broker load, the amount of data produced to each partition, etc.).
int partition = partition(record, serializedKey, serializedValue, cluster);

【部分代码略】

RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey, serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
if (result.abortForNewBatch) {
int prevPartition = partition;
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
}

【部分代码略】

} catch (ApiException e) {

【部分代码略】

} catch (InterruptedException e) {

【部分代码略】

} catch (KafkaException e) {

【部分代码略】

} catch (Exception e) {

【部分代码略】

}
}

serializedKey

点进keySerializer.serializerecord.key()就是方法的参数T data

1
2
3
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}

在点进serialize,是一个接口的方法,record.key()就是方法的参数T data

1
2
3
4
5
6
7
8
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
byte[] serialize(String topic, T data);

我们以其实现类StringSerializerserialize方法为例。

1
2
3
4
5
6
7
8
9
10
11
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}

该方法的逻辑是:

  • 如果record.key()是空,就返回null
  • 否则,返回其Bytes

即:如果我们没填keyserializedKey就是空;填了就不是空。

partition

再点进partition方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* if custom partitioner is specified, call it to compute partition
* otherwise try to calculate partition based on key.
* If there is no key or key should be ignored return
* RecordMetadata.UNKNOWN_PARTITION to indicate any partition
* can be used (the partition is then calculated by built-in
* partitioning logic).
*/
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
if (record.partition() != null)
return record.partition();

if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
"The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
}
return customPartition;
}

if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}

如果指定了分区,就按指定的分区来。
如果没指定分区,但是指定了分区器,就按分区器的规则来。
如果没指定分区,没指定分区器,就看serializedKey不为空,partitionerIgnoreKeysFalse
符合的话,按照BuiltInPartitioner.partitionForKey的方法来。
不符合的话,返回RecordMetadata.UNKNOWN_PARTITION

有些资料说,指定了key,就是指定了分区,其实这么不对的。
指定分区应该利用如下的构造方法,注意Integer partition

1
2
3
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, (Long)null, key, value, (Iterable)null);
}

BuiltInPartitioner.partitionForKey

点进BuiltInPartitioner.partitionForKey,是一个散列函数。

示例代码:

1
2
3
4
5
6
/*
* Default hashing function to choose a partition from the serialized key bytes
*/
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

点进accumulator.append,其代码很多,本文只截取和分区有关的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public RecordAppendResult append(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

【部分代码略】

try {
// Loop to retry in case we encounter partitioner's race conditions.
while (true) {
// If the message doesn't have any partition affinity, so we pick a partition based on the broker
// availability and performance. Note, that here we peek current partition before we hold the
// deque lock, so we'll need to make sure that it's not changed while we were waiting for the
// deque lock.
final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
final int effectivePartition;
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
}

// Now that we know the effective partition, let the caller know.
setPartition(callbacks, effectivePartition);

【部分代码略】

}
} finally {

【部分代码略】

}
}

如果partitionRecordMetadata.UNKNOWN_PARTITION,就进入builtInPartitioner.peekCurrentPartitionInfo
否则,按照partition的来。

点进builtInPartitioner.peekCurrentPartitionInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
StickyPartitionInfo peekCurrentPartitionInfo(Cluster cluster) {
StickyPartitionInfo partitionInfo = stickyPartitionInfo.get();
if (partitionInfo != null)
return partitionInfo;

// We're the first to create it.
partitionInfo = new StickyPartitionInfo(nextPartition(cluster));
if (stickyPartitionInfo.compareAndSet(null, partitionInfo))
return partitionInfo;

// Someone has raced us.
return stickyPartitionInfo.get();
}

我们可以在该方法上进行打断点,第一次进入该方法的时候会执行StickyPartitionInfo(nextPartition(cluster)),之后进入该方法,一般partitionInfo不为空。
StickyPartitionInfo是一个粘性分区策略。

这种粘性分区策略,有助于性能,尽量选取其中一个节点进行发送,避免多节点发送数据造成性能损耗。

小结

综上所述,分区器的逻辑,如下:

小结

内置分区器

在Kafka中,分区器的顶层接口是Partitioner接口。

内置分区器

  • RoundRobinPartitioner:轮询分区器。
  • 除了BuiltInPartitioner分区器,其他三个分区器都实现了Partitioner接口。
    那么,BuiltInPartitioner没有实现Partitioner接口,是怎么实现分区的呢?
    我们上文讨论分区源码的时候,有提到过,是整合在我们的分区方法中。
  • 上图有一个特点,有两个分区器被弃用了,DefaultPartitionerUniformStickyPartitioner,原因是会造成分区间的不均衡。

    具体可以参考官网的这篇文章:https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

有些资料会说DefaultPartitioner是Kafka的默认分区器。
这个名字确实容易迷惑人,但是至少在3.3.2的版本中,不是默认的分区器。

有些资料会说BuiltInPartitioner会随机的把消息发送到主题内的各个可用分区上。
这么说不够清晰。
如果指定了key,会根据key进行散列,那么可以理解为是随机的。
如果没有指定key,其实是一个粘性策略,只是"粘"分区的时候,是随机的。

有些资料会说RoundRobinPartitioner并不能真正的将将消息均匀地分配给每一个分区,而是将消费分配给第偶数分区(第2、4、6、8个)。
在我实际测试中,这个现象并没有发生。
RoundRobinPartitioner的源代码中,也没有找到关于这个的"坑"

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,RoundRobinPartitioner.class.getName());

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("myTopic","myMessage" + i)).get();
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}

// 关闭资源
kafkaProducer.close();
}
}
运行结果:
1
2
3
4
5
6
7
8
9
10
主题: myTopic->分区:1
主题: myTopic->分区:0
主题: myTopic->分区:2
主题: myTopic->分区:1
主题: myTopic->分区:0
主题: myTopic->分区:2
主题: myTopic->分区:1
主题: myTopic->分区:0
主题: myTopic->分区:2
主题: myTopic->分区:1

自定义分区器

自定义分区器的步骤很简单:

  1. 实现Partitioner接口,及其3个方法(partitioncloseconfigure)
  2. partition方法上编写分区逻辑,其他方法留空即可。

例如,我们根据发送的消息的最后一个数字,进行分区。示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.kakawanyifan;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();

Integer index = Integer.valueOf(value.toString().substring(value.toString().length() - 1));

return index % numPartitions;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 调用send方法,发送消息
for (int i = 0; i < 10; i++) {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>("myTopic","myMessage" + i)).get();
System.out.println("主题: " + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
}

// 关闭资源
kafkaProducer.close();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
主题: myTopic->分区:0
主题: myTopic->分区:1
主题: myTopic->分区:2
主题: myTopic->分区:0
主题: myTopic->分区:1
主题: myTopic->分区:2
主题: myTopic->分区:0
主题: myTopic->分区:1
主题: myTopic->分区:2
主题: myTopic->分区:0

提高吞吐量

linger.ms和batch.size

在上文,我们讨论过:
如果数据迟迟未达到batch.size,Sender对象在等待linger.ms之后就会发送数据,单位ms,默认值是0ms,没有延迟。

但这样,每次只拉一点数据,明显不好,可以修改为5-100ms。
也不能改得太大,太大会导致延迟高。

batch.size是一个批次的大小,默认是16KB,可以改成32KB,也不能改得太大。

实际上,我们调整这两个参数的目的,是数据不能拉得太勤,单次拉取的数据不能太多。
《关于弹幕视频网站的例子:基于Serverless的弹幕视频网站实现方案》,我们讨论过视频切片,建议每个切片的长度是30秒。
道理差不多。

compression.type

我们可以设置压缩类型,例如设置compression.typesnappy

buffer.memory

buffer.memory,控制RecordAccumulator缓冲区总大小,默认32MB,可以修改为64MB。

但,一般只有在分区特别多的时候,才修改这个。
例如,我们为每个分区至少准备16KB的数据,如果分区特别多,多到32MB不够用,这时候可以修改buffer.memory的值,否则几乎没效果。

配置方法

1
2
3
4
5
6
7
8
9
10
11
// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

消息不丢失

三种应答方式的缺陷

在上文,我们讨论过acks参数的三种取值。

0:生产者发送过来的消息,不需要等落盘即应答。

acks=0

Leader收到消息后,返回了应答。
如果,还没来及的落盘,就挂了。
如此,发生了丢数。

1:生产者发送过来的消息,Leader收到后应答。

acks=1

Leader落盘后,返回了应答。
如果,还没来得及和Follow同步就挂了。
这时候,会选举产生新的Leader,但是新的Leader不会收到Hello的信息,因为生产者已经认为发送成功了。
如此,发生了丢数。

-1(all):生产者发送过来的消息,Leader和ISR队列里面的所有副本收到后应答。

acks=-1

这种情况没问题,不会发生丢消息。
Leader收到消息,所有Follower都开始同步消息。
但是,如果有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

ISR

Leader维护了一个动态的in-sync replica set(ISR),和Leader保持同步的Follower以及Leader本身组成的集合。

例如,在上文,ISR是[Leader,Follower-1,Follower-2]

如果Follower长时间未向Leader发送通信请求或同步消息,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。

在上文,Follower-2挂了之后,ISR是[Leader,Follower-1]

这样就不用等长期联系不上或者已经故障的节点。

但,还存在一个问题。如果分区副本设置为1,或者ISR里应答的最小副本数量(min.insync.replicas默认为1)设置为1。这时候,和acks=1的效果是一样的,仍然有丢数的风险。

消息不丢失的条件

所以,消息不丢失的条件,需要同时满足如下三个条件。

  1. ACKS级别设置为-1
  2. 分区副本大于等于2(至少一个Leader和一个Follower)
  3. ISR里应答的最小副本数量大于等于2

生产环境的选择

  • acks=0,生产者发送过来消息,就不管了,可靠性差,效率高。
  • acks=1,生产者发送过来消息,Leader应答,可靠性中等,效率中等。
  • acks=-1,生产者发送过来消息,Leader和ISR队列里面所有Follwer应答,可靠性高,效率低。

在生产环境中:

  • acks=0,很少使用。
  • acks=1,一般用于传输普通日志,允许丢个别消息。
  • acks=-1,一般用于传输和钱相关的消息,对可靠性要求比较高的场景。

配置方法

1
2
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG, "-1");

消息不重复

现象

现象

生产者发消息给Leader后,Leader进行同步,所有的Follower也都收到了。这时候准备发响应给生产者。
如果,就在这个时候,Leader挂了。
这时候会重新选举新的Leader。
然后,因为生产者没有收到响应,又重新发了一遍。
这时候,Kafka实际上收到了两条Hello
如此,发生了消息重复。

即,上文的"消息不丢失的条件",只能保证消息不丢失,不能保证消息不重复。

幂等性

那么,怎么做到消息不丢失不重复呢?
在"消息不丢失"的基础上,在接收消息的时候,判断一下,是不是已经收到过的,如果是收到过的,就不收。
这样不就做到"消息不重复"吗?

那么,我们怎么判断消息是收到过的?

重复消息的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。

  • PID是生产者的ID号,生产者每次重启都会分配一个新的。
  • Partition表示分区号。
  • Sequence Number是自增的。

所以幂等性只能保证的是在 单分区单会话 内不重复。

如图,红叉标识的消息,不会被接收,因为键值重复了。
幂等性只能保证的是在单分区且单会话内不重复

如果我们需要开启幂等性,只需要开启enable.idempotence,设置为true。该参数默认值也就是true

生产者事务

上文,我们说,幂等性只能保证 单会话 内不重复,因为生产者每次重启都会分配一个新的PID。
如果,给生产者配置上一个专门的标签,不论生产者重启多少次,这个标签的值都不变,是不是可以实现 多会话 内不重复?
这个标签就是transactional.id,事务ID。

生产者事务

  1. 获取事务协调器服务地址
    1. 生产者自定义一个唯一的transaction.id
    2. 生产者从Kafka集群中选择任意一台机器,然后向其发送请求,获取事务协调器服务的地址。
      __transaction_state,存储事务信息的特殊主题,默认有50个分区,每个分区负责一部分事务。事务划分是根据transactional.id的哈希值对50求余,计算出该事务属于哪个分区。这个分区的Leader所在的机器,就是负责这个事务协调器的服务地址。
      例如,在本文中,事务协调器地址是broker-0
  2. 生产者通过事务协调器获取PID。
  3. 事务协调器在分配PID后,会将其持久化到Topic__transaction_state
  4. 生产者在向topic-a发送消息之前,将分区地址上传到事务协调器。
    提一个问题,应该发给哪个分区?
    还是根据我们上文讨论的分区器的规则来。
    上图的事务协调器和topic-a-partion-0的Leader处在同一个broker中,容易有误会。这里说明一下,根据分区器的规则来。
  5. 事务协调器将分区地址持久化到Topic__transaction_state
  6. 生产者发送消息到topic-a
    正如步骤45,在发送消息之前,先将这些消息的分区地址上传到事务协调器,事务协调器会将这些分区地址持久化到Topictopic-a-partion-0,然后生产者才会真正的发送消息。
    这里的消息与普通消息不同,会有一个字段,表示自身是事务消息。
  7. 生产者向事务协调器发送commit请求。
    这里需要强调下,生产者会在发送commit请求之前,会等待之前所有的请求都已经发送并且响应成功。
  8. 事务协调器持久化commit请求,存储在Topic__transaction_state中。
  9. 事务协调器给生产者返回成功。
  10. 事务协调器发送commit请求给分区。
    这里有个疑虑,如果事务协调器在返回成功给生产者后,还没来及向分区发送commit请求就挂掉了,怎么办?
    因为每次事务的信息都会持久化,所以事务协调器挂掉重新启动后,会先从topic-a-partion-0加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。
  11. 分区返回成功。
  12. 事务协调器把持久化事务的成功信息持久化在Topic__transaction_state

Kafka 的事务一共有5个API:

  • 初始化事务
    1
    void initTransactions();
  • 开启事务
    1
    void beginTransaction() throws ProducerFencedException;
  • 在事务内提交已经消费的偏移量(主要用于消费者)
    1
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
  • 提交事务
    1
    void commitTransaction() throws ProducerFencedException;
  • 放弃事务(类似于回滚事务的操作)
    1
    void abortTransaction() throws ProducerFencedException;

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.kakawanyifan;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者的配置对象
Properties properties = new Properties();
// 给Kafka配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.211.55.14:9092");

// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

// 创建Kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 初始化事务
kafkaProducer.initTransactions();

// 开启事务
kafkaProducer.beginTransaction();
try {
// 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("myTopic", "myTransMessage-" + i));
}
// int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 关闭资源
kafkaProducer.close();
}

// 关闭资源
kafkaProducer.close();
}
}

消息有序

这里的消息有序,指的是一个分区内的消息,是有序的。

在上文,我们讨论了一个参数max.in.flight.requests.per.connection的默认值是5,即每个连接最多只能缓存5个未响应的请求。

现在假如有这么一个现象。
生产者依次发了Request1Request2Request3Request4Request5,一个五个请求。但是Kafka在接收的时候,因为种种原因,依次收到的是Request1Request2Request5Request4Request3,怎么办?

我们设置max.in.flight.requests.per.connection的值为1,只能缓存1个未响应的请求,一定要等上一个请求响应了,才可以发送下一个请求。
这个做可以。

在上文讨论幂等性的时候,我们提到了,有一个SeqNumber,我们在接收完毕后,根据SeqNumber排序。
这样也可以。

  • 在1.x版本之前保证消息单分区有序,条件如下:
    max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
  • 在1.x及以后版本保证消息单分区有序,条件如下:
    • 如果未开启幂等性:
      max.in.flight.requests.per.connection需要设置为1。
    • 如果开启幂等性:
      max.in.flight.requests.per.connection需要设置小于等于5。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11902
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板