SpringBoot整合kafka
本章节介绍SpringBoot整合kafka的使用,包括添加依赖、添加配置、编写配置类,以及生产者发送数据和消费者消费数据。
添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
添加配置
spring:
kafka:
bootstrap-servers: 192.168.88.129:9092
consumer:
group-id: cn-yyzmain
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 5
session-timeout: 60000
max-poll-interval-ms: 300000
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 1
添加配置类
package cn.yyzmain.kafka.config;
// 导包
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {

    /** Kafka broker address(es), host:port. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    /** Consumer session timeout in ms (Kafka parses numeric values given as Strings). */
    @Value("${spring.kafka.consumer.session-timeout}")
    private String sessionTimeout;

    /** Consumer group id. */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /** Where to start when no committed offset exists (e.g. "earliest"). */
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /** Max records returned per poll — the batch size seen by batch listeners. */
    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    /** Max allowed delay between polls before the consumer is considered failed. */
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private String maxPollIntervalMs;

    /** Number of listener threads; should match the topic's partition count. */
    @Value("${spring.kafka.listener.concurrency:5}")
    private int concurrency;

    /**
     * Container factory for batch consumption with manual, immediate offset commits.
     * The key/value generics are {@code <String, String>} to match the
     * {@code StringDeserializer} configured for both keys and values in
     * {@link #consumerConfigs()} (and the {@code ConsumerRecord<String, String>}
     * consumed by the listener).
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // Match the topic's partition count to raise consumption throughput.
        factory.setConcurrency(concurrency);
        // Batch mode: the listener receives a List of records per poll;
        // the batch size is bounded by max-poll-records.
        factory.setBatchListener(true);
        // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Raw consumer properties assembled from the injected configuration values.
     *
     * @return property map handed to {@code DefaultKafkaConsumerFactory}
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Broker address.
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // Consumer group.
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Offset reset policy when no committed offset exists.
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Auto-commit must stay disabled for the MANUAL_IMMEDIATE ack mode to take effect.
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Session timeout for group-membership heartbeats.
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Batch size per poll.
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        // Max interval between polls.
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
        return propsMap;
    }
}
发送数据
package cn.yyzmain.kafka.product;
// 导包
@Service
@Slf4j
@RequiredArgsConstructor
public class MainProduct {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String msg) {
log.info("开始发送kafka数据,topic:{}, msg:{}", topic, msg);
kafkaTemplate.send(topic, msg);
log.info("====================end=======================");
}
public void sendMessageCallback(String topic, String msg) {
log.info("开始发送kafka数据,topic:{}, msg:{}", topic, msg);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, msg);
result.addCallback(sendResult -> log.info("==>>>消息发送成功..."), throwable -> {
log.error("==>>>消息发送失败!", throwable);
// 兜底处理
result.isCancelled();
});
log.info("====================end=======================");
}
}
消费数据
package cn.yyzmain.kafka.listen;
// 导包
@Service
@Slf4j
@RequiredArgsConstructor
public class MainListen {

    /**
     * Batch consumer for the topic resolved from {@code app.main.yyzmain}
     * (defaults to "yyzmain"). Uses the custom "batchFactory" container, so
     * records arrive as a batch and offsets are committed manually.
     *
     * @param records batch of records returned by one poll
     * @param ack     handle used to commit the batch's offsets
     */
    @KafkaListener(topics = "${app.main.yyzmain:yyzmain}", containerFactory = "batchFactory")
    public void handleReceiveDataTopic(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                log.info("消费到数据: {}", record);
            }
        } catch (Exception e) {
            log.error("处理异常,{}", e.getMessage(), e);
        } finally {
            // Commit the batch manually. NOTE(review): acknowledging in finally
            // commits offsets even when processing threw, so a failed batch is
            // skipped rather than redelivered — confirm this is intended.
            ack.acknowledge();
        }
    }
}