diff --git a/dataframely/_storage/__init__.py b/dataframely/_storage/__init__.py index 780ec79..25528dd 100644 --- a/dataframely/_storage/__init__.py +++ b/dataframely/_storage/__init__.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: BSD-3-Clause from ._base import StorageBackend +from ._fsspec import get_file_prefix -__all__ = [ - "StorageBackend", -] +__all__ = ["StorageBackend", "get_file_prefix"] diff --git a/dataframely/_storage/_fsspec.py b/dataframely/_storage/_fsspec.py new file mode 100644 index 0000000..24ad780 --- /dev/null +++ b/dataframely/_storage/_fsspec.py @@ -0,0 +1,18 @@ +# Copyright (c) QuantCo 2025-2026 +# SPDX-License-Identifier: BSD-3-Clause + +from fsspec import AbstractFileSystem + + +def get_file_prefix(fs: AbstractFileSystem) -> str: + match fs.protocol: + case "file": + return "" + case str(): + return f"{fs.protocol}://" + case ["file", *_]: + return "" + case [proto, *_]: + return f"{proto}://" + case _: + raise ValueError(f"Unexpected fs.protocol: {fs.protocol}") diff --git a/dataframely/_storage/parquet.py b/dataframely/_storage/parquet.py index fc8ba46..bc870fb 100644 --- a/dataframely/_storage/parquet.py +++ b/dataframely/_storage/parquet.py @@ -7,6 +7,8 @@ import polars as pl from fsspec import AbstractFileSystem, url_to_fs +from dataframely._storage import get_file_prefix + from ._base import ( SerializedCollection, SerializedRules, @@ -155,15 +157,7 @@ def _collection_from_parquet( if is_file: collection_types.append(_read_serialized_collection(source_path)) else: - prefix = ( - "" - if fs.protocol == "file" - else ( - f"{fs.protocol}://" - if isinstance(fs.protocol, str) - else f"{fs.protocol[0]}://" - ) - ) + prefix = get_file_prefix(fs) for file in fs.glob(fs.sep.join([source_path, "**", "*.parquet"])): collection_types.append( _read_serialized_collection(f"{prefix}{file}") diff --git a/dataframely/testing/storage.py b/dataframely/testing/storage.py index aa571a5..e09c247 100644 --- a/dataframely/testing/storage.py +++ 
def _prefix_path(self, path: str, fs: AbstractFileSystem) -> str:
    """Return ``path`` with the prefix from ``get_file_prefix(fs)`` prepended."""
    prefix = get_file_prefix(fs)
    return f"{prefix}{path}"