diff --git a/config/config.md b/config/config.md index aaa92c7f35..107da0b35b 100644 --- a/config/config.md +++ b/config/config.md @@ -319,6 +319,7 @@ | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | +| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 18b203f204..842ac21530 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -50,6 +50,9 @@ use_memory_store = false ## - Using shared storage (e.g., s3). enable_region_failover = false +## Max allowed idle time before removing node info from metasrv memory. +node_max_idle_time = "24hours" + ## Whether to enable greptimedb telemetry. Enabled by default. #+ enable_telemetry = true diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index bb2429c0e6..f73dcf1537 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -57,12 +57,10 @@ pub trait ClusterInfo { } /// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`. -/// -/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have -/// a `cluster_id`, it serves multiple clusters. #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct NodeInfoKey { /// The cluster id. + // todo(hl): remove cluster_id as it is not assigned anywhere. pub cluster_id: ClusterId, /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`. pub role: Role, @@ -232,8 +230,8 @@ impl TryFrom> for NodeInfoKey { } } -impl From for Vec { - fn from(key: NodeInfoKey) -> Self { +impl From<&NodeInfoKey> for Vec { + fn from(key: &NodeInfoKey) -> Self { format!( "{}-{}-{}-{}", CLUSTER_NODE_INFO_PREFIX, @@ -315,7 +313,7 @@ mod tests { node_id: 2, }; - let key_bytes: Vec = key.into(); + let key_bytes: Vec = (&key).into(); let new_key: NodeInfoKey = key_bytes.try_into().unwrap(); assert_eq!(1, new_key.cluster_id); diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index fd6fc775a4..7479a14337 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -34,6 +34,7 @@ pub mod kv_backend; pub mod leadership_notifier; pub mod lock_key; pub mod metrics; +pub mod node_expiry_listener; pub mod node_manager; pub mod peer; pub mod range_stream; diff --git a/src/common/meta/src/node_expiry_listener.rs b/src/common/meta/src/node_expiry_listener.rs new file mode 100644 index 0000000000..c5da2936a5 --- /dev/null +++ b/src/common/meta/src/node_expiry_listener.rs @@ -0,0 +1,152 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; +use std::time::Duration; + +use common_telemetry::{debug, error, info, warn}; +use tokio::task::JoinHandle; +use tokio::time::{interval, MissedTickBehavior}; + +use crate::cluster::{NodeInfo, NodeInfoKey}; +use crate::error; +use crate::kv_backend::ResettableKvBackendRef; +use crate::leadership_notifier::LeadershipChangeListener; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; + +/// [NodeExpiryListener] periodically checks all node info in memory and removes +/// expired node info to prevent memory leak. +pub struct NodeExpiryListener { + handle: Mutex>>, + max_idle_time: Duration, + in_memory: ResettableKvBackendRef, +} + +impl Drop for NodeExpiryListener { + fn drop(&mut self) { + self.stop(); + } +} + +impl NodeExpiryListener { + pub fn new(max_idle_time: Duration, in_memory: ResettableKvBackendRef) -> Self { + Self { + handle: Mutex::new(None), + max_idle_time, + in_memory, + } + } + + async fn start(&self) { + let mut handle = self.handle.lock().unwrap(); + if handle.is_none() { + let in_memory = self.in_memory.clone(); + + let max_idle_time = self.max_idle_time; + let ticker_loop = tokio::spawn(async move { + // Run clean task every minute. + let mut interval = interval(Duration::from_secs(60)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + interval.tick().await; + if let Err(e) = Self::clean_expired_nodes(&in_memory, max_idle_time).await { + error!(e; "Failed to clean expired node"); + } + } + }); + *handle = Some(ticker_loop); + } + } + + fn stop(&self) { + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.abort(); + info!("Node expiry listener stopped") + } + } + + /// Cleans expired nodes from memory. + async fn clean_expired_nodes( + in_memory: &ResettableKvBackendRef, + max_idle_time: Duration, + ) -> error::Result<()> { + let node_keys = Self::list_expired_nodes(in_memory, max_idle_time).await?; + for key in node_keys { + let key_bytes: Vec = (&key).into(); + if let Err(e) = in_memory.delete(&key_bytes, false).await { + warn!(e; "Failed to delete expired node: {:?}", key_bytes); + } else { + debug!("Deleted expired node key: {:?}", key); + } + } + Ok(()) + } + + /// Lists expired nodes that have been inactive more than `max_idle_time`. + async fn list_expired_nodes( + in_memory: &ResettableKvBackendRef, + max_idle_time: Duration, + ) -> error::Result> { + let prefix = NodeInfoKey::key_prefix_with_cluster_id(0); + let req = RangeRequest::new().with_prefix(prefix); + let current_time_millis = common_time::util::current_time_millis(); + let resp = in_memory.range(req).await?; + Ok(resp + .kvs + .into_iter() + .filter_map(move |KeyValue { key, value }| { + let Ok(info) = NodeInfo::try_from(value).inspect_err(|e| { + warn!(e; "Unrecognized node info value"); + }) else { + return None; + }; + if (current_time_millis - info.last_activity_ts) > max_idle_time.as_millis() as i64 + { + NodeInfoKey::try_from(key) + .inspect_err(|e| { + warn!(e; "Unrecognized node info key: {:?}", info.peer); + }) + .ok() + .inspect(|node_key| { + debug!("Found expired node: {:?}", node_key); + }) + } else { + None + } + })) + } +} + +#[async_trait::async_trait] +impl LeadershipChangeListener for NodeExpiryListener { + fn name(&self) -> &str { + "NodeExpiryListener" + } + + async fn on_leader_start(&self) -> error::Result<()> { + self.start().await; + info!( + "On leader start, node expiry listener started with max idle time: {:?}", + self.max_idle_time + ); + Ok(()) + } + + async fn on_leader_stop(&self) -> error::Result<()> { + self.stop(); + info!("On leader stop, node expiry listener stopped"); + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index 0723ae9cad..1c897e050b 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -157,7 +157,7 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P } async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> { - let key = key.into(); + let key = (&key).into(); let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?; let put_req = PutRequest { key, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d46692ebf6..b8c29d988a 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -32,6 +32,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBac use common_meta::leadership_notifier::{ LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef, }; +use common_meta::node_expiry_listener::NodeExpiryListener; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; @@ -151,6 +152,8 @@ pub struct MetasrvOptions { #[cfg(feature = "pg_kvbackend")] /// Lock id for meta kv election. Only effect when using pg_kvbackend. pub meta_election_lock_id: u64, + #[serde(with = "humantime_serde")] + pub node_max_idle_time: Duration, } const DEFAULT_METASRV_ADDR_PORT: &str = "3002"; @@ -192,6 +195,7 @@ impl Default for MetasrvOptions { meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), #[cfg(feature = "pg_kvbackend")] meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID, + node_max_idle_time: Duration::from_secs(24 * 60 * 60), } } } @@ -442,6 +446,10 @@ impl Metasrv { leadership_change_notifier.add_listener(self.wal_options_allocator.clone()); leadership_change_notifier .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager))); + leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new( + self.options.node_max_idle_time, + self.in_memory.clone(), + ))); if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker { leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _); } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 3d839fd082..45adb5f57e 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -68,13 +68,15 @@ impl heartbeat_server::Heartbeat for Metasrv { }; if pusher_id.is_none() { - pusher_id = register_pusher(&handler_group, header, tx.clone()).await; + pusher_id = + Some(register_pusher(&handler_group, header, tx.clone()).await); } if let Some(k) = &pusher_id { METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]); } else { METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]); } + let res = handler_group .handle(req, ctx.clone()) .await @@ -173,13 +175,13 @@ async fn register_pusher( handler_group: &HeartbeatHandlerGroup, header: &RequestHeader, sender: Sender>, -) -> Option { +) -> PusherId { let role = header.role(); let id = get_node_id(header); let pusher_id = PusherId::new(role, id); let pusher = Pusher::new(sender, header); handler_group.register_pusher(pusher_id, pusher).await; - Some(pusher_id) + pusher_id } #[cfg(test)]