diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py index 8543130425a61..2112694a1906e 100644 --- a/tests/rptest/tests/datalake/schema_evolution_test.py +++ b/tests/rptest/tests/datalake/schema_evolution_test.py @@ -11,6 +11,7 @@ from collections.abc import Callable from contextlib import contextmanager from enum import Enum +from itertools import product from time import time from typing import NamedTuple @@ -246,9 +247,10 @@ class EvolutionTestCase(NamedTuple): initial_schema: GenericSchema next_schema: GenericSchema partition_spec: str | None = None + valid: bool = True -LEGAL_TEST_CASES = { +TEST_CASES = { "add_column": EvolutionTestCase( initial_schema=GenericSchema( fields=[ @@ -432,10 +434,7 @@ class EvolutionTestCase(NamedTuple): ), partition_spec="(first)", ), -} - -ILLEGAL_TEST_CASES = { - "illegal promotion int->string": EvolutionTestCase( + "illegal_promotion_int_to_string": EvolutionTestCase( initial_schema=GenericSchema( fields=[ { @@ -464,6 +463,7 @@ class EvolutionTestCase(NamedTuple): "ordinal": str(x), }, ), + valid=False, ), "drop column that appears in partition spec": EvolutionTestCase( initial_schema=GenericSchema( @@ -517,6 +517,28 @@ class EvolutionTestCase(NamedTuple): ] +def query_engines_and_catalogs() -> list[list[QueryEngineType | CatalogType]]: + """ + Produce a list of QueryEngineType/CatalogType pairs that includes each member + of each set at least once. The general idea here is to avoid redundant work, + in the sense that we're not interested in testing combinations of query engine + and catalog but rather verifying that our schema evolution approach doesn't + break any particular one of either. + + Returns a list of lists rather than tuples so the result is json roundtrippable + for use in matrix params. + """ + n_engines = len(QUERY_ENGINES) + n_catalogs = len(supported_catalog_types()) + return [ + [ + QUERY_ENGINES[i % n_engines], + supported_catalog_types()[i % n_catalogs], + ] + for i in range(0, max(n_engines, n_catalogs)) + ] + + class SchemaEvolutionE2ETests(RedpandaTest): def __init__(self, test_ctx, *args, **kwargs): super(SchemaEvolutionE2ETests, self).__init__( @@ -533,8 +555,6 @@ def __init__(self, test_ctx, *args, **kwargs): **kwargs, ) self.test_ctx = test_ctx - self.topic_name = "test" - self.table_name = f"redpanda.{self.topic_name}" def setUp(self): # redpanda will be started by DatalakeServices @@ -543,25 +563,52 @@ def setUp(self): def select( self, dl: DatalakeServices, + table: str, query_engine: QueryEngineType, cols=list[str], sort_by_offset: bool = True, ): qe = dl.spark() if query_engine == QueryEngineType.SPARK else dl.trino() - query = f"select redpanda.offset, {', '.join(cols)} from {self.table_name}" + query = f"select redpanda.offset, {', '.join(cols)} from {table}" self.redpanda.logger.debug(f"QUERY: '{query}'") out = qe.run_query_fetch_all(query) if sort_by_offset: out.sort(key=lambda r: r[0]) return out + @property + def valid_cases(self) -> dict[str, EvolutionTestCase]: + return {label: tc for label, tc in self.all_cases.items() if tc.valid} + + @property + def invalid_cases(self) -> dict[str, EvolutionTestCase]: + return {label: tc for label, tc in self.all_cases.items() if not tc.valid} + + @property + def all_cases(self) -> dict[str, EvolutionTestCase]: + return TEST_CASES + + def cases_by_modes( + self, cases: dict[str, EvolutionTestCase] + ) -> list[tuple[str, EvolutionTestCase, ProducerType]]: + return [t + (p,) for t, p in product(cases.items(), PRODUCER_MODES)] + + def topic_and_table(self, base: str, mode: ProducerType) -> tuple[str, str]: + topic = f"{base}_{mode}" + table = f"redpanda.{topic}" + return ( + topic, + table, + ) + @contextmanager def setup_services( self, query_engine: QueryEngineType, compat_level: str = "NONE", - partition_spec: str = None, catalog_type: CatalogType = filesystem_catalog_type(), + test_cases: dict[str, EvolutionTestCase] = {}, + with_partitioning: bool = False, ): with DatalakeServices( self.test_ctx, @@ -571,128 +618,144 @@ def setup_services( query_engine, ], ) as dl: - config = {TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "dlq_table"} - if partition_spec is not None: - config["redpanda.iceberg.partition.spec"] = partition_spec - dl.create_iceberg_enabled_topic( - self.topic_name, - iceberg_mode="value_schema_id_prefix", - config=config, - ) - SchemaRegistryClient( - {"url": self.redpanda.schema_reg().split(",")[0]} - ).set_compatibility( - subject_name=f"{self.topic_name}-value", level=compat_level - ) + for label, tc, produce_mode in self.cases_by_modes(test_cases): + topic, _ = self.topic_and_table(label, produce_mode) + config = {TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "dlq_table"} + if with_partitioning and tc.partition_spec is not None: + config["redpanda.iceberg.partition.spec"] = tc.partition_spec + dl.create_iceberg_enabled_topic( + topic, + iceberg_mode="value_schema_id_prefix", + config=config, + ) + SchemaRegistryClient( + {"url": self.redpanda.schema_reg().split(",")[0]} + ).set_compatibility(subject_name=f"{topic}-value", level=compat_level) yield dl # make sure nothing we did trashed our ability to read the whole table - self.select(dl, query_engine, cols=["*"]) + for label, tc, produce_mode in self.cases_by_modes(test_cases): + _, table = self.topic_and_table(label, produce_mode) + self.select( + dl, + table, + query_engine, + cols=["*"], + ) @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - test_case=list(LEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) - def test_legal_schema_evolution( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): + def test_schema_evolution(self, cloud_storage_type, qe_and_cat): """ Test that rows written with schema A are still readable after evolving the table to schema B. """ - tc = LEGAL_TEST_CASES[test_case] - with self.setup_services( - query_engine, partition_spec=tc.partition_spec, catalog_type=catalog_type - ) as dl: - count = 10 - ctx = TranslationContext() - tc.initial_schema.produce( - dl, self.topic_name, count, ctx, mode=produce_mode - ) - - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - tc.next_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - tc.next_schema.check_table_schema(dl, self.table_name, query_engine) - - select_out = self.select(dl, query_engine, tc.next_schema.field_names) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {select_out}" - ) - - @cluster(num_nodes=3) - @matrix( - cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - test_case=list(ILLEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), - ) - def test_illegal_schema_evolution( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): - """ - check that records produced with an incompatible schema don't wind up - in the table. - """ - tc = ILLEGAL_TEST_CASES[test_case] + query_engine, catalog_type = qe_and_cat with self.setup_services( - query_engine, partition_spec=tc.partition_spec, catalog_type=catalog_type + query_engine, + catalog_type=catalog_type, + test_cases=self.all_cases, ) as dl: - count = 10 - ctx = TranslationContext() - tc.initial_schema.produce( - dl, self.topic_name, count, ctx, mode=produce_mode - ) - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - tc.next_schema.produce( - dl, - self.topic_name, - count, - ctx, - mode=produce_mode, - should_translate=False, - ) - tc.initial_schema.check_table_schema(dl, self.table_name, query_engine) - - select_out = self.select(dl, query_engine, tc.next_schema.field_names) - assert len(select_out) == count, f"Expected {count} rows, got {select_out}" - assert ctx.dlq == count, ( - f"Expected {count} records were dlq'ed, got {ctx.dlq}" - ) + for label, tc, produce_mode in self.cases_by_modes(self.all_cases): + try: + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() + tc.initial_schema.produce(dl, topic, count, ctx, mode=produce_mode) + + tc.initial_schema.check_table_schema(dl, table, query_engine) + tc.next_schema.produce( + dl, + topic, + count, + ctx, + mode=produce_mode, + should_translate=tc.valid, + ) + if tc.valid: + tc.next_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, + table, + query_engine, + tc.next_schema.field_names, + ) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {select_out}" + ) + + # check that we can still produce with the original schema and that + # the current table schema doesn't change back as a result + tc.initial_schema.produce( + dl, topic, count, ctx, mode=produce_mode + ) + tc.next_schema.check_table_schema(dl, table, query_engine) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count * 3, ( + f"Expected {count * 3} rows, got {len(select_out)}" + ) + + # and finally check that producing with latest schema still works + tc.next_schema.produce(dl, topic, count, ctx, mode=produce_mode) + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count * 4, ( + f"Expected {count * 4} rows, got {len(select_out)}" + ) + + else: + tc.initial_schema.check_table_schema(dl, table, query_engine) + + select_out = self.select( + dl, table, query_engine, tc.next_schema.field_names + ) + assert len(select_out) == count, ( + f"Expected {count} rows, got {select_out}" + ) + assert ctx.dlq == count, ( + f"Expected {count} records were dlq'ed, got {ctx.dlq}" + ) + except Exception as e: + raise Exception(f"Test failed for {label=}, {produce_mode=}") from e - @cluster(num_nodes=3) @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) def test_dropped_column_no_collision( - self, cloud_storage_type, query_engine, produce_mode, catalog_type + self, cloud_storage_type, produce_mode, qe_and_cat ): """ Translate some records, drop field A, translate some more, reintroduce field A *by name* (this should create a *new* column). Confirm that 'select A' reads only the new column, producing nulls for all rows written prior to the final update. """ - + query_engine, catalog_type = qe_and_cat + label = "drop_column" + tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: + topic, table = self.topic_and_table(label, produce_mode) count = 10 ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["drop_column"] + initial_schema = tc.initial_schema + next_schema = tc.next_schema dropped_field_names = list( set(initial_schema.field_names) - set(next_schema.field_names) ) for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) - select_out = self.select(dl, query_engine, schema.field_names) + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) + select_out = self.select(dl, table, query_engine, schema.field_names) assert len(select_out) == ctx.total, ( f"Expected {ctx.total} rows, got {select_out}" ) @@ -722,10 +785,10 @@ def test_dropped_column_no_collision( ], ) - restored_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - restored_schema.check_table_schema(dl, self.table_name, query_engine) + restored_schema.produce(dl, topic, count, ctx, mode=produce_mode) + restored_schema.check_table_schema(dl, table, query_engine) - select_out = self.select(dl, query_engine, dropped_field_names) + select_out = self.select(dl, table, query_engine, dropped_field_names) assert len(select_out) == count * 3, ( f"Expected {count * 3} rows, got {select_out}" ) @@ -740,35 +803,39 @@ def test_dropped_column_no_collision( @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) def test_dropped_column_select_fails( - self, cloud_storage_type, query_engine, produce_mode, catalog_type + self, cloud_storage_type, produce_mode, qe_and_cat ): """ Test that selecting a dropped column fails "gracefully" - or at least predictably and consistently. """ + query_engine, catalog_type = qe_and_cat + label = "drop_column" + tc = self.valid_cases[label] with self.setup_services(query_engine, catalog_type=catalog_type) as dl: + topic, table = self.topic_and_table(label, produce_mode) count = 10 ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["drop_column"] + initial_schema = tc.initial_schema + next_schema = tc.next_schema dropped_field_names = list( set(initial_schema.field_names) - set(next_schema.field_names) ) for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) if query_engine == QueryEngineType.SPARK: with expect_exception( pyhive.exc.OperationalError, lambda e: "UNRESOLVED_COLUMN" in e.args[0].status.errorMessage, ): - self.select(dl, query_engine, dropped_field_names) + self.select(dl, table, query_engine, dropped_field_names) else: with expect_exception( pyhive.exc.DatabaseError, @@ -779,65 +846,42 @@ def test_dropped_column_select_fails( @cluster(num_nodes=3) @matrix( cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), + qe_and_cat=query_engines_and_catalogs(), ) - def test_reorder_columns( - self, cloud_storage_type, query_engine, produce_mode, catalog_type - ): + def test_reorder_columns(self, cloud_storage_type, qe_and_cat): """ Test that changing the order of columns doesn't change the values associated with a column or field name. """ - with self.setup_services(query_engine, catalog_type=catalog_type) as dl: - count = 10 - ctx = TranslationContext() - initial_schema, next_schema, _ = LEGAL_TEST_CASES["reorder_columns"] - for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) - - for field in initial_schema.field_names: - select_out = self.select(dl, query_engine, [field]) - assert len(select_out) == count * 2, ( - f"Expected {count * 2} rows, got {len(select_out)}" - ) - assert all(r[1] == field for r in select_out), ( - f"{field} column mangled: {select_out}" - ) - - @cluster(num_nodes=3) - @matrix( - cloud_storage_type=supported_storage_types(), - query_engine=QUERY_ENGINES, - test_case=list(LEGAL_TEST_CASES.keys()), - produce_mode=PRODUCER_MODES, - catalog_type=supported_catalog_types(), - ) - def test_old_schema_writer( - self, cloud_storage_type, query_engine, test_case, produce_mode, catalog_type - ): - """ - Tests that, after a backwards compatible update from schema A to schema B, we can keep - tranlsating records produced with schema A without another schema update by falling back - to an already extant parquet writer for schema A. - """ - with self.setup_services(query_engine, catalog_type=catalog_type) as dl: - count = 10 - ctx = TranslationContext() - - initial_schema, next_schema, _ = LEGAL_TEST_CASES[test_case] - - for schema in [initial_schema, next_schema]: - schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - schema.check_table_schema(dl, self.table_name, query_engine) - - initial_schema.produce(dl, self.topic_name, count, ctx, mode=produce_mode) - next_schema.check_table_schema(dl, self.table_name, query_engine) - - select_out = self.select(dl, query_engine, next_schema.field_names) - - assert len(select_out) == count * 3, ( - f"Expected {count * 3} rows, got {len(select_out)}" - ) + query_engine, catalog_type = qe_and_cat + label = "reorder_columns" + tc = self.valid_cases[label] + with self.setup_services( + query_engine, + catalog_type=catalog_type, + test_cases={ + label: tc, + }, + ) as dl: + for produce_mode in PRODUCER_MODES: + try: + topic, table = self.topic_and_table(label, produce_mode) + count = 10 + ctx = TranslationContext() + initial_schema = tc.initial_schema + next_schema = tc.next_schema + for schema in [initial_schema, next_schema]: + schema.produce(dl, topic, count, ctx, mode=produce_mode) + schema.check_table_schema(dl, table, query_engine) + + for field in initial_schema.field_names: + select_out = self.select(dl, table, query_engine, [field]) + assert len(select_out) == count * 2, ( + f"Expected {count * 2} rows, got {len(select_out)}" + ) + assert all(r[1] == field for r in select_out), ( + f"{field} column mangled: {select_out}" + ) + + except Exception as e: + raise Exception(f"Test failed for {produce_mode=}") from e