From 77af4fd981f2816ca05e2a94f2d8632e6f7aeedc Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 30 Sep 2024 10:56:53 +0800 Subject: [PATCH] refactor: introduce `HeartbeatHandlerGroupBuilder` (#4785) --- src/meta-srv/src/handler.rs | 154 +++++++++++++++++++++----- src/meta-srv/src/metasrv.rs | 6 +- src/meta-srv/src/metasrv/builder.rs | 53 ++------- src/meta-srv/src/service/heartbeat.rs | 4 +- 4 files changed, 142 insertions(+), 75 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 063d3939c1..5363b6c548 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -22,12 +22,28 @@ use api::v1::meta::{ HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader, ResponseHeader, Role, PROTOCOL_VERSION, }; +use check_leader_handler::CheckLeaderHandler; +use collect_cluster_info_handler::{ + CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, + CollectFrontendClusterInfoHandler, +}; +use collect_stats_handler::CollectStatsHandler; +use common_base::Plugins; use common_meta::datanode::Stat; use common_meta::instruction::{Instruction, InstructionReply}; use common_meta::sequence::Sequence; use common_telemetry::{debug, info, warn}; use dashmap::DashMap; +use extract_stat_handler::ExtractStatHandler; +use failure_handler::RegionFailureHandler; +use filter_inactive_region_stats::FilterInactiveRegionStatsHandler; use futures::future::join_all; +use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; +use mailbox_handler::MailboxHandler; +use on_leader_start_handler::OnLeaderStartHandler; +use publish_heartbeat_handler::PublishHeartbeatHandler; +use region_lease_handler::RegionLeaseHandler; +use response_header_handler::ResponseHeaderHandler; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use tokio::sync::mpsc::Sender; @@ -36,6 +52,7 @@ use tokio::sync::{oneshot, Notify, RwLock}; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; +use crate::pubsub::PublisherRef; use crate::service::mailbox::{ BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId, }; @@ -96,6 +113,7 @@ impl HeartbeatAccumulator { } } +/// The pusher of the heartbeat response. pub struct Pusher { sender: Sender>, res_header: ResponseHeader, @@ -131,6 +149,7 @@ impl Pusher { } } +/// The group of heartbeat pushers. #[derive(Clone, Default)] pub struct Pushers(Arc>>); @@ -203,50 +222,57 @@ impl NameCachedHandler { } } -#[derive(Clone, Default)] +pub type HeartbeatHandlerGroupRef = Arc; + +/// The group of heartbeat handlers. +#[derive(Default)] pub struct HeartbeatHandlerGroup { - handlers: Arc>>, + handlers: Vec, pushers: Pushers, } impl HeartbeatHandlerGroup { pub(crate) fn new(pushers: Pushers) -> Self { Self { - handlers: Arc::new(RwLock::new(vec![])), + handlers: vec![], pushers, } } - pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) { - let mut handlers = self.handlers.write().await; - handlers.push(NameCachedHandler::new(handler)); + fn add_handler(&mut self, handler: impl HeartbeatHandler + 'static) { + self.handlers.push(NameCachedHandler::new(handler)); } - pub async fn register(&self, key: impl AsRef, pusher: Pusher) { + /// Registers the heartbeat response [`Pusher`] with the given key to the group. + pub async fn register_pusher(&self, key: impl AsRef, pusher: Pusher) { let key = key.as_ref(); METRIC_META_HEARTBEAT_CONNECTION_NUM.inc(); info!("Pusher register: {}", key); let _ = self.pushers.insert(key.to_string(), pusher).await; } - pub async fn deregister(&self, key: impl AsRef) -> Option { + /// Deregisters the heartbeat response [`Pusher`] with the given key from the group. + /// + /// Returns the [`Pusher`] if it exists. + pub async fn deregister_push(&self, key: impl AsRef) -> Option { let key = key.as_ref(); METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); info!("Pusher unregister: {}", key); self.pushers.remove(key).await } + /// Returns the [`Pushers`] of the group. pub fn pushers(&self) -> Pushers { self.pushers.clone() } + /// Handles the heartbeat request. pub async fn handle( &self, req: HeartbeatRequest, mut ctx: Context, ) -> Result { let mut acc = HeartbeatAccumulator::default(); - let handlers = self.handlers.read().await; let role = req .header .as_ref() @@ -255,7 +281,7 @@ impl HeartbeatHandlerGroup { err_msg: format!("invalid role: {:?}", req.header), })?; - for NameCachedHandler { name, handler } in handlers.iter() { + for NameCachedHandler { name, handler } in self.handlers.iter() { if !handler.is_acceptable(role) { continue; } @@ -426,6 +452,84 @@ impl Mailbox for HeartbeatMailbox { } } +/// The builder to build the group of heartbeat handlers. +pub struct HeartbeatHandlerGroupBuilder { + /// The handler to handle region failure. + region_failure_handler: Option, + + /// The handler to handle region lease. + region_lease_handler: RegionLeaseHandler, + + /// The plugins. + plugins: Option, + + /// The heartbeat response pushers. + pushers: Pushers, +} + +impl HeartbeatHandlerGroupBuilder { + pub fn new(pushers: Pushers, region_lease_handler: RegionLeaseHandler) -> Self { + Self { + region_failure_handler: None, + region_lease_handler, + plugins: None, + pushers, + } + } + + /// Sets the [`RegionFailureHandler`]. + pub fn with_region_failure_handler(mut self, handler: Option) -> Self { + self.region_failure_handler = handler; + self + } + + /// Sets the [`Plugins`]. + pub fn with_plugins(mut self, plugins: Option) -> Self { + self.plugins = plugins; + self + } + + /// Builds the group of heartbeat handlers. + pub fn build(self) -> HeartbeatHandlerGroup { + // Extract the `PublishHeartbeatHandler` from the plugins. + let publish_heartbeat_handler = if let Some(plugins) = self.plugins { + plugins + .get::() + .map(|publish| PublishHeartbeatHandler::new(publish.clone())) + } else { + None + }; + + // TODO(weny): Considers classifying handlers + // to make it easier for upper layers to customize handler groups. + let mut group = HeartbeatHandlerGroup::new(self.pushers); + group.add_handler(ResponseHeaderHandler); + // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, + // because even if the current meta-server node is no longer the leader it can + // still help the datanode to keep lease. + group.add_handler(DatanodeKeepLeaseHandler); + group.add_handler(FlownodeKeepLeaseHandler); + group.add_handler(CheckLeaderHandler); + group.add_handler(OnLeaderStartHandler); + group.add_handler(ExtractStatHandler); + group.add_handler(CollectDatanodeClusterInfoHandler); + group.add_handler(CollectFrontendClusterInfoHandler); + group.add_handler(CollectFlownodeClusterInfoHandler); + group.add_handler(MailboxHandler); + group.add_handler(self.region_lease_handler); + group.add_handler(FilterInactiveRegionStatsHandler); + if let Some(region_failure_handler) = self.region_failure_handler { + group.add_handler(region_failure_handler); + } + if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { + group.add_handler(publish_heartbeat_handler); + } + group.add_handler(CollectStatsHandler::default()); + + group + } +} + #[cfg(test)] mod tests { @@ -489,7 +593,7 @@ mod tests { let pusher: Pusher = Pusher::new(pusher_tx, &res_header); let handler_group = HeartbeatHandlerGroup::default(); handler_group - .register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher) + .register_pusher(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher) .await; let kv_backend = Arc::new(MemoryKvBackend::new()); @@ -519,21 +623,21 @@ mod tests { #[tokio::test] async fn test_handler_name() { - let group = HeartbeatHandlerGroup::default(); - group.add_handler(ResponseHeaderHandler).await; - group.add_handler(DatanodeKeepLeaseHandler).await; - group.add_handler(FlownodeKeepLeaseHandler).await; - group.add_handler(CheckLeaderHandler).await; - group.add_handler(OnLeaderStartHandler).await; - group.add_handler(ExtractStatHandler).await; - group.add_handler(CollectDatanodeClusterInfoHandler).await; - group.add_handler(CollectFrontendClusterInfoHandler).await; - group.add_handler(CollectFlownodeClusterInfoHandler).await; - group.add_handler(MailboxHandler).await; - group.add_handler(FilterInactiveRegionStatsHandler).await; - group.add_handler(CollectStatsHandler::default()).await; + let mut group = HeartbeatHandlerGroup::default(); + group.add_handler(ResponseHeaderHandler); + group.add_handler(DatanodeKeepLeaseHandler); + group.add_handler(FlownodeKeepLeaseHandler); + group.add_handler(CheckLeaderHandler); + group.add_handler(OnLeaderStartHandler); + group.add_handler(ExtractStatHandler); + group.add_handler(CollectDatanodeClusterInfoHandler); + group.add_handler(CollectFrontendClusterInfoHandler); + group.add_handler(CollectFlownodeClusterInfoHandler); + group.add_handler(MailboxHandler); + group.add_handler(FilterInactiveRegionStatsHandler); + group.add_handler(CollectStatsHandler::default()); - let handlers = group.handlers.read().await; + let handlers = group.handlers; assert_eq!(12, handlers.len()); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 2beb09859b..de7d54aa65 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -52,7 +52,7 @@ use crate::error::{ StopProcedureManagerSnafu, }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; -use crate::handler::HeartbeatHandlerGroup; +use crate::handler::HeartbeatHandlerGroupRef; use crate::lease::lookup_datanode_peer; use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; @@ -366,7 +366,7 @@ pub struct Metasrv { selector: SelectorRef, // The flow selector is used to select a target flownode. flow_selector: SelectorRef, - handler_group: HeartbeatHandlerGroup, + handler_group: HeartbeatHandlerGroupRef, election: Option, lock: DistLockRef, procedure_manager: ProcedureManagerRef, @@ -562,7 +562,7 @@ impl Metasrv { &self.flow_selector } - pub fn handler_group(&self) -> &HeartbeatHandlerGroup { + pub fn handler_group(&self) -> &HeartbeatHandlerGroupRef { &self.handler_group } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 6b06bab867..662de433ab 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -46,22 +46,11 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; -use crate::handler::check_leader_handler::CheckLeaderHandler; -use crate::handler::collect_cluster_info_handler::{ - CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, - CollectFrontendClusterInfoHandler, -}; -use crate::handler::collect_stats_handler::CollectStatsHandler; -use crate::handler::extract_stat_handler::ExtractStatHandler; use crate::handler::failure_handler::RegionFailureHandler; -use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; -use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; -use crate::handler::mailbox_handler::MailboxHandler; -use crate::handler::on_leader_start_handler::OnLeaderStartHandler; -use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; -use crate::handler::response_header_handler::ResponseHeaderHandler; -use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers}; +use crate::handler::{ + HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers, +}; use crate::lease::MetaPeerLookupService; use crate::lock::memory::MemLock; use crate::lock::DistLockRef; @@ -70,7 +59,6 @@ use crate::metasrv::{ }; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; -use crate::pubsub::PublisherRef; use crate::region::supervisor::{ HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker, DEFAULT_TICK_INTERVAL, @@ -364,41 +352,16 @@ impl MetasrvBuilder { let handler_group = match handler_group { Some(handler_group) => handler_group, None => { - let publish_heartbeat_handler = plugins - .clone() - .and_then(|plugins| plugins.get::()) - .map(|publish| PublishHeartbeatHandler::new(publish.clone())); - let region_lease_handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, table_metadata_manager.clone(), memory_region_keeper.clone(), ); - let group = HeartbeatHandlerGroup::new(pushers); - group.add_handler(ResponseHeaderHandler).await; - // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, - // because even if the current meta-server node is no longer the leader it can - // still help the datanode to keep lease. - group.add_handler(DatanodeKeepLeaseHandler).await; - group.add_handler(FlownodeKeepLeaseHandler).await; - group.add_handler(CheckLeaderHandler).await; - group.add_handler(OnLeaderStartHandler).await; - group.add_handler(ExtractStatHandler).await; - group.add_handler(CollectDatanodeClusterInfoHandler).await; - group.add_handler(CollectFrontendClusterInfoHandler).await; - group.add_handler(CollectFlownodeClusterInfoHandler).await; - group.add_handler(MailboxHandler).await; - group.add_handler(region_lease_handler).await; - group.add_handler(FilterInactiveRegionStatsHandler).await; - if let Some(region_failover_handler) = region_failover_handler { - group.add_handler(region_failover_handler).await; - } - if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { - group.add_handler(publish_heartbeat_handler).await; - } - group.add_handler(CollectStatsHandler::default()).await; - group + HeartbeatHandlerGroupBuilder::new(pushers, region_lease_handler) + .with_plugins(plugins.clone()) + .with_region_failure_handler(region_failover_handler) + .build() } }; @@ -417,7 +380,7 @@ impl MetasrvBuilder { selector, // TODO(jeremy): We do not allow configuring the flow selector. flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), - handler_group, + handler_group: Arc::new(handler_group), election, lock, procedure_manager, diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index f5ac74a4b5..569d6a8089 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -113,7 +113,7 @@ impl heartbeat_server::Heartbeat for Metasrv { ); if let Some(key) = pusher_key { - let _ = handler_group.deregister(&key).await; + let _ = handler_group.deregister_push(&key).await; } }); @@ -177,7 +177,7 @@ async fn register_pusher( let node_id = get_node_id(header); let key = format!("{}-{}", role, node_id); let pusher = Pusher::new(sender, header); - handler_group.register(&key, pusher).await; + handler_group.register_pusher(&key, pusher).await; Some(key) }