feat: store peer info in TableFlowValue (#4280)

* feat: store peer info in `TableFlowValue`

* chore: apply suggestions from CR
Weny Xu
2024-07-04 17:37:23 +08:00
committed by GitHub
parent 22f4d43b10
commit 4ec247f34d
8 changed files with 120 additions and 52 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::collections::HashSet;
+use std::collections::HashMap;
use std::sync::Arc;
use futures::future::BoxFuture;
@@ -26,9 +26,10 @@ use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::kv_backend::KvBackendRef;
+use crate::peer::Peer;
use crate::FlownodeId;
-type FlownodeSet = HashSet<FlownodeId>;
+type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
@@ -53,13 +54,14 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
Box::pin(async move {
table_flow_manager
.flows(table_id)
-.map_ok(|key| key.flownode_id())
-.try_collect::<HashSet<_>>()
+.map_ok(|(key, value)| (key.flownode_id(), value.peer))
+.try_collect::<HashMap<_, _>>()
.await
// We must cache the `HashSet` even if it's empty,
// to avoid future requests to the remote storage next time;
// If the value is added to the remote storage,
// we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
+.map(Arc::new)
.map(Some)
})
})
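The initializer change above is the core of the commit: the stream of decoded `(TableFlowKey, TableFlowValue)` pairs is projected to `(flownode_id, peer)` and collected into a `HashMap` behind an `Arc`, instead of a bare `HashSet` of ids. A minimal runnable sketch of that collect-and-share step, assuming the `futures` and `tokio` crates and using `String` as a stand-in for `Peer`:

```rust
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;

use futures::stream::{self, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Infallible> {
    // A fallible stream standing in for `table_flow_manager.flows(table_id)`.
    let flows = stream::iter(vec![
        Ok::<_, Infallible>((1u64, "peer-1".to_string())),
        Ok((2, "peer-2".to_string())),
    ]);
    // Gather the (flownode_id, peer) pairs into a map, then wrap it in an
    // Arc so every cache reader shares a single immutable snapshot.
    let set: Arc<HashMap<u64, String>> = flows
        .try_collect::<HashMap<_, _>>()
        .await
        .map(Arc::new)?;
    assert_eq!(set.len(), 2);
    Ok(())
}
```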
@@ -69,21 +71,23 @@ async fn handle_create_flow(
cache: &Cache<TableId, FlownodeSet>,
CreateFlow {
source_table_ids,
-flownode_ids,
+flownodes: flownode_peers,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
-async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
+async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
-let mut set = entry.into_value();
-set.extend(flownode_ids.iter().cloned());
+let mut map = entry.into_value().as_ref().clone();
+map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
-Op::Put(set)
+Op::Put(Arc::new(map))
}
-None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())),
+None => Op::Put(Arc::new(HashMap::from_iter(
+flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
+))),
},
)
.await;
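Because the cached value is now an immutable `Arc<HashMap<_, _>>`, the create path can no longer extend it in place: it clones the inner map, merges the new peers, and publishes a fresh `Arc`. A sketch of that clone-on-write idiom, with `String` standing in for `Peer`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Merges new peers into a shared snapshot without disturbing readers:
/// anyone still holding the old `Arc` keeps a consistent view.
fn upsert_peers(
    current: Option<Arc<HashMap<u64, String>>>,
    new_peers: &[(u64, String)],
) -> Arc<HashMap<u64, String>> {
    let mut map = current.map(|m| m.as_ref().clone()).unwrap_or_default();
    map.extend(new_peers.iter().cloned());
    Arc::new(map)
}

fn main() {
    let v1 = upsert_peers(None, &[(1, "peer-1".into())]);
    let v2 = upsert_peers(Some(v1.clone()), &[(2, "peer-2".into())]);
    assert_eq!(v1.len(), 1); // old snapshot unchanged
    assert_eq!(v2.len(), 2);
}
```

The per-update clone is the usual price of `Arc`-shared snapshots; it stays cheap as long as the per-table flownode set is small.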
@@ -101,14 +105,14 @@ async fn handle_drop_flow(
let entry = cache.entry(*table_id);
entry
.and_compute_with(
-async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
+async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
-let mut set = entry.into_value();
+let mut set = entry.into_value().as_ref().clone();
for flownode_id in flownode_ids {
set.remove(flownode_id);
}
-Op::Put(set)
+Op::Put(Arc::new(set))
}
None => {
// Do nothing
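The drop path mirrors the same idiom: clone the shared map, remove the dropped flownodes, republish. The `None` arm needs no work, since an absent entry already reflects the drop. A companion sketch:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Removes dropped flownodes from a shared snapshot
/// (`String` stands in for `Peer`).
fn remove_peers(
    current: Arc<HashMap<u64, String>>,
    dropped: &[u64],
) -> Arc<HashMap<u64, String>> {
    let mut map = current.as_ref().clone();
    for id in dropped {
        map.remove(id);
    }
    Arc::new(map)
}

fn main() {
    let v1 = Arc::new(HashMap::from([
        (1u64, "peer-1".to_string()),
        (11, "peer-11".to_string()),
    ]));
    let v2 = remove_peers(v1.clone(), &[1]);
    assert_eq!(v2.len(), 1);
    assert_eq!(v1.len(), 2); // old snapshot unchanged
}
```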
@@ -140,7 +144,7 @@ fn filter(ident: &CacheIdent) -> bool {
#[cfg(test)]
mod tests {
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -150,8 +154,10 @@ mod tests {
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
+use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
+use crate::peer::Peer;
#[tokio::test]
async fn test_cache_empty_set() {
@@ -184,16 +190,31 @@ mod tests {
comment: "comment".to_string(),
options: Default::default(),
},
-vec![],
+(1..=3)
+.map(|i| {
+(
+(i - 1) as u32,
+FlowRouteValue {
+peer: Peer::empty(i),
+},
+)
+})
+.collect::<Vec<_>>(),
)
.await
.unwrap();
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let set = cache.get(1024).await.unwrap().unwrap();
-assert_eq!(set, HashSet::from([1, 2, 3]));
+assert_eq!(
+set.as_ref().clone(),
+HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
+);
let set = cache.get(1025).await.unwrap().unwrap();
-assert_eq!(set, HashSet::from([1, 2, 3]));
+assert_eq!(
+set.as_ref().clone(),
+HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
+);
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
}
@@ -205,7 +226,7 @@ mod tests {
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
-flownode_ids: vec![1, 2, 3, 4, 5],
+flownodes: (1..=5).map(Peer::empty).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
@@ -222,11 +243,11 @@ mod tests {
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
-flownode_ids: vec![1, 2, 3, 4, 5],
+flownodes: (1..=5).map(Peer::empty).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
-flownode_ids: vec![11, 12],
+flownodes: (11..=12).map(Peer::empty).collect(),
}),
];
cache.invalidate(&ident).await.unwrap();
@@ -241,8 +262,14 @@ mod tests {
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
-assert_eq!(set, HashSet::from([11, 12]));
+assert_eq!(
+set.as_ref().clone(),
+HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
+);
let set = cache.get(1025).await.unwrap().unwrap();
-assert_eq!(set, HashSet::from([11, 12]));
+assert_eq!(
+set.as_ref().clone(),
+HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
+);
}
}
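One behavior these tests pin down (see `test_cache_empty_set` and the `1026` lookup above) is negative caching, spelled out in the initializer comment: an empty map is still cached, so a table with no flows does not re-issue a remote range request on every lookup. A sketch of the distinction:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// What the initializer hands to the cache after a remote scan
/// (`String` stands in for `Peer`). Returning `Some(empty)` rather than
/// `None` records "storage has no flows for this table", so subsequent
/// lookups are cache hits until invalidation replaces the entry.
fn init_value(remote: HashMap<u64, String>) -> Option<Arc<HashMap<u64, String>>> {
    Some(Arc::new(remote))
}

fn main() {
    let cached = init_value(HashMap::new()).unwrap();
    assert_eq!(cached.len(), 0); // cached, but empty: a remembered miss
}
```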

View File

@@ -194,7 +194,7 @@ impl CreateFlowProcedure {
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
-flownode_ids: self.data.peers.iter().map(|peer| peer.id).collect(),
+flownodes: self.data.peers.clone(),
})],
)
.await?;

View File

@@ -25,6 +25,7 @@ use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
+use crate::peer::Peer;
use crate::{ClusterId, DatanodeId, FlownodeId};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -169,7 +170,7 @@ pub enum CacheIdent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
pub source_table_ids: Vec<TableId>,
-pub flownode_ids: Vec<FlownodeId>,
+pub flownodes: Vec<Peer>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
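Since `CacheIdent` variants are serialized for broadcast, widening `CreateFlow` from `Vec<FlownodeId>` to `Vec<Peer>` changes the wire shape: subscribers now receive addresses along with ids and can rebuild the id-to-peer map without a second metadata lookup. A hedged round-trip sketch with hypothetical stand-in structs (the real `Peer` lives in `crate::peer` and may carry more fields) and `serde_json` as an illustrative codec:

```rust
use serde::{Deserialize, Serialize};

// Stand-ins mirroring the shapes in this diff; field sets are illustrative.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct Peer {
    id: u64,
    addr: String,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct CreateFlow {
    source_table_ids: Vec<u32>,
    flownodes: Vec<Peer>,
}

fn main() {
    let ident = CreateFlow {
        source_table_ids: vec![1024, 1025],
        flownodes: vec![Peer { id: 1, addr: "10.0.0.1:3002".into() }],
    };
    let bytes = serde_json::to_vec(&ident).unwrap();
    let decoded: CreateFlow = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(ident, decoded);
}
```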

View File

@@ -112,6 +112,7 @@ use common_catalog::consts::{
use common_telemetry::warn;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use flow::flow_route::FlowRouteValue;
+use flow::table_flow::TableFlowValue;
use lazy_static::lazy_static;
use regex::Regex;
use serde::de::DeserializeOwned;
@@ -1193,7 +1194,8 @@ impl_table_meta_value! {
DatanodeTableValue,
FlowInfoValue,
FlowNameValue,
-FlowRouteValue
+FlowRouteValue,
+TableFlowValue
}
impl_optional_meta_value! {
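Adding `TableFlowValue` to `impl_table_meta_value!` is what gives it the `try_as_raw_value`/`try_from_raw_value` methods used in `table_flow.rs` below. The macro body is not shown in this diff; the following is only a plausible sketch of such a serde-JSON codec, with a hypothetical `MetaValueCodec` trait mirroring `TableMetaValue`:

```rust
use serde::{de::DeserializeOwned, Serialize};

/// Hypothetical trait mirroring `TableMetaValue`'s codec surface.
trait MetaValueCodec {
    fn try_as_raw_value(&self) -> Result<Vec<u8>, serde_json::Error>;
    fn try_from_raw_value(raw: &[u8]) -> Result<Self, serde_json::Error>
    where
        Self: Sized;
}

/// The macro plausibly expands to an impl like this for each listed type.
impl<T: Serialize + DeserializeOwned> MetaValueCodec for T {
    fn try_as_raw_value(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }
    fn try_from_raw_value(raw: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(raw)
    }
}

fn main() {
    let v: Vec<u32> = vec![1, 2, 3];
    let raw = v.try_as_raw_value().unwrap();
    assert_eq!(Vec::<u32>::try_from_raw_value(&raw).unwrap(), v);
}
```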

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use common_telemetry::info;
use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
use snafu::{ensure, OptionExt};
+use table_flow::TableFlowValue;
use self::flow_info::{FlowInfoKey, FlowInfoValue};
use self::flow_name::FlowNameKey;
@@ -159,7 +160,7 @@ impl FlowMetadataManager {
let create_flow_routes_txn = self
.flow_route_manager
-.build_create_txn(flow_id, flow_routes)?;
+.build_create_txn(flow_id, flow_routes.clone())?;
let create_flownode_flow_txn = self
.flownode_flow_manager
@@ -167,9 +168,12 @@ impl FlowMetadataManager {
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
-flow_info.flownode_ids().clone(),
+flow_routes
+.into_iter()
+.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
+.collect(),
flow_info.source_table_ids(),
-);
+)?;
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
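`flow_routes` is now consumed twice, once by the flow-route transaction and once, re-projected into `TableFlowValue`s, by the table-flow transaction; that is why the first call takes `flow_routes.clone()`. A sketch of the projection with stand-in types:

```rust
/// Stand-in types mirroring the shapes in this diff.
#[derive(Clone, Debug, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}
#[derive(Clone, Debug, PartialEq)]
struct FlowRouteValue {
    peer: Peer,
}
#[derive(Clone, Debug, PartialEq)]
struct TableFlowValue {
    peer: Peer,
}

/// Re-keys per-partition routes as table-flow values; both carry the peer.
fn to_table_flow_values(routes: Vec<(u32, FlowRouteValue)>) -> Vec<(u32, TableFlowValue)> {
    routes
        .into_iter()
        .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
        .collect()
}

fn main() {
    let routes = vec![(
        1u32,
        FlowRouteValue {
            peer: Peer { id: 1, addr: "n1".into() },
        },
    )];
    assert_eq!(to_table_flow_values(routes)[0].1.peer.id, 1);
}
```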
@@ -354,7 +358,11 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
-let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
+let flow_value = test_flow_info_value(
+"flow",
+[(0, 1u64), (1, 2u64)].into(),
+vec![1024, 1025, 1026],
+);
let flow_routes = vec![
(
1u32,
@@ -422,7 +430,23 @@ mod tests {
.try_collect::<Vec<_>>()
.await
.unwrap();
-assert_eq!(nodes, vec![TableFlowKey::new(table_id, 1, flow_id, 0)]);
+assert_eq!(
+nodes,
+vec![
+(
+TableFlowKey::new(table_id, 1, flow_id, 1),
+TableFlowValue {
+peer: Peer::empty(1)
+}
+),
+(
+TableFlowKey::new(table_id, 2, flow_id, 2),
+TableFlowValue {
+peer: Peer::empty(2)
+}
+)
+]
+);
}
}

View File

@@ -187,7 +187,7 @@ impl FlowRouteManager {
/// Builds a create flow routes transaction.
///
-/// Puts `__flow/route/{flownode_id}/{partitions}` keys.
+/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
&self,
flow_id: FlowId,
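The doc-comment fix above corrects the described key layout: flow routes are keyed by flow id and partition id, not flownode id. An illustrative sketch of the corrected path shape (the real key is built by `FlowRouteKey`, not by string formatting):

```rust
/// Illustrative only: the path a flow-route entry occupies in the KV store.
fn flow_route_key(flow_id: u32, partition_id: u32) -> String {
    format!("__flow/route/{flow_id}/{partition_id}")
}

fn main() {
    assert_eq!(flow_route_key(10, 0), "__flow/route/10/0");
}
```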

View File

@@ -17,14 +17,16 @@ use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
+use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
-use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
+use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey, TableMetaValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
+use crate::peer::Peer;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
@@ -166,9 +168,16 @@ impl<'a> MetaKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
}
}
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct TableFlowValue {
+pub(crate) peer: Peer,
+}
/// Decodes `KeyValue` to [TableFlowKey].
-pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
-TableFlowKey::from_bytes(&kv.key)
+pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
+let key = TableFlowKey::from_bytes(&kv.key)?;
+let value = TableFlowValue::try_from_raw_value(&kv.value)?;
+Ok((key, value))
}
pub type TableFlowManagerRef = Arc<TableFlowManager>;
@@ -187,7 +196,10 @@ impl TableFlowManager {
/// Retrieves all [TableFlowKey]s of the specified `table_id`.
///
/// TODO(discord9): add cache for it since range request does not support cache.
-pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
+pub fn flows(
+&self,
+table_id: TableId,
+) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -203,25 +215,27 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
-pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
+pub fn build_create_txn(
&self,
flow_id: FlowId,
-flownode_ids: I,
+table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
source_table_ids: &[TableId],
-) -> Txn {
-let txns = flownode_ids
-.into_iter()
-.flat_map(|(partition_id, flownode_id)| {
-source_table_ids.iter().map(move |table_id| {
-TxnOp::Put(
-TableFlowKey::new(*table_id, flownode_id, flow_id, partition_id).to_bytes(),
-vec![],
-)
-})
-})
-.collect::<Vec<_>>();
-Txn::new().and_then(txns)
+) -> Result<Txn> {
+let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
+for (partition_id, table_flow_value) in table_flow_values {
+let flownode_id = table_flow_value.peer.id;
+let value = table_flow_value.try_as_raw_value()?;
+for source_table_id in source_table_ids {
+txns.push(TxnOp::Put(
+TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
+.to_bytes(),
+value.clone(),
+));
+}
+}
+Ok(Txn::new().and_then(txns))
}
}
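Two things changed in `build_create_txn`: each `Put` now stores the serialized `TableFlowValue` instead of an empty `vec![]`, and the return type becomes `Result<Txn>` because serialization can fail. The write set still fans out as the cross product of source tables and partitions, which is what the `with_capacity` expression precomputes. A self-contained sketch of the fan-out, with plain tuples standing in for `TableFlowKey`/`TableFlowValue`:

```rust
/// Stand-in for a serialized Put operation.
struct PutOp {
    key: (u32, u64, u32, u32), // (table_id, flownode_id, flow_id, partition_id)
    value: Vec<u8>,
}

fn build_puts(
    flow_id: u32,
    // (partition_id, flownode_id, serialized TableFlowValue)
    table_flow_values: Vec<(u32, u64, Vec<u8>)>,
    source_table_ids: &[u32],
) -> Vec<PutOp> {
    let mut ops = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());
    for (partition_id, flownode_id, value) in table_flow_values {
        for table_id in source_table_ids {
            ops.push(PutOp {
                key: (*table_id, flownode_id, flow_id, partition_id),
                value: value.clone(),
            });
        }
    }
    ops
}

fn main() {
    let ops = build_puts(
        10,
        vec![(1, 1, b"peer-1".to_vec()), (2, 2, b"peer-2".to_vec())],
        &[1024, 1025],
    );
    assert_eq!(ops.len(), 4); // 2 partitions x 2 source tables
}
```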

View File

@@ -341,8 +341,8 @@ impl Inserter {
.await
.context(RequestInsertsSnafu)?
.unwrap_or_default()
-.into_iter()
-.map(|id| Peer::new(id, ""))
+.values()
+.cloned()
.collect::<Vec<_>>();
if !peers.is_empty() {
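Finally, the read side: the inserter used to rebuild peers from bare ids with empty addresses (`Peer::new(id, "")`), forcing a later address lookup; now the cached map already carries full peers, so it simply clones the values. A sketch with `String` standing in for `Peer`:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Collects the flownode peers for an insert from the cached snapshot.
fn peers_from_cache(cached: Option<Arc<HashMap<u64, String>>>) -> Vec<String> {
    cached
        .unwrap_or_default()
        .values()
        .cloned()
        .collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert(1u64, "10.0.0.1:3002".to_string());
    let peers = peers_from_cache(Some(Arc::new(map)));
    assert_eq!(peers, vec!["10.0.0.1:3002".to_string()]);
}
```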