Skip to content

Commit df258f5

Browse files
authored
perf: build partition filter with balanced tree to avoid RecursionError (#3264)
1 parent 0826d3e commit df258f5

2 files changed

Lines changed: 25 additions & 12 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -367,20 +367,20 @@ def _build_partition_predicate(
367367
A predicate matching any of the input partition records.
368368
"""
369369
partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]
370+
if not partition_records or not partition_fields:
371+
return AlwaysFalse()
370372

371-
expr: BooleanExpression = AlwaysFalse()
373+
per_record_exprs: list[BooleanExpression] = []
372374
for partition_record in partition_records:
373-
match_partition_expression: BooleanExpression = AlwaysTrue()
374-
375-
for pos, partition_field in enumerate(partition_fields):
376-
predicate = (
377-
EqualTo(Reference(partition_field), partition_record[pos])
378-
if partition_record[pos] is not None
379-
else IsNull(Reference(partition_field))
380-
)
381-
match_partition_expression = And(match_partition_expression, predicate)
382-
expr = Or(expr, match_partition_expression)
383-
return expr
375+
predicates: list[BooleanExpression] = [
376+
EqualTo(Reference(partition_field), partition_record[pos])
377+
if partition_record[pos] is not None
378+
else IsNull(Reference(partition_field))
379+
for pos, partition_field in enumerate(partition_fields)
380+
]
381+
per_record_exprs.append(And(*predicates) if len(predicates) > 1 else predicates[0])
382+
383+
return Or(*per_record_exprs) if len(per_record_exprs) > 1 else per_record_exprs[0]
384384

385385
def _append_snapshot_producer(
386386
self, snapshot_properties: dict[str, str], branch: str | None = MAIN_BRANCH

tests/table/test_init.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
EqualTo,
3333
In,
3434
)
35+
from pyiceberg.expressions.visitors import bind
3536
from pyiceberg.io import PY_IO_IMPL, load_file_io
3637
from pyiceberg.partitioning import PartitionField, PartitionSpec
3738
from pyiceberg.schema import Schema
@@ -90,6 +91,7 @@
9091
BucketTransform,
9192
IdentityTransform,
9293
)
94+
from pyiceberg.typedef import Record
9395
from pyiceberg.types import (
9496
BinaryType,
9597
BooleanType,
@@ -1753,3 +1755,14 @@ def test_check_uuid_passes_when_match(table_v2: Table, example_table_metadata_v2
17531755
new_metadata = TableMetadataV2(**example_table_metadata_v2)
17541756
# Should not raise with same uuid
17551757
Table._check_uuid(table_v2.metadata, new_metadata)
1758+
1759+
1760+
def test_build_large_partition_predicate(table_v2: Table) -> None:
1761+
with table_v2.transaction() as tx:
1762+
expr = tx._build_partition_predicate(
1763+
partition_records={Record(i) for i in range(5000)},
1764+
spec=table_v2.metadata.spec(),
1765+
schema=table_v2.metadata.schema(),
1766+
)
1767+
1768+
bind(table_v2.metadata.schema(), expr, case_sensitive=True)

0 commit comments

Comments
 (0)