基本概念
体系架构
- Producer:生产者
- Consumber:消费者
- Broker:服务代理节点(kafka实例)
消息存储
- 主题(Topic):kafka消息以topic为单位进行归类,逻辑概念
- 分区(Partition):
- Topic-Partition为一对多
- 分区在存储层面可看做是一个可追加的日志文件
- 消息在追加到分区时会分配一个特定的偏移量(offset)作为在此分区的唯一标示
- kafka通过offset保证消息在分区内的顺序性,但只保证分区有序而不保证主题有序
- Kafka 中的分区可以分布在不同的服务器 (broker)上,也 就是说,一个主题可以横跨多个 broker
- 每条消息发送到broker前,会根据分区规则分配到具体的哪个分区
容灾设计
多副本机制(Replica)
- 一个分区会在多个副本中保存相同的消息
- 副本之间是一主多从关系(一个leader副本,若干follower副本,副本数量可通过参数设置)
- leader副本负责读写操作,follower副本只负责同步消息(主动拉取)
- leader副本故障时,从follower副本重新选举新leader
同步状态
- 分区中所有副本统称为 AR(Assigned Replicas)
- 所有与leader副本保持一定程度的同步的副本(包括leader)组成 ISR(In-Sync Replicas)
- 同步滞后过多的副本组成 OSR(Out-of-Sync Replicas)
“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数
replica.lag.time.max.ms
进行配置
- ISR的伸缩:
- 从ISR中移出副本
- Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数
replica.lag.time.max.ms
指定的值,若是则从ISR中移出该副本 - 当 follower 副本将 leader 副本 LEO (LogEndOffset) 之前的日志全部同步时,则认为该 follower 副本己经追赶上 leader 副本,此时更新该副本的 lastCaughtUpTimeMs 标识为当前时间
- Kafka 的副本管理器会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的 lastCaughtUpTimeMs 差值是否大于参数
- 将副本移入ISR
- 随着 follower 副本不断与 leader 副本进行消息同步, follower 副本的 LEO 也会逐渐后移 , 并最终追赶上 leader 副本,此时该 follower 副本就有资格进入 ISR 集合
- 追赶上 leader 副本的 判定准则是此副本的 LEO 是否不小于 leader副本的 HW,注意这里并不是和 leader副本的 LEO 相比
- 从ISR中移出副本
特殊偏移量
LEO(Log End Offset):标识当前分区下一条代写入消息的offset
HW(High Watermark):高水位,标识了一个特定的offset,消费者只能拉渠道这个offset之前的消息(不含HW)
- 所有副本都同步了的消息才能被消费,HW的位置取决于所有follower中同步最慢的分区的offset
生产者
客户端开发
消息发送步骤
配置生产者客户端参数及创建相应的生产者实例
- Properties
- KafkaProducer
构建待发送的消息:ProducerRecord
发送消息:send( ),flush( )
关闭生产者实例:close( )
必要参数配置
bootstrap.servers
:设置kafka集群地址,并非需要所有broker地址,因为生产者会从给定的broker中获取其他broker信息key.serializer
、value.serializer
:转换字节数组到所需对象的序列化器,填写全限类名
发送模式
- 发后即忘(fire-and-forget):只管往kafka发送而不关心消息是否正确到达,不对发送结果进行判断处理;
- 同步(sync):KafkaProducer.send()返回的是一个Future对象,使用Future.get()来阻塞获取任务发送的结果,来对发送结果进行相应的处理;
- 异步(async):向send()返回的Future对象注册一个Callback回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认逻辑。
自定义拦截器
实现
ProducerInterceptor
接口,在消息发送的不同阶段调用configure()
:完成生产者配置时onSend()
:调用send()后,消息序列化和计算分区之前onAcknowledgement()
:消息被应答之前或消息发送失败时close()
:关闭拦截器时执行一些资源的清理工作
通过
interceptor.classes
配置指定
自定义序列化器
- 实现
Serializer
接口,此接口有三个方法configure()
:用来配置当前类serialize()
:用来执行序列化操作close()
:用来关闭当前的序列化器
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的
分区器
- 在消息发送到kafka前,需要先计算出分区号,这要依赖分区器,默认使用DefaultPartitioner
- 默认分区器采用MurmurHash2算法,会对key进行哈希,最终根据得到的哈希值来计算分区号, 拥有相同 key 的消息会被写入同一个分区 。 如果key为null,那么消息将会以轮询的方式发往主 题内的各个可用分区
- 自定义分区器:
- 实现
Partitioner
接口,此接口有两个方法- partition():用来计算分区号
- close():用来在关闭分区器的时候回收一些资源
- 通过
partitioner.class
配置指定
- 实现
原理分析
整体架构
- 主线程KafkaProducer创建消息,通过可能的拦截器、序列化器和分区器之后缓存到消息累加器(RecordAccumulatro)
- 消息在RecordAccumulator被包装成ProducerBatch(一个批次的消息,即一个至多个消息),以便Sender线程可以批量发送。生产者发送消息的速度超过发送到服务器的速度时,会导致生产者的空间不足,
send()
方法会被阻塞或抛异常- RecordAccumulatro缓存的大小通过
buffer.memory
配置,阻塞时间通过max.block.ms
配置
- RecordAccumulatro缓存的大小通过
- Kafka生产者客户端中,通过ByteBuffer实现消息内存的创建和释放,而RecordAccumulator内部有一个BufferPool用来实现ByteBuffer的复用
- BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由
batch.size
参数来指定(默认16384B, 即16KB)
- BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由
- Sender从RecordAccumulator中获取缓存的消息后,将ProducerBatch按Node分组,转换成<Node, List
>的形式,Node代表broker节点。也就是说sender只向具体broker节点发送消息,而不关注属于哪个分区,这里是应用逻辑层面到网络层面的转换。 - 在转换成<Node, List
>的形式之后, Sender 还会进一步将消息封装成<Node, Request>的形式,这样就可以将 Request 请求发往各个 Node了
- 在转换成<Node, List
- Sender将请求发往Kafka前,还会保存到InFlightRequests中,其主要作用是缓存已经发出去但还没收到响应的请求,也是以Node分组。
- 每个连接最大缓存未响应的请求数通过
max.in.flight.requests.per.connection
配置(默认5)
- 每个连接最大缓存未响应的请求数通过
若请求返回异常,则可进行重试,重试次数可自行指定
元数据的更新
- 元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上, follower副本分配在哪些节点上,哪些副本在 AR、 ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
- 当客户端中没有需要使用的元数据信息或超过metadata.max.age.ms没有更新元数据时,就会引起元数据更新操作。
- 当需要更新元数据时,会先挑选出 leastLoadedNode, 然后 向这个Node发送 MetadataRequest请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的, 在创建完 MetadataRequest之后同样会存入 InFlightRequests,之后的步骤就和发送消息时类似。
- InFlightRequests可以获得leastLoadedNode,即所有Node中负载最小的那一个。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode一般用于元数据请求、消费者组播协议等交互。
重要的生产者参数
acks
:用来指定分区中有多少个副本收到这条消息,才认为生产者写入成功并对请求进行响应(默认”1”)- “1”:leader写入即成功、“0”:不需要等待服务端响应、”-1”/“all”:ISR所有副本都写入才收到响应
max.request.size
:限制生产者客户端能发送的消息的最大值(默认1048576,即1m)retries、retry.backoff.ms
:生产者重试次数(默认0)和两次重试之间的间隔(默认100)compression.type
:消息压缩方式,可配置为”gzip”、”snappy”、”lz4”(默认”none”)connections.max.idle.ms
:多久后关闭闲置的连接(默认540000,9分钟)linger.ms
:生产者发送ProducerBatch之前等待更多消息(ProducerRecord)加入的时间(默认为0)receive.buffer.bytes
:Socket接收消息缓冲区的大小(默认32768,32k)send.buffer.bytes
:Socket发送消息缓冲区的大小(默认131072,128k)request.timeout.ms
:Producer等待请求响应的最长时间(默认30000ms),这个值需要比broker参数replica.lag.time.max.ms
大(该参数配置ISR中的follower和leader副本间可容忍的滞后范围)
消费者
消费者与消费组
- 每个分区只能被一个消费组的一个消费者消费
- 当一个消费组内的消费者数大于分区数时,会有消费者分配不到分区而无法消费任何消息
- 消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个钱程,也可以是一个进程。
客户端开发
消费步骤
- 配置消费者客户端参数及创建KafkaConsumer实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭实例
必要的参数配置
bootstrap.servers
:集群broker地址清单group.id
:消费组名称key.deserializer
、value.deserializer
:反序列化器
订阅主题和分区
subscribe()
:订阅主题,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题assign()
:订阅指定主题的指定分区- 如果我们事先并不知道主题中有多少个分区怎,则可通过
partitionFor()
方法先获取指定主题的元数据信息(包括分区信息)
- 如果我们事先并不知道主题中有多少个分区怎,则可通过
unsubscribe()
:取消订阅- 如果将
subscribe(Collection)
或assign(Collection)
中的集合参数设置为空集合 ,那么作用等同于unsubscribe()
方法
- 如果将
消息消费
poll()
:返回的是所订阅的主题(分区)上的一组消息,可设定timeout参数来控制阻塞时间(返回的是还没有被消费过的消息集)pause()
、resume()
:暂停和恢复某分区的消费seek()
:按指定分区的指定offset消费beginingOffsets()
,endOffsetes()
,offstesForTimes()
:分别获取指定分区开头的位置、指定分区末尾的位置或时间戳大于等于待查询时间的第一条消息对应的位置和时间戳seekToBeginning
、seekToEnd()
:从指定分区的开头、末尾开始消费
位移提交
每次调用poll()
方法时,返回的是还没有被消费过的消息集,要做到这一点,就需要记录上一次消费时的消费位移。在旧消费者客户端中,消费位移是存储在 ZooKeeper中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题 consumer offsets 中 。
- 提交的offset为 lastConsumedOffset + 1
- lastConsumedOffset:上一次poll拉取到的分区的最后一条消息的offset
- 自动提交
- Kafka 中默认的消费位移的提交方式是自动提交,由消费者客户端参数
enable.auto.commit
配置,默认值为 true - 该自动提交是定期提交,定期的周期时间由客户端参数
auto.commit.interval.ms
配置,默认值为5秒。(此参数生效的前提是enable.auto.commit
参数为 true) - 自动提交位移的方式非常简便,但会导致重复消费和消息丢失的问题
- Kafka 中默认的消费位移的提交方式是自动提交,由消费者客户端参数
- 手动提交
- 开启手动提交功能的前提是消费者客户端参数
enable.auto.commit
配置为 false - 同步提交:
commitSync()
,会阻塞消费者线程直至位移提交完成 - 异步提交:
commitAsync()
,不会阻塞消费者线程,可能在提交结果未返回之前就开始了新一轮的拉取操作
- 开启手动提交功能的前提是消费者客户端参数
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。
在再均衡发生期间,消费组内的消费者是无法读取消息的
当 一个分区被重新分配给另一个消费者时, 消费者当前的状态也会丢失。(当一个消费者A还没提交消费位移就发生了再均衡时,该分区分配的另一个消费者B会重新消费该分区已被A消费的消息)
在
subcribe()
时,可以注册一个实现ConsumerRebalanceListener
接口的监听器,该接口有两个方法onPartionsRevoked()
:该方法会在再均衡开始之前和消费者停止读取消息之后被调用onPartitionsAssigned()
:该方法会在重新分配分区之后和消费者开始读取消费之前被调用
自定义拦截器
- 实现ConsumerInterceptor接口,该接口有以下方法
configure()
:完成消费者配置时onConsume()
:在poll()方法返回之前调用onCommit()
:在提交完消费位移之后调用close()
:关闭拦截器时执行一些资源的清理工作
自定义反序列化器
实现Deserializer
接口,此接口有三个方法
configure()
:用来配置当前类deserialize()
:用来执行反序列化操作close()
:用来关闭当前的反序列化器
多线程实现
KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的。KafkaConsumer 中定义了一个
acquire()
方法可检测当前是否只有一个线程在操作,否则抛出异常.推荐使用单线程消费,而消息处理用多线程,如下图所示.
消费者的更多多线程实现可查阅《深入理解kafka:核心设计和实践原理》3.2.10节内容
重要的消费者参数
fetch.min.bytes
:一次请求能拉取的最小数据量(默认1b)fetch.max.bytes
:一次请求能拉取的最大数据量(默认52428800b,50m)fetch.max.wait.ms
:与min.bytes有关,指定kafka拉取时的等待时间(默认500ms)max.partition.fetch.bytes
:从每个分区里返回Consumer的最大数据量(默认1048576b,1m)max.poll.records
:一次请求拉取的最大消息数(默认500)connections.max.idle.ms
:多久后关闭闲置连接,默认(540000,9分钟)receive.buffer.bytes
:Socket接收消息缓冲区的大小(默认65536,64k)send.buffer.bytes
:Socket发送消息缓冲区的大小(默认131072,128k)request.timeout.ms
:Consumer等待请求响应的最长时间(默认30000ms)metadata.max.age.ms
:元数据过期时间(默认30000,5分钟)reconnect.backoff.ms
:尝试重新连接指定主机前的等待时间(默认50ms)retry.backoff.ms
:尝试重新发送失败请求到指定主题分区的等待时间(默认100ms)isolation.level
:消费者的事务隔离级别(具体查看进阶篇:事务)
主题与分区
主题的管理
创建
- 自动创建:broker设置
auto.create.topics.enable
=true时,生产者发送消息时会自动创建分区数为num.partitions
(默认1),副本因子为default.replication.facto
r(默认1)的主题 - 通过kafka-topics.sh创建:
create
指令kafka-topics.sh --zookeeper <zkpath> --create --topic <topic> --partitions <N> --replication-factor <N>
- 手动分配副本:
--replica-assignment
--replica-assignment 2:0:1,1:2:0,0:1:2 partion1 AR:2,0,1 partion2 AR:1:2:0 partion3 AR:0:1:2
- 设定参数:
--config <key=value>
- 自动创建:broker设置
分区副本的分配
- 使用kafka-topics.sh创建主题内部分配逻辑按机架信息划分两种策略:
- 未指定机架信息分配策略:
assignReplicasToBrokersRackUnaware()
方法 - 指定机架分配策略:
assignReplicasToBrokersRackAware()
方法
- 未指定机架信息分配策略:
- 使用kafka-topics.sh创建主题内部分配逻辑按机架信息划分两种策略:
当创建一个主题时,不管用什么方式,实质上是在zk的/broker/topics节点下创建与该主题对应的子节点并写入分区副本分配方案,并且在/config/topics节点下创建与该主题相关的子节点并写入主题配置信息
查看:kafka-topics.sh脚本的
list
、describe
指令修改:kafka-topics.sh脚本的
alter
指令配置管理:kafka-configs.sh脚本
删除:kafka-topics.sh脚本的
delete
指令
初识KafkaAdminClient
- KafkaAdminClient:一般情况下 ,我们都习惯使用 kafka-topics.sh脚本来管理主题,但有些时候我们希望将主题管理类的功能集成到公司内部的系统中, 打造集管理、监控、运维、告警为一体的生态平台, 那么就需要以程序调用 API 的方式去实现,KafkaAdminClient提供了这些API。具体的API不在本文中列出,有需要可以自行查阅相关文档。
- 主题合法性验证:普通用户在通过KafkaAdminClient 管理主题时,有可能由于误操作或其他原因而创建了不符合运维规范的主题(比如命名不规范,副本因子数太低等),这些会影响后期的系统运维 。我们可以在broker端设置
create.topic.policy.class.name
来指定一个类验证主题创建时的合法性,这个类需要实现ClreateTopicPolicy接口。
分区的管理
优先副本(preferred replica/preferred leader)
优先副本即 AR 集合中的第一个副本,kafka保证了优先副本的均衡分布。优先副本选举就是对分区leader副本进行选举的时候,尽可能让优先副本成为leader副本,从而保证负载均衡
kafka-perferred-replica-election.sh可进行优先副本选举操作
分区重分配
何时需要进行:
需要将某节点上的分区副本迁移至其他节点时(例如宕机迁移失效副本或有计划下线节点迁移副本时)
注意,下线前最好先关闭或重启此broker,保证不是leader节点,减少了节点间流量复制
需要向新增节点分配原有主题分区副本时
集群中新增节点时,只有新创建的主题分区才有可能分配到新节点上,因此需要把老主体的分区分配到新节点上,否则会造成新节点和原来的节点之间的负载不均衡
如何进行:可使用kafka-reassign-partitions.sh脚本
复制限流
- 数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能。可以通过对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响,可分别限制follower副本复制速度和leader副本传输速度
- 通过kafka-config.sh或 kafka-reassign-partitions.sh配置
- broker级别:follower/leader.replication.throttled.rate=N
- topic级别:follower/leader.replication.throttled.replicas=N
- 分区重分配过程中的临时限流策略(在进行相应配置后)
- 原AR会应用leader限流配置
- 分区移动的目的地会应用follower限流配置
- 重分配所需的数据复制完成后,临时限流策略会被移除
修改副本因子
- 通过kafka-reassign-partitions.sh配置
如何选择合适的分区数
- 性能测试工具
- 生产者性能测试:kafka-producer-perf-test.sh脚本
- 消费者性能测试:kafka-consumer-perf-test.sh脚本
- 分区数和吞吐量的关系
- 在一定限度内,吞吐量随分区数增加而上升,但由于磁盘、文件系统、I/O调度策略等影响,到一定程度时吞吐量会存在瓶颈或有所下降
- 考量因素
- 分区数会占用文件描述符,而一个进程所能支配的文件描述符是有限的,这也是通常所说的文件句柄的开销
- 如果分区数过多,当集群中某个broker宕机,就会有大量分区需要进行leader角色切换,这个过程会耗费一定的时间,并且在此期间这些分区不可用。分区数越多,kafka的正常启动和关闭耗时也会越长,同时也会增加日志清理的耗时
- 建议将分区数设定为broker的倍数
- 性能测试工具