kafka高性能原理分析,你看懂了吗?

机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是 寻址,也就是定位到数据所在
首页 新闻资讯 行业资讯 kafka高性能原理分析,你看懂了吗?


一、消费者消费消息offset存储

kafka的所有消息都是持久化存储在broker上的,消费者每次消费消息是如何知道获取哪一条呢?kafka提供一个专门的tipic存储每个consumer group的消费消息的offset,offset保证消息在分区内部有序,所以每次消费者都可以知道自己要从哪一条消息开始消费。__consumer_offsets_* 的一个topic ,把 offset 信 息 写 入 到 这 个 topic 中。__consumer_offsets 默认有50 个分区。broker按照以下规则,存储消费者组的消费offset到对应的 __consumer_offsets分区文件中。

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 默 认 情 况 下groupMetadataTopicPartitionCount 有 50 个分区,假如groupid=”KafkaConsumerDemo”,计算得到的结果为:35, 意味着当前的consumer_group 的位移信息保存在__consumer_offsets 的第 35 个分区,可以用命令格式化查看分区数据


复制

kafka-simple-consumer-shell.sh –topic __consumer_offsets –partition 35 –broker-list 192.168.0.15:9092 –formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter”
  • 1.

或者直接使用ui工具查看分区数据。


消费消息的offset保存,是按照整个消费者group来分配保存的,同一个group的消费者offset保存在同一个__consumer_offsets分区。


二、消息持久化存储

首先在kafka里面,消息都是需要持久化存储的,不会分持久化和非持久化消息。存储的方式是基于索引文件+内容文件的方式来进行存储。下面看一下有关存储的相关内容。


消息存储的路径

首先我们知道,一个topic可以有多个分区,然后多个分区按照取模算法分配到集群中的多个broker中。其次一个topic的每一个分区的消息都是分开存储的,例如一个topic test,有三个分区。就会创建三个文件夹 test_0,test_1,test_2,去存储消息,消息的结构上面说了,就是index+内容的组合。例如有一个test3p的topic,在单个broker集群环境下,可以看到在dataDir的目录下面生成了如下三个文件夹。

图片


总的来说消息按照不同分区来进行存储。


消息存储机制详细解析

在对应的分区文件夹内部是如何存储消息的呢?


log.segment.bytes

log.segment.bytes是配置文件里面的一个重要配置,当内容文件达到这个配置的字节数大小时,消息存储的内容文件就会分隔,新增一个内容文件来存储内容,新内容文件的命名是上一个内容文件存储的最后一个offset命令。

图片

上面这图是我设置log.segment.bytes=10000,然后不停发送消息测试结果,我发送的消息内容大小是固定的,可以看到大约是在经过26000个offset左右就会新加一个log文件,同时会成对新增index,timindex文件。这个就是kafka的logSegment,消息文件分片,控制文件大小可以提高io性能。


每种存储文件的作用


00000000000000000000.index

这个就是一个索引文件,里面存储对消息内容文件的物理索引,可以快速定位消息内容所在,内容类似下面格式。

执行命令查看。


复制

kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test3p-0/00000000000000000000.index --print-datalog
  • 1.





复制

offset: 48 position: 4128offset: 96 position: 8256offset: 144 position: 12373
  • 1.

  • 2.

  • 3.


上面就是查看结果,offset就是消息在分区内部的offset,partition就是一个物理地址,用于索引内容,可以看出这里的索引是属于稀疏索引,并不是每个offset都存储消息的物理地址。


00000000000000000000.log

这个就是内容文件,同样可以使用上面使用的命令查看内容,截取部分结果如下。


















复制

producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 0 payload: isAsyncSend48
offset: 151 position: 12968 CreateTime: 1534321675701 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: - payload: isAsyncSend45
offset: 152 position: 13053 CreateTime: 1534321675705 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: * payload: isAsyncSend42
offset: 153 position: 13138 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ' payload: isAsyncSend39offset: 154 position: 13223 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: $ payload: isAsyncSend36offset: 155 position: 13308 CreateTime: 1534321675706 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ! payload: isAsyncSend33offset: 156 position: 13393 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend30offset: 157 position: 13478 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: ayload: isAsyncSend27offset: 158 position: 13563 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend24offset: 159 position: 13648 CreateTime: 1534321675707 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend21offset: 160 position: 13733 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend18offset: 161 position: 13818 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend15offset: 162 position: 13903 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 13 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:                               payload: isAsyncSend12offset: 163 position: 13988 CreateTime: 1534321675708 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:   payload: isAsyncSend9offset: 164 position: 14072 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend6offset: 165 position: 14156 CreateTime: 1534321675709 isvalid: true keysize: 4 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key:  payload: isAsyncSend3
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.


可以看出消息内容文件存储了offset、position和payload等内容,通过索引就可以快速定位到position位置,找到消息内容。


实际的查找算法过程

1.索引文件命名是有序的,因此使用二分查找的方式,可以快速查询到消息对应的索引文件

2.在对应的索引文件中,由于使用的是稀疏索引,所以利用offset查找符合offset范围的position。

3.得到position之后自然可以快速从position位置开始查找对应offset的消息,而不必从头搜索


三、消息日志的清理与压缩

消息清理

消息日志的能够分段存储,一方面能够减少单个文件 内容的大小,另一方面,方便kafka进行日志清理。日志的 清理策略有两个分别是按消息时间和topic消息大小来清理。

1. 根据消息的保留时间,当消息在 kafka 中保存的时间超 过了指定的时间,就会触发清理过程

2. 根据topic存储的数据大小,当topic所占的日志文件大 小大于一定的阀值,则可以开始删除最旧的消息。kafka 会启动一个后台线程,定期检查是否存在可以删除的消 息 通过 log.retention.bytes 和 log.retention.hours 这两个参 数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7天


消息压缩

Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧 张的情况,在很多实际场景中,消息的 key 和 value 的值 之间的对应关系是不断变化的,就像数据库中的数据会不 断被修改一样,消费者只关心key对应的最新的value。因 此,我们可以开启 kafka 的日志压缩功能,服务端会在后 台启动启动Cleaner线程池,定期将相同的key进行合并, 只保留最新的value值。


四、kafka高性能io

机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是 寻址,也就是定位到数据所在的物理地址,在磁盘上就要 找到对应的柱面、磁头以及对应的扇区;这个过程相对内 存来说会消耗大量时间,为了规避随机读写带来的时间消 耗,kafka采用顺序写的方式存储数据来避免这个过程。


但是 频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka 还有一个重要的性能策略,零拷贝。


如果不使用零拷贝技术,要把数据从磁盘读出并且发送到网卡需要进行以下步骤:

  • 操作系统将数据从磁盘读入到内核空间的页缓存

  • 应用程序将数据从内核空间读入到用户空间缓存中

  • 应用程序将数据写回到内核空间到socket缓存中

  • 操作系统将数据从socket缓冲区复制到网卡缓冲区,最后将数据经网络发出


这个过程涉及到4次上下文切换以及4次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有 进行变化,仅仅是从磁盘复制到网卡缓冲区。


如果是零拷贝技术的话,,可以去掉这些没必要的数据复制操作, 同时也会减少上下文切换次数;现代的unix操作系统提供 一个优化的代码路径,用于将数据直接从页缓存传输到socket;在 Linux 中通过 sendfile 系统调用来完成的。Java 提 供了访问这个系统调用的方法,FileChannel.transferTo API ,这样就可以直接跳过数据复制到用户空间然后又从用户控制复制到socket的过程。


29    2022-06-28 08:42:03    磁盘 kafka 高性能