parallel_computation.Rmd
If an analysis or modelling task relies on having access to complete time series data, the complete sequential data has to be read into memory, but not for all gridcells of geospatial raster data. This lends itself to parallelising the task across gridcells. This not only speeds up the computation but may also avoid critical memory limitation.
The workflow proposed here is the following:
This is demonstrated below.
Convert geospatial raster data (e.g., saved as NetCDF, potentially as a list of NetCDF files for multiple time slices) into a tidy data frame. To avoid the memory limitation, we write tidy data into multiple files, separated by longitudinal bands.
This is demonstrated and explained in the vignette map2tidy example. The demo data are raster files of 30 x 30 in latitude and longitude.
library(dplyr)
library(map2tidy)
library(lubridate)
path <- file.path(system.file(package = "map2tidy"),"extdata")
files <- list.files(path, pattern = "demo_data_2017_month", full.names = TRUE)
outdir <- "analysis-output"; dir.create(outdir)
# outdir <- tempdir() # for running this in the vignette
res_tidy <- map2tidy(
nclist = files,
varnam = "et",
lonnam = "lon",
latnam = "lat",
timenam = "time",
do_chunks = TRUE,
outdir = outdir,
fileprefix = "demo_data_2017",
ncores = 3,
overwrite = TRUE
)
Write the analysis or modelling task as a function,
add_loess()
in the example below, that runs with the time
series data for a single gridcell as a data frame as its first argument
and returns a data frame.
This can be anything. Here, we perform a LOESS spline on the time
series of the column et
and add its result as a new column
to the data frame. The function requires the time series data frame as
its input. Write the function into a file
R/add_loess.R
.
add_loess <- function(df){
df <- df |>
mutate(year_dec = lubridate::decimal_date(lubridate::ymd(datetime))) %>%
dplyr::mutate(loess = stats::loess( et ~ year_dec,
data = .,
span = 0.05 )$fitted)
return(df)
}
# # test it:
# df <- readRDS(file.path(outdir, "demo_data_2017_LON_+000.125.rds")) |>
# slice(1) |> unnest(data) |> select(datetime, et)
# add_loess(df)
Wrap that function into another function
add_loess_byLON()
that takes a list of longitude strings as
its first argument and loops over gridcells (i.e. latitude values)
within that longitudinal band. The data frame df
is already
nested. Write modified data for this longitudinal band to file again.
Append this function to the file R/add_loess.R
thus
containing add_loess()
and
add_loess_byLON()
:
add_loess <- function(df){
df <- df |>
mutate(year_dec = lubridate::decimal_date(lubridate::ymd(datetime))) %>%
dplyr::mutate(loess = stats::loess( et ~ year_dec,
data = .,
span = 0.05 )$fitted)
return(df)
}
add_loess_byLON <- function(LON_str, indir){
# read from file that contains tidy data for a single longitudinal band
filnam <- file.path(indir, paste0("demo_data_2017_", LON_str, ".rds"))
df <- readr::read_rds(filnam)
df <- df |>
# Uncomment code below to nest data by gridcell, if not already nested.
# # group data by gridcells and wrap time series for each gridcell into a new
# # column, by default called 'data'.
# dplyr::group_by(lon, lat) |>
# tidyr::nest() |>
# apply the custom function on the time series data frame separately for
# each gridcell.
dplyr::mutate(data = purrr::map(data, ~add_loess(.)))
# write (complemented) data to file
readr::write_rds(df,
file.path(indir, paste0("demo_data_2017_", LON_str, "_COMPLEMENTED_xz.rds")),
compress = "xz") # xz seems most efficient
}
# # test it:
# LON <- "LON_+000.125"
# add_loess_byLON(LON, outdir)
# readRDS(file.path(outdir, "demo_data_2017_LON_+000.125_COMPLEMENTED.rds")) |>
# slice(1) |> unnest(data)
Write an R script that distributes jobs for parallel computation,
where each job processes data of one chunk and makes a number of calls
to add_loess_byLON()
. For example, if we have 30
longitudinal bands and 3 chunks, each chunk makes 10 calls. Write this
script so that it can be run from the shell and that it can deal with
three arguments:
Such an R script would look like this:
#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE) # to receive arguments from the shell
# e.g. args <- list(1,4,30)
library(map2tidy)
library(dplyr)
source("../analysis/add_loess.R") # Load the workhorse function
indir <- "../analysis/analysis-output/" # Define the directory where the processed rds file are
list_of_files <- list.files(indir, pattern = "demo_data_2017_(LON_[0-9.+-]*).rds")
list_of_LON_str <- gsub(".*(LON_[0-9.+-]*).*.rds","\\1", list_of_files) # extract LONGITUDES from the list of filenames
vec_index <- map2tidy::get_index_by_chunk(
as.integer(args[1]),
as.integer(args[2]),
as.integer(args[3]))
# ifelse(is.null(args[3]), length(list_of_files), as.integer(args[3]))) # (optional, if not provided treats all available files)
print("Running analysis for longitudes:")
print(list_of_LON_str[vec_index])
# get all available cores on each node
# ncores <- 3
ncores <- length(parallelly::availableWorkers())
# set up the cluster, sending required objects to each core
cl <- multidplyr::new_cluster(ncores) %>%
multidplyr::cluster_library(c("map2tidy",
"dplyr",
"purrr",
"tidyr",
"readr",
"here")) %>%
multidplyr::cluster_assign(add_loess_byLON = add_loess_byLON,
indir = indir)
# distribute computation across the cores, calculating for all longitudinal
# indices of this chunk
out <- tibble(LON_str = list_of_LON_str[vec_index]) %>%
multidplyr::partition(cl) %>% # this distributes across threads/cores
dplyr::mutate(out = purrr::map( LON_str,
~add_loess_byLON(., indir)))
Save the script as analysis/add_loess_bychunk.R
.
An example shell script to send four jobs on HPC (here: ETH Euler) looks like this:
## #!/bin/bash
## njobs=4
## nlon=30
## for ((n=1;n<=${njobs};n++)); do
## echo "Submitting chunk number $n ..."
## bsub -W 72:00 -u bestocke -J "job_name $n" -R "rusage[mem=10000]" "Rscript vanilla analysis/add_loess_bychunk.R $n $njobs $nlon"
## done
And a SLURM batch array script for UniBe UBELIX that sends the code
as an array job to four nodes, to be stored as
analysis/batch.sh
. This is sent to the job scheduler as:
ssh ubelix
,
sbatch path/../../analysis/batch.sh
#! /usr/bin/bash -l
#SBATCH --job-name="map2tidy distributed"
#SBATCH --time=20:00:00
#SBATCH --partition=icpu-stocker # if you have access, this gives you priority
#SBATCH --array=1-4 # specifies the slurm array job with the number of tasks
#SBATCH --cpus-per-task=20 # nr of threads, used for shared memory jobs that run locally on a single compute node (default: 1)
#SBATCH --mail-user=your.email@unibe.ch
#SBATCH --mail-type=none # when do you want to get notified: none, all, begin, end, fail, requeue, array_tasks
#SBATCH --chdir=[[TODO path/../../analysis]] # define here the working directory which contains your R-script, and where the output will be written to.
export SBATCH_EXPORT=NONE # source: https://hpc-unibe-ch.github.io/slurm/submission.html#exportnone
export SLURM_EXPORT_ENV=ALL # source: https://hpc-unibe-ch.github.io/slurm/submission.html#exportnone
echo "Started on: $(date --rfc-3339=seconds)"
echo "Hostname: $(hostname)"
echo "Working directory: $PWD" # Is most likely the HOME directory. Allows to check in the log.
module load R
# remotes::install_github("geco-bern/map2tidy") # needed to be done once
## Run the R script for specific indices defined by the ARRAY index
Rscript add_loess_bychunk.R $SLURM_ARRAY_TASK_ID $SLURM_ARRAY_TASK_COUNT 30 # This only runs the first 30 longitude values instead of all longitudes
# NOTE: If you don't provide a chdir argument to SLURM, need provide to full path from the folder from which executing sbatch.
echo "Finished on: $(date --rfc-3339=seconds)"
## Started on: 2025-02-11 09:20:35+00:00
## Hostname: fv-az1344-226
## Working directory: /home/runner/work/map2tidy/map2tidy/vignettes
## sh: 17: module: not found
## Fatal error: cannot open file 'add_loess_bychunk.R': No such file or directory
## Finished on: 2025-02-11 09:20:35+00:00