Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/iceberg/conversion/schema_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ struct type_converting_visitor {
* }
*/
serde::parquet::schema_element res;
res.logical_type = serde::parquet::map_type{};
serde::parquet::schema_element map_wrapper;
map_wrapper.repetition_type
= serde::parquet::field_repetition_type::repeated;
Expand Down
3 changes: 3 additions & 0 deletions src/v/iceberg/conversion/tests/iceberg_parquet_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ TEST(DatalakeParquetSchema, Maps) {
? serde::parquet::field_repetition_type::required
: serde::parquet::field_repetition_type::optional,
schema.fields[i]->id));
ASSERT_TRUE(
std::holds_alternative<serde::parquet::map_type>(
map.logical_type));
// check map key_value wrapper
ASSERT_EQ(map.children.size(), 1);
ASSERT_EQ(map.children[0].name(), "key_value");
Expand Down
50 changes: 50 additions & 0 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,56 @@ def test_avro_schema(self, cloud_storage_type, query_engine, catalog_type):
spark_describe_out
)

@cluster(num_nodes=3)
@matrix(
cloud_storage_type=supported_storage_types(),
query_engine=[QueryEngineType.SPARK, QueryEngineType.TRINO],
catalog_type=[CatalogType.REST_JDBC],
)
def test_avro_map_values(self, cloud_storage_type, query_engine, catalog_type):
"""Verify avro map<string, long> values round-trip end-to-end."""
count = 100
topic = "avro_map_test_case"
table = f"redpanda.{topic}"
schema_str = """
{
"type": "record",
"namespace": "com.redpanda.examples.avro",
"name": "MapTest",
"fields": [
{"name": "kv", "type": {"type": "map", "values": "long"}}
]
}
"""

with DatalakeServices(
self.test_ctx,
redpanda=self.redpanda,
include_query_engines=[query_engine],
catalog_type=catalog_type,
) as dl:
dl.create_iceberg_enabled_topic(
topic, iceberg_mode="value_schema_id_prefix"
)
producer = AvroProducer(
{
"bootstrap.servers": self.redpanda.brokers(),
"schema.registry.url": self.redpanda.schema_reg().split(",")[0],
},
default_value_schema=avro.loads(schema_str),
)
for i in range(count):
producer.produce(topic=topic, value={"kv": {"k": i}})
producer.flush()
dl.wait_for_translation(topic, msg_count=count)

engine = dl.trino() if query_engine == QueryEngineType.TRINO else dl.spark()
rows = engine.run_query_fetch_all(
f"select sum(element_at(kv, 'k')) from {table}"
)
expected = sum(range(count))
assert rows[0][0] == expected, f"expected sum={expected}, got {rows}"

# Note: nothing unique about this test so run it with single catalog/query engine.
@cluster(num_nodes=3)
@matrix(
Expand Down
Loading