@@ -530,8 +530,11 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
530530 else if (!ret_consumer_ptr && closed_consumer_index.has_value ())
531531 {
532532 ret_consumer_ptr = consumers[*closed_consumer_index];
533+
534+ cppkafka::Configuration consumer_config = getConsumerConfiguration (*closed_consumer_index);
533535 // / It should be OK to create consumer under lock, since it should be fast (without subscribing).
534- ret_consumer_ptr->setConsumer (createConsumer (*ret_consumer_ptr, *closed_consumer_index));
536+ ret_consumer_ptr->createConsumer (consumer_config);
537+ LOG_TRACE (log, " Created #{} consumer" , *closed_consumer_index);
535538 }
536539 // / 3. There is no free consumer and num_consumers already created, waiting @timeout.
537540 else
@@ -576,29 +579,6 @@ KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
576579 return kafka_consumer_ptr;
577580}
578581
579- ConsumerPtr StorageKafka::createConsumer (KafkaConsumer & kafka_consumer, size_t consumer_number)
580- {
581- cppkafka::Configuration consumer_config = getConsumerConfiguration (consumer_number);
582-
583- // / Using KafkaConsumer by reference should be safe, since
584- // / cppkafka::Consumer can poll messages (including statistics, which will
585- // / trigger the callback below) only via KafkaConsumer.
586- if (consumer_config.get (" statistics.interval.ms" ) != " 0" )
587- {
588- consumer_config.set_stats_callback ([&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json)
589- {
590- kafka_consumer.setRDKafkaStat (stat_json);
591- });
592- }
593-
594- auto consumer_ptr = std::make_shared<cppkafka::Consumer>(consumer_config);
595- consumer_ptr->set_destroy_flags (RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
596-
597- LOG_TRACE (log, " Created #{} consumer" , consumer_number);
598-
599- return consumer_ptr;
600- }
601-
602582cppkafka::Configuration StorageKafka::getConsumerConfiguration (size_t consumer_number)
603583{
604584 cppkafka::Configuration conf;
0 commit comments