From c1b1d0d424982fe3a9f75b40f6d6b2de854f5bae Mon Sep 17 00:00:00 2001 From: parashardhapola Date: Sat, 7 Mar 2026 11:44:13 +0100 Subject: [PATCH 1/4] Update version to 0.19.0 and enhance file upload functionality - Bump package version to 0.19.0. - Increase maximum workers for file uploads from 4 to 6. - Introduce support for uploading chunks to presigned URLs, including progress reporting and error handling. - Refactor upload logic to accommodate both presigned URL and server uploads, improving overall upload reliability. --- cytetype/__init__.py | 2 +- cytetype/api/client.py | 101 +++++++++++++++++++++++++++++--------- cytetype/api/transport.py | 42 ++++++++++++++++ 3 files changed, 121 insertions(+), 24 deletions(-) diff --git a/cytetype/__init__.py b/cytetype/__init__.py index db09ae3..08c2bb0 100644 --- a/cytetype/__init__.py +++ b/cytetype/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.18.1" +__version__ = "0.19.0" import requests diff --git a/cytetype/api/client.py b/cytetype/api/client.py index a6280b3..7acbd38 100644 --- a/cytetype/api/client.py +++ b/cytetype/api/client.py @@ -40,7 +40,7 @@ def _upload_file( file_kind: UploadFileKind, file_path: str, timeout: float | tuple[float, float] = (60.0, 3600.0), - max_workers: int = 4, + max_workers: int = 6, ) -> UploadResponse: path_obj = Path(file_path) if not path_obj.is_file(): @@ -69,6 +69,10 @@ def _upload_file( n_chunks = math.ceil(size_bytes / chunk_size) if size_bytes > 0 else 0 + presigned_urls: list[str] | None = init_data.get("presigned_urls") + r2_upload_id: str | None = init_data.get("r2_upload_id") + use_r2 = presigned_urls is not None and r2_upload_id is not None + # Step 2 – Upload chunks in parallel. # Each worker thread gets its own HTTPTransport (and thus its own # requests.Session / connection pool) for thread safety. @@ -83,8 +87,61 @@ def _upload_file( ) _progress_lock = threading.Lock() _chunks_done = [0] + _etags: dict[int, str] = {} + _etags_lock = threading.Lock() + + def _update_progress() -> None: + if pbar is not None: + pbar.update(1) + else: + with _progress_lock: + _chunks_done[0] += 1 + done = _chunks_done[0] + pct = 100 * done / n_chunks + print( + f"\r Uploading: {done}/{n_chunks} chunks ({pct:.0f}%)", + end="", + flush=True, + ) + + def _upload_chunk_r2(chunk_idx: int) -> None: + if not hasattr(_tls, "transport"): + _tls.transport = HTTPTransport(base_url, auth_token) + offset = chunk_idx * chunk_size + read_size = min(chunk_size, size_bytes - offset) + with path_obj.open("rb") as f: + f.seek(offset) + chunk_data = f.read(read_size) - def _upload_chunk(chunk_idx: int) -> None: + url = presigned_urls[chunk_idx] # type: ignore[index] + last_exc: Exception | None = None + for attempt in range(1 + len(_CHUNK_RETRY_DELAYS)): + try: + etag = _tls.transport.put_to_presigned_url( + url, chunk_data, timeout=timeout + ) + with _etags_lock: + _etags[chunk_idx] = etag + _update_progress() + return + except (NetworkError, TimeoutError) as exc: + last_exc = exc + except APIError as exc: + if exc.error_code in _RETRYABLE_API_ERROR_CODES: + last_exc = exc + else: + raise + if attempt < len(_CHUNK_RETRY_DELAYS): + delay = _CHUNK_RETRY_DELAYS[attempt] + logger.warning( + f"Chunk {chunk_idx + 1}/{n_chunks} upload failed " + f"(attempt {attempt + 1}/{1 + len(_CHUNK_RETRY_DELAYS)}), " + f"retrying in {delay}s: {last_exc}" + ) + time.sleep(delay) + raise last_exc # type: ignore[misc] + + def _upload_chunk_server(chunk_idx: int) -> None: if not hasattr(_tls, "transport"): _tls.transport = HTTPTransport(base_url, auth_token) offset = chunk_idx * chunk_size @@ -101,18 +158,7 @@ def _upload_chunk(chunk_idx: int) -> None: data=chunk_data, timeout=timeout, ) - if pbar is not None: - pbar.update(1) - else: - with _progress_lock: - _chunks_done[0] += 1 - done = _chunks_done[0] - pct = 100 * done / n_chunks - print( - f"\r Uploading: {done}/{n_chunks} chunks ({pct:.0f}%)", - end="", - flush=True, - ) + _update_progress() return except (NetworkError, TimeoutError) as exc: last_exc = exc @@ -121,7 +167,6 @@ def _upload_chunk(chunk_idx: int) -> None: last_exc = exc else: raise - if attempt < len(_CHUNK_RETRY_DELAYS): delay = _CHUNK_RETRY_DELAYS[attempt] logger.warning( @@ -130,14 +175,15 @@ def _upload_chunk(chunk_idx: int) -> None: f"retrying in {delay}s: {last_exc}" ) time.sleep(delay) - raise last_exc # type: ignore[misc] + upload_fn = _upload_chunk_r2 if use_r2 else _upload_chunk_server + if n_chunks > 0: effective_workers = min(max_workers, n_chunks) try: with ThreadPoolExecutor(max_workers=effective_workers) as pool: - list(pool.map(_upload_chunk, range(n_chunks))) + list(pool.map(upload_fn, range(n_chunks))) if pbar is not None: pbar.close() else: @@ -151,10 +197,19 @@ def _upload_chunk(chunk_idx: int) -> None: print() raise - # Step 3 – Complete upload (returns same UploadResponse shape as before) - _, complete_data = transport.post_empty( - f"upload/{upload_id}/complete", timeout=timeout - ) + # Step 3 – Complete upload + if use_r2: + parts = [{"ETag": _etags[i], "PartNumber": i + 1} for i in range(n_chunks)] + _, complete_data = transport.post_json( + f"upload/{upload_id}/complete", + data={"parts": parts}, + timeout=timeout, + ) + else: + _, complete_data = transport.post_empty( + f"upload/{upload_id}/complete", timeout=timeout + ) + return UploadResponse(**complete_data) @@ -163,7 +218,7 @@ def upload_obs_duckdb( auth_token: str | None, file_path: str, timeout: float | tuple[float, float] = (60.0, 3600.0), - max_workers: int = 4, + max_workers: int = 6, ) -> UploadResponse: return _upload_file( base_url, @@ -180,7 +235,7 @@ def upload_vars_h5( auth_token: str | None, file_path: str, timeout: float | tuple[float, float] = (60.0, 3600.0), - max_workers: int = 4, + max_workers: int = 6, ) -> UploadResponse: return _upload_file( base_url, diff --git a/cytetype/api/transport.py b/cytetype/api/transport.py index 98bce28..897f5c6 100644 --- a/cytetype/api/transport.py +++ b/cytetype/api/transport.py @@ -124,6 +124,48 @@ def put_binary( self._handle_request_error(e) raise # For type checker + def put_to_presigned_url( + self, + url: str, + data: bytes, + timeout: float | tuple[float, float] = (30.0, 3600.0), + ) -> str: + """PUT raw bytes to a presigned URL. Returns the ETag header.""" + try: + response = self.session.put( + url, + data=data, + headers={"Content-Type": "application/octet-stream"}, + timeout=timeout, + ) + response.raise_for_status() + return response.headers.get("ETag", "") + except requests.RequestException as e: + self._handle_request_error(e) + raise + + def post_json( + self, + endpoint: str, + data: dict[str, Any], + timeout: float | tuple[float, float] = 30.0, + ) -> tuple[int, dict[str, Any]]: + """Make POST request with JSON body.""" + url = f"{self.base_url}/{endpoint.lstrip('/')}" + try: + response = self.session.post( + url, + json=data, + headers=self._build_headers(content_type="application/json"), + timeout=timeout, + ) + if not response.ok: + self._parse_error(response) + return response.status_code, response.json() + except requests.RequestException as e: + self._handle_request_error(e) + raise + def get(self, endpoint: str, timeout: int = 30) -> tuple[int, dict[str, Any]]: """Make GET request and return (status_code, data).""" url = f"{self.base_url}/{endpoint.lstrip('/')}" From 80a0e80e36ad02b7a5e2d70d398c067aab89fc25 Mon Sep 17 00:00:00 2001 From: parashardhapola Date: Sat, 7 Mar 2026 11:52:54 +0100 Subject: [PATCH 2/4] Implement validation for presigned URL count during file uploads - Added a check to ensure the number of presigned URLs matches the expected chunk count, raising a ValueError if there is a discrepancy. This enhancement improves error handling and ensures consistency in the file upload process. --- cytetype/api/client.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cytetype/api/client.py b/cytetype/api/client.py index 7acbd38..6819b92 100644 --- a/cytetype/api/client.py +++ b/cytetype/api/client.py @@ -73,6 +73,12 @@ def _upload_file( r2_upload_id: str | None = init_data.get("r2_upload_id") use_r2 = presigned_urls is not None and r2_upload_id is not None + if use_r2 and len(presigned_urls) != n_chunks: # type: ignore[arg-type] + raise ValueError( + f"Server returned {len(presigned_urls)} presigned URLs " # type: ignore[arg-type] + f"but expected {n_chunks} (one per chunk)." + ) + # Step 2 – Upload chunks in parallel. # Each worker thread gets its own HTTPTransport (and thus its own # requests.Session / connection pool) for thread safety. From 765306fe8976fef5b60842dfeabb33eeac2c1bf1 Mon Sep 17 00:00:00 2001 From: parashardhapola Date: Sat, 7 Mar 2026 11:54:04 +0100 Subject: [PATCH 3/4] Enhance ETag handling in HTTPTransport - Updated the response handling in the HTTPTransport class to raise a NetworkError if the ETag header is missing after a successful presigned URL PUT request. This change improves error reporting and ensures that clients are informed of potential issues with the upload response. --- cytetype/api/transport.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cytetype/api/transport.py b/cytetype/api/transport.py index 897f5c6..ead35ac 100644 --- a/cytetype/api/transport.py +++ b/cytetype/api/transport.py @@ -139,7 +139,13 @@ def put_to_presigned_url( timeout=timeout, ) response.raise_for_status() - return response.headers.get("ETag", "") + etag = response.headers.get("ETag") + if not etag: + raise NetworkError( + "Presigned URL PUT succeeded but response is missing the ETag header", + error_code="MISSING_ETAG", + ) + return etag except requests.RequestException as e: self._handle_request_error(e) raise From 195edf04679117c8fc8cc8870f982119b873ca5f Mon Sep 17 00:00:00 2001 From: parashardhapola Date: Sat, 7 Mar 2026 13:10:34 +0100 Subject: [PATCH 4/4] Enhance error handling for presigned URL uploads in HTTPTransport - Added APIError exception handling for client-side errors (HTTP 400-499) during presigned URL uploads. This improvement provides clearer feedback when uploads are rejected, enhancing the robustness of the upload process. --- cytetype/api/transport.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cytetype/api/transport.py b/cytetype/api/transport.py index ead35ac..c5b5a72 100644 --- a/cytetype/api/transport.py +++ b/cytetype/api/transport.py @@ -1,7 +1,7 @@ import requests from typing import Any, BinaryIO -from .exceptions import create_api_exception, NetworkError, TimeoutError +from .exceptions import create_api_exception, APIError, NetworkError, TimeoutError from .schemas import ErrorResponse @@ -138,6 +138,12 @@ def put_to_presigned_url( headers={"Content-Type": "application/octet-stream"}, timeout=timeout, ) + if 400 <= response.status_code < 500: + raise APIError( + f"Presigned URL upload rejected (HTTP {response.status_code}): " + f"{response.text[:200]}", + error_code="PRESIGNED_URL_REJECTED", + ) response.raise_for_status() etag = response.headers.get("ETag") if not etag: