
Conversation


@westonpace westonpace commented Aug 27, 2021

This PR adds a write node. The write node takes in `FileSystemDatasetWriteOptions` and the projected schema and writes the incoming data to disk. It is a sink node, but it is a bit different from the existing sink node. The existing sink node transferred batches via an `AsyncGenerator`, which puts ownership of the batches outside the push-based flow of the exec plan. I added a new `ConsumingSinkNode` which consumes the batches as part of the push-based flow. This makes it possible to block the exec plan from finishing until all data has been written to disk.
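The distinction can be illustrated with a heavily simplified, hypothetical sketch (this is not the real Arrow API; `MiniConsumingSink`, `InputReceived`, and `InputFinished` are stand-in names): the sink consumes each batch inside the push-based flow and only finishes once the producer signals completion, so the plan cannot complete before all consumption has happened.

```cpp
#include <cassert>
#include <future>
#include <vector>

// Hypothetical, simplified sketch of the ConsumingSinkNode idea.
class MiniConsumingSink {
 public:
  // Called by the plan for each input batch; consumption (e.g. a disk
  // write) happens here, as part of the push-based flow.
  void InputReceived(int batch) { consumed_.push_back(batch); }

  // Called once the producer is done; only now does the sink finish,
  // so the plan's "finished" future stays open until then.
  void InputFinished() { finished_.set_value(); }

  std::future<void> finished() { return finished_.get_future(); }
  const std::vector<int>& consumed() const { return consumed_; }

 private:
  std::vector<int> consumed_;
  std::promise<void> finished_;
};
```

A `SinkNode`, by contrast, hands batches out through an `AsyncGenerator`, so whoever pulls from the generator owns the remaining work and the plan does not wait for it.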

In addition, this PR refines the `AsyncTaskGroup` a little. `WaitForTasksToFinish` was not a very clearly named method: once called, it actually transitioned the task group from a "top-level tasks can be added" state to a "no more top-level tasks can be added" state, and that was not clear from the name. The new name (`End`) is hopefully clearer.
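The state transition behind the rename can be sketched as follows (a hypothetical, synchronous stand-in, not Arrow's actual `AsyncTaskGroup`; `MiniAsyncTaskGroup` and its members are invented for illustration):

```cpp
#include <cassert>
#include <functional>

// Hypothetical sketch: End() moves the group into a state where new
// top-level tasks are rejected; the group counts as finished once it
// has ended and no tasks remain running.
class MiniAsyncTaskGroup {
 public:
  // Returns false after End() has been called: the transition to the
  // "no more top-level tasks" state is what the old name hid.
  bool AddTask(const std::function<void()>& task) {
    if (ended_) return false;
    ++running_;
    task();  // the real group would run this asynchronously
    --running_;
    return true;
  }

  void End() { ended_ = true; }
  bool Finished() const { return ended_ && running_ == 0; }

 private:
  bool ended_ = false;
  int running_ = 0;
};
```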

This PR does not solve the backpressure problem. Instead, a serial async task group was created. This runs all tasks in order (the default task group allows them to run in parallel) but not necessarily on the calling thread (i.e. unlike `SerialTaskGroup`, we do not block in the `AddTask` method). This allows tasks to pile up in a queue and, if the write is slow, the queue becomes a pile-up point that will eventually run out of memory (provided enough data is being written). That problem is solved in a follow-up, #11286.
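The serial-ordering idea can be sketched with a minimal stand-in (again hypothetical, not the real `AsyncSerializedTaskGroup`): `AddTask` never blocks the caller; a single worker drains the queue strictly in submission order, and with no backpressure a slow consumer lets that queue grow without bound.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch of a serialized task group.
class MiniSerializedTaskGroup {
 public:
  MiniSerializedTaskGroup() : worker_([this] { Run(); }) {}

  ~MiniSerializedTaskGroup() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();  // drains any queued tasks before returning
  }

  // Never blocks: the task is queued for the worker thread.
  void AddTask(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
      if (queue_.empty()) return;  // done_ is set and all tasks have run
      auto task = std::move(queue_.front());
      queue_.pop_front();
      lock.unlock();
      task();  // tasks run one at a time, in submission order
      lock.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool done_ = false;
  std::thread worker_;
};
```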

The `AsyncSerializedTaskGroup` and `AsyncTaskGroup` classes have very similar APIs, but I did not create an interface / abstract base class because I don't yet envision any case where they would be interchangeable. The distinction is "can these tasks run in parallel or not", not a performance / resource question.

As a consequence of using the ExecPlan, dataset writes no longer have reliable ordering. If you pass in three batches and they are all destined for the same file, then the batches may be written to the destination file in any order. This is because the ExecPlan creates a thread task for each input batch, so the batches can arrive at the write node in any order.

westonpace force-pushed the feature/ARROW-13542-write-node branch from c651c4a to 4240953 on October 4, 2021
westonpace marked this pull request as ready for review on October 4, 2021
@lidavidm left a comment:

LGTM. There are a lot of files touched, but things look straightforward.

///
/// The future will be marked complete if all `futures` complete
/// successfully. Otherwise, it will be marked failed with the status of
/// the first failing future.
A reviewer commented:

I left a nit about the docstring on the MakeTask PR: #11210 (comment)

westonpace (author) replied:

I clarified the comment according to that review and also changed the name to AllFinished.
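The docstring semantics quoted above (complete if all futures succeed, otherwise fail with the status of the first failing future) can be sketched with a plain stand-in status type rather than Arrow's real `Future`/`Status` machinery (`MiniStatus` and this `AllFinished` signature are invented for illustration):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::Status.
struct MiniStatus {
  bool ok;
  std::string message;
};

// Success only if every input succeeded; otherwise the first
// failure (in input order) determines the result.
MiniStatus AllFinished(const std::vector<MiniStatus>& statuses) {
  for (const auto& st : statuses) {
    if (!st.ok) return st;
  }
  return {true, ""};
}
```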

// is finished. Use SinkNode if you are transferring the ownership of the data to another
// system. Use ConsumingSinkNode if the data is being consumed within the exec plan (i.e.
// the exec plan should not complete until the consumption has completed).
class ConsumingSinkNode : public ExecNode {
A reviewer commented:

Should we add an explicit test of this node in plan_test.cc, as with the other nodes?

westonpace (author) replied:

I added a few unit tests to plan_test.cc

aocsa commented Oct 6, 2021

LGTM from me too. I reviewed the code, and there are some common patterns for handling signals, the input counter, and "future finished" variables that recur across different ExecNodes (such as MapExecNode). I think these patterns should be documented somewhere, especially for newcomers who don't want to run into race conditions.

@lidavidm left a comment:

Thanks for the updates. LGTM

westonpace closed this in 4b8ffe4 on Oct 8, 2021
ViniciusSouzaRoque pushed a commit to s1mbi0se/arrow that referenced this pull request Oct 20, 2021
…ng rows from an ExecPlan to disk


Closes apache#11017 from westonpace/feature/ARROW-13542-write-node

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
westonpace deleted the feature/ARROW-13542-write-node branch on January 6, 2022
