io.confluent.connect.s3.format.parquet.ParquetFormat and ParquetRecordWriterProvider write record fields of type map and enum as bytes in the Parquet result #713

Open
richard-urena opened this issue Feb 3, 2024 · 0 comments

richard-urena commented Feb 3, 2024

Context:
My Confluent S3 Sink connector (v10.5.6) is configured to export Kafka messages from a topic in AWS MSK that were produced with a schema from the AWS Glue Schema Registry. I have successfully configured the AWSKafkaAvroConverter to handle deserialization of the messages, and I set the "format.class" config property to io.confluent.connect.s3.format.parquet.ParquetFormat, since Parquet is the target format the owner of the S3 bucket expects.

Issue:
The export and deserialization work fine, and the S3 output is Parquet as expected. However, two fields are written as raw bytes instead of the types specified in the schema. Those two fields are of type map and enum.

There is no error in the connector logs, so I am wondering whether map and enum fields are supported for conversion to Parquet by the ParquetFormat and ParquetRecordWriterProvider classes. The other value types (string, boolean, int, and bytes) come through with the correct types in the Parquet output to S3.
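For reference, here is a minimal sketch of the kind of schema involved. The record and field names are hypothetical, but the map and enum fields mirror the two problematic field types (built with Avro's SchemaBuilder):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class ExampleSchema {
    public static void main(String[] args) {
        // Hypothetical record: one plain string field, plus the two field kinds
        // that end up as raw bytes in the Parquet output (map and enum).
        Schema schema = SchemaBuilder.record("StreamingEvent").namespace("com.example")
                .fields()
                .name("eventId").type().stringType().noDefault()
                .name("attributes").type().map().values().stringType().noDefault()
                .name("status").type().enumeration("Status").symbols("ACTIVE", "INACTIVE").noDefault()
                .endRecord();
        System.out.println(schema.toString(true));
    }
}
```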

Ask:
Any advice or leads would help my team troubleshoot this, since no connector error or exception is thrown. There is only a single version of the schema, and it is the one used to produce the record. A separate, non-connector Kafka consumer is able to deserialize the message with the same schema without any issues; a sketch of that consumer is below.
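For completeness, this is roughly what that standalone consumer looks like. It is a sketch only: the bootstrap servers are placeholders, and the Glue deserializer property keys are assumed to match the value.converter.* keys used in the connector config below.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SchemaDebugConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-msk-bootstrap:9098"); // placeholder
        props.put("group.id", "schema-debug");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer");
        // Assumed to correspond to the value.converter.* keys in the connector config
        props.put("avroRecordType", "GENERIC_RECORD");
        props.put("registry.name", "my-schema-registry-1234");
        props.put("region", "us-east-1");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-live-streaming"));
            for (ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofSeconds(10))) {
                GenericRecord value = record.value();
                // The map and enum fields come back with their declared Avro types here.
                System.out.println(value.getSchema().toString(true));
            }
        }
    }
}
```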

Key properties of this config:

connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=1000
enhanced.avro.schema.support=true
parquet.codec=snappy
allow.optional.map.keys=true
s3.bucket.name=my-bucket-name
s3.region=us-east-1
topics=topic-live-streaming
topics.dir=data_streaming
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.registry.name=my-schema-registry-1234
key.converter.registry.name=my-schema-registry-1234

I referred to the docs on this page to configure things for my use case: https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html
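One way to narrow this down is to dump the schema from one of the exported Parquet files and see how the two fields were encoded. A minimal sketch using parquet-hadoop (the local file name is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class DumpParquetSchema {
    public static void main(String[] args) throws Exception {
        // A file downloaded from the S3 bucket; the name is a placeholder.
        Path file = new Path("downloaded-part.snappy.parquet");
        try (ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            // If the map/enum fields show up as plain binary with no logical type annotation,
            // the writer treated them as raw bytes.
            System.out.println(schema);
        }
    }
}
```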
