From 335e18a1c01f7300a3a9ce19bb7e0022874ee581 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 13 May 2026 09:58:59 +0100 Subject: [PATCH 1/3] iceberg: set MAP logical type on parquet map columns Without the LogicalType.MAP annotation, strict parquet readers (Spark via SparkParquetReaders) fail with "Not a struct type" when reading the column, even though `describe` reports the column as a map. The declared type lives in the iceberg metadata and is unaffected by the missing annotation; only column read-back was broken. Mirror the LIST converter and stamp the annotation on the map root element. Cover with a unit-test assertion that the annotation is present and an end-to-end ducktape test that exercises map read-back via both Spark and Trino. (cherry picked from commit 4ae42da5b48147c9b55b32a0606af3ac6e0ec0cc) --- src/v/iceberg/conversion/schema_parquet.cc | 1 + .../conversion/tests/iceberg_parquet_tests.cc | 3 ++ .../tests/datalake/datalake_e2e_test.py | 50 +++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/src/v/iceberg/conversion/schema_parquet.cc b/src/v/iceberg/conversion/schema_parquet.cc index e978bd753c574..3833f7829152c 100644 --- a/src/v/iceberg/conversion/schema_parquet.cc +++ b/src/v/iceberg/conversion/schema_parquet.cc @@ -171,6 +171,7 @@ struct type_converting_visitor { * } */ serde::parquet::schema_element res; + res.logical_type = serde::parquet::map_type{}; serde::parquet::schema_element map_wrapper; map_wrapper.repetition_type = serde::parquet::field_repetition_type::repeated; diff --git a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc index 9f0a2ce253df3..80350569bea23 100644 --- a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc +++ b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc @@ -474,6 +474,9 @@ TEST(DatalakeParquetSchema, Maps) { ? 
serde::parquet::field_repetition_type::required : serde::parquet::field_repetition_type::optional, schema.fields[i]->id)); + ASSERT_TRUE( + std::holds_alternative<serde::parquet::map_type>( + map.logical_type)); // check map key_value wrapper ASSERT_EQ(map.children.size(), 1); ASSERT_EQ(map.children[0].name(), "key_value"); diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index 6536197d10385..f7d04f0f8d4f4 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -556,6 +556,56 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type): spark_describe_out ) + @cluster(num_nodes=3) + @matrix( + cloud_storage_type=supported_storage_types(), + query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO], + catalog_type=[CatalogType.REST_JDBC], + ) + def test_avro_map_values(self, cloud_storage_type, query_engine, catalog_type): + """Verify avro map values round-trip end-to-end.""" + count = 100 + topic = "avro_map_test_case" + table = f"redpanda.{topic}" + schema_str = """ + { + "type": "record", + "namespace": "com.redpanda.examples.avro", + "name": "MapTest", + "fields": [ + {"name": "kv", "type": {"type": "map", "values": "long"}} + ] + } + """ + + with DatalakeServices( + self.test_ctx, + redpanda=self.redpanda, + include_query_engines=[query_engine], + catalog_type=catalog_type, + ) as dl: + dl.create_iceberg_enabled_topic( + topic, iceberg_mode="value_schema_id_prefix" + ) + producer = AvroProducer( + { + "bootstrap.servers": self.redpanda.brokers(), + "schema.registry.url": self.redpanda.schema_reg().split(",")[0], + }, + default_value_schema=avro.loads(schema_str), + ) + for i in range(count): + producer.produce(topic=topic, value={"kv": {"k": i}}) + producer.flush() + dl.wait_for_translation(topic, msg_count=count) + + engine = dl.trino() if query_engine == QueryEngineType.TRINO else dl.spark() + rows = engine.run_query_fetch_all( + f"select 
sum(element_at(kv, 'k')) from {table}" + ) + expected = sum(range(count)) + assert rows[0][0] == expected, f"expected sum={expected}, got {rows}" + # Note: nothing unique about this test so run it with single catalog/query engine. @cluster(num_nodes=3) @matrix( From d02a90ed319901947f297be3a0e21fa3ee21a0db Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Wed, 13 May 2026 10:24:23 +0100 Subject: [PATCH 2/3] rptest: also read back map column via pyiceberg Trino's parquet reader is permissive enough to accept a map column written without the LogicalType.MAP annotation, so the engine-side assertion alone misses regressions that only show up in stricter readers. Add a pyiceberg read-back step that scans the column via arrow; pyiceberg rejects the malformed file (different failure mode than Spark) which gives us coverage independent of the query engine. (cherry picked from commit 5bfb4d39a1835919e0b7bc73a65024d3be3e8ac8) --- tests/rptest/tests/datalake/datalake_e2e_test.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index f7d04f0f8d4f4..76431da4dd9a5 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -599,12 +599,20 @@ def test_avro_map_values(self, cloud_storage_type, query_engine, catalog_type): producer.flush() dl.wait_for_translation(topic, msg_count=count) + expected = sum(range(count)) + engine = dl.trino() if query_engine == QueryEngineType.TRINO else dl.spark() rows = engine.run_query_fetch_all( f"select sum(element_at(kv, 'k')) from {table}" ) - expected = sum(range(count)) - assert rows[0][0] == expected, f"expected sum={expected}, got {rows}" + assert rows[0][0] == expected, f"engine expected={expected}, got={rows}" + + iceberg_tbl = dl.catalog_client().load_table(("redpanda", topic)) + kv_col = iceberg_tbl.scan().to_arrow()["kv"].to_pylist() + 
pyiceberg_total = sum(dict(entry)["k"] for entry in kv_col) + assert pyiceberg_total == expected, ( + f"pyiceberg expected={expected}, got={pyiceberg_total}" + ) # Note: nothing unique about this test so run it with single catalog/query engine. @cluster(num_nodes=3) @matrix( From 9b0d3907578dc0817f1def4507ac7125083da7ec Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Wed, 13 May 2026 08:25:24 -0700 Subject: [PATCH 3/3] iceberg: fix clang-format in parquet map test Co-Authored-By: Claude Sonnet 4.6 --- src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc index 80350569bea23..8df6a4366a37f 100644 --- a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc +++ b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc @@ -474,9 +474,8 @@ TEST(DatalakeParquetSchema, Maps) { ? serde::parquet::field_repetition_type::required : serde::parquet::field_repetition_type::optional, schema.fields[i]->id)); - ASSERT_TRUE( - std::holds_alternative<serde::parquet::map_type>( - map.logical_type)); + ASSERT_TRUE(std::holds_alternative<serde::parquet::map_type>( + map.logical_type)); // check map key_value wrapper ASSERT_EQ(map.children.size(), 1); ASSERT_EQ(map.children[0].name(), "key_value");