From dcae21208bf9b0511ca21174464bffdd52f0cfe5 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 10 Sep 2024 20:08:59 +0800 Subject: [PATCH] chore: refresh route table (#4673) * chore: remove error:: * chore: avoid to use get_raw if unnecessary * chore: clearer method name * feat: remap node addresses in table route * chore: add unit test for remap address * feat: refresh node address mapping via heartbeat * feat: broadcast table cache invalidate on new epoch * chore: clarify heartbeat log * chore: remove InvalidHeartbeatRequest * chore: add log * feat: add role into NodeAddressKey * chore: fix test * Update src/common/meta/src/key/table_route.rs Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> * chore: simplify code --------- Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com> --- .../meta/src/ddl/alter_logical_tables.rs | 5 +- src/common/meta/src/ddl_manager.rs | 6 +- src/common/meta/src/key.rs | 27 +- src/common/meta/src/key/node_address.rs | 114 ++++++++ src/common/meta/src/key/table_route.rs | 245 +++++++++++++++--- src/datanode/src/heartbeat.rs | 2 +- src/meta-srv/src/error.rs | 8 - .../src/handler/collect_stats_handler.rs | 53 +++- .../src/handler/extract_stat_handler.rs | 11 +- src/meta-srv/src/handler/node_stat.rs | 42 +-- .../src/procedure/region_migration.rs | 2 +- .../downgrade_leader_region.rs | 2 +- .../rollback_downgraded_region.rs | 2 +- .../upgrade_candidate_region.rs | 2 +- 14 files changed, 440 insertions(+), 81 deletions(-) create mode 100644 src/common/meta/src/key/node_address.rs diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 3af359ef6e..d9b318e9e9 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -39,7 +39,7 @@ use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::find_leaders; -use crate::{cache_invalidator, metrics, ClusterId}; +use crate::{metrics, ClusterId}; pub struct AlterLogicalTablesProcedure { pub context: DdlContext, @@ -170,12 +170,11 @@ impl AlterLogicalTablesProcedure { } pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result { - let ctx = cache_invalidator::Context::default(); let to_invalidate = self.build_table_cache_keys_to_invalidate(); self.context .cache_invalidator - .invalidate(&ctx, &to_invalidate) + .invalidate(&Default::default(), &to_invalidate) .await?; Ok(Status::done()) } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1b561b3043..152a4631e2 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -441,11 +441,9 @@ async fn handle_alter_table_task( .table_metadata_manager() .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get(table_id) .await? - .context(TableRouteNotFoundSnafu { table_id })? - .into_inner(); - + .context(TableRouteNotFoundSnafu { table_id })?; ensure!( table_route_value.is_physical(), UnexpectedLogicalRouteTableSnafu { diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index df9c8161fa..3cf6f3a3b7 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -90,6 +90,7 @@ pub mod catalog_name; pub mod datanode_table; pub mod flow; +pub mod node_address; pub mod schema_name; pub mod table_info; pub mod table_name; @@ -102,7 +103,7 @@ pub mod view_info; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use bytes::Bytes; @@ -134,6 +135,7 @@ use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; use crate::ddl::utils::region_storage_path; use crate::error::{self, Result, SerdeJsonSnafu}; +use crate::key::node_address::NodeAddressValue; use crate::key::table_route::TableRouteKey; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Txn, TxnOp}; @@ -152,12 +154,15 @@ pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name"; pub const TABLE_ROUTE_PREFIX: &str = "__table_route"; +pub const NODE_ADDRESS_PREFIX: &str = "__node_address"; -pub const CACHE_KEY_PREFIXES: [&str; 4] = [ +/// The keys with these prefixes will be loaded into the cache when the leader starts. +pub const CACHE_KEY_PREFIXES: [&str; 5] = [ TABLE_NAME_KEY_PREFIX, CATALOG_NAME_KEY_PREFIX, SCHEMA_NAME_KEY_PREFIX, TABLE_ROUTE_PREFIX, + NODE_ADDRESS_PREFIX, ]; pub type RegionDistribution = BTreeMap>; @@ -210,6 +215,11 @@ lazy_static! { .unwrap(); } +lazy_static! { + static ref NODE_ADDRESS_PATTERN: Regex = + Regex::new(&format!("^{NODE_ADDRESS_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap(); +} + /// The key of metadata. pub trait MetadataKey<'a, T> { fn to_bytes(&self) -> Vec; @@ -306,6 +316,12 @@ impl Deref for DeserializedValueWithBytes { } } +impl DerefMut for DeserializedValueWithBytes { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + impl Debug for DeserializedValueWithBytes { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -1230,7 +1246,8 @@ impl_metadata_value! { FlowInfoValue, FlowNameValue, FlowRouteValue, - TableFlowValue + TableFlowValue, + NodeAddressValue } impl_optional_metadata_value! { @@ -1952,7 +1969,7 @@ mod tests { let table_route_value = table_metadata_manager .table_route_manager .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .unwrap() .unwrap(); @@ -2005,7 +2022,7 @@ mod tests { let table_route_value = table_metadata_manager .table_route_manager .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .unwrap() .unwrap(); diff --git a/src/common/meta/src/key/node_address.rs b/src/common/meta/src/key/node_address.rs new file mode 100644 index 0000000000..55085c928f --- /dev/null +++ b/src/common/meta/src/key/node_address.rs @@ -0,0 +1,114 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use api::v1::meta::Role; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{InvalidMetadataSnafu, Result}; +use crate::key::{MetadataKey, NODE_ADDRESS_PATTERN, NODE_ADDRESS_PREFIX}; +use crate::peer::Peer; + +/// The key stores node address. +/// +/// The layout: `__node_address/{role}/{node_id}` +#[derive(Debug, PartialEq)] +pub struct NodeAddressKey { + pub role: Role, + pub node_id: u64, +} + +impl NodeAddressKey { + pub fn new(role: Role, node_id: u64) -> Self { + Self { role, node_id } + } + + pub fn with_datanode(node_id: u64) -> Self { + Self::new(Role::Datanode, node_id) + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +pub struct NodeAddressValue { + pub peer: Peer, +} + +impl NodeAddressValue { + pub fn new(peer: Peer) -> Self { + Self { peer } + } +} + +impl<'a> MetadataKey<'a, NodeAddressKey> for NodeAddressKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidMetadataSnafu { + err_msg: format!( + "NodeAddressKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = NODE_ADDRESS_PATTERN + .captures(key) + .context(InvalidMetadataSnafu { + err_msg: format!("Invalid NodeAddressKey '{key}'"), + })?; + // Safety: pass the regex check above + let role = captures[1].parse::().unwrap(); + let role = Role::try_from(role).map_err(|_| { + InvalidMetadataSnafu { + err_msg: format!("Invalid Role value: {role}"), + } + .build() + })?; + let node_id = captures[2].parse::().unwrap(); + Ok(NodeAddressKey::new(role, node_id)) + } +} + +impl Display for NodeAddressKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}/{}/{}", + NODE_ADDRESS_PREFIX, self.role as i32, self.node_id + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_node_address_key() { + let key = NodeAddressKey::new(Role::Datanode, 1); + let bytes = key.to_bytes(); + let key2 = NodeAddressKey::from_bytes(&bytes).unwrap(); + assert_eq!(key, key2); + + let key = NodeAddressKey::new(Role::Flownode, 3); + let bytes = key.to_bytes(); + let key2 = NodeAddressKey::from_bytes(&bytes).unwrap(); + assert_eq!(key, key2); + } +} diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index baaaeb75b9..0be0aab3aa 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -22,9 +22,10 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::error::{ - self, InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, - TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu, + InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, + UnexpectedLogicalRouteTableSnafu, }; +use crate::key::node_address::{NodeAddressKey, NodeAddressValue}; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::{ DeserializedValueWithBytes, MetadataKey, MetadataValue, RegionDistribution, @@ -85,7 +86,7 @@ impl TableRouteValue { debug_assert_eq!(region.region.id.table_id(), physical_table_id); RegionId::new(table_id, region.region.id.region_number()) }) - .collect::>(); + .collect(); TableRouteValue::logical(physical_table_id, region_routes) } } @@ -189,12 +190,12 @@ impl TableRouteValue { .region_routes .iter() .map(|region_route| region_route.region.id.region_number()) - .collect::>(), + .collect(), TableRouteValue::Logical(x) => x .region_ids() .iter() .map(|region_id| region_id.region_number()) - .collect::>(), + .collect(), } } } @@ -301,7 +302,7 @@ impl TableRouteManager { Some(route) => { ensure!( route.is_physical(), - error::UnexpectedLogicalRouteTableSnafu { + UnexpectedLogicalRouteTableSnafu { err_msg: format!("{route:?} is a non-physical TableRouteValue.") } ); @@ -321,7 +322,7 @@ impl TableRouteManager { ) -> Result { let table_route = self .storage - .get(logical_or_physical_table_id) + .get_inner(logical_or_physical_table_id) .await? .context(TableRouteNotFoundSnafu { table_id: logical_or_physical_table_id, @@ -335,7 +336,7 @@ impl TableRouteManager { /// Returns the [TableRouteValue::Physical] recursively. /// - /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: + /// Returns a [TableRouteNotFound](error::Error::TableRouteNotFound) Error if: /// - the physical table(`logical_or_physical_table_id`) does not exist /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist. pub async fn get_physical_table_route( @@ -528,6 +529,15 @@ impl TableRouteStorage { /// Returns the [`TableRouteValue`]. pub async fn get(&self, table_id: TableId) -> Result> { + let mut table_route = self.get_inner(table_id).await?; + if let Some(table_route) = &mut table_route { + self.remap_route_address(table_route).await?; + }; + + Ok(table_route) + } + + async fn get_inner(&self, table_id: TableId) -> Result> { let key = TableRouteKey::new(table_id); self.kv_backend .get(&key.to_bytes()) @@ -537,7 +547,19 @@ impl TableRouteStorage { } /// Returns the [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`]. - pub async fn get_raw( + pub async fn get_with_raw_bytes( + &self, + table_id: TableId, + ) -> Result>> { + let mut table_route = self.get_with_raw_bytes_inner(table_id).await?; + if let Some(table_route) = &mut table_route { + self.remap_route_address(table_route).await?; + }; + + Ok(table_route) + } + + async fn get_with_raw_bytes_inner( &self, table_id: TableId, ) -> Result>> { @@ -554,27 +576,27 @@ impl TableRouteStorage { /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if: /// - the physical table(`logical_or_physical_table_id`) does not exist /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist. - pub async fn get_raw_physical_table_route( + pub async fn get_physical_table_route_with_raw_bytes( &self, logical_or_physical_table_id: TableId, ) -> Result<(TableId, DeserializedValueWithBytes)> { - let table_route = - self.get_raw(logical_or_physical_table_id) - .await? - .context(TableRouteNotFoundSnafu { - table_id: logical_or_physical_table_id, - })?; + let table_route = self + .get_with_raw_bytes(logical_or_physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: logical_or_physical_table_id, + })?; match table_route.get_inner_ref() { TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)), TableRouteValue::Logical(x) => { let physical_table_id = x.physical_table_id(); - let physical_table_route = - self.get_raw(physical_table_id) - .await? - .context(TableRouteNotFoundSnafu { - table_id: physical_table_id, - })?; + let physical_table_route = self + .get_with_raw_bytes(physical_table_id) + .await? + .context(TableRouteNotFoundSnafu { + table_id: physical_table_id, + })?; Ok((physical_table_id, physical_table_route)) } } @@ -582,6 +604,13 @@ impl TableRouteStorage { /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`. pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { + let mut table_routes = self.batch_get_inner(table_ids).await?; + self.remap_routes_addresses(&mut table_routes).await?; + + Ok(table_routes) + } + + async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result>> { let keys = table_ids .iter() .map(|id| TableRouteKey::new(*id).to_bytes()) @@ -604,8 +633,107 @@ impl TableRouteStorage { Ok(None) } }) - .collect::>>() + .collect() } + + async fn remap_routes_addresses( + &self, + table_routes: &mut [Option], + ) -> Result<()> { + let keys = table_routes + .iter() + .flat_map(|table_route| { + table_route + .as_ref() + .map(extract_address_keys) + .unwrap_or_default() + }) + .collect::>() + .into_iter() + .collect(); + let node_addrs = self.get_node_addresses(keys).await?; + for table_route in table_routes.iter_mut().flatten() { + set_addresses(&node_addrs, table_route)?; + } + + Ok(()) + } + + async fn remap_route_address(&self, table_route: &mut TableRouteValue) -> Result<()> { + let keys = extract_address_keys(table_route).into_iter().collect(); + let node_addrs = self.get_node_addresses(keys).await?; + set_addresses(&node_addrs, table_route)?; + + Ok(()) + } + + async fn get_node_addresses( + &self, + keys: Vec>, + ) -> Result> { + if keys.is_empty() { + return Ok(HashMap::default()); + } + + self.kv_backend + .batch_get(BatchGetRequest { keys }) + .await? + .kvs + .into_iter() + .map(|kv| { + let node_id = NodeAddressKey::from_bytes(&kv.key)?.node_id; + let node_addr = NodeAddressValue::try_from_raw_value(&kv.value)?; + Ok((node_id, node_addr)) + }) + .collect() + } +} + +fn set_addresses( + node_addrs: &HashMap, + table_route: &mut TableRouteValue, +) -> Result<()> { + let TableRouteValue::Physical(physical_table_route) = table_route else { + return Ok(()); + }; + + for region_route in &mut physical_table_route.region_routes { + if let Some(leader) = &mut region_route.leader_peer { + if let Some(node_addr) = node_addrs.get(&leader.id) { + leader.addr = node_addr.peer.addr.clone(); + } + } + for follower in &mut region_route.follower_peers { + if let Some(node_addr) = node_addrs.get(&follower.id) { + follower.addr = node_addr.peer.addr.clone(); + } + } + } + + Ok(()) +} + +fn extract_address_keys(table_route: &TableRouteValue) -> HashSet> { + let TableRouteValue::Physical(physical_table_route) = table_route else { + return HashSet::default(); + }; + + physical_table_route + .region_routes + .iter() + .flat_map(|region_route| { + region_route + .follower_peers + .iter() + .map(|peer| NodeAddressKey::with_datanode(peer.id).to_bytes()) + .chain( + region_route + .leader_peer + .as_ref() + .map(|leader| NodeAddressKey::with_datanode(leader.id).to_bytes()), + ) + }) + .collect() } #[cfg(test)] @@ -614,7 +742,9 @@ mod tests { use super::*; use crate::kv_backend::memory::MemoryKvBackend; - use crate::kv_backend::TxnService; + use crate::kv_backend::{KvBackend, TxnService}; + use crate::peer::Peer; + use crate::rpc::store::PutRequest; #[test] fn test_table_route_compatibility() { @@ -643,18 +773,18 @@ mod tests { } #[tokio::test] - async fn test_table_route_storage_get_raw_empty() { + async fn test_table_route_storage_get_with_raw_bytes_empty() { let kv = Arc::new(MemoryKvBackend::default()); let table_route_storage = TableRouteStorage::new(kv); - let table_route = table_route_storage.get_raw(1024).await.unwrap(); + let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap(); assert!(table_route.is_none()); } #[tokio::test] - async fn test_table_route_storage_get_raw() { + async fn test_table_route_storage_get_with_raw_bytes() { let kv = Arc::new(MemoryKvBackend::default()); let table_route_storage = TableRouteStorage::new(kv.clone()); - let table_route = table_route_storage.get_raw(1024).await.unwrap(); + let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap(); assert!(table_route.is_none()); let table_route_manager = TableRouteManager::new(kv.clone()); let table_route_value = TableRouteValue::Logical(LogicalTableRouteValue { @@ -667,7 +797,7 @@ mod tests { .unwrap(); let r = kv.txn(txn).await.unwrap(); assert!(r.succeeded); - let table_route = table_route_storage.get_raw(1024).await.unwrap(); + let table_route = table_route_storage.get_with_raw_bytes(1024).await.unwrap(); assert!(table_route.is_some()); let got = table_route.unwrap().inner; assert_eq!(got, table_route_value); @@ -718,4 +848,61 @@ mod tests { assert!(results[2].is_none()); assert_eq!(results[3].as_ref().unwrap(), &routes[0].1); } + + #[tokio::test] + async fn remap_route_address_updates_addresses() { + let kv = Arc::new(MemoryKvBackend::default()); + let table_route_storage = TableRouteStorage::new(kv.clone()); + let mut table_route = TableRouteValue::Physical(PhysicalTableRouteValue { + region_routes: vec![RegionRoute { + leader_peer: Some(Peer { + id: 1, + ..Default::default() + }), + follower_peers: vec![Peer { + id: 2, + ..Default::default() + }], + ..Default::default() + }], + version: 0, + }); + + kv.put(PutRequest { + key: NodeAddressKey::with_datanode(1).to_bytes(), + value: NodeAddressValue { + peer: Peer { + addr: "addr1".to_string(), + ..Default::default() + }, + } + .try_as_raw_value() + .unwrap(), + ..Default::default() + }) + .await + .unwrap(); + + table_route_storage + .remap_route_address(&mut table_route) + .await + .unwrap(); + + if let TableRouteValue::Physical(physical_table_route) = table_route { + assert_eq!( + physical_table_route.region_routes[0] + .leader_peer + .as_ref() + .unwrap() + .addr, + "addr1" + ); + assert_eq!( + physical_table_route.region_routes[0].follower_peers[0].addr, + "" + ); + } else { + panic!("Expected PhysicalTableRouteValue"); + } + } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 68b4637fce..6633d0ab62 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -192,7 +192,7 @@ impl HeartbeatTask { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16); let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); - let quit_signal = Arc::new(tokio::sync::Notify::new()); + let quit_signal = Arc::new(Notify::new()); let mut tx = Self::create_streams( &meta_client, diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index a6f3721f30..d0a58d688e 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -655,13 +655,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid heartbeat request: {}", err_msg))] - InvalidHeartbeatRequest { - err_msg: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to publish message"))] PublishMessage { #[snafu(source)] @@ -809,7 +802,6 @@ impl ErrorExt for Error { | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } | Error::InitExportMetricsTask { .. } - | Error::InvalidHeartbeatRequest { .. } | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } => StatusCode::InvalidArguments, diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index ca4df868d1..ec9fa231e1 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -15,8 +15,12 @@ use std::cmp::Ordering; use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::instruction::CacheIdent; +use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue}; +use common_meta::key::{MetadataKey, MetadataValue}; +use common_meta::peer::Peer; use common_meta::rpc::store::PutRequest; -use common_telemetry::warn; +use common_telemetry::{error, warn}; use dashmap::DashMap; use snafu::ResultExt; @@ -120,6 +124,13 @@ impl HeartbeatHandler for CollectStatsHandler { true }; + // Need to refresh the [datanode -> address] mapping + if refresh { + // Safety: `epoch_stats.stats` is not empty + let last = epoch_stats.stats.last().unwrap(); + rewrite_node_address(ctx, last).await; + } + if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY { return Ok(HandleControl::Continue); } @@ -131,7 +142,7 @@ impl HeartbeatHandler for CollectStatsHandler { let put = PutRequest { key, value, - ..Default::default() + prev_kv: false, }; let _ = ctx @@ -144,6 +155,44 @@ impl HeartbeatHandler for CollectStatsHandler { } } +async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) { + let peer = Peer { + id: stat.id, + addr: stat.addr.clone(), + }; + let key = NodeAddressKey::with_datanode(peer.id).to_bytes(); + if let Ok(value) = NodeAddressValue::new(peer.clone()).try_as_raw_value() { + let put = PutRequest { + key, + value, + prev_kv: false, + }; + + match ctx.leader_cached_kv_backend.put(put).await { + Ok(_) => { + // broadcast invalidating cache + let cache_idents = stat + .table_ids() + .into_iter() + .map(CacheIdent::TableId) + .collect::>(); + if let Err(e) = ctx + .cache_invalidator + .invalidate(&Default::default(), &cache_idents) + .await + { + error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer); + } + } + Err(e) => { + error!(e; "Failed to update NodeAddressValue: {:?}", peer); + } + } + } else { + warn!("Failed to serialize NodeAddressValue: {:?}", peer); + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/meta-srv/src/handler/extract_stat_handler.rs b/src/meta-srv/src/handler/extract_stat_handler.rs index 8124ec8336..c23e78314f 100644 --- a/src/meta-srv/src/handler/extract_stat_handler.rs +++ b/src/meta-srv/src/handler/extract_stat_handler.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::meta::{HeartbeatRequest, Role}; -use common_telemetry::warn; +use common_telemetry::{info, warn}; use super::node_stat::Stat; use crate::error::Result; @@ -40,12 +40,15 @@ impl HeartbeatHandler for ExtractStatHandler { return Ok(HandleControl::Continue); } - match Stat::try_from(req.clone()) { + match Stat::try_from(req) { Ok(stat) => { let _ = acc.stat.insert(stat); } - Err(err) => { - warn!(err; "Incomplete heartbeat data: {:?}", req); + Err(Some(header)) => { + info!("New handshake request: {:?}", header); + } + Err(_) => { + warn!("Incomplete heartbeat data: {:?}", req); } }; diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 5f1ec1cc2b..3a6c6355cd 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -14,14 +14,14 @@ use std::collections::{HashMap, HashSet}; -use api::v1::meta::HeartbeatRequest; +use api::v1::meta::{HeartbeatRequest, RequestHeader}; use common_meta::ClusterId; use common_time::util as time_util; use serde::{Deserialize, Serialize}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; +use table::metadata::TableId; -use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::key::DatanodeStatKey; #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -79,6 +79,11 @@ impl Stat { self.region_stats.iter().map(|s| (s.id, s.role)).collect() } + /// Returns all table ids in the region stats. + pub fn table_ids(&self) -> HashSet { + self.region_stats.iter().map(|s| s.id.table_id()).collect() + } + pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { if inactive_region_ids.is_empty() { return; @@ -92,10 +97,10 @@ impl Stat { } } -impl TryFrom for Stat { - type Error = Error; +impl TryFrom<&HeartbeatRequest> for Stat { + type Error = Option; - fn try_from(value: HeartbeatRequest) -> Result { + fn try_from(value: &HeartbeatRequest) -> Result { let HeartbeatRequest { header, peer, @@ -107,9 +112,9 @@ impl TryFrom for Stat { match (header, peer) { (Some(header), Some(peer)) => { let region_stats = region_stats - .into_iter() - .map(RegionStat::try_from) - .collect::, _>>()?; + .iter() + .map(RegionStat::from) + .collect::>(); Ok(Self { timestamp_millis: time_util::current_time_millis(), @@ -117,35 +122,30 @@ impl TryFrom for Stat { // datanode id id: peer.id, // datanode address - addr: peer.addr, + addr: peer.addr.clone(), rcus: region_stats.iter().map(|s| s.rcus).sum(), wcus: region_stats.iter().map(|s| s.wcus).sum(), region_num: region_stats.len() as u64, region_stats, - node_epoch, + node_epoch: *node_epoch, }) } - _ => InvalidHeartbeatRequestSnafu { - err_msg: "missing header or peer", - } - .fail(), + (header, _) => Err(header.clone()), } } } -impl TryFrom for RegionStat { - type Error = Error; - - fn try_from(value: api::v1::meta::RegionStat) -> Result { - Ok(Self { +impl From<&api::v1::meta::RegionStat> for RegionStat { + fn from(value: &api::v1::meta::RegionStat) -> Self { + Self { id: RegionId::from_u64(value.region_id), rcus: value.rcus, wcus: value.wcus, approximate_bytes: value.approximate_bytes, engine: value.engine.to_string(), role: RegionRole::from(value.role()), - extensions: value.extensions, - }) + extensions: value.extensions.clone(), + } } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 123a489cd5..e56afdbf11 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -232,7 +232,7 @@ impl Context { .table_metadata_manager .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 11dbebb182..3b3f6a6c0c 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -146,7 +146,7 @@ mod tests { let original_table_route = table_metadata_manager .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .unwrap() .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 8da1bbb0db..4e6f20ef19 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -140,7 +140,7 @@ mod tests { let old_table_route = table_metadata_manager .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .unwrap() .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 17cdabc6a0..75f93f760e 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -337,7 +337,7 @@ mod tests { let original_table_route = table_metadata_manager .table_route_manager() .table_route_storage() - .get_raw(table_id) + .get_with_raw_bytes(table_id) .await .unwrap() .unwrap();