
Fix bugs in _metadata creation and filtering in parquet ArrowEngine#6023

Merged
martindurant merged 16 commits into dask:master from rjzamora:partitioned-filter
Mar 27, 2020

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Mar 18, 2020

Mostly addresses #5959 (More work needed for a "_metadata"-based solution)

Closes #5959

The current ArrowEngine implementation does not write a correct global "_metadata" file, because it includes metadata for only a single parquet file originating from each dask partition. This PR includes a simple fix for the "_metadata"-creation bug, but still skips the step of populating the "file_path" fields in the metadata. We are skipping this for now, because it is not trivial to back out the full file_path for each metadata element.

While the global metadata file now contains the correct number of row-groups in this PR, the read_metadata definition must ignore it (mostly) when gathering statistics, because the order of the row-groups is different from the order of the dataset.pieces (which uses a natural_sort_key ordering by default). There are several ways to rearrange the "_metadata"-based row groups to align with dataset.pieces, but the most general solution would be to add the "file_path" fields in the metadata (and simply align the row-group paths with the pieces paths). To make this clean, we may want to add an option to populate the file_path in pyarrow's write_to_dataset definition... I am still trying to make a decision on the best solution (I'm certainly open to ideas/suggestions).

EDIT: This PR now generates/uses a correct "_metadata" file for partitioned datasets in ArrowEngine (note that this has fixed some pyarrow-fastparquet compatibility issues). To achieve this, the dataset partitioning is now performed in dask rather than pyarrow. The new code added here for partitioning can be removed if/when the pyarrow write_to_dataset API is modified to return new file paths or to explicitly set the file_path for metadata objects added to metadata_collector.

  • Tests added / passed
  • Passes black dask / flake8 dask

@mrocklin
Member

cc @jorisvandenbossche and @martindurant , who might be interested in this

@martindurant
Member

The file_path in the pieces themselves should not be populated, though, because they ought to be readable as individual files if need be, but the metadata chunk passed back should contain it. Is my thinking right?

@rjzamora
Member Author

The file_path in the pieces themselves should not be populated, though, because they ought to be readable as individual files if need be, but the metadata chunk passed back should contain it. Is my thinking right?

Yes - We want the "file_path" field in "_metadata" to be correctly populated (with a relative path to the root directory where "_metadata" is). However, the footer metadata in the actual parquet-data files should not have this field populated.

@rjzamora
Member Author

Note that the latest changes allow the ArrowEngine to generate "correct" "_metadata" files for partitioned datasets. However, this required me to do the actual partitioning within dask (only a bit of code needed to be borrowed from pyarrow). Since the file_path metadata field is now populated, we can sort the row_groups by the path in the same way that we sort the pieces (which ensures that the filtering is correct).
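A minimal stand-in for the natural-sort ordering in question (not dask's exact implementation, which lives in dask.utils.natural_sort_key): digit runs compare numerically, so "part.2" sorts before "part.10", and row-group paths and pieces paths end up in the same order.

```python
import re

def natural_sort_key(s):
    # Split out digit runs and compare them as integers.
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r"(\d+)", s)]

paths = ["key=1/part.10.parquet", "key=1/part.2.parquet", "key=2/part.0.parquet"]
print(sorted(paths, key=natural_sort_key))
# ['key=1/part.2.parquet', 'key=1/part.10.parquet', 'key=2/part.0.parquet']
```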

full_path = "/".join([prefix, outfile])
with fs.open(full_path, "wb") as f:
    pq.write_table(subtable, f, metadata_collector=md_list, **kwargs)
md_list[-1].set_file_path("/".join([subdir, outfile]))
Member Author

@jorisvandenbossche - We can go back to using pyarrow's write_to_dataset api if that function can set the file path for new metadata objects, or pass back the appropriate paths. Do you have any thoughts/suggestions on this?

Member

Yes, at least we should also return the path (so it can be fixed afterwards), or otherwise set it ourselves. Starting to also return the path is not really backwards compatible. About changing the path, that might depend on whether there are other use cases for collecting this metadata (although I assume dask is the main user of this keyword). In any case worth opening a JIRA issue.

Member Author

Opened ARROW-8244 to discuss/address this.

@rjzamora
Member Author

Any thoughts here @martindurant ?

@martindurant
Member

I am happy to look again, but I thought you were waiting for clarity from pyarrow?

@rjzamora
Member Author

I am happy to look again, but I thought you were waiting for clarity from pyarrow?

Right, I was hoping to get thoughts from @jorisvandenbossche about the possibility of avoiding changes in dask in cases where the changes could be made in pyarrow. However, I would really like to support proper "_metadata" creation/handling for current/older versions of pyarrow. So, even if we can get the necessary changes into pyarrow, I do think we need to temporarily move the partitioning logic from write_to_dataset into dask.

@jorisvandenbossche
Member

Taking a look now.

But indeed, even if there are some changes that could be made to pyarrow's write_to_dataset, you probably want to include them here already to avoid needing to wait on pyarrow (if it doesn't add too much code)

I actually didn't realise dask was using write_to_dataset, as I was thinking that since each partition of the dask DataFrame needs to be written separately (and to separate file(s)), you need to handle multiple writes anyway. But I suppose a partition key is not necessarily unique within one dask DataFrame partition, so in that way you still make use of the groupby functionality of write_to_dataset?

@martindurant
Member

No, partition keys are not unique within partitions. You end up with files like

/key=1/part.0.parq
/key=1/part.1.parq
/key=2/part.0.parq
/key=2/part.1.parq

where the part.0s are from partition 0, etc.
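That layout can be sketched with a plain pandas groupby (illustrative only; this is the shape of the split, not dask's actual write code): each dask partition is grouped by the partition column, producing one file per (key, partition) pair.

```python
import pandas as pd

# Two dask partitions, each containing both partition-key values, so each
# key directory receives one file per dask partition (part.<i>).
partitions = [
    pd.DataFrame({"key": [1, 1, 2], "x": [0, 1, 2]}),
    pd.DataFrame({"key": [1, 2, 2], "x": [3, 4, 5]}),
]

files = []
for i, part in enumerate(partitions):
    for key, sub in part.groupby("key"):
        files.append(f"key={key}/part.{i}.parq")

print(files)
# ['key=1/part.0.parq', 'key=2/part.0.parq', 'key=1/part.1.parq', 'key=2/part.1.parq']
```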

@jorisvandenbossche jorisvandenbossche left a comment

Added some comments / questions.

One more thing I am wondering: what happens if you specify the combination of split_row_groups=False, gather_statistics=True ? Is filtering supported then?

fs, root_path = pq._get_filesystem_and_path(filesystem, root_path)
pq._mkdir_if_not_exists(fs, root_path)

df = table.to_pandas(ignore_metadata=True)
Member

I don't know to what extent you want to keep the copy of the code from pyarrow as a pure copy, but, if including it here, there are some potential efficiency improvements to be made.

For example, just before calling this method, there is t = pa.Table.from_pandas(df, preserve_index=preserve_index, schema=schema) that converts the pandas DataFrame to an arrow table. You can also pass the dataframe directly, avoiding the conversion from arrow table back to a pandas dataframe here.

Member Author

I tried a few different ways of avoiding this. Everything seems to work fine on my local system when I use the original df (with various different environments). However, the Windows CI is returning this failure. I am having trouble figuring out how to avoid this (especially since I cannot reproduce locally).

full_path = "/".join([prefix, outfile])
with fs.open(full_path, "wb") as f:
    pq.write_table(subtable, f, metadata_collector=md_list, **kwargs)
md_list[-1].set_file_path("/".join([subdir, outfile]))
Member

Yes, at least we should also return the path (so it can be fixed afterwards), or otherwise set it ourselves. Starting to also return the path is not really backwards compatible. About changing the path, that might depend on whether there are other use cases for collecting this metadata (although I assume dask is the main user of this keyword). In any case worth opening a JIRA issue.

@jorisvandenbossche
Member

About changing the path, that might depend on whether there are other use cases for collecting this metadata (although I assume dask is the main user of this keyword).

A github search turned up dask, cudf and spatialpandas as users of the metadata_collector keyword. I assume cudf needs the same fix as dask. I didn't check yet how it's used in spatialpandas.

@rjzamora
Member Author

One more thing I am wondering: what happens if you specify the combination of split_row_groups=False, gather_statistics=True ? Is filtering supported then?

This is a great question. Originally, this was not supported (at least the result would be wrong). I just added some logic to merge row-group statistics for each file -- Hopefully this is supported "correctly" now.
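A sketch of the row-group-to-file statistics merge (the stat layout here is illustrative, not dask's exact structure): row counts add up, the file-level min of a column is the min of the row-group mins, and the max is the max of the row-group maxs.

```python
def merge_file_stats(row_group_stats):
    # Collapse per-row-group statistics for a single file into one
    # file-level entry.
    merged = {"num-rows": 0, "columns": {}}
    for s in row_group_stats:
        merged["num-rows"] += s["num-rows"]
        for name, cs in s["columns"].items():
            if name not in merged["columns"]:
                merged["columns"][name] = dict(cs)
            else:
                col = merged["columns"][name]
                col["min"] = min(col["min"], cs["min"])
                col["max"] = max(col["max"], cs["max"])
    return merged

row_groups = [
    {"num-rows": 100, "columns": {"x": {"min": 0, "max": 5}}},
    {"num-rows": 50, "columns": {"x": {"min": 3, "max": 9}}},
]
print(merge_file_stats(row_groups))
# {'num-rows': 150, 'columns': {'x': {'min': 0, 'max': 9}}}
```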

@martindurant
Member

Does this impact #6017 ?

@rjzamora
Member Author

Does this impact #6017 ?

These changes are entirely focused on ArrowEngine - I believe #6017 should only affect FastParquetEngine.

@martindurant martindurant left a comment

I am very glad not to have had to do this work!
The logic is hard to follow, but looks good from a high level. One would think it ought to be doable in a simpler way for all the combinations of partitions/stats/options, but I don't see how. I mainly have questions.
Is row-group-level filtering (multiple row-groups within a file) useful at all? This all seems to be leading towards real row-level filter pushdown...

s["file_path_0"] = row_group.column(0).file_path
if not split_row_groups and (s["file_path_0"] == path_last):
    # Rather than appending a new "row-group", just merge
    # new `s` statistics into last element of `stats`.
Member

Is this specific unusual case tested, e.g., having the row-groups in a file out-of-order?

Member Author

Not sure I follow. This case occurs when the parquet-dataset pieces correspond to files, and each file contains multiple row-groups (a scenario that is tested now). In this case, we will have statistics for each row-group, so we need to convert them into file-level statistics for proper filtering etc.

Member

Right, but I'm wondering whether the order of the row-groups within a file might be important, and whether we should also test a case where they are reversed.

@rjzamora rjzamora Mar 27, 2020

Ah - I think I see your point. If there are multiple row-groups in the same file, the order of those row groups does not matter if we are doing file-level filtering (split_row_groups=False), but does matter if we want filtering on the row-group level.

So, in this particular code block, there is no danger of filtering the wrong row-groups. If we were using split_row_groups=True, we could theoretically filter the wrong row-group if the intra-file ordering does not match that of pieces. However, since we are using the built-in sorted operation, the sort should be stable, so I cannot think of a practical case in which the order would differ. My assumption is that the intra-file row-group metadata is always generated in the correct order by the writer, and that that order is preserved in "_metadata"... I could certainly be wrong here.

Member

I think I agree with you. The max of the file for a given column should certainly be the max of the N maxs.

@rjzamora
Member Author

Is row-group-level filtering (multiple row-groups within a file) useful at all? This all seems to be leading towards real row-level filter pushdown...

I am not much of a "user," so it is hard for me to say (with confidence) that it is useful. However, I have seen it used before in practice a few times (people dealing with multiple row-groups in very large parquet files).

Regardless of how useful it is, the row-group definitely does seem like it should be the smallest "unit" of filtering to me. Whether or not there are multiple row-groups in a file should not affect whether or not filtering is properly supported.
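The filtering logic being discussed can be sketched as follows (an illustrative helper, not dask's actual API): a row-group or file can be skipped only when its [min, max] range for the filtered column cannot possibly satisfy the predicate.

```python
def maybe_satisfies(stat, col, op, value):
    # Conservative check: keep the piece unless its statistics rule it out.
    mn = stat["columns"][col]["min"]
    mx = stat["columns"][col]["max"]
    if op == "==":
        return mn <= value <= mx
    if op == ">":
        return mx > value
    if op == "<":
        return mn < value
    raise ValueError(f"unsupported op: {op}")

stats = [
    {"columns": {"x": {"min": 0, "max": 4}}},
    {"columns": {"x": {"min": 5, "max": 9}}},
]

# Only the second piece can contain rows with x > 6.
kept = [s for s in stats if maybe_satisfies(s, "x", ">", 6)]
print(len(kept))  # 1
```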

@martindurant
Member

The comment on row-filtering is only pie-in-the-sky, no action for us. Indeed, the filtering would have to happen in the arrow internals anyway.

@TomAugspurger , could you have a look at the unrelated pandas csv failure?

@TomAugspurger
Member

@martindurant that looks like #5910 and can probably be ignored.

@jorisvandenbossche
Member

I have NEVER seen multiple row groups in a file where the dataset as a whole is partitioned.

I don't know how common it is in practice, but this is actually how this all started. I had written such a file (by specifying a row_group_size), which led me to report the buggy behaviour in #5959.

@martindurant
Member

OK, I am happy with everything here.

@rjzamora
Member Author

OK, I am happy with everything here.

I really appreciate your time on this @martindurant ! It was also awesome to have your eyes on this @jorisvandenbossche !

@martindurant martindurant merged commit 241027d into dask:master Mar 27, 2020
min_i = stats[-1]["columns"][i]["min"]
max_i = stats[-1]["columns"][i]["max"]
stats[-1]["columns"][i]["min"] = min(min_i, min_n)
stats[-1]["columns"][i]["max"] = min(max_i, max_n)
Member

Should this be max instead of min?

Member

Spotted!

Member Author

Ah! Thanks for pointing this out!

@ToelgeKilian

Hey @rjzamora,
Sorry for reopening this particular change. It took me a while to find the commit that removed the use of pyarrow's write_to_dataset function from the dask code.
I am currently facing the following issue:

I'm working with time-series data, which I want to save in a date-partitioned format. Every day I calculate some features a few days ahead and want to save the new calculations in my partitioned data lake in a way that overwrites matching partitions but keeps old partitions.
Pyarrow has a nice functionality for this: pyarrow.parquet.write_to_dataset(existing_data_behavior='delete_matching')
Testing all kinds of combinations of the append, overwrite and ignore_divisions arguments in dask, I was never able to achieve the desired behavior.

Following the documentation of dask I read about the different engines ('fastparquet', 'parquet' and earlier also 'parquet-dataset') and the possibility to pass **kwargs down to the specific engine.
Now 'parquet-dataset' was removed as an engine, but seems to be the default, because you are using the ArrowDatasetEngine class.

So with this information I assumed that I could pass the following kwarg to the dask function: dd.to_parquet(existing_data_behavior='delete_matching')
Well, it turns out I was wrong, because it doesn't recognise the keyword 'existing_data_behavior'.
Turns out the ArrowDatasetEngine class has a very misleading name and the pyarrow engine used by dask is actually the legacy pa.write_table() function.

So after this introduction and deep dive into dask, here come my questions:

  1. Are you guys aware that you are actually not making use of the newer pyarrow.dataset engine when it comes to actually writing the dataset to parquet files? (You are only using it for the initialize_write() function.)
  2. Is there currently a way for me to replicate the wanted behavior of 'delete_matching' with the available arguments within dask? (maybe with fastparquet? I did not yet deep-dive into that part, because I do prefer using pyarrow)
  3. If not, are there any plans to actually step over to the newer pyarrow.dataset engine in order to make fully use of pyarrows capabilities?

Kind regards,
Kilian

P.S.: Sorry for not creating a new issue with this. I thought it might be helpful to link the change that removed the pyarrow.dataset engine from the dask code base to my question.

@rjzamora
Member Author

  1. Are you guys aware that you are actually not making use of the newer pyarrow.dataset engine when it comes to actually writing the dataset to parquet files? (You are only using it for the initialize_write() function.)

Yes - The write API was a pretty late addition to pyarrow.dataset, and is a bit less convenient than the read API for a parallel framework like Dask. To say this another way: The "arrow-dataset" engine was originally added at a time when pyarrow.dataset could only be used to read existing data, and Dask's ArrowDatasetEngine has not been updated to actually use these newer write features yet.

Since the Dask programming model more-or-less assumes that we can/will write out each partition in an embarrassingly-parallel fashion, using a feature like existing_data_behavior='delete_matching' is far from simple. In order to use this feature "safely" we would need to add a lot of validation logic or completely change the to_parquet algorithm to guarantee that we are not overwriting the wrong data (e.g. we need to know that concurrent processes i and j cannot be writing data to the same directory partition).

  2. Is there currently a way for me to replicate the wanted behavior of 'delete_matching' with the available arguments within dask? (maybe with fastparquet? I did not yet deep-dive into that part, because I do prefer using pyarrow)

If your data is already partitioned (in the Dask-DataFrame sense) in such a way that you know data from two distinct Dask partitions will never be written to the same parquet-directory partition, then you may be best-off implementing this logic yourself. For example, I haven't tested this code myself, but I can imagine you doing something like the following:

import pyarrow as pa
import pyarrow.dataset as ds

ddf = <your collection>
base_dir = <root of your partitioned dataset>

# Define custom logic to write a partition to disk
def custom_write(df):

    # Convert df to an arrow table, and then use pyarrow's dataset API as you wish...
    table = pa.Table.from_pandas(df)
    ds.write_dataset(table, base_dir, …, existing_data_behavior='delete_matching')

    # Probably simplest to return an empty dataframe to make map_partitions happy
    return ddf._meta

# Call/compute map_partitions to write out your data
# using your own custom logic
ddf.map_partitions(custom_write).compute()

  3. If not, are there any plans to actually step over to the newer pyarrow.dataset engine in order to make fully use of pyarrows capabilities?

I'm not aware of any explicit plans, but I'd be open to it if such an update would be beneficial.

@ToelgeKilian

Thank you for your quick reply!
Okay, that makes it easier to understand why you were referring to it as the 'pyarrow-dataset' engine.

I'm new to dask and the entire split-up-dataframe mechanics, but I think I understand enough to see that this behavior might be very complex to implement without running the risk of deleting other dask-dataframe partitions.

About my dataset: I know that it is already sorted by date, but I don't know whether the underlying partitions are split between dates or whether the same date can exist in multiple partitions. How can I ensure that the latter is not the case? I have multiple rows associated with the same date, which I guess is also why I cannot use date as an index within dask: it is not unique per row.

And in general with regards to your last sentence, I think it's always beneficial to stay up-to-date with the latest features and changes of the underlying packages. But I can imagine that it would take quite some effort to implement those changes, especially since writing to parquet is just one of the many functionalities of dask.

@ToelgeKilian

@rjzamora I tried just deleting the existing partitions above my minimum date myself. But now I get an error when trying to load the dataset again, because of course the metadata file is not updated with the removed partitions.
So I guess manually deleting partitions is not a good approach to this? Or is there an easy way to also remove those partitions from the metadata file?


Development

Successfully merging this pull request may close these issues.

BUG: Filtering on columns in read_parquet with partitioned datasets working incorrectly
