Skip to content

Commit ce52408

Browse files
authored
Merge pull request #6118 from cloudflare/mmcdonnell/do-cohorts
Add version+cohort options to durable objects
2 parents 9775779 + 559bf18 commit ce52408

11 files changed

Lines changed: 104 additions & 26 deletions

File tree

src/workerd/api/actor-state.c++

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,8 @@ DurableObjectState::DurableObjectState(jsg::Lock& js,
10641064
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
10651065
kj::Maybe<rpc::Container::Client> container,
10661066
bool containerRunning,
1067-
kj::Maybe<Worker::Actor::FacetManager&> facetManager)
1067+
kj::Maybe<Worker::Actor::FacetManager&> facetManager,
1068+
kj::Maybe<ActorVersion> version)
10681069
: id(kj::mv(actorId)),
10691070
exports(js, exports),
10701071
props(js, props),
@@ -1073,7 +1074,8 @@ DurableObjectState::DurableObjectState(jsg::Lock& js,
10731074
return js.alloc<Container>(kj::mv(cap), containerRunning);
10741075
})),
10751076
facetManager(facetManager.map(
1076-
[&](Worker::Actor::FacetManager& ref) { return IoContext::current().addObject(ref); })) {}
1077+
[](Worker::Actor::FacetManager& ref) { return IoContext::current().addObject(ref); })),
1078+
version(kj::mv(version)) {}
10771079

10781080
void DurableObjectState::waitUntil(kj::Promise<void> promise) {
10791081
IoContext::current().addWaitUntil(kj::mv(promise));

src/workerd/api/actor-state.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,8 @@ class DurableObjectState: public jsg::Object {
568568
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
569569
kj::Maybe<rpc::Container::Client> container,
570570
bool containerRunning,
571-
kj::Maybe<Worker::Actor::FacetManager&> facetManager);
571+
kj::Maybe<Worker::Actor::FacetManager&> facetManager,
572+
kj::Maybe<ActorVersion> version = kj::none);
572573

573574
void waitUntil(kj::Promise<void> promise);
574575

@@ -586,6 +587,15 @@ class DurableObjectState: public jsg::Object {
586587
return storage.map([&](jsg::Ref<DurableObjectStorage>& p) { return p.addRef(); });
587588
}
588589

590+
struct Version {
591+
jsg::Optional<kj::StringPtr> cohort;
592+
JSG_STRUCT(cohort);
593+
};
594+
jsg::Optional<Version> getVersion() {
595+
return version.map([](ActorVersion& v) -> Version {
596+
return Version{.cohort = v.cohort.map([](kj::String& s) -> kj::StringPtr { return s; })};
597+
});
598+
}
589599
jsg::Optional<jsg::Ref<Container>> getContainer() {
590600
return container.map([](jsg::Ref<Container>& c) { return c.addRef(); });
591601
}
@@ -666,6 +676,7 @@ class DurableObjectState: public jsg::Object {
666676
if (flags.getWorkerdExperimental()) {
667677
// Experimental new API, details may change!
668678
JSG_LAZY_INSTANCE_PROPERTY(facets, getFacets);
679+
JSG_LAZY_INSTANCE_PROPERTY(version, getVersion);
669680
}
670681
JSG_METHOD(blockConcurrencyWhile);
671682
JSG_METHOD(acceptWebSocket);
@@ -726,6 +737,7 @@ class DurableObjectState: public jsg::Object {
726737
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage;
727738
kj::Maybe<jsg::Ref<Container>> container;
728739
kj::Maybe<IoPtr<Worker::Actor::FacetManager>> facetManager;
740+
kj::Maybe<ActorVersion> version;
729741

730742
// Limits for Hibernatable WebSocket tags.
731743

@@ -741,6 +753,7 @@ class DurableObjectState: public jsg::Object {
741753
api::DurableObjectStorageOperations::GetAlarmOptions, \
742754
api::DurableObjectStorageOperations::PutOptions, \
743755
api::DurableObjectStorageOperations::SetAlarmOptions, api::WebSocketRequestResponsePair, \
744-
api::DurableObjectFacets, api::DurableObjectFacets::StartupOptions
756+
api::DurableObjectFacets, api::DurableObjectFacets::StartupOptions, \
757+
api::DurableObjectState::Version
745758

746759
} // namespace workerd::api

src/workerd/api/actor.c++

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,13 @@ kj::Own<WorkerInterface> GlobalActorOutgoingFactory::newSingleUseClient(
5050
if (actorChannel == kj::none) {
5151
KJ_SWITCH_ONEOF(channelIdOrFactory) {
5252
KJ_CASE_ONEOF(channelId, uint) {
53-
actorChannel =
54-
context.getGlobalActorChannel(channelId, id->getInner(), kj::mv(locationHint), mode,
55-
enableReplicaRouting, routingMode, tracing.getInternalSpanParent());
53+
actorChannel = context.getGlobalActorChannel(channelId, id->getInner(),
54+
kj::mv(locationHint), mode, enableReplicaRouting, routingMode,
55+
tracing.getInternalSpanParent(), kj::mv(version));
5656
}
5757
KJ_CASE_ONEOF(factory, kj::Own<DurableObjectNamespace::ActorChannelFactory>) {
5858
actorChannel = factory->getGlobalActor(id->getInner(), kj::mv(locationHint), mode,
59-
enableReplicaRouting, routingMode, tracing.getInternalSpanParent());
59+
enableReplicaRouting, routingMode, tracing.getInternalSpanParent(), kj::mv(version));
6060
}
6161
}
6262
}
@@ -155,21 +155,28 @@ jsg::Ref<DurableObject> DurableObjectNamespace::getImpl(jsg::Lock& js,
155155

156156
auto& context = IoContext::current();
157157
kj::Maybe<kj::String> locationHint;
158+
kj::Maybe<ActorVersion> version;
158159
KJ_IF_SOME(o, options) {
159160
locationHint = kj::mv(o.locationHint);
161+
if (FeatureFlags::get(js).getWorkerdExperimental()) {
162+
KJ_IF_SOME(v, o.version) {
163+
version = ActorVersion{.cohort = kj::mv(v.cohort)};
164+
}
165+
}
160166
}
161167

162168
bool enableReplicaRouting = FeatureFlags::get(js).getReplicaRouting();
163169

164170
kj::Own<Fetcher::OutgoingFactory> outgoingFactory;
165171
KJ_SWITCH_ONEOF(channel) {
166172
KJ_CASE_ONEOF(channelId, uint) {
167-
outgoingFactory = kj::heap<GlobalActorOutgoingFactory>(
168-
channelId, id.addRef(), kj::mv(locationHint), mode, enableReplicaRouting, routingMode);
173+
outgoingFactory = kj::heap<GlobalActorOutgoingFactory>(channelId, id.addRef(),
174+
kj::mv(locationHint), mode, enableReplicaRouting, routingMode, kj::mv(version));
169175
}
170176
KJ_CASE_ONEOF(channelFactory, IoOwn<ActorChannelFactory>) {
171-
outgoingFactory = kj::heap<GlobalActorOutgoingFactory>(kj::addRef(*channelFactory),
172-
id.addRef(), kj::mv(locationHint), mode, enableReplicaRouting, routingMode);
177+
outgoingFactory =
178+
kj::heap<GlobalActorOutgoingFactory>(kj::addRef(*channelFactory), id.addRef(),
179+
kj::mv(locationHint), mode, enableReplicaRouting, routingMode, kj::mv(version));
173180
}
174181
}
175182

src/workerd/api/actor.h

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ class DurableObjectNamespace: public jsg::Object {
157157
ActorGetMode mode,
158158
bool enableReplicaRouting,
159159
ActorRoutingMode routingMode,
160-
SpanParent parentSpan) = 0;
160+
SpanParent parentSpan,
161+
kj::Maybe<ActorVersion> version) = 0;
161162
};
162163

163164
DurableObjectNamespace(uint channel, kj::Own<ActorIdFactory> idFactory)
@@ -202,17 +203,39 @@ class DurableObjectNamespace: public jsg::Object {
202203
// - "primary-only": guarantees we route directly to the primary (skip any replicas).
203204
jsg::Optional<kj::String> routingMode;
204205

205-
JSG_STRUCT(locationHint, routingMode);
206+
struct VersionOptions {
207+
jsg::Optional<kj::String> cohort;
208+
JSG_STRUCT(cohort);
209+
JSG_STRUCT_TS_OVERRIDE_DYNAMIC(CompatibilityFlags::Reader flags) {
210+
if (!flags.getWorkerdExperimental()) {
211+
JSG_TS_OVERRIDE(type VersionOptions = never);
212+
}
213+
}
214+
};
215+
jsg::Optional<VersionOptions> version;
216+
217+
JSG_STRUCT(locationHint, routingMode, version);
206218

207219
// DurableObjectLocationHint values from https://developers.cloudflare.com/workers/runtime-apis/durable-objects/#providing-a-location-hint
208220
JSG_STRUCT_TS_DEFINE(
209221
type DurableObjectLocationHint = "wnam" | "enam" | "sam" | "weur" | "eeur" | "apac" | "oc" | "afr" | "me";
210222
type DurableObjectRoutingMode = "primary-only");
211223

212-
JSG_STRUCT_TS_OVERRIDE({
213-
locationHint?: DurableObjectLocationHint;
214-
routingMode?: DurableObjectRoutingMode;
215-
});
224+
JSG_STRUCT_TS_OVERRIDE_DYNAMIC(CompatibilityFlags::Reader flags) {
225+
if (flags.getWorkerdExperimental()) {
226+
JSG_TS_OVERRIDE({
227+
locationHint?: DurableObjectLocationHint;
228+
routingMode?: DurableObjectRoutingMode;
229+
version?: { cohort?: string };
230+
});
231+
} else {
232+
JSG_TS_OVERRIDE({
233+
locationHint?: DurableObjectLocationHint;
234+
routingMode?: DurableObjectRoutingMode;
235+
version: never;
236+
});
237+
}
238+
}
216239
};
217240

218241
// Gets a durable object by ID or creates it if it doesn't already exist.
@@ -281,13 +304,15 @@ class GlobalActorOutgoingFactory final: public Fetcher::OutgoingFactory {
281304
kj::Maybe<kj::String> locationHint,
282305
ActorGetMode mode,
283306
bool enableReplicaRouting,
284-
ActorRoutingMode routingMode)
307+
ActorRoutingMode routingMode,
308+
kj::Maybe<ActorVersion> version)
285309
: channelIdOrFactory(kj::mv(channelIdOrFactory)),
286310
id(kj::mv(id)),
287311
locationHint(kj::mv(locationHint)),
288312
mode(mode),
289313
enableReplicaRouting(enableReplicaRouting),
290-
routingMode(routingMode) {}
314+
routingMode(routingMode),
315+
version(kj::mv(version)) {}
291316

292317
kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;
293318

@@ -298,6 +323,7 @@ class GlobalActorOutgoingFactory final: public Fetcher::OutgoingFactory {
298323
ActorGetMode mode;
299324
bool enableReplicaRouting;
300325
ActorRoutingMode routingMode;
326+
kj::Maybe<ActorVersion> version;
301327
kj::Maybe<kj::Own<IoChannelFactory::ActorChannel>> actorChannel;
302328
};
303329

@@ -367,6 +393,7 @@ class DurableObjectClass: public jsg::Object {
367393
#define EW_ACTOR_ISOLATE_TYPES \
368394
api::ColoLocalActorNamespace, api::DurableObject, api::DurableObjectId, \
369395
api::DurableObjectNamespace, api::DurableObjectNamespace::NewUniqueIdOptions, \
370-
api::DurableObjectNamespace::GetDurableObjectOptions, api::DurableObjectClass
396+
api::DurableObjectNamespace::GetDurableObjectOptions, api::DurableObjectClass, \
397+
api::DurableObjectNamespace::GetDurableObjectOptions::VersionOptions
371398

372399
} // namespace workerd::api

src/workerd/io/actor-id.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ enum class ActorRoutingMode {
2323
PRIMARY_ONLY
2424
};
2525

26+
// Version information for an actor. Used to specify cohort.
27+
struct ActorVersion {
28+
kj::Maybe<kj::String> cohort;
29+
};
30+
2631
// An abstract class that implements generation of global actor IDs in a particular namespace.
2732
//
2833
// This is NOT at I/O type. Each global actor namespace binding holds one instance of this which

src/workerd/io/io-channels.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ class IoChannelFactory {
224224
ActorGetMode mode,
225225
bool enableReplicaRouting,
226226
ActorRoutingMode routingMode,
227-
SpanParent parentSpan) = 0;
227+
SpanParent parentSpan,
228+
kj::Maybe<ActorVersion> version) = 0;
228229

229230
// Get an actor stub from the given namespace for the actor with the given name.
230231
virtual kj::Own<ActorChannel> getColoLocalActor(

src/workerd/io/io-context.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -844,9 +844,10 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
844844
ActorGetMode mode,
845845
bool enableReplicaRouting,
846846
ActorRoutingMode routingMode,
847-
SpanParent parentSpan) {
847+
SpanParent parentSpan,
848+
kj::Maybe<ActorVersion> version) {
848849
return getIoChannelFactory().getGlobalActor(channel, id, kj::mv(locationHint), mode,
849-
enableReplicaRouting, routingMode, kj::mv(parentSpan));
850+
enableReplicaRouting, routingMode, kj::mv(parentSpan), kj::mv(version));
850851
}
851852
kj::Own<IoChannelFactory::ActorChannel> getColoLocalActorChannel(
852853
uint channel, kj::StringPtr id, SpanParent parentSpan) {

src/workerd/server/server.c++

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3316,7 +3316,8 @@ class Server::WorkerService final: public Service,
33163316
ActorGetMode mode,
33173317
bool enableReplicaRouting,
33183318
ActorRoutingMode routingMode,
3319-
SpanParent parentSpan) override {
3319+
SpanParent parentSpan,
3320+
kj::Maybe<ActorVersion> version) override {
33203321
JSG_REQUIRE(mode == ActorGetMode::GET_OR_CREATE, Error,
33213322
"workerd only supports GET_OR_CREATE mode for getting actor stubs");
33223323
JSG_REQUIRE(!enableReplicaRouting, Error, "workerd does not support replica routing.");

src/workerd/tests/test-fixture.c++

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ struct DummyIoChannelFactory final: public IoChannelFactory {
132132
ActorGetMode mode,
133133
bool enableReplicaRouting,
134134
ActorRoutingMode routingMode,
135-
SpanParent parentSpan) override {
135+
SpanParent parentSpan,
136+
kj::Maybe<ActorVersion> version) override {
136137
KJ_FAIL_REQUIRE("no actor channels");
137138
}
138139

types/generated-snapshot/experimental/index.d.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,10 +635,16 @@ type DurableObjectRoutingMode = "primary-only";
635635
interface DurableObjectNamespaceGetDurableObjectOptions {
636636
locationHint?: DurableObjectLocationHint;
637637
routingMode?: DurableObjectRoutingMode;
638+
version?: {
639+
cohort?: string;
640+
};
638641
}
639642
interface DurableObjectClass<
640643
_T extends Rpc.DurableObjectBranded | undefined = undefined,
641644
> {}
645+
interface DurableObjectNamespaceGetDurableObjectOptionsVersionOptions {
646+
cohort?: string;
647+
}
642648
interface DurableObjectState<Props = unknown> {
643649
waitUntil(promise: Promise<any>): void;
644650
readonly exports: Cloudflare.Exports;
@@ -647,6 +653,7 @@ interface DurableObjectState<Props = unknown> {
647653
readonly storage: DurableObjectStorage;
648654
container?: Container;
649655
facets: DurableObjectFacets;
656+
version?: DurableObjectStateVersion;
650657
blockConcurrencyWhile<T>(callback: () => Promise<T>): Promise<T>;
651658
acceptWebSocket(ws: WebSocket, tags?: string[]): void;
652659
getWebSockets(tag?: string): WebSocket[];
@@ -781,6 +788,9 @@ interface FacetStartupOptions<
781788
id?: DurableObjectId | string;
782789
class: DurableObjectClass<T>;
783790
}
791+
interface DurableObjectStateVersion {
792+
cohort?: string;
793+
}
784794
interface AnalyticsEngineDataset {
785795
writeDataPoint(event?: AnalyticsEngineDataPoint): void;
786796
}

0 commit comments

Comments
 (0)