diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index ab8e0ffa5c..3459232f08 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -23,7 +23,7 @@ use api::v1::region::InsertRequests; use catalog::CatalogManager; use common_error::ext::BoxedError; use common_meta::ddl::create_flow::FlowType; -use common_meta::error::{Result as MetaResult, UnexpectedSnafu}; +use common_meta::error::Result as MetaResult; use common_meta::key::flow::FlowMetadataManager; use common_runtime::JoinHandle; use common_telemetry::{error, info, trace, warn}; @@ -40,7 +40,7 @@ use crate::batching_mode::engine::BatchingEngine; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, InsertIntoFlowSnafu, InternalSnafu, - ListFlowsSnafu, SyncCheckTaskSnafu, + ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -278,7 +278,7 @@ impl FlowDualEngine { let mut check_task = self.check_task.lock().await; ensure!( check_task.is_none(), - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: "Flow consistent check task already exists", } ); @@ -293,7 +293,7 @@ impl FlowDualEngine { ensure!( check_task.is_some(), - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: "Flow consistent check task does not exist", } ); @@ -363,13 +363,13 @@ impl ConsistentCheckTask { .send((allow_create, allow_drop, tx)) .await .map_err(|_| { - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: "Failed to send trigger signal", } .build() })?; rx.await.map_err(|_| { - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: "Failed to receive trigger signal", } .build() @@ -379,7 +379,7 @@ impl ConsistentCheckTask { async fn stop(self) -> Result<(), Error> { self.shutdown_tx.send(()).await.map_err(|_| { - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: "Failed to send shutdown signal", } .build() @@ -585,7 +585,7 @@ impl FlowEngine for FlowDualEngine { }) .await?; stream_handler.await.map_err(|e| { - crate::error::UnexpectedSnafu { + UnexpectedSnafu { reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"), } .build() @@ -670,12 +670,8 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { ..Default::default() }) } - None => UnexpectedSnafu { - err_msg: "Missing request body", - } - .fail(), - _ => UnexpectedSnafu { - err_msg: "Invalid request body.", + other => common_meta::error::UnexpectedSnafu { + err_msg: format!("Invalid request body: {other:?}"), } .fail(), } @@ -776,11 +772,11 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager { ..Default::default() }) } - None => UnexpectedSnafu { + None => common_meta::error::UnexpectedSnafu { err_msg: "Missing request body", } .fail(), - _ => UnexpectedSnafu { + _ => common_meta::error::UnexpectedSnafu { err_msg: "Invalid request body.", } .fail(), @@ -925,7 +921,7 @@ impl FlowWorkerManager { .copied() .map(FetchFromRow::Idx) .or_else(|| col_default_val.clone().map(FetchFromRow::Default)) - .with_context(|| crate::error::UnexpectedSnafu { + .with_context(|| UnexpectedSnafu { reason: format!( "Column not found: {}, default_value: {:?}", col_name, col_default_val