diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs
index 7c74fa14f7..41098ed69b 100644
--- a/src/common/meta/src/cache_invalidator.rs
+++ b/src/common/meta/src/cache_invalidator.rs
@@ -16,9 +16,12 @@ use std::sync::Arc;
 
 use crate::error::Result;
 use crate::flow_name::FlowName;
-use crate::instruction::CacheIdent;
+use crate::instruction::{CacheIdent, DropFlow};
 use crate::key::flow::flow_info::FlowInfoKey;
 use crate::key::flow::flow_name::FlowNameKey;
+use crate::key::flow::flow_route::FlowRouteKey;
+use crate::key::flow::flownode_flow::FlownodeFlowKey;
+use crate::key::flow::table_flow::TableFlowKey;
 use crate::key::schema_name::SchemaNameKey;
 use crate::key::table_info::TableInfoKey;
 use crate::key::table_name::TableNameKey;
@@ -89,9 +92,40 @@ where
                 let key: SchemaNameKey = schema_name.into();
                 self.invalidate_key(&key.to_bytes()).await;
             }
-            CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
+            CacheIdent::CreateFlow(_) => {
                 // Do nothing
             }
+            CacheIdent::DropFlow(DropFlow {
+                flow_id,
+                source_table_ids,
+                flow_part2node_id,
+            }) => {
+                // invalidate flow route/flownode flow/table flow
+                let mut keys = Vec::with_capacity(
+                    source_table_ids.len() * flow_part2node_id.len()
+                        + flow_part2node_id.len() * 2,
+                );
+                for table_id in source_table_ids {
+                    for (partition_id, node_id) in flow_part2node_id {
+                        let key =
+                            TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
+                                .to_bytes();
+                        keys.push(key);
+                    }
+                }
+
+                for (partition_id, node_id) in flow_part2node_id {
+                    let key =
+                        FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
+                    keys.push(key);
+                    let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
+                    keys.push(key);
+                }
+
+                for key in keys {
+                    self.invalidate_key(&key).await;
+                }
+            }
             CacheIdent::FlowName(FlowName {
                 catalog_name,
                 flow_name,
diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index 62123e8cf4..9db5205448 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -45,7 +45,7 @@ use crate::kv_backend::KvBackendRef;
 use crate::rpc::store::BatchDeleteRequest;
 
 /// The key of `__flow/` scope.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct FlowScoped<T> {
     inner: T,
 }
diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs
index 280b2de6af..d75eb6efe3 100644
--- a/src/common/meta/src/key/flow/flow_info.rs
+++ b/src/common/meta/src/key/flow/flow_info.rs
@@ -153,6 +153,15 @@ impl FlowInfoValue {
         &self.flownode_ids
     }
 
+    /// Inserts a new flownode id for a partition.
+    pub fn insert_flownode_id(
+        &mut self,
+        partition: FlowPartitionId,
+        node: FlownodeId,
+    ) -> Option<FlownodeId> {
+        self.flownode_ids.insert(partition, node)
+    }
+
     /// Returns the `source_table`.
     pub fn source_table_ids(&self) -> &[TableId] {
         &self.source_table_ids
diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs
index 9aead7a9f9..83102f0d49 100644
--- a/src/common/meta/src/key/flow/flow_route.rs
+++ b/src/common/meta/src/key/flow/flow_route.rs
@@ -42,7 +42,7 @@ lazy_static! {
 /// The key stores the route info of the flow.
 ///
 /// The layout: `__flow/route/{flow_id}/{partition_id}`.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct FlowRouteKey(FlowScoped<FlowRouteKeyInner>);
 
 impl FlowRouteKey {
@@ -145,6 +145,12 @@ pub struct FlowRouteValue {
     pub(crate) peer: Peer,
 }
 
+impl From<Peer> for FlowRouteValue {
+    fn from(peer: Peer) -> Self {
+        Self { peer }
+    }
+}
+
 impl FlowRouteValue {
     /// Returns the `peer`.
     pub fn peer(&self) -> &Peer {
diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs
index 360adeb349..e7924ad702 100644
--- a/src/common/meta/src/key/flow/flownode_flow.rs
+++ b/src/common/meta/src/key/flow/flownode_flow.rs
@@ -166,6 +166,17 @@ impl FlownodeFlowManager {
         Self { kv_backend }
     }
 
+    /// Whether the given flow exists on this flownode.
+    pub async fn exists(
+        &self,
+        flownode_id: FlownodeId,
+        flow_id: FlowId,
+        partition_id: FlowPartitionId,
+    ) -> Result<bool> {
+        let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
+        Ok(self.kv_backend.get(&key).await?.is_some())
+    }
+
     /// Retrieves all [FlowId] and [FlowPartitionId]s of the specified `flownode_id`.
     pub fn flows(
         &self,
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 99e1f7c179..b42ae2c29e 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -826,9 +826,17 @@ fn to_meta_err(
     location: snafu::Location,
 ) -> impl FnOnce(crate::error::Error) -> common_meta::error::Error {
     move |err: crate::error::Error| -> common_meta::error::Error {
-        common_meta::error::Error::External {
-            location,
-            source: BoxedError::new(err),
+        match err {
+            crate::error::Error::FlowNotFound { id, .. } => {
+                common_meta::error::Error::FlowNotFound {
+                    flow_name: format!("flow_id={id}"),
+                    location,
+                }
+            }
+            _ => common_meta::error::Error::External {
+                location,
+                source: BoxedError::new(err),
+            },
         }
     }
 }
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index d98e929f16..06bd4a4d3c 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -39,7 +39,8 @@ use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
 use crate::batching_mode::utils::sql_to_df_plan;
 use crate::engine::FlowEngine;
 use crate::error::{
-    ExternalSnafu, FlowAlreadyExistSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
+    ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
+    UnexpectedSnafu, UnsupportedSnafu,
 };
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
 
@@ -349,7 +350,8 @@ impl BatchingEngine {
 
     pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         if self.tasks.write().await.remove(&flow_id).is_none() {
-            warn!("Flow {flow_id} not found in tasks")
+            warn!("Flow {flow_id} not found in tasks");
+            FlowNotFoundSnafu { id: flow_id }.fail()?;
         }
         let Some(tx) = self.shutdown_txs.write().await.remove(&flow_id) else {
             UnexpectedSnafu {
@@ -366,9 +368,7 @@ impl BatchingEngine {
     pub async fn flush_flow_inner(&self, flow_id: FlowId) -> Result<usize, Error> {
         debug!("Try flush flow {flow_id}");
         let task = self.tasks.read().await.get(&flow_id).cloned();
-        let task = task.with_context(|| UnexpectedSnafu {
-            reason: format!("Can't found task for flow {flow_id}"),
-        })?;
+        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
 
         task.mark_all_windows_as_dirty()?;
 
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 9bb4a8ffd9..15b8a53ab4 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -321,8 +321,8 @@ impl ErrorExt for Error {
             Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
             Self::TableNotFound { .. }
             | Self::TableNotFoundMeta { .. }
-            | Self::FlowNotFound { .. }
             | Self::ListFlows { .. } => StatusCode::TableNotFound,
+            Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
             Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
             Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
                 StatusCode::EngineExecuteQuery
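
The `DropFlow` branch in `cache_invalidator.rs` pre-sizes its buffer as one table-flow key per (source table, partition) pair plus two keys (flownode-flow and flow-route) per partition. The standalone sketch below mirrors that arithmetic with plain string keys; the helper name and the key layouts are hypothetical stand-ins (only `__flow/route/{flow_id}/{partition_id}` is documented in the patch), not the crate's real `to_bytes()` encodings.

```rust
// Sketch of the DropFlow key-count arithmetic; all layouts except the route one are placeholders.
fn collect_drop_flow_keys(
    flow_id: u32,
    source_table_ids: &[u32],
    flow_part2node_id: &[(u32, u64)],
) -> Vec<Vec<u8>> {
    // One table-flow key per (source table, partition) pair, plus one flownode-flow
    // key and one flow-route key per partition: reserve exactly that many slots.
    let mut keys = Vec::with_capacity(
        source_table_ids.len() * flow_part2node_id.len() + flow_part2node_id.len() * 2,
    );
    for table_id in source_table_ids {
        for (partition_id, node_id) in flow_part2node_id {
            // Placeholder layout standing in for TableFlowKey::to_bytes().
            keys.push(
                format!("__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}")
                    .into_bytes(),
            );
        }
    }
    for (partition_id, node_id) in flow_part2node_id {
        // Placeholder layout standing in for FlownodeFlowKey::to_bytes().
        keys.push(format!("__flow/flownode/{node_id}/{flow_id}/{partition_id}").into_bytes());
        // Layout documented in flow_route.rs: `__flow/route/{flow_id}/{partition_id}`.
        keys.push(format!("__flow/route/{flow_id}/{partition_id}").into_bytes());
    }
    keys
}

fn main() {
    // Two source tables and two (partition, flownode) pairs:
    // 2 * 2 table-flow keys + 2 * 2 per-partition keys = 8 keys.
    let keys = collect_drop_flow_keys(42, &[1001, 1002], &[(0, 1), (1, 2)]);
    assert_eq!(keys.len(), 8);
}
```

The capacity expression matches the number of keys pushed, so the buffer is filled without reallocation before the per-key invalidation loop runs.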