5. Groovy Imports

Groovy ships with a wealth of helper classes that can be imported into Nextflow scripts. In this chapter, we build a very small workflow around the FastP tool to explore importing Groovy's JsonSlurper class.

First, let's move into this chapter's directory:

cd groovy

Let's assume that we would like to pull in a samplesheet, parse the entries and run them through the FastP tool. So far, we have been concerned with local files, but Nextflow will handle remote files transparently:

workflow {
    params.input = "https://raw.githubusercontent.com/nf-core/test-datasets/rnaseq/samplesheet/v3.10/samplesheet_test.csv"

    Channel.fromPath(params.input)
    | splitCsv(header: true)
    | view
}

Let's write a small closure to parse each row into the now-familiar map + files shape. We might start by constructing the meta-map:

workflow {
    params.input = "https://raw.githubusercontent.com/nf-core/test-datasets/rnaseq/samplesheet/v3.10/samplesheet_test.csv"

    Channel.fromPath(params.input)
    | splitCsv(header: true)
    | map { row ->
        def meta = row.subMap('sample', 'strandedness') // def keeps meta local to the closure
        meta
    }
    | view
}

... but this precludes the possibility of adding additional columns to the samplesheet. We might want to ensure that the parsing captures any extra metadata columns should they be added. Instead, let's partition the column names into those that begin with "fastq" and those that don't:

def (readKeys, metaKeys) = row.keySet().split { it =~ /^fastq/ }

New methods

We've introduced a new method here, keySet. This is a method on Java's LinkedHashMap class (docs here).

We're also using the .split() method, which divides a collection in two based on the return value of the closure: the elements for which the closure is truthy, and the rest. The mrhaki blog provides a succinct summary.
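As a rough illustration of how keySet and split behave together, here is a plain Groovy sketch that runs outside Nextflow. The row map is made up for the example, and the column names fastq_1 and fastq_2 are an assumption about the samplesheet layout:

```groovy
// Hypothetical row, shaped like one parsed line of the samplesheet
def row = [sample: 'S1', fastq_1: 's1_R1.fastq.gz', fastq_2: '', strandedness: 'reverse']

// split returns two collections: keys where the closure is truthy, then the rest
def (readKeys, metaKeys) = row.keySet().split { it =~ /^fastq/ }

assert readKeys.toList() == ['fastq_1', 'fastq_2']
assert metaKeys.toList() == ['sample', 'strandedness']
```

Because the split is driven by the column names, any extra metadata column added to the samplesheet would land in metaKeys without further changes.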

From here, let's take the values of the read columns and turn them into file objects:

def reads = row.subMap(readKeys).values().collect { file(it) }

... but we run into an error:

Argument of `file` function cannot be empty

If we take a closer look at the samplesheet, we notice that not every row has two read files: single-end samples leave the second column empty. Let's add a condition to filter out the empty values:

def reads = row
    .subMap(readKeys)
    .values()
    .findAll { it != "" } // Single-end reads will have an empty string
    .collect { file(it) } // Turn those strings into paths
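To see the filter in isolation, here is a minimal plain Groovy sketch. It leaves out the Nextflow file() call so that it can run in any Groovy console; the row contents and file name are invented for illustration:

```groovy
// Hypothetical single-end row: the second read column is an empty string
def row = [sample: 'S1', fastq_1: 's1_R1.fastq.gz', fastq_2: '']

def reads = row
    .subMap(['fastq_1', 'fastq_2'])
    .values()
    .findAll { it != "" } // drop the empty entry before any path conversion

assert reads.toList() == ['s1_R1.fastq.gz']
```

Only the non-empty value survives, so nothing empty would reach file() in the real pipeline.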

Now we need to construct the meta map. Let's have a quick look at the FASTP module that I've already defined:

process FASTP {
    container 'quay.io/biocontainers/fastp:0.23.2--h79da9fb_0'

    input:
    tuple val(meta), path(reads)

    output:
    tuple val(meta), path('*.fastp.fastq.gz') , optional:true, emit: reads
    tuple val(meta), path('*.json')           , emit: json

    script:
    def prefix = task.ext.prefix ?: meta.id
    if (meta.single_end) {
        // SNIP
    } else {
        // SNIP
    }
}

I can see that we require two extra keys, id and single_end:

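def meta = row.subMap(metaKeys)
meta.id ?= meta.sample                // Use the sample name as the id unless one is already set
meta.single_end = reads.size() == 1   // size() is a method call on the list of reads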
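The same three steps can be tried outside Nextflow. This is a sketch with invented values, and it assumes Groovy 3 or later, since the ?= (Elvis assignment) operator is not available in earlier versions:

```groovy
// Hypothetical inputs: a parsed row and the list of reads found for it
def row = [sample: 'S1', strandedness: 'reverse']
def reads = ['s1_R1.fastq.gz']

def meta = row.subMap(['sample', 'strandedness'])
meta.id ?= meta.sample               // only assigns when meta.id is null or otherwise falsy
meta.single_end = reads.size() == 1  // one file means single-end

assert meta == [sample: 'S1', strandedness: 'reverse', id: 'S1', single_end: true]
assert !row.containsKey('id')        // subMap returned a copy, so the row is untouched
```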

We can now emit the meta map together with the reads and pass the result to our FASTP process:

Channel.fromPath(params.input)
| splitCsv(header: true)
| map { row ->
    def (readKeys, metaKeys) = row.keySet().split { it =~ /^fastq/ }
    def reads = row.subMap(readKeys).values()
        .findAll { it != "" } // Single-end reads will have an empty string
        .collect { file(it) } // Turn those strings into paths
    def meta = row.subMap(metaKeys)
    meta.id ?= meta.sample
    meta.single_end = reads.size() == 1
    [meta, reads]
}
| FASTP

FASTP.out.json | view

Let's assume that we want to pull some information out of these JSON files. To make them easier to get at, let's "publish" them to a known directory. We're going to discuss configuration more completely in a later chapter, but that's no reason not to dabble a bit here.

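We'd like to add a publishDir directive to our FASTP process. Rather than editing the process itself, we can set it from the pipeline configuration (for example, nextflow.config):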

process {
    withName: 'FASTP' {
        publishDir = [
            path: { "results/fastp/json" },
            saveAs: { filename -> filename.endsWith('.json') ? filename : null },
        ]
    }
}
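The saveAs closure decides, for each output file, the name to publish it under; returning null tells Nextflow to skip that file. A minimal sketch of the same closure in plain Groovy, with made-up file names:

```groovy
// Same logic as the saveAs closure in the configuration above
def saveAs = { filename -> filename.endsWith('.json') ? filename : null }

assert saveAs('S1.fastp.json') == 'S1.fastp.json'  // JSON reports are published
assert saveAs('S1.fastp.fastq.gz') == null         // trimmed reads are skipped
```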

Groovy Tip: Elvis Operator

This pattern of returning something if it is truthy and somethingElse if not:

somethingThatMightBeFalsey ? somethingThatMightBeFalsey : somethingElse

has a shortcut in Groovy - the "Elvis" operator:

somethingThatMightBeFalsey ?: somethingElse
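As a small sketch of the operator in action, the closure below mirrors the `task.ext.prefix ?: meta.id` line from the FASTP process. The prefixFor name and the map contents are hypothetical:

```groovy
// Falls back to meta.id whenever ext.prefix is null, empty, or otherwise falsy
def prefixFor = { Map ext, Map meta -> ext.prefix ?: meta.id }

assert prefixFor([:], [id: 'S1']) == 'S1'                    // no prefix set
assert prefixFor([prefix: 'trimmed'], [id: 'S1']) == 'trimmed'
assert prefixFor([prefix: ''], [id: 'S1']) == 'S1'           // empty string is falsy in Groovy
```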

With the JSON files published, we can iterate quickly on our JSON parsing without waiting for the FASTP cache to be calculated on these slow virtual machines.

nextflow run . -resume

Let's consider the possibility that we'd like to capture some of these metrics so that they can be used downstream. First, we'll have a quick peek at the Groovy docs, where I see that I need to import JsonSlurper:

import groovy.json.JsonSlurper

// We can also import a Yaml parser just as easily:
//   import org.yaml.snakeyaml.Yaml
//   new Yaml().load(new FileReader('your/data.yml'))
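To get a feel for what JsonSlurper returns, here is a minimal sketch that parses a hand-written JSON string. The string is only a stand-in for a real report: its nesting follows the summary and after_filtering sections used later in this chapter, and the numbers are invented:

```groovy
import groovy.json.JsonSlurper

// Hypothetical, heavily trimmed stand-in for a fastp JSON report
def text = '{"summary": {"after_filtering": {"total_reads": 100, "q30_rate": 0.95}}}'

def result = new JsonSlurper().parseText(text)

assert result instanceof Map                              // JSON objects become Maps
assert result.summary.after_filtering.total_reads == 100  // nested keys read like properties
assert result.summary.after_filtering.q30_rate == 0.95
```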

Now let's create a second entrypoint to quickly pass these JSON files through some tests:

Entrypoint developing

Using a second Entrypoint allows us to do quick debugging or development using a small section of the workflow without disturbing the main flow.

workflow Jsontest {
    Channel.fromPath("results/fastp/json/*.json")
    | view
}

which we run with

nextflow run . -resume -entry Jsontest

Let's create a small function at the top of the workflow to take the JSON path and pull out some basic metrics:

def getFilteringResult(json_file) {
    fastpResult = new JsonSlurper().parseText(json_file.text)
}

Exercise

The fastpResult returned from the parseText method is a large Map - a class which we're already familiar with. Modify the getFilteringResult function to return just the after_filtering section of the report.

Solution

Here is one potential solution.

def getFilteringResult(json_file) {
    new JsonSlurper().parseText(json_file.text)
    ?.summary
    ?.after_filtering
}

Note

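The ?. notation is new: it is the null-safe access operator. ?.summary accesses the summary property only if the object to its left is not null; otherwise the whole expression evaluates to null instead of throwing an exception.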
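A short sketch of the null-safe chain, using hand-written JSON strings instead of files. The afterFiltering closure is a hypothetical variant of getFilteringResult that takes the JSON text directly:

```groovy
import groovy.json.JsonSlurper

// Takes JSON text directly (the chapter's function takes a file and reads its .text)
def afterFiltering = { String jsonText ->
    new JsonSlurper().parseText(jsonText)?.summary?.after_filtering
}

def full = afterFiltering('{"summary": {"after_filtering": {"q30_rate": 0.95}}}')
def empty = afterFiltering('{}')

assert full.q30_rate == 0.95
assert empty == null // no summary section, so the chain short-circuits to null
```

Without the ?. before after_filtering, the second call would fail with a NullPointerException rather than return null.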

We can then join this new map back to the original reads using the join operator:

FASTP.out.json
| map { meta, json -> [meta, getFilteringResult(json)] }
| join( FASTP.out.reads )
| view

Exercise

Can you amend this pipeline to create two channels that filter the reads to exclude any samples where the Q30 rate is less than 93.5%?

Solution

FASTP.out.json
| map { meta, json -> [meta, getFilteringResult(json)] }
| join( FASTP.out.reads )
| map { meta, fastpMap, reads -> [meta + fastpMap, reads] }
| branch { meta, reads ->
    pass: meta.q30_rate >= 0.935
    fail: true
}
| set { reads }

reads.fail | view { meta, reads -> "Failed: ${meta.id}" }
reads.pass | view { meta, reads -> "Passed: ${meta.id}" }
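The `meta + fastpMap` step in the solution merges the two maps into a new one, which is what makes q30_rate available to the branch condition. A plain Groovy sketch with invented values:

```groovy
// Hypothetical meta map and fastp metrics for one sample
def meta = [id: 'S1', single_end: true]
def fastpMap = [q30_rate: 0.95]

def merged = meta + fastpMap  // builds a new map; neither operand is modified

assert merged == [id: 'S1', single_end: true, q30_rate: 0.95]
assert meta == [id: 'S1', single_end: true]

// Same threshold test as the pass branch
def passes = { Map m -> m.q30_rate >= 0.935 }
assert passes(merged)
assert !passes(meta + [q30_rate: 0.90])
```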