Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions python/packages/core/agent_framework/_skills.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ def __init__(
self._accepts_kwargs: bool = False
if function is not None:
sig = inspect.signature(function)
self._accepts_kwargs = any(
p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
)
self._accepts_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values())


class Skill:
Expand Down
63 changes: 34 additions & 29 deletions python/packages/core/agent_framework/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2776,6 +2776,7 @@ async def __anext__(self) -> UpdateT:
except StopAsyncIteration:
self._consumed = True
await self._run_cleanup_hooks()
await self.get_final_response()
raise
except Exception:
await self._run_cleanup_hooks()
Expand Down Expand Up @@ -2825,34 +2826,38 @@ async def get_final_response(self) -> FinalT:
await self._get_stream()
if self._inner_stream is None:
raise RuntimeError("Inner stream not available")
if not self._finalized:
if not self._finalized and not self._consumed:
# Consume outer stream (which delegates to inner) if not already consumed
if not self._consumed:
async for _ in self:
pass
async for _ in self:
pass

# First, finalize the inner stream and run its result hooks
# Re-check: __anext__ auto-finalization may have already finalized this stream
if not self._finalized:
# This ensures inner post-processing (e.g., context provider notifications) runs
inner_stream = self._inner_stream
inner_result: Any
if inner_stream._finalizer is not None:
inner_finalizer = inner_stream._finalizer
inner_result = inner_finalizer(inner_stream._updates)
if isawaitable(inner_result):
inner_result = await inner_result
# Skip if inner stream was already finalized (e.g., via auto-finalization on iteration)
if not self._inner_stream._finalized:
inner_stream = self._inner_stream
inner_result: Any
if inner_stream._finalizer is not None:
inner_finalizer = inner_stream._finalizer
inner_result = inner_finalizer(inner_stream._updates)
if isawaitable(inner_result):
inner_result = await inner_result
else:
inner_result = list(inner_stream._updates)

# Run inner stream's result hooks
inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks)
for hook in inner_hooks:
hooked_result = hook(inner_result)
if isawaitable(hooked_result):
hooked_result = await hooked_result
if hooked_result is not None:
inner_result = hooked_result
inner_stream._final_result = inner_result
inner_stream._finalized = True
else:
inner_result = list(inner_stream._updates)

# Run inner stream's result hooks
inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks)
for hook in inner_hooks:
hooked_result = hook(inner_result)
if isawaitable(hooked_result):
hooked_result = await hooked_result
if hooked_result is not None:
inner_result = hooked_result
inner_stream._final_result = inner_result
inner_stream._finalized = True
inner_result = self._inner_stream._final_result

# Now finalize the outer stream with its own finalizer
# If outer has no finalizer, use inner's result (preserves from_awaitable behavior)
Expand All @@ -2877,12 +2882,12 @@ async def get_final_response(self) -> FinalT:
self._finalized = True
return self._final_result # type: ignore[return-value]

if not self._finalized:
if not self._consumed:
async for _ in self:
pass
if not self._finalized and not self._consumed:
async for _ in self:
pass

# Use finalizer if configured, otherwise return collected updates
# Re-check: __anext__ auto-finalization may have already finalized this stream
if not self._finalized:
result: Any
if self._finalizer is not None:
result = self._finalizer(self._updates)
Expand Down
34 changes: 34 additions & 0 deletions python/packages/core/tests/core/test_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,40 @@ async def test_chat_client_agent_streaming_session_id_set_without_get_final_resp
assert session.service_session_id == "resp_123"


async def test_chat_client_agent_streaming_session_history_saved_without_get_final_response(
chat_client_base: SupportsChatGetResponse,
) -> None:
"""Test that session history is saved after streaming iteration without get_final_response().

Auto-finalization on iteration completion should trigger after_run providers,
persisting conversation history to the session.
"""
from agent_framework._sessions import InMemoryHistoryProvider

chat_client_base.streaming_responses = [
[
ChatResponseUpdate(
contents=[Content.from_text("Hello Alice!")],
role="assistant",
response_id="resp_1",
finish_reason="stop",
),
]
]

agent = Agent(client=chat_client_base)
session = agent.create_session()

# Only iterate — do NOT call get_final_response()
async for _ in agent.run("My name is Alice", session=session, stream=True):
pass

chat_messages: list[Message] = session.state.get(InMemoryHistoryProvider.DEFAULT_SOURCE_ID, {}).get("messages", [])
assert len(chat_messages) == 2
assert chat_messages[0].text == "My name is Alice"
assert chat_messages[1].text == "Hello Alice!"


async def test_chat_client_agent_update_session_messages(client: SupportsChatGetResponse) -> None:
from agent_framework._sessions import InMemoryHistoryProvider

Expand Down
52 changes: 52 additions & 0 deletions python/packages/core/tests/core/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,58 @@ async def test_updates_property_returns_collected(self) -> None:
assert stream.updates[0].text == "update_0"
assert stream.updates[1].text == "update_1"

async def test_auto_finalize_on_iteration_completion(self) -> None:
"""Stream auto-finalizes when async iteration completes."""
stream = ResponseStream(_generate_updates(2), finalizer=_combine_updates)

async for _ in stream:
pass

assert stream._finalized is True
assert stream._final_result is not None
assert stream._final_result.text == "update_0update_1"

async def test_auto_finalize_runs_result_hooks(self) -> None:
"""Result hooks run automatically when iteration completes."""
hook_called = {"value": False}

def tracking_hook(response: ChatResponse) -> ChatResponse:
hook_called["value"] = True
response.additional_properties["auto_finalized"] = True
return response

stream = ResponseStream(
_generate_updates(2),
finalizer=_combine_updates,
result_hooks=[tracking_hook],
)

async for _ in stream:
pass

assert hook_called["value"] is True
final = await stream.get_final_response()
assert final.additional_properties["auto_finalized"] is True

async def test_get_final_response_idempotent_after_auto_finalize(self) -> None:
"""get_final_response returns cached result after auto-finalization."""
call_count = {"value": 0}

def counting_finalizer(updates: list[ChatResponseUpdate]) -> ChatResponse:
call_count["value"] += 1
return _combine_updates(updates)

stream = ResponseStream(_generate_updates(2), finalizer=counting_finalizer)

async for _ in stream:
pass

final1 = await stream.get_final_response()
final2 = await stream.get_final_response()

assert call_count["value"] == 1
assert final1.text == final2.text


class TestResponseStreamTransformHooks:
"""Tests for transform hooks (per-update processing)."""
Expand Down
2 changes: 1 addition & 1 deletion python/samples/01-get-started/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME="gpt-4o" # optional, defaults to
|---|------|-------------------|
| 1 | [01_hello_agent.py](01_hello_agent.py) | Create your first agent and run it (streaming and non-streaming). |
| 2 | [02_add_tools.py](02_add_tools.py) | Define a function tool with `@tool` and attach it to an agent. |
| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentThread`. |
| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentSession`. |
| 4 | [04_memory.py](04_memory.py) | Add dynamic context with a custom `ContextProvider`. |
| 5 | [05_first_workflow.py](05_first_workflow.py) | Chain executors into a workflow with edges. |
| 6 | [06_host_your_agent.py](06_host_your_agent.py) | Host a single agent with Azure Functions. |
Expand Down
2 changes: 1 addition & 1 deletion python/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Start with `01-get-started/` and work through the numbered files:

1. **[01_hello_agent.py](./01-get-started/01_hello_agent.py)** — Create and run your first agent
2. **[02_add_tools.py](./01-get-started/02_add_tools.py)** — Add function tools with `@tool`
3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentThread`
3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentSession`
4. **[04_memory.py](./01-get-started/04_memory.py)** — Agent memory with `ContextProvider`
5. **[05_first_workflow.py](./01-get-started/05_first_workflow.py)** — Build a workflow with executors and edges
6. **[06_host_your_agent.py](./01-get-started/06_host_your_agent.py)** — Host your agent via Azure Functions
Expand Down
Loading