rgw/kafka: reply with an error when broker is down #55051
(force-pushed 361bb2e to 95a6665)
teuthology is failing in the amqp tests: https://pulpito.ceph.com/yuvalif-2024-01-10_10:44:27-rgw:notifications-wip-yuval-63915-distro-default-smithi/
src/rgw/driver/rados/rgw_notify.cc
```cpp
      dpp->get_cct());
  ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
      << topic_cfg.dest.push_endpoint << dendl;
  const auto ret = endpoint->ping(dpp->get_cct(), res.yield);
```
With this ping, if there is a transient short-lived error at the same time, we will fail the request instead of allowing the kafka retry that happens otherwise.
ideally there should be a way, once we call kafka_produce, to verify after the timeout whether the broker was down, and only then return the error?
note that this is only for synchronous notifications, so there is no retry mechanism at our level of the code. if you want to retry, use persistent notifications.
if there is an error in "produce()" or "poll()", this is after the kafka library decided that there is an error (e.g. exhausted internal retries).
since we mimic here the 2 phases, we cannot try to send the notification, because, if we succeed, and then the S3 op fails, there is no way to roll back the notification.
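The 2-phase semantics described above can be sketched roughly as follows. This is an illustrative model only, not the actual rgw_notify.cc code: `FakeEndpoint`, `publish_reserve`, and `publish_commit` here are hypothetical stand-ins showing why the broker check happens before the S3 op rather than after.

```cpp
#include <cassert>
#include <string>

// Hypothetical model of the two-phase flow for synchronous (non-persistent)
// notifications: reserve before the S3 op runs, commit after it succeeds.
struct FakeEndpoint {
  bool broker_up = true;  // stands in for the kafka connection state
  int is_alive() const { return broker_up ? 0 : -1; }
  int send(const std::string&) { return broker_up ? 0 : -1; }
};

// phase 1: runs before the S3 operation. if the broker is known to be down,
// we fail here, so no object is written and nothing needs to be rolled back.
int publish_reserve(FakeEndpoint& ep) {
  return ep.is_alive();  // cheap local check; nothing is sent to the broker yet
}

// phase 2: runs after the S3 operation succeeded. a failure here cannot undo
// the S3 op, which is exactly why problems must be detected in reserve.
int publish_commit(FakeEndpoint& ep, const std::string& event) {
  return ep.send(event);
}
```

With this split, a down broker fails the request in `publish_reserve`, before any non-revertible S3 work is done.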
i still do not understand how this is solving the issue.
you call ping, which checks the conn->status in publish_reserve().
the conn->status gets updated when kafka is idle, indicating the broker is down (it also gets updated for other transient issues, but i guess those are irrelevant). As soon as it goes idle we also delete the connection.
so the next s3 request will not find the connection and will create a new one when RGWPubSubKafkaEndpoint::Create is called. the new connection will be emplaced in the map with its status set to ok, so it will also wait for idle_timeout, and until then conn->status will be ok. but since the broker is down, the new s3 request will wait until timeout before returning, so the same issue will still exist??
as long as there are notifications to be sent to a broker, it is not considered to be idle (even if it is "down").
we set conn->timestamp inside the publish_internal() call regardless of connection state.
the "idle" mechanism is meant to be a garbage collector for the case where we delete all topics and notifications that used to send to this broker, but the kafka connection is still there.
> as long as there are notifications to be sent to a broker, it is not considered to be idle (even if it is "down"). we set `conn->timestamp` inside the `publish_internal()` call regardless of connection state. the "idle" mechanism is meant to be a garbage collector for the case that we delete all topics and notifications that used to send to this broker, but the kafka connection is still there.
as per the code here (https://github.com/ceph/ceph/blob/main/src/rgw/rgw_kafka.cc#L467-L471), if there is a timeout for any one of the sent events, connections.erase() is called, which invokes destroy and removes the connection from the list. So all subsequent connects will again go through the same timeout logic?
> if there is a timeout for any one of the sent events
the code above checks idleness of the connection itself, not a timeout for an event.
a connection is considered idle only if there aren't any events that we try to send to it for more than a couple of minutes.
see: https://github.com/ceph/ceph/blob/main/src/rgw/rgw_kafka.cc#L354
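The idleness rule being discussed can be modeled in a few lines. This is an illustrative sketch, not the rgw_kafka.cc implementation: the point it demonstrates is that idleness depends only on the last publish attempt, never on the connection's error status.

```cpp
#include <cassert>
#include <chrono>

// Illustrative model of the idle-connection garbage collection described
// above: a connection is "idle" only if nothing was queued to it for longer
// than max_idle, regardless of whether its status is OK or in error.
struct Conn {
  std::chrono::steady_clock::time_point timestamp;  // updated on every publish attempt
  int status = 0;                                    // 0 == OK, nonzero == error
};

bool is_idle(const Conn& c, std::chrono::steady_clock::time_point now,
             std::chrono::seconds max_idle) {
  // note: c.status is deliberately not consulted; "idle" is a garbage
  // collection concept for decommissioned brokers, not error handling
  return now - c.timestamp > max_idle;
}
```

So a broker that is down but still receiving publish attempts never becomes idle, and is never garbage-collected on that basis.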
adding a synchronous round trip for every call to publish_reserve() seems like an expensive way to solve this issue, effectively doubling the traffic between rgw<->kafka. maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?
in the case of kafka, the "ping" does not send anything to the broker, it just checks the state of the connection.
@yuvalif oops, sorry i didn't review closely enough. i think the verb 'ping' is what confused me there. maybe 'is_alive' or 'is_connected' would clarify that?
@yuvalif instead we could use
we get the broker down indication (via the error callback) immediately from the "kafka_poll()" call
if that's the case, then we do not need all these changes and can rely on that indication?
the changes in this PR are needed for 2 main reasons:
(force-pushed 95a6665 to 8214dfa)
How does it matter for non-persistent notifications, since those are best effort only? So why is it important to report an error on the s3 request if the broker is down? With the current changes, publish_reserve will report broker down, eventually failing the request, which is not what non-persistent requests are designed for.
they are not best effort. if you define a topic to be non-persistent, and set the ack level to broker, it means that you want to fail the s3 operation if the notification cannot be delivered.
even with current code, because non-persistent requests are synchronous (when
src/rgw/rgw_kafka.cc
```cpp
void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
  conn->status = err;
```
so from what i understand, this poll_err_callback will return an error when kafka is down, and the connection->status is set to the ERROR returned from this function.
Now the next call to the same topic on a non-persistent notification will call is_alive, which will retrieve the connection from the map and return the status, and since the status is not OK, it will not even try to send the notification? And then eventually the notification will be removed from the map after the IDLE_TIMEOUT, and next time create_endpoint will try re-creating the connection and store it in the map.
Am i correct in understanding this workflow?
> so from what i understand is this `poll_err_callback` will return an error for when the kafka is down and now the connection->status is set to the `ERROR` returned from this function. Now the next call to same topic on non-persistent notification will call `is_alive` and that will retrieve the connection from the map and return the status and since the status is not OK, it will not even try to send the notification?
correct. not only will it not send the notification, it will fail the operation (PUT, DELETE etc.) that triggered the notification.
> And then eventually the notification will be removed from the map after the IDLE_TIMEOUT
the connection will be removed from the map after the IDLE_TIMEOUT if there are no attempts to send to it, regardless of the status of the connection. this is to clean up decommissioned brokers, and to clean up when all topics that point to a broker are deleted. it is unrelated to the state of the connection.
> and now next time create_endpoint will try re-creating the connection and store it in the map. Am i correct in understanding this workflow?
if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:
Line 480 in 8214dfa
> if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:
the main thread just resets all the kafka config params and also sets the STATUS = OK.
this essentially will lead to non-deterministic behavior. Some S3 (PUT, DELETE, etc.) requests will receive 200 but the notification event will have FAILED due to broker down, and some S3 requests that sneak into the window between poll_err_callback setting the status to ERROR and main::run setting it back to OK will receive 503/EINVAL.
what i would suggest is: let each S3 request try to send the notification event to the broker. The message_callback or poll_err_callback will receive the error from the broker immediately and then just invoke the s3 callback with the error status. currently only message_callback invokes the callback, can we invoke the s3 callback from poll_err_callback as well? Essentially the issue we are trying to solve here is to not wait till IDLE_TIMEOUT for the status to reflect? correct?
> > if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:
>
> the main thread just resets all the config params of kafka and also sets the `STATUS = OK`. this essentially will lead to non deterministic behavior. Some S3 (PUT, DELETE, etc) requests will receive 200 but the notification event will have FAILED due to broker down and some S3 requests that sneak in between the window where the `poll_err_callback` sets the status to ERROR and `main::run` sets it back to `OK`, will receive `503/EINVAL`.
you are right
> what i would suggest is, let each S3 request try to send the notification event to broker. The `message_callback` or `poll_err_callback` will receive the error from broker immediately and then just invoke the s3 callback for error status.
poll_err_callback is invoked from rd_kafka_poll and cannot be invoked immediately or per message.
> currently only `message_callback` invokes the callback, we can invoke the s3 callback from `poll_err_callback` as well?
no
> Essentially the issue we are trying to solve here is to not wait till IDLE_TIMEOUT for the status to reflect? correct?
IDLE_TIMEOUT is a different and unrelated issue. it was wrongly used before as a way to clean up error states.
one thing we can do to be more deterministic is to send a dummy message and then poll for an answer every time we create a new connection. and set the status accordingly.
it will take longer to create a connection, but since we reuse the connection when sending notifications it should not really impact overall perf.
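The "dummy message at connection creation" idea above could look roughly like this. Everything here is a hypothetical sketch: `KafkaConn` and the injected `produce_and_poll` callable are stand-ins, not librdkafka calls, and only illustrate setting the initial status from a real round trip instead of defaulting it to OK.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical model: when a connection is created, produce one throwaway
// message and poll for the delivery result before marking the connection
// usable, so status reflects actual broker reachability.
struct KafkaConn {
  int status = 0;  // 0 == OK, nonzero == error from the probe
};

KafkaConn create_connection(
    const std::function<int(const std::string&)>& produce_and_poll) {
  KafkaConn c;
  // pay the round-trip cost once, at creation time; afterwards the cached
  // connection is reused, so per-notification latency is unaffected
  c.status = produce_and_poll("dummy");
  return c;
}
```

As noted in the thread, this only makes connection *creation* deterministic; a broker that dies later is still only detected via the error callback.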
> one thing we can do to be more deterministic is to send a dummy message and then poll for an answer every time we create a new connection. and set the status accordingly. it will take longer to create a connection, but since we reuse the connection when sending notifications it should not really impact overall perf.
sending a dummy message will be the same as the current behavior: the dummy message would succeed at the start, but assuming the broker goes down in the middle, all the events sent in the middle will still not be delivered.
reading through the kafka docs, can we leverage rd_kafka_conf_set_background_event_cb? this gets called immediately once the broker is down, and then we could go and update the status of the connection for that topic?
But one thing is sure: to have consistent behavior we should not reject/error the s3 request if the broker is down for NON-persistent notifications, and should continue the best-effort logic.
> sending a dummy message will be same as current, dummy message would succeed at start but assuming the broker goes down in the middle, then all the events sent in the middle will still not be delivered.
IMO, there is a big difference between sending the wrong response while the broker goes down (undesirable, but acceptable) and sending wrong responses while the broker is down (unacceptable)
> reading through the kafka doc, can we leverage this rd_kafka_conf_set_background_event_cb, this gets called immediately once the broker is down and then we could go and update the status of connection for that topic?
I need to test it first, but on rhel8 we use librdkafka-devel 0.11.4 which does not support that
> But one thing is sure, to have consistent behavior we do not have to reject/error the s3 request if the broker is down for NON Persistent notification, and continue the best effort logic.
if you are good with best-effort behavior, you can just set the ack-level to none. in this case the notification will fail only if there is something wrong with the configuration of the broker.
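For illustration, a best-effort topic with the ack level mentioned above could be created like this. The endpoint URL, port, and topic name are made-up examples; the shape follows the AWS CLI `sns create-topic` usage from the Ceph bucket-notification docs, with `kafka-ack-level=none` meaning rgw does not wait for a broker ack.

```shell
# create a non-persistent, fire-and-forget topic: with kafka-ack-level=none
# a down broker cannot fail the S3 request (names/URLs are examples only)
aws --endpoint-url http://localhost:8000 sns create-topic \
    --name mytopic \
    --attributes '{"push-endpoint": "kafka://localhost:9092", "kafka-ack-level": "none"}'
```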
src/rgw/driver/rados/rgw_notify.cc
```cpp
    if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
    return ret;
  }
  if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
```
i don't think we should increment this one during publish_reserve(). publish_commit() increments it again at the end
src/rgw/driver/rados/rgw_notify.cc
```cpp
  } else {
    // no reservation - try to connect to endpoint instead
    try {
      const auto endpoint = RGWPubSubEndpoint::create(
```
now that this returns a raw pointer, the memory will leak in the error paths below. please leave unique_ptr as the factory function's return type. you can move the unique_ptr into res.endpoint below
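The reviewer's suggestion can be sketched as follows. The class names are illustrative placeholders, not the actual rgw types; the point is that a `unique_ptr` return type makes the error paths leak-free and lets the caller move ownership into the reservation.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Illustrative stand-ins for the endpoint and reservation types.
struct Endpoint {
  std::string uri;
  explicit Endpoint(std::string u) : uri(std::move(u)) {}
};

struct Reservation {
  std::unique_ptr<Endpoint> endpoint;  // owns the endpoint for its lifetime
};

// factory returns unique_ptr: if any error path returns early or throws
// after creation, the endpoint is destroyed automatically instead of leaking
std::unique_ptr<Endpoint> create_endpoint(const std::string& uri) {
  return std::make_unique<Endpoint>(uri);
}
```

At the call site the pointer is simply moved into the reservation, e.g. `res.endpoint = create_endpoint(uri);`, so no explicit `delete` is needed on failure paths.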
```cpp
      req_id, null_yield);
  auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
  rgw::notify::reservation_t notify_res(dpp, store, &dest_obj, nullptr, &dest_bucket, user_id, tenant, req_id, null_yield);
}
```
```cpp
int is_alive(CephContext* cct, optional_yield y) override {
  return kafka::check_broker(conn_name, topic);
```
check_broker() is returning internal error codes like STATUS_CONNECTION_CLOSED that the rest of rgw doesn't understand. should there be some kind of error mapping so we can choose the right http error to return?
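One way the mapping the reviewer asks for could look is sketched below. The `STATUS_*` constants here are illustrative placeholders (only `STATUS_CONNECTION_CLOSED` is named in the comment above; the others and the chosen errno values are assumptions), showing the general idea of translating internal kafka client codes into errno-style codes that rgw can turn into HTTP responses.

```cpp
#include <cassert>
#include <cerrno>

// Placeholder internal status codes (illustrative, not the exact constants
// from rgw_kafka.cc).
enum KafkaStatus {
  STATUS_OK                = 0,
  STATUS_CONNECTION_CLOSED = -1,
  STATUS_QUEUE_FULL        = -2,
  STATUS_CONF_ERROR        = -3,
};

// translate internal codes into errno-style codes the rest of rgw
// understands, so the right HTTP error can be chosen further up the stack
int kafka_status_to_errno(int s) {
  switch (s) {
    case STATUS_OK:                return 0;
    case STATUS_CONNECTION_CLOSED: return -EIO;    // broker unreachable -> 5xx
    case STATUS_QUEUE_FULL:        return -EBUSY;  // transient, retryable
    case STATUS_CONF_ERROR:        return -EINVAL; // bad topic/endpoint config
    default:                       return -EIO;    // unknown internal error
  }
}
```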
(force-pushed 8214dfa to 54cf2d9)
@yuvalif i have an idea on how to do this in a different way, but before doing that i want to confirm we are trying to achieve 2 things here.
to address the first concern, we should set the kafka config variables.
Second concern is more of a feature, and i am not yet 100% sure we should do this: return non-200 for a SYNC notification if sending the event failed for ack_level broker, even though the s3 request has completed successfully and cannot be reverted.
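The comment above does not name the exact kafka config variables, so the ones below are only examples: `message.timeout.ms` and `socket.timeout.ms` are real librdkafka configuration properties that bound how long a produce can hang against a down broker, but the values and the helper itself are illustrative placeholders, not a recommendation from this PR.

```cpp
#include <cassert>
#include <map>
#include <string>

// Illustrative sketch: a key/value set of librdkafka timeout properties that
// could bound worst-case latency when the broker is down. Values are
// placeholders; tune for the deployment.
std::map<std::string, std::string> kafka_timeout_conf() {
  return {
    {"message.timeout.ms", "5000"},  // fail delivery instead of retrying forever
    {"socket.timeout.ms",  "3000"},  // bound blocking broker requests
  };
}
```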
IMO irrespective of what we decide, we definitely need to set the kafka config values
thanks @kchheda3, agree that it makes sense to separate into 2 things:
note that this is not how it behaves. we use the "reserve/commit" semantics to make sure that we reply with an S3 failure only if no S3 operation was done, and no notification was sent.
if we reflect back the return value from
Ok, so from what you mentioned, i interpret it as:
Now comes the question of how to verify the broker is down in the publish_reserve phase. send a fake message during the publish_reserve phase and then have a way, either via poll_callback or message_callback, to confirm the state of the broker?? IMO this whole new feature needs to be revisited and re-thought: do we really need this? IIRC we were going to stop supporting sync notifications in the future and ask people to move/migrate to persistent?
yes. because this will be incorrect from S3 perspective
yes
we may get some non-deterministic behavior for the notifications sent as the broker goes down. but once the broker is down, the behavior will be deterministic
you are right, this will spam the broker, and also create latency due to the call to "poll". currently we do everything in batches, which is much more efficient
overall, we recommend using persistent notifications. however, there could be use cases where we use the feature for near-real-time messaging of large objects. in this case, there is no requirement to overcome broker outages, and the requirement is actually to let the S3 clients know when the broker is down. i will add a conf parameter, so that the new feature is optional. in general, I don't think we should deprecate sync notifications. there is a valid use case where we want best-effort notifications and do not want to pay the extra cost of using the RADOS-backed queue.
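The "exponential backoff for broker reconnect" item from the commit message can be sketched as below. This is an illustrative computation only (the base and cap constants are assumptions, not values from the PR): double the delay after each failed reconnect attempt, up to a maximum.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative exponential backoff for reconnect attempts: delay doubles per
// failed attempt and is capped at max_ms so a long outage does not grow the
// wait without bound. Constants are placeholders.
uint64_t reconnect_backoff_ms(unsigned failed_attempts,
                              uint64_t base_ms = 100,
                              uint64_t max_ms  = 30000) {
  uint64_t delay = base_ms;
  for (unsigned i = 0; i < failed_attempts && delay < max_ms; ++i) {
    delay *= 2;  // exponential growth
  }
  return std::min(delay, max_ms);
}
```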
```cpp
int check_broker(const std::string& conn_name,
                 const std::string& topic) {
```
is the topic argument supposed to be used? the header says // verify that broker is up and that topic exists
we create the topic as part of the connection creation; if the creation failed, this will be reflected in the status.
i added it as a parameter for the generic SAL interface (might be needed in other cases).
but i can remove it for now
```cpp
// derived class specific arguments are passed in http args format
// may throw a configuration_error if creation fails
static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
static RGWPubSubEndpoint* create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
```
why does this not return unique_ptr anymore?
no reason. will bring it back
* check for connectivity status with broker when connection is created
* exponential backoff for broker reconnect

Fixes: https://tracker.ceph.com/issues/63915
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
based on the discussion here: https://youtu.be/Ur0Fc6aOGTc?si=W7HtlHpgfjnSmzhV (skip to 55:00)
Fixes: https://tracker.ceph.com/issues/63915
test: https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353
TODO: implement amqp::check_broker()