diff --git a/src/v/iceberg/conversion/schema_parquet.cc b/src/v/iceberg/conversion/schema_parquet.cc index e978bd753c574..3833f7829152c 100644 --- a/src/v/iceberg/conversion/schema_parquet.cc +++ b/src/v/iceberg/conversion/schema_parquet.cc @@ -171,6 +171,7 @@ struct type_converting_visitor { * } */ serde::parquet::schema_element res; + res.logical_type = serde::parquet::map_type{}; serde::parquet::schema_element map_wrapper; map_wrapper.repetition_type = serde::parquet::field_repetition_type::repeated; diff --git a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc index 9f0a2ce253df3..8df6a4366a37f 100644 --- a/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc +++ b/src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc @@ -474,6 +474,8 @@ TEST(DatalakeParquetSchema, Maps) { ? serde::parquet::field_repetition_type::required : serde::parquet::field_repetition_type::optional, schema.fields[i]->id)); + ASSERT_TRUE(std::holds_alternative<serde::parquet::map_type>( + map.logical_type)); // check map key_value wrapper ASSERT_EQ(map.children.size(), 1); ASSERT_EQ(map.children[0].name(), "key_value"); diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index 6536197d10385..76431da4dd9a5 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -556,6 +556,64 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type): spark_describe_out ) + @cluster(num_nodes=3) + @matrix( + cloud_storage_type=supported_storage_types(), + query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO], + catalog_type=[CatalogType.REST_JDBC], + ) + def test_avro_map_values(self, cloud_storage_type, query_engine, catalog_type): + """Verify avro map values round-trip end-to-end.""" + count = 100 + topic = "avro_map_test_case" + table = f"redpanda.{topic}" + schema_str = """ + { + "type": "record", + "namespace": 
"com.redpanda.examples.avro", + "name": "MapTest", + "fields": [ + {"name": "kv", "type": {"type": "map", "values": "long"}} + ] + } + """ + + with DatalakeServices( + self.test_ctx, + redpanda=self.redpanda, + include_query_engines=[query_engine], + catalog_type=catalog_type, + ) as dl: + dl.create_iceberg_enabled_topic( + topic, iceberg_mode="value_schema_id_prefix" + ) + producer = AvroProducer( + { + "bootstrap.servers": self.redpanda.brokers(), + "schema.registry.url": self.redpanda.schema_reg().split(",")[0], + }, + default_value_schema=avro.loads(schema_str), + ) + for i in range(count): + producer.produce(topic=topic, value={"kv": {"k": i}}) + producer.flush() + dl.wait_for_translation(topic, msg_count=count) + + expected = sum(range(count)) + + engine = dl.trino() if query_engine == QueryEngineType.TRINO else dl.spark() + rows = engine.run_query_fetch_all( + f"select sum(element_at(kv, 'k')) from {table}" + ) + assert rows[0][0] == expected, f"engine expected={expected}, got={rows}" + + iceberg_tbl = dl.catalog_client().load_table(("redpanda", topic)) + kv_col = iceberg_tbl.scan().to_arrow()["kv"].to_pylist() + pyiceberg_total = sum(dict(entry)["k"] for entry in kv_col) + assert pyiceberg_total == expected, ( + f"pyiceberg expected={expected}, got={pyiceberg_total}" + ) + # Note: nothing unique about this test so run it with single catalog/query engine. @cluster(num_nodes=3) @matrix(