
rgw/kafka: reply with an error when broker is down#55051

Closed
yuvalif wants to merge 1 commit into ceph:main from yuvalif:wip-yuval-63915

Conversation

@yuvalif
Contributor

@yuvalif yuvalif commented Jan 3, 2024

Fixes: https://tracker.ceph.com/issues/63915

test: https://gist.github.com/yuvalif/33487bff19883e3409caa8a843a0b353

TODO: implement amqp::check_broker()

Checklist

  • Tracker (select at least one)
    • References tracker ticket
    • Very recent bug; references commit where it was introduced
    • New feature (ticket optional)
    • Doc update (no ticket needed)
    • Code cleanup (no ticket needed)
  • Component impact
    • Affects Dashboard, opened tracker ticket
    • Affects Orchestrator, opened tracker ticket
    • No impact that needs to be tracked
  • Documentation (select at least one)
    • Updates relevant documentation
    • No doc update is appropriate
  • Tests (select at least one)

@yuvalif yuvalif requested a review from a team as a code owner January 3, 2024 12:03
@github-actions github-actions bot added the rgw label Jan 3, 2024
@yuvalif
Contributor Author

yuvalif commented Jan 10, 2024

dpp->get_cct());
ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
<< topic_cfg.dest.push_endpoint << dendl;
const auto ret = endpoint->ping(dpp->get_cct(), res.yield);
Contributor

With this ping, if there is a transient short-lived error at the same time, we will fail the request instead of allowing the kafka retry that would otherwise happen.
ideally there should be a way, once we call kafka_produce after timeout, to verify if the broker was down and then return the error?

Contributor Author

note that this is only for synchronous notifications, so there is no retry mechanism at our level of the code. if you want retries, use persistent notifications.
if there is an error in "produce()" or "poll()", this is after the kafka library decided that there is an error (e.g. exhausted internal retries).
since we mimic the 2 phases here, we cannot try to send the notification, because, if we succeed and then the S3 op fails, there is no way to roll back the notification.
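The two-phase flow discussed here can be sketched in plain C++ (a rough model, not the actual rgw code: `Connection` and its broker flag are mocked stand-ins for the cached kafka connection state):

```cpp
#include <cerrno>
#include <string>

// stand-in for the cached rgw_kafka connection; in the real code the
// flag would be derived from the status set by kafka's error callback
struct Connection {
  bool broker_up = true;
};

// phase 1: runs before the S3 operation is executed; fail fast if we
// already know the broker is down, so the S3 op can be rejected up front
int publish_reserve(const Connection& conn) {
  return conn.broker_up ? 0 : -EIO;
}

// phase 2: runs after the S3 op succeeded; a failure here cannot roll
// the S3 op back, which is why phase 1 exists at all
int publish_commit(const Connection& conn, const std::string& message) {
  if (!conn.broker_up)
    return -EIO;
  // real code would produce the message here and wait for the ack
  return 0;
}
```

The point of the split is that the only phase allowed to fail the S3 request is the one that runs before the operation is committed.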

Contributor

i still do not understand how this is solving the issue,
you call ping which checks the conn->status in publish_reserve().
the conn->status gets updated when kafka is idle, indicating the broker is down (it also gets updated for other transient issues, but i guess those are irrelevant). As soon as it goes idle we also delete the connection.
so now the next s3 request will not find the connection and will open a new connection when RGWPubSubKafkaEndpoint::Create is called. The new connection will be emplaced in the map with its status set to ok, so it will also wait for idle_timeout, and until then conn->status will be ok; but since the broker is down the new s3 request will wait until timeout before returning, so the same issue will still exist ??

Contributor Author

as long as there are notifications to be sent to a broker, it is not considered to be idle (even if it is "down").
we set conn->timestamp inside the publish_internal() call regardless of connection state.

the "idle" mechanism is meant to be a garbage collector for the case where we delete all topics and notifications that used to send to this broker, but the kafka connection is still there.

Contributor

as long as there are notifications to be sent to a broker, it is not considered to be idle (even if it is "down"). we set conn->timestamp inside the publish_internal() call regardless of connection state.

the "idle" mechanism is meant to be a garbage collector for the case where we delete all topics and notifications that used to send to this broker, but the kafka connection is still there.

as per the code here (https://github.com/ceph/ceph/blob/main/src/rgw/rgw_kafka.cc#L467-L471), if there is a timeout for any one of the sent events, connections.erase will invoke the destroy and remove the connection from the list. So all subsequent connects will again go through the same timeout logic ?

Contributor Author

if there is a timeout for any one of the sent events

the code above checks idleness of the connection itself, not a timeout for an event,
and a connection is considered idle only if there haven't been any events that we try to send to it for more than a couple of minutes.
see: https://github.com/ceph/ceph/blob/main/src/rgw/rgw_kafka.cc#L354

@cbodley
Contributor

cbodley commented Jan 30, 2024

adding a synchronous round trip for every call to publish_reserve() seems like an expensive way to solve this issue, effectively doubling the traffic between rgw<->kafka

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif
Contributor Author

yuvalif commented Jan 30, 2024

adding a synchronous round trip for every call to publish_reserve() seems like an expensive way to solve this issue, effectively doubling the traffic between rgw<->kafka

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

in the case of kafka, the "ping" does not send anything to the broker, it just checks the state of the connection.
this is not a 100% guarantee that the "commit" would be successful, but better than nothing, and it would cover the common case where the broker is down.
see: https://github.com/ceph/ceph/pull/55051/files#diff-925da66a25513280414fbc7daa5d76d7f6f5ea1d243163c3b53dd3052acb4b32R622

@cbodley
Contributor

cbodley commented Jan 30, 2024

in the case of kafka, the "ping" does not send anything to the broker, it just checks the state of the connection. this is not a 100% guarantee that the "commit" would be successful, but better than nothing, and it would cover the common case where the broker is down. see: https://github.com/ceph/ceph/pull/55051/files#diff-925da66a25513280414fbc7daa5d76d7f6f5ea1d243163c3b53dd3052acb4b32R622

@yuvalif oops, sorry i didn't review closely enough. i think the verb 'ping' is what confused me there. maybe 'is_alive' or 'is_connected' would clarify that?

@kchheda3
Contributor

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif instead we could use rd_kafka_conf_set_background_event_cb that will give the kafka_events like Kafka error: Local: All broker connections are down immediately after we call kafka_produce and we do not have to wait for idle_timeout before returning the error

@yuvalif
Contributor Author

yuvalif commented Jan 30, 2024

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif instead we could use rd_kafka_conf_set_background_event_cb that will give the kafka_events like Kafka error: Local: All broker connections are down immediately after we call kafka_produce and we do not have to wait for idle_timeout before returning the error

we get the broker down indication (via the error callback) immediately from the "kafka_poll()" call

@kchheda3
Contributor

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif instead we could use rd_kafka_conf_set_background_event_cb that will give the kafka_events like Kafka error: Local: All broker connections are down immediately after we call kafka_produce and we do not have to wait for idle_timeout before returning the error

we get the broker down indication (via the error callback) immediately from the "kafka_poll()" call

if that's the case, then we do not need all these changes and can rely on that indication ?

@yuvalif
Contributor Author

yuvalif commented Jan 31, 2024

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif instead we could use rd_kafka_conf_set_background_event_cb that will give the kafka_events like Kafka error: Local: All broker connections are down immediately after we call kafka_produce and we do not have to wait for idle_timeout before returning the error

we get the broker down indication (via the error callback) immediately from the "kafka_poll()" call

if that's the case, then we do not need all these changes and can rely on that indication ?

the changes in this PR are needed for 2 main reasons:

  • adding the "ping" or "is_alive" interface is needed so we can have "2 phase commit" in the non-persistent case
  • previously, the kafka code was not detecting when a broker was down, since the indication is made via the error callback of the "kafka_poll()" call, and not through a specific event failure. as a result, there were some other changes needed in the cleanup code

@kchheda3
Contributor

kchheda3 commented Feb 1, 2024

maybe we could just return a retryable error if some other request to that broker has failed/timed out recently?

@yuvalif instead we could use rd_kafka_conf_set_background_event_cb that will give the kafka_events like Kafka error: Local: All broker connections are down immediately after we call kafka_produce and we do not have to wait for idle_timeout before returning the error

we get the broker down indication (via the error callback) immediately from the "kafka_poll()" call

if that's the case, then we do not need all these changes and can rely on that indication ?

the changes in this PR are needed for 2 main reasons:

  • adding the "ping" or "is_alive" interface is needed so we can have "2 phase commit" in the non-persistent case

  • previously, the kafka code was not detecting when a broker was down, since the indication is made via the error callback of the "kafka_poll()" call, and not through a specific event failure. as a result, there were some other changes needed in the cleanup code

How does it matter for non persistent notifications, since those are best effort only? So why is it important to report an error on the s3 request if the broker is down? With the current changes publish_reserve will report broker down, eventually failing the request, which is not what non persistent requests are designed for

@yuvalif
Contributor Author

yuvalif commented Feb 1, 2024

How does it matter for non persistent notifications, since those are best effort only?
So why is it important to report an error on the s3 request if the broker is down?

they are not best effort. if you define a topic to be non-persistent, and set the ack level to broker, it means that you want to fail the s3 operation if the notification cannot be delivered.
for that to work properly, we need 2 phases.

With current changes publish reserve will report broker down n eventually failing the request which is not what non persistent request are designed for

even with the current code, because non-persistent requests are synchronous (when ack-level=broker), the publish_commit() call will fail if the broker is down.
the fix here is to address 2 things:

  • use a 2 phase process which is more correct
  • there are no per-request errors when the broker is down in librdkafka, instead there is a per-connection callback for that. so the rgw_kafka code was modified to handle that

void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
conn->status = err;
Contributor

so from what i understand, this poll_err_callback will return an error when kafka is down, and the connection->status is set to the ERROR returned from this function.
Now the next call to the same topic on a non-persistent notification will call is_alive, which will retrieve the connection from the map and return the status, and since the status is not OK, it will not even try to send the notification? And then eventually the notification will be removed from the map after the IDLE_TIMEOUT, and next time create_endpoint will try re-creating the connection and store it in the map.
Am i correct in understanding this workflow ?

Contributor Author

so from what i understand, this poll_err_callback will return an error when kafka is down, and the connection->status is set to the ERROR returned from this function. Now the next call to the same topic on a non-persistent notification will call is_alive, which will retrieve the connection from the map and return the status, and since the status is not OK, it will not even try to send the notification?

correct. not only will it not send the notification, it will fail the operation (PUT, DELETE etc.) that triggered the notification.

And then eventually the notification will be removed from the map after the IDLE_TIMEOUT

the connection will be removed from the map after the IDLE_TIMEOUT if there are no attempts to send to it, regardless of the status of the connection. this is to clean up decommissioned brokers, or the case where all topics that point to a broker are deleted, and is unrelated to the state of the connection

and now next time create_endpoint will try re-creating the connection and store it in the map. Am i correct in understanding this workflow ?

if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:

// try to reconnect the connection if it has an error
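The distinction made here between the idle-timeout GC and error-state handling can be sketched as plain C++ (simplified stand-ins for the fields in rgw_kafka.cc; the IDLE_TIME value is illustrative, not the actual constant):

```cpp
#include <chrono>

using Clock = std::chrono::steady_clock;

struct Conn {
  Clock::time_point timestamp;  // bumped on every publish attempt,
                                // regardless of connection state
  int status = 0;               // non-zero once the poll error callback fired
};

constexpr auto IDLE_TIME = std::chrono::minutes(5);  // illustrative value

// garbage collection: erase only connections nobody tried to publish to
// recently; an unreachable broker that still gets traffic is NOT idle
bool is_idle(const Conn& c, Clock::time_point now) {
  return now - c.timestamp > IDLE_TIME;
}

// error handling: a connection in error state stays in the map and is
// reconnected from the main thread instead of being deleted
bool needs_reconnect(const Conn& c) {
  return c.status != 0;
}
```

Keeping the two predicates separate is the point: idleness drives cleanup, error status drives reconnect, and neither implies the other.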

Contributor

if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:

the main thread just resets all the kafka config params and also sets the STATUS = OK.
this will essentially lead to non-deterministic behavior: some S3 (PUT, DELETE, etc.) requests will receive 200 but the notification event will have FAILED due to broker down, and some S3 requests that sneak in between the window where poll_err_callback sets the status to ERROR and main::run sets it back to OK will receive 503/EINVAL.
what i would suggest is: let each S3 request try to send the notification event to the broker. The message_callback or poll_err_callback will receive the error from the broker immediately, and then just invoke the s3 callback with the error status. currently only message_callback invokes the callback; can we invoke the s3 callback from poll_err_callback as well? Essentially the issue we are trying to solve here is to not wait until IDLE_TIMEOUT for the status to reflect ? correct?

Contributor Author

if the broker is down (in error state) it is not deleted, we retry to connect in the main thread. see:

the main thread just resets all the kafka config params and also sets the STATUS = OK. this will essentially lead to non-deterministic behavior: some S3 (PUT, DELETE, etc.) requests will receive 200 but the notification event will have FAILED due to broker down, and some S3 requests that sneak in between the window where poll_err_callback sets the status to ERROR and main::run sets it back to OK will receive 503/EINVAL.

you are right

what i would suggest is: let each S3 request try to send the notification event to the broker. The message_callback or poll_err_callback will receive the error from the broker immediately, and then just invoke the s3 callback with the error status.

poll_err_callback is invoked from rd_kafka_poll and cannot be invoked immediately or per message.

currently only message_callback invokes the callback; can we invoke the s3 callback from poll_err_callback as well?

no

Essentially the issue we are trying to solve here is to not wait until IDLE_TIMEOUT for the status to reflect ? correct?

IDLE_TIMEOUT is a different and unrelated issue. it was wrongly used before as a way to clean up error states.

one thing we can do to be more deterministic is to send a dummy message and then poll for an answer every time we create a new connection, and set the status accordingly.
it will take longer to create a connection, but since we reuse the connection when sending notifications it should not really impact overall perf.
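The probe-on-create idea above can be sketched as follows (a rough model: the produce/poll round trip is mocked by a boolean, and the names are illustrative, not rgw identifiers):

```cpp
#include <cerrno>

struct Conn {
  int status = 0;  // 0 == OK, negative errno when the probe failed
};

// mocked broker round trip; in the real code this would be a dummy
// produce followed by a poll for the delivery result
int probe_broker(bool broker_reachable) {
  return broker_reachable ? 0 : -ECONNREFUSED;
}

// pay the probe cost once at connection creation, so the cached status
// is meaningful from the very first publish_reserve() on this connection
Conn create_connection(bool broker_reachable) {
  Conn c;
  c.status = probe_broker(broker_reachable);
  return c;
}
```

Since connections are reused across notifications, the extra round trip happens once per connection rather than once per request.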

Contributor

one thing we can do to be more deterministic is to send a dummy message and then poll for an answer every time we create a new connection, and set the status accordingly. it will take longer to create a connection, but since we reuse the connection when sending notifications it should not really impact overall perf.

sending a dummy message will be the same as current: the dummy message would succeed at start, but assuming the broker goes down in the middle, all the events sent in the middle will still not be delivered.
reading through the kafka doc, can we leverage rd_kafka_conf_set_background_event_cb? this gets called immediately once the broker is down and then we could go and update the status of the connection for that topic ?

But one thing is sure: to have consistent behavior we should not reject/error the s3 request if the broker is down for NON persistent notifications, and continue the best effort logic.

Contributor Author

@yuvalif yuvalif Feb 22, 2024

sending a dummy message will be the same as current: the dummy message would succeed at start, but assuming the broker goes down in the middle, all the events sent in the middle will still not be delivered.

IMO, there is a big difference between sending the wrong response while the broker goes down (undesirable, but acceptable) and sending wrong responses while the broker is down (unacceptable)

reading through the kafka doc, can we leverage rd_kafka_conf_set_background_event_cb? this gets called immediately once the broker is down and then we could go and update the status of the connection for that topic ?

I need to test it first, but on rhel8 we use librdkafka-devel 0.11.4 which does not support that

But one thing is sure: to have consistent behavior we should not reject/error the s3 request if the broker is down for NON persistent notifications, and continue the best effort logic.

if you are good with best effort behavior, you can just set the ack-level to none. in this case the notification will fail only if there is something wrong with the configuration of the broker.

if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
Contributor

i don't think we should increment this one during publish_reserve(). publish_commit() increments it again at the end

} else {
// no reservation - try to connect to endpoint instead
try {
const auto endpoint = RGWPubSubEndpoint::create(
Contributor

now that this returns a raw pointer, the memory will leak in the error paths below. please leave unique_ptr as the factory function's return type. you can move the unique_ptr into res.endpoint below

req_id, null_yield);

auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
rgw::notify::reservation_t notify_res(dpp, store, &dest_obj, nullptr, &dest_bucket, user_id, tenant, req_id, null_yield);
Contributor

👍

}

int is_alive(CephContext* cct, optional_yield y) override {
return kafka::check_broker(conn_name, topic);
Contributor

check_broker() is returning internal error codes like STATUS_CONNECTION_CLOSED that the rest of rgw doesn't understand. should there be some kind of error mapping so we can choose the right http error to return?

Contributor Author

changed to -EIO

@kchheda3
Contributor

@yuvalif i have an idea on how to do this in a different way, but before doing things i want to confirm we are trying to achieve 2 things here

  • for sync notifications, if the broker goes down in the middle and remains down for some time, we do not want the s3 requests to time out or hang
  • second, you want to introduce this new feature for SYNC notifications where, if the ack_level == broker (maybe not NONE), and the message is not delivered/broker is down, the s3 request should return a failure code (even though the s3 operation would have finished successfully)

to address the first concern, we should set the kafka config variables message.send.max.retries & retry.backoff.ms and message.timeout.ms. we should set these values between 10-30 seconds (configurable via ceph.conf). Currently we do not set these values, so the default timeout = 5 minutes, causing the sync threads to hang if kafka goes down and does not recover within 5 minutes. Setting the max retries would ensure the s3 thread does not remain hung for 5 minutes !
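The knobs named here are standard librdkafka configuration properties; a sketch of wiring them up (the map is a stand-in for the real kafka configuration object, and the 3/1s/30s values are illustrative, not the values proposed here):

```cpp
#include <map>
#include <string>

// stand-in for the kafka configuration object; the real code would set
// these properties on the connection's configuration before connecting
using KafkaConf = std::map<std::string, std::string>;

// bound how long a sync notification can block on a dead broker:
// worst case becomes roughly message.timeout.ms instead of
// librdkafka's multi-minute default
void bound_delivery_time(KafkaConf& conf) {
  conf["message.send.max.retries"] = "3";   // cap internal retries
  conf["retry.backoff.ms"] = "1000";        // 1s between retries
  conf["message.timeout.ms"] = "30000";     // give up after 30s overall
}
```

Values like these would presumably be exposed via ceph.conf, as suggested above.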

The second concern is more of a feature, and i am not yet 100% sure we should do this: return non-200 for a SYNC notification if sending the event failed for ack_level broker, even though the s3 request has completed successfully and cannot be reverted.
but if we want to achieve that, we can still do this by having the retry value set and then honoring the publish_commit value. currently we ignore the publish_commit value in s3 requests. For persistent notifications publish_commit is more or less success, unless there is a race condition where event size > 1024 (we optimistically reserve 1k size in the queue) and when we retry reserving the actual size the queue is full, in which case publish_commit returns an error. So the return value of publish_commit can still be used for returning back to the s3 request.

@kchheda3
Contributor

kchheda3 commented Feb 23, 2024

message.send.max.retries & retry.backoff.ms and message.timeout.ms

IMO irrespective of what we decide, we definitely need to set the kafka config values message.send.max.retries & retry.backoff.ms or message.timeout.ms, at least for the SYNC notifications

@yuvalif
Contributor Author

yuvalif commented Feb 26, 2024

@yuvalif i have an idea on how to do this in a different way, but before doing things i want to confirm we are trying to achieve 2 things here

  • for sync notifications, if the broker goes down in the middle and remains down for some time, we do not want the s3 requests to time out or hang
  • second, you want to introduce this new feature for SYNC notifications where, if the ack_level == broker (maybe not NONE), and the message is not delivered/broker is down, the s3 request should return a failure code (even though the s3 operation would have finished successfully)

thanks @kchheda3, agree that it makes sense to separate into 2 things:

  • fix the current behavior (and add necessary conf parameters)
  • add an option to reflect kafka delivery failures to s3 client

The second concern is more of a feature, and i am not yet 100% sure we should do this: return non-200 for a SYNC notification if sending the event failed for ack_level broker, even though the s3 request has completed successfully and cannot be reverted.

note that this is not how it behaves. we use the "reserve/commit" semantics to make sure that we reply with an S3 failure only if no S3 operation was done and no notification was sent.
and we reply with 200 OK only if the S3 operation was successful and the notification was most likely sent

but if we want to achieve that, we can still do this by having the retry value set and then honoring the publish_commit value. currently we ignore the publish_commit value in s3 requests. For persistent notifications publish_commit is more or less success, unless there is a race condition where event size > 1024 (we optimistically reserve 1k size in the queue) and when we retry reserving the actual size the queue is full, in which case publish_commit returns an error. So the return value of publish_commit can still be used for returning back to the s3 request.

if we reflect back the return value from publish_commit (even with s3 client retries, which are not sufficient to overcome a kafka outage) we would give an indication that the S3 operation failed, while it didn't.

@kchheda3
Contributor

most likely sent

Ok, so from what you mentioned i interpret it as:

  • we do not want to return non-200 even if the s3 operation is a success but notification delivery is not.
  • we want to figure out a way so that the notification will most likely be sent (basically the current state of the broker is not DOWN inside the publish_reserve phase), so the s3 request can be failed right away?
    The 2nd requirement IMO is a tricky one and, as i pointed out earlier, it will result in NON-DETERMINISTIC behavior, as some s3 requests will pass even with the broker down and some s3 requests will be rejected right away. if we want to do it and want it more deterministic, then the only way IMO is to not store the connections to the topic in a map/cache, and rather create a new connection for every event; the new connection creation should be part of publish_reserve and it verifies whether the broker is up/running.
    this will give the desired result with the same behavior for all cases (except for a case where the broker goes down in the middle of publish_reserve and publish_commit). But this approach would add a lot of latency, as we are creating the connection every time; even if we cache the connection, we at least check that the broker is up every time in publish_reserve.

Now comes the question of how to verify the broker is down in the publish_reserve phase: send a fake message during the publish_reserve phase and then have a way, either via poll_callback or message_callback, to confirm the state of the broker ??
This step will add spam of sending fake messages to the broker (i know you proposed to send it only once, while the connection state is OK, but that results in non-deterministic behavior)

IMO this whole new feature needs to be revisited and re-thought: do we really need this? IIRC we were going to stop supporting sync notifications in future and ask people to move/migrate to persistent ?

@yuvalif
Contributor Author

yuvalif commented Feb 26, 2024

most likely sent

Ok, so from what you mentioned i interpret it as:

  • we do not want to return non-200 even if the s3 operation is a success but notification delivery is not.

yes. because this will be incorrect from S3 perspective

  • we want to figure out a way so that the notification will most likely be sent (basically the current state of the broker is not DOWN inside the publish_reserve phase), so the s3 request can be failed right away?

yes

The 2nd requirement IMO is a tricky one and, as i pointed out earlier, it will result in NON-DETERMINISTIC behavior, as some s3 requests will pass even with the broker down and some s3 requests will be rejected right away.

we may get some non-deterministic behavior for the notifications sent as the broker goes down. but once the broker is down, the behavior will be deterministic

if we want to do it and want it more deterministic, then the only way IMO is to not store the connections to the topic in a map/cache, and rather create a new connection for every event; the new connection creation should be part of publish_reserve and it verifies whether the broker is up/running.
this will give the desired result with the same behavior for all cases (except for a case where the broker goes down in the middle of publish_reserve and publish_commit). But this approach would add a lot of latency, as we are creating the connection every time; even if we cache the connection, we at least check that the broker is up every time in publish_reserve.
Now comes the question of how to verify the broker is down in the publish_reserve phase: send a fake message during the publish_reserve phase and then have a way, either via poll_callback or message_callback, to confirm the state of the broker ?? This step will add spam of sending fake messages to the broker (i know you proposed to send it only once, while the connection state is OK, but that results in non-deterministic behavior)

you are right, this will spam the broker, and also create latency due to the call to "poll". currently we do everything in batches, which is much more efficient

IMO this whole new feature needs to be revisited and re-thought: do we really need this? IIRC we were going to stop supporting sync notifications in future and ask people to move/migrate to persistent ?

overall, we recommend using persistent notifications. however, there could be use cases where we use the feature for near-real-time messaging of large objects. in this case, there is no requirement to overcome broker outages, and the requirement is actually to let the S3 clients know when the broker is down.

i will add a conf parameter, so that the new feature is optional.

in general, I don't think we should deprecate sync notifications. there is a valid use case where we want best effort notifications and do not want to pay the extra cost of using the RADOS backed queue.
in such a case setting ack-level=none will make them really best effort. setting ack-level=broker will not guarantee delivery, but will, at least, pace the S3 clients if the broker is slow, and would prevent broker overloading.
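The ack-level semantics described in this thread can be summarized in a small sketch (the enum and function names are illustrative, not the actual rgw identifiers):

```cpp
enum class AckLevel { None, Broker };

// should a kafka delivery problem be reflected back to the S3 client?
bool fail_s3_on_delivery_error(AckLevel ack, bool persistent) {
  if (persistent)
    return false;                  // persistent: queued and retried later,
                                   // never fails the S3 op
  return ack == AckLevel::Broker;  // sync + ack-level=broker: the client
                                   // sees the failure; ack-level=none is
                                   // pure best effort
}
```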

Comment on lines +679 to +720
int check_broker(const std::string& conn_name,
const std::string& topic) {
Contributor

is the topic argument supposed to be used? the header says // verify that broker is up and that topic exists

Contributor Author

we create the topic as part of the connection creation; if the creation failed, this will be reflected in the status.
i added it as a parameter for the generic SAL interface (might be needed in other cases),
but i can remove it for now

// derived class specific arguments are passed in http args format
// may throw a configuration_error if creation fails
static Ptr create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
static RGWPubSubEndpoint* create(const std::string& endpoint, const std::string& topic, const RGWHTTPArgs& args, CephContext *cct=nullptr);
Contributor

why does this not return unique_ptr anymore?

Contributor Author

no reason. will bring it back

* check for connectivity status with broker when connection is created
* exponential backoff for broker reconnect

Fixes: https://tracker.ceph.com/issues/63915

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
@yuvalif yuvalif added the DNM label Mar 5, 2024
@github-actions github-actions bot added the common label Mar 5, 2024
@yuvalif
Contributor Author

yuvalif commented Mar 7, 2024

based on the discussion here: https://youtu.be/Ur0Fc6aOGTc?si=W7HtlHpgfjnSmzhV (skip to 55:00)
will break this PR into 3 PRs:
