diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ae5eb400d8..afa193b113 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -936,6 +936,41 @@ def add_files( for data_file in data_files: append_files.append_data_file(data_file) + def delete_files( + self, + file_paths: list[str], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand API for removing data files from the table transaction by their paths. + + Args: + file_paths: The list of full file paths to be removed from the table + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch to delete files from + + Raises: + ValueError: If file_paths contains duplicates + ValueError: If any file paths are not found in the table + """ + unique_file_paths = set(file_paths) + + if len(file_paths) != len(unique_file_paths): + raise ValueError("File paths must be unique") + + data_files = _get_data_files_from_snapshot( + table_metadata=self.table_metadata, file_paths=unique_file_paths, io=self._table.io, branch=branch + ) + + missing_files = unique_file_paths - set(data_files.keys()) + if missing_files: + raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}") + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot: + for data_file in data_files.values(): + overwrite_snapshot.delete_data_file(data_file) + def update_spec(self) -> UpdateSpec: """Create a new UpdateSpec to update the partitioning of the table. @@ -1506,6 +1541,31 @@ def add_files( branch=branch, ) + def delete_files( + self, + file_paths: list[str], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand API for removing data files from the table by their paths. + + Args: + file_paths: The list of full file paths to be removed from the table + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch to delete files from + + Raises: + ValueError: If file_paths contains duplicates + ValueError: If any file paths are not found in the table + """ + with self.transaction() as tx: + tx.delete_files( + file_paths=file_paths, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) @@ -2175,3 +2235,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths] return [f.result() for f in futures if f.result()] + + +def _get_data_files_from_snapshot( + table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH +) -> dict[str, DataFile]: + snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot() + if snapshot is None: + return {} + + result: dict[str, DataFile] = {} + for manifest in snapshot.manifests(io): + if manifest.content == ManifestContent.DATA: + for entry in manifest.fetch_manifest_entry(io, discard_deleted=True): + if entry.data_file.file_path in file_paths: + result[entry.data_file.file_path] = entry.data_file + if len(result) == len(file_paths): + return result + return result diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index e3b487e465..69a1f61fe0 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -15,23 +15,82 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name -from collections.abc import Generator -from datetime import datetime +from collections.abc import Generator, Iterator +from datetime import date, datetime import pyarrow as pa +import pyarrow.parquet as pq import pytest from pyspark.sql import SparkSession +from pyiceberg.catalog import Catalog from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual +from pyiceberg.io import FileIO from pyiceberg.manifest import ManifestEntryStatus -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Summary from pyiceberg.transforms import IdentityTransform -from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType +from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType + +# Schema and data used by delete_files tests (moved from test_add_files) +TABLE_SCHEMA_DELETE_FILES = Schema( + NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="bar", field_type=StringType(), required=False), + NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), + NestedField(field_id=10, name="qux", field_type=DateType(), required=False), +) + +ARROW_SCHEMA_DELETE_FILES = pa.schema( + [ + ("foo", pa.bool_()), + ("bar", pa.string()), + ("baz", pa.int32()), + ("qux", pa.date32()), + ] +) + +ARROW_TABLE_DELETE_FILES = pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA_DELETE_FILES, +) + + +def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None: + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_schema) as writer: + writer.write_table(arrow_table) + + +def _create_table_for_delete_files( + session_catalog: Catalog, + identifier: str, + format_version: int, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + schema: Schema = TABLE_SCHEMA_DELETE_FILES, +) -> Table: + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + return session_catalog.create_table( + identifier=identifier, + schema=schema, + properties={"format-version": str(format_version)}, + partition_spec=partition_spec, + ) def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: @@ -57,6 +116,12 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: session_catalog.drop_table(identifier) +@pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")]) +def format_version_fixture(request: "pytest.FixtureRequest") -> Iterator[int]: + """Fixture to run tests with different table format versions (for delete_files tests).""" + yield request.param + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: @@ -975,3 +1040,89 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho assert after_delete_snapshot is not None assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id) + + +@pytest.mark.integration +def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_unpartitioned_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + assert len(tbl.scan().to_arrow()) == 5 + + tbl.delete_files(file_paths=file_paths[:2]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert sum(row.deleted_data_files_count for row in rows) == 2 + + df = spark.table(identifier) + assert df.count() == 3 + + assert len(tbl.scan().to_arrow()) == 3 + + +@pytest.mark.integration +def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_nonexistent_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + + with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"): + tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) + + +@pytest.mark.integration +def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_duplicate_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=[file_path]) + + with pytest.raises(ValueError, match="File paths must be unique"): + tbl.delete_files(file_paths=[file_path, file_path]) + + +@pytest.mark.integration +def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_branch_v{format_version}" + branch = "branch1" + + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.append(ARROW_TABLE_DELETE_FILES) + assert tbl.metadata.current_snapshot_id is not None + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + tbl.add_files(file_paths=file_paths, branch=branch) + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 6 + + tbl.delete_files(file_paths=file_paths[:3], branch=branch) + + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 3 + + main_df = spark.table(identifier) + assert main_df.count() == 1