I was recently testing Kafka 0.10 with the excellent Confluent Kafka Python client and ran into an initially confusing situation: even after dropping a topic and re-creating it, I was still seeing an old offset for a consumer group, and I didn't understand how to clear it.

I had been testing with both the old and the new consumer APIs, and using different tools to obtain consumer group offsets, which added to the confusion. I'm writing it up here in case it helps others.

First I tried to delete the consumer group. It turns out that with the new consumer API there is no need to delete consumer groups:

kafka@aleph:~$ /opt/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --delete --group thirsty1 --zookeeper localhost:2181
Delete for group thirsty1 failed because group does not exist.
kafka@aleph:~$ /opt/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --delete --group thirsty1 --new-consumer --bootstrap-server aleph:9092
Option [delete] is not valid with [new-consumer]. Note that there's no need to delete group metadata for the new consumer as it is automatically deleted when the last member leaves
...

When I checked the consumer group offsets with the new (as of 0.9) kafka.admin.ConsumerGroupCommand utility, nothing came up (as expected), but the old kafka.tools.ConsumerOffsetChecker still returned an offset:

kafka@aleph:~$ /opt/kafka/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --describe --group thirsty1 --zookeeper localhost:2181
No topic available for consumer group provided
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
kafka@aleph:~$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group thirsty1 --topic tap --zookeeper localhost:2181
[2016-09-12 08:34:57,369] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
thirsty1 tap 0 1715074 0 -1715074 none
thirsty1 tap 1 -1 0 unknown none
thirsty1 tap 2 -1 0 unknown none
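The output above makes the problem visible once you read the columns: Lag is logSize minus Offset, so for partition 0 it is 0 - 1715074 = -1715074, a committed offset far beyond the end of the freshly re-created (empty) log. Partitions 1 and 2 show an Offset of -1 and a Lag of "unknown" because no offset was ever committed for them. The following is a minimal sketch, assuming the whitespace-separated layout shown above, that parses those rows and flags stale offsets; the function names are hypothetical, not part of any Kafka tool.

```python
from typing import List, NamedTuple, Optional


class OffsetRow(NamedTuple):
    group: str
    topic: str
    partition: int
    offset: int          # committed offset; -1 when none is stored
    log_size: int        # log-end offset of the partition
    lag: Optional[int]   # None when the tool prints "unknown"
    owner: str


def parse_checker_output(text: str) -> List[OffsetRow]:
    """Parse the data rows of ConsumerOffsetChecker output (hypothetical helper)."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        # Data rows have exactly 7 fields and a numeric partition column;
        # this skips the WARN line, the header, and blank lines.
        if len(fields) != 7 or not fields[2].isdigit():
            continue
        group, topic, pid, offset, log_size, lag, owner = fields
        rows.append(OffsetRow(group, topic, int(pid), int(offset), int(log_size),
                              None if lag == "unknown" else int(lag), owner))
    return rows


def classify(row: OffsetRow) -> str:
    """Label a partition as having no offset, a stale offset, or a normal one."""
    if row.offset < 0:
        return "no committed offset"
    # Lag = logSize - Offset; a negative value means the committed offset
    # points past the end of the log, e.g. after the topic was re-created.
    if row.log_size - row.offset < 0:
        return "stale offset"
    return "ok"
```

Feeding it the three rows above classifies partition 0 as "stale offset" and partitions 1 and 2 as "no committed offset".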

Next, I checked the ZooKeeper paths.

kafka@aleph:~$ /usr/share/zookeeper/bin/zkCli.sh -server localhost:2181
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 4] ls /
[isr_change_notification, zookeeper, admin, consumers, config, controller, brokers, controller_epoch]

Tip: the zkCli REPL supports TAB autocompletion.

[zk: localhost:2181(CONNECTED) 9] ls /consumers/thirsty1/offsets/tap
[0]

Reading the node shows what is stored there: sure enough, it is the old offset.

[zk: localhost:2181(CONNECTED) 11] get /consumers/thirsty1/offsets/tap/0
1715074
cZxid = 0x3d
ctime = Thu Aug 04 15:22:51 UTC 2016
mZxid = 0x4409
mtime = Sat Aug 06 19:01:47 UTC 2016
pZxid = 0x3d
cversion = 0
dataVersion = 17182
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
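The stat fields confirm how the old consumer stores the offset: the payload is the offset written as decimal text, which is why dataLength is 7 for "1715074" (and becomes 1 after the reset to "0"). A small sketch that parses this zkCli `get` output and cross-checks the payload against dataLength might look like this; it assumes the pasted text starts at the payload line (no prompt) and the helper names are hypothetical.

```python
from typing import Dict, Tuple


def parse_zkcli_get(text: str) -> Tuple[str, Dict[str, str]]:
    """Split zkCli `get` output into the node payload and its stat fields."""
    lines = [line.strip() for line in text.strip().splitlines()]
    data = lines[0]  # assumption: the first line is the (single-line) payload
    stat = {}
    for line in lines[1:]:
        key, sep, value = line.partition(" = ")
        if sep:
            stat[key] = value
    return data, stat


def committed_offset(text: str) -> int:
    """Return the stored offset, checking it against the reported dataLength."""
    data, stat = parse_zkcli_get(text)
    # The offset is stored as ASCII decimal text, so its byte length
    # should equal the dataLength that ZooKeeper reports.
    if len(data.encode("ascii")) != int(stat["dataLength"]):
        raise ValueError("payload length does not match dataLength")
    return int(data)
```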

Let’s reset that.

[zk: localhost:2181(CONNECTED) 12] set /consumers/thirsty1/offsets/tap/0 0
cZxid = 0x3d
ctime = Thu Aug 04 15:22:51 UTC 2016
mZxid = 0x46b0
mtime = Mon Sep 12 09:05:44 UTC 2016
pZxid = 0x3d
cversion = 0
dataVersion = 17183
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
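The same reset can be scripted instead of typed into zkCli. Below is a minimal sketch, assuming a client object with kazoo-style `set(path, value_bytes)` and `get(path) -> (bytes, stat)` methods (kazoo's `KazooClient` provides both); the `/consumers/<group>/offsets/<topic>/<partition>` layout is the one seen in the `ls` output above, and the helper names are hypothetical.

```python
def offset_path(group: str, topic: str, partition: int) -> str:
    """ZooKeeper node where the old (ZooKeeper-based) consumer commits offsets."""
    return f"/consumers/{group}/offsets/{topic}/{partition}"


def reset_offset(zk, group: str, topic: str, partition: int, offset: int = 0) -> int:
    """Overwrite a committed offset and return the value read back.

    `zk` is assumed to expose kazoo-style set(path, bytes) and
    get(path) -> (bytes, stat) methods.
    """
    path = offset_path(group, topic, partition)
    # Store the offset as decimal text, matching what the old consumer writes.
    zk.set(path, str(offset).encode("ascii"))
    data, _stat = zk.get(path)
    return int(data)
```

With kazoo this would be used as `zk = KazooClient(hosts="localhost:2181")`, `zk.start()`, `reset_offset(zk, "thirsty1", "tap", 0)`, then `zk.stop()`. Like zkCli's `set`, kazoo's `set` fails with `NoNodeError` if the node does not exist, so this only rewrites offsets that were already committed.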

Check that it was updated.

[zk: localhost:2181(CONNECTED) 13] get /consumers/thirsty1/offsets/tap/0
0
cZxid = 0x3d
ctime = Thu Aug 04 15:22:51 UTC 2016
mZxid = 0x46b0
mtime = Mon Sep 12 09:05:44 UTC 2016
pZxid = 0x3d
cversion = 0
dataVersion = 17183
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

Check again with ConsumerOffsetChecker.

kafka@aleph:~$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group thirsty1 --topic tap --zookeeper localhost:2181
[2016-09-12 09:09:23,649] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
thirsty1 tap 0 0 0 0 none
thirsty1 tap 1 -1 0 unknown none
thirsty1 tap 2 -1 0 unknown none

Alexis

Alexis is the founder of Aleph Technologies, a data infrastructure consulting and professional services provider based in Brussels, Belgium.
