From e63feff3a005a73e90d9aca70b206d425f9b82f3 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Sun, 1 Feb 2026 14:32:51 -0600 Subject: [PATCH 1/3] Added implementation to delete file. --- pyiceberg/table/__init__.py | 77 ++++++++++++++++++++++++++ tests/integration/test_add_files.py | 86 +++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ae5eb400d8..67a0b8c60d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -936,6 +936,40 @@ def add_files( for data_file in data_files: append_files.append_data_file(data_file) + def delete_files( + self, + file_paths: list[str], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand API for removing data files from the table transaction by their paths. + + Args: + file_paths: The list of full file paths to be removed from the table + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch to delete files from + + Raises: + ValueError: If file_paths contains duplicates + ValueError: If any file paths are not found in the table + """ + if len(file_paths) != len(set(file_paths)): + raise ValueError("File paths must be unique") + + file_paths_set = set(file_paths) + data_files = _get_data_files_from_snapshot( + table_metadata=self.table_metadata, file_paths=file_paths_set, io=self._table.io, branch=branch + ) + + missing_files = file_paths_set - set(data_files.keys()) + if missing_files: + raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}") + + with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot: + for data_file in data_files.values(): + overwrite_snapshot.delete_data_file(data_file) + def update_spec(self) -> UpdateSpec: """Create a new UpdateSpec to update the partitioning of the table. @@ -1506,6 +1540,31 @@ def add_files( branch=branch, ) + def delete_files( + self, + file_paths: list[str], + snapshot_properties: dict[str, str] = EMPTY_DICT, + branch: str | None = MAIN_BRANCH, + ) -> None: + """ + Shorthand API for removing data files from the table by their paths. 
+ + Args: + file_paths: The list of full file paths to be removed from the table + snapshot_properties: Custom properties to be added to the snapshot summary + branch: Branch to delete files from + + Raises: + ValueError: If file_paths contains duplicates + ValueError: If any file paths are not found in the table + """ + with self.transaction() as tx: + tx.delete_files( + file_paths=file_paths, + snapshot_properties=snapshot_properties, + branch=branch, + ) + def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) @@ -2175,3 +2234,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths] return [f.result() for f in futures if f.result()] + + +def _get_data_files_from_snapshot( + table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH +) -> dict[str, DataFile]: + snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot() + if snapshot is None: + return {} + + result: dict[str, DataFile] = {} + for manifest in snapshot.manifests(io): + if manifest.content == ManifestContent.DATA: + for entry in manifest.fetch_manifest_entry(io, discard_deleted=True): + if entry.data_file.file_path in file_paths: + result[entry.data_file.file_path] = entry.data_file + if len(result) == len(file_paths): + return result + return result diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 86ef05e5f4..549d1fdcbe 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -1040,3 +1040,89 @@ def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, form for col in branch_df.columns: assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null" + + +@pytest.mark.integration +def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_unpartitioned_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=file_paths) + assert len(tbl.scan().to_arrow()) == 5 + + tbl.delete_files(file_paths=file_paths[:2]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert sum(row.deleted_data_files_count for row in rows) == 2 + + df = spark.table(identifier) + assert df.count() == 3 + + assert len(tbl.scan().to_arrow()) == 3 + + +@pytest.mark.integration +def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_nonexistent_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=file_paths) + + with pytest.raises(ValueError, match="Cannot delete 
files that are not referenced by table"): + tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) + + +@pytest.mark.integration +def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_duplicate_v{format_version}" + tbl = _create_table(session_catalog, identifier, format_version) + + file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.add_files(file_paths=[file_path]) + + with pytest.raises(ValueError, match="File paths must be unique"): + tbl.delete_files(file_paths=[file_path, file_path]) + + +@pytest.mark.integration +def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_branch_v{format_version}" + branch = "branch1" + + tbl = _create_table(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) + + tbl.append(ARROW_TABLE) + assert tbl.metadata.current_snapshot_id is not None + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + tbl.add_files(file_paths=file_paths, branch=branch) + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 6 + + tbl.delete_files(file_paths=file_paths[:3], branch=branch) + + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 3 + + main_df = spark.table(identifier) + assert main_df.count() == 1 From a3952c28ccac5a9f35658c9aa568553d02b7f2b2 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 6 Feb 2026 19:39:02 -0600 Subject: [PATCH 2/3] moved tests to test_deletes.py --- pyiceberg/table/__init__.py | 9 +- tests/integration/test_add_files.py | 86 --------------- tests/integration/test_deletes.py | 164 +++++++++++++++++++++++++++- 3 files changed, 165 insertions(+), 94 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 67a0b8c60d..afa193b113 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -954,15 +954,16 @@ def delete_files( ValueError: If file_paths contains duplicates ValueError: If any file paths are not found in the table """ - if len(file_paths) != len(set(file_paths)): + unique_file_paths = set(file_paths) + + if len(file_paths) != len(unique_file_paths): raise ValueError("File paths must be unique") - file_paths_set = set(file_paths) data_files = _get_data_files_from_snapshot( - table_metadata=self.table_metadata, file_paths=file_paths_set, io=self._table.io, branch=branch + table_metadata=self.table_metadata, file_paths=unique_file_paths, io=self._table.io, branch=branch ) - missing_files = file_paths_set - set(data_files.keys()) + missing_files = unique_file_paths - set(data_files.keys()) if missing_files: raise ValueError(f"Cannot delete files that are not referenced by table, files: {', '.join(sorted(missing_files))}") diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 549d1fdcbe..86ef05e5f4 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -1040,89 +1040,3 @@ def test_add_files_to_branch(spark: SparkSession, session_catalog: Catalog, form for col in 
branch_df.columns: assert branch_df.filter(branch_df[col].isNotNull()).count() == 6, "Expected all 6 rows to be non-null" - - -@pytest.mark.integration -def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_unpartitioned_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=file_paths) - assert len(tbl.scan().to_arrow()) == 5 - - tbl.delete_files(file_paths=file_paths[:2]) - - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() - - assert sum(row.deleted_data_files_count for row in rows) == 2 - - df = spark.table(identifier) - assert df.count() == 3 - - assert len(tbl.scan().to_arrow()) == 3 - - -@pytest.mark.integration -def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_nonexistent_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=file_paths) - - with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"): - tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) - - -@pytest.mark.integration -def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_duplicate_v{format_version}" - tbl = _create_table(session_catalog, identifier, format_version) - - file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.add_files(file_paths=[file_path]) - - with pytest.raises(ValueError, match="File paths must be unique"): - tbl.delete_files(file_paths=[file_path, file_path]) - - -@pytest.mark.integration -def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: - identifier = f"default.delete_files_branch_v{format_version}" - branch = "branch1" - - tbl = _create_table(session_catalog, identifier, format_version) - - file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] - for file_path in file_paths: - _write_parquet(tbl.io, file_path, ARROW_SCHEMA, ARROW_TABLE) - - tbl.append(ARROW_TABLE) - assert tbl.metadata.current_snapshot_id is not None - tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() - - tbl.add_files(file_paths=file_paths, branch=branch) - branch_df = spark.table(f"{identifier}.branch_{branch}") - assert branch_df.count() == 6 - - tbl.delete_files(file_paths=file_paths[:3], branch=branch) - - branch_df = spark.table(f"{identifier}.branch_{branch}") - assert branch_df.count() == 3 - - main_df = spark.table(identifier) - assert main_df.count() == 1 diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 
e3b487e465..54be0e5f73 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -15,23 +15,83 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name -from collections.abc import Generator -from datetime import datetime +from collections.abc import Generator, Iterator +from datetime import date, datetime import pyarrow as pa +import pyarrow.parquet as pq import pytest from pyspark.sql import SparkSession +from pyiceberg.catalog import Catalog from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual +from pyiceberg.io import FileIO from pyiceberg.manifest import ManifestEntryStatus -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Summary from pyiceberg.transforms import IdentityTransform -from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType +from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType + + +# Schema and data used by delete_files tests (moved from test_add_files) +TABLE_SCHEMA_DELETE_FILES = Schema( + NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="bar", field_type=StringType(), required=False), + NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), + NestedField(field_id=10, name="qux", field_type=DateType(), required=False), +) + +ARROW_SCHEMA_DELETE_FILES = pa.schema( + [ + ("foo", pa.bool_()), + ("bar", pa.string()), + ("baz", pa.int32()), + ("qux", pa.date32()), + ] +) + +ARROW_TABLE_DELETE_FILES = pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA_DELETE_FILES, +) + + +def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None: + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_schema) as writer: + writer.write_table(arrow_table) + + +def _create_table_for_delete_files( + session_catalog: Catalog, + identifier: str, + format_version: int, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + schema: Schema = TABLE_SCHEMA_DELETE_FILES, +) -> Table: + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + return session_catalog.create_table( + identifier=identifier, + schema=schema, + properties={"format-version": str(format_version)}, + partition_spec=partition_spec, + ) def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None: @@ -57,6 +117,12 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]: session_catalog.drop_table(identifier) +@pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")]) +def format_version_fixture(request: "pytest.FixtureRequest") -> Iterator[int]: + """Fixture to run tests with different table format versions (for delete_files tests).""" + yield request.param + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def 
test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None: @@ -975,3 +1041,93 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho assert after_delete_snapshot is not None assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id) + + +@pytest.mark.integration +def test_delete_files_from_unpartitioned_table( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.delete_files_unpartitioned_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + assert len(tbl.scan().to_arrow()) == 5 + + tbl.delete_files(file_paths=file_paths[:2]) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert sum(row.deleted_data_files_count for row in rows) == 2 + + df = spark.table(identifier) + assert df.count() == 3 + + assert len(tbl.scan().to_arrow()) == 3 + + +@pytest.mark.integration +def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_nonexistent_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=file_paths) + + with pytest.raises(ValueError, match="Cannot delete files that are not referenced by table"): + tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"]) + + +@pytest.mark.integration +def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.delete_files_duplicate_v{format_version}" + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet" + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.add_files(file_paths=[file_path]) + + with pytest.raises(ValueError, match="File paths must be unique"): + tbl.delete_files(file_paths=[file_path, file_path]) + + +@pytest.mark.integration +def test_delete_files_from_branch( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = f"default.delete_files_branch_v{format_version}" + branch = "branch1" + + tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) + + file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)] + for file_path in file_paths: + _write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES) + + tbl.append(ARROW_TABLE_DELETE_FILES) + assert tbl.metadata.current_snapshot_id is not None + tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit() + + tbl.add_files(file_paths=file_paths, 
branch=branch) + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 6 + + tbl.delete_files(file_paths=file_paths[:3], branch=branch) + + branch_df = spark.table(f"{identifier}.branch_{branch}") + assert branch_df.count() == 3 + + main_df = spark.table(identifier) + assert main_df.count() == 1 From 5ee0ca667a38998624090df75f863f90edb77e76 Mon Sep 17 00:00:00 2001 From: kanthi subramanian Date: Fri, 6 Feb 2026 22:06:22 -0600 Subject: [PATCH 3/3] Fixed lint errors. --- tests/integration/test_deletes.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 54be0e5f73..69a1f61fe0 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -36,7 +36,6 @@ from pyiceberg.transforms import IdentityTransform from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType - # Schema and data used by delete_files tests (moved from test_add_files) TABLE_SCHEMA_DELETE_FILES = Schema( NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), @@ -1044,9 +1043,7 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho @pytest.mark.integration -def test_delete_files_from_unpartitioned_table( - spark: SparkSession, session_catalog: Catalog, format_version: int -) -> None: +def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: identifier = f"default.delete_files_unpartitioned_v{format_version}" tbl = _create_table_for_delete_files(session_catalog, identifier, format_version) @@ -1104,9 +1101,7 @@ def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format @pytest.mark.integration -def test_delete_files_from_branch( - spark: SparkSession, session_catalog: Catalog, format_version: int -) -> None: +def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: identifier = f"default.delete_files_branch_v{format_version}" branch = "branch1"
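
---

A minimal usage sketch of the `delete_files` API added by this series, assuming a configured catalog named `default` and an existing table. The catalog name, table identifier, branch name, and snapshot property key below are illustrative placeholders, not part of the change; the call signatures mirror `Table.delete_files(file_paths, snapshot_properties, branch)` as introduced in PATCH 1/3.

```python
from pyiceberg.catalog import load_catalog

# Illustrative placeholders: catalog name and table identifier are assumptions.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Collect the full paths of the data files currently referenced by the table.
file_paths = [task.file.file_path for task in tbl.scan().plan_files()]

# Remove two data files in a single overwrite snapshot; raises ValueError on
# duplicate paths or on paths that are not referenced by the table.
tbl.delete_files(file_paths=file_paths[:2])

# The same call can target a branch and attach custom snapshot summary
# properties (assumes a branch named "branch1" already exists).
tbl.delete_files(
    file_paths=file_paths[2:3],
    snapshot_properties={"deleted-by": "manual-cleanup"},  # assumed property key
    branch="branch1",
)
```

Each call commits one overwrite snapshot, so deleting files in a single `delete_files` invocation (or inside one explicit `tbl.transaction()` block together with other updates) keeps the snapshot history compact.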