Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
this.RunStatus = RunStatus.Running;
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));

// Emit WorkflowStartedEvent to the event stream for consumers
eventSink.Enqueue(new WorkflowStartedEvent());
Comment on lines +75 to +76
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In lockstep mode, WorkflowStartedEvent is only enqueued into the local eventSink here, but the sink is drained/yielded only after a RunSuperStepAsync completes. If the first RunSuperStepAsync throws (or the iterator exits before draining), consumers may never observe WorkflowStartedEvent. To make the event reliable and consistent with StreamingRunEventStream, emit/yield the WorkflowStartedEvent before entering the first superstep execution (and ensure it is only emitted once per run cycle).

Suggested change
// Emit WorkflowStartedEvent to the event stream for consumers
eventSink.Enqueue(new WorkflowStartedEvent());
// Emit WorkflowStartedEvent directly so it is observed even if the first superstep fails
yield return new WorkflowStartedEvent();

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lokitoth I see the benefits of yielding workflow started event immediately before processing begins, but it hands control back to the user and breaks the single channel pattern by bypassing the unified event drain path at the end. Thoughts?

Copy link
Member

@lokitoth lokitoth Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That does make sense; but equally reasonable to block until the first SuperStep finishes, because if it fails with a catchable exception that will turn into a WorkflowErrorEvent.


do
{
while (this._stepRunner.HasUnprocessedMessages &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Run all available supersteps continuously
// Events are streamed out in real-time as they happen via the event handler
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
if (this._stepRunner.HasUnprocessedMessages)
{
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
// Emit WorkflowStartedEvent only when there's actual work to process
// This avoids spurious events on timeout-only loop iterations
await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false);

while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
{
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
}
}

// Update status based on what's waiting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;
Expand Down Expand Up @@ -88,4 +90,69 @@ public void AgentResponseEvent_IsWorkflowOutputEvent()
Assert.Same(response, evt.Response);
Assert.Same(response, evt.Data);
}

/// <summary>
/// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent.
/// </summary>
[Fact]
public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().NotBeEmpty();

WorkflowStartedEvent? firstStartedEvent = startedEvents.FirstOrDefault();
SuperStepStartedEvent? firstSuperStepEvent = events.OfType<SuperStepStartedEvent>().FirstOrDefault();
firstSuperStepEvent.Should().NotBeNull();

int startedIndex = events.IndexOf(firstStartedEvent!);
int superStepIndex = events.IndexOf(firstSuperStepEvent!);

startedIndex.Should().BeLessThan(superStepIndex);
}

/// <summary>
/// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode.
/// </summary>
[Fact]
public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync()
{
// Arrange
TestEchoAgent agent = new("test-agent");
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
ChatMessage inputMessage = new(ChatRole.User, "Hello");

// Act: Use Lockstep execution mode
await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}

// Assert
events.Should().NotBeEmpty();

List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
startedEvents.Should().NotBeEmpty();
}
}
Loading