| @Component public class ConsumerService { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); private static ExecutorService executorService = new ThreadPoolExecutor(2,2,60, TimeUnit.SECONDS,new SynchronousQueue<>()); private static List<KafkaConsumer> consumers = new ArrayList<KafkaConsumer>(); @Autowired private DBService dbService; @Autowired private KafkaTemplate kafkaTemplate; @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1); return propsMap; } private static class ConsumerWorker implements Runnable{ private KafkaConsumer<String,String> consumer; private DBService dbService; private KafkaTemplate kafkaTemplate; public ConsumerWorker(KafkaConsumer<String, String> consumer,DBService dbService,KafkaTemplate kafkaTemplate) { this.consumer = consumer; this.dbService = 
dbService; this.kafkaTemplate = kafkaTemplate; consumer.subscribe(Collections.singletonList("traffic-shaping")); } public void run() { final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(consumer); try { while(true){ ConsumerRecords<String, String> records = consumer.poll(100); for(ConsumerRecord<String, String> record:records){ System.out.println(id+"|"+String.format( "主题:%s,分区:%d,偏移量:%d," + "key:%s,value:%s", record.topic(),record.partition(),record.offset(),record.key(),record.value())); System.out.println("开始购票业务------"); String result = dbService.useDb("select ticket"); System.out.println(result+",准备通知客户端");
kafkaTemplate.send("traffic-shaping-result",result); } } }catch (Exception e) { e.printStackTrace(); } } } @PostConstruct public void init(){ for(int i=0;i<2;i++){ KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(consumerConfigs()); executorService.submit(new ConsumerWorker(consumer,dbService,kafkaTemplate)); consumers.add(consumer); } } @PreDestroy public void destory(){ for(KafkaConsumer consumer:consumers){ consumer.close(); } } }