From ae00e28b2a8a76cbc41d62b98923e77168ffeeb8 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 21 Apr 2025 20:56:43 +0800 Subject: [PATCH] refactor: per review partially --- src/common/meta/src/error.rs | 10 ++- src/flow/src/adapter/flownode_impl.rs | 22 ++---- src/flow/src/error.rs | 11 ++- src/operator/src/statement/ddl.rs | 109 +++++++++++++------------- 4 files changed, 78 insertions(+), 74 deletions(-) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 1bdb3d0857..445e8d99cc 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -401,6 +401,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid flow request body: {:?}", body))] + InvalidFlowRequestBody { + body: Box>, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to get kv cache, err: {}", err_msg))] GetKvCache { err_msg: String }, @@ -853,7 +860,8 @@ impl ErrorExt for Error { | TlsConfig { .. } | InvalidSetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. } - | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments, + | InvalidTopicNamePrefix { .. } + | InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments, FlowNotFound { .. } => StatusCode::FlowNotFound, FlowRouteNotFound { .. } => StatusCode::Unexpected, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 3459232f08..2c2088328d 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -39,8 +39,8 @@ use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::batching_mode::engine::BatchingEngine; use crate::engine::FlowEngine; use crate::error::{ - CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu, - ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, + CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu, + InsertIntoFlowSnafu, InternalSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -278,7 +278,7 @@ impl FlowDualEngine { let mut check_task = self.check_task.lock().await; ensure!( check_task.is_none(), - UnexpectedSnafu { + IllegalCheckTaskStateSnafu { reason: "Flow consistent check task already exists", } ); @@ -293,7 +293,7 @@ impl FlowDualEngine { ensure!( check_task.is_some(), - UnexpectedSnafu { + IllegalCheckTaskStateSnafu { reason: "Flow consistent check task does not exist", } ); @@ -670,10 +670,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { ..Default::default() }) } - other => common_meta::error::UnexpectedSnafu { - err_msg: format!("Invalid request body: {other:?}"), - } - .fail(), + other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(), } } @@ -772,14 +769,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager { ..Default::default() }) } - None => common_meta::error::UnexpectedSnafu { - err_msg: "Missing request body", - } - .fail(), - _ => common_meta::error::UnexpectedSnafu { - err_msg: "Invalid request body.", - } - .fail(), + other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(), } } diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 0f79652a41..904a0a8fa7 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -196,6 +196,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Illegal check task state: {reason}"))] + IllegalCheckTaskState { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to sync with check task for flow {} with allow_drop={}", flow_id, @@ -299,7 +306,9 @@ impl ErrorExt for Error { Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => { StatusCode::EngineExecuteQuery } - Self::Unexpected { .. } | Self::SyncCheckTask { .. } => StatusCode::Unexpected, + Self::Unexpected { .. } + | Self::SyncCheckTask { .. } + | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } | Self::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 44edb339c7..afae466dc0 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -52,7 +52,6 @@ use query::parser::{QueryLanguageParser, QueryStatement}; use query::plan::extract_and_rewrite_full_table_names; use query::query_engine::DefaultSerializer; use query::sql::create_table_stmt; -use query::QueryEngineRef; use regex::Regex; use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; @@ -369,62 +368,11 @@ impl StatementExecutor { expr: CreateFlowExpr, query_context: QueryContextRef, ) -> Result { - async fn sql_to_df_plan( - query_ctx: QueryContextRef, - engine: QueryEngineRef, - sql: &str, - ) -> Result { - let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let plan = engine - .planner() - .plan(&stmt, query_ctx) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - Ok(plan) - } - - async fn determine_flow_type(plan: &LogicalPlan) -> Result { - pub struct FindAggr { - is_aggr: bool, - } - - impl TreeNodeVisitor<'_> for FindAggr { - type Node = LogicalPlan; - fn f_down( - &mut self, - node: &Self::Node, - ) -> datafusion_common::Result - { - match node { - LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => { - self.is_aggr = true; - return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop); - } - _ => (), - } - Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue) - } - } - - let mut find_aggr = FindAggr { is_aggr: false }; - - plan.visit_with_subqueries(&mut find_aggr) - .context(BuildDfLogicalPlanSnafu)?; - if find_aggr.is_aggr { - Ok(FlowType::Batching) - } else { - Ok(FlowType::Streaming) - } - } - - let plan = - sql_to_df_plan(query_context.clone(), self.query_engine.clone(), &expr.sql).await?; - - let flow_type = determine_flow_type(&plan).await?; + let flow_type = self + .determine_flow_type(&expr.sql, query_context.clone()) + .await?; info!("determined flow={} type: {:#?}", expr.flow_name, flow_type); + let expr = { let mut expr = expr; expr.flow_options @@ -447,6 +395,55 @@ impl StatementExecutor { .context(error::ExecuteDdlSnafu) } + /// Determine the flow type based on the SQL query + /// + /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow + async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result { + let engine = &self.query_engine; + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let plan = engine + .planner() + .plan(&stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + /// Visitor to find aggregation or distinct + struct FindAggr { + is_aggr: bool, + } + + impl TreeNodeVisitor<'_> for FindAggr { + type Node = LogicalPlan; + fn f_down( + &mut self, + node: &Self::Node, + ) -> datafusion_common::Result + { + match node { + LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => { + self.is_aggr = true; + return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop); + } + _ => (), + } + Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue) + } + } + + let mut find_aggr = FindAggr { is_aggr: false }; + + plan.visit_with_subqueries(&mut find_aggr) + .context(BuildDfLogicalPlanSnafu)?; + if find_aggr.is_aggr { + Ok(FlowType::Batching) + } else { + Ok(FlowType::Streaming) + } + } + #[tracing::instrument(skip_all)] pub async fn create_view( &self,