Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl Instance {
.await
}
_ => {
query_interceptor.pre_execute(&stmt, None, query_ctx.clone())?;
query_interceptor.pre_execute(Some(&stmt), None, query_ctx.clone())?;
self.statement_executor
.execute_sql(stmt, query_ctx)
.await
Expand All @@ -326,7 +326,7 @@ impl Instance {
let QueryStatement::Sql(stmt) = stmt else {
unreachable!()
};
query_interceptor.pre_execute(&stmt, Some(&plan), query_ctx.clone())?;
query_interceptor.pre_execute(Some(&stmt), Some(&plan), query_ctx.clone())?;

self.statement_executor
.exec_plan(plan, query_ctx.clone())
Expand All @@ -344,7 +344,11 @@ impl Instance {
.statement_executor
.plan_tql(tql.clone(), query_ctx)
.await?;
query_interceptor.pre_execute(&Statement::Tql(tql), Some(&plan), query_ctx.clone())?;
query_interceptor.pre_execute(
Some(&Statement::Tql(tql)),
Some(&plan),
query_ctx.clone(),
)?;
self.statement_executor
.exec_plan(plan, query_ctx.clone())
.await
Expand Down Expand Up @@ -649,9 +653,7 @@ impl Instance {
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();

if let Some(ref s) = stmt {
query_interceptor.pre_execute(s, Some(&plan), query_ctx.clone())?;
}
query_interceptor.pre_execute(stmt.as_ref(), Some(&plan), query_ctx.clone())?;

let query = stmt
.as_ref()
Expand Down Expand Up @@ -880,7 +882,11 @@ impl PrometheusHandler for Instance {
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
let QueryStatement::Promql(eval_stmt, _) = &stmt else {
unreachable!("query is parsed from promql");
};

interceptor.pre_execute(query, &eval_stmt.expr, Some(&plan), query_ctx.clone())?;

// Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
let query_statement = if let QueryStatement::Promql(eval_stmt, alias) = stmt {
Expand All @@ -892,7 +898,7 @@ impl PrometheusHandler for Instance {
}
.fail();
};
let query = query_statement.to_string();
let raw_query = query_statement.to_string();

let slow_query_timer = self
.slow_query_options
Expand All @@ -912,7 +918,7 @@ impl PrometheusHandler for Instance {
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
raw_query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
Expand Down Expand Up @@ -1270,11 +1276,11 @@ mod tests {

fn pre_execute(
&self,
statement: &Statement,
statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
let Statement::Insert(insert) = statement else {
let Some(Statement::Insert(insert)) = statement else {
return Ok(());
};
if !insert.has_non_values_query_source() {
Expand Down
42 changes: 39 additions & 3 deletions src/servers/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_error::ext::ErrorExt;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use log_query::LogQuery;
use promql_parser::parser::Expr;
use query::parser::PromQuery;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
Expand Down Expand Up @@ -58,7 +59,7 @@ pub trait SqlQueryInterceptor {
/// Called before sql is actually executed. This hook is not called at the moment.
fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -111,7 +112,7 @@ where

fn pre_execute(
&self,
statement: &Statement,
statement: Option<&Statement>,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -224,6 +225,7 @@ pub trait PromQueryInterceptor {
fn pre_execute(
&self,
_query: &PromQuery,
_expr: &Expr,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -253,11 +255,45 @@ where
fn pre_execute(
&self,
query: &PromQuery,
expr: &Expr,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query, plan, query_ctx)
this.pre_execute(query, expr, plan, query_ctx)
} else {
Ok(())
}
}

fn post_execute(
&self,
output: Output,
query_ctx: QueryContextRef,
) -> Result<Output, Self::Error> {
if let Some(this) = self {
this.post_execute(output, query_ctx)
} else {
Ok(output)
}
}
}

impl<E> PromQueryInterceptor for Option<&PromQueryInterceptorRef<E>>
where
E: ErrorExt,
{
type Error = E;

fn pre_execute(
&self,
query: &PromQuery,
expr: &Expr,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query, expr, plan, query_ctx)
} else {
Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion src/servers/tests/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl PromQueryInterceptor for NoopInterceptor {
fn pre_execute(
&self,
query: &PromQuery,
_expr: &promql_parser::parser::Expr,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> std::result::Result<(), Self::Error> {
Expand Down Expand Up @@ -121,7 +122,13 @@ fn test_prom_interceptor() {
..Default::default()
};

let fail = PromQueryInterceptor::pre_execute(&di, &query, None, ctx.clone());
let fail = PromQueryInterceptor::pre_execute(
&di,
&query,
&promql_parser::parser::parse(&query.query).unwrap(),
None,
ctx.clone(),
);
assert!(fail.is_err());

let output = Output::new_with_affected_rows(1);
Expand Down
4 changes: 2 additions & 2 deletions tests-integration/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ mod tests {

fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
Expand Down Expand Up @@ -396,7 +396,7 @@ mod tests {

fn pre_execute(
&self,
_statement: &Statement,
_statement: Option<&Statement>,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<()> {
Expand Down
Loading