Kafka安装管理
Linux环境安装
准备zookeeper环境
kafka是依赖于zookeeper注册中心的一款分布式消息对列,所以需要有zookeeper单机或者集群环境。
下载Kafka
1
| wget http://labfile.oss.aliyuncs.com/courses/859/kafka_2.10-0.10.2.1.tgz
|
解压Kafka
1
| tar -zxvf kafka_2.10-0.10.2.1.tgz
|
进入配置文件目录
1
| cd /usr/local/software/kafka_2.10-0.10.2.1/config
|
修改配置文件
在server.properties,添加下面内容:
1 2 3 4 5
| broker.id=0 port=9092 #端口号 host.name=172.30.0.9 #服务器IP地址,修改为自己的服务器IP log.dirs=/usr/local/logs/kafka #日志存放路径,上面创建的目录 zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
|
编写启动脚本
1 2 3 4 5 6
| #启动zookeeper /usr/local/software/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh /usr/local/software/kafka_2.10-0.10.2.1/config/zookeeper.properties & #等3秒后执行 sleep 3 #启动kafka /usr/local/software/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh /usr/local/software/kafka_2.10-0.10.2.1/config/server.properties &
|
编写关闭脚本
1 2 3 4 5 6
| #关闭zookeeper /usr/local/software/kafka_2.10-0.10.2.1/bin/zookeeper-server-stop.sh /usr/local/software/kafka_2.10-0.10.2.1/config/zookeeper.properties & #等3秒后执行 sleep 3 #关闭kafka /usr/local/software/kafka_2.10-0.10.2.1/bin/kafka-server-stop.sh /usr/local/software/kafka_2.10-0.10.2.1/config/server.properties &
|
启动脚本,关闭脚本赋予权限
1 2
| chmod 777 kafkastart.sh chmod 777 kafkastop.sh
|
Docker 安装
普通方式
创建网络
1
| docker network create --subnet=172.18.0.0/16 docker_network
|
运行zk
1
| docker run -d --name zookeeper --network=docker_network --ip=172.18.0.5 -p 2181:2181 --privileged=true zookeeper
|
运行Kafka
1
| docker run -d --name kafka --network=docker_network --ip=172.18.0.10 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.18.0.5:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.18.0.10:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
|
参数含义
- KAFKA_BROKER_ID:配置Kafka的broker_id
- KAFKA_ZOOKEEPER_CONNECT: 配置kafka连接zk的地址 172.18.0.5:218
- KAFKA_ADVERTISED_LISTENERS:把kafka的地址端口注册给zookeeper
- KAFKA_LISTENERS:配置kafka的监听端口
docker-compose容器编排
docker-compose.yml
编辑 docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| version: '2' services: zookeeper: image: zookeeper container_name: zookeeper privileged: true networks: docker_network: ipv4_address: 172.18.0.5 ports: - "2181:2181" kafka: image: wurstmeister/kafka hostname: kafka container_name: kafka privileged: true depends_on: - zookeeper networks: docker_network: ipv4_address: 172.18.0.10 environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: 172.18.0.5:2181 KAFKA_ADVERTISED_HOST_NAME: 192.168.64.136 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.136:9092 KAFKA_ADVERTISED_PORT: 9092 ports: - "9092:9092" kafka-manager: image: sheepkiller/kafka-manager container_name: kafka-manager depends_on: - zookeeper environment: ZK_HOSTS: 192.168.64.136 ports: - "9000:9000" networks: docker_network: ipam: config: - subnet: 172.18.0.0/16 gateway: 172.18.0.1
|
注意: kafka-manager的ZK_HOSTS属性要改成宿主机地址
运行docker-compose
访问kafka管理界面
kafka 集群
为何需要Kafka集群
本地开发,一台Kafka足够使用。在实际生产中,集群可以跨服务器进行负载均衡,再则可以使用复制功能来避免单独故障造成的数据丢失。同时集群可以提供高可用性。
如何估算Kafka集群中Broker的数量
要估量以下几个因素:
需要多少磁盘空间保留数据,和每个broker上有多少空间可以用。比如,如果一个集群有10TB的数据需要保留,而每个broker可以存储2TB,那么至少需要5个broker。如果启用了数据复制,则还需要一倍的空间,那么这个集群需要10个broker。
集群处理请求的能力。如果因为磁盘吞吐量和内存不足造成性能问题,可以通过扩展broker来解决。
Broker如何加入Kafka集群
非常简单,只需要两个参数。第一,配置zookeeper.connect,第二,为新增的broker设置一个集群内的唯一性id。
Kafka 中的集群是可以动态扩容的。
kafka集群安装
docker-compose方式安装
创建zookeeper挂载目录
1 2
| mkdir -p /tmp/data/zkcluster/zookeeper0{1..3}/data mkdir -p /tmp/data/zkcluster/zookeeper0{1..3}/datalog
|
创建Kafka日志挂载目录
1
| mkdir -p /tmp/data/kfklcuster/kafka0{1..3}/logs
|
编写docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
| version: '2' services: zookeeper01: image: zookeeper hostname: zookeeper01 container_name: zookeeper01 privileged: true volumes: - "/tmp/data/zkcluster/zookeeper01/data:/data" - "/tmp/data/zkcluster/zookeeper01/datalog:/datalog" networks: docker_network: ipv4_address: 172.18.0.5 environment: ZOO_MY_ID: 1 TZ: Asia/Shanghai ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181 ports: - "2181:2181" zookeeper02: image: zookeeper hostname: zookeeper02 container_name: zookeeper02 privileged: true volumes: - "/tmp/data/zkcluster/zookeeper02/data:/data" - "/tmp/data/zkcluster/zookeeper02/datalog:/datalog" networks: docker_network: ipv4_address: 172.18.0.6 environment: ZOO_MY_ID: 2 TZ: Asia/Shanghai ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181 ports: - "2182:2181" zookeeper03: image: zookeeper hostname: zookeeper03 container_name: zookeeper03 privileged: true volumes: - "/tmp/data/zkcluster/zookeeper03/data:/data" - "/tmp/data/zkcluster/zookeeper03/datalog:/datalog" networks: docker_network: ipv4_address: 172.18.0.7 environment: ZOO_MY_ID: 3 TZ: Asia/Shanghai ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181 ports: - "2183:2181" kafka01: image: wurstmeister/kafka hostname: kafka01 container_name: kafka01 privileged: true volumes: - "/tmp/data/kfklcuster/kafka01/logs:/kafka" depends_on: - zookeeper01 - zookeeper02 - zookeeper03 networks: docker_network: ipv4_address: 172.18.0.10 environment: KAFKA_BROKER_ID: 0 KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181 KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9092 KAFKA_ADVERTISED_PORT: 9093 ports: - "9092:9092" kafka02: image: wurstmeister/kafka hostname: kafka02 container_name: kafka02 privileged: true volumes: - "/tmp/data/kfklcuster/kafka02/logs:/kafka" depends_on: - zookeeper01 - zookeeper02 - zookeeper03 networks: docker_network: ipv4_address: 172.18.0.11 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181 KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9093 KAFKA_ADVERTISED_PORT: 9093 ports: - "9093:9092" kafka03: image: wurstmeister/kafka hostname: kafka03 container_name: kafka03 privileged: true volumes: - "/tmp/data/kfklcuster/kafka03/logs:/kafka" depends_on: - zookeeper01 - zookeeper02 - zookeeper03 networks: docker_network: ipv4_address: 172.18.0.12 environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181 KAFKA_ADVERTISED_HOST_NAME: 192.168.64.141 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.64.141:9094 KAFKA_ADVERTISED_PORT: 9094 ports: - "9094:9092" kafka-manager: image: sheepkiller/kafka-manager container_name: kafka-manager depends_on: - zookeeper01 - zookeeper02 - zookeeper03 environment: ZK_HOSTS: 192.168.64.141 ports: - "9000:9000" networks: docker_network: ipam: config: - subnet: 172.18.0.0/16 gateway: 172.18.0.1
|
注意: kafka-manager的ZK_HOSTS属性要改成宿主机地址
运行docker-compose
查看zookeeper集群状态
1 2 3
| docker exec zookeeper01 /bin/bash -c "./bin/zkServer.sh status" docker exec zookeeper02 /bin/bash -c "./bin/zkServer.sh status" docker exec zookeeper03 /bin/bash -c "./bin/zkServer.sh status"
|
到这里zk已经启动成功了
Kafka集群测试
创建主题
登录kafka容器
1
| docker exec -ti kafka01 /bin/bash
|
创建主题
1
| kafka-topics.sh --zookeeper zookeeper01:2181 --create --topic hello-kafka --replication-factor 1 --partitions 4
|
列出所有主题
1
| kafka-topics.sh --list --zookeeper zookeeper01:2181
|
查看主题详情
1
| kafka-topics.sh --zookeeper zookeeper01:2181 --describe --topic hello-kafka
|
创建项目
引入坐标
1 2 3 4 5
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
|
生产者发送消息
生产者代码
注意指定生成者的broker地址清单,key和value的序列化器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyKafkaProducer { public static void main(String[] args) { Properties kafkaProperties = new Properties(); kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094"); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer(kafkaProperties); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord("hello-kafka", "key" + i, "message" + i); try { Future result = producer.send(record); System.out.println(result.get()); } catch (Exception e) { e.printStackTrace(); } }
} }
|
必选属性
创建生产者对象时有三个属性必须指定。
bootstrap.servers
该属性指定 broker 的地址清单, 地址的格式为 host:port。 清单里不需要包含所有的 broker 地址, 生产者会从给定的 broker 里查询其他 broker 的信息。不过最少提供 2 个 broker 的信息(用逗号分隔, 比如: 192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094), 一旦其中一个宕机, 生产者仍能连接到集群上。
key.serializer
是表示消息中的键的序列化器,网络传输需要进行序列化成字节数组
生产者接口允许使用参数化类型, 可以把 Java 对象作为键和值传 broker, 但是 broker 希望收到的消息的键和值都是字节数组, 所以, 必须提供将对象序列化成字节数组的序列化器。 key.serializer 必须设置为实现 org.apache.kafka.common.serialization.Serializer 的接口类, Kafka 的客户端默认提供了
ByteArraySerializer,IntegerSerializer, StringSerializer, 也可以实现自定义的序列化器。
value.serializer
是表示消息中值的序列化器 同key.serializer
消费者接受消息
Kafka 只提供拉取的方式
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class MyKafkaConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.141:9092,192.168.64.141:9093,192.168.64.141:9094"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_0"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("hello-kafka"));
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("主题:%s, 分区:%d, 偏移量:%d, key:%s, value:%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } finally { consumer.close(); } } }
|
必选参数
bootstrap.servers、 key.serializer、 value.serializer 含义同生产者
运行测试
自动提交代码
加入如下配置
1 2
| props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
|
一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。
在默认情况下,Consumer 每 5 秒自动提交一次位移。我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
生产者发送消息
1 2 3 4 5 6 7 8 9 10
| hello-kafka-2@0 hello-kafka-0@2 hello-kafka-3@5 hello-kafka-1@2 hello-kafka-1@3 hello-kafka-3@6 hello-kafka-0@3 hello-kafka-3@7 hello-kafka-3@8 hello-kafka-3@9
|
消费者接收数据
1 2 3 4 5 6 7 8 9 10
| 主题:hello-kafka, 分区:2, 偏移量:0, key:key0, value:message0 主题:hello-kafka, 分区:0, 偏移量:2, key:key1, value:message1 主题:hello-kafka, 分区:3, 偏移量:5, key:key2, value:message2 主题:hello-kafka, 分区:1, 偏移量:2, key:key3, value:message3 主题:hello-kafka, 分区:3, 偏移量:6, key:key5, value:message5 主题:hello-kafka, 分区:0, 偏移量:3, key:key6, value:message6 主题:hello-kafka, 分区:1, 偏移量:3, key:key4, value:message4 主题:hello-kafka, 分区:3, 偏移量:7, key:key7, value:message7 主题:hello-kafka, 分区:3, 偏移量:8, key:key8, value:message8 主题:hello-kafka, 分区:3, 偏移量:9, key:key9, value:message9
|
基本的操作和管理
kafka的基本管理命令都在安装目录下面的bin文件夹。跳转到安装目录,可以执行一下基本的管理命令,比如创建topic,管理消费组,消费组消费进度查询等
列出所有主题
1
| kafka-topics.sh --list --zookeeper zk_host:port
|
创建topic
1
| kafka-topics.sh --zookeeper zk_host:port/chroot--create --topic Hello-kafka --partitions 20 --replication-factor 3
|
partitions为分区数,分区越多,并发能力越强。replication_factor为副本数,避免单点故障
修改topic
1
| kafka-topics.sh --zookeeper zk_host:port/chroot--alter --topic Hello-kafka --partitions 40
|
partitions数量改为40,可能因为消费端能力不够,需要增加消费者
删除主题
1
| kafka-topics.sh --zookeeper zk_host:port --delete --topic Hello-kafka
|
如果 delete.topic.enable 未设置为true,则此操作不会产生任何影响
优雅的关机
broker意外关闭的时候,没有所谓的优雅关机,只能够等待重新选举出leader。如果broker主动关闭,可以设置一个参数controlled.shutdown.enable=true来控制是否优雅的关闭。如果参数为true,那么broker关闭前,会将自己节点的为leader的partition的领导权限转交给其他节点。这样,减少了重新选举造成topic不可用的时间。
broker自动平衡
当一个broker主动关闭或者crash的时候,partition的领导权限将会转交给其他broker。如果crash的broker重新启动,它将只会是其他broker的follower,不会被客户端使用。这样会导致每个broker的压力不平衡。这个时候,我们可以设置一些参数,让broker们自动平衡:
1 2 3 4 5
| auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
|
查看消费进度
当一个topic被很多消费者消费的时候,我们可能需要知道某一个消费者的消费进度情况,可以运行以下令进行查看:
1
| kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
|
这条命令是查看消费组my_group的消费进度。如果是老版本的kafka,需要通过如下命令查看
1
| bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group
|
因为新版本的kafka,offset才存放在broker,老版本0.8及以下存放在zookeeper。
增加副本
当我们新建topic没有指定副本的时候,后续如果认为不安全,需要增加副本的话,需要如下操作
1 2 3
| cat increase-replication-factor.json
{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
|
第一步,新建一个json文件,表示要修改哪个topic,副本存放到哪几个broker。比如上面的例子是leader在5号broker,需要在6,7号broker新增2个副本。
1
| bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
|
第二步,执行定义好的分配方案,会有提示
1
| bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
|