mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
refactor: introduce HeartbeatHandlerGroupBuilder (#4785)
This commit is contained in:
@@ -22,12 +22,28 @@ use api::v1::meta::{
|
||||
HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader,
|
||||
ResponseHeader, Role, PROTOCOL_VERSION,
|
||||
};
|
||||
use check_leader_handler::CheckLeaderHandler;
|
||||
use collect_cluster_info_handler::{
|
||||
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
|
||||
CollectFrontendClusterInfoHandler,
|
||||
};
|
||||
use collect_stats_handler::CollectStatsHandler;
|
||||
use common_base::Plugins;
|
||||
use common_meta::datanode::Stat;
|
||||
use common_meta::instruction::{Instruction, InstructionReply};
|
||||
use common_meta::sequence::Sequence;
|
||||
use common_telemetry::{debug, info, warn};
|
||||
use dashmap::DashMap;
|
||||
use extract_stat_handler::ExtractStatHandler;
|
||||
use failure_handler::RegionFailureHandler;
|
||||
use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
|
||||
use futures::future::join_all;
|
||||
use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
|
||||
use mailbox_handler::MailboxHandler;
|
||||
use on_leader_start_handler::OnLeaderStartHandler;
|
||||
use publish_heartbeat_handler::PublishHeartbeatHandler;
|
||||
use region_lease_handler::RegionLeaseHandler;
|
||||
use response_header_handler::ResponseHeaderHandler;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
@@ -36,6 +52,7 @@ use tokio::sync::{oneshot, Notify, RwLock};
|
||||
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
|
||||
use crate::metasrv::Context;
|
||||
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
|
||||
use crate::pubsub::PublisherRef;
|
||||
use crate::service::mailbox::{
|
||||
BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
|
||||
};
|
||||
@@ -96,6 +113,7 @@ impl HeartbeatAccumulator {
|
||||
}
|
||||
}
|
||||
|
||||
/// The pusher of the heartbeat response.
|
||||
pub struct Pusher {
|
||||
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
|
||||
res_header: ResponseHeader,
|
||||
@@ -131,6 +149,7 @@ impl Pusher {
|
||||
}
|
||||
}
|
||||
|
||||
/// The group of heartbeat pushers.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
|
||||
|
||||
@@ -203,50 +222,57 @@ impl NameCachedHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;
|
||||
|
||||
/// The group of heartbeat handlers.
|
||||
#[derive(Default)]
|
||||
pub struct HeartbeatHandlerGroup {
|
||||
handlers: Arc<RwLock<Vec<NameCachedHandler>>>,
|
||||
handlers: Vec<NameCachedHandler>,
|
||||
pushers: Pushers,
|
||||
}
|
||||
|
||||
impl HeartbeatHandlerGroup {
|
||||
pub(crate) fn new(pushers: Pushers) -> Self {
|
||||
Self {
|
||||
handlers: Arc::new(RwLock::new(vec![])),
|
||||
handlers: vec![],
|
||||
pushers,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
|
||||
let mut handlers = self.handlers.write().await;
|
||||
handlers.push(NameCachedHandler::new(handler));
|
||||
fn add_handler(&mut self, handler: impl HeartbeatHandler + 'static) {
|
||||
self.handlers.push(NameCachedHandler::new(handler));
|
||||
}
|
||||
|
||||
pub async fn register(&self, key: impl AsRef<str>, pusher: Pusher) {
|
||||
/// Registers the heartbeat response [`Pusher`] with the given key to the group.
|
||||
pub async fn register_pusher(&self, key: impl AsRef<str>, pusher: Pusher) {
|
||||
let key = key.as_ref();
|
||||
METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
|
||||
info!("Pusher register: {}", key);
|
||||
let _ = self.pushers.insert(key.to_string(), pusher).await;
|
||||
}
|
||||
|
||||
pub async fn deregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
|
||||
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
|
||||
///
|
||||
/// Returns the [`Pusher`] if it exists.
|
||||
pub async fn deregister_push(&self, key: impl AsRef<str>) -> Option<Pusher> {
|
||||
let key = key.as_ref();
|
||||
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
|
||||
info!("Pusher unregister: {}", key);
|
||||
self.pushers.remove(key).await
|
||||
}
|
||||
|
||||
/// Returns the [`Pushers`] of the group.
|
||||
pub fn pushers(&self) -> Pushers {
|
||||
self.pushers.clone()
|
||||
}
|
||||
|
||||
/// Handles the heartbeat request.
|
||||
pub async fn handle(
|
||||
&self,
|
||||
req: HeartbeatRequest,
|
||||
mut ctx: Context,
|
||||
) -> Result<HeartbeatResponse> {
|
||||
let mut acc = HeartbeatAccumulator::default();
|
||||
let handlers = self.handlers.read().await;
|
||||
let role = req
|
||||
.header
|
||||
.as_ref()
|
||||
@@ -255,7 +281,7 @@ impl HeartbeatHandlerGroup {
|
||||
err_msg: format!("invalid role: {:?}", req.header),
|
||||
})?;
|
||||
|
||||
for NameCachedHandler { name, handler } in handlers.iter() {
|
||||
for NameCachedHandler { name, handler } in self.handlers.iter() {
|
||||
if !handler.is_acceptable(role) {
|
||||
continue;
|
||||
}
|
||||
@@ -426,6 +452,84 @@ impl Mailbox for HeartbeatMailbox {
|
||||
}
|
||||
}
|
||||
|
||||
/// The builder to build the group of heartbeat handlers.
|
||||
pub struct HeartbeatHandlerGroupBuilder {
|
||||
/// The handler to handle region failure.
|
||||
region_failure_handler: Option<RegionFailureHandler>,
|
||||
|
||||
/// The handler to handle region lease.
|
||||
region_lease_handler: RegionLeaseHandler,
|
||||
|
||||
/// The plugins.
|
||||
plugins: Option<Plugins>,
|
||||
|
||||
/// The heartbeat response pushers.
|
||||
pushers: Pushers,
|
||||
}
|
||||
|
||||
impl HeartbeatHandlerGroupBuilder {
|
||||
pub fn new(pushers: Pushers, region_lease_handler: RegionLeaseHandler) -> Self {
|
||||
Self {
|
||||
region_failure_handler: None,
|
||||
region_lease_handler,
|
||||
plugins: None,
|
||||
pushers,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the [`RegionFailureHandler`].
|
||||
pub fn with_region_failure_handler(mut self, handler: Option<RegionFailureHandler>) -> Self {
|
||||
self.region_failure_handler = handler;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the [`Plugins`].
|
||||
pub fn with_plugins(mut self, plugins: Option<Plugins>) -> Self {
|
||||
self.plugins = plugins;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds the group of heartbeat handlers.
|
||||
pub fn build(self) -> HeartbeatHandlerGroup {
|
||||
// Extract the `PublishHeartbeatHandler` from the plugins.
|
||||
let publish_heartbeat_handler = if let Some(plugins) = self.plugins {
|
||||
plugins
|
||||
.get::<PublisherRef>()
|
||||
.map(|publish| PublishHeartbeatHandler::new(publish.clone()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// TODO(weny): Considers classifying handlers
|
||||
// to make it easier for upper layers to customize handler groups.
|
||||
let mut group = HeartbeatHandlerGroup::new(self.pushers);
|
||||
group.add_handler(ResponseHeaderHandler);
|
||||
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
|
||||
// because even if the current meta-server node is no longer the leader it can
|
||||
// still help the datanode to keep lease.
|
||||
group.add_handler(DatanodeKeepLeaseHandler);
|
||||
group.add_handler(FlownodeKeepLeaseHandler);
|
||||
group.add_handler(CheckLeaderHandler);
|
||||
group.add_handler(OnLeaderStartHandler);
|
||||
group.add_handler(ExtractStatHandler);
|
||||
group.add_handler(CollectDatanodeClusterInfoHandler);
|
||||
group.add_handler(CollectFrontendClusterInfoHandler);
|
||||
group.add_handler(CollectFlownodeClusterInfoHandler);
|
||||
group.add_handler(MailboxHandler);
|
||||
group.add_handler(self.region_lease_handler);
|
||||
group.add_handler(FilterInactiveRegionStatsHandler);
|
||||
if let Some(region_failure_handler) = self.region_failure_handler {
|
||||
group.add_handler(region_failure_handler);
|
||||
}
|
||||
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
|
||||
group.add_handler(publish_heartbeat_handler);
|
||||
}
|
||||
group.add_handler(CollectStatsHandler::default());
|
||||
|
||||
group
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -489,7 +593,7 @@ mod tests {
|
||||
let pusher: Pusher = Pusher::new(pusher_tx, &res_header);
|
||||
let handler_group = HeartbeatHandlerGroup::default();
|
||||
handler_group
|
||||
.register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
|
||||
.register_pusher(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
|
||||
.await;
|
||||
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
@@ -519,21 +623,21 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_handler_name() {
|
||||
let group = HeartbeatHandlerGroup::default();
|
||||
group.add_handler(ResponseHeaderHandler).await;
|
||||
group.add_handler(DatanodeKeepLeaseHandler).await;
|
||||
group.add_handler(FlownodeKeepLeaseHandler).await;
|
||||
group.add_handler(CheckLeaderHandler).await;
|
||||
group.add_handler(OnLeaderStartHandler).await;
|
||||
group.add_handler(ExtractStatHandler).await;
|
||||
group.add_handler(CollectDatanodeClusterInfoHandler).await;
|
||||
group.add_handler(CollectFrontendClusterInfoHandler).await;
|
||||
group.add_handler(CollectFlownodeClusterInfoHandler).await;
|
||||
group.add_handler(MailboxHandler).await;
|
||||
group.add_handler(FilterInactiveRegionStatsHandler).await;
|
||||
group.add_handler(CollectStatsHandler::default()).await;
|
||||
let mut group = HeartbeatHandlerGroup::default();
|
||||
group.add_handler(ResponseHeaderHandler);
|
||||
group.add_handler(DatanodeKeepLeaseHandler);
|
||||
group.add_handler(FlownodeKeepLeaseHandler);
|
||||
group.add_handler(CheckLeaderHandler);
|
||||
group.add_handler(OnLeaderStartHandler);
|
||||
group.add_handler(ExtractStatHandler);
|
||||
group.add_handler(CollectDatanodeClusterInfoHandler);
|
||||
group.add_handler(CollectFrontendClusterInfoHandler);
|
||||
group.add_handler(CollectFlownodeClusterInfoHandler);
|
||||
group.add_handler(MailboxHandler);
|
||||
group.add_handler(FilterInactiveRegionStatsHandler);
|
||||
group.add_handler(CollectStatsHandler::default());
|
||||
|
||||
let handlers = group.handlers.read().await;
|
||||
let handlers = group.handlers;
|
||||
|
||||
assert_eq!(12, handlers.len());
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ use crate::error::{
|
||||
StopProcedureManagerSnafu,
|
||||
};
|
||||
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
|
||||
use crate::handler::HeartbeatHandlerGroup;
|
||||
use crate::handler::HeartbeatHandlerGroupRef;
|
||||
use crate::lease::lookup_datanode_peer;
|
||||
use crate::lock::DistLockRef;
|
||||
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
|
||||
@@ -366,7 +366,7 @@ pub struct Metasrv {
|
||||
selector: SelectorRef,
|
||||
// The flow selector is used to select a target flownode.
|
||||
flow_selector: SelectorRef,
|
||||
handler_group: HeartbeatHandlerGroup,
|
||||
handler_group: HeartbeatHandlerGroupRef,
|
||||
election: Option<ElectionRef>,
|
||||
lock: DistLockRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
@@ -562,7 +562,7 @@ impl Metasrv {
|
||||
&self.flow_selector
|
||||
}
|
||||
|
||||
pub fn handler_group(&self) -> &HeartbeatHandlerGroup {
|
||||
pub fn handler_group(&self) -> &HeartbeatHandlerGroupRef {
|
||||
&self.handler_group
|
||||
}
|
||||
|
||||
|
||||
@@ -46,22 +46,11 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
|
||||
use crate::error::{self, Result};
|
||||
use crate::flow_meta_alloc::FlowPeerAllocator;
|
||||
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
|
||||
use crate::handler::check_leader_handler::CheckLeaderHandler;
|
||||
use crate::handler::collect_cluster_info_handler::{
|
||||
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
|
||||
CollectFrontendClusterInfoHandler,
|
||||
};
|
||||
use crate::handler::collect_stats_handler::CollectStatsHandler;
|
||||
use crate::handler::extract_stat_handler::ExtractStatHandler;
|
||||
use crate::handler::failure_handler::RegionFailureHandler;
|
||||
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
|
||||
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
|
||||
use crate::handler::mailbox_handler::MailboxHandler;
|
||||
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
|
||||
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
|
||||
use crate::handler::region_lease_handler::RegionLeaseHandler;
|
||||
use crate::handler::response_header_handler::ResponseHeaderHandler;
|
||||
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
|
||||
use crate::handler::{
|
||||
HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers,
|
||||
};
|
||||
use crate::lease::MetaPeerLookupService;
|
||||
use crate::lock::memory::MemLock;
|
||||
use crate::lock::DistLockRef;
|
||||
@@ -70,7 +59,6 @@ use crate::metasrv::{
|
||||
};
|
||||
use crate::procedure::region_migration::manager::RegionMigrationManager;
|
||||
use crate::procedure::region_migration::DefaultContextFactory;
|
||||
use crate::pubsub::PublisherRef;
|
||||
use crate::region::supervisor::{
|
||||
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
|
||||
DEFAULT_TICK_INTERVAL,
|
||||
@@ -364,41 +352,16 @@ impl MetasrvBuilder {
|
||||
let handler_group = match handler_group {
|
||||
Some(handler_group) => handler_group,
|
||||
None => {
|
||||
let publish_heartbeat_handler = plugins
|
||||
.clone()
|
||||
.and_then(|plugins| plugins.get::<PublisherRef>())
|
||||
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));
|
||||
|
||||
let region_lease_handler = RegionLeaseHandler::new(
|
||||
distributed_time_constants::REGION_LEASE_SECS,
|
||||
table_metadata_manager.clone(),
|
||||
memory_region_keeper.clone(),
|
||||
);
|
||||
|
||||
let group = HeartbeatHandlerGroup::new(pushers);
|
||||
group.add_handler(ResponseHeaderHandler).await;
|
||||
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
|
||||
// because even if the current meta-server node is no longer the leader it can
|
||||
// still help the datanode to keep lease.
|
||||
group.add_handler(DatanodeKeepLeaseHandler).await;
|
||||
group.add_handler(FlownodeKeepLeaseHandler).await;
|
||||
group.add_handler(CheckLeaderHandler).await;
|
||||
group.add_handler(OnLeaderStartHandler).await;
|
||||
group.add_handler(ExtractStatHandler).await;
|
||||
group.add_handler(CollectDatanodeClusterInfoHandler).await;
|
||||
group.add_handler(CollectFrontendClusterInfoHandler).await;
|
||||
group.add_handler(CollectFlownodeClusterInfoHandler).await;
|
||||
group.add_handler(MailboxHandler).await;
|
||||
group.add_handler(region_lease_handler).await;
|
||||
group.add_handler(FilterInactiveRegionStatsHandler).await;
|
||||
if let Some(region_failover_handler) = region_failover_handler {
|
||||
group.add_handler(region_failover_handler).await;
|
||||
}
|
||||
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
|
||||
group.add_handler(publish_heartbeat_handler).await;
|
||||
}
|
||||
group.add_handler(CollectStatsHandler::default()).await;
|
||||
group
|
||||
HeartbeatHandlerGroupBuilder::new(pushers, region_lease_handler)
|
||||
.with_plugins(plugins.clone())
|
||||
.with_region_failure_handler(region_failover_handler)
|
||||
.build()
|
||||
}
|
||||
};
|
||||
|
||||
@@ -417,7 +380,7 @@ impl MetasrvBuilder {
|
||||
selector,
|
||||
// TODO(jeremy): We do not allow configuring the flow selector.
|
||||
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
|
||||
handler_group,
|
||||
handler_group: Arc::new(handler_group),
|
||||
election,
|
||||
lock,
|
||||
procedure_manager,
|
||||
|
||||
@@ -113,7 +113,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
|
||||
);
|
||||
|
||||
if let Some(key) = pusher_key {
|
||||
let _ = handler_group.deregister(&key).await;
|
||||
let _ = handler_group.deregister_push(&key).await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -177,7 +177,7 @@ async fn register_pusher(
|
||||
let node_id = get_node_id(header);
|
||||
let key = format!("{}-{}", role, node_id);
|
||||
let pusher = Pusher::new(sender, header);
|
||||
handler_group.register(&key, pusher).await;
|
||||
handler_group.register_pusher(&key, pusher).await;
|
||||
Some(key)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user