-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16745: Implemented handleShareFetchRequest RPC including unit tests #16456
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Took an initial look, will review again later. Some comments though.
@@ -53,6 +53,9 @@ public interface ClientMetadata { | |||
|
|||
|
|||
class DefaultClientMetadata implements ClientMetadata { | |||
|
|||
public static final String DEFAULT_RACK_ID = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use DEFAULT_CLIENT_RACK
already defined in Configs?
@@ -359,6 +359,26 @@ public List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String group | |||
return cachedTopicIdPartitions; | |||
} | |||
|
|||
/** | |||
* The acknowledgeShareSessionCacheUpdate method is used to update the share session cache before acknowledgements are handled | |||
* either as part of shareFetch request or shareAcknowledge request |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
* either as part of shareFetch request or shareAcknowledge request | |
* either as part of shareFetch request or shareAcknowledge request. |
// Updating the session's position in the cache to guard against the entry disappearing between the ack and the fetch | ||
sharePartitionManager.acknowledgeShareSessionCacheUpdate(groupId, Uuid.fromString(memberId), shareSessionEpoch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Query: Can you please help what issue actually occurs and why we need to update the cache i.e. how can the cache eviction be prevented with the cache update?
if (reqEpoch == ShareFetchMetadata.FINAL_EPOCH) { | ||
return; | ||
} | ||
ShareSession shareSession = cache.get(new ShareSessionKey(groupId, memberId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Query: Is it always guranteed that a non-null share session will exist?
@@ -3957,11 +3961,576 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
} | |||
|
|||
def getAcknowledgeBatchesFromShareFetchRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we declare the helper methods post handle
methods?
/** | ||
* Generate a random string of letters and digits of the given length | ||
* | ||
* @param len The length of the string | ||
* @return The random string | ||
*/ | ||
def randomString(len: Int): String = JTestUtils.randomString(len) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: do we need this or can work with existing randomBytes
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't realise the existence of randomBytes. Made the amends, thanks for the review
@@ -116,6 +117,7 @@ public class KRaftMetadataRequestBenchmark { | |||
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager, | |||
replicaQuotaManager, replicaQuotaManager, Option.empty()); | |||
private final FetchManager fetchManager = Mockito.mock(FetchManager.class); | |||
private final Optional<SharePartitionManager> sharePartitionManager = Optional.of(Mockito.mock(SharePartitionManager.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this mocked version used later? If we don not define the setter of SharPartitionManager then ll it make a difference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not make a difference. I removed the mock and passed Optional.empty() in setSharePartitionManager method. Did the same in MetadataRequestBenchmark.java as well
About
Implemented handleShareFetch request RPC in KafkaApis.scala. This method is called whenever the client sends a Share Fetch request to the broker. Although Share Fetch request support acknowledgements, since the logic for acknowledging records is not completely implemented in SharePartitionManager.java class, this method currently includes placeholder code for acknowledging, which will be replaced by the actual functionality in the upcoming PRs.
Testing
Added unit tests to cover the new functionality added in KafkaApisTest