class java.nio.HeapByteBuffer cannot be cast to class org.apache.avro.generic.GenericRecord #675

Open
applejag opened this issue Aug 17, 2023 · 7 comments

@applejag

Hello! I'm trying to do basic message backups to S3 using the S3 sink connector (https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#schema-evolution).

When I then try to use the s3-source connector to restore the messages into a brand-new Kafka cluster, I get this error:

[2023-08-17 16:03:31,521] ERROR [s3-source|task-0] WorkerSourceTask{id=s3-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
org.apache.kafka.connect.errors.ConnectException: Error while executing read with record co-ordinates : RecordCoordinates [storagePartition=topics/staging-teambank-risk-assessment-calculated/, startOffset=0, endOffset=4904]
	at io.confluent.connect.cloud.storage.errorhandler.handlers.ReThrowErrorHandler.handle(ReThrowErrorHandler.java:21)
	at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:69)
	at io.confluent.connect.cloud.storage.source.StorageSourceTask.poll(StorageSourceTask.java:161)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:457)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class org.apache.avro.generic.GenericRecord (java.nio.HeapByteBuffer is in module java.base of loader 'bootstrap'; org.apache.avro.generic.GenericRecord is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @2beee7ff)
	at io.confluent.connect.cloud.storage.source.format.CloudStorageAvroFormat.extractRecord(CloudStorageAvroFormat.java:75)
	at io.confluent.connect.cloud.storage.source.StorageObjectFormat.nextRecord(StorageObjectFormat.java:72)
	at io.confluent.connect.cloud.storage.source.util.StorageObjectSourceReader.nextRecord(StorageObjectSourceReader.java:65)
	... 12 more
[2023-08-17 16:03:31,523] INFO [s3-source|task-0] Stopping storage source connector (io.confluent.connect.cloud.storage.source.StorageSourceTask:233)

Configs for cluster 1

This is where I back up messages using the s3-sink connector. It runs in a separate pod from Kafka, started with:

/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/sink.properties

connect-standalone.properties

bootstrap.servers=kafka-0.kafka-headless.kafka.svc.cluster.local:9093,kafka-1.kafka-headless.kafka.svc.cluster.local:9093,kafka-2.kafka-headless.kafka.svc.cluster.local:9093

offset.flush.interval.ms=10000
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/bitnami/kafka/plugins

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

sink.properties

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics.regex=.*

flush.size=10000
rotate.schedule.interval.ms=600000
locale=en_US
timezone=Europe/Berlin

format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.compatibility=NONE
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

partition.duration.ms=600000
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH

storage.class=io.confluent.connect.s3.storage.S3Storage
store.url=https://XXXXXXXXXXXXXXXX
s3.bucket.name=kafka-backup-testing
s3.bucket.tagging=true
s3.part.size=5242880
aws.access.key.id=XXXXXXXXXXXXXXXX
aws.secret.access.key=XXXXXXXXXXXXXXXX

behavior.on.error=fail

Configs for cluster 2

This is where I restore messages using the s3-source connector. It runs in a separate pod from Kafka, started with:

/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/source.properties

connect-standalone.properties

(same as cluster 1)

source.properties

name=s3-source
connector.class=io.confluent.connect.s3.source.S3SourceConnector

confluent.topic.bootstrap.servers=kafka-0.kafka-headless.kafka.svc.cluster.local:9093,kafka-1.kafka-headless.kafka.svc.cluster.local:9093,kafka-2.kafka-headless.kafka.svc.cluster.local:9093
tasks.max=1

format.class=io.confluent.connect.s3.format.avro.AvroFormat

partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH

storage.class=io.confluent.connect.s3.storage.S3Storage
store.url=https://XXXXXXXXXXXXXXXX
s3.bucket.name=kafka-backup-testing
aws.access.key.id=XXXXXXXXXXXXXXXX
aws.secret.access.key=XXXXXXXXXXXXXXXX

behavior.on.error=fail

What am I doing wrong here?

By the way, the messages are Protobuf-encoded, but I don't want to lock in the message format with the Protobuf converter. To my understanding, the ByteArrayConverter is the right choice here, since I just want to back up and restore the messages as-is.

@OneCricketeer

just want to backup and restore the messages as-is

Then don't use AvroFormat. Use format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat, as mentioned on the page you shared.
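
Concretely, that means changing the format line in both sink.properties and source.properties (keeping the ByteArrayConverter in connect-standalone.properties as-is):

format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat

With that, both connectors treat the record value as opaque bytes instead of trying to decode it as Avro.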

@OneCricketeer

Worth mentioning that this isn't exactly a proper backup strategy. You've only configured the record values to be saved, not the timestamps, headers, or keys (or the Protobuf schema itself, assuming you are using Schema Registry).
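
(Side note: if you do want keys and headers stored too, I believe newer versions of the S3 sink have store.kafka.keys and store.kafka.headers options, roughly as sketched below; I'm going from memory, so check the docs for your connector version. Even then, the registered schemas are not captured.)

store.kafka.keys=true
keys.format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
store.kafka.headers=true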

@applejag
Author

Worth mentioning that this isn't exactly a proper backup strategy. You've only configured the record values to be saved, not the timestamps, headers, or keys (or the Protobuf schema itself, assuming you are using Schema Registry).

What would a proper backup strategy look like?

@OneCricketeer

OneCricketeer commented Aug 23, 2023

  1. This doesn't back up any topic metadata like replication factor, compression settings, etc. You need to back up ZooKeeper for this, or however KRaft manages that state.
  2. You'd want something that persists the whole Kafka payload, as-is, without deserialization. In other words, it's not a true backup without something that can also restore the original data without modification (re-serialization or otherwise). The S3 Source Connector does not exactly do this (and it is closed source, so I don't really know how it operates; you've opened this issue in the S3 Sink connector repo). Restoring data may also cause offset mismatches (a record backed up at offset 1 may not be restored/produced back at offset 1, and could come back out of order), if that matters to you.
  3. Assuming you could restore, that restore process needs exact knowledge of how the producers send data (they can override the partition of any single record, so assuming the default partitioning strategy is not safe).

I personally haven't come across any decent way to back up Kafka in a streaming fashion without using tools like MirrorMaker2 to replicate to a warm standby cluster with increased retention periods.
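
For reference, a minimal MirrorMaker 2 setup for that kind of warm standby looks roughly like the sketch below (the cluster aliases and bootstrap servers are placeholders, not anything from this issue), run with bin/connect-mirror-maker.sh:

clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092

primary->backup.enabled = true
primary->backup.topics = .*

You'd then raise retention on the backup cluster so it holds data longer than the primary.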

And I didn't even mention compacted topics... They obviously would not be compacted in S3, and therefore would not be restored in compacted form.

Ultimately, the last option is static disk snapshots on a regular basis, which you could upload to S3 separately, if needed.

@applejag
Author

Thank you so much for that writeup!

I guess then that this issue could be closed as "user error"

@OneCricketeer

OneCricketeer commented Aug 23, 2023

If you want to do some more research into the topic (pun intended):

I've tried to implement this at my last job and used the kafka-connect-transform-archive SMT linked below.

Related issue - jcustenborder/kafka-connect-transform-archive#6
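
For context, wiring that SMT into the S3 sink config would look roughly like this; the class name is my recollection of that repo's Archive transform, so verify it against the version you install:

transforms=archive
transforms.archive.type=com.github.jcustenborder.kafka.connect.archive.Archive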

@applejag
Author

Yeah, I'll have to tweak the requirements a little, because just "do backups on Kafka" is apparently way too broad.

Just focusing on the messages, and ignoring the other parts, will probably be enough for my use case. Some messages sit in our Kafka for a week or two before they get processed. In case of a disaster recovery, we need to ensure those weeks-old messages also get restored, in addition to our other non-Kafka databases.
