0

I am trying to test future.batchtools for parallelisation in R.

I have a small test job (run_futurebatchtools_job.R) as:

library(future)
library(future.batchtools)

# Set up the future plan to use SLURM via batchtools.
# The entries of `resources` are the values the job template reads as
# `resources$partition`, `resources$walltime`, `resources$memory` and
# `resources$ntasks`.
plan(batchtools_slurm, 
     template = "Scripts/batch_tools_test/slurm_future_config.tmpl", 
     # walltime is in seconds: the template divides it by 3600 to build --time.
     # memory is in megabytes: the template appends "MB" to build --mem.
     resources = list( partition = "small",
                       walltime = 86400,
                       memory = 1024,
                       ntasks = 1)
)

# Toy workload: pause for `x` seconds, then return the square of `x`.
my_fun <- function(x) {
  # Stand-in for an expensive computation
  Sys.sleep(time = x)
  squared <- x^2
  squared
}

# Sleep durations (in seconds) handed to my_fun, one per element
input_values <- seq(100L, 150L)

# Evaluate my_fun over every input through the active future plan
results <- future.apply::future_lapply(X = input_values, FUN = my_fun)

# Show the computed values
print(results)

# Show any warnings collected along the way
print(warnings())

The slurm config file (slurm_future_config.tmpl) is:

#!/bin/bash
# Job-script template. The placeholders below are filled in when a job is
# submitted; the resources$... values come from the `resources` list passed
# to plan() in the R script.
#SBATCH --job-name=<%= job.name %>
#SBATCH --output=<%= log.file %>
#SBATCH --ntasks=<%= resources$ntasks %>
# memory is a plain number; the MB suffix is appended here
#SBATCH --mem=<%= resources$memory %>MB
#SBATCH --partition=<%= resources$partition %>
# walltime (seconds) is divided by 3600 and rounded up, i.e. emitted as whole hours
#SBATCH --time=<%= ceiling(resources$walltime / 3600) %>:00:00

# Start from a clean module environment, then load R
module purge 
module load r/4.3.3

# Execute the job collection identified by the `uri` placeholder
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

The job runs as expected when I run it interactively on the login node: it spawns child jobs that are executed on Slurm. But when I submit this parent job as a Slurm job (run_futurebatchtools.sh), it fails.

#!/bin/bash
# Parent job: runs the driver R script inside a Slurm allocation
# (1 task, 1 hour, 2G of memory, partition "small"); its log goes to
# futurebatchtools_test.log.
#SBATCH --job-name=futurebatchtools_test
#SBATCH --output=futurebatchtools_test.log
#SBATCH --ntasks=1
#SBATCH --time=01:00:00
#SBATCH --mem=2G
#SBATCH --partition=small

# Load R
module load  r/4.3.3

# Run your R script
# (the path is relative, so it is resolved against the job's working directory)
Rscript Scripts/batch_tools_test/run_futurebatchtools_job.R

Since it will not be possible to run the job on the login node, I need to submit it to Slurm as well. How can I do that? I was initially trying to use batchtools directly. What I observed was that the parent job exits the cluster, leaving the child jobs running. Is this issue inherited by future.batchtools?

2 Answers 2

0

I have been dealing with a similar issue - calling an R script repeatedly with a slurm process. Here's the solution I landed on. Perhaps this will work; if using batchtools is a must, please disregard. I have an R script that looks as follows:

# Command-line arguments: <rep_id> <workdir>
# (the name `cli_args` avoids shadowing base::args)
cli_args <- commandArgs(trailingOnly = TRUE)
if (length(cli_args) < 2L) {
  stop("Usage: Rscript sim_new.r <rep_id> <workdir>", call. = FALSE)
}
rep_id <- as.integer(cli_args[1])         # from SLURM_ARRAY_TASK_ID
workdir <- cli_args[2]
if (is.na(rep_id)) {
  stop("rep_id must be an integer, got: ", cli_args[1], call. = FALSE)
}

# Reproducible seed mapping: every replication gets its own seed
base_seed <- 12345L
set.seed(base_seed + rep_id)

# ---- your simulation here ----
# One run produces a single data frame with results across your 3000 conditions
# e.g., results <- run_all_conditions()  # data.frame with, say, 3000 rows

# Example placeholder:
library(lifeguard)
# Full factorial design: one row per combination of the simulation settings.
eg <- expand.grid(
  rho_x = c(0, 0.5, 0.75, 0.9, 1),
  rho_y = c(0, 0.5, 0.75, 0.9, 1),
  rho_z = c(0, 0.5),
  k = 1:4,
  n = c(25, 50, 75, 100),
  b_1 = c(0, 1),
  b_2 = c(0, 0.5),
  model = c("adl", "ecm"),
  stringsAsFactors = FALSE
)

# Simulate one data set and fit one dynamic regression to it.
#
# Arguments:
#   rho_x, rho_y, rho_z  autoregressive coefficients of x, y and the z columns
#   k                    number of z series
#   n                    number of time points
#   b_1, b_2             coefficients on x[t] and x[t-1] in the equation for y
#   model                "adl" or "ecm"; selects which regression is fitted
#   ...                  accepted but not used in the body
#
# Returns the value of lrm_ci() for the fitted model.
# NOTE(review): lag_n(), diff_n() and lrm_ci() are not defined in this script;
# presumably they come from the lifeguard package loaded above -- confirm.
do_sim <- function(rho_x, rho_y, rho_z, k, n, b_1, b_2, model = c("adl", "ecm"), ...){
  # Debugging aid: uncomment to step through the body with row j of `eg`
  # j <- 1792
  # rho_x <- eg$rho_x[j]
  # rho_y <- eg$rho_y[j]
  # rho_z <- eg$rho_z[j]
  # k <- eg$k[j]
  # n <- eg$n[j]
  # b_1 <- eg$b_1[j]
  # b_2 <- eg$b_2[j]
  mdl <- match.arg(model)
  
  # n x (k+1) matrix of shocks: zero mean, identity covariance (empirical = TRUE).
  # Column 1 drives x, the remaining k columns drive z.
  e <- MASS::mvrnorm(n, rep(0, k+1), diag((k+1)), empirical=TRUE)
  # z: k series started at 0, each z[t] = rho_z * z[t-1] + shock
  z <- matrix(NA, n, k)
  z[1, 1:k] <- 0
  for(t in 2:n){
    z[t,1:k] <- rho_z*z[(t-1),1:k] + e[t,-1]
  }
  colnames(z) <- paste0("z", 1:ncol(z))
  # x and y both start at 0; the remaining entries are filled in below
  x <- y <- c(0, rep(NA, (n-1)))
  # x[t] = rho_x * x[t-1] + shock
  for(t in 2:n){
    x[t] <- rho_x*x[(t-1)] + e[t,1]
  }
  # y[t] = rho_y * y[t-1] + b_1 * x[t] + b_2 * x[t-1] + fresh N(0, 1) noise
  for(t in 2:n){
    y[t] <- rho_y*y[(t-1)] + b_1*x[t] + b_2*x[(t-1)] + rnorm(1,0,1)
  }
  d <- data.frame(x=x, y=y)
  d <- cbind(d, z)
  # Regressor names: every column except the response ("x", "z1", ..., "zk")
  nx <- setdiff(names(d), "y")
  if(mdl == "adl"){
    # y on lag_n(y, 1) plus lag_n(<regressor>, 0:1) for every regressor
    form <- reformulate(c("lag_n(y,1)", sprintf("lag_n(%s, 0:1)", nx)), response="y")
    adl <- lm(form, data=d)
    # xc maps each regressor to two consecutive indices: x -> 3:4, z1 -> 5:6, ...
    # (presumably coefficient positions, with 1 = intercept and 2 = lagged y -- confirm)
    xc <- list()
    xc$x <- 3:4
    for(i in 1:k){
      mxc <- max(c(unlist(xc)))
      xc[[paste0("z", i)]] <- mxc + 1:2
    }
    lrm_ci(adl, y_coefs = 2, x_coefs=xc)
  }else{
    # diff_n(y, 0:1) on lag_n(y, 1), lag_n(<regressor>, 1) and diff_n(<regressor>, 0:1)
    ecm_form <- reformulate(c("lag_n(y,1)", sprintf("lag_n(%s, 1)", nx), sprintf("diff_n(%s, 0:1)", nx)), response = "diff_n(y, 0:1)")
    ecm <- lm(ecm_form, data=d)
    # xc maps each regressor to a single index: 3, 4, ..., in the order of nx
    xc <- as.list(1:length(nx) + 2)
    names(xc) <- nx
    lrm_ci(ecm, y_coefs = 2, x_coefs=xc, modtype="ecm")
  }
  
}

# Run do_sim() once per design row; the grid columns are matched to its
# arguments by name.
sim_list <- purrr::pmap(.l = eg, .f = do_sim)

# Stack the per-row results, recording the originating row in column "row",
# and keep only the rows for the variables "x" and "z1".
stacked <- dplyr::bind_rows(sim_list, .id = "row")
out <- dplyr::filter(stacked, vbl %in% c("x", "z1"))

# One file per replication, so concurrent tasks never write to the same file.
# Parquet is fast/small; install.packages("arrow") if needed
outfile <- file.path(workdir, sprintf("rep_%05d.parquet", rep_id))
arrow::write_parquet(x = out, sink = outfile)

The script takes two arguments at the command line (a job array number and a working directory, to be determined below). Otherwise, it just does the simulation in the code and writes the resulting data to a parquet file. The .sh file looks like this:

#!/bin/bash
# Slurm array job: runs sim_new.r once per array task (indices 0-9999, at
# most 100 tasks at a time). Each task writes into its own scratch directory
# and then moves the result to the shared output directory.
# NOTE: the logs/ directory named in -o/-e must exist before submission.
#SBATCH -J sim
#SBATCH --account=uid
#SBATCH --array=0-9999%100
#SBATCH -c 1
#SBATCH --mem=512M
#SBATCH -t 00:15:00
#SBATCH -o logs/sim_%A_%a.out
#SBATCH -e logs/sim_%A_%a.err

module load r/4.5.0   

# write all files initially to a temporary directory unique to this array task
WORKDIR="${TMPDIR:-/tmp}/sim_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}"
mkdir -p "$WORKDIR" || exit 1
# Always remove the scratch directory, whether the task succeeds or fails
trap 'rm -rf "$WORKDIR"' EXIT

# Stop here if the simulation fails, so no partial output is published
Rscript sim_new.r "$SLURM_ARRAY_TASK_ID" "$WORKDIR" || exit 1

# Move the finished output to the shared location
# (if scratch and $HOME are on different filesystems, mv is a copy + delete)
OUTDIR="$HOME/project/darmst46/lrm_coverage/output_new"
mkdir -p "$OUTDIR" || exit 1
mv "$WORKDIR"/rep_*.parquet "$OUTDIR"/

I use --array=0-9999%100 to run the simulation 10,000 times, with at most 100 array tasks running at once. The -t option is the time allocated to a single run (not to all the runs together). $SLURM_ARRAY_TASK_ID is an environment variable set by Slurm that holds the index of the current array task (here 0–9999) and is passed to the R script as the replication id; $SLURM_ARRAY_JOB_ID is the job ID of the array as a whole; and $WORKDIR is a scratch directory built from both, so each array task gets its own.

Sign up to request clarification or add additional context in comments.

Comments

0

Here is a solution to run nested slurm jobs using R batchtools. The method seems to work but with some caveats.

run_batchtools_job.R: The main script defining the job to be executed in parallel. This script will spawn the children jobs.

# library() fails immediately if batchtools is missing; require() would only
# return FALSE and let the script fail later with a less helpful error.
library(batchtools)

# Define the registry directory
reg_dir <- "batchtools_slurm_registry"

# Clear old registry
if (dir.exists(reg_dir)) {
  unlink(reg_dir, recursive = TRUE)
  # Pause before recreating the directory
  # (presumably to let a shared file system catch up -- confirm)
  Sys.sleep(30)
}

# Make new registry
reg <- makeRegistry(file.dir = reg_dir,
                    seed = 5081,
                    conf.file = NA,
                    packages = c("unixtools"))

# Submit through Slurm using the job template, with array jobs enabled
reg$cluster.functions <- makeClusterFunctionsSlurm(template = "slurm_batchtools_config.tmpl",
                                                   array.jobs = TRUE,
                                                   scheduler.latency = 60,
                                                   fs.latency = 60)


# Toy workload: pause for `x` seconds, then return x squared plus `y`.
my_fun <- function(x, y) {
  # Stand-in for an expensive computation
  Sys.sleep(time = x)
  shifted_square <- x^2 + y
  shifted_square
}

# Define one job per value of x; y is passed unchanged to every job
ids <- batchMap(reg = reg,
                fun = my_fun,
                x = 100:150,
                more.args = list(y = 1))

# Submit jobs to slurm
result <- submitJobs(ids = ids,
                     reg = reg,
                     resources = list(partition = "small",
                                      time = "1-00:00:00",
                                      memory = "1G",
                                      max.concurrent.jobs = 3,
                                      ntasks = 1))

# Keep the parent alive until every child job has finished or failed.
# waitForJobs() returns FALSE both when jobs expire and when a job ends with
# an error. Looping on its return value alone would therefore never end after
# a job error, so stop polling once every job is done or errored.
repeat {
  if (waitForJobs(ids = ids, reg = reg, timeout = Inf, expire.after = 10000)) {
    break
  }
  settled <- c(findDone(ids = ids, reg = reg)$job.id,
               findErrors(ids = ids, reg = reg)$job.id)
  if (all(ids$job.id %in% settled)) {
    break
  }
  Sys.sleep(60)
}

print("Job status::")
print(getStatus(ids = ids, reg = reg))

# Check for errors (jobs without a result count as errors too)
error_msg <- getErrorMessages(ids = ids, reg = reg, missing.as.error = TRUE)
if (any(error_msg$error)) {
  # Print the details first: nothing after stop() is executed
  print(error_msg)
  stop("Jobs failed with errors!")
}

# Collect the results of all jobs into a list
result <- reduceResultsList(ids = ids, reg = reg)
print(result)

slurm_batchtools_config.tmpl: The template file for slurm job submission.

#!/bin/bash
# Job-script template. The placeholders below are filled in at submission
# time; the resources$... values come from the `resources` list passed to
# submitJobs() and are inserted as given (e.g. memory "1G", time "1-00:00:00").
#SBATCH --job-name=<%= job.name %>
#SBATCH --output=<%= log.file %>
#SBATCH --ntasks=<%= resources$ntasks %>
#SBATCH --mem=<%= resources$memory %>
#SBATCH --partition=<%= resources$partition %>
#SBATCH --time=<%= resources$time %>

# Load R on the compute node
module load  r/4.3.3

# Execute the job collection identified by the `uri` placeholder
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

sbatch run_batchtools.sh: Submits the R script as parent slurm job.

#!/bin/bash
# Parent job: runs the driver R script inside a Slurm allocation
# (1 task, 1 hour, 2G of memory, partition "small"); its log goes to
# batchtools_test.log.
#SBATCH --job-name=batchtools_test
#SBATCH --output=batchtools_test.log
#SBATCH --ntasks=1
#SBATCH --time=01:00:00
#SBATCH --mem=2G
#SBATCH --partition=small

# Load R
module load  r/4.3.3

# Run your R script
# (the path is relative, so it is resolved against the job's working directory)
Rscript run_batchtools_job.R

Ideally, run_batchtools_job.R should be directly run on the server login node. It will submit the jobs to be executed in parallel and then wait for them to be completed/terminated. In this case, max.concurrent.jobs is also honored and only the said number of jobs are executed at a time.

But when run_batchtools_job.R is submitted as a slurm job via sbatch run_batchtools.sh it does not honor the max.concurrent.jobs condition. It will list all the jobs in slurm and execute them as per resource availability. This could clog the server and other users might not be able to use the server.

An option could be to limit the number of jobs by chunking/grouping them. This allows only n.chunks jobs to be submitted to Slurm, each one running a group of about total_jobs/n.chunks of the original jobs. The drawback is that each Slurm job then requires more resources.

# Add a `chunk` column that assigns the job ids to 10 groups, without shuffling.
# Run this before submitJobs(); presumably jobs sharing a chunk id are then
# submitted together as one Slurm job -- confirm in the batchtools docs.
ids[, chunk := chunk(job.id, n.chunks = 10, shuffle = FALSE)]

Also, I am not sure if there is a better way to stop the parent job from crashing after submitting all the child jobs. The following line does the job.

# Keep the parent alive until every child job has finished or failed.
# waitForJobs() also returns FALSE when a job ends with an error, so looping
# on its return value alone would never end in that case.
repeat {
  if (waitForJobs(ids = ids, reg = reg, timeout = Inf, expire.after = 10000)) {
    break
  }
  settled <- c(findDone(ids = ids, reg = reg)$job.id,
               findErrors(ids = ids, reg = reg)$job.id)
  if (all(ids$job.id %in% settled)) {
    break
  }
  Sys.sleep(60)
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.