From 3d01160263caa8042f2fed087bf80e4868b55286 Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:41:53 +0200 Subject: [PATCH 1/8] feat(types): add conversation streaming config and builder - introduce typed stream callbacks and message chunks for conversation search - add decorator-based StreamConfigBuilder and wire streaming params into search --- src/typesense/types/document.py | 168 +++++++++++++++++++++++++++++++- 1 file changed, 163 insertions(+), 5 deletions(-) diff --git a/src/typesense/types/document.py b/src/typesense/types/document.py index 496432d..658b8d3 100644 --- a/src/typesense/types/document.py +++ b/src/typesense/types/document.py @@ -586,6 +586,162 @@ class NLLanguageParameters(typing.TypedDict): nl_query_debug: typing.NotRequired[bool] +class MessageChunk(typing.TypedDict): + """ + A single chunk from a conversation stream response. + + Attributes: + conversation_id (str): ID of the conversation. + message (str): Message content for this chunk. + """ + + conversation_id: str + message: str + + +class StreamConfig(typing.Generic[TDoc], typing.TypedDict, total=False): + """ + Configuration for streaming conversation search responses. + + Attributes: + on_chunk: Callback invoked for each streamed chunk (conversation_id, message). + on_complete: Callback invoked when the stream completes with the full search response. + on_error: Callback invoked if an error occurs during streaming. + """ + + on_chunk: typing.Callable[[MessageChunk], None] + on_complete: "OnCompleteCallback[TDoc]" + on_error: typing.Callable[[BaseException], None] + + +OnChunkCallback = typing.Callable[[MessageChunk], None] + + +class OnCompleteCallback(typing.Protocol[TDoc]): + def __call__(self, response: "SearchResponse[TDoc]") -> None: ... + + +OnErrorCallback = typing.Callable[[BaseException], None] + + +class StreamConfigBuilder(typing.Generic[TDoc]): + """ + Builder for StreamConfig using decorators. 
+ + Example: + >>> stream = StreamConfigBuilder() + >>> + >>> @stream.on_chunk + ... def handle_chunk(chunk: MessageChunk) -> None: + ... print(chunk["message"], end="", flush=True) + >>> + >>> @stream.on_complete + ... def handle_complete(response: dict) -> None: + ... print(f"Done! Found {response.get('found', 0)}") + >>> + >>> response = await client.collections["docs"].documents.search({ + ... "q": "query", + ... "query_by": "content", + ... "conversation_stream": True, + ... "stream_config": stream, + ... }) + """ + + def __init__(self) -> None: + """Initialize an empty StreamConfigBuilder.""" + self._on_chunk: OnChunkCallback | None = None + self._on_complete: OnCompleteCallback[TDoc] | None = None + self._on_error: OnErrorCallback | None = None + + def on_chunk(self, func: OnChunkCallback) -> OnChunkCallback: + """ + Decorator to register an on_chunk callback. + + Args: + func: Callback invoked for each streamed message chunk. + + Returns: + The original function (unmodified). + """ + self._on_chunk = func + return func + + def on_complete(self, func: OnCompleteCallback[TDoc]) -> OnCompleteCallback[TDoc]: + """ + Decorator to register an on_complete callback. + + Args: + func: Callback invoked when streaming completes with the full response. + + Returns: + The original function (unmodified). + """ + self._on_complete = func + return func + + def on_error(self, func: OnErrorCallback) -> OnErrorCallback: + """ + Decorator to register an on_error callback. + + Args: + func: Callback invoked if an error occurs during streaming. + + Returns: + The original function (unmodified). + """ + self._on_error = func + return func + + def build(self) -> StreamConfig[TDoc]: + """ + Build the StreamConfig dictionary. + + Returns: + A StreamConfig with the registered callbacks. 
+ """ + config: StreamConfig[TDoc] = {} + if self._on_chunk is not None: + config["on_chunk"] = self._on_chunk + if self._on_complete is not None: + config["on_complete"] = self._on_complete + if self._on_error is not None: + config["on_error"] = self._on_error + return config + + def get( + self, + key: typing.Literal["on_chunk", "on_complete", "on_error"], + default: typing.Callable[..., None] | None = None, + ) -> typing.Callable[..., None] | None: + """ + Get a callback by key (for compatibility with dict-like access). + + Args: + key: The callback name ('on_chunk', 'on_complete', or 'on_error'). + default: Default value if the callback is not set. + + Returns: + The callback function or the default value. + """ + return self.build().get(key, default) + + +class ConversationStreamParameters(typing.Generic[TDoc], typing.TypedDict): + """ + Parameters for conversational search streaming. + + Attributes: + conversation_stream (bool): When true, the search response is streamed (SSE). + stream_config: Callbacks for stream events. Not sent to the API. + Can be a StreamConfig dict or a StreamConfigBuilder instance. + """ + + conversation_stream: typing.NotRequired[bool] + stream_config: typing.NotRequired[ + typing.Union[StreamConfig[TDoc], StreamConfigBuilder[TDoc]] + ] + + class SearchParameters( RequiredSearchParameters, QueryParameters, @@ -598,11 +754,13 @@ class SearchParameters( TypoToleranceParameters, CachingParameters, NLLanguageParameters, + ConversationStreamParameters[TDoc], + typing.Generic[TDoc], ): """Parameters for searching documents.""" -class MultiSearchParameters(SearchParameters): +class MultiSearchParameters(SearchParameters[TDoc], typing.Generic[TDoc]): """ Parameters for performing a [Federated/Multi-Search](https://typesense.org/docs/26.0/api/federated-multi-search.html#federated-multi-search). 
@@ -856,7 +1014,7 @@ class LLMResponse(typing.TypedDict): model: str -class ParsedNLQuery(typing.TypedDict): +class ParsedNLQuery(typing.Generic[TDoc], typing.TypedDict): """ Schema for a parsed natural language query. @@ -868,8 +1026,8 @@ class ParsedNLQuery(typing.TypedDict): """ parse_time_ms: int - generated_params: SearchParameters - augmented_params: SearchParameters + generated_params: SearchParameters[TDoc] + augmented_params: SearchParameters[TDoc] llm_response: typing.NotRequired[LLMResponse] @@ -901,7 +1059,7 @@ class SearchResponse(typing.Generic[TDoc], typing.TypedDict): hits: typing.List[Hit[TDoc]] grouped_hits: typing.NotRequired[typing.List[GroupedHit[TDoc]]] conversation: typing.NotRequired[Conversation] - parsed_nl_query: typing.NotRequired[ParsedNLQuery] + parsed_nl_query: typing.NotRequired[ParsedNLQuery[TDoc]] class DeleteSingleDocumentParameters(typing.TypedDict): From 9fc38b5a96fe6c21f4855e9f1245c3e976d9c81b Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:42:47 +0200 Subject: [PATCH 2/8] feat(streaming): add sse stream parsing and chunk combiner - parse conversation stream sse lines into message chunks or search responses - combine streamed chunks into a final search response for async calls --- src/typesense/stream_handlers.py | 164 +++++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 src/typesense/stream_handlers.py diff --git a/src/typesense/stream_handlers.py b/src/typesense/stream_handlers.py new file mode 100644 index 0000000..45e38d5 --- /dev/null +++ b/src/typesense/stream_handlers.py @@ -0,0 +1,164 @@ +""" +SSE stream parsing and chunk combining for conversation search streaming. + +This module contains pure logic for parsing server-sent event lines from +conversation_stream responses and combining message chunks into a final +search response. Used by async API calls. 
+""" + +import json +import sys + +if sys.version_info >= (3, 11): + import typing +else: + import typing_extensions as typing + +from typesense.types.document import MessageChunk + +JSONPrimitive: typing.TypeAlias = typing.Union[str, int, float, bool, None] +JSONValue: typing.TypeAlias = typing.Union[ + JSONPrimitive, typing.Dict[str, "JSONValue"], typing.List["JSONValue"] +] +JSONDict: typing.TypeAlias = typing.Dict[str, JSONValue] + +_SEARCH_RESPONSE_KEYS = frozenset( + {"results", "found", "hits", "page", "search_time_ms"} +) + +StreamChunk: typing.TypeAlias = typing.Union[MessageChunk, JSONDict] + + +def parse_sse_line(line: str) -> typing.Optional[StreamChunk]: + """ + Parse a single SSE line into a MessageChunk, search response dict, or None. + + Handles: + - Empty lines and "data: [DONE]" -> None + - "data: {...}" -> parse JSON, return MessageChunk or search response + - Raw JSON line starting with "{" -> same + - Plain text -> return chunk with conversation_id="unknown", message=line + + Returns: + MessageChunk for conversation chunks, dict for search responses, or None to skip. 
+ """ + line = line.strip() + if not line or line == "data: [DONE]": + return None + + # SSE format: "data: {...}" + if line.startswith("data:"): + content = line[5:].strip() + return _parse_data_content(content) + + # Raw JSON + if line.startswith("{"): + return _parse_json_content(line) + + return _chunk_from_text(line) + + +def _parse_data_content(content: str) -> typing.Optional[StreamChunk]: + """Parse the content after 'data:' into a MessageChunk, search response, or None.""" + if not content: + return None + if content.startswith("{"): + return _parse_json_content(content) + return _chunk_from_text(content) + + +def _parse_json_content(raw: str) -> StreamChunk: + """Parse a JSON string into a MessageChunk or search response dict.""" + try: + data = json.loads(raw) + except json.JSONDecodeError: + return _chunk_from_text(raw) + if not isinstance(data, dict): + return _chunk_from_text(json.dumps(data)) + + parsed = typing.cast(JSONDict, data) + conversation_id = parsed.get("conversation_id") + message = parsed.get("message") + nested_conversation = parsed.get("conversation") + + if conversation_id is None or message is None: + if isinstance(nested_conversation, dict): + nested_conversation_id = nested_conversation.get("conversation_id") + nested_message = nested_conversation.get("message") + if conversation_id is None and nested_conversation_id is not None: + conversation_id = nested_conversation_id + if message is None and nested_message is not None: + message = nested_message + + if conversation_id is None: + parsed["conversation_id"] = "unknown" + elif not isinstance(conversation_id, str): + parsed["conversation_id"] = str(conversation_id) + else: + parsed["conversation_id"] = conversation_id + + if message is None: + parsed["message"] = "" + elif not isinstance(message, str): + parsed["message"] = str(message) + else: + parsed["message"] = message + + return parsed + + +def _is_search_response_dict(data: typing.Mapping[str, JSONValue]) -> bool: + """Check 
if a dict is a search response (has found, hits, results, etc.).""" + return bool(set(data.keys()) & _SEARCH_RESPONSE_KEYS) + + +def is_message_chunk(chunk: JSONValue) -> bool: + """Return True if chunk is a conversation message chunk (has conversation_id and message).""" + if not isinstance(chunk, dict): + return False + if "message" not in chunk or "conversation_id" not in chunk: + return False + return not _is_search_response_dict(chunk) + + +def is_complete_search_response(chunk: JSONValue) -> bool: + """Return True if chunk looks like a full search response (has hits, found, etc.).""" + if not isinstance(chunk, dict) or not chunk: + return False + keys = set(chunk.keys()) + return bool(keys & _SEARCH_RESPONSE_KEYS) + + +def combine_stream_chunks( + chunks: typing.Sequence[StreamChunk], +) -> JSONDict: + """ + Combine streamed chunks into a single search response. + + - If no chunks, return empty dict. + - If one chunk, return it. + - If we have message chunks (conversation_id + message), find the metadata + chunk (complete search response) and return it; otherwise return last chunk + if it is complete. + - For regular search streams, return the last chunk if it is a complete response. 
+ """ + if not chunks: + return {} + if len(chunks) == 1: + return typing.cast(JSONDict, chunks[0]) + + message_chunks = [c for c in chunks if is_message_chunk(c)] + if message_chunks: + for chunk in chunks: + if is_complete_search_response(chunk): + return typing.cast(JSONDict, chunk) + return typing.cast(JSONDict, chunks[-1]) + + last = chunks[-1] + if is_complete_search_response(last): + return typing.cast(JSONDict, last) + return typing.cast(JSONDict, last) + + +def _chunk_from_text(text: str) -> MessageChunk: + return {"conversation_id": "unknown", "message": text} From b414c77a5189db0d89977d4903cb0203583235b5 Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:54:36 +0200 Subject: [PATCH 3/8] feat(async): support streaming conversation search over sse - add async sse handling with chunk parsing, callbacks, and final response combine - wire stream_config and conversation_stream through async search api --- src/typesense/async_/api_call.py | 103 ++++++++++++++++++++++++++++++ src/typesense/async_/documents.py | 12 +++- 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/src/typesense/async_/api_call.py b/src/typesense/async_/api_call.py index 0953310..8ce20d9 100644 --- a/src/typesense/async_/api_call.py +++ b/src/typesense/async_/api_call.py @@ -31,6 +31,7 @@ by other components of the library. 
""" +import json import sys from types import MappingProxyType, TracebackType @@ -51,6 +52,14 @@ ) from typesense.node_manager import NodeManager from typesense.request_handler import RequestHandler +from typesense.stream_handlers import ( + JSONDict, + StreamChunk, + combine_stream_chunks, + is_message_chunk, + parse_sse_line, +) +from typesense.types.document import StreamConfig if sys.version_info >= (3, 11): import typing @@ -186,6 +195,8 @@ async def get( entity_type: typing.Type[TEntityDict], as_json: typing.Literal[False], params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> str: """ Execute an async GET request to the Typesense API. @@ -207,6 +218,8 @@ async def get( entity_type: typing.Type[TEntityDict], as_json: typing.Literal[True] = True, params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> TEntityDict: """ Execute an async GET request to the Typesense API. @@ -227,6 +240,8 @@ async def get( entity_type: typing.Type[TEntityDict], as_json: typing.Union[typing.Literal[True], typing.Literal[False]] = True, params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> typing.Union[TEntityDict, str]: """ Execute an async GET request to the Typesense API. 
@@ -246,6 +261,8 @@ async def get( entity_type, as_json, params=params, + stream_config=stream_config, + is_streaming_request=is_streaming_request, ) @typing.overload @@ -414,6 +431,8 @@ async def _execute_request( as_json: typing.Literal[True], last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> TEntityDict: """Execute an async request with retry logic.""" @@ -427,6 +446,8 @@ async def _execute_request( as_json: typing.Literal[False], last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> str: """Execute an async request with retry logic.""" @@ -439,6 +460,8 @@ async def _execute_request( as_json: typing.Union[typing.Literal[True], typing.Literal[False]] = True, last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> typing.Union[TEntityDict, str]: """ @@ -470,6 +493,10 @@ async def _execute_request( node, url, request_kwargs = self._prepare_request_params(endpoint, **kwargs) try: + if is_streaming_request and method == "GET": + return await self._handle_streaming_get( + url, entity_type, stream_config, **request_kwargs + ) return await self._make_request_and_process_response( method, url, @@ -479,6 +506,13 @@ async def _execute_request( ) except _SERVER_ERRORS as server_error: self.node_manager.set_node_health(node, is_healthy=False) + if is_streaming_request and stream_config: + on_error = stream_config.get("on_error") + if on_error: + try: + on_error(server_error) + except Exception: + pass return await 
self._execute_request( method, endpoint, @@ -486,6 +520,8 @@ async def _execute_request( as_json, last_exception=server_error, num_retries=num_retries + 1, + stream_config=stream_config, + is_streaming_request=is_streaming_request, **kwargs, ) @@ -516,6 +552,73 @@ async def _make_request_and_process_response( else typing.cast(str, request_response) ) + async def _handle_streaming_get( + self, + url: str, + entity_type: typing.Type[TEntityDict], + stream_config: StreamConfig[TEntityDict] | None, + **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], + ) -> TEntityDict: + """Perform an async streaming GET, parse SSE lines, invoke callbacks, return combined result.""" + headers: typing.Dict[str, str] = { + self.request_handler.api_key_header_name: self.config.api_key, + "Accept": "text/event-stream", + } + headers.update(self.config.additional_headers) + extra_headers = kwargs.get("headers") + if extra_headers: + headers.update(extra_headers) + + params = kwargs.get("params") + content: typing.Union[str, bytes, None] = None + if body := kwargs.get("data"): + if isinstance(body, (str, bytes)): + content = body + else: + content = json.dumps(body) + + all_chunks: typing.List[StreamChunk] = [] + async with self._client.stream( + "GET", + url, + params=params, + content=content, + headers=headers, + timeout=self.config.connection_timeout_seconds, + ) as response: + if response.status_code < 200 or response.status_code >= 300: + await response.aread() + error_message = self.request_handler._get_error_message(response) + raise self.request_handler._get_exception(response.status_code)( + response.status_code, + error_message, + ) + async for line in response.aiter_lines(): + chunk = parse_sse_line(line) + if chunk is not None: + all_chunks.append(chunk) + if stream_config and is_message_chunk(chunk): + on_chunk = stream_config.get("on_chunk") + if on_chunk: + try: + on_chunk(chunk) + except Exception: + pass + + self.node_manager.set_node_health( + 
self.node_manager.get_node(), + is_healthy=True, + ) + final: JSONDict = combine_stream_chunks(all_chunks) + if stream_config: + on_complete = stream_config.get("on_complete") + if on_complete: + try: + on_complete(typing.cast(TEntityDict, final)) + except Exception: + pass + return typing.cast(TEntityDict, final) + def _prepare_request_params( self, endpoint: str, diff --git a/src/typesense/async_/documents.py b/src/typesense/async_/documents.py index 8228762..a2d6144 100644 --- a/src/typesense/async_/documents.py +++ b/src/typesense/async_/documents.py @@ -43,6 +43,7 @@ ImportResponseWithId, SearchParameters, SearchResponse, + StreamConfigBuilder, UpdateByFilterParameters, UpdateByFilterResponse, ) @@ -333,16 +334,25 @@ async def search(self, search_parameters: SearchParameters) -> SearchResponse[TD Args: search_parameters (SearchParameters): The search parameters. + Use conversation_stream=True and optionally stream_config (on_chunk, + on_complete, on_error) for conversational search streaming. Returns: SearchResponse[TDoc]: The search response containing matching documents. 
""" - stringified_search_params = stringify_search_params(search_parameters) + params_for_api = dict(search_parameters) + stream_config = params_for_api.pop("stream_config", None) + if isinstance(stream_config, StreamConfigBuilder): + stream_config = stream_config.build() + conversation_stream = params_for_api.get("conversation_stream") is True + stringified_search_params = stringify_search_params(params_for_api) response: SearchResponse[TDoc] = await self.api_call.get( self._endpoint_path("search"), params=stringified_search_params, entity_type=SearchResponse, as_json=True, + stream_config=stream_config, + is_streaming_request=conversation_stream, ) return response From 88dd13f363a46fea586da58e7feee82eecd483db Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:46:55 +0200 Subject: [PATCH 4/8] test(streaming): add fixtures for async and sync stream handling - provide fake sse stream responses and contexts for unit tests - add integration fixtures for conversational streaming collections and docs --- tests/fixtures/streaming_fixtures.py | 180 +++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 tests/fixtures/streaming_fixtures.py diff --git a/tests/fixtures/streaming_fixtures.py b/tests/fixtures/streaming_fixtures.py new file mode 100644 index 0000000..4df0d52 --- /dev/null +++ b/tests/fixtures/streaming_fixtures.py @@ -0,0 +1,180 @@ +"""Fixtures for streaming tests.""" + +import json +import os +import sys +from types import TracebackType + +import pytest +import requests + +if sys.version_info >= (3, 11): + import typing +else: + import typing_extensions as typing + + +JSONPrimitive: typing.TypeAlias = typing.Union[str, int, float, bool, None] +JSONValue: typing.TypeAlias = typing.Union[ + JSONPrimitive, typing.Dict[str, "JSONValue"], typing.List["JSONValue"] +] +JSONDict: typing.TypeAlias = typing.Dict[str, JSONValue] + + +class FakeAsyncStreamResponse: + """Minimal async streaming response for 
httpx.AsyncClient.stream().""" + + def __init__( + self, + *, + lines: typing.Sequence[str], + status_code: int = 200, + headers: typing.Mapping[str, str] | None = None, + text: str = "", + ) -> None: + self.status_code = status_code + self._lines = list(lines) + self.headers = dict(headers or {}) + self.text = text + + async def aiter_lines(self) -> typing.AsyncIterator[str]: + for line in self._lines: + yield line + + async def aread(self) -> bytes: + return self.text.encode() + + def json(self) -> JSONDict: + return typing.cast(JSONDict, json.loads(self.text)) + + +class FakeAsyncStreamContext: + """Async context manager that yields a fake streaming response.""" + + def __init__(self, response: FakeAsyncStreamResponse) -> None: + self._response = response + + async def __aenter__(self) -> FakeAsyncStreamResponse: + return self._response + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + return None + + +class FakeStreamResponse: + """Minimal streaming response for httpx.Client.stream().""" + + def __init__( + self, + *, + lines: typing.Sequence[str], + status_code: int = 200, + headers: typing.Mapping[str, str] | None = None, + text: str = "", + ) -> None: + self.status_code = status_code + self._lines = list(lines) + self.headers = dict(headers or {}) + self.text = text + + def iter_lines(self) -> typing.Iterator[str]: + for line in self._lines: + yield line + + def read(self) -> bytes: + return self.text.encode() + + def json(self) -> JSONDict: + return typing.cast(JSONDict, json.loads(self.text)) + + +class FakeStreamContext: + """Sync context manager that yields a fake streaming response.""" + + def __init__(self, response: FakeStreamResponse) -> None: + self._response = response + + def __enter__(self) -> FakeStreamResponse: + return self._response + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: 
TracebackType | None, + ) -> None: + return None + + +@pytest.fixture(name="stream_response_async") +def stream_response_async_fixture() -> type[FakeAsyncStreamResponse]: + return FakeAsyncStreamResponse + + +@pytest.fixture(name="stream_context_async") +def stream_context_async_fixture() -> type[FakeAsyncStreamContext]: + return FakeAsyncStreamContext + + +@pytest.fixture(name="stream_response") +def stream_response_fixture() -> type[FakeStreamResponse]: + return FakeStreamResponse + + +@pytest.fixture(name="stream_context") +def stream_context_fixture() -> type[FakeStreamContext]: + return FakeStreamContext + + +@pytest.fixture(name="create_streaming_collection") +def create_streaming_collection_fixture(delete_all: None) -> str: + """Create a collection for streaming tests with an auto-embedding field.""" + open_ai_key = os.environ.get("OPEN_AI_KEY") + if not open_ai_key: + pytest.skip("OPEN_AI_KEY is required for streaming integration tests.") + url = "http://localhost:8108/collections" + headers = {"X-TYPESENSE-API-KEY": "xyz"} + collection_data = { + "name": "streaming_docs", + "fields": [ + { + "name": "title", + "type": "string", + }, + { + "name": "embedding", + "type": "float[]", + "embed": { + "from": ["title"], + "model_config": { + "model_name": "openai/text-embedding-3-small", + "api_key": open_ai_key, + }, + }, + }, + ], + } + + response = requests.post(url, headers=headers, json=collection_data, timeout=3) + response.raise_for_status() + return "streaming_docs" + + +@pytest.fixture(name="create_streaming_document") +def create_streaming_document_fixture(create_streaming_collection: str) -> str: + """Create a document for streaming tests.""" + url = "http://localhost:8108/collections/streaming_docs/documents" + headers = {"X-TYPESENSE-API-KEY": "xyz"} + document_data = { + "id": "stream-1", + "title": "Company profile", + } + + response = requests.post(url, headers=headers, json=document_data, timeout=3) + response.raise_for_status() + return 
"stream-1" From 718368f5c25d5785e9ea9124ba6d69e03db7abc0 Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:49:09 +0200 Subject: [PATCH 5/8] fix(utils): fix prefix in sync client --- utils/run-unasync.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/run-unasync.py b/utils/run-unasync.py index 49feabe..b4fba74 100644 --- a/utils/run-unasync.py +++ b/utils/run-unasync.py @@ -23,6 +23,8 @@ def collect_class_replacements(source_dir: Path) -> dict[str, str]: async_name = match.group(1) replacements[async_name] = async_name[len("Async") :] replacements["aclose"] = "close" + replacements["aiter_lines"] = "iter_lines" + replacements["aread"] = "read" return replacements From 0948ef2c4faff063016907d8b900b44924ae00ed Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:47:44 +0200 Subject: [PATCH 6/8] feat(sync): generate sync client for streaming responses --- src/typesense/sync/api_call.py | 103 ++++++++++++++++++++++++++++++++ src/typesense/sync/documents.py | 12 +++- 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/src/typesense/sync/api_call.py b/src/typesense/sync/api_call.py index a24ce6a..8557aa2 100644 --- a/src/typesense/sync/api_call.py +++ b/src/typesense/sync/api_call.py @@ -31,6 +31,7 @@ by other components of the library. 
""" +import json import sys from types import MappingProxyType, TracebackType @@ -51,6 +52,14 @@ ) from typesense.node_manager import NodeManager from typesense.request_handler import RequestHandler +from typesense.stream_handlers import ( + JSONDict, + StreamChunk, + combine_stream_chunks, + is_message_chunk, + parse_sse_line, +) +from typesense.types.document import StreamConfig if sys.version_info >= (3, 11): import typing @@ -186,6 +195,8 @@ def get( entity_type: typing.Type[TEntityDict], as_json: typing.Literal[False], params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> str: """ Execute an async GET request to the Typesense API. @@ -207,6 +218,8 @@ def get( entity_type: typing.Type[TEntityDict], as_json: typing.Literal[True] = True, params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> TEntityDict: """ Execute an async GET request to the Typesense API. @@ -227,6 +240,8 @@ def get( entity_type: typing.Type[TEntityDict], as_json: typing.Union[typing.Literal[True], typing.Literal[False]] = True, params: typing.Union[TParams, None] = None, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, ) -> typing.Union[TEntityDict, str]: """ Execute an async GET request to the Typesense API. 
@@ -246,6 +261,8 @@ def get( entity_type, as_json, params=params, + stream_config=stream_config, + is_streaming_request=is_streaming_request, ) @typing.overload @@ -414,6 +431,8 @@ def _execute_request( as_json: typing.Literal[True], last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> TEntityDict: """Execute an async request with retry logic.""" @@ -427,6 +446,8 @@ def _execute_request( as_json: typing.Literal[False], last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> str: """Execute an async request with retry logic.""" @@ -439,6 +460,8 @@ def _execute_request( as_json: typing.Union[typing.Literal[True], typing.Literal[False]] = True, last_exception: typing.Union[None, Exception] = None, num_retries: int = 0, + stream_config: StreamConfig[TEntityDict] | None = None, + is_streaming_request: bool = False, **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]], ) -> typing.Union[TEntityDict, str]: """ @@ -470,6 +493,10 @@ def _execute_request( node, url, request_kwargs = self._prepare_request_params(endpoint, **kwargs) try: + if is_streaming_request and method == "GET": + return self._handle_streaming_get( + url, entity_type, stream_config, **request_kwargs + ) return self._make_request_and_process_response( method, url, @@ -479,6 +506,13 @@ def _execute_request( ) except _SERVER_ERRORS as server_error: self.node_manager.set_node_health(node, is_healthy=False) + if is_streaming_request and stream_config: + on_error = stream_config.get("on_error") + if on_error: + try: + on_error(server_error) + except Exception: + pass return self._execute_request( method, endpoint, @@ -486,6 +520,8 @@ 
def _handle_streaming_get(
    self,
    url: str,
    entity_type: typing.Type[TEntityDict],
    stream_config: StreamConfig[TEntityDict] | None,
    **kwargs: typing.Unpack[SessionFunctionKwargs[TParams, TBody]],
) -> TEntityDict:
    """Perform a streaming GET request and return the combined final result.

    Sends the request with an SSE ``Accept`` header, parses every streamed
    line into a chunk, fires the user's ``on_chunk`` callback for message
    chunks, and finally merges all chunks into one response dict, invoking
    ``on_complete`` before returning it.

    Args:
        url: Fully qualified request URL.
        entity_type: Expected response type (used for typing only).
        stream_config: Optional user callbacks (``on_chunk``/``on_complete``).
        **kwargs: Session kwargs; ``params``, ``data`` and ``headers`` are
            forwarded to the underlying HTTP client.

    Returns:
        The combined stream result, cast to ``TEntityDict``.

    Raises:
        The exception mapped from the HTTP status when the response is not 2xx.
    """
    # Header precedence: auth + SSE accept < config-level extras < per-call.
    request_headers: typing.Dict[str, str] = {
        self.request_handler.api_key_header_name: self.config.api_key,
        "Accept": "text/event-stream",
    }
    request_headers.update(self.config.additional_headers)
    caller_headers = kwargs.get("headers")
    if caller_headers:
        request_headers.update(caller_headers)

    query_params = kwargs.get("params")

    # Serialize a body only when one was supplied; str/bytes pass through,
    # anything else is JSON-encoded.
    payload: typing.Union[str, bytes, None] = None
    raw_body = kwargs.get("data")
    if raw_body:
        payload = raw_body if isinstance(raw_body, (str, bytes)) else json.dumps(raw_body)

    collected: typing.List[StreamChunk] = []
    with self._client.stream(
        "GET",
        url,
        params=query_params,
        content=payload,
        headers=request_headers,
        timeout=self.config.connection_timeout_seconds,
    ) as response:
        if not 200 <= response.status_code < 300:
            # Drain the body so the error payload can be parsed.
            response.read()
            message = self.request_handler._get_error_message(response)
            error_cls = self.request_handler._get_exception(response.status_code)
            raise error_cls(response.status_code, message)
        for raw_line in response.iter_lines():
            parsed = parse_sse_line(raw_line)
            if parsed is None:
                continue
            collected.append(parsed)
            if stream_config and is_message_chunk(parsed):
                notify = stream_config.get("on_chunk")
                if notify:
                    try:
                        notify(parsed)
                    except Exception:
                        # Callbacks are best-effort: a user error must not
                        # abort the stream.
                        pass

    self.node_manager.set_node_health(
        self.node_manager.get_node(),
        is_healthy=True,
    )
    combined: JSONDict = combine_stream_chunks(collected)
    if stream_config:
        finish = stream_config.get("on_complete")
        if finish:
            try:
                finish(typing.cast(TEntityDict, combined))
            except Exception:
                # Same best-effort policy as on_chunk.
                pass
    return typing.cast(TEntityDict, combined)
def search(self, search_parameters: SearchParameters) -> SearchResponse[TDoc]:
    """
    Search for documents in the collection.

    Args:
        search_parameters (SearchParameters): The search parameters.
            Use conversation_stream=True and optionally stream_config (on_chunk,
            on_complete, on_error) for conversational search streaming.

    Returns:
        SearchResponse[TDoc]: The search response containing matching documents.
    """
    # Work on a copy: stream_config is client-side only and must never be
    # serialized into the query string sent to the server.
    api_params = dict(search_parameters)
    callbacks = api_params.pop("stream_config", None)
    if isinstance(callbacks, StreamConfigBuilder):
        callbacks = callbacks.build()
    # Streaming is opted into explicitly with conversation_stream=True.
    wants_stream = api_params.get("conversation_stream") is True
    return self.api_call.get(
        self._endpoint_path("search"),
        params=stringify_search_params(api_params),
        entity_type=SearchResponse,
        as_json=True,
        stream_config=callbacks,
        is_streaming_request=wants_stream,
    )
async def test_streaming_search_invokes_on_chunk_async(
    fake_async_documents: AsyncDocuments[DocumentSchema],
    stream_response_async: type[FakeAsyncStreamResponse],
    stream_context_async: type[FakeAsyncStreamContext],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """on_chunk must fire once per streamed message chunk (async client)."""
    seen: typing.List[MessageChunk] = []

    # seen.append matches the on_chunk signature directly.
    config: StreamConfig[DocumentSchema] = {"on_chunk": seen.append}

    streamed_lines = [
        'data: {"conversation_id":"123","message":"First chunk"}',
        'data: {"conversation_id":"123","message":"Second chunk"}',
        '{"found": 2, "hits": [], "page": 1, "search_time_ms": 10}',
    ]
    fake_response = stream_response_async(lines=streamed_lines)

    def stub_stream(
        method: str,
        url: str,
        params: typing.Mapping[str, str] | None = None,
        content: str | bytes | None = None,
        headers: typing.Mapping[str, str] | None = None,
        timeout: float | None = None,
    ) -> FakeAsyncStreamContext:
        return stream_context_async(fake_response)

    monkeypatch.setattr(
        fake_async_documents.api_call._client,
        "stream",
        stub_stream,
    )

    outcome = await fake_async_documents.search(
        {
            "q": "test query",
            "query_by": "title",
            "conversation_stream": True,
            "stream_config": config,
        }
    )

    assert [chunk["message"] for chunk in seen] == ["First chunk", "Second chunk"]
    assert outcome["found"] == 2
stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """JSON lines without conversation_id/message should use defaults.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 'data: {"foo":"bar"}', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 5}', + ] + response = stream_response_async(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + return stream_context_async(response) + + monkeypatch.setattr( + fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + assert chunks_received[0]["conversation_id"] == "unknown" + assert chunks_received[0]["message"] == "" + + +async def test_streaming_search_skips_done_marker_async( + fake_async_documents: AsyncDocuments[DocumentSchema], + stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """data: [DONE] lines should be ignored.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 'data: {"conversation_id":"123","message":"Chunk"}', + "data: [DONE]", + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response_async(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = 
None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + return stream_context_async(response) + + monkeypatch.setattr( + fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + + +async def test_streaming_search_handles_json_array_lines_async( + fake_async_documents: AsyncDocuments[DocumentSchema], + stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """JSON arrays should be treated as plain text message chunks.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 'data: ["a", "b"]', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response_async(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + return stream_context_async(response) + + monkeypatch.setattr( + fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + assert chunks_received[0]["message"] == '["a", "b"]' + + +async def test_streaming_search_supports_builder_async( + fake_async_documents: AsyncDocuments[DocumentSchema], + stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, 
+) -> None: + """Test StreamConfigBuilder for streaming callbacks.""" + complete_calls: typing.List[int] = [] + + stream = StreamConfigBuilder() + + @stream.on_complete + def on_complete(response: typing.Mapping[str, JSONValue]) -> None: + found = response.get("found") + if isinstance(found, int): + complete_calls.append(found) + + sse_lines = [ + 'data: {"conversation_id":"123","message":"Hello"}', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 5}', + ] + response = stream_response_async(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + return stream_context_async(response) + + monkeypatch.setattr( + fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": stream, + } + ) + + assert complete_calls == [1] + + +async def test_stream_config_not_sent_to_api_async( + fake_async_documents: AsyncDocuments[DocumentSchema], + stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that stream_config is removed from API params.""" + captured_params: typing.Dict[str, str] = {} + + sse_lines = [ + '{"found": 0, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response_async(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + if params: + captured_params.update(params) + return stream_context_async(response) + + monkeypatch.setattr( + 
fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + stream_config: StreamConfig[DocumentSchema] = {"on_chunk": lambda _: None} + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": stream_config, + } + ) + + assert "stream_config" not in captured_params + assert captured_params.get("conversation_stream") == "true" + + +async def test_streaming_search_invokes_on_error_async( + fake_async_documents: AsyncDocuments[DocumentSchema], + stream_response_async: type[FakeAsyncStreamResponse], + stream_context_async: type[FakeAsyncStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that streaming search invokes on_error for request failures.""" + errors: typing.List[BaseException] = [] + + def on_error(error: BaseException) -> None: + errors.append(error) + + fake_async_documents.api_call.config.num_retries = 0 + + response = stream_response_async( + lines=[], + status_code=500, + headers={"Content-Type": "application/json"}, + text='{"message": "Server error"}', + ) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeAsyncStreamContext: + return stream_context_async(response) + + monkeypatch.setattr( + fake_async_documents.api_call._client, + "stream", + fake_stream, + ) + + with pytest.raises(ServerError): + await fake_async_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_error": on_error}, + } + ) + + assert len(errors) == 1 + assert isinstance(errors[0], ServerError) + + +@pytest.mark.open_ai +async def test_actual_streaming_search_async( + actual_async_api_call: AsyncApiCall, + create_streaming_collection: str, + create_streaming_document: str, + create_conversations_model: str, +) -> None: + """Integration test 
against a real Typesense server with conversation streaming.""" + actual_async_documents = AsyncDocuments( + actual_async_api_call, + create_streaming_collection, + ) + chunks_received: typing.List[MessageChunk] = [] + complete_called: typing.List[bool] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + def on_complete(response: typing.Mapping[str, JSONValue]) -> None: + complete_called.append(True) + + response = await actual_async_documents.search( + { + "q": "What is this document about?", + "query_by": "embedding", + "conversation": True, + "conversation_stream": True, + "conversation_model_id": create_conversations_model, + "prefix": False, + "exclude_fields": "embedding", + "stream_config": {"on_chunk": on_chunk, "on_complete": on_complete}, + } + ) + + assert complete_called == [True] + assert len(chunks_received) > 0 + assert "found" in response or "hits" in response diff --git a/tests/streaming_test.py b/tests/streaming_test.py new file mode 100644 index 0000000..44b110b --- /dev/null +++ b/tests/streaming_test.py @@ -0,0 +1,414 @@ +"""Sync streaming conversation search tests.""" + +import sys + +import pytest + +if sys.version_info >= (3, 11): + import typing +else: + import typing_extensions as typing + +from tests.fixtures.streaming_fixtures import ( + FakeStreamContext, + FakeStreamResponse, + JSONValue, +) +from typesense.exceptions import ServerError +from typesense.sync.documents import Documents +from typesense.types.document import ( + DocumentSchema, + MessageChunk, + StreamConfig, + StreamConfigBuilder, +) + + +def test_streaming_search_invokes_on_chunk( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that streaming search invokes on_chunk for each message chunk.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + 
def test_streaming_search_invokes_on_chunk(
    fake_documents: Documents[DocumentSchema],
    stream_response: type[FakeStreamResponse],
    stream_context: type[FakeStreamContext],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """on_chunk must fire once per streamed message chunk (sync client)."""
    seen: typing.List[MessageChunk] = []

    # seen.append matches the on_chunk signature directly.
    config: StreamConfig[DocumentSchema] = {"on_chunk": seen.append}

    streamed_lines = [
        'data: {"conversation_id":"123","message":"First chunk"}',
        'data: {"conversation_id":"123","message":"Second chunk"}',
        '{"found": 2, "hits": [], "page": 1, "search_time_ms": 10}',
    ]
    fake_response = stream_response(lines=streamed_lines)

    def stub_stream(
        method: str,
        url: str,
        params: typing.Mapping[str, str] | None = None,
        content: str | bytes | None = None,
        headers: typing.Mapping[str, str] | None = None,
        timeout: float | None = None,
    ) -> FakeStreamContext:
        return stream_context(fake_response)

    monkeypatch.setattr(
        fake_documents.api_call._client,
        "stream",
        stub_stream,
    )

    outcome = fake_documents.search(
        {
            "q": "test query",
            "query_by": "title",
            "conversation_stream": True,
            "stream_config": config,
        }
    )

    assert [chunk["message"] for chunk in seen] == ["First chunk", "Second chunk"]
    assert outcome["found"] == 2
+ fake_stream, + ) + + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + assert chunks_received[0]["conversation_id"] == "unknown" + assert chunks_received[0]["message"] == "Hello" + + +def test_streaming_search_handles_missing_fields( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """JSON lines without conversation_id/message should use defaults.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 'data: {"foo":"bar"}', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 5}', + ] + response = stream_response(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + return stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + assert chunks_received[0]["conversation_id"] == "unknown" + assert chunks_received[0]["message"] == "" + + +def test_streaming_search_skips_done_marker( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """data: [DONE] lines should be ignored.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 
'data: {"conversation_id":"123","message":"Chunk"}', + "data: [DONE]", + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + return stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + + +def test_streaming_search_handles_json_array_lines( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """JSON arrays should be treated as plain text message chunks.""" + chunks_received: typing.List[MessageChunk] = [] + + def on_chunk(chunk: MessageChunk) -> None: + chunks_received.append(chunk) + + sse_lines = [ + 'data: ["a", "b"]', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + return stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_chunk": on_chunk}, + } + ) + + assert len(chunks_received) == 1 + assert chunks_received[0]["message"] == '["a", "b"]' + + +def test_streaming_search_supports_builder( + 
fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test StreamConfigBuilder for streaming callbacks.""" + complete_calls: typing.List[int] = [] + + stream = StreamConfigBuilder() + + @stream.on_complete + def on_complete(response: typing.Mapping[str, JSONValue]) -> None: + found = response.get("found") + if isinstance(found, int): + complete_calls.append(found) + + sse_lines = [ + 'data: {"conversation_id":"123","message":"Hello"}', + '{"found": 1, "hits": [], "page": 1, "search_time_ms": 5}', + ] + response = stream_response(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + return stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": stream, + } + ) + + assert complete_calls == [1] + + +def test_stream_config_not_sent_to_api( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that stream_config is removed from API params.""" + captured_params: typing.Dict[str, str] = {} + + sse_lines = [ + '{"found": 0, "hits": [], "page": 1, "search_time_ms": 1}', + ] + response = stream_response(lines=sse_lines) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + if params: + captured_params.update(params) + return 
stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + stream_config: StreamConfig[DocumentSchema] = {"on_chunk": lambda _: None} + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": stream_config, + } + ) + + assert "stream_config" not in captured_params + assert captured_params.get("conversation_stream") == "true" + + +def test_streaming_search_invokes_on_error( + fake_documents: Documents[DocumentSchema], + stream_response: type[FakeStreamResponse], + stream_context: type[FakeStreamContext], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that streaming search invokes on_error for request failures.""" + errors: typing.List[BaseException] = [] + + def on_error(error: BaseException) -> None: + errors.append(error) + + fake_documents.api_call.config.num_retries = 0 + + response = stream_response( + lines=[], + status_code=500, + headers={"Content-Type": "application/json"}, + text='{"message": "Server error"}', + ) + + def fake_stream( + method: str, + url: str, + params: typing.Mapping[str, str] | None = None, + content: str | bytes | None = None, + headers: typing.Mapping[str, str] | None = None, + timeout: float | None = None, + ) -> FakeStreamContext: + return stream_context(response) + + monkeypatch.setattr( + fake_documents.api_call._client, + "stream", + fake_stream, + ) + + with pytest.raises(ServerError): + fake_documents.search( + { + "q": "test", + "query_by": "title", + "conversation_stream": True, + "stream_config": {"on_error": on_error}, + } + ) + + assert len(errors) == 1 + assert isinstance(errors[0], ServerError) From cce50e7a8dd45d2d78da3a2040eda38bd5f60cf2 Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Fri, 6 Feb 2026 17:48:47 +0200 Subject: [PATCH 8/8] docs(examples): add examples for streaming conversations --- examples/async_conversation_streaming.py | 136 +++++++++++++++++++++++ 
async def main() -> None:
    """Demonstrate streaming conversational search with the async client.

    Sets up a conversation-history collection and an auto-embedding document
    collection, registers an OpenAI conversation model, then runs a
    conversational search whose answer is streamed chunk-by-chunk to stdout.
    """
    typesense_api_key = require_env("TYPESENSE_API_KEY")
    openai_api_key = require_env("OPENAI_API_KEY")

    client = typesense.AsyncClient(
        {
            "api_key": typesense_api_key,
            "nodes": [
                {
                    "host": "localhost",
                    "port": "8108",
                    "protocol": "http",
                }
            ],
            "connection_timeout_seconds": 10,
        }
    )

    try:
        # Best-effort cleanup from previous runs; absence is not an error.
        try:
            await client.conversations_models["conv-model-1"].delete()
        except Exception:
            pass

        try:
            await client.collections["streaming_docs"].delete()
        except Exception:
            pass

        try:
            await client.collections["conversation_store"].delete()
        except Exception:
            pass

        # History store that the conversation model writes into.
        await client.collections.create(
            {
                "name": "conversation_store",
                "fields": [
                    {"name": "conversation_id", "type": "string"},
                    {"name": "model_id", "type": "string"},
                    {"name": "timestamp", "type": "int32"},
                    {"name": "role", "type": "string", "index": False},
                    {"name": "message", "type": "string", "index": False},
                ],
            }
        )

        # Document collection with server-side OpenAI embeddings.
        await client.collections.create(
            {
                "name": "streaming_docs",
                "fields": [
                    {"name": "title", "type": "string"},
                    {
                        "name": "embedding",
                        "type": "float[]",
                        "embed": {
                            "from": ["title"],
                            "model_config": {
                                "model_name": "openai/text-embedding-3-small",
                                "api_key": openai_api_key,
                            },
                        },
                    },
                ],
            }
        )

        await client.collections["streaming_docs"].documents.create(
            {"id": "stream-1", "title": "Company profile: a developer tools firm."}
        )
        await client.collections["streaming_docs"].documents.create(
            {"id": "stream-2", "title": "Internal memo about quarterly planning."}
        )

        conversation_model = await client.conversations_models.create(
            {
                "id": "conv-model-1",
                "model_name": "openai/gpt-3.5-turbo",
                "history_collection": "conversation_store",
                "api_key": openai_api_key,
                "system_prompt": (
                    "You are an assistant for question-answering. "
                    "Only use the provided context. Add some fluff about you Being an assistant built for Typesense Conversational Search and a brief overview of how it works"
                ),
                "max_bytes": 16384,
            }
        )

        stream = StreamConfigBuilder()

        @stream.on_chunk
        def print_chunk(chunk: MessageChunk) -> None:
            # Echo each answer fragment as soon as it arrives.
            print(chunk["message"], end="", flush=True)

        @stream.on_complete
        def print_summary(response: dict) -> None:
            # Summarize the assembled response once streaming finishes.
            print("\n---\nComplete response keys:", response.keys())

        await client.collections["streaming_docs"].documents.search(
            {
                "q": "What is this document about?",
                "query_by": "embedding",
                "exclude_fields": "embedding",
                "conversation": True,
                "prefix": False,
                "conversation_stream": True,
                "conversation_model_id": conversation_model["id"],
                "stream_config": stream,
            }
        )
    finally:
        # Always release the underlying HTTP session.
        await client.api_call.aclose()
"""Demonstrate streaming conversational search with the sync client.

Sets up a conversation-history collection and an auto-embedding document
collection, registers an OpenAI conversation model, then runs a
conversational search whose answer is streamed chunk-by-chunk to stdout.

Requires TYPESENSE_API_KEY and OPENAI_API_KEY in the environment and a
Typesense server listening on localhost:8108.
"""

# FIX: dropped the accidental, unused `from operator import truediv` import.
import os
import sys

# Make the in-repo package importable without installation.
curr_dir = os.path.dirname(os.path.realpath(__file__))
repo_root = os.path.abspath(os.path.join(curr_dir, os.pardir))
sys.path.insert(1, os.path.join(repo_root, "src"))

import typesense

from typesense.types.document import MessageChunk, StreamConfigBuilder


def require_env(name: str) -> str:
    """Return the value of environment variable *name*, or raise if unset/empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


typesense_api_key = require_env("TYPESENSE_API_KEY")
openai_api_key = require_env("OPENAI_API_KEY")

client = typesense.Client(
    {
        "api_key": typesense_api_key,
        "nodes": [
            {
                "host": "localhost",
                "port": "8108",
                "protocol": "http",
            }
        ],
        "connection_timeout_seconds": 10,
    }
)

# Best-effort cleanup from previous runs; absence is not an error.
try:
    client.conversations_models["conv-model-1"].delete()
except Exception:
    pass

try:
    client.collections["streaming_docs"].delete()
except Exception:
    pass

try:
    client.collections["conversation_store"].delete()
except Exception:
    pass

# History store that the conversation model writes into.
client.collections.create(
    {
        "name": "conversation_store",
        "fields": [
            {"name": "conversation_id", "type": "string"},
            {"name": "model_id", "type": "string"},
            {"name": "timestamp", "type": "int32"},
            {"name": "role", "type": "string", "index": False},
            {"name": "message", "type": "string", "index": False},
        ],
    }
)

# Document collection with server-side OpenAI embeddings.
client.collections.create(
    {
        "name": "streaming_docs",
        "fields": [
            {"name": "title", "type": "string"},
            {
                "name": "embedding",
                "type": "float[]",
                "embed": {
                    "from": ["title"],
                    "model_config": {
                        "model_name": "openai/text-embedding-3-small",
                        "api_key": openai_api_key,
                    },
                },
            },
        ],
    }
)

client.collections["streaming_docs"].documents.create(
    {"id": "stream-1", "title": "Company profile: a developer tools firm."}
)
client.collections["streaming_docs"].documents.create(
    {"id": "stream-2", "title": "Internal memo about a quarterly planning meeting."}
)

conversation_model = client.conversations_models.create(
    {
        "id": "conv-model-1",
        "model_name": "openai/gpt-3.5-turbo",
        "history_collection": "conversation_store",
        "api_key": openai_api_key,
        "system_prompt": (
            "You are an assistant for question-answering. "
            "Only use the provided context. Add some fluff about you Being an assistant built for Typesense Conversational Search and a brief overview of how it works"
        ),
        "max_bytes": 16384,
    }
)

stream = StreamConfigBuilder()


@stream.on_chunk
def on_chunk(chunk: MessageChunk) -> None:
    """Print each streamed answer fragment as it arrives."""
    print(chunk["message"], end="", flush=True)


@stream.on_complete
def on_complete(response: dict) -> None:
    """Print a summary once the full search response has been assembled."""
    print("\n---\nComplete response keys:", response.keys())


client.collections["streaming_docs"].documents.search(
    {
        "q": "What is this document about?",
        "query_by": "embedding",
        "exclude_fields": "embedding",
        "conversation": True,
        "prefix": False,
        "conversation_stream": True,
        "conversation_model_id": conversation_model["id"],
        "stream_config": stream,
    }
)