rgw/async/notifications: use common async waiter in pubsub push #58765
Conversation
```cpp
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>

void invoke_callback(int expected_reply, std::function<void(int)> cb) {
  auto t = std::thread([cb, expected_reply] {
    std::this_thread::sleep_for(std::chrono::seconds(1));
```
i've really been trying to avoid sleeps in unit tests. short sleeps make for flaky tests, and long sleeps make them slow (that adds up in 'make check'). if the test cases are fast, you can run them with something like --gtest_repeat=1000 under valgrind or sanitizers to help catch any races
in this case, you just need to ensure that cb() is called after waiter.async_wait() suspends
boost::asio::defer() is a useful tool for this. it schedules a handler to run after the current one finishes. you can use that to avoid the sleep by spawning the thread in a deferred block:
```diff
-    invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);});
+    // trigger completion after async_wait() suspends
+    boost::asio::defer(yield.get_executor(), [&] {
+      invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);});
+    });
     reply = waiter.async_wait(yield);
```

cross_thread_cancel is a recent example that does something similar, although it spawns a separate thread for the execution context, waits on a latch for that thread to block in throttle.wait(), then triggers cancellation from the main thread: ac4c8ad
the lack of sleeps there keeps the runtime extremely short:
12 tests from co_throttle (3 ms total)
hmm. calling "defer" (with or without the timeout) fixes the tsan errors.
maybe this is the root cause of the problem?
will try to modify that in rgw_pubsub_push.cc
changed the notifications code to use defer
```cpp
if (y) {
  boost::system::error_code ec;
  auto yield = y.get_yield_context();
  auto&& token = yield[ec];
  boost::asio::async_initiate<boost::asio::yield_context, Signature>(
      [this, &l] (auto handler, auto ex) {
        completion = Completion::create(ex, std::move(handler));
        l.unlock(); // unlock before suspend
      }, token, yield.get_executor());
  return -ec.value();
}
maybe_warn_about_blocking(dpp);

cond.wait(l, [this]{return (done==true);});
```
the existing Waiter handles both states of optional_yield. ceph::async::yield_waiter only accepts the yield_context, and calling y.get_yield_context() on a null_yield is undefined behavior
i think this Waiter should use ceph::async::yield_waiter internally, while preserving the mutex/condition_variable for the null_yield case
currently, these functions are called either from the frontend or from the notifications manager, and neither uses null_yield. to avoid undefined behavior, i can add an assert
don't we also send notifications from background threads like lifecycle and multisite? those would pass null_yield
yes, you are right (forgot about these)
looks like i'm missing a non-persistent lifecycle notifications test (lifecycle tests are still passing with persistent notifications).
not possible to test LC+non persistent. see: https://tracker.ceph.com/issues/67174
also, no coverage for multisite sync notifications: https://tracker.ceph.com/issues/67178
for testing, you can disable rgw_beast_enable_async so that frontend requests always see null_yield
kafka tests are passing locally with the flag set to "false" (vstart.sh -n -d -o "rgw_beast_enable_async=false")
can we set the flag from commandline in a test, so we can cover that in teuthology as well?
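For teuthology, the same override can presumably go into the suite's yaml config (a fragment sketch; the exact section nesting should be checked against the rgw/notifications suite):

```
overrides:
  ceph:
    conf:
      client:
        rgw_beast_enable_async: "false"
```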
* use the "yield_waiter" and "waiter" from common/async instead of the "waiter" implemented inside the bucket notification code (this is so we don't need separate investigations for 2 implementations)
* added a unit test that simulates how a separate thread (kafka or amqp) resumes a coroutine created by either the frontend or the notification manager. before using "defer", the unit test passes, but when executed under thread sanitizer (using the WITH_TSAN cmake flag) the following errors are observed: https://0x0.st/Xp4P.txt. after using "defer", the unit test passes under TSAN without errors.

Fixes: https://tracker.ceph.com/issues/64184
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
```cpp
  });
  return w.async_wait(yield);
}

ceph::async::waiter<int> w;
```
@adamemerson could you please have a look here?
teuthology is mostly passing: https://pulpito.ceph.com/yuvalif-2024-07-26_08:21:47-rgw:notifications-wip-yuval-64184-distro-default-smithi/

jenkins test api
Fixes: https://tracker.ceph.com/issues/64184