Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, HIVE_TABLE_RESERVED_SERDE_PROPERTIES, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.DROP_TABLE_AS_PURGE_TABLE
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors}
Expand Down Expand Up @@ -317,13 +317,13 @@ class HiveTableCatalog(sparkSession: SparkSession)
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
val tableProps = properties.asScala.toMap
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps)
val allProps = properties.asScala.toMap
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(allProps)
val (storage, provider) =
getStorageFormatAndProvider(
maybeProvider,
location,
tableProps,
allProps,
optionsProps,
serdeProps)
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
Expand All @@ -342,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProps,
properties = toTableProps(allProps, optionsProps ++ serdeProps),
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))

Expand Down Expand Up @@ -455,6 +455,28 @@ class HiveTableCatalog(sparkSession: SparkSession)
(optionsProps, serdeProps)
}

/**
 * Return table properties to be stored in the Hive metastore after excluding the following:
 *
 * - Excludes `CatalogV2Util.TABLE_RESERVED_PROPERTIES`.
 * - Excludes keys with the `options.` prefix.
 * - Excludes stripped keys already extracted from OPTIONS or SERDEPROPERTIES.
 * - Excludes Hive SerDe/storage keys such as `hive.serde` and `hive.stored-as`.
 *
 * @param properties the full properties map
 * @param optionsAndSerdeProps stripped keys extracted from OPTIONS and SERDEPROPERTIES
 * @return table properties to be stored in the Hive metastore
 */
private[hive] def toTableProps(
    properties: Map[String, String],
    optionsAndSerdeProps: Map[String, String]): Map[String, String] = {
  // Use a strict `filter` over entries instead of `Map.filterKeys`, which is
  // lazily evaluated and deprecated since Scala 2.13. The result is already a
  // strict Map, so the trailing `.toMap` materialization is no longer needed.
  properties.filter { case (key, _) =>
    !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key) &&
    !key.startsWith(TableCatalog.OPTION_PREFIX) &&
    !optionsAndSerdeProps.contains(key) &&
    !HIVE_TABLE_RESERVED_SERDE_PROPERTIES.contains(key)
  }
}

override def listNamespaces(): Array[Array[String]] =
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
catalog.listDatabases().map(Array(_)).toArray
Expand Down Expand Up @@ -583,6 +605,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
}

private object HiveTableCatalog extends Logging {
// Reserved property keys through which DSv2 CREATE TABLE passes Hive
// SerDe/storage configuration. They shape the CatalogStorageFormat and are
// therefore excluded from the plain table properties persisted in the
// Hive metastore.
private val HIVE_SERDE = "hive.serde"
private val HIVE_STORED_AS = "hive.stored-as"
private val HIVE_OUTPUT_FORMAT = "hive.output-format"
private val HIVE_INPUT_FORMAT = "hive.input-format"

// The full set of reserved SerDe/storage keys, used for membership checks
// when filtering metastore-bound table properties.
private val HIVE_TABLE_RESERVED_SERDE_PROPERTIES: Set[String] =
  Set(HIVE_SERDE, HIVE_STORED_AS, HIVE_OUTPUT_FORMAT, HIVE_INPUT_FORMAT)

private def toCatalogDatabase(
db: String,
metadata: util.Map[String, String],
Expand All @@ -601,7 +634,7 @@ private object HiveTableCatalog extends Logging {
private def getStorageFormatAndProvider(
provider: Option[String],
location: Option[String],
tableProps: Map[String, String],
allProps: Map[String, String],
optionsProps: Map[String, String],
serdeProps: Map[String, String]): (CatalogStorageFormat, String) = {
val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
Expand All @@ -615,11 +648,11 @@ private object HiveTableCatalog extends Logging {

if (provider.isDefined) {
(nonHiveStorageFormat, provider.get)
} else if (serdeIsDefined(tableProps)) {
val maybeSerde = tableProps.get("hive.serde")
val maybeStoredAs = tableProps.get("hive.stored-as")
val maybeInputFormat = tableProps.get("hive.input-format")
val maybeOutputFormat = tableProps.get("hive.output-format")
} else if (serdeIsDefined(allProps)) {
val maybeSerde = allProps.get(HIVE_SERDE)
val maybeStoredAs = allProps.get(HIVE_STORED_AS)
val maybeInputFormat = allProps.get(HIVE_INPUT_FORMAT)
val maybeOutputFormat = allProps.get(HIVE_OUTPUT_FORMAT)
val storageFormat = if (maybeStoredAs.isDefined) {
// If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
Expand Down Expand Up @@ -656,10 +689,10 @@ private object HiveTableCatalog extends Logging {
}

/**
 * Returns true when the CREATE TABLE properties carry any Hive SerDe/storage
 * hint (`hive.stored-as`, `hive.input-format`, `hive.output-format` or
 * `hive.serde`), in which case the table storage must be resolved through
 * Hive rather than treated as a plain data source table.
 *
 * @param options the full table properties map
 * @return true if any reserved Hive SerDe/storage key is present
 */
private def serdeIsDefined(options: Map[String, String]): Boolean = {
  // Reuse the shared reserved-key set instead of four separate Option
  // lookups; a single membership scan keeps this consistent with the
  // filtering done in `toTableProps`.
  HIVE_TABLE_RESERVED_SERDE_PROPERTIES.exists(options.contains)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.google.common.collect.Maps
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
Expand Down Expand Up @@ -308,6 +309,26 @@ class HiveCatalogSuite extends KyuubiHiveTest {
"line.delim" -> "\n"))
}

test("toTableProps") {
  // A mix of plain, reserved, SerDe-related and option-prefixed properties.
  val input = Map(
    "foo" -> "bar",
    TableCatalog.PROP_EXTERNAL -> "true",
    TableCatalog.PROP_OWNER -> "hadoop",
    TableCatalog.PROP_COMMENT -> "test table",
    "hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    "header" -> "false",
    "field.delim" -> ",",
    TableCatalog.OPTION_PREFIX + "header" -> "false",
    TableCatalog.OPTION_PREFIX + "delimiter" -> "#",
    TableCatalog.OPTION_PREFIX + "field.delim" -> ",",
    TableCatalog.OPTION_PREFIX + "line.delim" -> "\n")

  val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(input)
  val strippedKeys = optionsProps ++ serdeProps

  // Only the plain user property should survive the filtering.
  assert(catalog.toTableProps(input, strippedKeys) == Map("foo" -> "bar"))
}

test("createTable: SERDEPROPERTIES") {
val properties = new util.HashMap[String, String]()
properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
Expand All @@ -327,6 +348,36 @@ class HiveCatalogSuite extends KyuubiHiveTest {
catalog.dropTable(testIdent)
}

test("createTable: external") {
  // Mix reserved, option-prefixed, SerDe and plain properties on an
  // external table and verify what ends up in the metastore.
  val tableProperties = new util.HashMap[String, String]()
  tableProperties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  tableProperties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",")
  tableProperties.put("field.delim", ",")
  tableProperties.put(TableCatalog.PROP_EXTERNAL, "true")
  tableProperties.put(TableCatalog.PROP_LOCATION, "file:/tmp/path")
  tableProperties.put("foo", "bar")
  assert(!catalog.tableExists(testIdent))

  val created = catalog.createTable(
    testIdent,
    schema,
    Array.empty[Transform],
    tableProperties).asInstanceOf[HiveTable]
  val catalogTable = created.catalogTable

  assert(catalogTable.tableType === CatalogTableType.EXTERNAL)
  assert(catalogTable.location.toString === "file:/tmp/path")

  // None of the reserved or stripped keys may leak into the persisted
  // table properties; the plain user property must be retained.
  val excludedKeys = Set(
    "hive.serde",
    TableCatalog.OPTION_PREFIX + "field.delim",
    "field.delim",
    TableCatalog.PROP_EXTERNAL,
    TableCatalog.PROP_LOCATION)
  assert(!excludedKeys.exists(catalogTable.properties.contains))
  assert(catalogTable.properties.get("foo").contains("bar"))
  catalog.dropTable(testIdent)
}

test("loadTable") {
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val loaded = catalog.loadTable(testIdent)
Expand Down
Loading