Skip to content

Kafka基础及核心概念

简介

Kafka是一个开源的分布式流处理平台,使用scala语言编写,广泛应用于日志收集、事件驱动架构等场景。

TIP

  • 批处理:处理静态的、有界的数据流(如昨天的日志文件)
  • 流处理:处理连续产生的、动态的、无界的数据流(如实时日志、订单等)

核心概念

Producer

Producer(生产者) 是 Apache Kafka 中负责向主题(Topic)发送消息的客户端组件。它是数据管道的起点,设计目标是实现高吞吐、低延迟、可靠的消息传输。

消息发送流程

  • 序列化与分区
    • 将消息的 Key 和 Value 序列化(如使用 StringSerializer、AvroSerializer)。
    • 根据 分区策略 确定消息应发送到 Topic 的哪个分区(Partition)。
      • 默认策略:若指定 Key,按 Key 的哈希值选择分区;若无 Key,采用轮询(Round-Robin)。
  • 批次聚合
    • 消息不会立即发送,而是按批次(Batch)聚合,减少网络请求次数。
    • 批次大小由 batch.size 控制,等待时间由 linger.ms 控制(如 linger.ms=100 表示最多等待 100ms 发送批次)。
  • 发送到 Broker
    • 批次数据通过 Sender 线程 异步发送到目标分区的 Leader Broker。
    • Broker 接收后写入本地日志(Log Segment),并根据副本机制同步到 Follower。
  • 确认与重试
    • Broker 返回确认(ACK)后,Producer 标记消息发送成功。
    • 若发送失败(如网络问题、Leader 切换),Producer 自动重试(次数由 retries 配置)。

核心配置与优化

  • 可靠性配置
    • acks 控制 Broker 对消息持久化的确认级别。
      • acks=0:不等待确认,吞吐量最高,可能丢失数据。
      • acks=1:Leader 写入本地日志即确认,均衡可靠性与性能(默认)。
      • acks=all(或 acks=-1):等待所有 ISR(In-Sync Replica)副本确认,最可靠,但延迟更高。
    • enable.idempotence=true
      启用幂等性,避免网络重试导致的消息重复(通过唯一 PID 和序列号去重)。
    • max.in.flight.requests.per.connection
      单连接未完成请求的最大数。若需严格顺序性,设置为 1(需启用幂等性)。
  • 性能优化
    • batch.size 与 linger.ms
      增大批次大小,提升吞吐量
    • compression.type
      启用压缩(如 snappy、lz4),减少网络传输量
    • buffer.memory
      增大缓冲区内存(默认 32MB),避免因积压导致阻塞
    • max.block.ms
      缓冲区满或元数据获取阻塞时的最大等待时间,避免无限阻塞

高级配置

自定义分区器

实现 Partitioner 接口,按业务逻辑选择分区(如按用户 ID 哈希)。

java
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return (key.hashCode() & Integer.MAX_VALUE) % partitions.size();
    }
}
拦截器

实现 ProducerInterceptor,在发送前后添加逻辑(如日志、监控)。

java
public class LoggingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Sending: " + record.value());
        return record;
    }
}

消息发送模式

其实 Kafka 发送消息本质都是通过异步发送消息的,通过 Future 接收发送结果

  • 同步发送
    调用 send() 后立即调用 get(),阻塞等待结果。
    java
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get(); // 阻塞
  • 异步发送
    通过回调处理结果,避免阻塞主线程。
    java
    producer.send(record, (metadata, exception) -> {
      if (exception != null) {
          System.err.println("发送失败: " + exception.getMessage());
      } else {
          System.out.println("发送成功,分区: " + metadata.partition());
      }
    });

监控指标

  • record-send-rate:消息发送速率(条/秒)。
  • request-latency-avg:平均请求延迟。
  • record-queue-time-avg:消息在缓冲区的平均等待时间。
  • records-per-request-avg:每个请求的平均消息数。
  • compression-rate-avg:压缩率(节省的带宽比例)。 通过监控工具(如 JMX、Prometheus)观察指标,调整配置优化性能。

Broker

Broker 是 Kafka 集群中的核心组件,负责消息的存储、管理、副本同步和客户端请求处理。每个 Broker 是一个独立的服务器节点,多个 Broker 协同工作构成分布式集群。

核心职责

消息存储与管理
  • 分区(Partition)存储
    • 每个 Topic 划分为多个 Partition,每个 Partition 在 Broker 上以有序、不可变的日志(Log Segment)形式存储。
    • Partition 的物理路径示例:/tmp/kafka-logs/topic-0/00000000000000000000.log。
  • 日志分段(Log Segment)
    • 将 Partition 的日志切分为多个 Segment 文件(默认 1GB),便于管理和清理。
    • 每个 Segment 包含一个数据文件(.log)和两个索引文件(.index 和 .timeindex),加速消息查找。
  • 副本管理(Replication)
    • Leader 与 Follower
      • 每个 Partition 有多个副本(Replica),其中一个为 Leader,处理读写请求;其他为 Follower,从 Leader 同步数据。
      • ISR(In-Sync Replica):与 Leader 保持同步的副本集合。只有 ISR 中的副本有资格被选举为 Leader。
    • 副本同步机制
      • Follower 定期向 Leader 发送 FETCH 请求,拉取最新消息并写入本地日志。
      • 若 Follower 长时间未同步(replica.lag.time.max.ms,默认 30s),会被移出 ISR。
  • 客户端请求处理
    • 生产者请求:接收 Producer 发送的消息,写入 Leader Partition 的日志文件。
    • 消费者请求:响应 Consumer 的拉取请求,从指定 Offset 读取消息。
    • 元数据请求:向客户端返回集群元数据(如 Topic 分区分布、Leader 节点地址)。
  • 集群协调
    • Controller 选举
      • 集群中某个 Broker 被选举为 Controller,负责 Partition 的 Leader 选举、副本分配等管理任务。
      • 基于 ZooKeeper(旧版本)或 KRaft(新版本,Kafka 3.3+ 内置元数据仲裁)实现选举。
    • 分区重平衡
      • 当 Broker 宕机或扩容时,Controller 重新分配 Partition 的 Leader 和副本。
Broker存储机制
  • 日志文件结构
    • 数据文件(.log)
      • 存储消息的二进制数据,按 Offset 顺序追加写入。
      • 每条消息包含 Offset、Timestamp、Key、Value 等信息。
    • 偏移量索引(.index)
      • 稀疏索引,记录 Offset 到物理位置的映射(如 Offset=100 → 文件位置 1024)。
      • 消费者可通过二分查找快速定位消息。
    • 时间戳索引(.timeindex)
      • 记录 Timestamp 到 Offset 的映射,支持按时间范围查询消息。
  • 消息清理策略
    • 基于时间
      • 删除早于 retention.ms(默认 7 天)的旧消息。
    • 基于大小
      • 删除超出 retention.bytes 限制的旧消息。
    • 日志压缩(Log Compaction)
      • 保留每个 Key 的最新值,适用于需要精确恢复状态的场景(如数据库变更日志)。
  • 高效读写设计
    • 顺序写入:消息追加到日志末尾,充分利用磁盘顺序 I/O 性能。
    • 零拷贝(Zero-Copy)
      • 使用 sendfile 系统调用,将磁盘文件直接发送到网络,减少内核态与用户态的数据拷贝。
      • 由参数 socket.send.buffer.bytes 控制发送缓冲区大小。

高可用性与容错

  • Leader 选举
    • 故障场景
      • Leader Broker 宕机时,Controller 从 ISR 中选举新的 Leader。
      • 若 ISR 为空,根据 unclean.leader.election.enable 决定是否允许非 ISR 副本成为 Leader(可能丢失数据)。
    • 选举过程
      • Controller 监控 Broker 状态,触发选举并更新元数据。
      • 客户端通过元数据请求获取新 Leader 地址。
  • 副本同步
    • 同步延迟控制
      • 参数 replica.lag.time.max.ms 决定 Follower 的最大允许延迟时间(默认 30s)。
      • 延迟超限的 Follower 被移出 ISR,避免影响写入性能。
    • 最小同步副本数
      • 参数 min.insync.replicas(默认 1)定义写入成功所需的最小 ISR 副本数。
      • 若 ISR 副本数不足,Producer 发送会抛出 NotEnoughReplicasException。
  • 数据持久化
    • 刷盘策略
      • 参数 log.flush.interval.messages 和 log.flush.interval.ms 控制日志刷盘频率。
      • 生产环境中通常依赖副本机制保证数据安全,而非频繁刷盘。

Broker配置与优化

  • 核心配置参数
    • 基础配置
      • broker.id:Broker 的唯一标识(整数)。
      • listeners:客户端连接的监听地址(如 PLAINTEXT://:9092)。
      • log.dirs:日志文件存储目录(建议配置多个磁盘路径提升吞吐)。
    • 副本与 ISR
      • default.replication.factor:新建 Topic 的默认副本数(建议 ≥3)。
      • num.replica.fetchers:副本同步线程数,影响同步速度。
    • 资源限制
      • num.network.threads:处理网络请求的线程数(默认 3)。
      • num.io.threads:处理磁盘 I/O 的线程数(默认 8,建议根据磁盘数量调整)。
  • 性能优化
    • 磁盘与文件系统
      • 使用 SSD 或 RAID 提升 I/O 性能。
      • 文件系统挂载参数建议:noatime(禁用访问时间更新)。
    • JVM 调优
      • 堆内存分配(如 -Xmx8G -Xms8G),避免频繁 GC。
      • 使用 G1 垃圾回收器:-XX:+UseG1GC。
    • 网络优化
      • 调整 socket.send.buffer.bytes 和 socket.receive.buffer.bytes 提升网络吞吐。
      • 启用网络压缩(与 Producer/Consumer 压缩配置一致)。
  • 监控指标
    • 分区状态
      • UnderReplicatedPartitions:未充分同步的分区数(理想值为 0)。
      • ActiveControllerCount:当前 Controller 数量(正常为 1)。
    • 资源使用
      • NetworkProcessorAvgIdlePercent:网络线程空闲率(过低需增加线程数)。
      • LogFlushRateAndTimeMs:日志刷盘延迟。
    • 请求处理
      • RequestHandlerAvgIdlePercent:请求处理线程空闲率。
      • ProduceRequestRate 和 FetchRequestRate:生产/消费请求速率。

Broker 运维实践

  • 扩容与缩容
    • 扩容
      • 新增 Broker 后,使用 kafka-reassign-partitions.sh 工具重新分配 Partition。
      • 示例命令
      bash
      bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute
    • 缩容
      • 将要移除的 Broker 上的所有副本迁移到其他节点,再关闭 Broker。
  • 数据迁移与平衡
    • 副本分配策略
      • 确保 Partition 的副本分布在不同的机架(broker.rack 配置),避免单点故障。
      • 使用 kafka-topics.sh --describe 检查副本分布。
  • 平衡工具
    • 使用 kafka-reassign-partitions.sh 或第三方工具(如 Cruise Control)自动平衡负载。
  • 常见问题处理
    • Leader 不可用
      • 检查 Broker 是否宕机,或网络分区导致 ZooKeeper/KRaft 会话超时。
      • 手动触发 Leader 选举:kafka-leader-election.sh。
    • 磁盘空间不足
      • 清理旧数据:调整 retention.ms 或手动删除过期 Segment。
      • 扩展 log.dirs 目录或挂载新磁盘。
    • ISR 频繁收缩
      • 检查 Follower 同步延迟原因(如网络带宽、磁盘 I/O 瓶颈)。
      • 增大 replica.lag.time.max.ms 或优化同步线程数。

新旧版本

  • 旧版本(依赖 ZooKeeper)
    • ZooKeeper 存储集群元数据(Broker 注册、Topic 配置、Controller 选举)。
  • 新版本(KRaft 模式)
    • Kafka 自身通过 Raft 协议管理元数据,无需 ZooKeeper,简化部署。

Topic

Topic(主题) 是 Kafka 中消息的逻辑分类单元,类似于数据库中的“表”或文件系统中的“文件夹”。所有消息的生产、存储和消费均围绕 Topic 展开。

Partition

Partition(分区) 是 Kafka 实现分布式、高吞吐能力的核心设计,每个 Topic 由多个 Partition 组成,每个 Partition 是一个有序、不可变的消息序列。

核心机制

物理存储
  • 日志分段(Log Segment)
    • Partition 的日志被切分为多个 Segment 文件(默认 1GB),每个 Segment 包含
      • 数据文件(.log)
        • 按 Offset 顺序存储消息
        • 文件命名规则:[起始Offset].log(如 00000000000000000000.log)
      • 偏移量索引(.index) 稀疏索引,记录 Offset 到物理位置的映射
      • 时间戳索引(.timeindex) 支持按时间范围查找消息
  • 顺序写入
    • 新消息追加到当前活跃 Segment 末尾,充分利用磁盘顺序 I/O 的高性能。
    • 写入完成后更新 LEO(Log End Offset),表示最新消息位置。
消息顺序性
  • 分区内有序
    • 同一 Partition 内的消息按写入顺序分配递增 Offset,保证严格有序
    • 生产者可通过分区键(Key)将需保证顺序的消息发送到同一 Partition
  • 跨分区无序
    • 不同 Partition 之间的消息顺序不保证,需业务逻辑处理(如全局排序需额外设计)
读写机制
  • 生产者写入
    • 消息通过 Leader Partition 写入,Follower 异步拉取数据同步
    • 写入成功条件由 acks 参数控制(如 acks=all 需所有 ISR 副本确认)
  • 消费者读取
    • 消费者从指定 Offset 拉取消息,支持顺序消费或随机跳转(通过 seek() API)
    • 消费者组(Consumer Group)中每个消费者实例独立负责一组 Partition 的消费

设计原则

  • 分区数选择
    • 吞吐量
      • 单个 Partition 的吞吐量约为 10~50MB/s(取决于磁盘和网络性能)
      • 目标总吞吐量 ÷ 单分区吞吐量 ≈ 最小分区数(需预留扩展空间)
    • 并行度
      • 分区数决定消费者的最大并行度(每个 Partition 只能被同一消费者组的一个实例消费)
      • 示例:若需 10 个消费者并行消费,Topic 至少需要 10 个 Partition。
    • 业务需求
      • 需保证顺序的消息必须分配到同一 Partition(如订单号作为 Key)。
      • 分区数过多可能导致元数据管理开销增加(ZooKeeper/KRaft 压力)。
  • 副本策略
    • 副本数(Replication Factor)
      • 生产环境建议 replication.factor=3,确保高可用性
      • 副本分布在不同机架(通过 broker.rack 配置)避免单点故障
    • Leader 选举
      • Leader 负责处理读写请求,Follower 仅同步数据
      • 若 Leader 宕机,Controller 从 ISR(In-Sync Replicas)中选举新 Leader
  • 数据保留与清理
    • 时间保留
      • retention.ms=604800000(默认 7 天):删除过期 Segment
    • 空间保留
      • retention.bytes=-1(默认无限制):按总大小清理旧数据
    • 日志压缩(Log Compaction)
      • 保留每个 Key 的最新值,适用于状态更新场景(如用户配置变更)
      • 通过 cleanup.policy=compact 启用

副本管理

  • 副本同步机制
    • ISR 列表
      • 包含所有与 Leader 保持同步的副本,由 Leader 动态维护
      • Follower 同步延迟超过 replica.lag.time.max.ms(默认 30s)会被移出 ISR
    • 同步过程
      • Follower 定期向 Leader 发送 FETCH 请求,拉取新消息并写入本地日志
      • Leader 更新 HW(High Watermark),表示所有 ISR 副本已同步的最高 Offset
  • 写入可靠性
    • ACK 机制
      • acks=0:无需确认,可能丢失数据
      • acks=1(默认):Leader 写入即确认,可能丢失未同步的副本数据
      • acks=all:需所有 ISR 副本确认,保证数据不丢失
    • 最小同步副本数
      • min.insync.replicas=2:若 ISR 副本数不足,Producer 抛出 NotEnoughReplicasException
  • 故障恢复
    • Leader 选举
      • Controller 监控 Broker 状态,从 ISR 中选择新 Leader
      • 若 ISR 为空且 unclean.leader.election.enable=true,允许非 ISR 副本成为 Leader(可能丢失数据)
    • 数据一致性
      • 消费者只能读取到 HW 之前的消息,确保故障恢复后不会读到未提交数据

性能优化

  • 硬件与配置优化
    • 磁盘性能
      • 使用 SSD 或 RAID 提升 I/O 吞吐量
      • 挂载参数建议:noatime(禁用文件访问时间更新)
    • 网络优化
      • 增大 socket.send.buffer.bytes 和 socket.receive.buffer.bytes(默认 100KB)
      • 启用压缩(compression.type=snappy)减少网络传输量
  • JVM 与系统调优
    • 堆内存分配
      • Broker 堆内存建议 6~8GB(避免过大导致 GC 停顿)
      • 使用 G1 垃圾回收器:-XX:+UseG1GC
    • 文件描述符限制
      • 增大 ulimit -n(如 100000),避免 “Too many open files” 错误
  • 分区与负载均衡
    • 避免热点分区
      • 均匀分配分区键(如哈希散列),防止单个 Partition 负载过高
      • 监控 BytesInPerSec 和 BytesOutPerSec 发现热点
    • 动态扩容
      • 使用 kafka-reassign-partitions.sh 工具迁移 Partition 到新 Broker
      • 扩容后需调整生产者分区策略以利用新分区

Replica

Consumer

Consumer Group