kafka分区机制
当我们发送一条数据到kafka,总是希望数据能平均分配到每个服务器上,实现数据的均衡,特别是一些大公司的数据采集,每分钟都可以产生几十上百GB的数据量,甚至更多。因此如何将这么大的数据量均匀地分配到Kafka的各个Broker上,就成为一个非常重要的问题。
kafka分区
kakfa分区即Partition是Topic的物理容器,用于存储消息数据。每个Partition都有一个消息队列,消息会持久化在这个队列中。Partition中的消息都会分配一个从0开始的序列号Offset。而Topic分为若干个分区,也就是说Kafka的消息组织方式实际上是三级结构:主题-分区-消息,主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:
那kafka为何要分区呢。Kafka 分区的主要作用有:
存储数据
Partition是物理上的存储和结构单元,用于存储数据。一个Topic包含一个或多个Partition。
提高并行度
Partition可以分布在不同的broker上,实现对数据集的并行读取和写入,提高Kafka的并发能力。
实现负载均衡
可以均匀地将数据分配到不同的Partition,不同的Partition可以分布在不同的机器上,以实现负载均衡。
提高容错性
同一个Partition可以配置多个副本,如果一个副本失效,其他副本可以继续提供服务,提高容错性。
消息排序
每个Partition内都有一个消息偏移量,可以对Partition中的消息进行顺序读写,保证每个Partition内的消息顺序。
隔离性
可以为不同的Topic配置不同数目的Partition,从而隔离不同业务系统的数据和负载。
分区策略
所谓分区策略是决定生产者将消息发送到哪个分区的算法。具体分区策略如下:
轮询调度法(Round Robin)
每个Partition依次分配给Producer的消息。优点是负载均衡,缺点是无法保证顺序。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。
哈希取模法(Hashing)
对消息key进行hash计算,然后对Partition数量取模,确定Partition。保证同一key到同一Partition,由此保证消息顺序。但Partition分配可能不均。
关键字哈希取模法(Key Hashing)
仅仅对消息key进行hash取模,使同一key总是到同一Partition。保证顺序,但分配可能不均匀。
自定义分区器(Custom Partitioner)
允许自定义Partition分配逻辑,适用于有特殊分区需求的场景。比如固定某类消息到特定Partition。
粘性分区器(Sticky Partitioner)
尽可能使用上次的Partition,使生产消费在同一Broker,减少网络传输。但新Consumer随机分配。
对列分区器(Column Partitioner)
依据Schema信息进行分区,需要Schema Registry。可以根据字段哈希取模分区。
地理位置分区器
根据地理位置信息分配最亲近的分区,减少网络延迟。
自定义分区策略
实例代码如下:
package cn.yyzmain.kafka.config;
// 导包
@Slf4j
@Component
public class MainCustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题分区数
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 按数据开头数据分配分区
final String message = String.valueOf(value);
if (message.startsWith("a-")) {
return 0;
}
if (message.startsWith("b-")) {
return 1;
}
if (message.startsWith("c-")) {
return 2;
}
// 负载均衡分区
return Math.abs(message.hashCode() % numPartitions);
}
@Override
public void close() {
//
}
@Override
public void configure(Map<String, ?> configs) {
//
}
}
新增配置
spring:
kafka:
producer:
# 自定义分区策略
properties:
partitioner:
class: cn.yyzmain.kafka.config.MainCustomPartitioner