Skip to content

Commit db2c291

Browse files
maomaodev and pan3793
authored and committed
[KYUUBI #7398] SERDEPROPERTIES are missing when KSHC create table
### Why are the changes needed? 1. Execute the following SQL to create table and insert. ``` CREATE TABLE test_table (name STRING, age INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('field.delim' = ',') STORED AS TEXTFILE; INSERT INTO TABLE test_table VALUES ('alice', 30); ``` 2. Check the underlying HDFS data. Output is `alice30`, Expected is `alice,30` ``` hdfs dfs -cat /usr/hive/warehouse/test_table/part-00000-08cdc3f0-15af-413a-a709-46e24f1ace91-c000 ``` 3. Cause: SERDEPROPERTIES are missing when KSHC create table, it did not strip the `option.` prefix. See: apache/spark#28026 ### How was this patch tested? UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #7399 from maomaodev/kyuubi-7398. Closes #7398 815f60a [lifumao] add toOptionsAndSerdeProps ut 66b8fcd [lifumao] add toOptionsAndSerdeProps ut 11fc7b1 [lifumao] SERDEPROPERTIES are missing when KSHC create table 490009d [lifumao] SERDEPROPERTIES are missing when KSHC create table 3d2060c [lifumao] SERDEPROPERTIES are missing when KSHC create table 00fce8e [lifumao] SERDEPROPERTIES are missing when KSHC create table ecacba7 [lifumao] SERDEPROPERTIES are missing when KSHC create table Authored-by: lifumao <[email protected]> Signed-off-by: Cheng Pan <[email protected]>
1 parent f8bc00a commit db2c291

2 files changed

Lines changed: 77 additions & 17 deletions

File tree

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 37 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -317,12 +317,15 @@ class HiveTableCatalog(sparkSession: SparkSession)
317317
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
318318
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
319319
val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
320+
val tableProps = properties.asScala.toMap
321+
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps)
320322
val (storage, provider) =
321323
getStorageFormatAndProvider(
322324
maybeProvider,
323325
location,
324-
properties.asScala.toMap)
325-
val tableProperties = properties.asScala
326+
tableProps,
327+
optionsProps,
328+
serdeProps)
326329
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
327330
val tableType =
328331
if (isExternal || location.isDefined) {
@@ -339,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
339342
provider = Some(provider),
340343
partitionColumnNames = partitionColumns,
341344
bucketSpec = maybeBucketSpec,
342-
properties = tableProperties.toMap,
345+
properties = tableProps,
343346
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
344347
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
345348

@@ -431,10 +434,25 @@ class HiveTableCatalog(sparkSession: SparkSession)
431434
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
432435
}
433436

434-
private def toOptions(properties: Map[String, String]): Map[String, String] = {
435-
properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
436-
case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
437-
}.toMap
437+
/**
438+
 * Splits properties into optionsProps and serdeProps based on the `option.` prefix.
439+
*
440+
 * - optionsProps: keys with "option." prefix whose stripped key already exists in properties,
441+
* indicating they were originally specified via OPTIONS clause.
442+
 * - serdeProps: keys with "option." prefix whose stripped key does NOT exist in properties,
443+
* indicating they were originally specified via SERDEPROPERTIES clause.
444+
*
445+
* @param properties the full properties map
446+
 * @return a tuple of (optionsProps, serdeProps), both with the "option." prefix stripped
447+
*/
448+
private[hive] def toOptionsAndSerdeProps(
449+
properties: Map[String, String]): (Map[String, String], Map[String, String]) = {
450+
val (optionsProps, serdeProps) = properties
451+
.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX))
452+
.map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value }
453+
.toMap
454+
.partition { case (strippedKey, _) => properties.contains(strippedKey) }
455+
(optionsProps, serdeProps)
438456
}
439457

440458
override def listNamespaces(): Array[Array[String]] =
@@ -583,23 +601,25 @@ private object HiveTableCatalog extends Logging {
583601
private def getStorageFormatAndProvider(
584602
provider: Option[String],
585603
location: Option[String],
586-
options: Map[String, String]): (CatalogStorageFormat, String) = {
604+
tableProps: Map[String, String],
605+
optionsProps: Map[String, String],
606+
serdeProps: Map[String, String]): (CatalogStorageFormat, String) = {
587607
val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
588608
locationUri = location.map(CatalogUtils.stringToURI),
589-
properties = options)
609+
properties = optionsProps)
590610

591611
val conf = SQLConf.get
592612
val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy(
593613
locationUri = location.map(CatalogUtils.stringToURI),
594-
properties = options)
614+
properties = optionsProps)
595615

596616
if (provider.isDefined) {
597617
(nonHiveStorageFormat, provider.get)
598-
} else if (serdeIsDefined(options)) {
599-
val maybeSerde = options.get("hive.serde")
600-
val maybeStoredAs = options.get("hive.stored-as")
601-
val maybeInputFormat = options.get("hive.input-format")
602-
val maybeOutputFormat = options.get("hive.output-format")
618+
} else if (serdeIsDefined(tableProps)) {
619+
val maybeSerde = tableProps.get("hive.serde")
620+
val maybeStoredAs = tableProps.get("hive.stored-as")
621+
val maybeInputFormat = tableProps.get("hive.input-format")
622+
val maybeOutputFormat = tableProps.get("hive.output-format")
603623
val storageFormat = if (maybeStoredAs.isDefined) {
604624
// If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
605625
HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
@@ -609,7 +629,7 @@ private object HiveTableCatalog extends Logging {
609629
outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat),
610630
// User specified serde takes precedence over the one inferred from file format.
611631
serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde),
612-
properties = options ++ defaultHiveStorage.properties)
632+
properties = serdeProps ++ defaultHiveStorage.properties)
613633
case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.")
614634
}
615635
} else {
@@ -619,7 +639,7 @@ private object HiveTableCatalog extends Logging {
619639
outputFormat =
620640
maybeOutputFormat.orElse(defaultHiveStorage.outputFormat),
621641
serde = maybeSerde.orElse(defaultHiveStorage.serde),
622-
properties = options ++ defaultHiveStorage.properties)
642+
properties = serdeProps ++ defaultHiveStorage.properties)
623643
}
624644
(storageFormat, DDLUtils.HIVE_PROVIDER)
625645
} else {

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,46 @@ class HiveCatalogSuite extends KyuubiHiveTest {
287287
catalog.dropTable(testIdent)
288288
}
289289

290+
test("toOptionsAndSerdeProps") {
291+
val properties = Map(
292+
"hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
293+
"owner" -> "hadoop",
294+
"header" -> "false",
295+
"delimiter" -> "#",
296+
TableCatalog.OPTION_PREFIX + "header" -> "false",
297+
TableCatalog.OPTION_PREFIX + "delimiter" -> "#",
298+
TableCatalog.OPTION_PREFIX + "field.delim" -> ",",
299+
TableCatalog.OPTION_PREFIX + "line.delim" -> "\n")
300+
301+
val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties)
302+
303+
assert(optionsProps == Map(
304+
"header" -> "false",
305+
"delimiter" -> "#"))
306+
assert(serdeProps == Map(
307+
"field.delim" -> ",",
308+
"line.delim" -> "\n"))
309+
}
310+
311+
test("createTable: SERDEPROPERTIES") {
312+
val properties = new util.HashMap[String, String]()
313+
properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
314+
properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",")
315+
assert(!catalog.tableExists(testIdent))
316+
317+
val table = catalog.createTable(
318+
testIdent,
319+
schema,
320+
Array.empty[Transform],
321+
properties).asInstanceOf[HiveTable]
322+
323+
assert(!table.catalogTable.storage.properties.keys.exists(
324+
_.startsWith(TableCatalog.OPTION_PREFIX)))
325+
assert(!table.catalogTable.storage.properties.contains("hive.serde"))
326+
assert(table.catalogTable.storage.properties.contains("field.delim"))
327+
catalog.dropTable(testIdent)
328+
}
329+
290330
test("loadTable") {
291331
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
292332
val loaded = catalog.loadTable(testIdent)

0 commit comments

Comments
 (0)