The first change we're going to make is to clean up some repetitive code that we've already seen quite a lot in this workshop: the construction of the meta map from each row repeats itself quite a lot. We can use the subMap method available on Maps to quickly return a new map constructed from a subset of an existing map.
The subMap method takes a collection of keys and constructs a new map containing only the keys listed in that collection. Combined with the plus (+) method, which merges two maps and lets values on the right override those on the left, this should let you contract, expand and modify maps safely.
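Here is a minimal plain-Groovy sketch of these map operations outside any workflow. The row is a hand-written stand-in for one row from splitCsv(header: true). The column names follow the workshop samplesheet, but the values, including the repeatcount of 2, are invented for illustration.

```groovy
// Stand-in for one splitCsv(header: true) row: column name -> String (values invented).
def row = [id: 'sampleA', repeat: '1', type: 'tumor',
           fastq1: 'sampleA_rep1_R1.fastq.gz', fastq2: 'sampleA_rep1_R2.fastq.gz']

// Contraction: subMap builds a NEW map holding only the listed keys.
def meta = row.subMap('id', 'repeat', 'type')

// Expansion: + returns a NEW map with the extra entry added (the value 2 is made up).
def expanded = meta + [repeatcount: 2]

// Modification: when both maps share a key, the right-hand value wins.
def modified = meta + [type: 'abnormal']

println "row:      $row"
println "meta:     $meta"
println "expanded: $expanded"
println "modified: $modified"
```

None of these operations touches row or meta, which is what makes them safe to use on objects that other parts of the workflow may also be holding.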
Exercise
Can you extend our workflow in an unsafe manner? Use the set operator to name the channel in our workflow above, and then map (the operator) over that channel without modifying it. In a separate map operation, try modifying the meta map in place so that the change is reflected in the first map.
Note that we're deliberately doing the wrong thing in this example to clarify what the correct approach looks like.
Solution
To ensure that the modification of the map happens first, we introduce a sleep into the first map operation. This sleep emulates a long-running Nextflow process.
workflow {
    Channel.fromPath("data/samplesheet.csv")
        .splitCsv(header: true)
        .map { row ->
            def meta = row.subMap('id', 'repeat', 'type')
            [meta, [file(row.fastq1, checkIfExists: true), file(row.fastq2, checkIfExists: true)]]
        }
        .set { samples }

    // Passes elements through unchanged, after a short delay
    samples
        .map { element -> sleep 10; element }
        .view { meta, reads -> "Should be unmodified: $meta" }

    // Unsafe: assigns into the shared meta map in place
    samples
        .map { meta, reads ->
            meta.type = meta.type == "tumor" ? "abnormal" : "normal"
            [meta, reads]
        }
        .view { meta, reads -> "Should be modified: $meta" }
}
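Why does the change leak into the first branch? The sketch below replays the exercise with plain Groovy lists standing in for channels. It assumes, as the exercise output suggests, that both map operations receive a reference to the same element object rather than a copy. The collect calls run one after the other, so no sleep is needed here. The first branch simply keeps a reference to the element and sees the change when it is read later.

```groovy
// Stand-in for the 'samples' channel: a list of [meta, reads] elements (values invented).
def samples = [
    [[id: 'sampleA', repeat: '1', type: 'tumor'], ['A_R1.fastq.gz', 'A_R2.fastq.gz']]
]

// First "map" operation: passes each element through untouched.
def firstBranch = samples.collect { element -> element }

// Second "map" operation: assigns into meta IN PLACE (the unsafe pattern).
def secondBranch = samples.collect { element ->
    def (meta, reads) = element
    meta.type = meta.type == 'tumor' ? 'abnormal' : 'normal'
    [meta, reads]
}

// Both branches hold a reference to the very same meta map,
// so the "unmodified" branch now reports the change as well.
println "Should be unmodified: ${firstBranch[0][0]}"
println "Should be modified:   ${secondBranch[0][0]}"
```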
Exercise
How would you fix the example above to use the safe operators plus and subMap to ensure that the original map remains unmodified?
workflow {
    Channel.fromPath("data/samplesheet.csv")
        .splitCsv(header: true)
        .map { row ->
            def meta = row.subMap('id', 'repeat', 'type')
            [meta, [file(row.fastq1, checkIfExists: true), file(row.fastq2, checkIfExists: true)]]
        }
        .set { samples }

    samples
        .map { element -> sleep 10; element }
        .view { meta, reads -> "Should be unmodified: $meta" }

    // Safe: + builds a new map, leaving the shared meta map untouched
    samples
        .map { meta, reads ->
            def newmap = [type: meta.type == "tumor" ? "abnormal" : "normal"]
            [meta + newmap, reads]
        }
        .view { meta, reads -> "Should be modified: $meta" }
}
Grouping with groupTuple is easy enough, but the operator has to wait until all items have been emitted from the incoming queue before it can assemble the output queue. If even one read mapping job takes a long time, the processing of all other samples is held up. We need a way of signalling to Nextflow how many items are in a given group so that each group can be emitted as soon as it is complete.
By default, the groupTuple operator groups on the first item in each element, which at the moment is a Map. We can turn this map into a special class using the groupKey function, which takes our grouping object as its first argument and the number of expected elements as its second.
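In the workflow this looks like groupKey(meta.subMap('id', 'type'), meta.repeatcount) followed by .groupTuple(). The sketch below illustrates the idea behind the size hint. It is a plain-Groovy simulation, not Nextflow's implementation. The helper groupBySize is hypothetical. Each item carries its expected group size alongside its key, as groupKey does, and a group is emitted the moment it is full.

```groovy
// Hypothetical helper simulating groupTuple fed by groupKey(key, size):
// each item is [key, expectedSize, value]; a group is emitted as soon as it
// holds expectedSize values instead of after the whole input has been read.
List<List> groupBySize(List<List> items) {
    Map<Object, List> pending = [:]   // key -> values collected so far
    List<List> emitted = []           // completed groups, in completion order
    items.each { item ->
        def (key, expectedSize, value) = item
        if (!pending.containsKey(key)) {
            pending[key] = []
        }
        pending[key] << value
        if (pending[key].size() == expectedSize) {
            emitted << [key, pending.remove(key)]   // emit early and drop the key
        }
    }
    // Groups that never reach their expected size are not emitted by this sketch.
    return emitted
}

// Invented mapping results: sampleA expects 2 BAMs, sampleB expects 3.
def mapped = [
    ['sampleA', 2, 'sampleA_rep1.bam'],
    ['sampleB', 3, 'sampleB_rep1.bam'],
    ['sampleA', 2, 'sampleA_rep2.bam'],   // sampleA is complete here
    ['sampleB', 3, 'sampleB_rep2.bam'],
    ['sampleB', 3, 'sampleB_rep3.bam']
]
def grouped = groupBySize(mapped)
grouped.each { println it }
```

Because sampleA's group is complete after the third item, it is emitted before sampleB's last BAM arrives. This is the early emission that the size hint makes possible.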
The previous exercise demonstrated the fan-in approach using groupTuple and groupKey, but we may also want to fan out our processes, for example to genotype over a set of genomic intervals.
As a starting point, we can take an existing BED file and turn it into a channel of Maps.
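Here is a plain-Groovy sketch of that conversion. It assumes a three-column BED file (chromosome, start, end) and uses the hypothetical key names chr, start and stop. Inside a workflow, the same result would more likely come from splitCsv with sep: '\t' and a list of column names passed as header. That approach yields String values rather than integers.

```groovy
// Invented BED content: tab-separated chromosome, start, end.
def bedText = 'chr1\t0\t1000000\nchr1\t1000000\t2000000\nchr2\t0\t500000\n'

// One Map per non-blank line; the key names chr/start/stop are an assumption.
def intervals = bedText.readLines()
    .findAll { it.trim() }
    .collect { line ->
        def (chr, start, stop) = line.tokenize('\t')
        [chr: chr, start: start.toInteger(), stop: stop.toInteger()]
    }

intervals.each { println it }
```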
We can use the combine operator to emit a new channel in which each combined BAM is paired with each interval from the BED file. These pairs can then be piped into the genotyping process.
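What combine produces can be sketched in plain Groovy as a Cartesian product. The lists below are invented stand-ins for the CombineBams output and the interval channel. When the left-hand item is itself a tuple, combine appends the right-hand item to it, so each emitted element has the shape [meta, bam, interval].

```groovy
// Invented stand-ins for the two channels.
def combinedBams = [
    [[id: 'sampleA', type: 'tumor'],  'sampleA.bam'],
    [[id: 'sampleB', type: 'normal'], 'sampleB.bam']
]
def intervals = [
    [chr: 'chr1', start: 0, stop: 1000000],
    [chr: 'chr2', start: 0, stop: 500000]
]

// Every [meta, bam] pair is extended with every interval: [meta, bam, interval].
def fannedOut = combinedBams.collectMany { pair ->
    intervals.collect { interval -> pair + [interval] }
}

fannedOut.each { println it }
```

Two BAMs and two intervals give four genotyping jobs. Each job can run independently, which is the point of fanning out.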
Finally, we can combine these genotyped BAMs back together using groupTuple and another BAM merge process. We construct a "merge" process that combines the BAM files from multiple intervals.
We might be tempted to pipe the output of GenotypeOnInterval directly into groupTuple, but the meta object we are passing down is still the groupKey we created earlier:
mapped_reads = MapReads(samples, reference)
    .map { meta, bam ->
        def key = groupKey(meta.subMap('id', 'type'), meta.repeatcount)
        [key, bam]
    }
    .groupTuple()

combined_bams = CombineBams(mapped_reads)
    .map { meta, bam -> [meta.subMap('id', 'type'), bam] }
    .combine(intervals)

genotyped_bams = GenotypeOnInterval(combined_bams)
    .view { meta, bamfile -> "Meta is of ${meta.getClass()}" }
To ensure that grouping is performed only on the relevant elements, we can convert the groupKey back into a plain Map using the as Map operator. The groupTuple operator then groups on the keys present in that plain map alone, just as it would on a map built with subMap, so the downstream grouping and merging steps operate on the intended sample attributes.
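The regrouping step can be sketched with Groovy's own groupBy standing in for groupTuple. The data are invented. The point is that Groovy Maps compare and hash by content, so separately built meta maps with the same entries fall into the same group. The sketch does not model GroupKey or the as Map conversion itself.

```groovy
// Invented per-interval results: [meta, genotyped BAM]; each meta is a separate Map object.
def genotyped = [
    [[id: 'sampleA', type: 'tumor'],  'sampleA.chr1.bam'],
    [[id: 'sampleB', type: 'normal'], 'sampleB.chr1.bam'],
    [[id: 'sampleA', type: 'tumor'],  'sampleA.chr2.bam'],
    [[id: 'sampleB', type: 'normal'], 'sampleB.chr2.bam']
]

// Group on the meta Map (equal content => same group), then keep just the BAM names.
def regrouped = genotyped
    .groupBy { it[0] }
    .collect { meta, items -> [meta, items.collect { it[1] }] }

regrouped.each { println it }
```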
To save the output of our MergeGenotyped process, we can "publish" the outputs of a process using the publishDir directive. Try modifying the MergeGenotyped process to include the directive.
This will publish all of the files in the output block of this process to the results/genotyped directory.
Workflow outputs
As of Nextflow 24.04 you can also manage result publication at the workflow level using the new output { } block. This allows you to publish files by pushing them from a channel instead of a process, similar to the channel operations we have been exploring in this workshop.