Skip to content
1 change: 1 addition & 0 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ static struct InitFiu
ONCE(iceberg_writes_cleanup) \
ONCE(iceberg_writes_non_retry_cleanup) \
ONCE(iceberg_writes_post_publish_throw) \
ONCE(iceberg_alter_catalog_update_metadata_fail) \
ONCE(iceberg_export_after_commit_before_zk_completed) \
REGULAR(export_partition_commit_always_throw) \
ONCE(export_partition_status_change_throw) \
Expand Down
278 changes: 228 additions & 50 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <Poco/JSON/Stringifier.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h>
Expand Down Expand Up @@ -31,6 +32,7 @@
#include <Interpreters/Context.h>
#include <filesystem>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
Expand All @@ -44,6 +46,9 @@
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>

#include <sstream>
#include <unordered_set>


namespace DB::ErrorCodes
{
Expand All @@ -53,6 +58,11 @@ namespace DB::ErrorCodes
extern const int CATALOG_NAMESPACE_DISABLED;
}

namespace DB::FailPoints
{
extern const char iceberg_alter_catalog_update_metadata_fail[];
}

namespace DataLake
{

Expand Down Expand Up @@ -118,6 +128,215 @@ String encodeNamespaceForURI(const String & namespace_name)

}

namespace
{
Poco::JSON::Object::Ptr cloneJsonObject(const Poco::JSON::Object::Ptr & obj)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
obj->stringify(oss);

Poco::JSON::Parser parser;
return parser.parse(oss.str()).extract<Poco::JSON::Object::Ptr>();
}

void collectSchemaFieldIdsFromFields(const Poco::JSON::Array::Ptr & fields, std::unordered_set<Int32> & ids)
{
if (!fields)
return;

for (UInt32 i = 0; i < fields->size(); ++i)
{
auto field = fields->getObject(i);
ids.insert(field->getValue<Int32>(DB::Iceberg::f_id));
if (field->has(DB::Iceberg::f_fields))
collectSchemaFieldIdsFromFields(field->getArray(DB::Iceberg::f_fields), ids);
}
}

/// Returns true when the table default sort order references a column absent from the new schema.
bool sortOrderIncompatibleWithSchema(
const Poco::JSON::Object::Ptr & metadata, const Poco::JSON::Object::Ptr & new_schema_obj)
{
if (!metadata->has(DB::Iceberg::f_sort_orders) || !metadata->has(DB::Iceberg::f_default_sort_order_id))
return false;

const Int64 default_sort_order_id = metadata->getValue<Int64>(DB::Iceberg::f_default_sort_order_id);
auto sort_orders = metadata->getArray(DB::Iceberg::f_sort_orders);

Poco::JSON::Object::Ptr default_sort_order;
for (UInt32 i = 0; i < sort_orders->size(); ++i)
{
auto sort_order = sort_orders->getObject(i);
if (sort_order->getValue<Int64>(DB::Iceberg::f_order_id) == default_sort_order_id)
{
default_sort_order = sort_order;
break;
}
}

if (!default_sort_order || !default_sort_order->has(DB::Iceberg::f_fields))
return false;

auto sort_fields = default_sort_order->getArray(DB::Iceberg::f_fields);
if (sort_fields->size() == 0)
return false;

std::unordered_set<Int32> new_schema_field_ids;
if (new_schema_obj->has(DB::Iceberg::f_fields))
collectSchemaFieldIdsFromFields(new_schema_obj->getArray(DB::Iceberg::f_fields), new_schema_field_ids);

for (UInt32 i = 0; i < sort_fields->size(); ++i)
{
auto field = sort_fields->getObject(i);
if (!field->has(DB::Iceberg::f_source_id))
continue;

const Int32 source_id = field->getValue<Int32>(DB::Iceberg::f_source_id);
if (!new_schema_field_ids.contains(source_id))
return true;
}

return false;
}
}

Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name, const String & table_name, Poco::JSON::Object::Ptr new_snapshot)
{
if (!new_snapshot)
return nullptr;

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

// Schema-change commit path (ALTER TABLE add/drop/modify/rename column).
if (new_snapshot->has(DB::Iceberg::f_schemas))
{
if (!new_snapshot->has(DB::Iceberg::f_current_schema_id))
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{} is missing '{}' field",
namespace_name, table_name, DB::Iceberg::f_current_schema_id);

const Int32 new_schema_id = new_snapshot->getValue<Int32>(DB::Iceberg::f_current_schema_id);
const Int32 old_schema_id = new_schema_id - 1;

Poco::JSON::Object::Ptr new_schema_obj;
auto schemas = new_snapshot->getArray(DB::Iceberg::f_schemas);
for (UInt32 i = 0; i < schemas->size(); ++i)
{
auto s = schemas->getObject(i);
if (s->getValue<Int32>(DB::Iceberg::f_schema_id) == new_schema_id)
{
new_schema_obj = s;
break;
}
}
if (!new_schema_obj)
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
"Iceberg update-metadata for {}.{}: no schema object matching current-schema-id={}",
namespace_name, table_name, new_schema_id);

Poco::JSON::Object::Ptr schema_for_rest = cloneJsonObject(new_schema_obj);
if (!schema_for_rest->has("identifier-field-ids"))
{
Poco::JSON::Array::Ptr empty_identifier_field_ids = new Poco::JSON::Array;
schema_for_rest->set("identifier-field-ids", empty_identifier_field_ids);
}

if (old_schema_id >= 0)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-current-schema-id");
requirement->set("current-schema-id", old_schema_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_schema = new Poco::JSON::Object;
add_schema->set("action", "add-schema");
add_schema->set("schema", schema_for_rest);
if (new_snapshot->has(DB::Iceberg::f_last_column_id))
add_schema->set("last-column-id", new_snapshot->getValue<Int32>(DB::Iceberg::f_last_column_id));
updates->add(add_schema);
}
{
Poco::JSON::Object::Ptr set_current_schema = new Poco::JSON::Object;
set_current_schema->set("action", "set-current-schema");
set_current_schema->set("schema-id", -1);
updates->add(set_current_schema);
}
if (sortOrderIncompatibleWithSchema(new_snapshot, new_schema_obj))
{
Poco::JSON::Object::Ptr unsorted_sort_order = new Poco::JSON::Object;
unsorted_sort_order->set(DB::Iceberg::f_order_id, 0);
unsorted_sort_order->set(DB::Iceberg::f_fields, Poco::JSON::Array::Ptr(new Poco::JSON::Array));

Poco::JSON::Object::Ptr add_sort_order = new Poco::JSON::Object;
add_sort_order->set("action", "add-sort-order");
add_sort_order->set("sort-order", unsorted_sort_order);
updates->add(add_sort_order);

Poco::JSON::Object::Ptr set_default_sort_order = new Poco::JSON::Object;
set_default_sort_order->set("action", "set-default-sort-order");
set_default_sort_order->set("sort-order-id", -1);
updates->add(set_default_sort_order);
}
request_body->set("updates", updates);
}
else
{
// Snapshot-append commit path (INSERT / position-delete mutation).
if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);
request_body->set("requirements", requirements);
}
}

Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;
{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}
{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
updates->add(set_snapshot);
}
request_body->set("updates", updates);
}

return request_body;
}

std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
Expand Down Expand Up @@ -1086,64 +1305,23 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl

bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const
{
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);
fiu_do_on(DB::FailPoints::iceberg_alter_catalog_update_metadata_fail, { return false; });

Poco::JSON::Object::Ptr request_body = new Poco::JSON::Object;
{
Poco::JSON::Object::Ptr identifier = new Poco::JSON::Object;
identifier->set("name", table_name);
Poco::JSON::Array::Ptr namespaces = new Poco::JSON::Array;
namespaces->add(namespace_name);
identifier->set("namespace", namespaces);

request_body->set("identifier", identifier);
}

if (new_snapshot->has("parent-snapshot-id"))
{
auto parent_snapshot_id = new_snapshot->getValue<Int64>("parent-snapshot-id");
if (parent_snapshot_id != -1)
{
Poco::JSON::Object::Ptr requirement = new Poco::JSON::Object;
requirement->set("type", "assert-ref-snapshot-id");
requirement->set("ref", "main");
requirement->set("snapshot-id", parent_snapshot_id);

Poco::JSON::Array::Ptr requirements = new Poco::JSON::Array;
requirements->add(requirement);

request_body->set("requirements", requirements);
}
}

{
Poco::JSON::Array::Ptr updates = new Poco::JSON::Array;

{
Poco::JSON::Object::Ptr add_snapshot = new Poco::JSON::Object;
add_snapshot->set("action", "add-snapshot");
add_snapshot->set("snapshot", new_snapshot);
updates->add(add_snapshot);
}

{
Poco::JSON::Object::Ptr set_snapshot = new Poco::JSON::Object;
set_snapshot->set("action", "set-snapshot-ref");
set_snapshot->set("ref-name", "main");
set_snapshot->set("type", "branch");
set_snapshot->set("snapshot-id", new_snapshot->getValue<Int64>("snapshot-id"));
const std::string endpoint = fmt::format("{}/namespaces/{}/tables/{}", base_url, namespace_name, table_name);

updates->add(set_snapshot);
}
request_body->set("updates", updates);
}
// Throws DB::Exception(DATALAKE_DATABASE_ERROR) on malformed metadata (programming error).
auto request_body = buildUpdateMetadataRequestBody(namespace_name, table_name, new_snapshot);
if (!request_body)
return true; // nothing to commit

try
{
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException &)
catch (const DB::HTTPException & ex)
{
LOG_WARNING(log, "Iceberg REST updateMetadata for {}.{} failed: {}",
namespace_name, table_name, ex.displayText());
return false;
}
return true;
Expand Down
10 changes: 10 additions & 0 deletions src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ struct AccessToken
}
};

/// Builds the JSON body for `POST .../namespaces/{ns}/tables/{table}` (Iceberg REST update).
///
/// Returns `nullptr` when `new_snapshot` is null (nothing to commit). Throws
/// `DB::Exception(DATALAKE_DATABASE_ERROR)` with a specific message when the metadata
/// blob is malformed (e.g. missing `current-schema-id`, no schema object matching it).
Poco::JSON::Object::Ptr buildUpdateMetadataRequestBody(
const String & namespace_name,
const String & table_name,
Poco::JSON::Object::Ptr new_snapshot);

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down
Loading
Loading