Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16754: Implemented release acquired records functionality to SharePartition #16430

Merged
merged 8 commits into from
Jun 27, 2024

Conversation

adixitconfluent
Copy link
Contributor

@adixitconfluent adixitconfluent commented Jun 24, 2024

About

Implemented release acquired records functionality in SharePartition. This functionality is used when a share session gets closed, hence all the acquired records should either move to AVAILABLE or ARCHIVED state. Implemented the following functions -

  1. releaseAcquiredRecords - This function is executed when the acquisition lock timeout is reached. The function releases the acquired records.
  2. releaseAcquiredRecordsForCompleteBatch - Function which releases acquired records maintained at a batch level.
  3. releaseAcquiredRecordsForPerOffsetBatch - Function which releases acquired records maintained at an offset level.

Testing

Added unit tests to cover the new functionality added.

@adixitconfluent adixitconfluent changed the title Implemented release acquired records functionality to SharePartition KAFKA-16754: Implemented release acquired records functionality to SharePartition Jun 24, 2024
@adixitconfluent adixitconfluent marked this pull request as ready for review June 25, 2024 08:57
Copy link
Contributor

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please review my comment and see whether there's a more straightforward way of achieving it.

} finally {
lock.writeLock().unlock();
}
return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CompletableFuture<Optional<Throwable>> is a bit unusual. Generally, a future with a Throwable would complete exceptionally, rather than completing with a result which is an optional Throwable. Then, instantiating an exception is an expensive operation which captures a stack trace and so on. So, I wonder whether this is really the best pattern for passing back the results from these methods.

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, good question.

There are 3 options:

  1. CompletableFuture<Throwable>
  2. CompletableFuture<Optional<Throwable>>
  3. CompletableFuture<Void>

Option 1 doesn't seems right as it specifies that a Throwable should be returned when completed. I was mostly thinking about option 2 and 3, Option 2 explicitly says that we might have a Throwable which defines the success of the method. Option 3 is more aligned with your suggestion, that future will be completed exceptionally if error occurs else the method success completion shall not result anything.
I went ahead with explicit declaration, also with Option 3, the actual exception gets wrapped in ExecutionException which need wrap/unwrap. Though it's minimal and can be debated when wrapped in Optional. I am fine with either though. Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since, we need to use the exception to return to the API that functions acknowledge and releaseAcquiredRecords return, I think we can go ahead with option 2. The tradeoffs remain the same with both option 2 and option 3 as mentioned by @apoorvmittal10 .

Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, LGTM. Just a minor comment and let's follow up on @AndrewJSchofield's suggestion.

core/src/main/java/kafka/server/share/SharePartition.java Outdated Show resolved Hide resolved
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

if (inFlightBatch.offsetState() != null) {
Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches);
if (releaseAcquiredRecordsThrowable.isPresent()) {
throwable = releaseAcquiredRecordsThrowable.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to fetch the throwable and again wrapping in Optional? The method signature returns CompletableFuture<Optional<Throwable>> itself. Just noticed we need to correct other methods as well, hence I will take that in separate PR while correcting other API methods as well.

Copy link
Contributor

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Copy link
Contributor

@omkreddy omkreddy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent Thanks for the PR. LGTM

@omkreddy omkreddy merged commit 49e9bd4 into apache:trunk Jun 27, 2024
1 check failed
@adixitconfluent adixitconfluent deleted the kafka-16754 branch June 27, 2024 10:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants