Channel factories
empty
The channel.empty factory method, by definition, creates a channel that doesn't emit any value.
See also: ifEmpty.
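As a minimal sketch of how an empty channel combines with the ifEmpty operator, the following should print the fallback value, because the source channel never emits anything. The string 'no data' is a placeholder chosen for illustration:

```nextflow
// An empty channel emits nothing, so ifEmpty supplies the fallback value
channel
    .empty()
    .ifEmpty('no data')
    .view()
```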
from
Deprecated: use channel.of or channel.fromList instead.
The channel.from method allows you to create a channel emitting any sequence of values that are specified as the method argument, for example:
ch = channel.from( 1, 3, 5, 7 )
ch.subscribe { v -> println "value: $v" }
The first line in this example creates a variable ch which holds a channel object. This channel emits the values specified as a parameter in the from method. Thus the second line will print the following:
value: 1
value: 3
value: 5
value: 7
The following example shows how to create a channel from a range of numbers or strings:
zeroToNine = channel.from( 0..9 )
strings = channel.from( 'A'..'Z' )
When the channel.from argument is an object implementing the (Java) Collection interface, the resulting channel emits the collection entries as individual items.
Thus the following two declarations produce an identical result even though in the first case the items are specified as multiple arguments while in the second case as a single list object argument:
channel.from( 1, 3, 5, 7, 9 )
channel.from( [1, 3, 5, 7, 9] )
However, when more than one argument is provided, each argument is emitted as a single item, even if it is a collection. Thus, the following example creates a channel emitting three entries, each of which is a list containing two elements:
channel.from( [1, 2], [5,6], [7,9] )
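Since channel.of and channel.fromList are the recommended replacements for channel.from, the examples above could be migrated roughly as follows. This sketch assumes that channel.of emits each argument as given (expanding ranges but not lists), while channel.fromList emits the entries of a single list one by one:

```nextflow
// Multiple arguments or ranges: use channel.of
channel.of( 1, 3, 5, 7 )
channel.of( 0..9 )

// A single list whose entries should be emitted individually: use channel.fromList
channel.fromList( [1, 3, 5, 7, 9] )

// Several lists, each emitted as one item: use channel.of
channel.of( [1, 2], [5, 6], [7, 9] )
```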
fromLineage
The channel.fromLineage factory creates a channel that emits files from the lineage store that match the given key-value params:
channel
    .fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta'])
    .view()
The above snippet emits files published by the given workflow run that are labeled as alpha and beta.
Available options:
- label: List of labels associated with the desired files.
- taskRun: LID of the task run that produced the desired files.
- workflowRun: LID of the workflow run that produced the desired files.
fromList
The channel.fromList method allows you to create a channel emitting the values provided as a list of elements, for example:
channel
    .fromList( ['a', 'b', 'c', 'd'] )
    .view { v -> "value: $v" }
Prints:
value: a
value: b
value: c
value: d
See also: channel.of factory method.
fromPath
You can create a channel emitting one or more file paths by using the channel.fromPath method and specifying a path
string as an argument. For example:
myFileChannel = channel.fromPath( '/data/some/bigfile.txt' )
The above line creates a channel that emits a single Path object for the specified file.
Note: by default, channel.fromPath does not check whether the file exists. Set the checkIfExists option to true to enable this check.
Whenever the channel.fromPath argument contains a * or ? wildcard character it is interpreted as a glob path matcher.
For example:
myFileChannel = channel.fromPath( '/data/big/*.txt' )
This example creates a channel and emits as many Path items as there are files with txt extension in the /data/big folder.
Two asterisks, i.e. **, work like * but cross directory boundaries. This syntax is generally used for matching complete paths. Curly brackets specify a collection of sub-patterns.
For example:
files = channel.fromPath( 'data/**.fa' )
moreFiles = channel.fromPath( 'data/**/*.fa' )
pairFiles = channel.fromPath( 'data/file_{1,2}.fq' )
The first line returns a channel emitting the files ending with the suffix .fa in the data folder and, recursively, in all its sub-folders. The second line only emits the files with the same suffix that are located in a sub-folder of the data path. Finally, the last line emits two files: data/file_1.fq and data/file_2.fq.
As in Linux Bash, the * wildcard does not catch hidden files (i.e. files whose name starts with a . character).
Multiple paths or glob patterns can be specified using a list:
channel.fromPath( ['/some/path/*.fq', '/other/path/*.fastq'] )
In order to include hidden files, you need to start your pattern with a period character or specify the hidden: true option. For example:
expl1 = channel.fromPath( '/path/.*' )
expl2 = channel.fromPath( '/path/.*.fa' )
expl3 = channel.fromPath( '/path/*', hidden: true )
The first example returns all hidden files in the specified path. The second one returns all hidden files ending with the .fa suffix. Finally the last example returns all files (hidden and non-hidden) in that path.
By default a glob pattern only looks for regular file paths that match the specified criteria, i.e. it won't return directory paths.
You can use the type option specifying the value file, dir or any in order to define what kind of paths you want. For example:
myFileChannel = channel.fromPath( '/path/*b', type: 'dir' )
myFileChannel = channel.fromPath( '/path/a*', type: 'any' )
The first example returns all directory paths ending with the b suffix, while the second returns any file or directory starting with the a prefix.
Available options:
- checkIfExists: When true, throws an exception if the specified path does not exist in the file system (default: false)
- followLinks: When true, follows symbolic links during directory tree traversal, otherwise they are managed as files (default: true)
- glob: When true, interprets characters *, ?, [] and {} as glob wildcards, otherwise handles them as normal characters (default: true)
- hidden: When true, includes hidden files in the resulting paths (default: false)
- maxDepth: Maximum number of directory levels to visit (default: no limit)
- relative: When true, returned paths are relative to the top-most common directory (default: false)
- type: Type of paths returned, either file, dir or any (default: file)
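The options can be combined with any of the path forms shown earlier. The following sketch illustrates three of them. The paths, including the file name file[1].txt, are hypothetical and only serve to show where each option applies:

```nextflow
// Fail early if the input file is missing
reference = channel.fromPath( '/data/some/bigfile.txt', checkIfExists: true )

// Treat the square brackets literally instead of as glob wildcards
literal = channel.fromPath( '/data/big/file[1].txt', glob: false )

// Emit only directories and limit the traversal depth to 2
dirs = channel.fromPath( '/data/**', type: 'dir', maxDepth: 2 )
```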
fromFilePairs
The channel.fromFilePairs method creates a channel emitting the file pairs matching a glob pattern provided
by the user. The matching files are emitted as tuples in which the first element is the grouping key of the matching pair and the second element is the list of files (sorted in lexicographical order). For example:
channel
    .fromFilePairs('/my/data/SRR*_{1,2}.fastq')
    .view()
It will produce an output similar to the following:
[SRR493366, [/my/data/SRR493366_1.fastq, /my/data/SRR493366_2.fastq]]
[SRR493367, [/my/data/SRR493367_1.fastq, /my/data/SRR493367_2.fastq]]
[SRR493368, [/my/data/SRR493368_1.fastq, /my/data/SRR493368_2.fastq]]
[SRR493369, [/my/data/SRR493369_1.fastq, /my/data/SRR493369_2.fastq]]
[SRR493370, [/my/data/SRR493370_1.fastq, /my/data/SRR493370_2.fastq]]
[SRR493371, [/my/data/SRR493371_1.fastq, /my/data/SRR493371_2.fastq]]
The glob pattern must contain at least one * wildcard character.
Multiple glob patterns can be specified using a list:
channel.fromFilePairs( ['/some/data/SRR*_{1,2}.fastq', '/other/data/QFF*_{1,2}.fastq'] )
Alternatively, you can implement a custom file pair grouping strategy by providing a closure which, given the current file as a parameter, returns the grouping key. For example:
channel
    .fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
    .view { ext, files -> "Files with the extension $ext are $files" }
Available options:
- checkIfExists: When true, throws an exception if the specified path does not exist in the file system (default: false)
- followLinks: When true, follows symbolic links during directory tree traversal, otherwise they are managed as files (default: true)
- flat: When true, the matching files are produced as sole elements in the emitted tuples (default: false)
- hidden: When true, includes hidden files in the resulting paths (default: false)
- maxDepth: Maximum number of directory levels to visit (default: no limit)
- size: Defines the number of files each emitted item is expected to hold (default: 2). Set to -1 for any.
- type: Type of paths returned, either file, dir or any (default: file)
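As an illustration of the flat option, the sketch below reuses the glob pattern from the first example. It assumes that, with flat: true, each emitted tuple has the form [key, file1, file2] rather than [key, [file1, file2]], so the closure can name each file directly. The closure parameter names are arbitrary:

```nextflow
channel
    .fromFilePairs( '/my/data/SRR*_{1,2}.fastq', flat: true )
    .view { id, read1, read2 -> "$id: $read1 $read2" }
```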
fromSRA
The channel.fromSRA method queries the NCBI SRA database and returns a channel emitting the FASTQ files matching the specified criteria, i.e. a project or accession number(s). For example:
channel
    .fromSRA('SRP043510')
    .view()
It returns:
[SRR1448794, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448792, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/002/SRR1448792/SRR1448792.fastq.gz]
[SRR1448793, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR144/003/SRR1448793/SRR1448793.fastq.gz]
[SRR1910483, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/003/SRR1910483/SRR1910483.fastq.gz]
[SRR1910482, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR191/002/SRR1910482/SRR1910482.fastq.gz]
(remaining omitted)
Multiple accession IDs can be specified using a list object:
ids = ['ERR908507', 'ERR908506', 'ERR908505']
channel
    .fromSRA(ids)
    .view()
It returns:
[ERR908507, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908507/ERR908507_2.fastq.gz]]
[ERR908506, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908506/ERR908506_2.fastq.gz]]
[ERR908505, [ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_1.fastq.gz, ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR908/ERR908505/ERR908505_2.fastq.gz]]
Each read pair is implicitly managed and returned as a list of files.
This method uses the NCBI ESearch API behind the scenes, therefore it allows the use of any query term supported by this API.
To access the ESearch API, you must provide your NCBI API key in one of the following ways:
- The apiKey option:
  channel.fromSRA(ids, apiKey:'0123456789abcdef')
- The NCBI_API_KEY variable in your environment:
  export NCBI_API_KEY=0123456789abcdef
Available options:
- apiKey: NCBI user API key.
- cache: Enable/disable the caching of API requests (default: true).
- max: Maximum number of entries that can be retrieved (default: unlimited).
- protocol: Allow choosing the protocol for the resulting remote URLs. Available choices: ftp, http, https (default: ftp).
- retryPolicy: Set a retry policy in case the SRA request fails with a retriable error. The retry policy is set as a Map specifying the different policy properties.
Available retry policy properties:
- delay: Delay when retrying failed SRA requests (default: 500ms).
- jitter: Jitter value when retrying failed SRA requests (default: 0.25).
- maxAttempts: Max attempts when retrying failed SRA requests (default: 3).
- maxDelay: Max delay when retrying failed SRA requests (default: 30s).
The following code snippet shows an example of using the channel.fromSRA factory method with a custom retryPolicy:
channel.fromSRA(ids, retryPolicy: [delay: '250ms', maxAttempts: 5])
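Several options can be passed together as named arguments. The following sketch reuses the project accession from the first example and assumes the option values shown (https, 10, false) are acceptable for your use case. They are illustrative, not recommended settings:

```nextflow
// Request HTTPS URLs, cap the number of entries, and skip the request cache
channel
    .fromSRA( 'SRP043510', protocol: 'https', max: 10, cache: false )
    .view()
```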
interval
The interval method emits an incrementing index (starting from zero) at a periodic interval. For example:
channel.interval('1s').view()
The above snippet will emit 0, 1, 2, and so on, every second, forever. You can use an operator such as take or until to close the channel based on a stopping condition.
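For instance, a bounded variant using the take operator might look like the following sketch, which should emit the indexes 0 through 4 and then close the channel. The count of five is arbitrary:

```nextflow
// Emit one index per second and stop after the first five
channel
    .interval('1s')
    .take(5)
    .view()
```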
An optional closure can be used to transform the index. Additionally, returning channel.STOP will close the channel. For example:
ch = channel.interval('1s') { i ->
    i == 10 ? channel.STOP : i
}
ch.view()
of
The channel.of method allows you to create a channel that emits the arguments provided to it, for example:
ch = channel.of( 1, 3, 5, 7 )
ch.view { v -> "value: $v" }
The first line in this example creates a variable ch which holds a channel object. This channel emits the arguments
supplied to the of method. Thus the second line prints the following:
value: 1
value: 3
value: 5
value: 7
Ranges of values are expanded accordingly:
channel
    .of(1..23, 'X', 'Y')
    .view()
Prints:
1
2
3
4
:
23
X
Y
See also: channel.fromList factory method.
topic
In versions of Nextflow prior to 25.04, this feature requires the nextflow.preview.topic feature flag to be enabled.
A topic channel is a channel that can receive values from many sources implicitly based on a matching topic name.
You can think of it as a channel that is shared across many different processes using the same topic name.
A process output can be assigned to a topic using the topic option on an output, for example:
process hello {
    output:
    val('hello'), topic: my_topic
    // ...
}
process bye {
    output:
    val('bye'), topic: my_topic
    // ...
}
The channel.topic function returns the topic channel for the given name, which can be used as input to a process or operator, like any other channel:
channel.topic('my_topic').view()
This approach is a convenient way to collect related items from many different sources without explicitly connecting them (e.g. using the mix operator).
Any process that consumes a topic channel (directly or indirectly) should not send any outputs to that topic, or else the pipeline will hang forever.
See also: Options for process outputs.
value
The channel.value function creates a dataflow value bound to the given argument. For example:
v1 = channel.value( 'Hello there' )
v2 = channel.value( [1,2,3,4,5] )
The first line creates a dataflow value bound to the string 'Hello there'. The second line creates a dataflow value bound to the list [1,2,3,4,5], which is treated as a single value in dataflow logic.
watchPath
The channel.watchPath method watches a folder for one or more files matching a specified pattern. As soon as there
is a file that meets the specified condition, it is emitted over the channel that is returned by the watchPath method.
The files to watch are selected with a glob pattern, which can use the * or ? wildcard characters.
For example:
channel
    .watchPath( '/path/*.fa' )
    .subscribe { fa -> println "Fasta file: $fa" }
By default it watches only for new files created in the specified folder. Optionally, it is possible to provide a second argument that specifies what event(s) to watch. The supported events are:
- create: A new file is created (default)
- modify: A file is modified
- delete: A file is deleted
You can specify more than one of these events by using a comma separated string as shown below:
channel
    .watchPath( '/path/*.fa', 'create,modify' )
    .subscribe { fa -> println "File created or modified: $fa" }
The channel.watchPath factory waits endlessly for files that match the specified pattern and event(s), which means
that it will cause your pipeline to run forever. Consider using the take or until operator to close the channel when
a certain condition is met (e.g. after receiving 10 files or after receiving a file named DONE).
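The two stopping conditions mentioned above could be sketched as follows. The watched folder /path and the marker file name DONE are placeholders, and the sketch assumes that the until closure receives each emitted Path and that the item satisfying the condition is not emitted:

```nextflow
// Close the channel after the first 10 matching files
channel
    .watchPath( '/path/*.fa' )
    .take( 10 )
    .view()

// Close the channel as soon as a file named DONE appears
channel
    .watchPath( '/path/*' )
    .until { file -> file.name == 'DONE' }
    .view()
```

Note that the second pattern is widened to /path/* so that the marker file itself can match. A pattern restricted to *.fa would never see a file named DONE.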
The channel.watchPath factory only works with local and shared filesystems. It does not support object storage such as S3.
See also: channel.fromPath factory method.