WIP: DataFrame.iloc implementation (backed by partition_sizes member) #6661
ryan-williams wants to merge 88 commits into dask:main

Conversation
Thanks for opening this. It'll take a bit for someone to be able to go through it. Anything you can do to assist with that by breaking things into smaller, easier-to-review chunks would be welcome. For example, I'd guess that adding partition lengths to DataFrame's metadata can be done as a precursor to supporting iloc.
Just to note: I am cleaning this up and hope to have something reviewable this weekend.
(force-pushed from 548f2c5 to 93ad340)
I had this rebased + passing yesterday 🎉 I'm optimistic that the latest conflicting changes will be easy to re-rebase onto, This Weekend™️. I've also tagged 13 of the 86 commits here with branch pointers. I'm planning to send a fresh PR that successively adds each of those chunks as a single squashed commit, and lets us verify that all CI checks pass at each one. Hopefully that will make reviewing (and even incremental merging) more tractable.
(force-pushed from c1cbfd6 to 66501be)
This is rebased on current HEAD plus a merge of #7214 and #7215, and CI seems to pass (modulo the macOS builds, which seem to be in a long queue for all PRs). One thing I'll do now is open a second PR where I can sequentially push ≈10 chunks of the ≈80 commits here, to verify that CI passes at each "checkpoint". I also have a version that is just 10 squashed commits comprising those chunks. Open to discussing what makes the reviewing ergonomics easiest.
(force-pushed from 9604650 to b1e921d)
Alright, I did a big update to the PR description with fresh code pointers, info, and "compare" links to the 12 chunks this PR decomposes into. Happy to keep working on making this more manageable to review, but I feel it's at a place where someone can at least try to read the OP again and let me know what needs clarifying.
```python
from ..dataframe import DataFrame, Series
...
if isinstance(other, (DataFrame, Series)):
    return NotImplemented
```
Can this be moved to a function? Seems it's repeated quite a bit.
Yea, good idea.
This is definitely one of the parts I wanted to come back to, understand the semantics of better, and document. There are apparently some subtleties around which things return `NotImplemented` in order to delegate to which other things.
Alright, I factored this out into an `_elemwise_binary_df_op` helper. A couple of other ops look similar but don't use `elemwise`, so I've left them as-is (`__matmul__`, `__divmod__`). I didn't see a way to include them in the DRYing without over-complicating things.
I'm not 100% sure about which ops should do this "delegate to `_Frame`" vs. not; I added it to left-associative ops (tests were surfacing problems as I implemented previously-unimplemented code paths related to `iloc`) but not their right-associative counterparts. I don't know if that's correct/complete; I need to dig into the details more.
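The delegation mechanics being discussed can be illustrated with a self-contained toy (these classes are stand-ins for illustration, not dask's actual `Array`/`_Frame`): returning `NotImplemented` from a binary op makes Python fall back to the other operand's reflected implementation.

```python
# Toy sketch of the NotImplemented delegation pattern (hypothetical classes,
# not dask code): ToyArray declines to handle ToyFrame operands, so Python
# retries the operation via ToyFrame.__radd__.

class ToyArray:
    def __init__(self, data):
        self.data = data

    def __add__(self, other):
        # Delegate to ToyFrame's reflected op rather than handling it here
        if isinstance(other, ToyFrame):
            return NotImplemented
        return ToyArray([x + other for x in self.data])

class ToyFrame:
    def __init__(self, data):
        self.data = data

    def __radd__(self, other):
        # Invoked because the left operand's __add__ returned NotImplemented
        left = other.data if isinstance(other, ToyArray) else other
        return ToyFrame([a + b for a, b in zip(left, self.data)])

result = ToyArray([1, 2, 3]) + ToyFrame([10, 20, 30])
assert isinstance(result, ToyFrame)
assert result.data == [11, 22, 33]
```

This is why the left-associative ops are the ones that need the check: the left operand's op runs first, and only its `NotImplemented` hands control to the right operand.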
|
Hello, what is needed to push this over the edge? Can someone from outside help in some way? I'm unclear whether the TODOs are really done or not, since the PR is a bit old now.
This is a very old PR and I see there are many conflicts. I'm not really sure, maybe @ryan-williams or @TomAugspurger can comment on the state of this? |
I haven't taken another look, but just noting that #9473 is exploring additional partition metadata, including length. IIRC that was relevant for this PR.
Yea, this PR did a few things that probably each needed a separate PR:
It looks like #9473 is tackling 1. and maybe 2., happy to see that progressing! Closing this one as there's no intention of reviving it. |
`DataFrame.iloc` is generally unsupported, since it requires knowing the number of elements in each partition. DataFrames usually know each partition's index bounds (divisions), which allows implementing `loc`, but the partition sizes are never computed or saved.

In some cases, it is possible to infer, store, and propagate partition sizes at graph-construction time, so I make a best effort to do so here, and implement `iloc` for DataFrames that have the corresponding `partition_sizes` member set.

Many DataFrame-creation methods (converting from Pandas, chunked file formats, or Dask Arrays w/ known `chunks`) can seed `partition_sizes`, and many transformations can propagate existing `partition_sizes`, so a large proportion of DataFrames in the wild may be able to benefit from these new members.
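As background on why per-partition row counts are what `iloc` needs: a global row position can be mapped to a (partition, local offset) pair via cumulative sums. A minimal sketch of the idea (an illustration, not this PR's actual code; `locate` is a hypothetical helper):

```python
# Hypothetical sketch: resolve a global .iloc position to a partition and
# an offset within that partition, given known partition sizes.
import bisect
from itertools import accumulate

def locate(partition_sizes, i):
    """Map global row position i to (partition_index, local_offset)."""
    bounds = list(accumulate(partition_sizes))   # e.g. [3, 2, 4] -> [3, 5, 9]
    if i < 0 or i >= bounds[-1]:
        raise IndexError(f"position {i} out of bounds for {bounds[-1]} rows")
    # First partition whose cumulative end is strictly greater than i
    p = bisect.bisect_right(bounds, i)
    start = bounds[p - 1] if p else 0
    return p, i - start

assert locate([3, 2, 4], 0) == (0, 0)
assert locate([3, 2, 4], 4) == (1, 1)
assert locate([3, 2, 4], 8) == (2, 3)
```

Without `partition_sizes`, this lookup would require computing every partition's length first, which is why `iloc` has historically been unsupported.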
### Reviewing

I've broken the change into 12 sequential chunks of 5–15 fine-grained commits. Basic CI checks pass on every chunk, and they can in principle be reviewed and merged sequentially:
- `p0..p1` (squashed: `q0..q1`):
  - add `partition_sizes` to DataFrame constructors and state
  - `preserve_partition_sizes` flag to `map_partitions` that call-sites can set to `True` during "narrow" map steps, to ensure `partition_sizes` are propagated
  - `partition_sizes` checks in `assert_eq`, similar to the existing `check_divisions` functionality; also allow more configurable verifications of both `divisions` and `partition_sizes`
- `p1..p2` (`q1..q2`): `from_pandas` (results in many `assert_eq` tests starting to have `partition_sizes` values for one of the `_Frame`s being checked); `len` backed by `partition_sizes` (when present); `is_scalar_for_elemwise` tweak
- `p2..p3` (`q2..q3`): `from_dask_array`, `map`, `__getitem__` (str|scalar)
- `p3..p4` (`q3..q4`): `rename`, `copy`, some `repartition` paths, `loc`, some `parquet` support
- `p4..p5` (`q4..q5`): `fillna`, `align`, more `repartition`, `head`, `tail`, `mode`, …
- `p5..p6` (`q5..q6`): `merge_asof`, `isna`, get/set index, …
- `p6..p7` (`q6..q7`): `values`, `to_frame`
- `p7..p8` (`q7..q8`): `apply`, `reset_index`, binary methods
- `p8..p9` (`q8..q9`): `assign`, stats/agg methods
- `p9..p10` (`q9..q10`): `__getitem__` (list/ndarray), consolidate `Series` and `DataFrame` `__getitem__` into `_Frame`, `Array.__getitem__(Series)`, preserve delegation of `Array` ops to `_Frame`
- `p10..p11` (`q10..q11`): `_Frame.iloc`, tests 🎉
- `p11..ps` (`q11..qs`): `repartition_sizes` using `iloc`, use `repartition` in `to_dask_array`

Notes:
- `q` branches are squashed versions of each corresponding `p` chunk (the tree at `qX` is the same as the tree at `pX`), in case that's easier to review (e.g. `q0..qs` is all the squashed chunks).
- The base, `p0`, is an octo-merge of open PRs I've factored out of this one (currently: improve handling of duplicate `divisions` (i.e. empty partitions) #7214, comment fixes #7215, ensure natural sort order in parquet part paths #7249, and dead code removal / fixes #7388).
- `q0` (squashed-chunk base) is the same commit as `p0`.

One path forward would be for me to file separate PRs for the first chunk or two, to begin a real review process on them. Suggestions welcome.
### Rebasing Notes
- I rebuild `p0` by rebasing its constituent branches, then combining them with a `git-octomerge` helper.
- I rebase each `p` line using `git-rebase-sequence`.
- I regenerate the `q*` (squashed-chunk) branches from the `p` branches with a helper script.

### Progress
I've factored #7214, #7215, #7249, and #7388 out of this PR.
#7396 is a prefix of this PR, and can in principle be reviewed+merged first.
### Implementation Notes
A few pointers to important new code blocks (I'll try to keep these updated as I update the code):
- `iloc` implementation (`IlocIndexer.__getitem__`) (could use some factoring!)
- `iloc` tests

#### Identifying `_Frame`s via "`shape` contains a `Delayed`" (`any(is_dask_collection(x) for x in shape)`)

In several places, "shape contains a `Delayed`" is used to detect whether something is a DataFrame or Series. That relationship was a bit cryptic/indirect, and is less straightforward now (as many `_Frame`s now know their shapes), so I tweaked it in various places. Here is one example, in `is_scalar_for_elemwise`.

Another example: most array binary ops were previously returning `NotImplemented` after identifying arguments as `_Frame`s in this manner (which allowed `_Frame`'s version of the ops to be used). Those ops now explicitly import `_Frame` and do an `isinstance` test. Importing `dask.dataframe` from within `dask.array` seems like a smell, but it is apparently already done (always in a local scope, to avoid circular imports at the top level), and I'm not sure of another way to handle the situation.

#### `# TODO: partition_sizes`

I left a `# TODO: partition_sizes` next to DataFrame transformations where I wasn't sure whether `partition_sizes` can be propagated (or haven't gotten around to implementing it; this commit contains many examples); commonly this occurs on `map_partitions` and `new_dd_object` calls.

Here's a command for finding them:
```
git grep 'TODO.*partition.sizes'
```

#### Setting `df.partition_sizes = None` in tests

Most DataFrame tests construct DataFrames via `from_pandas`, which now populates `partition_sizes`. This can result in tests failing to cover non-partition-size-aware code paths (which to date have been the default/only code paths), and makes it easy to regress or miss coverage for that functionality.

In a few tests, I explicitly set `partition_sizes = None` on an already-constructed `DataFrame` (example). I'm not sure whether disabling `partition_sizes` like this should be a kwarg on `from_pandas`, or whether more tests should be parameterized to explicitly exercise the partition-sizeful and partition-sizeless paths, or what. Open to discussion.
### TODOs

- `iloc` implementation (`IlocIndexer.__getitem__`)
- `check_divisions` and `check_partition_sizes` in `assert_eq` each compute DDFs' partitions; they can be combined for efficiency
- allow `check_*` params to take string values indicating that each DDF's divisions/partition_sizes should be verified to be internally consistent (or `None`), but are allowed/expected to not equal each other (e.g. when testing repartitioning, or when division/partition_size info is expected to be lost during specific transformations)
- remaining TODOs (`git grep 'TODO.*partition.sizes'`)
- ~~suspected backwards overlap check should be factored out as its own PR, specifically tested~~ I think this is done
- ~~verify parquet num-rows statistics are correct (esp. in the presence of multiple row-groups; I'm guessing tests already cover this, but I played a bit of whack-a-mole getting them all passing across {fastparquet,pyarrow})~~ I think this is done
- ~~double-check the long tail of aggregation functions where `partition_sizes` should be [the number of output columns] (`mean`, `var`, `sem` are tested and believed correct, but others may not be)~~ I think this is done

### Follow-on work
- seed `partition_sizes` in more DDF construction methods (e.g. HDF5, other formats?); can be a follow-on PR
- fix `from_pandas`'s wonky partitioning (`partition_sizes` is correct for the existing implementation, it just sometimes results in skewed chunks for no good reason); can be a follow-on PR
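On seeding `partition_sizes` at construction time: when the chunking scheme is known up front, the sizes fall out for free. A minimal sketch (`partition_sizes_for` is a hypothetical helper for illustration, not dask API):

```python
# Hypothetical sketch: row counts a from_pandas-style constructor could
# record when splitting n_rows into fixed-size chunks, so no partition
# ever needs to be computed just to learn its length.
def partition_sizes_for(n_rows, chunksize):
    """Row counts of successive chunks of up to `chunksize` rows."""
    full, rem = divmod(n_rows, chunksize)
    return [chunksize] * full + ([rem] if rem else [])

assert partition_sizes_for(10, 4) == [4, 4, 2]
assert partition_sizes_for(8, 4) == [4, 4]
assert sum(partition_sizes_for(10, 4)) == 10  # sizes always cover all rows
```

The "wonky partitioning" note above is about how the splits are chosen, not whether the recorded sizes match them; the recorded sizes stay correct either way.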