-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-17381: [C++][Acero] Centralize error handling in ExecPlan #13848
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
|
9dbb7e2 to
25dbf30
Compare
25dbf30 to
2cdc239
Compare
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the cleanup, this is definitely simplifying ExecNode/ExecPlan. I have some initial thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // COMMIT cd5346e14450d7e5ca156acb4c2f396885c77aa0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually this case will go away
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the order no longer matter here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't a more appropriate place to trigger EndTaskGroup be when InputFinished is received on all sinks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EndTaskGroup has a nice property that it ends when it runs out of tasks to perform, here's the comment:
/// It is allowed for tasks to be added after this call provided the future has not yet
/// completed. This should be safe as long as the tasks being added are added as part
/// of a task that is tracked. As soon as the count of running tasks reaches 0 this
/// future will be marked complete.
So we will end when all of the tasks have finished running and no new tasks have been scheduled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't we call node->Abort when we transition to aborted_ = true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to avoid any possible race conditions while aborting/doing cleanup and running tasks, so it's only safe to Abort when we're sure that no other tasks are running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very happy to see this move into the base class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point maybe we should just move the body of DoProject into this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be a default implementation for ExecNode::InputFinished?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it probably can be. Actually this span thing is a bit broken right now in general because we don't enforce that InputFinished is called after all batches have been output. InputFinished is merely to specify the total number of batches that will be output, so e.g. in the case of scalar aggregates that output only one row ever, InputFinished is called in StartProducing, and so a project above a scalar aggregate node would be ended immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does Abort execution mean for a node? In theory all "execution" is handled via the scheduler so does a node really need to do anything here? Why ExecNode::Abort instead of doing the cleanup in the ExecNode destructor?
|
@zagto do you mind taking a look at this when you get a chance? |
zagto
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work. I love seeing the code becoming cleaner and easier to unterstand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this std::move does anything, given that status is a const reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we get a non-ok status here, would that mean we just abort while discarding the Status/message? This seems confusing to the user. Maybe we could have an ExecPlan::Abort(Status) that adds the status to ExecPlanImpl::errors_?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| auto values = batch.values; | |
| auto values = std::move(batch.values); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was this intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we 3 calls to SleepABit? Probably because one may not be enough on slower systems, but I think a comment would be helpful here
1cc334d to
279bf83
Compare
279bf83 to
1c75db4
Compare
westonpace
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you interested in dusting this off and rebasing now that the previous cleanup has merged?
| /// \brief Stop producing definitively to a single output | ||
| /// | ||
| /// This call is a hint that an output node has completed and is not willing | ||
| /// to receive any further data. | ||
| virtual void StopProducing(ExecNode* output) = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've since learned that this is still needed. This covers the case where a LIMIT X node is placed on one branch of a query. It is intended to stop part of the plan but not abort the entire plan. Do you think we can leave it in?
|
@save-buffer are you interested in rebasing this? |
|
Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍 |
No description provided.