可靠性探究

副本剖析

失效副本

  • 同步失效或功能失效的副本成为失效副本,失效副本对应的分区成为同步失效分区(under-replicated)

    • 同步失效:根据broker参数 replica.lag.time.max.ms 作为标准,当ISR中的follower副本滞后leader副本的时间超过此时间则判定同步失败

      滞后时间 = now - lastCaughtUpTimeMs

      当follower副本将leader副本LEO(LogEndOffset)之前的日志全部同步时,则认为该follower副本己经追赶上leader副本,此时更新该副本的lastCaughtUpTimeMs标识

    • 功能失效:副本处于非存活状态,例如副本所在的broker节点被关闭

  • 如果通过工具增加了副本因子,那么新增加的副本在赶上leader副本之前也都是处于失效状态的。

ISR的伸缩

Kafka 在启动时会开启定时任务,周期型的检测每个分区是否需要缩减其ISR集合

  • ISR的缩减:当ISR中的follower副本滞后leader副本的时间超过replica.lag.time.max.ms则将该副本移出ISR集合
  • ISR的扩充:当follower副本的LEO不小于leader副本的HW即判定可进入ISR集合

LEO与HW

多副本消息追加过程

  • 生产者客户端发送消息至leader副本
  • 消息追加到leader副本的本地日志,并更新日志偏移量
  • follower副本向leader副本请求同步数据
  • leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本信息
  • leader副本所在服务器将拉取结果返回follower副本
  • follower副本收到结果,将消息追加到本地日志,并更新日志的偏移量信息

LEO和HW更新过程

image-20211125102555685
  • follower向leader拉取消息时,带有自己的LEO信息(fetch_offset),leader更新HW(取HW和LEO中的最小值),返回follower相应消息,并带有自身的HW
  • follower收到新消息后,更新LEO和HW

在一个分区中,leader会记录所有副本的LEO,而follower只会记录自身LEO

Leader Epoch的介入

解决在需要截断数据的场景下,LEO/HW不一致导致数据丢失的问题,详细内容可查阅8.1.4节。

为什么不支持读写分离

  • 主写从读的问题:数据一致性问题、延时问题
  • kafka通过分区副本机制来解决负载均衡问题

日志同步机制

  • 日志同步机制的基本原则
    • 如果告知客户端已经成功提交了某条消息,那么即使leader宕机,也要保证新选举出来的leader中能够包含这条消息
  • kafka通过维护ISR集合,保证leader切换后的数据完整性

Kafka应用

  1. 命令行工具

    • kafka-configs.sh:配置管理
    • kafka-server-start.sh:启动kafka服务
    • kafka-server-stop.sh:关闭kafka服务
    • kafka-topics.sh:管理主题
    • kafka-preferred-replica-election.sh:优先副本选举
    • kafka-reassign-partitions.sh:分区重分配
    • kafka-consumer-groups.sh:消费组管理、重置消费位移
    • kafka-console-consumer.sh:命令行消费消息
    • kafka-console-producer.sh:命令行生产消息
    • kafka-consumer-perf-test.sh:测试消费性能
    • kafka-dump-log.sh:查看日志内容
    • kafka-delete-records.sh:删除消息
  2. Kafka Connect

    • 基本概念

      • Kafka Connect是一个用于将数据流输入和输出Kafka的框架,可以简单快捷地将数据从Kafka导入或导出
      • Source和Sink:Source负责导入数据到Kafka,Sink负责从Kafka导出数据,统称为Connector
      • Task和Worker:
        • Connector把一项工作分割成许多Task,然后分发到各个Worker进程去执行
        • Task不保存自己的状态信息,而是交给特定kafka topic保存,Connector和Task都是逻辑工作单位,必须安排在进程(Worker)中执行
    • 独立模式

      • 通过connect-standalone.sh启动,所有操作都是在一个进程中完成
      • 需要制定两个配置文件:
        • Worker进程运行相关配置:connect-standalone.properties
        • Source或Sink配置:connect-file-source.properties、connect-file-sink.properties
    • REST API

      • /(GET):查看Kafka集群版本信息
      • /connectors (GET/POST):查看Connector列表、创建Connector
      • /connectors/{name}(GET):查看指定Connector
      • /connectors/{name}/config(GET/PUT):查看/修改指定Connector配置
      • /connectors/{name}/status(GET):查看指定Connector配置
        ……
    • 分布式模式

      • 运行脚本启动:connect-distributed.sh
      • 修改Worker配置文件:connect-distributed.properties
      • 修改Source或Sink配置:同独立模式

      以分布式模式启动的连接器并不支持在启动时通过加载连接器配置文件来创建一个连接器,只能通过访问RESTAPI来创建连接器。

  3. Kafka Mirror Maker

    • 用于在两个集群之间同步数据的工具,原理是从源集群消费消息,然后生产到目标集群
    • 修改配置文件:consumer.properties,producer.properties
    • 启动脚本:kafka-mirror-maker.sh
  4. Kafka Streams

    • Kafka Streams是一个用于处理和分析数据的客户端库,它先把存储在Kafka中的数据进行处理和分析,然后将数据结果写到Kafka或发送到外部系统

    • 解决问题:

      • 毫秒级延迟的逐个事件处理
      • 有状态的处理,包括分布式连接和聚合
      • 方便的DSL
      • 使用类似DataFlow的模型对无序数据进行窗口化
      • 具有快速故障切换的分布式处理和容错能力
      • 无停机滚动部署
    • 使用时需要引入依赖:org.apache.kafka/kafka-streams

Kafka监控

  1. 监控数据的来源
    • Kafka自身提供的监控指标(包括broker和主题的指标)都可以通过JMX来获取,需要设置JMX_PORT设置端口并开启JMX功能
      开启JMX后会在zk的 /brokers/ids/ 节点中有jmx_port值
    • 客户端指标数据可通过ProducerMetrics和ConsumerMetrics获取
  2. 消费滞后
    • Kafka中留存的消息与Consumer的消息之间的差值就是消息滞后量(Lag),对每个分区而言,Lag = HW - ConsumerOffset
      如果分区中有未完成的事务,且isolation.level = “read_committed”,Lag = LSO - ConsumerOffset
    • 计算Lag
      • 通过FindCoordinatorRequest查找消费组对应的GroupCoordinator
      • 通过AdminClient获取DescribeGroupsRequest,获取当前消费组元数据信息
      • 通过OffsetFetchRequest请求获取消费位移ConsumerOffset
      • 通过KafkaConsumer.endOffsets()方法获取HW(LSO)值
      • HW与ConsumerOffset相减得到分区Lag
  3. 监控指标说明
    • 通过jconsole查看所有MBean

高级应用

  1. 过期时间
    • 给消息添加timeStamp和超时时间,并在消费时使用拦截器,判断是否超时后进行消费
  2. 延时队列
    • 到期才能消费
    • 实现方式
      • 1.依然采用给消息添加timeStamp和延时时间,消费者拉取一批消息后,如果有未达到延时时间的消息,就重新写入主题
      • 2.延时消息先投递到一个指定的主题,并使用自定义服务拉取、判断,满足条件后再投递到消费者真实消费的主题
  3. 其他功能:
    • 死信队列、消息路由、消息轨迹、消息审计等均可以自行设计生产消费结构来实现
  4. 消息代理
    • Kafka REST Proxy可以为Kafka集群提供一些列的REST API接口,通过这些接口可以实现发送消息、消费消息、查看集群状态和管理类操作等功能