Skip to content

Commit cd844d3

Browse files
committed
feat(rest): support storage-credentials in PlanCompleted response (partial fix for #3165)
Adds storage-credentials support for CompletedPlanningResult (PlanCompleted). LoadCredentialsResponse support is pending.
1 parent 1a54e9c commit cd844d3

3 files changed

Lines changed: 128 additions & 4 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,29 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
528528
Returns:
529529
List of FileScanTask objects ready for execution.
530530
531+
Raises:
532+
RuntimeError: If planning fails, is cancelled, or returns an unexpected response.
533+
NotImplementedError: If async planning is required but not yet supported.
534+
"""
535+
tasks, _ = self._plan_scan_for_table(identifier, request)
536+
return tasks
537+
538+
def _plan_scan_for_table(
539+
self, identifier: str | Identifier, request: PlanTableScanRequest, table_location: str | None = None
540+
) -> tuple[list[FileScanTask], Properties]:
541+
"""Plan a table scan and return FileScanTasks along with resolved storage credentials.
542+
543+
Per Iceberg spec: storage-credentials in the PlanCompleted response take precedence over
544+
catalog config and should be applied to the FileIO used to read data files.
545+
546+
Args:
547+
identifier: Table identifier.
548+
request: The scan plan request parameters.
549+
table_location: The table's base location (not the metadata file path), used for credential prefix matching.
550+
551+
Returns:
552+
Tuple of (list of FileScanTask objects, resolved credential Properties).
553+
531554
Raises:
532555
RuntimeError: If planning fails, is cancelled, or returns an unexpected response.
533556
NotImplementedError: If async planning is required but not yet supported.
@@ -548,6 +571,8 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
548571
if not isinstance(response, PlanCompleted):
549572
raise RuntimeError(f"Invalid planStatus for response: {type(response).__name__}")
550573

574+
credential_config = self._resolve_storage_credentials(response.storage_credentials or [], table_location)
575+
551576
tasks: list[FileScanTask] = []
552577

553578
# Collect tasks from initial response
@@ -563,7 +588,7 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
563588
tasks.append(FileScanTask.from_rest_response(task, batch.delete_files))
564589
pending_tasks.extend(batch.plan_tasks)
565590

566-
return tasks
591+
return tasks, credential_config
567592

568593
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
569594
"""Create the LegacyOAuth2AuthManager by fetching required properties.

pyiceberg/table/__init__.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,7 +2054,12 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]:
20542054
case_sensitive=self.case_sensitive,
20552055
)
20562056

2057-
return self.catalog.plan_scan(self.table_identifier, request)
2057+
tasks, credential_config = self.catalog._plan_scan_for_table(self.table_identifier, request, self.table_metadata.location)
2058+
# Per Iceberg spec: storage-credentials from PlanCompleted take precedence over config.
2059+
# Update the FileIO so data files in this scan are read with the vended credentials.
2060+
if credential_config:
2061+
self.io = self.catalog._load_file_io({**self.io.properties, **credential_config}, self.table_metadata.location)
2062+
return tasks
20582063

20592064
def _plan_files_local(self) -> Iterable[FileScanTask]:
20602065
"""Plan files locally by reading manifests."""
@@ -2112,9 +2117,13 @@ def to_arrow(self) -> pa.Table:
21122117
"""
21132118
from pyiceberg.io.pyarrow import ArrowScan
21142119

2120+
# plan_files() must be called before capturing self.io so that any vended credentials
2121+
# returned by server-side scan planning (PlanCompleted.storage_credentials) are applied
2122+
# to self.io before ArrowScan is constructed.
2123+
tasks = self.plan_files()
21152124
return ArrowScan(
21162125
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2117-
).to_table(self.plan_files())
2126+
).to_table(tasks)
21182127

21192128
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
21202129
"""Return an Arrow RecordBatchReader from this DataScan.
@@ -2132,9 +2141,11 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
21322141
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
21332142

21342143
target_schema = schema_to_pyarrow(self.projection())
2144+
# plan_files() must be called before capturing self.io (same reason as to_arrow).
2145+
tasks = self.plan_files()
21352146
batches = ArrowScan(
21362147
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2137-
).to_record_batches(self.plan_files())
2148+
).to_record_batches(tasks)
21382149

21392150
return pa.RecordBatchReader.from_batches(
21402151
target_schema,

tests/catalog/test_rest.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2621,3 +2621,91 @@ def test_load_table_without_storage_credentials(
26212621
)
26222622
assert actual.metadata.model_dump() == expected.metadata.model_dump()
26232623
assert actual == expected
2624+
2625+
2626+
def test_plan_scan_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
2627+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
2628+
rest_mock.get(
2629+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
2630+
json={
2631+
"metadata-location": metadata_location,
2632+
"metadata": example_table_metadata_with_snapshot_v1,
2633+
"config": {},
2634+
},
2635+
status_code=200,
2636+
request_headers=TEST_HEADERS,
2637+
)
2638+
rest_mock.post(
2639+
f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
2640+
json={
2641+
"status": "completed",
2642+
"file-scan-tasks": [],
2643+
"delete-files": [],
2644+
"plan-tasks": [],
2645+
"storage-credentials": [
2646+
{
2647+
"prefix": "s3://warehouse/database/table",
2648+
"config": {
2649+
"s3.access-key-id": "plan-vended-key",
2650+
"s3.secret-access-key": "plan-vended-secret",
2651+
"s3.session-token": "plan-vended-token",
2652+
},
2653+
}
2654+
],
2655+
},
2656+
status_code=200,
2657+
request_headers=TEST_HEADERS,
2658+
)
2659+
from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest
2660+
2661+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
2662+
tasks, credential_config = catalog._plan_scan_for_table(
2663+
("fokko", "table"),
2664+
PlanTableScanRequest(),
2665+
table_location="s3://warehouse/database/table",
2666+
)
2667+
2668+
assert tasks == []
2669+
assert credential_config == {
2670+
"s3.access-key-id": "plan-vended-key",
2671+
"s3.secret-access-key": "plan-vended-secret",
2672+
"s3.session-token": "plan-vended-token",
2673+
}
2674+
2675+
2676+
def test_plan_scan_without_storage_credentials(
2677+
rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
2678+
) -> None:
2679+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
2680+
rest_mock.get(
2681+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
2682+
json={
2683+
"metadata-location": metadata_location,
2684+
"metadata": example_table_metadata_with_snapshot_v1,
2685+
"config": {},
2686+
},
2687+
status_code=200,
2688+
request_headers=TEST_HEADERS,
2689+
)
2690+
rest_mock.post(
2691+
f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
2692+
json={
2693+
"status": "completed",
2694+
"file-scan-tasks": [],
2695+
"delete-files": [],
2696+
"plan-tasks": [],
2697+
},
2698+
status_code=200,
2699+
request_headers=TEST_HEADERS,
2700+
)
2701+
from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest
2702+
2703+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
2704+
tasks, credential_config = catalog._plan_scan_for_table(
2705+
("fokko", "table"),
2706+
PlanTableScanRequest(),
2707+
table_location="s3://warehouse/database/table",
2708+
)
2709+
2710+
assert tasks == []
2711+
assert credential_config == {}

0 commit comments

Comments
 (0)