Add an optional `np` argument to submitJobs() and forward it to the cluster
functions' submitJob(); the OpenLava backend passes it to bsub as "-n <np>"
when np > 1. Also add an Rmpi example template and an external test script.

NOTE(review): the line breaks of this patch had been collapsed; they are
restored here. Leading indentation of context/added lines is reconstructed
and may differ from the target files -- apply with a whitespace-tolerant
option if a hunk fails to match.

diff --git a/R/clusterFunctionsOpenLava.R b/R/clusterFunctionsOpenLava.R
index 8894745..4dbb6d6 100644
--- a/R/clusterFunctionsOpenLava.R
+++ b/R/clusterFunctionsOpenLava.R
@@ -30,10 +30,16 @@ makeClusterFunctionsOpenLava = function(template.file, list.jobs.cmd = c("bjobs"
   # or a non-existent job ID is entered.
   Sys.setenv(LSB_BJOBS_CONSISTENT_EXIT_CODE = "Y")
 
-  submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs) {
+  submitJob = function(conf, reg, job.name, rscript, log.file, job.dir, resources, arrayjobs, np = 1L) {
     outfile = cfBrewTemplate(conf, template, rscript, "job")
     # returns: "Job <128952> is submitted to default queue ."
-    res = runOSCommandLinux("bsub", stdin = outfile, stop.on.exit.code = FALSE)
+    if (np > 1L) {
+      args <- paste(c("-n ", np), collapse="")
+      res = runOSCommandLinux("bsub", args = args, stdin = outfile, stop.on.exit.code = FALSE)
+    } else {
+      res = runOSCommandLinux("bsub", stdin = outfile, stop.on.exit.code = FALSE)
+    }
+
     # FIXME filled queues
     if (res$exit.code > 0L) {
       cfHandleUnknownSubmitError("bsub", res$exit.code, res$output)
diff --git a/R/submitJobs.R b/R/submitJobs.R
index 8d120c7..5178b2c 100644
--- a/R/submitJobs.R
+++ b/R/submitJobs.R
@@ -64,7 +64,7 @@
 #' chunked = chunk(getJobIds(reg), n.chunks = 2, shuffle = TRUE)
 #' submitJobs(reg, chunked)
 submitJobs = function(reg, ids, resources = list(), wait, max.retries = 10L, chunks.as.arrayjobs = FALSE,
-  job.delay = FALSE, progressbar = TRUE) {
+  job.delay = FALSE, progressbar = TRUE, np = 1L) {
   ### helper function to calculate the delay
   getDelays = function(cf, job.delay, n) {
     if (is.logical(job.delay)) {
@@ -215,6 +215,7 @@ submitJobs = function(reg, ids, resources = list(), wait, max.retries = 10L, chu
         batch.result = cf$submitJob(
           conf = conf,
           reg = reg,
+          np = np,
           job.name = sprintf("%s-%i", reg$id, id1),
           rscript = rscripts[i],
           log.file = getLogFilePath(reg, id1),
diff --git a/examples/cfOpenLava/openlava-rmpi.tmpl b/examples/cfOpenLava/openlava-rmpi.tmpl
new file mode 100644
index 0000000..63eb30e
--- /dev/null
+++ b/examples/cfOpenLava/openlava-rmpi.tmpl
@@ -0,0 +1,7 @@
+#BSUB -J <%= job.name %> # name of the job
+#BSUB -o <%= log.file %> # output is sent to logfile, stdout + stderr by default
+
+# very simple system, no resources....
+
+# we merge R output with stdout from OpenLava, which gets then logged via -o option
+/opt/openlava/bin/openmpi-mpirun -np 1 R CMD BATCH "<%= rscript %>" /dev/stdout
diff --git a/external_tests/openlava-rmpi.R b/external_tests/openlava-rmpi.R
new file mode 100644
index 0000000..3b7ccbd
--- /dev/null
+++ b/external_tests/openlava-rmpi.R
@@ -0,0 +1,27 @@
+library(BatchJobs)
+
+conf = BatchJobs:::getBatchJobsConf()
+
+conf$cluster.functions = makeClusterFunctionsOpenLava("/home/clusteradmin/BatchJobs/examples/cfOpenLava/openlava-rmpi.tmpl")
+
+reg = makeRegistry(id = "BatchJobsRmpiExample")
+
+f = function(x) {
+  library("Rmpi")
+
+  slaveno <- mpi.universe.size() - 1
+  if (slaveno < 1) {
+    slaveno <- 1
+  }
+
+  mpi.spawn.Rslaves(nslaves=slaveno)
+  mpi.remote.exec(paste("I am",mpi.comm.rank(),"of",mpi.comm.size()))
+  mpi.close.Rslaves()
+  mpi.exit()
+
+}
+batchMap(reg, f, 1)
+submitJobs(reg, np=2)
+showStatus(reg)
+waitForJobs(reg)
+showStatus(reg)