
WIP: DataFrame.iloc implementation (backed by partition_sizes member)#6661

Closed
ryan-williams wants to merge 88 commits into dask:main from celsiustx:sizes

Conversation

@ryan-williams
Contributor

@ryan-williams ryan-williams commented Sep 23, 2020

  • Tests added / passed
  • Passes black dask / flake8 dask

  • rebased ≈2021-05-20T01:00Z
  • description below updated 2021-03-23
  • Dask Summit lightning talk slides: bit.ly/dask-summit-iloc

DataFrame.iloc is generally unsupported, since it requires knowing the number of elements in each partition. DataFrames usually know each partition's index bounds (divisions), which allows implementing loc, but the partition sizes are never computed or saved.

In some cases, it is possible to infer, store, and propagate partition sizes at graph-construction time, so I make a best effort to do so here, and implement iloc for DataFrames that have the corresponding partition_sizes member set.

Many DataFrame-creation methods (converting from Pandas, chunked file formats, or Dask Arrays w/ known chunks) can seed partition_sizes, and many transformations can propagate existing partition_sizes, so a large proportion of DataFrames in the wild may be able to benefit from these new members.
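The core lookup a partition_sizes member enables can be sketched as follows (illustrative code, not the PR's actual API): given per-partition row counts, a global integer position resolves to a (partition, local offset) pair via a cumulative-sum search.

```python
import bisect
from itertools import accumulate

def locate(partition_sizes, i):
    """Map a global row position to (partition index, offset within it).

    partition_sizes: per-partition row counts, e.g. [3, 4, 2].
    Illustrative helper, not the PR's actual implementation.
    """
    cumulative = list(accumulate(partition_sizes))  # e.g. [3, 7, 9]
    if not 0 <= i < cumulative[-1]:
        raise IndexError(i)
    # First partition whose running total exceeds i holds the row.
    part = bisect.bisect_right(cumulative, i)
    offset = i - (cumulative[part - 1] if part else 0)
    return part, offset

# Row 5 of partitions sized [3, 4, 2] is row 2 of partition 1.
assert locate([3, 4, 2], 5) == (1, 2)
```

iloc can then slice the identified partition locally, which is why the feature hinges entirely on partition_sizes being known.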

Reviewing

I've broken the change into 12 sequential chunks of 5-15 fine-grained commits. Basic CI checks pass on every chunk, and they can in principle be reviewed and merged sequentially:

  • p0..p1 (squashed: q0..q1)
    • add partition_sizes to DataFrame constructors and state
    • add preserve_partition_sizes flag to map_partitions that call-sites can set to True during "narrow" map steps, to ensure partition_sizes are propagated
    • add partition-size checking to assert_eq, similar to existing check_divisions functionality; also allow for more configurable verifications of both divisions and partition_sizes
  • p1..p2 (q1..q2)
    • support from_pandas (results in many assert_eq tests starting to have partition_sizes values for one of the _Frames being checked)
    • support len backed by partition_sizes (when present)
    • is_scalar_for_elemwise tweak
  • p2..p3 (q2..q3): from_dask_array, map, __getitem__(str|scalar)
  • p3..p4 (q3..q4): rename, copy, some repartition paths, loc, some parquet support
  • p4..p5 (q4..q5): fillna, align, more repartition, head, tail, mode, …
  • p5..p6 (q5..q6): merge_asof, isna, get/set index, …
  • p6..p7 (q6..q7): values, to_frame
  • p7..p8 (q7..q8): apply, reset_index, binary methods
  • p8..p9 (q8..q9): assign, stats/agg methods
  • p9..p10 (q9..q10): __getitem__(list/ndarray), consolidate Series and DataFrame __getitem__ into _Frame, Array.__getitem__(Series), preserve delegation of Array ops to _Frame
  • p10..p11 (q10..q11): _Frame.iloc, tests 🎉
  • p11..ps (q11..qs): repartition_sizes using iloc, use repartition in to_dask_array
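The preserve_partition_sizes flag in the first chunk can be sketched with a toy model of a partitioned map (illustrative names, not dask's implementation): a caller who knows the mapped function is length-preserving ("narrow") opts in, and the cached sizes carry over; otherwise they are dropped.

```python
def map_partitions_sketch(func, parts, sizes, preserve_partition_sizes=False):
    # parts: one list per partition; sizes: cached per-partition lengths (or None).
    out = [func(p) for p in parts]
    # Only length-preserving maps may opt in to propagating the cached sizes.
    return out, (sizes if preserve_partition_sizes else None)

parts = [[1, 2, 3], [4, 5]]
_, sizes = map_partitions_sketch(lambda p: [x * 2 for x in p], parts, [3, 2],
                                 preserve_partition_sizes=True)
assert sizes == [3, 2]
_, sizes = map_partitions_sketch(lambda p: [x for x in p if x > 2], parts, [3, 2])
assert sizes is None  # a filter changes lengths, so sizes must not propagate
```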

Notes:

One path forward would be for me to file separate PRs for the first chunk or couple chunks, to begin a real review process on them. Suggestions welcome.

Rebasing Notes

  • I rebase p0 by rebasing its constituent branches, then using a git-octomerge helper:
    for b in nits divisions parquet; do gco $b && grb; done && gco main && gom nits dcr divisions parquet
  • Then I rebase the rest of the p line using git-rebase-sequence:
    grq p0 <previous p0> p1 p2 p3 p4 p5 p6 p7 p8 p9 p10 p11 ps
    
  • I update the "q*" (squashed-chunk) branches from the "p" branches using this script:
    gsqsq p0 $(for i in `seq 1 11` s; do printf "p$i:q$i "; done; echo)
    

Progress

I've factored #7214, #7215, #7249, and #7388 out of this PR.

#7396 is a prefix of this PR, and can in principle be reviewed+merged first.

Implementation Notes

A few pointers to important new code blocks (I'll try to keep these updated as I update the code):

Identifying _Frames with "shape contains a Delayed" (if any(is_dask_collection(x) for x in shape))

In several places, "shape contains a Delayed" is used to detect whether something is a DataFrame or Series. That relationship was a bit cryptic/indirect, and is less straightforward now (as many _Frames now know their shapes), so I tweaked it in various places.

Here is one example, in is_scalar_for_elemwise.

Another example: most array binary ops previously returned NotImplemented after identifying arguments as _Frames in this manner (which allowed _Frame's version of the ops to be used). Those ops now explicitly import _Frame and do an isinstance test. Importing dask.dataframe from within dask.array seems like a smell, but it is apparently already done (always in a local scope, to avoid circular imports at the top level), and I'm not sure of another way to handle the situation.
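The delegation mechanism at play here can be shown with stand-in classes (toy code, not dask's types): when the array side returns NotImplemented, Python retries the operation via the frame side's reflected method.

```python
class ToyArray:
    def __add__(self, other):
        # Mirrors the pattern above: bail out when the other operand is
        # frame-like, so Python falls through to ToyFrame.__radd__.
        if isinstance(other, ToyFrame):
            return NotImplemented
        return "array+array"

class ToyFrame:
    def __radd__(self, other):
        return "frame handled array+frame"

assert ToyArray() + ToyArray() == "array+array"
assert ToyArray() + ToyFrame() == "frame handled array+frame"
```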

# TODO: partition_sizes

I left a # TODO: partition_sizes next to DataFrame transformations where I wasn't sure whether partition_sizes can be propagated (or haven't gotten around to implementing it; this commit contains many examples); commonly this occurs on map_partitions and new_dd_object calls.

Here's a command for finding them:

git grep 'TODO.*partition.sizes'

Setting df.partition_sizes = None in tests

Most DataFrame tests construct DataFrames via from_pandas, which now populates partition_sizes. This can result in tests failing to cover non-partition-size-aware code paths (which to date have been the default/only code paths), and makes it easy to regress or miss coverage for that functionality.

In a few tests, I explicitly set partition_sizes = None on an already-constructed DataFrame (example). I'm not sure if disabling partition_sizes like this should be a kwarg on from_pandas, or more tests should be parameterized to explicitly test the partition-sizeful and partition-sizeless paths, or what. Open to discussion.
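One way to cover both code paths, per the options above (a sketch using a stand-in object, since the real toggle lives on a dask DataFrame; FakeDDF and length are hypothetical names):

```python
class FakeDDF:
    # Stand-in for a dask DataFrame: only the attribute under discussion.
    def __init__(self, partition_sizes):
        self.partition_sizes = partition_sizes

def length(ddf, compute_fallback):
    # Sizeful path when cached sizes exist; sizeless (compute) path otherwise.
    if ddf.partition_sizes is not None:
        return sum(ddf.partition_sizes)
    return compute_fallback()

results = []
for sized in (True, False):
    ddf = FakeDDF([3, 4, 2])
    if not sized:
        ddf.partition_sizes = None  # the test trick described above
    results.append(length(ddf, lambda: 9))
assert results == [9, 9]  # both code paths exercised, same answer
```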

TODOs

  • more documentation
  • factor iloc implementation (IlocIndexer.__getitem__)
  • testing:
    • more coverage likely needed
    • performance: check_divisions and check_partition_sizes in assert_eq each compute DDFs' partitions; they can be combined for efficiency
    • perhaps allow those check_* params to take string values that indicate that each DDF's divisions/partition_sizes should be verified to be internally consistent (or None), but are allowed/expected to not equal each other (e.g. when testing repartitioning, or when division/partition_size info is expected to be lost during specific transformations)
  • some TODOs in the code should probably be resolved before merging; closer look at those warranted (git grep 'TODO.*partition.sizes')
  • factor the suspected backwards-overlap check out as its own PR, with specific tests (I think this is done)
  • verify that parquet num-rows statistics are correct, especially in the presence of multiple row-groups (I'm guessing tests already cover this, but I played a bit of whack-a-mole getting them all passing across {fastparquet,pyarrow}) (I think this is done)
  • double-check the long tail of aggregation functions where partition_sizes should be [the number of output columns] (mean, var, and sem are tested and believed correct, but others may not be) (I think this is done)

Follow-on work

  • infer partition_sizes in more DDF construction methods (e.g. HDF5, other formats?); can be follow-on PR
  • fix from_pandas wonky partitioning (partition_sizes is correct based on existing implementation, it just sometimes results in skewed chunks for no good reason); can be follow-on PR

@TomAugspurger
Member

Thanks for opening this. It'll take a bit for someone to be able to go through it. Anything you can do to assist in that by breaking things into smaller, easier to review chunks would be welcome. For example, I'd guess that adding partition lengths to DataFrame's metadata can be done as a pre-cursor to supporting iloc.

@ryan-williams ryan-williams changed the title from "DO NOT MERGE: prototype DataFrame.iloc implementation (backed by partition_sizes membe)" to "DO NOT MERGE: prototype DataFrame.iloc implementation (backed by partition_sizes member)" Dec 18, 2020
@ryan-williams
Contributor Author

Just to note: I am cleaning this up and hope to have something reviewable this weekend.

@ryan-williams ryan-williams force-pushed the sizes branch 8 times, most recently from 548f2c5 to 93ad340 Compare January 28, 2021 15:43
@ryan-williams
Contributor Author

I had this rebased + passing yesterday 🎉 I'm optimistic that the latest conflicting changes will be easy to re-rebase onto, This Weekend™️.

I've also tagged 13 of the 86 commits here with branch pointers (p0 through p11 and ps in celsiustx/dask) that each pass a more limited set of CI checks.

I'm planning to send a fresh PR that successively adds each of those chunks as a single squashed commit, and lets us verify that all CI checks pass at each one. Hopefully that will make reviewing (and even incremental merging) more tractable.

@ryan-williams ryan-williams changed the title from "DO NOT MERGE: prototype DataFrame.iloc implementation (backed by partition_sizes member)" to "WIP: DataFrame.iloc implementation (backed by partition_sizes member)" Feb 14, 2021
@ryan-williams ryan-williams force-pushed the sizes branch 2 times, most recently from c1cbfd6 to 66501be Compare February 17, 2021 20:46
@ryan-williams
Contributor Author

This is rebased on current HEAD + a merge of #7214 and #7215, and CI seems to pass (modulo the macos builds that seem to be in a long queue for all PRs).

One thing I'll do now is open a 2nd PR where I can sequentially push ≈10 chunks of the ≈80 commits here, to verify that CI passes at each "checkpoint".

I also have a version that is just 10 squashed commits with those chunks. Open to discussing what makes the reviewing ergonomics easiest.

@ryan-williams ryan-williams force-pushed the sizes branch 2 times, most recently from 9604650 to b1e921d Compare February 19, 2021 01:30
@ryan-williams
Contributor Author

Alright, I did a big update to the PR description with fresh code pointers, info, and "compare" links to 12 chunks that this PR decomposes into.

Happy to keep working on making this more manageable to review, but feel like it's at a place where someone can at least try to read the OP again and let me know what needs clarifying.

Comment on lines +2096 to +2052
from ..dataframe import DataFrame, Series

if isinstance(other, (DataFrame, Series)):
return NotImplemented
Contributor

Can this be moved to a function? Seems it's repeated quite a bit.

Contributor Author

Yea, good idea.

This is definitely one of the parts I wanted to come back to, make sure I understand all the semantics of better, and document. There are apparently some subtleties around which things return NotImplemented in order to delegate to which other things.

Contributor Author

alright, I factored this out to a _elemwise_binary_df_op helper. A couple other ops look similar but don't use elemwise, so I've left them as-is (__matmul__, __divmod__). Didn't see a way to include them in the DRYing w/o over-complicating things.

I'm not 100% sure about which ops should do this "delegate to _Frame" vs. not; I added it to left-associative ops (tests were surfacing problems as I implemented previously-unimplemented code paths related to `iloc`) but not their right-associative counterparts. I don't know if that's correct/complete; need to dig into the details more.

@ilan-gold
Contributor

Hello, what is needed to push this over the edge? Can someone from outside help in some way? I'm unclear whether the TODOs are really done or not, since the PR is a bit old now.

@ncclementi
Member

Hello, what is needed to push this over the edge? Can someone from outside help in some way? I'm unclear whether the TODOs are really done or not, since the PR is a bit old now.

This is a very old PR and I see there are many conflicts. I'm not really sure, maybe @ryan-williams or @TomAugspurger can comment on the state of this?

@TomAugspurger
Member

I haven't taken another look, but just noting that #9473 is exploring additional partition metadata, including length. IIRC that was relevant for this PR.

@ryan-williams
Contributor Author

Yea, this PR did a few things that probably each needed a separate PR:

  1. add partition_sizes member to DDF
  2. propagate it through as many DDF creation and transformation code paths as possible
  3. use partition_sizes to implement DDF.iloc

It looks like #9473 is tackling 1. and maybe 2., happy to see that progressing!

Closing this one as there's no intention of reviving it.
