From 00d4da80a94e2d338f9ae8a8fdd5bf2544efbff2 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Tue, 13 May 2025 15:42:05 +0000 Subject: [PATCH 01/36] staging changes for testing iceberg --- .gitignore | 5 +++ build.sbt | 21 +++++++++- .../chronon/spark/SparkSessionBuilder.scala | 10 ++++- .../chronon/spark/test/TableUtilsTest.scala | 39 +++++++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 90d6e9b021..d87cf1e3eb 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,11 @@ *.logs *.iml .idea/ +*.jvmopts +.bloop* +.metals* +.venv* +*metals.sbt* .eclipse **/.vscode/ **/__pycache__/ diff --git a/build.sbt b/build.sbt index 438c034971..854ce2cf86 100644 --- a/build.sbt +++ b/build.sbt @@ -13,6 +13,7 @@ lazy val spark3_1_1 = "3.1.1" lazy val spark3_2_1 = "3.2.1" lazy val spark3_5_3 = "3.5.3" lazy val tmp_warehouse = "/tmp/chronon/" +lazy val icebergVersion = "0.14.0" ThisBuild / organization := "ai.chronon" ThisBuild / organizationName := "chronon" @@ -189,6 +190,22 @@ val VersionMatrix: Map[String, VersionDependency] = Map( Some("1.0.1"), Some("2.0.2") ), + "iceberg31" -> VersionDependency( + Seq( + "org.apache.iceberg" %% "iceberg-spark-3.1", + ), + None, + Some(icebergVersion), + Some(icebergVersion) + ), + "iceberg32" -> VersionDependency( + Seq( + "org.apache.iceberg" %% "iceberg-spark-3.2", + ), + None, + None, + Some(icebergVersion) + ), "jackson" -> VersionDependency( Seq( "com.fasterxml.jackson.core" % "jackson-core", @@ -427,7 +444,9 @@ lazy val spark_embedded = (project in file("spark")) libraryDependencies ++= (if (use_spark_3_5.value) fromMatrix(scalaVersion.value, "spark-all-3-5", "delta-core") else - fromMatrix(scalaVersion.value, "spark-all", "delta-core")), + fromMatrix(scalaVersion.value, "spark-all", "delta-core", "iceberg31", "iceberg32")), + dependencyOverrides := Seq( "com.fasterxml.jackson.core" % "jackson-databind" % "2.10.0", + "com.fasterxml.jackson.core" % "jackson-core" % "2.10.0"), target := target.value.toPath.resolveSibling("target-embedded").toFile, Test / test := {} ) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index ebe6f6b1ad..c842c58f1c 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -42,7 +42,7 @@ object SparkSessionBuilder { // allow us to override the format by specifying env vars. This allows us to not have to worry about interference // between Spark sessions created in existing chronon tests that need the hive format and some specific tests // that require a format override like delta lake. 
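Note (not part of the diff): the VersionMatrix entries above wire Iceberg into the sbt dependency matrix. For reference, a minimal standalone sbt sketch of the direct dependency these entries resolve to, assuming Spark 3.2 on Scala 2.12 and the 1.1.0 version that later commits in this series settle on:

    // hypothetical direct dependency; the real build goes through VersionMatrix/fromMatrix
    libraryDependencies += "org.apache.iceberg" %% "iceberg-spark-runtime-3.2" % "1.1.0"
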
- val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match { + val (formatConfigs, kryoRegistrator) = Some("deltalake") match { case Some("deltalake") => val configMap = Map( "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", @@ -50,6 +50,14 @@ object SparkSessionBuilder { "spark.chronon.table_write.format" -> "delta" ) (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") + case Some("iceberg") => + val configMap = Map( + "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.chronon.table_write.format" -> "iceberg", + "spark.sql.sources.partitionOverwriteMode" -> "dynamic" + ) + // TODO registrator for iceberg + (configMap, "ai.chronon.spark.ChrononKryoRegistrator") case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") } diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 38417e7039..73b79d4322 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -547,4 +547,43 @@ class TableUtilsTest { assertTrue(firstDs.contains("2022-11-01")) } } + + @Test + def testGetPartitionsWithLongPartition(): Unit = { + val tableName = "db.test_long_partitions" + spark.sql("CREATE DATABASE IF NOT EXISTS db") + val structFields = Array( + StructField("dateint", LongType), + StructField("hr", IntType), + StructField("event_type", StringType), + StructField("label_ds", StringType), + StructField("feature_value", IntType) + ) + + val rows = List( + Row(20220101L, 1, "event1", "2022-01-01", 4), // 2022-01-01 with hr=1 + Row(20220102L, 2, "event2", "2022-01-02", 2), // 2022-01-02 with hr=2 + Row(20220103L, 10, "event1", "2022-01-03", 9), // 2022-01-03 with hr=10 + Row(20220104L, 12, "event1", "20224-01-04", 12) // 2022-01-04 with hr=12 + ) + + val df1 = makeDf( + spark, + StructType( + tableName, + structFields + ), + rows + ) + val partitionColumns = Seq("dateint", "hr", "event_type") + tableUtils.insertPartitions(df1, + tableName, + partitionColumns = partitionColumns, + ) + assert(tableUtils.tableExists(tableName)) + val partitions = tableUtils.partitions(tableName, Map.empty, partitionColOpt = Some("dateint")) + assert(partitions.size == 4) + assert(tableUtils.allPartitions(tableName).size == 4) + } + } From 93b7c923f1d0cfcdce3c6576f31a87d0340e6d71 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Tue, 13 May 2025 19:52:03 +0000 Subject: [PATCH 02/36] timeboxing test changes --- api/py/python-api-build.sh | 4 ++++ build.sbt | 6 +++--- .../ai/chronon/spark/SparkSessionBuilder.scala | 16 ++++++++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/api/py/python-api-build.sh b/api/py/python-api-build.sh index a984be5ee0..a808d56cc0 100755 --- a/api/py/python-api-build.sh +++ b/api/py/python-api-build.sh @@ -21,6 +21,10 @@ export CHRONON_VERSION_STR=$1 export CHRONON_BRANCH_STR=$2 ACTION=$3 +python3 -m venv .venv +source .venv/bin/activate +pip install build tox + echo "Finding working directory.." SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) echo "Removing old distributions..." 
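Note (not part of the diff): pulling the Iceberg settings of this patch together, the test session amounts to roughly the following. This is a minimal local sketch, not the project's SparkSessionBuilder; the catalog settings are the ones later commits in this series converge on, and the warehouse path is a placeholder.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("iceberg-format-sketch")
      .master("local[*]")
      // Iceberg SQL extensions plus an Iceberg-aware session catalog backed by a Hadoop warehouse
      .config("spark.sql.extensions",
              "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/chronon/data")
      // Chronon-side flags telling TableUtils to write and read Iceberg
      .config("spark.chronon.table_write.format", "iceberg")
      .config("spark.chronon.table_read.format", "iceberg")
      .getOrCreate()
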
diff --git a/build.sbt b/build.sbt index 854ce2cf86..e7f272a11b 100644 --- a/build.sbt +++ b/build.sbt @@ -192,15 +192,15 @@ val VersionMatrix: Map[String, VersionDependency] = Map( ), "iceberg31" -> VersionDependency( Seq( - "org.apache.iceberg" %% "iceberg-spark-3.1", + "org.apache.iceberg" %% "iceberg-spark-runtime-3.1", ), None, Some(icebergVersion), - Some(icebergVersion) + None ), "iceberg32" -> VersionDependency( Seq( - "org.apache.iceberg" %% "iceberg-spark-3.2", + "org.apache.iceberg" %% "iceberg-spark-runtime-3.2", ), None, None, diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index c842c58f1c..3f786d9787 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -39,10 +39,14 @@ object SparkSessionBuilder { additionalConfig: Option[Map[String, String]] = None, enforceKryoSerializer: Boolean = true): SparkSession = { + + val userName = Properties.userName + val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) + // allow us to override the format by specifying env vars. This allows us to not have to worry about interference // between Spark sessions created in existing chronon tests that need the hive format and some specific tests // that require a format override like delta lake. - val (formatConfigs, kryoRegistrator) = Some("deltalake") match { + val (formatConfigs, kryoRegistrator) = Some("iceberg") match { case Some("deltalake") => val configMap = Map( "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", @@ -53,8 +57,13 @@ object SparkSessionBuilder { case Some("iceberg") => val configMap = Map( "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + + "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.type" -> "hadoop", + "spark.sql.catalog.iceberg_catalog.warehouse" -> s"$warehouseDir/iceberg-warehouse", + "spark.sql.catalog.iceberg_catalog.cache-enabled" -> "false", + "spark.chronon.table_write.format" -> "iceberg", - "spark.sql.sources.partitionOverwriteMode" -> "dynamic" ) // TODO registrator for iceberg (configMap, "ai.chronon.spark.ChrononKryoRegistrator") @@ -68,8 +77,7 @@ object SparkSessionBuilder { //required to run spark locally with hive support enabled - for sbt test System.setSecurityManager(null) } - val userName = Properties.userName - val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) + var baseBuilder = SparkSession .builder() .appName(name) From 2d4ed6f935d77e1e3101edb9a55600fc16d3c2de Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Wed, 14 May 2025 04:01:54 +0000 Subject: [PATCH 03/36] bootstrapping spark test, 1/3 working on FormatTest --- .gitignore | 4 +- .../chronon/spark/SparkSessionBuilder.scala | 59 ++++++++++--------- .../spark/test/TableUtilsFormatTest.scala | 2 +- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index d87cf1e3eb..d276e1b53a 100644 --- a/.gitignore +++ b/.gitignore @@ -26,7 +26,9 @@ api/py/.coverage api/py/htmlcov/ **/derby.log cs - +*.bloop +*.metals +*.venv # Documentation builds docs/build/ diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 3f786d9787..b988d64c59 100644 --- 
a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -39,14 +39,10 @@ object SparkSessionBuilder { additionalConfig: Option[Map[String, String]] = None, enforceKryoSerializer: Boolean = true): SparkSession = { - - val userName = Properties.userName - val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) - // allow us to override the format by specifying env vars. This allows us to not have to worry about interference // between Spark sessions created in existing chronon tests that need the hive format and some specific tests // that require a format override like delta lake. - val (formatConfigs, kryoRegistrator) = Some("iceberg") match { + val (formatConfigs, kryoRegistrator) = sys.env.get(FormatTestEnvVar) match { case Some("deltalake") => val configMap = Map( "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension", @@ -54,18 +50,6 @@ object SparkSessionBuilder { "spark.chronon.table_write.format" -> "delta" ) (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") - case Some("iceberg") => - val configMap = Map( - "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", - - "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog", - "spark.sql.catalog.spark_catalog.type" -> "hadoop", - "spark.sql.catalog.iceberg_catalog.warehouse" -> s"$warehouseDir/iceberg-warehouse", - "spark.sql.catalog.iceberg_catalog.cache-enabled" -> "false", - - "spark.chronon.table_write.format" -> "iceberg", - ) - // TODO registrator for iceberg (configMap, "ai.chronon.spark.ChrononKryoRegistrator") case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") } @@ -79,17 +63,18 @@ object SparkSessionBuilder { } var baseBuilder = SparkSession - .builder() - .appName(name) - .enableHiveSupport() - .config("spark.sql.session.timeZone", "UTC") - //otherwise overwrite will delete ALL partitions, not just the ones it touches - .config("spark.sql.sources.partitionOverwriteMode", "dynamic") - .config("hive.exec.dynamic.partition", "true") - .config("hive.exec.dynamic.partition.mode", "nonstrict") - .config("spark.sql.catalogImplementation", "hive") - .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000) - .config("spark.sql.legacy.timeParserPolicy", "LEGACY") + .builder() + .appName(name) + .enableHiveSupport() + .config("spark.sql.session.timeZone", "UTC") + //otherwise overwrite will delete ALL partitions, not just the ones it touches + .config("spark.sql.sources.partitionOverwriteMode", "dynamic") + .config("hive.exec.dynamic.partition", "true") + .config("hive.exec.dynamic.partition.mode", "nonstrict") + .config("spark.sql.catalogImplementation", "hive") + .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000) + .config("spark.sql.legacy.timeParserPolicy", "LEGACY") + // Staging queries don't benefit from the KryoSerializer and in fact may fail with buffer underflow in some cases. 
if (enforceKryoSerializer) { @@ -107,7 +92,23 @@ object SparkSessionBuilder { baseBuilder.config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true") } - val builder = if (local) { + val userName = Properties.userName + val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) + + val builder = if (true) { + SparkSession + .builder() + .appName(name) + .master("local[*]") + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data") + .config("spark.sql.catalog.spark_catalog.cache.enabled", "false") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.chronon.table_write.format","iceberg") + .config("spark.chronon.table_read.format","iceberg") + } else if (local) { logger.info(s"Building local spark session with warehouse at $warehouseDir") val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/metastore_db;create=true" baseBuilder diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala index d4c8b806ad..eab02dbba7 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala @@ -16,7 +16,7 @@ class TableUtilsFormatTest { import TableUtilsFormatTest._ // Read the format we want this instantiation of the test to run via environment vars - val format: String = sys.env.getOrElse(FormatTestEnvVar, "hive") + val format: String = "iceberg" val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true) val tableUtils = TableUtils(spark) From 878f64af8d3d778019ac195e227c73af2d9c9465 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Thu, 15 May 2025 17:31:49 +0000 Subject: [PATCH 04/36] got most tests working except droppartitions and the new dateint test --- build.sbt | 20 +++++++------------ .../chronon/spark/SparkSessionBuilder.scala | 11 +++++----- .../scala/ai/chronon/spark/TableUtils.scala | 18 ++++++++++++++--- .../spark/test/TableUtilsFormatTest.scala | 2 +- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/build.sbt b/build.sbt index e7f272a11b..45282b8f3b 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ lazy val spark3_1_1 = "3.1.1" lazy val spark3_2_1 = "3.2.1" lazy val spark3_5_3 = "3.5.3" lazy val tmp_warehouse = "/tmp/chronon/" -lazy val icebergVersion = "0.14.0" +lazy val icebergVersion = "1.1.0" ThisBuild / organization := "ai.chronon" ThisBuild / organizationName := "chronon" @@ -42,6 +42,8 @@ ThisBuild / assembly / test := {} val use_spark_3_5 = settingKey[Boolean]("Flag to build for 3.5") ThisBuild / use_spark_3_5 := false +ThisBuild / scalaVersion := "2.13.6" + def buildTimestampSuffix = ";build.timestamp=" + new java.util.Date().getTime lazy val publishSettings = Seq( publishTo := { @@ -190,14 +192,8 @@ val VersionMatrix: Map[String, VersionDependency] = Map( Some("1.0.1"), Some("2.0.2") ), - "iceberg31" -> VersionDependency( - Seq( - "org.apache.iceberg" %% "iceberg-spark-runtime-3.1", - ), - None, - Some(icebergVersion), - None - ), + //3.2 is the minimum version for iceberg + // due to INSERT_INTO support without specifying iceberg format "iceberg32" -> VersionDependency( Seq( "org.apache.iceberg" %% 
"iceberg-spark-runtime-3.2", @@ -432,7 +428,7 @@ lazy val spark_uber = (project in file("spark")) libraryDependencies ++= (if (use_spark_3_5.value) fromMatrix(scalaVersion.value, "jackson", "spark-all-3-5/provided", "delta-core/provided") else - fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided")) + fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided", "iceberg32/provided")), ) lazy val spark_embedded = (project in file("spark")) @@ -444,9 +440,7 @@ lazy val spark_embedded = (project in file("spark")) libraryDependencies ++= (if (use_spark_3_5.value) fromMatrix(scalaVersion.value, "spark-all-3-5", "delta-core") else - fromMatrix(scalaVersion.value, "spark-all", "delta-core", "iceberg31", "iceberg32")), - dependencyOverrides := Seq( "com.fasterxml.jackson.core" % "jackson-databind" % "2.10.0", - "com.fasterxml.jackson.core" % "jackson-core" % "2.10.0"), + fromMatrix(scalaVersion.value, "spark-all", "delta-core", "iceberg32")), target := target.value.toPath.resolveSibling("target-embedded").toFile, Test / test := {} ) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index b988d64c59..089aaddd7b 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -96,18 +96,17 @@ object SparkSessionBuilder { val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) val builder = if (true) { - SparkSession - .builder() - .appName(name) + baseBuilder .master("local[*]") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.spark_catalog.type", "hadoop") .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data") - .config("spark.sql.catalog.spark_catalog.cache.enabled", "false") + .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0") .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.chronon.table_write.format","iceberg") - .config("spark.chronon.table_read.format","iceberg") + .config("spark.chronon.table_read.format", "iceberg") } else if (local) { logger.info(s"Building local spark session with warehouse at $warehouseDir") val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/metastore_db;create=true" diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 5a93b247d0..8cd70c394b 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -239,9 +239,14 @@ case object Iceberg extends Format { override def partitions(tableName: String, partitionColumns: Seq[String])(implicit sparkSession: SparkSession): Seq[Map[String, String]] = { sparkSession.sqlContext - .sql(s"SHOW PARTITIONS $tableName") + .sql(s"SELECT partition FROM $tableName"++".partitions") .collect() - .map(row => parseHivePartition(row.getString(0))) + .map { row => + val partitionStruct = row.getStruct(0) + partitionStruct.schema.fieldNames.zipWithIndex.map { case (fieldName, idx) => + fieldName -> 
partitionStruct.getString(idx) + }.toMap + } } private def getIcebergPartitions(tableName: String, @@ -395,7 +400,14 @@ case class TableUtils(sparkSession: SparkSession) { rdd } - def tableExists(tableName: String): Boolean = sparkSession.catalog.tableExists(tableName) + def tableExists(tableName: String): Boolean = { + try { + sparkSession.sql(s"DESCRIBE TABLE $tableName") + true + } catch { + case _: AnalysisException => false + } + } def loadEntireTable(tableName: String): DataFrame = sparkSession.table(tableName) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala index eab02dbba7..5bf204d94f 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala @@ -89,7 +89,7 @@ class TableUtilsFormatTest { Row(5L, 6, "7", "2022-10-02") ) ) - testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02") + testInsertPartitions(spark, tableUtils, tableName, format, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02") } @Test From 4739e93141663e21ad8f0b6ff29bc4f3447e8d19 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Thu, 15 May 2025 20:02:38 +0000 Subject: [PATCH 05/36] cleaning some local changes --- .../scala/ai/chronon/spark/SparkSessionBuilder.scala | 2 ++ .../src/main/scala/ai/chronon/spark/TableUtils.scala | 3 +++ .../ai/chronon/spark/test/TableUtilsFormatTest.scala | 2 +- .../scala/ai/chronon/spark/test/TableUtilsTest.scala | 12 ++++++++++++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 089aaddd7b..f85a03ef14 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -96,6 +96,8 @@ object SparkSessionBuilder { val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) val builder = if (true) { + //iceberg can't use derby as a metastore + //https://github.com/apache/iceberg/issues/7847 baseBuilder .master("local[*]") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 8cd70c394b..6d8868a6cf 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -985,6 +985,9 @@ case class TableUtils(sparkSession: SparkSession) { partitions: Seq[String], partitionColumn: String = partitionColumn, subPartitionFilters: Map[String, String] = Map.empty): Unit = { + // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs + // notably, the unit test iceberg integration uses hadoop because of + // https://github.com/apache/iceberg/issues/7847 if (partitions.nonEmpty && tableExists(tableName)) { val partitionSpecs = partitions .map { partition => diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala index 5bf204d94f..ad0cb54906 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsFormatTest.scala @@ -16,7 +16,7 
@@ class TableUtilsFormatTest { import TableUtilsFormatTest._ // Read the format we want this instantiation of the test to run via environment vars - val format: String = "iceberg" + val format: String = sys.env.getOrElse(FormatTestEnvVar, "hive") val spark = SparkSessionBuilder.build("TableUtilsFormatTest", local = true) val tableUtils = TableUtils(spark) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 73b79d4322..8c6ee41648 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -22,11 +22,13 @@ import ai.chronon.spark.test.TestUtils.makeDf import ai.chronon.api.{StructField, _} import ai.chronon.online.SparkConversions import ai.chronon.spark.{IncompatibleSchemaException, PartitionRange, SparkSessionBuilder, TableUtils} +import ai.chronon.spark.SparkSessionBuilder.FormatTestEnvVar import org.apache.hadoop.hive.ql.exec.UDF import org.apache.spark.sql.functions.col import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession, types} import org.junit.Assert.{assertEquals, assertTrue} import org.junit.Test +import org.junit.Assume import java.time.Instant import scala.util.{Random, Try} @@ -40,6 +42,7 @@ class SimpleAddUDF extends UDF { } class TableUtilsTest { + val format: String = sys.env.getOrElse(FormatTestEnvVar, "hive") lazy val spark: SparkSession = SparkSessionBuilder.build("TableUtilsTest", local = true) private val tableUtils = TableUtils(spark) @@ -254,6 +257,11 @@ class TableUtilsTest { @Test def testDropPartitions(): Unit = { + // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs + // notably, the unit test iceberg integration uses hadoop because of + // https://github.com/apache/iceberg/issues/7847 + Assume.assumeTrue(format != "iceberg") + Assume.assumeTrue(false) val tableName = "db.test_drop_partitions_table" spark.sql("CREATE DATABASE IF NOT EXISTS db") val columns1 = Array( @@ -550,6 +558,10 @@ class TableUtilsTest { @Test def testGetPartitionsWithLongPartition(): Unit = { + // This is a known issue with iceberg + // To be fixed in a fast follow PR + Assume.assumeTrue(format != "iceberg") + Assume.assumeTrue(false) val tableName = "db.test_long_partitions" spark.sql("CREATE DATABASE IF NOT EXISTS db") val structFields = Array( From 65c81a10a4c61b0a056b7beb7e3acb7682e01058 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Thu, 15 May 2025 20:11:05 +0000 Subject: [PATCH 06/36] reverting some local changes --- api/py/python-api-build.sh | 4 ---- .../src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala | 2 -- 2 files changed, 6 deletions(-) diff --git a/api/py/python-api-build.sh b/api/py/python-api-build.sh index a808d56cc0..a984be5ee0 100755 --- a/api/py/python-api-build.sh +++ b/api/py/python-api-build.sh @@ -21,10 +21,6 @@ export CHRONON_VERSION_STR=$1 export CHRONON_BRANCH_STR=$2 ACTION=$3 -python3 -m venv .venv -source .venv/bin/activate -pip install build tox - echo "Finding working directory.." SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) echo "Removing old distributions..." 
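Note (not part of the diff): the Iceberg partition listing added earlier in this series reads the table's .partitions metadata table and flattens the "partition" struct into a map; the assumption-gated long-partition test above stays skipped until that flattening handles non-string values. A minimal sketch of the approach, assuming an existing SparkSession and an Iceberg table name:

    import org.apache.spark.sql.SparkSession

    // List partitions via Iceberg's metadata table and flatten the partition struct.
    // Converting with toString rather than getString keeps non-string partition
    // columns intact, for example a Long dateint column.
    def icebergPartitions(spark: SparkSession, tableName: String): Seq[Map[String, String]] =
      spark
        .sql(s"SELECT partition FROM $tableName.partitions")
        .collect()
        .map { row =>
          val part = row.getStruct(0)
          part.schema.fieldNames.zipWithIndex.map {
            case (field, idx) => field -> Option(part.get(idx)).map(_.toString).orNull
          }.toMap
        }
        .toSeq
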
diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 8c6ee41648..3dd2437bc7 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -261,7 +261,6 @@ class TableUtilsTest { // notably, the unit test iceberg integration uses hadoop because of // https://github.com/apache/iceberg/issues/7847 Assume.assumeTrue(format != "iceberg") - Assume.assumeTrue(false) val tableName = "db.test_drop_partitions_table" spark.sql("CREATE DATABASE IF NOT EXISTS db") val columns1 = Array( @@ -561,7 +560,6 @@ class TableUtilsTest { // This is a known issue with iceberg // To be fixed in a fast follow PR Assume.assumeTrue(format != "iceberg") - Assume.assumeTrue(false) val tableName = "db.test_long_partitions" spark.sql("CREATE DATABASE IF NOT EXISTS db") val structFields = Array( From cab2c6ada2cd9b0e4de9faf167dfcd347edd0724 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Thu, 15 May 2025 20:11:39 +0000 Subject: [PATCH 07/36] formatting --- .../chronon/spark/SparkSessionBuilder.scala | 27 +++++++++---------- .../scala/ai/chronon/spark/TableUtils.scala | 11 ++++---- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index f85a03ef14..48e4ff2358 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -63,18 +63,17 @@ object SparkSessionBuilder { } var baseBuilder = SparkSession - .builder() - .appName(name) - .enableHiveSupport() - .config("spark.sql.session.timeZone", "UTC") - //otherwise overwrite will delete ALL partitions, not just the ones it touches - .config("spark.sql.sources.partitionOverwriteMode", "dynamic") - .config("hive.exec.dynamic.partition", "true") - .config("hive.exec.dynamic.partition.mode", "nonstrict") - .config("spark.sql.catalogImplementation", "hive") - .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000) - .config("spark.sql.legacy.timeParserPolicy", "LEGACY") - + .builder() + .appName(name) + .enableHiveSupport() + .config("spark.sql.session.timeZone", "UTC") + //otherwise overwrite will delete ALL partitions, not just the ones it touches + .config("spark.sql.sources.partitionOverwriteMode", "dynamic") + .config("hive.exec.dynamic.partition", "true") + .config("hive.exec.dynamic.partition.mode", "nonstrict") + .config("spark.sql.catalogImplementation", "hive") + .config("spark.hadoop.hive.exec.max.dynamic.partitions", 30000) + .config("spark.sql.legacy.timeParserPolicy", "LEGACY") // Staging queries don't benefit from the KryoSerializer and in fact may fail with buffer underflow in some cases. 
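Note (not part of the diff): the TODOs above flag that dropPartitions still goes through DataSource V1 partition-drop semantics, which is why the Iceberg run skips testDropPartitions for now. A hypothetical catalog-agnostic alternative, not part of this series, is a predicate DELETE, which Iceberg can satisfy as a metadata-only drop when the predicate lines up with partition boundaries. Table and column names below are placeholders.

    import org.apache.spark.sql.SparkSession

    // Hypothetical V2-friendly partition drop via a predicate DELETE.
    def dropPartitionsV2(spark: SparkSession,
                         tableName: String,
                         partitionColumn: String,
                         partitions: Seq[String]): Unit =
      if (partitions.nonEmpty) {
        val inList = partitions.map(p => s"'$p'").mkString(", ")
        spark.sql(s"DELETE FROM $tableName WHERE $partitionColumn IN ($inList)")
      }
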
if (enforceKryoSerializer) { @@ -107,8 +106,8 @@ object SparkSessionBuilder { .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data") .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0") .config("spark.driver.bindAddress", "127.0.0.1") - .config("spark.chronon.table_write.format","iceberg") - .config("spark.chronon.table_read.format", "iceberg") + .config("spark.chronon.table_write.format", "iceberg") + .config("spark.chronon.table_read.format", "iceberg") } else if (local) { logger.info(s"Building local spark session with warehouse at $warehouseDir") val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/metastore_db;create=true" diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 6d8868a6cf..e4be997982 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -239,12 +239,13 @@ case object Iceberg extends Format { override def partitions(tableName: String, partitionColumns: Seq[String])(implicit sparkSession: SparkSession): Seq[Map[String, String]] = { sparkSession.sqlContext - .sql(s"SELECT partition FROM $tableName"++".partitions") + .sql(s"SELECT partition FROM $tableName" ++ ".partitions") .collect() .map { row => val partitionStruct = row.getStruct(0) - partitionStruct.schema.fieldNames.zipWithIndex.map { case (fieldName, idx) => - fieldName -> partitionStruct.getString(idx) + partitionStruct.schema.fieldNames.zipWithIndex.map { + case (fieldName, idx) => + fieldName -> partitionStruct.getString(idx) }.toMap } } @@ -986,8 +987,8 @@ case class TableUtils(sparkSession: SparkSession) { partitionColumn: String = partitionColumn, subPartitionFilters: Map[String, String] = Map.empty): Unit = { // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs - // notably, the unit test iceberg integration uses hadoop because of - // https://github.com/apache/iceberg/issues/7847 + // notably, the unit test iceberg integration uses hadoop because of + // https://github.com/apache/iceberg/issues/7847 if (partitions.nonEmpty && tableExists(tableName)) { val partitionSpecs = partitions .map { partition => From f4bfb70e0bc633ea39204e9271364f3bf490086c Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Thu, 15 May 2025 20:40:57 +0000 Subject: [PATCH 08/36] more silly local hacks --- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 48e4ff2358..72e8126170 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -94,7 +94,7 @@ object SparkSessionBuilder { val userName = Properties.userName val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) - val builder = if (true) { + val builder = if (sys.env.getOrElse(FormatTestEnvVar, "hive") == "iceberg") { //iceberg can't use derby as a metastore //https://github.com/apache/iceberg/issues/7847 baseBuilder From ae2088f94a5b221e801673e88fdb28f8c1373c15 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 14:44:24 +0000 Subject: [PATCH 09/36] fixed the constant derby flake --- .../src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala | 3 +++ 1 
file changed, 3 insertions(+) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 72e8126170..71b6c92ba6 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -23,6 +23,7 @@ import org.apache.spark.SPARK_VERSION import java.io.File import java.util.logging.Logger import scala.util.Properties +import java.util.UUID object SparkSessionBuilder { @transient private lazy val logger = LoggerFactory.getLogger(getClass) @@ -97,6 +98,7 @@ object SparkSessionBuilder { val builder = if (sys.env.getOrElse(FormatTestEnvVar, "hive") == "iceberg") { //iceberg can't use derby as a metastore //https://github.com/apache/iceberg/issues/7847 + val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/${UUID.randomUUID}/metastore_db;create=true" baseBuilder .master("local[*]") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") @@ -105,6 +107,7 @@ object SparkSessionBuilder { .config("spark.sql.catalog.spark_catalog.type", "hadoop") .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data") .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0") + .config("spark.hadoop.javax.jdo.option.ConnectionURL", metastoreDb) .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.chronon.table_write.format", "iceberg") .config("spark.chronon.table_read.format", "iceberg") From dee01ab2f1bbdaf4b55295509770e851697a661b Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 14:56:02 +0000 Subject: [PATCH 10/36] refactored to match deltalake --- .../chronon/spark/SparkSessionBuilder.scala | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 71b6c92ba6..4780f3bb27 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -40,6 +40,8 @@ object SparkSessionBuilder { additionalConfig: Option[Map[String, String]] = None, enforceKryoSerializer: Boolean = true): SparkSession = { + val userName = Properties.userName + val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) // allow us to override the format by specifying env vars. This allows us to not have to worry about interference // between Spark sessions created in existing chronon tests that need the hive format and some specific tests // that require a format override like delta lake. 
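Note (not part of the diff): the "constant derby flake" fix above works because embedded Derby lets only one JVM boot a given database at a time, so concurrent local test sessions sharing one metastore path trip over each other's locks. The isolation boils down to a helper like this, shown standalone with the warehouse directory supplied by the caller:

    import java.util.UUID

    // One Derby metastore per Spark session: a fresh databaseName means no lock
    // contention between concurrently running local test sessions.
    def uniqueMetastoreUrl(warehouseDir: String): String =
      s"jdbc:derby:;databaseName=$warehouseDir/${UUID.randomUUID}/metastore_db;create=true"

    // wired in via:
    //   .config("spark.hadoop.javax.jdo.option.ConnectionURL", uniqueMetastoreUrl(warehouseDir))
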
@@ -52,6 +54,19 @@ object SparkSessionBuilder { ) (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") (configMap, "ai.chronon.spark.ChrononKryoRegistrator") + case Some("iceberg") => + val configMap = Map( + "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + "spark.sql.catalog.spark_catalog" -> "org.apache.iceberg.spark.SparkSessionCatalog", + "spark.chronon.table_write.format" -> "iceberg", + "spark.chronon.table_read.format" -> "iceberg", + "spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.spark_catalog.type" -> "hadoop", + "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data", + "spark.jars.packages" -> "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0" + ) + // TODO add an iceberg kryo registrator + (configMap, "ai.chronon.spark.ChrononKryoRegistrator") case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") } @@ -92,26 +107,7 @@ object SparkSessionBuilder { baseBuilder.config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true") } - val userName = Properties.userName - val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) - - val builder = if (sys.env.getOrElse(FormatTestEnvVar, "hive") == "iceberg") { - //iceberg can't use derby as a metastore - //https://github.com/apache/iceberg/issues/7847 - val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/${UUID.randomUUID}/metastore_db;create=true" - baseBuilder - .master("local[*]") - .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") - .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") - .config("spark.sql.catalog.spark_catalog.type", "hadoop") - .config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouseDir/data") - .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0") - .config("spark.hadoop.javax.jdo.option.ConnectionURL", metastoreDb) - .config("spark.driver.bindAddress", "127.0.0.1") - .config("spark.chronon.table_write.format", "iceberg") - .config("spark.chronon.table_read.format", "iceberg") - } else if (local) { + val builder = if (local) { logger.info(s"Building local spark session with warehouse at $warehouseDir") val metastoreDb = s"jdbc:derby:;databaseName=$warehouseDir/metastore_db;create=true" baseBuilder From 69cd50da8a4e9f3a458ecd6d07e83bc82c0754e0 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 15:30:43 +0000 Subject: [PATCH 11/36] added Iceberg Kryo Serializer --- .../spark/ChrononKryoRegistrator.scala | 29 +++++++++++++++++++ .../chronon/spark/SparkSessionBuilder.scala | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala index 931efe6080..af2997104d 100644 --- a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala +++ b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala @@ -178,3 +178,32 @@ class ChrononDeltaLakeKryoRegistrator extends ChrononKryoRegistrator { additionalDeltaNames.foreach(name => doRegister(name, kryo)) } } + +class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator { + override def registerClasses(kryo: Kryo): Unit = { + super.registerClasses(kryo) + val additionalIcebergNames = Seq( 
+ "org.apache.iceberg.spark.source.SerializableTableWithSize", + "org.apache.iceberg.encryption.PlaintextEncryptionManager", + "org.apache.iceberg.hadoop.HadoopFileIO", + "org.apache.iceberg.SerializableTable$SerializableConfSupplier", + "org.apache.iceberg.util.SerializableMap", + "org.apache.iceberg.LocationProviders$DefaultLocationProvider", + "org.apache.iceberg.spark.source.SparkWrite$TaskCommit", + "org.apache.iceberg.DataFile", + "org.apache.iceberg.GenericDataFile", + "org.apache.iceberg.FileContent", + "org.apache.iceberg.FileFormat", + "org.apache.iceberg.SerializableByteBufferMap", + "org.apache.iceberg.PartitionData", + "org.apache.iceberg.types.Types$StructType", + "org.apache.iceberg.types.Types$NestedField", + "org.apache.iceberg.types.Types$StringType", + "org.apache.iceberg.SnapshotRef", + "org.apache.iceberg.SnapshotRefType", + "org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize", + "org.apache.iceberg.MetadataTableType", + ) + additionalIcebergNames.foreach(name => doRegister(name, kryo)) + } +} \ No newline at end of file diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 4780f3bb27..af54cd9707 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -66,7 +66,7 @@ object SparkSessionBuilder { "spark.jars.packages" -> "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0" ) // TODO add an iceberg kryo registrator - (configMap, "ai.chronon.spark.ChrononKryoRegistrator") + (configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator") case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") } From 5526abeff8c1e51a73d1b03507162c5df4b6e1b4 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 15:33:14 +0000 Subject: [PATCH 12/36] scalafmt --- .../main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala index af2997104d..7788a1ac30 100644 --- a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala +++ b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala @@ -202,8 +202,8 @@ class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator { "org.apache.iceberg.SnapshotRef", "org.apache.iceberg.SnapshotRefType", "org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize", - "org.apache.iceberg.MetadataTableType", + "org.apache.iceberg.MetadataTableType" ) additionalIcebergNames.foreach(name => doRegister(name, kryo)) } -} \ No newline at end of file +} From 2b9c246130bef292ca63cb184bb8883d2da90e67 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 15:50:13 +0000 Subject: [PATCH 13/36] iceberg circleci integration --- .circleci/config.yml | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5e21b41812..3416ffd4aa 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -138,6 +138,35 @@ jobs: command: | conda activate chronon_py sbt +scalafmtCheck + # run these separately as we need a isolated JVM to not have Spark session settings interfere with other runs + # long term goal is to refactor the current testing spark session builder and avoid 
adding new single test to CI + "Scala 12 -- Iceberg Format Tests": + executor: docker_baseimg_executor + steps: + - checkout + - run: + name: Run Scala 12 tests for Iceberg format + environment: + format_test: iceberg + shell: /bin/bash -leuxo pipefail + command: | + conda activate chronon_py + # Increase if we see OOM. + export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" + sbt '++ 2.12.12' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" + - store_test_results: + path: /chronon/spark/target/test-reports + - store_test_results: + path: /chronon/aggregator/target/test-reports + - run: + name: Compress spark-warehouse + command: | + cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse + when: on_fail + - store_artifacts: + path: /tmp/spark-warehouse.tar.gz + destination: spark_warehouse.tar.gz + when: on_fail workflows: build_test_deploy: @@ -160,4 +189,7 @@ workflows: - "Pull Docker Image" - "Chronon Python Lint": requires: - - "Pull Docker Image" + - "Pull Docker Image - + - "Scala 12 -- Iceberg Format Tests": + requires: + - "Pull Docker Image" \ No newline at end of file From b423f4b9553ab32b6f042369522833a92128e74e Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 15:53:54 +0000 Subject: [PATCH 14/36] fixing typo --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 3416ffd4aa..5c646cc547 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -189,7 +189,7 @@ workflows: - "Pull Docker Image" - "Chronon Python Lint": requires: - - "Pull Docker Image - + - "Pull Docker Image" - "Scala 12 -- Iceberg Format Tests": requires: - "Pull Docker Image" \ No newline at end of file From 9e5eaaee0a8fdd82b875e01b4294e4be22500bd0 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 16:02:30 +0000 Subject: [PATCH 15/36] giving circleci a dependency --- .env | 1 + build.sbt | 2 ++ 2 files changed, 3 insertions(+) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000000..a930e6e702 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +format_test=iceberg \ No newline at end of file diff --git a/build.sbt b/build.sbt index 45282b8f3b..bfe88d4b99 100644 --- a/build.sbt +++ b/build.sbt @@ -429,6 +429,8 @@ lazy val spark_uber = (project in file("spark")) fromMatrix(scalaVersion.value, "jackson", "spark-all-3-5/provided", "delta-core/provided") else fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided", "iceberg32/provided")), + // make Iceberg classes available in tests for circleci + Test / libraryDependencies ++= fromMatrix(scalaVersion.value, "iceberg32"), ) lazy val spark_embedded = (project in file("spark")) From 558adc3dad1115d2dad407d772191e6f46ed379d Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 16:02:46 +0000 Subject: [PATCH 16/36] removing env file --- .env | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .env diff --git a/.env b/.env deleted file mode 100644 index a930e6e702..0000000000 --- a/.env +++ /dev/null @@ -1 +0,0 @@ -format_test=iceberg \ No newline at end of file From c1eb8a238b06af992609f4931f868dfb031d6b44 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 16:16:20 +0000 Subject: [PATCH 17/36] moving integration test to spark_embedded --- .circleci/config.yml | 3 ++- build.sbt | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5c646cc547..7ca7593486 
100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -153,7 +153,8 @@ jobs: conda activate chronon_py # Increase if we see OOM. export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt '++ 2.12.12' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" + # spark_embedded only because we don't want to install a ton of jars on the uber if they're not needed + sbt '++ 2.12.12' "project spark_embedded" "testOnly ai.chronon.spark.test.TableUtilsFormatTest" - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: diff --git a/build.sbt b/build.sbt index bfe88d4b99..45282b8f3b 100644 --- a/build.sbt +++ b/build.sbt @@ -429,8 +429,6 @@ lazy val spark_uber = (project in file("spark")) fromMatrix(scalaVersion.value, "jackson", "spark-all-3-5/provided", "delta-core/provided") else fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided", "iceberg32/provided")), - // make Iceberg classes available in tests for circleci - Test / libraryDependencies ++= fromMatrix(scalaVersion.value, "iceberg32"), ) lazy val spark_embedded = (project in file("spark")) From 510266c28a9fb85e22d5d74d7fbb3867eccc9909 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 16:57:58 +0000 Subject: [PATCH 18/36] figured out why delta lake was on 2.13, need it for spark 3.2 --- .circleci/config.yml | 7 +++---- build.sbt | 4 +--- .../main/scala/ai/chronon/spark/SparkSessionBuilder.scala | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 7ca7593486..22e8ca05a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -140,12 +140,12 @@ jobs: sbt +scalafmtCheck # run these separately as we need a isolated JVM to not have Spark session settings interfere with other runs # long term goal is to refactor the current testing spark session builder and avoid adding new single test to CI - "Scala 12 -- Iceberg Format Tests": + "Scala 13 -- Iceberg Format Tests": executor: docker_baseimg_executor steps: - checkout - run: - name: Run Scala 12 tests for Iceberg format + name: Run Scala 13 tests for Iceberg format environment: format_test: iceberg shell: /bin/bash -leuxo pipefail @@ -153,8 +153,7 @@ jobs: conda activate chronon_py # Increase if we see OOM. 
export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - # spark_embedded only because we don't want to install a ton of jars on the uber if they're not needed - sbt '++ 2.12.12' "project spark_embedded" "testOnly ai.chronon.spark.test.TableUtilsFormatTest" + sbt '++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: diff --git a/build.sbt b/build.sbt index 45282b8f3b..932f6e6259 100644 --- a/build.sbt +++ b/build.sbt @@ -42,8 +42,6 @@ ThisBuild / assembly / test := {} val use_spark_3_5 = settingKey[Boolean]("Flag to build for 3.5") ThisBuild / use_spark_3_5 := false -ThisBuild / scalaVersion := "2.13.6" - def buildTimestampSuffix = ";build.timestamp=" + new java.util.Date().getTime lazy val publishSettings = Seq( publishTo := { @@ -200,7 +198,7 @@ val VersionMatrix: Map[String, VersionDependency] = Map( ), None, None, - Some(icebergVersion) + Some(icebergVersion), ), "jackson" -> VersionDependency( Seq( diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index af54cd9707..ec5b07b936 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -63,7 +63,6 @@ object SparkSessionBuilder { "spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.spark_catalog.type" -> "hadoop", "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data", - "spark.jars.packages" -> "org.apache.iceberg:iceberg-spark-runtime-3.2_1.12:1.1.0" ) // TODO add an iceberg kryo registrator (configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator") From 58a58ffa5045fd07a0d0c8b8d907a052ae47fc10 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 16:59:09 +0000 Subject: [PATCH 19/36] typo --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 22e8ca05a2..1e3379e30c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -190,6 +190,6 @@ workflows: - "Chronon Python Lint": requires: - "Pull Docker Image" - - "Scala 12 -- Iceberg Format Tests": + - "Scala 13 -- Iceberg Format Tests": requires: - "Pull Docker Image" \ No newline at end of file From 6ea41216720967aa72a8e549e8e52ad7b4c28d31 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 17:05:00 +0000 Subject: [PATCH 20/36] scalafmt --- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index ec5b07b936..1639f03e6e 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -62,7 +62,7 @@ object SparkSessionBuilder { "spark.chronon.table_read.format" -> "iceberg", "spark.sql.catalog.local" -> "org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.spark_catalog.type" -> "hadoop", - "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data", + "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data" ) // TODO add an iceberg kryo registrator (configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator") From cb012a6d1d5f8d381696f74d23ffcac21516dbc7 Mon Sep 17 00:00:00 2001 From: Abby Whittier 
Date: Fri, 16 May 2025 17:22:13 +0000 Subject: [PATCH 21/36] skipping the flink parts since it doesn't compile to 2.13.6 --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1e3379e30c..ff5ec9d8e5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -153,7 +153,7 @@ jobs: conda activate chronon_py # Increase if we see OOM. export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt '++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" + sbt 'project spark_embedded; ++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: From 117c4f74326aca59e7291b6243432682c3c7bb84 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 17:36:30 +0000 Subject: [PATCH 22/36] including TableUtilsTest as well in CI --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index ff5ec9d8e5..cdafc0fa0f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -153,7 +153,7 @@ jobs: conda activate chronon_py # Increase if we see OOM. export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt 'project spark_embedded; ++ 2.13.6' "testOnly ai.chronon.spark.test.TableUtilsFormatTest" + sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsFormatTest ai.chronon.spark.test.TableUtilsTest' - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: From 7d3272f48172f53e549a9cb7ae8a79eda92e76a3 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 18:33:01 +0000 Subject: [PATCH 23/36] sperating table utils and format for seperate jvms --- .circleci/config.yml | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index cdafc0fa0f..e27ea12392 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -153,7 +153,34 @@ jobs: conda activate chronon_py # Increase if we see OOM. export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsFormatTest ai.chronon.spark.test.TableUtilsTest' + sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsFormatTest' + - store_test_results: + path: /chronon/spark/target/test-reports + - store_test_results: + path: /chronon/aggregator/target/test-reports + - run: + name: Compress spark-warehouse + command: | + cd /tmp/ && tar -czvf spark-warehouse.tar.gz chronon/spark-warehouse + when: on_fail + - store_artifacts: + path: /tmp/spark-warehouse.tar.gz + destination: spark_warehouse.tar.gz + when: on_fail + "Scala 13 -- Iceberg Table Utils Tests": + executor: docker_baseimg_executor + steps: + - checkout + - run: + name: Run Scala 13 tests for Iceberg Table Utils + environment: + format_test: iceberg + shell: /bin/bash -leuxo pipefail + command: | + conda activate chronon_py + # Increase if we see OOM. 
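Note (not part of the diff): the CI changes in this stretch of the series split the format tests and the table-utils tests into separate jobs so each suite gets a clean JVM and SparkSession settings cannot leak between runs. A hypothetical sbt-side alternative, not used here, is to fork one JVM per suite from a single job:

    // build.sbt fragment (hypothetical): fork tests and give each suite its own JVM
    Test / fork := true
    Test / testGrouping := (Test / definedTests).value.map { td =>
      Tests.Group(td.name, Seq(td), Tests.SubProcess(ForkOptions()))
    }
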
+ export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" + sbt ';project spark_embedded; ++ 2.13.6; ai.chronon.spark.test.TableUtilsTest' - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: @@ -191,5 +218,8 @@ workflows: requires: - "Pull Docker Image" - "Scala 13 -- Iceberg Format Tests": + requires: + - "Pull Docker Image" + - "Scala 13 -- Iceberg Table Utils Tests": requires: - "Pull Docker Image" \ No newline at end of file From f009f97526ea3870eab7c2079875efaf9b360217 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 18:36:30 +0000 Subject: [PATCH 24/36] typo --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e27ea12392..dfa66dadf7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -180,7 +180,7 @@ jobs: conda activate chronon_py # Increase if we see OOM. export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=4G -Xmx4G -Xms2G" - sbt ';project spark_embedded; ++ 2.13.6; ai.chronon.spark.test.TableUtilsTest' + sbt ';project spark_embedded; ++ 2.13.6; testOnly ai.chronon.spark.test.TableUtilsTest' - store_test_results: path: /chronon/spark/target/test-reports - store_test_results: From 02c14638b83c4e763d36334542895a871ccc4607 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Fri, 16 May 2025 21:03:03 +0000 Subject: [PATCH 25/36] corrected behavior for long partitions --- .../ai/chronon/spark/ChrononKryoRegistrator.scala | 11 +++++++++++ .../src/main/scala/ai/chronon/spark/TableUtils.scala | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala index 7788a1ac30..8b0d4d8f11 100644 --- a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala +++ b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala @@ -196,9 +196,20 @@ class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator { "org.apache.iceberg.FileFormat", "org.apache.iceberg.SerializableByteBufferMap", "org.apache.iceberg.PartitionData", + // For some reasons just .Types doesn't work "org.apache.iceberg.types.Types$StructType", "org.apache.iceberg.types.Types$NestedField", "org.apache.iceberg.types.Types$StringType", + "org.apache.iceberg.types.Types$IntegerType", + "org.apache.iceberg.types.Types$LongType", + "org.apache.iceberg.types.Types$DoubleType", + "org.apache.iceberg.types.Types$FloatType", + "org.apache.iceberg.types.Types$BooleanType", + "org.apache.iceberg.types.Types$DateType", + "org.apache.iceberg.types.Types$TimestampType", + "org.apache.iceberg.types.Types$TimeType", + "org.apache.iceberg.types.Types$DecimalType", + "org.apache.iceberg.types.Types$NestedField$", "org.apache.iceberg.SnapshotRef", "org.apache.iceberg.SnapshotRefType", "org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize", diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index e4be997982..c3dd5c37df 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -245,7 +245,7 @@ case object Iceberg extends Format { val partitionStruct = row.getStruct(0) partitionStruct.schema.fieldNames.zipWithIndex.map { case (fieldName, idx) => - fieldName -> partitionStruct.getString(idx) + fieldName -> 
partitionStruct.get(idx).toString }.toMap } } From 059e16e834b5b1a37663b617ff9d1d957ada95ed Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 17 May 2025 12:28:42 +0000 Subject: [PATCH 26/36] eventeventlongds test, more kryo registration --- .../spark/ChrononKryoRegistrator.scala | 3 +- .../ai/chronon/spark/test/JoinTest.scala | 84 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala index 8b0d4d8f11..3726905977 100644 --- a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala +++ b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala @@ -147,7 +147,8 @@ class ChrononKryoRegistrator extends KryoRegistrator { "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8", "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5", "scala.collection.immutable.ArraySeq$ofRef", - "org.apache.spark.sql.catalyst.expressions.GenericInternalRow" + "org.apache.spark.sql.catalyst.expressions.GenericInternalRow", + "org.apache.iceberg.BaseFile$1" ) names.foreach(name => doRegister(name, kryo)) diff --git a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala index 397986f63a..1c5b6a08f8 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala @@ -420,6 +420,90 @@ class JoinTest { assertEquals(diff.count(), 0) } + @Test + def testEventsEventsTemporalLongDs(): Unit = { + val spark: SparkSession = SparkSessionBuilder.build("JoinTest" + "_" + Random.alphanumeric.take(6).mkString, local = true) + spark.conf.set("spark.chronon.partition.format", "yyyy-MM-dd") + val tableUtils = TableUtils(spark) + val namespace = "test_namespace_jointest" + "_" + Random.alphanumeric.take(6).mkString + tableUtils.createDatabase(namespace) + val joinConf = getEventsEventsTemporal("temporal", namespace) + val viewsSchema = List( + Column("user", api.StringType, 10000), + Column("item", api.StringType, 100), + Column("time_spent_ms", api.LongType, 5000) + ) + + val viewsTable = s"$namespace.view_temporal" + DataFrameGen.events(spark, viewsSchema, count = 1000, partitions = 200) + .withColumn("ds", col("ds").cast("long")) + .save(viewsTable, Map("tblProp1" -> "1")) + + val viewsSource = Builders.Source.events( + table = viewsTable, + query = Builders.Query(selects = Builders.Selects("time_spent_ms"), startPartition = yearAgo) + ) + val viewsGroupBy = Builders.GroupBy( + sources = Seq(viewsSource), + keyColumns = Seq("item"), + aggregations = Seq( + Builders.Aggregation(operation = Operation.AVERAGE, inputColumn = "time_spent_ms"), + Builders.Aggregation(operation = Operation.MIN, inputColumn = "ts"), + Builders.Aggregation(operation = Operation.MAX, inputColumn = "ts") + // Builders.Aggregation(operation = Operation.APPROX_UNIQUE_COUNT, inputColumn = "ts") + // sql - APPROX_COUNT_DISTINCT(IF(queries.ts > $viewsTable.ts, time_spent_ms, null)) as user_ts_approx_unique_count + ), + metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace) + ) + + // left side + val itemQueries = List(Column("item", api.StringType, 100)) + val itemQueriesTable = s"$namespace.item_queries" + val itemQueriesDf = DataFrameGen + .events(spark, itemQueries, 1000, partitions = 100) + // duplicate the events + itemQueriesDf.union(itemQueriesDf).save(itemQueriesTable) 
//.union(itemQueriesDf) + + val start = tableUtils.partitionSpec.minus(today, new Window(100, TimeUnit.DAYS)) + (new Analyzer(tableUtils, joinConf, monthAgo, today)).run() + val join = new Join(joinConf = joinConf, endPartition = dayAndMonthBefore, tableUtils) + val computed = join.computeJoin(Some(100)) + computed.show() + + val expected = tableUtils.sql(s""" + |WITH + | queries AS (SELECT item, ts, ds from $itemQueriesTable where ds >= '$start' and ds <= '$dayAndMonthBefore') + | SELECT queries.item, queries.ts, queries.ds, part.user_unit_test_item_views_ts_min, part.user_unit_test_item_views_ts_max, part.user_unit_test_item_views_time_spent_ms_average + | FROM (SELECT queries.item, + | queries.ts, + | queries.ds, + | MIN(IF(queries.ts > $viewsTable.ts, $viewsTable.ts, null)) as user_unit_test_item_views_ts_min, + | MAX(IF(queries.ts > $viewsTable.ts, $viewsTable.ts, null)) as user_unit_test_item_views_ts_max, + | AVG(IF(queries.ts > $viewsTable.ts, time_spent_ms, null)) as user_unit_test_item_views_time_spent_ms_average + | FROM queries left outer join $viewsTable + | ON queries.item = $viewsTable.item + | WHERE $viewsTable.item IS NOT NULL AND $viewsTable.ds >= '$yearAgo' AND $viewsTable.ds <= '$dayAndMonthBefore' + | GROUP BY queries.item, queries.ts, queries.ds) as part + | JOIN queries + | ON queries.item <=> part.item AND queries.ts <=> part.ts AND queries.ds <=> part.ds + |""".stripMargin) + expected.show() + + val diff = Comparison.sideBySide(computed, expected, List("item", "ts", "ds")) + val queriesBare = + tableUtils.sql(s"SELECT item, ts, ds from $itemQueriesTable where ds >= '$start' and ds <= '$dayAndMonthBefore'") + assertEquals(queriesBare.count(), computed.count()) + if (diff.count() > 0) { + logger.debug(s"Diff count: ${diff.count()}") + logger.debug(s"diff result rows") + diff + .replaceWithReadableTime(Seq("ts", "a_user_unit_test_item_views_ts_max", "b_user_unit_test_item_views_ts_max"), + dropOriginal = true) + .show() + } + assertEquals(diff.count(), 0) + } + @Test def testEventsEventsCumulative(): Unit = { val spark: SparkSession = SparkSessionBuilder.build("JoinTest" + "_" + Random.alphanumeric.take(6).mkString, local = true) From 907d2f89f057262a2f9188a833db46050aa6e721 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 17 May 2025 13:20:29 +0000 Subject: [PATCH 27/36] iceberg drop partitions --- .../spark/ChrononKryoRegistrator.scala | 6 +-- .../scala/ai/chronon/spark/TableUtils.scala | 38 ++++++++++++++----- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala index 3726905977..f5365d4995 100644 --- a/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala +++ b/spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala @@ -147,8 +147,7 @@ class ChrononKryoRegistrator extends KryoRegistrator { "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8", "org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$5", "scala.collection.immutable.ArraySeq$ofRef", - "org.apache.spark.sql.catalyst.expressions.GenericInternalRow", - "org.apache.iceberg.BaseFile$1" + "org.apache.spark.sql.catalyst.expressions.GenericInternalRow" ) names.foreach(name => doRegister(name, kryo)) @@ -214,7 +213,8 @@ class ChrononIcebergKryoRegistrator extends ChrononKryoRegistrator { "org.apache.iceberg.SnapshotRef", "org.apache.iceberg.SnapshotRefType", 
"org.apache.iceberg.spark.source.SerializableTableWithSize$SerializableMetadataTableWithSize", - "org.apache.iceberg.MetadataTableType" + "org.apache.iceberg.MetadataTableType", + "org.apache.iceberg.BaseFile$1" ) additionalIcebergNames.foreach(name => doRegister(name, kryo)) } diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index c3dd5c37df..098ccb5f80 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -990,16 +990,34 @@ case class TableUtils(sparkSession: SparkSession) { // notably, the unit test iceberg integration uses hadoop because of // https://github.com/apache/iceberg/issues/7847 if (partitions.nonEmpty && tableExists(tableName)) { - val partitionSpecs = partitions - .map { partition => - val mainSpec = s"$partitionColumn='$partition'" - val specs = mainSpec +: subPartitionFilters.map { - case (key, value) => s"${key}='${value}'" - }.toSeq - specs.mkString("PARTITION (", ",", ")") - } - .mkString(",") - val dropSql = s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs" + val dropSql = tableFormatProvider.readFormat(tableName) match { + // really this is Dsv1 vs Dsv2, not hive vs iceberg, + // but we break this way since only Iceberg is migrated to Dsv2 + case Iceberg => + // Build WHERE clause: (ds='2024-05-01' OR ds='2024-05-02') [AND k='v' AND …] + val mainPred = partitions + .map(p => s"$partitionColumn='${p}'") + .mkString("(", " OR ", ")") + + val extraPred = subPartitionFilters + .map { case (k, v) => s"$k='${v}'" } + .mkString(" AND ") + + val where = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ") + + s"DELETE FROM $tableName WHERE $where" + case _ => + val partitionSpecs = partitions + .map { partition => + val mainSpec = s"$partitionColumn='$partition'" + val specs = mainSpec +: subPartitionFilters.map { + case (key, value) => s"${key}='${value}'" + }.toSeq + specs.mkString("PARTITION (", ",", ")") + } + .mkString(",") + s"ALTER TABLE $tableName DROP IF EXISTS $partitionSpecs" + } sql(dropSql) } else { logger.info(s"$tableName doesn't exist, please double check before drop partitions") From 00654066ad81c8e4850065187dfcbdf05e2b4ad4 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 17 May 2025 13:22:55 +0000 Subject: [PATCH 28/36] long partition testing --- .../chronon/spark/test/TableUtilsTest.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 3dd2437bc7..3b8bfaf146 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -596,4 +596,39 @@ class TableUtilsTest { assert(tableUtils.allPartitions(tableName).size == 4) } + @Test + def testInsertPartitionsRemoveColumnsLongDs(): Unit = { + val tableName = "db.test_table_long_2" + spark.sql("CREATE DATABASE IF NOT EXISTS db") + val columns1 = Array( + StructField("long_field", LongType), + StructField("int_field", IntType), + StructField("string_field", StringType) + ) + val df1 = makeDf( + spark, + StructType( + tableName, + columns1 + :+ StructField("double_field", DoubleType) + :+ StructField("ds", LongType) + ), + List( + Row(1L, 2, "3", 4.0, 20221001L) + ) + ) + + val df2 = makeDf( + spark, + StructType( + tableName, + columns1 :+ StructField("ds", LongType) + ), + List( + Row(5L, 6, "7", 
20221002L) + ) + ) + testInsertPartitions(tableName, df1, df2, ds1 = "2022-10-01", ds2 = "2022-10-02") + } + } From e980fa1f7a267208b17a773fd9b389ea878e612d Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Mon, 19 May 2025 12:20:58 +0000 Subject: [PATCH 29/36] unskipping fixed tests --- .../test/scala/ai/chronon/spark/test/TableUtilsTest.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 3b8bfaf146..aa0e90452e 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession, types} import org.junit.Assert.{assertEquals, assertTrue} import org.junit.Test -import org.junit.Assume import java.time.Instant import scala.util.{Random, Try} @@ -257,10 +256,6 @@ class TableUtilsTest { @Test def testDropPartitions(): Unit = { - // TODO this is using datasource v1 semantics, which won't be compatible with non-hive catalogs - // notably, the unit test iceberg integration uses hadoop because of - // https://github.com/apache/iceberg/issues/7847 - Assume.assumeTrue(format != "iceberg") val tableName = "db.test_drop_partitions_table" spark.sql("CREATE DATABASE IF NOT EXISTS db") val columns1 = Array( @@ -557,9 +552,6 @@ class TableUtilsTest { @Test def testGetPartitionsWithLongPartition(): Unit = { - // This is a known issue with iceberg - // To be fixed in a fast follow PR - Assume.assumeTrue(format != "iceberg") val tableName = "db.test_long_partitions" spark.sql("CREATE DATABASE IF NOT EXISTS db") val structFields = Array( From 40548927bde2858e487d57922a79821a27b2ffa8 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Mon, 19 May 2025 17:39:41 +0000 Subject: [PATCH 30/36] changing test schema --- .../src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index aa0e90452e..50a2e11bb3 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -556,7 +556,7 @@ class TableUtilsTest { spark.sql("CREATE DATABASE IF NOT EXISTS db") val structFields = Array( StructField("dateint", LongType), - StructField("hr", IntType), + StructField("hour", IntType), StructField("event_type", StringType), StructField("label_ds", StringType), StructField("feature_value", IntType) @@ -577,7 +577,7 @@ class TableUtilsTest { ), rows ) - val partitionColumns = Seq("dateint", "hr", "event_type") + val partitionColumns = Seq("dateint", "hour", "event_type") tableUtils.insertPartitions(df1, tableName, partitionColumns = partitionColumns, From 2ffd32de09019d357af391f4cd7639e2e1efdc77 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Mon, 19 May 2025 19:59:21 +0000 Subject: [PATCH 31/36] updating drop partitions to be schemaless --- .../chronon/spark/test/TableUtilsTest.scala | 68 ++++++++++--------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index 50a2e11bb3..ef9f5db8ca 100644 --- 
a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -32,8 +32,6 @@ import org.junit.Test import java.time.Instant import scala.util.{Random, Try} - - class SimpleAddUDF extends UDF { def evaluate(value: Int): Int = { value + 20 @@ -77,10 +75,12 @@ class TableUtilsTest { Seq( types.StructField("name", types.StringType, nullable = true), types.StructField("age", types.IntegerType, nullable = false), - types.StructField("address", types.StructType(Seq( - types.StructField("street", types.StringType, nullable = true), - types.StructField("city", types.StringType, nullable = true) - ))) + types.StructField("address", + types.StructType( + Seq( + types.StructField("street", types.StringType, nullable = true), + types.StructField("city", types.StringType, nullable = true) + ))) ) ) val expectedFieldNames = Seq("name", "age", "address", "address.street", "address.city") @@ -288,13 +288,11 @@ class TableUtilsTest { |""".stripMargin) assertEquals(updated.count(), 2) assertTrue( - updated - .collect() - .sameElements( - List( - Row(1L, 2, "2022-10-01", "2022-11-01"), - Row(3L, 8, "2022-10-05", "2022-11-03") - ))) + updated.collect().toSet == + Set( + Row(1L, 2, "2022-10-01", "2022-11-01"), + Row(3L, 8, "2022-10-05", "2022-11-03") + )) } @Test @@ -342,7 +340,9 @@ class TableUtilsTest { PartitionRange("2022-10-05", "2022-10-05")(tableUtils))) } - private def prepareTestDataWithSubPartitionsWithView(tableName: String, viewName: String, partitionColOpt: Option[String] = None): Unit = { + private def prepareTestDataWithSubPartitionsWithView(tableName: String, + viewName: String, + partitionColOpt: Option[String] = None): Unit = { prepareTestDataWithSubPartitions(tableName, partitionColOpt) tableUtils.sql(s"CREATE OR REPLACE VIEW $viewName AS SELECT * FROM $tableName") } @@ -369,10 +369,10 @@ class TableUtilsTest { Row(3L, "2022-11-03", "2022-11-03") ) ) - tableUtils.insertPartitions(df1, - tableName, - partitionColumns = Seq(partitionColOpt.getOrElse(tableUtils.partitionColumn), - Constants.LabelPartitionColumn)) + tableUtils.insertPartitions( + df1, + tableName, + partitionColumns = Seq(partitionColOpt.getOrElse(tableUtils.partitionColumn), Constants.LabelPartitionColumn)) } @@ -458,14 +458,16 @@ class TableUtilsTest { // test that chronon_archived flag exists and is set to true val tblProps = tableUtils.sql(s"SHOW TBLPROPERTIES $dbName.$archiveTableName").collect() val mapVal = readTblPropertiesMap(tblProps) - assert(mapVal.getOrElse("chronon_archived","false") == "true") + assert(mapVal.getOrElse("chronon_archived", "false") == "true") // test after a un-archive we can remove chronon_archived property tableUtils.sql(s"ALTER TABLE $dbName.$archiveTableName RENAME TO $tableName") - tableUtils.alterTableProperties(tableName, properties = Map("chronon_archived" -> "true"), unsetProperties = Seq("chronon_archived")) + tableUtils.alterTableProperties(tableName, + properties = Map("chronon_archived" -> "true"), + unsetProperties = Seq("chronon_archived")) val tblPropsAfter = tableUtils.sql(s"SHOW TBLPROPERTIES $tableName").collect() val mapValAfter = readTblPropertiesMap(tblPropsAfter) - assert(mapValAfter.getOrElse("chronon_archived","false") == "false") + assert(mapValAfter.getOrElse("chronon_archived", "false") == "false") } @Test @@ -489,7 +491,8 @@ class TableUtilsTest { val tableName = s"$dbName.test_table" val viewName = s"$dbName.v_test_table" tableUtils.sql(s"CREATE DATABASE IF NOT EXISTS $dbName") - 
tableUtils.sql(s"CREATE TABLE IF NOT EXISTS $tableName (test INT, test_col STRING) PARTITIONED BY (ds STRING) STORED AS PARQUET") + tableUtils.sql( + s"CREATE TABLE IF NOT EXISTS $tableName (test INT, test_col STRING) PARTITIONED BY (ds STRING) STORED AS PARQUET") tableUtils.sql(s"CREATE OR REPLACE VIEW $viewName AS SELECT test, test_col FROM $tableName") val table_format = tableUtils.tableReadFormat(tableName) @@ -514,7 +517,7 @@ class TableUtilsTest { val viewName = "db.v_test_table_with_sub_partition" val partitionCol = "custom_partition_date" prepareTestDataWithSubPartitionsWithView(tableName, viewName, partitionColOpt = Some(partitionCol)) - val partitions = tableUtils.partitions(viewName, partitionColOpt=Some(partitionCol)) + val partitions = tableUtils.partitions(viewName, partitionColOpt = Some(partitionCol)) assertEquals(Seq("2022-11-01", "2022-11-02", "2022-11-03").sorted, partitions.sorted) } @@ -524,7 +527,9 @@ class TableUtilsTest { val viewName = "db.v_test_table_with_sub_partition" val partitionCol = "custom_partition_date" prepareTestDataWithSubPartitionsWithView(tableName, viewName, partitionColOpt = Some(partitionCol)) - val partitions = tableUtils.partitions(viewName, subPartitionsFilter=Map("label_ds" -> "2022-11-02"), partitionColOpt=Some(partitionCol)) + val partitions = tableUtils.partitions(viewName, + subPartitionsFilter = Map("label_ds" -> "2022-11-02"), + partitionColOpt = Some(partitionCol)) assertEquals(Seq("2022-11-01", "2022-11-02").sorted, partitions.sorted) } @@ -563,10 +568,10 @@ class TableUtilsTest { ) val rows = List( - Row(20220101L, 1, "event1", "2022-01-01", 4), // 2022-01-01 with hr=1 - Row(20220102L, 2, "event2", "2022-01-02", 2), // 2022-01-02 with hr=2 - Row(20220103L, 10, "event1", "2022-01-03", 9), // 2022-01-03 with hr=10 - Row(20220104L, 12, "event1", "20224-01-04", 12) // 2022-01-04 with hr=12 + Row(20220101L, 1, "event1", "2022-01-01", 4), // 2022-01-01 with hr=1 + Row(20220102L, 2, "event2", "2022-01-02", 2), // 2022-01-02 with hr=2 + Row(20220103L, 10, "event1", "2022-01-03", 9), // 2022-01-03 with hr=10 + Row(20220104L, 12, "event1", "20224-01-04", 12) // 2022-01-04 with hr=12 ) val df1 = makeDf( @@ -578,17 +583,14 @@ class TableUtilsTest { rows ) val partitionColumns = Seq("dateint", "hour", "event_type") - tableUtils.insertPartitions(df1, - tableName, - partitionColumns = partitionColumns, - ) + tableUtils.insertPartitions(df1, tableName, partitionColumns = partitionColumns) assert(tableUtils.tableExists(tableName)) val partitions = tableUtils.partitions(tableName, Map.empty, partitionColOpt = Some("dateint")) assert(partitions.size == 4) assert(tableUtils.allPartitions(tableName).size == 4) } - @Test + @Test def testInsertPartitionsRemoveColumnsLongDs(): Unit = { val tableName = "db.test_table_long_2" spark.sql("CREATE DATABASE IF NOT EXISTS db") From 50445f0eabb9ed131078420fe1b893960fb7b424 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Wed, 21 May 2025 13:16:04 +0000 Subject: [PATCH 32/36] found bug during CI testing --- .../main/scala/ai/chronon/spark/Extensions.scala | 2 +- .../ai/chronon/spark/test/ExtensionsTest.scala | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Extensions.scala b/spark/src/main/scala/ai/chronon/spark/Extensions.scala index e080d87a4e..c97fcbe5d2 100644 --- a/spark/src/main/scala/ai/chronon/spark/Extensions.scala +++ b/spark/src/main/scala/ai/chronon/spark/Extensions.scala @@ -83,7 +83,7 @@ object Extensions { 
.groupBy(col(TableUtils(dataFrame.sparkSession).partitionColumn)) .count() .collect() - .map(row => row.getString(0) -> row.getLong(1)) + .map(row => row.get(0).toString -> row.getLong(1)) .toMap DfWithStats(dataFrame, partitionCounts) } diff --git a/spark/src/test/scala/ai/chronon/spark/test/ExtensionsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/ExtensionsTest.scala index aecee8bfe7..73198cfb1a 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/ExtensionsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/ExtensionsTest.scala @@ -43,4 +43,19 @@ class ExtensionsTest { } assertEquals(0, diff.count()) } + + @Test + def testDfWithStatsLongPartition(): Unit = { + val df = Seq( + (1, 20240103L), + (2, 20240104L), + (3, 20240104L) + ).toDF("key", "ds") + + val dfWithStats: DfWithStats = DfWithStats(df) + val stats = dfWithStats.stats + + assertEquals(3L, stats.count) + } + } \ No newline at end of file From 589eba97e90ae94fd108dad042491485160b0c88 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 7 Jun 2025 10:48:35 -0400 Subject: [PATCH 33/36] Apply suggestions from code review Co-authored-by: Pengyu Hou <3771747+pengyu-hou@users.noreply.github.com> Signed-off-by: Abby Whittier --- .../scala/ai/chronon/spark/TableUtils.scala | 1 + .../scala/ai/chronon/spark/test/JoinTest.scala | 4 +--- .../ai/chronon/spark/test/TableUtilsTest.scala | 18 +++++++++--------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 098ccb5f80..1880caf92d 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -1007,6 +1007,7 @@ case class TableUtils(sparkSession: SparkSession) { s"DELETE FROM $tableName WHERE $where" case _ => + // default case is for Hive val partitionSpecs = partitions .map { partition => val mainSpec = s"$partitionColumn='$partition'" diff --git a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala index 1c5b6a08f8..23dda045bc 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala @@ -450,8 +450,6 @@ class JoinTest { Builders.Aggregation(operation = Operation.AVERAGE, inputColumn = "time_spent_ms"), Builders.Aggregation(operation = Operation.MIN, inputColumn = "ts"), Builders.Aggregation(operation = Operation.MAX, inputColumn = "ts") - // Builders.Aggregation(operation = Operation.APPROX_UNIQUE_COUNT, inputColumn = "ts") - // sql - APPROX_COUNT_DISTINCT(IF(queries.ts > $viewsTable.ts, time_spent_ms, null)) as user_ts_approx_unique_count ), metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace) ) @@ -462,7 +460,7 @@ class JoinTest { val itemQueriesDf = DataFrameGen .events(spark, itemQueries, 1000, partitions = 100) // duplicate the events - itemQueriesDf.union(itemQueriesDf).save(itemQueriesTable) //.union(itemQueriesDf) + itemQueriesDf.union(itemQueriesDf).save(itemQueriesTable) val start = tableUtils.partitionSpec.minus(today, new Window(100, TimeUnit.DAYS)) (new Analyzer(tableUtils, joinConf, monthAgo, today)).run() diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index ef9f5db8ca..f960b5f0d0 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ 
b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -560,11 +560,11 @@ class TableUtilsTest { val tableName = "db.test_long_partitions" spark.sql("CREATE DATABASE IF NOT EXISTS db") val structFields = Array( - StructField("dateint", LongType), + StructField("dateInt", LongType), StructField("hour", IntType), - StructField("event_type", StringType), - StructField("label_ds", StringType), - StructField("feature_value", IntType) + StructField("eventType", StringType), + StructField("labelDs", StringType), + StructField("featureValue", IntType) ) val rows = List( @@ -582,7 +582,7 @@ class TableUtilsTest { ), rows ) - val partitionColumns = Seq("dateint", "hour", "event_type") + val partitionColumns = Seq("dateInt", "hour", "eventType") tableUtils.insertPartitions(df1, tableName, partitionColumns = partitionColumns) assert(tableUtils.tableExists(tableName)) val partitions = tableUtils.partitions(tableName, Map.empty, partitionColOpt = Some("dateint")) @@ -595,16 +595,16 @@ class TableUtilsTest { val tableName = "db.test_table_long_2" spark.sql("CREATE DATABASE IF NOT EXISTS db") val columns1 = Array( - StructField("long_field", LongType), - StructField("int_field", IntType), - StructField("string_field", StringType) + StructField("longField", LongType), + StructField("intField", IntType), + StructField("stringField", StringType) ) val df1 = makeDf( spark, StructType( tableName, columns1 - :+ StructField("double_field", DoubleType) + :+ StructField("doubleField", DoubleType) :+ StructField("ds", LongType) ), List( From 8cfd6619a4114f31e2018ede2d3546fb1eecd711 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 7 Jun 2025 17:10:39 +0000 Subject: [PATCH 34/36] formatting --- spark/src/main/scala/ai/chronon/spark/TableUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 1880caf92d..9d2593dcd1 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -1007,7 +1007,7 @@ case class TableUtils(sparkSession: SparkSession) { s"DELETE FROM $tableName WHERE $where" case _ => - // default case is for Hive + // default case is for Hive val partitionSpecs = partitions .map { partition => val mainSpec = s"$partitionColumn='$partition'" From c72b74f2c6945e6433c2ec1a7e5df985667f03cd Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 7 Jun 2025 17:13:39 +0000 Subject: [PATCH 35/36] propping name refactor --- spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala index f960b5f0d0..6eb3414582 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala @@ -585,7 +585,7 @@ class TableUtilsTest { val partitionColumns = Seq("dateInt", "hour", "eventType") tableUtils.insertPartitions(df1, tableName, partitionColumns = partitionColumns) assert(tableUtils.tableExists(tableName)) - val partitions = tableUtils.partitions(tableName, Map.empty, partitionColOpt = Some("dateint")) + val partitions = tableUtils.partitions(tableName, Map.empty, partitionColOpt = Some("dateInt")) assert(partitions.size == 4) assert(tableUtils.allPartitions(tableName).size == 4) } From 
d13dc5ecb7224c4b505219cd23a522707e0935c9 Mon Sep 17 00:00:00 2001 From: Abby Whittier Date: Sat, 7 Jun 2025 17:23:03 +0000 Subject: [PATCH 36/36] fixing some typos --- spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index 0b55313341..05bf8333a4 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -53,7 +53,6 @@ object SparkSessionBuilder { "spark.chronon.table_write.format" -> "delta" ) (configMap, "ai.chronon.spark.ChrononDeltaLakeKryoRegistrator") - (configMap, "ai.chronon.spark.ChrononKryoRegistrator") case Some("iceberg") => val configMap = Map( "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", @@ -64,7 +63,6 @@ object SparkSessionBuilder { "spark.sql.catalog.spark_catalog.type" -> "hadoop", "spark.sql.catalog.spark_catalog.warehouse" -> s"$warehouseDir/data" ) - // TODO add an iceberg kryo registrator (configMap, "ai.chronon.spark.ChrononIcebergKryoRegistrator") case _ => (Map.empty, "ai.chronon.spark.ChrononKryoRegistrator") }
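
Note on the long-partition fixes in PATCH 25 and PATCH 32: both switch partition-value reads from `getString` to `get(...).toString` so that tables partitioned by a non-string column (e.g. a LongType `dateint`) no longer fail with a cast error. The snippet below is a minimal standalone sketch of that behavior on a plain Spark `Row`; the object name `PartitionValueSketch` and the sample values are invented for illustration and are not part of the Chronon codebase.

```scala
import org.apache.spark.sql.Row

object PartitionValueSketch {
  def main(args: Array[String]): Unit = {
    // A partition row with a Long partition column and a String sub-partition column,
    // mirroring the dateint / label_ds shape used in the tests above.
    val partitionRow = Row(20220101L, "2022-01-01")

    // Generic read: works regardless of the partition column's type.
    val dateint = partitionRow.get(0).toString // "20220101"
    val labelDs = partitionRow.get(1).toString // "2022-01-01"
    println(s"dateint=$dateint, label_ds=$labelDs")

    // The previous approach only works for string columns:
    // partitionRow.getString(0) // throws ClassCastException for a Long value
  }
}
```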
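
Note on the drop-partitions change in PATCH 27: Hive-style (DSv1) tables keep the metadata-only `ALTER TABLE ... DROP PARTITION` statement, while Iceberg (DSv2) tables are handled with a row-level `DELETE` whose predicate covers the partition column and any sub-partition filters. The sketch below only illustrates the shape of the generated SQL under those assumptions; the object and method names (`DropPartitionSqlSketch`, `buildDropSql`) and the sample table are invented for the example and are not the Chronon API.

```scala
object DropPartitionSqlSketch {
  def buildDropSql(isIceberg: Boolean,
                   tableName: String,
                   partitionColumn: String,
                   partitions: Seq[String],
                   subPartitionFilters: Map[String, String]): String =
    if (isIceberg) {
      // Iceberg / DSv2: DELETE FROM t WHERE (ds='...' OR ds='...') AND k='v' ...
      val mainPred  = partitions.map(p => s"$partitionColumn='$p'").mkString("(", " OR ", ")")
      val extraPred = subPartitionFilters.map { case (k, v) => s"$k='$v'" }.mkString(" AND ")
      val where     = Seq(mainPred, extraPred).filter(_.nonEmpty).mkString(" AND ")
      s"DELETE FROM $tableName WHERE $where"
    } else {
      // Hive / DSv1: ALTER TABLE t DROP IF EXISTS PARTITION (ds='...', k='v'), PARTITION (...)
      val specs = partitions.map { p =>
        (s"$partitionColumn='$p'" +: subPartitionFilters.map { case (k, v) => s"$k='$v'" }.toSeq)
          .mkString("PARTITION (", ",", ")")
      }.mkString(",")
      s"ALTER TABLE $tableName DROP IF EXISTS $specs"
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq("2022-10-01", "2022-10-02")
    val sub   = Map("label_ds" -> "2022-11-01")
    println(buildDropSql(isIceberg = false, "db.events", "ds", parts, sub))
    println(buildDropSql(isIceberg = true, "db.events", "ds", parts, sub))
  }
}
```

For the hypothetical `db.events` table this prints an `ALTER TABLE db.events DROP IF EXISTS PARTITION (...)` statement in the Hive case and `DELETE FROM db.events WHERE (ds='2022-10-01' OR ds='2022-10-02') AND label_ds='2022-11-01'` in the Iceberg case, matching the two branches introduced in the TableUtils change above.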