Add Flow#onBackpressureDrop() operator #1363

Open
He-Pin opened this issue Jun 14, 2024 · 1 comment

Comments

@He-Pin
Member

He-Pin commented Jun 14, 2024

Motivation:

I want to do some rate limiting and fail only the individual task that hits the limit, e.g. when combined with mapAsync.
Currently there seems to be no way to observe backpressure.

The proposed onBackpressure callback would only be called when downstream applies backpressure.

        queue = Source.<TranslateTask>queue(10240)
            .groupedWeightedWithin(
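                dynamicDictionaryConfigs.getMaxBatchTokens(), // maximum number of characters, at most 1000
                dynamicDictionaryConfigs.getMaxBatchSize(), // maximum batch size, at most 50
                costFn, // computes the character count
                Duration.ofMillis(dynamicDictionaryConfigs.getMaxBatchIntervalInMillis())) // maximum aggregation time, e.g. 3 ms
            .onBackpressure(task -> task.fail(...)) // proposed operator: fail the task when downstream backpressures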
            .buffer(dynamicDictionaryConfigs.getOvsQpsLimit() * 2, OverflowStrategy.backpressure())
            .toMat(Sink.foreach(this::batchTranslate), Keep.left())
            ....
            .run(actorSystem);

What do you think?

In reactor-core, there is:

  • reactor.core.publisher.Flux#onBackpressureDrop(java.util.function.Consumer<? super T>)

This is what I would like to be able to use.
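
For illustration, the drop-with-callback behaviour could be sketched in plain Java roughly as follows. This is only a sketch under assumptions: DroppingBuffer, offer and poll are hypothetical names, not Pekko or reactor-core APIs, and "downstream has no demand" is modelled simply as "the buffer is full".

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

/**
 * Hypothetical bounded stage (not a Pekko or reactor-core class).
 * Elements that arrive while the buffer is full are not queued;
 * they are handed to the onDrop callback instead.
 * Not thread-safe: intended only to show the semantics.
 */
final class DroppingBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Consumer<? super T> onDrop;

    DroppingBuffer(int capacity, Consumer<? super T> onDrop) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.onDrop = onDrop;
    }

    /** Upstream side: returns true if the element was accepted, false if it was dropped. */
    boolean offer(T elem) {
        if (queue.size() >= capacity) {
            onDrop.accept(elem); // the caller can fail this single task here
            return false;
        }
        queue.addLast(elem);
        return true;
    }

    /** Downstream side: takes the oldest buffered element, or null if none is buffered. */
    T poll() {
        return queue.pollFirst();
    }
}
```

With a capacity of 2, a third offer made before any poll is passed to the callback rather than queued, which is the point where something like task.fail(...) could be invoked.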

@He-Pin
Member Author

He-Pin commented Jun 23, 2024

Update:

@InternalApi private[pekko] trait Buffer[T] {
  def capacity: Int
  def used: Int
  def isFull: Boolean
  def isEmpty: Boolean
  def nonEmpty: Boolean

  def enqueue(elem: T): Unit
  def dequeue(): T

  def peek(): T
  def clear(): Unit
  def dropHead(): Unit
  def dropTail(): Unit
}

The current Buffer's clear, dropHead and dropTail all return Unit. Changing them to return Seq[T] or T would help, but it would (1) require a large number of changes across the codebase and (2) hurt performance in the clear case.

So it seems better to just add a new dedicated implementation, onBackpressureDrop, as in reactor-core?
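
To make the trade-off concrete, here is a rough Java analogue of a buffer whose drop methods return what they removed. It is a sketch only: ReturningBuffer is a hypothetical class, not Pekko's internal Buffer, and it ignores the ring-buffer layout a real implementation would likely use.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer whose drop operations return the removed elements. */
final class ReturningBuffer<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;

    ReturningBuffer(int capacity) {
        this.capacity = capacity;
    }

    boolean isFull() {
        return items.size() >= capacity;
    }

    void enqueue(T elem) {
        if (isFull()) {
            throw new IllegalStateException("buffer is full");
        }
        items.addLast(elem);
    }

    /** Removes and returns the oldest element; throws NoSuchElementException if empty. */
    T dropHead() {
        return items.removeFirst();
    }

    /** Removes and returns the newest element; throws NoSuchElementException if empty. */
    T dropTail() {
        return items.removeLast();
    }

    /** Removes everything and returns it; note the extra list allocation and copy. */
    List<T> clear() {
        List<T> removed = new ArrayList<>(items);
        items.clear();
        return removed;
    }
}
```

Returning a single element from dropHead and dropTail is cheap, but clear has to build a collection even when no caller wants the removed elements, which is the performance concern in point 2.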

@He-Pin He-Pin changed the title Add Flow#onBackpressure(T elem) operator Add Flow#onBackpressureDrop() operator Jun 24, 2024