Skip to content

Commit 7ec0217

Browse files
committed
perf: Speed up purge_table by deduplicating manifest reads and parallelizing file deletion
Three changes reduce purge_table wall time from ~7s to ~0.13s (54x) on a table with 200 snapshots:

1. Deduplicate manifests by path before iterating in delete_data_files(). The same manifest appears across many snapshots' manifest lists; for 200 snapshots this reduces 20,100 manifest opens to 200.

2. Parallelize file deletion using the existing ExecutorFactory ThreadPoolExecutor, matching the pattern already used for manifest reading in plan_files() and data file reading in to_arrow(). This aligns with the Java reference implementation (CatalogUtil.dropTableData), which also deletes files concurrently via a worker thread pool.

3. Cache Avro-to-Iceberg schema conversion and reader tree resolution. All manifests of the same type share the same Avro schema, but it was being JSON-parsed, converted, and resolved into a reader tree on every open. Uses an explicit threading.Lock for thread safety across all Python implementations.
1 parent 1a54e9c commit 7ec0217

2 files changed

Lines changed: 61 additions & 15 deletions

File tree

pyiceberg/avro/file.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io
2323
import json
2424
import os
25+
import threading
2526
from collections.abc import Callable
2627
from dataclasses import dataclass
2728
from enum import Enum
2829
from types import TracebackType
2930
from typing import (
31+
Any,
3032
Generic,
3133
TypeVar,
3234
)
@@ -68,6 +70,41 @@
6870
_SCHEMA_KEY = "avro.schema"
6971

7072

# Cache Avro-to-Iceberg schema conversion and resolved reader trees.
# Manifests of the same type share the same Avro schema, so these caches
# avoid redundant JSON parsing, schema conversion, and reader tree construction.
# Reader objects are stateless — read() takes a decoder and returns decoded
# data without mutating self, so sharing across threads/calls is safe.
# Uses explicit locking instead of lru_cache for thread safety across all
# Python implementations (not just CPython).
# NOTE(review): both caches grow without bound; acceptable while the key space
# is the small fixed set of manifest schemas — revisit if that changes.
_schema_cache: dict[str, Schema] = {}
_reader_cache: dict[tuple[Any, ...], Reader] = {}
_cache_lock = threading.Lock()


def _cached_avro_to_iceberg(avro_schema_string: str) -> Schema:
    """Convert an Avro schema JSON string to an Iceberg Schema, with caching.

    Args:
        avro_schema_string: The raw ``avro.schema`` file-header value (JSON text).

    Returns:
        The cached (or freshly converted) Iceberg Schema.
    """
    # Do both the lookup and the insert while holding the lock. The previous
    # double-checked pattern read the dict without the lock, which is only safe
    # under CPython's GIL — not on the "all Python implementations" this cache
    # is documented to support. Conversion is cheap and rare, so the extra lock
    # acquisition on cache hits is negligible.
    with _cache_lock:
        schema = _schema_cache.get(avro_schema_string)
        if schema is None:
            avro_schema = json.loads(avro_schema_string)
            schema = AvroSchemaConversion().avro_to_iceberg(avro_schema)
            _schema_cache[avro_schema_string] = schema
        return schema
def _cached_resolve_reader(
    file_schema: Schema,
    read_schema: Schema,
    read_types: dict[int, Callable[..., StructProtocol]],
    read_enums: dict[int, Callable[..., Enum]],
) -> Reader:
    """Resolve a (file schema -> read schema) pair into a reader tree, with caching.

    Args:
        file_schema: The schema the Avro file was written with.
        read_schema: The schema the caller wants to read into.
        read_types: Per-field-id struct constructors to instantiate while reading.
        read_enums: Per-field-id enum constructors to instantiate while reading.

    Returns:
        The cached (or freshly resolved) reader tree.
    """
    # Schemas are keyed by their string form; callables are keyed by identity
    # via the sorted (id, callable) pairs, so the key is hashable and stable.
    key = (str(file_schema), str(read_schema), tuple(sorted(read_types.items())), tuple(sorted(read_enums.items())))
    # Lookup and insert both happen under the lock: the previous double-checked
    # pattern read the dict unlocked, which is safe only under CPython's GIL,
    # defeating the stated goal of portability to other implementations.
    with _cache_lock:
        reader = _reader_cache.get(key)
        if reader is None:
            reader = resolve_reader(file_schema, read_schema, read_types, read_enums)
            _reader_cache[key] = reader
        return reader
71108
class AvroFileHeader(Record):
72109
@property
73110
def magic(self) -> bytes:
@@ -97,9 +134,7 @@ def compression_codec(self) -> type[Codec] | None:
97134

98135
def get_schema(self) -> Schema:
99136
if _SCHEMA_KEY in self.meta:
100-
avro_schema_string = self.meta[_SCHEMA_KEY]
101-
avro_schema = json.loads(avro_schema_string)
102-
return AvroSchemaConversion().avro_to_iceberg(avro_schema)
137+
return _cached_avro_to_iceberg(self.meta[_SCHEMA_KEY])
103138
else:
104139
raise ValueError("No schema found in Avro file headers")
105140

@@ -178,7 +213,7 @@ def __enter__(self) -> AvroFile[D]:
178213
if not self.read_schema:
179214
self.read_schema = self.schema
180215

181-
self.reader = resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
216+
self.reader = _cached_resolve_reader(self.schema, self.read_schema, self.read_types, self.read_enums)
182217

183218
return self
184219

pyiceberg/catalog/__init__.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
MANIFEST_LIST = "manifest list"
9191
PREVIOUS_METADATA = "previous metadata"
9292
METADATA = "metadata"
93+
DATA_FILE = "data"
9394
URI = "uri"
9495
LOCATION = "location"
9596
EXTERNAL_TABLE = "EXTERNAL_TABLE"
@@ -284,7 +285,7 @@ def list_catalogs() -> list[str]:
284285

285286

286287
def delete_files(io: FileIO, files_to_delete: set[str], file_type: str) -> None:
    """Delete files in parallel.

    Failures are logged as warnings rather than raised, so one undeletable
    file does not abort the remaining deletions.

    Args:
        io: The FileIO used to delete the object.
        files_to_delete: A set of file paths to be deleted.
        file_type: The type of the file, used in the warning message.
    """
    from pyiceberg.utils.concurrent import ExecutorFactory

    def _try_delete(file: str) -> None:
        try:
            io.delete(file)
        except OSError:
            logger.warning(f"Failed to delete {file_type} file {file}", exc_info=logger.isEnabledFor(logging.DEBUG))

    # Executor.map is lazy: drain the iterator so every deletion has completed
    # before this function returns.
    for _ in ExecutorFactory.get_or_create().map(_try_delete, files_to_delete):
        pass
302308

303309
def delete_data_files(io: FileIO, manifests_to_delete: list[ManifestFile]) -> None:
    """Delete data files linked to given manifests.

    Deduplicates manifests by path (the same manifest appears across many
    snapshots' manifest lists) and deletes the referenced data files in
    parallel. Failures to delete individual files are logged, not raised.

    Args:
        io: The FileIO used to delete the object.
        manifests_to_delete: A list of manifests containing paths of data files to be deleted.
    """
    # With N snapshots, each manifest can appear in up to N manifest lists;
    # keeping only the first occurrence per path means each manifest is opened
    # once (for 200 snapshots: 200 opens instead of 20,100).
    unique_manifests: dict[str, ManifestFile] = {}
    for manifest in manifests_to_delete:
        if manifest.manifest_path not in unique_manifests:
            unique_manifests[manifest.manifest_path] = manifest

    # Gather every distinct data file path referenced by the surviving manifests.
    data_file_paths: set[str] = {
        entry.data_file.file_path
        for manifest in unique_manifests.values()
        for entry in manifest.fetch_manifest_entry(io, discard_deleted=False)
    }

    # Fan the deletions out across the shared worker pool.
    delete_files(io, data_file_paths, DATA_FILE)
322333

323334

324335
def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Catalog | None:

0 commit comments

Comments
 (0)