Spring Boot 整合 Kafka

本章节说说 Spring Boot 如何整合 Kafka(Kafka 的安装部署分为 CentOS 宿主机部署和 Docker 部署,此处不再展开)。

添加依赖

<!-- Spring for Apache Kafka: the library the config, producer and listener classes in this document build on. -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by the Spring Boot parent/BOM - confirm in the project POM. -->
<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>

添加配置

# Kafka settings. Several of these keys are injected into KafkaConfig through @Value placeholders.
spring:
  kafka:
    # Broker address; KafkaConfig passes it to the consumer as BOOTSTRAP_SERVERS_CONFIG.
    bootstrap-servers: 192.168.88.129:9092
    consumer:
      # Consumer group id; passed as GROUP_ID_CONFIG.
      group-id: cn-yyzmain
      # Passed as AUTO_OFFSET_RESET_CONFIG.
      auto-offset-reset: earliest
      # KafkaConfig does not read this key: it hard-codes ENABLE_AUTO_COMMIT_CONFIG to false.
      enable-auto-commit: false
      # Passed as MAX_POLL_RECORDS_CONFIG (the batch size per poll for the batch listener).
      max-poll-records: 5
      # Read by KafkaConfig via @Value and passed as SESSION_TIMEOUT_MS_CONFIG; presumably milliseconds.
      session-timeout: 60000
      # Read by KafkaConfig via @Value and passed as MAX_POLL_INTERVAL_MS_CONFIG.
      max-poll-interval-ms: 300000
    listener:
      # KafkaConfig sets the same ack mode in code on its batchFactory bean.
      ack-mode: MANUAL_IMMEDIATE
      # Read by KafkaConfig via @Value; it falls back to 5 when this key is absent.
      concurrency: 1

添加配置类

package cn.yyzmain.kafka.config;
// 导包
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {
​
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.session-timeout}")
    private String sessionTimeout;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;
​
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;
    @Value("${spring.kafka.listener.concurrency:5}")
    private int concurrency;
​
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //跟分区数对应,可以提高消费速率
        factory.setConcurrency(concurrency);
        //设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.setBatchListener(true);
        //设置手动提交ackMode
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
​
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        //设置服务地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        //设置分组信息
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //设置偏移量读取类型
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        //设置是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //设置单次数据处理会话超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //设置批量消费大小
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //设置批量拉取间隔大小
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }
​
}
​

发送数据

package cn.yyzmain.kafka.product;
​
// 导包
​
@Service
@Slf4j
@RequiredArgsConstructor
public class MainProduct {
​
    private final KafkaTemplate<String, String> kafkaTemplate;
​
    public void sendMessage(String topic, String msg) {
        log.info("开始发送kafka数据,topic:{}, msg:{}", topic, msg);
        kafkaTemplate.send(topic, msg);
        log.info("====================end=======================");
    }
​
    public void sendMessageCallback(String topic, String msg) {
        log.info("开始发送kafka数据,topic:{}, msg:{}", topic, msg);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, msg);
        result.addCallback(sendResult -> log.info("==>>>消息发送成功..."), throwable -> {
            log.error("==>>>消息发送失败!", throwable);
            // 兜底处理
            result.isCancelled();
        });
        log.info("====================end=======================");
    }
}
​

消费数据

package cn.yyzmain.kafka.listen;
​
// 导包
​
@Service
@Slf4j
@RequiredArgsConstructor
public class MainListen {
​
​
    @KafkaListener(topics = "${app.main.yyzmain:yyzmain}", containerFactory = "batchFactory")
    public void handleReceiveDataTopic(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            records.forEach(v -> log.info("消费到数据: {}", v));
        } catch (Exception e) {
            log.error("处理异常,{}", e.getMessage(), e);
        } finally {
            //消息处理完毕,手动提交
            ack.acknowledge();
        }
    }
​
}
​

最后

参考代码:https://github.com/yyzmain