Skip to content

[G-API] GStreamingExecutor abstract interface#21816

Closed
OrestChura wants to merge 1 commit intoopencv:4.xfrom
OrestChura:oc/gstreamexec_abstract
Closed

[G-API] GStreamingExecutor abstract interface#21816
OrestChura wants to merge 1 commit intoopencv:4.xfrom
OrestChura:oc/gstreamexec_abstract

Conversation

@OrestChura
Copy link
Copy Markdown
Contributor

This pull request separates GStreamingExecutor into interface and GTBBStreamingExecutor which is a TBB implementation for the streaming mode executor.

Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

  • I agree to contribute to the project under Apache 2 License.
  • To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
  • The PR is proposed to the proper branch
  • There is a reference to the original bug report and related work
  • There is accuracy test, performance test and test data in opencv_extra repository, if applicable
    Patch to opencv_extra has the same branch name.
  • The feature is well documented and sample code can be built with the project CMake

// Need to eliminate throwings from stop(): make stop_impl() & wait_shutdown() no_throw
try {
if (state == State::READY || state == State::RUNNING)
if (state == State::READY || state == State::RUNNING) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional:
here or in stop_impl it'd better to put GAPI_LOG_INFO(nullptr, "<executor name> is stopping, state: " << state);


if (state == State::STOPPED)
if (state == State::STOPPED) {
return false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • GAPI_LOG_INFO("executor has stopped: no data available")


GAPI_Assert(m_sink_queues.size() == outs.size());
GAPI_Assert(GModel::Graph(*m_orig_graph).metadata().get<Protocol>().out_nhs.size() == outs.size() &&
"Number of data objects in cv::gout() must match the number of graph outputs in cv::GOut()");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be placed under GAPI_DbgAssert ? Then it doesn't evaluated for Release builds

if (cv::util::holds_alternative<Stop>(cmd))
{
cv::GRunArgs this_result;
if (!try_pull_impl(this_result)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only difference between pulll and try_pull is here?

try {
if (state == State::READY || state == State::RUNNING)
if (state == State::READY || state == State::RUNNING) {
stop();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UB if stop is a virtual or invoke a virtual method inside. I checked code below and find out that it triggers stop_impl which is virtual and UB is possible

void clear() {
for (auto &q : m_sync_queues) q.clear();
m_sync_queues.clear();
m_synchronized_emitters.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there guarantees that join called before clear?
otherwise i suggest to put join inside clear

// Pipeline depth is equal to number of its (pipeline) steps.
auto has_queue_capacity = cv::gapi::getCompileArg<cv::gapi::streaming::queue_capacity>(m_comp_args);
const auto queue_capacity = has_queue_capacity ? has_queue_capacity->capacity :
3*std::count_if
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is 3?

// Produces the next video frame when pulled.
case T::index_of<cv::gapi::wip::IStreamSource::Ptr>():
#if !defined(GAPI_STANDALONE)
emitter.reset(new VideoEmitter{emit_arg});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GAPI_LOG_INFO("create VideoEmitter ")

default:
// Create a constant emitter.
// Produces always the same ("constant") value when pulled.
emitter.reset(new ConstEmitter{emit_arg});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GAPI_LOG_INFO("create ConstEmitter" );


class GTBBStreamingExecutor final : public GStreamingExecutor {
protected:
const bool m_desync;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to implement TWO different GTBBStreamingExecutor:
GTBB_SYNC_StreamingExecutor ---> GTBBStreamingExecutor <-- GTBB_DESYNC_StreamingExecutor

Do do not stick flag-based logic through single and huge GTBBStreamingExecutor

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sivanov-work wouldn't it involve a huge chunk of code being copy-pasted?

@sivanov-work
Copy link
Copy Markdown
Contributor

I suggest that we separated utils-like queues , reader and so on into different files. Because it is hard to investigate these. Also they could be reused in different components
Also i suggest that we should move out const desync flag and apply further division of TBB Executor on multiple descendants: for sync & desync cases.

bool cv::GStreamingCompiled::Priv::pull(cv::GOptRunArgsP &&outs)
{
return m_exec->pull(std::move(outs));
auto* exec = dynamic_cast<cv::gimpl::GTBBStreamingExecutor*>(m_exec.get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need those checks in each pull?

bool virtual pull_impl(cv::GRunArgs& this_result) = 0;
bool virtual try_pull_impl(cv::GRunArgs& this_result) = 0;
void virtual stop_impl() = 0;
void virtual wait_shutdown() = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_impl?

void wait_shutdown();

cv::GTypesInfo out_info;
void virtual start_impl() = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you, please, explain why _impls are virtual and not the default functions?

Copy link
Copy Markdown
Contributor

@smirnov-alexey smirnov-alexey Apr 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see now that there is some little logic in those default functions. Is it OK to apply that logic for each potential nested executor class? If so, I'm OK with having virtual _impls

static const char *name() { return "DesyncSpecialCase"; }
};

std::vector<Q*> reader_queues( ade::Graph &g,
Copy link
Copy Markdown
Contributor

@smirnov-alexey smirnov-alexey Apr 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alignment is broken here and below in input_queues

bool available = std::get<2>(it);

using T = cv::GOptRunArgP;
#define HANDLE_CASE(Type) \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, align the \

auto &mat_opt = *cv::util::get<O<cv::Mat>*>(out_obj);
if (available) {
auto q_map = cv::util::get<cv::RMat>(res_obj).access(cv::RMat::Access::R);
// FIXME: Copy! Maybe we could do some optimization for this case!
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be concerned about this copy? Is there a task to fix it later if needed?

// Need to eliminate throwings from stop(): make stop_impl() & wait_shutdown() no_throw
try {
if (state == State::READY || state == State::RUNNING)
if (state == State::READY || state == State::RUNNING) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we make state an atomic/guard it? I guess the question is not related to this PR but still

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, ignore if it's not a critical section

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants