How Kafka consumers can start reading messages from a different offset and get back to the start.

Antonio Di Mariano
4 min read · Dec 7, 2018

In a nutshell: how to use consumer.seek() with kafka-python and Python 3.x

In this post I’d like to give an example of how to consume messages from a Kafka topic and, in particular, how to use the consumer.position() and consumer.seek() methods to move back to previous messages. I spent a few days figuring out how to do this, so I’ve decided to write it up, both to save myself time in the future and to share what I’ve learnt. Frankly speaking, I found the official documentation of the kafka-python package a little thin, with only ordinary examples. It took a while, but I’ve finally gotten my head around the package and its functionality. This post is not about how to produce a message to a topic and how to consume it; the official documentation already provides a good example of that.

In a nutshell, in Kafka every message consists of a key, a value and a timestamp. The fact that each message is marked with a timestamp suggests that I can somehow move back to a previous offset and, more usefully, use a timestamp to look up a given offset. This is handy if we want to feed a dashboard with data and be able to browse the history.
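As an aside, Kafka timestamps are integer milliseconds since the Unix epoch, so a Python datetime has to be converted before it can be used to look up an offset. A minimal sketch (the helper name to_kafka_timestamp is mine):

```python
from datetime import datetime, timezone

def to_kafka_timestamp(dt):
    # Kafka message timestamps are integer milliseconds since the Unix epoch
    return int(dt.timestamp() * 1000)

# e.g. midnight UTC on the day this post was written
ts_ms = to_kafka_timestamp(datetime(2018, 12, 7, tzinfo=timezone.utc))
```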

The kafka-python seek() method changes the current offset in the consumer, so it will start consuming messages from that offset on the next poll(), as the documentation puts it:

The last consumed offset can be manually set through seek() or automatically set as the last committed offset for the subscribed list of partitions.

One thing Kafka is famous for is that multiple producers can write to the same topic, and multiple consumers can read from the same topic, with no issue. Kafka knows how to distribute the data among all the consumers.

Kafka consumers are usually grouped under a group_id. Each consumer belonging to the same consumer group receives its records from a different subset of the partitions in the topic.

If a topic has 4 partitions and I have only one consumer C1 in my group, it will get messages from all the partitions. If I add another consumer C2 to the same group, each consumer will receive data from two partitions. The limit of this logic is reached when the number of consumers is higher than the number of partitions: some consumers will get no messages because all the partitions are already assigned. But that is another topic, which involves scalability.
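That arithmetic can be sketched in plain Python. This is only an illustration of the idea behind range-style assignment for a single topic, not the library’s actual assignor (the function range_assign is my own):

```python
def range_assign(partitions, consumers):
    # Divide the partition list as evenly as possible among sorted consumers;
    # the first `extra` consumers get one partition more than the rest.
    consumers = sorted(consumers)
    n, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(consumers):
        count = n + (1 if i < extra else 0)
        assignment[c] = partitions[start:start + count]
        start += count
    return assignment
```

With 4 partitions, a lone C1 gets all of them, C1 and C2 get two each, and a fifth consumer in a group of five is left with nothing, matching the limit described above.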

For the sake of my exercise, I need to keep in mind that each consumer maintains an offset to keep track of the next record to consume, and that it can start consuming records either from the earliest offset in the subscribed topic or from the latest offset, ignoring all previous records. In my case I set auto_offset_reset='earliest' because I want my consumer to start polling data from the beginning by default.

So, I have my Consumer class wrapping a KafkaConsumer instance to consume records from a topic.


import json
import logging
import os

from kafka import KafkaConsumer
from kafka.structs import TopicPartition


class Consumer:
    def __init__(self, **kwargs):
        self.__servers = kwargs.pop('servers', None)
        self.__mytopic = kwargs.pop('topics', None)
        logging.basicConfig(
            format='%(asctime)s.%(msecs)03d:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
            level=logging.INFO
        )
        logging.getLogger('kafka').setLevel(logging.INFO)
        self.consumer = KafkaConsumer(
            bootstrap_servers=self.__servers,
            auto_offset_reset='earliest',
            consumer_timeout_ms=1000,
            group_id=os.environ['KAFKA_GROUP_ID'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            api_version=(0, 10, 1),
        )

        self.mypartition = TopicPartition(self.__mytopic, 0)
        self.assigned_topic = [self.mypartition]
        self.consumer.assign(self.assigned_topic)

I am going to use the kafka-python poll() API to consume records from a topic with one partition. On the first poll, my consumer will use the earliest available offset as its starting offset and fetch data sequentially from there; after that, the default behaviour is to continue from the last consumed offset.
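A minimal polling loop might look like the sketch below, assuming the Consumer class above and a reachable broker; poll() returns a dict mapping each TopicPartition to a list of ConsumerRecords, and drain is a helper name of my own:

```python
def drain(consumer, timeout_ms=1000):
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}; flatten it
    # into a list of (offset, value) pairs for inspection
    records = []
    batch = consumer.poll(timeout_ms=timeout_ms)
    for tp, msgs in batch.items():
        for msg in msgs:
            records.append((msg.offset, msg.value))
    return records
```

With the class above you would call drain(self.consumer) after each seek to see where consumption resumes.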

Let’s play with the seek() API.

If I want to move to the most recent offset of my partition:

self.consumer.seek_to_end(self.mypartition)

I can get the new current position using the position() API:

pos = self.consumer.position(self.mypartition) 
print("[most recent offset]=", pos)

If I want to go back to the beginning, I can use

self.consumer.seek_to_beginning(self.mypartition)

which seeks to the oldest offset available in the partition.

If I want to go to a given offset new_pos:

self.consumer.seek(self.mypartition, new_pos)

The kafka-python module also has an interesting offsets_for_times() API, but I haven't had the time to test it yet.
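For what it’s worth, a hedged sketch of how it could be combined with seek(): offsets_for_times() takes a {TopicPartition: timestamp_ms} mapping and returns an OffsetAndTimestamp per partition (or None when no message is that recent). The helper name seek_to_timestamp is my own, and I haven’t run this against a broker either:

```python
def seek_to_timestamp(consumer, tp, ts_ms):
    # offsets_for_times() maps each TopicPartition to the earliest offset
    # whose timestamp is >= ts_ms, or None if there is no such message
    offsets = consumer.offsets_for_times({tp: ts_ms})
    entry = offsets.get(tp)
    if entry is not None:
        consumer.seek(tp, entry.offset)
        return entry.offset
    return None
```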

After that, it’s time to consume data from the selected offset:

for msg in self.consumer:
    print("data:", msg.value)


Antonio Di Mariano

Senior Software and Research Engineer with a passion for Distributed Systems, Microservices, and Python