
Direct Memory Buffer Leak? #707

Open

ozturkberkay opened this issue Dec 24, 2023 · 1 comment

@ozturkberkay

Hi,

We have a Docker image like below:

FROM confluentinc/cp-kafka-connect-base:7.4.3

ENV CONNECT_PLUGIN_PATH "/usr/share/java"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.7

RUN cp -r /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3 /usr/share/java

We deployed it on Kubernetes as a Deployment with 8 replicas. We created a connector that consumes 3 topics with 400 partitions in total. It reads the data from those topics and writes it to AWS S3 as hourly partitioned Parquet files. We were getting OOM kills frequently, so we made the changes below:

  • Reduced s3.part.size to the minimum value of 5 MiB.
  • Enabled the elastic buffer (s3.elastic.buffer.enable).
  • Reduced rotate.interval.ms from one hour to 20 minutes.
  • We didn't even need to touch flush.size, as the partition files were already getting very small.

This is what the connector config looks like:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "s3.elastic.buffer.enable": "true",
  "flush.size": "50000",
  "tasks.max": "8",
  "timezone": "UTC",
  "s3.part.size": "5242880",
  "rotate.interval.ms": "1200000",
  "locale": "en-US",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "REDACTED",
  "schema.compatibility": "NONE",
  "topics.regex": "REDACTED",
  "errors.deadletterqueue.topic.name": "REDACTED",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "name": "s3-sink-connector",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "timestamp.extractor": "Record"
}
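
As a rough back-of-the-envelope check, the part buffers alone add up quickly. This is only a sketch of the arithmetic, assuming each open partition writer holds roughly one s3.part.size buffer (the connector's actual allocation, especially with the elastic buffer enabled, may differ):

public class BufferEstimate {
    public static void main(String[] args) {
        // Rough model of buffer memory per worker, not the connector's exact allocation logic.
        long partSizeBytes = 5L * 1024 * 1024; // s3.part.size = 5242880
        int totalPartitions = 400;             // across the 3 consumed topics
        int tasks = 8;                         // tasks.max

        long partitionsPerTask = (long) Math.ceil(totalPartitions / (double) tasks);
        long bufferBytesPerTask = partitionsPerTask * partSizeBytes;

        // Prints: ~50 partitions per task -> ~250 MiB of part buffers per task
        System.out.printf("~%d partitions per task -> ~%d MiB of part buffers per task%n",
                partitionsPerTask, bufferBytesPerTask / (1024 * 1024));
    }
}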

Reducing the rotation interval from one hour to 20 minutes didn't reduce the memory usage of our deployment at all. That made us very suspicious, so we enabled JMX metrics and started experimenting with the JVM settings:

          env:
            - name: JMX_PORT
              value: "5000"
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: MALLOC_ARENA_MAX
              value: "4"
            - name: KAFKA_HEAP_OPTS
              value: >-
                -Xms1024m
                -Xmx10g
            - name: KAFKA_JVM_PERFORMANCE_OPTS
              value: >-
                -XX:MaxDirectMemorySize=5g
                -XX:MaxMetaspaceSize=260m
                -XX:ReservedCodeCacheSize=240M
                -XX:+UseG1GC
                -XX:+UseContainerSupport
                -XX:+HeapDumpOnOutOfMemoryError
                -XX:+OptimizeStringConcat
                -XX:+UseStringDeduplication
                -XX:+AlwaysActAsServerClassMachine
            - name: JAVA_OPTS
              value: >-
                -Dcom.sun.management.jmxremote
                -Dcom.sun.management.jmxremote.authenticate=false
                -Dcom.sun.management.jmxremote.ssl=false
                -Dcom.sun.management.jmxremote.port=5000
                -Dcom.sun.management.jmxremote.rmi.port=5000
                -Dcom.sun.management.jmxremote.local.only=false
                -Djava.rmi.server.hostname=$(POD_IP)
                -Djava.net.preferIPv4Stack=true

Here is what the metrics look like:

[JMX metrics screenshot]

We noticed that the direct memory buffers are the source of our ever-increasing K8s Deployment memory usage.
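
For cross-checking the JMX numbers, the direct and mapped buffer pools can also be read from inside the JVM through the platform MXBeans (the same data the java.nio:type=BufferPool beans expose over JMX). A minimal sketch:

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectBufferProbe {
    public static void main(String[] args) {
        // Lists each NIO buffer pool ("direct", "mapped") with its buffer count,
        // bytes currently used, and total reserved capacity.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: count=%d used=%d bytes capacity=%d bytes%n",
                    pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}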

I will add more info if I get a chance to analyze some heap dumps.

Questions:

  • How exactly are direct memory buffers used in Kafka Connect?
  • Why does the usage keep increasing and never get released?
  • Is this expected behavior?
  • What else can we do to bring memory usage under control?
  • What are the best practices for running Kafka Connect on Kubernetes?
@raphaelauv

You forgot rotate.schedule.interval.ms.
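
If it helps, rotate.schedule.interval.ms (which rotates on wall-clock time, independent of record timestamps) can be added by re-submitting the connector config through the Connect REST API. The sketch below is illustrative only: the worker URL is a placeholder, and PUT /connectors/<name>/config replaces the entire config, so the real request must include every existing setting plus the new one.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AddScheduledRotation {
    public static void main(String[] args) throws Exception {
        // Trimmed for brevity: the real body must carry the full existing connector
        // config, with rotate.schedule.interval.ms added (timezone must be set,
        // which it already is in the config above).
        String config = """
                {
                  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                  "rotate.interval.ms": "1200000",
                  "rotate.schedule.interval.ms": "1200000",
                  "timezone": "UTC"
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://connect-worker:8083/connectors/s3-sink-connector/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}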
