fix: flow update use proper update (#6108)

* fix: flow update use proper update

* refactor: per review

* fix: flow cache

* chore: per copilot review

* refactor: rm flow node id

* refactor: per review

* chore: per review

* refactor: per review

* chore: per review
This commit is contained in:
discord9
2025-05-19 19:30:10 +08:00
committed by GitHub
parent a0d89c9ed1
commit 038acda7cd
10 changed files with 460 additions and 69 deletions

View File

@@ -24,21 +24,39 @@ use crate::cache::{CacheContainer, Initializer};
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::FlownodeId;
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
/// Flow id&flow partition key
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
pub flow_id: FlowId,
pub partition_id: FlowPartitionId,
}
impl FlowIdent {
pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
Self {
flow_id,
partition_id,
}
}
}
/// cache for TableFlowManager, the table_id part is in the outer cache
/// include flownode_id, flow_id, partition_id mapping to Peer
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
/// Constructs a [TableFlownodeSetCache].
pub fn new_table_flownode_set_cache(
name: String,
cache: Cache<TableId, FlownodeSet>,
cache: Cache<TableId, FlownodeFlowSet>,
kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
@@ -47,7 +65,7 @@ pub fn new_table_flownode_set_cache(
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
Arc::new(move |&table_id| {
let table_flow_manager = table_flow_manager.clone();
Box::pin(async move {
@@ -57,7 +75,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
.map(|flows| {
flows
.into_iter()
.map(|(key, value)| (key.flownode_id(), value.peer))
.map(|(key, value)| {
(
FlowIdent::new(key.flow_id(), key.partition_id()),
value.peer,
)
})
.collect::<HashMap<_, _>>()
})
// We must cache the `HashSet` even if it's empty,
@@ -71,26 +94,33 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
}
async fn handle_create_flow(
cache: &Cache<TableId, FlownodeSet>,
cache: &Cache<TableId, FlownodeFlowSet>,
CreateFlow {
flow_id,
source_table_ids,
flownodes: flownode_peers,
partition_to_peer_mapping: flow_part2nodes,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
Some(entry) => {
let mut map = entry.into_value().as_ref().clone();
map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
map.extend(
flow_part2nodes.iter().map(|(part, peer)| {
(FlowIdent::new(*flow_id, *part), peer.clone())
}),
);
Op::Put(Arc::new(map))
}
None => Op::Put(Arc::new(HashMap::from_iter(
flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
))),
None => {
Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
|(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
))))
}
},
)
.await;
@@ -98,21 +128,23 @@ async fn handle_create_flow(
}
async fn handle_drop_flow(
cache: &Cache<TableId, FlownodeSet>,
cache: &Cache<TableId, FlownodeFlowSet>,
DropFlow {
flow_id,
source_table_ids,
flownode_ids,
flow_part2node_id,
}: &DropFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
Some(entry) => {
let mut set = entry.into_value().as_ref().clone();
for flownode_id in flownode_ids {
set.remove(flownode_id);
for (part, _node) in flow_part2node_id {
let key = FlowIdent::new(*flow_id, *part);
set.remove(&key);
}
Op::Put(Arc::new(set))
@@ -128,7 +160,7 @@ async fn handle_drop_flow(
}
fn invalidator<'a>(
cache: &'a Cache<TableId, FlownodeSet>,
cache: &'a Cache<TableId, FlownodeFlowSet>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
@@ -154,7 +186,7 @@ mod tests {
use moka::future::CacheBuilder;
use table::table_name::TableName;
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
@@ -214,12 +246,16 @@ mod tests {
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
)
);
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
@@ -231,8 +267,9 @@ mod tests {
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownodes: (1..=5).map(Peer::empty).collect(),
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
@@ -241,6 +278,54 @@ mod tests {
assert_eq!(set.len(), 5);
}
#[tokio::test]
async fn test_replace_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let drop_then_create_flow = vec![
CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1026, 1027],
partition_to_peer_mapping: (11..=15)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
CacheIdent::FlowId(2001),
];
cache.invalidate(&drop_then_create_flow).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert!(set.is_empty());
let expected = HashMap::from_iter(
(11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
);
let set = cache.get(1026).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
let set = cache.get(1027).await.unwrap().unwrap();
assert_eq!(set.as_ref().clone(), expected);
}
#[tokio::test]
async fn test_drop_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
@@ -248,34 +333,57 @@ mod tests {
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownodes: (1..=5).map(Peer::empty).collect(),
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2002,
source_table_ids: vec![1024, 1025],
flownodes: (11..=12).map(Peer::empty).collect(),
partition_to_peer_mapping: (11..=12)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
// same flownode that hold multiple flows
CacheIdent::CreateFlow(CreateFlow {
flow_id: 2003,
source_table_ids: vec![1024, 1025],
partition_to_peer_mapping: (1..=5)
.map(|i| (i as u32, Peer::empty(i + 1)))
.collect(),
}),
];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
assert_eq!(set.len(), 12);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
assert_eq!(set.len(), 12);
let ident = vec![CacheIdent::DropFlow(DropFlow {
flow_id: 2001,
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
HashMap::from_iter(
(11..=12)
.map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
.chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
)
);
}
}

View File

@@ -39,7 +39,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
@@ -70,6 +70,7 @@ impl CreateFlowProcedure {
query_context,
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
},
}
@@ -224,6 +225,7 @@ impl CreateFlowProcedure {
.update_flow_metadata(flow_id, prev_flow_value, &flow_info, flow_routes)
.await?;
info!("Replaced flow metadata for flow {flow_id}");
self.data.did_replace = true;
} else {
self.context
.flow_metadata_manager
@@ -240,22 +242,43 @@ impl CreateFlowProcedure {
debug_assert!(self.data.state == CreateFlowState::InvalidateFlowCache);
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
let did_replace = self.data.did_replace;
let ctx = Context {
subject: Some("Invalidate flow cache by creating flow".to_string()),
};
let mut caches = vec![];
// if did replaced, invalidate the flow cache with drop the old flow
if did_replace {
let old_flow_info = self.data.prev_flow_info_value.as_ref().unwrap();
// only drop flow is needed, since flow name haven't changed, and flow id already invalidated below
caches.extend([CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: old_flow_info.source_table_ids.clone(),
flow_part2node_id: old_flow_info.flownode_ids().clone().into_iter().collect(),
})]);
}
let (_flow_info, flow_routes) = (&self.data).into();
let flow_part2peers = flow_routes
.into_iter()
.map(|(part_id, route)| (part_id, route.peer))
.collect();
caches.extend([
CacheIdent::CreateFlow(CreateFlow {
flow_id,
source_table_ids: self.data.source_table_ids.clone(),
partition_to_peer_mapping: flow_part2peers,
}),
CacheIdent::FlowId(flow_id),
]);
self.context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownodes: self.data.peers.clone(),
}),
CacheIdent::FlowId(flow_id),
],
)
.invalidate(&ctx, &caches)
.await?;
Ok(Status::done_with_output(flow_id))
@@ -377,6 +400,10 @@ pub struct CreateFlowData {
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
/// Only set to true when replace actually happened.
/// This is used to determine whether to invalidate the cache.
#[serde(default)]
pub(crate) did_replace: bool,
pub(crate) flow_type: Option<FlowType>,
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod metadata;
use api::v1::flow::{flow_request, DropRequest, FlowRequest};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
@@ -153,6 +154,12 @@ impl DropFlowProcedure {
};
let flow_info_value = self.data.flow_info_value.as_ref().unwrap();
let flow_part2nodes = flow_info_value
.flownode_ids()
.clone()
.into_iter()
.collect::<Vec<_>>();
self.context
.cache_invalidator
.invalidate(
@@ -164,8 +171,9 @@ impl DropFlowProcedure {
flow_name: flow_info_value.flow_name.to_string(),
}),
CacheIdent::DropFlow(DropFlow {
flow_id,
source_table_ids: flow_info_value.source_table_ids.clone(),
flownode_ids: flow_info_value.flownode_ids.values().cloned().collect(),
flow_part2node_id: flow_part2nodes,
}),
],
)

View File

@@ -24,7 +24,7 @@ use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
use crate::key::{FlowId, FlowPartitionId};
use crate::peer::Peer;
use crate::{DatanodeId, FlownodeId};
@@ -184,14 +184,19 @@ pub enum CacheIdent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
/// The unique identifier for the flow.
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
pub flownodes: Vec<Peer>,
/// Mapping of flow partition to peer information
pub partition_to_peer_mapping: Vec<(FlowPartitionId, Peer)>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropFlow {
pub flow_id: FlowId,
pub source_table_ids: Vec<TableId>,
pub flownode_ids: Vec<FlownodeId>,
/// Mapping of flow partition to flownode id
pub flow_part2node_id: Vec<(FlowPartitionId, FlownodeId)>,
}
/// Flushes a batch of regions.

View File

@@ -246,27 +246,32 @@ impl FlowMetadataManager {
new_flow_info: &FlowInfoValue,
flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
) -> Result<()> {
let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) =
let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
self.flow_name_manager.build_update_txn(
&new_flow_info.catalog_name,
&new_flow_info.flow_name,
flow_id,
)?;
let (create_flow_txn, on_create_flow_failure) =
let (update_flow_txn, on_create_flow_failure) =
self.flow_info_manager
.build_update_txn(flow_id, current_flow_info, new_flow_info)?;
let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes.clone())?;
let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, new_flow_info.flownode_ids().clone());
let create_table_flow_txn = self.table_flow_manager.build_create_txn(
let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
flow_id,
current_flow_info,
flow_routes.clone(),
)?;
let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
flow_id,
current_flow_info,
new_flow_info.flownode_ids().clone(),
);
let update_table_flow_txn = self.table_flow_manager.build_update_txn(
flow_id,
current_flow_info,
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
@@ -275,11 +280,11 @@ impl FlowMetadataManager {
)?;
let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
create_flow_txn,
create_flow_routes_txn,
create_flownode_flow_txn,
create_table_flow_txn,
update_flow_flow_name_txn,
update_flow_txn,
update_flow_routes_txn,
update_flownode_flow_txn,
update_table_flow_txn,
]);
info!(
"Creating flow {}.{}({}), with {} txn operations",
@@ -783,6 +788,141 @@ mod tests {
}
}
#[tokio::test]
async fn test_update_flow_metadata_diff_flownode() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value(
"flow",
[(0u32, 1u64), (1u32, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(1),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(2),
},
),
];
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
.await
.unwrap();
let new_flow_value = {
let mut tmp = flow_value.clone();
tmp.raw_sql = "new".to_string();
// move to different flownodes
tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
tmp
};
let new_flow_routes = vec![
(
0u32,
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
1,
FlowRouteValue {
peer: Peer::empty(4),
},
),
];
// Update flow instead
flow_metadata_manager
.update_flow_metadata(
flow_id,
&DeserializedValueWithBytes::from_inner(flow_value.clone()),
&new_flow_value,
new_flow_routes.clone(),
)
.await
.unwrap();
let got = flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.await
.unwrap();
assert_eq!(
routes,
vec![
(
FlowRouteKey::new(flow_id, 0),
FlowRouteValue {
peer: Peer::empty(3),
},
),
(
FlowRouteKey::new(flow_id, 1),
FlowRouteValue {
peer: Peer::empty(4),
},
),
]
);
assert_eq!(got, new_flow_value);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(1)
.try_collect::<Vec<_>>()
.await
.unwrap();
// should moved to different flownode
assert_eq!(flows, vec![]);
let flows = flow_metadata_manager
.flownode_flow_manager()
.flows(3)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(flows, vec![(flow_id, 0)]);
for table_id in [1024, 1025, 1026] {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.await
.unwrap();
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 3, flow_id, 0),
TableFlowValue {
peer: Peer::empty(3)
}
),
(
TableFlowKey::new(table_id, 4, flow_id, 1),
TableFlowValue {
peer: Peer::empty(4)
}
)
]
);
}
}
#[tokio::test]
async fn test_update_flow_metadata_flow_replace_diff_id_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());

View File

@@ -272,10 +272,11 @@ impl FlowInfoManager {
let raw_value = new_flow_value.try_as_raw_value()?;
let prev_value = current_flow_value.get_raw_bytes();
let txn = Txn::new()
.when(vec![
Compare::new(key.clone(), CompareOp::NotEqual, None),
Compare::new(key.clone(), CompareOp::Equal, Some(prev_value)),
])
.when(vec![Compare::new(
key.clone(),
CompareOp::Equal,
Some(prev_value),
)])
.and_then(vec![TxnOp::Put(key.clone(), raw_value)])
.or_else(vec![TxnOp::Get(key.clone())]);

View File

@@ -19,9 +19,12 @@ use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -204,6 +207,33 @@ impl FlowRouteManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update flow routes transaction.
///
/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
/// Also removes `__flow/route/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flow_routes: I,
) -> Result<Txn> {
let del_txns = current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, _)| {
let key = FlowRouteKey::new(flow_id, *partition_id).to_bytes();
Ok(TxnOp::Delete(key))
});
let put_txns = flow_routes.into_iter().map(|(partition_id, route)| {
let key = FlowRouteKey::new(flow_id, partition_id).to_bytes();
Ok(TxnOp::Put(key, route.try_as_raw_value()?))
});
let txns = del_txns.chain(put_txns).collect::<Result<Vec<_>>>()?;
Ok(Txn::new().and_then(txns))
}
async fn remap_flow_route_addresses(
&self,
flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],

View File

@@ -19,8 +19,9 @@ use regex::Regex;
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey};
use crate::key::{BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
@@ -202,6 +203,33 @@ impl FlownodeFlowManager {
Txn::new().and_then(txns)
}
/// Builds a update flownode flow transaction.
///
/// Puts `__flownode_flow/{flownode_id}/{flow_id}/{partition_id}` keys.
/// Remove the old `__flownode_flow/{old_flownode_id}/{flow_id}/{old_partition_id}` keys.
pub(crate) fn build_update_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
flownode_ids: I,
) -> Txn {
let del_txns =
current_flow_info
.flownode_ids()
.iter()
.map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(*flownode_id, flow_id, *partition_id).to_bytes();
TxnOp::Delete(key)
});
let put_txns = flownode_ids.into_iter().map(|(partition_id, flownode_id)| {
let key = FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes();
TxnOp::Put(key, vec![])
});
let txns = del_txns.chain(put_txns).collect::<Vec<_>>();
Txn::new().and_then(txns)
}
}
#[cfg(test)]

View File

@@ -22,9 +22,12 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
@@ -215,7 +218,7 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys.
pub fn build_create_txn(
&self,
flow_id: FlowId,
@@ -239,6 +242,44 @@ impl TableFlowManager {
Ok(Txn::new().and_then(txns))
}
/// Builds a update table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{flow_id}/{partition_id}` keys,
/// Also remove previous
/// `__flow/source_table/{table_id}/{old_node_id}/{flow_id}/{partition_id}` keys.
pub fn build_update_txn(
&self,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
source_table_ids: &[TableId],
) -> Result<Txn> {
let mut txns = Vec::with_capacity(2 * source_table_ids.len() * table_flow_values.len());
// first remove the old keys
for (part_id, node_id) in current_flow_info.flownode_ids() {
for source_table_id in current_flow_info.source_table_ids() {
txns.push(TxnOp::Delete(
TableFlowKey::new(*source_table_id, *node_id, flow_id, *part_id).to_bytes(),
));
}
}
for (partition_id, table_flow_value) in table_flow_values {
let flownode_id = table_flow_value.peer.id;
let value = table_flow_value.try_as_raw_value()?;
for source_table_id in source_table_ids {
txns.push(TxnOp::Put(
TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
.to_bytes(),
value.clone(),
));
}
}
Ok(Txn::new().and_then(txns))
}
async fn remap_table_flow_addresses(
&self,
table_flows: &mut [(TableFlowKey, TableFlowValue)],

View File

@@ -950,6 +950,7 @@ impl FlowMirrorTask {
// already know this is not source table
Some(None) => continue,
_ => {
// dedup peers
let peers = cache
.get(table_id)
.await
@@ -957,6 +958,8 @@ impl FlowMirrorTask {
.unwrap_or_default()
.values()
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
if !peers.is_empty() {