-
Notifications
You must be signed in to change notification settings - Fork 4k
GH-32653: [C++] Cleanup error handling in execution engine #15253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-32653: [C++] Cleanup error handling in execution engine #15253
Conversation
|
Thanks for opening a pull request! If this is not a minor PR. Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? or In the case of old issues on JIRA the title also supports: See also: |
|
|
|
This was largely based on #13848 (thanks @save-buffer). In fact, I started by trying to rebase #13848 but it ended up being too much work. @save-buffer do you want to take a look at this PR? It's mostly just an extension of your PR. Probably the most significant change is I did not try and collect multiple errors and instead stuck with the pattern of "return the first error and ignore subsequent errors" |
|
Hmm, it appears that OpenTelemetry was depending on the However, this PR, as it stands, will very much break OT output (spans won't be finished and so simply will not output). I'll try and put up a PR on Monday to fix OT that can come before this one. |
441184d to
2fc72c2
Compare
save-buffer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall seems good. A couple of nits here and there. At first I thought that making StartProducing return void was a bit contentious, but now I like that it makes a plan's Status always be stored in one place, no matter where it fails. Other than that, I'd like to see MapNode removed (lower priority) and to think more about our definition of a sink node (higher priority). But overall I like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to while we're at it remove MapNode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I poked at that a little. I think it still serves some benefit:
- Forwards backpressure
- Forwards guarantees (and soon, batch index)
- Uses an AtomicCounter to trigger an optional finish signal when all processing is done (used by the tee node)
I think this justifies its existence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can probably remove return from here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does a sink not have an output schema? Sinks still output stuff batches, I think it would be more correct to define a node to be a sink if its output_ is null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only need is_sink for validation purposes at the moment. If a node is a sink we validate that output_ is null (e.g. we don't let you try and use it as an input during plan creation). So there are options here:
- We can get rid of the validation check and leave it to the individual nodes (I think I may have already added this validation to
SinkNode), this would probably be my preference. - We can pass in some
is_sinkbool to theExecNodeconstructor (I'd rather not do this as it is tedious but it is essentially what we had before)
I'll make sure I can implement the first option and, if so, get rid of this concept.
That being said, I am a bit curious why this is important. I think it is valid to say that a sink node doesn't have an output schema. We don't rely on it having an output schema anywhere (or we shouldn't) that I know of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That being said, I am a bit curious why this is important. I think it is valid to say that a sink node doesn't have an output schema. We don't rely on it having an output schema anywhere (or we shouldn't) that I know of.
@save-buffer ping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's a pretty semantic thing, but I feel like a sink node has an output schema, just not a node that it outputs to (since a sink node does output to an AsyncGenerator). Maybe it doesn't matter too much though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the sake of simplicity I'm going to leave this as-is (I'm about to merge this if CI passes). However, if we want to amend this definition (and remove this check) later then I have no problem with that.
…ing complexity without value and also error prone.
2fc72c2 to
e63fc49
Compare
|
|
| Defer cleanup([this]() { finished_.MarkFinished(); }); | ||
| outputs_[0]->InputFinished(this, batches_produced_); | ||
| })); | ||
| ARROW_UNUSED( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this instead be a DCHECK or a warning or similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If Spawn fails it means the thread pool has shut down. There is no point in marking the plan finished and we might as well avoid an abort here so we don't accidentally hide the true cause.
…n_->finished() is always looked at.
paleolimbot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing these R changes!
kou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for the GLib part.
|
Benchmark runs are scheduled for baseline = 6abe6b6 and contender = 295c664. 295c664 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
|
['Python', 'R'] benchmarks have high level of regressions. |
Simplifies error handling in exec plans.
There were several different places that exec plan failures could be reported. Now there is just one.
The
ExecNode::ErrorReceivedmechanism was removed. NowInputReceivedandInputFinishedreturn aStatusinstead. This allows for use of the existing macros instead of things likeErrorNotOkand removes the burden of error propagation from nodes and removes the burden of error handling from sink nodes.ExecNode::finishedhas now been removed. This could lead to deadlock if nodes failed to mark the future complete (this was easy to do in error scenarios). In addition, it served no real purpose. A plan is done when all of its tasks have finished.BREAKING CHANGE: ExecPlan::StartProducing now returns void. Errors that were returned from this method will now be returned from
ExecPlan::finished.BREAKING CHANGE: If a plan is stopped early (with
ExecPlan::StopProducing) then it will complete with a cancelled status instead of an ok status (assuming no other errors). This is to reflect the fact that the plan did not produce complete data.BREAKING CHANGE: Previously the sink node would push some plan errors onto the generator. Now, all errors will be output on
ExecPlan::finished. The sink node will never push an error, only batches. Readers should make sure to checkExecPlan::finished.BREAKING CHANGE: When a plan is cancelled it will no longer attempt to flush output. For example, a plan with an aggregate node will not produce an aggregation based on partial results after a cancel.