kafka分区机制

当我们发送一条数据到kafka,总是希望数据能平均分配到每个服务器上,实现数据的均衡,特别是一些大公司的数据采集,每分钟都可以产生几十上百GB的数据量,甚至更多。因此如何将这么大的数据量均匀地分配到Kafka的各个Broker上,就成为一个非常重要的问题。

kafka分区

kakfa分区即Partition是Topic的物理容器,用于存储消息数据。每个Partition都有一个消息队列,消息会持久化在这个队列中。Partition中的消息都会分配一个从0开始的序列号Offset。而Topic分为若干个分区,也就是说Kafka的消息组织方式实际上是三级结构:主题-分区-消息,主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份,对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:

那kafka为何要分区呢。Kafka 分区的主要作用有:

  1. 存储数据

    Partition是物理上的存储和结构单元,用于存储数据。一个Topic包含一个或多个Partition。

  2. 提高并行度

    Partition可以分布在不同的broker上,实现对数据集的并行读取和写入,提高Kafka的并发能力。

  3. 实现负载均衡

    可以均匀地将数据分配到不同的Partition,不同的Partition可以分布在不同的机器上,以实现负载均衡。

  4. 提高容错性

    同一个Partition可以配置多个副本,如果一个副本失效,其他副本可以继续提供服务,提高容错性。

  5. 消息排序

    每个Partition内都有一个消息偏移量,可以对Partition中的消息进行顺序读写,保证每个Partition内的消息顺序。

  6. 隔离性

可以为不同的Topic配置不同数目的Partition,从而隔离不同业务系统的数据和负载。

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。具体分区策略如下:

  1. 轮询调度法(Round Robin)

    每个Partition依次分配给Producer的消息。优点是负载均衡,缺点是无法保证顺序。轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。

  2. 哈希取模法(Hashing)

    对消息key进行hash计算,然后对Partition数量取模,确定Partition。保证同一key到同一Partition,由此保证消息顺序。但Partition分配可能不均。

  3. 关键字哈希取模法(Key Hashing)

    仅仅对消息key进行hash取模,使同一key总是到同一Partition。保证顺序,但分配可能不均匀。

  4. 自定义分区器(Custom Partitioner)

    允许自定义Partition分配逻辑,适用于有特殊分区需求的场景。比如固定某类消息到特定Partition。

  5. 粘性分区器(Sticky Partitioner)

    尽可能使用上次的Partition,使生产消费在同一Broker,减少网络传输。但新Consumer随机分配。

  6. 对列分区器(Column Partitioner)

    依据Schema信息进行分区,需要Schema Registry。可以根据字段哈希取模分区。

  7. 地理位置分区器

    根据地理位置信息分配最亲近的分区,减少网络延迟。

自定义分区策略

实例代码如下:

package cn.yyzmain.kafka.config;
​
// 导包
​
@Slf4j
@Component
public class MainCustomPartitioner implements Partitioner {
​
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取主题分区数
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
​
        // 按数据开头数据分配分区
        final String message = String.valueOf(value);
        if (message.startsWith("a-")) {
            return 0;
        }
        if (message.startsWith("b-")) {
            return 1;
        }
        if (message.startsWith("c-")) {
            return 2;
        }
​
        // 负载均衡分区
        return Math.abs(message.hashCode() % numPartitions);
    }
​
    @Override
    public void close() {
        //
    }
​
    @Override
    public void configure(Map<String, ?> configs) {
        //
    }
​
}

新增配置

spring:
  kafka:
    producer:
      # 自定义分区策略
      properties:
        partitioner:
          class: cn.yyzmain.kafka.config.MainCustomPartitioner