parallel_computation.Rmd
If an analysis or modelling task requires access to the complete time series, the full sequential data has to be read into memory. However, this is not necessary for all gridcells of a geospatial raster at once. Such a task therefore lends itself to parallelisation across gridcells, which not only speeds up the computation but may also avoid critical memory limitations.
The workflow proposed here consists of the following steps, each demonstrated below.
Convert geospatial raster data (e.g., saved as NetCDF, potentially as a list of NetCDF files for multiple time slices) into a tidy data frame. To avoid the memory limitation, we write the tidy data into multiple files, split into longitudinal bands.
This is demonstrated and explained in the vignette map2tidy example. The demo data are raster files of 30 x 30 cells in longitude and latitude.
library(dplyr)
library(map2tidy)
library(lubridate)
path <- file.path(system.file(package = "map2tidy"), "extdata")
files <- list.files(path, pattern = "demo_data_2017_month", full.names = TRUE)
outdir <- tempdir()
res_tidy <- map2tidy(
  nclist     = files,
  varnam     = "et",
  lonnam     = "lon",
  latnam     = "lat",
  timenam    = "time",
  do_chunks  = TRUE,
  outdir     = outdir,
  fileprefix = "demo_data_2017",
  ncores     = 3,
  overwrite  = TRUE
)
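To verify the result, we can list the files written by map2tidy, one .rds file per longitudinal band, and peek at one of them. Each file holds a data frame with one row per gridcell and the time series nested in the column data. The file name below is the one used in the test snippets further down.
list.files(outdir, pattern = "demo_data_2017_LON_.*\\.rds")
readr::read_rds(file.path(outdir, "demo_data_2017_LON_+000.125.rds")) |>
  head()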
Write the analysis or modelling task as a function, add_loess() in the example below, that takes the time series data frame of a single gridcell as its first argument and returns a data frame. The task itself can be anything. Here, we fit a LOESS spline to the time series of the column et and add the fitted values as a new column to the data frame. Write the function into a file R/add_loess.R.
add_loess <- function(df){
  df <- df %>%
    # add the decimal year as predictor for the LOESS fit
    dplyr::mutate(year_dec = lubridate::decimal_date(lubridate::ymd(datetime))) %>%
    # fit a LOESS curve to the et time series and add the fitted values as a
    # new column
    dplyr::mutate(loess = stats::loess(et ~ year_dec,
                                       data = .,
                                       span = 0.05)$fitted)
  return(df)
}
# # test it:
# df <- readRDS(file.path(outdir, "demo_data_2017_LON_+000.125.rds")) |>
# slice(1) |> unnest(data) |> select(datetime, et)
# add_loess(df)
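Alternatively, a self-contained sanity check on synthetic data could look like this. The column names datetime and et are the ones add_loess() expects; the series itself is made up for illustration.
df_test <- data.frame(
  datetime = as.character(seq(as.Date("2017-01-01"), as.Date("2017-12-31"), by = "day"))
)
df_test$et <- sin(2 * pi * seq_len(nrow(df_test)) / 365) + rnorm(nrow(df_test), sd = 0.1)
head(add_loess(df_test))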
Wrap that function into another function, add_loess_byLON(), that takes the label of a single longitudinal band as its first argument and loops over the gridcells (i.e., latitude values) within that band. The data frame df read from file is already nested by gridcell. Write the modified data for this longitudinal band back to file. Write this function into a file R/add_loess_byLON.R.
add_loess_byLON <- function(LON_str){
  # read from the file that contains tidy data for a single longitudinal band
  filnam <- file.path(outdir, paste0("demo_data_2017_", LON_str, ".rds"))
  df <- readr::read_rds(filnam)

  df <- df |>
    # Uncomment the code below to nest data by gridcell, if not already nested:
    # # group data by gridcell and wrap the time series of each gridcell into a
    # # new column, by default called 'data'.
    # dplyr::group_by(lon, lat) |>
    # tidyr::nest() |>
    # apply the custom function to the time series data frame of each gridcell
    dplyr::mutate(data = purrr::map(data, ~add_loess(.)))

  # write the (complemented) data to file
  readr::write_rds(df,
                   file.path(outdir,
                             paste0("demo_data_2017_", LON_str, "_COMPLEMENTED_xz.rds")),
                   compress = "xz") # xz seems most efficient
}
# # test it:
# LON <- "LON_+000.125"
# add_loess_byLON(LON)
# readRDS(file.path(outdir, "demo_data_2017_LON_+000.125_COMPLEMENTED_xz.rds")) |>
#   slice(1) |> unnest(data)
Write an R script that distributes jobs for parallel computation, where each job processes the data of one chunk and makes a number of calls to add_loess_byLON(). For example, with 30 longitudinal bands and 3 chunks, each chunk makes 10 calls. Write the script so that it can be run from the shell and can deal with three arguments passed on the command line. Such an R script could look like this:
#!/usr/bin/env Rscript

# receive arguments from the shell
args <- commandArgs(trailingOnly = TRUE)

library(dplyr)
library(map2tidy)

# in a standalone script, the functions defined above must be made available,
# e.g. by sourcing the files they were saved to:
# source("R/add_loess.R")
# source("R/add_loess_byLON.R")

print("getting data for longitude indices:")
# list_of_LON_str = c("LON_+046.250", "LON_+046.750")
list_of_LON_str <- gsub(".*(LON_[0-9.+-]*).*\\.rds", "\\1", res_tidy$data) # extract longitudes from res_tidy

# get all available cores
ncores <- 3 # length(parallelly::availableWorkers())

# set up the cluster, sending the required objects to each core
cl <- multidplyr::new_cluster(ncores) %>%
  multidplyr::cluster_library(c("map2tidy",
                                "dplyr",
                                "purrr",
                                "tidyr",
                                "readr",
                                "here")) %>%
  multidplyr::cluster_assign(add_loess_byLON = add_loess_byLON,
                             add_loess       = add_loess, # called by add_loess_byLON
                             outdir          = outdir)    # referenced by add_loess_byLON

# distribute the computation across the cores, calculating for all longitudinal
# indices of this chunk
out <- tibble(LON_str = list_of_LON_str) %>%
  # multidplyr::partition(cl) %>%
  dplyr::mutate(out = purrr::map(LON_str,
                                 ~add_loess_byLON(.)))
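Note that with the partition() line commented out, the mutate() call runs sequentially in the main session. A minimal sketch of the fully distributed variant, identical in logic but with dplyr::collect() added to gather the results back from the workers:
out <- tibble(LON_str = list_of_LON_str) %>%
  multidplyr::partition(cl) %>%   # send rows to the workers
  dplyr::mutate(out = purrr::map(LON_str, ~add_loess_byLON(.))) %>%
  dplyr::collect()                # gather results back into the main session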
Save the script as analysis/add_loess_bychunk.R.
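The script above receives args but does not use them yet. A minimal sketch of how the chunk arguments could subset the longitude list, assuming, hypothetically, that args[1] holds the chunk index and args[2] the total number of chunks:
# hypothetical convention: args[1] = chunk index (1-based), args[2] = number of chunks
if (length(args) >= 2){
  ichunk  <- as.integer(args[1])
  nchunks <- as.integer(args[2])
  # split the longitude labels into nchunks roughly equal groups, keep this chunk's share
  idx <- parallel::splitIndices(length(list_of_LON_str), nchunks)[[ichunk]]
  list_of_LON_str <- list_of_LON_str[idx]
}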
An example shell script that submits four jobs on an HPC system (here: ETH Euler) looks like this: