Skip to content

Commit 38de991

Browse files
authored
.NET: Fix RequestInfoEvent lost when resuming workflow from checkpoint (#4955)
* Fix RequestInfoEvent lost when resuming workflow from checkpoint
* Fix streaming run double disposal in tests, and lockstep republishing before the Started event is emitted
* Fix bug: remove messages only after sending, to avoid losing messages on send failure
* Fix declarative test harness
1 parent 25696a7 commit 38de991

12 files changed

Lines changed: 744 additions & 55 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/ICheckpointingHandle.cs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,15 @@ internal interface ICheckpointingHandle
2121
/// <summary>
2222
/// Restores the system state from the specified checkpoint asynchronously.
2323
/// </summary>
24+
/// <remarks>
25+
/// This contract is used by live runtime restore paths. Implementations may re-emit pending
26+
/// external request events as part of the restore once the active event stream is ready to
27+
/// observe them.
28+
///
29+
/// Initial resume paths that create a new event stream should restore state first and defer
30+
/// any replay until after the subscriber is attached, rather than calling this contract
31+
/// directly before the stream is ready.
32+
/// </remarks>
2433
/// <param name="checkpointInfo">The checkpoint information that identifies the state to restore. Cannot be null.</param>
2534
/// <param name="cancellationToken">A cancellation token that can be used to cancel the restore operation.</param>
2635
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous restore operation.</returns>

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/AsyncRunHandle.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ internal AsyncRunHandle(ISuperStepRunner stepRunner, ICheckpointingHandle checkp
3636

3737
this._eventStream.Start();
3838

39-
// If there are already unprocessed messages (e.g., from a checkpoint restore that happened
40-
// before this handle was created), signal the run loop to start processing them
41-
if (stepRunner.HasUnprocessedMessages)
39+
// If there are already unprocessed messages or unserviced requests (e.g., from a
40+
// checkpoint restore that happened before this handle was created), signal the run
41+
// loop to start processing them
42+
if (stepRunner.HasUnprocessedMessages || stepRunner.HasUnservicedRequests)
4243
{
4344
this.SignalInputToRunLoop();
4445
}
@@ -192,13 +193,17 @@ public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, Can
192193
{
193194
streamingEventStream.ClearBufferedEvents();
194195
}
196+
else if (this._eventStream is LockstepRunEventStream lockstepEventStream)
197+
{
198+
lockstepEventStream.ClearBufferedEvents();
199+
}
195200

196-
// Restore the workflow state - this will republish unserviced requests as new events
201+
// Restore the workflow state through the live runtime-restore path.
202+
// This can re-emit pending requests into the already-active event stream.
197203
await this._checkpointingHandle.RestoreCheckpointAsync(checkpointInfo, cancellationToken).ConfigureAwait(false);
198204

199-
// After restore, signal the run loop to process any restored messages
200-
// This is necessary because ClearBufferedEvents() doesn't signal, and the restored
201-
// queued messages won't automatically wake up the run loop
205+
// After restore, signal the run loop to process any restored messages. Initial resume
206+
// paths handle this separately when they create the event stream after restoring state.
202207
this.SignalInputToRunLoop();
203208
}
204209
}

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ internal interface ISuperStepRunner
2727

2828
ConcurrentEventSink OutgoingEvents { get; }
2929

30+
/// <summary>
31+
/// Re-emits <see cref="RequestInfoEvent"/>s for any pending external requests.
32+
/// Called by event streams after subscribing to <see cref="OutgoingEvents"/> so that
33+
/// requests restored from a checkpoint are observable even when the restore happened
34+
/// before the subscription was active.
35+
/// </summary>
36+
ValueTask RepublishPendingEventsAsync(CancellationToken cancellationToken = default);
37+
3038
ValueTask<bool> RunSuperStepAsync(CancellationToken cancellationToken);
3139

3240
// This cannot be cancelled

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
1515
{
1616
private readonly CancellationTokenSource _stopCancellation = new();
1717
private readonly InputWaiter _inputWaiter = new();
18+
private ConcurrentQueue<WorkflowEvent> _eventSink = new();
1819
private int _isDisposed;
1920

2021
private readonly ISuperStepRunner _stepRunner;
@@ -35,6 +36,8 @@ public void Start()
3536
// doesn't leak into caller code via AsyncLocal.
3637
Activity? previousActivity = Activity.Current;
3738

39+
this._stepRunner.OutgoingEvents.EventRaised += this.OnWorkflowEventAsync;
40+
3841
this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
3942
this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
4043
.SetTag(Tags.SessionId, this._stepRunner.SessionId);
@@ -56,10 +59,6 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
5659

5760
using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);
5861

59-
ConcurrentQueue<WorkflowEvent> eventSink = [];
60-
61-
this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;
62-
6362
// Re-establish session as parent so the run activity nests correctly.
6463
Activity.Current = this._sessionActivity;
6564

@@ -73,7 +72,31 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
7372
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
7473

7574
// Emit WorkflowStartedEvent to the event stream for consumers
76-
eventSink.Enqueue(new WorkflowStartedEvent());
75+
this._eventSink.Enqueue(new WorkflowStartedEvent());
76+
77+
// Re-emit any pending external requests that were restored from a checkpoint
78+
// before this subscription was active. For non-resume starts this is a no-op.
79+
// This runs after WorkflowStartedEvent so consumers always see the started event first.
80+
await this._stepRunner.RepublishPendingEventsAsync(linkedSource.Token).ConfigureAwait(false);
81+
82+
// When resuming from a checkpoint with only pending requests (no queued messages),
83+
// the inner processing loop won't execute, so we must drain events now.
84+
// For normal starts this is a no-op since the inner loop handles the drain.
85+
if (!this._stepRunner.HasUnprocessedMessages)
86+
{
87+
var (drainedEvents, shouldHalt) = this.DrainAndFilterEvents();
88+
foreach (WorkflowEvent raisedEvent in drainedEvents)
89+
{
90+
yield return raisedEvent;
91+
}
92+
93+
if (shouldHalt)
94+
{
95+
yield break;
96+
}
97+
98+
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
99+
}
77100

78101
do
79102
{
@@ -107,26 +130,19 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
107130
yield break; // Exit if cancellation is requested
108131
}
109132

110-
bool hadRequestHaltEvent = false;
111-
foreach (WorkflowEvent raisedEvent in Interlocked.Exchange(ref eventSink, []))
133+
var (drainedEvents, shouldHalt) = this.DrainAndFilterEvents();
134+
135+
foreach (WorkflowEvent raisedEvent in drainedEvents)
112136
{
113137
if (linkedSource.Token.IsCancellationRequested)
114138
{
115139
yield break; // Exit if cancellation is requested
116140
}
117141

118-
// TODO: Do we actually want to interpret this as a termination request?
119-
if (raisedEvent is RequestHaltEvent)
120-
{
121-
hadRequestHaltEvent = true;
122-
}
123-
else
124-
{
125-
yield return raisedEvent;
126-
}
142+
yield return raisedEvent;
127143
}
128144

129-
if (hadRequestHaltEvent || linkedSource.Token.IsCancellationRequested)
145+
if (shouldHalt || linkedSource.Token.IsCancellationRequested)
130146
{
131147
// If a halt was requested or cancellation was signalled, we are done.
132148
yield break;
@@ -151,25 +167,23 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
151167
finally
152168
{
153169
this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
154-
this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;
155170

156171
// Explicitly dispose the Activity so Activity.Stop fires deterministically,
157172
// regardless of how the async iterator enumerator is disposed.
158173
runActivity?.Dispose();
159174
}
160175

161-
ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
162-
{
163-
eventSink.Enqueue(e);
164-
return default;
165-
}
166-
167176
// If we are Idle or Ended, we should break out of the loop
168177
// If we are PendingRequests and not blocking on pending requests, we should break out of the loop
169178
// If cancellation is requested, we should break out of the loop
170179
bool ShouldBreak() => this.RunStatus is RunStatus.Idle or RunStatus.Ended ||
171-
(this.RunStatus == RunStatus.PendingRequests && !blockOnPendingRequest) ||
172-
linkedSource.Token.IsCancellationRequested;
180+
(this.RunStatus == RunStatus.PendingRequests && !blockOnPendingRequest) ||
181+
linkedSource.Token.IsCancellationRequested;
182+
}
183+
184+
internal void ClearBufferedEvents()
185+
{
186+
Interlocked.Exchange(ref this._eventSink, new ConcurrentQueue<WorkflowEvent>());
173187
}
174188

175189
/// <summary>
@@ -192,6 +206,7 @@ public ValueTask DisposeAsync()
192206
if (Interlocked.Exchange(ref this._isDisposed, 1) == 0)
193207
{
194208
this._stopCancellation.Cancel();
209+
this._stepRunner.OutgoingEvents.EventRaised -= this.OnWorkflowEventAsync;
195210

196211
// Stop the session activity
197212
if (this._sessionActivity is not null)
@@ -207,4 +222,32 @@ public ValueTask DisposeAsync()
207222

208223
return default;
209224
}
225+
226+
private ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
227+
{
228+
this._eventSink.Enqueue(e);
229+
return default;
230+
}
231+
232+
// Atomically drains the event sink and separates workflow events from halt signals.
233+
// Used by both the early-drain (resume with pending requests only) and
234+
// the inner superstep drain to keep halt-detection logic in one place.
235+
private (List<WorkflowEvent> Events, bool ShouldHalt) DrainAndFilterEvents()
236+
{
237+
List<WorkflowEvent> events = [];
238+
bool shouldHalt = false;
239+
foreach (WorkflowEvent e in Interlocked.Exchange(ref this._eventSink, new ConcurrentQueue<WorkflowEvent>()))
240+
{
241+
if (e is RequestHaltEvent)
242+
{
243+
shouldHalt = true;
244+
}
245+
else
246+
{
247+
events.Add(e);
248+
}
249+
}
250+
251+
return (events, shouldHalt);
252+
}
210253
}

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
6060
// Subscribe to events - they will flow directly to the channel as they're raised
6161
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;
6262

63+
// Re-emit any pending external requests that were restored from a checkpoint
64+
// before this subscription was active. For non-resume starts this is a no-op.
65+
await this._stepRunner.RepublishPendingEventsAsync(linkedSource.Token).ConfigureAwait(false);
66+
6367
// Start the session-level activity that spans the entire run loop lifetime.
6468
// Individual run-stage activities are nested within this session activity.
6569
Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();

dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessExecutionEnvironment.cs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,13 @@ internal ValueTask<AsyncRunHandle> BeginRunAsync(Workflow workflow, string? sess
5050
return runner.BeginStreamAsync(this.ExecutionMode, cancellationToken);
5151
}
5252

53-
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken)
53+
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, CancellationToken cancellationToken = default)
54+
=> this.ResumeRunAsync(workflow, fromCheckpoint, knownValidInputTypes, republishPendingEvents: true, cancellationToken);
55+
56+
internal ValueTask<AsyncRunHandle> ResumeRunAsync(Workflow workflow, CheckpointInfo fromCheckpoint, IEnumerable<Type> knownValidInputTypes, bool republishPendingEvents, CancellationToken cancellationToken = default)
5457
{
5558
InProcessRunner runner = InProcessRunner.CreateTopLevelRunner(workflow, this.CheckpointManager, fromCheckpoint.SessionId, this.EnableConcurrentRuns, knownValidInputTypes);
56-
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, cancellationToken);
59+
return runner.ResumeStreamAsync(this.ExecutionMode, fromCheckpoint, republishPendingEvents, cancellationToken);
5760
}
5861

5962
/// <inheritdoc/>
@@ -104,6 +107,32 @@ public async ValueTask<StreamingRun> ResumeStreamingAsync(
104107
return new(runHandle);
105108
}
106109

110+
/// <summary>
111+
/// Resumes a streaming workflow run from a checkpoint with control over whether
112+
/// pending request events are republished through the event stream.
113+
/// </summary>
114+
/// <param name="workflow">The workflow to resume.</param>
115+
/// <param name="fromCheckpoint">The checkpoint to resume from.</param>
116+
/// <param name="republishPendingEvents">
117+
/// When <see langword="true"/>, any pending request events are republished through the event
118+
/// stream after subscribing. When <see langword="false"/>, the caller is responsible for
119+
/// handling pending requests (e.g., <see cref="WorkflowSession"/> already sends responses).
120+
/// </param>
121+
/// <param name="cancellationToken">Cancellation token.</param>
122+
internal async ValueTask<StreamingRun> ResumeStreamingInternalAsync(
123+
Workflow workflow,
124+
CheckpointInfo fromCheckpoint,
125+
bool republishPendingEvents,
126+
CancellationToken cancellationToken = default)
127+
{
128+
this.VerifyCheckpointingConfigured();
129+
130+
AsyncRunHandle runHandle = await this.ResumeRunAsync(workflow, fromCheckpoint, [], republishPendingEvents, cancellationToken)
131+
.ConfigureAwait(false);
132+
133+
return new(runHandle);
134+
}
135+
107136
private async ValueTask<AsyncRunHandle> BeginRunHandlingChatProtocolAsync<TInput>(Workflow workflow,
108137
TInput input,
109138
string? sessionId = null,

0 commit comments

Comments
 (0)