rgw/kafka: do not destroy the connection on errors#56033
Conversation
This pull request can no longer be automatically merged: a rebase is needed and changes have to be manually resolved
8c401a9 to 7dee1d1
jenkins test make check
i see that #55051 was closed. should https://tracker.ceph.com/issues/63915 point to this PR now?
pr #55051 was an attempt to add new functionality, allowing the propagation of kafka errors back to the user. |
@cbodley there is one bug fix here, for a crash-on-close. should i open a tracker for it and link to this PR?
yes, since i assume we need backport to squid at least |
src/rgw/rgw_kafka.cc
Outdated
-  auto reply_count = 0U;
-  const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+  std::vector<message_wrapper_t*> local_messages;
+  const auto send_count = messages.consume_all([&local_messages](auto message) {local_messages.push_back(message);});
consume_all is a non-blocking function, and prior to your changes it invoked publish_internal, which ran asynchronously in a different thread where the conn was used, while the code below had a block that checks whether the conn is idle and, if so, deletes the connection.
so there was a chance of a race condition: if the idle check executed before publish_internal updated conn->time, the conn could be destroyed, causing a crash when publish_internal tried to access it.
if yes, i just have 1 doubt: consume_all will still run async, so local_messages will not be populated immediately, and then std::for_each would loop over a local_messages that has not been populated yet. would this result in loss of messages being delivered?
with the current changes, you are consuming all the messages first and then calling publish_internal in a std::for_each loop, ensuring the idle check executes only after publish_internal has completed for all of the messages, so destroy and publish_internal would never run in parallel?
we were running in the same thread (the kafka manager thread) even before the change.
nothing was running in parallel, and consume_all is completely synchronous, so i don't think there was a risk of a race condition.
the main reason for the change is that before it, traversing the queue took longer (since we called publish_internal as we traversed the queue).
this means it was more likely for the thread that pushes into the queue to find it full.
however, i did not see any visible performance improvement with the change, so i'm also ok with reverting it.
probably reverting the consume_all change would make sense then?
ok. will revert that
This pull request can no longer be automatically merged: a rebase is needed and changes have to be manually resolved
@yuvalif do we not have a tracker for this?
This is a crash on shutdown, so unlikely to be observed. This is mainly a code cleanup PR.
nope, we are able to repro this all the time |
i never saw this issue. but this just makes the fix even more critical.
yeah 100%, we need to backport it to squid for sure |
7dee1d1 to 3a4749e
jenkins test api
3a4749e to a1edb0d
as well as other simplifications:
* do not store temporary configuration in the connection object. just use a local variable
* do not create a connection without a producer

other improvements:
* copy to a local list before publishing
* convert internal error codes to errno

Fixes: https://tracker.ceph.com/issues/66017
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
a1edb0d to 1b6e850
teuthology passing (kafka tests): https://pulpito.ceph.com/yuvalif-2024-05-23_10:33:57-rgw:notifications-wip-yuval-kafka-cleanup-distro-default-smithi/
jenkins test make check

jenkins test docs

jenkins render docs

jenkins test windows
        rd_kafka_err2str(result) << dendl;
    } else {
      ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
        rd_kafka_err2str(result) << dendl;
nit: if you could add the broker and topic name here, and also for the error after calling rd_kafka_produce at line 428:
"topic: " << rd_kafka_name(rk) << " broker: " << conn->broker << dendl
will do a round of debug log fixes (in a different PR), and add that there.
- use DoutPrefixProvider
- unify what log messages contain (broker+topic)
fixes: https://tracker.ceph.com/issues/66017