refactor: pass LogicalPlan to promql execution interceptor (#4937)

This commit is contained in:
LFC
2024-11-05 16:49:05 +08:00
committed by GitHub
parent a8b426aebe
commit 2e9737c01d
7 changed files with 30 additions and 15 deletions

View File

@@ -37,7 +37,6 @@ use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::stats::StatementStatistics;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::server::Server;
@@ -476,7 +475,6 @@ impl FrontendInvoker {
layered_cache_registry.clone(),
inserter.clone(),
table_route_cache,
StatementStatistics::default(),
));
let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);

View File

@@ -51,6 +51,7 @@ use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::stats::StatementStatistics;
use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
@@ -122,6 +123,7 @@ pub struct Instance {
deleter: DeleterRef,
export_metrics_task: Option<ExportMetricsTask>,
table_metadata_manager: TableMetadataManagerRef,
stats: StatementStatistics,
}
impl Instance {
@@ -228,6 +230,10 @@ impl Instance {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor.as_ref();
let _slow_query_timer = self
.stats
.start_slow_query_timer(QueryStatement::Sql(stmt.clone()));
let output = match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
let stmt = QueryStatement::Sql(stmt);
@@ -412,7 +418,6 @@ impl PrometheusHandler for Instance {
let interceptor = self
.plugins
.get::<PromQueryInterceptorRef<server_error::Error>>();
interceptor.pre_execute(query, query_ctx.clone())?;
self.plugins
.get::<PermissionCheckerRef>()
@@ -426,9 +431,20 @@ impl PrometheusHandler for Instance {
}
})?;
let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;
interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
let output = self
.statement_executor
.execute_stmt(stmt, query_ctx.clone())
.exec_plan(plan, query_ctx.clone())
.await
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;

View File

@@ -185,7 +185,6 @@ impl FrontendBuilder {
local_cache_invalidator,
inserter.clone(),
table_route_cache,
self.stats,
));
let pipeline_operator = Arc::new(PipelineOperator::new(
@@ -211,6 +210,7 @@ impl FrontendBuilder {
deleter,
export_metrics_task: None,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
stats: self.stats,
})
}
}

View File

@@ -45,7 +45,6 @@ use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::stats::StatementStatistics;
use query::QueryEngineRef;
use session::context::{Channel, QueryContextRef};
use session::table_name::table_idents_to_full_name;
@@ -81,13 +80,11 @@ pub struct StatementExecutor {
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
stats: StatementStatistics,
}
pub type StatementExecutorRef = Arc<StatementExecutor>;
impl StatementExecutor {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
@@ -96,7 +93,6 @@ impl StatementExecutor {
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
table_route_cache: TableRouteCacheRef,
stats: StatementStatistics,
) -> Self {
Self {
catalog_manager,
@@ -108,23 +104,22 @@ impl StatementExecutor {
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)),
cache_invalidator,
inserter,
stats,
}
}
#[tracing::instrument(skip_all)]
#[cfg(feature = "testing")]
pub async fn execute_stmt(
&self,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output> {
let _slow_query_timer = self.stats.start_slow_query_timer(stmt.clone());
match stmt {
QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await,
}
}
#[tracing::instrument(skip_all)]
pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
match stmt {
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
@@ -361,6 +356,7 @@ impl StatementExecutor {
Ok(Output::new_with_affected_rows(0))
}
#[tracing::instrument(skip_all)]
pub async fn plan(
&self,
stmt: &QueryStatement,
@@ -374,6 +370,7 @@ impl StatementExecutor {
}
/// Execute [`LogicalPlan`] directly.
#[tracing::instrument(skip_all)]
pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
self.query_engine
.execute(plan, query_ctx)

View File

@@ -201,6 +201,7 @@ pub trait PromQueryInterceptor {
fn pre_execute(
&self,
_query: &PromQuery,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
Ok(())
@@ -229,10 +230,11 @@ where
fn pre_execute(
&self,
query: &PromQuery,
plan: Option<&LogicalPlan>,
query_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query, query_ctx)
this.pre_execute(query, plan, query_ctx)
} else {
Ok(())
}

View File

@@ -18,6 +18,7 @@ use api::v1::greptime_request::Request;
use api::v1::{InsertRequest, InsertRequests};
use client::OutputData;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use servers::error::{self, InternalSnafu, NotSupportedSnafu, Result};
use servers::interceptor::{GrpcQueryInterceptor, PromQueryInterceptor, SqlQueryInterceptor};
@@ -89,6 +90,7 @@ impl PromQueryInterceptor for NoopInterceptor {
fn pre_execute(
&self,
query: &PromQuery,
_plan: Option<&LogicalPlan>,
_query_ctx: QueryContextRef,
) -> std::result::Result<(), Self::Error> {
match query.query.as_str() {
@@ -119,7 +121,7 @@ fn test_prom_interceptor() {
..Default::default()
};
let fail = PromQueryInterceptor::pre_execute(&di, &query, ctx.clone());
let fail = PromQueryInterceptor::pre_execute(&di, &query, None, ctx.clone());
assert!(fail.is_err());
let output = Output::new_with_affected_rows(1);

View File

@@ -52,7 +52,7 @@ mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
object-store.workspace = true
operator.workspace = true
operator = { workspace = true, features = ["testing"] }
prost.workspace = true
query.workspace = true
rstest.workspace = true