抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Kafka生产者

生产者发送消息的基本流程

kafka是流式的处理数据,那么这个数据的来源就是由生产者发送到kafka的。整个消息的发送流程如下图:

  1. 从创建一个 ProducerRecord对象开始,Producer Record 对象需要包含目标主题和要发送的内容。 我们还可以指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和值对象序列化成字节数组, 这样它们才能够在网络上传输。

  2. 接下来, 数据被传给分区器。 如果之前在 Producer Record 对象里指定了分区, 那么分区器就不会再做任何事情, 直接把指定的分区返回。 如果没有指定分区, 那么分区器会根据 Producer Record 对象的键来选择一个分区。 选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。 紧接着,这条记录被添加到一个记录批次里(双端队列,尾部写入) ,这个批次里的所有消息会被发送到相同的主题和分区上。 有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

  3. 服务器在收到这些消息时会返回一个响应。 如果消息成功写入 Kafka ,就返回一个 RecordMetaData 对象, 它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。 生产者在收到错误之后会尝试重新发送消息, 几次之后如果还是失败,就返回错误信息

生产者发送消息一般会发生两类错误:

  • 一类是可重试错误,比如连接错误(可通过再次建立连接解决)、无主 no leader(可通过分区重新选举首领解决)。
  • 另一类是无法通过重试解决,比如“消息太大” 异常,具体见 message.max.bytes,这类消息不会进行任何重试,直接抛出异常

使用Kafka生产者

三种发送方式

​ 我们通过生成者的 send 方法进行发送。 send 方法会返回一个包含 RecordMetadata 的 Future 对象。 RecordMetadata 里包含了目标主题, 分区信息和消息的偏移量。

发送并忘记

不关心消息是否正常到达,对返回结果不做任何判断处理

​ 发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性:

同步发送

通过get方法等待Kafka的响应,判断消息是否发送成功

​ 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送:

异步发送+回调函数

消息以异步的方式发送,通过回调函数返回消息发送成功/失败

​ 在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞:

​ 实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给 send 方法 。

参考代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public class MyProducer implements Job {
private static KafkaProducer<String,String> producer;

static {
Properties properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(properties);
}

/**
* 第一种直接发送,不管结果
*/
private static void sendMessageForgetResult(){
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","Forget_result"
);
producer.send(record);
producer.close();
}

/**
* 第二种同步发送,等待执行结果
* @return
* @throws Exception
*/
private static RecordMetadata sendMessageSync() throws Exception{
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","sync"
);
RecordMetadata result = producer.send(record).get();
System.out.println(result.topic());
System.out.println(result.partition());
System.out.println(result.offset());
return result;
}

/**
* 第三种执行回调函数
*/
private static void sendMessageCallback(){
ProducerRecord<String,String> record = new ProducerRecord<String,String>(
"kafka-study","name","callback"
);
producer.send(record,new MyProducerCallback());
}

//定时任务
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
sendMessageSync();
}catch (Exception e){
System.out.println("error:"+e);
}
}

private static class MyProducerCallback implements Callback{

@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e !=null){
e.printStackTrace();
return;
}
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
System.out.println("Coming in MyProducerCallback");
}
}


public static void main(String[] args){
//sendMessageForgetResult();
//sendMessageCallback();
JobDetail job = JobBuilder.newJob(MyProducer.class).build();

Trigger trigger = TriggerBuilder.newTrigger()
.withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();

try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.scheduleJob(job,trigger);
scheduler.start();
}catch (SchedulerException e){
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}

多线程下的生产者

​ KafkaProducer 的实现是线程安全的, 所以我们可以在多线程的环境下, 安全的使用 KafkaProducer 的实例,可以使用线程池来控制节服务器性能的消耗。

更多发送配置

生产者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。 可以参考org.apache.kafka.clients.producer 包下的 ProducerConfig 类。

ack
  • ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • ack=1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。

  • ack=all, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。

buffer.memory

​ 该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速率比写入kafka的速度要快,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置max.block.ms,表示在抛出异常之前可以阻塞一段时间,如果数据产生速度大于向 broker 发送的速度,导致生产者空间不足,producer 会阻塞或者抛出异常,缺省 33554432 (32M) 。

max.block.ms

​ 该参数指定了在调用send()方法或使用partitionsFor()方法获取元数据时生产者的阻塞时间,当生产者的缓冲区已满,或没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者抛出超时异常,缺省 60000ms 。

retires

​ 发送失败时, 指定生产者可以重发消息的次数(缺省 Integer.MAX_VALUE) ,生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了retires设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过retry.backoff.ms参数来修改这个时间间隔。建议在设置重试次数和等待间隔之前,测一下恢复一个崩溃节点需要多久时间(比如所有分区选举出首领需要多长时间),让总的重试时间比kafka集群从崩溃中恢复的时间长,否则生产者会过早的放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决,比如消息太大错误,一般情况下,因为生产者会自动重试,所以没必要在代码逻辑里处理那些可重试的错误,只需要处理那些不可能重试错误或重试次数超过上限的情况。

receive.buffer.bytes和send.buffer.bytes

​ 这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小,如果它们被设置成-1,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽,缺省 102400 。

batch.size

​ 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能被发送。所以就算把batch.size设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销,缺省16384(16k) , 如果一条消息超过了批次的大小,会写不进去 。

linger.ms

​ 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,就算批次里只有一个消息,生产者也会把消息发送出去。把linger.ms设置成大于0的数,让生产者在发送批次前等一会,使更多的消息加入这个批次,这样做会增加延迟,但也会提高吞吐量。

compression.type

​ 默认情况下,消息发送时不会被压缩。该参数可以设置成snappy,gziplz4,它指定了消息发送给broker之前使用哪一种压缩算法。snappy占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种压缩算法。gzip压缩算法一般会占用比较多的CPU,但会提供更高的压缩,如果网络带宽有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向kafka发送消息的瓶颈所在。

client.id

​ 该参数可以是任意的字符串,服务器会用它识别消息的来源,还可以用在日志和配额指标里(限速)。

​ 当向 server 发出请求时, 这个字符串会发送给 server。目的是能够追踪请求源头,以此来允许 ip/port 许可列表之外的一些应用可以发送信息。这项应用可以设置任意字符串, 因为没有任何功能性的目的, 除了记录和跟踪。

max.in.flight.requests.per.connection

​ 该参数指定了生产者在收到服务器响应之前可以发送多少消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置成1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试,默认是 5 ,如果需要保证消息在一个分区上的严格顺序,这个值应该设为 1,不过这样会严重影响生产者的吞吐量。

request.timeout.ms

​ 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常,默认 30 秒。

metadata.fetch.timeout.ms

​ 是指我们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据 fetch 成功完成所需要的时间,否则会跑出异常给客户端。

timeout.ms

​ 指定了broker等待同步副本返回消息确认的时间,与ack的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么broker会返回一个错误。

顺序保证

​ Kafka可以保证同一个分区里的消息是有序的。也就是说,发送消息时,主题只有且只有一个分区,同时生产者按照一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。 在某些情况下,顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。

​ 如果把 retires 设为非零整数, 同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。 如果此时第一个批次也写入成功, 那么两个批次的顺序就反过来了。
​ 一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retires 设为 0(不重试的话消息可能会因为连接关闭等原因会丢) 。所以还是需要重试,同时把max.in.flight.request.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发
送给 broker 。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

数据可靠性保证

为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK(ACKnowledge 确认收到)。

​ 如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。

副本数据同步策略

何时发送 ACK?确保有 Follower 与 Leader 同步完成,Leader 再发送 ACK,这样才能保证 Leader 挂掉之后,能在 Follower 中选举出新的 Leader 而不丢数据。

​ 多少个 Follower 同步完成后发送 ACK?全部 Follower 同步完成,再发送 ACK。

序列化

序列化主要是用来解决数据在网络中传输的问题. 在网络中传输的数据必须全是字节,也称为字节流. 而文本数据到字节数据的这一步就是序列化(将非字节数据 -> 字节数组).

Kafka中序列化

Kafka中的序列化主要是将发送的消息序列化成字节数组. 在Java中,有八大基本数据类型和引用类型. Kafka预先内置了一些相应的序列化和反序列化

Java类型 序列化 反序列化
int IntegerSerializer IntegerDeserializer
long LongSerializer LongDeserializer
double DoubleSerializer DoubleDeserializer
byte BytesSerializer BytesDeserializer
byte ByteArraySerializer ByteArrayDeserializer
byte ByteBufferSerializer ByteBufferDeserializer
String StringSerializer StringDeserializer

​ 通过上面表格可以看出,Kafka并不是为所有的基本类型内置了对应的序列化器和反序列化器. 而且Kafka为对byte提供方便,内置了三个不同的序列化器和反序列化器. 同时,Kafka为一个引用类型-String,提供了序列化器和反序列化器,因为String太常用了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// StringSerializer序列化代码
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";

@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
}

从代码可以看出,默认情况下会把字符串编码成UTF-8格式,然后在网络中传输。

自定义序列化器

​ Kafka自带的序列化器并不能满足所有的需求,假如我有一个用户对象,里面包含用户姓名,用户年龄… 但是Kafka中没有提供相对应的序列化器,需要自己实现一个. 实现一个序列化器很简单,只需要实现一个接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Serializer<T> extends Closeable {

// 配置该类
void configure(Map<String, ?> configs, boolean isKey);

// 将数据转变为字节数组
byte[] serialize(String topic, T data);

// 默认方法
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}

// 关闭序列化器
@Override
void close();
}

接下来,自己实现一个序列化器,下面序列化器是商店顾客序列化器. 这里采用硬编码的方式,将该对象序列化成字节数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CustomerSerializer implements Serializer<Customer> {
@Override
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null) {
return null;
} else {
if (data.getName() != null) {
serializedName = data.getName().getBytes(StandardCharsets.UTF_8);
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}

final ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getCustomerId());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
}
自定义序列化器的劣势
  • 需要考虑向前兼容和向后兼容的问题,假如更新的反序列化能否对以前的消息进行支持.
  • 需要将序列化和反序列化成匹配的出现

用第三方jar包实现自定义序列化

用JSON,ProtoBuf,Protostuff,Thrift…实现通过的序列化工具.

JSON 实现序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class JsonSerializer implements Serializer<Customer> {

private final Logger log = LoggerFactory.getLogger(JsonSerializer.class);

@Override
public byte[] serialize(String topic, Customer data) {
byte[] result = null;
try {
// 关键代码,把对象序列化为字节数组.
result = JSON.toJSONBytes(data);
log.info("{} is serialize after the size is {}", data, result.length);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
}

使用Apache Avro进行序列化

​ Avro 会使用一个 JSON 文件作为 schema 来描述数据,Avro 在读写时会用到这个 schema,可以把这个schema 内嵌在数据文件中。这样,不管数据格式如何变动, 消费者都知道如何处理数据。
​ 但是内嵌的消息,自带格式,会导致消息的大小不必要的增大,消耗了资源。我们可以使用 schema 注册表机制,将所有写入的数据用到的 schema保存在注册表中, 然后在消息中引用 schema 的标识符, 而读取的数据的消费者程序使用这个标识符从注册表中拉取 schema 来反序列化记录。
​ 注意:Kafka 本身并不提供 schema 注册表,需要借助第三方,现在已经有很多的开源实现,比如Confluent Schema Registry,可以从 GitHub 上获取。
如何使用参考如下网址:https://cloud.tencent.com/developer/article/1336568

​ 不过一般除非你使用 Kafka 需要关联的团队比较大,敏捷开发团队才会使用,一般的团队用不上。对于一般的情况使用 JSON 足够了。

分区策略

分区原因

  • 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 Topic 又可以有多个 Partition 组成,因此可以以 Partition 为单位读写了。

  • 可以提高并发,因此可以以 Partition 为单位读写了。

    分区原则:我们需要将 Producer 发送的数据封装成一个 ProducerRecord 对象。

​ 在使用Apache Kafka 生产和消费消息的时候,肯定是希望能够将数据均匀地分配到所有服务上。 比如很多公司使用Kafka收集应用程序的日志数据,这种数据都是很多的, 特别是对于大批量集群环境,每分钟产生的日志量达到GB级别,因此如何将这么大的数据量均匀分配到Kafka的各个Broker上,是一个非常重要的问题。

​ Kafka有主题(Topic)的概念,它是承载真实数据的逻辑容器,而在主体之下还分为若干分区,Kafka 消息组织是三级结构:主题-分区-消息。主题下每条消息都会保存到一个分区中。 官网描述三级结果示例图如下:

Kafka 为什么使用分区的概念而不使用多个主题呢 ?

​ 其实分区的作用是提供负载均衡的能力,是为了实现系统的高伸缩性(Scalability)。不同的分区能够放置到不同的节点上,而数据的读写操作是针对分区粒度进行的,这样每个节点的机器都能独立执行各个自分区的读写操作。并且,我们还可以通过添加新节点机器来增加整体的吞吐量。

​ 其实分区的概念在分布式系统中非常常见,不同的分布式系统中叫法不尽相同而已。比如,在Kafka中叫分区,在MongoDB 和 Elasticsearch 中就叫分片Shard,在HBase中则叫Region,在Cassandra 中被叫做vnode。

​ 在Kafka 分区除了提供负载均衡这种核心功能外, 还能解决供顺序消息等业务需求。

有哪些分区策略

所谓的分区策略就是决定生产者将消息发送到哪个分区的算法。 Kafka为我们提供了默认的分区策略,同时也支持自定义的分区策略。

轮询策略(Rund-robin)

比如一个主题下有3个分区,第一条消息被发送的了分区0,第二条消息被发送到了分区1,第三条消息被分配到了分区2,依次类推 第四条消息应该又重新开始,发送到分区0。

​ 轮询策略是Kafka 生产者API默认提供的分区策略。轮询策略有非常优秀的负载表现,它能最大限度的将消息平均分配大道所有分区上,也是我们最常用的分区策略之一。

随机策略(Randomness)

将消息发送到Topic下任意一个分区。

本质上随机策略是力求将数据均匀的打散到每个分区,但实际表现看来,它要逊于轮询策略。

按消息键保序策略(Key-ordering)

Kafka允许每条消息定义消息键,简称为Key。这个Key的作用非常大,她可以是一个有明确业务含义的字符串,比如:客户代码,部门编码,业务编号等,也可以用来表征消息元数据。在同一个Key的消息可以进入同一分区,且可以保证消息的顺序性,故这个策略叫按消息键保序策略。

其它分区策略

​ 除了上面已经讲述的几种分区策略, 还有哪些实际运用到的分区策略呢 ?有一种比较常见的 按地理位置分区的策略。这种策略对一些大规模的Kafka集群,特别是跨城市、跨国家 的集群比较有使用的价值。
比如:按注册地的不同,发不同的优惠卷。

评论