kafka重置偏移量

背景

某些时候,kafka上游生产者生产的消息有错误,或者下游消费者并不需要消费某部分的数据,这时候,通常有两个解决方案,一种是对数据做不解析处理,直接略过。另一种就是暂时关掉kafka的消费者组,等到生产者正常后再进行消费,但由于kafka本身是默认断点续传的,此时就需要我们先重置kafka中当前kafka组的offset。

解决方案

更改消费者组

由于kafka对某topic中offset的管理是以组的形式来进行的,因此,在新建或更改消费者组后,对于offset的管理也会重新开始,策略取决于配置的auto.offset.reset参数

在重启动时指定起始offset

在再次启动时,通过配置指定要消费topic中分区的offset
@KafkaListener(groupId = "topic_group_test",topicPartitions = { @TopicPartition(topic = "topic_test",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "9830")) })
java springboot版本

通过kafka服务端脚本指定重置

kafka-consumer-groups.sh --bootstrap-server 10.202.13.27:9092 \ --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
具体支持8种操作
--to-earliest
--to-latest
--to-current
--to-offset
--shift-by N: 把位移调整到当前位移+N 处,N可以为负数
--to-datetime : 把位移调整到大于给定时间的最早位移处,datetime格式yyyy-MM-ddTHH:mm:ss.xxx
--by-duration:把位移调整到距离当前时间指定间隔的位移处,格式为PnDTnHnMnS
--from-file:从CSV文件中读取调整策略

通过API代码来指定

consumer.seekToEnd( consumer.partitionsFor(topic).stream().map(partitionInfo-> new TopicPartition(topic,partitionInfo.partition())) .collect(Collectors.toList()) );
void seek(TopicPartition partition, long offset);
Void seek(TopicPartition partition,OffsetAndMetadata offsetAndMetadata);
Void seekToBeginning(Collection partitions)
Void seekToEnd(Collection partitions)

注意

以上所有操作都需要在消费者组处于未激活的情况下进行
使用代码方式时,需要指定所有分区的消费策略

热门相关:亡国公主穿成王府寡妇:二嫁王妃   驭房我不止有问心术   帝少夜宠:小甜妻,乖!   神秘复苏   盖世双谐