Parallelization in R

Resources for scaling out your code
code
R
HPC
Author

Lisa

Published

December 8, 2023

For compute-heavy and long-running jobs, HPC can come to the rescue!

As a user, though, keep the following in mind:

Expectations

Pros

  • CAN speed up otherwise long-running tasks
  • CAN optimize and make more efficient use of resources

Cons

  • CAN hog computing resources and delay other scheduled jobs/access requests
  • CAN prevent others from getting work done (all requested resources are guaranteed and exclusively used by you)
  • CAN mask bad code design choices
  • CAN act as a denial-of-service attack on other systems

Getting started

Read:

When sessions are crashing

When you experience the issue, it helps to see the output of the following commands, run on your Linux server:

top
free -h 

Also, share a copy of your /etc/rstudio/profiles file so it can be reviewed for any inconsistencies.

Things to watch out for

  • Some parts of your code are going to be slow, no matter how many cores! - Roger / Amdahl’s Law (see the sketch after this list)
  • Profile your code with profvis to understand its baseline performance
  • Unless parallelization is built in, it often means subtasks whose logs need to be manually captured and passed back (for troubleshooting if things go wrong)
  • Overhead / housekeeping costs - additional resources and time are needed to manage the parallel execution of a task, e.g. coordinating communication between different processing units, syncing access to shared resources, managing the distribution of data, etc.
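
As a rough illustration of Amdahl’s Law (a sketch, not from the original post): if only a proportion p of a task can be parallelized, the maximum speedup on n cores is 1 / ((1 - p) + p / n), so the serial portion quickly becomes the limit no matter how many cores you add.

# Amdahl's Law: theoretical speedup when a proportion p of the work is parallelizable
amdahl_speedup <- function(p, n) 1 / ((1 - p) + p / n)

amdahl_speedup(p = 0.9, n = 8)    # ~4.7x speedup, not 8x
amdahl_speedup(p = 0.9, n = 128)  # ~9.3x -- capped near 1 / (1 - p) = 10x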

Making code faster

  • Make the code better: Assess performance and find bottlenecks
  • Make the machine better: Add more cores / machines

Terminology

  • Thread: An execution context
  • Process: The resources associated with a computation
  • Core: Independent CPUs (often discussed as “number of cores”)
  • Socket: The interface between the CPU and the motherboard (for example with two sockets and 8 cores in each socket there are a total of 16 physical cores on the server)
  • CPU: Central Processing Unit

Use lscpu or cat /proc/cpuinfo to get your CPU architecture details
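
To see how these terms map onto the machine you are on, base R can report both counts (a quick illustrative check):

library(parallel)
detectCores(logical = TRUE)   # logical CPUs (threads)
detectCores(logical = FALSE)  # physical cores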

Reference: https://www.golinuxcloud.com/processors-cpu-core-threads-explained/

Improve performance by profiling your code

profvis

library(profvis)
library(ggplot2)
library(shiny)
#library(deSolve)

# Simple example
profvis({
  data(diamonds, package = "ggplot2")
  
  plot(price ~ carat, data = diamonds)
  m <- lm(price ~ carat, data = diamonds)
  abline(m, col = "red")
})

More complex example:

profvis({
  # generate a dataset
  data(diamonds, package = "ggplot2")
  
  # save it 
  write.csv(diamonds, "diamonds.csv")
  
  # load it
  csv_diamonds <- read.csv("diamonds.csv")
  
  # summarize
  summary(diamonds)
  
  # plot it  
  plot(price ~ carat, data = csv_diamonds)
  m <- lm(price ~ carat, data = csv_diamonds)
  abline(m, col = "red")
  
  #create histogram of values for price
  ggplot(data=csv_diamonds, aes(x=price)) +
    geom_histogram(fill="steelblue", color="black") +
    ggtitle("Histogram of Price Values")
  
  #create scatterplot of carat vs. price, using cut as color variable
  ggplot(data=diamonds, aes(x=carat, y=price, color=cut)) + 
    geom_point()
  
  # Examples from: https://www.statology.org/diamonds-dataset-r/
})

Shiny app example:


#profvis({runApp()})

#profvis({runApp(appDir = "./test_profvis/")})

profvis({runApp(appDir = ".")})

Other packages

  • bench
  • microbenchmark
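
For example, a minimal comparison of two equivalent expressions with microbenchmark (illustrative only; bench::mark() follows the same pattern):

library(microbenchmark)

x <- runif(1e5)
microbenchmark(
  sqrt(x),
  x^0.5,
  times = 100
)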

Taking advantage of compute resources with parallelization in R

Taking advantage of native parallelization

Use packages like data.table that implement parallelism natively

library(data.table)

# data.table parallelizes many operations internally (via OpenMP)
getDTthreads()   # number of threads data.table is currently set to use
DT <- as.data.table(iris)
DT[Petal.Width > 1.0, mean(Petal.Length), by = Species]
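
The thread count data.table uses can also be set explicitly, e.g. to stay within an HPC allocation; a small sketch (the value 4 is arbitrary):

old <- getDTthreads()
setDTthreads(4)    # cap data.table at 4 threads
getDTthreads()
setDTthreads(old)  # restore the previous setting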

Explicitly programming parallelization

Read more: https://towardsdatascience.com/getting-started-with-parallel-programming-in-r-d5f801d43745

futureverse

Read more: https://www.futureverse.org/

library(future)
plan(multisession)

## (slow_fcn and X below stand in for your own function and data)

## Evaluate an R expression sequentially
y <- slow_fcn(X[1])

## Evaluate it in parallel in the background
f <- future(slow_fcn(X[1]))
y <- value(f)

## future.apply: futurized version of base R apply
library(future.apply)
y <-        lapply(X, slow_fcn)
y <- future_lapply(X, slow_fcn)

## furrr: futurized version of purrr
library(furrr)
y <- X |>        map(slow_fcn)
y <- X |> future_map(slow_fcn)

## foreach: futurized version (modern, %dofuture% from the doFuture package)
library(foreach)
library(doFuture)
y <- foreach(x = X) %do%       slow_fcn(x)
y <- foreach(x = X) %dofuture% slow_fcn(x)

## foreach: futurized version (traditional)
library(foreach)
doFuture::registerDoFuture()
y <- foreach(x = X) %do%    slow_fcn(x)
y <- foreach(x = X) %dopar% slow_fcn(x)
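
A self-contained sketch of the sequential-versus-parallel pattern above, using a toy slow_fcn (all names here are illustrative):

library(future.apply)
plan(multisession, workers = 2)

slow_fcn <- function(x) { Sys.sleep(0.5); sqrt(x) }
X <- 1:8

system.time(y_seq <- lapply(X, slow_fcn))         # roughly 4 seconds
system.time(y_par <- future_lapply(X, slow_fcn))  # roughly 2 seconds with 2 workers
identical(y_seq, y_par)

plan(sequential)  # switch back to sequential evaluation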

parallel

Example from: https://towardsdatascience.com/getting-started-with-parallel-programming-in-r-d5f801d43745

library(parallel)

# Generate data
data <- 1:1e9
data_list <- list("1" = data,
                  "2" = data,
                  "3" = data,
                  "4" = data)

# Single core
time <- Sys.time()

time_benchmark <- system.time(
  lapply(data_list, mean)
)
single_core_time <- difftime(Sys.time(), time)


# Detect the number of available cores and create a cluster
time <- Sys.time()

cores_avail <- detectCores()

cl <- parallel::makeCluster(cores_avail)
# Run parallel computation
time_parallel <- system.time(
  parallel::parLapply(cl,
                      data_list,
                      mean)
)

multiple_core_time <- difftime(Sys.time(), time)

# Close cluster
parallel::stopCluster(cl)


print(single_core_time)
print(multiple_core_time)
print(cores_avail)

Running sequentially took 18.33 seconds. Running in parallel shortened that to 4.99 seconds.

parallelly (part of the futureverse)

Read more: https://parallelly.futureverse.org/

library(parallelly)
# availableCores() respects cgroups, container limits, and HPC scheduler
# allocations, unlike parallel::detectCores(), so it is safer on shared servers
parallelly::availableCores()

foreach and futureverse

library(foreach)
library(doFuture)

years <- 2024

plan(multisession, workers = 20)
# get_api_stats() and tmt are defined elsewhere in the original analysis
results <- foreach(i = years,
                   .combine = rbind) %dofuture% {
                     get_api_stats(yr = i, tmt = tmt, product = "litter")}
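
Since get_api_stats() is specific to the original analysis, here is a self-contained sketch of the same %dofuture% pattern with a stand-in function:

library(doFuture)

plan(multisession, workers = 2)
years <- 2020:2024

results <- foreach(i = years, .combine = rbind) %dofuture% {
  data.frame(year = i, pid = Sys.getpid())  # stand-in for get_api_stats()
}
plan(sequential)
results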

future.apply

Before:

table(okay2 <- apply(tab2, 1, function(x) {...

After:

library(future.apply)
plan(multisession)

table(okay2 <- future_apply(tab2, 1, function(x) {...
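
A runnable sketch of the same before/after switch on a made-up matrix (tab2 and the row-wise check are placeholders):

library(future.apply)
plan(multisession)

tab2 <- matrix(rnorm(1e6), nrow = 1000)

# before: base apply over rows
okay2 <- apply(tab2, 1, function(x) all(is.finite(x)))

# after: drop-in parallel replacement
okay2 <- future_apply(tab2, 1, function(x) all(is.finite(x)))
table(okay2)

plan(sequential)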

purrr and furrr

Before:

library(tidyverse)
sales_data_tbl %>%
  nest(data = c(date, value)) %>%
  mutate(model = purrr::map(data, function(df) {
    lm(value ~ month(date) + as.numeric(date), data = df)
  }))

After:

library(tidyverse)
library(furrr)
library(future)
plan(multisession)
sales_data_tbl %>%
  nest(data = c(date, value)) %>%
  mutate(model = furrr::future_map(data, function(df) {
    lm(value ~ month(date) + as.numeric(date), data = df)
  }))
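
A self-contained version with a made-up sales_data_tbl (the columns and model formula are illustrative):

library(tidyverse)
library(furrr)
library(future)
plan(multisession)

sales_data_tbl <- tibble(
  store = rep(c("A", "B"), each = 24),
  date  = rep(seq(as.Date("2022-01-01"), by = "month", length.out = 24), times = 2),
  value = rnorm(48, mean = 100, sd = 10)
)

models <- sales_data_tbl %>%
  nest(data = c(date, value)) %>%
  mutate(model = future_map(data, ~ lm(value ~ as.numeric(date), data = .x)))

models
plan(sequential)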

foreach and doParallel

library(foreach)
library(doParallel)

cl <- makeCluster(20)
registerDoParallel(cl)
# get_api_stats() and tmt are defined elsewhere in the original analysis
results <- foreach(i = years,
                   .combine = rbind) %dopar% {
                     get_api_stats(yr = i, tmt = tmt, product = "litter")}
stopCluster(cl)
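
And a minimal self-contained version of the doParallel pattern (dummy work in place of get_api_stats()):

library(foreach)
library(doParallel)

cl <- makeCluster(2)
registerDoParallel(cl)

res <- foreach(i = 1:4, .combine = rbind) %dopar% {
  data.frame(i = i, pid = Sys.getpid())  # each row reports the worker that ran it
}

stopCluster(cl)
res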

clustermq

A lifesaver when working in an HPC environment.

library(foreach)
library(clustermq)

n_cores <- parallel::detectCores() - 1
options(clustermq.scheduler = "multicore")  # on a real cluster: "slurm", "sge", "lsf", "pbs", ...
getOption("clustermq.scheduler")

register_dopar_cmq(n_jobs = n_cores)
foreach(i = seq_len(n_cores)) %dopar% sqrt(i)
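
clustermq also offers the higher-level Q() interface, which maps a function over inputs across n_jobs workers; a minimal sketch using the multicore scheduler set above (on a real cluster the scheduler option would be "slurm", "sge", etc.):

clustermq::Q(function(x) sqrt(x), x = 1:10, n_jobs = 2)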

The future of clustermq: crew/mirai/nanonext
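
A minimal mirai sketch, assuming the mirai package is installed (the calls shown are a small subset of its API and may shift between versions):

library(mirai)

daemons(4)                 # launch 4 background worker processes
m <- mirai(Sys.getpid())   # evaluate the expression asynchronously on a daemon
call_mirai(m)              # block until it resolves
m$data                     # collect the result

daemons(0)                 # shut the daemons down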

But what about Python?

References

  • https://colorado.posit.co/rsc/parallel_thinking/Parallel_Thinking.html#/title-slide
  • https://edavidaja.github.io/parallelooza/#/parallelooza
  • https://github.com/edavidaja/parallelooza
  • Shiny apps on Connect that feature long running jobs https://wlandau.github.io/crew/articles/shiny.html
  • Workbench jobs and AWS Batch: https://github.com/wlandau/crew.aws.batch

Further reading