Ability to pipe stdout -> stdin between processes #5028

Open
ewels opened this issue May 27, 2024 · 15 comments
@ewels
Member

ewels commented May 27, 2024

This suggestion / request has come up several times, so I wanted to collect the latest thread into a GitHub issue (cc @nh13 @kmhernan @mahesh-panchal @adamrtalbot @muffato)

Unix pipes are a powerful way to stream data from one process to another without needing to write intermediate data to disk. Currently this is not possible between Nextflow processes. Current practice is either to write intermediate files, or to put multiple tools into a single process and pipe within that single script block.

Support in Nextflow is not a new request, but is technically challenging for several reasons:

  • Making this work with distributed (cloud) clusters
  • Figuring out how publishing + retry would work

Piping output between containers does work:

Singularity

singularity exec img1.sif cmd1 | singularity exec img2.sif cmd2

Docker

docker run ubuntu printf "line1\nline2\n" | docker run -i ubuntu grep line2 | docker run -i ubuntu sed 's/line2/line3/g'

@mahesh-panchal has written a minimal example demo using named pipes:

And this was the demo I wrote for using named pipes. The issue there was clean-up and potential process deadlock. It would likely work with containers too, though.
So, just for completeness, one can send a pipe, but cleaning up (i.e. removing the pipe afterwards) is not simple because of the working-directory isolation.

workflow {
    MKFIFO()
    SENDER( params.message, MKFIFO.out.pipe ) 
    RECEIVER ( MKFIFO.out.pipe ) | view
}

process MKFIFO {
    output:
    path "mypipe", emit: pipe

    script:
    """
    mkfifo mypipe
    """
}

process SENDER {
    input:
    val message
    path pipename

    output:
    path pipename

    script:
    """
    echo $message > $pipename
    """
}

process RECEIVER {
    input:
    path pipename

    output:
    stdout

    script:
    """
    cat $pipename
    """
}

And this is bad practice as it could easily lead to a process deadlock.
The reason for this structure is that named pipes block until they are read from (i.e. they stop the process from completing).

@bentsherman
Member

Duplicate of #1822

bentsherman marked this as a duplicate of #1822 on May 27, 2024
@bentsherman
Member

The first challenge before you even get to the issue of pipes is determining when two processes can run on the same node. Even if we allow users to mark when tasks should be grouped, we still have to verify that a configuration is valid.

There is a nice way to make this work, which is an idea I've been toying with for DSL3. If we allow processes to be invoked within an operator closure, instead of baking a map operator into the process, then it becomes very clear when two processes can be grouped. See this snippet from my fetchngs POC:

    sra_metadata
        |> map { meta ->
            sra = SRATOOLS_PREFETCH ( meta )
            SRATOOLS_FASTERQDUMP ( meta, sra )
        }
        |> set { reads }

In this example we know that, for each value that comes into the map from the input channel, the two tasks can be fused into a single job.

Then it's a matter of whether and how to actually "fuse" the tasks. I think we can take a similar approach to job arrays and task batching by creating a "meta-wrapper" script which calls the .command.run of each child task, but in this case we need to chain them rather than simply launching them in parallel, something like:

TASK1='/work/dir/1'
TASK2='/work/dir/2'

$TASK1/.command.run
# stage outputs of 1 as inputs of 2 ...
$TASK2/.command.run

Then the piping can be an enhancement on top of this, where if process 1 has a stdout output which matches a stdin input to process 2, we can re-jig the wrapper scripts to use a pipe:

$TASK1/.command.run | $TASK2/.command.run

@bentsherman
Member

All that is to say, users should not have to work with named pipes directly. They should just use the stdout and stdin qualifiers and compose processes in such a way that Nextflow can identify opportunities to pipe and do all of the bash magic under the hood.
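
For illustration, a minimal sketch of the kind of composition this implies (process names and commands are placeholders; today Nextflow stages the stdout value through the work directory, and the idea is that a matching stdout -> stdin link could be rewired as a pipe when the two tasks are co-located):

process GENERATE {
    output:
    stdout

    script:
    """
    printf 'line1\\nline2\\n'
    """
}

process FILTER {
    input:
    stdin

    output:
    stdout

    script:
    """
    grep line2
    """
}

workflow {
    GENERATE() | FILTER | view
}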

@mahesh-panchal
Contributor

I agree that users shouldn't have to do that. I'm a little hesitant about using stdout and stdin, though, given the extra stuff that can end up there. It would be nice if named pipes were just created under the hood, particularly if a file was marked temporary somehow, or maybe streamable.

@bentsherman
Member

That's a good point, I was only thinking of stdout and stdin but really we should be matching the path outputs and inputs of the processes.

I don't think we even need to mark the file as streamable. We just need to identify the output-to-input links and manage them with named pipes instead of files. And I think you can duplicate a named pipe in order to e.g. pipe it into a command and also write it to a file, if the user wants to keep it.
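
As a rough sketch of that duplication idea (a hypothetical single task; the tool names and file names are just examples), tee can keep a plain copy on disk while streaming the same data into the next command:

process STREAM_AND_KEEP {
    input:
    path reads

    output:
    path 'reads.txt.gz'
    path 'raw_copy.txt'

    script:
    """
    mkfifo stream
    # writer: feed the input into the named pipe in the background
    cat $reads > stream &
    # reader: tee duplicates the stream, keeping one copy on disk
    # while piping the other into the next command
    tee raw_copy.txt < stream | gzip > reads.txt.gz
    wait
    # the fifo lives only in this task's work dir, so clean-up is trivial here
    rm stream
    """
}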

@nh13

nh13 commented May 27, 2024

I’d caution against too much inference being done under the hood. If I expect that two processes will be run communicating over a pipe, then it should fail if they don’t. That is, I don’t want it to be best-effort; I want it to be explicit in the language/operator I use.

@Poshi

Poshi commented May 27, 2024

Be careful with some ugly software that looks like it could be pipe-friendly but isn't. I'm thinking of practices like opening the file to read the magic number, closing it, and then opening it again to do the actual processing. This tends to happen when you use named pipes where a file is expected (when using stdin it usually works fine).
A way to mark an input/output as streamable (or the opposite, non-streamable) should be present. Otherwise, this kind of software will constantly fail.
Also, be careful when duplicating pipes. Adding a duplication to write a file is fine, but if the duplicated pipe goes to another program, it could cause some performance loss, or even a deadlock if the buffers are not big enough and the outputs of the commands converge in a downstream command (as in A->B, A->C, B->D, C->D).

@bentsherman
Member

The task chaining by itself is safe and could be automatic, though we'll probably gate it behind a config option anyway. The piping is essentially an enhancement on top of that, and should be explicitly enabled. It could be a process directive:

process foo {
  pipe true
  // ...
}

If Nextflow sees that it can chain two tasks, it will only use pipes if both processes have pipe true.

The nice thing is that everything we're talking about can be implemented manually in Nextflow. Just combine the processes into one process, and use named pipes as needed. That should make it easy to test different scenarios and see how far we can get with named pipes.
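
For example, a manual "fusion" of two tools into a single process needs nothing more than an ordinary shell pipe (a sketch; the commands are placeholders for any two pipe-friendly tools, and the process's container or conda environment must provide both). A named pipe only becomes necessary when one of the tools insists on a file path rather than stdin/stdout:

process DECOMPRESS_AND_FILTER {
    input:
    path 'input.txt.gz'

    output:
    path 'filtered.txt.gz'

    script:
    """
    # the two steps stream into each other; no intermediate file is written
    zcat input.txt.gz | sed '/^#/d' | gzip > filtered.txt.gz
    """
}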

@muffato

muffato commented May 27, 2024

I think pipe should be a property of each input/output channel. For instance, for an aligner it's likely that its input reference genome would not be pipe-able (because random seeks are needed), whereas the input reads would be. For outputs, some file formats have a header with information such as the size of the data. Typically, these files are first written with placeholder values, and once the data are complete, the writer seeks back to the beginning of the file to complete the header.
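
Purely as a strawman for that idea (the pipe: option below does not exist in Nextflow today, and aligner is a placeholder command), per-declaration marking might look something like:

// Hypothetical syntax only: 'pipe:' is NOT a real Nextflow option.
process ALIGN {
    input:
    path reads, pipe: true    // read sequentially once, could arrive as a named pipe
    path index                // random seeks needed, must stay a regular file

    output:
    path 'out.bam', pipe: true   // written sequentially, could be streamed downstream
    path 'stats.txt'             // header rewritten at the end, must stay a regular file

    script:
    """
    aligner --index $index --reads $reads --bam out.bam --stats stats.txt
    """
}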

@nh13

nh13 commented May 28, 2024

Seqera Containers could enable this: since you know what combination of packages you need, you can build the combination on demand for either Docker or Singularity. Then executing them in one container, using traditional Unix pipes, seems easy. If running with conda, just build an environment with both. And if running locally, well, that's on you.

@bentsherman
Member

@muffato I see your point. Some files might be amenable to streaming while others aren't. But I wonder if this needs to be defined for both the output and the input, or only the output? In theory, if I have an input stream and I seek to a particular position, the input stream could just block until that much data is received. But I don't know enough about how pipes work in this regard.

@nh13 Yes the software deps are another important point to consider. If both processes specify conda packages then it's easy, just merge them and check for conflicting versions. That can work with or without Wave. But if they use different containers, then you probably have to use Wave, and you probably have to use the conda strategy, although Wave might be able to merge two containers if they have the same base image.
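
To make the conda case concrete (package names and versions here are only illustrative), merging is mostly a union of the two declarations plus a conflict check:

process PREFETCH {
    conda 'bioconda::sra-tools=3.1.0'

    input:
    val accession

    output:
    path "$accession", emit: sra

    script:
    """
    prefetch $accession
    """
}

process FASTERQDUMP {
    conda 'bioconda::sra-tools=3.1.0 conda-forge::pigz=2.8'

    input:
    path sra

    output:
    path '*.fastq.gz'

    script:
    """
    fasterq-dump $sra
    pigz *.fastq
    """
}

// A fused PREFETCH+FASTERQDUMP task would need a single environment covering
// both declarations, e.g. 'bioconda::sra-tools=3.1.0 conda-forge::pigz=2.8',
// and should fail fast if the same package is pinned to conflicting versions.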

@Poshi

Poshi commented May 28, 2024

AFAIK, pipes are not seekable. Also, if an input pipe were able to seek, that would imply that the pipe has to hold the whole piped content, which would be an issue if the content is big (as is usually the case): without the whole contents you cannot seek to the beginning and to the end. In the case of an output pipe, you would have to wait until the file is closed before emitting the data on the other end of the pipe (keeping everything in memory in the meantime), because the whole contents could be modified up until the file is closed.
A streamable, non-seekable data flow that can go through a pipe is, by definition, something that you can (and must) read/write sequentially. Augmenting pipes to make them seekable kind of defeats their purpose and advantages.
You need to define each input and output as pipeable independently, as you can simultaneously have all kinds of files: streamable inputs (like FASTQ files), non-streamable inputs (like index files), streamable outputs (like a gzip file) and non-streamable outputs (like the ones explained above).

@bentsherman
Member

Fair enough. It just seems to me like Nextflow could provide some mechanism where the input file can be a stream (if the output is streamed) but falls back to a file or memory buffer if the downstream task tries to seek through it. That way you wouldn't have to think about whether an input file is streamable, because really it should be able to accept a stream or a file. This is similar to how Fusion works.

Such a mechanism is in a weird space between Unix pipes and the Nextflow runtime. Maybe we could do it with some bash magic, but I suspect it would be unreliable. Maybe Fusion could do it: even though it's intended mainly for remote storage, because it intercepts filesystem calls it could probably also implement a sort of "pipe with fallback".

@feiloo

feiloo commented Jun 5, 2024

I think it's an anti-pattern to keep process scripts in separate Nextflow processes and then recombine them with pipes.
This also requires ensuring that piped processes run simultaneously, which is a break from the current Nextflow paradigm, AFAICT.

@nh13

nh13 commented Jun 5, 2024

Why is it an anti-pattern? Unix pipe-and-filter has been a great data-flow paradigm and pattern for a long time, and Nextflow models it.
