diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index f6b6235dc30..b5a60e5e555 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf -import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper} +import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, HIVE_TABLE_RESERVED_SERDE_PROPERTIES, IdentifierHelper, NamespaceHelper} import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.DROP_TABLE_AS_PURGE_TABLE import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors} @@ -317,13 +317,13 @@ class HiveTableCatalog(sparkSession: SparkSession) val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms val location = Option(properties.get(TableCatalog.PROP_LOCATION)) val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER)) - val tableProps = properties.asScala.toMap - val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps) + val allProps = properties.asScala.toMap + val (optionsProps, serdeProps) = toOptionsAndSerdeProps(allProps) val (storage, provider) = getStorageFormatAndProvider( maybeProvider, location, - tableProps, + allProps, optionsProps, serdeProps) val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL) @@ -342,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession) provider = Some(provider), partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, - properties = tableProps, + properties = toTableProps(allProps, optionsProps ++ serdeProps), tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(TableCatalog.PROP_COMMENT))) @@ -455,6 +455,28 @@ class HiveTableCatalog(sparkSession: SparkSession) (optionsProps, serdeProps) } + /** + * Return table properties to be stored in the Hive metastore after excluding the following: + * + * - Excludes `CatalogV2Util.TABLE_RESERVED_PROPERTIES`. + * - Excludes keys with the `options.` prefix. + * - Excludes stripped keys already extracted from OPTIONS or SERDEPROPERTIES. + * - Excludes Hive SerDe/storage keys such as `hive.serde` and `hive.stored-as`. + * + * @param properties the full properties map + * @param optionsAndSerdeProps stripped keys extracted from OPTIONS and SERDEPROPERTIES + * @return table properties to be stored in the Hive metastore + */ + private[hive] def toTableProps( + properties: Map[String, String], + optionsAndSerdeProps: Map[String, String]): Map[String, String] = { + properties.filterKeys(key => + !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key) + && !key.startsWith(TableCatalog.OPTION_PREFIX) + && !optionsAndSerdeProps.contains(key) + && !HIVE_TABLE_RESERVED_SERDE_PROPERTIES.contains(key)).toMap + } + override def listNamespaces(): Array[Array[String]] = withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") { catalog.listDatabases().map(Array(_)).toArray @@ -583,6 +605,17 @@ class HiveTableCatalog(sparkSession: SparkSession) } private object HiveTableCatalog extends Logging { + private val HIVE_SERDE = "hive.serde" + private val HIVE_STORED_AS = "hive.stored-as" + private val HIVE_OUTPUT_FORMAT = "hive.output-format" + private val HIVE_INPUT_FORMAT = "hive.input-format" + + private val HIVE_TABLE_RESERVED_SERDE_PROPERTIES = Set( + HIVE_SERDE, + HIVE_STORED_AS, + HIVE_OUTPUT_FORMAT, + HIVE_INPUT_FORMAT) + private def toCatalogDatabase( db: String, metadata: util.Map[String, String], @@ -601,7 +634,7 @@ private object HiveTableCatalog extends Logging { private def getStorageFormatAndProvider( provider: Option[String], location: Option[String], - tableProps: Map[String, String], + allProps: Map[String, String], optionsProps: Map[String, String], serdeProps: Map[String, String]): (CatalogStorageFormat, String) = { val nonHiveStorageFormat = CatalogStorageFormat.empty.copy( @@ -615,11 +648,11 @@ private object HiveTableCatalog extends Logging { if (provider.isDefined) { (nonHiveStorageFormat, provider.get) - } else if (serdeIsDefined(tableProps)) { - val maybeSerde = tableProps.get("hive.serde") - val maybeStoredAs = tableProps.get("hive.stored-as") - val maybeInputFormat = tableProps.get("hive.input-format") - val maybeOutputFormat = tableProps.get("hive.output-format") + } else if (serdeIsDefined(allProps)) { + val maybeSerde = allProps.get(HIVE_SERDE) + val maybeStoredAs = allProps.get(HIVE_STORED_AS) + val maybeInputFormat = allProps.get(HIVE_INPUT_FORMAT) + val maybeOutputFormat = allProps.get(HIVE_OUTPUT_FORMAT) val storageFormat = if (maybeStoredAs.isDefined) { // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it. HiveSerDe.sourceToSerDe(maybeStoredAs.get) match { @@ -656,10 +689,10 @@ private object HiveTableCatalog extends Logging { } private def serdeIsDefined(options: Map[String, String]): Boolean = { - val maybeStoredAs = options.get("hive.stored-as") - val maybeInputFormat = options.get("hive.input-format") - val maybeOutputFormat = options.get("hive.output-format") - val maybeSerde = options.get("hive.serde") + val maybeStoredAs = options.get(HIVE_STORED_AS) + val maybeInputFormat = options.get(HIVE_INPUT_FORMAT) + val maybeOutputFormat = options.get(HIVE_OUTPUT_FORMAT) + val maybeSerde = options.get(HIVE_SERDE) maybeStoredAs.isDefined || maybeInputFormat.isDefined || maybeOutputFormat.isDefined || maybeSerde.isDefined } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index 56a89bac2a3..166a31ab6aa 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -28,6 +28,7 @@ import com.google.common.collect.Maps import org.apache.hadoop.fs.Path import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform @@ -308,6 +309,26 @@ class HiveCatalogSuite extends KyuubiHiveTest { "line.delim" -> "\n")) } + test("toTableProps") { + val properties = Map( + "foo" -> "bar", + TableCatalog.PROP_EXTERNAL -> "true", + TableCatalog.PROP_OWNER -> "hadoop", + TableCatalog.PROP_COMMENT -> "test table", + "hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "header" -> "false", + "field.delim" -> ",", + TableCatalog.OPTION_PREFIX + "header" -> "false", + TableCatalog.OPTION_PREFIX + "delimiter" -> "#", + TableCatalog.OPTION_PREFIX + "field.delim" -> ",", + TableCatalog.OPTION_PREFIX + "line.delim" -> "\n") + + val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties) + val tableProps = catalog.toTableProps(properties, optionsProps ++ serdeProps) + + assert(tableProps == Map("foo" -> "bar")) + } + test("createTable: SERDEPROPERTIES") { val properties = new util.HashMap[String, String]() properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") @@ -327,6 +348,36 @@ class HiveCatalogSuite extends KyuubiHiveTest { catalog.dropTable(testIdent) } + test("createTable: external") { + val properties = new util.HashMap[String, String]() + properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",") + properties.put("field.delim", ",") + properties.put(TableCatalog.PROP_EXTERNAL, "true") + properties.put(TableCatalog.PROP_LOCATION, "file:/tmp/path") + properties.put("foo", "bar") + assert(!catalog.tableExists(testIdent)) + + val table = catalog.createTable( + testIdent, + schema, + Array.empty[Transform], + properties).asInstanceOf[HiveTable] + + assert(table.catalogTable.tableType === CatalogTableType.EXTERNAL) + assert(table.catalogTable.location.toString === "file:/tmp/path") + + assert(Set( + "hive.serde", + TableCatalog.OPTION_PREFIX + "field.delim", + "field.delim", + TableCatalog.PROP_EXTERNAL, + TableCatalog.PROP_LOCATION) + .forall(key => !table.catalogTable.properties.contains(key))) + assert(table.catalogTable.properties.get("foo").contains("bar")) + catalog.dropTable(testIdent) + } + test("loadTable") { val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) val loaded = catalog.loadTable(testIdent)