Conversation

@westonpace westonpace commented Aug 18, 2021

This ended up being a fairly comprehensive overhaul of the existing dataset writer mechanism. This PR relies on #10968 and will remain in draft until that is completed.

Breaking Changes:

  • The dataset writer no longer works with the synchronous scanner. I don't think this is a huge change, as the current plan is (hopefully) to deprecate the synchronous scanner (ARROW-13338). This required changes in Python/R/Ruby which will presumably be reverted when ARROW-13338 is done.
  • The default behavior is now to error if the output directory has any existing data. This can be controlled with `existing_data_behavior` (see below).
  • Previously a single global counter was used for naming files. This PR changes to a counter per directory. So the following...
/a1/b1/part-0.parquet
/a1/b1/part-2.parquet
/a1/b2/part-1.parquet

...would be impossible. Instead you would receive...

/a1/b1/part-0.parquet
/a1/b1/part-1.parquet
/a1/b2/part-0.parquet

...this does not, however, mean that the resulting data files will be deterministic. If the data in /a1/b1/part-0.parquet and /a1/b1/part-1.parquet originated from two different files being scanned in an unordered fashion then either part could represent either file. A number of test cases in all implementations had to change as the expected paths for dataset writes changed.

  • New features:

    • The dataset writer now works with the async scanner (ARROW-12803)
    • The dataset writer now respects backpressure (closes ARROW-2628?, related to but does not fully solve ARROW-13590 and ARROW-13611) and will stop pulling from the scanner when `max_rows_queued` (provided as an argument to `DatasetWriter`) is exceeded. By default `max_rows_queued` is 64M. This is not an "option" as I don't think it should be exposed to the user; that would be offering too many knobs. I think eventually we may want to wrap up all backpressure into a single configurable setting.
    • `FileSystemDatasetWriteOptions` now has a `max_rows_per_file` setting (ARROW-10439).
    • `FileSystemDatasetWriteOptions` now has a `max_open_files` setting (ARROW-12321) which prevents opening too many files. Instead the writer will apply backpressure on the scanner while also closing the open file with the greatest number of rows already written (then resume writing once the file is closed).
    • `FileSystemDatasetWriteOptions` now has an `existing_data_behavior` setting (ARROW-12358, ARROW-7706) which controls what to do if there is data in the destination. (A configuration sketch covering these options follows this list.)
  • Deferred for future work:

    • Add the new options to the python/R APIs (ARROW-13703)
    • Limiting based on file size (ARROW-10439)
    • More fine grained error control (ARROW-14175)
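
For readers skimming the new options, a minimal C++ configuration sketch is below. The option names come from the list above; the `ExistingDataBehavior::kError` spelling, the partition column, the paths, and the numeric limits are assumptions for illustration rather than anything prescribed by this PR.

```cpp
#include <memory>
#include <utility>

#include "arrow/api.h"
#include "arrow/dataset/api.h"
#include "arrow/filesystem/localfs.h"

namespace ds = arrow::dataset;

// Sketch: configure the options discussed above and run a dataset write.
// `scanner` and `format` would come from the dataset being scanned.
arrow::Status WriteWithNewOptions(std::shared_ptr<ds::Scanner> scanner,
                                  std::shared_ptr<ds::FileFormat> format) {
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = std::make_shared<arrow::fs::LocalFileSystem>();
  write_options.base_dir = "/tmp/dataset_out";            // hypothetical destination
  write_options.partitioning = std::make_shared<ds::HivePartitioning>(
      arrow::schema({arrow::field("year", arrow::int32())}));  // hypothetical partition column
  write_options.basename_template = "part-{i}.parquet";   // {i} is now a per-directory counter
  write_options.max_rows_per_file = 1 << 20;              // new in this PR (ARROW-10439)
  write_options.max_open_files = 512;                     // new in this PR (ARROW-12321)
  write_options.existing_data_behavior =
      ds::ExistingDataBehavior::kError;                   // new in this PR (ARROW-12358)
  return ds::FileSystemDataset::Write(write_options, std::move(scanner));
}
```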

@jorisvandenbossche jorisvandenbossche left a comment

Cool! (looking forward to those improvements :)) Took a quick look at the explanations of semantics and added some questions.

Member

Could it be an option that this does not happen (i.e. each new batch becomes a new file)? As an alternative way to control the files instead of max_rows_per_file.

Member Author

It would be pretty easy to add such an option. I don't know how often it will make sense though as the user doesn't often have a lot of control over the size of the incoming record batches.

Member Author

I created ARROW-14164

Member

So this basically allows "appending" to a dataset (e.g. adding new days, if it was partitioned per day), without the risk of overwriting existing data?
(seems like a nice option, should it be the default?)

Member Author

The default is, admittedly, a little messy and will probably delete existing data. When set to true, you will definitely delete existing data, but it will be cleaner. I will add a third option (now that I remember) which is to error out instead of doing anything. This is all based on the conversations in ARROW-12358.

Member

Ah, sorry, I misread :) It's indeed clearly still deleting data with this option. Adding an option to error sounds like a good idea indeed.

Member Author

I added an option to error. My first pass was going to be "raise an error if writing to a partition directory and that directory had files"; however, that could lead to a situation where we write some data and then encounter an error. So now I raise an error if any files exist in the destination directory at all. Those files may not be in directories that will be written to (if those partitions would not be modified) but there is no way for us to know that ahead of time.
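
For illustration, here is a minimal sketch of the kind of up-front check described above (using the public arrow::fs API; this is not code from the PR): fail if the destination contains any file at all, since the writer cannot know in advance which partition directories will be touched.

```cpp
#include <string>
#include <vector>

#include "arrow/filesystem/filesystem.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Error out if any file already exists anywhere under `base_dir`.
arrow::Status EnsureDestinationEmpty(arrow::fs::FileSystem* filesystem,
                                     const std::string& base_dir) {
  arrow::fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  selector.allow_not_found = true;  // a missing directory is fine; it will be created
  ARROW_ASSIGN_OR_RAISE(std::vector<arrow::fs::FileInfo> infos,
                        filesystem->GetFileInfo(selector));
  for (const auto& info : infos) {
    if (info.IsFile()) {
      return arrow::Status::Invalid("Destination ", base_dir,
                                    " already contains data: ", info.path());
    }
  }
  return arrow::Status::OK();
}
```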

@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from a7e9978 to 3aa35f2 Compare August 20, 2021 03:53
@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from dae3596 to d0637b6 Compare August 24, 2021 03:51
Member

This looks convoluted. Why not a single API which would return a Future?

Member Author

I have cleaned this up and simplified to a single API which returns a Future.

Member

It is bizarre IMHO to have a single class managing different independent limits (you may want to write a new batch to an already open file even if the file limit was reached).

Why not separate classes? Also, perhaps give them more meaningful names, e.g. RowsLimiter and OpenFilesLimiter.

Member Author

I split this logic into a generic Throttle class and now have open_files_throttle_ and rows_in_flight_throttle_
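
As an aside for readers, here is a minimal, synchronous sketch of the throttle idea: a shared counter with a capacity that work acquires from and releases back to. The `Acquire`/`Release` names and the blocking behaviour are purely illustrative; the class added in this PR is asynchronous, so hitting a limit pauses the scanner via a future rather than blocking a thread.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Simplified, blocking illustration of the throttle concept.  The writer keeps
// one instance for rows in flight and one for open files; work calls Acquire()
// before it is queued and Release() once it has been flushed.
class Throttle {
 public:
  explicit Throttle(uint64_t capacity) : capacity_(capacity) {}

  // Wait until `cost` units fit under the capacity, then reserve them.
  void Acquire(uint64_t cost) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [&] { return in_use_ + cost <= capacity_; });
    in_use_ += cost;
  }

  // Return `cost` units and wake any waiters.
  void Release(uint64_t cost) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      in_use_ -= cost;
    }
    cv_.notify_all();
  }

 private:
  const uint64_t capacity_;
  uint64_t in_use_ = 0;
  std::mutex mutex_;
  std::condition_variable cv_;
};
```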

@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from e3fa28e to 5fa47f0 Compare August 27, 2021 21:21

motybz commented Aug 29, 2021

Hi

From my point of view the user should control the in-memory size of the dataset files, since the reason to split a dataset up is to control the memory consumption of distributed data-consuming systems (like Spark, Presto, etc.).
Moreover, the size of the final file of the dataset depends on:
number of columns * column type (int8/16/32, string, float, ...) * number of rows * compression.

I can't see any benefit in controlling the number of rows (besides its effect on the file size).

@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from 5fa47f0 to aa2c4aa Compare September 4, 2021 02:43
lidavidm pushed a commit that referenced this pull request Sep 9, 2021
… logic

This PR adds two new utilities.

The first is an asynchronous smart pointer which makes it easier to ensure that asynchronous tasks are finished before an object is destroyed (and generally makes it safe to capture `this`)

The second is an asynchronous task group which collects futures and can help to ensure that all futures are completed.  It is similar to AllComplete except it doesn't require collecting all the futures at once.

Combined, these two things can help give structured concurrency / nursery type control over asynchronous operations.  I have used these utilities in #10955 if you would like to see an example of them in action.

Closes #11084 from westonpace/experiment/async-smart-ptr

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
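
To make the task-group idea concrete for readers, here is a simplified, callback-based sketch; the class and method names are made up for illustration and this is not the actual Arrow utility (which is built on arrow::Future and propagates statuses). Tasks register themselves as they are spawned, and a completion hook runs once End() has been called and everything has finished, so the futures never need to be collected up front.

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Simplified illustration of an asynchronous task group.
class SimpleTaskGroup {
 public:
  // Call when a new asynchronous task is started.
  void AddTask() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++pending_;
  }

  // Call from each task's completion callback.
  void TaskFinished() {
    std::function<void()> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (--pending_ == 0 && ended_) to_run = std::move(on_finished_);
    }
    if (to_run) to_run();
  }

  // Declare that no more tasks will be added; run `on_finished` once
  // every registered task has completed (possibly immediately).
  void End(std::function<void()> on_finished) {
    std::function<void()> to_run;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ended_ = true;
      if (pending_ == 0) {
        to_run = std::move(on_finished);
      } else {
        on_finished_ = std::move(on_finished);
      }
    }
    if (to_run) to_run();
  }

 private:
  std::mutex mutex_;
  int pending_ = 0;
  bool ended_ = false;
  std::function<void()> on_finished_;
};
```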
@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from aa2c4aa to 5b268a1 Compare September 10, 2021 02:36
@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from 0329017 to 570da68 Compare September 14, 2021 10:25
@westonpace
Member Author

CI for MinGW is currently failing due to ARROW-13999
CI for RTools3.5 is sporadically failing and I am still investigating (I think I need to disable dataset writing examples)

@westonpace westonpace marked this pull request as ready for review September 15, 2021 01:39
@westonpace
Member Author

I've finished cleaning everything up and this is ready for review.

@westonpace
Member Author

@motybz I see your point regarding bytes instead of rows. Rows can typically be used as a proxy for bytes, but I agree that a bytes limit is generally more user friendly. I will leave ARROW-10439 open to track that work but will not add a bytes limit as part of this PR, in the interest of moving this PR forwards.

@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from c6b5ee0 to 85bf07b Compare September 21, 2021 00:09
@westonpace
Member Author

@pitrou @jorisvandenbossche Gentle ping

@lidavidm lidavidm left a comment

I haven't looked through the tests in detail yet, but had some questions about the writer itself.

westonpace and others added 23 commits October 1, 2021 10:33
…te dataset scanners) and fixed python tests to expect new file ordering
…be caused when using custom deleter types in standard containers
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
@westonpace westonpace force-pushed the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch from 4afb1d7 to f352036 Compare October 1, 2021 20:33
@westonpace westonpace closed this in 543f33a Oct 4, 2021
ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
@westonpace westonpace deleted the feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for branch January 6, 2022 08:15