-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16508: Streams custom handler should handle the timeout exceptions #16450
base: trunk
Are you sure you want to change the base?
KAFKA-16508: Streams custom handler should handle the timeout exceptions #16450
Conversation
@@ -306,6 +307,15 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
sendException.set(new TaskMigratedException(errorMessage, exception)); | |||
} else { | |||
if (exception instanceof RetriableException) { | |||
if (exception instanceof TimeoutException && exception.getCause() != null) { | |||
if (exception.getCause() instanceof UnknownTopicOrPartitionException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we not use these three nested ifs, but add a new if (isRetriable(excption)) { ... }
helper method?
Would also avoid to call the handler on two places, as isRetriable()
return false
we just re-use the code in the else
path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would also avoid to call the handler on two places, as isRetriable() return false we just re-use the code in the else path?
I assumed if we have an UnknownTopicOrPartition
exception and the handler instructs to CONTINUE
, we must follow the old path which is sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
BTW, I changed the code as you wished.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. If the hander returns CONTINUE
we should execute the "continues" code, pass, ie, the existing else to just drop the record and record it:
https://github.com/apache/kafka/pull/16450/files#diff-36dd8c03fa5252dbd39042bb49a0d6272728fbd46459c6a45f5189fa59749b32R322-R323
@@ -1350,6 +1351,30 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin | |||
collector.closeClean(); | |||
} | |||
|
|||
@Test | |||
public void shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWithDefaultExceptionHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't call the handler, what exception is thrown from flush()
? It is just the plain/unwrapped TimeoutExcpetion
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't call the default handler, it throws TaskCorruptedException
with a null
cause. This is what recordSendError()
method determines.
@@ -306,6 +307,15 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
sendException.set(new TaskMigratedException(errorMessage, exception)); | |||
} else { | |||
if (exception instanceof RetriableException) { | |||
if (exception instanceof TimeoutException && exception.getCause() != null) { | |||
if (exception.getCause() instanceof UnknownTopicOrPartitionException) { | |||
if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we should hand in exception
or exception.getCause()
? Thoughts?
.to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), Serdes.String())); | ||
produceRecords(); | ||
startApplication(); | ||
closeApplication(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a race condition? In the end, we don't know if the record was even processed? We could call KafkaStreams#close()
before the record was processed?
Also, with the default production handler which return "FAIL", should we not expect Kafka Streams to go into ERROR state instead of NOT_RUNNING?
@@ -305,14 +306,16 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
"indicating the task may be migrated out"; | |||
sendException.set(new TaskMigratedException(errorMessage, exception)); | |||
} else { | |||
if (exception instanceof RetriableException) { | |||
final ProductionExceptionHandlerResponse response = productionExceptionHandler.handle(serializedRecord, exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to call the handler for all cases here, but only if the exception is non-retriable (old, existing condition), or if the exception is a timeout with root cause "unknown topic or partition"?
So something like:
if (!isRetriable(exception)) { // we only change this line but the rest stays the same?
errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
"`delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors";
sendException.set(new TaskCorruptedException(Collections.singleton(taskId)));
} else {
if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
sendException.set(new StreamsException(errorMessage, exception));
} else {
errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
droppedRecordsSensor.record();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the exception is Timeout
with root cause UnknownTopicOrPartition
and the handler instructs to CONTINUE
, we must follow the old logic, which is having TaskCorruptedException
. Therefore, I think we have to call handler for nearly all cases.
@@ -325,6 +328,11 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
log.error(errorMessage, exception); | |||
} | |||
|
|||
private boolean isMissingMetadata (final Exception exception) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rewrite this to:
private boolean isRetriable (final Exception exception) {
return !(exception instanceOf TimeoutException && exception.getCause() != null
&& exception.getCause() instanceof UnknownTopicOrPartitionException()) || exception instanceOf RetriableExcepiton;
}
Hope I got the logic right... Basically we want to say, it is retriable if it is a RetryableExcetpion
but not if it is a TimeoutException with root-cause "unknonw topic or partition".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few nits. Overall LGTM.
@@ -325,6 +326,11 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
log.error(errorMessage, exception); | |||
} | |||
|
|||
private boolean isRetriable(final Exception exception) { | |||
return (!(exception instanceof TimeoutException) || exception.getCause() == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is complex. Let's a comment about what it actually does
// We don't consider a `TimeoutException` with root cause `UnknownTopicOrPartitionException` retriable,
// even if `TimeoutException extends RetriableException` and thus exclude this case expliclity
@@ -325,6 +326,11 @@ private void recordSendError(final String topic, final Exception exception, fina | |||
log.error(errorMessage, exception); | |||
} | |||
|
|||
private boolean isRetriable(final Exception exception) { | |||
return (!(exception instanceof TimeoutException) || exception.getCause() == null | |||
|| !(exception.getCause() instanceof UnknownTopicOrPartitionException)) && exception instanceof RetriableException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Given how you wrote the predicate, might be easier to read if we put exception instanceof RetriableException && ...
at the beginning, and have the complex expression at the end?
streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
Show resolved
Hide resolved
while (true) { | ||
if (caughtException.get() != null) { | ||
assertInstanceOf(StreamsException.class, caughtException.get()); | ||
assertInstanceOf(TimeoutException.class, caughtException.get().getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add one more verifying the cause of TimeoutException
, too?
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); | ||
final long timeout = 60000; | ||
TestUtils.waitForCondition( | ||
() -> kafkaStreams.state() == State.NOT_RUNNING, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the uncaught exception handler returns SHUTDOWN_CLIENT
the final state should be ERROR
IIRC?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exactly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM assuming Jenkins passes.
No description provided.