Kafka Development in Practice

Overview

The kafka-clients version must match the Kafka broker version. Before adding the dependency, check the version compatibility matrix:

Spring for Apache Kafka   Spring Integration for Apache Kafka   kafka-clients
2.3.x                     3.2.x                                 2.3.0
2.2.x                     3.1.x                                 2.0.0, 2.1.0
2.1.x                     3.0.x                                 1.0.x, 1.1.x, 2.0.0
1.3.x                     2.3.x                                 0.11.0.x, 1.0.x

Integrating with Spring

In the Spring/Kafka integration, Spring treats Kafka as a piece of message-oriented middleware (an MQ). Compared with the native client API, this approach is less flexible.

POM configuration

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
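
Note that spring-kafka 2.2.0.RELEASE transitively pulls in a matching kafka-clients (2.0.0/2.1.0 per the table above). If your broker needs a different client version, the transitive dependency can be overridden explicitly; a sketch, with the version chosen from the compatibility matrix:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>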

Shared configuration

Configure the broker addresses in a properties file, kafka.properties:

# Kafka broker cluster
bootstrap.servers=localhost:9092
# Number of concurrent listener threads (should not exceed the topic's partition count)
concurrency=3

Producer side

Spring configuration file

Add the following to applicationContext.xml:

<context:property-placeholder location="classpath*:config/kafka.properties" />
<!-- Producer properties -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>
<!-- ProducerFactory bean used by the KafkaTemplate -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
        <ref bean="producerProperties"/>
    </constructor-arg>
</bean>
<!-- Send listener bean -->
<bean id="sendListener" class="cn.chj.service.SendListener" />

Create the KafkaTemplate bean. To send messages, simply inject this bean and call its send methods:

<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory" />
    <constructor-arg name="autoFlush" value="true" />
    <!-- Register the send listener -->
    <property name="producerListener" ref="sendListener"></property>
</bean>
<!-- 1. Consumer properties -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="spring-kafka-group" />
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>
<!-- 2. ConsumerFactory bean -->
<bean id="consumerFactory"
      class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>
<!-- 3. Consumer implementation bean -->
<bean id="kafkaConsumerService" class="cn.enjoyedu.service.KafkaConsumer" />
<!-- 4. Container properties -->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
    <constructor-arg name="topics">
        <list>
            <value>kafka-spring-topic</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerService"></property>
</bean>
<!-- 5. Concurrent message listener container; init-method invokes doStart() -->
<bean id="messageListenerContainer"
      class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    <property name="concurrency" value="${concurrency}" />
</bean>

Producer code

Producer implementation in KafkaController:

@Controller
@RequestMapping("/kafka")
public class KafkaController {
   @Autowired
   private KafkaTemplate<String,String> kafkaTemplate;
   /**
    * @param message
    */
   @ResponseBody
   @RequestMapping("spring")
   public String queueSender(@RequestParam("message")String message){
      String opt="";
      try {
         kafkaTemplate.send("kafka-spring-topic",message);
         opt = "suc";
      } catch (Exception e) {
         // e.getCause() may be null; report the exception itself
         opt = e.toString();
      }
      return opt;
   }
   @ResponseBody
   @RequestMapping("springb")
   public String topicSender(@RequestParam("message")String message){
      String opt = "";
      try {
         kafkaTemplate.send("kafka-spring-topic-b",message);
         opt = "suc";
      } catch (Exception e) {
         // e.getCause() may be null; report the exception itself
         opt = e.toString();
      }
      return opt;
   }
}
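
Note that KafkaTemplate.send() is asynchronous: the try/catch above only catches errors raised while handing the record to the producer, not broker-side failures. A minimal sketch of observing the real outcome, assuming spring-kafka 2.x where send() returns a ListenableFuture:

ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("kafka-spring-topic", message);
future.addCallback(
        result -> System.out.println("acked, offset=" + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));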

Message confirmation

Producer send confirmation with SendListener:

public class SendListener implements ProducerListener {
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        System.out.println("offset:"+recordMetadata.offset()+"-"+"partition:"+recordMetadata.partition());
    }
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        // log failed sends instead of swallowing them silently
        exception.printStackTrace();
    }
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}

Consumer side

Spring configuration file

Add the following to applicationContext.xml:

<!-- Manual acknowledgment - 1. Consumer properties -->
<bean id="consumerPropertiesAck" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="spring-kafka-group-ack" />
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="enable.auto.commit" value="false"/>
        </map>
    </constructor-arg>
</bean>
<!-- Manual acknowledgment - 2. ConsumerFactory bean -->
<bean id="consumerFactoryAck"  class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
    <constructor-arg>
        <ref bean="consumerPropertiesAck" />
    </constructor-arg>
</bean>
<!-- Manual acknowledgment - 3. Consumer implementation bean -->
<bean id="kafkaConsumerServiceAck" class="cn.chj.service.KafkaConsumerAck" />
<!-- Manual acknowledgment - 4. Container properties -->
<bean id="containerPropertiesAck" class="org.springframework.kafka.listener.ContainerProperties">
    <!-- topic -->
    <constructor-arg name="topics">
        <list>
            <value>kafka-spring-topic-b</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerServiceAck" />
    <!-- Manual acknowledgment mode -->
    <property name="ackMode" value="MANUAL_IMMEDIATE"></property>
</bean>
<!-- Manual acknowledgment - 5. Concurrent message listener container; init-method invokes doStart() -->
<bean id="messageListenerContainerAck" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
    <constructor-arg ref="consumerFactoryAck" />
    <constructor-arg ref="containerPropertiesAck" />
    <property name="concurrency" value="${concurrency}" />
</bean>

Consumer code

The KafkaConsumer listener implementation:

public class KafkaConsumer implements MessageListener<String,String> {
    public void onMessage(ConsumerRecord<String, String> data) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"|"+String.format(
                "topic:%s, partition:%d, offset:%d, key:%s, value:%s",
                data.topic(),data.partition(),data.offset(),
                data.key(),data.value()));
    }
}

Consumer acknowledgment

The KafkaConsumerAck manual acknowledgment implementation:

public class KafkaConsumerAck implements AcknowledgingMessageListener<String,String> {
    public void onMessage(ConsumerRecord<String, String> data,Acknowledgment acknowledgment) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"|"+String.format("topic:%s, partition:%d, offset:%d, key:%s, value:%s",
                data.topic(),data.partition(),data.offset(),data.key(),data.value()));
        // Manually acknowledge (commit) the offset
        acknowledgment.acknowledge();
    }
}
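
A note on the two manual modes (paraphrasing spring-kafka's container semantics): MANUAL_IMMEDIATE, used above, commits the offset as soon as acknowledge() is called, while MANUAL queues the acknowledgment and commits after the records returned by the current poll have been processed. Switching modes is a one-line change in the container properties:

<!-- Commit after the current poll completes instead of immediately -->
<property name="ackMode" value="MANUAL" />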

Integrating Spring Boot with Kafka

POM configuration

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

If you are on Spring Boot 2.0.3, use spring-kafka 2.1.7.

If you are on a newer Spring Boot such as 2.1.0, you can use spring-kafka 2.2.0.

Shared configuration

Settings in application.properties:

#============== kafka ===================
# legacy setting; it is not referenced by the configuration classes below
kafka.consumer.zookeeper.connect=localhost:2181
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

Spring Boot configuration

Producer configuration code:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

Consumer configuration code:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public MyListener listener() {
        return new MyListener();
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
    // Manual-acknowledgment variant
    public Map<String, Object> consumerConfigsAck() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return propsMap;
    }
    public ConsumerFactory<String, String> consumerFactoryAck() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsAck());
    }
    @Bean("listenerAck")
    public MyListenerAck listenerAck() {
        return new MyListenerAck();
    }
    @Bean("factoryAck")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryAck() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryAck());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
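
As an aside (standard Spring Boot behavior, not part of the original setup): instead of hand-writing these @Configuration classes, Spring Boot auto-configuration can build equivalent producer and consumer beans from spring.kafka.* properties, for example:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=4096

The manual approach used here buys finer control, such as the two container factories with different ack modes.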

Sending side

Send entry point

The KafkaController send entry point:

@RestController
@RequestMapping("/kafka")
public class KafkaController {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @RequestMapping(value = "/send")
    public String sendKafka(@RequestParam(required = false) String key,@RequestParam(required = false) String value) {
        try {
            logger.info("kafka的消息={}", value);
            kafkaTemplate.send("test", key, value);
            return "发送kafka成功";
        } catch (Exception e) {
            logger.error("发送kafka异常:", e);
            return "发送kafka失败";
        }
    }
    @RequestMapping(value = "/sendAck")
    public String sendKafkaAck(@RequestParam(required = false) String key, @RequestParam(required = false) String value) {
        try {
            logger.info("kafka的消息={}", value);
            kafkaTemplate.send("testAck", key, value);
            return "发送kafka成功";
        } catch (Exception e) {
            logger.error("发送kafka异常:", e);
            return "发送kafka失败";
        }
    }
}

Consuming side

Message listener:

public class MyListener {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("收到消息的key: " + record.key());
        logger.info("收到消息的value: " + record.value().toString());
    }
}

Manual message acknowledgment:
public class MyListenerAck {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @KafkaListener(topics = {"testAck"},containerFactory = "factoryAck")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        try {
            logger.info("自行确认方式收到消息的key: " + record.key());
            logger.info("自行确认方式收到消息的value: " + record.value().toString());
        } finally {
            logger.info("消息确认!");
            ack.acknowledge();
        }
    }
}

Kafka in Practice: Peak Shaving and Load Leveling

Backend service

Environment setup
POM configuration
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

application.yml configuration:
server:
  port: 8090
#============== kafka ===================
kafka:
  consumer:
    zookeeper.connect: localhost:2181
    servers: 127.0.0.1:9092
    enable.auto.commit: true
    session.timeout: 6000
    auto.commit.interval: 100
    auto.offset.reset: latest
    topic: test
    group.id: test
    concurrency: 2
  producer:
    servers: 127.0.0.1:9092
    retries: 0
    batch.size: 4096
    linger: 1
    buffer.memory: 40960

Producer configuration

The KafkaProducerConfig producer configuration:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    }
}

Implementation
Business entry point

The KafkaController entry point code:

@RestController
public class KafkaController {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private DBService dbService;
    /**
     * Public endpoint: http://127.0.0.1:8090/buyTicket
     */
    @RequestMapping("/buyTicket")
    public String buyTicket(){
        try {  // simulate issuing a ticket...
            System.out.println("starting ticket purchase business------");
            return dbService.useDb("select ticket ");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
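
DBService is referenced above but not shown in the original. A minimal hypothetical sketch, assuming it simulates a slow, connection-limited database (the class body, names, and timings are illustrative assumptions):

@Service
public class DBService {
    // pretend the database only supports a couple of concurrent connections
    private final Semaphore connections = new Semaphore(2);

    public String useDb(String sql) {
        try {
            connections.acquire();
            try {
                Thread.sleep(100); // simulate query latency
                return "ticket issued for [" + sql.trim() + "]";
            } finally {
                connections.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}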

Traffic shaping

Use Kafka to front the service and implement traffic shaping (peak shaving):

@Component
public class ConsumerService {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static ExecutorService executorService = new ThreadPoolExecutor(2, 2, 60,
            TimeUnit.SECONDS, new SynchronousQueue<>());
    private static List<KafkaConsumer> consumers = new ArrayList<KafkaConsumer>();
    @Autowired
    private DBService dbService;
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        /* Throttle consumption: fetch at most one record per poll */
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);
        return propsMap;
    }
    private static class ConsumerWorker implements Runnable{
        private KafkaConsumer<String,String> consumer;
        private DBService dbService;
        private KafkaTemplate kafkaTemplate;
        public ConsumerWorker(KafkaConsumer<String, String> consumer,DBService dbService,KafkaTemplate kafkaTemplate) {
            this.consumer = consumer;
            this.dbService = dbService;
            this.kafkaTemplate = kafkaTemplate;
            /* The thread pool has 2 threads, so the topic is limited to 2 partitions and there are exactly 2 consumers */
            consumer.subscribe(Collections.singletonList("traffic-shaping"));
        }
        public void run() {
            final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(consumer);
            try {
                while(true){
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(id+"|"+String.format(
                                "topic:%s, partition:%d, offset:%d, key:%s, value:%s",
                                record.topic(),record.partition(),record.offset(),record.key(),record.value()));
                        System.out.println("starting ticket purchase business------");
                        String result = dbService.useDb("select ticket");
                        System.out.println(result+", about to notify the client");
                        /* The result topic has 10 partitions and could have more, since the
                           client side has no peak-shaving requirement; if it did, it could be
                           handled the same way */
                        kafkaTemplate.send("traffic-shaping-result",result);
                    }
                }
            }catch (Exception e) {
                // a WakeupException lands here when destroy() calls consumer.wakeup()
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
    @PostConstruct
    public void init(){
        for(int i=0;i<2;i++){/* start 2 consumers */
            KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(consumerConfigs());
            executorService.submit(new ConsumerWorker(consumer,dbService,kafkaTemplate));
            consumers.add(consumer);
        }
    }
    @PreDestroy
    public void destroy(){
        // KafkaConsumer is not thread-safe; wakeup() is the only method that can
        // safely be called from another thread to break the poll loop
        for(KafkaConsumer consumer:consumers){
            consumer.wakeup();
        }
    }
}
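
The code above assumes that "traffic-shaping" has 2 partitions and "traffic-shaping-result" has 10. A sketch of creating them with the Kafka CLI of this era (paths and replication factor are assumptions):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic traffic-shaping --partitions 2 --replication-factor 1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic traffic-shaping-result --partitions 10 --replication-factor 1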

Client side

Environment setup
POM configuration

Omitted (same as above).

application.properties configuration:
#============== kafka ===================
kafka.consumer.zookeeper.connect=localhost:2181/kafka-one
kafka.consumer.servers=localhost:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.servers=localhost:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

Producer configuration

The KafkaProducerConfig producer configuration:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    @Autowired
    private SendInfo sendInfo;
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setProducerListener(sendInfo);
        return kafkaTemplate;
    }
}

Sender listener code:

@Component
public class SendInfo implements ProducerListener {
    @Override
    public void onSuccess(String topic, Integer partition,Object key,Object value,RecordMetadata recordMetadata) {
        System.out.println(String.format(
                "主题:%s,分区:%d,偏移量:%d," +"key:%s,value:%s",
                recordMetadata.topic(),recordMetadata.partition(), recordMetadata.offset(),key,value));
    }
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        exception.printStackTrace();
    }
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }
}

Consumer configuration

The consumer configuration code:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public MyListener listener() {
        return new MyListener();
    }
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
}

Consumer listener code:
public class MyListener {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @KafkaListener(topics = {"traffic-shaping-result"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("收到服务器的应答: " + record.value().toString());
    }
}

Test code
Simulating direct calls to the endpoint

The behavior without peak shaving:

@SpringBootTest(classes = KafkaTrafficShapingClient.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class TestInvokeRemote {
    RestTemplate restTemplate = new RestTemplate();
    private static final int num = 1000;
    private final String url = "http://127.0.0.1:8090/buyTicket";
    // Countdown latch used to simulate high concurrency
    private static CountDownLatch cdl = new CountDownLatch(num);
    @Test
    public void testInvokeRemote() throws InterruptedException {
        // simulate high concurrency
        for(int i = 0; i <num; i++){
            new Thread(new TicketQuest()).start();
            cdl.countDown(); // at 0, all threads start together
        }
        Thread.sleep(3000);
    }

    // Inner class implementing Runnable, simulating a user's ticket purchase request
    public class TicketQuest implements Runnable{
        @Override
        public void run() {
            try {
                cdl.await(); // wait at the starting line
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String str = restTemplate.getForEntity(url, String.class).getBody();
            System.out.println(str);
        }
    }
}

Simulating access through Kafka

The behavior with peak shaving:

@SpringBootTest(classes = KafkaTrafficShapingClient.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class TestKafka {
   // Number of concurrent requests
   private static final int USER_NUM = 1000;
   // Countdown latch used to simulate high concurrency
   private static CountDownLatch cdl = new CountDownLatch(USER_NUM);
   @Autowired
   private KafkaSender kafkaSender;
   /**
    * Concurrency simulation
    */
   @Test
   public void testKafkaMq() throws InterruptedException {
      // spin up USER_NUM concurrent request threads
      for (int i = 0; i < USER_NUM; i++) {
         new Thread(new UserRequst(i)).start();
         cdl.countDown(); // decrement the latch
      }
      Thread.sleep(60000);
   }
// @Test
// public void consumer() throws InterruptedException {
//    Thread.currentThread().sleep(60000);
// }
   /**
    * Inner class implementing Runnable, simulating a ticket purchase request
    */
   public class UserRequst implements Runnable {
      private int id;
      public UserRequst(int id) {
         this.id = id;
      }
      @Override
      public void run() {
         try {
            // wait until all threads have been created, then release them together to call the interface
            cdl.await();
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         String topic = "traffic-shaping";//主题
         String key = "user-"+id;//键
         String value = "123456="+System.currentTimeMillis();
         kafkaSender.messageSender(topic,key,value);
      }
   }
}
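
KafkaSender is referenced above but not shown in the original. A minimal hypothetical sketch (the class and method names come from the call site; the body is an assumption):

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // forward the simulated request onto the traffic-shaping topic
    public void messageSender(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
}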
