Skip to content

Add support for rd_kafka_destroy_flags.#247

Merged
mfontanini merged 2 commits intomfontanini:masterfrom
filimonov:kafka_destroy_flags3
May 23, 2020
Merged

Add support for rd_kafka_destroy_flags.#247
mfontanini merged 2 commits intomfontanini:masterfrom
filimonov:kafka_destroy_flags3

Conversation

@filimonov
Copy link
Copy Markdown
Contributor

@filimonov filimonov commented May 21, 2020

That allows to set flags for rd_kafka_destroy_flags in the following manner:

     consumer.set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

It allows us to prevents potential hang during termination if consumer.close() is failing because of queued callback calls.

Refs:

More:

  • Maybe it can also be used as a default (because Consumer destructor call consumer_close)
  • or that flag maybe enabled automatically if an exception happens during consumer_close call in consumer destructor.
  • Most probably that test can be enabled:
    // This test may fail due to what seems to be an rdkafka bug. Skip it for now until we're
    // certain of what to do
    TEST_CASE("Event consumption", "[!hide][consumer]") {
    // Create a consumer and subscribe to the topic
    Consumer consumer(make_consumer_config());
    consumer.subscribe({ KAFKA_TOPICS[0] });
    vector<rd_kafka_event_type_t> types = {
    RD_KAFKA_EVENT_NONE
    };
    Queue queue = consumer.get_main_queue();
    for (const auto type : types) {
    const Event event = queue.next_event();
    CHECK(event.get_type() == type);
    }
    }

Many thanks to @azat for help.

Copy link
Copy Markdown
Contributor

@accelerated accelerated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just my 2 cents on this PR. Good idea to port this API!

@filimonov
Copy link
Copy Markdown
Contributor Author

BTW: it didn't help with the original problem :\ Still need draining after unsubscribe: confluentinc/librdkafka#2898

@accelerated
Copy link
Copy Markdown
Contributor

Why do you need to connect 2 consumers inside the same app to the same topic?

@filimonov
Copy link
Copy Markdown
Contributor Author

filimonov commented May 22, 2020

3 reasons:

  1. actually i was just testing the behavior after rebalance and if was the simplest option to trigger rebalance
  2. it's out of my control, users of ClickHouse can create several 'tables' = 'kafka consumers'. I can't restrict that.
  3. parallel consumption is done by running several consumers instead of several queues connected to the same consumer (legacy code, should be rewritten, bit tricky).

@accelerated
Copy link
Copy Markdown
Contributor

I would recommend not having multiple consumers in the same app if you can. It's very counterintuitive and you may run into other issues down the line.

@filimonov
Copy link
Copy Markdown
Contributor Author

I would recommend not having multiple consumers in the same app if you can.

I can't :)

@accelerated
Copy link
Copy Markdown
Contributor

@filimonov Also, since HandleDeleter cannot be set with a null KafkaHandleBase I would recommend capturing it by reference. It makes intent clearer in the class design.

@filimonov
Copy link
Copy Markdown
Contributor Author

I've tried, but it seem like it will require deeper look to do that

[build] /home/mfilimonov/workspace/ClickHouse/contrib/libcxx/include/memory:2538:21: error: object of type 'cppkafka::KafkaHandleBase::HandleDeleter' cannot be assigned because its copy assignment operator is implicitly deleted
[build]     __ptr_.second() = _VSTD::forward<deleter_type>(__u.get_deleter());
[build]                     ^
[build] /home/mfilimonov/workspace/ClickHouse/contrib/cppkafka/src/kafka_handle_base.cpp:216:13: note: in instantiation of member function 'std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::operator=' requested here
[build]     handle_ = HandlePtr(handle, HandleDeleter(*this));

If you can improve that - please do.

@accelerated
Copy link
Copy Markdown
Contributor

accelerated commented May 22, 2020

Yeah I just see there's a requirement for the deleter to be default constructible so yes, you can't use any references unfortunately. Will let @mfontanini review. Looks good to me.

@mfontanini mfontanini merged commit ca3a132 into mfontanini:master May 23, 2020
@mfontanini
Copy link
Copy Markdown
Owner

Thanks for the PR!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants