diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs
index 50a47aade1..b8adfb5e02 100644
--- a/src/common/meta/src/cache/flow/table_flownode.rs
+++ b/src/common/meta/src/cache/flow/table_flownode.rs
@@ -16,7 +16,6 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use futures::future::BoxFuture;
-use futures::TryStreamExt;
 use moka::future::Cache;
 use moka::ops::compute::Op;
 use table::metadata::TableId;
@@ -54,9 +53,13 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId
         Box::pin(async move {
             table_flow_manager
                 .flows(table_id)
-                .map_ok(|(key, value)| (key.flownode_id(), value.peer))
-                .try_collect::<HashMap<_, _>>()
                 .await
+                .map(|flows| {
+                    flows
+                        .into_iter()
+                        .map(|(key, value)| (key.flownode_id(), value.peer))
+                        .collect::<HashMap<_, _>>()
+                })
                 // We must cache the `HashSet` even if it's empty,
                 // to avoid future requests to the remote storage next time;
                 // If the value is added to the remote storage,
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index ba0582f71b..db8a700059 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -29,7 +29,6 @@ use common_procedure::{
 use common_telemetry::info;
 use common_telemetry::tracing_context::TracingContext;
 use futures::future::join_all;
-use futures::TryStreamExt;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, ResultExt};
@@ -131,9 +130,10 @@ impl CreateFlowProcedure {
             .flow_metadata_manager
             .flow_route_manager()
             .routes(flow_id)
-            .map_ok(|(_, value)| value.peer)
-            .try_collect::<Vec<_>>()
-            .await?;
+            .await?
+            .into_iter()
+            .map(|(_, value)| value.peer)
+            .collect::<Vec<_>>();
         self.data.flow_id = Some(flow_id);
         self.data.peers = peers;
         info!("Replacing flow, flow_id: {}", flow_id);
diff --git a/src/common/meta/src/ddl/drop_flow/metadata.rs b/src/common/meta/src/ddl/drop_flow/metadata.rs
index 68f99dd4b4..fa740b1775 100644
--- a/src/common/meta/src/ddl/drop_flow/metadata.rs
+++ b/src/common/meta/src/ddl/drop_flow/metadata.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use common_catalog::format_full_flow_name;
-use futures::TryStreamExt;
 use snafu::{ensure, OptionExt};
 
 use crate::ddl::drop_flow::DropFlowProcedure;
@@ -39,9 +38,10 @@ impl DropFlowProcedure {
             .flow_metadata_manager
             .flow_route_manager()
             .routes(self.data.task.flow_id)
-            .map_ok(|(_, value)| value)
-            .try_collect::<Vec<_>>()
-            .await?;
+            .await?
+            .into_iter()
+            .map(|(_, value)| value)
+            .collect::<Vec<_>>();
         ensure!(
             !flow_route_values.is_empty(),
             error::FlowRouteNotFoundSnafu {
diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index 5ea39ae228..78e14594dd 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -16,9 +16,9 @@
 pub mod flow_info;
 pub(crate) mod flow_name;
 pub(crate) mod flow_route;
 pub mod flow_state;
+mod flownode_addr_helper;
 pub(crate) mod flownode_flow;
 pub(crate) mod table_flow;
-
 use std::ops::Deref;
 use std::sync::Arc;
@@ -506,7 +506,6 @@ mod tests {
         let routes = flow_metadata_manager
             .flow_route_manager()
             .routes(flow_id)
-            .try_collect::<Vec<_>>()
             .await
             .unwrap();
         assert_eq!(
@@ -538,7 +537,6 @@ mod tests {
         let nodes = flow_metadata_manager
             .table_flow_manager()
             .flows(table_id)
-            .try_collect::<Vec<_>>()
             .await
             .unwrap();
         assert_eq!(
@@ -727,7 +725,6 @@ mod tests {
         let routes = flow_metadata_manager
             .flow_route_manager()
             .routes(flow_id)
-            .try_collect::<Vec<_>>()
             .await
             .unwrap();
         assert_eq!(
@@ -759,7 +756,6 @@ mod tests {
         let nodes = flow_metadata_manager
             .table_flow_manager()
             .flows(table_id)
-            .try_collect::<Vec<_>>()
             .await
             .unwrap();
         assert_eq!(
diff --git a/src/common/meta/src/key/flow/flow_route.rs b/src/common/meta/src/key/flow/flow_route.rs
index c8d81c5e2a..1459675686 100644
--- a/src/common/meta/src/key/flow/flow_route.rs
+++ b/src/common/meta/src/key/flow/flow_route.rs
@@ -12,14 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use futures::stream::BoxStream;
+use futures::TryStreamExt;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use snafu::OptionExt;
 
 use crate::error::{self, Result};
-use crate::key::flow::FlowScoped;
+use crate::key::flow::{flownode_addr_helper, FlowScoped};
+use crate::key::node_address::NodeAddressKey;
 use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
@@ -167,10 +168,7 @@ impl FlowRouteManager {
     }
 
     /// Retrieves all [FlowRouteValue]s of the specified `flow_id`.
-    pub fn routes(
-        &self,
-        flow_id: FlowId,
-    ) -> BoxStream<'static, Result<(FlowRouteKey, FlowRouteValue)>> {
+    pub async fn routes(&self, flow_id: FlowId) -> Result<Vec<(FlowRouteKey, FlowRouteValue)>> {
         let start_key = FlowRouteKey::range_start_key(flow_id);
         let req = RangeRequest::new().with_prefix(start_key);
         let stream = PaginationStream::new(
@@ -181,7 +179,9 @@ impl FlowRouteManager {
         )
         .into_stream();
 
-        Box::pin(stream)
+        let mut res = stream.try_collect::<Vec<_>>().await?;
+        self.remap_flow_route_addresses(&mut res).await?;
+        Ok(res)
     }
 
     /// Builds a create flow routes transaction.
@@ -203,6 +203,30 @@ impl FlowRouteManager {
 
         Ok(Txn::new().and_then(txns))
     }
+
+    async fn remap_flow_route_addresses(
+        &self,
+        flow_routes: &mut [(FlowRouteKey, FlowRouteValue)],
+    ) -> Result<()> {
+        let keys = flow_routes
+            .iter()
+            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
+            .collect();
+        let flow_node_addrs =
+            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
+        for (_, flow_route_value) in flow_routes.iter_mut() {
+            let flownode_id = flow_route_value.peer.id;
+            // If an id lacks a corresponding address in the `flow_node_addrs`,
+            // it means the old address in `flow_route_value` is still valid,
+            // which is expected.
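+            // Otherwise, overwrite the stale address with the latest one
+            // reported by the flownode in its heartbeat.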
+            if let Some(node_addr) = flow_node_addrs.get(&flownode_id) {
+                flow_route_value.peer.addr = node_addr.peer.addr.clone();
+            }
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/common/meta/src/key/flow/flownode_addr_helper.rs b/src/common/meta/src/key/flow/flownode_addr_helper.rs
new file mode 100644
index 0000000000..fa7f27b1e9
--- /dev/null
+++ b/src/common/meta/src/key/flow/flownode_addr_helper.rs
@@ -0,0 +1,47 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use crate::error::Result;
+use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
+use crate::key::{MetadataKey, MetadataValue};
+use crate::kv_backend::KvBackendRef;
+use crate::rpc::store::BatchGetRequest;
+
+/// Get the addresses of the flownodes.
+/// The result is a map: node_id -> NodeAddressValue
+pub(crate) async fn get_flownode_addresses(
+    kv_backend: &KvBackendRef,
+    keys: Vec<NodeAddressKey>,
+) -> Result<HashMap<u64, NodeAddressValue>> {
+    if keys.is_empty() {
+        return Ok(HashMap::default());
+    }
+
+    let req = BatchGetRequest {
+        keys: keys.into_iter().map(|k| k.to_bytes()).collect(),
+    };
+    kv_backend
+        .batch_get(req)
+        .await?
+        .kvs
+        .into_iter()
+        .map(|kv| {
+            let key = NodeAddressKey::from_bytes(&kv.key)?;
+            let value = NodeAddressValue::try_from_raw_value(&kv.value)?;
+            Ok((key.node_id, value))
+        })
+        .collect()
+}
diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs
index 4aa4ab060a..fff9a746c8 100644
--- a/src/common/meta/src/key/flow/table_flow.rs
+++ b/src/common/meta/src/key/flow/table_flow.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use futures::stream::BoxStream;
+use futures::TryStreamExt;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
@@ -22,7 +22,8 @@ use snafu::OptionExt;
 use table::metadata::TableId;
 
 use crate::error::{self, Result};
-use crate::key::flow::FlowScoped;
+use crate::key::flow::{flownode_addr_helper, FlowScoped};
+use crate::key::node_address::NodeAddressKey;
 use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
 use crate::kv_backend::txn::{Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
@@ -196,10 +197,7 @@ impl TableFlowManager {
     /// Retrieves all [TableFlowKey]s of the specified `table_id`.
     ///
     /// TODO(discord9): add cache for it since range request does not support cache.
-    pub fn flows(
-        &self,
-        table_id: TableId,
-    ) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> {
+    pub async fn flows(&self, table_id: TableId) -> Result<Vec<(TableFlowKey, TableFlowValue)>> {
         let start_key = TableFlowKey::range_start_key(table_id);
         let req = RangeRequest::new().with_prefix(start_key);
         let stream = PaginationStream::new(
@@ -210,7 +208,9 @@ impl TableFlowManager {
         )
         .into_stream();
 
-        Box::pin(stream)
+        let mut res = stream.try_collect::<Vec<_>>().await?;
+        self.remap_table_flow_addresses(&mut res).await?;
+        Ok(res)
     }
 
     /// Builds a create table flow transaction.
@@ -238,6 +238,28 @@ impl TableFlowManager {
 
         Ok(Txn::new().and_then(txns))
     }
+
+    async fn remap_table_flow_addresses(
+        &self,
+        table_flows: &mut [(TableFlowKey, TableFlowValue)],
+    ) -> Result<()> {
+        let keys = table_flows
+            .iter()
+            .map(|(_, value)| NodeAddressKey::with_flownode(value.peer.id))
+            .collect::<Vec<_>>();
+        let flownode_addrs =
+            flownode_addr_helper::get_flownode_addresses(&self.kv_backend, keys).await?;
+        for (_, table_flow_value) in table_flows.iter_mut() {
+            let flownode_id = table_flow_value.peer.id;
+            // If an id lacks a corresponding address in the `flownode_addrs`,
+            // it means the old address in `table_flow_value` is still valid,
+            // which is expected.
+            if let Some(flownode_addr) = flownode_addrs.get(&flownode_id) {
+                table_flow_value.peer.addr = flownode_addr.peer.addr.clone();
+            }
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/common/meta/src/key/node_address.rs b/src/common/meta/src/key/node_address.rs
index a1a763940b..d1d7850706 100644
--- a/src/common/meta/src/key/node_address.rs
+++ b/src/common/meta/src/key/node_address.rs
@@ -39,6 +39,10 @@ impl NodeAddressKey {
     pub fn with_datanode(node_id: u64) -> Self {
         Self::new(Role::Datanode, node_id)
     }
+
+    pub fn with_flownode(node_id: u64) -> Self {
+        Self::new(Role::Flownode, node_id)
+    }
 }
 
 #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs
index 54164fef3d..9cef02eac1 100644
--- a/src/flow/src/heartbeat.rs
+++ b/src/flow/src/heartbeat.rs
@@ -60,12 +60,12 @@ async fn query_flow_state(
 #[derive(Clone)]
 pub struct HeartbeatTask {
     node_id: u64,
+    node_epoch: u64,
     peer_addr: String,
     meta_client: Arc<MetaClient>,
     report_interval: Duration,
     retry_interval: Duration,
     resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
-    start_time_ms: u64,
     running: Arc<AtomicBool>,
     query_stat_size: Option<SizeReportSender>,
 }
@@ -83,12 +83,12 @@ impl HeartbeatTask {
     ) -> Self {
         Self {
             node_id: opts.node_id.unwrap_or(0),
+            node_epoch: common_time::util::current_time_millis() as u64,
             peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
             meta_client,
             report_interval: heartbeat_opts.interval,
             retry_interval: heartbeat_opts.retry_interval,
             resp_handler_executor,
-            start_time_ms: common_time::util::current_time_millis() as u64,
             running: Arc::new(AtomicBool::new(false)),
             query_stat_size: None,
         }
    }
@@ -181,7 +181,7 @@ impl HeartbeatTask {
         mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
     ) {
         let report_interval = self.report_interval;
-        let start_time_ms = self.start_time_ms;
+        let node_epoch = self.node_epoch;
         let self_peer = Some(Peer {
             id: self.node_id,
             addr: self.peer_addr.clone(),
         });
@@ -198,7 +198,8 @@ impl HeartbeatTask {
 
             let heartbeat_request = HeartbeatRequest {
                 peer: self_peer,
-                info: Self::build_node_info(start_time_ms),
+                node_epoch,
+                info: Self::build_node_info(node_epoch),
                 ..Default::default()
             };
 
diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs
index ad1492cd7c..4eb9fef91d 100644
--- a/src/meta-srv/src/handler.rs
+++ b/src/meta-srv/src/handler.rs
@@ -44,6 +44,7 @@ use mailbox_handler::MailboxHandler;
 use on_leader_start_handler::OnLeaderStartHandler;
 use publish_heartbeat_handler::PublishHeartbeatHandler;
 use region_lease_handler::RegionLeaseHandler;
+use remap_flow_peer_handler::RemapFlowPeerHandler;
 use response_header_handler::ResponseHeaderHandler;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
@@ -71,6 +72,7 @@ pub mod mailbox_handler;
 pub mod on_leader_start_handler;
 pub mod publish_heartbeat_handler;
 pub mod region_lease_handler;
+pub mod remap_flow_peer_handler;
 pub mod response_header_handler;
 
 #[async_trait::async_trait]
@@ -573,6 +575,7 @@
             self.add_handler_last(publish_heartbeat_handler);
         }
         self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
+        self.add_handler_last(RemapFlowPeerHandler::default());
 
         if let Some(flow_state_handler) = self.flow_state_handler.take() {
             self.add_handler_last(flow_state_handler);
@@ -853,7 +856,7 @@ mod tests {
             .unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(12, handlers.len());
+        assert_eq!(13, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -868,6 +871,7 @@ mod tests {
             "MailboxHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -888,7 +892,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(13, handlers.len());
+        assert_eq!(14, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -904,6 +908,7 @@ mod tests {
             "CollectStatsHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -921,7 +926,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(13, handlers.len());
+        assert_eq!(14, handlers.len());
 
         let names = [
             "CollectStatsHandler",
@@ -937,6 +942,7 @@ mod tests {
             "MailboxHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -954,7 +960,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(13, handlers.len());
+        assert_eq!(14, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -970,6 +976,7 @@ mod tests {
             "CollectStatsHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -987,7 +994,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(13, handlers.len());
+        assert_eq!(14, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -1003,6 +1010,7 @@ mod tests {
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
             "ResponseHeaderHandler",
+            "RemapFlowPeerHandler",
        ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1020,7 +1028,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(12, handlers.len());
+        assert_eq!(13, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -1035,6 +1043,7 @@ mod tests {
             "CollectStatsHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1052,7 +1061,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(12, handlers.len());
+        assert_eq!(13, handlers.len());
 
         let names = [
             "ResponseHeaderHandler",
@@ -1067,6 +1076,7 @@ mod tests {
             "MailboxHandler",
             "FilterInactiveRegionStatsHandler",
             "ResponseHeaderHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
@@ -1084,7 +1094,7 @@ mod tests {
         let group = builder.build().unwrap();
 
         let handlers = group.handlers;
-        assert_eq!(12, handlers.len());
+        assert_eq!(13, handlers.len());
 
         let names = [
             "CollectStatsHandler",
@@ -1099,6 +1109,7 @@ mod tests {
             "MailboxHandler",
             "FilterInactiveRegionStatsHandler",
             "CollectStatsHandler",
+            "RemapFlowPeerHandler",
         ];
 
         for (handler, name) in handlers.iter().zip(names.into_iter()) {
diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs
index 4c62f44bcf..7b57ab9e55 100644
--- a/src/meta-srv/src/handler/collect_stats_handler.rs
+++ b/src/meta-srv/src/handler/collect_stats_handler.rs
@@ -21,7 +21,7 @@ use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
 use common_meta::key::{MetadataKey, MetadataValue};
 use common_meta::peer::Peer;
 use common_meta::rpc::store::PutRequest;
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use dashmap::DashMap;
 use snafu::ResultExt;
 
@@ -185,6 +185,10 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
         match ctx.leader_cached_kv_backend.put(put).await {
             Ok(_) => {
+                info!(
+                    "Successfully updated datanode `NodeAddressValue`: {:?}",
+                    peer
+                );
                 // broadcast invalidating cache
                 let cache_idents = stat
                     .table_ids()
@@ -200,11 +204,14 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
                 }
             }
             Err(e) => {
-                error!(e; "Failed to update NodeAddressValue: {:?}", peer);
+                error!(e; "Failed to update datanode `NodeAddressValue`: {:?}", peer);
             }
         }
     } else {
-        warn!("Failed to serialize NodeAddressValue: {:?}", peer);
+        warn!(
+            "Failed to serialize datanode `NodeAddressValue`: {:?}",
+            peer
+        );
     }
 }
diff --git a/src/meta-srv/src/handler/remap_flow_peer_handler.rs b/src/meta-srv/src/handler/remap_flow_peer_handler.rs
new file mode 100644
index 0000000000..13b505b2dd
--- /dev/null
+++ b/src/meta-srv/src/handler/remap_flow_peer_handler.rs
@@ -0,0 +1,95 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
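+
+//! Tracks flownode restarts via the heartbeat `node_epoch` and rewrites the
+//! persisted flownode address (`NodeAddressValue`) when a newer epoch is seen.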
+
+use api::v1::meta::{HeartbeatRequest, Peer, Role};
+use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
+use common_meta::key::{MetadataKey, MetadataValue};
+use common_meta::rpc::store::PutRequest;
+use common_telemetry::{error, info, warn};
+use dashmap::DashMap;
+
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
+use crate::metasrv::Context;
+use crate::Result;
+
+#[derive(Debug, Default)]
+pub struct RemapFlowPeerHandler {
+    /// flow_node_id -> epoch
+    epoch_cache: DashMap<u64, u64>,
+}
+
+#[async_trait::async_trait]
+impl HeartbeatHandler for RemapFlowPeerHandler {
+    fn is_acceptable(&self, role: Role) -> bool {
+        role == Role::Flownode
+    }
+
+    async fn handle(
+        &self,
+        req: &HeartbeatRequest,
+        ctx: &mut Context,
+        _acc: &mut HeartbeatAccumulator,
+    ) -> Result<HandleControl> {
+        let Some(peer) = req.peer.as_ref() else {
+            return Ok(HandleControl::Continue);
+        };
+
+        let current_epoch = req.node_epoch;
+        let flow_node_id = peer.id;
+
+        let refresh = if let Some(mut epoch) = self.epoch_cache.get_mut(&flow_node_id) {
+            if current_epoch > *epoch.value() {
+                *epoch.value_mut() = current_epoch;
+                true
+            } else {
+                false
+            }
+        } else {
+            self.epoch_cache.insert(flow_node_id, current_epoch);
+            true
+        };
+
+        if refresh {
+            rewrite_node_address(ctx, peer).await;
+        }
+
+        Ok(HandleControl::Continue)
+    }
+}
+
+async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
+    let key = NodeAddressKey::with_flownode(peer.id).to_bytes();
+    if let Ok(value) = NodeAddressValue::new(peer.clone().into()).try_as_raw_value() {
+        let put = PutRequest {
+            key,
+            value,
+            prev_kv: false,
+        };
+
+        match ctx.leader_cached_kv_backend.put(put).await {
+            Ok(_) => {
+                info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
+                // TODO(discord9): broadcast invalidating cache to all frontends
+            }
+            Err(e) => {
+                error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);
+            }
+        }
+    } else {
+        warn!("Failed to serialize flow `NodeAddressValue`: {:?}", peer);
+    }
+}
diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs
index 1c82fcf00a..a816fb66e3 100644
--- a/src/operator/src/flow.rs
+++ b/src/operator/src/flow.rs
@@ -21,7 +21,7 @@ use common_meta::node_manager::NodeManagerRef;
 use common_query::error::Result;
 use common_telemetry::tracing_context::TracingContext;
 use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 
@@ -81,7 +81,6 @@ impl FlowServiceOperator {
             .flow_metadata_manager
             .flow_route_manager()
             .routes(id)
-            .try_collect::<Vec<_>>()
             .await
             .map_err(BoxedError::new)
             .context(common_query::error::ExecuteSnafu)?;