ARROW-13680: [C++] Create an asynchronous nursery to simplify capture logic #10968
Conversation
lidavidm
left a comment
As mentioned before, I'm supportive of the idea. It would be good to try to consolidate some of these abstractions (TaskGroup is another) and I think it would be good to study prior art more seriously if/when we build this out more (though perhaps we won't need anything more sophisticated).
TaskGroup is the closest to this abstraction IMO. There aren't that many uses of it in the codebase in the first place, either.
…rent then the parent should stay alive until the child is finished, not the other way around.
…ing for a future to complete and then deleting the future (which still has callbacks to run that haven't yet been captured
cpp/src/arrow/util/async_nursery.cc
Outdated
// Lazily create the future to save effort if we don't need it
if (!on_closed_.is_valid()) {
  on_closed_ = Future<>::Make();
}
I'm a bit worried about this being race-prone. So far it looks like it's only used in the one constructor, perhaps it could be inlined there?
I think you're right about the danger. It is used elsewhere at the moment in the dataset writer PR (https://github.com/apache/arrow/pull/10955/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R345-R346). I use it to decrement the number of open writers when the file has finished writing. So I just changed it so we always create the future. It was a bit of a premature optimization anyway.
I've cleaned up the todos and addressed PR comments.
lidavidm
left a comment
Looks good to me.
I'm trying to understand the ergonomics of this API. Did you try to use this in the codebase to check that it actually reduces the burden of managing the sequencing of destructor calls?
class ARROW_EXPORT Nursery {
 public:
  template <typename T, typename... Args>
  typename std::enable_if<!std::is_array<T>::value, std::shared_ptr<T>>::type
The enable_if doesn't seem useful here (especially as you have a static_assert below that would catch arrays)
 public:
  template <typename T, typename... Args>
  typename std::enable_if<!std::is_array<T>::value, std::shared_ptr<T>>::type
  MakeSharedCloseable(Args&&... args) {
Can you add a docstring explaining what it does?
  friend struct DestroyingDeleter;
};

class ARROW_EXPORT Nursery {
Can you add a docstring explaining what this is/does?
template <typename T, typename... Args>
typename std::enable_if<!std::is_array<T>::value,
                        std::unique_ptr<T, DestroyingDeleter<T>>>::type
Same remarks here.
  on_closed_.MarkFinished(st);
}
nursery_->OnTaskFinished(st);
delete this;
Ok, so this mandates that this object is heap-allocated using the default C++ allocator, right? Can you mention this somewhere in the docstring?
#pragma once

#include <list>
This doesn't seem used?
    finish_fut = DoClose();
  }
} else {
  // No dependent tasks were added
Hmm... is it possible for dependent tasks to be added after this?
  bool* destroyed_;
};

class EvictsChild : public AsyncCloseable {
Add a comment explaining what this does/exercises?
child_future, final_future);
evicts_child->EvictChild();
// Owner no longer has reference to child here but it's kept alive by nursery
// because it isn't done
I don't understand this comment, because by reading the source code, I get the impression that the nursery doesn't keep anything alive (it does not have a container of tasks or futures).
/// Subclasses should override this and perform any cleanup. Once the future returned
/// by this method finishes then this object is eligible for destruction and any
/// reference to `this` will be invalid
virtual Future<> DoClose() = 0;
API ergonomics question: since this is the single point of customization, would it be easier if AsyncCloseable took a std::function<Future<>> close_func parameter, instead of having to write a subclass?
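A minimal standalone sketch of this suggestion, using `std::future` in place of Arrow's `Future<>` (the `Closeable` name and `Demo` helper are illustrative, not part of the PR): the cleanup logic is injected as a `std::function` rather than supplied by overriding a virtual method.

```cpp
#include <cassert>
#include <functional>
#include <future>

// Composition instead of inheritance: the caller hands in the cleanup
// routine, so no subclass of an AsyncCloseable-like base is needed.
class Closeable {
 public:
  explicit Closeable(std::function<std::future<void>()> close_func)
      : close_func_(std::move(close_func)) {}
  std::future<void> Close() { return close_func_(); }

 private:
  std::function<std::future<void>()> close_func_;
};

bool Demo() {
  bool cleaned_up = false;
  Closeable obj([&cleaned_up] {
    // Asynchronous cleanup work, analogous to what DoClose would return.
    return std::async(std::launch::async, [&cleaned_up] { cleaned_up = true; });
  });
  obj.Close().wait();  // the caller still decides when to await cleanup
  return cleaned_up;
}
```

The trade-off is the usual one: a `std::function` parameter is lighter for simple cases, while a virtual method makes it easier for the cleanup to share private state with the rest of the object.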
There are two concerns here: 1) keeping the object alive (the nursery does not do this, but the smart pointers do) and 2) not returning until all tasks have finished (the nursery does this because it blocks until every task has finished).
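A small standard-library sketch of both concerns (the `Writer` struct and `Demo` helper are illustrative, not from the PR): a task that captures a `shared_ptr` by value keeps the object alive after the caller drops its reference, but someone must still wait for the task itself.

```cpp
#include <future>
#include <memory>

struct Writer { int rows_written = 0; };

bool Demo() {
  auto writer = std::make_shared<Writer>();
  std::weak_ptr<Writer> watch = writer;
  int result = 0;
  {
    // Concern (1): the task's by-value capture keeps the Writer alive.
    std::future<int> task = std::async(std::launch::async, [writer] {
      writer->rows_written = 42;
      return writer->rows_written;
    });
    writer.reset();       // caller lets go; the task's copy keeps it alive
    // Concern (2): someone must still block on completion, which is the
    // part the nursery handles by waiting for every dependent task.
    result = task.get();
  }  // future destroyed: its shared state and the captured shared_ptr go away
  return result == 42 && watch.expired();
}
```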
Yes, the nursery could very much trigger deadlock if a task never finishes. I'll call back to this later.
I also received some negative feedback offline from @bkietz and so I tried today to rewrite this in another light, using only the asynchronous smart pointers and an asynchronous task group. The result is here. However, it still doesn't quite solve the problem. So let me try to state the problem I am trying to solve in clear terms; I am open to any solution someone can come up with, but at the moment this PR is still my preferred solution.

Problem Statement

The use case here is from the perspective of a developer trying to write some code who wants to ensure the code they write is used safely. For example, let's consider the case of writing a file writer queue. A synchronous declaration might look something like this... Now, let's pretend our implementation creates a dedicated writer thread and every call to QueueBatch adds the batch to a producer-consumer queue that the writer thread drains. Our destructor would then look like this... Now let's look at how this is used... We are ensured several things:
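The code snippets referenced above did not survive extraction, so here is a standalone sketch of the synchronous version under the stated assumptions (class and helper names are illustrative, not the PR's actual code): a writer with a dedicated thread draining a queue, whose destructor blocks until everything queued has been written.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

struct Batch { int rows; };  // illustrative stand-in for a record batch

class SyncWriterQueue {
 public:
  explicit SyncWriterQueue(int* total_rows)
      : total_rows_(total_rows), writer_thread_([this] { DrainLoop(); }) {}

  // The key synchronous guarantee: the destructor blocks until the writer
  // thread has drained every queued batch, so after destruction no write is
  // in flight and nothing the writer references can dangle.
  ~SyncWriterQueue() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_one();
    writer_thread_.join();
  }

  void QueueBatch(Batch batch) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(batch);
    }
    cv_.notify_one();
  }

 private:
  void DrainLoop() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cv_.wait(lock, [this] { return closed_ || !queue_.empty(); });
      if (queue_.empty()) return;  // only reachable once closed_
      Batch next = queue_.front();
      queue_.pop();
      lock.unlock();
      *total_rows_ += next.rows;  // "write" the batch
      lock.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<Batch> queue_;
  bool closed_ = false;
  int* total_rows_;
  std::thread writer_thread_;
};

int WriteTwoBatches() {
  int total = 0;
  {
    SyncWriterQueue writer(&total);
    writer.QueueBatch({10});
    writer.QueueBatch({32});
  }  // destructor blocks here until both batches are written
  return total;
}
```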
My goal is to allow those same guarantees to exist if the "finishing work" (the stuff in the destructor) is a future. For example, the asynchronous analogue of the above might be... Now our destructor can't do anything. It can't block on Finish() because that would defeat the purpose of being asynchronous. Someone naively using this class might do...

This works ok until a call to QueueBatch returns an error status halfway through the write. Then the code will bail out and both … With the nursery you can write...

...and now you can be assured that an error occurring during QueueBatch will not cause a segmentation fault because the nursery will still block until any outstanding captures have resolved.
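The nursery's guarantee can be approximated with the standard library (this is an illustrative analogue, not the PR's actual `Nursery` API, and the names are made up): a scope object collects futures and its destructor blocks until all of them finish, so an early error return cannot destroy state that an in-flight task still references.

```cpp
#include <future>
#include <vector>

// Toy nursery analogue: the destructor waits for every registered task,
// so anything captured by reference inside a task is guaranteed to outlive
// that task, even if the scope is exited early on an error.
class ScopedTaskGroup {
 public:
  ~ScopedTaskGroup() {
    for (auto& task : tasks_) {
      if (task.valid()) task.wait();
    }
  }
  void AddTask(std::future<void> task) { tasks_.push_back(std::move(task)); }

 private:
  std::vector<std::future<void>> tasks_;
};

int RunInScope() {
  int rows_written = 0;
  {
    ScopedTaskGroup nursery;
    // Capturing rows_written by reference is safe: the nursery's destructor
    // blocks before the stack frame holding rows_written is torn down.
    nursery.AddTask(std::async(std::launch::async,
                               [&rows_written] { rows_written = 42; }));
    // Even if we bailed out of this scope with an error status here, the
    // destructor would still wait for the outstanding task.
  }
  return rows_written;
}
```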
Yes, if there is a bug, but the risk is no greater than what you have with...
Why can't FileWriteOptions be a
Ok, I took a week and did other things and, coming back, I've decided not to pursue this PR in this form. Instead I'll push the async smart ptrs + an async task group as its own thing. I appreciate all the feedback as it has helped guide me tremendously in coming up with the best solution.
…r logic

This ended up being a fairly comprehensive overhaul of the existing dataset writer mechanism. ~~This PR relies on #10968 and will remain in draft until that is completed.~~

Breaking Changes:
* The dataset writer no longer works with the synchronous scanner. I don't think this would be a huge change but the current plan is to hopefully deprecate the synchronous scanner (ARROW-13338). This required changes in Python/R/Ruby which will presumably be reverted when ARROW-13338 is done.
* The default behavior is now to error if the output directory has any existing data. This can be controlled with `existing_data_behavior` (see below).
* Previously a single global counter was used for naming files. This PR changes to a counter per directory. So the following...
```
/a1/b1/part-0.parquet
/a1/b1/part-2.parquet
/a1/b2/part-1.parquet
```
...would be impossible. Instead you would receive...
```
/a1/b1/part-0.parquet
/a1/b1/part-1.parquet
/a1/b2/part-0.parquet
```
...this does not, however, mean that the resulting data files will be deterministic. If the data in `/a1/b1/part-0.parquet` and `/a1/b1/part-1.parquet` originated from two different files being scanned in an unordered fashion then either part could represent either file. A number of test cases in all implementations had to change as the expected paths for dataset writes changed.

New features:
* The dataset writer now works with the async scanner (ARROW-12803).
* The dataset writer now respects backpressure (closes ARROW-2628?, related to but does not fully solve ARROW-13590 and ARROW-13611) and will stop pulling from the scanner when `max_rows_queued` (provided as an argument to `DatasetWriter`) is exceeded. By default `max_rows_queued` is 64M. This is not an "option" as I don't think it should be exposed to the user; it would be offering too many knobs. I think eventually we may want to wrap up all backpressure into a single configurable setting.
* `FileSystemDatasetWriteOptions` now has a `max_rows_per_file` setting (ARROW-10439).
* `FileSystemDatasetWriteOptions` now has a `max_open_files` setting (ARROW-12321) which prevents opening too many files. Instead the writer will apply backpressure on the scanner while also closing the open file with the greatest number of rows already written (then resume writing once the file is closed).
* `FileSystemDatasetWriteOptions` now has an `existing_data_behavior` setting (ARROW-12358, ARROW-7706) which controls what to do if there is data in the destination.

Deferred for future work:
* Add the new options to the Python/R APIs (ARROW-13703)
* Limiting based on file size (ARROW-10439)
* More fine-grained error control (ARROW-14175)

Closes #10955 from westonpace/feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
This PR aims to introduce some structured concurrency utilities for working with futures. The async nursery contains a few utilities for managing the lifetimes of objects, avoiding the need to constantly worry about capturing `shared_ptr`s to maintain lifetimes.
A nursery is created with a lambda and everything that runs in that lambda runs "in the nursery". Any object (that has a reference to the nursery) can add a future to the nursery as a dependent task. The nursery will not finish until all of those futures are finished. This means that any objects created outside the nursery (e.g. typically things like options objects, request objects, etc.) will remain valid and can be captured by reference.
Objects that spawn callbacks which need references to `this` can extend `AsyncCloseable` and override `DoClose` (which returns a `Future`). These objects will not be deleted until the future returned by `DoClose` has been completed. In addition, objects extending `AsyncCloseable` can add futures as dependent tasks and the object will be kept alive until those futures complete. In order for this to work these objects must be created by the nursery using `MakeSharedCloseable` or `MakeUniqueCloseable`. Any object created in this way will keep the nursery alive until the object's `DoClose` and any of the object's dependent tasks have finished.

Objects that extend `AsyncCloseable` have an `OnClosed` future which will only be completed when the object is destroyed. This facilitates parent/child relationships. If a parent needs to stay alive until a child has completed all of its work (often because the child has callbacks referencing parent state or the parent needs to perform some final cleanup) then the parent can add the child's `OnClosed` future as a dependent task.

Cons:
* `Nursery` … `Pimpl` helps here

ToDo:
* `RunInNursery` and `RunInNurserySt`)