Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error and data race using transaction example from the library #2913

Open
bentcoder opened this issue May 25, 2024 · 2 comments
Open

Error and data race using transaction example from the library #2913

bentcoder opened this issue May 25, 2024 · 2 comments

Comments

@bentcoder
Copy link

Description

Hi,

Using pool of producers like shown in this example time to time cause kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing error and data race shown below.

Thanks

Versions
Sarama Kafka Go
v1.43.2 v3.3.2 1.22
Configuration
	cfg := sarama.NewConfig()
	cfg.ClientID = clientID
	cfg.Version = sarama.V3_3_2_0
	cfg.Net.MaxOpenRequests = 1
	cfg.Metadata.AllowAutoTopicCreation = false
	cfg.Producer.Return.Successes = true
	cfg.Producer.Compression = sarama.CompressionZSTD
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	cfg.Producer.Transaction.ID = tansactionID
	cfg.Producer.Transaction.Retry.Max = 10
	cfg.Producer.Transaction.Retry.Backoff = time.Millisecond * 100
Additional Context
==================
WARNING: DATA RACE
Write at 0x00c00010f310 by goroutine 251:
  github.com/IBM/sarama.(*transactionManager).transitionTo()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/transaction_manager.go:220 +0x330
  github.com/IBM/sarama.(*asyncProducer).maybeTransitionToErrorState()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1276 +0xec
  github.com/IBM/sarama.(*asyncProducer).returnError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1286 +0xad
  github.com/IBM/sarama.(*asyncProducer).retryMessage()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1323 +0x109
  github.com/IBM/sarama.(*asyncProducer).retryMessages()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1332 +0x8c
  github.com/IBM/sarama.(*brokerProducer).handleError.func2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1197 +0x8f4
  github.com/IBM/sarama.(*produceSet).eachPartition()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/produce_set.go:223 +0x89b
  github.com/IBM/sarama.(*brokerProducer).handleError()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1196 +0x3cd
  github.com/IBM/sarama.(*brokerProducer).handleResponse()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1060 +0x84
  github.com/IBM/sarama.(*brokerProducer).run()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:993 +0x1a93
  github.com/IBM/sarama.(*brokerProducer).run-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:791 +0x33

Previous read at 0x00c00010f310 by goroutine 252:
  github.com/IBM/sarama.(*transactionManager).publishTxnPartitions()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/transaction_manager.go:766 +0x40e
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:818 +0x23c
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap2()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:794 +0x33

Goroutine 251 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:791 +0x4f3
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:600 +0x33

Goroutine 252 (running) created at:
  github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:794 +0x690
  github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1343 +0x133
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:765 +0x244
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
  github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
  github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
      /Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
  github.com/IBM/sarama.(*partitionProducer).updateLeader()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:756 +0xca
  github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:619 +0xcb
  github.com/IBM/sarama.(*partitionProducer).dispatch()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:689 +0xd08
  github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
      <autogenerated>:1 +0x33
  github.com/IBM/sarama.withRecover()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
  github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
      /Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:600 +0x33
==================
@puellanivis
Copy link
Contributor

Hm… yeah. The transitionTo() call is holding the statusLock while the publishTxnPartitions is holding the partitionTxnLock.

They should be holding a common lock, or they will race. :(

@bentcoder
Copy link
Author

More conversation can be found here for reference.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants