From b4aa0c8b8b84db8484bff6b22d4629c9132deba4 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 21 Apr 2025 15:53:03 +0800 Subject: [PATCH] refactor: per review --- src/flow/src/adapter/flownode_impl.rs | 26 ++++++++++++------- src/flow/src/batching_mode/frontend_client.rs | 7 ++++- src/flow/src/batching_mode/task.rs | 21 +++------------ src/flow/src/batching_mode/utils.rs | 14 ++++++---- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 87510f18ed..3d32c1c022 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -57,7 +57,6 @@ pub struct FlowDualEngine { streaming_engine: Arc, batching_engine: Arc, /// helper struct for faster query flow by table id or vice versa - /// need to also use as a lock so use tokio RwLock src_table2flow: RwLock, flow_metadata_manager: Arc, catalog_manager: Arc, @@ -112,7 +111,13 @@ impl FlowDualEngine { } if retry == max_retry { - return FlowNotFoundSnafu { id: flow_id }.fail(); + return crate::error::UnexpectedSnafu { + reason: format!( + "Can't sync with check task for flow {} with allow_drop={}", + flow_id, allow_drop + ), + } + .fail(); } info!("Successfully sync with check task for flow {}", flow_id); @@ -170,12 +175,12 @@ impl FlowDualEngine { .into_iter() .map(|i| i as FlowId) .collect::>(); - let actual_exist = self.list_flows().await?.into_iter().collect::>(); + let actual_exists = self.list_flows().await?.into_iter().collect::>(); let to_be_created = should_exists .iter() - .filter(|id| !actual_exist.contains(id)) + .filter(|id| !actual_exists.contains(id)) .collect::>(); - let to_be_dropped = actual_exist + let to_be_dropped = actual_exists .iter() .filter(|id| !should_exists.contains(id)) .collect::>(); @@ -463,20 +468,21 @@ impl FlowEngine for FlowDualEngine { let flow_id = args.flow_id; let src_table_ids = args.source_table_ids.clone(); - let mut src_table2flow = self.src_table2flow.write().await; let res = match flow_type { FlowType::Batching => self.batching_engine.create_flow(args).await, FlowType::Streaming => self.streaming_engine.create_flow(args).await, }?; - src_table2flow.add_flow(flow_id, flow_type, src_table_ids); + self.src_table2flow + .write() + .await + .add_flow(flow_id, flow_type, src_table_ids); Ok(res) } async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { - let mut src_table2flow = self.src_table2flow.write().await; - let flow_type = src_table2flow.get_flow_type(flow_id); + let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id); match flow_type { Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await, @@ -495,7 +501,7 @@ impl FlowEngine for FlowDualEngine { } }?; // remove mapping - src_table2flow.remove_flow(flow_id); + self.src_table2flow.write().await.remove_flow(flow_id); Ok(()) } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 4824539056..002804fc92 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -241,7 +241,12 @@ impl FrontendClient { let database_client = { database_client .lock() - .unwrap() + .map_err(|e| { + UnexpectedSnafu { + reason: format!("Failed to lock database client: {e}"), + } + .build() + })? .as_ref() .context(UnexpectedSnafu { reason: "Standalone's frontend instance is not set", diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 0d61a23d50..a3af6fe242 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -284,7 +284,7 @@ impl BatchingTask { // fix all table ref by make it fully qualified, i.e. "table_name" => "catalog_name.schema_name.table_name" let fixed_plan = plan .clone() - .transform(|p| { + .transform_down_with_subqueries(|p| { if let LogicalPlan::TableScan(mut table_scan) = p { let resolved = table_scan.table_name.resolve(catalog, schema); table_scan.table_name = resolved.into(); @@ -609,11 +609,7 @@ fn create_table_with_expr( AUTO_CREATED_UPDATE_AT_TS_COL, ConcreteDataType::timestamp_millisecond_datatype(), true, - ) - /* .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string()))) - .context(DatatypesSnafu { - extra: "Failed to build column `update_at TimestampMillisecond default now()`", - })?*/ ; + ); column_schemas.push(update_at_schema); let time_index = if let Some(time_index) = first_time_stamp { @@ -625,15 +621,7 @@ fn create_table_with_expr( ConcreteDataType::timestamp_millisecond_datatype(), false, ) - .with_time_index(true), /* .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp( - Timestamp::new_millisecond(0), - )))) - .context(DatatypesSnafu { - extra: format!( - "Failed to build column `{} TimestampMillisecond TIME INDEX default 0`", - AUTO_CREATED_PLACEHOLDER_TS_COL - ), - })?*/ + .with_time_index(true), ); AUTO_CREATED_PLACEHOLDER_TS_COL.to_string() }; @@ -726,7 +714,6 @@ mod test { ConcreteDataType::timestamp_millisecond_datatype(), true, ); - // .with_default_constraint(Some(ColumnDefaultConstraint::Function(NOW_FN.to_string()))) let ts_placeholder_schema = ColumnSchema::new( AUTO_CREATED_PLACEHOLDER_TS_COL, @@ -734,8 +721,6 @@ mod test { false, ) .with_time_index(true); - // .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::Timestamp( - // Timestamp::new_millisecond(0), )))) let testcases = vec![ TestCase { diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 3b9329f546..b90c6a7be8 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -26,7 +26,7 @@ use datafusion::sql::unparser::Unparser; use datafusion_common::tree_node::{ Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor, }; -use datafusion_common::{DFSchema, DataFusionError}; +use datafusion_common::{DFSchema, DataFusionError, ScalarValue}; use datafusion_expr::{Distinct, LogicalPlan, Projection}; use datatypes::schema::SchemaRef; use query::parser::QueryLanguageParser; @@ -278,6 +278,9 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { .get(idx) .map(|c| c.name.clone()) { + // if the data type mismatched, later check_execute will error out + // hence no need to check it here, beside, optimize pass might be able to cast it + // so checking here is not necessary *expr = expr.clone().alias(col_name); } } @@ -289,7 +292,8 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { debug!("query_col_cnt={query_col_cnt}, table_col_cnt={table_col_cnt}"); let placeholder_ts_expr = - datafusion::logical_expr::lit(0).alias(AUTO_CREATED_PLACEHOLDER_TS_COL); + datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None)) + .alias(AUTO_CREATED_PLACEHOLDER_TS_COL); if query_col_cnt == table_col_cnt { // still need to add alias, see below @@ -499,7 +503,7 @@ mod test { // add ts placeholder ( "SELECT number FROM numbers_with_ts", - Ok("SELECT numbers_with_ts.number, 0 AS __ts_placeholder FROM numbers_with_ts"), + Ok("SELECT numbers_with_ts.number, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"), vec![ ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new( @@ -527,7 +531,7 @@ mod test { // add update_at and ts placeholder ( "SELECT number FROM numbers_with_ts", - Ok("SELECT numbers_with_ts.number, now() AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"), + Ok("SELECT numbers_with_ts.number, now() AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"), vec![ ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new( @@ -546,7 +550,7 @@ mod test { // add ts placeholder ( "SELECT number, ts FROM numbers_with_ts", - Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, 0 AS __ts_placeholder FROM numbers_with_ts"), + Ok("SELECT numbers_with_ts.number, numbers_with_ts.ts AS update_at, CAST('1970-01-01 00:00:00' AS TIMESTAMP) AS __ts_placeholder FROM numbers_with_ts"), vec![ ColumnSchema::new("number", ConcreteDataType::int32_datatype(), true), ColumnSchema::new(