diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index a060b4a1f7..be16c38cb6 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -40,7 +40,8 @@ use crate::batching_mode::engine::BatchingEngine; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu, - InsertIntoFlowSnafu, InternalSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, + InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, + UnexpectedSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -585,12 +586,7 @@ impl FlowEngine for FlowDualEngine { requests: to_batch_engine, }) .await?; - stream_handler.await.map_err(|e| { - UnexpectedSnafu { - reason: format!("JoinError when handle inserts for flow stream engine: {e:?}"), - } - .build() - })??; + stream_handler.await.context(JoinTaskSnafu)??; Ok(()) }