抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Kafka消费者

消费者的入门

​ 消费者的含义, 同一般消息中间件中消费者的概念。 在高并发的情况下, 生产者产生消息的速度是远大于消费者消费的速度, 单个消费者很可能会负担不起, 此时有必要对消费者进行横向伸缩, 于是我们可以使用多个消费者从同一个主题读取消息, 对消息进行分流。
​ (买单的故事, 群组, 消费者的一群人, 消费者: 买单的, 分区: 一笔单, 一笔单能被买单一次, 当然一个消费者可以买多个单, 如果有一个消费者挂掉了<跑单了>, 另外的消费者接上)

消费者与消费组

​ 假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。

​ Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:

如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:

如果增加到4个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:

但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:

​ 总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

​ Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:

在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来说它们属于不同的应用。

最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

消费者配置

消费者有很多属性可以设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。可以参考org.apache.kafka.clients.consumer 包下 ConsumerConfig 类 。

auto.offset.reset

消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是 latest, 从最新的记录开始读取,另一个值是 earliest,表示消费者从起始位置读取分区的记录。

当Kafka中没有初始offset或如果当前的offset不存在时(例如,该数据被删除了),该怎么办。

  • 最早:自动将偏移重置为最早的偏移
  • 最新:自动将偏移重置为最新偏移
  • none:如果消费者组找到之前的offset,则向消费者抛出异常
  • 其他:抛出异常给消费者。

注意: 如果是消费者在读取一个没有偏移量的分区或者偏移量无效的情况(因消费者长时间失效, 包含的偏移量记录已经过时并被删除)下,默认值是 latest 的话,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录),可以先启动生产者, 再启动消费者, 观察到这种情况。

enable.auto.commit

​ 默认值 true,表明消费者是否自动提交偏移,如果为true,消费者的offset将在后台周期性的提交。为了尽量避免重复数据和数据丢失,可以改为 false,自行控制何时提交。

auto.commit.enable

​ 如果为true,请定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用这种承诺偏移量作为新消费者开始的位置。

auto.commit.interval.ms

​ 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。

partition.assignment.strategy

​ 每个 Topic 一般会有很多个 partitions。为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer 又会启动一个或多个streams去分别消费 Topic 里面的数据。我们又知道,Kafka 存在 Consumer Group 的概念,也就是 group.id 一样的 Consumer,这些 Consumer 属于同一个Consumer Group,组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。那么问题来了,同一个 Consumer Group 里面的 Consumer 是如何知道该消费哪些分区里面的数据呢?

如上图,Consumer1 为啥消费的是 Partition0 和 Partition2,而不是 Partition0 和 Partition3?这就涉及到 Kafka 内部分区分配策略(Partition Assignment Strategy)了。

在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者
  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
  • 订阅的主题新增分区

​ 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。下面我们将详细介绍 Kafka 内置的两种分区分配策略。本文假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且 C1 的 num.streams = 1,C2 的 num.streams = 2。

Range strategy

​ Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

1
2
3
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区

假如我们有11个分区,那么最后分区分配的结果看起来是这样的:

1
2
3
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C2-1 将消费 8, 9, 10 分区

假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:

1
2
3
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。

RoundRobin strategy
  • 使用RoundRobin策略有两个前提条件必须满足:

  • 使用RoundRobin策略有两个前提条件必须满足:

​ 所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序。

在我们的例子里面,加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

1
2
3
4
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;

​ 根据上面的详细介绍相信大家已经对Kafka的分区分配策略原理很清楚了。不过遗憾的是,目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range。

max.poll.records

在单次调用poll()中返回的最大记录数。

max.poll.interval.ms

​ 使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

fetch.min.bytes

​ 每次 fetch 请求时,server 应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为 1 个字节。多消费者下,可以设大这个值,以降低 broker 的工作负载

fetch.wait.max.ms

​ 如果没有足够的数据能够满足 fetch.min.bytes,则此项配置是指在应答 fetch 请求之前, server 会阻塞的最大时间。 缺省为 500 个毫秒。和上面的fetch.min.bytes 结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

max.partition.fetch.bytes

​ 指定了服务器从每个分区里返回给消费者的最大字节数,默认 1MB。 假设一个主题有 20 个分区和 5 个消费者,那么每个消费者至少要有 4MB 的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意, 这个参数要比服务器的 message.max.bytes 更大,否则消费者可能无法读取消息

session.timeout.ms

​ 用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.msgroup.max.session.timeout.ms允许的范围内。

​ 如果 consumer 在这段时间内没有发送心跳信息,则它会被认为挂掉了,默认 3 秒。

client.id

​ 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来跟踪ip/port的请求源,这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪。

receive.buffer.bytes

读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。

send.buffer.bytes

发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。

​ 指定 TCP socket 接受和发送数据包的缓存区大小。 如果它们被设置为-1, 则使用操作系统的默认值。 如果生产者或消费者处在不同的数据中心, 那么可以适当增大这些值, 因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

check.crcs

​ 自动检查CRC32记录的消耗。 这样可以确保消息发生时不会在线或磁盘损坏。 此检查增加了一些开销,因此在寻求极致性能的情况下可能会被禁用。

metadata.max.age.ms

​ 在一定时间段之后(以毫秒为单位的),强制更新元数据,即使没有任何分区领导变化,任何新的broker或分区。

retry.backoff.ms

​ 尝试重新发送失败的请求到指定topic分区之前的等待时间。避免在某些故障情况下,频繁的重复发送。

connections.max.idle.ms

​ 指定在多少毫秒之后关闭闲置的连接

消费者相关概念

消费者群组

参加入门的群组介绍

订阅

​ 创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表,非常简单:

1
consumer.subscribe(Collections.singletonList("customerCountries"));

​ 这个例子中只订阅了一个customerCountries主题。另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:

1
consumer.subscribe("test.*");

轮询

​ 消费数据的API和处理方式很简单,我们只需要循环不断拉取消息即可。Kafka对外暴露了一个非常简洁的poll方法,其内部实现了协作、分区重平衡、心跳、数据拉取等功能,但使用时这些细节都被隐藏了,我们也不需要关注这些。下面是一个代码样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
try {
while (true) { //1)
ConsumerRecords<String, String> records = consumer.poll(100); //2)
for (ConsumerRecord<String, String> record : records) //3)
{
log.debug("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4))
}
}
} finally {
consumer.close(); //4
}

其中,代码中标注了几点,说明如下:

  • 1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
  • 2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
  • 3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
  • 4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

​ 另外需要提醒的是,消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象;而且也不能够一个线程有多个消费者对象。简而言之,一个线程一个消费者,如果需要多个消费者那么请使用多线程来进行一一对应。

提交和偏移量

一般情况下,我们调用 poll 方法的时候,broker 返回的是生产者写入 Kafka 同时 kafka 的消费者提交偏移量,这样可以确保消费者消息消费不丢失也不重复,所以一般情况下 Kafka 提供的原生的消费者是安全的,但是事情会这么完美吗?

​ Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

​ 提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。

从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

消费者提交偏移量导致的问题

​ 当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到;Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。

​ 在正常情况下,消费者会发送分区的提交信息到Kafka,Kafka进行记录。当消费者宕机或者新消费者加入时,Kafka会进行重平衡,这会导致消费者负责之前并不属于它的分区。重平衡完成后,消费者会重新获取分区的位移,下面来看下两种有意思的情况。

重复消费

​ 假如一个消费者在重平衡前后都负责某个分区,如果提交位移比之前实际处理的消息位移要小,那么会导致消息重复消费,如下所示:

丢失数据

​ 假如在重平衡前某个消费者拉取分区消息,在进行消息处理前提交了位移,但还没完成处理宕机了,然后Kafka进行重平衡,新的消费者负责此分区并读取提交位移,此时会“丢失”消息,如下所示:

因此,提交位移的方式会对应用有比较大的影响,下面来看下不同的提交方式。

自动提交

​ 这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的;在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。

​ 需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

​ 一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MyKafkaConsumer {
public static void main(String[] args) {
//消费者三个属性必须指定(broker地址清单,key和value的序列化器)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_0");
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//设置每隔2s 提交一次
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//1.创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

//2.订阅Topic
//创建一个只包含单个元素的列表,Topic的名字叫作customerCountries,可以订阅多个主题
consumer.subscribe(Collections.singletonList("hello-kafka"));
//支持正则表达式,订阅所有与test相关的Topic
//consumer.subscribe("test.*");

//3.轮询
//消息轮询是消费者的核心API,通过一个简单的轮询向服务器请求数据,一旦消费者订阅了Topic,轮询就会处理所欲的细节,包括群组协调、partition再均衡、发送心跳
//以及获取数据,开发者只要处理从partition返回的数据即可。
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}
}
手动提交

​ 为了减少消息重复消费或者避免消息丢失,很多应用选择自己主动提交位移。设置auto.commit.offset为false,那么应用需要自己通过调用commitSync()来主动提交位移,该方法会提交poll返回的最后位移。

​ 为了避免消息丢失,我们应当在完成业务逻辑后才提交位移。而如果在处理消息时发生了重平衡,那么只有当前poll的消息会重复消费。

​ commitAsync 不能够替代 commitSync . commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
consumer.commitSync();
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}

上面代码poll消息,并进行简单的打印(在实际中有更多的处理),最后完成处理后进行了位移提交。

异步提交

​ 手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。以下为使用异步提交的方式,应用发了一个提交请求然后立即返回

​ 对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
consumer.commitAsync();
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}

​ 但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

因此,基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
System.out.println("异步提交异常,errorMsg:" + e.getMessage());

}
}
});
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}

​ 而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。

混合同步提交与异步提交

​ 正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
...
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
consumer.commitAsync();
}
}
} finally {
try {
// 最后一次提交使用同步阻塞式提交
consumer.commitSync();
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}

在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。

提交特定位移

​ commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
...
//关闭自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try {
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
if (count % 100 == 0) {
consumer.commitAsync(currentOffsets, null); // 回调处理逻辑是null
}
count++;
}
}
} finally {
try {
// 最后一次提交使用同步阻塞式提交
consumer.commitSync();
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}

​ 代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。

多线程安全问题

KafkaConsumer 的实现不是线程安全的, 所以我们在多线程的环境下, 使用 KafkaConsumer 的实例要小心, 应该每个消费数据的线程拥有自己的KafkaConsumer 实例 ,所以实现多线程时通常由两种实现方法。

每个线程维护一个KafkaConsumer

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;

public void run() {
try{
consumer.subscribe(Arrays.asList("topic"));
while(!closed.get()){
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
}
}catch (WakeupException e){
if (!closed.get()) throw e;
}finally {
consumer.close();
}
}

public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
维护一个或多个KafkaConsumer,同时维护多个事件处理线程(worker thread)

实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...

private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());

...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}
总结

​ 当然,这种方法还可以有多个变种:比如每个worker线程有自己的处理队列。consumer根据某种规则或逻辑将消息放入不同的队列。不过总体思想还是相同的,故这里不做过多展开讨论了。

  下表总结了两种方法的优缺点:

方式 优点 缺点
方法1(每个线程维护一个KafkaConsumer) 方便实现 速度较快,因为不需要任何线程间交互 易于维护分区内的消息顺序 更多的TCP连接开销(每个线程都要维护若干个TCP连接) consumer数受限于topic分区数,扩展性差 频繁请求导致吞吐量下降 线程自己处理消费到的消息可能会导致超时,从而造成rebalance
方法2 (单个(或多个)consumer,多个worker线程) 可独立扩展consumer数和worker数,伸缩性好 实现麻烦通常难于维护分区内的消息顺序处理链路变长,导致难以保证提交位移的语义正确性

群组协调

​ 消费者要加入群组时,会向群组协调器发送一个 JoinGroup 请求,第一个加入群主的消费者成为群主,群主会获得群组的成员列表,并负责给每一个消费者分配分区。 分配完毕后,群主把分配情况发送给群组协调器,协调器再把这些信息发送给所有的消费者, 每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。群组协调的工作会在消费者发生变化(新加入或者掉线),主题中分区发生了变化(增加)时发生。

分区再均衡

​ 当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生。从前面的知识中,我们知道,Kafka 中,存在着消费者对分区所有权的关系,这样无论是消费者变化,比如增加了消费者, 新消费者会读取原本由其他消费者读取的分区,消费者减少,原本由它负责的分区要由其他消费者来读取,增加了分区,哪个消费者来读取这个新增的分区,这些行为,都会导致分区所有权的变化,这种变化就被称为再均衡。

​ 再均衡对 Kafka 很重要,这是消费者群组带来高可用性和伸缩性的关键所在。不过一般情况下,尽量减少再均衡,因为再均衡期间,消费者是无法读取消息的,会造成整个群组一小段时间的不可用

​ 消费者通过向称为群组协调器的 broker(不同的群组有不同的协调器)发送心跳来维持它和群组的从属关系以及对分区的所有权关系。如果消费者长时间不发送心跳,群组协调器认为它已经死亡,就会触发一次再均衡。
在 0.10.1 及以后的版本中,心跳由单独的线程负责,相关的控制参数为 max.poll.interval.ms

再均衡监听器

​ 在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它可能需要将已经处理过的消息位移进行提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:

  • public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费消费后,在重平衡开始前调用。
  • public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。

下面来看一个onPartitionRevoked()的例子,该例子在消费者失去某个分区时提交位移(以便其他消费者可以接着消费消息并处理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.
Committing current
offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}

try {
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore, we're closing
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}

​ 代码中实现了onPartitionsRevoked()方法,当消费者失去某个分区时,会提交已经处理的消息位移(而不是poll()的最大位移)。上面代码会提交所有的分区位移,而不仅仅是失去分区的位移,但这种做法没什么坏处。

从指定位移开始消费

​ 在此之前,我们使用poll()来从最后的提交位移开始消费,但我们也可以从一个指定的位移开始消费。

​ 如果想从分区开始端重新开始消费,那么可以使用seekToBeginning(TopicPartition tp);如果想从分区的最末端消费最新的消息,那么可以使用seekToEnd(TopicPartition tp)。而且,Kafka还支持我们从指定位移开始消费。从指定位移开始消费的应用场景有很多,其中最典型的一个是:位移存在其他系统(例如数据库)中,并且以其他系统的位移为准。

​ 考虑这么个场景:我们从Kafka中读取消费,然后进行处理,最后把结果写入数据库;我们既不想丢失消息,也不想数据库中存在重复的消息数据。对于这样的场景,我们可能会按如下逻辑处理:

1
2
3
4
5
6
7
8
9
10
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}

​ 这个逻辑似乎没什么问题,但是要注意到这么个事实,在持久化到数据库成功后,提交位移到Kafka可能会失败,那么这可能会导致消息会重复处理。对于这种情况,我们可以优化方案,将持久化到数据库与提交位移实现为原子性操作,也就是要么同时成功,要么同时失败。但这个是不可能的,因此我们可以在保存记录到数据库的同时,也保存位移,然后在消费者开始消费时使用数据库的位移开始消费。这个方案是可行的,我们只需要通过seek()来指定分区位移开始消费即可。下面是一个改进的样例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//在消费者负责的分区被回收前提交数据库事务,保存消费的记录和位移
commitDBTransaction();
}

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//在开始消费前,从数据库中获取分区的位移,并使用seek()来指定开始消费的位移
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}

consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
//在subscribe()之后poll一次,并从数据库中获取分区的位移,使用seek()来指定开始消费的位移
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
//保存记录结果
storeRecordInDB(record);
//保存位移
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
//提交数据库事务,保存消费的记录以及位移
commitDBTransaction();
}

​ 具体逻辑见代码注释,此处不再赘述。另外注意的是,seek()只是指定了poll()拉取的开始位移,这并不影响在Kafka中保存的提交位移(当然我们可以在seek和poll之后提交位移覆盖)。

优雅退出

​ 下面我们来讨论下消费者如何优雅退出。

​ 在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException。如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。

下面是一个优雅退出的样例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
//调用消费者的wakeup方法通知主线程退出
consumer.wakeup();
try {
//等待主线程退出
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

...

try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "-- waiting for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.position(tp));
consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}

反序列化

​ 如前所述,Kafka生产者负责将对象序列化成字节数组并发送到Kafka。消费者则需要将字节数组转换成对象,这就是反序列化做的事情。序列化与反序列化需要匹配,如果序列化使用IntegerSerializer,但使用StringDeserializer来反序列化,那么会反序列化失败。因此作为开发者,我们需要关注写入到主题使用的是什么序列化格式,并且保证写入的数据能够被消费者反序列化成功。如果使用Avro与模式注册中心(Schema Registry)来序列化与反序列化,那么事情会轻松许多,因为AvroSerializer会保证所有写入的数据都是结构兼容的,并且能够被反序列化出来。

下面先来看下如何自定义反序列化,后面会进一步讨论如何使用Avro。

自定义反序列化

首先,假设序列化的对象为Customer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}

根据之前的序列化策略,我们的反序列化代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}

@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name);
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}

消费者使用这个反序列化的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
ConsumerRecords<String, Customer> records = consumer.poll(100);
for (ConsumerRecord<String, Customer> record : records)
{
System.out.println("current customer Id: " + record.value().getId() + " and current customer name: " + record.value().getName());
}
}

​ 最后提醒下,我们并不推荐实现自定义的序列化与反序列化,因为往往这些方案并不成熟,难以维护和升级,而且容易出错。我们可以使用JSON、Thrift、Protobuf或者Avro的成熟的解决方案。

使用Avro反序列化

​ 假设我们使用之前生产者Avro序列化时使用的Customer,那么使用Avro反序列化的话,我们的样例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//使用KafkaAvroDeserializer来反序列化Avro消息
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
//这里增加了schema.registry.url参数,获取生产者注册的消息模式
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
//这里使用之前生产者使用的Avro生成的Customer类
ConsumerRecords<String, Customer> records = consumer.poll(1000);
for (ConsumerRecord<String, Customer> record: records) {
System.out.println("Current customer name is: " + record.value().getName());
}
consumer.commitSync();
}

独立消费者

​ 到目前为止,我们讨论了消费者群组,分区被自动分配给群组里的消费者,在群组里新增或移除消费者时自动触发再均衡。 不过有时候可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。 这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量

​ 如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。

下面是一个给单个消费者指定分区进行消费的代码样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

List<PartitionInfo> partitionInfos = null;
//获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
//为消费者指定分区
consumer.assign(partitions);

while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}

​ 除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。

评论