Null values being replaced with default #716

Open
andyhuynh3 opened this issue Feb 8, 2024 · 5 comments

andyhuynh3 commented Feb 8, 2024

Hello, I'm using Debezium to extract MySQL data into Kafka in Avro format using the Confluent Avro converter. I'm then using the Confluent S3 sink to get this data into S3 as Avro files. However, I'm running into an issue on the Kafka --> S3 side where my null values are being replaced with the MySQL default, even with value.converter.ignore.default.for.nullables=true. More details on my setup below.

Here's what my S3 sink settings look like

{
   "connector.class":"io.confluent.connect.s3.S3SinkConnector",
   "tasks.max":"1",
   "errors.deadletterqueue.context.headers.enable":"true",
   "errors.deadletterqueue.topic.name":"db_ingestion_dead_letter_queue",
   "errors.deadletterqueue.topic.replication.factor":"1",
   "filename.offset.zero.pad.widthrotate_interval_ms":"12",
   "flush.size":"500000",
   "locale":"en",
   "partition.duration.ms":"60000",
   "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
   "path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd/'\''hour'\''=HH",
   "retry.backoff.ms":"5000",
   "rotate.interval.ms":"15000",
   "rotate.schedule.interval.ms":"60000",
   "s3.bucket.name":"my-bucket",
   "s3.part.size":"5242880",
   "s3.region":"us-west-2",
   "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
   "schema.compability":"NONE ",
   "storage.class":"io.confluent.connect.s3.storage.S3Storage",
   "timezone":"UTC",
   "topics.dir":"developer/kafka-connect-avro/data/raw",
   "topics.regex":"dbzium\\.inventory\\..+",
   "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "key.converter.auto.registry.schemas": "true",
   "key.converter.ignore.default.for.nullables": "true",
   "schema.name.adjustment.mode":"avro",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "value.converter.auto.registry.schemas": "true",
   "value.converter.ignore.default.for.nullables": "true"
}

Here's what my schema looks like:

{
  "type": "record",
  "name": "Value",
  "namespace": "dbzium.inventory.my_table",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "my_first_tinyint_col",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "test_str",
      "type": [
        {
          "type": "string",
          "connect.default": "test_str"
        },
        "null"
      ],
      "default": "test_str"
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__source_file",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_pos",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ],
  "connect.name": "dbzium.inventory.my_table.Value"
}

And here's what my message looks like in Kafka:

./kaf -b kafka:9092 consume --schema-registry registry:8080/apis/ccompat/v7 dbzium.inventory.my_table
Key:         { "id": 1 }
Partition:   0
Offset:      0
Timestamp:   2024-02-07 16:35:24.59 +0000 UTC
{
  "__deleted": {
    "string": "false"
  },
  "__op": {
    "string": "c"
  },
  "__source_file": {
    "string": "1.000003"
  },
  "__source_pos": {
    "long": 746927
  },
  "__source_ts_ms": {
    "long": 1707323723000
  },
  "__ts_ms": {
    "long": 1707323724020
  },
  "id": 1,
  "my_first_tinyint_col": null,
  "test_str": null
}

And when I try to read the Avro file produced by the S3 connector via Python, this is what I'm seeing

>>> import copy, json, avro
>>> from avro.datafile import DataFileWriter, DataFileReader
>>> from avro.io import DatumWriter, DatumReader
>>> file_name = "./dbzium.inventory.my_table+0+0000000000.avro"
>>> with open(file_name, 'rb') as f:
...     reader = DataFileReader(f, DatumReader())
...     metadata = copy.deepcopy(reader.meta)
...     schema_from_file = json.loads(metadata['avro.schema'])
...     data = [r for r in reader]
...     reader.close()
... 
>>> data[0]
{'id': 1, 'my_first_tinyint_col': None, 'test_str': 'test_str', '__deleted': 'false', '__op': 'c', '__ts_ms': 1707323724020, '__source_ts_ms': 1707323723000, '__source_file': '1.000003', '__source_pos': 746927}
>>> 

Notice how the value for the test_str key is the default value (also test_str) instead of None or null.
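
A plain Avro reader only applies a field default when the field is missing from the writer's schema; a null that was actually written reads back as None. The minimal sketch below (not from the original report; it uses the same avro package as the snippet above, with a trimmed-down copy of the schema and an arbitrary /tmp path) illustrates this, which points at the substitution happening when the sink converts the Connect record back to Avro rather than at read time:

import json

from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro.schema import parse

# Trimmed-down copy of the value schema above: a nullable string whose default is "test_str".
schema = parse(json.dumps({
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "test_str",
         "type": [{"type": "string", "connect.default": "test_str"}, "null"],
         "default": "test_str"},
    ],
}))

# Write a record whose test_str is explicitly null.
with open("/tmp/null_check.avro", "wb") as f:
    writer = DataFileWriter(f, DatumWriter(), schema)
    writer.append({"id": 1, "test_str": None})
    writer.close()

# Read it back with the writer's own schema: the null survives, no default is applied.
with open("/tmp/null_check.avro", "rb") as f:
    reader = DataFileReader(f, DatumReader())
    print(list(reader))  # [{'id': 1, 'test_str': None}]
    reader.close()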

In part of the S3 connector logs, I do see ignore.default.for.nullables = false, so is this setting perhaps not taking effect?

[2024-02-08 00:58:35,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] AvroDataConfig values:
	allow.optional.map.keys = false
	connect.meta.data = true
	discard.type.doc.default = false
	enhanced.avro.schema.support = true
	generalized.sum.type.support = false
	ignore.default.for.nullables = false
	schemas.cache.config = 1000
	scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)

raphaelauv commented Feb 13, 2024

The feature is available from confluent-schema-registry release 7.3. Are you using at least that version of the ser/deser lib in your Kafka Connect?

Check the docs: https://docs.confluent.io/platform/current/schema-registry/connect.html#null-values-replaced-with-default-values

andyhuynh3 (Author) commented

@raphaelauv Yes I'm using version 7.6.0. It works on the producer side (e.g. with Debezium) but the issue is with the Confluent Kafka Connect sinks. I have a PR here to address the issue.


raphaelauv commented Feb 13, 2024

"value.converter.ignore.default.for.nullables": "true"

works with:

FROM confluentinc/cp-kafka-connect-base:7.6.0
RUN (echo 1 && yes) |confluent-hub install confluentinc/kafka-connect-s3:10.5.0

and the final file dropped on S3 contains:

{"currency":null,"contry":"UNKNOW","_kafka_partition":0,"_kafka_offset":9,"_kafka_timestamp":1707851123401}

andyhuynh3 (Author) commented

I'm not using the Confluent Kafka Connect image to begin with, but I can give it a try when I have some time.

More details on my setup -- I'm working with the Strimzi base image (quay.io/strimzi/kafka:0.38.0-kafka-3.5.1) and installing version 10.5.7 of the S3 sink connector:

ARG KAFKA_VERSION="3.5.1"
ARG STRIMZI_VERSION="0.38.0"

FROM quay.io/strimzi/kafka:${STRIMZI_VERSION}-kafka-${KAFKA_VERSION}

ARG CONFLUENT_S3_SINK_VERSION="10.5.7"
ENV CONFLUENT_S3_SINK_VERSION=${CONFLUENT_S3_SINK_VERSION}
RUN mkdir -p /opt/kafka/plugins/s3
RUN curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/${CONFLUENT_S3_SINK_VERSION}/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip -o /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip
RUN cd /tmp && unzip /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip
RUN cp -r /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}/lib /opt/kafka/plugins/s3/
RUN rm /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip && rm -rf /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}

I do have "value.converter.ignore.default.for.nullables": "true" with this setup, but I still get nulls being replaced with the default in the S3 files.

raphaelauv commented

Check the version of the schema-registry library jars in that container image.
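
For example, something like the sketch below, run inside the container, can list the relevant jars (assuming the /opt/kafka/plugins/s3/lib path from the Dockerfile above; the directory and jar names may differ in your image):

from pathlib import Path

# List the Avro / schema-registry jars bundled with the S3 sink plugin.
# /opt/kafka/plugins/s3/lib is the path used in the Dockerfile above; adjust if yours differs.
plugin_lib = Path("/opt/kafka/plugins/s3/lib")
for jar in sorted(plugin_lib.glob("*.jar")):
    if any(token in jar.name for token in ("schema-registry", "avro")):
        print(jar.name)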

bjoernhaeuser added a commit to bjoernhaeuser/kafka-connect-storage-cloud that referenced this issue May 16, 2024
Description:

Using the JsonFormat to write from Debezium to Kafka and then using the S3 sink connector to read from Kafka and save to S3 causes null values to always be stored with their default values.

Therefore, this adds a new config property (for backwards compatibility) to allow the value transformer inside the S3 sink connector to be configured correctly.

Tests for the configuration and integration have been added as well.

This addresses confluentinc#716, but for JSON instead of Avro.