Consumer.poll goes into infinite loop #1767

Open
3 of 7 tasks
r-priyam opened this issue Jun 20, 2024 · 5 comments

@r-priyam

Description

Consumer.poll goes into an infinite loop when Kafka is running on Kubernetes. When the cluster is rescheduled onto a new node, the client gets stuck on the error below:

Screenshot 2024-06-20 at 4 02 28 PM

As the screenshot shows, the service was stuck and only recovered after about 12 hours.

How to reproduce

import socket
from confluent_kafka import Consumer, Message

consumer = Consumer(
    {
        "bootstrap.servers": "",
        "client.id": socket.gethostname(),
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "group.id": "",
    }
)
consumer.subscribe(topics=[...])
# timeout is a number of seconds, not a string
msg: Message = consumer.poll(timeout=10)  # Goes into infinite loop here and gets stuck

Run the code above, then restart the Kafka cluster or move it to a different node in k8s.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.1.1
  • Apache Kafka broker version:
  • Client configuration: { "bootstrap.servers": "", "client.id": socket.gethostname(), "auto.offset.reset": "earliest", "enable.auto.commit": False, "group.id": "", }
  • Operating system: Linux
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@pranavrth
Member

It seems that the host is unreachable or not up.

@r-priyam
Author

@pranavrth the host comes back up within 2 minutes at most. It's just k8s scheduling it on another node, which changes the IP. The service container works fine once it's restarted.

@r-priyam
Author

@pranavrth, any luck, please? It appears that the package handles this error at a very low level and does not raise an exception.

@r-priyam
Author

@pranavrth, after digging further into the source code, we found that we can set "error_cb" in the config to get a callback whenever an error is raised. It would be good if the client raised this directly instead of relying on a callback.
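
For anyone hitting the same thing, here is a rough sketch of the "error_cb" workaround, not a confirmed fix. It assumes the callback is served from within poll() and that the unreachable-broker case shows up as an error named "_ALL_BROKERS_DOWN". `BrokerErrorTracker`, `build_config`, `consume`, and the raise-on-all-brokers-down policy are made up for illustration; only the "error_cb" config key and the KafkaError `name()` / `str()` methods come from the client.

```python
import logging
import socket

logger = logging.getLogger("consumer")


class BrokerErrorTracker:
    """Hypothetical helper: remembers the last error passed to error_cb."""

    def __init__(self):
        self.last_error = None

    def __call__(self, err):
        # err is expected to be a confluent_kafka.KafkaError.
        logger.error("Kafka client error %s: %s", err.name(), err.str())
        self.last_error = err

    def raise_if_brokers_down(self):
        # Assumed policy: turn "all brokers down" into an exception and
        # ignore every other error reported through the callback.
        err, self.last_error = self.last_error, None
        if err is not None and err.name() == "_ALL_BROKERS_DOWN":
            raise RuntimeError(f"Kafka unreachable: {err.str()}")


def build_config(tracker, bootstrap_servers, group_id):
    # Same settings as in the report, plus the error callback.
    return {
        "bootstrap.servers": bootstrap_servers,
        "client.id": socket.gethostname(),
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "group.id": group_id,
        "error_cb": tracker,
    }


def consume(bootstrap_servers, group_id, topics):
    # Imported here so the helpers above can be used without the client installed.
    from confluent_kafka import Consumer

    tracker = BrokerErrorTracker()
    consumer = Consumer(build_config(tracker, bootstrap_servers, group_id))
    consumer.subscribe(topics)
    try:
        while True:
            msg = consumer.poll(timeout=10)  # seconds; None if nothing arrived
            tracker.raise_if_brokers_down()
            if msg is None:
                continue
            if msg.error():
                logger.warning("Message error: %s", msg.error())
                continue
            logger.info("Received message at offset %s", msg.offset())
    finally:
        consumer.close()
```

With this in place the service can catch the RuntimeError and exit, so k8s restarts the container instead of leaving it stuck. Whether exiting on the first "all brokers down" report is too aggressive depends on how long the broker normally takes to come back.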

@pranavrth
Member

Can you enable debug logging by using 'debug': 'all' in the config and send us the logs to investigate?
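
A minimal sketch of what that looks like, assuming the same config dict as in the report. `with_debug` is a hypothetical helper; "debug" is the config property named above, and by default the resulting logs should go to stderr.

```python
import socket


def with_debug(config, contexts="all"):
    """Return a copy of the client config with debug logging enabled."""
    debug_config = dict(config)
    # "all" is very verbose; a narrower comma-separated list such as
    # "broker,cgrp,fetch" can be passed instead.
    debug_config["debug"] = contexts
    return debug_config


base_config = {
    "bootstrap.servers": "",  # left blank as in the report
    "client.id": socket.gethostname(),
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "group.id": "",  # left blank as in the report
}

debug_config = with_debug(base_config)
```

Passing `debug_config` to `Consumer(...)` in place of the original dict should be enough to capture the logs around the node change.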
