
Transactional producer silently fails to commit a transaction. #4697

Open · 7 tasks done
vncorpacc opened this issue Apr 22, 2024 · 2 comments

@vncorpacc

Description

Versions: see the checklist.
We are using the Confluent Kafka Python bindings (confluent-kafka-python).
This is largely a copy-paste from redpanda-data/redpanda#17921. It is not clear to me whether the issue lies with a misbehaving broker or with the client library. Would appreciate any insights.

The transactional producer silently fails to commit a batch: commit_transaction() returns without exceptions, but an inspection of the topic shows the messages were never committed.

The issue was encountered at least three times.
The first two times, our producer detected an error in an unexpected place: a call to produce() (rd_kafka_produce), made some time after a begin_transaction(), returned -172 (wrong state).
After inspecting the state of the topic it was obvious that the commit_transaction() call preceding that last begin_transaction() had succeeded, since we crash on any error there. But the messages were never actually committed in Redpanda: they did not show up in the Redpanda console, and when we tried to read those supposedly committed messages programmatically, the request timed out. The producer did receive offsets for those uncommitted messages in the on_delivery callback, meaning the messages reached Redpanda but were not committed.
The third time the producer did throw on commit_transaction(), but I still include it here as it seems relevant.
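
For reference, a minimal sketch (placeholder broker, topic, group id and offset; not our exact verification code) of how the "read it back programmatically" check can be done with a read_committed consumer, which only returns messages from committed transactions:

import confluent_kafka

# Placeholder: the offset reported by on_delivery for one of the
# supposedly committed messages.
expected_offset = 12345

consumer = confluent_kafka.Consumer({
    'bootstrap.servers': 'REDACTED',
    'group.id': 'verify-committed',          # placeholder group id
    'isolation.level': 'read_committed',     # skip uncommitted/aborted data
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.assign([confluent_kafka.TopicPartition('REDACTED_TOPIC', 0, expected_offset)])

msg = consumer.poll(timeout=10.0)
if msg is None:
    # This is what we observe: the fetch times out, i.e. the offset is not
    # readable with read_committed, so the transaction was never committed.
    print('offset not readable - transaction not committed')
elif msg.error() is None:
    print('message committed and readable at offset', msg.offset())
consumer.close()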

confluent_kafka did not output any logs while this was happening.
In the two cases where the producer only noticed the error on a produce() call, confluent_kafka logged nothing, so all we have is the error code: cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Unable to produce message: Local: Erroneous state"}.
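
In case it helps: a sketch (assumed settings, not what we were running at the time) of how client-side transaction/EOS debug logs could be enabled so that more than the bare error code is captured next time:

import logging
import confluent_kafka

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('txn-producer')

debug_cfg = {
    # ... same transactional config as in the checklist below ...
    'bootstrap.servers': 'REDACTED',
    'transactional.id': 'REDACTED',
    # librdkafka debug contexts; 'eos' traces the transactional state machine
    'debug': 'eos,broker,protocol,msg',
}

# confluent-kafka forwards librdkafka log lines to the supplied Python logger
producer = confluent_kafka.Producer(debug_cfg, logger=logger)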

The third time, when the producer did fail at commit_transaction(), the error was:
Error: _PURGE_QUEUE. Retriable: False. Fatal: False"}
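
For completeness, a sketch of the retriable / requires-abort / fatal handling pattern that the transactional API documentation describes for commit errors; our real code simply raises on any exception from commit_transaction():

import confluent_kafka

def commit_current_transaction(producer: confluent_kafka.Producer) -> None:
    try:
        producer.commit_transaction()
    except confluent_kafka.KafkaException as exc:
        err = exc.args[0]
        if err.retriable():
            # transient error (e.g. a timeout): the commit can simply be retried
            producer.commit_transaction()
        elif err.txn_requires_abort():
            # the transaction is doomed: abort it and re-produce the whole batch
            producer.abort_transaction()
            raise
        else:
            # fatal or unexpected error: the producer instance must be recreated
            raise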

How to reproduce

We couldn't reproduce the issue in any other scenario: it does not fail in our testing environment with the exact same producer code. It does, however, manifest itself reliably in the production environment.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka: v2.2.0, python-confluent-kafka: v2.2.0
  • Broker: Redpanda v23.1.17
  • Producer configuration:

    {
        'transactional.id': 'REDACTED',
        'enable.idempotence': True,
        'acks': 'all',
        'max.in.flight.requests.per.connection': 5,
        'retries': 5,
        'retry.backoff.ms': 500,
        'linger.ms': 500,
        'security.protocol': 'SSL',
        'ssl.ca.location': 'REDACTED',
        'ssl.certificate.location': 'REDACTED',
        'ssl.key.location': 'REDACTED'
    }
@emasab
Collaborator

emasab commented Jun 18, 2024

Can you share some code to reproduce this?

@vncorpacc
Author

Unfortunately, this is not something even we were able to reproduce reliably, and it stopped happening after yet another Redpanda restart.
I am not sure what code you expect to see.
The outline would be:

import confluent_kafka

# cfg is the producer configuration from the checklist above;
# stop_requested(), get_msgs_to_send(), get_offsets_or_raise() and
# store_in_db() are our application helpers.
producer = confluent_kafka.Producer(cfg)
producer.init_transactions()
while not stop_requested():
    msgs = get_msgs_to_send()
    batch_requests_results = []
    producer.begin_transaction()
    for topic, value, partition in msgs:
        batch_requests_results.append(None)
        msg_num = len(batch_requests_results) - 1

        # Bind msg_num per message (default argument) so each delivery
        # callback writes to its own slot rather than the last loop value.
        def on_delivery(
            err: confluent_kafka.KafkaError,
            msg: confluent_kafka.Message,
            msg_num: int = msg_num,
        ):
            if err is None or err.code() == confluent_kafka.KafkaError.NO_ERROR:
                batch_requests_results[msg_num] = msg
            else:
                batch_requests_results[msg_num] = err

        producer.produce(topic, value, partition=partition, on_delivery=on_delivery)
    producer.flush()
    offsets_and_metadata = get_offsets_or_raise(batch_requests_results)
    producer.commit_transaction()
    # We got here! And crashed only on a subsequent produce() in the next batch.
    # Those offsets were not reachable afterwards - they were not really committed.
    store_in_db(offsets_and_metadata)
