Skip to content

Commit e2ea980

Browse files
maomaodev authored and pan3793 committed
[KYUUBI #7406] Remove redundant table properties when KSHC create table
### Why are the changes needed?

Fixed #6443. When creating an external table via KSHC with `EXTERNAL` and `LOCATION` specified, the KSHC table properties will include `external=true` (lowercase). Meanwhile, Hive Metastore automatically appends `EXTERNAL=TRUE` (uppercase). On some metastore backends (e.g. MySQL with a case-insensitive collation), this leads to a duplicate primary key conflict in TABLE_PARAMS. This PR removes redundant table properties when KSHC creates tables. For example, the `external` property will be excluded.

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #7406 from maomaodev/kyuubi-6443.

Closes #7406

83e3f92 [lifumao] fix ut
e006281 [lifumao] fix ut
5d8e916 [lifumao] Remove redundant table properties when KSHC create table

Authored-by: lifumao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
1 parent 4c33a42 commit e2ea980

2 files changed

Lines changed: 99 additions & 15 deletions

File tree

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 48 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
4747
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4848

4949
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
50-
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
50+
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, HIVE_TABLE_RESERVED_SERDE_PROPERTIES, IdentifierHelper, NamespaceHelper}
5151
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.DROP_TABLE_AS_PURGE_TABLE
5252
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
5353
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
@@ -317,13 +317,13 @@ class HiveTableCatalog(sparkSession: SparkSession)
317317
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
318318
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
319319
val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
320-
val tableProps = properties.asScala.toMap
321-
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps)
320+
val allProps = properties.asScala.toMap
321+
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(allProps)
322322
val (storage, provider) =
323323
getStorageFormatAndProvider(
324324
maybeProvider,
325325
location,
326-
tableProps,
326+
allProps,
327327
optionsProps,
328328
serdeProps)
329329
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
@@ -342,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
342342
provider = Some(provider),
343343
partitionColumnNames = partitionColumns,
344344
bucketSpec = maybeBucketSpec,
345-
properties = tableProps,
345+
properties = toTableProps(allProps, optionsProps ++ serdeProps),
346346
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
347347
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
348348

@@ -455,6 +455,28 @@ class HiveTableCatalog(sparkSession: SparkSession)
455455
(optionsProps, serdeProps)
456456
}
457457

458+
/**
459+
* Return table properties to be stored in the Hive metastore after excluding the following:
460+
*
461+
* - Excludes `CatalogV2Util.TABLE_RESERVED_PROPERTIES`.
462+
* - Excludes keys with the `options.` prefix.
463+
* - Excludes stripped keys already extracted from OPTIONS or SERDEPROPERTIES.
464+
* - Excludes Hive SerDe/storage keys such as `hive.serde` and `hive.stored-as`.
465+
*
466+
* @param properties the full properties map
467+
* @param optionsAndSerdeProps stripped keys extracted from OPTIONS and SERDEPROPERTIES
468+
* @return table properties to be stored in the Hive metastore
469+
*/
470+
private[hive] def toTableProps(
471+
properties: Map[String, String],
472+
optionsAndSerdeProps: Map[String, String]): Map[String, String] = {
473+
properties.filterKeys(key =>
474+
!CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
475+
&& !key.startsWith(TableCatalog.OPTION_PREFIX)
476+
&& !optionsAndSerdeProps.contains(key)
477+
&& !HIVE_TABLE_RESERVED_SERDE_PROPERTIES.contains(key)).toMap
478+
}
479+
458480
override def listNamespaces(): Array[Array[String]] =
459481
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
460482
catalog.listDatabases().map(Array(_)).toArray
@@ -583,6 +605,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
583605
}
584606

585607
private object HiveTableCatalog extends Logging {
608+
private val HIVE_SERDE = "hive.serde"
609+
private val HIVE_STORED_AS = "hive.stored-as"
610+
private val HIVE_OUTPUT_FORMAT = "hive.output-format"
611+
private val HIVE_INPUT_FORMAT = "hive.input-format"
612+
613+
private val HIVE_TABLE_RESERVED_SERDE_PROPERTIES = Set(
614+
HIVE_SERDE,
615+
HIVE_STORED_AS,
616+
HIVE_OUTPUT_FORMAT,
617+
HIVE_INPUT_FORMAT)
618+
586619
private def toCatalogDatabase(
587620
db: String,
588621
metadata: util.Map[String, String],
@@ -601,7 +634,7 @@ private object HiveTableCatalog extends Logging {
601634
private def getStorageFormatAndProvider(
602635
provider: Option[String],
603636
location: Option[String],
604-
tableProps: Map[String, String],
637+
allProps: Map[String, String],
605638
optionsProps: Map[String, String],
606639
serdeProps: Map[String, String]): (CatalogStorageFormat, String) = {
607640
val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
@@ -615,11 +648,11 @@ private object HiveTableCatalog extends Logging {
615648

616649
if (provider.isDefined) {
617650
(nonHiveStorageFormat, provider.get)
618-
} else if (serdeIsDefined(tableProps)) {
619-
val maybeSerde = tableProps.get("hive.serde")
620-
val maybeStoredAs = tableProps.get("hive.stored-as")
621-
val maybeInputFormat = tableProps.get("hive.input-format")
622-
val maybeOutputFormat = tableProps.get("hive.output-format")
651+
} else if (serdeIsDefined(allProps)) {
652+
val maybeSerde = allProps.get(HIVE_SERDE)
653+
val maybeStoredAs = allProps.get(HIVE_STORED_AS)
654+
val maybeInputFormat = allProps.get(HIVE_INPUT_FORMAT)
655+
val maybeOutputFormat = allProps.get(HIVE_OUTPUT_FORMAT)
623656
val storageFormat = if (maybeStoredAs.isDefined) {
624657
// If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
625658
HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
@@ -656,10 +689,10 @@ private object HiveTableCatalog extends Logging {
656689
}
657690

658691
private def serdeIsDefined(options: Map[String, String]): Boolean = {
659-
val maybeStoredAs = options.get("hive.stored-as")
660-
val maybeInputFormat = options.get("hive.input-format")
661-
val maybeOutputFormat = options.get("hive.output-format")
662-
val maybeSerde = options.get("hive.serde")
692+
val maybeStoredAs = options.get(HIVE_STORED_AS)
693+
val maybeInputFormat = options.get(HIVE_INPUT_FORMAT)
694+
val maybeOutputFormat = options.get(HIVE_OUTPUT_FORMAT)
695+
val maybeSerde = options.get(HIVE_SERDE)
663696
maybeStoredAs.isDefined || maybeInputFormat.isDefined ||
664697
maybeOutputFormat.isDefined || maybeSerde.isDefined
665698
}

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 51 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps
2828
import org.apache.hadoop.fs.Path
2929
import org.apache.spark.sql.AnalysisException
3030
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
31+
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
3132
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
3233
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog}
3334
import org.apache.spark.sql.connector.expressions.Transform
@@ -308,6 +309,26 @@ class HiveCatalogSuite extends KyuubiHiveTest {
308309
"line.delim" -> "\n"))
309310
}
310311

312+
test("toTableProps") {
313+
val properties = Map(
314+
"foo" -> "bar",
315+
TableCatalog.PROP_EXTERNAL -> "true",
316+
TableCatalog.PROP_OWNER -> "hadoop",
317+
TableCatalog.PROP_COMMENT -> "test table",
318+
"hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
319+
"header" -> "false",
320+
"field.delim" -> ",",
321+
TableCatalog.OPTION_PREFIX + "header" -> "false",
322+
TableCatalog.OPTION_PREFIX + "delimiter" -> "#",
323+
TableCatalog.OPTION_PREFIX + "field.delim" -> ",",
324+
TableCatalog.OPTION_PREFIX + "line.delim" -> "\n")
325+
326+
val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties)
327+
val tableProps = catalog.toTableProps(properties, optionsProps ++ serdeProps)
328+
329+
assert(tableProps == Map("foo" -> "bar"))
330+
}
331+
311332
test("createTable: SERDEPROPERTIES") {
312333
val properties = new util.HashMap[String, String]()
313334
properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
@@ -327,6 +348,36 @@ class HiveCatalogSuite extends KyuubiHiveTest {
327348
catalog.dropTable(testIdent)
328349
}
329350

351+
test("createTable: external") {
352+
val properties = new util.HashMap[String, String]()
353+
properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
354+
properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",")
355+
properties.put("field.delim", ",")
356+
properties.put(TableCatalog.PROP_EXTERNAL, "true")
357+
properties.put(TableCatalog.PROP_LOCATION, "file:/tmp/path")
358+
properties.put("foo", "bar")
359+
assert(!catalog.tableExists(testIdent))
360+
361+
val table = catalog.createTable(
362+
testIdent,
363+
schema,
364+
Array.empty[Transform],
365+
properties).asInstanceOf[HiveTable]
366+
367+
assert(table.catalogTable.tableType === CatalogTableType.EXTERNAL)
368+
assert(table.catalogTable.location.toString === "file:/tmp/path")
369+
370+
assert(Set(
371+
"hive.serde",
372+
TableCatalog.OPTION_PREFIX + "field.delim",
373+
"field.delim",
374+
TableCatalog.PROP_EXTERNAL,
375+
TableCatalog.PROP_LOCATION)
376+
.forall(key => !table.catalogTable.properties.contains(key)))
377+
assert(table.catalogTable.properties.get("foo").contains("bar"))
378+
catalog.dropTable(testIdent)
379+
}
380+
330381
test("loadTable") {
331382
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
332383
val loaded = catalog.loadTable(testIdent)

0 commit comments

Comments (0)