3535
3636#include < atomic>
3737#include < chrono>
38+ #include < exception>
3839#include < memory>
3940
4041
@@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
120121class CopyingDataToViewsTransform final : public IProcessor
121122{
122123public:
123- CopyingDataToViewsTransform (const Block & header, ViewsDataPtr data);
124+ CopyingDataToViewsTransform (const Block & header, ViewsDataPtr data, size_t view_level_ );
124125
125126 String getName () const override { return " CopyingDataToViewsTransform" ; }
126127 Status prepare () override ;
@@ -129,6 +130,7 @@ class CopyingDataToViewsTransform final : public IProcessor
129130private:
130131 InputPort & input;
131132 ViewsDataPtr views_data;
133+ size_t view_level;
132134};
133135
134136// / For source chunk, execute view query over it.
@@ -223,6 +225,7 @@ class FinalizingViewsTransform final : public IProcessor
223225// / Generates one chain part for every view in buildPushingToViewsChain
224226std::optional<Chain> generateViewChain (
225227 ContextPtr context,
228+ size_t view_level,
226229 const StorageID & view_id,
227230 ThreadGroupPtr running_group,
228231 Chain & result_chain,
@@ -369,7 +372,7 @@ std::optional<Chain> generateViewChain(
369372
370373 // / TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false`
371374 bool check_access = !materialized_view->hasInnerTable () && materialized_view->getInMemoryMetadataPtr ()->sql_security_type ;
372- out = interpreter.buildChain (inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
375+ out = interpreter.buildChain (inner_table, view_level + 1 , inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
373376
374377 if (interpreter.shouldAddSquashingFroStorage (inner_table))
375378 {
@@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
400403 query = live_view->getInnerQuery ();
401404 out = buildPushingToViewsChain (
402405 view, view_metadata_snapshot, insert_context, ASTPtr (),
406+ view_level + 1 ,
403407 /* no_destination= */ true ,
404408 thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
405409 }
@@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
409413 query = window_view->getMergeableQuery ();
410414 out = buildPushingToViewsChain (
411415 view, view_metadata_snapshot, insert_context, ASTPtr (),
416+ view_level + 1 ,
412417 /* no_destination= */ true ,
413418 thread_status_holder, running_group, view_counter_ms, async_insert);
414419 }
415420 else
416421 out = buildPushingToViewsChain (
417422 view, view_metadata_snapshot, insert_context, ASTPtr (),
423+ view_level + 1 ,
418424 /* no_destination= */ false ,
419425 thread_status_holder, running_group, view_counter_ms, async_insert);
420426
@@ -466,12 +472,14 @@ Chain buildPushingToViewsChain(
466472 const StorageMetadataPtr & metadata_snapshot,
467473 ContextPtr context,
468474 const ASTPtr & query_ptr,
475+ size_t view_level,
469476 bool no_destination,
470477 ThreadStatusesHolderPtr thread_status_holder,
471478 ThreadGroupPtr running_group,
472479 std::atomic_uint64_t * elapsed_counter_ms,
473480 bool async_insert,
474- const Block & live_view_header)
481+ const Block & live_view_header
482+ )
475483{
476484 checkStackSize ();
477485 Chain result_chain;
@@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
514522 try
515523 {
516524 auto out = generateViewChain (
517- context, view_id, running_group, result_chain,
525+ context, view_level, view_id, running_group, result_chain,
518526 views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);
519527
520528 if (!out.has_value ())
@@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
554562 for (const auto & chain : chains)
555563 headers.push_back (chain.getOutputHeader ());
556564
557- auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
565+ auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level );
558566 auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move (headers), views_data);
559567 auto out = copying_data->getOutputs ().begin ();
560568 auto in = finalizing_views->getInputs ().begin ();
@@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
726734}
727735
728736
729- CopyingDataToViewsTransform::CopyingDataToViewsTransform (const Block & header, ViewsDataPtr data)
737+ CopyingDataToViewsTransform::CopyingDataToViewsTransform (const Block & header, ViewsDataPtr data, size_t view_level_ )
730738 : IProcessor({header}, OutputPorts(data->views.size(), header))
731739 , input(inputs.front())
732740 , views_data(std::move(data))
741+ , view_level(view_level_)
733742{
734743 if (views_data->views .empty ())
735744 throw Exception (ErrorCodes::LOGICAL_ERROR, " CopyingDataToViewsTransform cannot have zero outputs" );
@@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
765774 auto data = input.pullData ();
766775 if (data.exception )
767776 {
777+ // If view_level == 0 than the exception comes from the source table.
778+ // There is no case when we could tolerate exceptions from the source table.
779+ // Do not tolerate incoming exception and do not pass it to the following processors.
780+ if (view_level == 0 )
781+ std::rethrow_exception (data.exception );
782+
768783 if (!views_data->has_exception )
769784 {
770785 views_data->first_exception = data.exception ;
0 commit comments