From a7fa40e16ddbd85053706523d22e935e1fb76d0e Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Sun, 3 Sep 2023 10:25:50 +0800 Subject: [PATCH] fix: filter out outdated heartbeat (#2303) * fix: filter out outdated heartbeat, #1707 * feat: reorder handlers * refactor: disableXXX to enableXXX * feat: make full use of region leases to facilitate failover * chore: minor refactor * chore: by comment * feat: logging on inactive/active --- src/cmd/src/metasrv.rs | 6 +-- src/meta-srv/src/handler.rs | 37 ++++++------- .../src/handler/check_leader_handler.rs | 1 - .../src/handler/collect_stats_handler.rs | 1 - .../handler/filter_inactive_region_stats.rs | 54 +++++++++++++++++++ .../src/handler/keep_lease_handler.rs | 1 - src/meta-srv/src/handler/mailbox_handler.rs | 1 - src/meta-srv/src/handler/node_stat.rs | 14 +++++ .../src/handler/on_leader_start_handler.rs | 1 - .../src/handler/region_lease_handler.rs | 43 +++++++-------- .../src/handler/response_header_handler.rs | 1 - src/meta-srv/src/inactive_node_manager.rs | 37 +++++++------ src/meta-srv/src/keys.rs | 10 ++-- src/meta-srv/src/lease.rs | 4 +- src/meta-srv/src/metasrv.rs | 22 +++++--- src/meta-srv/src/metasrv/builder.rs | 45 ++++++++++------ src/meta-srv/src/procedure/region_failover.rs | 12 +++-- .../region_failover/activate_region.rs | 3 +- .../region_failover/deactivate_region.rs | 53 +++++++++--------- src/meta-srv/src/test_util.rs | 4 +- 20 files changed, 218 insertions(+), 132 deletions(-) create mode 100644 src/meta-srv/src/handler/filter_inactive_region_stats.rs diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 2b6e6d9ce7..15f09fefab 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -93,7 +93,7 @@ struct StartCommand { #[clap(long)] use_memory_store: Option, #[clap(long)] - disable_region_failover: Option, + enable_region_failover: Option, #[clap(long)] http_addr: Option, #[clap(long)] @@ -140,8 +140,8 @@ impl StartCommand { opts.use_memory_store = use_memory_store; } - if let Some(disable_region_failover) = self.disable_region_failover { - opts.disable_region_failover = disable_region_failover; + if let Some(enable_region_failover) = self.enable_region_failover { + opts.enable_region_failover = enable_region_failover; } if let Some(http_addr) = &self.http_addr { diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index b75f74308c..bc51f7a3ea 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::ops::Range; use std::sync::Arc; use std::time::Duration; @@ -22,17 +22,10 @@ use api::v1::meta::{ HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader, ResponseHeader, Role, PROTOCOL_VERSION, }; -pub use check_leader_handler::CheckLeaderHandler; -pub use collect_stats_handler::CollectStatsHandler; use common_meta::instruction::{Instruction, InstructionReply}; use common_telemetry::{debug, info, timer, warn}; use dashmap::DashMap; -pub use failure_handler::RegionFailureHandler; -pub use keep_lease_handler::KeepLeaseHandler; use metrics::{decrement_gauge, increment_gauge}; -pub use on_leader_start_handler::OnLeaderStartHandler; -pub use persist_stats_handler::PersistStatsHandler; -pub use response_header_handler::ResponseHeaderHandler; use snafu::{OptionExt, ResultExt}; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Notify, RwLock}; @@ -46,17 +39,18 @@ use crate::service::mailbox::{ BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId, }; -mod check_leader_handler; -mod collect_stats_handler; -pub(crate) mod failure_handler; -mod keep_lease_handler; +pub mod check_leader_handler; +pub mod collect_stats_handler; +pub mod failure_handler; +pub mod filter_inactive_region_stats; +pub mod keep_lease_handler; pub mod mailbox_handler; pub mod node_stat; -mod on_leader_start_handler; -mod persist_stats_handler; +pub mod on_leader_start_handler; +pub mod persist_stats_handler; pub mod publish_heartbeat_handler; -pub(crate) mod region_lease_handler; -mod response_header_handler; +pub mod region_lease_handler; +pub mod response_header_handler; #[async_trait::async_trait] pub trait HeartbeatHandler: Send + Sync { @@ -81,6 +75,7 @@ pub struct HeartbeatAccumulator { pub header: Option, pub instructions: Vec, pub stat: Option, + pub inactive_region_ids: HashSet, pub region_lease: Option, } @@ -404,11 +399,13 @@ mod tests { use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; use tokio::sync::mpsc; + use crate::handler::check_leader_handler::CheckLeaderHandler; + use crate::handler::collect_stats_handler::CollectStatsHandler; use crate::handler::mailbox_handler::MailboxHandler; - use crate::handler::{ - CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, - OnLeaderStartHandler, PersistStatsHandler, Pusher, ResponseHeaderHandler, - }; + use crate::handler::on_leader_start_handler::OnLeaderStartHandler; + use crate::handler::persist_stats_handler::PersistStatsHandler; + use crate::handler::response_header_handler::ResponseHeaderHandler; + use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; use crate::sequence::Sequence; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; use crate::service::store::memory::MemStore; diff --git a/src/meta-srv/src/handler/check_leader_handler.rs b/src/meta-srv/src/handler/check_leader_handler.rs index eadcce8d2a..cce1e83f5e 100644 --- a/src/meta-srv/src/handler/check_leader_handler.rs +++ b/src/meta-srv/src/handler/check_leader_handler.rs @@ -19,7 +19,6 @@ use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -#[derive(Default)] pub struct CheckLeaderHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 4eb84e95a3..6f4efffcc8 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -20,7 +20,6 @@ use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -#[derive(Default)] pub struct CollectStatsHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/filter_inactive_region_stats.rs b/src/meta-srv/src/handler/filter_inactive_region_stats.rs new file mode 100644 index 0000000000..0f3f240c76 --- /dev/null +++ b/src/meta-srv/src/handler/filter_inactive_region_stats.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; +use async_trait::async_trait; +use common_telemetry::warn; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +pub struct FilterInactiveRegionStatsHandler; + +#[async_trait] +impl HeartbeatHandler for FilterInactiveRegionStatsHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + _ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + if acc.inactive_region_ids.is_empty() { + return Ok(()); + } + + warn!( + "The heartbeat of this node {:?} contains some inactive regions: {:?}", + req.peer, acc.inactive_region_ids + ); + + let Some(stat) = acc.stat.as_mut() else { + return Ok(()); + }; + + stat.retain_active_region_stats(&acc.inactive_region_ids); + + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 779ab2ab91..9d607a1fa1 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -22,7 +22,6 @@ use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{LeaseKey, LeaseValue}; use crate::metasrv::Context; -#[derive(Default)] pub struct KeepLeaseHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/mailbox_handler.rs b/src/meta-srv/src/handler/mailbox_handler.rs index 973e869411..8ffbf9aa21 100644 --- a/src/meta-srv/src/handler/mailbox_handler.rs +++ b/src/meta-srv/src/handler/mailbox_handler.rs @@ -18,7 +18,6 @@ use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -#[derive(Default)] pub struct MailboxHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 82b68eadaf..20072b4941 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use api::v1::meta::HeartbeatRequest; use common_time::util as time_util; use serde::{Deserialize, Serialize}; @@ -60,6 +62,18 @@ impl Stat { pub fn region_ids(&self) -> Vec { self.region_stats.iter().map(|s| s.id).collect() } + + pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { + if inactive_region_ids.is_empty() { + return; + } + + self.region_stats + .retain(|r| !inactive_region_ids.contains(&r.id)); + self.rcus = self.region_stats.iter().map(|s| s.rcus).sum(); + self.wcus = self.region_stats.iter().map(|s| s.wcus).sum(); + self.region_num = self.region_stats.len() as u64; + } } impl TryFrom for Stat { diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index b770e7f4b4..e2666ef05c 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -18,7 +18,6 @@ use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -#[derive(Default)] pub struct OnLeaderStartHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index fe421f38f5..4942b650f6 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,24 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; -use store_api::storage::RegionId; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::inactive_node_manager::InactiveNodeManager; use crate::metasrv::Context; -/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus -/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second). -// TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration. -pub(crate) const REGION_LEASE_SECONDS: u64 = 20; +pub struct RegionLeaseHandler { + region_lease_seconds: u64, +} -#[derive(Default)] -pub(crate) struct RegionLeaseHandler; +impl RegionLeaseHandler { + pub fn new(region_lease_seconds: u64) -> Self { + Self { + region_lease_seconds, + } + } +} #[async_trait] impl HeartbeatHandler for RegionLeaseHandler { @@ -47,26 +48,18 @@ impl HeartbeatHandler for RegionLeaseHandler { return Ok(()); }; - let mut table_region_leases = HashMap::new(); - stat.region_stats.iter().for_each(|region_stat| { - let region_id = RegionId::from(region_stat.id); - table_region_leases - .entry(region_id.table_id()) - .or_insert_with(Vec::new) - .push(region_id.region_number()); - }); - let mut region_ids = stat.region_ids(); let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); - inactive_node_manager + let inactive_region_ids = inactive_node_manager .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) .await?; + acc.inactive_region_ids = inactive_region_ids; acc.region_lease = Some(RegionLease { region_ids, duration_since_epoch: req.duration_since_epoch, - lease_seconds: REGION_LEASE_SECONDS, + lease_seconds: self.region_lease_seconds, }); Ok(()) @@ -80,11 +73,12 @@ mod test { use common_meta::ident::TableIdent; use common_meta::key::TableMetadataManager; use common_meta::RegionIdent; - use store_api::storage::RegionNumber; + use store_api::storage::{RegionId, RegionNumber}; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; + use crate::metasrv::DEFAULT_REGION_LEASE_SECS; use crate::service::store::kv::KvBackendAdapter; use crate::{table_routes, test_util}; @@ -158,12 +152,15 @@ mod test { .await .unwrap(); - RegionLeaseHandler.handle(&req, ctx, acc).await.unwrap(); + RegionLeaseHandler::new(DEFAULT_REGION_LEASE_SECS) + .handle(&req, ctx, acc) + .await + .unwrap(); assert!(acc.region_lease.is_some()); let lease = acc.region_lease.as_ref().unwrap(); assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]); assert_eq!(lease.duration_since_epoch, 1234); - assert_eq!(lease.lease_seconds, REGION_LEASE_SECONDS); + assert_eq!(lease.lease_seconds, DEFAULT_REGION_LEASE_SECS); } } diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 25e3ff5bd6..54eda6288e 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -18,7 +18,6 @@ use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -#[derive(Default)] pub struct ResponseHeaderHandler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/inactive_node_manager.rs b/src/meta-srv/src/inactive_node_manager.rs index a0e0a9a38a..427d37f2b7 100644 --- a/src/meta-srv/src/inactive_node_manager.rs +++ b/src/meta-srv/src/inactive_node_manager.rs @@ -19,7 +19,7 @@ use common_meta::RegionIdent; use store_api::storage::RegionId; use crate::error::Result; -use crate::keys::InactiveNodeKey; +use crate::keys::InactiveRegionKey; use crate::service::store::kv::ResettableKvStoreRef; pub struct InactiveNodeManager<'a> { @@ -37,7 +37,7 @@ impl<'a> InactiveNodeManager<'a> { region_ident.region_number, ) .as_u64(); - let key = InactiveNodeKey { + let key = InactiveRegionKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, region_id, @@ -57,7 +57,7 @@ impl<'a> InactiveNodeManager<'a> { region_ident.region_number, ) .as_u64(); - let key: Vec = InactiveNodeKey { + let key: Vec = InactiveRegionKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, region_id, @@ -68,19 +68,19 @@ impl<'a> InactiveNodeManager<'a> { } /// The input is a list of regions on a specific node. If one or more regions have been - /// set to inactive state by metasrv, the corresponding regions will be removed, then - /// return the remaining regions. + /// set to inactive state by metasrv, the corresponding regions will be removed(update the + /// `region_ids`), then returns the removed regions. pub async fn retain_active_regions( &self, cluster_id: u64, node_id: u64, region_ids: &mut Vec, - ) -> Result<()> { + ) -> Result> { let key_region_ids = region_ids .iter() .map(|region_id| { ( - InactiveNodeKey { + InactiveRegionKey { cluster_id, node_id, region_id: *region_id, @@ -94,17 +94,24 @@ impl<'a> InactiveNodeManager<'a> { let resp = self.store.batch_get(BatchGetRequest { keys }).await?; let kvs = resp.kvs; if kvs.is_empty() { - return Ok(()); + return Ok(HashSet::new()); } let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::>(); - let active_region_ids = key_region_ids - .into_iter() - .filter(|(key, _)| !inactive_keys.contains(key)) - .map(|(_, region_id)| region_id) - .collect::>(); - *region_ids = active_region_ids; + let (active_region_ids, inactive_region_ids): (Vec>, Vec>) = + key_region_ids + .into_iter() + .map(|(key, region_id)| { + let is_active = !inactive_keys.contains(&key); + if is_active { + (Some(region_id), None) + } else { + (None, Some(region_id)) + } + }) + .unzip(); + *region_ids = active_region_ids.into_iter().flatten().collect(); - Ok(()) + Ok(inactive_region_ids.into_iter().flatten().collect()) } } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index bcfc498de1..a2ca626b4d 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -26,7 +26,7 @@ use crate::handler::node_stat::Stat; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; -pub(crate) const INACTIVE_NODE_PREFIX: &str = "__meta_inactive_node"; +pub(crate) const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; pub const DN_STAT_PREFIX: &str = "__meta_dnstat"; @@ -238,17 +238,17 @@ impl TryFrom> for StatValue { } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -pub struct InactiveNodeKey { +pub struct InactiveRegionKey { pub cluster_id: u64, pub node_id: u64, pub region_id: u64, } -impl From for Vec { - fn from(value: InactiveNodeKey) -> Self { +impl From for Vec { + fn from(value: InactiveRegionKey) -> Self { format!( "{}-{}-{}-{}", - INACTIVE_NODE_PREFIX, value.cluster_id, value.node_id, value.region_id + INACTIVE_REGION_PREFIX, value.cluster_id, value.node_id, value.region_id ) .into_bytes() } diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 7da1c640f3..0f8c409406 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -24,10 +24,10 @@ use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; pub async fn alive_datanodes( cluster_id: u64, meta_peer_client: &MetaPeerClientRef, - lease_secs: i64, + lease_secs: u64, ) -> Result> { let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000 + ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 }; filter_datanodes(cluster_id, meta_peer_client, lease_filter).await diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 2f1992bad8..0d0bc017cb 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -45,7 +45,12 @@ use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; pub const TABLE_ID_SEQ: &str = "table_id"; -const METASRV_HOME: &str = "/tmp/metasrv"; +pub const METASRV_HOME: &str = "/tmp/metasrv"; + +pub const DEFAULT_DATANODE_LEASE_SECS: u64 = 20; +/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus +/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second). +pub const DEFAULT_REGION_LEASE_SECS: u64 = 20; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] @@ -53,10 +58,11 @@ pub struct MetaSrvOptions { pub bind_addr: String, pub server_addr: String, pub store_addr: String, - pub datanode_lease_secs: i64, + pub datanode_lease_secs: u64, + pub region_lease_secs: u64, pub selector: SelectorType, pub use_memory_store: bool, - pub disable_region_failover: bool, + pub enable_region_failover: bool, pub http_opts: HttpOptions, pub logging: LoggingOptions, pub procedure: ProcedureConfig, @@ -71,10 +77,11 @@ impl Default for MetaSrvOptions { bind_addr: "127.0.0.1:3002".to_string(), server_addr: "127.0.0.1:3002".to_string(), store_addr: "127.0.0.1:2379".to_string(), - datanode_lease_secs: 15, + datanode_lease_secs: DEFAULT_DATANODE_LEASE_SECS, + region_lease_secs: DEFAULT_REGION_LEASE_SECS, selector: SelectorType::default(), use_memory_store: false, - disable_region_failover: false, + enable_region_failover: true, http_opts: HttpOptions::default(), logging: LoggingOptions { dir: format!("{METASRV_HOME}/logs"), @@ -161,8 +168,8 @@ pub struct LeaderValue(pub String); #[derive(Clone)] pub struct SelectorContext { - pub datanode_lease_secs: i64, pub server_addr: String, + pub datanode_lease_secs: u64, pub kv_store: KvStoreRef, pub meta_peer_client: MetaPeerClientRef, pub catalog: Option, @@ -372,6 +379,7 @@ impl MetaSrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let skip_all = Arc::new(AtomicBool::new(false)); + let table_metadata_manager = self.table_metadata_manager.clone(); Context { server_addr, @@ -383,7 +391,7 @@ impl MetaSrv { election, skip_all, is_infancy: false, - table_metadata_manager: self.table_metadata_manager.clone(), + table_metadata_manager, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b027522065..146f8f4c31 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,14 +27,18 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; +use crate::handler::check_leader_handler::CheckLeaderHandler; +use crate::handler::collect_stats_handler::CollectStatsHandler; +use crate::handler::failure_handler::RegionFailureHandler; +use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; +use crate::handler::keep_lease_handler::KeepLeaseHandler; use crate::handler::mailbox_handler::MailboxHandler; +use crate::handler::on_leader_start_handler::OnLeaderStartHandler; +use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; -use crate::handler::{ - CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, - KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler, - ResponseHeaderHandler, -}; +use crate::handler::response_header_handler::ResponseHeaderHandler; +use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::lock::DistLockRef; use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; @@ -186,10 +190,8 @@ impl MetaSrvBuilder { let handler_group = match handler_group { Some(handler_group) => handler_group, None => { - let region_failover_handler = if options.disable_region_failover { - None - } else { - let select_ctx = SelectorContext { + let region_failover_handler = if options.enable_region_failover { + let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), datanode_lease_secs: options.datanode_lease_secs, kv_store: kv_store.clone(), @@ -199,11 +201,11 @@ impl MetaSrvBuilder { table: None, }; let region_failover_manager = Arc::new(RegionFailoverManager::new( + options.region_lease_secs, in_memory.clone(), mailbox.clone(), procedure_manager.clone(), - selector.clone(), - select_ctx, + (selector.clone(), selector_ctx), lock.clone(), table_metadata_manager.clone(), )); @@ -211,8 +213,18 @@ impl MetaSrvBuilder { RegionFailureHandler::try_new(election.clone(), region_failover_manager) .await?, ) + } else { + None }; + let publish_heartbeat_handler = if let Some((publish, _)) = pubsub.as_ref() { + Some(PublishHeartbeatHandler::new(publish.clone())) + } else { + None + }; + + let region_lease_handler = RegionLeaseHandler::new(options.region_lease_secs); + let group = HeartbeatHandlerGroup::new(pushers); group.add_handler(ResponseHeaderHandler).await; // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, @@ -223,16 +235,15 @@ impl MetaSrvBuilder { group.add_handler(OnLeaderStartHandler).await; group.add_handler(CollectStatsHandler).await; group.add_handler(MailboxHandler).await; + group.add_handler(region_lease_handler).await; + group.add_handler(FilterInactiveRegionStatsHandler).await; if let Some(region_failover_handler) = region_failover_handler { group.add_handler(region_failover_handler).await; } - group.add_handler(RegionLeaseHandler).await; - group.add_handler(PersistStatsHandler::default()).await; - if let Some((publish, _)) = pubsub.as_ref() { - group - .add_handler(PublishHeartbeatHandler::new(publish.clone())) - .await; + if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { + group.add_handler(publish_heartbeat_handler).await; } + group.add_handler(PersistStatsHandler::default()).await; group } }; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index af19ce9c24..3c5de1cc1b 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -49,7 +49,6 @@ use crate::service::mailbox::MailboxRef; use crate::service::store::kv::ResettableKvStoreRef; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); -const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2); /// A key for the preventing running multiple failover procedures for the same region. #[derive(PartialEq, Eq, Hash, Clone)] @@ -70,6 +69,7 @@ impl From for RegionFailoverKey { } pub(crate) struct RegionFailoverManager { + region_lease_secs: u64, in_memory: ResettableKvStoreRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, @@ -93,15 +93,16 @@ impl Drop for FailoverProcedureGuard { impl RegionFailoverManager { pub(crate) fn new( + region_lease_secs: u64, in_memory: ResettableKvStoreRef, mailbox: MailboxRef, procedure_manager: ProcedureManagerRef, - selector: SelectorRef, - selector_ctx: SelectorContext, + (selector, selector_ctx): (SelectorRef, SelectorContext), dist_lock: DistLockRef, table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { + region_lease_secs, in_memory, mailbox, procedure_manager, @@ -115,6 +116,7 @@ impl RegionFailoverManager { pub(crate) fn create_context(&self) -> RegionFailoverContext { RegionFailoverContext { + region_lease_secs: self.region_lease_secs, in_memory: self.in_memory.clone(), mailbox: self.mailbox.clone(), selector: self.selector.clone(), @@ -247,6 +249,7 @@ struct Node { /// The "Context" of region failover procedure state machine. #[derive(Clone)] pub struct RegionFailoverContext { + pub region_lease_secs: u64, pub in_memory: ResettableKvStoreRef, pub mailbox: MailboxRef, pub selector: SelectorRef, @@ -543,6 +546,7 @@ mod tests { TestingEnv { context: RegionFailoverContext { + region_lease_secs: 10, in_memory, mailbox, selector, @@ -770,7 +774,7 @@ mod tests { let result = procedure.execute(&ctx).await; assert!(matches!(result, Ok(Status::Executing { persist: true }))); assert_eq!( - r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#, + r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""}}"#, serde_json::to_string(&procedure.node.state).unwrap() ); } diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index f250b452e4..a6c835c25d 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::peer::Peer; use common_meta::RegionIdent; -use common_telemetry::debug; +use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -135,6 +135,7 @@ impl State for ActivateRegion { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { + info!("Activating region: {failed_region:?}"); let mailbox_receiver = self .send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT) .await?; diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index 9d8ec58396..a83108ad08 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::peer::Peer; use common_meta::RegionIdent; -use common_telemetry::debug; +use common_telemetry::{debug, info, warn}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -28,31 +28,24 @@ use super::{RegionFailoverContext, State}; use crate::error::{ Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; -use crate::handler::region_lease_handler::REGION_LEASE_SECONDS; use crate::handler::HeartbeatMailbox; use crate::inactive_node_manager::InactiveNodeManager; -use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT; use crate::service::mailbox::{Channel, MailboxReceiver}; #[derive(Serialize, Deserialize, Debug)] pub(super) struct DeactivateRegion { candidate: Peer, - region_lease_expiry_seconds: u64, } impl DeactivateRegion { pub(super) fn new(candidate: Peer) -> Self { - Self { - candidate, - region_lease_expiry_seconds: REGION_LEASE_SECONDS * 2, - } + Self { candidate } } async fn send_close_region_message( &self, ctx: &RegionFailoverContext, failed_region: &RegionIdent, - timeout: Duration, ) -> Result { let instruction = Instruction::CloseRegion(failed_region.clone()); @@ -67,11 +60,17 @@ impl DeactivateRegion { input: instruction.to_string(), })?; + let ch = Channel::Datanode(failed_region.datanode_id); + // Mark the region as inactive InactiveNodeManager::new(&ctx.in_memory) .register_inactive_region(failed_region) .await?; - - let ch = Channel::Datanode(failed_region.datanode_id); + // We first marked the region as inactive, which means that the failed region cannot + // be successfully renewed from now on, so after the lease time is exceeded, the region + // will be automatically closed. + // If the deadline is exceeded, we can proceed to the next step with confidence, + // as the expiration means that the region has been closed. + let timeout = Duration::from_secs(ctx.region_lease_secs); ctx.mailbox.send(&ch, msg, timeout).await } @@ -110,10 +109,10 @@ impl DeactivateRegion { } } Err(Error::MailboxTimeout { .. }) => { - // Since we are in a region failover situation, the Datanode that the failed region - // resides might be unreachable. So we wait for the region lease to expire. The - // region would be closed by its own [RegionAliveKeeper]. - self.wait_for_region_lease_expiry().await; + // We have configured the timeout to match the region lease timeout before making + // the call and have disabled region lease renewal. Therefore, if a timeout error + // occurs, it can be concluded that the region has been closed. With this information, + // we can proceed confidently to the next step. Ok(Box::new(ActivateRegion::new(self.candidate.clone()))) } Err(e) => Err(e), @@ -123,8 +122,8 @@ impl DeactivateRegion { /// Sleep for `region_lease_expiry_seconds`, to make sure the region is closed (by its /// region alive keeper). This is critical for region not being opened in multiple Datanodes /// simultaneously. - async fn wait_for_region_lease_expiry(&self) { - tokio::time::sleep(Duration::from_secs(self.region_lease_expiry_seconds)).await; + async fn wait_for_region_lease_expiry(&self, ctx: &RegionFailoverContext) { + tokio::time::sleep(Duration::from_secs(ctx.region_lease_secs)).await; } } @@ -136,14 +135,17 @@ impl State for DeactivateRegion { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - let result = self - .send_close_region_message(ctx, failed_region, CLOSE_REGION_MESSAGE_TIMEOUT) - .await; + info!("Deactivating region: {failed_region:?}"); + let result = self.send_close_region_message(ctx, failed_region).await; let mailbox_receiver = match result { Ok(mailbox_receiver) => mailbox_receiver, Err(Error::PusherNotFound { .. }) => { + warn!( + "Datanode {} is not reachable, skip deactivating region {}, just wait for the region lease to expire", + failed_region.datanode_id, failed_region + ); // See the mailbox received timeout situation comments above. - self.wait_for_region_lease_expiry().await; + self.wait_for_region_lease_expiry(ctx).await; return Ok(Box::new(ActivateRegion::new(self.candidate.clone()))); } Err(e) => return Err(e), @@ -171,7 +173,7 @@ mod tests { let state = DeactivateRegion::new(Peer::new(2, "")); let mailbox_receiver = state - .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100)) + .send_close_region_message(&env.context, &failed_region) .await .unwrap(); @@ -235,12 +237,9 @@ mod tests { let mut env = TestingEnvBuilder::new().build().await; let failed_region = env.failed_region(1).await; - let state = DeactivateRegion { - candidate: Peer::new(2, ""), - region_lease_expiry_seconds: 2, - }; + let state = DeactivateRegion::new(Peer::new(2, "")); let mailbox_receiver = state - .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100)) + .send_close_region_message(&env.context, &failed_region) .await .unwrap(); diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index d364b042bc..dd2fc8f0e6 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -76,11 +76,11 @@ pub(crate) fn create_region_failover_manager() -> Arc { }; Arc::new(RegionFailoverManager::new( + 10, in_memory, mailbox, procedure_manager, - selector, - selector_ctx, + (selector, selector_ctx), Arc::new(MemLock::default()), Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))), ))