Skip to content

Commit fd017ba

Browse files
Revert "turn on async insert"
1 parent c88c91a commit fd017ba

32 files changed

Lines changed: 33 additions & 154 deletions

File tree

src/Core/Settings.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6026,7 +6026,7 @@ Hard lower limit on the task size (even when the number of granules is low and t
60266026
Only has an effect in ClickHouse Cloud. Number of granules in stripe of compact part of MergeTree tables to use multibuffer reader, which supports parallel reading and prefetch. In case of reading from remote fs using of multibuffer reader increases number of read request.
60276027
)", 0) \
60286028
\
6029-
DECLARE(Bool, async_insert, true, R"(
6029+
DECLARE(Bool, async_insert, false, R"(
60306030
If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table
60316031
)", 0) \
60326032
DECLARE(Bool, wait_for_async_insert, true, R"(

src/Core/SettingsChangesHistory.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5757
{"ast_fuzzer_runs", 0, 0, "New setting to enable server-side AST fuzzer."},
5858
{"ast_fuzzer_any_query", false, false, "New setting to allow fuzzing all query types, not just read-only."},
5959
{"check_named_collection_dependencies", true, true, "New setting to check if dropping a named collection would break dependent tables."},
60-
{"async_insert", false, true, "Enable async inserts by default."},
6160
{"deduplicate_blocks_in_dependent_materialized_views", false, true, "Enable deduplication for dependent materialized views by default."},
6261
{"deduplicate_insert", "backward_compatible_choice", "enable", "Enable deduplication for all sync and async inserts by default."},
6362
{"enable_join_runtime_filters", false, true, "Enabled this optimization"},

src/Interpreters/AsynchronousInsertQueue.cpp

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#include <future>
2-
#include <string>
32
#include <vector>
43
#include <Interpreters/AsynchronousInsertQueue.h>
54

@@ -396,13 +395,7 @@ void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const
396395
/* async_insert */ false);
397396

398397
auto table = interpreter.getTable(insert_query);
399-
auto sample_block = InterpreterInsertQuery::getSampleBlock(
400-
insert_query,
401-
table,
402-
table->getInMemoryMetadataPtr(),
403-
query_context,
404-
/* no_destination */false,
405-
insert_context->getSettingsRef()[Setting::insert_allow_materialized_columns]);
398+
auto sample_block = InterpreterInsertQuery::getSampleBlock(insert_query, table, table->getInMemoryMetadataPtr(), query_context);
406399

407400
if (!FormatFactory::instance().isInputFormat(insert_query.format))
408401
{
@@ -500,14 +493,6 @@ AsynchronousInsertQueue::PushResult AsynchronousInsertQueue::pushQueryWithBlock(
500493
return pushDataChunk(std::move(query), std::move(block), std::move(query_context));
501494
}
502495

503-
std::vector<std::string> AsynchronousInsertQueue::getInsertQueryIds(InsertData & data)
504-
{
505-
std::vector<std::string> query_ids;
506-
for (const auto & entry : data.entries)
507-
query_ids.push_back(entry->query_id);
508-
return query_ids;
509-
}
510-
511496
AsynchronousInsertQueue::PushResult AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk && chunk, ContextPtr query_context)
512497
{
513498
const auto & settings = query_context->getSettingsRef();
@@ -569,8 +554,8 @@ AsynchronousInsertQueue::PushResult AsynchronousInsertQueue::pushDataChunk(ASTPt
569554
data->entries.emplace_back(entry);
570555
insert_future = entry->getFuture();
571556

572-
LOG_TRACE(log, "Have {} pending inserts in shard {} with total {} bytes of data for the async insert queries '{}'",
573-
data->entries.size(), size_t(shard_num), data->size_in_bytes, fmt::join(getInsertQueryIds(*data), ", "));
557+
LOG_TRACE(log, "Have {} pending inserts in shard {} with total {} bytes of data for query '{}'",
558+
data->entries.size(), size_t(shard_num), data->size_in_bytes, key.query_str);
574559

575560
bool has_enough_bytes = data->size_in_bytes >= (*key.settings)[Setting::async_insert_max_data_size];
576561
bool has_enough_queries
@@ -591,7 +576,9 @@ AsynchronousInsertQueue::PushResult AsynchronousInsertQueue::pushDataChunk(ASTPt
591576
/// This works because queries with the same set of settings are already grouped together.
592577
if (!flush_stopped && (has_enough_bytes || has_enough_queries || max_busy_timeout_exceeded()))
593578
{
594-
LOG_TRACE(log, "Scheduling async insert processing job because {}",
579+
LOG_DEBUG(log, "Scheduling async insert processing job for query '{}' "
580+
"because {}",
581+
key.query_str,
595582
has_enough_bytes ? "enough bytes accumulated" :
596583
has_enough_queries ? "enough queries accumulated" :
597584
"maximum busy wait timeout exceeded");
@@ -1004,8 +991,6 @@ try
1004991
else
1005992
query_scope = CurrentThread::QueryScope::create(insert_context);
1006993

1007-
LOG_DEBUG(log, "Processing batch insert for the async inserts '{}'", fmt::join(getInsertQueryIds(*data), ", "));
1008-
1009994
String query_for_logging = serializeQuery(*key.query, insert_context->getSettingsRef()[Setting::log_queries_cut_to_length]);
1010995
UInt64 normalized_query_hash = normalizedQueryHash(query_for_logging, false);
1011996

@@ -1148,7 +1133,7 @@ try
11481133
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, "");
11491134
}
11501135

1151-
LOG_DEBUG(log, "Flushed {} rows, {} bytes", num_rows, num_bytes);
1136+
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
11521137
queue_shard_flush_time_history.updateWithCurrentTime();
11531138

11541139
LOG_DEBUG(log, "Asynchronous insert query logQueryFinish query_kind '{}', 'query_id {}'", query_log_elem.query_kind, query_log_elem.client_info.current_query_id);
@@ -1250,8 +1235,8 @@ Chunk AsynchronousInsertQueue::processEntriesWithParsing(
12501235
auto on_error = [&](const MutableColumns & result_columns, const ColumnCheckpoints & checkpoints, Exception & e)
12511236
{
12521237
current_exception = e.displayText();
1253-
LOG_ERROR(logger, "Failed parsing for insert query id {}. {}",
1254-
current_entry->query_id, current_exception);
1238+
LOG_ERROR(logger, "Failed parsing for query '{}' with query id {}. {}",
1239+
key.query_str, current_entry->query_id, current_exception);
12551240

12561241
for (size_t i = 0; i < result_columns.size(); ++i)
12571242
result_columns[i]->rollback(*checkpoints[i]);

src/Interpreters/AsynchronousInsertQueue.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,6 @@ class AsynchronousInsertQueue : public WithContext
306306
template <typename E>
307307
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
308308

309-
static std::vector<std::string> getInsertQueryIds(InsertData & data);
310-
311309
public:
312310
auto getQueueLocked(size_t shard_num) const
313311
{

tests/config/install.sh

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,6 @@ fi
375375

376376
if [[ "$USE_ASYNC_INSERT" == "1" ]]; then
377377
ln -sf $SRC_PATH/users.d/enable_async_inserts.xml $DEST_SERVER_PATH/users.d/
378-
else
379-
ln -sf $SRC_PATH/users.d/disable_async_inserts.xml $DEST_SERVER_PATH/users.d/
380378
fi
381379

382380
if [[ "$USE_DATABASE_REPLICATED" == "1" ]]; then

tests/config/users.d/disable_async_inserts.xml

Lines changed: 0 additions & 7 deletions
This file was deleted.

tests/integration/test_checking_s3_blobs_paranoid/configs/setting.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
<clickhouse>
44
<profiles>
55
<default>
6-
<async_insert>0</async_insert>
76
<s3_check_objects_after_upload>1</s3_check_objects_after_upload>
87
<enable_s3_requests_logging>1</enable_s3_requests_logging>
98
<s3_connect_timeout_ms>10000</s3_connect_timeout_ms>

tests/integration/test_extreme_deduplication/configs/sync_insert.xml

Lines changed: 0 additions & 7 deletions
This file was deleted.

tests/integration/test_extreme_deduplication/test.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,12 @@
1111
node1 = cluster.add_instance(
1212
"node1",
1313
main_configs=["configs/conf.d/merge_tree.xml", "configs/conf.d/remote_servers.xml"],
14-
user_configs=["configs/sync_insert.xml"],
1514
with_zookeeper=True,
1615
macros={"layer": 0, "shard": 0, "replica": 1},
1716
)
1817
node2 = cluster.add_instance(
1918
"node2",
2019
main_configs=["configs/conf.d/merge_tree.xml", "configs/conf.d/remote_servers.xml"],
21-
user_configs=["configs/sync_insert.xml"],
2220
with_zookeeper=True,
2321
macros={"layer": 0, "shard": 0, "replica": 2},
2422
)
@@ -39,9 +37,8 @@ def started_cluster():
3937
def test_deduplication_window_in_seconds(started_cluster):
4038
node = node1
4139

42-
node.query(
40+
node1.query(
4341
"""
44-
DROP TABLE IF EXISTS simple ON CLUSTER test_cluster SYNC;
4542
CREATE TABLE simple ON CLUSTER test_cluster (date Date, id UInt32)
4643
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/simple', '{replica}') PARTITION BY toYYYYMM(date) ORDER BY id"""
4744
)
@@ -72,3 +69,5 @@ def test_deduplication_window_in_seconds(started_cluster):
7269
"INSERT INTO simple VALUES (0, 0)"
7370
) # Deduplication doesn't work here as the first hash node was deleted
7471
assert TSV.toMat(node.query("SELECT count() FROM simple"))[0][0] == "3"
72+
73+
node1.query("""DROP TABLE simple ON CLUSTER test_cluster""")

tests/integration/test_insert_over_http_query_log/configs/users.xml

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments (0)