
Conversation

@sadielbartholomew sadielbartholomew commented Aug 10, 2022

Migrates the Data.stats method towards #182.

Since stats is a compound method, in essence just reporting the outputs of various other cf statistics/collapse methods, each of which has already been 'daskified' and hence uses Dask under the hood, the simplest way to migrate it with full performance is (I believe) to run each statistic calculation in parallel. (Working out and reusing any sub-calculations shared between the statistics would surely be over the top for our purposes.) In the Dask world, for this context, parallel dispatch is done with delayed functions and a final compute.

This PR implements this. As indicated by the resultant Dask task graph shown below, all statistics are calculated separately, not in serial, so stats should take only as long as the most intensive calculation rather than the sum of all calculation times.

(I hope I haven't misunderstood the assignment here, so to speak, by taking such an approach centered on Dask's delayed.)
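As a minimal standalone sketch of that pattern (using a plain dask array as a stand-in for the cf data, so the names here are illustrative rather than the actual cf code):

```python
import dask.array as da
from dask import compute, delayed

# Stand-in for the underlying data; two chunks so there is work to parallelise.
dx = da.arange(100, chunks=50)

# Wrap each statistic in a delayed call so that none of them runs yet...
out = {
    "minimum": delayed(dx.min)(),
    "maximum": delayed(dx.max)(),
    "mean": delayed(dx.mean)(),
}

# ...then a single compute resolves them all together, returning a tuple in
# the same order as the inputs, which is zipped back into the dict.
out = dict(zip(out, compute(*out.values())))
```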

Graph

With the code as-is on this branch/PR, plus a small tweak so that the Dask task graph can be accessed and saved, namely:

diff --git a/cf/data/data.py b/cf/data/data.py
index e91d6df03..2da53c70f 100644
--- a/cf/data/data.py
+++ b/cf/data/data.py
@@ -15,6 +15,7 @@ from dask.array.core import normalize_chunks
 from dask.base import is_dask_collection, tokenize
 from dask.core import flatten
 from dask.highlevelgraph import HighLevelGraph
+from dask import delayed, compute, visualize
 
 from ..cfdatetime import dt as cf_dt
 from ..constants import masked as cf_masked
@@ -9376,16 +9377,20 @@ class Data(DataClassDeprecationsMixin, Container, cfdm.Data):
-        return compute(out)[0]
+        return out

the task graph generated interactively via:

In [1]: import cf
   ...: import numpy as np
   ...: import dask

In [2]: a = cf.example_field(1)

In [3]: b = a.data.stats()

In [4]: dask.visualize(b, filename='stats-graph-final.png')
Out[4]: <IPython.core.display.Image object>

is:

stats-graph-final

which looks right to me. (The task graph doesn't show it, but all results are pulled together into the output dict at compute time, by indexing the tuple that compute returns.)

@sadielbartholomew sadielbartholomew added the dask Relating to the use of Dask label Aug 10, 2022
cf/data/data.py Outdated

         if all or sample_size:
-            out["sample_size"] = int(self.sample_size())
+            out["sample_size"] = delayed(lambda: int(self.sample_size()))()
Member Author
I know lambdas are frowned upon somewhat, but it seems intuitive to use one here instead of defining a trivial function, I think.
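For comparison, here is the lambda spelling next to an equivalent named function, in a generic standalone sketch (the names are illustrative, not the cf code):

```python
from dask import delayed

n = 42

# Lambda form: concise, defined inline at the call site.
a = delayed(lambda: int(n))()

# Named-function form: a trivial extra definition, but tracebacks and task
# graph labels then show the function name, which can ease debugging.
def _sample_size():
    return int(n)

b = delayed(_sample_size)()

assert a.compute() == b.compute() == 42
```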

@davidhassell
Collaborator

Hi Sadie,

This is a really nice idea. Perhaps we could go one step further and potentially avoid having to read the data from disk each time. If we defined simple wrappers for the stats functions, they could share the same underlying data. E.g.

import dask.array as da
from dask import delayed, compute

def max(x):
    return x.max()

def min(x):
    return x.min()

dx = da.arange(100, chunks=50)

stats = (delayed(min, pure=True)(dx), delayed(max, pure=True)(dx))
compute(*stats)
# (0, 99)

It would involve writing a little function for each stat (or lambdas - but I don't know how that would play with the weights kwarg), but they would be contained within the stats method ... What do you think?

(I'm sure you can visualise the above graph, but I couldn't work out how. The docs imply that dx will be shared between the functions, though, I think.)
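The sharing can be checked concretely. This sketch assumes that, with pure=True and the same underlying array, the array's chunk tasks appear identically in both graphs; dask.visualize needs graphviz installed, so the check below inspects the graphs programmatically instead:

```python
import dask.array as da
from dask import compute, delayed

def dmin(x):
    return x.min()

def dmax(x):
    return x.max()

dx = da.arange(100, chunks=50)
stats = (delayed(dmin, pure=True)(dx), delayed(dmax, pure=True)(dx))

# The combined graph can be drawn with: dask.visualize(*stats, filename=...)
# Programmatically: both delayed objects embed the very same dx, so their
# task graphs share keys (at least the two arange chunk tasks).
shared = set(stats[0].__dask_graph__()) & set(stats[1].__dask_graph__())
assert len(shared) >= 2

print(compute(*stats))  # (0, 99)
```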

@sadielbartholomew
Member Author

Thanks David, this:

Perhaps we could go one step further and potentially avoid having to read the data from disk each time. If we defined simple wrappers for the stats functions, they could share the same underlying data.

is a great idea. Once I've tied up some loose ends elsewhere I will have a crack at getting it implemented in the way you suggest, which also sounds nice to me.

@sadielbartholomew
Member Author

sadielbartholomew commented Nov 2, 2022

Hi @davidhassell, I got back onto this one this week (intermittently) as per our priority list. I've updated it to incorporate your idea in #432 (comment). Let me know if I've implemented it as you intended, and whether there is any further feedback. That said, there are a few aspects we should agree on regarding the desired output of stats, in light of API updates made elsewhere during the Dask migration:

Namely, the 'new way' under the Dask migration, where we return Data objects rather than plain values (sorry, I can't find the relevant PR or issue where we discussed that change, despite quite a bit of searching; maybe you can remember which one it was?), means the outputs (the return dict values corresponding to each stats operation key) have gone from Python scalars/numbers to cf.Data objects containing the corresponding values, e.g. 1.0 -> cf.Data(1.0):

  • I have updated the PR in line with that, but do we actually want that for the output, or do we want to return the scalar number values as before, i.e. <scalar> instead of cf.Data(<scalar>)?
  • The dict value for the sample-size operation (the "sample_size" key) emerges as cf.Data([<scalar>]), i.e. with ndim of 1 rather than 0 like the others. Is that what we want? I would argue we should be consistent in ndim across the outputs, and therefore change it to cf.Data(<scalar>). Maybe that was the intention anyway and I have not coded that aspect up correctly?
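The ndim distinction in question, illustrated with numpy arrays (which back cf.Data; the cf behaviour itself is what's being queried above, so this only shows the shapes being compared):

```python
import numpy as np

scalar = np.array(1.0)    # 0-d, analogous to cf.Data(1.0)
vector = np.array([1.0])  # 1-d size-one, analogous to cf.Data([1.0])

print(scalar.ndim, vector.ndim)  # 0 1

# A 1-d size-one result can be made consistent with the 0-d ones by squeezing:
assert vector.squeeze().ndim == scalar.ndim == 0
```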

@sadielbartholomew
Member Author

Regarding the fact that the data type of the scalar values themselves is not consistent, at least for our test case with a stats input of cf.Data([1, 1]) and weights=1 set: the values are float in some cases and int in others (see the compared dict output in test_Data_stats now, noting that this is the form when the test passes, with the stats method code as-is). I think that is fine, because it makes sense in the context of the different calculations, e.g. the maximum of 1 and 1 is 1, not 1.0, but the mean is 1.0.

@sadielbartholomew
Member Author

Updating now in line with an offline discussion with @davidhassell, in which we realised that the approach in #432 (comment) wasn't worthwhile. In the meantime, David, what are your thoughts regarding the output value format and ndim as I outline in #432 (comment), in case we want to tweak the output format?

@davidhassell
Collaborator

Hi Sadie - thanks for the nudge. This is a bit tricky, as the results haven't actually been computed, now! Perhaps we could have a compute keyword parameter, that defaults to True, which determines whether or not computed <scalar> values are returned, or else non-computed Data objects?
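A hedged sketch of how such a keyword could work, as generic dask code rather than the actual cf implementation (the parameter is spelled compute_now here only to avoid shadowing dask.compute):

```python
import dask.array as da
from dask import compute, delayed

def stats(dx, compute_now=True):
    """Return computed scalars by default, or lazy objects if requested."""
    out = {
        "minimum": delayed(dx.min)(),
        "maximum": delayed(dx.max)(),
    }
    if compute_now:
        # One compute call resolves all the statistics together.
        out = dict(zip(out, compute(*out.values())))
    return out

dx = da.arange(10, chunks=5)
eager = stats(dx)                    # {'minimum': 0, 'maximum': 9}
lazy = stats(dx, compute_now=False)  # dict of still-lazy Delayed objects
```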

@sadielbartholomew
Member Author

Perhaps we could have a compute keyword parameter, that defaults to True, which determines whether or not computed values are returned, or else non-computed Data objects?

Aha, I like that idea! Thanks, let's do that. I'll add it in today. I was also updating the test to cover a representative range of possible input parameters since it was too basic as it was. I'll push the commits up shortly.

@sadielbartholomew
Member Author

sadielbartholomew commented Nov 8, 2022

All feedback incorporated, so this is now ready for re-review @davidhassell. Thanks. (Note that since the last review I've also greatly improved the stats test coverage and added the outstanding test warning messages to those filtered out, to keep us from being spammed by them when running test_Data overall.)

@sadielbartholomew
Member Author

sadielbartholomew commented Nov 10, 2022

@davidhassell ready for re-review I think. Have my last two commits amended this to what you were thinking, namely:

I think we should return the actual Data objects when compute=False, rather than dask.delayed objects.

?

I assume the Data objects from the compute=False output show data values when printed because can_compute is set to return True for development purposes at the moment? Or because the repr operation forces a compute, at least right now? Unless I have missed something?

@davidhassell
Collaborator

This looks perfect to me. I tried:

In [1]: import cf

In [2]: f = cf.example_field(0)

In [3]: f.data.stats()
Out[3]: 
{'minimum': 0.003,
 'mean': 0.046075,
 'median': 0.036,
 'maximum': 0.146,
 'range': 0.143,
 'mid_range': 0.0745,
 'standard_deviation': 0.03613612285511549,
 'root_mean_square': 0.05855531572795078,
 'sample_size': 40}

In [4]: f.data.stats(compute=False)
Out[4]: 
{'minimum': <CF Data(): 0.003 1>,
 'mean': <CF Data(): 0.046075 1>,
 'median': <CF Data(): 0.036 1>,
 'maximum': <CF Data(): 0.146 1>,
 'range': <CF Data(): 0.143 1>,
 'mid_range': <CF Data(): 0.0745 1>,
 'standard_deviation': <CF Data(): 0.03613612285511549 1>,
 'root_mean_square': <CF Data(): 0.05855531572795078 1>,
 'sample_size': <CF Data(1, 1): [[40]]>}


The values are appearing in the Data repr for exactly the reason you say (can_compute=True), but if we did stats = f.data.stats(compute=False) then we'd still be lazy, of course.

@davidhassell
Collaborator
Hi @sadielbartholomew, thanks for working through all of these subtleties. Please merge when you're ready.

@sadielbartholomew
Member Author

The values are appearing in the Data repr for exactly the reason you say (can_compute=True), but if we did stats = f.data.stats(compute=False) then we'd still be lazy, of course.

Brilliant, thanks for clarifying. Merging now.

@sadielbartholomew sadielbartholomew merged commit 7d99782 into NCAS-CMS:lama-to-dask Nov 10, 2022
@sadielbartholomew sadielbartholomew deleted the lama-to-dask-stats branch November 10, 2022 12:54
@davidhassell davidhassell added this to the 3.14.0 milestone Nov 15, 2022