Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
aaf9897
Add tests for TTL EXPORT partition (test-first)
mkmkme May 19, 2026
399bd77
Add `EXPORT` TTL mode for partition export to a destination table
mkmkme May 19, 2026
0fb5adb
Add per-part `export_ttl` info for partition export TTLs
mkmkme May 19, 2026
be213b7
Switch TTL EXPORT syntax to `EXPORT TO TABLE`
mkmkme May 20, 2026
0664ab8
Add `export_origin` to partition export manifests
mkmkme May 20, 2026
97d6ae1
Add `export_merge_tree_partition_mark_as_ttl` and ttl-marker invariant
mkmkme May 20, 2026
77df5e6
Verify export destination compatibility at TTL DDL time
mkmkme May 20, 2026
7a4acf4
Add `TTLExportScheduler` + table-level backoff settings
mkmkme May 20, 2026
f7d8a27
Wire `TTLExportScheduler` into `StorageReplicatedMergeTree`
mkmkme May 20, 2026
b876bb3
fix a test broken during rebase
mkmkme May 20, 2026
3e6e29e
use export_merge_tree_partition_task_entries instead of requests to z…
mkmkme May 20, 2026
0065763
Defer ttl-marker cleanup until after the new manifest is durable
mkmkme May 20, 2026
e7e7287
Register new TTL EXPORT settings in SettingsChangesHistory
mkmkme May 20, 2026
1ebd610
processExportTTL: fall back to storage.getStorageID().getDatabaseName()
mkmkme May 21, 2026
c3b83ac
Walk TTL EXPORT partitions in expiration-max order, not lex order
mkmkme May 21, 2026
48097f2
Store TTL EXPORT destination database and table separately
mkmkme May 21, 2026
e148b48
Guard ttl-marker move by expiration max, not partition_id lex order
mkmkme May 21, 2026
3428e69
Reject TTL ... EXPORT TO TABLE on non-replicated MergeTree at DDL time
mkmkme May 21, 2026
dd3a2ac
Tighten ttl-marker guard: collect only the two partitions of interest
mkmkme May 21, 2026
335938f
Fix use-after-free in TTL EXPORT backward-marker guard
mkmkme May 21, 2026
153b4a1
Resolve TTL EXPORT destination at the interpreter level
mkmkme May 21, 2026
f6cbe79
Persist TTL EXPORT high-water mark on the manifest
mkmkme May 21, 2026
45e0442
Tie-break TTL EXPORT scheduler walk by `(expiration_max, partition_id)`
mkmkme May 21, 2026
0469250
Wake TTL EXPORT scheduler on replicas receiving MODIFY TTL via replic…
mkmkme May 21, 2026
37eca94
drop the unused parameter from getPartitionExportTTLMax
mkmkme May 22, 2026
c7ed5f1
TTLExportScheduler: drop the backoff scheduler and reschedule agressi…
mkmkme May 22, 2026
3767581
Verify Iceberg partition compatibility at TTL DDL time
mkmkme May 22, 2026
14e5461
added tests for schema changing in-flight
mkmkme May 22, 2026
4abd8db
Merge remote-tracking branch 'origin/antalya-26.3' into mkmkme/antaly…
mkmkme May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7531,6 +7531,9 @@ Overwrite file if it already exists when exporting a merge tree part
)", 0) \
DECLARE(Bool, export_merge_tree_partition_force_export, false, R"(
Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(Bool, export_merge_tree_partition_mark_as_ttl, false, R"(
When set on `ALTER ... EXPORT PARTITION`, marks the manifest with `export_origin = 'ttl'` so it is treated as if submitted by the TTL scheduler: it is exempt from manifest-TTL eviction and participates in the cross-partition ordering check against other ttl-origin manifests. The TTL scheduler always sets this implicitly when it submits.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
Expand Down
4 changes: 4 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"export_merge_tree_partition_mark_as_ttl", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down Expand Up @@ -1151,6 +1152,9 @@ const VersionToSettingsChangesMap & getMergeTreeSettingsChangesHistory()
{
addSettingsChanges(merge_tree_settings_changes_history, "26.3",
{
{"export_merge_tree_partition_ttl_poll_interval_seconds", 5, 5, "New setting for the TTL EXPORT scheduler poll interval."},
{"export_merge_tree_partition_ttl_min_backoff_seconds", 1, 1, "New setting for the TTL EXPORT scheduler minimum backoff."},
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to merge it with the backoff policy?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was planning to remove the backoff policy from the TTL scheduler itself and rely strictly on export partition backoff. Since it's not yet a thing, I thought about keeping it for a while. But considering the fact it actually introduces settings and changes in settings history, I think I'll drop it now

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropped: c7ed5f1

{"export_merge_tree_partition_ttl_max_backoff_seconds", 60, 60, "New setting for the TTL EXPORT scheduler maximum backoff."},
{"vertical_merge_optimize_ttl_delete", false, true, "Allow vertical merge algorithm for merges that need to remove rows expired by TTL"},
{"shared_merge_tree_replica_set_max_lifetime_seconds", 300, 300, "New setting"},
{"auto_statistics_types", "", "minmax, uniq", "Enable auto statistics by default"},
Expand Down
5 changes: 3 additions & 2 deletions src/Databases/DatabasesCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ void validateCreateQuery(const ASTCreateQuery & query, ContextPtr context)
primary_key = KeyDescription::getKeyFromAST(storage.order_by->ptr(), columns_desc, context);
if (storage.primary_key)
primary_key = KeyDescription::getKeyFromAST(storage.primary_key->ptr(), columns_desc, context);
KeyDescription partition_key;
if (storage.partition_by)
KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
partition_key = KeyDescription::getKeyFromAST(storage.partition_by->ptr(), columns_desc, context);
if (storage.sample_by)
KeyDescription::getKeyFromAST(storage.sample_by->ptr(), columns_desc, context);
if (storage.ttl_table && primary_key.has_value())
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, true);
TTLTableDescription::getTTLForTableFromAST(storage.ttl_table->ptr(), columns_desc, context, *primary_key, partition_key, true);
}
}

Expand Down
12 changes: 11 additions & 1 deletion src/Interpreters/AddDefaultDatabaseVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class AddDefaultDatabaseVisitor
if (!tryVisitDynamicCast<ASTAlterQuery>(parent, ast) &&
!tryVisitDynamicCast<ASTQueryWithTableAndOutput>(parent, ast) &&
!tryVisitDynamicCast<ASTRenameQuery>(parent, ast) &&
!tryVisitDynamicCast<ASTFunction>(parent, ast))
!tryVisitDynamicCast<ASTFunction>(parent, ast) &&
!tryVisitDynamicCast<ASTTTLElement>(parent, ast))
{}
}

Expand Down Expand Up @@ -314,6 +315,15 @@ class AddDefaultDatabaseVisitor
}
}

void visitDDL(ASTPtr & /* parent */, ASTTTLElement & node, ASTPtr & /* ast */) const
{
if (only_replace_current_database_function)
return;

if (node.mode == TTLMode::EXPORT && node.destination_database.empty())
node.destination_database = database_name;
}

void visitDDL(ASTPtr & parent, ASTFunction & function, ASTPtr & node) const
{
if (function.name == "currentDatabase")
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,13 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.refresh_strategy);
}

if (create.storage && create.storage->ttl_table)
{
ASTPtr ttl_table_ptr = create.storage->ttl_table->ptr();
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
visitor.visitDDL(ttl_table_ptr);
}

if (create.columns_list)
{
AddDefaultDatabaseVisitor visitor(getContext(), current_database);
Expand Down
12 changes: 12 additions & 0 deletions src/Parsers/ASTTTLElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ void ASTTTLElement::formatImpl(WriteBuffer & ostr, const FormatSettings & settin
ostr << " RECOMPRESS ";
recompression_codec->format(ostr, settings, state, frame);
}
else if (mode == TTLMode::EXPORT)
{
if (destination_type != DataDestinationType::TABLE)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unsupported destination type {} for TTL EXPORT",
magic_enum::enum_name(destination_type));

ostr << " EXPORT TO TABLE ";
if (!destination_database.empty())
ostr << backQuoteIfNeed(destination_database) << '.';
ostr << backQuoteIfNeed(destination_name);
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
Expand Down
14 changes: 13 additions & 1 deletion src/Parsers/ASTTTLElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ class ASTTTLElement : public IAST
public:
TTLMode mode;
DataDestinationType destination_type;
/// For TTLMode::EXPORT: the destination database; empty if the user wrote an unqualified
/// table name. For other modes (MOVE), unused.
String destination_database;
/// For TTLMode::EXPORT: just the destination table name. Never the joined `db.table` form,
/// so quoted table names that legitimately contain dots round-trip losslessly.
/// For TTLMode::MOVE: the disk or volume name.
String destination_name;
bool if_exists = false;

Expand All @@ -23,9 +29,15 @@ class ASTTTLElement : public IAST

ASTPtr recompression_codec;

ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_, bool if_exists_)
ASTTTLElement(
TTLMode mode_,
DataDestinationType destination_type_,
const String & destination_database_,
const String & destination_name_,
bool if_exists_)
: mode(mode_)
, destination_type(destination_type_)
, destination_database(destination_database_)
, destination_name(destination_name_)
, if_exists(if_exists_)
, ttl_expr_pos(-1)
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ namespace DB
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
MR_MACROS(EXPORT_TO_TABLE, "EXPORT TO TABLE") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
Expand Down
15 changes: 14 additions & 1 deletion src/Parsers/ExpressionElementParsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserExplainQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>

#include <Interpreters/StorageID.h>

Expand Down Expand Up @@ -2449,6 +2450,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_set(Keyword::SET);
ParserKeyword s_recompress(Keyword::RECOMPRESS);
ParserKeyword s_codec(Keyword::CODEC);
ParserKeyword s_export_to_table(Keyword::EXPORT_TO_TABLE);
ParserKeyword s_materialize_ttl(Keyword::MATERIALIZE_TTL);
ParserKeyword s_remove_ttl(Keyword::REMOVE_TTL);
ParserKeyword s_modify_ttl(Keyword::MODIFY_TTL);
Expand Down Expand Up @@ -2476,6 +2478,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)

TTLMode mode;
DataDestinationType destination_type = DataDestinationType::DELETE;
String destination_database;
String destination_name;

if (s_to_disk.ignore(pos, expected))
Expand All @@ -2496,6 +2499,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
mode = TTLMode::RECOMPRESS;
}
else if (s_export_to_table.ignore(pos, expected))
{
mode = TTLMode::EXPORT;
destination_type = DataDestinationType::TABLE;
}
else
{
/// DELETE is the default mode.
Expand Down Expand Up @@ -2547,8 +2555,13 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parser_codec.parse(pos, recompression_codec, expected))
return false;
}
else if (mode == TTLMode::EXPORT)
{
if (!parseDatabaseAndTableName(pos, expected, destination_database, destination_name))
return false;
}

auto ttl_element = make_intrusive<ASTTTLElement>(mode, destination_type, destination_name, if_exists);
auto ttl_element = make_intrusive<ASTTTLElement>(mode, destination_type, destination_database, destination_name, if_exists);
ttl_element->setTTL(std::move(ttl_expr));
if (where_expr)
ttl_element->setWhere(std::move(where_expr));
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) cons
data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}
else if (ttl_update_field == TTLUpdateField::EXPORT_TTL)
{
data_part->ttl_infos.export_ttl[ttl_update_key] = new_ttl_info;
}

}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ enum class TTLUpdateField : uint8_t
MOVES_TTL,
RECOMPRESSION_TTL,
GROUP_BY_TTL,
EXPORT_TTL,
};

/// Calculates new ttl_info and does nothing with data.
Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Transforms/TTLCalcTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ TTLCalcTransform::TTLCalcTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
{
const auto export_key = export_ttl.getExportKey();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_key, old_ttl_infos.export_ttl[export_key], current_time_, force_));
}
}

void TTLCalcTransform::consume(Chunk chunk)
Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Transforms/TTLTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ TTLTransform::TTLTransform(
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));

for (const auto & export_ttl : metadata_snapshot_->getExportTTLs())
{
const auto export_key = export_ttl.getExportKey();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
getExpressions(export_ttl, subqueries_for_sets, context), export_ttl,
TTLUpdateField::EXPORT_TTL, export_key, old_ttl_infos.export_ttl[export_key], current_time_, force_));
}
}

Block reorderColumns(Block block, const Block & header)
Expand Down
4 changes: 3 additions & 1 deletion src/Storages/AlterCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else if (type == MODIFY_TTL)
{
metadata.table_ttl = TTLTableDescription::getTTLForTableFromAST(
ttl, metadata.columns, context, metadata.primary_key, context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
ttl, metadata.columns, context, metadata.primary_key, metadata.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);
}
else if (type == REMOVE_TTL)
{
Expand Down Expand Up @@ -1393,6 +1394,7 @@ void AlterCommands::apply(StorageInMemoryMetadata & metadata, ContextPtr context
metadata_copy.columns,
context,
metadata_copy.primary_key,
metadata_copy.partition_key,
context->getSettingsRef()[Setting::allow_suspicious_ttl_expressions]);

metadata = std::move(metadata_copy);
Expand Down
27 changes: 27 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
namespace DB
{

/// Distinguishes manifests submitted by manual `ALTER ... EXPORT PARTITION` from those
/// submitted by the TTL scheduler. Persisted in the manifest body and surfaced through
/// `system.replicated_partition_exports.export_origin`.
enum class ExportOrigin : int8_t
{
alter = 0,
ttl = 1,
};

struct ExportReplicatedMergeTreePartitionProcessingPartEntry
{

Expand Down Expand Up @@ -121,6 +130,11 @@ struct ExportReplicatedMergeTreePartitionManifest
String filename_pattern;
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;
ExportOrigin export_origin = ExportOrigin::alter;
/// Partition-wide max of the EXPORT TTL expression at submission time.
/// Populated only when `export_origin == ttl`; together with `status = COMPLETED`
/// it is the scheduler's durable high-water mark (see `TTLExportScheduler`).
time_t export_ttl_max = 0;

std::string toJsonString() const
{
Expand Down Expand Up @@ -154,6 +168,9 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
json.set("export_origin", String(magic_enum::enum_name(export_origin)));
if (export_origin == ExportOrigin::ttl)
json.set("export_ttl_max", export_ttl_max);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand Down Expand Up @@ -208,6 +225,16 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");

/// Manifests written before this field existed default to `alter`.
if (json->has("export_origin"))
{
if (auto parsed = magic_enum::enum_cast<ExportOrigin>(json->getValue<String>("export_origin")))
manifest.export_origin = *parsed;
}

if (json->has("export_ttl_max"))
manifest.export_ttl_max = json->getValue<time_t>("export_ttl_max");

return manifest;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ namespace
auto & entries_by_key
)
{
bool has_expired = metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);
/// Manifests submitted by the TTL scheduler are durable by design: the scheduler relies on the
/// last manifest for `(src, dest)` to know where to resume, so manifest-TTL eviction must skip them.
bool has_expired = metadata.export_origin != ExportOrigin::ttl
&& metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);

bool task_timed_out = is_pending
&& metadata.task_timeout_seconds > 0
Expand Down Expand Up @@ -545,6 +548,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
info.last_exception = last_exception;
info.exception_part = exception_part;
info.exception_count = exception_count;
info.export_origin = metadata.export_origin;
infos.emplace_back(std::move(info));
}

Expand Down Expand Up @@ -572,6 +576,7 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
info.parts_to_do = entry.manifest.parts.size();
info.parts = entry.manifest.parts;
info.status = magic_enum::enum_name(entry.status);
info.export_origin = entry.manifest.export_origin;

infos.emplace_back(std::move(info));
}
Expand Down
25 changes: 25 additions & 0 deletions src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include <Common/ProfileEvents.h>
#include <Common/FailPoint.h>
#include <Common/logger_useful.h>
#include <Parsers/IAST.h>
#include "Storages/ColumnsDescription.h"
#include "Storages/ExportReplicatedMergeTreePartitionManifest.h"
#include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h"
#include "Storages/StorageInMemoryMetadata.h"
#include <Storages/MergeTree/MergeTreeData.h>
#include <filesystem>
#include <thread>
Expand Down Expand Up @@ -32,6 +35,7 @@ namespace ErrorCodes
{
extern const int FAULT_INJECTED;
extern const int BAD_ARGUMENTS;
extern const int INCOMPATIBLE_COLUMNS;
extern const int NO_SUCH_DATA_PART;
extern const int CORRUPTED_DATA;
extern const int NETWORK_ERROR;
Expand All @@ -47,6 +51,27 @@ namespace fs = std::filesystem;

namespace ExportPartitionUtils
{
void verifyExportDestinationCompatibility(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't checked the rest of the code yet, but we must also check for partition expression compatbility for data lakes

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah true, thanks. I haven't moved that part of the code yet. Will do!

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done it in 3767581

const ColumnsDescription & src_columns,
const ASTPtr & src_partition_key_ast,
const StorageInMemoryMetadata & dest_metadata,
const IStorage & dest_storage)
{
if (src_columns.getReadable().sizeOfDifference(dest_metadata.getColumns().getInsertable()))
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");

if (dest_storage.isDataLake())
return;

const auto ast_to_string = [](const ASTPtr & ast) -> String
{
return ast ? ast->formatWithSecretsOneLine() : "";
};

if (ast_to_string(src_partition_key_ast) != ast_to_string(dest_metadata.getPartitionKeyAST()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key");
}

std::vector<Field> getPartitionValuesForIcebergCommit(
MergeTreeData & storage, const String & partition_id)
{
Expand Down
Loading
Loading