Flink-1.19: Fix the file offset mismatch when Flink reader first seek… #10567

Open
wants to merge 2 commits into
base: main
zhongyujiang
Contributor

…s position.

When DataIterator seeks for the first time on a split (startingFileOffset=0, startingRecordOffset=0), fileOffset can be assigned incorrectly. As a result, Flink may consume data repeatedly when recovering from a checkpoint, or may fail to recover at all with IllegalStateException: Invalid starting record offset...
Here is an example:
Suppose there is a CombinedScanTask that contains three FileScanTasks with the following ts ranges (from the file metrics) and residual filters:

  • fileScanTask-0: 1 < ts < 10, residual filter: ts > 10
  • fileScanTask-1: 100 < ts < 200, residual filter: ts > 10
  • fileScanTask-2: 100 < ts < 200, residual filter: ts > 10

When DataIterator initializes for the first time, it calls #seek with startingFileOffset=0 and startingRecordOffset=0. In #updateCurrentIterator, currentIterator is first moved to file 0 (fileScanTask-0). The while condition !currentIterator.hasNext() is still true afterwards, because the Parquet row-group filters determine that fileScanTask-0 can be skipped entirely (its ts range never satisfies ts > 10). currentIterator is therefore moved on to fileScanTask-1, and fileOffset is incremented to 1, which is the correct offset at this point.

private void updateCurrentIterator() {
  try {
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      currentIterator.close();
      currentIterator = openTaskIterator(tasks.next());
      fileOffset += 1;
      recordOffset = 0L;
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

However, after control returns to the #seek method, fileOffset is overwritten with startingFileOffset (0), even though the iterator is actually positioned at file 1. This is wrong:

fileOffset = startingFileOffset;
recordOffset = startingRecordOffset;
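
The mismatch can be reproduced with a small stand-alone sketch. This is not the Iceberg DataIterator itself: SeekOffsetDemo, its fields, and actualFileIndex are hypothetical names introduced here for illustration, and each file is modeled as the list of records that survive filtering (an empty list stands for a file skipped by the row-group filters).

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for DataIterator (illustration only).
class SeekOffsetDemo {
  private final Iterator<List<String>> tasks;
  private Iterator<String> currentIterator = Collections.emptyIterator();
  int fileOffset = -1;       // offset that would be stored in a checkpoint
  long recordOffset = 0L;
  int actualFileIndex = -1;  // assumption: extra counter of the file really open

  SeekOffsetDemo(List<List<String>> files) {
    this.tasks = files.iterator();
  }

  // Same loop shape as updateCurrentIterator() in the description.
  private void updateCurrentIterator() {
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      currentIterator = tasks.next().iterator();
      actualFileIndex += 1;
      fileOffset += 1;
      recordOffset = 0L;
    }
  }

  // Mirrors the reported behavior: offsets are overwritten after positioning.
  void seek(int startingFileOffset, long startingRecordOffset) {
    for (int i = 0; i < startingFileOffset; i++) {
      tasks.next();
      actualFileIndex += 1;
    }
    updateCurrentIterator();
    for (long i = 0; i < startingRecordOffset; i++) {
      currentIterator.next();
    }
    fileOffset = startingFileOffset;    // discards the skips done above
    recordOffset = startingRecordOffset;
  }

  public static void main(String[] args) {
    // File 0 is fully filtered out; files 1 and 2 contain records.
    SeekOffsetDemo demo = new SeekOffsetDemo(
        List.of(List.<String>of(), List.of("a", "b"), List.of("c")));
    demo.seek(0, 0L);
    // prints "recorded fileOffset=0, actual file index=1"
    System.out.println("recorded fileOffset=" + demo.fileOffset
        + ", actual file index=" + demo.actualFileIndex);
  }
}
```

In this sketch the recorded fileOffset lags the file that is actually open by exactly the number of files skipped during the first seek.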

This results in the following:
Suppose Flink takes a checkpoint at record offset 50 of fileScanTask-2. Because the recorded fileOffset is one behind, the checkpointed position is (fileOffset=1, recordOffset=50), while the real position is (fileOffset=2, recordOffset=50). When the job recovers from this checkpoint, it tries to resume reading from (fileOffset=1, recordOffset=50), that is, from fileScanTask-1 instead of fileScanTask-2.

  • If fileScanTask-1 contains fewer than 50 records, IllegalStateException: Invalid starting record offset... is thrown and the job cannot recover.
  • If fileScanTask-1 contains 50 or more records, recovery succeeds, but records that were already consumed are consumed again.
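
The two outcomes above can be illustrated with a minimal sketch. RecoveryDemo, firstRecordAfterRestore, and records are hypothetical helpers, not Iceberg APIs, and the exception message is a simplified placeholder rather than the real one. Restoring is modeled as opening the file at the checkpointed fileOffset and skipping recordOffset records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper that models restoring a reader position (illustration only).
class RecoveryDemo {
  // Returns the first record read after restoring to (fileOffset, recordOffset),
  // or null if the file is exhausted exactly at that offset.
  static String firstRecordAfterRestore(
      List<List<String>> files, int fileOffset, long recordOffset) {
    Iterator<String> it = files.get(fileOffset).iterator();
    for (long i = 0; i < recordOffset; i++) {
      if (!it.hasNext()) {
        // Placeholder message; the real message is elided in the description.
        throw new IllegalStateException("Invalid starting record offset (demo)");
      }
      it.next();
    }
    return it.hasNext() ? it.next() : null;
  }

  // Builds `count` records named "<prefix>-0", "<prefix>-1", ...
  static List<String> records(String prefix, int count) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      out.add(prefix + "-" + i);
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> files =
        List.of(records("f0", 0), records("f1", 80), records("f2", 80));
    // Correct checkpoint position resumes in file 2: prints "f2-50"
    System.out.println(firstRecordAfterRestore(files, 2, 50L));
    // Off-by-one position resumes in file 1, re-reading old data: prints "f1-50"
    System.out.println(firstRecordAfterRestore(files, 1, 50L));
  }
}
```

With a file 1 that holds fewer than 50 records, the same call throws instead of returning, which corresponds to the unrecoverable case.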

cc @pvary @stevenzwu could you take a look at this? thanks!

@github-actions github-actions bot added the flink label Jun 25, 2024