# dplyr: mean TotalValue for every (Borough, ZipCode) pair; the result is
# still grouped by Borough afterwards.
land %>%
  group_by(Borough, ZipCode) %>%
  summarise(AvgValue = mean(TotalValue))
# A tibble: 199 x 3 # Groups: Borough [?] Borough ZipCode AvgValue <chr> <int> <dbl> 1 Bronx 10451 1157195. 2 Bronx 10452 1103737. 3 Bronx 10453 420323. 4 Bronx 10454 441817. 5 Bronx 10455 404376. 6 Bronx 10456 349105. 7 Bronx 10457 371780. 8 Bronx 10458 469990. 9 Bronx 10459 242332. 10 Bronx 10460 272096. # ... with 189 more rows
# data.table: grouped mean of TotalValue per (Borough, ZipCode); the unnamed
# j expression comes back as column V1.
landDT[, mean(TotalValue), by = .(Borough, ZipCode)]
Borough ZipCode V1 1: Bronx 10451 1157194.57 2: Bronx 10452 1103737.40 3: Bronx 10453 420323.29 4: Bronx 10454 441817.27 5: Bronx 10455 404375.68 --- 195: Staten Island 10308 49687.77 196: Staten Island 10309 96927.82 197: Staten Island 10310 61827.77 198: Staten Island 10312 57095.65 199: Staten Island 10314 121638.01
# Nest each (Borough, ZipCode) group's TotalValue into a list-column named
# `data`, then average every nested tibble with map_dbl().
# NOTE(review): nest(TotalValue) is pre-1.0 tidyr syntax; tidyr >= 1.0 warns
# and prefers nest(data = TotalValue).
land %>%
  group_by(Borough, ZipCode) %>%
  nest(TotalValue) %>%
  mutate(AvgValue = map_dbl(data, function(nested) mean(nested$TotalValue)))
# A tibble: 199 x 4 Borough ZipCode data AvgValue <chr> <int> <list> <dbl> 1 Brookyln 11201 <tibble [3,389 x 1]> 1653660. 2 Brookyln 11217 <tibble [4,032 x 1]> 459809. 3 Brookyln 11241 <tibble [1 x 1]> 25675650 4 Brookyln 11231 <tibble [4,844 x 1]> 183358. 5 Brookyln 11215 <tibble [9,066 x 1]> 183265. 6 Brookyln 11232 <tibble [2,961 x 1]> 382912. 7 Brookyln 11220 <tibble [9,036 x 1]> 134261. 8 Brookyln 11218 <tibble [6,906 x 1]> 130675. 9 Brookyln 11238 <tibble [4,892 x 1]> 260129. 10 Brookyln 11226 <tibble [5,332 x 1]> 230392. # ... with 189 more rows
# multidplyr: shard rows across worker processes by (Borough, ZipCode) and
# summarise on the shards; the result stays a party_df.
# NOTE(review): this is the early multidplyr API (partition() taking grouping
# columns); multidplyr >= 0.1.0 expects group_by(...) %>% partition(cluster).
land %>%
  partition(Borough, ZipCode) %>%
  summarise(AvgValue = mean(TotalValue))
Source: party_df [199 x 3] Groups: Borough Shards: 3 [60--71 rows] # S3: party_df Borough ZipCode AvgValue <chr> <int> <dbl> 1 Bronx 10459 242332. 2 Bronx 10460 272096. 3 Bronx 10475 1169362. 4 Bronx 11370 251893350 5 Brookyln 11207 99846. 6 Brookyln 11213 168743. 7 Brookyln 11214 125953. 8 Brookyln 11215 183265. 9 Brookyln 11218 130675. 10 Brookyln 11219 140375. # ... with 189 more rows
# Same nest-then-map pipeline, but the per-group means are computed in
# parallel by furrr::future_map_dbl().
# The original put plan() and the pipeline on one line, which does not parse.
# The `multiprocess` strategy is deprecated (and defunct in current releases
# of future), so ask for background R sessions explicitly. The plan remains in
# effect for any later future-based code.
plan(multisession)
land %>%
  group_by(Borough, ZipCode) %>%
  nest(TotalValue) %>%
  mutate(AvgValue = future_map_dbl(data, ~ mean(.x$TotalValue)))
# A tibble: 199 x 4 Borough ZipCode data AvgValue <chr> <int> <list> <dbl> 1 Brookyln 11201 <tibble [3,389 x 1]> 1653660. 2 Brookyln 11217 <tibble [4,032 x 1]> 459809. 3 Brookyln 11241 <tibble [1 x 1]> 25675650 4 Brookyln 11231 <tibble [4,844 x 1]> 183358. 5 Brookyln 11215 <tibble [9,066 x 1]> 183265. 6 Brookyln 11232 <tibble [2,961 x 1]> 382912. 7 Brookyln 11220 <tibble [9,036 x 1]> 134261. 8 Brookyln 11218 <tibble [6,906 x 1]> 130675. 9 Brookyln 11238 <tibble [4,892 x 1]> 260129. 10 Brookyln 11226 <tibble [5,332 x 1]> 230392. # ... with 189 more rows
# Simulate 100 million draws from N(mean = 5, sd = 2) and show the first 20.
# The seed makes the draws reproducible; seed 123 yields exactly the values
# printed in the accompanying output. The original fused both statements on
# one line, which does not parse.
set.seed(123)
numbers <- rnorm(1E8, mean = 5, sd = 2)
head(numbers, 20)
[1] 3.879049 4.539645 8.117417 5.141017 5.258575 8.430130 5.921832 [8] 2.469878 3.626294 4.108676 7.448164 5.719628 5.801543 5.221365 [15] 3.888318 8.573826 5.995701 1.066766 6.402712 4.054417
# Histogram of the simulated draws (default geom_histogram binning), followed
# by their plain R mean as the baseline for the compiled versions below.
ggplot(data = data.frame(x = numbers)) +
  geom_histogram(mapping = aes(x = x))
mean(numbers)
[1] 5.000314
// [[Rcpp::export]]
// Arithmetic mean computed with a hand-written accumulation loop.
// Iterating by element (rather than with an `int` index initialised from
// x.size()) avoids truncating the length of long vectors (> INT_MAX elements).
// An empty vector yields 0.0 / 0, i.e. NaN.
double mean1(NumericVector x)
{
    double result = 0;
    for (double value : x)
    {
        result += value;
    }
    return result / x.size();
}
# Loop-based C++ mean; should agree with mean(numbers).
mean1(x = numbers)
[1] 5.000314
// [[Rcpp::export]]
// Arithmetic mean via std::accumulate: sum every element starting from 0.0,
// then divide by the element count (empty input gives NaN).
double mean2(NumericVector x)
{
    const double total = std::accumulate(x.begin(), x.end(), 0.0);
    const double count = x.size();
    return total / count;
}
# std::accumulate-based C++ mean.
mean2(x = numbers)
[1] 5.000314
// [[Rcpp::export]]
// Arithmetic mean delegated to Rcpp sugar's mean(); the sugar expression is
// converted to a plain double before returning.
double mean3(NumericVector x)
{
    const double avg = mean(x);
    return avg;
}
# Rcpp-sugar C++ mean.
mean3(x = numbers)
[1] 5.000314
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

// Reduction worker for parallelReduce: every instance sums the sub-ranges it
// is handed, and partial totals are merged pairwise through join().
struct Sum : public Worker{
    // RcppParallel wrapper around the R vector; workers read through this
    // instead of the NumericVector itself.
    const RVector<double> input;
    // running total of the ranges this instance has processed
    double value;

    // primary constructor: wrap the R vector and start from zero
    Sum(const NumericVector source) : input(source), value(0) {}

    // splitting constructor used by parallelReduce: share the input view,
    // but keep an independent total starting at zero
    Sum(const Sum& other, Split) : input(other.input), value(0) {}

    // add elements [begin, end) of the input to this instance's total
    void operator()(std::size_t begin, std::size_t end) {
        const double partial = std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
        value += partial;
    }

    // fold another instance's total into this one
    void join(const Sum& rhs) {
        value += rhs.value;
    }
};

// [[Rcpp::export]]
// Mean of x computed as a parallel sum divided by the length.
double parallelMean(NumericVector x) {
    Sum totaler(x);
    // split [0, length) across threads, then join the partial totals
    parallelReduce(0, x.length(), totaler);
    const double total = totaler.value;
    return total / x.size();
}
# RcppParallel reduction-based mean.
parallelMean(x = numbers)
[1] 5.000314
// [[Rcpp::export]]
// Grouped mean: for each distinct value of column `id`, average column `var`.
// Returns a data frame with one row per distinct key (in the order produced by
// unique()), with columns named after `id` and `var`.
DataFrame agger1(DataFrame DF, std::string var, std::string id){
    NumericVector numbers = DF[var];
    CharacterVector groupers = DF[id];
    CharacterVector onlyThese = unique(groupers);

    // Bucket every value under its group key. Keys are the CHARSXP pointers
    // themselves, relying on R's string cache so equal strings share a pointer.
    std::map<SEXP, std::vector<double> > buckets;
    const int n = groupers.size();
    for (int row = 0; row < n; ++row) {
        buckets[groupers[row]].push_back(numbers[row]);
    }

    // Average each bucket, looking it up once per group.
    const int nGroups = onlyThese.size();
    NumericVector calcResults(nGroups);
    for (int g = 0; g < nGroups; ++g) {
        const std::vector<double>& vals = buckets[onlyThese[g]];
        calcResults[g] = std::accumulate(vals.begin(), vals.end(), 0.0) / vals.size();
    }

    return DataFrame::create(_[id] = onlyThese, _[var] = calcResults);
}
# Serial C++ grouped mean of TotalValue by ZipCode, shown as a tibble.
agger1(DF = land, var = "TotalValue", id = "ZipCode") %>%
  as_tibble()
# A tibble: 196 x 2 ZipCode TotalValue <fct> <dbl> 1 11238 260129. 2 11230 148027. 3 10463 544006. 4 10011 3299441. 5 10033 1338293. 6 11420 61828. 7 11433 115486. 8 10306 55926. 9 11218 130675. 10 11210 120266. # ... with 186 more rows
#include <omp.h>
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::export]]
// OpenMP grouped mean: for each distinct value of column `id`, average column
// `var`, using up to `cores` threads (values < 1 are treated as 1).
// Returns one row per distinct key (in unique() order), with columns named
// after `id` and `var`.
//
// Everything that touches R objects or grows shared containers runs serially.
// The original parallelised the push_back into a shared std::map, which is a
// data race, and it also read Rcpp vectors from worker threads. Only the
// per-group reduction, which works on plain C++ data, runs in parallel here.
DataFrame agger2(DataFrame DF, std::string var, std::string id, int cores=1){
    NumericVector numbers = DF[var];
    CharacterVector groupers = DF[id];
    CharacterVector onlyThese = unique(groupers);
    const int nGroups = onlyThese.size();
    NumericVector calcResults(nGroups);

    // Map each distinct key to its output slot. Keys are CHARSXP pointers,
    // relying on R's string cache so equal strings share a pointer.
    std::map<SEXP, int> slotOf;
    for (int g = 0; g < nGroups; ++g)
        slotOf[onlyThese[g]] = g;

    // Bucket values by output slot, preserving input order within each group
    // (so each group's sum is accumulated in the same order as before).
    std::vector<std::vector<double> > buckets(nGroups);
    const int n = groupers.size();
    for (int i = 0; i < n; ++i)
        buckets[slotOf[groupers[i]]].push_back(numbers[i]);

    // Each iteration reads only its own bucket and writes only its own output
    // slot through a raw pointer, so the iterations are independent.
    // num_threads() scopes the thread count to this loop instead of changing
    // the process-wide OpenMP default.
    double* out = calcResults.begin();
    const int threads = cores > 0 ? cores : 1;
    #pragma omp parallel for schedule(static) num_threads(threads)
    for (int g = 0; g < nGroups; ++g)
        out[g] = std::accumulate(buckets[g].begin(), buckets[g].end(), 0.0) /
                 buckets[g].size();

    DataFrame answer = DataFrame::create(_[id]=onlyThese, _[var]=calcResults);
    return answer;
}
# OpenMP grouped mean of TotalValue by ZipCode on two threads, as a tibble.
agger2(DF = land, var = "TotalValue", id = "ZipCode", cores = 2) %>%
  as_tibble()
# A tibble: 196 x 2 ZipCode TotalValue <fct> <dbl> 1 11238 260129. 2 11230 148027. 3 10463 544006. 4 10011 3299441. 5 10033 1338293. 6 11420 61828. 7 11433 115486. 8 10306 55926. 9 11218 130675. 10 11210 120266. # ... with 186 more rows
# agger2 groups on a single column, so fuse Borough and ZipCode into one
# "Borough_Zip" key, aggregate, then split the key back into two columns.
land %>%
  unite(col = BoroZip, Borough, ZipCode, sep = "_") %>%
  agger2(var = "TotalValue", id = "BoroZip", cores = 2) %>%
  separate(col = BoroZip, into = c("Borough", "Zip"), sep = "_") %>%
  as_tibble()
# A tibble: 199 x 3 Borough Zip TotalValue <chr> <chr> <dbl> 1 Bronx 10465 124298. 2 Manhattan 10021 4836174. 3 Brookyln 11207 99846. 4 Queens 11421 45866. 5 Queens 11434 97754. 6 Manhattan 10039 1760972. 7 Queens 11411 34974. 8 Queens 11418 112953. 9 Staten Island 10302 57667. 10 Manhattan 10280 34487680 # ... with 189 more rows
# Register a 4-worker MPI cluster as the foreach %dopar% backend via doMPI.
# (The original ran these three statements together on one line, which does
# not parse.)
library(doMPI)
cl <- startMPIcluster(count = 4)
registerDoMPI(cl)
# Alternative backend: route foreach %dopar% through future (doFuture), with
# the future plan running on a 4-node MPI cluster.
# Statements are split onto separate lines (the fused original does not parse).
# makeCluster() lives in the parallel package, which doFuture does not attach,
# so call it with its namespace; type = "MPI" is handed on to snow/Rmpi.
library(doFuture)
registerDoFuture()
cl <- parallel::makeCluster(4, type = "MPI")
plan(cluster, workers = cl)
# For each i from 1 to one million, draw i normals with the externally
# defined `mu` and `sigma` on the registered parallel backend.
foreach(i = seq_len(1E6)) %dopar% {
  rnorm(n = i, mean = mu, sd = sigma)
}
# Launch generate_normals.R under MPI: -H lists the hosts to start processes
# on, and each R process runs the script non-interactively via --slave -f.
# NOTE(review): R >= 4.0 prefers --no-echo as the spelling of --slave; the -H
# host flag is launcher-specific (Open MPI style) — confirm for your MPI.
mpirun -H localhost,n2,n3 R --slave -f generate_normals.R
Packages covered: parallel, future, furrr, RcppParallel, multidplyr — Thank You