From 87f1a8c622e04444c948f65ac35a120f3af7a8fe Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 21 Apr 2025 19:16:31 +0800 Subject: [PATCH] refactor: per review --- src/common/query/src/logical_plan.rs | 3 +- src/flow/src/adapter/flownode_impl.rs | 49 ++++++++++--------- src/flow/src/batching_mode/engine.rs | 25 +++++----- src/flow/src/engine.rs | 2 +- src/flow/src/error.rs | 27 ++++++++-- .../common/flow/flow_advance_ttl.result | 2 +- 6 files changed, 66 insertions(+), 42 deletions(-) diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index 7f09739adc..bc534f4e7a 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -88,7 +88,8 @@ pub fn rename_logical_plan_columns( /// /// if input logical plan is not `insert into table_name `, return None /// -/// Returned TableName will use provided catalog and schema if not specified in the logical plan +/// Returned TableName will use provided catalog and schema if not specified in the logical plan, +/// if table scan in logical plan have full table name, will **NOT** override it. pub fn breakup_insert_plan( plan: &LogicalPlan, catalog: &str, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 3d32c1c022..ab8e0ffa5c 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -31,7 +31,7 @@ use datatypes::value::Value; use futures::TryStreamExt; use itertools::Itertools; use session::context::QueryContextBuilder; -use snafu::{IntoError, OptionExt, ResultExt}; +use snafu::{ensure, IntoError, OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; use tokio::sync::{Mutex, RwLock}; @@ -40,7 +40,7 @@ use crate::batching_mode::engine::BatchingEngine; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu, - ListFlowsSnafu, + ListFlowsSnafu, SyncCheckTaskSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -111,11 +111,13 @@ impl FlowDualEngine { } if retry == max_retry { - return crate::error::UnexpectedSnafu { - reason: format!( - "Can't sync with check task for flow {} with allow_drop={}", - flow_id, allow_drop - ), + error!( + "Can't sync with check task for flow {} with allow_drop={}", + flow_id, allow_drop + ); + return SyncCheckTaskSnafu { + flow_id, + allow_drop, } .fail(); } @@ -273,27 +275,30 @@ impl FlowDualEngine { } pub async fn start_flow_consistent_check_task(self: &Arc) -> Result<(), Error> { - if self.check_task.lock().await.is_some() { + let mut check_task = self.check_task.lock().await; + ensure!( + check_task.is_none(), crate::error::UnexpectedSnafu { reason: "Flow consistent check task already exists", } - .fail()?; - } + ); let task = ConsistentCheckTask::start_check_task(self).await?; - self.check_task.lock().await.replace(task); + *check_task = Some(task); Ok(()) } pub async fn stop_flow_consistent_check_task(&self) -> Result<(), Error> { info!("Stopping flow consistent check task"); - if let Some(task) = self.check_task.lock().await.take() { - task.stop().await?; - } else { + let mut check_task = self.check_task.lock().await; + + ensure!( + check_task.is_some(), crate::error::UnexpectedSnafu { reason: "Flow consistent check task does not exist", } - .fail()?; - } + ); + + check_task.take().expect("Already checked").stop().await?; info!("Stopped flow consistent check task"); Ok(()) } @@ -526,11 +531,11 @@ impl FlowEngine for FlowDualEngine { } } - async fn list_flows(&self) -> Result, Error> { - let mut stream_flows = self.streaming_engine.list_flows().await?; + async fn list_flows(&self) -> Result, Error> { + let stream_flows = self.streaming_engine.list_flows().await?; let batch_flows = self.batching_engine.list_flows().await?; - stream_flows.extend(batch_flows); - Ok(stream_flows) + + Ok(stream_flows.into_iter().chain(batch_flows)) } async fn handle_flow_inserts( @@ -807,14 +812,14 @@ impl FlowEngine for FlowWorkerManager { self.flow_exist_inner(flow_id).await } - async fn list_flows(&self) -> Result, Error> { + async fn list_flows(&self) -> Result, Error> { Ok(self .flow_err_collectors .read() .await .keys() .cloned() - .collect()) + .collect::>()) } async fn handle_flow_inserts( diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 9ab2ea0046..7504362003 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -24,8 +24,8 @@ use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; -use common_telemetry::info; use common_telemetry::tracing::warn; +use common_telemetry::{debug, info}; use common_time::TimeToLive; use query::QueryEngineRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -39,8 +39,7 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr}; use crate::batching_mode::utils::sql_to_df_plan; use crate::engine::FlowEngine; use crate::error::{ - ExternalSnafu, FlowAlreadyExistSnafu, InvalidQuerySnafu, TableNotFoundMetaSnafu, - UnexpectedSnafu, + ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; use crate::{CreateFlowArgs, Error, FlowId, TableName}; @@ -187,10 +186,10 @@ async fn get_table_name( table_info: &TableInfoManager, table_id: &TableId, ) -> Result { - get_table_info(table_info, table_id) - .await - .map(|info| info.table_name()) - .map(|name| [name.catalog_name, name.schema_name, name.table_name]) + get_table_info(table_info, table_id).await.map(|info| { + let name = info.table_name(); + [name.catalog_name, name.schema_name, name.table_name] + }) } async fn get_table_info( @@ -269,9 +268,9 @@ impl BatchingEngine { let table_name = get_table_name(self.table_meta.table_info_manager(), &src_id).await?; let table_info = get_table_info(self.table_meta.table_info_manager(), &src_id).await?; if table_info.table_info.meta.options.ttl == Some(TimeToLive::Instant) { - InvalidQuerySnafu { + UnsupportedSnafu { reason: format!( - "Source table `{}`(id={}) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant", + "Source table `{}`(id={}) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval", table_name.join("."), src_id ), @@ -363,7 +362,7 @@ impl BatchingEngine { } pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result { - info!("Try flush flow {flow_id}"); + debug!("Try flush flow {flow_id}"); let task = self.tasks.read().await.get(&flow_id).cloned(); let task = task.with_context(|| UnexpectedSnafu { reason: format!("Can't found task for flow {flow_id}"), @@ -376,7 +375,7 @@ impl BatchingEngine { .await?; let affected_rows = res.map(|(r, _)| r).unwrap_or_default() as usize; - info!( + debug!( "Successfully flush flow {flow_id}, affected rows={}", affected_rows ); @@ -402,8 +401,8 @@ impl FlowEngine for BatchingEngine { async fn flow_exist(&self, flow_id: FlowId) -> Result { Ok(self.flow_exist_inner(flow_id).await) } - async fn list_flows(&self) -> Result, Error> { - Ok(self.tasks.read().await.keys().cloned().collect()) + async fn list_flows(&self) -> Result, Error> { + Ok(self.tasks.read().await.keys().cloned().collect::>()) } async fn handle_flow_inserts( &self, diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs index df227e8197..1338e893f3 100644 --- a/src/flow/src/engine.rs +++ b/src/flow/src/engine.rs @@ -50,7 +50,7 @@ pub trait FlowEngine { /// Check if the flow exists async fn flow_exist(&self, flow_id: FlowId) -> Result; /// List all flows - async fn list_flows(&self) -> Result, Error>; + async fn list_flows(&self) -> Result, Error>; /// Handle the insert requests for the flow async fn handle_flow_inserts( &self, diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 1741f8cb1b..0f79652a41 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -149,6 +149,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Unsupported: {reason}"))] + Unsupported { + reason: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unsupported temporal filter: {reason}"))] UnsupportedTemporalFilter { reason: String, @@ -189,6 +196,18 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to sync with check task for flow {} with allow_drop={}", + flow_id, + allow_drop + ))] + SyncCheckTask { + flow_id: FlowId, + allow_drop: bool, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to start server"))] StartServer { #[snafu(implicit)] @@ -280,10 +299,10 @@ impl ErrorExt for Error { Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => { StatusCode::EngineExecuteQuery } - Self::Unexpected { .. } => StatusCode::Unexpected, - Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { - StatusCode::Unsupported - } + Self::Unexpected { .. } | Self::SyncCheckTask { .. } => StatusCode::Unexpected, + Self::NotImplemented { .. } + | Self::UnsupportedTemporalFilter { .. } + | Self::Unsupported { .. } => StatusCode::Unsupported, Self::External { source, .. } => source.status_code(), Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index f23928ba7d..3f5a940977 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -16,7 +16,7 @@ SELECT FROM distinct_basic; -Error: 3001(EngineExecuteQuery), Invalid query: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, flow will only evaluate to empty results with such table, use a small ttl instead of instant +Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval ALTER TABLE distinct_basic SET 'ttl' = '5s';