ARROW-13650: [C++] Create dataset writer to encapsulate dataset writer logic #10955
Conversation
jorisvandenbossche left a comment
Cool! (looking forward to those improvements :)) Took a quick look at the explanations of semantics and added some questions.
Could it be an option that this does not happen (each new batch becomes a new file)? That would be an alternative way to control the files instead of max_rows_per_file.
It would be pretty easy to add such an option. I don't know how often it will make sense, though, as the user often doesn't have much control over the size of the incoming record batches.
I created ARROW-14164
cpp/src/arrow/dataset/file_base.h (Outdated)
So this basically allows "appending" to a dataset (e.g. adding new days, if it was partitioned per day), without the risk of overwriting existing data?
(Seems like a nice option; should it be the default?)
The default is, admittedly, a little messy and will probably delete existing data. When set to true, you will definitely delete existing data, but it will be cleaner. I will add a third option (now that I remember), which is to error out instead of doing anything. This is all based on the conversations in ARROW-12358.
Ah, sorry, I misread :) It's indeed clearly still deleting data with this option. Adding an option to error sounds like a good idea indeed.
I added an option to error. My first pass was going to be "raise an error if writing to a partition directory and that directory has files"; however, that could lead to a situation where we write some data and then encounter an error. So now I raise an error if any files exist in the destination directory at all. Those files may not be in directories that will be written to (if those partitions would not be modified), but there is no way for us to know that ahead of time.
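For concreteness, a minimal sketch of how the error-out behavior described above could be selected from C++; the exact enum and field spellings are assumptions based on this discussion and the options described later in the PR:

```cpp
#include <memory>
#include <utility>

#include "arrow/dataset/api.h"
#include "arrow/status.h"

// Sketch: refuse to write if any file already exists under base_dir, even in
// partition directories this particular write would not touch.
arrow::Status WriteAndFailOnExistingData(
    std::shared_ptr<arrow::dataset::Scanner> scanner,
    arrow::dataset::FileSystemDatasetWriteOptions write_options) {
  write_options.existing_data_behavior =
      arrow::dataset::ExistingDataBehavior::kError;
  return arrow::dataset::FileSystemDataset::Write(write_options,
                                                  std::move(scanner));
}
```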
This looks convoluted. Why not a single API which would return a Future?
I have cleaned this up and simplified it to a single API which returns a Future.
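A rough sketch of the shape such a single future-returning entry point could take; the signatures below are illustrative assumptions, not the exact declarations from this PR:

```cpp
#include <memory>
#include <string>

#include "arrow/record_batch.h"
#include "arrow/util/future.h"

// Hypothetical shape of the simplified writer API: one call that queues a
// batch and returns a Future the caller can await to apply backpressure.
class DatasetWriter {
 public:
  // Queue `batch` to be written into `directory`.  The returned Future
  // completes once the writer can accept more data (i.e. enough queued rows
  // have been flushed), not necessarily when this batch has hit disk.
  arrow::Future<> WriteRecordBatch(std::shared_ptr<arrow::RecordBatch> batch,
                                   const std::string& directory);

  // Flush all queued writes and close any open files.
  arrow::Future<> Finish();
};
```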
It is bizarre IMHO to have a single class managing different independent limits (you may want to write a new batch to an already open file even if the file limit was reached).
Why not separate classes? Also, perhaps give them more meaningful names e.g. RowsLimiter and OpenFilesLimiter.
I split this logic into a generic Throttle class and now have open_files_throttle_ and rows_in_flight_throttle_.
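To illustrate the idea, a simplified sketch of what a generic throttle of this kind can look like; this is a conceptual sketch rather than the class from the PR (it supports only one pending waiter and ignores thread safety):

```cpp
#include <cstdint>

#include "arrow/util/future.h"

// Callers Acquire() some cost (rows in flight, open files, ...) and get a
// Future that is already finished if there is capacity, or that completes
// later once Release() frees enough capacity.
class Throttle {
 public:
  explicit Throttle(uint64_t max_value) : max_value_(max_value) {}

  arrow::Future<> Acquire(uint64_t cost) {
    if (current_value_ + cost <= max_value_) {
      current_value_ += cost;
      return arrow::Future<>::MakeFinished();
    }
    // Over the limit: hand back an unfinished future as backpressure.
    pending_cost_ = cost;
    backpressure_ = arrow::Future<>::Make();
    return backpressure_;
  }

  void Release(uint64_t cost) {
    current_value_ -= cost;
    if (backpressure_.is_valid() &&
        current_value_ + pending_cost_ <= max_value_) {
      // Capacity freed up: admit the pending request and wake the waiter.
      current_value_ += pending_cost_;
      backpressure_.MarkFinished();
      backpressure_ = arrow::Future<>();
    }
  }

 private:
  uint64_t max_value_;
  uint64_t current_value_ = 0;
  uint64_t pending_cost_ = 0;
  arrow::Future<> backpressure_;
};
```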
Hi, in my view the user should control the in-memory size of the dataset, since the reason to split the dataset is to control the memory consumption of distributed data-consuming systems (like Spark, Presto, etc.). I can't see any benefit in controlling the number of rows (besides its effect on the file size).
… logic

This PR adds two new utilities. The first is an asynchronous smart pointer which makes it easier to ensure that asynchronous tasks are finished before an object is destroyed (and generally makes it safe to capture `this`). The second is an asynchronous task group which collects futures and can help to ensure that all futures are completed. It is similar to AllComplete except it doesn't require collecting all the futures at once.

Combined, these two things can help give structured concurrency / nursery type control over asynchronous operations. I have used these utilities in #10955 if you would like to see an example of them in action.

Closes #11084 from westonpace/experiment/async-smart-ptr

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
CI for MinGW is currently failing due to ARROW-13999.

I've finished cleaning everything up and this is ready for review.

@motybz I see your point regarding bytes instead of rows. Typically rows can be used as a proxy for bytes, but I agree that a bytes limit is generally more user-friendly. I will leave ARROW-10439 open to track that work but, in the interest of moving this PR forward, will not add a bytes limit as part of this PR.
@pitrou @jorisvandenbossche Gentle ping
lidavidm left a comment
I haven't looked through the tests in detail yet, but had some questions about the writer itself.
…te dataset scanners) and fixed python tests to expect new file ordering
…sync scanner by default
…ed num iterations to speed up CI
…ture.MaxOpenFiles
…be caused when using custom deleter types in standard containers
… sync scanner - write dataset
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com> Co-authored-by: Neal Richardson <neal.p.richardson@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
…r logic

This ended up being a fairly comprehensive overhaul of the existing dataset writer mechanism.

~~This PR relies on #10968 and will remain in draft until that is completed.~~

Breaking changes:

* The dataset writer no longer works with the synchronous scanner. I don't think this would be a huge change, but I think the current plan is to hopefully deprecate the synchronous scanner (ARROW-13338). This required changes in Python/R/Ruby which will presumably be reverted when ARROW-13338 is done.
* The default behavior is now to error if the output directory has any existing data. This can be controlled with `existing_data_behavior` (see below).
* Previously a single global counter was used for naming files. This PR changes to a counter per directory. So the following...

```
/a1/b1/part-0.parquet
/a1/b1/part-2.parquet
/a1/b2/part-1.parquet
```

...would be impossible. Instead you would receive...

```
/a1/b1/part-0.parquet
/a1/b1/part-1.parquet
/a1/b2/part-0.parquet
```

...this does not, however, mean that the resulting data files will be deterministic. If the data in `/a1/b1/part-0.parquet` and `/a1/b1/part-1.parquet` originated from two different files being scanned in an unordered fashion, then either part could represent either file. A number of test cases in all implementations had to change as the expected paths for dataset writes changed.

New features:

* The dataset writer now works with the async scanner (ARROW-12803).
* The dataset writer now respects backpressure (closes ARROW-2628?, related to but does not fully solve ARROW-13590 and ARROW-13611) and will stop pulling from the scanner when `max_rows_queued` (provided as an argument to `DatasetWriter`) is exceeded. By default `max_rows_queued` is 64M. This is not an "option" as I don't think it should be exposed to the user; that would be offering too many knobs. I think eventually we may want to wrap up all backpressure into a single configurable setting.
* `FileSystemDatasetWriteOptions` now has a `max_rows_per_file` setting (ARROW-10439).
* `FileSystemDatasetWriteOptions` now has a `max_open_files` setting (ARROW-12321) which prevents opening too many files. Instead, the writer will apply backpressure on the scanner while also closing the open file with the greatest number of rows already written (then resume writing once the file is closed).
* `FileSystemDatasetWriteOptions` now has an `existing_data_behavior` setting (ARROW-12358, ARROW-7706) which controls what to do if there is data in the destination.

Deferred for future work:

* Add the new options to the Python/R APIs (ARROW-13703)
* Limiting based on file size (ARROW-10439)
* More fine-grained error control (ARROW-14175)

Closes #10955 from westonpace/feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
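Putting the new knobs together, here is a hedged usage sketch of the options named above. The option names (`max_rows_per_file`, `max_open_files`, `existing_data_behavior`) follow the description; the surrounding setup fields and the exact enum spelling are assumptions about the C++ API and may differ by Arrow version:

```cpp
#include <memory>
#include <utility>

#include "arrow/dataset/api.h"
#include "arrow/filesystem/api.h"
#include "arrow/status.h"

// Sketch: write a partitioned dataset while capping per-file row counts,
// limiting the number of simultaneously open files, and erroring out if the
// destination already contains data.
arrow::Status WritePartitionedDataset(
    std::shared_ptr<arrow::dataset::Scanner> scanner,
    std::shared_ptr<arrow::fs::FileSystem> filesystem,
    std::shared_ptr<arrow::dataset::Partitioning> partitioning,
    std::shared_ptr<arrow::dataset::FileFormat> format) {
  arrow::dataset::FileSystemDatasetWriteOptions options;
  options.filesystem = std::move(filesystem);
  options.base_dir = "/tmp/output_dataset";  // hypothetical destination
  options.partitioning = std::move(partitioning);
  options.file_write_options = format->DefaultWriteOptions();
  options.basename_template = "part-{i}.parquet";
  // New in this PR: cap the number of rows written to any single file...
  options.max_rows_per_file = 1 << 20;
  // ...limit how many output files may be open at once...
  options.max_open_files = 100;
  // ...and refuse to write if the destination already contains files.
  options.existing_data_behavior =
      arrow::dataset::ExistingDataBehavior::kError;
  return arrow::dataset::FileSystemDataset::Write(options, std::move(scanner));
}
```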