@@ -51,6 +51,7 @@ namespace
5151{
5252 const auto RESCHEDULE_MS = 500 ;
5353 const auto CLEANUP_TIMEOUT_MS = 3000 ;
54+ const auto MAX_THREAD_WORK_DURATION_MS = 60000 ; // once per minute leave do reschedule (we can't lock threads in pool forever)
5455
5556 // / Configuration prefix
5657 const String CONFIG_PREFIX = " kafka" ;
@@ -386,6 +387,8 @@ void StorageKafka::threadFunc()
386387 size_t dependencies_count = DatabaseCatalog::instance ().getDependencies (table_id).size ();
387388 if (dependencies_count)
388389 {
390+ auto start_time = std::chrono::steady_clock::now ();
391+
389392 // Keep streaming as long as there are attached views and streaming is not cancelled
390393 while (!stream_cancelled && num_created_consumers > 0 )
391394 {
@@ -394,9 +397,21 @@ void StorageKafka::threadFunc()
394397
395398 LOG_DEBUG (log, " Started streaming to {} attached views" , dependencies_count);
396399
397- // Reschedule if not limited
398- if (!streamToViews ())
400+ // Exit the loop & reschedule if some stream stalled
401+ auto some_stream_is_stalled = streamToViews ();
402+ if (some_stream_is_stalled)
403+ {
404+ LOG_TRACE (log, " Stream(s) stalled. Reschedule." );
405+ break ;
406+ }
407+
408+ auto ts = std::chrono::steady_clock::now ();
409+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
410+ if (duration.count () > MAX_THREAD_WORK_DURATION_MS)
411+ {
412+ LOG_TRACE (log, " Thread work duration limit exceeded. Reschedule." );
399413 break ;
414+ }
400415 }
401416 }
402417 }
@@ -459,15 +474,15 @@ bool StorageKafka::streamToViews()
459474 // It will be cancelled on underlying layer (kafka buffer)
460475 std::atomic<bool > stub = {false };
461476 copyData (*in, *block_io.out , &stub);
477+
478+ bool some_stream_is_stalled = false ;
462479 for (auto & stream : streams)
480+ {
481+ some_stream_is_stalled = some_stream_is_stalled || stream->as <KafkaBlockInputStream>()->isStalled ();
463482 stream->as <KafkaBlockInputStream>()->commit ();
483+ }
464484
465- // Check whether the limits were applied during query execution
466- bool limits_applied = false ;
467- const BlockStreamProfileInfo & info = in->getProfileInfo ();
468- limits_applied = info.hasAppliedLimit ();
469-
470- return limits_applied;
485+ return some_stream_is_stalled;
471486}
472487
473488void registerStorageKafka (StorageFactory & factory)
0 commit comments