Avoid late records preemptively rotating/committing S3 output files #574
When late data arrives on a Kafka partition (e.g. data for the previous hourly encodedPartition), the following check triggers an immediate rotation and commit of files:
kafka-connect-storage-cloud/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java, line 410 (at commit 918730d)
The problem is exacerbated when late data is interleaved with up-to-date data. When this happens, a quick succession of rotations causes a large number of small files to be committed to S3. This hurts both the performance/throughput of Kafka Connect and downstream consumers, which must deal with the many small file fragments.
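For illustration, the check in question behaves roughly like the following sketch (simplified, with hypothetical names; the real logic lives in TopicPartitionWriter): any record whose encodedPartition differs from the one currently being written forces an immediate rotation, regardless of `flush.size`.

```java
import java.util.Objects;

public class RotationCheck {
    /**
     * Returns true when the incoming record targets a different
     * encodedPartition than the file currently being written,
     * which (per current behavior) forces an immediate rotation.
     */
    static boolean rotateOnPartitionChange(String currentEncodedPartition,
                                           String recordEncodedPartition) {
        return currentEncodedPartition != null
                && !Objects.equals(currentEncodedPartition, recordEncodedPartition);
    }

    public static void main(String[] args) {
        // Up-to-date record for hour 8 interleaved with a late record for hour 7:
        System.out.println(rotateOnPartitionChange("dt=2023-01-01/hr=08",
                                                   "dt=2023-01-01/hr=07")); // true
        System.out.println(rotateOnPartitionChange("dt=2023-01-01/hr=08",
                                                   "dt=2023-01-01/hr=08")); // false
    }
}
```

With interleaved traffic, this condition flips on nearly every record, producing the rapid rotations described above.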
This PR adds a new `max.open.files.per.partition` option to `S3SinkConnectorConfig`. It defaults to 1, which preserves the existing behavior. If set to a value greater than 1, the following behavior is enabled:
- A separate commit file is kept open for each encodedPartition target, up to a maximum of `max.open.files.per.partition`.
- Rotation occurs only when one of the encodedPartition targets hits its rotation condition (`flush.size`, `rotate.interval.ms`); at that point all open files are committed. Committing all files together ensures that the S3 sink's pre-commit hook commits a high watermark of offsets to the Kafka consumer group, avoiding buffered gaps of data still being in-flight when that happens.

It's worth noting that this issue/limitation was previously encountered and is well described in:
"CC-2313 Handle late arriving records in storage cloud sink connectors" #187
However, that feature was subsequently reverted:
a2ce6fc confluentinc/kafka-connect-storage-common#87
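For reference, a connector configuration enabling the new behavior might look like the following (values are illustrative, not recommendations; only `max.open.files.per.partition` is new in this PR):

```properties
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=events
flush.size=10000
rotate.interval.ms=600000
# New in this PR: keep up to 3 encodedPartition targets open per Kafka
# partition before a rotation condition forces a commit of all of them.
max.open.files.per.partition=3
```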
N.B. Unlike the solution proposed in CC-2313, we do not write late data to an incorrect encodedPartition; i.e. late data for hour 7 will not land in a path/file for hour 8.
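The multi-file buffering described above can be sketched as follows (a simplified model with hypothetical names, not the actual TopicPartitionWriter code): one open file per encodedPartition target, bounded by the new limit, and a single commit of all open files whenever any one target hits its rotation condition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiFileBuffer {
    private final int maxOpenFiles;  // stands in for max.open.files.per.partition
    private final int flushSize;     // stands in for flush.size
    private final Map<String, List<String>> openFiles = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();

    MultiFileBuffer(int maxOpenFiles, int flushSize) {
        this.maxOpenFiles = maxOpenFiles;
        this.flushSize = flushSize;
    }

    void write(String encodedPartition, String record) {
        // At capacity and a new target arrives: rotate everything first.
        if (!openFiles.containsKey(encodedPartition) && openFiles.size() == maxOpenFiles) {
            commitAll();
        }
        List<String> file = openFiles.computeIfAbsent(encodedPartition, k -> new ArrayList<>());
        file.add(record);
        // One target hit its rotation condition: commit ALL open files, so the
        // offset high watermark never leaves buffered gaps of in-flight data.
        if (file.size() >= flushSize) {
            commitAll();
        }
    }

    private void commitAll() {
        openFiles.forEach((partition, records) ->
                committed.add(partition + " (" + records.size() + " records)"));
        openFiles.clear();
    }
}
```

With `maxOpenFiles = 1` this degenerates to the current behavior: every encodedPartition change forces a commit, which is why the default preserves existing semantics.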