From 22bc690d2e6612d743e2820a629b78163adf5d44 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 10 Nov 2025 21:44:20 -0800 Subject: [PATCH 1/4] dt/dl: Flatten schema_evolution_test parameter matrices Signed-off-by: Oren Leiman (cherry picked from commit 707422f85a2195537d9b6a807e916cca350784ba) --- .../tests/datalake/schema_evolution_test.py | 309 ++++++++++-------- 1 file changed, 174 insertions(+), 135 deletions(-) diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py index 8543130425a61..fe66ee2dafe96 100644 --- a/tests/rptest/tests/datalake/schema_evolution_test.py +++ b/tests/rptest/tests/datalake/schema_evolution_test.py @@ -11,6 +11,7 @@ from collections.abc import Callable from contextlib import contextmanager from enum import Enum +from itertools import product from time import time from typing import NamedTuple @@ -246,9 +247,10 @@ class EvolutionTestCase(NamedTuple): initial_schema: GenericSchema next_schema: GenericSchema partition_spec: str | None = None + valid: bool = True -LEGAL_TEST_CASES = { +TEST_CASES = { "add_column": EvolutionTestCase( initial_schema=GenericSchema( fields=[ @@ -432,10 +434,7 @@ class EvolutionTestCase(NamedTuple): ), partition_spec="(first)", ), -} - -ILLEGAL_TEST_CASES = { - "illegal promotion int->string": EvolutionTestCase( + "illegal_promotion_int_to_string": EvolutionTestCase( initial_schema=GenericSchema( fields=[ { @@ -464,6 +463,7 @@ class EvolutionTestCase(NamedTuple): "ordinal": str(x), }, ), + valid=False, ), "drop column that appears in partition spec": EvolutionTestCase( initial_schema=GenericSchema( @@ -533,8 +533,6 @@ def __init__(self, test_ctx, *args, **kwargs): **kwargs, ) self.test_ctx = test_ctx - self.topic_name = "test" - self.table_name = f"redpanda.{self.topic_name}" def setUp(self): # redpanda will be started by DatalakeServices @@ -543,25 +541,52 @@ def setUp(self): def select( self, dl: DatalakeServices, + table: str, query_engine: QueryEngineType, cols=list[str], sort_by_offset: bool = True, ): qe = dl.spark() if query_engine == QueryEngineType.SPARK else dl.trino() - query = f"select redpanda.offset, {', '.join(cols)} from {self.table_name}" + query = f"select redpanda.offset, {', '.join(cols)} from {table}" self.redpanda.logger.debug(f"QUERY: '{query}'") out = qe.run_query_fetch_all(query) if sort_by_offset: out.sort(key=lambda r: r[0]) return out + @property + def valid_cases(self) -> dict[str, EvolutionTestCase]: + return {label: tc for label, tc in self.all_cases.items() if tc.valid} + + @property + def invalid_cases(self) -> dict[str, EvolutionTestCase]: + return {label: tc for label, tc in self.all_cases.items() if not tc.valid} + + @property + def all_cases(self) -> dict[str, EvolutionTestCase]: + return TEST_CASES + + def cases_by_modes( + self, cases: dict[str, EvolutionTestCase] + ) -> list[tuple[str, EvolutionTestCase, ProducerType]]: + return [t + (p,) for t, p in product(cases.items(), PRODUCER_MODES)] + + def topic_and_table(self, base: str, mode: ProducerType) -> tuple[str, str]: + topic = f"{base}_{mode}" + table = f"redpanda.{topic}" + return ( + topic, + table, + ) + @contextmanager def setup_services( self, query_engine: QueryEngineType, compat_level: str = "NONE", - partition_spec: str = None, catalog_type: CatalogType = filesystem_catalog_type(), + test_cases: dict[str, EvolutionTestCase] = {}, + with_partitioning: bool = False, ): with DatalakeServices( self.test_ctx, @@ -571,99 +596,82 @@ def setup_services( query_engine, ], ) as 
dl: - config = {TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "dlq_table"} - if partition_spec is not None: - config["redpanda.iceberg.partition.spec"] = partition_spec - dl.create_iceberg_enabled_topic( - self.topic_name, - iceberg_mode="value_schema_id_prefix", - config=config, - ) - SchemaRegistryClient( - {"url": self.redpanda.schema_reg().split(",")[0]} - ).set_compatibility( - subject_name=f"{self.topic_name}-value", level=compat_level - ) + for label, tc, produce_mode in self.cases_by_modes(test_cases): + topic, _ = self.topic_and_table(label, produce_mode) + config = {TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "dlq_table"} + if with_partitioning and tc.partition_spec is not None: + config["redpanda.iceberg.partition.spec"] = tc.partition_spec + dl.create_iceberg_enabled_topic( + topic, + iceberg_mode="value_schema_id_prefix", + config=config, + ) + SchemaRegistryClient( + {"url": self.redpanda.schema_reg().split(",")[0]} + ).set_compatibility(subject_name=f"{topic}-value", level=compat_level) yield dl # make sure nothing we did trashed our ability to read the whole table - self.select(dl, query_engine, cols=["*"]) + for label, tc, produce_mode in self.cases_by_modes(test_cases): + _, table = self.topic_and_table(label, produce_mode) + self.select( + dl, + table, + query_engine, + cols=["*"], + ) @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), query_engine=QUERY_ENGINES, - test_case=list(LEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, catalog_type=supported_catalog_types(), ) - def test_legal_schema_evolution( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): + def test_schema_evolution(self, cloud_storage_type, query_engine, catalog_type): """ Test that rows written with schema A are still readable after evolving the table to schema B. """ - tc = LEGAL_TEST_CASES[test_case] - with self.setup_services( - query_engine, partition_spec=tc.partition_spec, catalog_type=catalog_type - ) as dl: - count = 10 - ctx = TranslationContext() - tc.initial_schema.produce( - dl, self.topic_name, count, ctx, mode=produce_mode - ) - - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - tc.next_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - tc.next_schema.check_table_schema(dl, self.table_name, query_engine) - select_out = self.select(dl, query_engine, tc.next_schema.field_names) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {select_out}" - ) - - @cluster(num_nodes=3) - @matrix( - cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - test_case=list(ILLEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), - ) - def test_illegal_schema_evolution( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): - """ - check that records produced with an incompatible schema don't wind up - in the table. 
- """ - tc = ILLEGAL_TEST_CASES[test_case] with self.setup_services( - query_engine, partition_spec=tc.partition_spec, catalog_type=catalog_type + query_engine, + catalog_type=catalog_type, + test_cases=self.all_cases, ) as dl: - count = 10 - ctx = TranslationContext() - tc.initial_schema.produce( - dl, self.topic_name, count, ctx, mode=produce_mode - ) - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - tc.next_schema.produce( - dl, - self.topic_name, - count, - ctx, - mode=produce_mode, - should_translate=False, - ) - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - - select_out = self.select(dl, query_engine, tc.next_schema.field_names) - assert len(select_out) == count, f"Expected {count} rows, got {select_out}" - assert ctx.dlq == count, ( - f"Expected {count} records were dlq'ed, got {ctx.dlq}" - ) + for label, tc, produce_mode in self.cases_by_modes(self.all_cases): + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() + tc.initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) + + tc.initial_schema.check_table_schema(dl, table, query_engine) + tc.next_schema.produce( + dl, topic, count, ctx, mode=produce_mode, should_translate=tc.valid + ) + if tc.valid: + tc.next_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, + table, + query_engine, + tc.next_schema.field_names, + ) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {select_out}" + ) + else: + tc.initial_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count, ( + f"Expected {count} rows, got {select_out}" + ) + assert ctx.dlq == count, ( + f"Expected {count} records were dlq'ed, got {ctx.dlq}" + ) - @cluster(num_nodes=3) @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), @@ -679,20 +687,23 @@ def test_dropped_column_no_collision( (this should create a *new* column). Confirm that 'select A' reads only the new column, producing nulls for all rows written prior to the final update. 
""" - + label = "drop_column" + tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: + topic, table = self.topic_and_table(label, produce_mode) count = 10 ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["drop_column"] + initial_schema = tc.initial_schema + next_schema = tc.next_schema dropped_field_names = list( set(initial_schema.field_names) - set(next_schema.field_names) ) for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) - select_out = self.select(dl, query_engine, schema.field_names) + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) + select_out = self.select(dl, table, query_engine, schema.field_names) assert len(select_out) == ctx.total, ( f"Expected {ctx.total} rows, got {select_out}" ) @@ -722,10 +733,10 @@ def test_dropped_column_no_collision( ], ) - restored_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - restored_schema.check_table_schema(dl, self.table_name, query_engine) + restored_schema.produce(dl, topic, count, ctx, mode=produce_mode) + restored_schema.check_table_schema(dl, table, query_engine) - select_out = self.select(dl, query_engine, dropped_field_names) + select_out = self.select(dl, table, query_engine, dropped_field_names) assert len(select_out) == count * 3, ( f"Expected {count * 3} rows, got {select_out}" ) @@ -751,24 +762,28 @@ def test_dropped_column_select_fails( Test that selecting a dropped column fails "gracefully" - or at least predictably and consistently. """ + label = "drop_column" + tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: + topic, table = self.topic_and_table(label, produce_mode) count = 10 ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["drop_column"] + initial_schema = tc.initial_schema + next_schema = tc.next_schema dropped_field_names = list( set(initial_schema.field_names) - set(next_schema.field_names) ) for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) if query_engine == QueryEngineType.SPARK: with expect_exception( pyhive.exc.OperationalError, lambda e: "UNRESOLVED_COLUMN" in e.args[0].status.errorMessage, ): - self.select(dl, query_engine, dropped_field_names) + self.select(dl, table, query_engine, dropped_field_names) else: with expect_exception( pyhive.exc.DatabaseError, @@ -780,64 +795,88 @@ def test_dropped_column_select_fails( @matrix( cloud_storage_type=supported_storage_types(), query_engine=QUERY_ENGINES, - produce_mode=PRODUCER_MODES, catalog_type=supported_catalog_types(), ) - def test_reorder_columns( - self, cloud_storage_type, query_engine, produce_mode, catalog_type - ): + def test_reorder_columns(self, cloud_storage_type, query_engine, catalog_type): """ Test that changing the order of columns doesn't change the values associated with a column or field name. 
""" - with self.setup_services(query_engine, catalog_type=catalog_type) as dl: - count = 10 - ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["reorder_columns"] - for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) - - for field in initial_schema.field_names: - select_out = self.select(dl, query_engine, [field]) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {len(select_out)}" - ) - assert all(r[1] == field for r in select_out), ( - f"{field} column mangled: {select_out}" - ) + label = "reorder_columns" + tc = self.valid_cases[label] + with self.setup_services( + query_engine, + catalog_type=catalog_type, + test_cases={ + label: tc, + }, + ) as dl: + for produce_mode in PRODUCER_MODES: + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() + initial_schema = tc.initial_schema + next_schema = tc.next_schema + for schema in [initial_schema, next_schema]: + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) + + for field in initial_schema.field_names: + select_out = self.select(dl, table, query_engine, [field]) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {len(select_out)}" + ) + assert all(r[1] == field for r in select_out), ( + f"{field} column mangled: {select_out}" + ) @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), query_engine=QUERY_ENGINES, - test_case=list(LEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, catalog_type=supported_catalog_types(), ) - def test_old_schema_writer( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): + def test_old_schema_writer(self, cloud_storage_type, query_engine, catalog_type): """ Tests that, after a backwards compatible update from schema A to schema B, we can keep tranlsating records produced with schema A without another schema update by falling back to an already extant parquet writer for schema A. 
""" - with self.setup_services(query_engine, catalog_type=catalog_type) as dl: - count = 10 - ctx = TranslationContext() + with self.setup_services( + query_engine, + catalog_type=catalog_type, + test_cases=self.valid_cases, + with_partitioning=False, + ) as dl: + for label, tc, produce_mode in self.cases_by_modes(self.valid_cases): + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES[test_case] + initial_schema, next_schema, _, _ = tc - for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) + for schema in [initial_schema, next_schema]: + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) - initial_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - next_schema.check_table_schema(dl, self.table_name, query_engine) + initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) + next_schema.check_table_schema(dl, table, query_engine) - select_out = self.select(dl, query_engine, next_schema.field_names) + select_out = self.select( + dl, table, query_engine, next_schema.field_names + ) - assert len(select_out) == count * 3, ( - f"Expected {count * 3} rows, got {len(select_out)}" - ) + assert len(select_out) == count * 3, ( + f"Expected {count * 3} rows, got {len(select_out)}" + ) + + # Check that producing with latest schema still works too. + next_schema.produce(dl, topic, count, ctx, mode=produce_mode) + + select_out = self.select( + dl, table, query_engine, next_schema.field_names + ) + + assert len(select_out) == count * 4, ( + f"Expected {count * 4} rows, got {len(select_out)}" + ) From 4e1a465e1590723f882b904a77e93e8ebd7dd9bf Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 11 Nov 2025 16:25:19 -0800 Subject: [PATCH 2/4] dt/dl: Collapse test_old_schema_writer into test_schema_evolution Signed-off-by: Oren Leiman (cherry picked from commit 8931902ce7f695f64f87fb34a2a9f99cea75ea0e) --- .../tests/datalake/schema_evolution_test.py | 72 ++++++------------- 1 file changed, 21 insertions(+), 51 deletions(-) diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py index fe66ee2dafe96..f6bd3e810ec31 100644 --- a/tests/rptest/tests/datalake/schema_evolution_test.py +++ b/tests/rptest/tests/datalake/schema_evolution_test.py @@ -659,6 +659,27 @@ def test_schema_evolution(self, cloud_storage_type, query_engine, catalog_type): assert len(select_out) == count * 2, ( f"Expected {count * 2} rows, got {select_out}" ) + + # check that we can still produce with the original schema and that + # the current table schema doesn't change back as a result + tc.initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) + tc.next_schema.check_table_schema(dl, table, query_engine) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count * 3, ( + f"Expected {count * 3} rows, got {len(select_out)}" + ) + + # and finally check that producing with latest schema still works + tc.next_schema.produce(dl, topic, count, ctx, mode=produce_mode) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count * 4, ( + f"Expected {count * 4} rows, got {len(select_out)}" + ) + else: tc.initial_schema.check_table_schema(dl, table, 
query_engine)

@@ -829,54 +850,3 @@ def test_reorder_columns(self, cloud_storage_type, query_engine, catalog_type):
             assert all(r[1] == field for r in select_out), (
                 f"{field} column mangled: {select_out}"
             )
-
-    @cluster(num_nodes=3)
-    @matrix(
-        cloud_storage_type=supported_storage_types(),
-        query_engine=QUERY_ENGINES,
-        catalog_type=supported_catalog_types(),
-    )
-    def test_old_schema_writer(self, cloud_storage_type, query_engine, catalog_type):
-        """
-        Tests that, after a backwards compatible update from schema A to schema B, we can keep
-        translating records produced with schema A without another schema update by falling back
-        to an already extant parquet writer for schema A.
-        """
-        with self.setup_services(
-            query_engine,
-            catalog_type=catalog_type,
-            test_cases=self.valid_cases,
-            with_partitioning=False,
-        ) as dl:
-            for label, tc, produce_mode in self.cases_by_modes(self.valid_cases):
-                topic, table = self.topic_and_table(label, produce_mode)
-                count = 10
-                ctx = TranslationContext()
-
-                initial_schema, next_schema, _, _ = tc
-
-                for schema in [initial_schema, next_schema]:
-                    schema.produce(dl, topic, count, ctx, mode=produce_mode)
-                    schema.check_table_schema(dl, table, query_engine)
-
-                initial_schema.produce(dl, topic, count, ctx, mode=produce_mode)
-                next_schema.check_table_schema(dl, table, query_engine)
-
-                select_out = self.select(
-                    dl, table, query_engine, next_schema.field_names
-                )
-
-                assert len(select_out) == count * 3, (
-                    f"Expected {count * 3} rows, got {len(select_out)}"
-                )
-
-                # Check that producing with latest schema still works too.
-                next_schema.produce(dl, topic, count, ctx, mode=produce_mode)
-
-                select_out = self.select(
-                    dl, table, query_engine, next_schema.field_names
-                )
-
-                assert len(select_out) == count * 4, (
-                    f"Expected {count * 4} rows, got {len(select_out)}"
-                )

From 400a6cb2d3c3c0b9c65fa0f7cbceb465bb7afca2 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Tue, 11 Nov 2025 23:12:15 -0800
Subject: [PATCH 3/4] dt/dl: Pare down query engine x catalog type params

We don't actually need a Cartesian product here; it's enough to cover
every query engine and catalog type at least once.

Signed-off-by: Oren Leiman
(cherry picked from commit 2881d9cf778c1d451054403ba3913741d546a7e3)
---
 .../tests/datalake/schema_evolution_test.py   | 47 ++++++++++++++-----
 1 file changed, 34 insertions(+), 13 deletions(-)

diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py
index f6bd3e810ec31..fbc2560757046 100644
--- a/tests/rptest/tests/datalake/schema_evolution_test.py
+++ b/tests/rptest/tests/datalake/schema_evolution_test.py
@@ -517,6 +517,28 @@ class EvolutionTestCase(NamedTuple):
 ]
 
 
+def query_engines_and_catalogs() -> list[list[QueryEngineType | CatalogType]]:
+    """
+    Produce a list of QueryEngineType/CatalogType pairs that includes each member
+    of each set at least once. The idea is to avoid redundant work: we aren't
+    interested in testing every combination of query engine and catalog, only in
+    verifying that our schema evolution approach doesn't break any particular
+    one of either.
+
+    Returns a list of lists rather than tuples so the result is JSON-roundtrippable
+    for use in matrix params. 
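+
+    For example (member names purely illustrative): with two query engines
+    [E1, E2] and three catalog types [C1, C2, C3], the modulo pairing below
+    yields [[E1, C1], [E2, C2], [E1, C3]].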
+ """ + n_engines = len(QUERY_ENGINES) + n_catalogs = len(supported_catalog_types()) + return [ + [ + QUERY_ENGINES[i % n_engines], + supported_catalog_types()[i % n_catalogs], + ] + for i in range(0, max(n_engines, n_catalogs)) + ] + + class SchemaEvolutionE2ETests(RedpandaTest): def __init__(self, test_ctx, *args, **kwargs): super(SchemaEvolutionE2ETests, self).__init__( @@ -623,15 +645,14 @@ def setup_services( @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) - def test_schema_evolution(self, cloud_storage_type, query_engine, catalog_type): + def test_schema_evolution(self, cloud_storage_type, qe_and_cat): """ Test that rows written with schema A are still readable after evolving the table to schema B. """ - + query_engine, catalog_type = qe_and_cat with self.setup_services( query_engine, catalog_type=catalog_type, @@ -696,18 +717,18 @@ def test_schema_evolution(self, cloud_storage_type, query_engine, catalog_type): @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) def test_dropped_column_no_collision( - self, cloud_storage_type, query_engine, produce_mode, catalog_type + self, cloud_storage_type, produce_mode, qe_and_cat ): """ Translate some records, drop field A, translate some more, reintroduce field A *by name* (this should create a *new* column). Confirm that 'select A' reads only the new column, producing nulls for all rows written prior to the final update. """ + query_engine, catalog_type = qe_and_cat label = "drop_column" tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: @@ -772,17 +793,17 @@ def test_dropped_column_no_collision( @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) def test_dropped_column_select_fails( - self, cloud_storage_type, query_engine, produce_mode, catalog_type + self, cloud_storage_type, produce_mode, qe_and_cat ): """ Test that selecting a dropped column fails "gracefully" - or at least predictably and consistently. """ + query_engine, catalog_type = qe_and_cat label = "drop_column" tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: @@ -815,14 +836,14 @@ def test_dropped_column_select_fails( @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) - def test_reorder_columns(self, cloud_storage_type, query_engine, catalog_type): + def test_reorder_columns(self, cloud_storage_type, qe_and_cat): """ Test that changing the order of columns doesn't change the values associated with a column or field name. 
""" + query_engine, catalog_type = qe_and_cat label = "reorder_columns" tc = self.valid_cases[label] with self.setup_services( From f26f9afc2795a192e0184c63ef99a5762ee54f20 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Thu, 13 Nov 2025 09:45:20 -0800 Subject: [PATCH 4/4] dt/dl: Improve logging in schema_evolution_test.py Signed-off-by: Oren Leiman (cherry picked from commit 75ab0daa139db7854f57e88909e1995faced9c14) --- .../tests/datalake/schema_evolution_test.py | 148 ++++++++++-------- 1 file changed, 81 insertions(+), 67 deletions(-) diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py index fbc2560757046..2112694a1906e 100644 --- a/tests/rptest/tests/datalake/schema_evolution_test.py +++ b/tests/rptest/tests/datalake/schema_evolution_test.py @@ -659,60 +659,70 @@ def test_schema_evolution(self, cloud_storage_type, qe_and_cat): test_cases=self.all_cases, ) as dl: for label, tc, produce_mode in self.cases_by_modes(self.all_cases): - topic, table = self.topic_and_table(label, produce_mode) - count = 10 - ctx = TranslationContext() - tc.initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) - - tc.initial_schema.check_table_schema(dl, table, query_engine) - tc.next_schema.produce( - dl, topic, count, ctx, mode=produce_mode, should_translate=tc.valid - ) - if tc.valid: - tc.next_schema.check_table_schema(dl, table, query_engine) - - select_out = self.select( - dl, - table, - query_engine, - tc.next_schema.field_names, - ) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {select_out}" - ) - - # check that we can still produce with the original schema and that - # the current table schema doesn't change back as a result + try: + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() tc.initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) - tc.next_schema.check_table_schema(dl, table, query_engine) - select_out = self.select( - dl, table, query_engine, tc.next_schema.field_names - ) - assert len(select_out) == count * 3, ( - f"Expected {count * 3} rows, got {len(select_out)}" - ) - # and finally check that producing with latest schema still works - tc.next_schema.produce(dl, topic, count, ctx, mode=produce_mode) - select_out = self.select( - dl, table, query_engine, tc.next_schema.field_names - ) - assert len(select_out) == count * 4, ( - f"Expected {count * 4} rows, got {len(select_out)}" - ) - - else: tc.initial_schema.check_table_schema(dl, table, query_engine) - - select_out = self.select( - dl, table, query_engine, tc.next_schema.field_names - ) - assert len(select_out) == count, ( - f"Expected {count} rows, got {select_out}" - ) - assert ctx.dlq == count, ( - f"Expected {count} records were dlq'ed, got {ctx.dlq}" + tc.next_schema.produce( + dl, + topic, + count, + ctx, + mode=produce_mode, + should_translate=tc.valid, ) + if tc.valid: + tc.next_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, + table, + query_engine, + tc.next_schema.field_names, + ) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {select_out}" + ) + + # check that we can still produce with the original schema and that + # the current table schema doesn't change back as a result + tc.initial_schema.produce( + dl, topic, count, ctx, mode=produce_mode + ) + tc.next_schema.check_table_schema(dl, table, query_engine) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + 
) + assert len(select_out) == count * 3, ( + f"Expected {count * 3} rows, got {len(select_out)}" + ) + + # and finally check that producing with latest schema still works + tc.next_schema.produce(dl, topic, count, ctx, mode=produce_mode) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count * 4, ( + f"Expected {count * 4} rows, got {len(select_out)}" + ) + + else: + tc.initial_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count, ( + f"Expected {count} rows, got {select_out}" + ) + assert ctx.dlq == count, ( + f"Expected {count} records were dlq'ed, got {ctx.dlq}" + ) + except Exception as e: + raise Exception(f"Test failed for {label=}, {produce_mode=}") from e @cluster(num_nodes=3) @matrix( @@ -854,20 +864,24 @@ def test_reorder_columns(self, cloud_storage_type, qe_and_cat): }, ) as dl: for produce_mode in PRODUCER_MODES: - topic, table = self.topic_and_table(label, produce_mode) - count = 10 - ctx = TranslationContext() - initial_schema = tc.initial_schema - next_schema = tc.next_schema - for schema in [initial_schema, next_schema]: - schema.produce(dl, topic, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, table, query_engine) - - for field in initial_schema.field_names: - select_out = self.select(dl, table, query_engine, [field]) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {len(select_out)}" - ) - assert all(r[1] == field for r in select_out), ( - f"{field} column mangled: {select_out}" - ) + try: + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() + initial_schema = tc.initial_schema + next_schema = tc.next_schema + for schema in [initial_schema, next_schema]: + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) + + for field in initial_schema.field_names: + select_out = self.select(dl, table, query_engine, [field]) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {len(select_out)}" + ) + assert all(r[1] == field for r in select_out), ( + f"{field} column mangled: {select_out}" + ) + + except Exception as e: + raise Exception(f"Test failed for {produce_mode=}") from e