Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dataframely/_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: BSD-3-Clause

from ._base import StorageBackend
from ._fsspec import get_file_prefix

__all__ = [
"StorageBackend",
]
__all__ = ["StorageBackend", "get_file_prefix"]
18 changes: 18 additions & 0 deletions dataframely/_storage/_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) QuantCo 2025-2026
# SPDX-License-Identifier: BSD-3-Clause

from fsspec import AbstractFileSystem


def get_file_prefix(fs: AbstractFileSystem) -> str:
match fs.protocol:
case "file":
return ""
case str():
return f"{fs.protocol}://"
case ["file", *_]:
return ""
case [proto, *_]:
return f"{proto}://"
case _:
raise ValueError(f"Unexpected fs.protocol: {fs.protocol}")
12 changes: 3 additions & 9 deletions dataframely/_storage/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import polars as pl
from fsspec import AbstractFileSystem, url_to_fs

from dataframely._storage import get_file_prefix

from ._base import (
SerializedCollection,
SerializedRules,
Expand Down Expand Up @@ -155,15 +157,7 @@ def _collection_from_parquet(
if is_file:
collection_types.append(_read_serialized_collection(source_path))
else:
prefix = (
""
if fs.protocol == "file"
else (
f"{fs.protocol}://"
if isinstance(fs.protocol, str)
else f"{fs.protocol[0]}://"
)
)
prefix = get_file_prefix(fs)
for file in fs.glob(fs.sep.join([source_path, "**", "*.parquet"])):
collection_types.append(
_read_serialized_collection(f"{prefix}{file}")
Expand Down
17 changes: 2 additions & 15 deletions dataframely/testing/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dataframely as dy
from dataframely import FailureInfo, Validation
from dataframely._compat import deltalake
from dataframely._storage import get_file_prefix
from dataframely._storage.delta import _to_delta_table

# ----------------------------------- Schema -------------------------------------------
Expand Down Expand Up @@ -190,21 +191,7 @@ def set_metadata(self, path: str, metadata: dict[str, Any]) -> None:
metadata."""

def _prefix_path(self, path: str, fs: AbstractFileSystem) -> str:
return f"{self._get_prefix(fs)}{path}"

@staticmethod
def _get_prefix(fs: AbstractFileSystem) -> str:
match fs.protocol:
case "file":
return ""
case str():
return f"{fs.protocol}://"
case ["file", *_]:
return ""
case [proto, *_]:
return f"{proto}://"
case _:
raise ValueError(f"Unexpected fs.protocol: {fs.protocol}")
return f"{get_file_prefix(fs)}{path}"


class ParquetCollectionStorageTester(CollectionStorageTester):
Expand Down