抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Kafka安装管理

Linux环境安装

准备zookeeper环境

kafka是依赖于zookeeper注册中心的一款分布式消息对列,所以需要有zookeeper单机或者集群环境。

下载Kafka

1
wget http://labfile.oss.aliyuncs.com/courses/859/kafka_2.10-0.10.2.1.tgz

解压Kafka

1
tar -zxvf kafka_2.10-0.10.2.1.tgz 

进入配置文件目录

1
cd /usr/local/software/kafka_2.10-0.10.2.1/config

修改配置文件

在server.properties,添加下面内容:

1
2
3
4
5
broker.id=0
port=9092 #端口号
host.name=172.30.0.9 #服务器IP地址,修改为自己的服务器IP
log.dirs=/usr/local/logs/kafka #日志存放路径,上面创建的目录
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181

编写启动脚本

1
vi kafkastart.sh
1
2
3
4
5
6
#启动zookeeper
/usr/local/software/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh /usr/local/software/kafka_2.10-0.10.2.1/config/zookeeper.properties &
#等3秒后执行
sleep 3
#启动kafka
/usr/local/software/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /usr/local/software/kafka_2.10-0.10.2.1/config/server.properties &

编写关闭脚本

1
vi kafkastop.sh
1
2
3
4
5
6
#关闭zookeeper
/usr/local/software/kafka_2.10-0.10.2.1/bin/zookeeper-server-stop.sh /usr/local/software/kafka_2.10-0.10.2.1/config/zookeeper.properties &
#等3秒后执行
sleep 3
#关闭kafka
/usr/local/software/kafka_2.10-0.10.2.1/bin/kafka-server-stop.sh /usr/local/software/kafka_2.10-0.10.2.1/config/server.properties &

启动脚本,关闭脚本赋予权限

1
2
chmod 777 kafkastart.sh
chmod 777 kafkastop.sh

Docker 安装

普通方式

创建网络
1
docker network create --subnet=172.18.0.0/16 docker_network
运行zk
1
docker run -d --name zookeeper --network=docker_network --ip=172.18.0.5 -p 2181:2181  --privileged=true zookeeper
运行Kafka
1
docker run  -d --name kafka --network=docker_network --ip=172.18.0.10 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.18.0.5:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.18.0.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
参数含义
  • KAFKA_BROKER_ID:配置Kafka的broker_id
  • KAFKA_ZOOKEEPER_CONNECT: 配置kafka连接zk的地址 172.18.0.5:218
  • KAFKA_ADVERTISED_LISTENERS:把kafka的地址端口注册给zookeeper
  • KAFKA_LISTENERS:配置kafka的监听端口

docker-compose容器编排

docker-compose.yml

编辑 docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
version: '2'   
services:
zookeeper:
image: zookeeper
container_name: zookeeper
privileged: true
networks:
docker_network:
ipv4_address: 172.18.0.5
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
hostname: kafka
container_name: kafka
privileged: true
depends_on:
- zookeeper
networks:
docker_network:
ipv4_address: 172.18.0.10
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: 172.18.0.5:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.64.136 ## 修改:宿主机IP
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.136:9092 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9092
ports:
- "9092:9092"
kafka-manager:
image: sheepkiller/kafka-manager
container_name: kafka-manager
depends_on:
- zookeeper
environment:
ZK_HOSTS: 192.168.64.136 # 修改:宿主机IP
ports:
- "9000:9000"
networks:
docker_network:
ipam:
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

注意: kafka-manager的ZK_HOSTS属性要改成宿主机地址

运行docker-compose
1
docker-compose up -d

访问kafka管理界面

kafka 集群

为何需要Kafka集群

​ 本地开发,一台Kafka足够使用。在实际生产中,集群可以跨服务器进行负载均衡,再则可以使用复制功能来避免单独故障造成的数据丢失。同时集群可以提供高可用性。

如何估算Kafka集群中Broker的数量

要估量以下几个因素:

​ 需要多少磁盘空间保留数据,和每个broker上有多少空间可以用。比如,如果一个集群有10TB的数据需要保留,而每个broker可以存储2TB,那么至少需要5个broker。如果启用了数据复制,则还需要一倍的空间,那么这个集群需要10个broker。

​ 集群处理请求的能力。如果因为磁盘吞吐量和内存不足造成性能问题,可以通过扩展broker来解决。

Broker如何加入Kafka集群

​ 非常简单,只需要两个参数。第一,配置zookeeper.connect,第二,为新增的broker设置一个集群内的唯一性id。

​ Kafka 中的集群是可以动态扩容的。

kafka集群安装

docker-compose方式安装

创建zookeeper挂载目录
1
2
mkdir -p /tmp/data/zkcluster/zookeeper0{1..3}/data
mkdir -p /tmp/data/zkcluster/zookeeper0{1..3}/datalog
创建Kafka日志挂载目录
1
mkdir -p /tmp/data/kfklcuster/kafka0{1..3}/logs
编写docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
version: '2'   
services:
zookeeper01:
image: zookeeper
hostname: zookeeper01
container_name: zookeeper01
privileged: true
volumes:
- "/tmp/data/zkcluster/zookeeper01/data:/data"
- "/tmp/data/zkcluster/zookeeper01/datalog:/datalog"
networks:
docker_network:
ipv4_address: 172.18.0.5
environment:
ZOO_MY_ID: 1
TZ: Asia/Shanghai
ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181
ports:
- "2181:2181"
zookeeper02:
image: zookeeper
hostname: zookeeper02
container_name: zookeeper02
privileged: true
volumes:
- "/tmp/data/zkcluster/zookeeper02/data:/data"
- "/tmp/data/zkcluster/zookeeper02/datalog:/datalog"
networks:
docker_network:
ipv4_address: 172.18.0.6
environment:
ZOO_MY_ID: 2
TZ: Asia/Shanghai
ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181
ports:
- "2182:2181"
zookeeper03:
image: zookeeper
hostname: zookeeper03
container_name: zookeeper03
privileged: true
volumes:
- "/tmp/data/zkcluster/zookeeper03/data:/data"
- "/tmp/data/zkcluster/zookeeper03/datalog:/datalog"
networks:
docker_network:
ipv4_address: 172.18.0.7
environment:
ZOO_MY_ID: 3
TZ: Asia/Shanghai
ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181
ports:
- "2183:2181"
kafka01:
image: wurstmeister/kafka
hostname: kafka01
container_name: kafka01
privileged: true
volumes:
- "/tmp/data/kfklcuster/kafka01/logs:/kafka"
depends_on:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
docker_network:
ipv4_address: 172.18.0.10
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 ## 修改:宿主机IP
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9092 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9093
ports:
- "9092:9092"
kafka02:
image: wurstmeister/kafka
hostname: kafka02
container_name: kafka02
privileged: true
volumes:
- "/tmp/data/kfklcuster/kafka02/logs:/kafka"
depends_on:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
docker_network:
ipv4_address: 172.18.0.11
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 ## 修改:宿主机IP
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9093 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9093
ports:
- "9093:9092"
kafka03:
image: wurstmeister/kafka
hostname: kafka03
container_name: kafka03
privileged: true
volumes:
- "/tmp/data/kfklcuster/kafka03/logs:/kafka"
depends_on:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
docker_network:
ipv4_address: 172.18.0.12
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 ## 修改:宿主机IP
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9094 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9094
ports:
- "9094:9092"
kafka-manager:
image: sheepkiller/kafka-manager
container_name: kafka-manager
depends_on:
- zookeeper01
- zookeeper02
- zookeeper03
environment:
ZK_HOSTS: 192.168.64.141
ports:
- "9000:9000"
networks:
docker_network:
ipam:
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

注意: kafka-manager的ZK_HOSTS属性要改成宿主机地址

运行docker-compose
1
docker-compose up -d

查看zookeeper集群状态
1
2
3
docker exec zookeeper01 /bin/bash -c "./bin/zkServer.sh status"
docker exec zookeeper02 /bin/bash -c "./bin/zkServer.sh status"
docker exec zookeeper03 /bin/bash -c "./bin/zkServer.sh status"

到这里zk已经启动成功了

Kafka集群测试

创建主题

登录kafka容器

1
docker exec -ti kafka01 /bin/bash

创建主题

1
kafka-topics.sh --zookeeper zookeeper01:2181 --create --topic hello-kafka --replication-factor 1 --partitions 4

列出所有主题

1
kafka-topics.sh --list --zookeeper zookeeper01:2181

查看主题详情

1
kafka-topics.sh --zookeeper zookeeper01:2181 --describe --topic hello-kafka

创建项目

引入坐标
1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
生产者发送消息
生产者代码

注意指定生成者的broker地址清单,key和value的序列化器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyKafkaProducer {
public static void main(String[] args) {
//生产者三个属性必须指定(broker地址清单,key和value的序列化器)
Properties kafkaProperties = new Properties();
kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094");
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(kafkaProperties);
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord("hello-kafka", "key" + i, "message" + i);
try {
Future result = producer.send(record);
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
}
}

}
}

必选属性

创建生产者对象时有三个属性必须指定。

  • bootstrap.servers

    ​ 该属性指定 broker 的地址清单, 地址的格式为 host:port。 清单里不需要包含所有的 broker 地址, 生产者会从给定的 broker 里查询其他 broker 的信息。不过最少提供 2 个 broker 的信息(用逗号分隔, 比如: 192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094), 一旦其中一个宕机, 生产者仍能连接到集群上。

  • key.serializer

    是表示消息中的键的序列化器,网络传输需要进行序列化成字节数组

    ​ 生产者接口允许使用参数化类型, 可以把 Java 对象作为键和值传 broker, 但是 broker 希望收到的消息的键和值都是字节数组, 所以, 必须提供将对象序列化成字节数组的序列化器。 key.serializer 必须设置为实现 org.apache.kafka.common.serialization.Serializer 的接口类, Kafka 的客户端默认提供了
    ByteArraySerializer,IntegerSerializer, StringSerializer, 也可以实现自定义的序列化器。

  • value.serializer

    ​ 是表示消息中值的序列化器 同key.serializer

消费者接受消息

Kafka 只提供拉取的方式

消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class MyKafkaConsumer {
public static void main(String[] args) {
//消费者三个属性必须指定(broker地址清单,key和value的序列化器)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_0");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//1.创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

//2.订阅Topic
//创建一个只包含单个元素的列表,Topic的名字叫作customerCountries,可以订阅多个主题
consumer.subscribe(Collections.singletonList("hello-kafka"));
//支持正则表达式,订阅所有与test相关的Topic
//consumer.subscribe("test.*");

//3.轮询
//消息轮询是消费者的核心API,通过一个简单的轮询向服务器请求数据,一旦消费者订阅了Topic,轮询就会处理所欲的细节,包括群组协调、partition再均衡、发送心跳
//以及获取数据,开发者只要处理从partition返回的数据即可。
try {
while (true) {//消费者是一个长期运行的程序,通过持续轮询向Kafka请求数据。在其他线程中调用consumer.wakeup()可以退出循环
//在100ms内等待Kafka的broker返回数据.超市参数指定poll在多久之后可以返回,不管有没有可用的数据都要返回
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value())); //统计各个地区的客户数量,即模拟对消息的处理
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
}
}
必选参数

bootstrap.servers、 key.serializer、 value.serializer 含义同生产者

  • group.id

    ​ 并非完全必需, 它指定了消费者属于哪一个群组, 但是创建不属于任何一个群组的消费者并没有问题

运行测试

自动提交代码

加入如下配置

1
2
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");

​ 一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

生产者发送消息
1
2
3
4
5
6
7
8
9
10
hello-kafka-2@0
hello-kafka-0@2
hello-kafka-3@5
hello-kafka-1@2
hello-kafka-1@3
hello-kafka-3@6
hello-kafka-0@3
hello-kafka-3@7
hello-kafka-3@8
hello-kafka-3@9
消费者接收数据
1
2
3
4
5
6
7
8
9
10
主题:hello-kafka, 分区:2, 偏移量:0, key:key0, value:message0
主题:hello-kafka, 分区:0, 偏移量:2, key:key1, value:message1
主题:hello-kafka, 分区:3, 偏移量:5, key:key2, value:message2
主题:hello-kafka, 分区:1, 偏移量:2, key:key3, value:message3
主题:hello-kafka, 分区:3, 偏移量:6, key:key5, value:message5
主题:hello-kafka, 分区:0, 偏移量:3, key:key6, value:message6
主题:hello-kafka, 分区:1, 偏移量:3, key:key4, value:message4
主题:hello-kafka, 分区:3, 偏移量:7, key:key7, value:message7
主题:hello-kafka, 分区:3, 偏移量:8, key:key8, value:message8
主题:hello-kafka, 分区:3, 偏移量:9, key:key9, value:message9

基本的操作和管理

kafka的基本管理命令都在安装目录下面的bin文件夹。跳转到安装目录,可以执行一下基本的管理命令,比如创建topic,管理消费组,消费组消费进度查询等

列出所有主题

1
kafka-topics.sh --list --zookeeper zk_host:port

创建topic

1
kafka-topics.sh --zookeeper zk_host:port/chroot--create --topic Hello-kafka  --partitions 20 --replication-factor 3

partitions为分区数,分区越多,并发能力越强。replication_factor为副本数,避免单点故障

修改topic

1
kafka-topics.sh --zookeeper zk_host:port/chroot--alter --topic Hello-kafka --partitions 40

partitions数量改为40,可能因为消费端能力不够,需要增加消费者

删除主题

1
kafka-topics.sh --zookeeper zk_host:port --delete --topic Hello-kafka

如果 delete.topic.enable 未设置为true,则此操作不会产生任何影响

优雅的关机

​ broker意外关闭的时候,没有所谓的优雅关机,只能够等待重新选举出leader。如果broker主动关闭,可以设置一个参数controlled.shutdown.enable=true来控制是否优雅的关闭。如果参数为true,那么broker关闭前,会将自己节点的为leader的partition的领导权限转交给其他节点。这样,减少了重新选举造成topic不可用的时间。

broker自动平衡

当一个broker主动关闭或者crash的时候,partition的领导权限将会转交给其他broker。如果crash的broker重新启动,它将只会是其他broker的follower,不会被客户端使用。这样会导致每个broker的压力不平衡。这个时候,我们可以设置一些参数,让broker们自动平衡:

1
2
3
4
5
# 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable = true
# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡leader.imbalance.per.broker.percentage = 10
# 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds = 300

查看消费进度

当一个topic被很多消费者消费的时候,我们可能需要知道某一个消费者的消费进度情况,可以运行以下令进行查看:

1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

这条命令是查看消费组my_group的消费进度。如果是老版本的kafka,需要通过如下命令查看

1
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group

因为新版本的kafka,offset才存放在broker,老版本0.8及以下存放在zookeeper。

增加副本

当我们新建topic没有指定副本的时候,后续如果认为不安全,需要增加副本的话,需要如下操作

1
2
3
cat increase-replication-factor.json

{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

第一步,新建一个json文件,表示要修改哪个topic,副本存放到哪几个broker。比如上面的例子是leader在5号broker,需要在6,7号broker新增2个副本。

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

第二步,执行定义好的分配方案,会有提示

1
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

评论