Skip to content

Commit a4fa4ff

Browse files
Backport #72898 to 24.12: Revert "Revert "Revert "Revert "make d-tor Finalizer more obvious""""
1 parent f5da7b2 commit a4fa4ff

15 files changed

Lines changed: 282 additions & 27 deletions

src/Interpreters/InterpreterInsertQuery.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select)
303303

304304
Chain InterpreterInsertQuery::buildChain(
305305
const StoragePtr & table,
306+
size_t view_level,
306307
const StorageMetadataPtr & metadata_snapshot,
307308
const Names & columns,
308309
ThreadStatusesHolderPtr thread_status_holder,
@@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain(
324325
if (check_access)
325326
getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames());
326327

327-
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
328+
Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
328329
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample);
329330

330331
chain.appendChain(std::move(sink));
@@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain(
333334

334335
Chain InterpreterInsertQuery::buildSink(
335336
const StoragePtr & table,
337+
size_t view_level,
336338
const StorageMetadataPtr & metadata_snapshot,
337339
ThreadStatusesHolderPtr thread_status_holder,
338340
ThreadGroupPtr running_group,
@@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink(
361363
else
362364
{
363365
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
364-
query_ptr, no_destination,
366+
query_ptr, view_level, no_destination,
365367
thread_status_holder, running_group, elapsed_counter_ms, async_insert);
366368
}
367369

@@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
423425
return out;
424426
}
425427

426-
std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block)
428+
std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(
429+
size_t presink_streams,
430+
size_t sink_streams,
431+
StoragePtr table,
432+
size_t view_level,
433+
const StorageMetadataPtr & metadata_snapshot,
434+
const Block & query_sample_block
435+
)
427436
{
428437
chassert(presink_streams > 0);
429438
chassert(sink_streams > 0);
@@ -439,7 +448,7 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
439448

440449
for (size_t i = 0; i < sink_streams; ++i)
441450
{
442-
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
451+
auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr,
443452
running_group, /* elapsed_counter_ms= */ nullptr);
444453

445454
sink_chains.emplace_back(std::move(out));
@@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
639648

640649
auto [presink_chains, sink_chains] = buildPreAndSinkChains(
641650
presink_streams_size, sink_streams_size,
642-
table, metadata_snapshot, query_sample_block);
651+
table, /* view_level */ 0, metadata_snapshot, query_sample_block);
643652

644653
pipeline.resize(presink_chains.size());
645654

@@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
693702
{
694703
auto [presink_chains, sink_chains] = buildPreAndSinkChains(
695704
/* presink_streams */1, /* sink_streams */1,
696-
table, metadata_snapshot, query_sample_block);
705+
table, /* view_level */ 0, metadata_snapshot, query_sample_block);
697706

698707
chain = std::move(presink_chains.front());
699708
chain.appendChain(std::move(sink_chains.front()));

src/Interpreters/InterpreterInsertQuery.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class InterpreterInsertQuery : public IInterpreter, WithContext
4343

4444
Chain buildChain(
4545
const StoragePtr & table,
46+
size_t view_level,
4647
const StorageMetadataPtr & metadata_snapshot,
4748
const Names & columns,
4849
ThreadStatusesHolderPtr thread_status_holder = {},
@@ -79,13 +80,20 @@ class InterpreterInsertQuery : public IInterpreter, WithContext
7980

8081
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
8182

82-
std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block);
83+
std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(
84+
size_t presink_streams,
85+
size_t sink_streams,
86+
StoragePtr table,
87+
size_t view_level,
88+
const StorageMetadataPtr & metadata_snapshot,
89+
const Block & query_sample_block);
8390

8491
QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table);
8592
QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table);
8693

8794
Chain buildSink(
8895
const StoragePtr & table,
96+
size_t view_level,
8997
const StorageMetadataPtr & metadata_snapshot,
9098
ThreadStatusesHolderPtr thread_status_holder,
9199
ThreadGroupPtr running_group,

src/Processors/Executors/PushingPipelineExecutor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish()
127127
finished = true;
128128

129129
if (executor)
130-
executor->executeStep();
130+
{
131+
auto res = executor->executeStep();
132+
chassert(!res);
133+
}
131134
}
132135

133136
void PushingPipelineExecutor::cancel()

src/Processors/Transforms/ExceptionKeepingTransform.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <exception>
12
#include <Processors/Transforms/ExceptionKeepingTransform.h>
23
#include <Common/ThreadStatus.h>
34
#include <Common/Stopwatch.h>

src/Processors/Transforms/buildPushingToViewsChain.cpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
#include <atomic>
3737
#include <chrono>
38+
#include <exception>
3839
#include <memory>
3940

4041

@@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
120121
class CopyingDataToViewsTransform final : public IProcessor
121122
{
122123
public:
123-
CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data);
124+
CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_);
124125

125126
String getName() const override { return "CopyingDataToViewsTransform"; }
126127
Status prepare() override;
@@ -129,6 +130,7 @@ class CopyingDataToViewsTransform final : public IProcessor
129130
private:
130131
InputPort & input;
131132
ViewsDataPtr views_data;
133+
size_t view_level;
132134
};
133135

134136
/// For source chunk, execute view query over it.
@@ -223,6 +225,7 @@ class FinalizingViewsTransform final : public IProcessor
223225
/// Generates one chain part for every view in buildPushingToViewsChain
224226
std::optional<Chain> generateViewChain(
225227
ContextPtr context,
228+
size_t view_level,
226229
const StorageID & view_id,
227230
ThreadGroupPtr running_group,
228231
Chain & result_chain,
@@ -369,7 +372,7 @@ std::optional<Chain> generateViewChain(
369372

370373
/// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false`
371374
bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type;
372-
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
375+
out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
373376

374377
if (interpreter.shouldAddSquashingFroStorage(inner_table))
375378
{
@@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
400403
query = live_view->getInnerQuery();
401404
out = buildPushingToViewsChain(
402405
view, view_metadata_snapshot, insert_context, ASTPtr(),
406+
view_level + 1,
403407
/* no_destination= */ true,
404408
thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
405409
}
@@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
409413
query = window_view->getMergeableQuery();
410414
out = buildPushingToViewsChain(
411415
view, view_metadata_snapshot, insert_context, ASTPtr(),
416+
view_level + 1,
412417
/* no_destination= */ true,
413418
thread_status_holder, running_group, view_counter_ms, async_insert);
414419
}
415420
else
416421
out = buildPushingToViewsChain(
417422
view, view_metadata_snapshot, insert_context, ASTPtr(),
423+
view_level + 1,
418424
/* no_destination= */ false,
419425
thread_status_holder, running_group, view_counter_ms, async_insert);
420426

@@ -466,12 +472,14 @@ Chain buildPushingToViewsChain(
466472
const StorageMetadataPtr & metadata_snapshot,
467473
ContextPtr context,
468474
const ASTPtr & query_ptr,
475+
size_t view_level,
469476
bool no_destination,
470477
ThreadStatusesHolderPtr thread_status_holder,
471478
ThreadGroupPtr running_group,
472479
std::atomic_uint64_t * elapsed_counter_ms,
473480
bool async_insert,
474-
const Block & live_view_header)
481+
const Block & live_view_header
482+
)
475483
{
476484
checkStackSize();
477485
Chain result_chain;
@@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
514522
try
515523
{
516524
auto out = generateViewChain(
517-
context, view_id, running_group, result_chain,
525+
context, view_level, view_id, running_group, result_chain,
518526
views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);
519527

520528
if (!out.has_value())
@@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
554562
for (const auto & chain : chains)
555563
headers.push_back(chain.getOutputHeader());
556564

557-
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
565+
auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level);
558566
auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
559567
auto out = copying_data->getOutputs().begin();
560568
auto in = finalizing_views->getInputs().begin();
@@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
726734
}
727735

728736

729-
CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
737+
CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_)
730738
: IProcessor({header}, OutputPorts(data->views.size(), header))
731739
, input(inputs.front())
732740
, views_data(std::move(data))
741+
, view_level(view_level_)
733742
{
734743
if (views_data->views.empty())
735744
throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
@@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
765774
auto data = input.pullData();
766775
if (data.exception)
767776
{
777+
// If view_level == 0 than the exception comes from the source table.
778+
// There is no case when we could tolerate exceptions from the source table.
779+
// Do not tolerate incoming exception and do not pass it to the following processors.
780+
if (view_level == 0)
781+
std::rethrow_exception(data.exception);
782+
768783
if (!views_data->has_exception)
769784
{
770785
views_data->first_exception = data.exception;

src/Processors/Transforms/buildPushingToViewsChain.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Chain buildPushingToViewsChain(
6060
const StorageMetadataPtr & metadata_snapshot,
6161
ContextPtr context,
6262
const ASTPtr & query_ptr,
63+
size_t view_level,
6364
/// It is true when we should not insert into table, but only to views.
6465
/// Used e.g. for kafka. We should try to remove it somehow.
6566
bool no_destination,

src/Storages/MergeTree/MergeTreeSink.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <exception>
12
#include <Storages/MergeTree/MergeTreeSink.h>
23
#include <Storages/StorageMergeTree.h>
34
#include <Interpreters/PartLog.h>
@@ -44,6 +45,8 @@ MergeTreeSink::~MergeTreeSink()
4445
if (!delayed_chunk)
4546
return;
4647

48+
chassert(isCancelled() || std::uncaught_exceptions());
49+
4750
for (auto & partition : delayed_chunk->partitions)
4851
{
4952
partition.temp_part.cancel();
@@ -76,6 +79,7 @@ void MergeTreeSink::onStart()
7679
void MergeTreeSink::onFinish()
7780
{
7881
chassert(!isCancelled());
82+
7983
finishDelayedChunk();
8084
}
8185

src/Storages/MergeTree/MergedBlockOutputStream.cpp

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ struct MergedBlockOutputStream::Finalizer::Impl
119119
}
120120

121121
void finish();
122-
void cancel();
122+
void cancel() noexcept;
123123
};
124124

125125
void MergedBlockOutputStream::Finalizer::finish()
@@ -130,7 +130,7 @@ void MergedBlockOutputStream::Finalizer::finish()
130130
to_finish->finish();
131131
}
132132

133-
void MergedBlockOutputStream::Finalizer::cancel()
133+
void MergedBlockOutputStream::Finalizer::cancel() noexcept
134134
{
135135
std::unique_ptr<Impl> to_cancel = std::move(impl);
136136
impl.reset();
@@ -167,7 +167,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
167167
part->getDataPartStorage().removeFile(file_name);
168168
}
169169

170-
void MergedBlockOutputStream::Finalizer::Impl::cancel()
170+
void MergedBlockOutputStream::Finalizer::Impl::cancel() noexcept
171171
{
172172
writer.cancel();
173173

@@ -183,15 +183,8 @@ MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : imp
183183

184184
MergedBlockOutputStream::Finalizer::~Finalizer()
185185
{
186-
try
187-
{
188-
if (impl)
189-
finish();
190-
}
191-
catch (...)
192-
{
193-
tryLogCurrentException(__PRETTY_FUNCTION__);
194-
}
186+
if (impl)
187+
cancel();
195188
}
196189

197190

src/Storages/MergeTree/MergedBlockOutputStream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class MergedBlockOutputStream final : public IMergedBlockOutputStream
5555
~Finalizer();
5656

5757
void finish();
58-
void cancel();
58+
void cancel() noexcept;
5959
};
6060

6161
/// Finalize writing part and fill inner structures

src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl()
179179
if (!delayed_chunk)
180180
return;
181181

182+
chassert(isCancelled() || std::uncaught_exceptions());
183+
182184
for (auto & partition : delayed_chunk->partitions)
183185
{
184186
partition.temp_part.cancel();

0 commit comments

Comments
 (0)