文件目录布局

image-20211122210338721

上图所示的是kafka在某一时刻的文件目录布局,每一个根目录都会包含最基本的 4 个检查点文件(xxx-checkpoint)和 meta.prop创ies 文件。当broker配置了多个根目录时,会挑选分区数最少的根目录来创建主题。

主题、分区、副本、Log和LogSegment之间的关系如下图所示:

img
  • 一个分区副本对应一个日志(Log),一个日志会分配成多个日志分段(LogSegment),Log在物理上以文件夹形式存储,而LogSegment对应磁盘上的一个日志文件和两个索引文件及可能的其他文件。
  • 向Log追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入,称为activeSegment,满足一定条件时,需要创建新的activeSegment。
  • 每个日志及索引的文件名根据基准偏移量(BaseOffset)命名,表示当前LogSegment中第一条消息的offset。

日志格式

消息压缩

  • Kafka 会将多条消息一起进行压缩,生产者发送的压缩数据在 broker 中也是保持压缩状态进行存储的 ,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息。
  • 压缩方式通过compression.type来配置:producer、gzip、snappy、lz4、uncmpressed。
  • 消息压缩时,整个消息集压缩为内层消息,整体作为外层消息的value。外层消息的offset保存了内层消息最后一条记录的offset,而内层消息在压缩时会从0开始分配一个offset,内层消息的offset会在服务端进行转换。

v2版本日志格式

旧版本的消息格式(日志格式)不在这里展示,有需要可查阅5.2节的内容

img

该版本引入了变长整型(Varints)和ZigZag编码来保存数值。如果消息本身没有 key,那么 key length 字段为-1,旧版本的消息用int类型来编码,需要4个字节,而如果使用Varints加ZigZag来编码则只需要1个字节。

Varints

  • 使用一个或多个字节来序列化整数的一种方法。数值越小,其占用的字节数就越少。
  • 每个字节都有一个位于最高位的 msb 位( most significant bit),除最后一个字节的msb为0外,其余msb位都设置为1。除 msb 位外,剩余的 7 位用于存储数据本身,即一个字节可以表示$2^7=128$ 个值。
  • Varints中采用的是小端字节序,即最小的字节放在前面。
  • 举例:
    • 数字1用varints来表示-> 0000 0001;
    • 数字300用varints来表示->1010 1100 0000 0010;

ZigZag编码

  • 若使用 int64等有符号整数类型表示一个负数,那么哪怕是-1,其编码后的长度始终为4个字节。为了使编码更加高效, Varints 使用了 ZigZag 的编码方式。
  • ZigZag编码以一种锯齿形( zig-zags)的方式来回穿梭正负整数, 将带符号整数映射为无符号整数,这样可以使绝对值较小的负数仍然享有较小的Varints编码值。
  • 比如-1编码为1, 1编码为 2, -2编码为3,2编码为4,-3编码为5,以此类推。
  • 将原值转换为ZigZag编码的公式为 ( n << 1 ) ^ ( n >> 31 ) 该公式是对sint32类型的原值而言的。

日志索引

Kafka 中的索引文件以**稀疏索引( sparse index)**的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项 。

每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096,即 4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项。(通过增大或减小 log.index.interval.bytes 的值,可以增加或缩小索引项的密度)

偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。

日志分段文件进行切分的条件

  • 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值
  • 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll .mslog.roll.hours 参数配置的值。(若都配置了,则ms的优先级高)
  • 偏移量索引文件或时间戳索引文件的大小达到 broker端参数 log.index.size.max.bytes 配置的值
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE, 即要追加的消息的偏移量不能转变为相对偏移量

日志分段的大小:Kafka 在创建索引文件的时候会为其预分配 log.index.size.max.bytes 大小的空间,注意这一点与日志分段文件不同,只有当索引文件进行切分的时候,Kafka 才会把该索引文件裁剪到实际的数据大小 。也就是说与当前活跃的日志分段对应的索引文件的大小固定为log.index.size.max.bytes,而其余日志分段对应的索引文件的大小为实际的占用空间。

偏移量索引(.index)

偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置

每个索引占8个字节,分为两个部分:

  • relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,当前索引文件的文件名即为baseOffset值
  • position:消息在日志分段文件中的物理地址
image-20211122215526937

时间戳索引(.timeindex)

时间戳索引文件则根据指定的时间戳( timestamp)来查找对应的偏移量 信息。

每个索引占12个字节,分为两个部分:

  • timestamp:当前日志分段最大的时间戳
  • relativeOffset:时间戳对应的消息的相对偏移量
image-20211122215609268

日志清理

Kafka 将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。

  • 可以通过broker端参数log.cleanup.policy设置日志清理策略(默认delete)

  • 有两种清理策略:

    • 日志删除:按照一定保留策略直接删除不符合条件的日志分段

      • 设置log.cleanup.policy为delete
    • 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本

      • 设置log.cleanup.policy为compact,并且将log.leaner.enable设置为true(默认true)
    • 可同时使用日志删除和日志压缩两种策略

日志删除

  • Kafka的日志管理器中有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,周期通过 broker 端参数 log.retention.check.interval.ms来配置(默认300000,5分钟)

  • 日志分段保留策略有3种:

    • 基于时间的保留策略:

      • 通过log.retention.hourslog.retention.minuteslog.retention.ms来配置超时清理阈值

        优先级ms>minutes>hours(默认log.retention.hours=168,7天)

    • 基于日志大小的保留策略:

      • 通过log.retention.bytes配置Log日志总大小阈值(默认-1,无穷大)
      • 通过log.segment.bytes配置日志分段文件大小阈值(默认1G)
    • 基于日志起始偏移量的保留策略:

      • 某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段
      • logStartOffset 的值可以通过DeleteRecordsRequest请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本〉、日志的清理和截断等操作进行修改
  • 删除日志分段时,首先会从Log对象中所维护日志分段的跳表中移出待删除分段,以保证没有线程对其进行读取,然后将对应文件加上.deleted后缀,最后由名为delete-file的延迟任务来删除文件

    • Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key
    • 删除任务延迟时间通过file.delete.delay.ms配置(默认60000,1分钟)

日志压缩(Log Compaction)

  • Log Compaction对于有相同key的不同value值,只保留最后一个版本。

  • 每个日志目录下都有一个名为“cleaner-offset-checkpoint”的清理检查点文件,用来记录每个主题的每个分区中己清理的偏移量。通过清理检查点文件可以将 Log 分成两个部分。通过检查点cleaner checkpoint来划分出 一个己经清理过的clean部分和一个还未清理过的dirty部分。

    image-20211123103357404
  • 注意Log Compaction是针对key的,所以在使用时应注意每个消息的key值不为null。每个broker会启动log.cleaner.thread(默认1)个日志清理线程负责执行清理任务, 这些线程会选择“污浊率”最高的日志文件进行清理。

    污浊率:dirtyRatio = dirtyBytes / ( cleanBytes + dirtyBytes )

  • 为了防止日志不必要的频繁清理操作,使用参数log.cleaner.min.cleanable.ratio(默认0.5)来限定可进行清理操作的最小污浊率。 Kafka 中用于保存消费者消费位移的主题_consumer_offsets使用的就是Log Compaction策略

  • 每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表(dirty部分)。日志清理需要遍历两次日志文件,第一次遍历把每个 key 的哈希值和最后出现的offset都保存在 SkimpyOffsetMap 中,第二次遍历会检查每个消息是否符合保留条件,符合就保留下来,否则就会被清理.

  • 墓碑消息(tombstone):

    • 如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。
    • 日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是所在的日志分段的最近修改时间 lastModifiedTime大于deleteHorizonMs。deleteHorizonMs为clean部分中最后一个日志分段的最近修改时间减去保留阈值deleteRetionMs(通过 broker 端参数log.cleaner.delete.retention.ms配置,默认86400000,即24小时)
  • Log Compaction执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka 在实际清理过程中并不对单个的日志分段进行单独清理,而是将日志文件 中 offset 从 0 至 firstUncleanableOffset 的所有日志分段进行分组,每个日志分段只属于一组 。

    • 分组策略为: 按照日志分段的顺序遍历 ,每组中日志分段的占用空间大小之和不超过 segmentSize(可以通过 broker端参数 log.segment.bytes 设置,默认值为 1GB),且对应的索引文件占用大小之和不超过 maxindexSize (可以通过 broker端参数 log.index.interval.bytes设置,默认值为 I0MB)。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。
    • image-20211123103847982

磁盘存储

页缓存(pagecache)

  • 页缓存是操作系统实现的一种主要的磁盘缓存,用来减少对磁盘I/O的操作。具体就是把磁盘中的数据缓存到内存中,把对磁盘的访问变成对内存的访问。

    • 读取:操作系统会先查看数据所在的页(page)是否在页缓存中,如果存在则直接返回,不存在则向磁盘发起读取请求并将读取的数据存入页缓存,之后再将数据返回。
    • 写入:查看数据所在的页(page)是否在页缓存中,存在则直接修改页缓存,不存在则在页缓存中添加相应的页再写入。被修改过的页变成了脏页,操作系统会在合适的时间把脏页数据写入磁盘以保持数据一致性。
  • Kafka大量使用了页缓存,这是实现高吞吐的重要因素之一。Kafka提供了同步刷盘及间断性强制刷盘的功能,但并不推荐使用。

磁盘I/O流程

从编程角度而言,一般I/O场景有以下4种,他们的数据流为:

  • 用户调用标准C库进行I/O操作:用户程序buffer->C库标准IObuffer->文件系统页缓存->通过具体文件系统到磁盘
  • 用户调用文件I/O:用户程序buffer->文件系统页缓存->通过具体文件系统到磁盘
  • 用户打开文件时使用O_DIRECT,绕过页缓存直接写磁盘
  • 用户使用类似dd工具,使用direct参数,绕过系统cache与文件系统直接写磁盘

最长链路数据流图示:

image-20211123105825407

IO请求处理:通用块层根据 1/0 请求构造一个或多个 bio 结构并提交给调度层;调度器将 bio 结构进行排序和合并组织成队列且确保读写操作尽可能理想: 将一个或多个进程的读操作合并到一起读,将一个或多个进程的写操作合并到一起写,尽可能变随机为顺序 (因为随机读写比顺序读写要慢),读必须优先满足,而写也不能等太久

针对不同应用场景,I/O调度策略也会影响I/O读写性能,目前Linux提供4中调度策略:NOOP、CFQ(默认)、DEADLINE、ANTICIPATORY。各调度算法的详细描述可查阅5.5.2节,这里不再赘述。

此外,若想了解linux内核的磁盘io可参考该篇博文

零拷贝(Zero-Copy)

  • 零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。 对 Linux 操作系统而言,零拷贝技术依赖于底层的sendfile()方法实现。对应于Java 语言,FileChannal.transferTo()的底层实现就是sendfile()。

  • 零拷贝和非零拷贝对比

    image-20211123110541773