feat(metasrv): implement topic statistics collection (#6732)

* feat(metasrv): collect topic stats

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix tests and apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-08-13 20:52:28 +08:00
committed by GitHub
parent 8659412cac
commit 4fb7d92f7c
13 changed files with 930 additions and 174 deletions

View File

@@ -63,7 +63,10 @@ pub struct Stat {
pub wcus: i64,
/// How many regions on this node
pub region_num: u64,
/// The region stats of the datanode.
pub region_stats: Vec<RegionStat>,
/// The topic stats of the datanode.
pub topic_stats: Vec<TopicStat>,
// The node epoch is used to check whether the node has restarted or been redeployed.
pub node_epoch: u64,
/// The datanode workloads.
@@ -221,6 +224,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
region_stats,
node_epoch,
node_workloads,
topic_stats,
..
} = value;
@@ -230,6 +234,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
.iter()
.map(RegionStat::from)
.collect::<Vec<_>>();
let topic_stats = topic_stats.iter().map(TopicStat::from).collect::<Vec<_>>();
let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
Ok(Self {
@@ -242,6 +247,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
wcus: region_stats.iter().map(|s| s.wcus).sum(),
region_num: region_stats.len() as u64,
region_stats,
topic_stats,
node_epoch: *node_epoch,
datanode_workloads,
})
@@ -304,6 +310,17 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
}
}
impl From<&api::v1::meta::TopicStat> for TopicStat {
fn from(value: &api::v1::meta::TopicStat) -> Self {
Self {
topic: value.topic_name.clone(),
latest_entry_id: value.latest_entry_id,
record_size: value.record_size,
record_num: value.record_num,
}
}
}
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
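
The `From` impl above maps the protobuf `topic_name` field onto the internal `topic` field. A minimal sketch of the conversion (an editor's illustration, not part of the diff), assuming the prost-generated message derives `Default`; the topic name and sizes are invented:

use api::v1::meta::TopicStat as PbTopicStat;
use common_meta::datanode::TopicStat;

fn main() {
    let pb = PbTopicStat {
        topic_name: "greptimedb_wal_topic_0".to_string(),
        latest_entry_id: 42,
        record_size: 4096,
        record_num: 8,
        ..Default::default()
    };
    // Convert the heartbeat payload into the internal representation.
    let stat = TopicStat::from(&pb);
    assert_eq!(stat.topic, "greptimedb_wal_topic_0");
    assert_eq!(stat.latest_entry_id, 42);
}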

View File

@@ -46,6 +46,7 @@ pub mod rpc;
pub mod sequence;
pub mod snapshot;
pub mod state_store;
pub mod stats;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod util;

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod topic;

View File

@@ -0,0 +1,634 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use common_telemetry::{debug, warn};
use datafusion_common::HashSet;
use crate::datanode::TopicStat;
use crate::distributed_time_constants::{
TOPIC_STATS_REPORT_INTERVAL_SECS, TOPIC_STATS_RETENTION_SECS,
};
use crate::DatanodeId;
pub type TopicStatsRegistryRef = Arc<TopicStatsRegistry>;
/// Manages statistics for all topics across the cluster.
pub struct TopicStatsRegistry {
inner: RwLock<TopicStatsStore>,
}
impl Default for TopicStatsRegistry {
fn default() -> Self {
Self::new(
Duration::from_secs(TOPIC_STATS_RETENTION_SECS),
Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
)
}
}
impl TopicStatsRegistry {
/// Creates a new topic stats registry.
///
/// # Panics
/// Panics if the window size is zero.
fn new(retention: Duration, window_size: Duration) -> Self {
let history_limit = (retention.as_secs() / window_size.as_secs()).max(10) as usize;
Self {
inner: RwLock::new(TopicStatsStore::new(history_limit, window_size)),
}
}
/// Adds a topic stat for a given datanode at a specific timestamp.
pub fn add_stat(&self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
let mut inner = self.inner.write().unwrap();
inner.add_stat(datanode_id, stat, millis_ts);
}
/// Adds a list of topic stats for a given datanode at a specific timestamp.
pub fn add_stats(&self, datanode_id: DatanodeId, stats: &[TopicStat], millis_ts: i64) {
if stats.is_empty() {
return;
}
let mut inner = self.inner.write().unwrap();
for stat in stats {
inner.add_stat(datanode_id, stat, millis_ts);
}
}
/// Gets the calculated topic stat for a given topic.
pub fn get_calculated_topic_stat(
&self,
topic: &str,
period: Duration,
) -> Option<CalculatedTopicStat> {
let inner = self.inner.read().unwrap();
inner.get_calculated_topic_stat(topic, period)
}
/// Gets the latest entry id and timestamp for a given topic.
pub fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
let inner = self.inner.read().unwrap();
inner.get_latest_entry_id(topic)
}
}
#[derive(Debug, PartialEq, Clone, Default)]
struct HistoryTopicStat {
/// The latest entry id of the topic.
pub latest_entry_id: u64,
/// The total size in bytes of records appended to the topic.
pub record_size: u64,
/// The total number of records appended to the topic.
pub record_num: u64,
/// The start timestamp of the stat.
start_ts: i64,
}
#[derive(Debug)]
struct PartialTopicStat {
/// The latest entry id of the topic.
pub latest_entry_id: u64,
/// The total size in bytes of records appended to the topic.
pub record_size: u64,
/// The total number of records appended to the topic.
pub record_num: u64,
/// The timestamp of the partial topic stat.
pub timestamp: i64,
}
struct ActiveBucket {
buffer: HashMap<DatanodeId, HashMap<String, PartialTopicStat>>,
start_ts: i64,
window_size: Duration,
}
impl ActiveBucket {
fn new(timestamp: i64, window_sec: Duration) -> Self {
Self {
buffer: HashMap::new(),
start_ts: timestamp,
window_size: window_sec,
}
}
fn acceptable_ts(&self, millis_ts: i64) -> bool {
let acceptable = millis_ts >= self.start_ts
&& millis_ts < self.start_ts + self.window_size.as_millis() as i64;
if !acceptable {
debug!(
"acceptable range: ts >= {} && ts < {}, ts: {}",
self.start_ts,
self.start_ts + self.window_size.as_millis() as i64,
millis_ts
);
}
acceptable
}
/// Adds a topic stat to the active bucket.
///
/// Returns `true` if the stat is accepted (stale stats are silently ignored),
/// or `false` if the timestamp falls outside the bucket's window.
fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) -> bool {
if !self.acceptable_ts(millis_ts) {
return false;
}
let datanode_stats = self.buffer.entry(datanode_id).or_default();
// Overwrite an existing topic stat only if the incoming one is newer.
if let Some(prev) = datanode_stats.get_mut(&stat.topic) {
if millis_ts > prev.timestamp {
*prev = PartialTopicStat {
latest_entry_id: stat.latest_entry_id,
record_size: stat.record_size,
record_num: stat.record_num,
timestamp: millis_ts,
};
} else {
warn!(
"Ignore stale topic stat for topic: {}, timestamp: {}, last recorded timestamp: {}",
stat.topic, millis_ts, prev.timestamp
);
}
} else {
datanode_stats.insert(
stat.topic.to_string(),
PartialTopicStat {
latest_entry_id: stat.latest_entry_id,
record_size: stat.record_size,
record_num: stat.record_num,
timestamp: millis_ts,
},
);
}
true
}
fn merge(self) -> HashMap<String, HistoryTopicStat> {
let all_topics = self
.buffer
.values()
.flat_map(|stats| stats.keys())
.collect::<HashSet<_>>();
let mut output = HashMap::with_capacity(all_topics.len());
for topic in all_topics {
let stats = self
.buffer
.values()
.flat_map(|stats| stats.get(topic))
.collect::<Vec<_>>();
debug!("stats: {:?} for topic: {}", stats, topic);
let latest_entry_id = stats
.iter()
.map(|stat| stat.latest_entry_id)
.max()
.unwrap_or(0);
let record_size = stats.iter().map(|stat| stat.record_size).sum::<u64>();
let record_num = stats.iter().map(|stat| stat.record_num).sum::<u64>();
output.insert(
topic.to_string(),
HistoryTopicStat {
latest_entry_id,
record_size,
record_num,
start_ts: self.start_ts,
},
);
}
output
}
/// Get the partial topic stat of a datanode.
#[cfg(test)]
fn get_stat(&self, datanode_id: DatanodeId, topic: &str) -> Option<&PartialTopicStat> {
self.buffer
.get(&datanode_id)
.and_then(|stats| stats.get(topic))
}
}
/// Manages topic statistics over time, including active and historical buckets.
struct TopicStatsStore {
/// The currently active bucket collecting stats.
active_bucket: Option<ActiveBucket>,
/// Historical merged buckets, grouped by topic.
history_by_topic: HashMap<String, VecDeque<HistoryTopicStat>>,
/// Maximum number of historical windows to keep per topic.
history_limit: usize,
/// Duration of each stats window.
window_size: Duration,
}
impl TopicStatsStore {
/// Creates a new topic stats store.
fn new(history_limit: usize, window_size: Duration) -> Self {
Self {
active_bucket: None,
history_by_topic: HashMap::new(),
history_limit,
window_size,
}
}
/// Aligns the timestamp down to the start of its containing second.
fn align_ts(millis_ts: i64) -> i64 {
(millis_ts / 1000) * 1000
}
fn rotate_active_bucket(&mut self, start_ts: i64) {
let aligned_ts = Self::align_ts(start_ts);
if let Some(old_bucket) = self.active_bucket.take() {
let merged = old_bucket.merge();
for (topic, stat) in merged {
debug!(
"Merge current topic: {}, stats into history: {:?}",
topic, stat
);
let history = self.history_by_topic.entry(topic).or_default();
history.push_back(stat);
if history.len() > self.history_limit {
history.pop_front();
}
}
}
self.active_bucket = Some(ActiveBucket::new(aligned_ts, self.window_size));
}
/// Adds a topic stat for a given datanode at a specific timestamp.
fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
let aligned_ts = Self::align_ts(millis_ts);
let need_rotate = match &self.active_bucket {
Some(bucket) => !bucket.acceptable_ts(aligned_ts),
None => true,
};
if need_rotate {
debug!("Rotate active bucket at ts: {}", aligned_ts);
self.rotate_active_bucket(aligned_ts);
}
// Safety: the active bucket was initialized in the previous step.
let active_bucket = self.active_bucket.as_mut().unwrap();
// Bind the result before asserting so the stat is still recorded when debug assertions are disabled.
let added = active_bucket.add_stat(datanode_id, stat, millis_ts);
debug_assert!(added);
}
/// Gets the calculated topic stat for a given topic.
fn get_calculated_topic_stat(
&self,
topic: &str,
period: Duration,
) -> Option<CalculatedTopicStat> {
let stats = self.history_by_topic.get(topic)?;
calculate_topic_stat(stats, period)
}
/// Gets the latest entry id and timestamp for a given topic.
fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
self.history_by_topic.get(topic).and_then(|stats| {
stats
.back()
.map(|stat| (stat.latest_entry_id, stat.start_ts))
})
}
}
/// The calculated topic stat.
///
/// `avg_record_size` is the average size of the records appended to the topic between
/// `start_ts` and `end_ts`, which are the start timestamps of the two history windows
/// used for the calculation.
pub struct CalculatedTopicStat {
pub avg_record_size: usize,
pub start_ts: i64,
pub end_ts: i64,
}
/// Calculates the average record size for a topic within a specified time window based on recent merged statistics.
///
/// Returns `Some(CalculatedTopicStat)` if the calculation is successful, or `None` if insufficient data is available.
fn calculate_topic_stat(
stats: &VecDeque<HistoryTopicStat>,
period: Duration,
) -> Option<CalculatedTopicStat> {
if stats.len() < 2 {
return None;
}
let last_stat = stats.back().unwrap();
let first_stat = stats.front().unwrap();
// Not enough stats data.
if first_stat.start_ts + period.as_millis() as i64 > last_stat.start_ts {
return None;
}
// Scanning from newest to oldest (skipping the last stat itself), find the first stat
// that is more than `period` older than the last stat.
// TODO(weny): Use binary search to find the target stat.
let target_stat = stats
.iter()
.rev()
.skip(1)
.find(|stat| (stat.start_ts + period.as_millis() as i64) < last_stat.start_ts);
let target_stat = target_stat?;
// The target stat's record size and record num must not exceed those of the last stat.
if target_stat.record_size > last_stat.record_size
|| target_stat.record_num > last_stat.record_num
{
return None;
}
// Safety: checked above; the last stat's record size and record num are at least the target stat's.
let record_size = last_stat.record_size - target_stat.record_size;
let record_num = last_stat.record_num - target_stat.record_num;
let avg_record_size = record_size.checked_div(record_num).unwrap_or(0) as usize;
let start_ts = target_stat.start_ts;
let end_ts = last_stat.start_ts;
Some(CalculatedTopicStat {
avg_record_size,
start_ts,
end_ts,
})
}
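// Worked example (editor's illustration, mirroring `test_multiple_stats_target_found` below):
// with history windows (start_ts, record_size, record_num) of (1000, 100, 2), (2000, 200, 4),
// (4000, 400, 8), (8000, 800, 16) and period = 3s, the newest window more than 3s older than
// the last one starts at 4000, so avg_record_size = (800 - 400) / (16 - 8) = 50,
// start_ts = 4000 and end_ts = 8000.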
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use common_time::util::current_time_millis;
use super::*;
use crate::datanode::TopicStat;
fn merged_stat(ts: i64, record_size: u64, record_num: u64) -> HistoryTopicStat {
HistoryTopicStat {
start_ts: ts,
record_size,
record_num,
..Default::default()
}
}
#[test]
fn test_empty_stats() {
let stats: VecDeque<HistoryTopicStat> = VecDeque::new();
assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
}
#[test]
fn test_single_stat() {
let mut stats = VecDeque::new();
stats.push_back(merged_stat(1000, 100, 2));
assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
}
#[test]
fn test_no_target_stat_found() {
let mut stats = VecDeque::new();
stats.push_back(merged_stat(1000, 100, 2));
stats.push_back(merged_stat(2000, 200, 4));
// The period is large, so no target stat will be found.
assert!(calculate_topic_stat(&stats, Duration::from_secs(100)).is_none());
}
#[test]
fn test_target_stat_found() {
let mut stats = VecDeque::new();
stats.push_back(merged_stat(1000, 100, 2));
stats.push_back(merged_stat(3000, 200, 4));
stats.push_back(merged_stat(6000, 600, 6));
let result = calculate_topic_stat(&stats, Duration::from_secs(2));
assert!(result.is_some());
let stat = result.unwrap();
assert_eq!(stat.avg_record_size, 200); // (600 - 200) / (6 - 4)
assert_eq!(stat.start_ts, 3000);
assert_eq!(stat.end_ts, 6000);
}
#[test]
fn test_target_stat_decreasing() {
let mut stats = VecDeque::new();
stats.push_back(merged_stat(1000, 100, 2));
stats.push_back(merged_stat(3000, 200, 4));
stats.push_back(merged_stat(6000, 100, 1)); // Counters went backwards, e.g. after a reset.
let result = calculate_topic_stat(&stats, Duration::from_secs(2));
assert!(result.is_none());
}
#[test]
fn test_multiple_stats_target_found() {
let mut stats = VecDeque::new();
stats.push_back(merged_stat(1000, 100, 2));
stats.push_back(merged_stat(2000, 200, 4));
stats.push_back(merged_stat(4000, 400, 8));
stats.push_back(merged_stat(8000, 800, 16));
let result = calculate_topic_stat(&stats, Duration::from_secs(3));
assert!(result.is_some());
let stat = result.unwrap();
assert_eq!(stat.avg_record_size, 50); // (800 - 400) / (16 - 8)
assert_eq!(stat.start_ts, 4000);
assert_eq!(stat.end_ts, 8000);
}
#[test]
fn test_active_bucket() {
let ts = current_time_millis();
let window_size = Duration::from_secs(3);
let mut active_bucket = ActiveBucket::new(ts, window_size);
assert!(active_bucket.add_stat(
0,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 1,
record_size: 256,
record_num: 1,
},
ts + 10,
));
assert!(active_bucket.add_stat(
1,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 10,
record_size: 5120,
record_num: 10,
},
ts + 10,
));
assert!(active_bucket.add_stat(
0,
&TopicStat {
topic: "test1".to_string(),
latest_entry_id: 2,
record_size: 128,
record_num: 2,
},
ts + 9,
));
// Out of the window.
assert!(!active_bucket.add_stat(
0,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 2,
record_size: 2,
record_num: 2,
},
ts + window_size.as_millis() as i64 + 1,
));
// Out of the window.
assert!(!active_bucket.add_stat(
0,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 2,
record_size: 2,
record_num: 2,
},
ts - 1
));
// Overwrite the topic stat if the timestamp is larger.
assert!(active_bucket.add_stat(
0,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 3,
record_size: 1024,
record_num: 3,
},
ts + 11,
));
assert_eq!(
active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
3
);
// Ignore stale topic stat.
assert!(active_bucket.add_stat(
0,
&TopicStat {
topic: "test".to_string(),
latest_entry_id: 2,
record_size: 512,
record_num: 2,
},
ts + 9,
));
assert_eq!(
active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
3
);
let merged = active_bucket.merge();
assert_eq!(merged.len(), 2);
assert_eq!(merged.get("test").unwrap().latest_entry_id, 10);
assert_eq!(merged.get("test").unwrap().record_size, 5120 + 1024);
assert_eq!(merged.get("test").unwrap().record_num, 10 + 3);
assert_eq!(merged.get("test1").unwrap().latest_entry_id, 2);
assert_eq!(merged.get("test1").unwrap().record_size, 128);
assert_eq!(merged.get("test1").unwrap().record_num, 2);
assert_eq!(merged.get("test1").unwrap().start_ts, ts);
}
#[test]
fn test_topic_stats() {
let topic_name = "test";
let window_size = Duration::from_secs(60);
let mut topic_stats = TopicStatsStore::new(5, window_size);
let ts = TopicStatsStore::align_ts(current_time_millis());
debug!("add stat at ts: {}", ts);
topic_stats.add_stat(
0,
&TopicStat {
topic: topic_name.to_string(),
latest_entry_id: 1,
record_size: 1024,
record_num: 1,
},
ts,
);
debug!("add stat at ts: {}", ts + window_size.as_millis() as i64);
topic_stats.add_stat(
1,
&TopicStat {
topic: topic_name.to_string(),
latest_entry_id: 4,
record_size: 4096,
record_num: 4,
},
ts + window_size.as_millis() as i64 - 1,
);
topic_stats.add_stat(
1,
&TopicStat {
topic: "another_topic".to_string(),
latest_entry_id: 4,
record_size: 4096,
record_num: 4,
},
ts + window_size.as_millis() as i64 - 1,
);
debug!(
"add stat at ts: {}",
ts + window_size.as_millis() as i64
);
// Add a stat that is out of the window.
topic_stats.add_stat(
1,
&TopicStat {
topic: topic_name.to_string(),
latest_entry_id: 5,
record_size: 8192,
record_num: 5,
},
ts + window_size.as_millis() as i64,
);
let history = topic_stats.history_by_topic.get(topic_name).unwrap();
assert_eq!(history.len(), 1);
assert_eq!(
history[0],
HistoryTopicStat {
latest_entry_id: 4,
record_size: 1024 + 4096,
record_num: 1 + 4,
start_ts: ts,
}
);
assert!(topic_stats.active_bucket.is_some());
}
}
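
A minimal sketch of driving the registry directly (an editor's illustration, not part of the diff; the datanode id, topic name, and sizes are invented):

use common_meta::datanode::TopicStat;
use common_meta::stats::topic::TopicStatsRegistry;
use common_time::util::current_time_millis;

fn main() {
    let registry = TopicStatsRegistry::default();
    let now = current_time_millis();
    registry.add_stats(
        1, // datanode id
        &[TopicStat {
            topic: "greptimedb_wal_topic_0".to_string(),
            latest_entry_id: 42,
            record_size: 4096,
            record_num: 8,
        }],
        now,
    );
    // Queries read the per-topic history, which is only populated once the active
    // bucket rotates, i.e. after a later stat lands in the next window.
    assert!(registry.get_latest_entry_id("greptimedb_wal_topic_0").is_none());
}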

View File

@@ -53,6 +53,7 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, watch, Notify, RwLock};
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::metasrv::Context;
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
@@ -65,6 +66,7 @@ pub mod check_leader_handler;
pub mod collect_cluster_info_handler;
pub mod collect_leader_region_handler;
pub mod collect_stats_handler;
pub mod collect_topic_stats_handler;
pub mod extract_stat_handler;
pub mod failure_handler;
pub mod filter_inactive_region_stats;
@@ -76,6 +78,8 @@ pub mod publish_heartbeat_handler;
pub mod region_lease_handler;
pub mod remap_flow_peer_handler;
pub mod response_header_handler;
#[cfg(test)]
pub mod test_utils;
#[async_trait::async_trait]
pub trait HeartbeatHandler: Send + Sync {
@@ -619,6 +623,7 @@ impl HeartbeatHandlerGroupBuilder {
self.add_handler_last(publish_heartbeat_handler);
}
self.add_handler_last(CollectLeaderRegionHandler);
self.add_handler_last(CollectTopicStatsHandler);
self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor));
self.add_handler_last(RemapFlowPeerHandler::default());
@@ -897,8 +902,6 @@ mod tests {
.unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -912,10 +915,11 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -934,8 +938,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(15, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -950,10 +952,11 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -969,8 +972,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(15, handlers.len());
let names = [
"CollectStatsHandler",
"ResponseHeaderHandler",
@@ -985,10 +986,11 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -1004,8 +1006,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(15, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -1020,10 +1020,11 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -1039,8 +1040,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(15, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -1054,11 +1053,12 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -1074,8 +1074,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -1089,10 +1087,12 @@ mod tests {
"CollectStatsHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -1108,8 +1108,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
let names = [
"ResponseHeaderHandler",
"DatanodeKeepLeaseHandler",
@@ -1123,10 +1121,12 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"ResponseHeaderHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}
@@ -1142,8 +1142,6 @@ mod tests {
let group = builder.build().unwrap();
let handlers = group.handlers;
assert_eq!(14, handlers.len());
let names = [
"CollectStatsHandler",
"DatanodeKeepLeaseHandler",
@@ -1157,10 +1155,11 @@ mod tests {
"MailboxHandler",
"FilterInactiveRegionStatsHandler",
"CollectLeaderRegionHandler",
"CollectTopicStatsHandler",
"CollectStatsHandler",
"RemapFlowPeerHandler",
];
assert_eq!(names.len(), handlers.len());
for (handler, name) in handlers.iter().zip(names.into_iter()) {
assert_eq!(handler.name, name);
}

View File

@@ -59,51 +59,13 @@ impl HeartbeatHandler for CollectLeaderRegionHandler {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
use common_meta::sequence::SequenceBuilder;
use common_meta::region_registry::LeaderRegionManifestInfo;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::service::store::cached_kv::LeaderCachedKvBackend;
fn mock_ctx() -> Context {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
}
}
use crate::handler::test_utils::TestEnv;
fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat {
RegionStat {
@@ -130,7 +92,8 @@ mod tests {
#[tokio::test]
async fn test_handle_collect_leader_region() {
let mut ctx = mock_ctx();
let env = TestEnv::new();
let mut ctx = env.ctx();
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {

View File

@@ -217,49 +217,15 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::DatanodeStatKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::handler::test_utils::TestEnv;
#[tokio::test]
async fn test_handle_datanode_stats() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let env = TestEnv::new();
let ctx = env.ctx();
let handler = CollectStatsHandler::default();
handle_request_many_times(ctx.clone(), &handler, 1).await;

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
pub struct CollectTopicStatsHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectTopicStatsHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
async fn handle(
&self,
_req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some(current_stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};
ctx.topic_stats_registry.add_stats(
current_stat.id,
&current_stat.topic_stats,
current_stat.timestamp_millis,
);
Ok(HandleControl::Continue)
}
}
#[cfg(test)]
mod tests {
use common_meta::datanode::{Stat, TopicStat};
use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
use common_time::util::current_time_millis;
use super::*;
use crate::handler::test_utils::TestEnv;
#[tokio::test]
async fn test_handle_collect_topic_stats() {
let env = TestEnv::new();
let ctx = env.ctx();
let handler = CollectTopicStatsHandler;
let timestamp_millis = current_time_millis();
let aligned_ts = timestamp_millis - timestamp_millis % 1000;
handle_request_many_times(ctx.clone(), &handler, 1, timestamp_millis, 10).await;
handle_request_many_times(ctx.clone(), &handler, 2, timestamp_millis, 10).await;
// trigger the next window
let next_timestamp_millis =
timestamp_millis + (TOPIC_STATS_REPORT_INTERVAL_SECS * 1000) as i64;
handle_request_many_times(ctx.clone(), &handler, 1, next_timestamp_millis, 10).await;
let latest_entry_id = ctx
.topic_stats_registry
.get_latest_entry_id("test")
.unwrap();
assert_eq!(latest_entry_id, (15, aligned_ts));
let latest_entry_id = ctx
.topic_stats_registry
.get_latest_entry_id("test2")
.unwrap();
assert_eq!(latest_entry_id, (10, aligned_ts));
assert!(ctx
.topic_stats_registry
.get_latest_entry_id("test3")
.is_none());
}
async fn handle_request_many_times(
mut ctx: Context,
handler: &CollectTopicStatsHandler,
datanode_id: u64,
timestamp_millis: i64,
loop_times: i32,
) {
let req = HeartbeatRequest::default();
for i in 1..=loop_times {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: datanode_id,
region_num: i as _,
timestamp_millis,
topic_stats: vec![
TopicStat {
topic: "test".to_string(),
latest_entry_id: 15,
record_size: 1024,
record_num: 2,
},
TopicStat {
topic: "test2".to_string(),
latest_entry_id: 10,
record_size: 1024,
record_num: 2,
},
],
..Default::default()
}),
..Default::default()
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
}
}
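
The handler only feeds the registry; this commit does not add a reader. Where one is needed, a metasrv component holding the shared `Context` could query it along these lines (a hedged sketch; the function name, topic, and five-minute period are invented):

use std::time::Duration;

use common_meta::stats::topic::CalculatedTopicStat;

use crate::metasrv::Context;

/// Hypothetical consumer: the average size of records appended to `topic` over the
/// last five minutes, if enough history windows are available.
fn avg_record_size(ctx: &Context, topic: &str) -> Option<usize> {
    let CalculatedTopicStat { avg_record_size, .. } = ctx
        .topic_stats_registry
        .get_calculated_topic_stat(topic, Duration::from_secs(300))?;
    Some(avg_record_size)
}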

View File

@@ -127,51 +127,18 @@ async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>,
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::RequestHeader;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::Stat;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::handler::test_utils::TestEnv;
use crate::lease::find_datanode_lease_value;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
#[tokio::test]
async fn test_put_into_memory_store() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let env = TestEnv::new();
let ctx = env.ctx();
let handler = DatanodeKeepLeaseHandler;
handle_request_many_times(ctx.clone(), &handler, 1).await;

View File

@@ -43,50 +43,16 @@ impl HeartbeatHandler for ResponseHeaderHandler {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::RequestHeader;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_telemetry::tracing_context::W3cTrace;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{Context, HeartbeatMailbox, Pushers};
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::handler::test_utils::TestEnv;
#[tokio::test]
async fn test_handle_heartbeat_resp_header() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let mut ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let env = TestEnv::new();
let mut ctx = env.ctx();
let req = HeartbeatRequest {
header: Some(RequestHeader::new(2, Role::Datanode, W3cTrace::new())),

View File

@@ -0,0 +1,95 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef};
use common_meta::sequence::SequenceBuilder;
use common_meta::stats::topic::{TopicStatsRegistry, TopicStatsRegistryRef};
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::metasrv::Context;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
/// A helper that wires up an in-memory metasrv [`Context`] for handler tests.
pub struct TestEnv {
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
mailbox: MailboxRef,
meta_peer_client: MetaPeerClientRef,
table_metadata_manager: TableMetadataManagerRef,
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
topic_stats_registry: TopicStatsRegistryRef,
}
impl Default for TestEnv {
fn default() -> Self {
Self::new()
}
}
impl TestEnv {
pub fn new() -> Self {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
Self {
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
mailbox,
meta_peer_client,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
topic_stats_registry: Arc::new(TopicStatsRegistry::default()),
}
}
/// Returns a new context for testing.
pub fn ctx(&self) -> Context {
Context {
in_memory: self.in_memory.clone(),
kv_backend: self.kv_backend.clone(),
leader_cached_kv_backend: self.leader_cached_kv_backend.clone(),
server_addr: "127.0.0.1:0000".to_string(),
meta_peer_client: self.meta_peer_client.clone(),
mailbox: self.mailbox.clone(),
election: None,
is_infancy: false,
table_metadata_manager: self.table_metadata_manager.clone(),
cache_invalidator: self.cache_invalidator.clone(),
leader_region_registry: self.leader_region_registry.clone(),
topic_stats_registry: self.topic_stats_registry.clone(),
}
}
}

View File

@@ -40,6 +40,7 @@ use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
@@ -308,6 +309,7 @@ pub struct Context {
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub leader_region_registry: LeaderRegionRegistryRef,
pub topic_stats_registry: TopicStatsRegistryRef,
}
impl Context {
@@ -458,6 +460,7 @@ pub struct Metasrv {
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
topic_stats_registry: TopicStatsRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
table_id_sequence: SequenceRef,
reconciliation_manager: ReconciliationManagerRef,
@@ -756,6 +759,7 @@ impl Metasrv {
let table_metadata_manager = self.table_metadata_manager.clone();
let cache_invalidator = self.cache_invalidator.clone();
let leader_region_registry = self.leader_region_registry.clone();
let topic_stats_registry = self.topic_stats_registry.clone();
Context {
server_addr,
@@ -769,6 +773,7 @@ impl Metasrv {
table_metadata_manager,
cache_invalidator,
leader_region_registry,
topic_stats_registry,
}
}
}

View File

@@ -27,7 +27,7 @@ use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::{self};
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
@@ -40,6 +40,7 @@ use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::stats::topic::TopicStatsRegistry;
use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
@@ -372,6 +373,7 @@ impl MetasrvBuilder {
};
let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
let ddl_context = DdlContext {
node_manager: node_manager.clone(),
@@ -507,6 +509,7 @@ impl MetasrvBuilder {
wal_prune_ticker,
table_id_sequence,
reconciliation_manager,
topic_stats_registry,
})
}
}