[G-API] Handle exceptions in streaming executor#21660
Conversation
a6f530d to
086872a
Compare
| #ifndef OPENCV_GAPI_GCPUKERNEL_HPP | ||
| #define OPENCV_GAPI_GCPUKERNEL_HPP | ||
|
|
||
| #ifdef _MSC_VER |
There was a problem hiding this comment.
Seems to be false positive warning, but need to double check
| } | ||
| } | ||
|
|
||
| virtual void post(cv::gimpl::Exception&& error) override |
There was a problem hiding this comment.
Copy paste from post(EndOfStream&&) except incrementing m_stops_sent
There was a problem hiding this comment.
IMHO:
From my perspective there are a lot of sort-out and shitfing from queue to queue code here as a legacy part.
But it this function just copied & pasted then I would recommend introduce self-documented private function here like
dispatch<Message>()
because it would be easier to understand full-body of post function when see that post just makes dispatch to subsequent
There was a problem hiding this comment.
That's the problem, this function isn't fully copy-pasted. (m_stops_sent++ removed)
There was a problem hiding this comment.
i assumed some customization here, for example
post(Exception )
{
(void)dispatch<Exception> ()
}
post(Stop)
{
stop_sent += dispatch <Stop>();
}
Please consider this comment as optional
| GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl); | ||
| out_queue.push(Cmd{Result{std::move(this_result), flags}}); | ||
| case QueueReader::V::index_of<Stop>(): | ||
| if (handle_stop) |
There was a problem hiding this comment.
@AsyaPronina
Does it make sense to put traces here?
|
@sivanov-work Could you have a look? |
sivanov-work
left a comment
There was a problem hiding this comment.
IMHO: there a lot of switch-like code block which handles different types of variant and it's hard to understand & read it. Furthermore, for future customization ( adding new message type) it requires to sort-out a lot of similar code and doesn't look upgraded easily
I'd propose to introduce full-descriptive-class of Message of ControlMessage as part of current std::variant at least then introduce message_handle(ControlMessage &) to encapsulate operations with it - to make code clear
Also, there might be a sort of problem in working with queue capacity: when exception (and other command messages) cannot be pushed due to queue fullness. I'd propose to consider to modify queue to some kind of smart queue: which allow to push controlMessages when it's full as high priority messages
| try { | ||
| t.run(request); | ||
| } catch (...) { | ||
| request.SetCompletionCallback([](){}); |
There was a problem hiding this comment.
i'd put LOG_WARNING/ERROR here something like:
request by id: " << id << " failed, error" << ex.what()
There was a problem hiding this comment.
Don't think that log is needed here, user will see exception message when call pull.
I'd propose to make exception msg a bit clearer.
There was a problem hiding this comment.
it's optional comment: please feel free to ignore it
in defense of logger I could say that it may provide more detailed information about what actually happened and what current states we have which will help in further troubleshooting
|
|
||
| if (code != IE::StatusCode::OK) { | ||
| ctx->eptr = std::make_exception_ptr( | ||
| std::logic_error("IE::InferRequest finished with not OK status")); |
There was a problem hiding this comment.
from my perspective it would be great also to print status value
|
|
||
| struct Exception { | ||
| std::exception_ptr eptr; | ||
| }; |
There was a problem hiding this comment.
IMHO:
encapsulation in some kind of ServiceMessage will simplify future extensibility:
struct ServiceMessage
{
enum { EndOfStream, Exception }
....
};
using StreamMsg = cv::util::variant<ServiceMessage, cv::GRunArgs>;
then
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg)) {
}
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) {
}
might be modified as
if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
out.post(in_msg);
}
There was a problem hiding this comment.
It's really make code a bit clearer, but
- Is there any other possible message types? (I can't imagine)
EndOfStream&Exceptionshould be handled in different ways (almost the same but not)
There was a problem hiding this comment.
Is there any other possible message types?
there had been only one Stop message before you introduced Exception - so hypothetically if any new messages are required we will need to repeat this effort again for adding new variant types
EndOfStream & Exception should be handled in different ways (almost the same but not)
that's because we would separate "traffic" messages & "service" messages in processing. Something like that:
if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
//encapsulate switch-case
process_service_message(in_msg); //dispatch for other islands, stop, exception and so on...
}
else
{
.. frame processing
}
P.S. optional comment
| break; | ||
| } | ||
| default: | ||
| cv::util::throw_error(std::logic_error("Unsupported cmd type in getInputVector()")); |
There was a problem hiding this comment.
i think it had better to put GAPI_DbgAssert here in consideration that it acts as c-like assert
There was a problem hiding this comment.
What do you think about GAPI_Assert instead? In order to keep consistency with other code (see above)
GAPI_Assert(stop.kind == Stop::Kind::HARD); and etc...
| oq->push(Cmd{cv::gimpl::Exception{eptr}}); | ||
| } | ||
| // NB: Go to the next iteration. | ||
| continue; |
There was a problem hiding this comment.
Will we hang here forever?
There was a problem hiding this comment.
when emitter produces exception constantly then we catch it and go to 'continue' for while (true)
There was a problem hiding this comment.
If source produces exceptions constantly then these exceptions will be propagated through the graph and rethrown in GStreamingExecutor::pull to the user.
Emitter shouldn't be stopped if source caused exception, see the test where source throw exception every second frame
| break; | ||
| case Posting::V::index_of<cv::gimpl::EndOfStream>(): | ||
| cmd = Cmd{Stop{}}; | ||
| m_stops_sent++; |
There was a problem hiding this comment.
Does it make sense to introduce m_exceptions_caught++ for exception case, doesn't it?
There was a problem hiding this comment.
I didn't get the idea, why it's needed?
There was a problem hiding this comment.
not sure, for statistics/analytics maybe - in case of multiple exception are allowed in pipeline without stopping it
If this case is not supported - then please ignore this comment
There was a problem hiding this comment.
Multiple exceptions aren't supported.
In this PR, I try not to allow exceptions to be thrown from not main thread.
| } | ||
| } | ||
|
|
||
| virtual void post(cv::gimpl::Exception&& error) override |
There was a problem hiding this comment.
IMHO:
From my perspective there are a lot of sort-out and shitfing from queue to queue code here as a legacy part.
But it this function just copied & pasted then I would recommend introduce self-documented private function here like
dispatch<Message>()
because it would be easier to understand full-body of post function when see that post just makes dispatch to subsequent
sivanov-work
left a comment
There was a problem hiding this comment.
LGTM if old & new tests are passed
| try { | ||
| t.run(request); | ||
| } catch (...) { | ||
| request.SetCompletionCallback([](){}); |
There was a problem hiding this comment.
it's optional comment: please feel free to ignore it
in defense of logger I could say that it may provide more detailed information about what actually happened and what current states we have which will help in further troubleshooting
|
|
||
| struct Exception { | ||
| std::exception_ptr eptr; | ||
| }; |
There was a problem hiding this comment.
Is there any other possible message types?
there had been only one Stop message before you introduced Exception - so hypothetically if any new messages are required we will need to repeat this effort again for adding new variant types
EndOfStream & Exception should be handled in different ways (almost the same but not)
that's because we would separate "traffic" messages & "service" messages in processing. Something like that:
if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
//encapsulate switch-case
process_service_message(in_msg); //dispatch for other islands, stop, exception and so on...
}
else
{
.. frame processing
}
P.S. optional comment
| { | ||
| if (eptr) | ||
| { | ||
| std::rethrow_exception(eptr); |
There was a problem hiding this comment.
is it some kind of default/stub implementation?
because it now has a "state" and it affects multithreading behavior between simultaneous post and verify at least
There was a problem hiding this comment.
GExecutor::IOutput methods might be called from different threads, but not concurrently.
verify() is supposed to be called after post()
That's a good point, but I don't know how guarantee that's methods won't be called concurrently in future, except putting comment
| bool QueueReader::getInputVector(std::vector<Q*> &in_queues, | ||
| cv::GRunArgs &in_constants, | ||
| cv::GRunArgs &isl_inputs) | ||
| cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector<Q*> &in_queues, |
There was a problem hiding this comment.
can you clarify - why it changed on returning value?
There was a problem hiding this comment.
Previously bool means != Stop{}, now except Stop there can be Exception message, so it was changed from bool to StreamMsg
| oq->push(Cmd{cv::gimpl::Exception{eptr}}); | ||
| } | ||
| // NB: Go to the next iteration. | ||
| continue; |
There was a problem hiding this comment.
when emitter produces exception constantly then we catch it and go to 'continue' for while (true)
| break; | ||
| case Posting::V::index_of<cv::gimpl::EndOfStream>(): | ||
| cmd = Cmd{Stop{}}; | ||
| m_stops_sent++; |
There was a problem hiding this comment.
not sure, for statistics/analytics maybe - in case of multiple exception are allowed in pipeline without stopping it
If this case is not supported - then please ignore this comment
| } | ||
| } | ||
|
|
||
| virtual void post(cv::gimpl::Exception&& error) override |
There was a problem hiding this comment.
i assumed some customization here, for example
post(Exception )
{
(void)dispatch<Exception> ()
}
post(Stop)
{
stop_sent += dispatch <Stop>();
}
Please consider this comment as optional
| case Cmd::index_of<Exception>(): { | ||
| std::rethrow_exception(cv::util::get<Exception>(cmd).eptr); | ||
| return true; | ||
| } |
There was a problem hiding this comment.
Is there any warning about missing 'default' ?
|
@dmatveev Could you have a look, please? |
dmatveev
left a comment
There was a problem hiding this comment.
Updating every possible backend with this new exception handling is quite invasive.
Can you please offer some other way to do it?
Can those changes be only introduced at the executors level?
| if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) | ||
| { | ||
| // (4) If the Exception message is revieved, propagate it further. | ||
| out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg))); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Should this be done at this level? As well as every other island's level?
There was a problem hiding this comment.
I mean, shouldn't this be handled uniformly for all possible islands? A level above.
There was a problem hiding this comment.
Couldn't agree more. Handle the input exception in a general way.
| if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) { | ||
| out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg))); | ||
| return; | ||
| } |
There was a problem hiding this comment.
well, if we need to update every our backend now, this change is quite invasive.
What's about our huge internal part?
There was a problem hiding this comment.
Answered above
dmatveev
left a comment
There was a problem hiding this comment.
It is probably done right but I can't say it for sure until it is properly documented
| try { | ||
| t.run(request); | ||
| } catch (...) { | ||
| request.SetCompletionCallback([](){}); |
There was a problem hiding this comment.
A comment is required here. Previous single-liner was clear, now I have no idea why this should happen in catch. And I doubt if anybody knows.
| void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task, | ||
| size_t id, | ||
| IE::InferRequest request, | ||
| IE::StatusCode code) { | ||
| // FIXME: Any exception which is arrised here must not leave this callback, | ||
| // because it won't be handled. | ||
| try { | ||
| if (code != IE::StatusCode::OK) { | ||
| throw std::logic_error("IE::InferRequest finished with not OK status"); | ||
| } | ||
| task.callback(request); | ||
| // NB: IE::InferRequest keeps the callback until the new one is set. | ||
| // Since user's callback might keep resources that should be released, | ||
| // need to destroy its after execution. | ||
| // Let's set the empty one to cause the destruction of a callback. | ||
| request.SetCompletionCallback([](){}); | ||
| m_idle_ids.push(id); | ||
| } catch (const std::exception& e) { | ||
| GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what()); | ||
| //FIXME: Exception CAN't be rethrown here, since this callback works | ||
| // in separate IE thread and such scenarios aren't handled properly in | ||
| // G-API so far. | ||
| } | ||
| IE::StatusCode code) noexcept { | ||
| task.callback(request, code); | ||
| request.SetCompletionCallback([](){}); | ||
| m_idle_ids.push(id); |
There was a problem hiding this comment.
What is this all about? Why? What's for? A simple comment would help to understand the need in this method.
| return; | ||
| } | ||
|
|
||
| GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg)); |
There was a problem hiding this comment.
Should this be a DgbAssert? get<> will fail anyway if the message is of other type.
There was a problem hiding this comment.
Make sense, done
* Put more comments * Fix alignment * Move test outside of HAVE_NGRAPH
|
@alalek Could you merge it, please? |
…on-in-streamingexecutor [G-API] Handle exceptions in streaming executor * Handle exceptions in streaming executor * Rethrow exception in non-streaming executor * Clean up * Put more tests * Handle exceptions in IE backend * Handle exception in IE callbacks * Handle exception in GExecutor * Handle all exceptions in IE backend * Not only (std::exception& e) * Fix comments to review * Handle input exception in generic way * Fix comment * Clean up * Apply review comments * Put more comments * Fix alignment * Move test outside of HAVE_NGRAPH * Fix compilation
Pull Request Readiness Checklist
See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request
Patch to opencv_extra has the same branch name.
Build configuration