@@ -15,6 +15,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
1515{
1616 private readonly CancellationTokenSource _stopCancellation = new ( ) ;
1717 private readonly InputWaiter _inputWaiter = new ( ) ;
18+ private ConcurrentQueue < WorkflowEvent > _eventSink = new ( ) ;
1819 private int _isDisposed ;
1920
2021 private readonly ISuperStepRunner _stepRunner ;
@@ -35,6 +36,8 @@ public void Start()
3536 // doesn't leak into caller code via AsyncLocal.
3637 Activity ? previousActivity = Activity . Current ;
3738
39+ this . _stepRunner . OutgoingEvents . EventRaised += this . OnWorkflowEventAsync ;
40+
3841 this . _sessionActivity = this . _stepRunner . TelemetryContext . StartWorkflowSessionActivity ( ) ;
3942 this . _sessionActivity ? . SetTag ( Tags . WorkflowId , this . _stepRunner . StartExecutorId )
4043 . SetTag ( Tags . SessionId , this . _stepRunner . SessionId ) ;
@@ -56,10 +59,6 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
5659
5760 using CancellationTokenSource linkedSource = CancellationTokenSource . CreateLinkedTokenSource ( this . _stopCancellation . Token , cancellationToken ) ;
5861
59- ConcurrentQueue < WorkflowEvent > eventSink = [ ] ;
60-
61- this . _stepRunner . OutgoingEvents . EventRaised += OnWorkflowEventAsync ;
62-
6362 // Re-establish session as parent so the run activity nests correctly.
6463 Activity . Current = this . _sessionActivity ;
6564
@@ -73,7 +72,31 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
7372 runActivity ? . AddEvent ( new ActivityEvent ( EventNames . WorkflowStarted ) ) ;
7473
7574 // Emit WorkflowStartedEvent to the event stream for consumers
76- eventSink . Enqueue ( new WorkflowStartedEvent ( ) ) ;
75+ this . _eventSink . Enqueue ( new WorkflowStartedEvent ( ) ) ;
76+
77+ // Re-emit any pending external requests that were restored from a checkpoint
78+ // before this subscription was active. For non-resume starts this is a no-op.
79+ // This runs after WorkflowStartedEvent so consumers always see the started event first.
80+ await this . _stepRunner . RepublishPendingEventsAsync ( linkedSource . Token ) . ConfigureAwait ( false ) ;
81+
82+ // When resuming from a checkpoint with only pending requests (no queued messages),
83+ // the inner processing loop won't execute, so we must drain events now.
84+ // For normal starts this is a no-op since the inner loop handles the drain.
85+ if ( ! this . _stepRunner . HasUnprocessedMessages )
86+ {
87+ var ( drainedEvents , shouldHalt ) = this . DrainAndFilterEvents ( ) ;
88+ foreach ( WorkflowEvent raisedEvent in drainedEvents )
89+ {
90+ yield return raisedEvent ;
91+ }
92+
93+ if ( shouldHalt )
94+ {
95+ yield break ;
96+ }
97+
98+ this . RunStatus = this . _stepRunner . HasUnservicedRequests ? RunStatus . PendingRequests : RunStatus . Idle ;
99+ }
77100
78101 do
79102 {
@@ -107,26 +130,19 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
107130 yield break ; // Exit if cancellation is requested
108131 }
109132
110- bool hadRequestHaltEvent = false ;
111- foreach ( WorkflowEvent raisedEvent in Interlocked . Exchange ( ref eventSink , [ ] ) )
133+ var ( drainedEvents , shouldHalt ) = this . DrainAndFilterEvents ( ) ;
134+
135+ foreach ( WorkflowEvent raisedEvent in drainedEvents )
112136 {
113137 if ( linkedSource . Token . IsCancellationRequested )
114138 {
115139 yield break ; // Exit if cancellation is requested
116140 }
117141
118- // TODO: Do we actually want to interpret this as a termination request?
119- if ( raisedEvent is RequestHaltEvent )
120- {
121- hadRequestHaltEvent = true ;
122- }
123- else
124- {
125- yield return raisedEvent ;
126- }
142+ yield return raisedEvent ;
127143 }
128144
129- if ( hadRequestHaltEvent || linkedSource . Token . IsCancellationRequested )
145+ if ( shouldHalt || linkedSource . Token . IsCancellationRequested )
130146 {
131147 // If we had a completion event, we are done.
132148 yield break ;
@@ -151,25 +167,23 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
151167 finally
152168 {
153169 this . RunStatus = this . _stepRunner . HasUnservicedRequests ? RunStatus . PendingRequests : RunStatus . Idle ;
154- this . _stepRunner . OutgoingEvents . EventRaised -= OnWorkflowEventAsync ;
155170
156171 // Explicitly dispose the Activity so Activity.Stop fires deterministically,
157172 // regardless of how the async iterator enumerator is disposed.
158173 runActivity ? . Dispose ( ) ;
159174 }
160175
161- ValueTask OnWorkflowEventAsync ( object ? sender , WorkflowEvent e )
162- {
163- eventSink . Enqueue ( e ) ;
164- return default ;
165- }
166-
167176 // If we are Idle or Ended, we should break out of the loop
168177 // If we are PendingRequests and not blocking on pending requests, we should break out of the loop
169178 // If cancellation is requested, we should break out of the loop
170179 bool ShouldBreak ( ) => this . RunStatus is RunStatus . Idle or RunStatus . Ended ||
171- ( this . RunStatus == RunStatus . PendingRequests && ! blockOnPendingRequest ) ||
172- linkedSource . Token . IsCancellationRequested ;
180+ ( this . RunStatus == RunStatus . PendingRequests && ! blockOnPendingRequest ) ||
181+ linkedSource . Token . IsCancellationRequested ;
182+ }
183+
184+ internal void ClearBufferedEvents ( )
185+ {
186+ Interlocked . Exchange ( ref this . _eventSink , new ConcurrentQueue < WorkflowEvent > ( ) ) ;
173187 }
174188
175189 /// <summary>
@@ -192,6 +206,7 @@ public ValueTask DisposeAsync()
192206 if ( Interlocked . Exchange ( ref this . _isDisposed , 1 ) == 0 )
193207 {
194208 this . _stopCancellation . Cancel ( ) ;
209+ this . _stepRunner . OutgoingEvents . EventRaised -= this . OnWorkflowEventAsync ;
195210
196211 // Stop the session activity
197212 if ( this . _sessionActivity is not null )
@@ -207,4 +222,32 @@ public ValueTask DisposeAsync()
207222
208223 return default ;
209224 }
225+
226+ private ValueTask OnWorkflowEventAsync ( object ? sender , WorkflowEvent e )
227+ {
228+ this . _eventSink . Enqueue ( e ) ;
229+ return default ;
230+ }
231+
232+ // Atomically drains the event sink and separates workflow events from halt signals.
233+ // Used by both the early-drain (resume with pending requests only) and
234+ // the inner superstep drain to keep halt-detection logic in one place.
235+ private ( List < WorkflowEvent > Events , bool ShouldHalt ) DrainAndFilterEvents ( )
236+ {
237+ List < WorkflowEvent > events = [ ] ;
238+ bool shouldHalt = false ;
239+ foreach ( WorkflowEvent e in Interlocked . Exchange ( ref this . _eventSink , new ConcurrentQueue < WorkflowEvent > ( ) ) )
240+ {
241+ if ( e is RequestHaltEvent )
242+ {
243+ shouldHalt = true ;
244+ }
245+ else
246+ {
247+ events . Add ( e ) ;
248+ }
249+ }
250+
251+ return ( events , shouldHalt ) ;
252+ }
210253}
0 commit comments