A consumer joining the consumer group takes 45s to get the first message #1770

Open
7 tasks
persona94 opened this issue Jun 24, 2024 · 5 comments

@persona94

persona94 commented Jun 24, 2024

Description

The consumer subscribes to a topic as follows:

from confluent_kafka import Consumer

def setup_consumer(topic_list):
    try:
        consumer = Consumer(
            {
                # <options for broker>
                'auto.offset.reset': 'earliest',
                'fetch.min.bytes': '200000',
            }
        )

        # Register the topics this consumer should read from.
        consumer.subscribe(topic_list)
    except Exception as e:
        app.logger.error(
            f'Exception during creation of active calls consumer - {e}'
        )
        return

    # Hand the subscribed consumer over to the polling loop.
    start_consumer(consumer)

Inside start_consumer() the main while loop looks like this:

while True:
    message = consumer.poll(0.1)
    if message is None:
        # Nothing arrived within the poll timeout; yield to other greenlets.
        print('Waiting for message')
        gevent.sleep(0.1)
        continue
    elif message.error():
        print(f'Error in loop {message.error()}')
        break
    else:
        print('Message received')
        ....

The consumer takes about 45s to move from "Waiting for message" to "Message received".
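To put a number on that gap rather than counting prints, a small timing wrapper around the poll call can help. This is only a sketch: `time_to_first_message`, `max_wait` and `clock` are names made up for illustration, and `poll` is assumed to be any callable shaped like `consumer.poll(timeout)` that returns None when nothing arrived.

```python
import time


def time_to_first_message(poll, poll_timeout=0.1, max_wait=120.0,
                          clock=time.monotonic):
    """Poll until something is returned; give back (message, seconds_waited).

    `poll` is any callable shaped like Consumer.poll(timeout): it returns
    None when nothing arrived within the timeout. Returns (None, waited)
    if max_wait seconds pass without a message.
    """
    started = clock()
    while True:
        message = poll(poll_timeout)
        waited = clock() - started
        if message is not None:
            return message, waited
        if waited >= max_wait:
            return None, waited
```

With the consumer above it would be called as `message, waited = time_to_first_message(consumer.poll)`. The returned object may still be an error event, so `message.error()` needs checking just as in the loop.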

I turned up debug logging in the Kafka client and saw this:

"APIVERSION [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Enabling feature SaslAuthReq"}                                                                                                                                                                              
"FEATURE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq"}             
"STATE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP"}                                                                                                                                                                      
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Skipping metadata refresh of 1 topic(s): connected: already being requested"}                                                                                                                                                     
"CGRPSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed state wait-broker-transport -> up (join-state init)"}                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}                                                                                                                                                                                                                  
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 0 subscribed topic(s)"}                                                                                                                                                                             
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Hinted cache of 1/1 topic(s) being queried"}                                                                                                                                                                                                  
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)"}                                                                                                                                                       
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: Request metadata for 1 topic(s): consumer join"}                                                                                                                                                    
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": postponing join until up-to-date metadata is available"}                                                                                                                                                      
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-metadata (state up)"}                                                                                                                                                 
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Sent MetadataRequest (v12, 88 bytes @ 0, CorrId 4)"}                                                                                                                
"DUMP [my-kafka-consumer-ac#consumer-2] [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)"}                                                                                                                                                                                                
"DUMP_ALL [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_PND [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_QRY [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"DUMP_REM [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}                                                                                                                                                                                                                   
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Received MetadataResponse (v12, 196 bytes, CorrId 4, rtt 0.19ms)"}                                                                                                  
"ASSIGNDONE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": assignment operations done in join-state wait-metadata (rebalance rejoin=false)"}                                                                                                                       
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ===== Received metadata (for 1 requested topics): consumer join ====="}                                                                                                                             
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId: XJWpYEvfk0PCISvF51RubA, ControllerId: 0"}                                                                                                                                                
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1 brokers, 1 topics"}                                                                                                                                                                               
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap:   Broker #0/1: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092 NodeId 0"}                                                                                           
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap:   Topic kafka_test_topic with 1 partitions"}                                                                                                                                                
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1/1 requested topic(s) seen in metadata (lookup by name)"}                                                                                                                                          
"CLUSTERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId update \"\" -> \"XJWpYEvfk0PCISvF51RubA\""}                                                                                                                                              
"CONTROLLERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ControllerId update -1 -> 0"}                                                                                                                                                                   
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}                                                                                                                                                                                                                  
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": effective subscription list changed from 0 to 1 topic(s):"}                                                                                                                                           
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]:  Topic kafka_test_topic with 1 partition(s)"}                                                                                                                                                                                     
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": subscription updated from metadata change: rejoining group in state wait-metadata"}                                                                                                                         
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: group (re)join"}                                                                                                                                                            
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}                                                                                         
"REBALANCE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}                                                           
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed"}                                                                                                                       
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-metadata -> init (state up)"}                                                                                                                                                 
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}                                                                                                                                                                             
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)"}                                                                                                                                                                        
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"\""}                                                                     
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}                                                                                                                                                     
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 212 bytes @ 0, CorrId 2)"}                                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 84 bytes, CorrId 2, rtt 1.20ms)"}                                                                                                                                                 
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 0: Broker: Group member needs a valid member ID"}                             
"REQERR [my-kafka-consumer-ac#consumer-2] [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore"}                                                                                                                            
"MEMBERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": updating member id \"\" -> \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}                                                                                                         
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: JoinGroup error: Broker: Group member needs a valid member ID"}                                                                                                      
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-join -> init (state up)"}                                                                                                                                                     
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}                                                                                                                                                                             
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (2ms old)"}                                                                                                                                                                        
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.x.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}     
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}                                                                                                                                                     
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 276 bytes @ 0, CorrId 3)"}                                                                                                                                                             
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}                                                                                                                                                                                                      
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Expired 1 entries from metadata cache (0 entries remain)"}
"Waiting for message"
"Waiting for message"
"Waiting for message"
"Waiting for message"
<snip><42s of repeated prints></snip>
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}                                                                                                                                                                      
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId 14, Protocol range, LeaderId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc (me), my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 1: (no error)"}               
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: I am elected leader for group \"my-kafka-consumer\" with 1 member(s)"}                                                                                                                                                                                          
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: JoinGroup response clean-up"}                                                                                                                                                                                                                                                                                                                                                                                                                      

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): v2.4.0
  • Apache Kafka broker version: 3.7.0
  • Client configuration: 'auto.offset.reset':'earliest', 'client.id': 'my-kafka-consumer-ac', 'fetch.min.bytes': '200000', 'metadata.max.age.ms': '1000',
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@persona94
Author

Is there any other information I can provide? @pranavrth

@pranavrth
Member

Can you also provide time in the logs? Better to show human readable logs. Your app logger is not logging time.
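One way to get timestamps on every line is a standard-library logger with a time-aware formatter. The sketch below is a minimal version; `make_timestamped_logger` is a hypothetical helper name. As far as I know, confluent-kafka-python accepts such a logger through the `logger=` keyword of `Consumer`, but treat that hookup as an assumption to check against the client documentation.

```python
import logging
import sys


def make_timestamped_logger(name="consumer", stream=None):
    """Return a logger that prefixes each line with a millisecond timestamp."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    handler = logging.StreamHandler(stream if stream is not None else sys.stderr)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s.%(msecs)03d %(levelname)s %(name)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))

    # Replace existing handlers so repeated calls do not duplicate output.
    logger.handlers = [handler]
    logger.propagate = False
    return logger


# Assumed hookup (not verified in this thread):
#   consumer = Consumer(conf, logger=make_timestamped_logger())
```

Replacing the `print` calls in the loop with `logger.debug(...)` would then put the application messages and the client debug lines on the same clock.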

@pranavrth pranavrth reopened this Jun 28, 2024
@pranavrth
Member

JoinGroupRequest is taking around 42 seconds.

"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}

How many consumers is this coordinator serving? Is there some issue in the coordinator? I don't know what is happening on the coordinator side that makes it take 42s to respond.
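For a long debug log, the slow requests can be picked out mechanically from the `rtt` field of the RECV lines. The sketch below assumes the line format shown in this issue; `RECV_RE`, `slow_responses` and the 1000 ms threshold are illustrative choices, not part of any client API.

```python
import re

# Matches e.g. "Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"
RECV_RE = re.compile(
    r"Received (?P<api>\w+) \(v(?P<version>\d+), (?P<size>\d+) bytes, "
    r"CorrId (?P<corr_id>\d+), rtt (?P<rtt_ms>\d+(?:\.\d+)?)ms\)"
)


def slow_responses(lines, threshold_ms=1000.0):
    """Yield (api, corr_id, rtt_ms) for responses at or above threshold_ms."""
    for line in lines:
        match = RECV_RE.search(line)
        if match is None:
            continue  # not a RECV line in the expected format
        rtt_ms = float(match["rtt_ms"])
        if rtt_ms >= threshold_ms:
            yield match["api"], int(match["corr_id"]), rtt_ms
```

Run over the log file with `list(slow_responses(open("book.log")))`, this should surface the JoinGroupResponse with CorrId 3 and skip the sub-millisecond metadata round trips.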

@persona94
Author

Can you also provide time in the logs? Better to show human readable logs. Your app logger is not logging time.

Sure. I had trimmed the logs because there were a lot of them. I've attached the unmodified file from this morning:
book.log

@persona94
Author

persona94 commented Jun 28, 2024

JoinGroupRequest is taking around 42 seconds.

"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}

How many consumers is this coordinator serving? Is there some issue in the coordinator? I don't know what is happening on the coordinator side that makes it take 42s to respond.

This coordinator serves 2 consumers, each in its own consumer group. I run this consumer on 3 separate clusters (i.e. all different, independent Kafka instances) and all of them seem to have the same issue. From what I can tell, this is not an issue on the other consumer, which uses a C++ driver. Also, if I use kafka-python as my Kafka driver I don't see this delay; I connect instantly.
