Skip to content

Commit e61681d

Browse files
authored
runtime: making runtime accessible from non-worker threads (#7695)
Making runtime accessible for non-worker threads, and using the new accessor for runtime features. This allows the work done in #7601, moving the strict HTTP checks out of the HCM and into the codec, where the integration tests use them from client/server threads, and other downstream Envoys might use them from non-worker threads as well. Risk Level: High (affects runtime access for all runtime features) Testing: new unit tests, integration tests use in #7601 Docs Changes: n/a Release Notes: n/a Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
1 parent 95e49d6 commit e61681d

9 files changed

Lines changed: 95 additions & 7 deletions

File tree

include/envoy/runtime/runtime.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,16 @@ class Loader {
222222
/**
223223
* @return const Snapshot& the current snapshot. This reference is safe to use for the duration of
224224
* the calling routine, but may be overwritten on a future event loop cycle so should be
225-
* fetched again when needed.
225+
* fetched again when needed. This may only be called from worker threads.
226226
*/
227227
virtual const Snapshot& snapshot() PURE;
228228

229+
/**
230+
* @return shared_ptr<const Snapshot> the current snapshot. This function may safely be called
231+
* from non-worker theads.
232+
*/
233+
virtual std::shared_ptr<const Snapshot> threadsafeSnapshot() PURE;
234+
229235
/**
230236
* Merge the given map of key-value pairs into the runtime's state. To remove a previous merge for
231237
* a key, use an empty string as the value.

include/envoy/thread_local/thread_local.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ class Slot {
2828
public:
2929
virtual ~Slot() = default;
3030

31+
/**
32+
* Returns if there is thread local data for this thread.
33+
*
34+
* This should return true for Envoy worker threads and false for threads which do not have thread
35+
* local storage allocated.
36+
*
37+
* @return true if registerThread has been called for this thread, false otherwise.
38+
*/
39+
virtual bool currentThreadRegistered() PURE;
40+
3141
/**
3242
* @return ThreadLocalObjectSharedPtr a thread local object stored in the slot.
3343
*/

source/common/runtime/runtime_impl.cc

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ namespace Runtime {
2727
bool runtimeFeatureEnabled(absl::string_view feature) {
2828
ASSERT(absl::StartsWith(feature, "envoy.reloadable_features"));
2929
if (Runtime::LoaderSingleton::getExisting()) {
30-
return Runtime::LoaderSingleton::getExisting()->snapshot().runtimeFeatureEnabled(feature);
30+
return Runtime::LoaderSingleton::getExisting()->threadsafeSnapshot()->runtimeFeatureEnabled(
31+
feature);
3132
}
3233
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::runtime), warn,
3334
"Unable to use runtime singleton for feature {}", feature);
@@ -551,13 +552,32 @@ void RtdsSubscription::validateUpdateSize(uint32_t num_resources) {
551552
}
552553

553554
void LoaderImpl::loadNewSnapshot() {
554-
ThreadLocal::ThreadLocalObjectSharedPtr ptr = createNewSnapshot();
555-
tls_->set([ptr = std::move(ptr)](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
556-
return ptr;
555+
std::shared_ptr<SnapshotImpl> ptr = createNewSnapshot();
556+
tls_->set([ptr](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
557+
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(ptr);
557558
});
559+
560+
{
561+
absl::MutexLock lock(&snapshot_mutex_);
562+
thread_safe_snapshot_ = ptr;
563+
}
564+
}
565+
566+
const Snapshot& LoaderImpl::snapshot() {
567+
ASSERT(tls_->currentThreadRegistered(), "snapshot can only be called from a worker thread");
568+
return tls_->getTyped<Snapshot>();
558569
}
559570

560-
const Snapshot& LoaderImpl::snapshot() { return tls_->getTyped<Snapshot>(); }
571+
std::shared_ptr<const Snapshot> LoaderImpl::threadsafeSnapshot() {
572+
if (tls_->currentThreadRegistered()) {
573+
return std::dynamic_pointer_cast<const Snapshot>(tls_->get());
574+
}
575+
576+
{
577+
absl::ReaderMutexLock lock(&snapshot_mutex_);
578+
return thread_safe_snapshot_;
579+
}
580+
}
561581

562582
void LoaderImpl::mergeValues(const std::unordered_map<std::string, std::string>& values) {
563583
if (admin_layer_ == nullptr) {

source/common/runtime/runtime_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
244244
// Runtime::Loader
245245
void initialize(Upstream::ClusterManager& cm) override;
246246
const Snapshot& snapshot() override;
247+
std::shared_ptr<const Snapshot> threadsafeSnapshot() override;
247248
void mergeValues(const std::unordered_map<std::string, std::string>& values) override;
248249

249250
private:
@@ -265,6 +266,9 @@ class LoaderImpl : public Loader, Logger::Loggable<Logger::Id::runtime> {
265266
Api::Api& api_;
266267
std::vector<RtdsSubscriptionPtr> subscriptions_;
267268
Upstream::ClusterManager* cm_{};
269+
270+
absl::Mutex snapshot_mutex_;
271+
std::shared_ptr<const Snapshot> thread_safe_snapshot_ GUARDED_BY(snapshot_mutex_);
268272
};
269273

270274
} // namespace Runtime

source/common/thread_local/thread_local_impl.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ SlotPtr InstanceImpl::allocateSlot() {
3737
return slot;
3838
}
3939

40+
bool InstanceImpl::SlotImpl::currentThreadRegistered() {
41+
return thread_local_data_.data_.size() > index_;
42+
}
43+
4044
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
41-
ASSERT(thread_local_data_.data_.size() > index_);
45+
ASSERT(currentThreadRegistered());
4246
return thread_local_data_.data_[index_];
4347
}
4448

source/common/thread_local/thread_local_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
3434

3535
// ThreadLocal::Slot
3636
ThreadLocalObjectSharedPtr get() override;
37+
bool currentThreadRegistered() override;
3738
void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
3839
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
3940
parent_.runOnAllThreads(cb, main_callback);

test/common/runtime/runtime_impl_test.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,46 @@ TEST_F(StaticLoaderImplTest, ProtoParsing) {
610610
EXPECT_EQ(2, store_.gauge("runtime.num_layers", Stats::Gauge::ImportMode::NeverImport).value());
611611
}
612612

613+
TEST_F(StaticLoaderImplTest, RuntimeFromNonWorkerThreads) {
614+
// Force the thread to be considered a non-worker thread.
615+
tls_.registered_ = false;
616+
setup();
617+
618+
// Set up foo -> bar
619+
loader_->mergeValues({{"foo", "bar"}});
620+
EXPECT_EQ("bar", loader_->threadsafeSnapshot()->get("foo"));
621+
const Snapshot* original_snapshot_pointer = loader_->threadsafeSnapshot().get();
622+
623+
// Now set up a test thread which verifies foo -> bar
624+
//
625+
// Then change foo and make sure the test thread picks up the change.
626+
Thread::MutexBasicLockable mutex;
627+
Thread::CondVar foo_read;
628+
Thread::CondVar foo_changed;
629+
const Snapshot* original_thread_snapshot_pointer = nullptr;
630+
auto thread = Thread::threadFactoryForTest().createThread([&]() {
631+
Thread::LockGuard lock(mutex);
632+
EXPECT_EQ("bar", loader_->threadsafeSnapshot()->get("foo"));
633+
original_thread_snapshot_pointer = loader_->threadsafeSnapshot().get();
634+
EXPECT_EQ(original_thread_snapshot_pointer, loader_->threadsafeSnapshot().get());
635+
foo_read.notifyOne();
636+
637+
foo_changed.wait(mutex);
638+
EXPECT_EQ("eep", loader_->threadsafeSnapshot()->get("foo"));
639+
});
640+
641+
{
642+
Thread::LockGuard lock(mutex);
643+
foo_read.wait(mutex);
644+
loader_->mergeValues({{"foo", "eep"}});
645+
foo_changed.notifyOne();
646+
EXPECT_EQ("eep", loader_->threadsafeSnapshot()->get("foo"));
647+
}
648+
649+
thread->join();
650+
EXPECT_EQ(original_thread_snapshot_pointer, original_snapshot_pointer);
651+
}
652+
613653
class DiskLayerTest : public testing::Test {
614654
protected:
615655
DiskLayerTest() : api_(Api::createApiForTest()) {}

test/mocks/runtime/mocks.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class MockLoader : public Loader {
6464

6565
MOCK_METHOD1(initialize, void(Upstream::ClusterManager& cm));
6666
MOCK_METHOD0(snapshot, const Snapshot&());
67+
MOCK_METHOD0(threadsafeSnapshot, std::shared_ptr<const Snapshot>());
6768
MOCK_METHOD1(mergeValues, void(const std::unordered_map<std::string, std::string>&));
6869

6970
testing::NiceMock<MockSnapshot> snapshot_;

test/mocks/thread_local/mocks.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class MockInstance : public Instance {
5858

5959
// ThreadLocal::Slot
6060
ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; }
61+
bool currentThreadRegistered() override { return parent_.registered_; }
6162
void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
6263
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
6364
parent_.runOnAllThreads(cb, main_callback);
@@ -72,6 +73,7 @@ class MockInstance : public Instance {
7273
testing::NiceMock<Event::MockDispatcher> dispatcher_;
7374
std::vector<ThreadLocalObjectSharedPtr> data_;
7475
bool shutdown_{};
76+
bool registered_{true};
7577
};
7678

7779
} // namespace ThreadLocal

0 commit comments

Comments
 (0)