diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs
index b285088822..c2ccd9fd72 100644
--- a/src/common/meta/src/cache/flow/table_flownode.rs
+++ b/src/common/meta/src/cache/flow/table_flownode.rs
@@ -24,21 +24,39 @@
 use crate::cache::{CacheContainer, Initializer};
 use crate::error::Result;
 use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
 use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
+use crate::key::{FlowId, FlowPartitionId};
 use crate::kv_backend::KvBackendRef;
 use crate::peer::Peer;
-use crate::FlownodeId;

-type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
+/// Key combining a flow id and a flow partition id.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FlowIdent {
+    pub flow_id: FlowId,
+    pub partition_id: FlowPartitionId,
+}
+
+impl FlowIdent {
+    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
+        Self {
+            flow_id,
+            partition_id,
+        }
+    }
+}
+
+/// Cache value for [TableFlowManager]; the `table_id` part lives in the outer cache key.
+/// Maps each flow's `(flow_id, partition_id)` to the [Peer] of the flownode hosting it.
+type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;

 pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

 /// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
-pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
+pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;

 /// Constructs a [TableFlownodeSetCache].
 pub fn new_table_flownode_set_cache(
     name: String,
-    cache: Cache<TableId, FlownodeSet>,
+    cache: Cache<TableId, FlownodeFlowSet>,
     kv_backend: KvBackendRef,
 ) -> TableFlownodeSetCache {
     let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
@@ -47,7 +65,7 @@ pub fn new_table_flownode_set_cache(
     CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
 }

-fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
+fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
     Arc::new(move |&table_id| {
         let table_flow_manager = table_flow_manager.clone();
         Box::pin(async move {
@@ -57,7 +75,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
                 .map(|flows| {
                     flows
                         .into_iter()
-                        .map(|(key, value)| (key.flownode_id(), value.peer))
+                        .map(|(key, value)| {
+                            (
+                                FlowIdent::new(key.flow_id(), key.partition_id()),
+                                value.peer,
+                            )
+                        })
                         .collect::<HashMap<_, _>>()
                 })
                 // We must cache the `HashSet` even if it's empty,
@@ -71,26 +94,33 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
 }

 async fn handle_create_flow(
-    cache: &Cache<TableId, FlownodeSet>,
+    cache: &Cache<TableId, FlownodeFlowSet>,
     CreateFlow {
+        flow_id,
         source_table_ids,
-        flownodes: flownode_peers,
+        partition_to_peer_mapping: flow_part2nodes,
     }: &CreateFlow,
 ) {
     for table_id in source_table_ids {
         let entry = cache.entry(*table_id);
         entry
             .and_compute_with(
-                async |entry: Option<Entry<TableId, Arc<HashMap<FlownodeId, Peer>>>>| match entry {
+                async |entry: Option<Entry<TableId, FlownodeFlowSet>>| match entry {
                     Some(entry) => {
                         let mut map = entry.into_value().as_ref().clone();
-                        map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
+                        map.extend(
+                            flow_part2nodes.iter().map(|(part, peer)| {
+                                (FlowIdent::new(*flow_id, *part), peer.clone())
+                            }),
+                        );

                         Op::Put(Arc::new(map))
                     }
-                    None => Op::Put(Arc::new(HashMap::from_iter(
-                        flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
-                    ))),
+                    None => {
+                        Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
+                            |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
+                        ))))
+                    }
                 },
             )
             .await;
@@ -98,21 +128,23 @@ async fn handle_create_flow(
 }

 async fn handle_drop_flow(
-    cache: &Cache<TableId, FlownodeSet>,
+    cache: &Cache<TableId, FlownodeFlowSet>,
     DropFlow {
+        flow_id,
         source_table_ids,
-        flownode_ids,
+        flow_part2node_id,
     }: &DropFlow,
 ) {
     for table_id in source_table_ids {
         let entry = cache.entry(*table_id);
         entry
             .and_compute_with(
-                async |entry: Option<Entry<TableId, Arc<HashMap<FlownodeId, Peer>>>>| match entry {
+                async |entry: Option<Entry<TableId, FlownodeFlowSet>>| match entry {
                     Some(entry) => {
                         let mut set = entry.into_value().as_ref().clone();
-                        for flownode_id in flownode_ids {
-                            set.remove(flownode_id);
+                        for (part, _node) in flow_part2node_id {
+                            let key = FlowIdent::new(*flow_id, *part);
+                            set.remove(&key);
                         }

                         Op::Put(Arc::new(set))
@@ -128,7 +160,7 @@ async fn handle_drop_flow(
 }

 fn invalidator<'a>(
-    cache: &'a Cache<TableId, FlownodeSet>,
+    cache: &'a Cache<TableId, FlownodeFlowSet>,
     ident: &'a CacheIdent,
 ) -> BoxFuture<'a, Result<()>> {
     Box::pin(async move {
@@ -154,7 +186,7 @@ mod tests {
     use moka::future::CacheBuilder;
     use table::table_name::TableName;

-    use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
+    use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
     use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
     use crate::key::flow::flow_info::FlowInfoValue;
     use crate::key::flow::flow_route::FlowRouteValue;
@@ -214,12 +246,16 @@ mod tests {
         let set = cache.get(1024).await.unwrap().unwrap();
         assert_eq!(
             set.as_ref().clone(),
-            HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
+            HashMap::from_iter(
+                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
+            )
         );
         let set = cache.get(1025).await.unwrap().unwrap();
         assert_eq!(
             set.as_ref().clone(),
-            HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
+            HashMap::from_iter(
+                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
+            )
         );
         let result = cache.get(1026).await.unwrap().unwrap();
         assert_eq!(result.len(), 0);
@@ -231,8 +267,9 @@ mod tests {
         let cache = CacheBuilder::new(128).build();
         let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
         let ident = vec![CacheIdent::CreateFlow(CreateFlow {
+            flow_id: 2001,
             source_table_ids: vec![1024, 1025],
-            flownodes: (1..=5).map(Peer::empty).collect(),
+            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
         })];
         cache.invalidate(&ident).await.unwrap();
         let set = cache.get(1024).await.unwrap().unwrap();
@@ -241,6 +278,54 @@ mod tests {
         assert_eq!(set.len(), 5);
     }

+    #[tokio::test]
+    async fn test_replace_flow() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let cache = CacheBuilder::new(128).build();
+        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
+        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
+            flow_id: 2001,
+            source_table_ids: vec![1024, 1025],
+            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
+        })];
+        cache.invalidate(&ident).await.unwrap();
+        let set = cache.get(1024).await.unwrap().unwrap();
+        assert_eq!(set.len(), 5);
+        let set = cache.get(1025).await.unwrap().unwrap();
+        assert_eq!(set.len(), 5);
+
+        let drop_then_create_flow = vec![
+            CacheIdent::DropFlow(DropFlow {
+                flow_id: 2001,
+                source_table_ids: vec![1024, 1025],
+                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
+            }),
+            CacheIdent::CreateFlow(CreateFlow {
+                flow_id: 2001,
+                source_table_ids: vec![1026, 1027],
+                partition_to_peer_mapping: (11..=15)
+                    .map(|i| (i as u32, Peer::empty(i + 1)))
+                    .collect(),
+            }),
+            CacheIdent::FlowId(2001),
+        ];
+        cache.invalidate(&drop_then_create_flow).await.unwrap();
+
+        let set = cache.get(1024).await.unwrap().unwrap();
+        assert!(set.is_empty());
+
+        let expected = HashMap::from_iter(
+            (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
+        );
+        let set = cache.get(1026).await.unwrap().unwrap();
+
+        assert_eq!(set.as_ref().clone(), expected);
+
+        let set = cache.get(1027).await.unwrap().unwrap();
+
+        assert_eq!(set.as_ref().clone(), expected);
+    }
+
     #[tokio::test]
     async fn test_drop_flow() {
         let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -248,34 +333,57 @@ mod tests {
         let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
         let ident = vec![
             CacheIdent::CreateFlow(CreateFlow {
+                flow_id: 2001,
                 source_table_ids: vec![1024, 1025],
-                flownodes: (1..=5).map(Peer::empty).collect(),
+                partition_to_peer_mapping: (1..=5)
+                    .map(|i| (i as u32, Peer::empty(i + 1)))
+                    .collect(),
             }),
             CacheIdent::CreateFlow(CreateFlow {
+                flow_id: 2002,
                 source_table_ids: vec![1024, 1025],
-                flownodes: (11..=12).map(Peer::empty).collect(),
+                partition_to_peer_mapping: (11..=12)
+                    .map(|i| (i as u32, Peer::empty(i + 1)))
+                    .collect(),
+            }),
+            // the same flownode can host multiple flows
+            CacheIdent::CreateFlow(CreateFlow {
+                flow_id: 2003,
+                source_table_ids: vec![1024, 1025],
+                partition_to_peer_mapping: (1..=5)
+                    .map(|i| (i as u32, Peer::empty(i + 1)))
+                    .collect(),
             }),
         ];
         cache.invalidate(&ident).await.unwrap();
         let set = cache.get(1024).await.unwrap().unwrap();
-        assert_eq!(set.len(), 7);
+        assert_eq!(set.len(), 12);
         let set = cache.get(1025).await.unwrap().unwrap();
-        assert_eq!(set.len(), 7);
+        assert_eq!(set.len(), 12);

         let ident = vec![CacheIdent::DropFlow(DropFlow {
+            flow_id: 2001,
             source_table_ids: vec![1024, 1025],
-            flownode_ids: vec![1, 2, 3, 4, 5],
+            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
         })];
         cache.invalidate(&ident).await.unwrap();
         let set = cache.get(1024).await.unwrap().unwrap();
         assert_eq!(
             set.as_ref().clone(),
-            HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
+            HashMap::from_iter(
+                (11..=12)
+                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
+                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
+            )
         );
         let set = cache.get(1025).await.unwrap().unwrap();
         assert_eq!(
             set.as_ref().clone(),
-            HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
+            HashMap::from_iter(
+                (11..=12)
+                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
+                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
+            )
         );
     }
 }
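Why the key change above matters: with the old `HashMap<FlownodeId, Peer>` value, two flows whose partitions landed on the same flownode collapsed into a single entry, so dropping one flow erased the mapping the other still needed — the situation the reworked `test_drop_flow` exercises with flows 2001 and 2003 sharing flownodes. A minimal, self-contained sketch of the difference, using simplified stand-in types rather than the crate's own:

```rust
use std::collections::HashMap;

// Stand-ins for the real types; the id widths match the crate's aliases.
type FlowId = u32;
type FlowPartitionId = u32;
type FlownodeId = u64;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct FlowIdent {
    flow_id: FlowId,
    partition_id: FlowPartitionId,
}

fn main() {
    // Old scheme: keyed by flownode id alone.
    let mut old: HashMap<FlownodeId, &str> = HashMap::new();
    old.insert(1, "peer-1"); // flow 2001, partition 0, hosted on flownode 1
    old.insert(1, "peer-1"); // flow 2003, partition 0, same flownode: one entry
    assert_eq!(old.len(), 1);
    old.remove(&1); // dropping flow 2001 also forgets flow 2003's mapping
    assert!(old.is_empty());

    // New scheme: keyed per (flow id, partition id).
    let mut new: HashMap<FlowIdent, &str> = HashMap::new();
    new.insert(FlowIdent { flow_id: 2001, partition_id: 0 }, "peer-1");
    new.insert(FlowIdent { flow_id: 2003, partition_id: 0 }, "peer-1");
    assert_eq!(new.len(), 2);
    new.remove(&FlowIdent { flow_id: 2001, partition_id: 0 });
    assert_eq!(new.len(), 1); // flow 2003 keeps its entry
}
```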
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index 278a3a6c9e..191bc59259 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -39,7 +39,7 @@
 use crate::cache_invalidator::Context;
 use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
 use crate::ddl::DdlContext;
 use crate::error::{self, Result, UnexpectedSnafu};
-use crate::instruction::{CacheIdent, CreateFlow};
+use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
 use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::flow::flow_route::FlowRouteValue;
 use crate::key::table_name::TableNameKey;
@@ -70,6 +70,7 @@ impl CreateFlowProcedure {
                 query_context,
                 state: CreateFlowState::Prepare,
                 prev_flow_info_value: None,
+                did_replace: false,
                 flow_type: None,
             },
         }
@@ -224,6 +225,7 @@ impl CreateFlowProcedure {
                 .update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
                 .await?;
             info!("Replaced flow metadata for flow {flow_id}");
+            self.data.did_replace = true;
         } else {
             self.context
                 .flow_metadata_manager
@@ -240,22 +242,43 @@ impl CreateFlowProcedure {
         debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
         // Safety: The flow id must be allocated.
         let flow_id = self.data.flow_id.unwrap();
+        let did_replace = self.data.did_replace;
         let ctx = Context {
             subject: Some("Invalidate flow cache by creating flow".to_string()),
         };

+        let mut caches = vec![];
+
+        // If a replace actually happened, also invalidate the cache by dropping the old flow.
+        if did_replace {
+            let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
+
+            // Only the drop-flow invalidation is needed here, since the flow name hasn't
+            // changed and the flow id is invalidated below anyway.
+            caches.extend([CacheIdent::DropFlow(DropFlow {
+                flow_id,
+                source_table_ids: old_flow_info.source_table_ids.clone(),
+                flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
+            })]);
+        }
+
+        let (_flow_info, flow_routes) = (&self.data).into();
+        let flow_part2peers = flow_routes
+            .into_iter()
+            .map(|(part_id, route)| (part_id, route.peer))
+            .collect();
+
+        caches.extend([
+            CacheIdent::CreateFlow(CreateFlow {
+                flow_id,
+                source_table_ids: self.data.source_table_ids.clone(),
+                partition_to_peer_mapping: flow_part2peers,
+            }),
+            CacheIdent::FlowId(flow_id),
+        ]);
+
         self.context
             .cache_invalidator
-            .invalidate(
-                &ctx,
-                &[
-                    CacheIdent::CreateFlow(CreateFlow {
-                        source_table_ids: self.data.source_table_ids.clone(),
-                        flownodes: self.data.peers.clone(),
-                    }),
-                    CacheIdent::FlowId(flow_id),
-                ],
-            )
+            .invalidate(&ctx, &caches)
             .await?;

         Ok(Status::done_with_output(flow_id))
@@ -377,6 +400,10 @@ pub struct CreateFlowData {
     /// For verify if prev value is consistent when need to update flow metadata.
     /// only set when `or_replace` is true.
     pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
+    /// Only set to true when a replace actually happened.
+    /// This is used to determine whether to invalidate the cache.
+    #[serde(default)]
+    pub(crate) did_replace: bool,
     pub(crate) flow_type: Option<FlowType>,
 }
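A note on ordering in the hunk above: the replace path pushes the `DropFlow` ident for the old mapping before the `CreateFlow` ident for the new one. Because cache keys now embed the `flow_id`, replaying these in the opposite order could delete freshly inserted `(flow_id, partition)` entries whenever the old partition set overlaps the new one. A sketch of the assembly logic under stand-in types (not the crate's `CacheIdent`):

```rust
// Stand-in for the crate's `CacheIdent`; only the shape matters here.
#[derive(Debug)]
enum CacheIdent {
    DropFlow { flow_id: u32 },
    CreateFlow { flow_id: u32 },
    FlowId(u32),
}

fn build_invalidations(flow_id: u32, did_replace: bool) -> Vec<CacheIdent> {
    let mut caches = Vec::new();
    // Only emit the drop when a replace actually happened (`did_replace`);
    // a plain CREATE FLOW has no old mapping to clear.
    if did_replace {
        caches.push(CacheIdent::DropFlow { flow_id });
    }
    // Then install the new partition-to-peer mapping and invalidate the id.
    caches.push(CacheIdent::CreateFlow { flow_id });
    caches.push(CacheIdent::FlowId(flow_id));
    caches
}

fn main() {
    assert_eq!(build_invalidations(2001, false).len(), 2); // plain create
    assert_eq!(build_invalidations(2001, true).len(), 3); // CREATE OR REPLACE
}
```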
diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs
index d1c73c06d0..b5058754ec 100644
--- a/src/common/meta/src/ddl/drop_flow.rs
+++ b/src/common/meta/src/ddl/drop_flow.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 mod metadata;
+
 use api::v1::flow::{flow_request, DropRequest, FlowRequest};
 use async_trait::async_trait;
 use common_catalog::format_full_flow_name;
@@ -153,6 +154,12 @@ impl DropFlowProcedure {
         };
         let flow_info_value = self.data.flow_info_value.as_ref().unwrap();

+        let flow_part2nodes = flow_info_value
+            .flownode_ids()
+            .clone()
+            .into_iter()
+            .collect::<Vec<_>>();
+
         self.context
             .cache_invalidator
             .invalidate(
@@ -164,8 +171,9 @@ impl DropFlowProcedure {
                         flow_name: flow_info_value.flow_name.to_string(),
                     }),
                     CacheIdent::DropFlow(DropFlow {
+                        flow_id,
                         source_table_ids: flow_info_value.source_table_ids.clone(),
-                        flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
+                        flow_part2node_id: flow_part2nodes,
                     }),
                 ],
             )
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 5e00437332..077cd2691f 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -24,7 +24,7 @@
 use table::table_name::TableName;

 use crate::flow_name::FlowName;
 use crate::key::schema_name::SchemaName;
-use crate::key::FlowId;
+use crate::key::{FlowId, FlowPartitionId};
 use crate::peer::Peer;
 use crate::{DatanodeId, FlownodeId};

@@ -184,14 +184,19 @@ pub enum CacheIdent {

 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct CreateFlow {
+    /// The unique identifier of the flow.
+    pub flow_id: FlowId,
     pub source_table_ids: Vec<TableId>,
-    pub flownodes: Vec<Peer>,
+    /// Mapping from flow partition to the peer hosting it.
+    pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct DropFlow {
+    pub flow_id: FlowId,
     pub source_table_ids: Vec<TableId>,
-    pub flownode_ids: Vec<FlownodeId>,
+    /// Mapping from flow partition to flownode id.
+    pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
 }

 /// Flushes a batch of regions.
diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index bc66c08a9d..62123e8cf4 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -246,27 +246,32 @@ impl FlowMetadataManager {
         new_flow_info: &FlowInfoValue,
         flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
     ) -> Result<()> {
-        let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
+        let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
             self.flow_name_manager.build_update_txn(
                 &new_flow_info.catalog_name,
                 &new_flow_info.flow_name,
                 flow_id,
             )?;

-        let (create_flow_txn, on_create_flow_failure) =
+        let (update_flow_txn, on_create_flow_failure) =
             self.flow_info_manager
                 .build_update_txn(flow_id, current_flow_info, new_flow_info)?;

-        let create_flow_routes_txn = self
-            .flow_route_manager
-            .build_create_txn(flow_id, flow_routes.clone())?;
-
-        let create_flownode_flow_txn = self
-            .flownode_flow_manager
-            .build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
-
-        let create_table_flow_txn = self.table_flow_manager.build_create_txn(
+        let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
             flow_id,
+            current_flow_info,
+            flow_routes.clone(),
+        )?;
+
+        let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
+            flow_id,
+            current_flow_info,
+            new_flow_info.flownode_ids().clone(),
+        );
+
+        let update_table_flow_txn = self.table_flow_manager.build_update_txn(
+            flow_id,
+            current_flow_info,
             flow_routes
                 .into_iter()
                 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
                 .collect(),
             new_flow_info.source_table_ids(),
         )?;

         let txn = Txn::merge_all(vec![
-            create_flow_flow_name_txn,
-            create_flow_txn,
-            create_flow_routes_txn,
-            create_flownode_flow_txn,
-            create_table_flow_txn,
+            update_flow_flow_name_txn,
+            update_flow_txn,
+            update_flow_routes_txn,
+            update_flownode_flow_txn,
+            update_table_flow_txn,
         ]);
         info!(
             "Creating flow {}.{}({}), with {} txn operations",
@@ -783,6 +788,141 @@ mod tests {
         }
     }

+    #[tokio::test]
+    async fn test_update_flow_metadata_diff_flownode() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
+        let flow_id = 10;
+        let flow_value = test_flow_info_value(
+            "flow",
+            [(0u32, 1u64), (1u32, 2u64)].into(),
+            vec![1024, 1025, 1026],
+        );
+        let flow_routes = vec![
+            (
+                0u32,
+                FlowRouteValue {
+                    peer: Peer::empty(1),
+                },
+            ),
+            (
+                1,
+                FlowRouteValue {
+                    peer: Peer::empty(2),
+                },
+            ),
+        ];
+        flow_metadata_manager
+            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
+            .await
+            .unwrap();
+
+        let new_flow_value = {
+            let mut tmp = flow_value.clone();
+            tmp.raw_sql = "new".to_string();
+            // move the flow to different flownodes
+            tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
+            tmp
+        };
+        let new_flow_routes = vec![
+            (
+                0u32,
+                FlowRouteValue {
+                    peer: Peer::empty(3),
+                },
+            ),
+            (
+                1,
+                FlowRouteValue {
+                    peer: Peer::empty(4),
+                },
+            ),
+        ];
+
+        // Update the flow instead of creating it.
+        flow_metadata_manager
+            .update_flow_metadata(
+                flow_id,
+                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
+                &new_flow_value,
+                new_flow_routes.clone(),
+            )
+            .await
+            .unwrap();
+
+        let got = flow_metadata_manager
+            .flow_info_manager()
+            .get(flow_id)
+            .await
+            .unwrap()
+            .unwrap();
+        let routes = flow_metadata_manager
+            .flow_route_manager()
+            .routes(flow_id)
+            .await
+            .unwrap();
+        assert_eq!(
+            routes,
+            vec![
+                (
+                    FlowRouteKey::new(flow_id, 0),
+                    FlowRouteValue {
+                        peer: Peer::empty(3),
+                    },
+                ),
+                (
+                    FlowRouteKey::new(flow_id, 1),
+                    FlowRouteValue {
+                        peer: Peer::empty(4),
+                    },
+                ),
+            ]
+        );
+        assert_eq!(got, new_flow_value);
+
+        let flows = flow_metadata_manager
+            .flownode_flow_manager()
+            .flows(1)
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        // the flow should have moved to a different flownode
+        assert_eq!(flows, vec![]);
+
+        let flows = flow_metadata_manager
+            .flownode_flow_manager()
+            .flows(3)
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        assert_eq!(flows, vec![(flow_id, 0)]);
+
+        for table_id in [1024, 1025, 1026] {
+            let nodes = flow_metadata_manager
+                .table_flow_manager()
+                .flows(table_id)
+                .await
+                .unwrap();
+            assert_eq!(
+                nodes,
+                vec![
+                    (
+                        TableFlowKey::new(table_id, 3, flow_id, 0),
+                        TableFlowValue {
+                            peer: Peer::empty(3)
+                        }
+                    ),
+                    (
+                        TableFlowKey::new(table_id, 4, flow_id, 1),
+                        TableFlowValue {
+                            peer: Peer::empty(4)
+                        }
+                    )
+                ]
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_update_flow_metadata_flow_replace_diff_id_err() {
         let mem_kv = Arc::new(MemoryKvBackend::default());
diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs
index 1ed3f1e6f4..280b2de6af 100644
--- a/src/common/meta/src/key/flow/flow_info.rs
+++ b/src/common/meta/src/key/flow/flow_info.rs
@@ -272,10 +272,11 @@ impl FlowInfoManager {
         let raw_value = new_flow_value.try_as_raw_value()?;
         let prev_value = current_flow_value.get_raw_bytes();
         let txn = Txn::new()
-            .when(vec![
-                Compare::new(key.clone(), CompareOp::NotEqual, None),
-                Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
-            ])
+            .when(vec![Compare::new(
+                key.clone(),
+                CompareOp::Equal,
+                Some(prev_value),
+            )])
             .and_then(vec![TxnOp::Put(key.clone(), raw_value)])
             .or_else(vec![TxnOp::Get(key.clone())]);
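The `when` guard change above reads as a simplification rather than a behavior change: the equality compare against the previous raw bytes can only pass when the key exists, so the separate `CompareOp::NotEqual, None` existence check was redundant. A minimal in-memory model of the resulting compare-and-swap update (hypothetical helper; the real code goes through the kv backend's `Txn` API):

```rust
use std::collections::HashMap;

/// Put `new` under `key` only if the stored bytes still equal `prev`;
/// on failure, return what is actually stored, mirroring the txn's
/// `or_else(TxnOp::Get)` readback.
fn cas_put(
    kv: &mut HashMap<Vec<u8>, Vec<u8>>,
    key: &[u8],
    prev: &[u8],
    new: Vec<u8>,
) -> Result<(), Option<Vec<u8>>> {
    let current = kv.get(key).cloned();
    match current {
        // The single equality check also implies the key exists.
        Some(ref bytes) if bytes.as_slice() == prev => {
            kv.insert(key.to_vec(), new);
            Ok(())
        }
        other => Err(other),
    }
}

fn main() {
    let mut kv = HashMap::new();
    kv.insert(b"flow/info/10".to_vec(), b"v1".to_vec());
    assert!(cas_put(&mut kv, b"flow/info/10", b"v1", b"v2".to_vec()).is_ok());
    // A writer holding a stale `prev` is rejected and sees the current value.
    assert_eq!(
        cas_put(&mut kv, b"flow/info/10", b"v1", b"v3".to_vec()),
        Err(Some(b"v2".to_vec()))
    );
}
```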
diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs
index 1459675686..c1e43957f0 100644
--- a/src/common/meta/src/key/flow/flow_route.rs
+++ b/src/common/meta/src/key/flow/flow_route.rs
@@ -19,9 +19,12 @@
 use serde::{Deserialize, Serialize};
 use snafu::OptionExt;

 use crate::error::{self, Result};
+use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::flow::{flownode_addr_helper, FlowScoped};
 use crate::key::node_address::NodeAddressKey;
-use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
+use crate::key::{
+    BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
+};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
 use crate::peer::Peer;
@@ -204,6 +207,33 @@ impl FlowRouteManager {
         Ok(Txn::new().and_then(txns))
     }

+    /// Builds an update flow routes transaction.
+    ///
+    /// Puts `__flow/route/{flow_id}/{partition_id}` keys.
+    /// Also removes the old `__flow/route/{flow_id}/{old_partition_id}` keys.
+    pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
+        &self,
+        flow_id: FlowId,
+        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
+        flow_routes: I,
+    ) -> Result<Txn> {
+        let del_txns = current_flow_info
+            .flownode_ids()
+            .iter()
+            .map(|(partition_id, _)| {
+                let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
+                Ok(TxnOp::Delete(key))
+            });
+
+        let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
+            let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
+
+            Ok(TxnOp::Put(key, route.try_as_raw_value()?))
+        });
+        let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
+
+        Ok(Txn::new().and_then(txns))
+    }
+
     async fn remap_flow_route_addresses(
         &self,
         flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs
index 6d987c7f4a..360adeb349 100644
--- a/src/common/meta/src/key/flow/flownode_flow.rs
+++ b/src/common/meta/src/key/flow/flownode_flow.rs
@@ -19,8 +19,9 @@
 use regex::Regex;
 use snafu::OptionExt;

 use crate::error::{self, Result};
+use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::flow::FlowScoped;
-use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
+use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
 use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -202,6 +203,33 @@ impl FlownodeFlowManager {

         Txn::new().and_then(txns)
     }
+
+    /// Builds an update flownode flow transaction.
+    ///
+    /// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
+    /// Also removes the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
+    pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
+        &self,
+        flow_id: FlowId,
+        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
+        flownode_ids: I,
+    ) -> Txn {
+        let del_txns =
+            current_flow_info
+                .flownode_ids()
+                .iter()
+                .map(|(partition_id, flownode_id)| {
+                    let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
+                    TxnOp::Delete(key)
+                });
+        let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
+            let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
+            TxnOp::Put(key, vec![])
+        });
+        let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
+
+        Txn::new().and_then(txns)
+    }
 }

 #[cfg(test)]
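All three new `build_update_txn` helpers (flow routes and flownode flows above, table flows below) share one shape: derive the old keys from `current_flow_info`, emit deletes for them, then emit puts for the new mapping, and chain everything into a single transaction. A key present in both the old and the new set is deleted and re-put within the same txn, i.e. effectively overwritten. A stand-in sketch of the pattern (simplified `TxnOp`, not the crate's):

```rust
// Simplified stand-in for the crate's `TxnOp`.
#[derive(Debug)]
enum TxnOp {
    Delete(String),
    Put(String, Vec<u8>),
}

fn build_update_ops(old_keys: &[String], new_kvs: &[(String, Vec<u8>)]) -> Vec<TxnOp> {
    // Deletes for every key derived from the *current* flow info come first...
    let dels = old_keys.iter().cloned().map(TxnOp::Delete);
    // ...then puts for the new mapping; a key in both sets is deleted and
    // re-put within the same transaction, i.e. effectively overwritten.
    let puts = new_kvs.iter().cloned().map(|(k, v)| TxnOp::Put(k, v));
    dels.chain(puts).collect()
}

fn main() {
    let ops = build_update_ops(
        &["__flownode_flow/1/10/0".to_string()],
        &[("__flownode_flow/3/10/0".to_string(), vec![])],
    );
    assert_eq!(ops.len(), 2);
    assert!(matches!(ops[0], TxnOp::Delete(_)));
    assert!(matches!(ops[1], TxnOp::Put(_, _)));
}
```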
diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs
index fff9a746c8..63f97e6602 100644
--- a/src/common/meta/src/key/flow/table_flow.rs
+++ b/src/common/meta/src/key/flow/table_flow.rs
@@ -22,9 +22,12 @@
 use snafu::OptionExt;
 use table::metadata::TableId;

 use crate::error::{self, Result};
+use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::flow::{flownode_addr_helper, FlowScoped};
 use crate::key::node_address::NodeAddressKey;
-use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
+use crate::key::{
+    BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
+};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
 use crate::peer::Peer;
@@ -215,7 +218,7 @@ impl TableFlowManager {

     /// Builds a create table flow transaction.
     ///
-    /// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
+    /// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
     pub fn build_create_txn(
         &self,
         flow_id: FlowId,
@@ -239,6 +242,44 @@ impl TableFlowManager {
         Ok(Txn::new().and_then(txns))
     }

+    /// Builds an update table flow transaction.
+    ///
+    /// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
+    /// and also removes the previous
+    /// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
+    pub fn build_update_txn(
+        &self,
+        flow_id: FlowId,
+        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
+        table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
+        source_table_ids: &[TableId],
+    ) -> Result<Txn> {
+        let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
+
+        // First, remove the old keys.
+        for (part_id, node_id) in current_flow_info.flownode_ids() {
+            for source_table_id in current_flow_info.source_table_ids() {
+                txns.push(TxnOp::Delete(
+                    TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
+                ));
+            }
+        }
+
+        for (partition_id, table_flow_value) in table_flow_values {
+            let flownode_id = table_flow_value.peer.id;
+            let value = table_flow_value.try_as_raw_value()?;
+            for source_table_id in source_table_ids {
+                txns.push(TxnOp::Put(
+                    TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
+                        .to_bytes(),
+                    value.clone(),
+                ));
+            }
+        }
+
+        Ok(Txn::new().and_then(txns))
+    }
+
     async fn remap_table_flow_addresses(
         &self,
         table_flows: &mut [(TableFlowKey, TableFlowValue)],
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 2f7c30ccd0..637da8c731 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -946,6 +946,7 @@ impl FlowMirrorTask {
                 // already know this is not source table
                 Some(None) => continue,
                 _ => {
+                    // dedup peers
                     let peers = cache
                         .get(table_id)
                         .await
@@ -953,6 +954,8 @@ impl FlowMirrorTask {
                         .unwrap_or_default()
                         .values()
                         .cloned()
+                        .collect::<HashSet<_>>()
+                        .into_iter()
                         .collect::<Vec<_>>();

                     if !peers.is_empty() {
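The dedup added above is a direct consequence of the cache rework: values are now stored per `(flow_id, partition_id)`, so `.values()` can yield the same flownode peer once per partition, and without deduplication the mirror task would send the same inserts to a peer several times. A small sketch of the effect, with peer ids as plain `u64`s for brevity:

```rust
use std::collections::HashSet;

fn main() {
    // Stand-in for `cache.values().cloned()`: one peer id per flow partition.
    let values = vec![1u64, 1, 2, 2, 2, 3];
    let peers = values
        .into_iter()
        .collect::<HashSet<_>>() // dedup, as in the added `collect::<HashSet<_>>()`
        .into_iter()
        .collect::<Vec<_>>();
    assert_eq!(peers.len(), 3); // each flownode receives the mirrored rows once
}
```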