From 4ec247f34da9b0b448ce3f3cd87063ff1256de79 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 4 Jul 2024 17:37:23 +0800 Subject: [PATCH] feat: store peer info in `TableFlowValue` (#4280) * feat: store peer info in `TableFlowValue` * chore: apply suggestions from CR --- .../meta/src/cache/flow/table_flownode.rs | 71 +++++++++++++------ src/common/meta/src/ddl/create_flow.rs | 2 +- src/common/meta/src/instruction.rs | 3 +- src/common/meta/src/key.rs | 4 +- src/common/meta/src/key/flow.rs | 34 +++++++-- src/common/meta/src/key/flow/flow_route.rs | 2 +- src/common/meta/src/key/flow/table_flow.rs | 52 +++++++++----- src/operator/src/insert.rs | 4 +- 8 files changed, 120 insertions(+), 52 deletions(-) diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index b952d056ab..684478c151 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::Arc; use futures::future::BoxFuture; @@ -26,9 +26,10 @@ use crate::error::Result; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; use crate::key::flow::{TableFlowManager, TableFlowManagerRef}; use crate::kv_backend::KvBackendRef; +use crate::peer::Peer; use crate::FlownodeId; -type FlownodeSet = HashSet; +type FlownodeSet = Arc>; pub type TableFlownodeSetCacheRef = Arc; @@ -53,13 +54,14 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer>() + .map_ok(|(key, value)| (key.flownode_id(), value.peer)) + .try_collect::>() .await // We must cache the `HashSet` even if it's empty, // to avoid future requests to the remote storage next time; // If the value is added to the remote storage, // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`. 
+ .map(Arc::new) .map(Some) }) }) @@ -69,21 +71,23 @@ async fn handle_create_flow( cache: &Cache, CreateFlow { source_table_ids, - flownode_ids, + flownodes: flownode_peers, }: &CreateFlow, ) { for table_id in source_table_ids { let entry = cache.entry(*table_id); entry .and_compute_with( - async |entry: Option>>| match entry { + async |entry: Option>>>| match entry { Some(entry) => { - let mut set = entry.into_value(); - set.extend(flownode_ids.iter().cloned()); + let mut map = entry.into_value().as_ref().clone(); + map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone()))); - Op::Put(set) + Op::Put(Arc::new(map)) } - None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())), + None => Op::Put(Arc::new(HashMap::from_iter( + flownode_peers.iter().map(|peer| (peer.id, peer.clone())), + ))), }, ) .await; @@ -101,14 +105,14 @@ async fn handle_drop_flow( let entry = cache.entry(*table_id); entry .and_compute_with( - async |entry: Option>>| match entry { + async |entry: Option>>>| match entry { Some(entry) => { - let mut set = entry.into_value(); + let mut set = entry.into_value().as_ref().clone(); for flownode_id in flownode_ids { set.remove(flownode_id); } - Op::Put(set) + Op::Put(Arc::new(set)) } None => { // Do nothing @@ -140,7 +144,7 @@ fn filter(ident: &CacheIdent) -> bool { #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashSet}; + use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -150,8 +154,10 @@ mod tests { use crate::cache::flow::table_flownode::new_table_flownode_set_cache; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; use crate::key::flow::flow_info::FlowInfoValue; + use crate::key::flow::flow_route::FlowRouteValue; use crate::key::flow::FlowMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; #[tokio::test] async fn test_cache_empty_set() { @@ -184,16 +190,31 @@ mod tests { comment: "comment".to_string(), options: Default::default(), }, - vec![], + (1..=3) + .map(|i| { + ( + (i - 1) as u32, + FlowRouteValue { + peer: Peer::empty(i), + }, + ) + }) + .collect::>(), ) .await .unwrap(); let cache = CacheBuilder::new(128).build(); let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let set = cache.get(1024).await.unwrap().unwrap(); - assert_eq!(set, HashSet::from([1, 2, 3])); + assert_eq!( + set.as_ref().clone(), + HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) })) + ); let set = cache.get(1025).await.unwrap().unwrap(); - assert_eq!(set, HashSet::from([1, 2, 3])); + assert_eq!( + set.as_ref().clone(), + HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) })) + ); let result = cache.get(1026).await.unwrap().unwrap(); assert_eq!(result.len(), 0); } @@ -205,7 +226,7 @@ mod tests { let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let ident = vec![CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], - flownode_ids: vec![1, 2, 3, 4, 5], + flownodes: (1..=5).map(Peer::empty).collect(), })]; cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); @@ -222,11 +243,11 @@ mod tests { let ident = vec![ CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], - flownode_ids: vec![1, 2, 3, 4, 5], + flownodes: (1..=5).map(Peer::empty).collect(), }), CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], - flownode_ids: vec![11, 12], + flownodes: 
(11..=12).map(Peer::empty).collect(), }), ]; cache.invalidate(&ident).await.unwrap(); @@ -241,8 +262,14 @@ mod tests { })]; cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); - assert_eq!(set, HashSet::from([11, 12])); + assert_eq!( + set.as_ref().clone(), + HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) })) + ); let set = cache.get(1025).await.unwrap().unwrap(); - assert_eq!(set, HashSet::from([11, 12])); + assert_eq!( + set.as_ref().clone(), + HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) })) + ); } } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 2217ccf8d5..d60ee1dfd3 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -194,7 +194,7 @@ impl CreateFlowProcedure { &ctx, &[CacheIdent::CreateFlow(CreateFlow { source_table_ids: self.data.source_table_ids.clone(), - flownode_ids: self.data.peers.iter().map(|peer| peer.id).collect(), + flownodes: self.data.peers.clone(), })], ) .await?; diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index aee1844b0f..fa7126ed3f 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -25,6 +25,7 @@ use table::table_name::TableName; use crate::flow_name::FlowName; use crate::key::schema_name::SchemaName; use crate::key::FlowId; +use crate::peer::Peer; use crate::{ClusterId, DatanodeId, FlownodeId}; #[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -169,7 +170,7 @@ pub enum CacheIdent { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct CreateFlow { pub source_table_ids: Vec, - pub flownode_ids: Vec, + pub flownodes: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index d6ea96808d..ad06f650d5 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -112,6 +112,7 @@ use common_catalog::consts::{ use common_telemetry::warn; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use flow::flow_route::FlowRouteValue; +use flow::table_flow::TableFlowValue; use lazy_static::lazy_static; use regex::Regex; use serde::de::DeserializeOwned; @@ -1193,7 +1194,8 @@ impl_table_meta_value! { DatanodeTableValue, FlowInfoValue, FlowNameValue, - FlowRouteValue + FlowRouteValue, + TableFlowValue } impl_optional_meta_value! 
{ diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 1bc6894664..89f68abedf 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use common_telemetry::info; use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue}; use snafu::{ensure, OptionExt}; +use table_flow::TableFlowValue; use self::flow_info::{FlowInfoKey, FlowInfoValue}; use self::flow_name::FlowNameKey; @@ -159,7 +160,7 @@ impl FlowMetadataManager { let create_flow_routes_txn = self .flow_route_manager - .build_create_txn(flow_id, flow_routes)?; + .build_create_txn(flow_id, flow_routes.clone())?; let create_flownode_flow_txn = self .flownode_flow_manager @@ -167,9 +168,12 @@ impl FlowMetadataManager { let create_table_flow_txn = self.table_flow_manager.build_create_txn( flow_id, - flow_info.flownode_ids().clone(), + flow_routes + .into_iter() + .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer })) + .collect(), flow_info.source_table_ids(), - ); + )?; let txn = Txn::merge_all(vec![ create_flow_flow_name_txn, @@ -354,7 +358,11 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); let flow_id = 10; - let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]); + let flow_value = test_flow_info_value( + "flow", + [(0, 1u64), (1, 2u64)].into(), + vec![1024, 1025, 1026], + ); let flow_routes = vec![ ( 1u32, @@ -422,7 +430,23 @@ mod tests { .try_collect::>() .await .unwrap(); - assert_eq!(nodes, vec![TableFlowKey::new(table_id, 1, flow_id, 0)]); + assert_eq!( + nodes, + vec![ + ( + TableFlowKey::new(table_id, 1, flow_id, 1), + TableFlowValue { + peer: Peer::empty(1) + } + ), + ( + TableFlowKey::new(table_id, 2, flow_id, 2), + TableFlowValue { + peer: Peer::empty(2) + } + ) + ] + ); } } diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs index e7d179ab37..7de452182b 100644 --- a/src/common/meta/src/key/flow/flow_route.rs +++ b/src/common/meta/src/key/flow/flow_route.rs @@ -187,7 +187,7 @@ impl FlowRouteManager { /// Builds a create flow routes transaction. /// - /// Puts `__flow/route/{flownode_id}/{partitions}` keys. + /// Puts `__flow/route/{flow_id}/{partition_id}` keys. 
pub(crate) fn build_create_txn>( &self, flow_id: FlowId, diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index 63dff27bed..bc06462625 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -17,14 +17,16 @@ use std::sync::Arc; use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; +use serde::{Deserialize, Serialize}; use snafu::OptionExt; use table::metadata::TableId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; -use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey}; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey, TableMetaValue}; use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; +use crate::peer::Peer; use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; use crate::rpc::store::RangeRequest; use crate::rpc::KeyValue; @@ -166,9 +168,16 @@ impl<'a> MetaKey<'a, TableFlowKeyInner> for TableFlowKeyInner { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TableFlowValue { + pub(crate) peer: Peer, +} + /// Decodes `KeyValue` to [TableFlowKey]. -pub fn table_flow_decoder(kv: KeyValue) -> Result { - TableFlowKey::from_bytes(&kv.key) +pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> { + let key = TableFlowKey::from_bytes(&kv.key)?; + let value = TableFlowValue::try_from_raw_value(&kv.value)?; + Ok((key, value)) } pub type TableFlowManagerRef = Arc; @@ -187,7 +196,10 @@ impl TableFlowManager { /// Retrieves all [TableFlowKey]s of the specified `table_id`. /// /// TODO(discord9): add cache for it since range request does not support cache. - pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result> { + pub fn flows( + &self, + table_id: TableId, + ) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> { let start_key = TableFlowKey::range_start_key(table_id); let req = RangeRequest::new().with_prefix(start_key); let stream = PaginationStream::new( @@ -203,25 +215,27 @@ impl TableFlowManager { /// Builds a create table flow transaction. /// /// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys. - pub fn build_create_txn>( + pub fn build_create_txn( &self, flow_id: FlowId, - flownode_ids: I, + table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>, source_table_ids: &[TableId], - ) -> Txn { - let txns = flownode_ids - .into_iter() - .flat_map(|(partition_id, flownode_id)| { - source_table_ids.iter().map(move |table_id| { - TxnOp::Put( - TableFlowKey::new(*table_id, flownode_id, flow_id, partition_id).to_bytes(), - vec![], - ) - }) - }) - .collect::>(); + ) -> Result { + let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len()); - Txn::new().and_then(txns) + for (partition_id, table_flow_value) in table_flow_values { + let flownode_id = table_flow_value.peer.id; + let value = table_flow_value.try_as_raw_value()?; + for source_table_id in source_table_ids { + txns.push(TxnOp::Put( + TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id) + .to_bytes(), + value.clone(), + )); + } + } + + Ok(Txn::new().and_then(txns)) } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 20cf67df35..9f15519c44 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -341,8 +341,8 @@ impl Inserter { .await .context(RequestInsertsSnafu)? 
         .unwrap_or_default()
-        .into_iter()
-        .map(|id| Peer::new(id, ""))
+        .values()
+        .cloned()
         .collect::<Vec<_>>();
     if !peers.is_empty() {
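
Taken together, the patch means the table-to-flownode cache now stores full `Peer` records (flownode id plus address) keyed by flownode id, instead of bare flownode ids, so callers such as the inserter no longer fabricate peers with empty addresses. Below is a minimal, self-contained Rust sketch of that new value shape and of how a caller consumes it; the type definitions are illustrative stand-ins, not the actual GreptimeDB types.

use std::collections::HashMap;
use std::sync::Arc;

// Illustrative stand-ins for the real types; not the GreptimeDB definitions.
type FlownodeId = u64;

#[derive(Debug, Clone)]
struct Peer {
    id: FlownodeId,
    addr: String,
}

// After this patch the cached value per table is a map of flownode id to
// `Peer`, shared via `Arc`, rather than a `HashSet<FlownodeId>`.
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;

// Mirrors the `insert.rs` hunk: take the cached map for a table (or an empty
// one if the table feeds no flows) and clone out the peer records, address
// included.
fn peers_for_mirroring(cached: Option<FlownodeSet>) -> Vec<Peer> {
    cached.unwrap_or_default().values().cloned().collect()
}

fn main() {
    let cached: FlownodeSet = Arc::new(HashMap::from([
        (1, Peer { id: 1, addr: "10.0.0.1:6800".to_string() }),
        (2, Peer { id: 2, addr: "10.0.0.2:6800".to_string() }),
    ]));

    let peers = peers_for_mirroring(Some(cached));
    assert_eq!(peers.len(), 2);
    assert!(peers.iter().any(|p| p.id == 1 && !p.addr.is_empty()));
    // With no cached entry there are no peers, so no mirror insert is sent.
    assert!(peers_for_mirroring(None).is_empty());
}

Wrapping the map in `Arc` keeps cache reads cheap to clone; as the diff shows, `handle_create_flow` and `handle_drop_flow` copy the inner map (`entry.into_value().as_ref().clone()`) only when they actually need to mutate it.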