diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index c7d59d789e..fc71329a5f 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -112,9 +112,7 @@ def __init__( self._accepts_kwargs: bool = False if function is not None: sig = inspect.signature(function) - self._accepts_kwargs = any( - p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values() - ) + self._accepts_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()) class Skill: diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index 7ae9dbaa3d..fd97672d65 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2776,6 +2776,7 @@ async def __anext__(self) -> UpdateT: except StopAsyncIteration: self._consumed = True await self._run_cleanup_hooks() + await self.get_final_response() raise except Exception: await self._run_cleanup_hooks() @@ -2825,34 +2826,38 @@ async def get_final_response(self) -> FinalT: await self._get_stream() if self._inner_stream is None: raise RuntimeError("Inner stream not available") - if not self._finalized: + if not self._finalized and not self._consumed: # Consume outer stream (which delegates to inner) if not already consumed - if not self._consumed: - async for _ in self: - pass + async for _ in self: + pass - # First, finalize the inner stream and run its result hooks + # Re-check: __anext__ auto-finalization may have already finalized this stream + if not self._finalized: # This ensures inner post-processing (e.g., context provider notifications) runs - inner_stream = self._inner_stream - inner_result: Any - if inner_stream._finalizer is not None: - inner_finalizer = inner_stream._finalizer - inner_result = inner_finalizer(inner_stream._updates) - if isawaitable(inner_result): - inner_result = await inner_result + # Skip if inner stream was already finalized (e.g., via auto-finalization on iteration) + if not self._inner_stream._finalized: + inner_stream = self._inner_stream + inner_result: Any + if inner_stream._finalizer is not None: + inner_finalizer = inner_stream._finalizer + inner_result = inner_finalizer(inner_stream._updates) + if isawaitable(inner_result): + inner_result = await inner_result + else: + inner_result = list(inner_stream._updates) + + # Run inner stream's result hooks + inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks) + for hook in inner_hooks: + hooked_result = hook(inner_result) + if isawaitable(hooked_result): + hooked_result = await hooked_result + if hooked_result is not None: + inner_result = hooked_result + inner_stream._final_result = inner_result + inner_stream._finalized = True else: - inner_result = list(inner_stream._updates) - - # Run inner stream's result hooks - inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks) - for hook in inner_hooks: - hooked_result = hook(inner_result) - if isawaitable(hooked_result): - hooked_result = await hooked_result - if hooked_result is not None: - inner_result = hooked_result - inner_stream._final_result = inner_result - inner_stream._finalized = True + inner_result = self._inner_stream._final_result # Now finalize the outer stream with its own finalizer # If outer has no finalizer, use inner's result (preserves from_awaitable behavior) @@ -2877,12 +2882,12 @@ async def get_final_response(self) -> FinalT: self._finalized = True return self._final_result # type: ignore[return-value] - if not self._finalized: - if not self._consumed: - async for _ in self: - pass + if not self._finalized and not self._consumed: + async for _ in self: + pass - # Use finalizer if configured, otherwise return collected updates + # Re-check: __anext__ auto-finalization may have already finalized this stream + if not self._finalized: result: Any if self._finalizer is not None: result = self._finalizer(self._updates) diff --git a/python/packages/core/tests/core/test_agents.py b/python/packages/core/tests/core/test_agents.py index d41b87b707..b2704aa6a6 100644 --- a/python/packages/core/tests/core/test_agents.py +++ b/python/packages/core/tests/core/test_agents.py @@ -357,6 +357,40 @@ async def test_chat_client_agent_streaming_session_id_set_without_get_final_resp assert session.service_session_id == "resp_123" +async def test_chat_client_agent_streaming_session_history_saved_without_get_final_response( + chat_client_base: SupportsChatGetResponse, +) -> None: + """Test that session history is saved after streaming iteration without get_final_response(). + + Auto-finalization on iteration completion should trigger after_run providers, + persisting conversation history to the session. + """ + from agent_framework._sessions import InMemoryHistoryProvider + + chat_client_base.streaming_responses = [ + [ + ChatResponseUpdate( + contents=[Content.from_text("Hello Alice!")], + role="assistant", + response_id="resp_1", + finish_reason="stop", + ), + ] + ] + + agent = Agent(client=chat_client_base) + session = agent.create_session() + + # Only iterate — do NOT call get_final_response() + async for _ in agent.run("My name is Alice", session=session, stream=True): + pass + + chat_messages: list[Message] = session.state.get(InMemoryHistoryProvider.DEFAULT_SOURCE_ID, {}).get("messages", []) + assert len(chat_messages) == 2 + assert chat_messages[0].text == "My name is Alice" + assert chat_messages[1].text == "Hello Alice!" + + async def test_chat_client_agent_update_session_messages(client: SupportsChatGetResponse) -> None: from agent_framework._sessions import InMemoryHistoryProvider diff --git a/python/packages/core/tests/core/test_types.py b/python/packages/core/tests/core/test_types.py index 0d314c1aa5..312ab83f2e 100644 --- a/python/packages/core/tests/core/test_types.py +++ b/python/packages/core/tests/core/test_types.py @@ -2666,6 +2666,58 @@ async def test_updates_property_returns_collected(self) -> None: assert stream.updates[0].text == "update_0" assert stream.updates[1].text == "update_1" + async def test_auto_finalize_on_iteration_completion(self) -> None: + """Stream auto-finalizes when async iteration completes.""" + stream = ResponseStream(_generate_updates(2), finalizer=_combine_updates) + + async for _ in stream: + pass + + assert stream._finalized is True + assert stream._final_result is not None + assert stream._final_result.text == "update_0update_1" + + async def test_auto_finalize_runs_result_hooks(self) -> None: + """Result hooks run automatically when iteration completes.""" + hook_called = {"value": False} + + def tracking_hook(response: ChatResponse) -> ChatResponse: + hook_called["value"] = True + response.additional_properties["auto_finalized"] = True + return response + + stream = ResponseStream( + _generate_updates(2), + finalizer=_combine_updates, + result_hooks=[tracking_hook], + ) + + async for _ in stream: + pass + + assert hook_called["value"] is True + final = await stream.get_final_response() + assert final.additional_properties["auto_finalized"] is True + + async def test_get_final_response_idempotent_after_auto_finalize(self) -> None: + """get_final_response returns cached result after auto-finalization.""" + call_count = {"value": 0} + + def counting_finalizer(updates: list[ChatResponseUpdate]) -> ChatResponse: + call_count["value"] += 1 + return _combine_updates(updates) + + stream = ResponseStream(_generate_updates(2), finalizer=counting_finalizer) + + async for _ in stream: + pass + + final1 = await stream.get_final_response() + final2 = await stream.get_final_response() + + assert call_count["value"] == 1 + assert final1.text == final2.text + class TestResponseStreamTransformHooks: """Tests for transform hooks (per-update processing).""" diff --git a/python/samples/01-get-started/README.md b/python/samples/01-get-started/README.md index 5ba119e016..e1bae20b32 100644 --- a/python/samples/01-get-started/README.md +++ b/python/samples/01-get-started/README.md @@ -22,7 +22,7 @@ export AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME="gpt-4o" # optional, defaults to |---|------|-------------------| | 1 | [01_hello_agent.py](01_hello_agent.py) | Create your first agent and run it (streaming and non-streaming). | | 2 | [02_add_tools.py](02_add_tools.py) | Define a function tool with `@tool` and attach it to an agent. | -| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentThread`. | +| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentSession`. | | 4 | [04_memory.py](04_memory.py) | Add dynamic context with a custom `ContextProvider`. | | 5 | [05_first_workflow.py](05_first_workflow.py) | Chain executors into a workflow with edges. | | 6 | [06_host_your_agent.py](06_host_your_agent.py) | Host a single agent with Azure Functions. | diff --git a/python/samples/README.md b/python/samples/README.md index 1f353fbc52..fa091b78bc 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -18,7 +18,7 @@ Start with `01-get-started/` and work through the numbered files: 1. **[01_hello_agent.py](./01-get-started/01_hello_agent.py)** — Create and run your first agent 2. **[02_add_tools.py](./01-get-started/02_add_tools.py)** — Add function tools with `@tool` -3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentThread` +3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentSession` 4. **[04_memory.py](./01-get-started/04_memory.py)** — Agent memory with `ContextProvider` 5. **[05_first_workflow.py](./01-get-started/05_first_workflow.py)** — Build a workflow with executors and edges 6. **[06_host_your_agent.py](./01-get-started/06_host_your_agent.py)** — Host your agent via Azure Functions