-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17022: Fix error-prone in KafkaApis#handleFetchRequest #16455
Conversation
@@ -938,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
|
|||
var unconvertedFetchResponse: FetchResponse = null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the root cause of the error-prone :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand, mutable variables are the root of many issues. We should keep variables as immutable as possible to avoid unexpected errors.
Hi @chia7712 , I have refactor the method, PTAL 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@frankvicky thanks for your patch
|
||
// Send the response immediately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can keep it by returning the unconvertedFetchResponse
from if (maxThrottleTimeMs > 0)
. for example:
val unconvertedFetchResponse = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
requestHelper.throttle(quotas.fetch, request, bandwidthThrottleTimeMs)
} else {
requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
}
// If throttling is required, return an empty response.
fetchContext.getThrottledResponse(maxThrottleTimeMs)
} else {
// Get the actual response. This will update the fetch context.
val unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responsePartitionsSize = unconvertedFetchResponse.data().responses().stream().mapToInt(_.partitions().size()).sum()
trace(s"Sending Fetch response with partitions.size=$responsePartitionsSize, " +
s"metadata=${unconvertedFetchResponse.sessionId}")
unconvertedFetchResponse
}
// Send the response immediately.
requestChannel.sendResponse(request, createResponse(maxThrottleTimeMs, unconvertedFetchResponse), Some(updateConversionStats))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, if-else
is expression in Scala. Cool
var unconvertedFetchResponse: FetchResponse = null | ||
|
||
def createResponse(throttleTimeMs: Int): FetchResponse = { | ||
def createResponse(throttleTimeMs: Int, fetchResponse: FetchResponse): FetchResponse = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As there is a convertedData
, it would be nice to keep using the name: unconvertedFetchResponse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try to refactor it 🤔
Hi @chia7712, I have make some changes based on feedbacks, PTAL 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
createResponse
references a variable out of scope, and so that is error-prone since it could be not initialized when executing. We should do a bit refactor to addunconvertedFetchResponse
tocreateResponse
.Committer Checklist (excluded from commit message)