About Us

Faster R Code

# Average TotalValue for every Borough/ZipCode combination with dplyr.
land %>%
  group_by(Borough, ZipCode) %>%
  summarise(AvgValue = mean(TotalValue))
# A tibble: 199 x 3
# Groups:   Borough [?]
   Borough ZipCode AvgValue
   <chr>     <int>    <dbl>
 1 Bronx     10451 1157195.
 2 Bronx     10452 1103737.
 3 Bronx     10453  420323.
 4 Bronx     10454  441817.
 5 Bronx     10455  404376.
 6 Bronx     10456  349105.
 7 Bronx     10457  371780.
 8 Bronx     10458  469990.
 9 Bronx     10459  242332.
10 Bronx     10460  272096.
# ... with 189 more rows

# data.table version of the same aggregation; the unnamed result column
# is printed as V1.
landDT[, mean(TotalValue), by = .(Borough, ZipCode)]
           Borough ZipCode         V1
  1:         Bronx   10451 1157194.57
  2:         Bronx   10452 1103737.40
  3:         Bronx   10453  420323.29
  4:         Bronx   10454  441817.27
  5:         Bronx   10455  404375.68
 ---                                 
195: Staten Island   10308   49687.77
196: Staten Island   10309   96927.82
197: Staten Island   10310   61827.77
198: Staten Island   10312   57095.65
199: Staten Island   10314  121638.01

# Same per-group average done the list-column way: nest TotalValue for each
# Borough/ZipCode group, then map over the nested tibbles.
land %>%
  group_by(Borough, ZipCode) %>%
  nest(TotalValue) %>%
  mutate(AvgValue = map_dbl(data, function(grp) mean(grp$TotalValue)))
# A tibble: 199 x 4
   Borough  ZipCode data                  AvgValue
   <chr>      <int> <list>                   <dbl>
 1 Brookyln   11201 <tibble [3,389 x 1]>  1653660.
 2 Brookyln   11217 <tibble [4,032 x 1]>   459809.
 3 Brookyln   11241 <tibble [1 x 1]>     25675650 
 4 Brookyln   11231 <tibble [4,844 x 1]>   183358.
 5 Brookyln   11215 <tibble [9,066 x 1]>   183265.
 6 Brookyln   11232 <tibble [2,961 x 1]>   382912.
 7 Brookyln   11220 <tibble [9,036 x 1]>   134261.
 8 Brookyln   11218 <tibble [6,906 x 1]>   130675.
 9 Brookyln   11238 <tibble [4,892 x 1]>   260129.
10 Brookyln   11226 <tibble [5,332 x 1]>   230392.
# ... with 189 more rows

Now in Parallel

# Partition the data by Borough/ZipCode into shards, then summarise each
# shard's groups.
land %>%
  partition(Borough, ZipCode) %>%
  summarise(AvgValue = mean(TotalValue))
Source: party_df [199 x 3]
Groups: Borough
Shards: 3 [60--71 rows]

# S3: party_df
   Borough  ZipCode   AvgValue
   <chr>      <int>      <dbl>
 1 Bronx      10459    242332.
 2 Bronx      10460    272096.
 3 Bronx      10475   1169362.
 4 Bronx      11370 251893350 
 5 Brookyln   11207     99846.
 6 Brookyln   11213    168743.
 7 Brookyln   11214    125953.
 8 Brookyln   11215    183265.
 9 Brookyln   11218    130675.
10 Brookyln   11219    140375.
# ... with 189 more rows

# Evaluate futures in background R sessions. The `multiprocess` strategy is
# deprecated in the future package; `multisession` is the portable
# replacement and works on every platform.
plan(multisession)

# Nest TotalValue per Borough/ZipCode group and average each nested tibble,
# with the per-group work dispatched through the plan set above.
land %>%
  group_by(Borough, ZipCode) %>%
  nest(TotalValue) %>%
  mutate(AvgValue = future_map_dbl(data, ~ mean(.x$TotalValue)))
# A tibble: 199 x 4
   Borough  ZipCode data                  AvgValue
   <chr>      <int> <list>                   <dbl>
 1 Brookyln   11201 <tibble [3,389 x 1]>  1653660.
 2 Brookyln   11217 <tibble [4,032 x 1]>   459809.
 3 Brookyln   11241 <tibble [1 x 1]>     25675650 
 4 Brookyln   11231 <tibble [4,844 x 1]>   183358.
 5 Brookyln   11215 <tibble [9,066 x 1]>   183265.
 6 Brookyln   11232 <tibble [2,961 x 1]>   382912.
 7 Brookyln   11220 <tibble [9,036 x 1]>   134261.
 8 Brookyln   11218 <tibble [6,906 x 1]>   130675.
 9 Brookyln   11238 <tibble [4,892 x 1]>   260129.
10 Brookyln   11226 <tibble [5,332 x 1]>   230392.
# ... with 189 more rows

Even More Speed

# Simulate 1e8 normal draws (mean 5, sd 2) and look at the first 20.
numbers <- rnorm(n = 1e8, mean = 5, sd = 2)
head(numbers, n = 20)
 [1] 3.879049 4.539645 8.117417 5.141017 5.258575 8.430130 5.921832
 [8] 2.469878 3.626294 4.108676 7.448164 5.719628 5.801543 5.221365
[15] 3.888318 8.573826 5.995701 1.066766 6.402712 4.054417

# Histogram of the simulated draws.
ggplot(data.frame(x = numbers), aes(x = x)) +
  geom_histogram()

# Baseline: R's own mean() on the full vector.
mean(numbers)
[1] 5.000314

// Arithmetic mean of `x`, computed with a hand-written accumulation loop.
// The length and loop index use R_xlen_t rather than int so that long
// vectors (more than 2^31 - 1 elements) are walked without overflow.
// [[Rcpp::export]]
double mean1(NumericVector x)
{
    const R_xlen_t xLen = x.size();

    // running total of every element
    double result = 0;
    for(R_xlen_t i = 0; i < xLen; ++i)
    {
        result += x[i];
    }

    // total divided by element count
    return result / xLen;
}
# Call the compiled mean1() on the simulated vector.
mean1(numbers)
[1] 5.000314

// Arithmetic mean of `x`: std::accumulate sums the elements, then the total
// is divided by the element count.
// [[Rcpp::export]]
double mean2(NumericVector x)
{
    const double total = std::accumulate(x.begin(), x.end(), 0.0);
    return total / x.size();
}
# Call the compiled mean2() on the simulated vector.
mean2(numbers)
[1] 5.000314

// Arithmetic mean of `x`, delegated to the mean() overload available for
// NumericVector (presumably Rcpp sugar -- confirm against the includes).
// [[Rcpp::export]]
double mean3(NumericVector x)
{
    const double average = mean(x);
    return average;
}
# Call the compiled mean3() on the simulated vector.
mean3(numbers)
[1] 5.000314

Parallel Rcpp

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

// Reduction worker: each instance adds up the index ranges it is handed,
// and the per-instance totals are merged through join().
struct Sum : public Worker {
  const RVector<double> input;  // vector being summed
  double value;                 // total accumulated by this instance

  // Primary constructor: wrap the source vector and start from zero.
  Sum(const NumericVector source) : input(source), value(0) {}

  // Split constructor: share the other instance's input, start a fresh total.
  Sum(const Sum& other, Split) : input(other.input), value(0) {}

  // Add the elements in [begin, end) to this instance's total.
  void operator()(std::size_t begin, std::size_t end) {
    double partial = 0.0;
    for (std::size_t i = begin; i < end; ++i) {
      partial += input[i];
    }
    value += partial;
  }

  // Fold another instance's total into this one.
  void join(const Sum& rhs) {
    value = value + rhs.value;
  }
};

// Mean of `x`: the elements are summed through parallelReduce with a Sum
// worker, and the total is divided by the element count.
// [[Rcpp::export]]
double parallelMean(NumericVector x) {

    // worker that will end up holding the reduced total
    Sum reducer(x);

    // hand the whole index range [0, length) to parallelReduce
    parallelReduce(0, x.length(), reducer);

    // total / element count
    const double total = reducer.value;
    return total / x.size();
}

# Call the compiled parallelMean() on the simulated vector.
parallelMean(numbers)
[1] 5.000314

Rcpp Aggregation

// Mean of column `var` within each distinct value of column `id`.
// Returns a two-column data frame: the distinct ids (column named `id`) and
// their means (column named `var`), in the order unique() reports the ids.
// [[Rcpp::export]]
DataFrame agger1(DataFrame DF, std::string var, std::string id){
  NumericVector values = DF[var];
  CharacterVector keys = DF[id];
  CharacterVector distinctKeys = unique(keys);
  NumericVector groupMeans(distinctKeys.size());

  // bucket every value under its group key
  std::map<SEXP, std::vector<double> > buckets;
  const int nRows = keys.size();
  for (int row = 0; row < nRows; ++row) {
    buckets[keys[row]].push_back(values[row]);
  }

  // average each bucket, walking the groups in distinctKeys order
  for (int g = 0; g < distinctKeys.size(); ++g) {
    std::vector<double>& bucket = buckets[distinctKeys[g]];
    groupMeans[g] = std::accumulate(bucket.begin(), bucket.end(), 0.0) /
                    bucket.size();
  }

  return DataFrame::create(_[id]=distinctKeys, _[var]=groupMeans);
}

# Mean TotalValue per ZipCode via the compiled agger1(), shown as a tibble.
agger1(land, "TotalValue", "ZipCode") %>%
  as_tibble()
# A tibble: 196 x 2
   ZipCode TotalValue
   <fct>        <dbl>
 1 11238      260129.
 2 11230      148027.
 3 10463      544006.
 4 10011     3299441.
 5 10033     1338293.
 6 11420       61828.
 7 11433      115486.
 8 10306       55926.
 9 11218      130675.
10 11210      120266.
# ... with 186 more rows

#include <omp.h>
// [[Rcpp::plugins(openmp)]]
// Mean of column `var` within each distinct value of column `id`, with the
// per-group averaging spread over `cores` OpenMP threads.
//
// Returns a two-column data frame: the distinct ids (column named `id`) and
// their means (column named `var`), in the order unique() reports the ids.
//
// Threading notes:
//  * Grouping is done serially. Inserting into a shared std::map and calling
//    push_back on shared vectors from several threads is a data race.
//  * The parallel loop only touches plain C++ buffers (std::vector<double>
//    and a raw double*), never Rcpp proxies, because the R API must not be
//    called from worker threads.
// [[Rcpp::export]]
DataFrame agger2(DataFrame DF, std::string var, std::string id, int cores=1){
  // a non-positive thread count is not valid for OpenMP; fall back to one
  omp_set_num_threads(cores > 0 ? cores : 1);

  NumericVector numbers = DF[var];
  CharacterVector groupers = DF[id];
  CharacterVector onlyThese = unique(groupers);
  const int nGroups = onlyThese.size();
  NumericVector calcResults(nGroups);
  const int n = groupers.size();

  // serial: map each distinct key to its position in onlyThese
  std::map<SEXP, int> slotOf;
  for (int g = 0; g < nGroups; ++g)
    slotOf[onlyThese[g]] = g;

  // serial: bucket every value under its group's slot, in row order
  std::vector<std::vector<double> > buckets(nGroups);
  for (int i = 0; i < n; ++i)
    buckets[slotOf[groupers[i]]].push_back(numbers[i]);

  // parallel: each iteration reads one bucket and writes one distinct slot
  double* out = calcResults.begin();
  #pragma omp parallel for schedule(static)
  for (int g = 0; g < nGroups; ++g)
    out[g] = std::accumulate(buckets[g].begin(), buckets[g].end(), 0.0) /
             buckets[g].size();

  DataFrame answer = DataFrame::create(_[id]=onlyThese, _[var]=calcResults);
  return answer;
}

# Mean TotalValue per ZipCode via the compiled agger2() with cores = 2.
agger2(land, "TotalValue", "ZipCode", cores = 2) %>%
  as_tibble()
# A tibble: 196 x 2
   ZipCode TotalValue
   <fct>        <dbl>
 1 11238      260129.
 2 11230      148027.
 3 10463      544006.
 4 10011     3299441.
 5 10033     1338293.
 6 11420       61828.
 7 11433      115486.
 8 10306       55926.
 9 11218      130675.
10 11210      120266.
# ... with 186 more rows

# Group on two columns at once: glue Borough and ZipCode into one "BoroZip"
# key, aggregate on that key, then split the key back into two columns.
land %>%
  unite(col = BoroZip, Borough, ZipCode, sep = "_") %>%
  agger2("TotalValue", "BoroZip", cores = 2) %>%
  separate(col = BoroZip, into = c("Borough", "Zip"), sep = "_") %>%
  as_tibble()
# A tibble: 199 x 3
   Borough       Zip   TotalValue
   <chr>         <chr>      <dbl>
 1 Bronx         10465    124298.
 2 Manhattan     10021   4836174.
 3 Brookyln      11207     99846.
 4 Queens        11421     45866.
 5 Queens        11434     97754.
 6 Manhattan     10039   1760972.
 7 Queens        11411     34974.
 8 Queens        11418    112953.
 9 Staten Island 10302     57667.
10 Manhattan     10280  34487680 
# ... with 189 more rows

MPI

# doMPI backend: start an MPI cluster with count = 4 and register it as the
# backend used by %dopar%.
library(doMPI)
cl <- startMPIcluster(count = 4)
registerDoMPI(cl)

# doFuture backend: register futures as the %dopar% adaptor, then point the
# future plan at an MPI cluster.
library(doFuture)
registerDoFuture()
# makeCluster() is exported by the parallel package, which library(doFuture)
# does not attach, so it is qualified explicitly.
cl <- parallel::makeCluster(4, type = "MPI")
plan(cluster, workers = cl)

# For each i, draw i normal deviates on whichever backend is registered.
# `mu` and `sigma` are not defined here and must already exist.
foreach(i = seq_len(1e6)) %dopar% {
  rnorm(i, mean = mu, sd = sigma)
}

# Run generate_normals.R with R under mpirun; -H lists localhost, n2 and n3
# (presumably the hosts to launch on -- confirm for your MPI implementation).
mpirun -H localhost,n2,n3 R --slave -f generate_normals.R

Many Ways to Go Parallel

  • parallel
  • future
  • furrr
  • RcppParallel
  • multidplyr

Thank You