-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16750: Added acknowledge code in SharePartitionManager including unit tests #16457
base: trunk
Are you sure you want to change the base?
Conversation
5f943a7
to
276e4f2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Some comments and doubts I have.
@@ -45,6 +45,7 @@ | |||
files="(KafkaClusterTestKit).java"/> | |||
<suppress checks="JavaNCSS" | |||
files="RemoteLogManagerTest.java"/> | |||
<suppress checks="ClassDataAbstractionCoupling" files="SharePartitionManagerTest"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this suppression? We have refactored SharePartition and SharePartitionManager code to have smaller methods so can go away with this suppression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, thanks for the review. Actually I am not able to run any tests locally without the suppression. It results in a checkstyle error without the suppression
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, what I am suggesting is that to check if there is a way to write tests better so we do not have to add this suppression.
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/kafka/server/share/SharePartitionManager.java
Outdated
Show resolved
Hide resolved
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
Outdated
Show resolved
Hide resolved
@@ -1239,6 +1242,180 @@ public void testCloseSharePartitionManager() throws Exception { | |||
Mockito.verify(persister, times(1)).stop(); | |||
} | |||
|
|||
@Test | |||
public void testAcknowledgeSuccess() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Query: What do we mean by success
in the method name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in regards to this comment and all the ones below. I realise some of these tests have stopped making sense. The share partition code, including its tests, was edited quite a number of times by multiple people. I will go through all the tests once again and rewrite them the right way. Thanks for the review
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); | ||
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 2)); | ||
TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 4)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need 3 topic partitions to test incorrect group id
? Can't we do by just single topic partition i.e. what we are testing different with 3 topic partitions?
when(sp1.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture( | ||
Optional.of(new InvalidRequestException("Member is not the owner of batch record")) | ||
)); | ||
when(sp2.acknowledge(ArgumentMatchers.eq(memberId2), any())).thenReturn(CompletableFuture.completedFuture( | ||
Optional.of(new InvalidRequestException("Member is not the owner of batch record")) | ||
)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question as above.
SharePartition sp1 = mock(SharePartition.class); | ||
SharePartition sp2 = mock(SharePartition.class); | ||
when(sp1.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture(Optional.empty())); | ||
when(sp2.acknowledge(ArgumentMatchers.eq(memberId), any())).thenReturn(CompletableFuture.completedFuture( | ||
Optional.of(new InvalidRequestException("Batch record not found. The base offset is not found in the cache.")) | ||
)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not see these mocks being used anywhere. Am I missing something?
1986472
to
20f50eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for cleaning the tests and addressing the comments. LGTM, just minor comments and pelase check if we can get rid of suppression.
@@ -45,6 +45,7 @@ | |||
files="(KafkaClusterTestKit).java"/> | |||
<suppress checks="JavaNCSS" | |||
files="RemoteLogManagerTest.java"/> | |||
<suppress checks="ClassDataAbstractionCoupling" files="SharePartitionManagerTest"/> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, what I am suggesting is that to check if there is a way to write tests better so we do not have to add this suppression.
Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>(); | ||
partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp), sp); | ||
|
||
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can move the declaration later in the code when the usage occurs i.e. when it's populated.
About
This PR adds acknowledge code in SharePartitionManager. Internally, the record acknowledgements happen at the SharePartition level. SharePartitionManager identifies the SharePartitions and calls their acknowledge method to actually acknowledge the individual records
Testing
Added unit tests to cover the new functionality added in SharePartitionManagerTest