diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index 506a0d1039..72e96efb10 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -72,6 +72,9 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this.RunStatus = RunStatus.Running; runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); + // Emit WorkflowStartedEvent to the event stream for consumers + eventSink.Enqueue(new WorkflowStartedEvent()); + do { while (this._stepRunner.HasUnprocessedMessages && diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index a09dedd8ad..6278f3446b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -88,9 +88,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Run all available supersteps continuously // Events are streamed out in real-time as they happen via the event handler - while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) + if (this._stepRunner.HasUnprocessedMessages) { - await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false); + // Emit WorkflowStartedEvent only when there's actual work to process + // This avoids spurious events on timeout-only loop iterations + await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false); + + while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested) + { + await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false); + } } // Update status based on what's waiting diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs index aadef98bac..2b8c4805d1 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; +using FluentAssertions; using Microsoft.Extensions.AI; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -88,4 +90,69 @@ public void AgentResponseEvent_IsWorkflowOutputEvent() Assert.Same(response, evt.Response); Assert.Same(response, evt.Data); } + + /// + /// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent. + /// + [Fact] + public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync() + { + // Arrange + TestEchoAgent agent = new("test-agent"); + Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent); + ChatMessage inputMessage = new(ChatRole.User, "Hello"); + + // Act + await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List { inputMessage }); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + + List startedEvents = events.OfType().ToList(); + startedEvents.Should().NotBeEmpty(); + + WorkflowStartedEvent? firstStartedEvent = startedEvents.FirstOrDefault(); + SuperStepStartedEvent? firstSuperStepEvent = events.OfType().FirstOrDefault(); + firstSuperStepEvent.Should().NotBeNull(); + + int startedIndex = events.IndexOf(firstStartedEvent!); + int superStepIndex = events.IndexOf(firstSuperStepEvent!); + + startedIndex.Should().BeLessThan(superStepIndex); + } + + /// + /// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode. + /// + [Fact] + public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync() + { + // Arrange + TestEchoAgent agent = new("test-agent"); + Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent); + ChatMessage inputMessage = new(ChatRole.User, "Hello"); + + // Act: Use Lockstep execution mode + await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List { inputMessage }); + await run.TrySendMessageAsync(new TurnToken(emitEvents: true)); + + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + events.Should().NotBeEmpty(); + + List startedEvents = events.OfType().ToList(); + startedEvents.Should().NotBeEmpty(); + } }