78 changes: 78 additions & 0 deletions pyiceberg/table/__init__.py
@@ -936,6 +936,41 @@ def add_files(
for data_file in data_files:
append_files.append_data_file(data_file)

def delete_files(
self,
file_paths: list[str],
snapshot_properties: dict[str, str] = EMPTY_DICT,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand API for removing data files from the table transaction by their paths.

Args:
file_paths: The list of full file paths to be removed from the table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch to delete files from

Raises:
ValueError: If file_paths contains duplicates
ValueError: If any file paths are not found in the table
"""
unique_file_paths = set(file_paths)

if len(file_paths) != len(unique_file_paths):
raise ValueError("File paths must be unique")

data_files = _get_data_files_from_snapshot(
table_metadata=self.table_metadata, file_paths=unique_file_paths, io=self._table.io, branch=branch
)

missing_files = unique_file_paths - set(data_files.keys())
if missing_files:
            raise ValueError(f"Cannot delete files that are not referenced by the table: {', '.join(sorted(missing_files))}")

with self.update_snapshot(snapshot_properties=snapshot_properties, branch=branch).overwrite() as overwrite_snapshot:
for data_file in data_files.values():
overwrite_snapshot.delete_data_file(data_file)

def update_spec(self) -> UpdateSpec:
"""Create a new UpdateSpec to update the partitioning of the table.

@@ -1506,6 +1541,31 @@ def add_files(
branch=branch,
)

def delete_files(
self,
file_paths: list[str],
snapshot_properties: dict[str, str] = EMPTY_DICT,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand API for removing data files from the table by their paths.

Args:
file_paths: The list of full file paths to be removed from the table
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch to delete files from

Raises:
ValueError: If file_paths contains duplicates
ValueError: If any file paths are not found in the table
"""
with self.transaction() as tx:
tx.delete_files(
file_paths=file_paths,
snapshot_properties=snapshot_properties,
branch=branch,
)

def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)

@@ -2175,3 +2235,21 @@ def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: list
futures = [executor.submit(parquet_file_to_data_file, io, table_metadata, file_path) for file_path in file_paths]

return [f.result() for f in futures if f.result()]


def _get_data_files_from_snapshot(
table_metadata: TableMetadata, file_paths: set[str], io: FileIO, branch: str | None = MAIN_BRANCH
) -> dict[str, DataFile]:
snapshot = table_metadata.snapshot_by_name(branch) if branch else table_metadata.current_snapshot()
if snapshot is None:
return {}

result: dict[str, DataFile] = {}
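    # Walk only the DATA manifests of the selected snapshot and stop early once
    # every requested path has been resolved.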
for manifest in snapshot.manifests(io):
if manifest.content == ManifestContent.DATA:
for entry in manifest.fetch_manifest_entry(io, discard_deleted=True):
if entry.data_file.file_path in file_paths:
result[entry.data_file.file_path] = entry.data_file
if len(result) == len(file_paths):
return result
return result
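
For reference, a minimal usage sketch of the delete_files API added here (the catalog name, table identifier, and file paths are hypothetical, and the sketch assumes the parquet files were previously registered with add_files):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")  # hypothetical catalog name
tbl = catalog.load_table("default.my_table")  # hypothetical table identifier

# Remove two previously added data files in a single overwrite snapshot.
tbl.delete_files(
    file_paths=[
        "s3://warehouse/default/my_table/data/file-0.parquet",  # hypothetical paths
        "s3://warehouse/default/my_table/data/file-1.parquet",
    ],
    snapshot_properties={"deleted-by": "cleanup-job"},
)

# The same call is available on an open transaction, so the removal can be
# combined with other updates and committed atomically.
with tbl.transaction() as tx:
    tx.delete_files(file_paths=["s3://warehouse/default/my_table/data/file-2.parquet"])
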
159 changes: 155 additions & 4 deletions tests/integration/test_deletes.py
@@ -15,23 +15,82 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
from collections.abc import Generator
from datetime import datetime
from collections.abc import Generator, Iterator
from datetime import date, datetime

import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestEntryStatus
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Summary
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, StringType, TimestampType
from pyiceberg.types import BooleanType, DateType, FloatType, IntegerType, LongType, NestedField, StringType, TimestampType

# Schema and data used by delete_files tests (moved from test_add_files)
TABLE_SCHEMA_DELETE_FILES = Schema(
NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
)

ARROW_SCHEMA_DELETE_FILES = pa.schema(
[
("foo", pa.bool_()),
("bar", pa.string()),
("baz", pa.int32()),
("qux", pa.date32()),
]
)

ARROW_TABLE_DELETE_FILES = pa.Table.from_pylist(
[
{
"foo": True,
"bar": "bar_string",
"baz": 123,
"qux": date(2024, 3, 7),
}
],
schema=ARROW_SCHEMA_DELETE_FILES,
)


def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None:
fo = io.new_output(file_path)
with fo.create(overwrite=True) as fos:
with pq.ParquetWriter(fos, schema=arrow_schema) as writer:
writer.write_table(arrow_table)


def _create_table_for_delete_files(
session_catalog: Catalog,
identifier: str,
format_version: int,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
schema: Schema = TABLE_SCHEMA_DELETE_FILES,
) -> Table:
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

return session_catalog.create_table(
identifier=identifier,
schema=schema,
properties={"format-version": str(format_version)},
partition_spec=partition_spec,
)


def run_spark_commands(spark: SparkSession, sqls: list[str]) -> None:
@@ -57,6 +116,12 @@ def test_table(session_catalog: RestCatalog) -> Generator[Table, None, None]:
session_catalog.drop_table(identifier)


@pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")])
def format_version_fixture(request: "pytest.FixtureRequest") -> Iterator[int]:
"""Fixture to run tests with different table format versions (for delete_files tests)."""
yield request.param


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
@@ -975,3 +1040,89 @@ def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapsho
assert after_delete_snapshot is not None

assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id)


@pytest.mark.integration
def test_delete_files_from_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.delete_files_unpartitioned_v{format_version}"
tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)

file_paths = [f"s3://warehouse/default/delete_unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
for file_path in file_paths:
_write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)

tbl.add_files(file_paths=file_paths)
assert len(tbl.scan().to_arrow()) == 5

tbl.delete_files(file_paths=file_paths[:2])

rows = spark.sql(
f"""
SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
FROM {identifier}.all_manifests
"""
).collect()

assert sum(row.deleted_data_files_count for row in rows) == 2

df = spark.table(identifier)
assert df.count() == 3

assert len(tbl.scan().to_arrow()) == 3


@pytest.mark.integration
def test_delete_files_raises_on_nonexistent_file(session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.delete_files_nonexistent_v{format_version}"
tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)

file_paths = [f"s3://warehouse/default/delete_nonexistent/v{format_version}/test-{i}.parquet" for i in range(3)]
for file_path in file_paths:
_write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)

tbl.add_files(file_paths=file_paths)

    with pytest.raises(ValueError, match="Cannot delete files that are not referenced by the table"):
tbl.delete_files(file_paths=["s3://warehouse/default/does-not-exist.parquet"])


@pytest.mark.integration
def test_delete_files_raises_on_duplicate_paths(session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.delete_files_duplicate_v{format_version}"
tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)

file_path = f"s3://warehouse/default/delete_duplicate/v{format_version}/test.parquet"
_write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)

tbl.add_files(file_paths=[file_path])

with pytest.raises(ValueError, match="File paths must be unique"):
tbl.delete_files(file_paths=[file_path, file_path])


@pytest.mark.integration
def test_delete_files_from_branch(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
identifier = f"default.delete_files_branch_v{format_version}"
branch = "branch1"

tbl = _create_table_for_delete_files(session_catalog, identifier, format_version)

file_paths = [f"s3://warehouse/default/delete_branch/v{format_version}/test-{i}.parquet" for i in range(5)]
for file_path in file_paths:
_write_parquet(tbl.io, file_path, ARROW_SCHEMA_DELETE_FILES, ARROW_TABLE_DELETE_FILES)

tbl.append(ARROW_TABLE_DELETE_FILES)
assert tbl.metadata.current_snapshot_id is not None
tbl.manage_snapshots().create_branch(snapshot_id=tbl.metadata.current_snapshot_id, branch_name=branch).commit()

tbl.add_files(file_paths=file_paths, branch=branch)
branch_df = spark.table(f"{identifier}.branch_{branch}")
assert branch_df.count() == 6

tbl.delete_files(file_paths=file_paths[:3], branch=branch)

branch_df = spark.table(f"{identifier}.branch_{branch}")
assert branch_df.count() == 3

main_df = spark.table(identifier)
assert main_df.count() == 1
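
As a side note, the branch assertions above could also be checked from pyiceberg itself instead of going through Spark; a rough sketch, assuming the installed pyiceberg version exposes DataScan.use_ref for reading from a branch:

# Read back row counts for both refs directly with pyiceberg.
assert len(tbl.scan().use_ref(branch).to_arrow()) == 3  # 6 branch rows minus the 3 deleted files
assert len(tbl.scan().to_arrow()) == 1  # main still holds only the single appended row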