Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/decisions/0019-python-context-compaction-strategy.md
Original file line number Diff line number Diff line change
Expand Up @@ -1240,3 +1240,10 @@ class AttributionAwareStrategy(CompactionStrategy):

- [ADR-0016: Unifying Context Management with ContextPlugin](0016-python-context-middleware.md) — Parent ADR that established `ContextProvider`, `HistoryProvider`, and `AgentSession` architecture.
- [Context Compaction Limitations Analysis](https://gist.github.com/victordibia/ec3f3baf97345f7e47da025cf55b999f) — Detailed analysis of why current architecture cannot support in-run compaction, with attempted solutions and their failure modes. Option 4 in this ADR corresponds to "Option A: Middleware Access to Mutable Message Source" from that analysis; Options 1-3 correspond to "Option B: Tool Loop Hook", adapted here to a `BaseChatClient` hook instead of `FunctionInvocationConfiguration`.

### Implementation Rollout Note

Implementation is split into two phases:

1. **Phase 1 (PR 1):** runtime compaction foundation in `agent_framework/_compaction.py`, in-run integration, and extensive core tests, plus in-run compaction samples (`basics`, `advanced`, `custom`).
2. **Phase 2 (PR 2):** history/storage compaction (`upsert`-based full replacement), provider support, storage tests, and a storage-focused sample (`storage`).
54 changes: 51 additions & 3 deletions python/packages/core/agent_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,31 @@
SupportsMCPTool,
SupportsWebSearchTool,
)
from ._compaction import (
EXCLUDE_REASON_KEY,
EXCLUDED_KEY,
GROUP_ANNOTATION_KEY,
GROUP_HAS_REASONING_KEY,
GROUP_ID_KEY,
GROUP_INDEX_KEY,
GROUP_KIND_KEY,
GROUP_TOKEN_COUNT_KEY,
SUMMARIZED_BY_SUMMARY_ID_KEY,
SUMMARY_OF_GROUP_IDS_KEY,
SUMMARY_OF_MESSAGE_IDS_KEY,
CharacterEstimatorTokenizer,
CompactionStrategy,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
SummarizationStrategy,
TokenBudgetComposedStrategy,
TokenizerProtocol,
TruncationStrategy,
annotate_message_groups,
apply_compaction,
included_messages,
included_token_count,
)
from ._mcp import MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from ._middleware import (
AgentContext,
Expand Down Expand Up @@ -191,6 +216,17 @@
"AGENT_FRAMEWORK_USER_AGENT",
"APP_INFO",
"DEFAULT_MAX_ITERATIONS",
"EXCLUDED_KEY",
"EXCLUDE_REASON_KEY",
"GROUP_ANNOTATION_KEY",
"GROUP_HAS_REASONING_KEY",
"GROUP_ID_KEY",
"GROUP_INDEX_KEY",
"GROUP_KIND_KEY",
"GROUP_TOKEN_COUNT_KEY",
"SUMMARIZED_BY_SUMMARY_ID_KEY",
"SUMMARY_OF_GROUP_IDS_KEY",
"SUMMARY_OF_MESSAGE_IDS_KEY",
"USER_AGENT_KEY",
"USER_AGENT_TELEMETRY_DISABLED_ENV_VAR",
"Agent",
Expand All @@ -205,16 +241,14 @@
"AgentResponseUpdate",
"AgentRunInputs",
"AgentSession",
"Skill",
"SkillResource",
"SkillsProvider",
"Annotation",
"BaseAgent",
"BaseChatClient",
"BaseContextProvider",
"BaseEmbeddingClient",
"BaseHistoryProvider",
"Case",
"CharacterEstimatorTokenizer",
"ChatAndFunctionMiddlewareTypes",
"ChatContext",
"ChatMiddleware",
Expand All @@ -224,6 +258,7 @@
"ChatResponse",
"ChatResponseUpdate",
"CheckpointStorage",
"CompactionStrategy",
"Content",
"ContinuationToken",
"Default",
Expand Down Expand Up @@ -270,10 +305,16 @@
"Runner",
"RunnerContext",
"SecretString",
"SelectiveToolCallCompactionStrategy",
"SessionContext",
"SingleEdgeGroup",
"Skill",
"SkillResource",
"SkillsProvider",
"SlidingWindowStrategy",
"SubWorkflowRequestMessage",
"SubWorkflowResponseMessage",
"SummarizationStrategy",
"SupportsAgentRun",
"SupportsChatGetResponse",
"SupportsCodeInterpreterTool",
Expand All @@ -286,8 +327,11 @@
"SwitchCaseEdgeGroupCase",
"SwitchCaseEdgeGroupDefault",
"TextSpanRegion",
"TokenBudgetComposedStrategy",
"TokenizerProtocol",
"ToolMode",
"ToolTypes",
"TruncationStrategy",
"TypeCompatibilityError",
"UpdateT",
"UsageDetails",
Expand All @@ -314,12 +358,16 @@
"__version__",
"add_usage_details",
"agent_middleware",
"annotate_message_groups",
"apply_compaction",
"chat_middleware",
"create_edge_runner",
"detect_media_type_from_base64",
"executor",
"function_middleware",
"handler",
"included_messages",
"included_token_count",
"load_settings",
"map_chat_to_agent_update",
"merge_chat_options",
Expand Down
15 changes: 15 additions & 0 deletions python/packages/core/agent_framework/_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from typing_extensions import Self, TypedDict # pragma: no cover

if TYPE_CHECKING:
from ._compaction import CompactionStrategy, TokenizerProtocol
from ._types import ChatOptions

logger = logging.getLogger("agent_framework")
Expand Down Expand Up @@ -649,6 +650,8 @@ def __init__(
tools: ToolTypes | Callable[..., Any] | Sequence[ToolTypes | Callable[..., Any]] | None = None,
default_options: OptionsCoT | None = None,
context_providers: Sequence[BaseContextProvider] | None = None,
compaction_strategy: CompactionStrategy | None = None,
tokenizer: TokenizerProtocol | None = None,
**kwargs: Any,
) -> None:
"""Initialize a Agent instance.
Expand All @@ -672,6 +675,8 @@ def __init__(
Note: response_format typing does not flow into run outputs when set via default_options.
These can be overridden at runtime via the ``options`` parameter of ``run()``.
tools: The tools to use for the request.
compaction_strategy: Optional in-run compaction strategy for function-calling loops.
tokenizer: Optional tokenizer for token-aware compaction strategies.
kwargs: Any additional keyword arguments. Will be stored as ``additional_properties``.
"""
opts = dict(default_options) if default_options else {}
Expand All @@ -689,6 +694,12 @@ def __init__(
**kwargs,
)
self.client = client
self.compaction_strategy = compaction_strategy or getattr(client, "compaction_strategy", None)
self.tokenizer = tokenizer or getattr(client, "tokenizer", None)
if hasattr(self.client, "compaction_strategy"):
self.client.compaction_strategy = self.compaction_strategy
if hasattr(self.client, "tokenizer"):
self.client.tokenizer = self.tokenizer

# Get tools from options or named parameter (named param takes precedence)
tools_ = tools if tools is not None else opts.pop("tools", None)
Expand Down Expand Up @@ -1379,6 +1390,8 @@ def __init__(
default_options: OptionsCoT | None = None,
context_providers: Sequence[BaseContextProvider] | None = None,
middleware: Sequence[MiddlewareTypes] | None = None,
compaction_strategy: CompactionStrategy | None = None,
tokenizer: TokenizerProtocol | None = None,
**kwargs: Any,
) -> None:
"""Initialize a Agent instance."""
Expand All @@ -1392,5 +1405,7 @@ def __init__(
default_options=default_options,
context_providers=context_providers,
middleware=middleware,
compaction_strategy=compaction_strategy,
tokenizer=tokenizer,
**kwargs,
)
82 changes: 75 additions & 7 deletions python/packages/core/agent_framework/_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

if TYPE_CHECKING:
from ._agents import Agent
from ._compaction import CompactionStrategy, TokenizerProtocol
from ._middleware import (
MiddlewareTypes,
)
Expand Down Expand Up @@ -252,7 +253,11 @@ async def _stream():
"""

OTEL_PROVIDER_NAME: ClassVar[str] = "unknown"
DEFAULT_EXCLUDE: ClassVar[set[str]] = {"additional_properties"}
DEFAULT_EXCLUDE: ClassVar[set[str]] = {
"additional_properties",
"compaction_strategy",
"tokenizer",
}
STORES_BY_DEFAULT: ClassVar[bool] = False
"""Whether this client stores conversation history server-side by default.

Expand All @@ -267,15 +272,21 @@ def __init__(
self,
*,
additional_properties: dict[str, Any] | None = None,
compaction_strategy: CompactionStrategy | None = None,
tokenizer: TokenizerProtocol | None = None,
**kwargs: Any,
) -> None:
"""Initialize a BaseChatClient instance.

Keyword Args:
additional_properties: Additional properties for the client.
compaction_strategy: Optional compaction strategy to apply before model calls.
tokenizer: Optional tokenizer used by token-aware compaction strategies.
kwargs: Additional keyword arguments (merged into additional_properties).
"""
self.additional_properties = additional_properties or {}
self.compaction_strategy = compaction_strategy
self.tokenizer = tokenizer
super().__init__(**kwargs)

def to_dict(self, *, exclude: set[str] | None = None, exclude_none: bool = True) -> dict[str, Any]:
Expand Down Expand Up @@ -334,6 +345,23 @@ def _build_response_stream(
finalizer=lambda updates: self._finalize_response_updates(updates, response_format=response_format),
)

async def _prepare_messages_for_model_call(
    self,
    messages: Sequence[Message],
) -> list[Message]:
    """Build the message list for the next model call, compacting it when configured.

    Args:
        messages: The messages intended for the next model call.

    Returns:
        A new list copied from ``messages`` when the instance has no
        ``compaction_strategy`` set; otherwise the result of running
        ``apply_compaction`` over that copy with the instance's strategy
        and tokenizer.
    """
    # Always work on a copy so the caller's sequence object is never handed onward.
    snapshot = list(messages)
    # ``getattr`` with a default tolerates instances on which the attribute was never assigned.
    active_strategy = getattr(self, "compaction_strategy", None)
    if active_strategy is not None:
        active_tokenizer = getattr(self, "tokenizer", None)
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import — confirm).
        from ._compaction import apply_compaction

        return await apply_compaction(
            snapshot,
            strategy=active_strategy,
            tokenizer=active_tokenizer,
        )
    return snapshot

# region Internal method to be implemented by derived classes

@abstractmethod
Expand Down Expand Up @@ -413,12 +441,43 @@ def get_response(
Returns:
When streaming a response stream of ChatResponseUpdates, otherwise an Awaitable ChatResponse.
"""
return self._inner_get_response(
messages=messages,
stream=stream,
options=options or {}, # type: ignore[arg-type]
**kwargs,
)
if getattr(self, "compaction_strategy", None) is None:
return self._inner_get_response(
messages=messages,
stream=stream,
options=options or {},
**kwargs,
)

if stream:

async def _get_stream() -> ResponseStream[ChatResponseUpdate, ChatResponse[Any]]:
    """Compact the captured messages, then resolve the inner call to a ResponseStream.

    Raises:
        ValueError: If neither the inner call's result nor its awaited value
            is a ``ResponseStream``.
    """
    compacted = await self._prepare_messages_for_model_call(messages)
    candidate = self._inner_get_response(
        messages=compacted,
        stream=True,
        options=options or {},
        **kwargs,
    )
    # The inner call may hand back the stream directly, or something awaitable
    # that resolves to one; anything else is rejected.
    if not isinstance(candidate, ResponseStream):
        candidate = await candidate
        if not isinstance(candidate, ResponseStream):
            raise ValueError("Streaming responses must return a ResponseStream.")
    return candidate

return ResponseStream.from_awaitable(_get_stream())

async def _get_response() -> ChatResponse[Any]:
    """Compact the captured messages, then await the non-streaming inner model call."""
    compacted = await self._prepare_messages_for_model_call(messages)
    pending = self._inner_get_response(
        messages=compacted,
        stream=False,
        options=options or {},
        **kwargs,
    )
    return await pending

return _get_response()

def service_url(self) -> str:
"""Get the URL of the service.
Expand All @@ -443,6 +502,8 @@ def as_agent(
context_providers: Sequence[Any] | None = None,
middleware: Sequence[MiddlewareTypes] | None = None,
function_invocation_configuration: FunctionInvocationConfiguration | None = None,
compaction_strategy: CompactionStrategy | None = None,
tokenizer: TokenizerProtocol | None = None,
**kwargs: Any,
) -> Agent[OptionsCoT]:
"""Create a Agent with this client.
Expand All @@ -465,6 +526,8 @@ def as_agent(
context_providers: Context providers to include during agent invocation.
middleware: List of middleware to intercept agent and function invocations.
function_invocation_configuration: Optional function invocation configuration override.
compaction_strategy: Optional in-run compaction strategy used by function-calling loops.
tokenizer: Optional tokenizer used by token-aware compaction strategies.
kwargs: Any additional keyword arguments. Will be stored as ``additional_properties``.

Returns:
Expand All @@ -490,6 +553,9 @@ def as_agent(
"""
from ._agents import Agent

strategy = getattr(self, "compaction_strategy", None) if compaction_strategy is None else compaction_strategy
resolved_tokenizer = getattr(self, "tokenizer", None) if tokenizer is None else tokenizer

return Agent(
client=self,
id=id,
Expand All @@ -501,6 +567,8 @@ def as_agent(
context_providers=context_providers,
middleware=middleware,
function_invocation_configuration=function_invocation_configuration,
compaction_strategy=strategy,
tokenizer=resolved_tokenizer,
**kwargs,
)

Expand Down
Loading