If an analysis or modelling task relies on access to the complete time series, the full sequential data has to be read into memory, but only for the gridcell being processed, not for all gridcells of the geospatial raster data at once. This lends itself to parallelising the task across gridcells, which not only speeds up the computation but may also avoid critical memory limitations.

The workflow proposed here is the following:

  • Convert geospatial raster data into a tidy data frame.
  • Write the analysis or modelling task as a function.
  • Wrap that function so that it loops over the gridcells within a longitudinal band.
  • Write an R script that distributes jobs.

This is demonstrated below.

Make data tidy

Convert geospatial raster data (e.g., saved as NetCDF, potentially as a list of NetCDF files for multiple time slices) into a tidy data frame. To avoid the memory limitation, we write tidy data into multiple files, separated by longitudinal bands.

This is demonstrated and explained in the map2tidy example vignette. The demo data are raster files of 30 × 30 cells in latitude and longitude.

library(dplyr)
library(map2tidy)
library(lubridate)
path <- file.path(system.file(package = "map2tidy"),"extdata")
files <- list.files(path, pattern = "demo_data_2017_month", full.names = TRUE)
outdir <- "analysis-output"; dir.create(outdir)
# outdir <- tempdir() # for running this in the vignette
res_tidy <- map2tidy(
  nclist = files, 
  varnam = "et",
  lonnam = "lon", 
  latnam = "lat", 
  timenam = "time", 
  do_chunks = TRUE,
  outdir = outdir, 
  fileprefix = "demo_data_2017", 
  ncores = 3,
  overwrite = TRUE
  )
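
Each file written by map2tidy contains the tidy data for one longitudinal band, with one row per gridcell and the time series nested in the list column data. As a quick check, one of the files can be read back (the file name below corresponds to the first longitudinal band of the demo data; adjust it if your output differs):

readr::read_rds(file.path(outdir, "demo_data_2017_LON_+000.125.rds")) |>
  dplyr::slice(1) |>
  tidyr::unnest(data) |>
  head()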

Define function

Write the analysis or modelling task as a function (add_loess() in the example below) that takes the time series of a single gridcell as a data frame as its first argument and returns a data frame.

This can be anything. Here, we fit a LOESS curve to the time series in the column et and add the fitted values as a new column to the data frame. The function takes the time series data frame as its input. Write the function into a file R/add_loess.R.

add_loess <- function(df){
  df <- df |> 
    dplyr::mutate(year_dec = lubridate::decimal_date(lubridate::ymd(datetime))) %>%
    # the magrittr pipe (%>%) is used for this step so that the data frame can
    # be referred to as `.` inside stats::loess()
    dplyr::mutate(loess = stats::loess( et ~ year_dec,
                                        data = ., 
                                        span = 0.05 )$fitted)
  return(df)
}
# # test it:
# df <- readRDS(file.path(outdir, "demo_data_2017_LON_+000.125.rds")) |>
#   slice(1) |> unnest(data) |> select(datetime, et)
# add_loess(df)

By longitudinal band

Wrap that function into another function add_loess_byLON() that takes a longitude string as its first argument and loops over the gridcells (i.e. latitude values) within that longitudinal band. The data frame df read from file is already nested by gridcell, with the time series in the list column data. The function writes the modified data for this longitudinal band back to file. Append this function to the file R/add_loess.R, so that it contains both add_loess() and add_loess_byLON():

add_loess <- function(df){
  df <- df |> 
    dplyr::mutate(year_dec = lubridate::decimal_date(lubridate::ymd(datetime))) %>%
    # the magrittr pipe (%>%) is used for this step so that the data frame can
    # be referred to as `.` inside stats::loess()
    dplyr::mutate(loess = stats::loess( et ~ year_dec,
                                        data = ., 
                                        span = 0.05 )$fitted)
  return(df)
}

add_loess_byLON <- function(LON_str, indir){
  
  # read from file that contains tidy data for a single longitudinal band
  filnam <- file.path(indir, paste0("demo_data_2017_", LON_str, ".rds"))
  
  df <- readr::read_rds(filnam)
  
  df <- df |> 
    
    # Uncomment code below to nest data by gridcell, if not already nested.
    # # group data by gridcells and wrap time series for each gridcell into a new
    # # column, by default called 'data'.
    # dplyr::group_by(lon, lat) |> 
    # tidyr::nest() |> 
    
    # apply the custom function on the time series data frame separately for 
    # each gridcell.
    dplyr::mutate(data = purrr::map(data, ~add_loess(.)))
  
  # write (complemented) data to file
  readr::write_rds(df, 
                   file.path(indir, paste0("demo_data_2017_", LON_str, "_COMPLEMENTED_xz.rds")),
                   compress = "xz") # xz seems most efficient
}
# # test it:
# LON <- "LON_+000.125"
# add_loess_byLON(LON, outdir)
# readRDS(file.path(outdir, "demo_data_2017_LON_+000.125_COMPLEMENTED_xz.rds")) |>
#   slice(1) |> unnest(data)

Distribute jobs

Write an R script that distributes jobs for parallel computation, where each job processes the data of one chunk and makes a number of calls to add_loess_byLON(). For example, with 30 longitudinal bands and 3 chunks, each chunk makes 10 calls. Write this script so that it can be run from the shell and takes three arguments:

  • an index for the chunk
  • the total number of chunks
  • the total number of longitudinal bands to treat
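
The share of longitudinal bands handled by each chunk is determined with map2tidy::get_index_by_chunk(), which, as used in the script below, takes the chunk index, the total number of chunks, and the total number of elements, and returns the indices assigned to that chunk:

# indices of the longitudinal bands assigned to chunk 1 of 3, out of 30 bands
map2tidy::get_index_by_chunk(1, 3, 30)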

Such an R script would look like this:

#!/usr/bin/env Rscript
args <- commandArgs(trailingOnly = TRUE) # to receive arguments from the shell
                                         # e.g. args <- c("1", "4", "30") for interactive testing
library(map2tidy)
library(dplyr)

source("../analysis/add_loess.R")          # Load the workhorse function
indir <- "../analysis/analysis-output/"    # Define the directory where the processed rds file are

list_of_files   <- list.files(indir, pattern = "demo_data_2017_LON_[0-9.+-]*\\.rds$")
list_of_LON_str <- gsub(".*(LON_[0-9.+-]*)\\.rds", "\\1", list_of_files) # extract the longitude strings from the file names
vec_index <- map2tidy::get_index_by_chunk(
  as.integer(args[1]),
  as.integer(args[2]),
  as.integer(args[3]))
  # ifelse(is.na(args[3]), length(list_of_files), as.integer(args[3]))) # (optional: if not provided, treat all available files)

print("Running analysis for longitudes:")
print(list_of_LON_str[vec_index])

# get all available cores on each node
# ncores <- 3
ncores <- length(parallelly::availableWorkers())

# set up the cluster, sending required objects to each core
cl <- multidplyr::new_cluster(ncores) %>%
  multidplyr::cluster_library(c("map2tidy",
                                "dplyr",
                                "purrr",
                                "tidyr",
                                "readr",
                                "here")) %>%
  multidplyr::cluster_assign(add_loess_byLON = add_loess_byLON,
                             indir           = indir)

# distribute computation across the cores, calculating for all longitudinal
# indices of this chunk
out <- tibble(LON_str = list_of_LON_str[vec_index]) %>%
  multidplyr::partition(cl) %>% # this distributes across threads/cores
  dplyr::mutate(out = purrr::map( LON_str,
                                  ~add_loess_byLON(., indir)))

Save the script as analysis/add_loess_bychunk.R.
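
Before submitting to a scheduler, the script can be tested directly from the shell, for example with Rscript --vanilla analysis/add_loess_bychunk.R 1 4 30 to process the first of four chunks covering 30 longitudinal bands.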

An example shell script that submits four jobs on an HPC cluster (here: ETH Euler) looks like this:

## #!/bin/bash
## njobs=4
## nlon=30
## for ((n=1;n<=${njobs};n++)); do
##     echo "Submitting chunk number $n ..."
##     bsub -W 72:00 -u bestocke -J "job_name $n" -R "rusage[mem=10000]" "Rscript --vanilla analysis/add_loess_bychunk.R $n $njobs $nlon"
## done

And here is a SLURM batch array script for UniBe UBELIX that submits the code as an array job with four tasks. Store it as analysis/batch.sh and send it to the job scheduler with: ssh ubelix, then sbatch path/../../analysis/batch.sh

#! /usr/bin/bash -l
#SBATCH --job-name="map2tidy distributed"
#SBATCH --time=20:00:00
#SBATCH --partition=icpu-stocker # if you have access, this gives you priority
#SBATCH --array=1-4              # specifies the slurm array job with the number of tasks
#SBATCH --cpus-per-task=20       # nr of threads, used for shared memory jobs that run locally on a single compute node (default: 1)
#SBATCH --mail-user=your.email@unibe.ch
#SBATCH --mail-type=none                              # when do you want to get notified: none, all, begin, end, fail, requeue, array_tasks
#SBATCH --chdir=[[TODO path/../../analysis]] # define here the working directory which contains your R-script, and where the output will be written to.

export SBATCH_EXPORT=NONE    # source: https://hpc-unibe-ch.github.io/slurm/submission.html#exportnone
export SLURM_EXPORT_ENV=ALL  # source: https://hpc-unibe-ch.github.io/slurm/submission.html#exportnone

echo "Started on: $(date --rfc-3339=seconds)"
echo "Hostname: $(hostname)"
echo "Working directory: $PWD"   # Is most likely the HOME directory. Allows to check in the log.
module load R

# remotes::install_github("geco-bern/map2tidy") # needs to be done only once

## Run the R script for specific indices defined by the ARRAY index
Rscript add_loess_bychunk.R $SLURM_ARRAY_TASK_ID $SLURM_ARRAY_TASK_COUNT 30 # the third argument limits processing to the first 30 longitudinal bands (here: all bands of the demo data)
# NOTE: If you don't provide a --chdir argument to SLURM, you need to provide the full path to the R script relative to the folder from which sbatch is executed.

echo "Finished on: $(date --rfc-3339=seconds)"