Fix bugs in _metadata creation and filtering in parquet ArrowEngine #6023
martindurant merged 16 commits into dask:master from
Conversation
cc @jorisvandenbossche and @martindurant, who might be interested in this
The file_path in the pieces themselves should not be populated, though, because they ought to be readable as individual files if need be; but the metadata chunk passed back should contain it. Is my thinking right?
Yes - We want the "file_path" field in "_metadata" to be correctly populated (with a relative path to the root directory where "_metadata" is). However, the footer metadata in the actual parquet-data files should not have this field populated. |
Note that the latest changes allow the
dask/dataframe/io/parquet/arrow.py
Outdated
full_path = "/".join([prefix, outfile])
with fs.open(full_path, "wb") as f:
    pq.write_table(subtable, f, metadata_collector=md_list, **kwargs)
md_list[-1].set_file_path("/".join([subdir, outfile]))
@jorisvandenbossche - We can go back to using pyarrow's write_to_dataset api if that function can set the file path for new metadata objects, or pass back the appropriate paths. Do you have any thoughts/suggestions on this?
Yes, at least we should also return the path (so it can be fixed afterwards), or otherwise set it ourselves. Starting to also return the path is not really backwards compatible. About changing the path, that might depend on whether there are other use cases for collecting this metadata (although I assume dask is the main user of this keyword). In any case worth opening a JIRA issue.
Any thoughts here, @martindurant?
I am happy to look again, but I thought you were waiting for clarity from pyarrow?
Right, I was hoping to get thoughts from @jorisvandenbossche about the possibility of avoiding changes in dask in cases where the changes could be made in pyarrow. However, I would really like to support proper "_metadata" creation/handling for current/older versions of pyarrow. So, even if we can get the necessary changes into pyarrow, I do think we need to temporarily move the partitioning logic from pyarrow into dask.
Taking a look now. But indeed, even if there are some changes that could be made on pyarrow's side, I actually didn't realise dask was using write_to_dataset here.
No, partition keys are not unique within partitions. You end up with files like …, where the part.0s are from partition 0, etc.
jorisvandenbossche
left a comment
Added some comments / questions.
One more thing I am wondering: what happens if you specify the combination of split_row_groups=False, gather_statistics=True ? Is filtering supported then?
fs, root_path = pq._get_filesystem_and_path(filesystem, root_path)
pq._mkdir_if_not_exists(fs, root_path)
df = table.to_pandas(ignore_metadata=True)
I don't know to what extent you want to keep the copy of the code from pyarrow as a pure copy, but, if including it here, there are some potential efficiency improvements to be made.
For example, just before calling this method, there is t = pa.Table.from_pandas(df, preserve_index=preserve_index, schema=schema) that converts the pandas DataFrame to an arrow table. You can also pass the dataframe directly, avoiding the conversion from arrow table back to a pandas dataframe here.
I tried a few different ways of avoiding this. Everything seems to work fine on my local system when I use the original df (with various different environments). However, the Windows CI is returning this failure. I am having trouble figuring out how to avoid this (especially since I cannot reproduce locally).
dask/dataframe/io/parquet/arrow.py
Outdated
A github search turned up dask, cudf and spatialpandas as users of the metadata_collector keyword.
… table-df round trip may be necessary
This is a great question. Originally, this was not supported (at least the result would be wrong). I just added some logic to merge row-group statistics for each file -- hopefully this is supported "correctly" now.
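That per-file merge can be sketched as follows (a hypothetical simplification for illustration; the real statistics structure in dask differs): the file-level min is the min of the row-group mins, and the file-level max is the max of the row-group maxes.

```python
def merge_row_group_stats(row_group_stats):
    # row_group_stats: list of {"min": ..., "max": ..., "num-rows": ...}
    # dicts for one column across the row-groups of a single file
    # (hypothetical structure, for illustration only).
    return {
        "min": min(s["min"] for s in row_group_stats),
        # Note the asymmetry: mins combine with min(), maxes with max()
        "max": max(s["max"] for s in row_group_stats),
        "num-rows": sum(s["num-rows"] for s in row_group_stats),
    }
```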
Does this impact #6017?
These changes are entirely focused on
martindurant
left a comment
I am very glad not to have had to do this work!
The logic is hard to follow, but looks good from a high level. One would think it ought to be doable in a simpler way for all the combinations of partitions/stats/options, but I don't see how. I mainly have questions.
Is row-group-level filtering (multiple row-groups within a file) useful at all? This all seems to be leading towards real row-level filter pushdown...
s["file_path_0"] = row_group.column(0).file_path
if not split_row_groups and (s["file_path_0"] == path_last):
    # Rather than appending a new "row-group", just merge
    # new `s` statistics into last element of `stats`.
Is this specific unusual case tested, e.g., having the row-groups in a file out-of-order?
Not sure I follow. This case occurs when the parquet-dataset pieces correspond to files, and each file contains multiple row-groups (a scenario that is tested now). In this case, we will have statistics for each row-group, so we need to convert them into file-level statistics for proper filtering etc.
Right, but I'm wondering whether the order of the row-groups within a file might be important, and whether we should also test a case where they are reversed.
Ah - I think I see your point. If there are multiple row-groups in the same file, the order of those row groups does not matter if we are doing file-level filtering (split_row_groups=False), but does matter if we want filtering on the row-group level.
So, in this particular code block, there is no danger of filtering the wrong row-groups. If we were using split_row_groups=True, we could theoretically filter the wrong row-group if the intra-file ordering does not match that of pieces. However, since we are using the builtin sorted operation, the sort should be stable. Therefore, I cannot think of a practical case where the order would differ. My assumption is that the intra-file row-group metadata is always generated in the correct order by the writer, and that that order is preserved in "_metadata"... I could certainly be wrong here.
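The ordering concern comes from how the pieces are sorted. A sketch of a natural sort key (dask uses a similar helper internally; this version is written here for illustration) shows why plain lexicographic order can disagree with the writer's part numbering:

```python
import re

def natural_sort_key(s):
    # Split out digit runs so numeric parts compare as integers
    # ("part.10" sorts after "part.2" instead of between "part.1" and "part.2").
    return [int(tok) if tok.isdigit() else tok for tok in re.split(r"(\d+)", s)]

paths = ["part.10.parquet", "part.1.parquet", "part.2.parquet"]
lexicographic = sorted(paths)                   # part.1, part.10, part.2
natural = sorted(paths, key=natural_sort_key)   # part.1, part.2, part.10
```

Since Python's sorted is stable, row-groups that share a file keep their relative (footer) order under either key.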
I think I agree with you. The max of the file for a given column should certainly be the max of the N maxs.
I am not much of a "user," so it is hard for me to say (with confidence) that it is useful. However, I have seen it used before in practice a few times (people dealing with multiple row-groups in very large parquet files). Regardless of how useful it is, the row-group definitely does seem like it should be the smallest "unit" of filtering to me. Whether or not there are multiple row-groups in a file should not affect whether or not filtering is properly supported.
The comment on row-filtering is only pie-in-the-sky, no action for us. Indeed, the filtering would have to happen in the arrow internals anyway. @TomAugspurger, could you have a look at the unrelated pandas csv failure?
@martindurant that looks like #5910 and can probably be ignored.
I don't know how common it is in practice, but this is actually how this all started. I had written such a file (by specifying a row_group_size), which led me to report the buggy behaviour in #5959.
OK, I am happy with everything here. |
I really appreciate your time on this @martindurant ! It was also awesome to have your eyes on this @jorisvandenbossche !
min_i = stats[-1]["columns"][i]["min"]
max_i = stats[-1]["columns"][i]["max"]
stats[-1]["columns"][i]["min"] = min(min_i, min_n)
stats[-1]["columns"][i]["max"] = min(max_i, max_n)
Should this be max instead of min?
Ah! Thanks for pointing this out!
Hey @rjzamora, I'm working with time-series data, which I want to save in a date-partitioned format. Every day I calculate some features a few days ahead and want to save the new calculations in my partitioned data lake in a way that overwrites matching partitions but keeps old partitions.

Following the documentation of dask I read about the different engines ('fastparquet', 'pyarrow' and earlier also 'pyarrow-dataset') and the possibility to pass **kwargs down to the specific engine. So with this information I assumed that I could pass the following kwarg to the dask function: dd.to_parquet(existing_data_behavior='delete_matching'). So after this introduction and deep dive into dask, here come my questions:
Kind regards

P.S.: Sorry for not creating a new issue with this. I thought it might be helpful to link the change that removed the pyarrow.dataset engine from the dask code base to my question.
Yes - The write API was a pretty late addition to pyarrow.dataset. Since the Dask programming model more-or-less assumes that we can/will write out each partition in an embarrassingly-parallel fashion, using a feature like existing_data_behavior='delete_matching' is not straightforward.
If your data is already partitioned (in the Dask-DataFrame sense) in such a way that you know data from two distinct Dask partitions will never be written to the same parquet-directory partition, then you may be best-off implementing this logic yourself. For example, I haven't tested this code myself, but I can imagine you doing something like the following:

import pyarrow as pa
import pyarrow.dataset as ds

ddf = <your collection>
base_dir = <root of your partitioned dataset>

# Define custom logic to write a partition to disk
def custom_write(df):
    # Convert df to an arrow table, and then use pyarrow's dataset API as you wish...
    table = pa.Table.from_pandas(df)
    ds.write_dataset(table, base_dir, ..., existing_data_behavior="delete_matching")
    # Probably simplest to return an empty dataframe to make map_partitions happy
    return ddf._meta

# Call/compute map_partitions to write out your data
# using your own custom logic
ddf.map_partitions(custom_write).compute()
I'm not aware of any explicit plans, but I'd be open to it if such an update would be beneficial.
Thank you for your quick reply! I'm new to dask and the entire split-up-dataframe mechanics, but I think I understand enough to see that this behavior might be very complex to implement without running the risk of deleting other dask-dataframe partitions.

About my dataset: I know that it is already sorted by date, but I don't know if the underlying partitions are split in between dates or if the same date can exist in multiple partitions. How can I ensure that the second one is not the case?

And in general, with regards to your last sentence, I think it's always beneficial to stay up-to-date with the latest features and changes of the underlying packages. But I can imagine that it would take quite some effort to implement those changes, especially since writing to parquet is just one of the many functionalities of dask.
@rjzamora I tried just deleting the existing partitions above my minimum date myself. But now I get an error when trying to load the dataset again, because of course the metadata file is not updated with the removed partitions.
Mostly addresses #5959 (more work needed for a "_metadata"-based solution). Closes #5959.

The current ArrowEngine implementation is not writing a correct global "_metadata" file, because it is only including metadata for a single parquet partition originating from each dask partition. This PR includes a simple fix for the "_metadata"-creation bug, but still skips the step of populating the "file_path" fields in the metadata. We are skipping this for now, because it is not trivial to back out the full file_path for each metadata element.

While the global metadata file now contains the correct number of row-groups in this PR, the read_metadata definition must (mostly) ignore it when gathering statistics, because the order of the row-groups is different than the order of the dataset.pieces (which uses a natural_sort_key ordering by default). There are several ways to rearrange the "_metadata"-based row-groups to align with dataset.pieces, but the most general solution would be to add the "file_path" fields in the metadata (and simply align the row-group paths with the pieces paths). To make this clean, we may want to add an option to populate the file_path in pyarrow's write_to_dataset definition... I am still trying to make a decision on the best solution (I'm certainly open to ideas/suggestions).

EDIT: This PR now generates/uses a correct "_metadata" file for partitioned datasets in ArrowEngine (note that this has fixed some pyarrow-fastparquet compatibility issues). To achieve this, the dataset partitioning is now performed in dask rather than pyarrow. The new code added here for partitioning can be removed if/when the pyarrow write_to_dataset API is modified to return new file paths, or to explicitly set the file_path for metadata objects added to metadata_collector.

black dask
flake8 dask