diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 0b0fff204f..8d763c7d92 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -67,7 +67,16 @@ pub trait HeartbeatHandler: Send + Sync { req: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()>; + ) -> Result; +} + +/// HandleControl +/// +/// Controls process of handling heartbeat request. +#[derive(PartialEq)] +pub enum HandleControl { + Continue, + Done, } #[derive(Debug, Default)] @@ -246,15 +255,16 @@ impl HeartbeatHandlerGroup { })?; for NameCachedHandler { name, handler } in handlers.iter() { - if ctx.is_skip_all() { - break; + if !handler.is_acceptable(role) { + continue; } - if handler.is_acceptable(role) { - let _timer = METRIC_META_HANDLER_EXECUTE - .with_label_values(&[*name]) - .start_timer(); - handler.handle(&req, &mut ctx, &mut acc).await?; + let _timer = METRIC_META_HANDLER_EXECUTE + .with_label_values(&[*name]) + .start_timer(); + + if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done { + break; } } let header = std::mem::take(&mut acc.header); diff --git a/src/meta-srv/src/handler/check_leader_handler.rs b/src/meta-srv/src/handler/check_leader_handler.rs index cce1e83f5e..0da7d0737e 100644 --- a/src/meta-srv/src/handler/check_leader_handler.rs +++ b/src/meta-srv/src/handler/check_leader_handler.rs @@ -16,7 +16,7 @@ use api::v1::meta::{Error, HeartbeatRequest, Role}; use common_telemetry::warn; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct CheckLeaderHandler; @@ -32,17 +32,25 @@ impl HeartbeatHandler for CheckLeaderHandler { req: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { - if let Some(election) = &ctx.election { - if election.is_leader() { - return Ok(()); - } - if let Some(header) = &mut acc.header { - header.error = Some(Error::is_not_leader()); - ctx.set_skip_all(); - warn!("Received a heartbeat {:?}, but the current node is not the leader, so the heartbeat will be ignored.", req.header); - } + ) -> Result { + let Some(election) = &ctx.election else { + return Ok(HandleControl::Continue); + }; + + if election.is_leader() { + return Ok(HandleControl::Continue); } - Ok(()) + + warn!( + "A heartbeat was received {:?}, however, since the current node is not the leader,\ + this heartbeat will be disregarded.", + req.header + ); + + if let Some(header) = &mut acc.header { + header.error = Some(Error::is_not_leader()); + } + + return Ok(HandleControl::Done); } } diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 536748aaaf..4e59a81b24 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -17,7 +17,7 @@ use common_telemetry::warn; use super::node_stat::Stat; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct CollectStatsHandler; @@ -33,11 +33,11 @@ impl HeartbeatHandler for CollectStatsHandler { req: &HeartbeatRequest, _ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { if req.mailbox_message.is_some() { // If the heartbeat is a mailbox message, it may have no other valid information, // so we don't need to collect stats. - return Ok(()); + return Ok(HandleControl::Continue); } match Stat::try_from(req.clone()) { @@ -49,6 +49,6 @@ impl HeartbeatHandler for CollectStatsHandler { } }; - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 7170128961..a5ac222993 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner}; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::{Context, ElectionRef}; use crate::procedure::region_failover::RegionFailoverManager; @@ -70,7 +70,7 @@ impl HeartbeatHandler for RegionFailureHandler { _: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { if ctx.is_infancy { self.failure_detect_runner .send_control(FailureDetectControl::Purge) @@ -78,7 +78,7 @@ impl HeartbeatHandler for RegionFailureHandler { } let Some(stat) = acc.stat.as_ref() else { - return Ok(()); + return Ok(HandleControl::Continue); }; let heartbeat = DatanodeHeartbeat { @@ -101,7 +101,8 @@ impl HeartbeatHandler for RegionFailureHandler { }; self.failure_detect_runner.send_heartbeat(heartbeat).await; - Ok(()) + + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/filter_inactive_region_stats.rs b/src/meta-srv/src/handler/filter_inactive_region_stats.rs index 0f3f240c76..fc1518f8b6 100644 --- a/src/meta-srv/src/handler/filter_inactive_region_stats.rs +++ b/src/meta-srv/src/handler/filter_inactive_region_stats.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use common_telemetry::warn; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct FilterInactiveRegionStatsHandler; @@ -33,9 +33,9 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler { req: &HeartbeatRequest, _ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { if acc.inactive_region_ids.is_empty() { - return Ok(()); + return Ok(HandleControl::Continue); } warn!( @@ -44,11 +44,11 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler { ); let Some(stat) = acc.stat.as_mut() else { - return Ok(()); + return Ok(HandleControl::Continue); }; stat.retain_active_region_stats(&acc.inactive_region_ids); - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index dc669aee1c..a3b332f00f 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -18,7 +18,7 @@ use common_telemetry::{trace, warn}; use common_time::util as time_util; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{LeaseKey, LeaseValue}; use crate::metasrv::Context; @@ -35,13 +35,13 @@ impl HeartbeatHandler for KeepLeaseHandler { req: &HeartbeatRequest, ctx: &mut Context, _acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { let HeartbeatRequest { header, peer, .. } = req; let Some(header) = &header else { - return Ok(()); + return Ok(HandleControl::Continue); }; let Some(peer) = &peer else { - return Ok(()); + return Ok(HandleControl::Continue); }; let key = LeaseKey { @@ -69,6 +69,6 @@ impl HeartbeatHandler for KeepLeaseHandler { warn!("Failed to update lease KV, peer: {peer:?}, {err}"); } - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/mailbox_handler.rs b/src/meta-srv/src/handler/mailbox_handler.rs index 4bc3b543ba..d95d31e977 100644 --- a/src/meta-srv/src/handler/mailbox_handler.rs +++ b/src/meta-srv/src/handler/mailbox_handler.rs @@ -15,7 +15,7 @@ use api::v1::meta::{HeartbeatRequest, Role}; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct MailboxHandler; @@ -31,12 +31,13 @@ impl HeartbeatHandler for MailboxHandler { req: &HeartbeatRequest, ctx: &mut Context, _acc: &mut HeartbeatAccumulator, - ) -> Result<()> { - if let Some(message) = &req.mailbox_message { - ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?; - ctx.set_skip_all(); - } + ) -> Result { + let Some(message) = &req.mailbox_message else { + return Ok(HandleControl::Continue); + }; - Ok(()) + ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?; + + Ok(HandleControl::Done) } } diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index 9f32b443b7..58f70005aa 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -15,7 +15,7 @@ use api::v1::meta::{HeartbeatRequest, Role}; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct OnLeaderStartHandler; @@ -31,16 +31,19 @@ impl HeartbeatHandler for OnLeaderStartHandler { _req: &HeartbeatRequest, ctx: &mut Context, _acc: &mut HeartbeatAccumulator, - ) -> Result<()> { - if let Some(election) = &ctx.election { - if election.in_infancy() { - ctx.is_infancy = true; - // TODO(weny): Unifies the multiple leader state between Context and MetaSrv. - // we can't ensure the in-memory kv has already been reset in the outside loop. - // We still use heartbeat requests to trigger resetting in-memory kv. - ctx.reset_in_memory(); - } + ) -> Result { + let Some(election) = &ctx.election else { + return Ok(HandleControl::Continue); + }; + + if election.in_infancy() { + ctx.is_infancy = true; + // TODO(weny): Unifies the multiple leader state between Context and MetaSrv. + // we can't ensure the in-memory kv has already been reset in the outside loop. + // We still use heartbeat requests to trigger resetting in-memory kv. + ctx.reset_in_memory(); } - Ok(()) + + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 488d10c203..4d4ade748f 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -22,7 +22,7 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::handler::node_stat::Stat; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{StatKey, StatValue}; use crate::metasrv::Context; @@ -82,9 +82,9 @@ impl HeartbeatHandler for PersistStatsHandler { _req: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { let Some(current_stat) = acc.stat.take() else { - return Ok(()); + return Ok(HandleControl::Continue); }; let key = current_stat.stat_key(); @@ -118,7 +118,7 @@ impl HeartbeatHandler for PersistStatsHandler { epoch_stats.push(current_stat); if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY { - return Ok(()); + return Ok(HandleControl::Continue); } let value: Vec = StatValue { @@ -137,13 +137,12 @@ impl HeartbeatHandler for PersistStatsHandler { .await .context(error::KvBackendSnafu)?; - Ok(()) + Ok(HandleControl::Continue) } } #[cfg(test)] mod tests { - use std::sync::atomic::AtomicBool; use std::sync::Arc; use common_meta::key::TableMetadataManager; @@ -180,7 +179,6 @@ mod tests { meta_peer_client, mailbox, election: None, - skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), }; diff --git a/src/meta-srv/src/handler/publish_heartbeat_handler.rs b/src/meta-srv/src/handler/publish_heartbeat_handler.rs index beceb4fe9a..b5fb8572f5 100644 --- a/src/meta-srv/src/handler/publish_heartbeat_handler.rs +++ b/src/meta-srv/src/handler/publish_heartbeat_handler.rs @@ -16,7 +16,7 @@ use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; use crate::pubsub::{Message, PublishRef}; @@ -41,10 +41,10 @@ impl HeartbeatHandler for PublishHeartbeatHandler { req: &HeartbeatRequest, _: &mut Context, _: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { let msg = Message::Heartbeat(Box::new(req.clone())); self.publish.send_msg(msg).await; - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 7ef74713c8..8ca70bd32f 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -22,7 +22,7 @@ use store_api::region_engine::{GrantedRegion, RegionRole}; use store_api::storage::RegionId; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef}; use crate::region::RegionLeaseKeeper; @@ -90,9 +90,9 @@ impl HeartbeatHandler for RegionLeaseHandler { req: &HeartbeatRequest, _ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { let Some(stat) = acc.stat.as_ref() else { - return Ok(()); + return Ok(HandleControl::Continue); }; let regions = stat.regions(); @@ -152,7 +152,7 @@ impl HeartbeatHandler for RegionLeaseHandler { lease_seconds: self.region_lease_seconds, }); - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index ce1731411e..3b588fb6f0 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -15,7 +15,7 @@ use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION}; use crate::error::Result; -use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; pub struct ResponseHeaderHandler; @@ -31,7 +31,7 @@ impl HeartbeatHandler for ResponseHeaderHandler { req: &HeartbeatRequest, _ctx: &mut Context, acc: &mut HeartbeatAccumulator, - ) -> Result<()> { + ) -> Result { let HeartbeatRequest { header, .. } = req; let res_header = ResponseHeader { protocol_version: PROTOCOL_VERSION, @@ -40,13 +40,12 @@ impl HeartbeatHandler for ResponseHeaderHandler { }; acc.header = Some(res_header); - Ok(()) + Ok(HandleControl::Continue) } } #[cfg(test)] mod tests { - use std::sync::atomic::AtomicBool; use std::sync::Arc; use api::v1::meta::{HeartbeatResponse, RequestHeader}; @@ -84,7 +83,6 @@ mod tests { meta_peer_client, mailbox, election: None, - skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), }; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0db67bd0b1..9821d628b7 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -142,20 +142,11 @@ pub struct Context { pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, - pub skip_all: Arc, pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, } impl Context { - pub fn is_skip_all(&self) -> bool { - self.skip_all.load(Ordering::Relaxed) - } - - pub fn set_skip_all(&self) { - self.skip_all.store(true, Ordering::Relaxed); - } - pub fn reset_in_memory(&self) { self.in_memory.reset(); } @@ -430,7 +421,6 @@ impl MetaSrv { let meta_peer_client = self.meta_peer_client.clone(); let mailbox = self.mailbox.clone(); let election = self.election.clone(); - let skip_all = Arc::new(AtomicBool::new(false)); let table_metadata_manager = self.table_metadata_manager.clone(); Context { @@ -441,7 +431,6 @@ impl MetaSrv { meta_peer_client, mailbox, election, - skip_all, is_infancy: false, table_metadata_manager, } diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index 733bef295b..3a13b1fe50 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -25,5 +25,5 @@ pub mod lock; pub mod mailbox; pub mod store; -pub type GrpcResult = std::result::Result, Status>; +pub type GrpcResult = Result, Status>; pub type GrpcStream = Pin> + Send + Sync + 'static>>; diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 60fba75390..395104a9fc 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -91,7 +91,16 @@ async fn handle_create_region_routes( .await?; if peers.len() < partitions.len() { - warn!("Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}", format_full_table_name(&table_info.catalog_name,&table_info.schema_name,&table_info.name), partitions.len(), peers.len()); + warn!( + "Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}", + format_full_table_name( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name + ), + partitions.len(), + peers.len() + ); return error::NoEnoughAvailableDatanodeSnafu { required: partitions.len(), available: peers.len(),