
[WIP] Add tests for order to prefer avoiding upwards branching #3017

Merged
mrocklin merged 5 commits into dask:master from mrocklin:order-dependents
Jan 9, 2018

Conversation

@mrocklin
Member

See #3013

  • Tests added / passed
  • Passes flake8 dask
  • Fully documented, including docs/source/changelog.rst for all changes
    and one of the docs/source/*-api.rst files for new API
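For readers unfamiliar with dask's static task ordering, here is a minimal sketch of inspecting it on a hypothetical toy graph (the graph and functions are illustrative, not from this PR). `dask.order.order` assigns each key an integer priority; smaller numbers run earlier, and a task always receives a smaller number than its dependents:

```python
from dask.order import order

def inc(x):
    return x + 1

def dbl(x):
    return x * 2

# Toy graph: a single input 'a' feeds two independent branches 'b' and 'c',
# the kind of upwards branching this PR's ordering heuristic cares about.
dsk = {'a': 1,
       'b': (inc, 'a'),
       'c': (dbl, 'a')}

# order() maps each key to an integer priority; lower runs earlier.
priorities = order(dsk)
```

Since `'a'` is a dependency of both branches, it is always assigned the lowest priority of the three keys.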

@mrocklin
Member Author

@rabernat I believe that you were having memory issues recently with XArray. Changes in this PR may affect that. If you're able to provide a tiny XArray example to test against this would be a very convenient time to dive into that.

@rabernat
Contributor

Sorry for the delayed reply here.

Can you clarify what sort of xarray-related test you are looking for? I'm not sure I understand this PR enough to know how to test against it.

@mrocklin
Member Author

mrocklin commented Dec 22, 2017 via email

@rabernat
Contributor

Here is the sort of test case I have in mind

import xarray as xr
import dask.array as dsa
import pandas as pd

# below I create a random dataset that is typical of high-res climate models
# size of example can be adjusted up and down by changing shape
dims = ('time', 'depth', 'lat', 'lon')
time = pd.date_range('1980-01-01', '2017-01-01', freq='5D')
shape = (len(time), 50, 1800, 3600)
# what I consider to be a reasonable chunk size
chunks = (1, 1, 1800, 3600)
ds = xr.Dataset({k: (dims, dsa.random.random(shape, chunks=chunks))
                 for k in ['u', 'v', 'w']},
                coords={'time': time})

# create seasonal climatology
ds_clim = ds.groupby('time.month').mean(dim='time')

# at this point, we usually recommend loading ds_clim into memory or saving it to disk
# http://xarray.pydata.org/en/latest/dask.html#optimization-tips
ds_clim.to_netcdf('ds_clim.nc')  # note: to_netcdf returns None, so don't reassign ds_clim
# now we reload from disk (could also just have called `ds_clim.load()`)
ds_clim = xr.open_dataset('ds_clim.nc')

# construct seasonal anomaly
ds_anom = ds.groupby('time.month') - ds_clim
# compute variance of seasonal anomaly
ds_anom_var = (ds_anom**2).mean(dim='time').load()

@rabernat
Contributor

rabernat commented Dec 22, 2017

I can confirm that this example causes my workers to die (on the first to_netcdf call).

@mrocklin
Member Author

In an attempt to reproduce, I ran a stripped-down version of this and things ran fine.

The differences here are the following:

  1. Reduced the total calendar time of the dataset
  2. Didn't roundtrip to NetCDF (although I've done this as well and it seems fine)
  3. Use the distributed scheduler for diagnostics
import xarray as xr
import dask.array as da
import pandas as pd

from dask.distributed import Client
client = Client(processes=False)

# below I create a random dataset that is typical of high-res climate models
# size of example can be adjusted up and down by changing shape
dims = ('time', 'depth', 'lat', 'lon')
time = pd.date_range('1980-01-01', '1981-01-01', freq='5d')
shape = (len(time), 50, 1800, 3600)
# what I consider to be a reasonable chunk size
chunks = (1, 1, 1800, 3600)
ds = xr.Dataset({k: (dims, da.random.random(shape, chunks=chunks))
                 for k in ['u', 'v', 'w']},
                coords={'time': time})

# create seasonal climatology
ds_clim = ds.groupby('time.month').mean(dim='time')

# at this point, we usually recommend loading ds_clim into memory or saving it to disk
# http://xarray.pydata.org/en/latest/dask.html#optimization-tips
# ds_clim.to_netcdf('ds_clim.nc')
# now we reload from disk (could also just have called `ds_clim.load()`)
# ds_clim = xr.open_dataset('ds_clim.nc', chunks=ds_clim.)

# construct seasonal anomaly
ds_anom = ds.groupby('time.month') - ds_clim
# compute variance of seasonal anomaly
ds_anom_var = (ds_anom**2).mean(dim='time')

Still trying to track things down. @rabernat does this still fail to behave well if you reduce the size of the job? As a fail case this thing is huge!

@rabernat
Contributor

I just tried running the shorter time range. I am using the following scheduler:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=4)
client = Client(cluster)

It looks to be heading for a crash, with memory consumption increasing to over 250 GB. I had to kill the scheduler before it crashed the whole machine. (Before that, it was running quite slowly... I projected over an hour just to do a single year.)


How is this different from your successful trial? Did you actually call .load() on ds_anom_var?

@rabernat
Contributor

rabernat commented Dec 27, 2017

Update: memory usage appears to be stable at around 10 GB if I instead use client = Client(processes=False). Calculation is still undesirably slow, however.
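For reference, a minimal sketch contrasting the two local setups being compared in this thread (configuration values copied from the comments above; the `nthreads` queries are only there to show each client came up):

```python
from dask.distributed import Client, LocalCluster

# Threads-only: a single process, so all workers share one memory space and
# no inter-process serialization or TCP transfer happens. This is the
# configuration that kept memory stable at ~10 GB here.
client = Client(processes=False)
threads_only = sum(client.nthreads().values())
client.close()

# Multi-process: 4 worker processes x 4 threads each, as used above.
# Each worker has its own memory pool and communicates over TCP.
cluster = LocalCluster(n_workers=4, threads_per_worker=4)
client = Client(cluster)
client.wait_for_workers(4)
multi_process_workers = len(client.nthreads())
client.close()
cluster.close()
```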

@mrocklin
Member Author

mrocklin commented Dec 27, 2017 via email

@mrocklin
Member Author

mrocklin commented Dec 27, 2017 via email

@mrocklin
Member Author

I can reproduce the failure. Thanks @rabernat !

@rabernat
Contributor

Is my issue actually related to this PR? If not I can open it elsewhere.

This "climatology" use case is really fundamental to climate data analysis. Achieving stability and good performance on this even for very large datasets should be a top priority for xarray and pangeo. So I want to make sure this is appropriately visible.

@mrocklin
Member Author

Is my issue actually related to this PR?

Possibly. This PR is about fixing workloads where dask doesn't navigate the task graph in a way that supports low-memory computation. Looking at a toned-down version of your computation, though, everything seems to be mostly fine.

If not I can open it elsewhere.

That seems reasonable. I'm likely to merge this PR soon.

@mrocklin
Member Author

mrocklin commented Jan 4, 2018

I plan to merge this later today if there are no comments. @jcrist or @eriknw are probably the right people to review. Note that this does change current scheduling behavior in a way that is not strictly an improvement.

@mrocklin mrocklin merged commit e74161c into dask:master Jan 9, 2018
@mrocklin mrocklin deleted the order-dependents branch January 9, 2018 15:48