feat: remap flow route address (#5565)

* feat: remap flow peers

* refactor: not stream

* feat: remap flownode addr on FlowRoute and TableFlow

* fix: unit test

* Update src/meta-srv/src/handler/remap_flow_peer_handler.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* chore: by comment

* Update src/meta-srv/src/handler/remap_flow_peer_handler.rs

* Update src/common/meta/src/key/flow/table_flow.rs

* Update src/common/meta/src/key/flow/flow_route.rs

* chore: remove duplicate field

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
jeremyhi
2025-02-20 16:21:32 +08:00
committed by GitHub
parent a0ff9e751e
commit 71007e200c
13 changed files with 251 additions and 47 deletions

View File

@@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use moka::future::Cache;
use moka::ops::compute::Op;
use table::metadata::TableId;
@@ -54,9 +53,13 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
Box::pin(async move {
table_flow_manager
.flows(table_id)
.map_ok(|(key, value)| (key.flownode_id(), value.peer))
.try_collect::<HashMap<_, _>>()
.await
.map(|flows| {
flows
.into_iter()
.map(|(key, value)| (key.flownode_id(), value.peer))
.collect::<HashMap<_, _>>()
})
// We must cache the `HashSet` even if it's empty,
// to avoid future requests to the remote storage next time;
// If the value is added to the remote storage,

View File

@@ -29,7 +29,6 @@ use common_procedure::{
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -131,9 +130,10 @@ impl CreateFlowProcedure {
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.map_ok(|(_, value)| value.peer)
.try_collect::<Vec<_>>()
.await?;
.await?
.into_iter()
.map(|(_, value)| value.peer)
.collect::<Vec<_>>();
self.data.flow_id = Some(flow_id);
self.data.peers = peers;
info!("Replacing flow, flow_id: {}", flow_id);

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use common_catalog::format_full_flow_name;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt};
use crate::ddl::drop_flow::DropFlowProcedure;
@@ -39,9 +38,10 @@ impl DropFlowProcedure {
.flow_metadata_manager
.flow_route_manager()
.routes(self.data.task.flow_id)
.map_ok(|(_, value)| value)
.try_collect::<Vec<_>>()
.await?;
.await?
.into_iter()
.map(|(_, value)| value)
.collect::<Vec<_>>();
ensure!(
!flow_route_values.is_empty(),
error::FlowRouteNotFoundSnafu {

View File

@@ -16,9 +16,9 @@ pub mod flow_info;
pub(crate) mod flow_name;
pub(crate) mod flow_route;
pub mod flow_state;
mod flownode_addr_helper;
pub(crate) mod flownode_flow;
pub(crate) mod table_flow;
use std::ops::Deref;
use std::sync::Arc;
@@ -506,7 +506,6 @@ mod tests {
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -538,7 +537,6 @@ mod tests {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -727,7 +725,6 @@ mod tests {
let routes = flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(
@@ -759,7 +756,6 @@ mod tests {
let nodes = flow_metadata_manager
.table_flow_manager()
.flows(table_id)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(

View File

@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -167,10 +168,7 @@ impl FlowRouteManager {
}
/// Retrieves all [FlowRouteValue]s of the specified `flow_id`.
pub fn routes(
&self,
flow_id: FlowId,
) -> BoxStream<'static, Result<(FlowRouteKey, FlowRouteValue)>> {
pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
let start_key = FlowRouteKey::range_start_key(flow_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -181,7 +179,9 @@ impl FlowRouteManager {
)
.into_stream();
Box::pin(stream)
let mut res = stream.try_collect::<Vec<_>>().await?;
self.remap_flow_route_addresses(&mut res).await?;
Ok(res)
}
/// Builds a create flow routes transaction.
@@ -203,6 +203,28 @@ impl FlowRouteManager {
Ok(Txn::new().and_then(txns))
}
/// Replaces the peer address stored in each flow route with the address
/// returned by `flownode_addr_helper::get_flownode_addresses` for that
/// route's flownode id.
///
/// Routes whose flownode id is missing from the lookup result keep the
/// address they already carry.
///
/// # Errors
///
/// Propagates any error returned by the address lookup.
async fn remap_flow_route_addresses(
    &self,
    flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
) -> Result<()> {
    // One lookup key per route, built from the route's peer id.
    let mut keys = Vec::with_capacity(flow_routes.len());
    for (_, route) in flow_routes.iter() {
        keys.push(NodeAddressKey::with_flownode(route.peer.id));
    }

    let latest_addrs =
        flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;

    flow_routes.iter_mut().for_each(|(_, route)| {
        // No entry for this id means the address already stored in the
        // flow route value is kept as-is, which is expected.
        if let Some(latest) = latest_addrs.get(&route.peer.id) {
            route.peer.addr = latest.peer.addr.clone();
        }
    });

    Ok(())
}
}
#[cfg(test)]

View File

@@ -0,0 +1,47 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use crate::error::Result;
use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
use crate::key::{MetadataKey, MetadataValue};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
/// Fetches the address records of the flownodes identified by `keys`.
///
/// Issues a single batch-get against `kv_backend` and decodes every returned
/// key/value pair. The result maps node id -> [`NodeAddressValue`]; only the
/// pairs present in the batch-get response appear in it.
///
/// Returns an empty map without contacting the backend when `keys` is empty.
///
/// # Errors
///
/// Fails if the batch-get fails, or if any returned key or value cannot be
/// decoded.
pub(crate) async fn get_flownode_addresses(
    kv_backend: &KvBackendRef,
    keys: Vec<NodeAddressKey>,
) -> Result<HashMap<u64, NodeAddressValue>> {
    if keys.is_empty() {
        return Ok(HashMap::default());
    }

    let raw_keys = keys.into_iter().map(|key| key.to_bytes()).collect();
    let resp = kv_backend
        .batch_get(BatchGetRequest { keys: raw_keys })
        .await?;

    let mut addresses = HashMap::with_capacity(resp.kvs.len());
    for kv in resp.kvs {
        let key = NodeAddressKey::from_bytes(&kv.key)?;
        let value = NodeAddressValue::try_from_raw_value(&kv.value)?;
        addresses.insert(key.node_id, value);
    }
    Ok(addresses)
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -22,7 +22,8 @@ use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::flow::{flownode_addr_helper, FlowScoped};
use crate::key::node_address::NodeAddressKey;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
@@ -196,10 +197,7 @@ impl TableFlowManager {
/// Retrieves all [TableFlowKey]s of the specified `table_id`.
///
/// TODO(discord9): add cache for it since range request does not support cache.
pub fn flows(
&self,
table_id: TableId,
) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> {
pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
@@ -210,7 +208,9 @@ impl TableFlowManager {
)
.into_stream();
Box::pin(stream)
let mut res = stream.try_collect::<Vec<_>>().await?;
self.remap_table_flow_addresses(&mut res).await?;
Ok(res)
}
/// Builds a create table flow transaction.
@@ -238,6 +238,28 @@ impl TableFlowManager {
Ok(Txn::new().and_then(txns))
}
/// Replaces the peer address stored in each table flow with the address
/// returned by `flownode_addr_helper::get_flownode_addresses` for that
/// entry's flownode id.
///
/// Entries whose flownode id is missing from the lookup result keep the
/// address they already carry.
///
/// # Errors
///
/// Propagates any error returned by the address lookup.
async fn remap_table_flow_addresses(
    &self,
    table_flows: &mut [(TableFlowKey, TableFlowValue)],
) -> Result<()> {
    // One lookup key per entry, built from the entry's peer id.
    let mut keys = Vec::with_capacity(table_flows.len());
    for (_, flow) in table_flows.iter() {
        keys.push(NodeAddressKey::with_flownode(flow.peer.id));
    }

    let latest_addrs =
        flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;

    for (_, flow) in table_flows.iter_mut() {
        let Some(latest) = latest_addrs.get(&flow.peer.id) else {
            // No entry for this id means the address already stored in the
            // table flow value is kept as-is, which is expected.
            continue;
        };
        flow.peer.addr = latest.peer.addr.clone();
    }

    Ok(())
}
}
#[cfg(test)]

View File

@@ -39,6 +39,10 @@ impl NodeAddressKey {
pub fn with_datanode(node_id: u64) -> Self {
Self::new(Role::Datanode, node_id)
}
/// Creates a [`NodeAddressKey`] for `node_id` with the role set to `Role::Flownode`.
pub fn with_flownode(node_id: u64) -> Self {
Self::new(Role::Flownode, node_id)
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]

View File

@@ -60,12 +60,12 @@ async fn query_flow_state(
#[derive(Clone)]
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -83,12 +83,12 @@ impl HeartbeatTask {
) -> Self {
Self {
node_id: opts.node_id.unwrap_or(0),
node_epoch: common_time::util::current_time_millis() as u64,
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
@@ -181,7 +181,7 @@ impl HeartbeatTask {
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
) {
let report_interval = self.report_interval;
let start_time_ms = self.start_time_ms;
let node_epoch = self.node_epoch;
let self_peer = Some(Peer {
id: self.node_id,
addr: self.peer_addr.clone(),
@@ -198,7 +198,8 @@ impl HeartbeatTask {
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(start_time_ms),
node_epoch,
info: Self::build_node_info(node_epoch),
..Default::default()
};

View File

@@ -44,6 +44,7 @@ use mailbox_handler::MailboxHandler;
use on_leader_start_handler::OnLeaderStartHandler;
use publish_heartbeat_handler::PublishHeartbeatHandler;
use region_lease_handler::RegionLeaseHandler;
use remap_flow_peer_handler::RemapFlowPeerHandler;
use response_header_handler::ResponseHeaderHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -71,6 +72,7 @@ pub mod mailbox_handler;
pub mod on_leader_start_handler;
pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod remap_flow_peer_handler;
pub mod response_header_handler;
#[async_trait::async_trait]
@@ -573,6 +575,7 @@ impl HeartbeatHandlerGroupBuilder {
self.add_handler_last(publish_heartbeat_handler);
}
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
self.add_handler_last(RemapFlowPeerHandler::default());
if let Some(flow_state_handler) = self.flow_state_handler.take() {
self.add_handler_last(flow_state_handler);
@@ -853,7 +856,7 @@ mod tests {
.unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -868,6 +871,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -888,7 +892,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -904,6 +908,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -921,7 +926,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"CollectStatsHandler",
@@ -937,6 +942,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -954,7 +960,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -970,6 +976,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -987,7 +994,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(13, handlers.len());
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1003,6 +1010,7 @@ mod tests {
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1020,7 +1028,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1035,6 +1043,7 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1052,7 +1061,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"ResponseHeaderHandler",
@@ -1067,6 +1076,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1084,7 +1094,7 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(12, handlers.len());
assert_eq!(13, handlers.len());
let names = [
"CollectStatsHandler",
@@ -1099,6 +1109,7 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {

View File

@@ -21,7 +21,7 @@ use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use common_telemetry::{error, warn};
use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use snafu::ResultExt;
@@ -185,6 +185,10 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
info!(
"Successfully updated datanode `NodeAddressValue`: {:?}",
peer
);
// broadcast invalidating cache
let cache_idents = stat
.table_ids()
@@ -200,11 +204,14 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
}
}
Err(e) => {
error!(e; "Failed to update NodeAddressValue: {:?}", peer);
error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
}
}
} else {
warn!("Failed to serialize NodeAddressValue: {:?}", peer);
warn!(
"Failed to serialize datanode `NodeAddressValue`: {:?}",
peer
);
}
}

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::rpc::store::PutRequest;
use common_telemetry::{error, info, warn};
use dashmap::DashMap;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::Result;
/// Heartbeat handler that remembers the last `node_epoch` reported by each
/// flownode and rewrites the flownode's stored address record when a node is
/// seen for the first time or reports a larger epoch.
#[derive(Debug, Default)]
pub struct RemapFlowPeerHandler {
/// flow_node_id -> epoch
epoch_cache: DashMap<u64, u64>,
}
#[async_trait::async_trait]
impl HeartbeatHandler for RemapFlowPeerHandler {
    /// Accepts heartbeats from flownodes only.
    fn is_acceptable(&self, role: Role) -> bool {
        matches!(role, Role::Flownode)
    }

    /// Rewrites the flownode's address record when the heartbeat carries an
    /// epoch newer than the cached one, or when the node has no cached epoch.
    ///
    /// Always returns [`HandleControl::Continue`]; a heartbeat without a peer
    /// is ignored.
    async fn handle(
        &self,
        req: &HeartbeatRequest,
        ctx: &mut Context,
        _acc: &mut HeartbeatAccumulator,
    ) -> Result<HandleControl> {
        let peer = match req.peer.as_ref() {
            Some(peer) => peer,
            None => return Ok(HandleControl::Continue),
        };

        let incoming_epoch = req.node_epoch;
        let node_id = peer.id;

        // The map guard lives only inside this statement, so it is released
        // before the `.await` below.
        let needs_rewrite = match self.epoch_cache.get_mut(&node_id) {
            Some(mut cached) => {
                let is_newer = incoming_epoch > *cached.value();
                if is_newer {
                    *cached.value_mut() = incoming_epoch;
                }
                is_newer
            }
            None => {
                // First heartbeat seen from this node: remember its epoch.
                self.epoch_cache.insert(node_id, incoming_epoch);
                true
            }
        };

        if needs_rewrite {
            rewrite_node_address(ctx, peer).await;
        }

        Ok(HandleControl::Continue)
    }
}
/// Writes the flownode's [`NodeAddressValue`] under its [`NodeAddressKey`]
/// through `ctx.leader_cached_kv_backend`.
///
/// Best effort: serialization and storage failures are logged and not
/// returned to the caller.
async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
    let key = NodeAddressKey::with_flownode(peer.id).to_bytes();

    let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() else {
        warn!("Failed to serialize flow `NodeAddressValue`: {:?}", peer);
        return;
    };

    let put = PutRequest {
        key,
        value,
        prev_kv: false,
    };

    if let Err(e) = ctx.leader_cached_kv_backend.put(put).await {
        error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);
        return;
    }

    info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
    // TODO(discord): broadcast invalidating cache to all frontends
}

View File

@@ -21,7 +21,7 @@ use common_meta::node_manager::NodeManagerRef;
use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -81,7 +81,6 @@ impl FlowServiceOperator {
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;