Cast message to MessageType before creating StreamChunk in stream_broadcaster#311
Cast message to MessageType before creating StreamChunk in stream_broadcaster#311TheRealNeil wants to merge 1 commit intoactiveagents:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes an inconsistency in the streaming API by ensuring StreamChunk#message exposes the same typed interface as post-generation responses, so callers can use method accessors like .content/.role during streaming.
Changes:
- Cast streamed
messagevalues that areHashes throughProviders::Common::Messages::Types::MessageTypebefore constructingStreamChunk. - Preserve pass-through behavior for non-
Hashmessages.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| cast_message = message.is_a?(Hash) ? Providers::Common::Messages::Types::MessageType.new.cast(message) : message | ||
| self.stream_chunk = StreamChunk.new(cast_message, delta) |
There was a problem hiding this comment.
There are streaming callback tests, but none currently assert the new behavior that Hash messages are cast to a typed message with .content/.role during streaming. Add a test that calls stream_broadcaster with a Hash message and asserts chunk.message is cast (and that non-Hash messages still pass through unchanged) to prevent regressions.
| def stream_broadcaster | ||
| proc do |message, delta, type| | ||
| self.stream_chunk = StreamChunk.new(message, delta) | ||
| cast_message = message.is_a?(Hash) ? Providers::Common::Messages::Types::MessageType.new.cast(message) : message |
There was a problem hiding this comment.
Providers::Common::Messages::Types::MessageType.new is instantiated for every streamed chunk. Since stream_broadcaster may be called many times per request, this creates avoidable allocations; consider memoizing the MessageType instance (e.g., build it once per stream_broadcaster call and close over it, or store it in an ivar) and reuse it for each cast.
| cast_message = message.is_a?(Hash) ? Providers::Common::Messages::Types::MessageType.new.cast(message) : message | ||
| self.stream_chunk = StreamChunk.new(cast_message, delta) |
There was a problem hiding this comment.
Casting Hash messages through Providers::Common::Messages::Types::MessageType drops provider-specific keys (e.g., OpenAI Chat streaming messages can include :tool_calls, and the common MessageType slices assistant hashes down to :role, :content, :name). This is an API change for streaming callbacks that currently can inspect the full provider message hash; consider preserving the original Hash alongside the typed wrapper (or otherwise exposing raw fields) so streaming users don’t lose access to metadata/tool-call details.
| cast_message = message.is_a?(Hash) ? Providers::Common::Messages::Types::MessageType.new.cast(message) : message | |
| self.stream_chunk = StreamChunk.new(cast_message, delta) | |
| self.stream_chunk = StreamChunk.new(message, delta) |
Cast message to MessageType before creating StreamChunk in stream_broadcaster
Fixes #307
Summary
MessageTypeinstream_broadcasterso thatchunk.messagehas the same typed interface asresponse.messageRoot cause
During streaming,
StreamChunk#messagewraps the raw Hash from the provider's message stack — no type casting is applied. After generation completes,PromptResponse#messagesare cast throughTypes::MessagesTypeintoCommon::Messages::*objects with method accessors (.content,.role).This means
chunk.message.contentraisesNoMethodErrorduring streaming, whileresponse.message.contentworks fine after generation. Users have to usechunk.message[:content]during streaming butresponse.message.contentafterwards — an inconsistent API.Fix
In
stream_broadcaster, cast the message throughProviders::Common::Messages::Types::MessageTypewhen it's a Hash before passing it toStreamChunk.new. This giveschunk.messagethe same typed interface asresponse.message, so.contentand.rolework consistently in both streaming and non-streaming contexts.Test plan
chunk.message.contentworks inon_streamcallbackschunk.message.roleworks inon_streamcallbacksresponse.message.contentis unaffected