refactor: per review partially

This commit is contained in:
discord9
2025-04-21 20:56:43 +08:00
parent 92d2fafb33
commit ae00e28b2a
4 changed files with 78 additions and 74 deletions

View File

@@ -401,6 +401,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid flow request body: {:?}", body))]
InvalidFlowRequestBody {
body: Box<Option<api::v1::flow::flow_request::Body>>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to get kv cache, err: {}", err_msg))]
GetKvCache { err_msg: String },
@@ -853,7 +860,8 @@ impl ErrorExt for Error {
| TlsConfig { .. }
| InvalidSetDatabaseOption { .. }
| InvalidUnsetDatabaseOption { .. }
| InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
| InvalidTopicNamePrefix { .. }
| InvalidFlowRequestBody { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,

View File

@@ -39,8 +39,8 @@ use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu,
ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -278,7 +278,7 @@ impl FlowDualEngine {
let mut check_task = self.check_task.lock().await;
ensure!(
check_task.is_none(),
UnexpectedSnafu {
IllegalCheckTaskStateSnafu {
reason: "Flow consistent check task already exists",
}
);
@@ -293,7 +293,7 @@ impl FlowDualEngine {
ensure!(
check_task.is_some(),
UnexpectedSnafu {
IllegalCheckTaskStateSnafu {
reason: "Flow consistent check task does not exist",
}
);
@@ -670,10 +670,7 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default()
})
}
other => common_meta::error::UnexpectedSnafu {
err_msg: format!("Invalid request body: {other:?}"),
}
.fail(),
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
@@ -772,14 +769,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager {
..Default::default()
})
}
None => common_meta::error::UnexpectedSnafu {
err_msg: "Missing request body",
}
.fail(),
_ => common_meta::error::UnexpectedSnafu {
err_msg: "Invalid request body.",
}
.fail(),
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}

View File

@@ -196,6 +196,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Illegal check task state: {reason}"))]
IllegalCheckTaskState {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to sync with check task for flow {} with allow_drop={}",
flow_id,
@@ -299,7 +306,9 @@ impl ErrorExt for Error {
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery
}
Self::Unexpected { .. } | Self::SyncCheckTask { .. } => StatusCode::Unexpected,
Self::Unexpected { .. }
| Self::SyncCheckTask { .. }
| Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,
Self::NotImplemented { .. }
| Self::UnsupportedTemporalFilter { .. }
| Self::Unsupported { .. } => StatusCode::Unsupported,

View File

@@ -52,7 +52,6 @@ use query::parser::{QueryLanguageParser, QueryStatement};
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
use query::sql::create_table_stmt;
use query::QueryEngineRef;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
@@ -369,62 +368,11 @@ impl StatementExecutor {
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
async fn sql_to_df_plan(
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sql: &str,
) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(plan)
}
async fn determine_flow_type(plan: &LogicalPlan) -> Result<FlowType> {
pub struct FindAggr {
is_aggr: bool,
}
impl TreeNodeVisitor<'_> for FindAggr {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
{
match node {
LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
self.is_aggr = true;
return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
}
_ => (),
}
Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
}
}
let mut find_aggr = FindAggr { is_aggr: false };
plan.visit_with_subqueries(&mut find_aggr)
.context(BuildDfLogicalPlanSnafu)?;
if find_aggr.is_aggr {
Ok(FlowType::Batching)
} else {
Ok(FlowType::Streaming)
}
}
let plan =
sql_to_df_plan(query_context.clone(), self.query_engine.clone(), &expr.sql).await?;
let flow_type = determine_flow_type(&plan).await?;
let flow_type = self
.determine_flow_type(&expr.sql, query_context.clone())
.await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
let expr = {
let mut expr = expr;
expr.flow_options
@@ -447,6 +395,55 @@ impl StatementExecutor {
.context(error::ExecuteDdlSnafu)
}
/// Determine the flow type based on the SQL query
///
/// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result<FlowType> {
let engine = &self.query_engine;
let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let plan = engine
.planner()
.plan(&stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
/// Visitor to find aggregation or distinct
struct FindAggr {
is_aggr: bool,
}
impl TreeNodeVisitor<'_> for FindAggr {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<datafusion_common::tree_node::TreeNodeRecursion>
{
match node {
LogicalPlan::Aggregate(_) | LogicalPlan::Distinct(_) => {
self.is_aggr = true;
return Ok(datafusion_common::tree_node::TreeNodeRecursion::Stop);
}
_ => (),
}
Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
}
}
let mut find_aggr = FindAggr { is_aggr: false };
plan.visit_with_subqueries(&mut find_aggr)
.context(BuildDfLogicalPlanSnafu)?;
if find_aggr.is_aggr {
Ok(FlowType::Batching)
} else {
Ok(FlowType::Streaming)
}
}
#[tracing::instrument(skip_all)]
pub async fn create_view(
&self,