Kafka中的Offset存储问题小记
原文:https://blog.csdn.net/Rose1645/article/details/80696144
1.概述
Kafka版本[0.10.1.1
],已默认将消费的 offset
迁入到了 Kafka
一个名为 __consumer_offsets
的opic
中。其实,早在 0.8.2.2
版本,已支持存入消费的 offset
到opic
中,只是那时候默认是将消费的 offset
存放在 Zookeeper
集群中。那现在,官方默认将消费的offset
存储在 Kafka
的opic
中,同时,也保留了存储在 Zookeeper
的接口,通过 offsets.storage
属性来进行设置。
2.内容
其实,官方这样推荐,也是有其道理的。之前版本,Kafka
其实存在一个比较大的隐患,就是利用 Zookeeper
来存储记录每个消费者/组的消费进度。虽然,在使用过程当中,JVM
帮助我们完成了一些优化,但是消费者需要频繁的去与 Zookeeper
进行交互,而利用ZKClient
的API
操作Zookeeper
频繁的Write
其本身就是一个比较低效的Action
,对于后期水平扩展也是一个比较头疼的问题。如果期间 Zookeeper
集群发生变化,那 Kafka
集群的吞吐量也跟着受影响。
在此之后,官方其实很早就提出了迁移到 Kafka
的概念,只是,之前是一直默认存储在 Zookeeper
集群中,需要手动的设置,如果,对 Kafka
的使用不是很熟悉的话,一般我们就接受了默认的存储(即:存在 ZK
中)。在新版 Kafka
以及之后的版本,Kafka
消费的offset
都会默认存放在 Kafka
集群中的一个叫 __consumer_offsets
的opic
中。
当然,其实她实现的原理也让我们很熟悉,利用 Kafka
自身的 Topic
,以消费的Group
,Topic
,以及Partition
做为组合 Key
。所有的消费offset
都提交写入到上述的Topic
中。因为这部分消息是非常重要,以至于是不能容忍丢数据的,所以消息的 acking
级别设置为了 -1
,生产者等到所有的 ISR
都收到消息后才会得到 ack
(数据安全性极好,当然,其速度会有所影响)。所以 Kafka
又在内存中维护了一个关于 Group
,Topic
和 Partition
的三元组来维护最新的 offset
信息,消费者获取最新的offset
的时候会直接从内存中获取。这也说明offset
的提交是以消费者为单位的。通过消费组、主题、与分区就能确定是哪个消费组中的消费者提交的offset
的了。
kafka
提供三种语义的传递:
- 至少一次
- 至多一次
- 精确一次
首先在 producer
端保证1
和2
的语义是非常简单的,至少一次只需要同步确认即可(确认方式分为只需要 leader
确认以及所有副本都确认,第二种更加具有容错性),至多一次最简单只需要异步不断的发送即可,效率也比较高。目前在 producer
端还不能保证精确一次,在未来有可能实现,实现方式如下:在同步确认的基础上为每一条消息加一个主键,如果发现主键曾经接受过,则丢弃
在 consumer
端,大家都知道可以控制 offset
,所以可以控制消费,其实 offset
只有在重启的时候才会用到。在机器正常运行时我们用的是 position
,我们实时消费的位置也是 position
而不是 offset
。我们可以得到每一条消息的 position
。如果我们在处理消息之前就将当前消息的 position
保存到 zk
上即 offset
,这就是只多一次消费,因为我们可能保存成功后,消息还没有消费机器就挂了,当机器再打开时此消息就丢失了;或者我们可以先消费消息然后保存 position
到 zk
上即 offset
,此时我们就是至少一次,因为我们可能在消费完消息后offset
没有保存成功。而精确一次的做法就是让 position
的保存和消息的消费成为原子性操作,比如将消息和 position
同时保存到 hdfs
上 ,此时保存的 position
就称为 offset
,当机器重启后,从 hdfs
重新读入offset
,这就是精确一次。
consumer
可以先读取消息,然后将offset
写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset
后还没处理消息就crash
了,新的consumer
继续从这个offset
处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次
”。consumer
可以先读取消息,处理消息,最后记录offset
,当然如果在记录offset
之前就crash
了,新的consumer
会重复的消费一些消息,这就是上面说的“最少一次
”。
“精确一次
”可以通过将提交分为两个阶段来解决:保存了offset
后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset
和消息被处理后的结果保存在一起。比如用Hadoop ETL
处理消息时,将处理后的结果和offset
同时保存在HDFS
中,这样就能保证消息和offser
同时被处理了。
项目中由于消息可被重复消费、因此采用最少一次这种方式。从kafka
集群中获取消息之后,存入数据库后再手动提交offset
。这样能保证不会遗漏需要消费的消息。如果要保证精确一次的话可能比较麻烦,就得把offset
存入数据库,保证业务逻辑的处理与offset
的原子性(同一个事务中)。