Kafka-python Read From Last Produced Message After A Consumer Restart
Solution 1:
Thanks, it works!
This is a simplified version of my code:
from kafka import KafkaConsumer

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
# dummy poll
consumer.poll()
# go to end of the stream
consumer.seek_to_end()
# start iterating
for message in consumer:
    print(message)
consumer.close()
The documentation states that the poll() method is incompatible with the iterator interface, which I guess is the one I use in the loop at the end of my script. However, from initial testing, this code appears to work correctly.
Is it safe to use? Or did I misunderstand the documentation?
Thanks
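Since the kafka-python documentation says poll() and the iterator interface should not be mixed, one way to sidestep the question is to use poll() for everything, including the main loop. A minimal sketch, assuming `consumer` is an already-configured kafka-python `KafkaConsumer` subscribed to the topic; the function name `consume_from_end` and its `handle`, `max_polls` and `timeout_ms` parameters are hypothetical, chosen for illustration.

```python
from typing import Any, Callable, Optional


def consume_from_end(consumer: Any,
                     handle: Callable[[Any], None],
                     max_polls: Optional[int] = None,
                     timeout_ms: int = 1000) -> int:
    """Skip to the end of every assigned partition, then process new records
    using only poll() (no iterator). Returns how many records were handled.

    `consumer` is assumed to be a kafka-python KafkaConsumer that is already
    subscribed. `max_polls` (hypothetical) bounds the loop; None means forever.
    """
    # Dummy poll: subscription is lazy, so this drives the group join and
    # partition assignment. Any records it returns are old and are discarded.
    consumer.poll(timeout_ms=timeout_ms)
    # With no arguments, seek_to_end() applies to all assigned partitions.
    consumer.seek_to_end()

    handled = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]}
        batch = consumer.poll(timeout_ms=timeout_ms)
        for records in batch.values():
            for record in records:
                handle(record)
                handled += 1
        polls += 1
    return handled
```

Usage would look like `consume_from_end(consumer, print)`, which runs until interrupted; passing `max_polls` is only there to make the loop bounded for testing.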
Solution 2:
You will need to call seekToEnd() to jump to the end of the log.
Keep in mind that you first need to subscribe to a topic before you can seek. Also, subscribing is lazy, so you will need to add a "dummy poll" before you can seek, too.
consumer.subscribe(...);
consumer.poll(Duration.ofMillis(100)); // dummy poll
consumer.seekToEnd(consumer.assignment());
// now enter your regular poll-loop
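For kafka-python (the library used in the question), the same steps can be sketched as follows. Because a single dummy poll is not guaranteed to finish the group join, this version keeps polling until `assignment()` is non-empty before seeking. It is a sketch under that assumption; `seek_to_end_after_assignment` and its `max_attempts` parameter are hypothetical names, and `consumer` is assumed to be a kafka-python `KafkaConsumer`.

```python
from typing import Any, List, Set


def seek_to_end_after_assignment(consumer: Any, topics: List[str],
                                 max_attempts: int = 10,
                                 timeout_ms: int = 1000) -> Set[Any]:
    """Subscribe, poll until partitions are assigned, then seek to the end.

    Returns the set of assigned partitions (empty if assignment never arrived
    within `max_attempts` dummy polls, in which case no seek is done).
    """
    consumer.subscribe(topics=topics)
    assigned: Set[Any] = set()
    for _ in range(max_attempts):
        # Dummy poll: drives the lazy group join; returned records are ignored.
        consumer.poll(timeout_ms=timeout_ms)
        assigned = set(consumer.assignment())
        if assigned:
            break
    if assigned:
        # Move the fetch position of each assigned partition to the log end.
        consumer.seek_to_end(*assigned)
    return assigned
```

After this returns a non-empty set, the regular poll loop only sees messages produced from that point on.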
Solution 3:
In response to your question in your answer:
It is my understanding that when you execute consumer.poll(), a dictionary is returned (mapping each TopicPartition to a list of records). So, when I wanted to poll for information, I used a loop to walk through the dictionary.
from kafka import KafkaConsumer

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
# poll() returns a dict mapping each TopicPartition to a list of ConsumerRecords
messages = consumer.poll()
data = []
for tp in messages:
    for record in messages[tp]:
        # Add just the values to the list
        data.append(record.value)
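Because each record in the returned dictionary is a ConsumerRecord namedtuple, the value can be read by field name (`record.value`) instead of by position, which does not depend on the tuple's field order. A minimal sketch that flattens one poll into a list of values; `poll_values` is a hypothetical helper name and `consumer` is assumed to be a kafka-python `KafkaConsumer`.

```python
from typing import Any, List, Optional


def poll_values(consumer: Any, timeout_ms: int = 1000,
                max_records: Optional[int] = None) -> List[Any]:
    """Poll once and return just the message values, partition by partition.

    poll() returns {TopicPartition: [ConsumerRecord, ...]}; each record's
    value is read by field name rather than by tuple index.
    """
    batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    return [record.value for records in batch.values() for record in records]
```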
I believe what you are doing is getting the iterator with
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
and then walking the iterator with
# start iterating
for message in consumer:
    print(message)
It doesn't look like you are actually getting just the 500 records (kafka-python's default max_poll_records) from a single poll. You can confirm this by adding max_poll_records=5 to your KafkaConsumer configuration. Then, when you run the code, if more than 5 messages print out, you can tell that you aren't using the poll functionality.
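One way to run that check without the iterator is to count what each poll() call actually returns. A sketch, assuming a kafka-python consumer created with max_poll_records=5 as suggested above; `records_per_poll` and its `polls` parameter are hypothetical names.

```python
from typing import Any, List


def records_per_poll(consumer: Any, polls: int = 3,
                     timeout_ms: int = 1000) -> List[int]:
    """Return how many records each of `polls` consecutive poll() calls got.

    With KafkaConsumer(..., max_poll_records=5), every count is expected to
    be at most 5.
    """
    counts = []
    for _ in range(polls):
        batch = consumer.poll(timeout_ms=timeout_ms)
        # Sum the record lists across all partitions in this batch.
        counts.append(sum(len(records) for records in batch.values()))
    return counts
```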
Hope that helps!
Solution 4:
Here is a convenient way to have all messages returned by a poll in a list:
import json

while True:
    messages = []  # Store all messages
    crs = []  # Store all consumer records
    tpd = consumer.poll(timeout_ms=60000, max_records=1)
    for tp_records in tpd.values():  # List of cr's
        crs.extend(tp_records)
    messages.extend(json.loads(cr.value) for cr in crs)
    print(messages)
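kafka-python's KafkaConsumer also accepts a `value_deserializer` callable, which moves the json.loads step out of the loop so that `cr.value` already holds the decoded object. A sketch assuming UTF-8 JSON payloads; the helper name `json_value` is hypothetical, and the constructor call is left as a comment because it needs a reachable broker.

```python
import json
from typing import Any, Optional


def json_value(raw: Optional[bytes]) -> Any:
    """Decode a UTF-8 JSON message value; None (e.g. a tombstone) stays None."""
    return None if raw is None else json.loads(raw.decode("utf-8"))


# Hypothetical wiring (requires kafka-python and a reachable broker):
# consumer = KafkaConsumer('mytopic', bootstrap_servers=[server],
#                          group_id=group_id, value_deserializer=json_value)
# messages = [cr.value for records in consumer.poll(timeout_ms=60000).values()
#             for cr in records]  # values are already decoded
```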