This package will allow you to send function calls as jobs on a computing cluster with a minimal interface provided by the `Q` function:
```r
# load the library and create a simple function
library(clustermq)
fx = function(x) x * 2

# queue the function call on your scheduler
Q(fx, x=1:3, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 2
#>
#> [[2]]
#> [1] 4
#>
#> [[3]]
#> [1] 6
```
Computations are done entirely on the network and without any temporary files on network-mounted storage, so there is no strain on the file system apart from starting up R once per job. All calculations are load-balanced, i.e. workers that get their jobs done faster will also receive more function calls to work on. This is especially useful if not all calls return after the same time, or one worker has a high load.
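To see the load balancing in action, here is a minimal sketch (the sleep times and worker count are made up for illustration): the worker that draws the one slow call stays busy with it, while the other worker picks up the remaining fast calls.

```r
# Illustrative only: one call sleeps much longer than the rest, so the
# second worker ends up processing most of the fast calls
fx = function(x) { Sys.sleep(x); x }
Q(fx, x=c(5, 1, 1, 1, 1, 1), n_jobs=2)
```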
First, we need the ZeroMQ system library. This is probably already installed on your system. If not, your package manager will provide it:
```sh
# You can skip this step on Windows and macOS, the package binary has it
# On a computing cluster, we recommend using Conda or Linuxbrew
brew install zeromq                # Linuxbrew, Homebrew on macOS
conda install zeromq               # Conda, Miniconda
sudo apt-get install libzmq3-dev   # Ubuntu
sudo yum install zeromq-devel      # Fedora
pacman -S zeromq                   # Arch Linux
```
Then install the `clustermq` package in R from CRAN:

```r
install.packages('clustermq')
```
Alternatively you can use the `remotes` package to install directly from GitHub:

```r
# install.packages('remotes')
remotes::install_github('mschubert/clustermq')
# remotes::install_github('mschubert/clustermq', ref="develop") # dev version
```
You should be good to go!
By default, `clustermq` will look for `sbatch` (SLURM), `bsub` (LSF), or `qsub` (SGE) in your `$PATH` and use the scheduler that is available. If the examples don't run out of the box, you might need to set your scheduler explicitly.
An HPC cluster’s scheduler ensures that computing jobs are distributed to available worker nodes. Hence, this is what clustermq interfaces with in order to do computations.
We currently support the following schedulers (either locally or via SSH). SLURM, LSF, and SGE are detected automatically via `$PATH`; the others need to be set explicitly:

* Multiprocess on local cores: `options(clustermq.scheduler="multiprocess")`
* PBS/Torque: `options(clustermq.scheduler="PBS")` or `options(clustermq.scheduler="Torque")`
* SSH to a remote scheduler: `options(clustermq.scheduler="ssh", clustermq.ssh.host=<yourhost>)`
Default submission templates are provided and can be customized, e.g. to activate compute environments or containers.
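For instance, scheduler options can be set persistently, e.g. in your `~/.Rprofile`. The following is a sketch for the SSH connector; `user@myhpc` and the template path are placeholders, not values shipped with the package:

```r
# Sketch of persistent clustermq settings, e.g. in ~/.Rprofile
options(
    clustermq.scheduler = "ssh",
    clustermq.ssh.host = "user@myhpc",      # placeholder: your SSH host (key-based login recommended)
    clustermq.template = "~/clustermq.tmpl" # placeholder: optional custom submission template
)
```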
The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe as the R session running on a worker does not share your local memory.
The simplest example is a function call that is completely self-sufficient, with one argument (`x`) that we iterate through:
```r
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 2
#>
#> [[2]]
#> [1] 4
#>
#> [[3]]
#> [1] 6
```
Non-iterated arguments are supported by the `const` argument:
```r
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
```
If a function relies on objects in its environment that are not passed as arguments, they can be exported using the `export` argument:
```r
fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
```
If we want to use a package function, we need to load it on the worker using a `library()` call or reference it with `package_name::`:
```r
fx = function(x) {
    `%>%` = dplyr::`%>%`
    x %>%
        dplyr::mutate(area = Sepal.Length * Sepal.Width) %>%
        head()
}
Q(fx, x=list(iris), n_jobs=1)
#> Running sequentially ('LOCAL') ...
#> [[1]]
#>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species  area
#> 1          5.1         3.5          1.4         0.2  setosa 17.85
#> 2          4.9         3.0          1.4         0.2  setosa 14.70
#> 3          4.7         3.2          1.3         0.2  setosa 15.04
#> 4          4.6         3.1          1.5         0.2  setosa 14.26
#> 5          5.0         3.6          1.4         0.2  setosa 18.00
#> 6          5.4         3.9          1.7         0.4  setosa 21.06
```
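Equivalently, a sketch of the `library()` approach mentioned above, which attaches the package in the worker's R session:

```r
# Alternative: load dplyr on the worker via library() instead of dplyr::
fx = function(x) {
    library(dplyr)
    x %>%
        mutate(area = Sepal.Length * Sepal.Width) %>%
        head()
}
Q(fx, x=list(iris), n_jobs=1)
```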
`clustermq` can also be used as a parallel backend for `foreach`. As this is also used by `BiocParallel`, we can run those packages on the cluster as well:
```r
library(foreach)
register_dopar_cmq(n_jobs=2, memory=1024) # accepts same arguments as `workers`
foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs
#> Running sequentially ('LOCAL') ...
#> [[1]]
#> [1] 1
#>
#> [[2]]
#> [1] 1.414214
#>
#> [[3]]
#> [1] 1.732051
```
```r
library(BiocParallel)
register(DoparParam()) # after register_dopar_cmq(...)
bplapply(1:3, sqrt)
```
More examples are available in the user guide.
The following arguments are supported by `Q`:

* `fun` - The function to call. This needs to be self-sufficient (because it will not have access to the `master` environment)
* `...` - All iterated arguments passed to the function. If there is more than one, all of them need to be named
* `const` - A named list of non-iterated arguments passed to `fun`
* `export` - A named list of objects to export to the worker environment

Behavior can further be fine-tuned using the options below:
* `fail_on_error` - Whether to stop if one of the calls returns an error
* `seed` - A common seed that is combined with the job number for reproducible results
* `memory` - Amount of memory to request for the job (`bsub -M`)
* `n_jobs` - Number of jobs to submit for all the function calls
* `job_size` - Number of function calls per job. If used in combination with `n_jobs`, the latter will be the overall limit
* `chunk_size` - How many calls a worker should process before reporting back to the master. Default: every worker will report back 100 times total

The full documentation is available by typing `?Q`.
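As an illustration, here is a sketch combining several of these arguments (the function and values are invented for the example):

```r
# Hypothetical example: 1000 calls spread over at most 10 jobs, a fixed
# seed for reproducibility, 512 MB per job, and errors collected rather
# than stopping the whole run
fx = function(x) x^2
Q(fx, x=1:1000, n_jobs=10, memory=512, seed=42, fail_on_error=FALSE)
```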
There are some packages that provide high-level parallelization of R function calls on a computing cluster. A thorough comparison of features and performance is available on the wiki.
Briefly, we compare how long it takes different HPC scheduler tools to submit, run and collect function calls of negligible processing time (multiplying a numeric value by 2). This serves to quantify the maximum throughput we can reach with `BatchJobs`, `batchtools` and `clustermq`.

We find that `BatchJobs` is unable to process 10^6 calls or more but produces a reproducible `RSQLite` error. `batchtools` is able to process more function calls, but the file system practically limits it at about 10^6 calls. `clustermq` has no problems processing 10^9 calls, and is still faster than `batchtools` at 10^6 calls.
In short, use `clustermq` if you want:

* a minimal interface to run function calls as cluster jobs with little setup
* load-balanced, high-throughput processing without temporary files on network-mounted storage

Use `batchtools` if you:

* prefer a mature, file-based approach and submit well below 10^6 calls

Use Snakemake (or `flowr`, `remake`, `drake`) if:

* you want to design and run an entire workflow rather than distribute individual function calls
Don't use `batch` (last updated 2013) or `BatchJobs` (issues with SQLite on network-mounted storage).