AdminClient is stuck at CONNECT when using OAUTHBEARER with SASL_SSL protocol #1713

Open
6 of 7 tasks
kumarappan-arumugam opened this issue Feb 21, 2024 · 8 comments
Labels: investigate further (It's unclear what the issue is at this time but there is enough interest to look into it)

Comments

@kumarappan-arumugam commented Feb 21, 2024

Description

The admin client cannot connect to the bootstrap servers when using SASL_SSL with OAUTHBEARER.
I'm trying to connect to an AWS MSK cluster's bootstrap servers. The admin client connects fine when using an unauthenticated endpoint, and the Producer and Consumer configurations work fine with OAUTHBEARER.

How to reproduce

>>> from confluent_kafka.admin import AdminClient, NewTopic
>>> from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
>>> import socket

>>> def oauth_cb(oauth_config):
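...     # oauth_cb must return a (token, expiry time in seconds since epoch) tuple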
...     auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("region")
...     return auth_token, expiry_ms/1000

>>> config = {
...     'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098',
...     'security.protocol': 'SASL_SSL',
...     'sasl.mechanism': 'OAUTHBEARER',
...     'oauth_cb': oauth_cb,
... }

>>> admin = AdminClient(config)
>>> admin.list_topics(timeout=2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv3/lib/python3.8/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

Debug logs:

>>> config = {
...     'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098',
...     'security.protocol': 'SASL_SSL',
...     'sasl.mechanism': 'OAUTHBEARER',
...     'oauth_cb': oauth_cb,
...     'debug': 'all'
... }
>>> admin = AdminClient(config)
%7|1708548520.827|SASL|rdkafka#producer-2| [thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
%7|1708548520.827|OPENSSL|rdkafka#producer-2| [thrd:app]: Using statically linked OpenSSL version OpenSSL 3.0.11 19 Sep 2023 (0x300000b0, librdkafka built with 0x300000b0)
%7|1708548520.828|CACERTS|rdkafka#producer-2| [thrd:app]: Setting default CA certificate location to /etc/ssl/certs/ca-certificates.crt, override with ssl.ca.location
%7|1708548520.879|WAKEUPFD|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enabled low-latency ops queue wake-ups
%7|1708548520.880|BROKER|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Added new broker with NodeId -1
%7|1708548520.880|WAKEUPFD|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enabled low-latency ops queue wake-ups
%7|1708548520.880|BRKMAIN|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
%7|1708548520.880|BROKER|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Added new broker with NodeId -1
%7|1708548520.880|CONNECT|rdkafka#producer-2| [thrd:app]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1708548520.880|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LIBDL PLUGINS ZLIB SSL ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0xfffff)
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]: Client configuration:
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   client.software.name = confluent-kafka-python
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   client.software.version = 2.3.0-rdkafka-2.3.0
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   metadata.broker.list = xxx-1.kafka.us-west-2.amazonaws.com:9098,xxx-2.kafka.us-west-2.amazonaws.com:9098
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   error_cb = 0x7f388f36f120
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   background_event_cb = 0x7f388f360530
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   opaque = 0x7f388c7d0880
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   security.protocol = sasl_ssl
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   sasl.mechanisms = OAUTHBEARER
%7|1708548520.880|CONF|rdkafka#producer-2| [thrd:app]:   oauthbearer_token_refresh_cb = 0x7f388f36eb50
%7|1708548520.880|BRKMAIN|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Enter main broker thread
%7|1708548520.880|CONNECT|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
%7|1708548520.880|STATE|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: sasl_ssl://xxx-2.kafka.us-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1708548520.880|BROADCAST|rdkafka#producer-2| [thrd:sasl_ssl://xxx-2.kafka.us-west-2]: Broadcasting state change
>>> admin.list_topics(timeout=2)
%7|1708548520.883|BRKMAIN|rdkafka#producer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1708548521.880|CONNECT|rdkafka#producer-2| [thrd:main]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Selected for cluster connection: no cluster connection (broker has 0 connection attempt(s))
%7|1708548521.880|CONNECT|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Received CONNECT op
%7|1708548521.880|STATE|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: sasl_ssl://xxx-1.kafka.us-west-2.amazonaws.com:9098/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1708548521.880|BROADCAST|rdkafka#producer-2| [thrd:sasl_ssl://xxx-1.kafka.us-west-2]: Broadcasting state change

%7|1708548522.108|CONNECT|rdkafka#producer-2| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1708548522.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548523.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548524.108|CONNECT|rdkafka#producer-2| [thrd:app]: Cluster connection already in progress: application metadata request
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/venv3/lib/python3.8/site-packages/confluent_kafka/admin/__init__.py", line 603, in list_topics
    return super(AdminClient, self).list_topics(*args, **kwargs)
cimpl.KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}
%7|1708548525.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548526.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548527.880|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548528.881|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection
%7|1708548529.881|CONNECT|rdkafka#producer-2| [thrd:main]: Cluster connection already in progress: no cluster connection

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 2.3.0
  • Apache Kafka broker version: 3.5.1
  • Client configuration: {...}
  • Operating system: ubuntu
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@GergelyKalmar
Looks like the auth callback is not used with the AdminClient. Not sure if there's any workaround at the moment besides using username/password auth.

@GergelyKalmar
Looks like this affects methods like Consumer.list_topics() as well; it doesn't work with OAUTHBEARER either. That particular issue can be worked around by executing consumer.poll() first, after which list_topics() seems to work fine. I've switched over to kafka-python (https://kafka-python.readthedocs.io/en/master/) for now, because this seems to be working fine there.
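A minimal sketch of that workaround (the brokers and region are taken from the original report; the group id is hypothetical, and it is assumed the same oauth_cb as in the reproduction is used):

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Consumer

def oauth_cb(oauth_config):
    # same callback as in the reproduction above
    token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("us-west-2")
    return token, expiry_ms / 1000

consumer = Consumer({
    'bootstrap.servers': 'xxx-1.kafka.us-west-2.amazonaws.com:9098',
    'group.id': 'example-group',  # hypothetical group id
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
})

# poll() drives the client's event loop, which appears to be what triggers the
# OAUTHBEARER token refresh callback
consumer.poll(1.0)

# once the connection has authenticated, list_topics() works
metadata = consumer.list_topics(timeout=10)
print(sorted(metadata.topics))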

@pranavrth added the "investigate further" label Feb 26, 2024
@mohasin-ibrahim
Facing the same issue as well when trying to connect to MSK from a Lambda. However, a Java-based Lambda works just fine. I have triple-checked the IAM side: both the Java and Python Lambdas use the same security group and role. This happens not just for the admin client but for the producer as well. Is there any fix in progress for this?

@woodlee (Contributor) commented Apr 2, 2024

I was able to get this to work in my code by calling e.g. admin_client.poll(1) just after instantiating the client. (In my case, I had to do the same with my producer as well.) But I think the auth mechanism might work asynchronously, because if I try to use the admin client too soon after issuing that poll() it will still fail... so for now I have a 3-second sleep after the poll() call and, so far, so good. Not great (and not documented as far as I've found), but maybe it will get you going.
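A sketch of that approach, assuming the same SASL_SSL/OAUTHBEARER config dict as in the reproduction above (the 3-second sleep is the empirically found value mentioned here, not a documented requirement):

import time
from confluent_kafka.admin import AdminClient

admin = AdminClient(config)  # the OAUTHBEARER config from the original report

# poll() seems to be what gets oauth_cb invoked; without it the broker
# connection stays stuck in CONNECT
admin.poll(1)

# the token handshake appears to complete asynchronously, so wait a moment
time.sleep(3)

metadata = admin.list_topics(timeout=10)
print(list(metadata.topics))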

@mfatihaktas
I am having the same issue while trying to access MSK from an EKS node with IAM.

@woodlee I tried using your suggestion with admin_client.poll(1) but unfortunately it did not work for me. Have you found a different way to work around this issue?

@ketan-nabera
Facing the same issue. Has anyone found a workaround for this?

@mfatihaktas
@ketan-nabera Actually, the workaround above also worked for me after some trial and error. In my case, I had to put a 10-second sleep after poll(1).

@woodlee (Contributor) commented Jun 7, 2024

@ketan-nabera Our code that is currently working properly against an AWS MSK cluster just does an initial call to poll(3) right before calling other admin client methods, and doesn't seem to need any additional sleep time after that. You can see it here (in the method at L473 if the link doesn't take you there).

I suppose it's possible that the needed poll timeout could depend on the network latency between the process and the brokers, but that's just a guess on my part. In our case the code is running on EC2 in the same region as our MSK clusters.
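A compact sketch of that pattern, again assuming the config dict from the original report:

from confluent_kafka.admin import AdminClient

admin = AdminClient(config)

# a single poll() with a generous timeout before the first admin call; this
# seems to be enough for the OAUTHBEARER handshake to finish without an extra
# sleep, though the timeout needed may depend on latency to the brokers
admin.poll(3)

topics = admin.list_topics(timeout=10).topics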
