Incremental pca #619

Merged

TomAugspurger merged 32 commits into dask:master from fujiisoup:incremental_pca on Apr 8, 2020
Conversation

@fujiisoup
Contributor

closes #617

Most of the script is borrowed from scikit-learn but updated to be compatible with Dask arrays.
The tests are also copied.

self.batch_size = batch_size
self.svd_solver = svd_solver
self.iterated_power = iterated_power
self.random_state = random_state
Contributor Author

svd_solver, iterated_power, and random_state are added as new optional arguments.

X = np.vstack((self.singular_values_.reshape((-1, 1)) *
self.components_, X, mean_correction))

solver = self._get_solver(X, self.n_components_)
Contributor Author

svd_solver is chosen using the same strategy as PCA.
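
That "same strategy as PCA" can be sketched roughly as follows. This mirrors scikit-learn's documented heuristic for `svd_solver='auto'`; the helper name `choose_solver` and the exact thresholds are illustrative, not the PR's actual code:

```python
# Hypothetical sketch of the "auto" solver policy, mirroring scikit-learn's
# PCA heuristic: small problems get an exact SVD, large problems with a
# modest number of components get the randomized solver.
def choose_solver(n_samples, n_features, n_components):
    if max(n_samples, n_features) <= 500:
        return "full"
    if n_components < 0.8 * min(n_samples, n_features):
        return "randomized"
    return "full"

print(choose_solver(10_000, 100, 10))   # randomized
print(choose_solver(100, 50, 10))       # full
```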

solver = self._get_solver(X, self.n_components_)
if solver in {"full", "tsqr"}:
if hasattr(X, 'rechunk'):
X = da.rechunk(X, (X.chunks[0], -1))
Contributor Author

If the array is chunked along the last axis, svd won't work.
I'm not sure whether rechunking is the right approach here.
Perhaps we should raise an exception instead?

Member

Perhaps check what we do elsewhere in the library. It's not clear to me whether the estimator should implicitly rechunk or not, since this could cause surprisingly high memory usage.
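
For context, a minimal sketch of the rechunking pattern under discussion, assuming the tall-and-skinny (tsqr) SVD path, which requires a single chunk along the column axis; the array sizes here are illustrative:

```python
import dask.array as da
import numpy as np

# An array chunked along both axes; Dask's tsqr-based SVD requires
# chunking only along the first (row) axis.
X = da.from_array(np.random.rand(100, 20), chunks=(25, 10))

if hasattr(X, "rechunk"):
    # Merge the column chunks; -1 means "a single chunk along this axis".
    # Note: this can increase memory usage, which is the concern voiced above.
    X = X.rechunk((X.chunks[0], -1))

u, s, v = da.linalg.svd(X)  # valid now that axis 1 has one chunk
print(s.compute().shape)    # (20,)
```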

'auto',
'randomized'
])
def test_compare_with_sklearn(svd_solver):
Contributor Author

This is the only test I added aside from those borrowed from scikit-learn.

@mrocklin
Member

I apologize for the delay, @fujiisoup. Many of the Dask maintainers are at a workshop this week.

At first glance I am surprised that the Scikit-Learn code works well with small modifications. I haven't looked at it deeply yet, but I have a few questions:

  1. Could we use the Scikit-Learn estimator and the general approach for partial_fit?
  2. How does this perform? In particular, if it is easy for you, I would love to see a performance report. I think that this would help us to talk about bottlenecks that might exist in this algorithm. https://docs.dask.org/en/latest/diagnostics-distributed.html#capture-diagnostics
  3. I'm curious about when the da.linalg.svd_compressed algorithm in dask.array might be a better fit, particularly with the compute=True keyword.

Again, I apologize for the delay, and for not yet giving this a thorough review.
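
As a point of comparison for question 3, a minimal sketch of `da.linalg.svd_compressed`; the target rank `k` and the array sizes are illustrative:

```python
import dask.array as da

# Approximate randomized SVD on a chunked array; k is the target rank.
X = da.random.random((10_000, 100), chunks=(1_000, 100))
u, s, v = da.linalg.svd_compressed(X, k=10, seed=0)

print(s.compute().shape)  # (10,)
```

Per the Dask docs, the `compute=True` keyword trades recomputing the input during the power iterations for lower memory pressure, which is useful when the data is larger than memory but cheap to recreate.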

Member

@TomAugspurger TomAugspurger left a comment

I see warnings like

/Users/taugspurger/Envs/dask-dev/lib/python3.7/site-packages/dask/array/core.py:1324: FutureWarning: The `numpy.expand_dims` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
  FutureWarning,

I think that could be implemented in dask.array.
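
A sketch of a workaround, assuming the warning comes from calling `np.expand_dims` on a Dask array (which Dask did not implement at the time); `None`-indexing achieves the same result:

```python
import dask.array as da

x = da.arange(6, chunks=3)

# Instead of np.expand_dims(x, 0), which Dask did not implement at the
# time of this PR, insert the new axis with None-indexing:
y = x[None, :]

print(y.shape)  # (1, 6)
```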


self.singular_values_ = S[:self.n_components_]
self.mean_ = col_mean
self.var_ = col_var
self.explained_variance_ = explained_variance[:self.n_components_]
Member

Can you check the types of all these vs. those in dask_ml.decomposition.PCA? I notice some that differ.

Which do you find more useful here: a Dask Array or a concrete ndarray?

@fujiisoup
Contributor Author

Thank you for the comments ;) and sorry for my late reply.

We tested this PR with our real data and found some problems, which we are now fixing.
We are also preparing some benchmarks of this PR.

I'll push and post the update (probably later this week or next week).
@yasahi-hpc

* bugfix computing noise_variance and explained_variance_ratio

* bugfix factor for previous total_var should be (n_samples_seen - 1)

* bugfix using sklearn formula for explained_variance_ratio

* bugfix correct formula to compute accumulated variance

* add tests for each svd_solver

* bugfix argument of svd_compressed should be self.n_components_

* bugfix add correct _fit method

* bugfix changing the formula to compute total variance

* A bug fix for explained_variance_ratio. The noise variance is still wrong

* A bug fix in noise_variance. But can be still wrong.

* more tests

* typo

Co-authored-by: Yuuichi ASAHI <asahi.yuuichi@qst.go.jp>
Member

@TomAugspurger TomAugspurger left a comment

Thanks for working on this. Still want to give it a closer look.

It'd be nice to document the places where we need to copy code from scikit-learn. This is a lot of code, when ideally we would only need to override a few pieces relevant to Dask.

@fujiisoup
Contributor Author

Thanks. I marked the parts updated from scikit-learn with comments.
We will verify its performance and report back!

@yasahi-hpc

Hello, I have made a performance analysis with performance_report and a naive timer. The testbed is an Intel Skylake CPU. Applying the incremental PCA to our large (~20 GB) data gave the Dask report for the fit method. For some reason, if we call the method under performance_report, it gets 50 times slower.

A naive timer gave 184 s, 148 s, and 63 s for incremental_mean_and_var, svd, and compute(), respectively. For the naive measurement, we deliberately compute() all the quantities for synchronization; other functions return immediately.
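
For reference, a minimal sketch of how such a report is captured; the local client and the array stand in for the real cluster and data described above:

```python
import dask.array as da
from dask.distributed import Client, performance_report

# A small local client stands in for the real cluster.
client = Client(processes=False)

X = da.random.random((100_000, 100), chunks=(10_000, 100))

# Everything computed inside the block is recorded into an HTML report.
with performance_report(filename="dask-report.html"):
    X.mean(axis=0).compute()

client.close()
```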

@mrocklin
Member

For some reason, if we call the method under performance_report, it gets 50 times slower.

Hrm, that is concerning. Thank you for reporting this. I don't immediately see why. I raised dask/distributed#3654 to make sure that we are not counting the time taken to generate the plots.

However after I looked more closely at your performance report I see that this is not the problem. I see that the computations are actually taking many hours, which is surprising. There are tasks like mean_agg-aggregate and getitem which take 60 seconds, which is very surprising. Computing means and getting slices should be very fast. I do not know why these would be slow, or why running your code with the performance_report context manager would make things slow.

The performance report code doesn't run during execution, it only collects data afterwards.

When I look at the administrative profiles, which observe Dask itself (not the numpy/xarray code), I don't see anything that would cause this. I do, however, see that it is taking some time to read excess data from disk (are you maybe running out of memory?). But this adds only a few minutes of total execution time across all workers.

In summary, I am confused :/

@TomAugspurger
Member

@fujiisoup apologies for the CI failures. If you merge master they should be resolved.

Comment on lines +6 to +8
from sklearn.utils._testing import assert_almost_equal
from sklearn.utils._testing import assert_array_almost_equal
from sklearn.utils._testing import assert_allclose_dense_sparse
Member

Seems to be causing an import error. Can these be avoided?

Contributor Author

Just noticed that older scikit-learn has utils.testing instead of utils._testing.
Added a patch with try: ... except ImportError.
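
The patch pattern looks roughly like this (scikit-learn renamed `utils.testing` to `utils._testing` in version 0.22; one helper is shown for brevity):

```python
try:
    # scikit-learn >= 0.22
    from sklearn.utils._testing import assert_array_almost_equal
except ImportError:
    # older scikit-learn
    from sklearn.utils.testing import assert_array_almost_equal
```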

@yasahi-hpc

@mrocklin
Thanks so much for your detailed explanation and help.
I am also sorry for my late reply. Our cluster has been unavailable for a week, until just today.
I guess some of the issues stem from the Lustre file system of our cluster.
(It is still not clear why the problem occurs only with performance_report.)
Memory shortage may be a factor, since there is a warning message implying a memory leak (the message appears when creating a Client object).

In order to focus on the performance of Dask itself, I have simplified the script with a smaller input data size (~7.5 GB).
I no longer use xarray/numpy to read our actual input data; instead I use mock data generated by da.random.random. In addition, it turned out that inserting the line dask.config.set({'temporary_directory': './tmp'}) to change the scratch directory gives a significant performance improvement.
In the end, the script runs three times slower than without performance_report.

The new performance report seems to show more reasonable performance. Unfortunately, I am not sure why adding performance_report changes the performance.
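
The configuration line mentioned above, for reference (the path is simply whatever local directory is fastest on the machine):

```python
import dask

# Direct Dask's temporary/spill files to a local directory instead of
# the default system location.
dask.config.set({"temporary_directory": "./tmp"})

print(dask.config.get("temporary_directory"))  # ./tmp
```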

Member

@TomAugspurger TomAugspurger left a comment

Thanks. This looks nice.

It's a bit unfortunate that we need so much surrounding code for the few small updates required. It'd be a fun long-term project to see how far we can push changes in Dask (array assignment, etc.) and scikit-learn (check_array, a few other places) to get more code reuse.


# Update stats - they are 0 if this is the first step
# The next line is equivalent to np.repeat(self.n_samples_seen_, X.shape[1]),
# which dask-array does not support
Member

Dask does support np.repeat. Is it just when repeats=0 that there's an issue? Opened dask/dask#6076 for that.
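
A quick sketch of that claim; the values are illustrative and use `da.repeat` directly, with a scalar `repeats` and an explicit axis, matching the `np.repeat(self.n_samples_seen_, X.shape[1])` pattern in the comment above:

```python
import dask.array as da
import numpy as np

# da.repeat works with a scalar number of repeats and an explicit axis.
n_samples_seen = da.from_array(np.array([5]), chunks=1)
out = da.repeat(n_samples_seen, 3, axis=0)

print(out.compute())  # [5 5 5]
```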

@TomAugspurger TomAugspurger merged commit 85d0454 into dask:master Apr 8, 2020
@fujiisoup fujiisoup deleted the incremental_pca branch April 9, 2020 01:02