Skip to content
33 changes: 18 additions & 15 deletions src/crimson/osd/osd_operations/peering_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,25 @@ seastar::future<> PeeringEvent<T>::with_pg(
}).then_interruptible([this, pg](auto) {
return this->template enter_stage<interruptor>(peering_pp(*pg).process);
}).then_interruptible([this, pg, &shard_services] {
return pg->do_peering_event(evt, ctx
).then_interruptible([this] {
/* The DeleteSome event invokes PeeringListener::do_delete_work, which
* needs to return (without a future) the object to start with on the next
* call. As a consequence, crimson's do_delete_work implementation needs
* to use get() for the object listing. To support that, we wrap
* PG::do_peering_event with interruptor::async here.
*
* Otherwise, it's not ok to yield during peering event handler. Doing so
* allows other continuations to observe PeeringState in the middle
* of, for instance, a map advance. The interface *does not* support such
* usage. DeleteSome happens not to trigger that problem so it's ok for
* now, but we'll want to remove that as well.
* https://tracker.ceph.com/issues/66708
*/
return interruptor::async([this, pg, &shard_services] {
pg->do_peering_event(evt, ctx);
complete_rctx(shard_services, pg).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call complete_rctx within do_peering_event to avoid future incorrect users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather leave it as is -- pg_advance_map does potentially multiple events before calling complete_rctx.

}).then_interruptible([this] {
return that()->get_handle().complete();
}).then_interruptible([this, pg, &shard_services] {
return complete_rctx(shard_services, pg);
});
}).then_interruptible([pg, &shard_services]()
-> typename T::template interruptible_future<> {
if (!pg->get_need_up_thru()) {
return seastar::now();
}
return shard_services.send_alive(pg->get_same_interval_since());
}).then_interruptible([&shard_services] {
return shard_services.send_pg_temp();
});
}, [this](std::exception_ptr ep) {
LOG_PREFIX(PeeringEvent<T>::with_pg);
Expand All @@ -128,9 +133,7 @@ PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
using interruptor = typename T::interruptor;
LOG_PREFIX(PeeringEvent<T>::complete_rctx);
DEBUGI("{}: submitting ctx", *this);
return shard_services.dispatch_context(
pg->get_collection_ref(),
std::move(ctx));
return pg->complete_rctx(std::move(ctx));
}

ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
Expand Down
63 changes: 24 additions & 39 deletions src/crimson/osd/osd_operations/pg_advance_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,48 +80,33 @@ seastar::future<> PGAdvanceMap::start()
* See: https://tracker.ceph.com/issues/61744
*/
from = pg->get_osdmap_epoch();
auto fut = seastar::now();
if (do_init) {
fut = pg->handle_initialize(rctx
).then([this] {
return pg->handle_activate_map(rctx);
});
pg->handle_initialize(rctx);
pg->handle_activate_map(rctx);
Comment on lines +84 to +85
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
return fut.then([this] {
ceph_assert(std::cmp_less_equal(*from, to));
return seastar::do_for_each(
boost::make_counting_iterator(*from + 1),
boost::make_counting_iterator(to + 1),
[this](epoch_t next_epoch) {
logger().debug("{}: start: getting map {}",
*this, next_epoch);
return shard_services.get_map(next_epoch).then(
[this] (cached_map_t&& next_map) {
logger().debug("{}: advancing map to {}",
*this, next_map->get_epoch());
return pg->handle_advance_map(next_map, rctx);
});
}).then([this] {
return pg->handle_activate_map(rctx).then([this] {
logger().debug("{}: map activated", *this);
if (do_init) {
shard_services.pg_created(pg->get_pgid(), pg);
logger().info("PGAdvanceMap::start new pg {}", *pg);
}
return seastar::when_all_succeed(
pg->get_need_up_thru()
? shard_services.send_alive(
pg->get_same_interval_since())
: seastar::now(),
shard_services.dispatch_context(
pg->get_collection_ref(),
std::move(rctx)));
ceph_assert(std::cmp_less_equal(*from, to));
return seastar::do_for_each(
boost::make_counting_iterator(*from + 1),
boost::make_counting_iterator(to + 1),
[this](epoch_t next_epoch) {
logger().debug("{}: start: getting map {}",
*this, next_epoch);
return shard_services.get_map(next_epoch).then(
[this] (cached_map_t&& next_map) {
logger().debug("{}: advancing map to {}",
*this, next_map->get_epoch());
pg->handle_advance_map(next_map, rctx);
return seastar::now();
});
}).then_unpack([this] {
logger().debug("{}: sending pg temp", *this);
return shard_services.send_pg_temp();
});
});
}).then([this] {
pg->handle_activate_map(rctx);
logger().debug("{}: map activated", *this);
if (do_init) {
shard_services.pg_created(pg->get_pgid(), pg);
logger().info("PGAdvanceMap::start new pg {}", *pg);
}
return pg->complete_rctx(std::move(rctx));
});
}).then([this] {
logger().debug("{}: complete", *this);
return handle.complete();
Expand Down
66 changes: 27 additions & 39 deletions src/crimson/osd/pg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -738,61 +738,49 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
});
}

PG::interruptible_future<> PG::do_peering_event(
void PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
return interruptor::now();
} else {
logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
// all peering event handling needs to be run in a dedicated seastar::thread,
// so that event processing can involve I/O reqs freely, for example: PG::on_removal,
// PG::on_new_interval
return interruptor::async([this, &evt, &rctx] {
peering_state.handle_event(
evt.get_event(),
&rctx);
peering_state.write_if_dirty(rctx.transaction);
});
peering_state.handle_event(
evt.get_event(),
&rctx);
peering_state.write_if_dirty(rctx.transaction);
}
}

seastar::future<> PG::handle_advance_map(
void PG::handle_advance_map(
cached_map_t next_map, PeeringCtx &rctx)
{
return seastar::async([this, next_map=std::move(next_map), &rctx] {
vector<int> newup, newacting;
int up_primary, acting_primary;
next_map->pg_to_up_acting_osds(
pgid.pgid,
&newup, &up_primary,
&newacting, &acting_primary);
peering_state.advance_map(
next_map,
peering_state.get_osdmap(),
newup,
up_primary,
newacting,
acting_primary,
rctx);
osdmap_gate.got_map(next_map->get_epoch());
});
}

seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
vector<int> newup, newacting;
int up_primary, acting_primary;
next_map->pg_to_up_acting_osds(
pgid.pgid,
&newup, &up_primary,
&newacting, &acting_primary);
peering_state.advance_map(
next_map,
peering_state.get_osdmap(),
newup,
up_primary,
newacting,
acting_primary,
rctx);
osdmap_gate.got_map(next_map->get_epoch());
}

// Activate the (already advanced) map: lets PeeringState react once the
// final epoch of a map-advance sequence has been reached.  Synchronous;
// any resulting work is accumulated in rctx for later dispatch.
void PG::handle_activate_map(PeeringCtx &rctx)
{
  peering_state.activate_map(rctx);
}

seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
void PG::handle_initialize(PeeringCtx &rctx)
{
return seastar::async([this, &rctx] {
peering_state.handle_event(PeeringState::Initialize{}, &rctx);
});
peering_state.handle_event(PeeringState::Initialize{}, &rctx);
}

void PG::init_collection_pool_opts()
Expand Down
Loading