feat: heartbeat handler control (#2780)

JeremyHi authored 2023-11-21 10:48:11 +08:00 · committed by GitHub
parent 5f87b1f714
commit dc351a6de9
15 changed files with 105 additions and 88 deletions

View File

@@ -67,7 +67,16 @@ pub trait HeartbeatHandler: Send + Sync {
         req: &HeartbeatRequest,
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()>;
+    ) -> Result<HandleControl>;
 }
 
+/// HandleControl
+///
+/// Controls process of handling heartbeat request.
+#[derive(PartialEq)]
+pub enum HandleControl {
+    Continue,
+    Done,
+}
+
 #[derive(Debug, Default)]
@@ -246,15 +255,16 @@ impl HeartbeatHandlerGroup {
         })?;
 
         for NameCachedHandler { name, handler } in handlers.iter() {
-            if ctx.is_skip_all() {
-                break;
+            if !handler.is_acceptable(role) {
+                continue;
             }
-            if handler.is_acceptable(role) {
-                let _timer = METRIC_META_HANDLER_EXECUTE
-                    .with_label_values(&[*name])
-                    .start_timer();
-                handler.handle(&req, &mut ctx, &mut acc).await?;
+
+            let _timer = METRIC_META_HANDLER_EXECUTE
+                .with_label_values(&[*name])
+                .start_timer();
+            if handler.handle(&req, &mut ctx, &mut acc).await? == HandleControl::Done {
+                break;
             }
         }
 
         let header = std::mem::take(&mut acc.header);
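
For orientation before the per-handler diffs below: under the new contract a handler reports control flow through its return value instead of mutating shared skip state, and the group loop above stops at the first Done. A minimal sketch of a conforming handler — ExampleHandler is hypothetical and not part of this commit:

// Hypothetical handler under the new HeartbeatHandler contract.
#[async_trait::async_trait]
impl HeartbeatHandler for ExampleHandler {
    fn is_acceptable(&self, role: Role) -> bool {
        // Only run for datanode heartbeats.
        role == Role::Datanode
    }

    async fn handle(
        &self,
        req: &HeartbeatRequest,
        _ctx: &mut Context,
        _acc: &mut HeartbeatAccumulator,
    ) -> Result<HandleControl> {
        if req.mailbox_message.is_some() {
            // Short-circuit: the group loop breaks and skips the remaining handlers.
            return Ok(HandleControl::Done);
        }
        // Let the next handler in the chain run.
        Ok(HandleControl::Continue)
    }
}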

View File

@@ -16,7 +16,7 @@ use api::v1::meta::{Error, HeartbeatRequest, Role};
 use common_telemetry::warn;
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct CheckLeaderHandler;
@@ -32,17 +32,25 @@ impl HeartbeatHandler for CheckLeaderHandler {
         req: &HeartbeatRequest,
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
-        if let Some(election) = &ctx.election {
-            if election.is_leader() {
-                return Ok(());
-            }
-            if let Some(header) = &mut acc.header {
-                header.error = Some(Error::is_not_leader());
-                ctx.set_skip_all();
-                warn!("Received a heartbeat {:?}, but the current node is not the leader, so the heartbeat will be ignored.", req.header);
-            }
+    ) -> Result<HandleControl> {
+        let Some(election) = &ctx.election else {
+            return Ok(HandleControl::Continue);
+        };
+
+        if election.is_leader() {
+            return Ok(HandleControl::Continue);
         }
-        Ok(())
+
+        warn!(
+            "A heartbeat was received {:?}, however, since the current node is not the leader,\
+            this heartbeat will be disregarded.",
+            req.header
+        );
+
+        if let Some(header) = &mut acc.header {
+            header.error = Some(Error::is_not_leader());
+        }
+
+        return Ok(HandleControl::Done);
     }
 }
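
The rewritten handler's contract — Continue when there is no election or this node is the leader, Done otherwise — can be pinned down in a small test sketch. mock_ctx is a hypothetical helper, not part of this commit; assert! with == is used because HandleControl derives PartialEq but not Debug:

// Hypothetical test sketch; `mock_ctx(is_leader)` builds a Context whose
// election reports the given leadership state.
#[tokio::test]
async fn check_leader_control_flow() {
    let handler = CheckLeaderHandler;
    let req = HeartbeatRequest::default();
    let mut acc = HeartbeatAccumulator::default();

    // Follower: the handler short-circuits the chain with Done.
    let mut ctx = mock_ctx(false);
    let control = handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
    assert!(control == HandleControl::Done);

    // Leader: processing continues down the handler chain.
    let mut ctx = mock_ctx(true);
    let control = handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
    assert!(control == HandleControl::Continue);
}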

View File

@@ -17,7 +17,7 @@ use common_telemetry::warn;
 use super::node_stat::Stat;
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct CollectStatsHandler;
@@ -33,11 +33,11 @@ impl HeartbeatHandler for CollectStatsHandler {
         req: &HeartbeatRequest,
         _ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         if req.mailbox_message.is_some() {
             // If the heartbeat is a mailbox message, it may have no other valid information,
             // so we don't need to collect stats.
-            return Ok(());
+            return Ok(HandleControl::Continue);
         }
 
         match Stat::try_from(req.clone()) {
@@ -49,6 +49,6 @@ impl HeartbeatHandler for CollectStatsHandler {
             }
         };
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
 use crate::error::Result;
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::{Context, ElectionRef};
 use crate::procedure::region_failover::RegionFailoverManager;
@@ -70,7 +70,7 @@ impl HeartbeatHandler for RegionFailureHandler {
         _: &HeartbeatRequest,
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         if ctx.is_infancy {
             self.failure_detect_runner
                 .send_control(FailureDetectControl::Purge)
@@ -78,7 +78,7 @@ impl HeartbeatHandler for RegionFailureHandler {
         }
 
         let Some(stat) = acc.stat.as_ref() else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
 
         let heartbeat = DatanodeHeartbeat {
@@ -101,7 +101,8 @@ impl HeartbeatHandler for RegionFailureHandler {
         };
 
         self.failure_detect_runner.send_heartbeat(heartbeat).await;
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -17,7 +17,7 @@ use async_trait::async_trait;
 use common_telemetry::warn;
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct FilterInactiveRegionStatsHandler;
@@ -33,9 +33,9 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler {
         req: &HeartbeatRequest,
         _ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         if acc.inactive_region_ids.is_empty() {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         }
 
         warn!(
@@ -44,11 +44,11 @@ impl HeartbeatHandler for FilterInactiveRegionStatsHandler {
         );
 
         let Some(stat) = acc.stat.as_mut() else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
 
         stat.retain_active_region_stats(&acc.inactive_region_ids);
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -18,7 +18,7 @@ use common_telemetry::{trace, warn};
 use common_time::util as time_util;
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::keys::{LeaseKey, LeaseValue};
 use crate::metasrv::Context;
@@ -35,13 +35,13 @@ impl HeartbeatHandler for KeepLeaseHandler {
         req: &HeartbeatRequest,
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         let HeartbeatRequest { header, peer, .. } = req;
         let Some(header) = &header else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
         let Some(peer) = &peer else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
 
         let key = LeaseKey {
@@ -69,6 +69,6 @@ impl HeartbeatHandler for KeepLeaseHandler {
warn!("Failed to update lease KV, peer: {peer:?}, {err}");
}
Ok(())
Ok(HandleControl::Continue)
}
}

View File

@@ -15,7 +15,7 @@
 use api::v1::meta::{HeartbeatRequest, Role};
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct MailboxHandler;
@@ -31,12 +31,13 @@ impl HeartbeatHandler for MailboxHandler {
         req: &HeartbeatRequest,
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
-        if let Some(message) = &req.mailbox_message {
-            ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;
-            ctx.set_skip_all();
-        }
+    ) -> Result<HandleControl> {
+        let Some(message) = &req.mailbox_message else {
+            return Ok(HandleControl::Continue);
+        };
 
-        Ok(())
+        ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;
+
+        Ok(HandleControl::Done)
     }
 }

View File

@@ -15,7 +15,7 @@
 use api::v1::meta::{HeartbeatRequest, Role};
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct OnLeaderStartHandler;
@@ -31,16 +31,19 @@ impl HeartbeatHandler for OnLeaderStartHandler {
         _req: &HeartbeatRequest,
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
-        if let Some(election) = &ctx.election {
-            if election.in_infancy() {
-                ctx.is_infancy = true;
-                // TODO(weny): Unifies the multiple leader state between Context and MetaSrv.
-                // we can't ensure the in-memory kv has already been reset in the outside loop.
-                // We still use heartbeat requests to trigger resetting in-memory kv.
-                ctx.reset_in_memory();
-            }
+    ) -> Result<HandleControl> {
+        let Some(election) = &ctx.election else {
+            return Ok(HandleControl::Continue);
+        };
+
+        if election.in_infancy() {
+            ctx.is_infancy = true;
+            // TODO(weny): Unifies the multiple leader state between Context and MetaSrv.
+            // we can't ensure the in-memory kv has already been reset in the outside loop.
+            // We still use heartbeat requests to trigger resetting in-memory kv.
+            ctx.reset_in_memory();
         }
-        Ok(())
+
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -22,7 +22,7 @@ use snafu::ResultExt;
 use crate::error::{self, Result};
 use crate::handler::node_stat::Stat;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::keys::{StatKey, StatValue};
 use crate::metasrv::Context;
@@ -82,9 +82,9 @@ impl HeartbeatHandler for PersistStatsHandler {
         _req: &HeartbeatRequest,
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         let Some(current_stat) = acc.stat.take() else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
 
         let key = current_stat.stat_key();
@@ -118,7 +118,7 @@ impl HeartbeatHandler for PersistStatsHandler {
         epoch_stats.push(current_stat);
 
         if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         }
 
         let value: Vec<u8> = StatValue {
@@ -137,13 +137,12 @@ impl HeartbeatHandler for PersistStatsHandler {
             .await
             .context(error::KvBackendSnafu)?;
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::atomic::AtomicBool;
     use std::sync::Arc;
 
     use common_meta::key::TableMetadataManager;
@@ -180,7 +179,6 @@ mod tests {
             meta_peer_client,
             mailbox,
             election: None,
-            skip_all: Arc::new(AtomicBool::new(false)),
             is_infancy: false,
             table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
         };

View File

@@ -16,7 +16,7 @@ use api::v1::meta::{HeartbeatRequest, Role};
 use async_trait::async_trait;
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 use crate::pubsub::{Message, PublishRef};
@@ -41,10 +41,10 @@ impl HeartbeatHandler for PublishHeartbeatHandler {
         req: &HeartbeatRequest,
         _: &mut Context,
         _: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         let msg = Message::Heartbeat(Box::new(req.clone()));
         self.publish.send_msg(msg).await;
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -22,7 +22,7 @@ use store_api::region_engine::{GrantedRegion, RegionRole};
 use store_api::storage::RegionId;
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 use crate::region::lease_keeper::{OpeningRegionKeeperRef, RegionLeaseKeeperRef};
 use crate::region::RegionLeaseKeeper;
@@ -90,9 +90,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
         req: &HeartbeatRequest,
         _ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         let Some(stat) = acc.stat.as_ref() else {
-            return Ok(());
+            return Ok(HandleControl::Continue);
         };
 
         let regions = stat.regions();
@@ -152,7 +152,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
             lease_seconds: self.region_lease_seconds,
         });
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }

View File

@@ -15,7 +15,7 @@
 use api::v1::meta::{HeartbeatRequest, ResponseHeader, Role, PROTOCOL_VERSION};
 
 use crate::error::Result;
-use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
 pub struct ResponseHeaderHandler;
@@ -31,7 +31,7 @@ impl HeartbeatHandler for ResponseHeaderHandler {
         req: &HeartbeatRequest,
         _ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
-    ) -> Result<()> {
+    ) -> Result<HandleControl> {
         let HeartbeatRequest { header, .. } = req;
         let res_header = ResponseHeader {
             protocol_version: PROTOCOL_VERSION,
@@ -40,13 +40,12 @@ impl HeartbeatHandler for ResponseHeaderHandler {
         };
 
         acc.header = Some(res_header);
 
-        Ok(())
+        Ok(HandleControl::Continue)
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::atomic::AtomicBool;
     use std::sync::Arc;
 
     use api::v1::meta::{HeartbeatResponse, RequestHeader};
@@ -84,7 +83,6 @@ mod tests {
             meta_peer_client,
             mailbox,
             election: None,
-            skip_all: Arc::new(AtomicBool::new(false)),
             is_infancy: false,
             table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
         };

View File

@@ -142,20 +142,11 @@ pub struct Context {
     pub meta_peer_client: MetaPeerClientRef,
     pub mailbox: MailboxRef,
     pub election: Option<ElectionRef>,
-    pub skip_all: Arc<AtomicBool>,
     pub is_infancy: bool,
     pub table_metadata_manager: TableMetadataManagerRef,
 }
 
 impl Context {
-    pub fn is_skip_all(&self) -> bool {
-        self.skip_all.load(Ordering::Relaxed)
-    }
-
-    pub fn set_skip_all(&self) {
-        self.skip_all.store(true, Ordering::Relaxed);
-    }
-
     pub fn reset_in_memory(&self) {
         self.in_memory.reset();
     }
@@ -430,7 +421,6 @@ impl MetaSrv {
         let meta_peer_client = self.meta_peer_client.clone();
         let mailbox = self.mailbox.clone();
         let election = self.election.clone();
-        let skip_all = Arc::new(AtomicBool::new(false));
         let table_metadata_manager = self.table_metadata_manager.clone();
 
         Context {
@@ -441,7 +431,6 @@ impl MetaSrv {
             meta_peer_client,
             mailbox,
             election,
-            skip_all,
             is_infancy: false,
             table_metadata_manager,
         }
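
With the skip_all field and its accessors gone, stop-the-chain intent no longer lives in shared atomic state on Context; it travels in each handler's return value, which also removes the Ordering bookkeeping. An illustrative migration of a short-circuiting handler body — demo_handle is hypothetical, and the "before" lines no longer compile after this commit:

// Before this commit the body would have been:
//
//     ctx.set_skip_all();   // store into Context::skip_all (AtomicBool),
//     Ok(())                // polled by the group loop via ctx.is_skip_all()
//
// After, the same intent is simply the return value:
async fn demo_handle(
    _req: &HeartbeatRequest,
    _ctx: &mut Context,
    _acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
    Ok(HandleControl::Done) // the group loop breaks as soon as it sees Done
}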

View File

@@ -25,5 +25,5 @@ pub mod lock;
 pub mod mailbox;
 pub mod store;
 
-pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
+pub type GrpcResult<T> = Result<Response<T>, Status>;
 pub type GrpcStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;

View File

@@ -91,7 +91,16 @@ async fn handle_create_region_routes(
         .await?;
 
     if peers.len() < partitions.len() {
-        warn!("Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}", format_full_table_name(&table_info.catalog_name,&table_info.schema_name,&table_info.name), partitions.len(), peers.len());
+        warn!(
+            "Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}",
+            format_full_table_name(
+                &table_info.catalog_name,
+                &table_info.schema_name,
+                &table_info.name
+            ),
+            partitions.len(),
+            peers.len()
+        );
         return error::NoEnoughAvailableDatanodeSnafu {
             required: partitions.len(),
             available: peers.len(),