Skip to content

Streaming inserts#94509

Merged
Fgrtue merged 46 commits intoClickHouse:masterfrom
Sasao4o:streaming_inserts
Feb 20, 2026
Merged

Streaming inserts#94509
Fgrtue merged 46 commits intoClickHouse:masterfrom
Sasao4o:streaming_inserts

Conversation

@Sasao4o
Copy link
Copy Markdown
Contributor

@Sasao4o Sasao4o commented Jan 17, 2026

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Added input_format_max_block_wait_ms setting to emit data blocks by timeout and allowed processing of remaining data when an HTTP connection is closed unexpectedly.

  • Example use

const http = require('http');

const CLICKHOUSE_HOST = 'localhost';
const CLICKHOUSE_PORT = 8123;

const QUERY =
 'INSERT INTO my_first_table ' +
 'SETTINGS input_format_max_block_size_bytes=20000, ' +
 'max_insert_block_size=100, ' +
 'input_format_parallel_parsing=0, ' +
 'input_format_max_block_wait_ms=3000 ' +
 'FORMAT JSONEachRow';  

const INTERVAL_MS = 100;

const N = parseInt(process.argv[2], 10);
if (isNaN(N) || N <= 0) {
 console.error('Usage: node script.js <N>');
 process.exit(1);
}

console.log(`Sending ${N} rows initially, then keeping connection open...`);
 
const options = {
 hostname: CLICKHOUSE_HOST,
 port: CLICKHOUSE_PORT,
 path: `/?max_query_size=1000&query=${encodeURIComponent(QUERY)}`,
 method: 'POST',
 headers: {
   'Content-Type': 'application/json',
   'Transfer-Encoding': 'chunked',
   'Connection': 'keep-alive',
 }
};

let request = null;
let shouldRun = true;
function createRequest() {
 request = http.request(options, (res) => {
   console.log(`Status: ${res.statusCode}`);

   let responseData = '';
   res.on('data', chunk => responseData += chunk);
   res.on('end', () => {
     console.log('Response ended:', responseData);
   });
 });

 request.on('error', (err) => {
   console.error('Request error:', err.message);
 });

 sendInitialRows();
}

async function sendInitialRows() {
 for (let i = 0; i < N; i++) {
   if (request.destroyed) break;

   const timestamp = Math.floor(Date.now() / 1000);
   const rowObj = {
     id: i + 1,
     message: 'hello',
     ts: timestamp,
     code: 3
   };
   const row = JSON.stringify(rowObj) + '\n'; // <-- JSON row

   request.write(row);
   console.log(`[${new Date().toISOString()}] Sent: ${row.trim()}`);

   await new Promise(r => setTimeout(r, INTERVAL_MS));
 }

 console.log(`Finished sending ${N} rows. Connection remains open.`);
 keepConnectionOpen();
}

async function keepConnectionOpen() {
 while (true) {
   await new Promise(r => setTimeout(r, 1000));
 }
}

process.on('SIGINT', () => {
 console.log('\nStopping stream...');
 if (request && !request.destroyed) {
   request.end();
 }
 process.exit(0);
});

createRequest();

This code demonstrates streaming inserts over HTTP and shows that ClickHouse can successfully insert records even when max_insert_block_size is not reached, by flushing data based on input_format_max_block_wait_ms. It also verifies that if the connection is unexpectedly closed (for example, due to a timeout), ClickHouse still correctly parses and inserts any remaining data instead of treating the situation as an error.

This closes #41439

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Jan 18, 2026

While trying to write a Python unit test for this behavior, I noticed that emitting chunks does not necessarily mean the data is immediately inserted into the table. Based on this, I am thinking of generating a number of records smaller than max_insert_block_size and verifying that SELECT count(*) returns a value greater than zero, and that after the timeout expires, all records are eventually inserted (using JSONEachRow).

Does this sound like a reasonable way to test this feature?

Note: max_query_size must be explicitly set and input_format_parallel_parsing must be disabled (input_format_parallel_parsing = 0) so that this behavior can be observed with a small number of rows.

@alexey-milovidov alexey-milovidov added the can be tested Allows running workflows for external contributors label Jan 18, 2026
@clickhouse-gh
Copy link
Copy Markdown
Contributor

clickhouse-gh bot commented Jan 18, 2026

Workflow [PR], commit [1d826a9]

Summary:

@clickhouse-gh clickhouse-gh bot added the pr-feature Pull request with new product feature label Jan 18, 2026
@Fgrtue Fgrtue self-assigned this Jan 19, 2026
Copy link
Copy Markdown
Contributor

@Fgrtue Fgrtue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Sasao4o. Thank you for contribution! The changes look good.

Regarding your questions on testing.

I am thinking of generating a number of records smaller than max_insert_block_size and verifying that SELECT count(*) returns a value greater than zero, and that after the timeout expires, all records are eventually inserted (using JSONEachRow).

This could be a way to got, yes. Although, I would suggest writing a bash test, since it should be more straightforward. We need to test two expected behaviors that you implemented:

  1. The input format should be able to form blocks of data for INSERT not only by the threshold on the number of rows or bytes but also by timeout. -- we could check then the number of parts (using one of system tables) that are inserted. The number of parts should be "cut" according with the time delimiter.

  2. When the connection is unexpectedly closed, it should parse and process the remaining data instead of treating it as an error. -- could be done similarly to the approach above, or probably even by using SELECT count()

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Jan 23, 2026

Hi @Sasao4o. Thank you for contribution! The changes look good.

Regarding your questions on testing.

I am thinking of generating a number of records smaller than max_insert_block_size and verifying that SELECT count(*) returns a value greater than zero, and that after the timeout expires, all records are eventually inserted (using JSONEachRow).

This could be a way to got, yes. Although, I would suggest writing a bash test, since it should be more straightforward. We need to test two expected behaviors that you implemented:

  1. The input format should be able to form blocks of data for INSERT not only by the threshold on the number of rows or bytes but also by timeout. -- we could check then the number of parts (using one of system tables) that are inserted. The number of parts should be "cut" according with the time delimiter.
  2. When the connection is unexpectedly closed, it should parse and process the remaining data instead of treating it as an error. -- could be done similarly to the approach above, or probably even by using SELECT count()

You are welcome , my pleasure.
Yes the system parts thing actually made the first test really easy i was thinking how to query in middle of streaming :D thank you

the second test (the connection timeout one) i want remove this SLEEP 31(the default) to be sth smaller but http_receive_timeout setting does not seem to have any effect on my HTTP requests i also made sure that i wrote it in the correct place
bug
what do you think?

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Jan 26, 2026

Hi @Sasao4o. Thank you for contribution! The changes look good.
Regarding your questions on testing.

I am thinking of generating a number of records smaller than max_insert_block_size and verifying that SELECT count(*) returns a value greater than zero, and that after the timeout expires, all records are eventually inserted (using JSONEachRow).

This could be a way to got, yes. Although, I would suggest writing a bash test, since it should be more straightforward. We need to test two expected behaviors that you implemented:

  1. The input format should be able to form blocks of data for INSERT not only by the threshold on the number of rows or bytes but also by timeout. -- we could check then the number of parts (using one of system tables) that are inserted. The number of parts should be "cut" according with the time delimiter.
  2. When the connection is unexpectedly closed, it should parse and process the remaining data instead of treating it as an error. -- could be done similarly to the approach above, or probably even by using SELECT count()

You are welcome , my pleasure. Yes the system parts thing actually made the first test really easy i was thinking how to query in middle of streaming :D thank you

the second test (the connection timeout one) i want remove this SLEEP 31(the default) to be sth smaller but http_receive_timeout setting does not seem to have any effect on my HTTP requests i also made sure that i wrote it in the correct place bug what do you think?

never mind i changed the test to get out of this timeout mess but i found out that UNEXPECTED_END_OF_LINE is thrown when connection is dropped at middle of streaming so i added it to the connection error codes

@clickhouse-gh clickhouse-gh bot added pr-improvement Pull request with some product improvements and removed pr-feature Pull request with new product feature labels Jan 26, 2026
@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Jan 27, 2026

@Sasao4o some of the test are failing. We should add some tags on your tests to prevent your tests to be added to some test suites. For example, no-async-insert is likely to be handy for you. Please take a look at other .sh tests and add that one.

Please also check parallel tests -- they are also giving wrong results. Just to be sure try to add SYSTEM FLUSH LOGS <log_system_tables> before you query the table. And if this doesn't help you can try to query other system databases for checking how many parts were inserted (see 03723_max_insert_block_size_bytes_http.sh as a reference).

In the drop test, we can try to increase the waiting time before killing the process. This might help exclude the latency problem if it is present.

If this doesn't help, it is likely that the test are incompatible with some other test suites, then we can remove those test suites using tags as well.

…rts table + increase waiting time to 3 sec before killing process
@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Jan 28, 2026

@Sasao4o it seems that we need to pull from master, and update the setting history file -- your setting should be now in 26-2 release.

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Jan 28, 2026

@Sasao4o it seems that we need to pull from master, and update the setting history file -- your setting should be now in 26-2 release.

yea i felt happy when i knew that was the reason of test failure xd

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Jan 29, 2026

Hello @Fgrtue ,
I ran the timeout tests many times locally using flaky-check, and I managed to reproduce the failure twice. After inspecting the system.parts logs table, I found that may be it is a latency issue.: sometimes it comes from the client side (the first part ends up larger than expected), and sometimes from the server side. Because of that, I increased the time on both sides.

For the drop-timeout test, I wasn’t able to reproduce the failure locally. I ran it many times with the parallel flag (-j) together with many other tests from the report, but it always succeeded. I also tried running it with the exact same settings, and it still succeeded. Do you think S3 could be the reason? (more latency or sth like this?

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Feb 11, 2026

@Fgrtue Hello
Shouldn’t we add the following in IRowInputFormat.cpp?

if (params.in_transaction) throw

Otherwise, we won’t throw an error when inside a transaction, which could break atomicity. I was expecting 02435_rollback_cancelled_queries to fail. What do you think?

@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Feb 11, 2026

Let's see if the test fails this time.
I assume it failed last since we tried to implement the behavior for parallel parsing. So I assume we won't need anything extra.

@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Feb 11, 2026

Just to clarify, in which case do you suggest that we should throw and error?

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Feb 11, 2026

Just to clarify, in which case do you suggest that we should throw and error?

If the INSERT is wrapped in BEGIN/END (transaction semantics), and the connection is dropped while we are shutting down gracefully, the rollback logic in executeQuery will not run.

at first i thought clickhouse doesn't support transactions but i found it as an experimental feature. So should we take it into account?

@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Feb 11, 2026

I see. Instead of propagating an exception in IRowInputFormat during graceful handling, let's better then throw an exception in format factory -- in case we are in transaction and input_format_max_block_wait_ms setting is non zero. This would allow users to clearly see that they shouldn't use this setting within transactions. Let's also document that it is impossible to use this setting when we use transactions.

@Sasao4o
Copy link
Copy Markdown
Contributor Author

Sasao4o commented Feb 11, 2026

I see. Instead of propagating an exception in IRowInputFormat during graceful handling, let's better then throw an exception in format factory -- in case we are in transaction and input_format_max_block_wait_ms setting is non zero. This would allow users to clearly see that they shouldn't use this setting within transactions. Let's also document that it is impossible to use this setting when we use transactions.

i didn't mean the input_format_max_block_wait_ms (i think it works well with transaction)

what i meant is what we are doing now

if (connectionError) don't throw and  emit chunk

so executeQuery doesn't know that there is a connection error so it doesnt execute rollback

I tested this locally using our test, and I added (implicit transaction = 1). I should find 0 parts, but I found more than 0

so the proposed solution is to pass is_transaction to iRowInputFormat (as it was before reverting)
and

if (!transaction && connectionError) safe to emit because we are not in txn

i also think we will need to catch the exception in this layer so we can rollback partially filled rows (the popBack() code)

@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Feb 11, 2026

I discussed with the team, and we think what should be done it to make another setting that will control this functionality with exception AND possibility to use input_format_max_block_wait_ms.

So we introduce a setting input_format_connection_handling (or something else) which can be either 0 or 1, and set to 0 by default. For this setting we document what behavior it controls -- When the connection is unexpectedly closed, it should parse and process the remaining data instead of treating it as an error.. Also, we should NOT be able to set input_format_max_block_wait_ms to non-zero value, unless input_format_connection_handling is true.

Then, in the implementation, instead of

if (connectionError) don't throw and  emit chunk

we should have

if (input_format_connection_handling && connectionError) ...

It should be clearly mentioned, that in case of connection error we cannot guarantee that the blocks of data will be deduplicated. Speaking about transactions, such a behavior can be expected with the semantics that this setting introduces -- that some of the data will be inserted even in case of connection error.

Comment on lines +35 to +42
sleep 8

kill -9 $PIPELINE_PID 2>/dev/null

wait $PIPELINE_PID 2>/dev/null


sleep 1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the internal testing this test fails due to difference in result from the reference -- it gets 0 parts instead of 1 part. Let's try to increase the duration of sleep. The test suit that fails is related to s3 in asan build, so this is likely a latency issue.

@clickhouse-gh clickhouse-gh bot added the manual approve Manual approve required to run CI label Feb 14, 2026
@Fgrtue Fgrtue added this pull request to the merge queue Feb 20, 2026
Merged via the queue into ClickHouse:master with commit e77bae5 Feb 20, 2026
147 checks passed
@robot-clickhouse-ci-1 robot-clickhouse-ci-1 added the pr-synced-to-cloud The PR is synced to the cloud repo label Feb 20, 2026
Fgrtue added a commit that referenced this pull request Feb 23, 2026
@Fgrtue
Copy link
Copy Markdown
Contributor

Fgrtue commented Mar 17, 2026

The changes are under the setting so harmless for the production. Also, before merging I consulted with @tavplubix, who approved the changes.

@Fgrtue Fgrtue added post-approved Approved, but after the PR is merged. labels Mar 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

can be tested Allows running workflows for external contributors manual approve Manual approve required to run CI post-approved Approved, but after the PR is merged. pr-improvement Pull request with some product improvements pr-synced-to-cloud The PR is synced to the cloud repo

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming Inserts

4 participants