From bc20c1a3ac539579777466cf3297b1c5e52b6e80 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Wed, 25 Feb 2026 14:40:14 +0100 Subject: [PATCH] fix(media): retry failed uploads --- langfuse/_task_manager/media_manager.py | 35 +++++++++-- tests/test_media_manager.py | 80 +++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 tests/test_media_manager.py diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index 58b24e742..c885f9ba4 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -252,13 +252,36 @@ def _process_upload_media_job( headers["x-ms-blob-type"] = "BlockBlob" headers["x-amz-checksum-sha256"] = data["content_sha256_hash"] + def _upload_with_status_check() -> httpx.Response: + response = self._httpx_client.put( + upload_url, + headers=headers, + content=data["content_bytes"], + ) + response.raise_for_status() + + return response + upload_start_time = time.time() - upload_response = self._request_with_backoff( - self._httpx_client.put, - upload_url, - headers=headers, - content=data["content_bytes"], - ) + + try: + upload_response = self._request_with_backoff(_upload_with_status_check) + except httpx.HTTPStatusError as e: + upload_time_ms = int((time.time() - upload_start_time) * 1000) + failed_response = e.response + + if failed_response is not None: + self._request_with_backoff( + self._api_client.media.patch, + media_id=data["media_id"], + uploaded_at=_get_timestamp(), + upload_http_status=failed_response.status_code, + upload_http_error=failed_response.text, + upload_time_ms=upload_time_ms, + ) + + raise + upload_time_ms = int((time.time() - upload_start_time) * 1000) self._request_with_backoff( diff --git a/tests/test_media_manager.py b/tests/test_media_manager.py new file mode 100644 index 000000000..7a10da7dc --- /dev/null +++ b/tests/test_media_manager.py @@ -0,0 +1,80 @@ +from queue import Queue +from types import SimpleNamespace +from unittest.mock import Mock + +import httpx +import pytest + +from langfuse._task_manager.media_manager import MediaManager + + +def _upload_response(status_code: int, text: str = "") -> httpx.Response: + request = httpx.Request("PUT", "https://example.com/upload") + return httpx.Response(status_code=status_code, request=request, text=text) + + +def _upload_job() -> dict: + return { + "media_id": "media-id", + "content_bytes": b"payload", + "content_type": "image/jpeg", + "content_length": 7, + "content_sha256_hash": "sha256hash", + "trace_id": "trace-id", + "observation_id": None, + "field": "input", + } + + +def test_media_upload_retries_on_retryable_http_status(): + media_api = Mock() + media_api.get_upload_url.return_value = SimpleNamespace( + upload_url="https://example.com/upload", + media_id="media-id", + ) + media_api.patch.return_value = None + + httpx_client = Mock() + httpx_client.put.side_effect = [ + _upload_response(503, "temporary failure"), + _upload_response(200, "ok"), + ] + + manager = MediaManager( + api_client=SimpleNamespace(media=media_api), + httpx_client=httpx_client, + media_upload_queue=Queue(), + max_retries=3, + ) + + manager._process_upload_media_job(data=_upload_job()) + + assert httpx_client.put.call_count == 2 + media_api.patch.assert_called_once() + assert media_api.patch.call_args.kwargs["upload_http_status"] == 200 + + +def test_media_upload_gives_up_on_non_retryable_http_status(): + media_api = Mock() + media_api.get_upload_url.return_value = SimpleNamespace( + upload_url="https://example.com/upload", + media_id="media-id", + ) + media_api.patch.return_value = None + + httpx_client = Mock() + httpx_client.put.return_value = _upload_response(403, "forbidden") + + manager = MediaManager( + api_client=SimpleNamespace(media=media_api), + httpx_client=httpx_client, + media_upload_queue=Queue(), + max_retries=3, + ) + + with pytest.raises(httpx.HTTPStatusError): + manager._process_upload_media_job(data=_upload_job()) + + assert httpx_client.put.call_count == 1 + media_api.patch.assert_called_once() + assert media_api.patch.call_args.kwargs["upload_http_status"] == 403