Parallel loops with foreach & doFuture

Author

Marie-Hélène Burle

The foreach package

The foreach package implements a looping construct without an explicit counter. It doesn’t require the preallocation of an output container, it brings to R an equivalent of the Python or Julia list comprehensions, and, most importantly, it makes it easy to run loops in parallel. Unlike a loop, it returns a value (loops are used for their side effects).
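
As a minimal illustration of the syntax (assuming the foreach package is installed), a foreach call can replace a loop that builds up a result:

library(foreach)

result <- foreach(i = 1:3) %do% sqrt(i)   # returns a list, like a comprehension
result[[2]]
[1] 1.414214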

We will work through an example: calculating the sum of 1e3 random vectors of length 3.

First, let’s launch an interactive job with a single task (by default Slurm grants one CPU per task, so we are asking for one CPU):

salloc --time=50 --mem-per-cpu=3700M --ntasks=1

You can see the full list of salloc options in the Slurm documentation.

Then we can launch R interactively:

R

We are now in the R terminal and can start typing R commands.

Let’s load the foreach package:

library(foreach)

Below is a classic while loop:

set.seed(2)
result1 <- numeric(3)             # Preallocate output container
i <- 0                            # Initialise counter variable

while(i < 1e3) {
  result1 <- result1 + runif(3)   # Calculate the sum
  i <- i + 1                      # Update the counter
}

Here is the equivalent code using foreach:

set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% runif(3)

We can verify that both expressions return the same result:

identical(result1, result2)
[1] TRUE
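
Note, as an aside, that .combine is optional: without it, foreach returns a list. Other combining functions can be supplied; a minimal sketch:

foreach(i = 1:3) %do% runif(3)                      # a list of three vectors
foreach(i = 1:3, .combine = 'c') %do% runif(3)      # one vector of length 9
foreach(i = 1:3, .combine = 'rbind') %do% runif(3)  # a 3x3 matrix (one row per iteration)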

The best part of foreach is that you can turn sequential loops into parallel ones.

There are many parallelization backends available: doFuture, doMC, doMPI, doParallel, doRedis, doSNOW, and doAzureParallel.

In this lesson, we will use doFuture, a modern package that makes it possible to evaluate foreach expressions following any of the strategies of the future package.

So first, what is the future package?

The future package

A future is an object that acts as an abstract representation of a value that may only become available at some point in the future. A future can be resolved (its value has been computed) or unresolved. If the value is queried while the future is unresolved, the process blocks until the future is resolved. Futures thus allow for asynchronous and parallel evaluations.

The future package lets you evaluate futures sequentially or in various forms of parallelism, all while keeping the code simple and consistent. The evaluation strategy is set with the plan function; a minimal sketch of the API follows the list below:

  • plan(sequential):
    Futures are evaluated sequentially in the current R session.

  • plan(multisession):
    Futures are evaluated by new R sessions spawned in the background (multi-processing in shared memory).

  • plan(multicore):
    Futures are evaluated in processes forked from the existing process (multi-processing in shared memory).

  • plan(cluster):
    Futures are evaluated on an ad-hoc cluster (distributed parallelism across multiple nodes).
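
For illustration, here is a minimal sketch of the underlying API (the future(), resolved(), and value() functions), assuming a multisession plan:

library(future)
plan(multisession)   # evaluate futures in background R sessions

f <- future({        # evaluation starts asynchronously
  Sys.sleep(2)       # simulate a slow computation
  42
})

resolved(f)          # most likely FALSE: the value is still being computed
value(f)             # blocks until the future is resolved, then returns 42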

Consistency

To ensure consistent behaviour across plans, all evaluations are done in a local environment:

library(future)

a <- 1

b %<-% {      # %<-% is the future assignment operator (used instead of <-)
  a <- 2
}

a
[1] 1
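
The future itself still returns the value of its last expression, so b is assigned as expected:

b
[1] 2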

The doFuture package

The doFuture package makes it very easy to evaluate foreach expressions using any of the evaluation strategies of the future package.

Let’s load the doFuture package:

library(doFuture)

This automatically loads the foreach and future packages.

We need to choose an evaluation strategy for our futures (e.g. plan(multicore)):

plan(multicore)

To run the code in parallel, we can now replace %do% with %dofuture%.

There is, however, one last twist: whenever you generate random numbers in parallel, using set.seed() isn’t enough to ensure reproducibility; you also need to make sure the random numbers are parallel-safe. Using the %seed% operator (with %seed% TRUE) or the option .options.future = list(seed = TRUE) pre-generates the random seeds for all iterations using L’Ecuyer-CMRG RNG streams.

Here are the two equivalent syntaxes:

set.seed(2)
result3 <- foreach(
  i = 1:1e3,
  .options.future = list(seed = TRUE),
  .combine = '+'
) %dofuture% {
  runif(3)
}
or, equivalently:

set.seed(2)
result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
  runif(3)
} %seed% TRUE

Of course, remember that we asked Slurm for a single CPU (--ntasks=1), so our current job doesn’t give us the hardware to run any code in parallel.

It is now time to run our code with all the serial and parallel methods and do some benchmarking.

Benchmarks

With the overhead of parallelization, it doesn’t make sense to parallelize such fast code: the parallel version would take longer than the serial one.

Let’s artificially make our code much slower without adding any complexity that would distract us from the parallelization question. To do that, we will simply add a delay at each iteration:

set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
  Sys.sleep(0.01)         # Wait for 0.01s
  runif(3)
}

Now, let’s load the bench package that we will use for benchmarking our various tests:

library(bench)

Reference timing

Let’s first time this to get a reference:

set.seed(2)
bm <- mark(
  result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
    Sys.sleep(0.01)
    runif(3)
  }
)

bm$median
[1] 11.4s

Plan sequential

This is the parallelizable foreach code, but run sequentially:

plan(sequential)     # Set the evaluation strategy

set.seed(2)
bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median
[1] 10.6s

No surprise: this is similar to the previous timing.

Multi-processing in shared memory

Number of cores

future provides availableCores() to detect the number of available cores:

availableCores()
cgroups.cpuset
             1

This is similar to parallel::detectCores(), except that availableCores() respects the resources actually allocated to the job (e.g. by Slurm or cgroups) instead of reporting all the hardware cores.

This detects the number of CPU cores available to us on the current compute node, that is, what we can use for shared memory multi-processing. Since we asked for a single task (--ntasks=1) and since by default Slurm grants one CPU per task, we have a single CPU available.

To be able to run our code in parallel, we each need access to at least 2 CPUs. So let’s quit the R session (with Ctrl+D or quit(); when asked whether to save a workspace image, answer n), terminate our interactive job (also with Ctrl+D), and ask for a different job.

Don’t forget to relinquish your interactive job with Ctrl+D; otherwise it will keep running for the full 50 min, making the hardware it uses unavailable to everyone else until the job expires.

The cluster for this course is made of 40 nodes with 4 CPUs each. We want to test shared memory parallelism, so our job needs to stay within one node. We can thus ask for a maximum of 4 CPUs and we want to ensure that we aren’t getting them on different nodes.

If we all ask for 4 CPUs in an interactive session, we are fine as long as there are 40 or fewer of us. If we are too numerous, the first 40 people to request an interactive job will get one, while the rest will have their requests pending, waiting for resources to become available for as long as the lucky 40 keep their sessions running. That’s the big downside of interactive sessions.

A better approach when we need a lot of resources is to write the code in a script and run it with sbatch. That way, everybody gets to run their code with minimal delay.

Open a text file (let’s call it rf.R) with the text editor of your choice, for instance nano:

nano rf.R

We will first play with it to see how many cores are available to us, so write in your script:

rf.R
library(future)   # Don't forget to load the packages in your script
availableCores()

Save and close the text editor.

Now, we want to create a shell script for Slurm. Let’s call it rf.sh:

nano rf.sh

In it lives the hardware request and the code that needs to run:

rf.sh
#!/bin/bash
#SBATCH --time=10             # 10 min
#SBATCH --mem-per-cpu=3700M
#SBATCH --nodes=1
#SBATCH --cpus-per-task=4

Rscript rf.R                  # This is the code that we are running

You can see the full list of sbatch options in the Slurm documentation.

Save and close the text editor.

We can now run the batch script:

sbatch rf.sh

You can monitor it with sq, but this should be nearly instantaneous. The result will be written to a file called slurm-xx.out, with xx being the number of the job that just ran.

You can specify the output file name with the --output option of your sbatch script.

To see the result, we can simply print the content of that file to screen (you can run ls to see the list of files in the current directory):

cat slurm-xx.out    # Replace xx with the job number
system
     4

We now have 4 CPUs available on one node, so we can test shared memory parallelism.

Plan multisession

Shared memory multi-processing can be run with plan(multisession), which spawns new R sessions in the background to evaluate futures.

Edit the R script (with nano rf.R):

rf.R
library(doFuture)
library(bench)

plan(multisession)

set.seed(2)

bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median

Run the job with the new R script:

sbatch rf.sh

We now get in the output file:

Loading required package: foreach
Loading required package: future
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
[1] 3.31s

We got a speedup of 11.4 / 3.31 = 3.4. Not bad considering that we have 4 CPU cores (the ideal speedup would be 4, but there is always some overhead to parallelization).

Plan multicore

Shared memory multi-processing can also be run with plan(multicore) (except on Windows), which forks the current R process to evaluate futures.

Let’s modify our R script again:

rf.R
library(doFuture)
library(bench)

plan(multicore)

set.seed(2)

bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median

Run the job:

sbatch rf.sh

We get:

Loading required package: foreach
Loading required package: future
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
[1] 3.01s

We got a similar speedup: 11.4 / 3.01 = 3.8. This time we are really close to the ideal speedup of 4!

Multi-processing in distributed memory

Create a cluster of workers

To test parallel execution in distributed memory, let’s ask Slurm for 8 tasks by editing our rf.sh script:

rf.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=3700M
#SBATCH --ntasks=8

Rscript rf.R      # This is the code that we are running

Let’s verify that we do get 8 tasks by accessing the SLURM_NTASKS environment variable from within R.

Edit rf.R to contain the following:

rf.R
as.numeric(Sys.getenv("SLURM_NTASKS"))

Run the job:

sbatch rf.sh

We get:

[1] 8

Let’s see which nodes we are using:

rf.R
system("srun hostname -s", intern = T)

We get:

[1] "node1" "node1" "node1" "node1" "node2" "node2" "node2" "node2"

To run our foreach code with distributed parallelism using 8 CPU cores across both nodes, we need to create a cluster of workers. We do this with the makeCluster() function from the base R parallel package: we create a character vector with the names of the nodes our tasks are running on and pass this vector to makeCluster():

## Create a character vector with the node names
hosts <- system("srun hostname -s", intern = TRUE)

## Create the cluster of workers
cl <- parallel::makeCluster(hosts)

Let’s test it:

rf.R
library(doFuture)

hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)

cl

If we run this code, we get:

Loading required package: foreach
Loading required package: future
socket cluster with 8 nodes on hosts ‘node1’, ‘node2’

Make sure that your code has finished running before printing the output file. Remember that you can monitor the job with sq.

Plan cluster

We can now run the code in distributed memory parallelism:

rf.R
library(doFuture)
library(bench)

hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)
plan(cluster, workers = cl)

set.seed(2)

bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median

We get:

Loading required package: foreach
Loading required package: future
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
[1] 1.94s

Speedup: 11.4 / 1.94 = 5.9. Here again, this is not bad for 8 CPU cores, considering the added overhead of message passing between the two nodes.

The cluster of workers can be stopped with:

parallel::stopCluster(cl)

Here, this is not strictly necessary since our job stops running as soon as the execution completes, but on other systems this prevents you from monopolizing hardware unnecessarily.
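
If you keep working in the same R session afterwards, you can also reset the evaluation strategy to the default:

plan(sequential)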