
Conversation


@westonpace westonpace commented Aug 20, 2021

This PR aims to introduce some structured concurrency utilities for working with futures. The async nursery provides a few utilities for managing object lifetimes so that we don't have to constantly capture shared_ptrs just to keep things alive.

A nursery is created with a lambda and everything that runs in that lambda runs "in the nursery". Any object (that has a reference to the nursery) can add a future to the nursery as a dependent task. The nursery will not finish until all of those futures are finished. This means that any objects created outside the nursery (e.g. typically things like options objects, request objects, etc.) will remain valid and can be captured by reference.

Objects that spawn callbacks which need references to this can extend AsyncCloseable and override DoClose (which returns a Future). These objects will not be deleted until the future returned by DoClose has been completed. In addition, objects extending AsyncCloseable can add futures as dependent tasks and the object will be kept alive until those futures complete. In order for this to work these objects must be created by the nursery using MakeSharedCloseable or MakeUniqueCloseable. Any object created in this way will keep the nursery alive until the object's DoClose and any of the object's dependent tasks have finished.

Objects that extend AsyncCloseable have an OnClosed future which will only be completed when the object is destroyed. This facilitates parent/child relationships. If a parent needs to stay alive until a child has completed all of its work (often because the child has callbacks referencing parent state or the parent needs to perform some final cleanup) then the parent can add the child's OnClosed future as a dependent task.

Cons:

  • It's a bit tricky to get it working with the pimpl pattern but NurseryPimpl helps here
  • There are a lot of "nursery-like" objects starting to pop up (IOContext, ExecContext, cancellation tokens, ...) and we may want to consolidate at some point

ToDo:

  • Fix up some of the APIs with some template hackery (e.g. merging RunInNursery and RunInNurserySt)


@westonpace
Member Author

@pitrou @bkietz @lidavidm curious to get your thoughts on the general idea. I think I still have at least one more good overhaul in store (hopefully getting rid of the base classes), but so far it has been very helpful in keeping the dataset writer logic clean.


@lidavidm lidavidm left a comment


As mentioned before, I'm supportive of the idea. It would be good to try to consolidate some of these abstractions (TaskGroup is another) and I think it would be good to study prior art more seriously if/when we build this out more (though perhaps we won't need anything more sophisticated).

TaskGroup is the closest to this abstraction IMO. There aren't that many uses of it in the codebase in the first place, either.

…ing for a future to complete and then deleting the future (which still has callbacks to run that haven't yet been captured
// Lazily create the future to save effort if we don't need it
if (!on_closed_.is_valid()) {
on_closed_ = Future<>::Make();
}
Member

I'm a bit worried about this being race-prone. So far it looks like it's only used in the one constructor, perhaps it could be inlined there?

Member Author

I think you're right about the danger. It is used elsewhere at the moment in the dataset writer PR (https://github.com/apache/arrow/pull/10955/files#diff-387ad04c2450a38044e667e07183b8265866cb3736d10acdce137c2b83737b16R345-R346). I use it to decrement the number of open writers when the file has finished writing. So I just changed it so we always create the future. It was a bit of a premature optimization anyway.

@westonpace
Member Author

I've cleaned up the todos and addressed PR comments.

@lidavidm lidavidm left a comment

Looks good to me.

@pitrou
Member

pitrou commented Aug 30, 2021

I'm trying to understand the ergonomics of this API.
If I understand correctly, nothing here actually keeps the tasks alive. What this relies upon is that the user stored the shared_ptr or unique_ptr returned by Nursery::MakeXXXCloseable somewhere so that lifetimes are handled correctly?
Is there a risk that these pointers may be kept alive too long (and delay the DoClose calls accordingly)? What is the recommended strategy for using this facility?

Did you try to use this in the codebase to check that it actually reduces the burden of managing the sequencing of destructor calls?

class ARROW_EXPORT Nursery {
public:
template <typename T, typename... Args>
typename std::enable_if<!std::is_array<T>::value, std::shared_ptr<T>>::type
Member

The enable_if doesn't seem useful here (especially as you have a static_assert below that would catch arrays)

public:
template <typename T, typename... Args>
typename std::enable_if<!std::is_array<T>::value, std::shared_ptr<T>>::type
MakeSharedCloseable(Args&&... args) {
Member

Can you add a docstring explaining what it does?

friend struct DestroyingDeleter;
};

class ARROW_EXPORT Nursery {
Member

Can you add a docstring explaining what this is/does?


template <typename T, typename... Args>
typename std::enable_if<!std::is_array<T>::value,
std::unique_ptr<T, DestroyingDeleter<T>>>::type
Member

Same remarks here.

on_closed_.MarkFinished(st);
}
nursery_->OnTaskFinished(st);
delete this;
Member

Ok, so this mandates that this object is heap-allocated using the default C++ allocator, right? Can you mention this somewhere in the docstring?


#pragma once

#include <list>
Member

This doesn't seem used?

finish_fut = DoClose();
}
} else {
// No dependent tasks were added
Member

Hmm... is it possible for dependent tasks to be added after this?

bool* destroyed_;
};

class EvictsChild : public AsyncCloseable {
Member

Add a comment explaining what this does/exercises?

child_future, final_future);
evicts_child->EvictChild();
// Owner no longer has reference to child here but it's kept alive by nursery
// because it isn't done
Member

I don't understand this comment, because by reading the source code, I get the impression that the nursery doesn't keep anything alive (it does not have a container of tasks or futures).

/// Subclasses should override this and perform any cleanup. Once the future returned
/// by this method finishes then this object is eligible for destruction and any
/// reference to `this` will be invalid
virtual Future<> DoClose() = 0;
Member

API ergonomics question: since this is the single point of customization, would it be easier if AsyncCloseable took a std::function<Future<>> close_func parameter, instead of having to write a subclass?

@westonpace
Member Author

westonpace commented Aug 31, 2021

If I understand correctly, nothing here actually keeps the tasks alive. What this relies upon is that the user stored the shared_ptr or unique_ptr returned by Nursery::MakeXXXCloseable somewhere so that lifetimes are handled correctly?

There are two concerns here: (1) keeping the object alive (the nursery does not do this, but the smart pointers do) and (2) not returning until all tasks have finished (the nursery does this because it blocks until every task has finished).

Is there a risk that these pointers may be kept alive too long (and delay the DoClose calls accordingly)?

Yes, the nursery could very much trigger deadlock if a task never finishes. I'll call back to this later.

What is the recommended strategy for using this facility?
Did you try to use this in the codebase to check that it actually reduces the burden of managing the sequencing of destructor calls?

See #11017


I also received some negative feedback offline from @bkietz, so today I tried to rewrite this in another light, using only the asynchronous smart pointers and an asynchronous task group. The result is here. However, it still doesn't quite solve the problem. So let me state, in clear terms, the problem I am trying to solve. I am open to any solution someone can come up with, but at the moment this PR is still my preferred solution.

Problem Statement

The use case here is from the perspective of a developer trying to write some code and they want to ensure the code they write is used safely. For example, let's consider the case of writing a file writer queue. A synchronous declaration might look something like this...

class FileWriterQueue {
public:
  FileWriterQueue(std::unique_ptr<FileWriter> writer, const FileWriteOptions& options);
  ~FileWriterQueue();
  Status QueueBatch(std::shared_ptr<RecordBatch> batch);
  void Finish();
private:
  std::unique_ptr<FileWriter> writer_;
  const FileWriteOptions& options_;
  std::thread writer_thread_;
};

Now, let's pretend our implementation creates a dedicated writer thread and every call to QueueBatch adds the batch to a producer consumer queue that the writer thread drains. Our destructor would then look like this...

FileWriterQueue::~FileWriterQueue() {
  EnsureFinished();
  writer_thread_.join();
}

Now let's look at how this is used...

Status WriteBatches(std::vector<std::shared_ptr<RecordBatch>> batches, const FileWriteOptions& options) {
  std::unique_ptr<FileWriter> file_writer = OpenWriter();
  FileWriterQueue file_writer_queue(std::move(file_writer), options);
  for (const auto& batch : batches) {
    ARROW_RETURN_NOT_OK(file_writer_queue.QueueBatch(batch));
  }
  file_writer_queue.Finish(); // I could also skip this and just rely on the EnsureFinished in the destructor
  return Status::OK();
}

We are ensured several things:

  1. The FileWriterQueue will not be deleted until the thread is joined
  2. The options remain valid until the thread is joined
  3. All work is completely done when WriteBatches returns

My goal is to allow those same guarantees to exist if the "finishing work" (the stuff in the destructor) is a future. For example, the asynchronous analogue of above might be...

class FileWriterQueue {
public:
  FileWriterQueue(std::unique_ptr<FileWriter> writer, const FileWriteOptions& options);
  ~FileWriterQueue();
  Status QueueBatch(std::shared_ptr<RecordBatch> batch);
  Future<> Finish();
private:
  std::unique_ptr<FileWriter> writer_;
  const FileWriteOptions& options_;
};

Now our destructor can't do anything. It can't block on Finish() because that would defeat the purpose of being asynchronous. Someone naively using this class might do...

Status WriteBatches(std::vector<std::shared_ptr<RecordBatch>> batches, const FileWriteOptions& options) {
  std::unique_ptr<FileWriter> file_writer = OpenWriter();
  FileWriterQueue file_writer_queue(std::move(file_writer), options);
  for (const auto& batch : batches) {
    ARROW_RETURN_NOT_OK(file_writer_queue.QueueBatch(batch));
  }
  return file_writer_queue.Finish().status();
}

This works ok until a call to QueueBatch returns an error status halfway through the write. Then the code will bail out and both file_writer_queue and options will go out of scope. This means that if there are any leftover captures still pending, they will segfault when executed.

With the nursery you can write...

Status WriteBatches(std::vector<std::shared_ptr<RecordBatch>> batches, const FileWriteOptions& options) {
  return util::RunInNursery([&] (Nursery* nursery) {
    std::unique_ptr<FileWriter> file_writer = OpenWriter();
    FileWriterQueue file_writer_queue(nursery, std::move(file_writer), options);
    for (const auto& batch : batches) {
      ARROW_RETURN_NOT_OK(file_writer_queue.QueueBatch(batch));
    }
    return file_writer_queue.Finish().status();
  });
}

...and now you can be assured that an error occurring during QueueBatch will not cause a segmentation fault because the nursery will still block until any outstanding captures have resolved.

Is there a risk that these pointers may be kept alive too long (and delay the DoClose calls accordingly)?

Yes, if there is a bug, but, the risk is no greater than what you have with...

FileWriterQueue::~FileWriterQueue() {
  EnsureFinished();
  writer_thread.join(); // Could deadlock / delay for a long time if there is a bug
}

@westonpace
Member Author

Why can't FileWriteOptions be a shared_ptr?

It can, and if we force everything to be a shared_ptr then we just have to keep `this` alive, and an asynchronous smart pointer would be sufficient. However, there are still lots of things like ExecContext, Executor, IOContext, etc. which we often pass by pointer and which can go out of scope when the operation finishes.

Plus, it still seems disingenuous to return from a high-level function/operation when cleanup work is still outstanding.

@westonpace
Member Author

Ok, I took a week and did other things and, coming back, I've decided not to pursue this PR in this form. Instead I'll push the async smart ptrs + an async task group as its own thing. I appreciate all the feedback as it has helped guide me tremendously in coming up with the best solution.

@westonpace westonpace closed this Sep 4, 2021
westonpace added a commit that referenced this pull request Oct 4, 2021
…r logic

This ended up being a fairly comprehensive overhaul of the existing dataset writer mechanism.  ~~This PR relies on #10968 and will remain in draft until that is completed.~~

Breaking Changes:

 * The dataset writer no longer works with the synchronous scanner.  I don't think this would be a huge change but I think the current plan is to hopefully deprecate the synchronous scanner (ARROW-13338).  This required changes in Python/R/Ruby which will presumably be reverted when ARROW-13338 is done.
 * The default behavior is now to error if the output directory has any existing data.  This can be controlled with `existing_data_behavior` (see below)
 * Previously a single global counter was used for naming files.  This PR changes to a counter per directory.  So the following...
 ```
 /a1/b1/part-0.parquet
 /a1/b1/part-2.parquet
 /a1/b2/part-1.parquet
 ```
...would be impossible.  Instead you would receive...
```
/a1/b1/part-0.parquet
/a1/b1/part-1.parquet
/a1/b2/part-0.parquet
```
...this does not, however, mean that the resulting data files will be deterministic.  If the data in `/a1/b1/part-0.parquet` and `/a1/b1/part-1.parquet` originated from two different files being scanned in an unordered fashion then either part could represent either file.  A number of test cases in all implementations had to change as the expected paths for dataset writes changed.

* New features:
  * The dataset writer now works with the async scanner (ARROW-12803)
  * The dataset writer now respects backpressure (closes ARROW-2628?, related to but does not fully solve ARROW-13590 and ARROW-13611) and will stop pulling from the scanner when `max_rows_queued` (provided as an argument to `DatasetWriter`) is exceeded.  By default `max_rows_queued` is 64M.  This is not an "option" as I don't think it should be exposed to the user.  I think it would be offering too many knobs.  I think eventually we may want to wrap up all backpressure into a single configurable setting.
  * `FileSystemDatasetWriteOptions` now has a `max_rows_per_file` setting (ARROW-10439).
  * `FileSystemDatasetWriteOptions` now has a `max_open_files` setting (ARROW-12321) which prevents opening too many files.  Instead the writer will apply backpressure on the scanner while also closing the open file with the greatest # of rows already written (then resume writing once the file is closed).
  * `FileSystemDatasetWriteOptions` now has an `existing_data_behavior` setting (ARROW-12358, ARROW-7706) which controls what to do if there is data in the destination.

 * Deferred for future work:
   * Add the new options to the python/R APIs (ARROW-13703)
   * Limiting based on file size (ARROW-10439)
   * More fine grained error control (ARROW-14175)

Closes #10955 from westonpace/feature/ARROW-13542--c-compute-dataset-add-dataset-writenode-for

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021 (same commit message as above).
@westonpace westonpace deleted the feature/arrow-13680-async-nursery branch January 6, 2022 08:16