Parallel computations#

cogent3 supports parallel computation explicitly for the case where the same calculation needs to be performed on many different data sets. As an example, consider aligning all the one-to-one orthologs of protein-coding genes sampled from 100 vertebrate species, where the data for each gene is stored in a separate text file. These files are used as input for an alignment algorithm that produces a corresponding output file. In other words, applying the alignment algorithm to "homologs1.fasta" produces "aligned-homologs1.fasta".

We could perform the alignments in serial, one after the other, on one CPU core of a single computer. But what if we have 18,000 such files? If we had 18,000 CPU cores, we could assign one alignment task to each core and be done in roughly the time it takes to align a single file! This is an example of “data parallelism” (also called “data level parallelism”).

There are multiple algorithmic approaches to solving parallel computation problems. The approach cogent3 adopts is that of a master process and helper (or worker) processes. The master process splits the work up amongst the available CPU cores. Using our alignment example, the master process assigns sets of files to each worker CPU core. Each worker then performs the alignment step on its designated files and returns each alignment to the master process.

Warning

It is not always faster to split tasks between processes. You will only see a performance gain if the calculation time per task on a worker is significantly greater than the time it takes the master process to deal with the result – in our example, the time it takes to write the alignment to file.

While the alignment problem described above stipulated writing each result to a separate file, this is not always a good idea. It can prove very inefficient if the individual alignment files are small. In such cases, storing all results in a single file (e.g. a sqlitedb database) is better.
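
The details depend on your storage choice. Purely to illustrate the single-file idea, the sketch below stores many small results in one SQLite file using only the Python standard library; computed_alignments is a hypothetical iterable, not cogent3 code:

import sqlite3

conn = sqlite3.connect("alignments.sqlitedb")
conn.execute("CREATE TABLE IF NOT EXISTS results (name TEXT PRIMARY KEY, data TEXT)")
# computed_alignments is a hypothetical iterable of (name, fasta_string) pairs
for name, aligned in computed_alignments:
    conn.execute("INSERT OR REPLACE INTO results VALUES (?, ?)", (name, aligned))
conn.commit()
conn.close()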

Parallel computation on a single computer#

This is the simplest case to implement: it requires no additional software and works in standalone scripts or within Jupyter notebooks. For this use case, cogent3.util.parallel uses the Python standard library concurrent.futures module.
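
For orientation, the behaviour on a single machine is similar to calling the standard library directly. A minimal sketch using only concurrent.futures (not cogent3 code; double is a stand-in for any per-item calculation):

import concurrent.futures

def double(n):
    # stands in for any per-item calculation
    return 2 * n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(double, range(10)))
    print(results)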

Using app.apply_to()#

If you are using cogent3 composable apps, the simplest approach is the apply_to() method. Parallel execution is controlled by the keyword arguments parallel and par_kw. The former indicates that parallel execution is to be undertaken; the latter provides additional arguments to parallel.map(). For instance, using 4 workers would be specified as:

results = app.apply_to(data, parallel=True, par_kw=dict(max_workers=4))
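
For context, a fuller sketch of the alignment example from the introduction. The loader, aligner, and writer names below follow the cogent3 app documentation, and fasta_paths is a hypothetical list of input file paths; check the signatures in your installed version:

from cogent3.app import align, io

# illustrative pipeline; app names follow the cogent3 app docs but
# signatures can differ between versions
loader = io.load_unaligned(format="fasta")
aligner = align.progressive_align(model="nucleotide")
writer = io.write_seqs("aligned", format="fasta")
process = loader + aligner + writer

# fasta_paths is a hypothetical list of input file paths
results = process.apply_to(fasta_paths, parallel=True, par_kw=dict(max_workers=4))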

Note

If you are using MPI, set par_kw=dict(max_workers=4, use_mpi=True).

Directly using cogent3.util.parallel.as_completed()#

This function enables distribution of calculations across CPUs.

Warning

This function delivers results in the order completed! This is different to cogent3.util.parallel.map() which delivers results in the order provided.
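
If you need to match results to inputs despite completion order, one common pattern is to have the function return its input alongside the result. A sketch, reusing the is_prime function and PRIMES data from the demo script below:

def with_input(n):
    # pair each input with its result so the pairs can be matched
    # irrespective of completion order
    return n, is_prime(n)

for n, status in parallel.as_completed(with_input, PRIMES, max_workers=4):
    print(n, status)  # arrives in completion order, but n identifies the input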

The demo script shown below checks the primality of a small set of large numbers (all of which happen to be prime) by splitting chunks of the data across the provided cores. The key line is

result = parallel.as_completed(is_prime, PRIMES, max_workers=4)

The first argument, is_prime, is the function to be called with values from the data, PRIMES. The max_workers argument indicates how many worker processes to use. The elements of PRIMES are broken into max_workers equal-sized chunks, and each chunk is processed by is_prime on a separate CPU. In this demo, each returned value is either False (the number is not prime) or the rank of the worker process that handled it (see the is_prime function below).

Note

If you don’t specify max_workers, all available CPUs will be used.

import math
import time

from collections import Counter

from cogent3.util import parallel


def is_prime(n):
    # record the rank of the worker process handling this value; the
    # master tallies these to show how many jobs each worker performed
    r = parallel.get_rank()

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    return r  # n is prime; return this worker's rank instead of True


PRIMES = (
    [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        117450548693743,
        993960000099397,
    ]
    * 4
    * 20
)  # multiplying just to increase the amount of data to calculate


def main():
    print(f"World size: {parallel.SIZE}\n")

    start = time.time()
    result = Counter(parallel.as_completed(is_prime, PRIMES, max_workers=4))

    if sum(result.values()) != len(PRIMES):
        print(f" failed: {len(result)} != {len(PRIMES)} : {result=}")
    else:
        print(
            f"Time taken = {time.time() - start:.2f} seconds",
            f"CPU rank by number of jobs: {result}",
            sep="\n",
        )


if __name__ == "__main__":
    main()

Parallel computation on multiple computers#

On systems consisting of multiple separate computers, we use the mpi4py bindings to the message passing interface (MPI) standard. Specifically, cogent3.util.parallel.map(..., use_mpi=True, ...) uses the mpi4py.futures module. This module is modelled after concurrent.futures, but there are some important differences in how you use it.

First, you must install additional software: a tool implementing the MPI standard (e.g. openmpi) and the MPI Python bindings library mpi4py. To install openmpi, you can use conda, homebrew or your preferred package manager. mpi4py can be installed with pip.
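
For example, using conda for openmpi and pip for mpi4py (package names can vary by platform):

$ conda install -c conda-forge openmpi
$ pip install mpi4py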

Second, as described in the documentation on mpi4py futures, you need to write your code slightly differently. We provide an example that runs on a supercomputer. To execute a program on such a facility, we submit a “job” to a “queuing system” (e.g. PBS) which schedules our job on the computing resources we requested (how many CPUs, how much RAM, etc.). There are many such job control systems, and the specifics of how to select the resources your job needs vary between them. In general, however, our experience is that the user writes two scripts:

  1. a script performing the computations you actually care about

  2. a bash script for the queuing system, setting out the job parameters and invoking (1) (a sketch is shown below)
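
The resource directives in this sketch of (2) are illustrative only; consult your facility's documentation for the correct values:

#!/bin/bash
#PBS -l ncpus=8
#PBS -l mem=16GB
#PBS -l walltime=01:00:00

cd $PBS_O_WORKDIR
mpiexec -n $PBS_NCPUS python3 -m mpi4py.futures demo-mpi-parallel.py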

The example code presented below is based on the mpi4py demo script for computing prime numbers. In addition to validating the prime numbers, it prints out the “MPI rank” of the processor [1]. The script relies on the environment variable PBS_NCPUS [2] to establish the number of CPUs available. It prints the rank of each processor to stdout [3].

To execute this script as part of a PBS job script you need to use the following command:

$ mpiexec -n $PBS_NCPUS python3 -m mpi4py.futures demo-mpi-parallel.py

Note

To execute it directly with 4 CPUs do:

$ PBS_NCPUS=4 mpiexec -n 4 python3 -m mpi4py.futures demo-mpi-parallel.py

The -n argument tells mpiexec to use this number of CPUs.

In the demo-mpi-parallel.py script, the key line is

result = parallel.as_completed(is_prime, PRIMES, use_mpi=True, max_workers=PBS_NCPUS)

The use_mpi argument selects the MPI back end; otherwise the interface is the same as described above.

Note

You can use mpi for parallel execution on a single computer. This can be useful for checking your code prior to migrating to a larger system.

import math
import os
import time

from collections import Counter

from cogent3.util import parallel


# the following environment variable is created by PBS on job execution
PBS_NCPUS = os.environ.get("PBS_NCPUS", None)
if PBS_NCPUS is None:
    raise RuntimeError("PBS_NCPUS environment variable not set")

PBS_NCPUS = int(PBS_NCPUS)


def is_prime(n):
    # record the MPI rank of the worker so you can check your job got
    # the resources you requested
    r = parallel.get_rank()

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False

    return r  # n is prime; return this worker's rank instead of True


PRIMES = (
    [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        117450548693743,
        993960000099397,
    ]
    * PBS_NCPUS
    * 20
)


def main():
    print(f"World size: {parallel.SIZE}\n")

    # The PRIMES data was multiplied above just to increase the amount
    # of work per worker and so slow the script down.

    start = time.time()

    result = Counter(
        parallel.as_completed(is_prime, PRIMES, use_mpi=True, max_workers=PBS_NCPUS)
    )
    if sum(result.values()) != len(PRIMES):
        print(f" failed: {len(result)} != {len(PRIMES)} : {result=}")
    else:
        print(
            f"Time taken = {time.time() - start:.2f} seconds",
            f"CPU rank by number of jobs: {result}",
            sep="\n",
        )


if __name__ == "__main__":
    # This block is crucial! See
    # https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html
    # for why it needs to be done
    main()