From e647559d277822d9ab1233daf435bc9e1301bfb7 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 21 Apr 2025 15:01:13 +0800 Subject: [PATCH] refactor: AddAutoColumnRewriter check for Projection --- src/flow/src/batching_mode/utils.rs | 10 ++++++++-- src/flow/src/server.rs | 15 ++++++++++++++- src/frontend/src/error.rs | 12 ++++++++++++ src/frontend/src/instance/grpc.rs | 6 +++--- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index eba790bd65..3b9329f546 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -256,7 +256,13 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { node = projection; } - let mut exprs = node.expressions(); + // only do rewrite if found the outermost projection + let mut exprs = if let LogicalPlan::Projection(project) = &node { + project.expr.clone() + } else { + return Ok(Transformed::no(node)); + }; + let all_names = self .schema .column_schemas() @@ -333,7 +339,7 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { } else { return Err(DataFusionError::Plan(format!( "Expect table have 0,1 or 2 columns more than query columns, found {} query columns {:?}, {} table columns {:?} at node {:?}", - query_col_cnt, node.expressions(), table_col_cnt, self.schema.column_schemas(), node + query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas(), node ))); } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 7e26b0a3ff..d7deba3e48 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -29,6 +29,7 @@ use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::Output; +use common_runtime::JoinHandle; use common_telemetry::tracing::info; use futures::{FutureExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; @@ -144,11 +145,16 @@ pub struct FlownodeServer { inner: Arc, } +/// FlownodeServerInner is the inner state of FlownodeServer, +/// this struct mostly useful for construct/start and stop the +/// flow node server struct FlownodeServerInner { /// worker shutdown signal, not to be confused with server_shutdown_tx worker_shutdown_tx: Mutex>, /// server shutdown signal for shutdown grpc server server_shutdown_tx: Mutex>, + /// streaming task handler + streaming_task_handler: Mutex>>, flow_service: FlowService, } @@ -161,6 +167,7 @@ impl FlownodeServer { flow_service, worker_shutdown_tx: Mutex::new(tx), server_shutdown_tx: Mutex::new(server_tx), + streaming_task_handler: Mutex::new(None), }), } } @@ -168,9 +175,15 @@ impl FlownodeServer { /// Start the background task for streaming computation. async fn start_workers(&self) -> Result<(), Error> { let manager_ref = self.inner.flow_service.dual_engine.clone(); - let _handle = manager_ref + let handle = manager_ref .streaming_engine() .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe())); + self.inner + .streaming_task_handler + .lock() + .await + .replace(handle); + self.inner .flow_service .dual_engine diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 8b896f845f..7880656f51 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -19,6 +19,8 @@ use common_error::define_into_tonic_status; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_query::error::datafusion_status_code; +use datafusion::error::DataFusionError; use session::ReadPreference; use snafu::{Location, Snafu}; use store_api::storage::RegionId; @@ -347,6 +349,14 @@ pub enum Error { location: Location, source: common_query::error::Error, }, + + #[snafu(display("DataFusionError"))] + DataFusion { + #[snafu(source)] + error: DataFusionError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -423,6 +433,8 @@ impl ErrorExt for Error { Error::TableOperation { source, .. } => source.status_code(), Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited, + + Error::DataFusion { error, .. } => datafusion_status_code::(error, None), } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index f9bc3b3696..4a83a99d12 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -36,8 +36,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::table_name::TableName; use crate::error::{ - CatalogSnafu, Error, InFlightWriteBytesExceededSnafu, IncompleteGrpcRequestSnafu, - NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, + CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, + IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu, }; use crate::instance::{attach_timer, Instance}; @@ -365,7 +365,7 @@ impl Instance { .arrow_schema() .clone() .try_into() - .unwrap(), + .context(DataFusionSnafu)?, ); let insert_into = add_insert_to_logical_plan(table_name, df_schema, logical_plan)