From 765d1277ee6b71b2918e79cc478b73e4ca5388df Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 27 Feb 2025 14:16:36 +0800 Subject: [PATCH] fix(metasrv): clean expired nodes in memory (#5592) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix/frontend-node-state: Refactor NodeInfoKey and Context Handling in Meta Server • Removed unused cluster_id from NodeInfoKey struct. • Updated HeartbeatHandlerGroup to return Context alongside HeartbeatResponse. • Added current_node_info to Context for tracking node information. • Implemented on_node_disconnect in Context to handle node disconnection events, specifically for Frontend roles. • Adjusted register_pusher function to return PusherId directly. • Updated tests to accommodate changes in Context structure. * fix/frontend-node-state: Refactor Heartbeat Handler Context Management Refactored the HeartbeatHandlerGroup::handle method to use a mutable reference for Context instead of passing it by value. This change simplifies the context management by eliminating the need to return the context with the response. Updated the Metasrv implementation to align with this new context handling approach, improving code clarity and reducing unnecessary context cloning. * revert: clean cluster info on disconnect * fix/frontend-node-state: Add Frontend Expiry Listener and Update NodeInfoKey Conversion • Introduced FrontendExpiryListener to manage the expiration of frontend nodes, including its integration with leadership change notifications. • Modified NodeInfoKey conversion to use references, enhancing efficiency and consistency across the codebase. • Updated collect_cluster_info_handler and metasrv to incorporate the new listener and conversion changes. • Added frontend_expiry module to the project structure for better organization and maintainability. * chore: add config for node expiry * add some doc * fix: clippy * fix/frontend-node-state: ### Refactor Node Expiry Handling - **Configuration Update**: Removed `node_expiry_tick` from `metasrv.example.toml` and `MetasrvOptions` in `metasrv.rs`. - **Module Renaming**: Renamed `frontend_expiry.rs` to `node_expiry_listener.rs` and updated references in `lib.rs`. - **Code Refactoring**: Replaced `FrontendExpiryListener` with `NodeExpiryListener` in `node_expiry_listener.rs` and `metasrv.rs`, removing the tick interval and adjusting logic to use a fixed 60-second interval for node expiry checks. * fix/frontend-node-state: Improve logging in `node_expiry_listener.rs` - Enhanced warning message to include peer information when an unrecognized node info key is encountered in `node_expiry_listener.rs`. * docs: update config docs * fix/frontend-node-state: **Refactor Context Handling in Heartbeat Services** - Updated `HeartbeatHandlerGroup` in `handler.rs` to pass `Context` by value instead of by mutable reference, allowing for more flexible context management. - Modified `Metasrv` implementation in `heartbeat.rs` to clone `Context` when passing to `handle` method, ensuring thread safety and consistency in asynchronous operations. --- config/config.md | 1 + config/metasrv.example.toml | 3 + src/common/meta/src/cluster.rs | 10 +- src/common/meta/src/lib.rs | 1 + src/common/meta/src/node_expiry_listener.rs | 152 ++++++++++++++++++ .../handler/collect_cluster_info_handler.rs | 2 +- src/meta-srv/src/metasrv.rs | 8 + src/meta-srv/src/service/heartbeat.rs | 8 +- 8 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 src/common/meta/src/node_expiry_listener.rs diff --git a/config/config.md b/config/config.md index aaa92c7f35..107da0b35b 100644 --- a/config/config.md +++ b/config/config.md @@ -319,6 +319,7 @@ | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | +| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 18b203f204..842ac21530 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -50,6 +50,9 @@ use_memory_store = false ## - Using shared storage (e.g., s3). enable_region_failover = false +## Max allowed idle time before removing node info from metasrv memory. +node_max_idle_time = "24hours" + ## Whether to enable greptimedb telemetry. Enabled by default. #+ enable_telemetry = true diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index bb2429c0e6..f73dcf1537 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -57,12 +57,10 @@ pub trait ClusterInfo { } /// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`. -/// -/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have -/// a `cluster_id`, it serves multiple clusters. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct NodeInfoKey { /// The cluster id. + // todo(hl): remove cluster_id as it is not assigned anywhere. pub cluster_id: ClusterId, /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`. pub role: Role, @@ -232,8 +230,8 @@ impl TryFrom> for NodeInfoKey { } } -impl From for Vec { - fn from(key: NodeInfoKey) -> Self { +impl From<&NodeInfoKey> for Vec { + fn from(key: &NodeInfoKey) -> Self { format!( "{}-{}-{}-{}", CLUSTER_NODE_INFO_PREFIX, @@ -315,7 +313,7 @@ mod tests { node_id: 2, }; - let key_bytes: Vec = key.into(); + let key_bytes: Vec = (&key).into(); let new_key: NodeInfoKey = key_bytes.try_into().unwrap(); assert_eq!(1, new_key.cluster_id); diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index fd6fc775a4..7479a14337 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -34,6 +34,7 @@ pub mod kv_backend; pub mod leadership_notifier; pub mod lock_key; pub mod metrics; +pub mod node_expiry_listener; pub mod node_manager; pub mod peer; pub mod range_stream; diff --git a/src/common/meta/src/node_expiry_listener.rs b/src/common/meta/src/node_expiry_listener.rs new file mode 100644 index 0000000000..c5da2936a5 --- /dev/null +++ b/src/common/meta/src/node_expiry_listener.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; +use std::time::Duration; + +use common_telemetry::{debug, error, info, warn}; +use tokio::task::JoinHandle; +use tokio::time::{interval, MissedTickBehavior}; + +use crate::cluster::{NodeInfo, NodeInfoKey}; +use crate::error; +use crate::kv_backend::ResettableKvBackendRef; +use crate::leadership_notifier::LeadershipChangeListener; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; + +/// [NodeExpiryListener] periodically checks all node info in memory and removes +/// expired node info to prevent memory leak. +pub struct NodeExpiryListener { + handle: Mutex>>, + max_idle_time: Duration, + in_memory: ResettableKvBackendRef, +} + +impl Drop for NodeExpiryListener { + fn drop(&mut self) { + self.stop(); + } +} + +impl NodeExpiryListener { + pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self { + Self { + handle: Mutex::new(None), + max_idle_time, + in_memory, + } + } + + async fn start(&self) { + let mut handle = self.handle.lock().unwrap(); + if handle.is_none() { + let in_memory = self.in_memory.clone(); + + let max_idle_time = self.max_idle_time; + let ticker_loop = tokio::spawn(async move { + // Run clean task every minute. + let mut interval = interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + interval.tick().await; + if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await { + error!(e; "Failed to clean expired node"); + } + } + }); + *handle = Some(ticker_loop); + } + } + + fn stop(&self) { + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.abort(); + info!("Node expiry listener stopped") + } + } + + /// Cleans expired nodes from memory. + async fn clean_expired_nodes( + in_memory: &ResettableKvBackendRef, + max_idle_time: Duration, + ) -> error::Result<()> { + let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?; + for key in node_keys { + let key_bytes: Vec = (&key).into(); + if let Err(e) = in_memory.delete(&key_bytes, false).await { + warn!(e; "Failed to delete expired node: {:?}", key_bytes); + } else { + debug!("Deleted expired node key: {:?}", key); + } + } + Ok(()) + } + + /// Lists expired nodes that have been inactive more than `max_idle_time`. + async fn list_expired_nodes( + in_memory: &ResettableKvBackendRef, + max_idle_time: Duration, + ) -> error::Result> { + let prefix = NodeInfoKey::key_prefix_with_cluster_id(0); + let req = RangeRequest::new().with_prefix(prefix); + let current_time_millis = common_time::util::current_time_millis(); + let resp = in_memory.range(req).await?; + Ok(resp + .kvs + .into_iter() + .filter_map(move |KeyValue { key, value }| { + let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| { + warn!(e; "Unrecognized node info value"); + }) else { + return None; + }; + if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64 + { + NodeInfoKey::try_from(key) + .inspect_err(|e| { + warn!(e; "Unrecognized node info key: {:?}", info.peer); + }) + .ok() + .inspect(|node_key| { + debug!("Found expired node: {:?}", node_key); + }) + } else { + None + } + })) + } +} + +#[async_trait::async_trait] +impl LeadershipChangeListener for NodeExpiryListener { + fn name(&self) -> &str { + "NodeExpiryListener" + } + + async fn on_leader_start(&self) -> error::Result<()> { + self.start().await; + info!( + "On leader start, node expiry listener started with max idle time: {:?}", + self.max_idle_time + ); + Ok(()) + } + + async fn on_leader_stop(&self) -> error::Result<()> { + self.stop(); + info!("On leader stop, node expiry listener stopped"); + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index 0723ae9cad..1c897e050b 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P } async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> { - let key = key.into(); + let key = (&key).into(); let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?; let put_req = PutRequest { key, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d46692ebf6..b8c29d988a 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac use common_meta::leadership_notifier::{ LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef, }; +use common_meta::node_expiry_listener::NodeExpiryListener; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; @@ -151,6 +152,8 @@ pub struct MetasrvOptions { #[cfg(feature = "pg_kvbackend")] /// Lock id for meta kv election. Only effect when using pg_kvbackend. pub meta_election_lock_id: u64, + #[serde(with = "humantime_serde")] + pub node_max_idle_time: Duration, } const DEFAULT_METASRV_ADDR_PORT: &str = "3002"; @@ -192,6 +195,7 @@ impl Default for MetasrvOptions { meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), #[cfg(feature = "pg_kvbackend")] meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID, + node_max_idle_time: Duration::from_secs(24 * 60 * 60), } } } @@ -442,6 +446,10 @@ impl Metasrv { leadership_change_notifier.add_listener(self.wal_options_allocator.clone()); leadership_change_notifier .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager))); + leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new( + self.options.node_max_idle_time, + self.in_memory.clone(), + ))); if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker { leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _); } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 3d839fd082..45adb5f57e 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv { }; if pusher_id.is_none() { - pusher_id = register_pusher(&handler_group, header, tx.clone()).await; + pusher_id = + Some(register_pusher(&handler_group, header, tx.clone()).await); } if let Some(k) = &pusher_id { METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]); } else { METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]); } + let res = handler_group .handle(req, ctx.clone()) .await @@ -173,13 +175,13 @@ async fn register_pusher( handler_group: &HeartbeatHandlerGroup, header: &RequestHeader, sender: Sender>, -) -> Option { +) -> PusherId { let role = header.role(); let id = get_node_id(header); let pusher_id = PusherId::new(role, id); let pusher = Pusher::new(sender, header); handler_group.register_pusher(pusher_id, pusher).await; - Some(pusher_id) + pusher_id } #[cfg(test)]