-
Notifications
You must be signed in to change notification settings - Fork 900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add StreamMessage.timeout() #5761
base: main
Are you sure you want to change the base?
Conversation
core/src/main/java/com/linecorp/armeria/common/stream/TimeoutSubscriber.java
Outdated
Show resolved
Hide resolved
Some of the descriptions in the Modifications field contain Korean.
p.s. It's an interesting feature, so I'm watching with interest |
Thank you for your interest. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Please make the build pass as well.
long currentTimeNanos = System.nanoTime(); | ||
long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; | ||
|
||
if(elapsedNanos <= timeoutNanos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should use <
instead of <=
? If elapsedNanos == timeoutNanos
, it means it's been timed out.
long elapsedNanos = currentTimeNanos - lastOnNextTimeNanos; | ||
|
||
if(elapsedNanos <= timeoutNanos) { | ||
long delayNanos = timeoutNanos - (currentTimeNanos - lastOnNextTimeNanos); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to calculate this more than once:
long delayNanos = timeoutNanos - (currentTimeNanos - lastOnNextTimeNanos);
if (delayNanos > 0) {
timeoutFuture = ...;
}
} | ||
|
||
private ScheduledFuture<?> scheduleTimeout() { | ||
return executor.schedule(() -> { | ||
private Runnable createTimeoutTask() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very pretty, but how about making TimeoutSubscriber
implement Runnable
to save one object allocation? i.e.
final class TimeoutSubscriber implements Runnable, ... {
@Override
public void run() {
if (timeoutMode == ...
}
}
@@ -29,70 +29,88 @@ | |||
import io.netty.util.concurrent.ScheduledFuture; | |||
|
|||
public class TimeoutSubscriber<T> implements Subscriber<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we hide this class from the public API? i.e. final class TimeoutSubscriber ...
Last but not least, please sign the ICLA. |
I checked, thank you for your review. |
7c52cef
to
d1ea297
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks mostly done 👍 Left a couple of comments regarding edge cases
subscription.cancel(); | ||
delegate.onError(new TimeoutException(String.format(TIMEOUT_MESSAGE, timeoutDuration.toMillis(), timeoutMode))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no guarantee that subscription.cancel
will synchronize with upstream to not send additional signals.
e.g.delegate.onError
could be called twice, once from L65 and once from L108, or delegate.onError
(L65) and delegate.onComplete
(L114) could be called.
If a Subscription is cancelled its Subscriber MUST eventually stop being signaled ... The reason for eventually is because signals can have propagation delay due to being asynchronous.
ref: https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.8
ref: https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1.7
} | ||
|
||
@Override | ||
public void abort() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question) If TimeoutStreamMessage#abort
is called, shouldn't we also cancel the scheduled timeout future as well?
Motivation:
Currently, the
aggregate()
andsubscribe()
methods ofStreamMessage
do not have the ability to set a timeout. This provides the ability to detect when a client or server has not responded for a period of time and handle it appropriately. Additionally, the timeout API can be used to detect idle streams by setting a timeout until the next message.Modifications:
TimeoutStreamMessage
classStreamMessage
to provide timeout functionalitytimeout()
method to theStreamMessage
interface.TimeoutSubscriber
classStreamTimeoutMode
allows you to set different timeout modes.StreamTimeoutMode
enumerationUNTIL_FIRST
,UNTIL_NEXT
, andUNTIL_EOS
modes.timeout
method to theStreamMessage
,HttpResponse
, andHttpRequest
interfaces to provide the ability to set a timeout.Result:
StreamMessage.timeout()
#5744aggregate()
andsubscribe()
methods ofStreamMessage
and HTTP requests/responses.StreamTimeoutMode
to set the timeout between the arrival of the first message, the arrival of the next message, or the end of the stream.