[G-API] GStreamingExecutor abstract interface#21816
[G-API] GStreamingExecutor abstract interface#21816OrestChura wants to merge 1 commit intoopencv:4.xfrom
Conversation
| // Need to eliminate throwings from stop(): make stop_impl() & wait_shutdown() no_throw | ||
| try { | ||
| if (state == State::READY || state == State::RUNNING) | ||
| if (state == State::READY || state == State::RUNNING) { |
There was a problem hiding this comment.
optional:
here or in stop_impl it'd better to put GAPI_LOG_INFO(nullptr, "<executor name> is stopping, state: " << state);
|
|
||
| if (state == State::STOPPED) | ||
| if (state == State::STOPPED) { | ||
| return false; |
There was a problem hiding this comment.
- GAPI_LOG_INFO("executor has stopped: no data available")
|
|
||
| GAPI_Assert(m_sink_queues.size() == outs.size()); | ||
| GAPI_Assert(GModel::Graph(*m_orig_graph).metadata().get<Protocol>().out_nhs.size() == outs.size() && | ||
| "Number of data objects in cv::gout() must match the number of graph outputs in cv::GOut()"); |
There was a problem hiding this comment.
Can it be placed under GAPI_DbgAssert ? Then it doesn't evaluated for Release builds
| if (cv::util::holds_alternative<Stop>(cmd)) | ||
| { | ||
| cv::GRunArgs this_result; | ||
| if (!try_pull_impl(this_result)) { |
There was a problem hiding this comment.
The only difference between pulll and try_pull is here?
| try { | ||
| if (state == State::READY || state == State::RUNNING) | ||
| if (state == State::READY || state == State::RUNNING) { | ||
| stop(); |
There was a problem hiding this comment.
UB if stop is a virtual or invoke a virtual method inside. I checked code below and find out that it triggers stop_impl which is virtual and UB is possible
| void clear() { | ||
| for (auto &q : m_sync_queues) q.clear(); | ||
| m_sync_queues.clear(); | ||
| m_synchronized_emitters.clear(); |
There was a problem hiding this comment.
Are there guarantees that join called before clear?
otherwise i suggest to put join inside clear
| // Pipeline depth is equal to number of its (pipeline) steps. | ||
| auto has_queue_capacity = cv::gapi::getCompileArg<cv::gapi::streaming::queue_capacity>(m_comp_args); | ||
| const auto queue_capacity = has_queue_capacity ? has_queue_capacity->capacity : | ||
| 3*std::count_if |
| // Produces the next video frame when pulled. | ||
| case T::index_of<cv::gapi::wip::IStreamSource::Ptr>(): | ||
| #if !defined(GAPI_STANDALONE) | ||
| emitter.reset(new VideoEmitter{emit_arg}); |
There was a problem hiding this comment.
GAPI_LOG_INFO("create VideoEmitter ")
| default: | ||
| // Create a constant emitter. | ||
| // Produces always the same ("constant") value when pulled. | ||
| emitter.reset(new ConstEmitter{emit_arg}); |
There was a problem hiding this comment.
GAPI_LOG_INFO("create ConstEmitter" );
|
|
||
| class GTBBStreamingExecutor final : public GStreamingExecutor { | ||
| protected: | ||
| const bool m_desync; |
There was a problem hiding this comment.
I suggest to implement TWO different GTBBStreamingExecutor:
GTBB_SYNC_StreamingExecutor ---> GTBBStreamingExecutor <-- GTBB_DESYNC_StreamingExecutor
Do do not stick flag-based logic through single and huge GTBBStreamingExecutor
There was a problem hiding this comment.
@sivanov-work wouldn't it involve a huge chunk of code being copy-pasted?
|
I suggest that we separated utils-like |
| bool cv::GStreamingCompiled::Priv::pull(cv::GOptRunArgsP &&outs) | ||
| { | ||
| return m_exec->pull(std::move(outs)); | ||
| auto* exec = dynamic_cast<cv::gimpl::GTBBStreamingExecutor*>(m_exec.get()); |
There was a problem hiding this comment.
Why do we need those checks in each pull?
| bool virtual pull_impl(cv::GRunArgs& this_result) = 0; | ||
| bool virtual try_pull_impl(cv::GRunArgs& this_result) = 0; | ||
| void virtual stop_impl() = 0; | ||
| void virtual wait_shutdown() = 0; |
| void wait_shutdown(); | ||
|
|
||
| cv::GTypesInfo out_info; | ||
| void virtual start_impl() = 0; |
There was a problem hiding this comment.
Could you, please, explain why _impls are virtual and not the default functions?
There was a problem hiding this comment.
I see now that there is some little logic in those default functions. Is it OK to apply that logic for each potential nested executor class? If so, I'm OK with having virtual _impls
| static const char *name() { return "DesyncSpecialCase"; } | ||
| }; | ||
|
|
||
| std::vector<Q*> reader_queues( ade::Graph &g, |
There was a problem hiding this comment.
Alignment is broken here and below in input_queues
| bool available = std::get<2>(it); | ||
|
|
||
| using T = cv::GOptRunArgP; | ||
| #define HANDLE_CASE(Type) \ |
| auto &mat_opt = *cv::util::get<O<cv::Mat>*>(out_obj); | ||
| if (available) { | ||
| auto q_map = cv::util::get<cv::RMat>(res_obj).access(cv::RMat::Access::R); | ||
| // FIXME: Copy! Maybe we could do some optimization for this case! |
There was a problem hiding this comment.
Shouldn't we be concerned about this copy? Is there a task to fix it later if needed?
| // Need to eliminate throwings from stop(): make stop_impl() & wait_shutdown() no_throw | ||
| try { | ||
| if (state == State::READY || state == State::RUNNING) | ||
| if (state == State::READY || state == State::RUNNING) { |
There was a problem hiding this comment.
Shouldn't we make state an atomic/guard it? I guess the question is not related to this PR but still
There was a problem hiding this comment.
Please, ignore if it's not a critical section
This pull request separates
GStreamingExecutorinto interface andGTBBStreamingExecutorwhich is a TBB implementation for the streaming mode executor.Pull Request Readiness Checklist
See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request
Patch to opencv_extra has the same branch name.