
HLG optimization for read_parquet + iloc #6345

Closed
gforsyth wants to merge 3 commits into dask:master from gforsyth:iloc_hlg

Conversation

@gforsyth
Contributor

@gforsyth gforsyth commented Jun 25, 2020

  • Tests added / passed
  • Passes black dask / flake8 dask

Starting to address one of the points raised by @martindurant in #6264.

Adds a high level graph optimization to handle a read_parquet followed by a full-column iloc by updating the ParquetSubgraph.

Very much a draft and very much not completely working.

Need to:

  • make it pass tests
  • ~~not assume that all calls to iloc are for full column selections~~ (dask only supports full column selection, so nm for now)
  • Think about how / if to combine parts of this with the existing getitem optimizations as there are a bunch of similarities (and a number of differences)

But, initial simple benchmark:
timeseries parquet file created with dask.dataframe.demo with 4 columns and 34447680 rows

on master:

%%time
len(df)

CPU times: user 8.04 s, sys: 4.2 s, total: 12.2 s
Wall time: 6.21 s

with optimization:

%%time
len(df)

running getitem optimization
running iloc optimization
Updating iloc subgraph!
CPU times: user 4.62 s, sys: 734 ms, total: 5.35 s
Wall time: 2.14 s

@TomAugspurger
Member

Question: In my head I was thinking of an approach like

  1. Allow iloc in addition to getitem at
    if list(block.dsk.values())[0][0] != operator.getitem:
    # ... where this value is __getitem__...
    return dsk
  2. For iloc, use meta[block.indices[1][0]] at
    block_columns = block.indices[1][0]

Do you know if something like that has a chance of working?

@gforsyth
Contributor Author

  1. Allow iloc in addition to getitem at

yeah, so I've found a better check, which is verifying that block.output.startswith("iloc-") -- there's no clean operator check since the Blockwise looks like

> block.dsk
{'iloc-677f4093af82134a39609fb71cf9586a': (<function apply at 0x7fb6ff4808b0>, <function apply_and_enforce at 0x7fb701cfc940>, ['_0', '_1'], (<class 'dict'>, [['_func', <function iloc at 0x7fb701ccd790>], ['_meta', Empty DataFrame
Columns: [name, x]
Index: []]]))}
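That layer-name check can be sketched in isolation (an illustration, not the dask code): dask names a layer's output `<operation>-<token>`, so a string prefix test stands in for an operator identity check.

```python
# Sketch only: dask layer outputs are named "<operation>-<token>",
# e.g. "iloc-677f4093af82134a39609fb71cf9586a", so a prefix check can
# identify an iloc layer when no clean operator check is available.
def is_iloc_layer(output_name):
    return output_name.startswith("iloc-")

assert is_iloc_layer("iloc-677f4093af82134a39609fb71cf9586a")
assert not is_iloc_layer("read-parquet-a1b2c3")
```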

re 2: I think I got too fancy trying to handle slices -- might make more sense to take a first cut with single columns.

@gforsyth
Contributor Author

Ok, I think that's working (still as a separate optimization function). I'll spend some time tomorrow to see what an overlay would look like. Most of the code is duplicated, as you noted, just some special handling around column names vs. indices, I think.

@gforsyth gforsyth changed the title from [WIP] HLG optimization for read_parquet + iloc to HLG optimization for read_parquet + iloc Jun 26, 2020
@gforsyth gforsyth marked this pull request as ready for review June 26, 2020 15:27
This extends the existing `read_parquet -> getitem` optimization to also cover
`read_parquet -> iloc`.

The way it's currently set up, it:
* Only supports single-column selection (this could be changed)
* Doesn't support dataframes with duplicate column names
@gforsyth
Contributor Author

Ok, this is ready for another look. It was more straightforward than I thought to combine them (woo!).
Really the only bits where there is divergence with the existing getitem stuff is that the block indices are indices instead of column names and the check for an iloc doesn't have a nice operator check available.

-        if list(block.dsk.values())[0][0] != operator.getitem:
+        if list(block.dsk.values())[0][
+            0
+        ] != operator.getitem and not block.output.startswith("iloc"):
Contributor Author

I'm open to suggestions if there's a better way to check for a read_parquet -> iloc

Member

To be sure: does the current optimization work with loc, which is really identical to column selection if the other selector is :?
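For context, the equivalence being asked about holds in plain pandas; a minimal sketch (not the dask optimization path):

```python
import pandas as pd

# .loc with a full ":" row selector is identical to plain column selection
df = pd.DataFrame({"A": [1], "B": [2], "C": [3]})
assert df.loc[:, ["B", "A"]].equals(df[["B", "A"]])
assert df.loc[:, "B"].equals(df["B"])
```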

Member

I doubt that it works with .loc.

Member

I'm open to suggestions if there's a better way to check for a read_parquet -> iloc

Probably out of scope for this PR, but I think we'll want to specialize the kind of Block that we return from an iloc operation. This feels similar to #6261 where I added things like BlockwiseGetitem to avoid having to dive into Blockwise objects to figure out what they are.
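A rough sketch of that kind of specialization (hypothetical class names; only BlockwiseGetitem from #6261 is real):

```python
# Hypothetical sketch: a marker subclass lets the optimizer recognize an
# iloc layer via isinstance() instead of digging through the task graph.
class Blockwise:  # stand-in for dask.blockwise.Blockwise
    def __init__(self, output):
        self.output = output

class BlockwiseIloc(Blockwise):
    """Layer produced by a positional column selection (iloc)."""

layer = BlockwiseIloc("iloc-abc123")
assert isinstance(layer, BlockwiseIloc)  # no graph inspection needed
```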

Member

@TomAugspurger TomAugspurger left a comment

Just to be sure, can you do something like

# column order is A B C
df = dd.read_parquet(...)[['B', 'A']]  # swap the columns
result = df.iloc[:, 0]
assert result.name == "B"
assert result.compute().name == "B"
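The same invariant can be checked in plain pandas, which is the behaviour the optimized graph has to reproduce (sketch only):

```python
import pandas as pd

# column order is A B C
pdf = pd.DataFrame({"A": [1], "B": [2], "C": [3]})
swapped = pdf[["B", "A"]]            # swap the columns
assert swapped.iloc[:, 0].name == "B"
```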


block_columns = block.indices[1][0]
if isinstance(block_columns, slice):
    # only single-column iloc is currently optimized
Member

What prevents us from doing the same slice on old.meta.columns[block_columns] here?

Contributor Author

Nothing prevents it here, but I noticed that the set operation below scrambles the column order with some regularity
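A small illustration of the scrambling (not the actual dask code): a set intersection carries no ordering guarantee, while a comprehension over the original column order preserves it.

```python
all_columns = ["name", "id", "x", "y"]   # order as stored in the file
requested = ["x", "name"]

# set intersection: membership is right, but order is undefined
assert set(all_columns) & set(requested) == {"name", "x"}

# order-preserving intersection
ordered = [c for c in all_columns if c in requested]
assert ordered == ["name", "x"]
```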

@gforsyth
Contributor Author

gforsyth commented Jun 26, 2020

# column order is A B C
df = dd.read_parquet(...)[['B', 'A']]  # swap the columns
result = df.iloc[:, 0]
assert result.name == "B"
assert result.compute().name == "B"

That's a great check, added as another test.
Also yes, that works 🎉

@mrocklin
Member

Quick thought: what if instead we changed the definition of df.iloc[:, ...] to call df[...] in common, simple situations?

I understand that there are some tricky cases of iloc when we have duplicate column names, but my guess is that those are very rare. By trying to centralize many API routes down to a few common operations we might be able to more easily reason about optimizations in the future.

@martindurant
Member

what if instead we changed the definition of df.iloc[:, ...] to call df[...] in common, simple situations?

I think that's what I originally had conceived of. In that case, though, we would have to check for and forgo the optimisation in the case of duplicate labels. Note that parquet does not allow duplicates, but pyarrow will apply pandas names to columns on load, so duplicates are technically still possible.
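A pandas sketch of why duplicate labels make iloc and getitem diverge (illustration only):

```python
import pandas as pd

# With duplicate labels, label selection returns every matching column,
# while iloc can still address a single position.
df = pd.DataFrame([[1, 2]], columns=["a", "a"])
assert df["a"].shape == (1, 2)       # getitem: both "a" columns
assert df.iloc[:, 0].shape == (1,)   # iloc: just the first one
```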

@mrocklin
Member

Yeah, duplicate labels seem uncommon to me though. I guess this comes down to figuring out the cost of maintaining an alternate iloc optimization code path vs the value of how often this code path will be used. I don't have a good sense of that.

@gforsyth
Contributor Author

Quick thought: what if instead we changed the definition of df.iloc[:, ...] to call df[...] in common case simple situations?

I'm game to try that out and see what it looks like.
I can think of two ways to handle this:
a) tweak the HLG and replace iloc with getitem there
b) change the iloc method on dask.dataframe.core to do a column name lookup and then dispatch to __getitem__

Any strong feelings on approach?
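Option (b) could look roughly like this in plain pandas terms (iloc_via_getitem is a hypothetical helper, not dask code):

```python
import pandas as pd

def iloc_via_getitem(df, col_index):
    # translate the positional index to a label, then defer to getitem;
    # this breaks down when column labels are duplicated
    label = df.columns[col_index]
    return df[label]

pdf = pd.DataFrame({"A": [1], "B": [2]})
assert iloc_via_getitem(pdf, 1).equals(pdf["B"])
```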

@mrocklin
Member

mrocklin commented Jun 29, 2020 via email

@gforsyth gforsyth mentioned this pull request Jun 29, 2020
@TomAugspurger
Member

So this may not be needed after #6355?

@gforsyth
Contributor Author

Yep, closing in favor of #6355

@gforsyth gforsyth closed this Jun 30, 2020