
PipelineBus.sendEvents locks on the sender PipelineOutput impacting overall throughput #16171

Open
jsvd opened this issue May 20, 2024 · 1 comment

jsvd commented May 20, 2024

Currently (up to 8.14.0 at the time of writing), the PipelineBus class synchronizes on the sender output:

    public void sendEvents(final PipelineOutput sender,
                           final Collection<JrubyEventExtLibrary.RubyEvent> events,
                           final boolean ensureDelivery) {
        if (events.isEmpty()) return; // This can happen on pipeline shutdown or in some other situations

        synchronized (sender) {
// .....
            addressesToInputs.forEach((address, addressState) -> {
// .....
                    PipelineInput input = addressState.getInput(); // Save on calls to getInput since it's volatile
// .....
                        lastResponse = input.internalReceive(clones);

internalReceive will call Queue.write, which mainly performs three steps:

    public long write(Queueable element) throws IOException {
//...
        byte[] data = element.serialize();
        lock.lock();
        this.headPage.write( .. );

This means that when there are multiple writers to the same downstream pipeline and serialization + page write take a long time, most threads spend their time waiting for the single thread that is currently writing an event. This can be seen with the following simple pipelines.yml:

- pipeline.id: source
  config.string: "input { java_generator {} } output { pipeline { send_to => [dest1] } }"
- pipeline.id: dest
  config.string: "input { pipeline { address => dest1 } } output { null {} }"

Combined with queue.type: persisted in logstash.yml, this causes all but one of the upstream pipeline's workers to be blocked at any given time:

❯ curl -s -XGET 'localhost:9600/_node/hot_threads?human=true&threads=30&stacktrace_size=10' | grep "thread name.*source.*worker" -A 1 | grep -v "\-\-"
12.17 % of cpu usage, state: blocked, thread name: '[source]>worker2', thread id: 54 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
12.12 % of cpu usage, state: blocked, thread name: '[source]>worker5', thread id: 60 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.96 % of cpu usage, state: blocked, thread name: '[source]>worker0', thread id: 51 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.93 % of cpu usage, state: blocked, thread name: '[source]>worker4', thread id: 58 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.89 % of cpu usage, state: runnable, thread name: '[source]>worker3', thread id: 56 
	[email protected]/jdk.internal.misc.Unsafe.unpark(Native Method)
11.84 % of cpu usage, state: blocked, thread name: '[source]>worker7', thread id: 63 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.82 % of cpu usage, state: blocked, thread name: '[source]>worker6', thread id: 62 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.66 % of cpu usage, state: blocked, thread name: '[source]>worker9', thread id: 68 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.57 % of cpu usage, state: blocked, thread name: '[source]>worker1', thread id: 52 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
11.48 % of cpu usage, state: blocked, thread name: '[source]>worker8', thread id: 65 
	app//org.logstash.plugins.pipeline.PipelineBus.sendEvents(PipelineBus.java:58)
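
The pattern is easy to reproduce outside Logstash. The following minimal sketch (class and method names are illustrative stand-ins, not the real Logstash classes) has eight worker threads share a single sender monitor the way pipeline workers share one pipeline output, so the slow serialize-and-write section runs on at most one thread at a time:

    import java.util.concurrent.TimeUnit;

    // Illustrative stand-in for the PipelineBus locking pattern described above.
    public class SenderLockDemo {
        static final Object sender = new Object(); // shared "PipelineOutput"

        static void sendEvents() throws InterruptedException {
            synchronized (sender) {                 // plays the role of synchronized(sender)
                TimeUnit.MILLISECONDS.sleep(5);     // stands in for serialize() + headPage.write()
            }
        }

        public static void main(String[] args) throws Exception {
            Thread[] workers = new Thread[8];
            long start = System.nanoTime();
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(() -> {
                    try {
                        for (int n = 0; n < 100; n++) sendEvents();
                    } catch (InterruptedException ignored) { }
                });
                workers[i].start();
            }
            for (Thread w : workers) w.join();
            // Expected: close to 8 * 100 * 5 ms, i.e. adding workers does not add throughput,
            // because only one of them can be inside the critical section at any given time.
            System.out.printf("elapsed: %d ms%n", (System.nanoTime() - start) / 1_000_000);
        }
    }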

This was introduced by #10872 to ensure proper ordering during pipeline shutdown. However, it should be possible to improve concurrency with a read/write lock that allows shared (read) access to the sender object during event processing and takes the exclusive (write) lock for every other operation.
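
A hedged sketch of what that could look like, using stand-in types rather than the actual PipelineBus fields and signatures: take the read lock around the delivery path in sendEvents so that many worker threads can send concurrently, and take the write lock in the registration/unregistration paths that previously relied on the sender's monitor:

    import java.util.Collection;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch only: stand-in types, not the real org.logstash.plugins.pipeline classes.
    interface PipelineOutput {}
    interface RubyEvent {}

    class PipelineBusSketch {
        // A single bus-wide lock keeps the sketch short; a per-sender lock would also work.
        private final ReentrantReadWriteLock senderLock = new ReentrantReadWriteLock();

        void sendEvents(PipelineOutput sender, Collection<RubyEvent> events, boolean ensureDelivery) {
            if (events.isEmpty()) return;
            senderLock.readLock().lock();   // shared: many workers can deliver at the same time
            try {
                // ... clone events and call input.internalReceive(clones), as in the excerpt above ...
            } finally {
                senderLock.readLock().unlock();
            }
        }

        void unregisterSender(PipelineOutput sender, Iterable<String> addresses) {
            senderLock.writeLock().lock();  // exclusive: no sendEvents can be in flight during teardown
            try {
                // ... remove the sender from the address maps ...
            } finally {
                senderLock.writeLock().unlock();
            }
        }
    }

The write lock still waits for in-flight sendEvents calls to drain before proceeding, which appears to be the ordering guarantee the original change was after, while routine event delivery no longer serializes all workers on one monitor.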


jsvd commented May 20, 2024

The impact of going from 7.2.0 to 7.2.1 can be easily observed. For the following pipelines.yml:

- pipeline.id: source
  config.string: "input { generator { count => 20000000 } } output { pipeline { send_to => destination } }"
- pipeline.id: destination
  queue.type: persisted
  config.string: "input { pipeline { address => destination } } output { null {} }"

we can observe the following numbers:

7.2.0 - 1m 46s

❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
  "duration_in_millis": 771,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000,
  "queue_push_duration_in_millis": 90072
}

/tmp/logstash-7.2.1
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events          
{
  "duration_in_millis": 866418,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000,
  "queue_push_duration_in_millis": 7082
}


7.2.1 - 2m 38s

❯ curl -s localhost:9600/_node/stats/pipelines/destination | jq .pipelines.destination.events
{
  "duration_in_millis": 565,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000
  "queue_push_duration_in_millis": 3720,
}

/tmp/logstash-7.2.0
❯ curl -s localhost:9600/_node/stats/pipelines/source | jq .pipelines.source.events          
{
  "duration_in_millis": 1354787,
  "filtered": 20000000,
  "in": 20000000,
  "out": 20000000
  "queue_push_duration_in_millis": 12592,
}
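
For scale: 20,000,000 events in 1m 46s is roughly 189k events/s, while 2m 38s works out to roughly 127k events/s, i.e. about a third of the throughput is lost on this synthetic workload.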
