Kafka基础及核心概念
简介
Kafka是一个开源的分布式流处理平台,使用scala语言编写,广泛应用于日志收集、事件驱动架构等场景。
TIP
- 批处理:处理静态的、有界的数据流(如昨天的日志文件)
- 流处理:处理连续产生的、动态的、无界的数据流(如实时日志、订单等)
核心概念
Producer
Producer(生产者) 是 Apache Kafka 中负责向主题(Topic)发送消息的客户端组件。它是数据管道的起点,设计目标是实现高吞吐、低延迟、可靠的消息传输。
消息发送流程
- 序列化与分区
- 将消息的 Key 和 Value 序列化(如使用 StringSerializer、AvroSerializer)。
- 根据 分区策略 确定消息应发送到 Topic 的哪个分区(Partition)。
- 默认策略:若指定 Key,按 Key 的哈希值选择分区;若无 Key,采用轮询(Round-Robin)。
- 批次聚合
- 消息不会立即发送,而是按批次(Batch)聚合,减少网络请求次数。
- 批次大小由 batch.size 控制,等待时间由 linger.ms 控制(如 linger.ms=100 表示最多等待 100ms 发送批次)。
- 发送到 Broker
- 批次数据通过 Sender 线程 异步发送到目标分区的 Leader Broker。
- Broker 接收后写入本地日志(Log Segment),并根据副本机制同步到 Follower。
- 确认与重试
- Broker 返回确认(ACK)后,Producer 标记消息发送成功。
- 若发送失败(如网络问题、Leader 切换),Producer 自动重试(次数由 retries 配置)。
核心配置与优化
- 可靠性配置
- acks 控制 Broker 对消息持久化的确认级别。
- acks=0:不等待确认,吞吐量最高,可能丢失数据。
- acks=1:Leader 写入本地日志即确认,均衡可靠性与性能(默认)。
- acks=all(或 acks=-1):等待所有 ISR(In-Sync Replica)副本确认,最可靠,但延迟更高。
- enable.idempotence=true
启用幂等性,避免网络重试导致的消息重复(通过唯一 PID 和序列号去重)。 - max.in.flight.requests.per.connection
单连接未完成请求的最大数。若需严格顺序性,设置为 1(需启用幂等性)。
- acks 控制 Broker 对消息持久化的确认级别。
- 性能优化
- batch.size 与 linger.ms
增大批次大小,提升吞吐量 - compression.type
启用压缩(如 snappy、lz4),减少网络传输量 - buffer.memory
增大缓冲区内存(默认 32MB),避免因积压导致阻塞 - max.block.ms
缓冲区满或元数据获取阻塞时的最大等待时间,避免无限阻塞
- batch.size 与 linger.ms
高级配置
自定义分区器
实现 Partitioner 接口,按业务逻辑选择分区(如按用户 ID 哈希)。
java
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return (key.hashCode() & Integer.MAX_VALUE) % partitions.size();
}
}拦截器
实现 ProducerInterceptor,在发送前后添加逻辑(如日志、监控)。
java
public class LoggingInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("Sending: " + record.value());
return record;
}
}消息发送模式
其实 Kafka 发送消息本质都是通过异步发送消息的,通过 Future 接收发送结果
- 同步发送
调用 send() 后立即调用 get(),阻塞等待结果。javaFuture<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); // 阻塞 - 异步发送
通过回调处理结果,避免阻塞主线程。javaproducer.send(record, (metadata, exception) -> { if (exception != null) { System.err.println("发送失败: " + exception.getMessage()); } else { System.out.println("发送成功,分区: " + metadata.partition()); } });
监控指标
- record-send-rate:消息发送速率(条/秒)。
- request-latency-avg:平均请求延迟。
- record-queue-time-avg:消息在缓冲区的平均等待时间。
- records-per-request-avg:每个请求的平均消息数。
- compression-rate-avg:压缩率(节省的带宽比例)。 通过监控工具(如 JMX、Prometheus)观察指标,调整配置优化性能。
Broker
Broker 是 Kafka 集群中的核心组件,负责消息的存储、管理、副本同步和客户端请求处理。每个 Broker 是一个独立的服务器节点,多个 Broker 协同工作构成分布式集群。
核心职责
消息存储与管理
- 分区(Partition)存储
- 每个 Topic 划分为多个 Partition,每个 Partition 在 Broker 上以有序、不可变的日志(Log Segment)形式存储。
- Partition 的物理路径示例:/tmp/kafka-logs/topic-0/00000000000000000000.log。
- 日志分段(Log Segment)
- 将 Partition 的日志切分为多个 Segment 文件(默认 1GB),便于管理和清理。
- 每个 Segment 包含一个数据文件(.log)和两个索引文件(.index 和 .timeindex),加速消息查找。
- 副本管理(Replication)
- Leader 与 Follower
- 每个 Partition 有多个副本(Replica),其中一个为 Leader,处理读写请求;其他为 Follower,从 Leader 同步数据。
- ISR(In-Sync Replica):与 Leader 保持同步的副本集合。只有 ISR 中的副本有资格被选举为 Leader。
- 副本同步机制
- Follower 定期向 Leader 发送 FETCH 请求,拉取最新消息并写入本地日志。
- 若 Follower 长时间未同步(replica.lag.time.max.ms,默认 30s),会被移出 ISR。
- Leader 与 Follower
- 客户端请求处理
- 生产者请求:接收 Producer 发送的消息,写入 Leader Partition 的日志文件。
- 消费者请求:响应 Consumer 的拉取请求,从指定 Offset 读取消息。
- 元数据请求:向客户端返回集群元数据(如 Topic 分区分布、Leader 节点地址)。
- 集群协调
- Controller 选举
- 集群中某个 Broker 被选举为 Controller,负责 Partition 的 Leader 选举、副本分配等管理任务。
- 基于 ZooKeeper(旧版本)或 KRaft(新版本,Kafka 3.3+ 内置元数据仲裁)实现选举。
- 分区重平衡
- 当 Broker 宕机或扩容时,Controller 重新分配 Partition 的 Leader 和副本。
- Controller 选举
Broker存储机制
- 日志文件结构
- 数据文件(.log)
- 存储消息的二进制数据,按 Offset 顺序追加写入。
- 每条消息包含 Offset、Timestamp、Key、Value 等信息。
- 偏移量索引(.index)
- 稀疏索引,记录 Offset 到物理位置的映射(如 Offset=100 → 文件位置 1024)。
- 消费者可通过二分查找快速定位消息。
- 时间戳索引(.timeindex)
- 记录 Timestamp 到 Offset 的映射,支持按时间范围查询消息。
- 数据文件(.log)
- 消息清理策略
- 基于时间
- 删除早于 retention.ms(默认 7 天)的旧消息。
- 基于大小
- 删除超出 retention.bytes 限制的旧消息。
- 日志压缩(Log Compaction)
- 保留每个 Key 的最新值,适用于需要精确恢复状态的场景(如数据库变更日志)。
- 基于时间
- 高效读写设计
- 顺序写入:消息追加到日志末尾,充分利用磁盘顺序 I/O 性能。
- 零拷贝(Zero-Copy)
- 使用 sendfile 系统调用,将磁盘文件直接发送到网络,减少内核态与用户态的数据拷贝。
- 由参数 socket.send.buffer.bytes 控制发送缓冲区大小。
高可用性与容错
- Leader 选举
- 故障场景
- Leader Broker 宕机时,Controller 从 ISR 中选举新的 Leader。
- 若 ISR 为空,根据 unclean.leader.election.enable 决定是否允许非 ISR 副本成为 Leader(可能丢失数据)。
- 选举过程
- Controller 监控 Broker 状态,触发选举并更新元数据。
- 客户端通过元数据请求获取新 Leader 地址。
- 故障场景
- 副本同步
- 同步延迟控制
- 参数 replica.lag.time.max.ms 决定 Follower 的最大允许延迟时间(默认 30s)。
- 延迟超限的 Follower 被移出 ISR,避免影响写入性能。
- 最小同步副本数
- 参数 min.insync.replicas(默认 1)定义写入成功所需的最小 ISR 副本数。
- 若 ISR 副本数不足,Producer 发送会抛出 NotEnoughReplicasException。
- 同步延迟控制
- 数据持久化
- 刷盘策略
- 参数 log.flush.interval.messages 和 log.flush.interval.ms 控制日志刷盘频率。
- 生产环境中通常依赖副本机制保证数据安全,而非频繁刷盘。
- 刷盘策略
Broker配置与优化
- 核心配置参数
- 基础配置
- broker.id:Broker 的唯一标识(整数)。
- listeners:客户端连接的监听地址(如 PLAINTEXT://:9092)。
- log.dirs:日志文件存储目录(建议配置多个磁盘路径提升吞吐)。
- 副本与 ISR
- default.replication.factor:新建 Topic 的默认副本数(建议 ≥3)。
- num.replica.fetchers:副本同步线程数,影响同步速度。
- 资源限制
- num.network.threads:处理网络请求的线程数(默认 3)。
- num.io.threads:处理磁盘 I/O 的线程数(默认 8,建议根据磁盘数量调整)。
- 基础配置
- 性能优化
- 磁盘与文件系统
- 使用 SSD 或 RAID 提升 I/O 性能。
- 文件系统挂载参数建议:noatime(禁用访问时间更新)。
- JVM 调优
- 堆内存分配(如 -Xmx8G -Xms8G),避免频繁 GC。
- 使用 G1 垃圾回收器:-XX:+UseG1GC。
- 网络优化
- 调整 socket.send.buffer.bytes 和 socket.receive.buffer.bytes 提升网络吞吐。
- 启用网络压缩(与 Producer/Consumer 压缩配置一致)。
- 磁盘与文件系统
- 监控指标
- 分区状态
- UnderReplicatedPartitions:未充分同步的分区数(理想值为 0)。
- ActiveControllerCount:当前 Controller 数量(正常为 1)。
- 资源使用
- NetworkProcessorAvgIdlePercent:网络线程空闲率(过低需增加线程数)。
- LogFlushRateAndTimeMs:日志刷盘延迟。
- 请求处理
- RequestHandlerAvgIdlePercent:请求处理线程空闲率。
- ProduceRequestRate 和 FetchRequestRate:生产/消费请求速率。
- 分区状态
Broker 运维实践
- 扩容与缩容
- 扩容
- 新增 Broker 后,使用 kafka-reassign-partitions.sh 工具重新分配 Partition。
- 示例命令
bashbin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassign.json --execute - 缩容
- 将要移除的 Broker 上的所有副本迁移到其他节点,再关闭 Broker。
- 扩容
- 数据迁移与平衡
- 副本分配策略
- 确保 Partition 的副本分布在不同的机架(broker.rack 配置),避免单点故障。
- 使用 kafka-topics.sh --describe 检查副本分布。
- 副本分配策略
- 平衡工具
- 使用 kafka-reassign-partitions.sh 或第三方工具(如 Cruise Control)自动平衡负载。
- 常见问题处理
- Leader 不可用
- 检查 Broker 是否宕机,或网络分区导致 ZooKeeper/KRaft 会话超时。
- 手动触发 Leader 选举:kafka-leader-election.sh。
- 磁盘空间不足
- 清理旧数据:调整 retention.ms 或手动删除过期 Segment。
- 扩展 log.dirs 目录或挂载新磁盘。
- ISR 频繁收缩
- 检查 Follower 同步延迟原因(如网络带宽、磁盘 I/O 瓶颈)。
- 增大 replica.lag.time.max.ms 或优化同步线程数。
- Leader 不可用
新旧版本
- 旧版本(依赖 ZooKeeper)
- ZooKeeper 存储集群元数据(Broker 注册、Topic 配置、Controller 选举)。
- 新版本(KRaft 模式)
- Kafka 自身通过 Raft 协议管理元数据,无需 ZooKeeper,简化部署。
Topic
Topic(主题) 是 Kafka 中消息的逻辑分类单元,类似于数据库中的“表”或文件系统中的“文件夹”。所有消息的生产、存储和消费均围绕 Topic 展开。
Partition
Partition(分区) 是 Kafka 实现分布式、高吞吐能力的核心设计,每个 Topic 由多个 Partition 组成,每个 Partition 是一个有序、不可变的消息序列。
核心机制
物理存储
- 日志分段(Log Segment)
- Partition 的日志被切分为多个 Segment 文件(默认 1GB),每个 Segment 包含
- 数据文件(.log)
- 按 Offset 顺序存储消息
- 文件命名规则:[起始Offset].log(如 00000000000000000000.log)
- 偏移量索引(.index) 稀疏索引,记录 Offset 到物理位置的映射
- 时间戳索引(.timeindex) 支持按时间范围查找消息
- 数据文件(.log)
- Partition 的日志被切分为多个 Segment 文件(默认 1GB),每个 Segment 包含
- 顺序写入
- 新消息追加到当前活跃 Segment 末尾,充分利用磁盘顺序 I/O 的高性能。
- 写入完成后更新 LEO(Log End Offset),表示最新消息位置。
消息顺序性
- 分区内有序
- 同一 Partition 内的消息按写入顺序分配递增 Offset,保证严格有序
- 生产者可通过分区键(Key)将需保证顺序的消息发送到同一 Partition
- 跨分区无序
- 不同 Partition 之间的消息顺序不保证,需业务逻辑处理(如全局排序需额外设计)
读写机制
- 生产者写入
- 消息通过 Leader Partition 写入,Follower 异步拉取数据同步
- 写入成功条件由 acks 参数控制(如 acks=all 需所有 ISR 副本确认)
- 消费者读取
- 消费者从指定 Offset 拉取消息,支持顺序消费或随机跳转(通过 seek() API)
- 消费者组(Consumer Group)中每个消费者实例独立负责一组 Partition 的消费
设计原则
- 分区数选择
- 吞吐量
- 单个 Partition 的吞吐量约为 10~50MB/s(取决于磁盘和网络性能)
- 目标总吞吐量 ÷ 单分区吞吐量 ≈ 最小分区数(需预留扩展空间)
- 并行度
- 分区数决定消费者的最大并行度(每个 Partition 只能被同一消费者组的一个实例消费)
- 示例:若需 10 个消费者并行消费,Topic 至少需要 10 个 Partition。
- 业务需求
- 需保证顺序的消息必须分配到同一 Partition(如订单号作为 Key)。
- 分区数过多可能导致元数据管理开销增加(ZooKeeper/KRaft 压力)。
- 吞吐量
- 副本策略
- 副本数(Replication Factor)
- 生产环境建议 replication.factor=3,确保高可用性
- 副本分布在不同机架(通过 broker.rack 配置)避免单点故障
- Leader 选举
- Leader 负责处理读写请求,Follower 仅同步数据
- 若 Leader 宕机,Controller 从 ISR(In-Sync Replicas)中选举新 Leader
- 副本数(Replication Factor)
- 数据保留与清理
- 时间保留
- retention.ms=604800000(默认 7 天):删除过期 Segment
- 空间保留
- retention.bytes=-1(默认无限制):按总大小清理旧数据
- 日志压缩(Log Compaction)
- 保留每个 Key 的最新值,适用于状态更新场景(如用户配置变更)
- 通过 cleanup.policy=compact 启用
- 时间保留
副本管理
- 副本同步机制
- ISR 列表
- 包含所有与 Leader 保持同步的副本,由 Leader 动态维护
- Follower 同步延迟超过 replica.lag.time.max.ms(默认 30s)会被移出 ISR
- 同步过程
- Follower 定期向 Leader 发送 FETCH 请求,拉取新消息并写入本地日志
- Leader 更新 HW(High Watermark),表示所有 ISR 副本已同步的最高 Offset
- ISR 列表
- 写入可靠性
- ACK 机制
- acks=0:无需确认,可能丢失数据
- acks=1(默认):Leader 写入即确认,可能丢失未同步的副本数据
- acks=all:需所有 ISR 副本确认,保证数据不丢失
- 最小同步副本数
- min.insync.replicas=2:若 ISR 副本数不足,Producer 抛出 NotEnoughReplicasException
- ACK 机制
- 故障恢复
- Leader 选举
- Controller 监控 Broker 状态,从 ISR 中选择新 Leader
- 若 ISR 为空且 unclean.leader.election.enable=true,允许非 ISR 副本成为 Leader(可能丢失数据)
- 数据一致性
- 消费者只能读取到 HW 之前的消息,确保故障恢复后不会读到未提交数据
- Leader 选举
性能优化
- 硬件与配置优化
- 磁盘性能
- 使用 SSD 或 RAID 提升 I/O 吞吐量
- 挂载参数建议:noatime(禁用文件访问时间更新)
- 网络优化
- 增大 socket.send.buffer.bytes 和 socket.receive.buffer.bytes(默认 100KB)
- 启用压缩(compression.type=snappy)减少网络传输量
- 磁盘性能
- JVM 与系统调优
- 堆内存分配
- Broker 堆内存建议 6~8GB(避免过大导致 GC 停顿)
- 使用 G1 垃圾回收器:-XX:+UseG1GC
- 文件描述符限制
- 增大 ulimit -n(如 100000),避免 “Too many open files” 错误
- 堆内存分配
- 分区与负载均衡
- 避免热点分区
- 均匀分配分区键(如哈希散列),防止单个 Partition 负载过高
- 监控 BytesInPerSec 和 BytesOutPerSec 发现热点
- 动态扩容
- 使用 kafka-reassign-partitions.sh 工具迁移 Partition 到新 Broker
- 扩容后需调整生产者分区策略以利用新分区
- 避免热点分区
