# A Future for R Henrik Bengtsson, UC San Francisco

useR 2016, Stanford, CA, 2016-06-28

??? This is a 15 + 3 minute presentation. # My first R assignment Calculating the sum \$1 + 2 + \ldots + 100\$: ```r > y <- sum(1:100) > y  5050 ``` # My first R assignment Calculating the sum \$1 + 2 + \ldots + 100\$: ```r > y <- slow_sum(1:100) ## 2 min > y  5050 ``` **Divide-and-conquer alternative**: Calculate \$a = 1 + 2, \ldots + 50\$, and **then** \$b = 51 + 52, \ldots + 100\$, and then add up \$a\$ and \$b\$: ```r > a <- slow_sum(1:50) ## 1 min ``` ```r > b <- slow_sum(51:100) ## 1 min ``` ```r > y <- a + b > y  5050 ``` ??? * All of the above steps are evaluated sequentially. # My first future assignment **Parallel divide-and-conquer**: Calculate \$a = 1 + \ldots + 50\$ and \$b = 51 + \ldots + 100\$ **at the same time**, and then add up \$a\$ and \$b\$: ```r library("future") plan(multiprocess) ## Parallel processing ```

Regular assignment:    `y  <-  x`
Future assignment:    `y %<-% x`

```r > a %<-% slow_sum(1:50) ## These two assignments are > b %<-% slow_sum(51:100) ## non-blocking and in parallel > ``` ```r > y <- a + b ## Waits for a and b ``` ```r > y  5050 ```

Friedman & Wise (1976, 1977), Hibbard (1976), Baker & Hewitt (1977)

# Definition: Future
`y %<-% { expr }`

* A **future** is an abstraction for a **value** that will be available later. * The value is the **result of an evaluated expression**. * The **state of a future** is either **unresolved** or **resolved**. * The **value is blocking** until the future is resolved. ??? ## References * Friedman & Wise, Aspects of applicative programming for file systems, _Proceedings of an ACM conference on Language design for reliable software_, 1977, pp41-55. # R package: future * A simple **unified API** * Works the same on **all platforms** * **Easy to install** * **Lightweight** (~300 kB incl. dependencies) * **Vignettes** * **Extendable** by anyone   # Many ways to resolve futures Strategy: | _ _ :---------------------|:---------------------- `eager` | sequentially `lazy` | only if needed `multiprocess` | in parallel `cluster` | on a set of machines _ _ | _ _ ```r > plan(lazy) > a %<-% slow_sum(1:50) > b %<-% slow_sum(51:100) > ``` ```r > b  3775 ``` ### Future `a` will never be evaluated! # Consistent futures everywhere * Unix * macOS * Windows ```r > library("future") > plan(multiprocess) > demo("mandelbrot") Calculating and plotting Mandelbrot regions ... ```   ```r - Region 1 done - Region 2 done - Region 7 done - Region 5 done - ... ```   # Future takes care of globals **Global variables and functions** that are needed for the future expression to be resolved are **identified automatically** and frozen / **exported**. **Packages** are automatically loaded. ```r x <- rnorm(n=100) y %<-% { slow_sum(x) } ``` Globals identified and frozen / exported: 1. `slow_sum()` - a function. 2. `x` - a numeric vector of length 100. # Nested futures ```r x <- rnorm(n=100) a %<-% { c %<-% slow_sum(x[1:25]) d %<-% slow_sum(x[26:50]) c + d } b %<-% { c %<-% slow_sum(x[51:75]) d %<-% slow_sum(x[76:100]) c + d } y <- a + b ``` Different strategies for resolving, e.g. * `plan(list(cluster, multiprocess))` # High Performance Compute (HPC) clusters # Map-Reduce for HPC ```r ## Find our 40 FASTQ files fastq <- dir(pattern = "[.]fq\$") ## 200 GB each! ## Align them bam <- lapply(fastq, FUN = DNAseq::align) ## 6 hours each! ```
```r library("BatchJobs") reg <- makeRegistry(id="DNASEQseq") fastq <- dir(pattern = "[.]fq\$") batchMap(reg, fastq, fun = DNAseq::align) submitJobs(reg) bam <- loadResults(reg) ```   # future.BatchJobs: Futures for HPC future.BatchJobs: | Job scheduler: :---------------------|:---------------------- `batchjobs_slurm` | Slurm `batchjobs_sge` | Sun Grid Engine `batchjobs_torque` | TORQUE / PBS `batchjobs_lsf` | Load Sharing Facility `batchjobs_openlava` | OpenLava _ _ | _ _ ```r library("future.BatchJobs") plan(batchjobs_slurm) ``` ```r bam <- listenv() for (i in seq_along(fastq)) { bam[[i]] %<-% DNAseq::align(fastq[i]) } `` # Parallel alternatives # foreach: Futures with foreach() ```r ## Align them foreach(i = seq_along(fastq), .export = "fastq") %dopar% { DNAseq::align(fastq[i]) } ```   (48 lines!)
The **doFuture** package provides a foreach `%dopar%` adapter such that _any_ type of futures can be used wherever the foreach package is used. ```r library("doFuture") registerDoFuture() plan(multiprocess) ``` ??? ## Other foreach backend adapter * doMC * doParallel * doSNOW # 1100+ packages can now parallelize on HPC Package that depends on foreach: ~300 directly + ~800 indirectly These can now also take advantage of compute clusters: ```r library("doFuture") registerDoFuture() ## (a) Tell foreach to use futures library("future.BatchJobs") plan(batchjobs_slurm) ## (b) Resolve via Slurm scheduler ```
```r library("plyr") fastq <- dir(pattern = "[.]fq\$") bam <- llply(fastq, DNAseq::align, .parallel = TRUE) ``` ??? ```r > pkgs <- tools::dependsOnPkgs("foreach", recursive=TRUE, installed=utils::available.packages()) > str(pkgs) chr [1:1099] "abcrf" "admixturegraph" "ApacheLogProcessor" "BANFF" ... ``` # Future bonuses # Plot remotely - display locally ```r > library("future") > plan(remote, workers="remote.server.org") ``` ```r ## Plot remotely > g %<-% R.devices::capturePlot({ + filled.contour(volcano, color.palette = terrain.colors) + title(main = "volcano data: filled contour map") + }) ``` ```r ## Display locally > g ``` ??? * R (>= 3.3.0) * `recordPlot()` + `replayPlot()` * Replotted using local R plot routines * X11 and similar is _not_ in play here! # Building a better future
I 💜 bug reports,
feedback and suggestions!

@HenrikBengtsson

Thank you!
# Appenix * A1. Summary * A2. Future has a minimalistic API * A3. Nested futures with plyr * A4. BiocParallel: Futures with Bioconductor * A5. Profile code remotely - display locally * A6. Implement Future API for new backend * A7. Futures I'd like to see * A8. Future improvements # A1. Summary _ _ | **future** | parallel | foreach | BatchJobs | BiocParallel :--------------------|:------------|:----------|:------------|:----------|:------------- Synchroneous |** ✓ **| ✓ | ✓ | ✓ | ✓ Asynchroneous |** ✓ **| ✓ | ✓ | ✓ | ✓ Uniform API |** ✓ **| | ✓ | ✓ | ✓ Extendable API |** ✓ **| | ✓ | ✓ | ✓ Globals |** ✓ **| | (✓) | | Packages |** ✓ **| | (✓) | | For loops |** ✓ **| | `foreach()` | | While loops |** ✓ **| | | | Nested config |** ✓ **| | | | Recursive protection |** ✓ **| mc | | mc | Early stopping |** ✓ **| | | | ✓ Traceback |** ✓ **| | | | ✓ RNG stream |** manual **| mc & SNOW | doRNG | manual | mc & SNOW ??? ## Early stopping is not supported by foreach ```r > library("foreach") > registerDoSEQ() > v <- foreach(i = 1:3) %dopar% { message(i); if (i == 2) stop("x"); i } 1 2 3 Error in { : task 2 failed - "x" ``` # A2. Future has a minimalistic API
Future assignment: Functional API:
```
> x <- 101:200

## Create implicit futures
> a %<-% slow_sum(x[1:50])
> b %<-% slow_sum(x[51:100])

## Get their values
> y <- a + b

> y
 15050
```
```
> x <- 101:200

## Create explicit futures
> f <- future( slow_sum(x[1:50]) )
> g <- future( slow_sum(x[51:100]) )

## Get their values
> y <- value(f) + value(g)

> y
 15050
```
# A3. Nested futures with plyr ```r library("future.BatchJobs") plan(list(batchjobs_slurm, multiprocess)) library(plyr) bam <- llply(fastq, function(fq) { chrs <- llply(1:24, function(chr) { DNAseq::align(fq, chromosome = chr) }, .parallel = TRUE) merge_chromosomes(chrs) }, .parallel = TRUE) ``` # A4. BiocParallel: Futures with Bioconductor You can use futures with BiocParallel, e.g. ```r library("BiocParallel") register(DoparParam(), default = TRUE) library("doFuture") registerDoFuture() plan(multiprocess) bplapply(fastq, function(fq) { DNAseq::align(fq) }) ``` # A5. Profile code remotely - display locally ```r > library("future") > plan(remote, workers="remote.server.org") > library("profvis") > dat <- data.frame( + x = rnorm(50e3), + y = rnorm(50e3) + ) ## Profile remotely > p %<-% profvis({ + plot(x ~ y, data = dat) + m <- lm(x ~ y, data = dat) + abline(m, col = "red") + }) ``` ```r ## Browse locally > p ``` # A6. Implement Future API for new backend To implement a new type of future, create the following methods: * `f <- myfuture({ expr })` - creates a future of class `MyFuture` extending `Future` * `value(f)` for `MyFuture` - gets value of future (blocking) * `resolved(f)` for `MyFuture` - checks if future is resolved or not (non-blocking) # A7. Futures I'd like to see * `plan(r32)` - e.g. `RODBC::odbcConnectAccess()` works only on 32-bit R. * `plan(p2p)` - Private and / or community-based peer-to-peer computer cluster * `plan(rhelp)` - Post R scripts to R-help and ask for the results `:P` # A8. Future improvements ## Standardization * Capturing stdout and stderr uniformly * Random number generation (Pierre L'Ecuyer's RNG streams) - easy to do manually right now ## Optional features * Logging * Progress bars * Memoization (caching of results) * On the-fly time and memory benchmark statistics