Skip to content

[G-API] Handle exceptions in streaming executor#21660

Merged
alalek merged 14 commits intoopencv:4.xfrom
TolyaTalamanov:at/handle-exception-in-streamingexecutor
Mar 25, 2022
Merged

[G-API] Handle exceptions in streaming executor#21660
alalek merged 14 commits intoopencv:4.xfrom
TolyaTalamanov:at/handle-exception-in-streamingexecutor

Conversation

@TolyaTalamanov
Copy link
Copy Markdown
Contributor

Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

  • I agree to contribute to the project under Apache 2 License.
  • To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
  • The PR is proposed to the proper branch
  • There is a reference to the original bug report and related work
  • There is accuracy test, performance test and test data in opencv_extra repository, if applicable
    Patch to opencv_extra has the same branch name.
  • The feature is well documented and sample code can be built with the project CMake

Build configuration

force_builders=Custom,Custom Win,Custom Mac
build_gapi_standalone:Linux x64=ade-0.1.1f
build_gapi_standalone:Win64=ade-0.1.1f
build_gapi_standalone:Mac=ade-0.1.1f
build_gapi_standalone:Linux x64 Debug=ade-0.1.1f

Xbuild_image:Custom=centos:7
Xbuildworker:Custom=linux-1
build_gapi_standalone:Custom=ade-0.1.1f

build_image:Custom=ubuntu-openvino-2021.4.1:20.04
build_image:Custom Win=openvino-2021.4.1
build_image:Custom Mac=openvino-2021.4.1

test_modules:Custom=gapi,python2,python3,java
test_modules:Custom Win=gapi,python2,python3,java
test_modules:Custom Mac=gapi,python2,python3,java

buildworker:Custom=linux-1
# disabled due high memory usage: test_opencl:Custom=ON
test_opencl:Custom=OFF
test_bigdata:Custom=1
test_filter:Custom=*

@TolyaTalamanov TolyaTalamanov added this to the 4.6.0 milestone Feb 24, 2022
@TolyaTalamanov TolyaTalamanov force-pushed the at/handle-exception-in-streamingexecutor branch from a6f530d to 086872a Compare February 25, 2022 09:09
#ifndef OPENCV_GAPI_GCPUKERNEL_HPP
#define OPENCV_GAPI_GCPUKERNEL_HPP

#ifdef _MSC_VER
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be false positive warning, but need to double check

}
}

virtual void post(cv::gimpl::Exception&& error) override
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy paste from post(EndOfStream&&) except incrementing m_stops_sent

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO:
From my perspective there are a lot of sort-out and shitfing from queue to queue code here as a legacy part.
But it this function just copied & pasted then I would recommend introduce self-documented private function here like
dispatch<Message>()
because it would be easier to understand full-body of post function when see that post just makes dispatch to subsequent

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the problem, this function isn't fully copy-pasted. (m_stops_sent++ removed)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i assumed some customization here, for example

post(Exception )
{
(void)dispatch<Exception> ()
}

post(Stop)
{
stop_sent += dispatch <Stop>();
}

Please consider this comment as optional

GAPI_ITT_AUTO_TRACE_GUARD(collector_push_hndl);
out_queue.push(Cmd{Result{std::move(this_result), flags}});
case QueueReader::V::index_of<Stop>():
if (handle_stop)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AsyaPronina
Does it make sense to put traces here?

@TolyaTalamanov
Copy link
Copy Markdown
Contributor Author

@sivanov-work Could you have a look?

Copy link
Copy Markdown
Contributor

@sivanov-work sivanov-work left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO: there a lot of switch-like code block which handles different types of variant and it's hard to understand & read it. Furthermore, for future customization ( adding new message type) it requires to sort-out a lot of similar code and doesn't look upgraded easily

I'd propose to introduce full-descriptive-class of Message of ControlMessage as part of current std::variant at least then introduce message_handle(ControlMessage &) to encapsulate operations with it - to make code clear

Also, there might be a sort of problem in working with queue capacity: when exception (and other command messages) cannot be pushed due to queue fullness. I'd propose to consider to modify queue to some kind of smart queue: which allow to push controlMessages when it's full as high priority messages

try {
t.run(request);
} catch (...) {
request.SetCompletionCallback([](){});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd put LOG_WARNING/ERROR here something like:

request by id: " << id << " failed, error" << ex.what()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think that log is needed here, user will see exception message when call pull.
I'd propose to make exception msg a bit clearer.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's optional comment: please feel free to ignore it
in defense of logger I could say that it may provide more detailed information about what actually happened and what current states we have which will help in further troubleshooting


if (code != IE::StatusCode::OK) {
ctx->eptr = std::make_exception_ptr(
std::logic_error("IE::InferRequest finished with not OK status"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my perspective it would be great also to print status value

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


struct Exception {
std::exception_ptr eptr;
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO:
encapsulation in some kind of ServiceMessage will simplify future extensibility:

struct ServiceMessage
{
enum { EndOfStream, Exception }
....
};

using StreamMsg = cv::util::variant<ServiceMessage, cv::GRunArgs>;

then

if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))   {
    }
    if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg))    {
    }

might be modified as

if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg))   {
        out.post(in_msg);
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really make code a bit clearer, but

  • Is there any other possible message types? (I can't imagine)
  • EndOfStream & Exception should be handled in different ways (almost the same but not)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other possible message types?

there had been only one Stop message before you introduced Exception - so hypothetically if any new messages are required we will need to repeat this effort again for adding new variant types

EndOfStream & Exception should be handled in different ways (almost the same but not)

that's because we would separate "traffic" messages & "service" messages in processing. Something like that:

if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
        //encapsulate switch-case
        process_service_message(in_msg); //dispatch for other islands, stop, exception and so on...
 } 
else 
{
.. frame processing
}

P.S. optional comment

break;
}
default:
cv::util::throw_error(std::logic_error("Unsupported cmd type in getInputVector()"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it had better to put GAPI_DbgAssert here in consideration that it acts as c-like assert

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about GAPI_Assert instead? In order to keep consistency with other code (see above)
GAPI_Assert(stop.kind == Stop::Kind::HARD); and etc...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

oq->push(Cmd{cv::gimpl::Exception{eptr}});
}
// NB: Go to the next iteration.
continue;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we hang here forever?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, why?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when emitter produces exception constantly then we catch it and go to 'continue' for while (true)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If source produces exceptions constantly then these exceptions will be propagated through the graph and rethrown in GStreamingExecutor::pull to the user.

Emitter shouldn't be stopped if source caused exception, see the test where source throw exception every second frame

break;
case Posting::V::index_of<cv::gimpl::EndOfStream>():
cmd = Cmd{Stop{}};
m_stops_sent++;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to introduce m_exceptions_caught++ for exception case, doesn't it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get the idea, why it's needed?

Copy link
Copy Markdown
Contributor

@sivanov-work sivanov-work Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, for statistics/analytics maybe - in case of multiple exception are allowed in pipeline without stopping it

If this case is not supported - then please ignore this comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple exceptions aren't supported.

In this PR, I try not to allow exceptions to be thrown from not main thread.

}
}

virtual void post(cv::gimpl::Exception&& error) override
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO:
From my perspective there are a lot of sort-out and shitfing from queue to queue code here as a legacy part.
But it this function just copied & pasted then I would recommend introduce self-documented private function here like
dispatch<Message>()
because it would be easier to understand full-body of post function when see that post just makes dispatch to subsequent

Copy link
Copy Markdown
Contributor

@sivanov-work sivanov-work left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if old & new tests are passed

try {
t.run(request);
} catch (...) {
request.SetCompletionCallback([](){});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's optional comment: please feel free to ignore it
in defense of logger I could say that it may provide more detailed information about what actually happened and what current states we have which will help in further troubleshooting


struct Exception {
std::exception_ptr eptr;
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other possible message types?

there had been only one Stop message before you introduced Exception - so hypothetically if any new messages are required we will need to repeat this effort again for adding new variant types

EndOfStream & Exception should be handled in different ways (almost the same but not)

that's because we would separate "traffic" messages & "service" messages in processing. Something like that:

if (cv::util::holds_alternative<cv::gimpl::ServiceMessage>(in_msg)) {
        //encapsulate switch-case
        process_service_message(in_msg); //dispatch for other islands, stop, exception and so on...
 } 
else 
{
.. frame processing
}

P.S. optional comment

{
if (eptr)
{
std::rethrow_exception(eptr);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it some kind of default/stub implementation?

because it now has a "state" and it affects multithreading behavior between simultaneous post and verify at least

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GExecutor::IOutput methods might be called from different threads, but not concurrently.
verify() is supposed to be called after post()

https://github.com/TolyaTalamanov/opencv/blob/at/handle-exception-in-streamingexecutor/modules/gapi/src/executor/gexecutor.cpp#L410-L412

That's a good point, but I don't know how guarantee that's methods won't be called concurrently in future, except putting comment

bool QueueReader::getInputVector(std::vector<Q*> &in_queues,
cv::GRunArgs &in_constants,
cv::GRunArgs &isl_inputs)
cv::gimpl::StreamMsg QueueReader::getInputVector(std::vector<Q*> &in_queues,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you clarify - why it changed on returning value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously bool means != Stop{}, now except Stop there can be Exception message, so it was changed from bool to StreamMsg

oq->push(Cmd{cv::gimpl::Exception{eptr}});
}
// NB: Go to the next iteration.
continue;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when emitter produces exception constantly then we catch it and go to 'continue' for while (true)

break;
case Posting::V::index_of<cv::gimpl::EndOfStream>():
cmd = Cmd{Stop{}};
m_stops_sent++;
Copy link
Copy Markdown
Contributor

@sivanov-work sivanov-work Mar 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, for statistics/analytics maybe - in case of multiple exception are allowed in pipeline without stopping it

If this case is not supported - then please ignore this comment

}
}

virtual void post(cv::gimpl::Exception&& error) override
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i assumed some customization here, for example

post(Exception )
{
(void)dispatch<Exception> ()
}

post(Stop)
{
stop_sent += dispatch <Stop>();
}

Please consider this comment as optional

case Cmd::index_of<Exception>(): {
std::rethrow_exception(cv::util::get<Exception>(cmd).eptr);
return true;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any warning about missing 'default' ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@TolyaTalamanov
Copy link
Copy Markdown
Contributor Author

@dmatveev Could you have a look, please?

Copy link
Copy Markdown
Contributor

@dmatveev dmatveev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating every possible backend with this new exception handling is quite invasive.

Can you please offer some other way to do it?
Can those changes be only introduced at the executors level?

Comment on lines +801 to +806
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg))
{
// (4) If the Exception message is revieved, propagate it further.
out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
return;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be done at this level? As well as every other island's level?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, shouldn't this be handled uniformly for all possible islands? A level above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't agree more. Handle the input exception in a general way.

Comment on lines +505 to +508
if (cv::util::holds_alternative<cv::gimpl::Exception>(in_msg)) {
out.post(std::move(cv::util::get<cv::gimpl::Exception>(in_msg)));
return;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, if we need to update every our backend now, this change is quite invasive.

What's about our huge internal part?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answered above

Copy link
Copy Markdown
Contributor

@dmatveev dmatveev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably done right but I can't say it for sure until it is properly documented

try {
t.run(request);
} catch (...) {
request.SetCompletionCallback([](){});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment is required here. Previous single-liner was clear, now I have no idea why this should happen in catch. And I doubt if anybody knows.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 718 to +724
void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
size_t id,
IE::InferRequest request,
IE::StatusCode code) {
// FIXME: Any exception which is arrised here must not leave this callback,
// because it won't be handled.
try {
if (code != IE::StatusCode::OK) {
throw std::logic_error("IE::InferRequest finished with not OK status");
}
task.callback(request);
// NB: IE::InferRequest keeps the callback until the new one is set.
// Since user's callback might keep resources that should be released,
// need to destroy its after execution.
// Let's set the empty one to cause the destruction of a callback.
request.SetCompletionCallback([](){});
m_idle_ids.push(id);
} catch (const std::exception& e) {
GAPI_LOG_FATAL(NULL, "Callback failed with error: " << e.what());
//FIXME: Exception CAN't be rethrown here, since this callback works
// in separate IE thread and such scenarios aren't handled properly in
// G-API so far.
}
IE::StatusCode code) noexcept {
task.callback(request, code);
request.SetCompletionCallback([](){});
m_idle_ids.push(id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this all about? Why? What's for? A simple comment would help to understand the need in this method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return;
}

GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a DgbAssert? get<> will fail anyway if the message is of other type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense, done

* Put more comments
* Fix alignment
* Move test outside of HAVE_NGRAPH
Copy link
Copy Markdown
Contributor

@dmatveev dmatveev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@TolyaTalamanov
Copy link
Copy Markdown
Contributor Author

@alalek Could you merge it, please?

@alalek alalek merged commit d98e07c into opencv:4.x Mar 25, 2022
@opencv-pushbot opencv-pushbot mentioned this pull request Apr 23, 2022
a-sajjad72 pushed a commit to a-sajjad72/opencv that referenced this pull request Mar 30, 2023
…on-in-streamingexecutor

[G-API] Handle exceptions in streaming executor

* Handle exceptions in streaming executor

* Rethrow exception in non-streaming executor

* Clean up

* Put more tests

* Handle exceptions in IE backend

* Handle exception in IE callbacks

* Handle exception in GExecutor

* Handle all exceptions in IE backend

* Not only (std::exception& e)

* Fix comments to review

* Handle input exception in generic way

* Fix comment

* Clean up

* Apply review comments

* Put more comments
* Fix alignment
* Move test outside of HAVE_NGRAPH

* Fix compilation
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants