feat: forgot collect flownode clusterinfo handler (#4236)

* feat: forgot collect flownode clusterinfo handler

* fix: unit test

* fix: filter stale heartbeat
This commit is contained in:
Jeremyhi
2024-07-01 14:42:31 +08:00
committed by GitHub
parent fe2c5c3735
commit 5d396bd6d7
5 changed files with 295 additions and 265 deletions

View File

@@ -43,13 +43,13 @@ use crate::service::mailbox::{
pub mod check_leader_handler;
pub mod collect_cluster_info_handler;
pub mod collect_stats_handler;
pub mod extract_stat_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;
pub mod keep_lease_handler;
pub mod mailbox_handler;
pub mod node_stat;
pub mod on_leader_start_handler;
pub mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod response_header_handler;
@@ -439,10 +439,16 @@ mod tests {
use tokio::sync::mpsc;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::extract_stat_handler::ExtractStatHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
@@ -516,20 +522,39 @@ mod tests {
async fn test_handler_name() {
let group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler).await;
group.add_handler(DatanodeKeepLeaseHandler).await;
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(ExtractStatHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(CollectFlownodeClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
group.add_handler(CollectStatsHandler::default()).await;
let handlers = group.handlers.read().await;
assert_eq!(6, handlers.len());
assert_eq!("ResponseHeaderHandler", handlers[0].handler.name());
assert_eq!("CheckLeaderHandler", handlers[1].handler.name());
assert_eq!("OnLeaderStartHandler", handlers[2].handler.name());
assert_eq!("CollectStatsHandler", handlers[3].handler.name());
assert_eq!("MailboxHandler", handlers[4].handler.name());
assert_eq!("PersistStatsHandler", handlers[5].handler.name());
assert_eq!(12, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
"FlownodeKeepLeaseHandler",
"CheckLeaderHandler",
"OnLeaderStartHandler",
"ExtractStatHandler",
"CollectDatanodeClusterInfoHandler",
"CollectFrontendClusterInfoHandler",
"CollectFlownodeClusterInfoHandler",
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectStatsHandler",
];
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
}
}

View File

@@ -12,15 +12,64 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::warn;
use std::cmp::Ordering;
use super::node_stat::Stat;
use crate::error::Result;
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use dashmap::DashMap;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::handler::node_stat::Stat;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::metasrv::Context;
pub struct CollectStatsHandler;
/// Number of stats buffered per key at which the handler writes the batch to the KV store.
const MAX_CACHED_STATS_PER_KEY: usize = 10;

/// Stats buffered for one key, tagged with the node epoch they were reported under.
#[derive(Default)]
struct EpochStats {
    /// Stats received since the buffer was last drained or cleared.
    stats: Vec<Stat>,
    /// Epoch the buffered stats belong to; `None` until `set_epoch` is called.
    epoch: Option<u64>,
}

impl EpochStats {
    /// Moves every buffered stat out and leaves the buffer empty.
    #[inline]
    fn drain_all(&mut self) -> Vec<Stat> {
        // `mem::take` hands the existing buffer to the caller instead of moving each
        // element into a freshly collected `Vec`.
        std::mem::take(&mut self.stats)
    }

    /// Drops every buffered stat while keeping the recorded epoch.
    #[inline]
    fn clear_stats(&mut self) {
        self.stats.clear();
    }

    /// Appends `stat` to the buffer.
    #[inline]
    fn push_stat(&mut self, stat: Stat) {
        self.stats.push(stat);
    }

    /// Returns how many stats are currently buffered.
    #[inline]
    fn len(&self) -> usize {
        self.stats.len()
    }

    /// Returns the recorded epoch, if any.
    #[inline]
    fn epoch(&self) -> Option<u64> {
        self.epoch
    }

    /// Records `epoch` as the epoch of the buffered stats.
    #[inline]
    fn set_epoch(&mut self, epoch: u64) {
        self.epoch = Some(epoch);
    }
}
/// Heartbeat handler that buffers the stat taken from the heartbeat accumulator per
/// `DatanodeStatKey` and writes the buffered stats in batches to the in-memory KV store.
#[derive(Default)]
pub struct CollectStatsHandler {
// Buffered stats per key, together with the node epoch they belong to.
stats_cache: DashMap<DatanodeStatKey, EpochStats>,
}
#[async_trait::async_trait]
impl HeartbeatHandler for CollectStatsHandler {
@@ -30,25 +79,158 @@ impl HeartbeatHandler for CollectStatsHandler {
async fn handle(
&self,
req: &HeartbeatRequest,
_ctx: &mut Context,
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if req.mailbox_message.is_some() {
// If the heartbeat is a mailbox message, it may have no other valid information,
// so we don't need to collect stats.
let Some(current_stat) = acc.stat.take() else {
return Ok(HandleControl::Continue);
};
let key = current_stat.stat_key();
let mut entry = self.stats_cache.entry(key).or_default();
let key: Vec<u8> = key.into();
let epoch_stats = entry.value_mut();
let refresh = if let Some(epoch) = epoch_stats.epoch() {
match current_stat.node_epoch.cmp(&epoch) {
Ordering::Greater => {
// This node may have been redeployed.
epoch_stats.clear_stats();
epoch_stats.set_epoch(current_stat.node_epoch);
epoch_stats.push_stat(current_stat);
true
}
Ordering::Equal => {
epoch_stats.push_stat(current_stat);
false
}
Ordering::Less => {
warn!("Ignore stale heartbeat: {:?}", current_stat);
false
}
}
} else {
epoch_stats.set_epoch(current_stat.node_epoch);
epoch_stats.push_stat(current_stat);
// If the epoch is empty, it indicates that the current node sending the heartbeat
// for the first time to the current meta leader, so it is necessary to save
// the data to the KV store as soon as possible.
true
};
if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(HandleControl::Continue);
}
match Stat::try_from(req.clone()) {
Ok(stat) => {
let _ = acc.stat.insert(stat);
}
Err(err) => {
warn!(err; "Incomplete heartbeat data: {:?}", req);
}
let value: Vec<u8> = DatanodeStatValue {
stats: epoch_stats.drain_all(),
}
.try_into()?;
let put = PutRequest {
key,
value,
..Default::default()
};
let _ = ctx
.in_memory
.put(put)
.await
.context(error::KvBackendSnafu)?;
Ok(HandleControl::Continue)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::key::DatanodeStatKey;
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Builds a [`Context`] whose stores are all in-memory backends.
    fn new_test_context() -> Context {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            // Safety: all required fields set at initialization
            .unwrap();
        Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
        }
    }

    /// Reads back the stats stored for cluster 3 / node 101.
    async fn stored_stats(ctx: &Context) -> Vec<Stat> {
        let key: Vec<u8> = DatanodeStatKey {
            cluster_id: 3,
            node_id: 101,
        }
        .into();
        let kv = ctx.in_memory.get(&key).await.unwrap().unwrap();
        let val: DatanodeStatValue = kv.value.try_into().unwrap();
        val.stats
    }

    #[tokio::test]
    async fn test_handle_datanode_stats() {
        let ctx = new_test_context();
        let handler = CollectStatsHandler::default();

        handle_request_many_times(ctx.clone(), &handler, 1).await;

        let key = DatanodeStatKey {
            cluster_id: 3,
            node_id: 101,
        };
        let key: Vec<u8> = key.into();
        let res = ctx.in_memory.get(&key).await.unwrap();
        let kv = res.unwrap();
        let key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
        assert_eq!(3, key.cluster_id);
        assert_eq!(101, key.node_id);
        let val: DatanodeStatValue = kv.value.try_into().unwrap();
        // first new stat must be set in kv store immediately
        assert_eq!(1, val.stats.len());
        assert_eq!(1, val.stats[0].region_num);

        handle_request_many_times(ctx.clone(), &handler, 10).await;

        let key: Vec<u8> = key.into();
        let res = ctx.in_memory.get(&key).await.unwrap();
        let kv = res.unwrap();
        let val: DatanodeStatValue = kv.value.try_into().unwrap();
        // refresh every 10 stats
        assert_eq!(10, val.stats.len());
    }

    #[tokio::test]
    async fn test_ignore_stale_heartbeat() {
        let mut ctx = new_test_context();
        let handler = CollectStatsHandler::default();

        // The first heartbeat of epoch 2 is written immediately, which empties the cache.
        handle_stat_with_epoch(&mut ctx, &handler, 2, 1).await;
        assert_eq!(1, stored_stats(&ctx).await.len());

        // A heartbeat from an older epoch must neither be cached nor trigger a write.
        handle_stat_with_epoch(&mut ctx, &handler, 1, 100).await;

        // Nine heartbeats of the current epoch stay below the flush threshold. Had the
        // stale one been cached, the ninth would already have triggered a write.
        for i in 2..=10 {
            handle_stat_with_epoch(&mut ctx, &handler, 2, i).await;
        }
        let stats = stored_stats(&ctx).await;
        assert_eq!(1, stats.len());
        assert_eq!(1, stats[0].region_num);

        // The tenth heartbeat of the current epoch flushes a batch without the stale stat.
        handle_stat_with_epoch(&mut ctx, &handler, 2, 11).await;
        let stats = stored_stats(&ctx).await;
        assert_eq!(10, stats.len());
        assert!(stats.iter().all(|stat| stat.node_epoch == 2));
        assert_eq!(2, stats[0].region_num);
    }

    /// Feeds `loop_times` heartbeats with the default node epoch through `handler`.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &CollectStatsHandler,
        loop_times: i32,
    ) {
        let req = HeartbeatRequest::default();
        for i in 1..=loop_times {
            let mut acc = HeartbeatAccumulator {
                stat: Some(Stat {
                    cluster_id: 3,
                    id: 101,
                    region_num: i as _,
                    ..Default::default()
                }),
                ..Default::default()
            };
            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }

    /// Feeds a single heartbeat carrying the given node epoch through `handler`.
    async fn handle_stat_with_epoch(
        ctx: &mut Context,
        handler: &CollectStatsHandler,
        node_epoch: u64,
        region_num: i32,
    ) {
        let req = HeartbeatRequest::default();
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                cluster_id: 3,
                id: 101,
                node_epoch,
                region_num: region_num as _,
                ..Default::default()
            }),
            ..Default::default()
        };
        handler.handle(&req, ctx, &mut acc).await.unwrap();
    }
}

View File

@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use common_telemetry::warn;
use super::node_stat::Stat;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
/// Heartbeat handler that converts a heartbeat request into a [`Stat`] and stores it in
/// the accumulator for the handlers that run after it.
pub struct ExtractStatHandler;

#[async_trait::async_trait]
impl HeartbeatHandler for ExtractStatHandler {
    /// Only heartbeats with the datanode role are accepted.
    fn is_acceptable(&self, role: Role) -> bool {
        role == Role::Datanode
    }

    /// Fills `acc.stat` from `req`; always lets the handler chain continue.
    async fn handle(
        &self,
        req: &HeartbeatRequest,
        _ctx: &mut Context,
        acc: &mut HeartbeatAccumulator,
    ) -> Result<HandleControl> {
        // If the heartbeat is a mailbox message, it may have no other valid information,
        // so we don't need to collect stats.
        if req.mailbox_message.is_none() {
            match Stat::try_from(req.clone()) {
                Ok(stat) => {
                    acc.stat = Some(stat);
                }
                Err(err) => {
                    // A heartbeat that cannot be converted is logged and otherwise ignored.
                    warn!(err; "Incomplete heartbeat data: {:?}", req);
                }
            }
        }

        Ok(HandleControl::Continue)
    }
}

View File

@@ -1,233 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use dashmap::DashMap;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::handler::node_stat::Stat;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::metasrv::Context;
/// Number of stats buffered per key at which the handler writes the batch to the KV store.
const MAX_CACHED_STATS_PER_KEY: usize = 10;

/// Stats buffered for one key, tagged with the node epoch they were reported under.
#[derive(Default)]
struct EpochStats {
    /// Stats received since the buffer was last drained or cleared.
    stats: Vec<Stat>,
    /// Epoch the buffered stats belong to; `None` until `set_epoch` is called.
    epoch: Option<u64>,
}

impl EpochStats {
    /// Moves every buffered stat out and leaves the buffer empty.
    #[inline]
    fn drain_all(&mut self) -> Vec<Stat> {
        // `mem::take` hands the existing buffer to the caller instead of moving each
        // element into a freshly collected `Vec`.
        std::mem::take(&mut self.stats)
    }

    /// Drops every buffered stat while keeping the recorded epoch.
    #[inline]
    fn clear(&mut self) {
        self.stats.clear();
    }

    /// Appends `stat` to the buffer.
    #[inline]
    fn push(&mut self, stat: Stat) {
        self.stats.push(stat);
    }

    /// Returns how many stats are currently buffered.
    #[inline]
    fn len(&self) -> usize {
        self.stats.len()
    }

    /// Returns the recorded epoch, if any.
    #[inline]
    fn epoch(&self) -> Option<u64> {
        self.epoch
    }

    /// Records `epoch` as the epoch of the buffered stats.
    #[inline]
    fn set_epoch(&mut self, epoch: u64) {
        self.epoch = Some(epoch);
    }
}
/// Heartbeat handler that buffers the stat taken from the heartbeat accumulator per
/// [`DatanodeStatKey`] and writes the buffered stats in batches to the in-memory KV store.
#[derive(Default)]
pub struct PersistStatsHandler {
    /// Buffered stats per key, together with the node epoch they belong to.
    stats_cache: DashMap<DatanodeStatKey, EpochStats>,
}

#[async_trait::async_trait]
impl HeartbeatHandler for PersistStatsHandler {
    /// Only heartbeats with the datanode role are accepted.
    fn is_acceptable(&self, role: Role) -> bool {
        role == Role::Datanode
    }

    /// Takes the stat out of `acc`, buffers it, and writes the buffer to the KV store when
    /// the node epoch changes (or is seen for the first time) or once
    /// `MAX_CACHED_STATS_PER_KEY` stats have accumulated.
    ///
    /// A stat whose epoch is older than the cached one is logged and dropped.
    ///
    /// # Errors
    ///
    /// Fails when the buffered stats cannot be encoded or the KV store rejects the write.
    async fn handle(
        &self,
        _req: &HeartbeatRequest,
        ctx: &mut Context,
        acc: &mut HeartbeatAccumulator,
    ) -> Result<HandleControl> {
        let Some(current_stat) = acc.stat.take() else {
            return Ok(HandleControl::Continue);
        };

        let key = current_stat.stat_key();
        let mut entry = self.stats_cache.entry(key).or_default();

        let key: Vec<u8> = key.into();
        let epoch_stats = entry.value_mut();

        let refresh = if let Some(epoch) = epoch_stats.epoch() {
            match current_stat.node_epoch.cmp(&epoch) {
                Ordering::Greater => {
                    // This node may have been redeployed.
                    epoch_stats.clear();
                    epoch_stats.set_epoch(current_stat.node_epoch);
                    epoch_stats.push(current_stat);
                    true
                }
                Ordering::Equal => {
                    epoch_stats.push(current_stat);
                    false
                }
                Ordering::Less => {
                    // A stale stat must not be buffered, otherwise it would be written
                    // out together with the stats of the current epoch.
                    warn!("Ignore stale heartbeat: {:?}", current_stat);
                    false
                }
            }
        } else {
            epoch_stats.set_epoch(current_stat.node_epoch);
            epoch_stats.push(current_stat);
            // If the epoch is empty, it indicates that the current node sending the heartbeat
            // for the first time to the current meta leader, so it is necessary to persist
            // the data to the KV store as soon as possible.
            true
        };

        if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
            return Ok(HandleControl::Continue);
        }

        let value: Vec<u8> = DatanodeStatValue {
            stats: epoch_stats.drain_all(),
        }
        .try_into()?;
        let put = PutRequest {
            key,
            value,
            ..Default::default()
        };

        // NOTE(review): `entry` is still alive across this await; confirm that holding the
        // cache entry while the put is pending cannot block other heartbeat handling.
        let _ = ctx
            .in_memory
            .put(put)
            .await
            .context(error::KvBackendSnafu)?;

        Ok(HandleControl::Continue)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::sequence::SequenceBuilder;

    use super::*;
    use crate::cluster::MetaPeerClientBuilder;
    use crate::handler::{HeartbeatMailbox, Pushers};
    use crate::key::DatanodeStatKey;
    use crate::service::store::cached_kv::LeaderCachedKvBackend;

    /// Builds a [`Context`] whose stores are all in-memory backends.
    fn in_memory_context() -> Context {
        let in_memory = Arc::new(MemoryKvBackend::new());
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
            kv_backend.clone(),
        ));
        let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
        let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
        let meta_peer_client = MetaPeerClientBuilder::default()
            .election(None)
            .in_memory(in_memory.clone())
            .build()
            .map(Arc::new)
            // Safety: all required fields set at initialization
            .unwrap();

        Context {
            server_addr: "127.0.0.1:0000".to_string(),
            in_memory,
            kv_backend: kv_backend.clone(),
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election: None,
            is_infancy: false,
            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
        }
    }

    #[tokio::test]
    async fn test_handle_datanode_stats() {
        let ctx = in_memory_context();
        let handler = PersistStatsHandler::default();

        // A single heartbeat from a node seen for the first time.
        handle_request_many_times(ctx.clone(), &handler, 1).await;

        let raw_key: Vec<u8> = DatanodeStatKey {
            cluster_id: 3,
            node_id: 101,
        }
        .into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();

        let stat_key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
        assert_eq!(3, stat_key.cluster_id);
        assert_eq!(101, stat_key.node_id);

        let first_batch: DatanodeStatValue = kv.value.try_into().unwrap();
        // first new stat must be set in kv store immediately
        assert_eq!(1, first_batch.stats.len());
        assert_eq!(1, first_batch.stats[0].region_num);

        // Ten further heartbeats fill the cache up to the flush threshold.
        handle_request_many_times(ctx.clone(), &handler, 10).await;

        let raw_key: Vec<u8> = stat_key.into();
        let kv = ctx.in_memory.get(&raw_key).await.unwrap().unwrap();
        let second_batch: DatanodeStatValue = kv.value.try_into().unwrap();
        // refresh every 10 stats
        assert_eq!(10, second_batch.stats.len());
    }

    /// Sends `loop_times` heartbeats for cluster 3 / node 101 through `handler`, using the
    /// iteration counter (starting at 1) as `region_num`.
    async fn handle_request_many_times(
        mut ctx: Context,
        handler: &PersistStatsHandler,
        loop_times: i32,
    ) {
        let req = HeartbeatRequest::default();

        for round in 1..=loop_times {
            let stat = Stat {
                cluster_id: 3,
                id: 101,
                region_num: round as _,
                ..Default::default()
            };
            let mut acc = HeartbeatAccumulator {
                stat: Some(stat),
                ..Default::default()
            };

            handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
        }
    }
}

View File

@@ -48,15 +48,16 @@ use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFrontendClusterInfoHandler,
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::extract_stat_handler::ExtractStatHandler;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
@@ -383,9 +384,10 @@ impl MetasrvBuilder {
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(ExtractStatHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(CollectFlownodeClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(region_lease_handler).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
@@ -395,7 +397,7 @@ impl MetasrvBuilder {
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
group.add_handler(publish_heartbeat_handler).await;
}
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group
}
};