Skip to content
This repository was archived by the owner on Aug 22, 2025. It is now read-only.

Commit 9e7012f

Browse files
[CROSSDATA-1896] Pushdown limit when serializing result in executors (#693)
1 parent e98f8e5 commit 9e7012f

File tree

15 files changed

+44
-658
lines changed

15 files changed

+44
-658
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Only listing significant user-visible, not internal code cleanups and minor bug
1313
* [SS-3782][SS-3857] Serve HDFS conf files properly
1414
* [POC] Function interface for lineage
1515
* Configuration: Optional serialization in executors
16+
* [CROSSDATA-1896] Apply limit optimization before result serialization in executors
1617

1718
## 2.15.0-24f463e (Built: January 14, 2019 | Released: January 17, 2019)
1819

Jenkinsfile

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,6 @@ hose {
5050
'POSTGRES_DB=hakama']
5151
]
5252
],
53-
['GOVERNANCE': [
54-
'image': 'stratio/governance-catalog-api:0.3.0',
55-
'sleep': 30,
56-
'healthcheck': 10000,
57-
'env': ['USE_DYNAMIC_AUTHENTICATION=false',
58-
'API_SERVICE_TIMEOUT=5000',
59-
'API_LOG_LEVEL=INFO',
60-
'SERVICE_LOG_LEVEL=INFO',
61-
'POSTGRESQL_HOSTS=%%POSTGRES',
62-
'POSTGRESQL_USER=postgres',
63-
'POSTGRESQL_PASSWORD=postgres',
64-
'POSTGRESQL_DATABASE=hakama']
65-
]
66-
],
6753
['ELASTIC': [
6854
'image': 'stratio/elasticsearch:6.1.1',
6955
'sleep': 30,

core/src/main/scala/com/stratio/crossdata/metrics/MetricsRegister.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import java.util.concurrent.TimeUnit
44

55
import com.codahale.metrics._
66

7+
import scala.util.Try
8+
79
object MetricsGlossary {
810

911
object Counter {
@@ -192,6 +194,6 @@ class MetricsRegister(metricsRegistry: MetricRegistry) {
192194
}
193195

194196
private def getFailingQueryCounterKey(message: String, user: String): String =
195-
s"[$user][${message.replaceAll(" ", "_")}]"
197+
Option(message).map(msg => s"[$user][${msg.replaceAll(" ", "_")}]").getOrElse(s"[$user][null]")
196198

197199
}

core/src/main/scala/org/apache/spark/sql/crossdata/execution/XDQueryExecution.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,22 @@ package org.apache.spark.sql.crossdata.execution
88
import com.stratio.common.utils.components.logger.impl.Slf4jLoggerComponent
99
import com.stratio.crossdata.common.profiler.PerformanceLogger
1010
import com.stratio.crossdata.security._
11-
import org.apache.spark.sql.{SaveMode, SparkSession}
1211
import org.apache.spark.sql.catalyst.TableIdentifier
1312
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedRelation}
1413
import org.apache.spark.sql.catalyst.expressions.{And, Cast, Literal, Sha2}
1514
import org.apache.spark.sql.catalyst.plans.logical._
1615
import org.apache.spark.sql.catalyst.rules.Rule
17-
import org.apache.spark.sql.crossdata.checkAndApplyRule
18-
import org.apache.spark.sql.crossdata.XDSession
1916
import org.apache.spark.sql.crossdata.catalyst.catalog.XDSessionCatalog
2017
import org.apache.spark.sql.crossdata.catalyst.plans.logical.{XDFilter, XDProject}
2118
import org.apache.spark.sql.crossdata.execution.auth.AuthDirectivesExtractor
2219
import org.apache.spark.sql.crossdata.rules.XDCompanionRules
2320
import org.apache.spark.sql.crossdata.session.XDSharedState
21+
import org.apache.spark.sql.crossdata.{XDSession, checkAndApplyRule}
2422
import org.apache.spark.sql.execution._
2523
import org.apache.spark.sql.execution.command.CreateTableLikeCommand
2624
import org.apache.spark.sql.execution.datasources.CreateTable
2725
import org.apache.spark.sql.types._
26+
import org.apache.spark.sql.{SaveMode, SparkSession}
2827
import org.apache.spark.util.Utils
2928

3029
import scala.util.{Failure, Success, Try}
@@ -462,6 +461,7 @@ class XDQueryExecution(sparkSession: SparkSession, parsedPlan: LogicalPlan, used
462461
override lazy val sparkPlan: SparkPlan = {
463462
logPerformance(s"[sparkPlan]") {
464463
SparkSession.setActiveSession(sparkSession)
464+
465465
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
466466
// but we will implement to choose the best plan.
467467
planner.plan(ReturnAnswer(optimizedPlan)).next()

core/src/main/scala/org/apache/spark/sql/crossdata/session/XDSessionStateBuilder.scala

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@
55
*/
66
package org.apache.spark.sql.crossdata.session
77

8-
import org.apache.spark.sql.SparkSession
8+
import org.apache.spark.sql.{SparkSession, Strategy}
99
import org.apache.spark.sql.catalyst.analysis.Analyzer
1010
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
1111
import org.apache.spark.sql.catalyst.parser.ParserInterface
12-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
12+
import org.apache.spark.sql.catalyst.plans.logical._
13+
import org.apache.spark.sql.catalyst.rules.Rule
1314
import org.apache.spark.sql.crossdata.XDSession
1415
import org.apache.spark.sql.crossdata.catalyst.catalog.{XDCatalogWrapper, XDSessionCatalog}
1516
import org.apache.spark.sql.crossdata.catalyst.catalog.temporary.implementations.DefaultTemporaryCatalog
1617
import org.apache.spark.sql.crossdata.execution.{XDQueryExecution, XDSparkSqlParser}
17-
import org.apache.spark.sql.execution.QueryExecution
18-
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
18+
import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
19+
import org.apache.spark.sql.internal.SQLConf.buildConf
20+
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SQLConf, SessionState}
1921

2022

2123
class XDSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None) extends BaseSessionStateBuilder(sparkSession, parentState){
@@ -28,6 +30,27 @@ class XDSessionStateBuilder(sparkSession: SparkSession, parentState: Option[Sess
2830

2931
override lazy val analyzer: Analyzer = super.analyzer
3032

33+
object XDLimitRule extends Rule[LogicalPlan] {
34+
val applyXDLimitRule = sparkSession.sparkContext.conf.getBoolean("spark.sql.crossdata.limitRule", false)
35+
logDebug(s"applyXDLimitRule? $applyXDLimitRule")
36+
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
37+
case s@SerializeFromObject(serializer, MapPartitions(func, outputObjAttr, DeserializeToObject(des, attrs, gl@GlobalLimit(_, _)))) =>
38+
if(applyXDLimitRule){
39+
SerializeFromObject(serializer, MapPartitions(func, outputObjAttr, DeserializeToObject(des, attrs, ReturnAnswer(gl))))
40+
} else {
41+
s
42+
}
43+
}
44+
}
45+
46+
override def planner: SparkPlanner = {
47+
experimentalMethods.extraOptimizations = experimentalMethods.extraOptimizations :+ XDLimitRule
48+
new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
49+
override def extraPlanningStrategies: Seq[Strategy] =
50+
super.extraPlanningStrategies ++ customPlanningStrategies
51+
}
52+
}
53+
3154
override lazy val catalog: SessionCatalog = new XDSessionCatalog(
3255
sparkSession.asInstanceOf[XDSession].config,
3356
DefaultTemporaryCatalog, //TODO: Next iterations requiring value propagation will need a distributed temp map.

core/src/test/resources/governance-catalog-partial-cache.conf

Lines changed: 0 additions & 39 deletions
This file was deleted.

core/src/test/resources/governance-catalog.conf

Lines changed: 0 additions & 39 deletions
This file was deleted.

core/src/test/resources/nostorage-reference.conf

Whitespace-only changes.

0 commit comments

Comments (0)