From 6db5e3f681de43e395c9ab9bd2d493edcc4c88fe Mon Sep 17 00:00:00 2001 From: Blake Ledden Date: Mon, 23 Feb 2026 22:26:12 -0800 Subject: [PATCH] fix(streaming): preserve BetaCompactionBlock type during streaming accumulation When the beta streaming accumulator builds the final message snapshot, it reconstructs each content block by converting it to a dict and passing it through construct_type with the ParsedBetaContentBlock discriminated union. On some Python versions, the generic ParsedBetaTextBlock[T] variant in that union breaks Pydantic's discriminator resolution, causing all non-text blocks (including BetaCompactionBlock) to be incorrectly deserialized as ParsedBetaTextBlock. The fix: only text blocks need reconstruction through construct_type (to wrap them in ParsedBetaTextBlock which adds the parsed_output field). Non-text blocks are already the correct type from the event and can be appended directly to the snapshot. Fixes #1175 --- src/anthropic/lib/streaming/_beta_messages.py | 24 +++++--- .../fixtures/compaction_response.txt | 29 ++++++++++ tests/lib/streaming/test_beta_messages.py | 56 +++++++++++++++++++ 3 files changed, 102 insertions(+), 7 deletions(-) create mode 100644 tests/lib/streaming/fixtures/compaction_response.txt diff --git a/src/anthropic/lib/streaming/_beta_messages.py b/src/anthropic/lib/streaming/_beta_messages.py index c1447a8d..d638bca9 100644 --- a/src/anthropic/lib/streaming/_beta_messages.py +++ b/src/anthropic/lib/streaming/_beta_messages.py @@ -30,7 +30,7 @@ from ...types.beta import BetaRawMessageStreamEvent from ..._utils._utils import is_given from .._parse._response import ResponseFormatT, parse_text -from ...types.beta.parsed_beta_message import ParsedBetaMessage, ParsedBetaContentBlock +from ...types.beta.parsed_beta_message import ParsedBetaMessage, ParsedBetaContentBlock, ParsedBetaTextBlock class BetaMessageStream(Generic[ResponseFormatT]): @@ -475,12 +475,22 @@ def accumulate_event( if event.type == "content_block_start": # TODO: check index - current_snapshot.content.append( - cast( - Any, # Pydantic does not support generic unions at runtime - construct_type(type_=ParsedBetaContentBlock, value=event.content_block.to_dict()), - ), - ) + content_block = event.content_block + if content_block.type == "text": + # Text blocks need to be reconstructed as ParsedBetaTextBlock + # to add the parsed_output field + parsed_block = cast( + Any, + construct_type(type_=ParsedBetaTextBlock, value=content_block.to_dict()), + ) + else: + # Non-text blocks (tool_use, thinking, compaction, etc.) are already + # the correct type from the event and don't need reconstruction. + # Reconstructing through the ParsedBetaContentBlock union can fail + # on some Python versions due to the generic ParsedBetaTextBlock[T] + # variant breaking Pydantic's discriminator resolution. + parsed_block = content_block + current_snapshot.content.append(parsed_block) elif event.type == "content_block_delta": content = current_snapshot.content[event.index] if event.delta.type == "text_delta": diff --git a/tests/lib/streaming/fixtures/compaction_response.txt b/tests/lib/streaming/fixtures/compaction_response.txt new file mode 100644 index 00000000..a57f42cb --- /dev/null +++ b/tests/lib/streaming/fixtures/compaction_response.txt @@ -0,0 +1,29 @@ +event: message_start +data: {"type":"message_start","message":{"id":"msg_compaction_test_123","type":"message","role":"assistant","content":[],"model":"claude-opus-4-20250514","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":50000,"output_tokens":1}}} + +event: content_block_start +data: {"type":"content_block_start","index":0,"content_block":{"type":"compaction","content":null}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"compaction_delta","content":"Summary of the previous conversation."}} + +event: content_block_stop +data: {"type":"content_block_stop","index":0} + +event: content_block_start +data: {"type":"content_block_start","index":1,"content_block":{"type":"text","text":""}} + +event: content_block_delta +data: {"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"Here is "}} + +event: content_block_delta +data: {"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"my response."}} + +event: content_block_stop +data: {"type":"content_block_stop","index":1} + +event: message_delta +data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":20}} + +event: message_stop +data: {"type":"message_stop"} diff --git a/tests/lib/streaming/test_beta_messages.py b/tests/lib/streaming/test_beta_messages.py index c37dca9a..e7bca7bc 100644 --- a/tests/lib/streaming/test_beta_messages.py +++ b/tests/lib/streaming/test_beta_messages.py @@ -13,6 +13,7 @@ from anthropic import Anthropic, AsyncAnthropic from anthropic._compat import PYDANTIC_V1 from anthropic.types.beta.beta_message import BetaMessage +from anthropic.types.beta.beta_compaction_block import BetaCompactionBlock from anthropic.lib.streaming._beta_types import ParsedBetaMessageStreamEvent from anthropic.resources.messages.messages import DEPRECATED_MODELS from anthropic.lib.streaming._beta_messages import TRACKS_TOOL_INPUT, BetaMessageStream, BetaAsyncMessageStream @@ -373,6 +374,61 @@ async def test_incomplete_response(self, respx_mock: MockRouter) -> None: ) +class TestCompactionBlockTyping: + """Regression test for #1175: BetaCompactionBlock deserialized as ParsedBetaTextBlock.""" + + @pytest.mark.respx(base_url=base_url) + def test_compaction_block_type_in_final_message(self, respx_mock: MockRouter) -> None: + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=get_response("compaction_response.txt")) + ) + + with sync_client.beta.messages.stream( + max_tokens=1024, + messages=[{"role": "user", "content": "Hello"}], + model="claude-opus-4-20250514", + ) as stream: + message = stream.get_final_message() + + # The first content block should be a BetaCompactionBlock, not ParsedBetaTextBlock + compaction_block = message.content[0] + assert compaction_block.type == "compaction" + assert isinstance(compaction_block, BetaCompactionBlock), ( + f"Expected BetaCompactionBlock, got {type(compaction_block).__name__}" + ) + assert compaction_block.content == "Summary of the previous conversation." + + # The second content block should be a text block + text_block = message.content[1] + assert text_block.type == "text" + assert text_block.text == "Here is my response." + + @pytest.mark.asyncio + @pytest.mark.respx(base_url=base_url) + async def test_compaction_block_type_in_final_message_async(self, respx_mock: MockRouter) -> None: + respx_mock.post("/v1/messages").mock( + return_value=httpx.Response(200, content=to_async_iter(get_response("compaction_response.txt"))) + ) + + async with async_client.beta.messages.stream( + max_tokens=1024, + messages=[{"role": "user", "content": "Hello"}], + model="claude-opus-4-20250514", + ) as stream: + message = await stream.get_final_message() + + compaction_block = message.content[0] + assert compaction_block.type == "compaction" + assert isinstance(compaction_block, BetaCompactionBlock), ( + f"Expected BetaCompactionBlock, got {type(compaction_block).__name__}" + ) + assert compaction_block.content == "Summary of the previous conversation." + + text_block = message.content[1] + assert text_block.type == "text" + assert text_block.text == "Here is my response." + + @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) def test_stream_method_definition_in_sync(sync: bool) -> None: client: Anthropic | AsyncAnthropic = sync_client if sync else async_client