Base API and schedulers
The main worker functions are wrapped in an R6 class with the name of QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.
The base class itself is extended by scheduler classes that add the functions required for submitting and cleaning up jobs:
+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
    |- etc.
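As an illustration, a new scheduler could be added by deriving from QSys and supplying how its jobs are submitted and cancelled. The class name, method names and shell commands in the following sketch are assumptions, not the package's exact internal API:

# Sketch of a derived scheduler; MyScheduler, submit_jobs(), cleanup() and
# the shell commands are placeholders for whatever the scheduler requires.
MyScheduler = R6::R6Class("MyScheduler",
    inherit = clustermq:::QSys,  # base class is internal to the package
    public = list(
        # submit one scheduler job per worker; each job runs the R command
        # that connects back to the master socket (shown in the next section)
        submit_jobs = function(n_jobs, worker_cmd, ...) {
            for (i in seq_len(n_jobs))
                system(paste("my_submit_cmd", shQuote(worker_cmd)))
        },
        # cancel any jobs that are still registered with the scheduler
        cleanup = function() {
            system("my_cancel_cmd my_job_ids")
            invisible(TRUE)
        }
    )
)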
A pool of workers can be created using the workers() function, which instantiates an object of the corresponding QSys-derived scheduler class. See ?workers for details.
# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)
# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$finalize())
Worker startup
For workers that are started up via a scheduler, we do not know in advance which machine they will run on. This is why every worker is started with the TCP/IP address of the master socket that will distribute work. This is achieved by a call to R that is common to all schedulers:
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
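Before submission, the {{ master }} placeholder is replaced with the ZeroMQ TCP endpoint of the master process; the hostname and port below are made up for illustration:

R --no-save --no-restore -e 'clustermq:::worker("tcp://headnode42:6124")'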
On the master’s side, we wait until a worker connects:
# this will block until a worker is ready
msg = w$receive_data()
Common data and exports
Workers will start up without any knowledge of what they should
process or how. In order to transfer initial data to the worker, we
first create and serialize a list object with the following fields:
fun - the function to call with the iterated data
const - the constant data each function call should receive
export - objects that will be exported to the workers' .GlobalEnv
rettype - character string specifying which data type to return, e.g. list or logical
common_seed - random seed for function calls; will be offset by the job ID
token - character string to identify this data set; this is optional, and an automatically generated token will be returned if none is given
# create a reusable, serialized ZeroMQ object with the common data on the master
w$set_common_data(fun, const, export, rettype, common_seed, token)
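For illustration, the call above might be filled in as follows, assuming the arguments are named exactly like the fields listed before (all values are made up):

# example: multiply each iterated value by a constant factor
w$set_common_data(
    fun = function(x, y) x * y,   # called once per row of iterated data
    const = list(y = 10),         # identical for every call
    export = list(),              # nothing extra for the workers' .GlobalEnv
    rettype = "list",
    common_seed = 123L,
    token = "common-1"
)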
Workers that connect to the master will send a list with a field token. This can be used to check if the worker already received the common data it is supposed to work on.
if (msg$token != <token>)
    w$send_common_data()
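In a concrete loop, <token> would be the token the master passed to (or got back from) set_common_data; a minimal sketch, assuming it was kept in a local variable:

common_token = "common-1"   # token used with set_common_data above
msg = w$receive_data()
if (!identical(msg$token, common_token))
    w$send_common_data()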
Iterated data
If the worker has already received the common data, we can send it a
chunk of iterated arguments to work on. These are passed as a list of
iterables, e.g. a data.frame
with a column for each
iterated argument.
It also needs to have a column with name " id " (with a leading and trailing space), which will be used to identify each call.
chunk = data.frame(arg1=1:5, arg2=5:1, ` id `=1:5, check.names=FALSE)
w$send_job_data(chunk)
If the worker has finished processing, it will send a message with the field result that is a list, containing:

result - a named rettype with results
warnings - a list with warning messages of individual calls
errors - a list with error messages of individual calls
msg = w$receive_data()
if (!is.null(msg$result)) {
    # store result here, handle errors/warnings if required
}
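One way to act on these fields, as a sketch; collecting results into a local list and forwarding warnings is a choice made here, not something the package prescribes:

results = list()
msg = w$receive_data()
if (!is.null(msg$result)) {
    # named results, presumably keyed by the " id " values of the chunk
    results[names(msg$result)] = msg$result
    for (wrn in msg$warnings)
        warning(wrn, call. = FALSE)
    if (length(msg$errors) > 0)
        stop("worker errors: ", paste(unlist(msg$errors), collapse="; "))
}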
Custom calls
Apart from sending common and iterated data that the worker will
process in chunks, it is also possible to send arbitrary calls that it
will evaluate. It needs the following fields:
expr - the expression to be evaluated
env - list with all additional objects required to perform the call
ref - an identifier for the call; will default to the expression itself
w$send_call(expr, env=list(...), ref="mycall1")
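A concrete version of this call, using env to supply the objects the expression refers to (whether the expression is captured unevaluated or needs to be quoted first is an assumption of this sketch):

w$send_call(sum(x) + offset, env=list(x=1:10, offset=5), ref="sum1")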
Main event loop
Putting the above together in an event loop, we get what is
essentially implemented in master
.
w = workers(3)
on.exit(w$finalize())
while (we have new work to send) {
    msg = w$receive_data()

    if (!is.null(msg$result)) {
        # handle result
    }

    if (msg$token != <token>)
        w$send_common_data()
    else
        w$send_job_data(...)
}
# if proper cleanup is successful, cancel kill-on-exit
if (w$cleanup())
    on.exit()
A loop of a similar structure can be used to extend clustermq. As an example, this was done by drake using common data and custom calls only (no iterated chunks).