diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index e2e79c9e81..2ffb722cfd 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -63,7 +63,10 @@ pub struct Stat { pub wcus: i64, /// How many regions on this node pub region_num: u64, + /// The region stats of the datanode. pub region_stats: Vec, + /// The topic stats of the datanode. + pub topic_stats: Vec, // The node epoch is used to check whether the node has restarted or redeployed. pub node_epoch: u64, /// The datanode workloads. @@ -221,6 +224,7 @@ impl TryFrom<&HeartbeatRequest> for Stat { region_stats, node_epoch, node_workloads, + topic_stats, .. } = value; @@ -230,6 +234,7 @@ impl TryFrom<&HeartbeatRequest> for Stat { .iter() .map(RegionStat::from) .collect::>(); + let topic_stats = topic_stats.iter().map(TopicStat::from).collect::>(); let datanode_workloads = get_datanode_workloads(node_workloads.as_ref()); Ok(Self { @@ -242,6 +247,7 @@ impl TryFrom<&HeartbeatRequest> for Stat { wcus: region_stats.iter().map(|s| s.wcus).sum(), region_num: region_stats.len() as u64, region_stats, + topic_stats, node_epoch: *node_epoch, datanode_workloads, }) @@ -304,6 +310,17 @@ impl From<&api::v1::meta::RegionStat> for RegionStat { } } +impl From<&api::v1::meta::TopicStat> for TopicStat { + fn from(value: &api::v1::meta::TopicStat) -> Self { + Self { + topic: value.topic_name.clone(), + latest_entry_id: value.latest_entry_id, + record_size: value.record_size, + record_num: value.record_num, + } + } +} + /// The key of the datanode stat in the memory store. /// /// The format is `__meta_datanode_stat-0-{node_id}`. diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index fa53ab0a6c..8381fa7cb8 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -46,6 +46,7 @@ pub mod rpc; pub mod sequence; pub mod snapshot; pub mod state_store; +pub mod stats; #[cfg(any(test, feature = "testing"))] pub mod test_util; pub mod util; diff --git a/src/common/meta/src/stats.rs b/src/common/meta/src/stats.rs new file mode 100644 index 0000000000..03c0784cd0 --- /dev/null +++ b/src/common/meta/src/stats.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod topic; diff --git a/src/common/meta/src/stats/topic.rs b/src/common/meta/src/stats/topic.rs new file mode 100644 index 0000000000..e5a87f3332 --- /dev/null +++ b/src/common/meta/src/stats/topic.rs @@ -0,0 +1,634 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use common_telemetry::{debug, warn}; +use datafusion_common::HashSet; + +use crate::datanode::TopicStat; +use crate::distributed_time_constants::{ + TOPIC_STATS_REPORT_INTERVAL_SECS, TOPIC_STATS_RETENTION_SECS, +}; +use crate::DatanodeId; + +pub type TopicStatsRegistryRef = Arc; + +/// Manages statistics for all topics across the cluster. +pub struct TopicStatsRegistry { + inner: RwLock, +} + +impl Default for TopicStatsRegistry { + fn default() -> Self { + Self::new( + Duration::from_secs(TOPIC_STATS_RETENTION_SECS), + Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS), + ) + } +} + +impl TopicStatsRegistry { + /// Creates a new topic stats registry. + /// + /// # Panics + /// Panic if the window size is zero. + fn new(retention: Duration, window_size: Duration) -> Self { + let history_limit = (retention.as_secs() / window_size.as_secs()).max(10) as usize; + Self { + inner: RwLock::new(TopicStatsStore::new(history_limit, window_size)), + } + } + + /// Adds a topic stat for a given datanode at a specific timestamp. + pub fn add_stat(&self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) { + let mut inner = self.inner.write().unwrap(); + inner.add_stat(datanode_id, stat, millis_ts); + } + + /// Adds a list of topic stats for a given datanode at a specific timestamp. + pub fn add_stats(&self, datanode_id: DatanodeId, stats: &[TopicStat], millis_ts: i64) { + if stats.is_empty() { + return; + } + + let mut inner = self.inner.write().unwrap(); + for stat in stats { + inner.add_stat(datanode_id, stat, millis_ts); + } + } + + /// Gets the calculated topic stat for a given topic. + pub fn get_calculated_topic_stat( + &self, + topic: &str, + period: Duration, + ) -> Option { + let inner = self.inner.read().unwrap(); + inner.get_calculated_topic_stat(topic, period) + } + + /// Gets the latest entry id and timestamp for a given topic. + pub fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> { + let inner = self.inner.read().unwrap(); + inner.get_latest_entry_id(topic) + } +} + +#[derive(Debug, PartialEq, Clone, Default)] +struct HistoryTopicStat { + /// The latest entry id of the topic. + pub latest_entry_id: u64, + /// The total size in bytes of records appended to the topic. + pub record_size: u64, + /// The total number of records appended to the topic. + pub record_num: u64, + /// The start timestamp of the stat. + start_ts: i64, +} + +#[derive(Debug)] +struct PartialTopicStat { + /// The latest entry id of the topic. + pub latest_entry_id: u64, + /// The total size in bytes of records appended to the topic. + pub record_size: u64, + /// The total number of records appended to the topic. + pub record_num: u64, + /// The timestamp of the partial topic stat. + pub timestamp: i64, +} + +struct ActiveBucket { + buffer: HashMap>, + start_ts: i64, + window_size: Duration, +} + +impl ActiveBucket { + fn new(timestamp: i64, window_sec: Duration) -> Self { + Self { + buffer: HashMap::new(), + start_ts: timestamp, + window_size: window_sec, + } + } + + fn acceptable_ts(&self, millis_ts: i64) -> bool { + let acceptable = millis_ts >= self.start_ts + && millis_ts < self.start_ts + self.window_size.as_millis() as i64; + if !acceptable { + debug!( + "acceptable range: ts >= {} && ts < {}, ts: {}", + self.start_ts, + self.start_ts + self.window_size.as_millis() as i64, + millis_ts + ); + } + acceptable + } + + /// Add a topic stat to the current topic stats. + /// + /// Returns true if the topic stat is added successfully (stale stat will be ignored directly), + /// false if the topic stat is out of the window. + fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) -> bool { + if !self.acceptable_ts(millis_ts) { + return false; + } + + let datanode_stats = self.buffer.entry(datanode_id).or_default(); + + // Overwrite the topic stat if it already exists. + if let Some(prev) = datanode_stats.get_mut(&stat.topic) { + if millis_ts > prev.timestamp { + *prev = PartialTopicStat { + latest_entry_id: stat.latest_entry_id, + record_size: stat.record_size, + record_num: stat.record_num, + timestamp: millis_ts, + }; + } else { + warn!( + "Ignore stale topic stat for topic: {}, timestamp: {}, last recorded timestamp: {}", + stat.topic, millis_ts, prev.timestamp + ); + } + } else { + datanode_stats.insert( + stat.topic.to_string(), + PartialTopicStat { + latest_entry_id: stat.latest_entry_id, + record_size: stat.record_size, + record_num: stat.record_num, + timestamp: millis_ts, + }, + ); + } + true + } + + fn merge(self) -> HashMap { + let all_topics = self + .buffer + .values() + .flat_map(|stats| stats.keys()) + .collect::>(); + + let mut output = HashMap::with_capacity(all_topics.len()); + for topic in all_topics { + let stats = self + .buffer + .values() + .flat_map(|stats| stats.get(topic)) + .collect::>(); + debug!("stats: {:?} for topic: {}", stats, topic); + let latest_entry_id = stats + .iter() + .map(|stat| stat.latest_entry_id) + .max() + .unwrap_or(0); + let record_size = stats.iter().map(|stat| stat.record_size).sum::(); + let record_num = stats.iter().map(|stat| stat.record_num).sum::(); + + output.insert( + topic.to_string(), + HistoryTopicStat { + latest_entry_id, + record_size, + record_num, + start_ts: self.start_ts, + }, + ); + } + + output + } + + /// Get the partial topic stat of a datanode. + #[cfg(test)] + fn get_stat(&self, datanode_id: DatanodeId, topic: &str) -> Option<&PartialTopicStat> { + self.buffer + .get(&datanode_id) + .and_then(|stats| stats.get(topic)) + } +} + +/// Manages topic statistics over time, including active and historical buckets. +struct TopicStatsStore { + /// The currently active bucket collecting stats. + active_bucket: Option, + /// Historical merged buckets, grouped by topic. + history_by_topic: HashMap>, + /// Maximum number of historical windows to keep per topic. + history_limit: usize, + /// Duration of each stats window in seconds. + window_size: Duration, +} + +impl TopicStatsStore { + /// Create a new topic stats. + fn new(history_limit: usize, window_size: Duration) -> Self { + Self { + active_bucket: None, + history_by_topic: HashMap::new(), + history_limit, + window_size, + } + } + + /// Aligns the timestamp to the nearest second. + fn align_ts(millis_ts: i64) -> i64 { + (millis_ts / 1000) * 1000 + } + + fn rotate_active_bucket(&mut self, start_ts: i64) { + let aligned_ts = Self::align_ts(start_ts); + if let Some(old_bucket) = self.active_bucket.take() { + let merged = old_bucket.merge(); + for (topic, stat) in merged { + debug!( + "Merge current topic: {}, stats into history: {:?}", + topic, stat + ); + let history = self.history_by_topic.entry(topic).or_default(); + history.push_back(stat); + if history.len() > self.history_limit { + history.pop_front(); + } + } + } + + self.active_bucket = Some(ActiveBucket::new(aligned_ts, self.window_size)); + } + + /// Adds a topic stat for a given datanode at a specific timestamp. + fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) { + let aligned_ts = Self::align_ts(millis_ts); + + let need_rotate = match &self.active_bucket { + Some(bucket) => !bucket.acceptable_ts(aligned_ts), + None => true, + }; + + if need_rotate { + debug!("Rotate active bucket at ts: {}", aligned_ts); + self.rotate_active_bucket(aligned_ts); + } + + // Safety: The current topic stats is initialized in the previous step. + let active_bucket = self.active_bucket.as_mut().unwrap(); + debug_assert!(active_bucket.add_stat(datanode_id, stat, millis_ts)); + } + + /// Gets the calculated topic stat for a given topic. + fn get_calculated_topic_stat( + &self, + topic: &str, + period: Duration, + ) -> Option { + let stats = self.history_by_topic.get(topic)?; + calculate_topic_stat(stats, period) + } + + /// Gets the latest entry id and timestamp for a given topic. + fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> { + self.history_by_topic.get(topic).and_then(|stats| { + stats + .back() + .map(|stat| (stat.latest_entry_id, stat.start_ts)) + }) + } +} + +/// The calculated topic stat. +/// +/// The average record size is the average record size of the topic over the window. +/// The start timestamp is the timestamp of the window start. +/// The end timestamp is the timestamp of the window end. +pub struct CalculatedTopicStat { + pub avg_record_size: usize, + pub start_ts: i64, + pub end_ts: i64, +} + +/// Calculates the average record size for a topic within a specified time window based on recent merged statistics. +/// +/// Returns `Some(CalculatedTopicStat)` if the calculation is successful, or `None` if insufficient data is available. +fn calculate_topic_stat( + stats: &VecDeque, + period: Duration, +) -> Option { + if stats.len() < 2 { + return None; + } + + let last_stat = stats.back().unwrap(); + let first_stat = stats.front().unwrap(); + // Not enough stats data. + if first_stat.start_ts + period.as_millis() as i64 > last_stat.start_ts { + return None; + } + + // Find the first stat whose timestamp is less than the last stat's timestamp - period.as_millis() as i64. + // TODO(weny): Use binary search to find the target stat. + let target_stat = stats + .iter() + .rev() + .skip(1) + .find(|stat| (stat.start_ts + period.as_millis() as i64) < last_stat.start_ts); + + let target_stat = target_stat?; + + // The target stat's record size and record num should be less than the last stat's record size and record num. + if target_stat.record_size > last_stat.record_size + || target_stat.record_num > last_stat.record_num + { + return None; + } + + // Safety: the last stat's record size and record num must be greater than the target stat's record size and record num. + let record_size = last_stat.record_size - target_stat.record_size; + let record_num = last_stat.record_num - target_stat.record_num; + let avg_record_size = record_size.checked_div(record_num).unwrap_or(0) as usize; + + let start_ts = target_stat.start_ts; + let end_ts = last_stat.start_ts; + Some(CalculatedTopicStat { + avg_record_size, + start_ts, + end_ts, + }) +} + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + + use common_time::util::current_time_millis; + + use super::*; + use crate::datanode::TopicStat; + + fn merged_stat(ts: i64, record_size: u64, record_num: u64) -> HistoryTopicStat { + HistoryTopicStat { + start_ts: ts, + record_size, + record_num, + ..Default::default() + } + } + + #[test] + fn test_empty_stats() { + let stats: VecDeque = VecDeque::new(); + assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none()); + } + + #[test] + fn test_single_stat() { + let mut stats = VecDeque::new(); + stats.push_back(merged_stat(1000, 100, 2)); + assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none()); + } + + #[test] + fn test_no_target_stat_found() { + let mut stats = VecDeque::new(); + stats.push_back(merged_stat(1000, 100, 2)); + stats.push_back(merged_stat(2000, 200, 4)); + // window_sec is large, so no stat will be found + assert!(calculate_topic_stat(&stats, Duration::from_secs(100)).is_none()); + } + + #[test] + fn test_target_stat_found() { + let mut stats = VecDeque::new(); + stats.push_back(merged_stat(1000, 100, 2)); + stats.push_back(merged_stat(3000, 200, 4)); + stats.push_back(merged_stat(6000, 600, 6)); + let result = calculate_topic_stat(&stats, Duration::from_secs(2)); + assert!(result.is_some()); + let stat = result.unwrap(); + assert_eq!(stat.avg_record_size, 200); // (600 - 200) / (6 - 4) + assert_eq!(stat.start_ts, 3000); + assert_eq!(stat.end_ts, 6000); + } + + #[test] + fn test_target_stat_decreasing() { + let mut stats = VecDeque::new(); + stats.push_back(merged_stat(1000, 100, 2)); + stats.push_back(merged_stat(3000, 200, 4)); + stats.push_back(merged_stat(6000, 100, 1)); // Reset or something wrong + let result = calculate_topic_stat(&stats, Duration::from_secs(2)); + assert!(result.is_none()); + } + + #[test] + fn test_multiple_stats_target_found() { + let mut stats = VecDeque::new(); + stats.push_back(merged_stat(1000, 100, 2)); + stats.push_back(merged_stat(2000, 200, 4)); + stats.push_back(merged_stat(4000, 400, 8)); + stats.push_back(merged_stat(8000, 800, 16)); + let result = calculate_topic_stat(&stats, Duration::from_secs(3)); + assert!(result.is_some()); + let stat = result.unwrap(); + assert_eq!(stat.avg_record_size, 50); // (800 - 400) / (16 - 8) + assert_eq!(stat.start_ts, 4000); + assert_eq!(stat.end_ts, 8000); + } + + #[test] + fn test_active_bucket() { + let ts = current_time_millis(); + let window_size = Duration::from_secs(3); + let mut active_bucket = ActiveBucket::new(ts, window_size); + + assert!(active_bucket.add_stat( + 0, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 1, + record_size: 256, + record_num: 1, + }, + ts + 10, + )); + + assert!(active_bucket.add_stat( + 1, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 10, + record_size: 5120, + record_num: 10, + }, + ts + 10, + )); + + assert!(active_bucket.add_stat( + 0, + &TopicStat { + topic: "test1".to_string(), + latest_entry_id: 2, + record_size: 128, + record_num: 2, + }, + ts + 9, + )); + + // Out of the window. + assert!(!active_bucket.add_stat( + 0, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 2, + record_size: 2, + record_num: 2, + }, + ts + window_size.as_millis() as i64 + 1, + )); + + // Out of the window. + assert!(!active_bucket.add_stat( + 0, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 2, + record_size: 2, + record_num: 2, + }, + ts - 1 + )); + + // Overwrite the topic stat if the timestamp is larger. + assert!(active_bucket.add_stat( + 0, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 3, + record_size: 1024, + record_num: 3, + }, + ts + 11, + )); + assert_eq!( + active_bucket.get_stat(0, "test").unwrap().latest_entry_id, + 3 + ); + + // Ignore stale topic stat. + assert!(active_bucket.add_stat( + 0, + &TopicStat { + topic: "test".to_string(), + latest_entry_id: 2, + record_size: 512, + record_num: 2, + }, + ts + 9, + )); + + assert_eq!( + active_bucket.get_stat(0, "test").unwrap().latest_entry_id, + 3 + ); + + let merged = active_bucket.merge(); + assert_eq!(merged.len(), 2); + assert_eq!(merged.get("test").unwrap().latest_entry_id, 10); + assert_eq!(merged.get("test").unwrap().record_size, 5120 + 1024); + assert_eq!(merged.get("test").unwrap().record_num, 10 + 3); + + assert_eq!(merged.get("test1").unwrap().latest_entry_id, 2); + assert_eq!(merged.get("test1").unwrap().record_size, 128); + assert_eq!(merged.get("test1").unwrap().record_num, 2); + assert_eq!(merged.get("test1").unwrap().start_ts, ts); + } + + #[test] + fn test_topic_stats() { + let topic_name = "test"; + let window_size = Duration::from_secs(60); + let mut topic_stats = TopicStatsStore::new(5, window_size); + let ts = TopicStatsStore::align_ts(current_time_millis()); + debug!("add stat at ts: {}", ts); + topic_stats.add_stat( + 0, + &TopicStat { + topic: topic_name.to_string(), + latest_entry_id: 1, + record_size: 1024, + record_num: 1, + }, + ts, + ); + + debug!("add stat at ts: {}", ts + window_size.as_millis() as i64); + topic_stats.add_stat( + 1, + &TopicStat { + topic: topic_name.to_string(), + latest_entry_id: 4, + record_size: 4096, + record_num: 4, + }, + ts + window_size.as_millis() as i64 - 1, + ); + + topic_stats.add_stat( + 1, + &TopicStat { + topic: "another_topic".to_string(), + latest_entry_id: 4, + record_size: 4096, + record_num: 4, + }, + ts + window_size.as_millis() as i64 - 1, + ); + + debug!( + "add stat at ts: {}", + ts + window_size.as_millis() as i64 + 1 + ); + // Add a stat that is out of the window. + topic_stats.add_stat( + 1, + &TopicStat { + topic: topic_name.to_string(), + latest_entry_id: 5, + record_size: 8192, + record_num: 5, + }, + ts + window_size.as_millis() as i64, + ); + + let history = topic_stats.history_by_topic.get(topic_name).unwrap(); + assert_eq!(history.len(), 1); + assert_eq!( + history[0], + HistoryTopicStat { + latest_entry_id: 4, + record_size: 1024 + 4096, + record_num: 1 + 4, + start_ts: ts, + } + ); + assert!(topic_stats.active_bucket.is_some()); + } +} diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 6ab5496c7c..0e4bc763f0 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -53,6 +53,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, watch, Notify, RwLock}; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; +use crate::handler::collect_topic_stats_handler::CollectTopicStatsHandler; use crate::handler::flow_state_handler::FlowStateHandler; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; @@ -65,6 +66,7 @@ pub mod check_leader_handler; pub mod collect_cluster_info_handler; pub mod collect_leader_region_handler; pub mod collect_stats_handler; +pub mod collect_topic_stats_handler; pub mod extract_stat_handler; pub mod failure_handler; pub mod filter_inactive_region_stats; @@ -76,6 +78,8 @@ pub mod publish_heartbeat_handler; pub mod region_lease_handler; pub mod remap_flow_peer_handler; pub mod response_header_handler; +#[cfg(test)] +pub mod test_utils; #[async_trait::async_trait] pub trait HeartbeatHandler: Send + Sync { @@ -619,6 +623,7 @@ impl HeartbeatHandlerGroupBuilder { self.add_handler_last(publish_heartbeat_handler); } self.add_handler_last(CollectLeaderRegionHandler); + self.add_handler_last(CollectTopicStatsHandler); self.add_handler_last(CollectStatsHandler::new(self.flush_stats_factor)); self.add_handler_last(RemapFlowPeerHandler::default()); @@ -897,8 +902,6 @@ mod tests { .unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -912,10 +915,11 @@ mod tests { "MailboxHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -934,8 +938,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(15, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -950,10 +952,11 @@ mod tests { "CollectStatsHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -969,8 +972,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(15, handlers.len()); - let names = [ "CollectStatsHandler", "ResponseHeaderHandler", @@ -985,10 +986,11 @@ mod tests { "MailboxHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -1004,8 +1006,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(15, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -1020,10 +1020,11 @@ mod tests { "CollectStatsHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -1039,8 +1040,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(15, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -1054,11 +1053,12 @@ mod tests { "MailboxHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "ResponseHeaderHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -1074,8 +1074,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -1089,10 +1087,12 @@ mod tests { "CollectStatsHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -1108,8 +1108,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); - let names = [ "ResponseHeaderHandler", "DatanodeKeepLeaseHandler", @@ -1123,10 +1121,12 @@ mod tests { "MailboxHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "ResponseHeaderHandler", "RemapFlowPeerHandler", ]; + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } @@ -1142,8 +1142,6 @@ mod tests { let group = builder.build().unwrap(); let handlers = group.handlers; - assert_eq!(14, handlers.len()); - let names = [ "CollectStatsHandler", "DatanodeKeepLeaseHandler", @@ -1157,10 +1155,11 @@ mod tests { "MailboxHandler", "FilterInactiveRegionStatsHandler", "CollectLeaderRegionHandler", + "CollectTopicStatsHandler", "CollectStatsHandler", "RemapFlowPeerHandler", ]; - + assert_eq!(names.len(), handlers.len()); for (handler, name) in handlers.iter().zip(names.into_iter()) { assert_eq!(handler.name, name); } diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index 695d77ce62..149660c413 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -59,51 +59,13 @@ impl HeartbeatHandler for CollectLeaderRegionHandler { #[cfg(test)] mod tests { - use std::sync::Arc; - - use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry}; - use common_meta::sequence::SequenceBuilder; + use common_meta::region_registry::LeaderRegionManifestInfo; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{HeartbeatMailbox, Pushers}; - use crate::service::store::cached_kv::LeaderCachedKvBackend; - - fn mock_ctx() -> Context { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( - kv_backend.clone(), - )); - let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - Context { - server_addr: "127.0.0.1:0000".to_string(), - in_memory, - kv_backend: kv_backend.clone(), - leader_cached_kv_backend, - meta_peer_client, - mailbox, - election: None, - is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), - cache_invalidator: Arc::new(DummyCacheInvalidator), - leader_region_registry: Arc::new(LeaderRegionRegistry::new()), - } - } + use crate::handler::test_utils::TestEnv; fn new_region_stat(id: RegionId, manifest_version: u64, role: RegionRole) -> RegionStat { RegionStat { @@ -130,7 +92,8 @@ mod tests { #[tokio::test] async fn test_handle_collect_leader_region() { - let mut ctx = mock_ctx(); + let env = TestEnv::new(); + let mut ctx = env.ctx(); let mut acc = HeartbeatAccumulator { stat: Some(Stat { diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index c0cca2ca03..d444498798 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -217,49 +217,15 @@ async fn rewrite_node_address(ctx: &mut Context, stat: &Stat) { #[cfg(test)] mod tests { - use std::sync::Arc; - - use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode::DatanodeStatKey; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::region_registry::LeaderRegionRegistry; - use common_meta::sequence::SequenceBuilder; use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{HeartbeatMailbox, Pushers}; - use crate::service::store::cached_kv::LeaderCachedKvBackend; + use crate::handler::test_utils::TestEnv; #[tokio::test] async fn test_handle_datanode_stats() { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( - kv_backend.clone(), - )); - let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - let ctx = Context { - server_addr: "127.0.0.1:0000".to_string(), - in_memory, - kv_backend: kv_backend.clone(), - leader_cached_kv_backend, - meta_peer_client, - mailbox, - election: None, - is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), - cache_invalidator: Arc::new(DummyCacheInvalidator), - leader_region_registry: Arc::new(LeaderRegionRegistry::new()), - }; + let env = TestEnv::new(); + let ctx = env.ctx(); let handler = CollectStatsHandler::default(); handle_request_many_times(ctx.clone(), &handler, 1).await; diff --git a/src/meta-srv/src/handler/collect_topic_stats_handler.rs b/src/meta-srv/src/handler/collect_topic_stats_handler.rs new file mode 100644 index 0000000000..c26e9204ad --- /dev/null +++ b/src/meta-srv/src/handler/collect_topic_stats_handler.rs @@ -0,0 +1,125 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{HeartbeatRequest, Role}; + +use crate::error::Result; +use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +pub struct CollectTopicStatsHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for CollectTopicStatsHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Datanode + } + + async fn handle( + &self, + _req: &HeartbeatRequest, + ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some(current_stat) = acc.stat.as_ref() else { + return Ok(HandleControl::Continue); + }; + + ctx.topic_stats_registry.add_stats( + current_stat.id, + ¤t_stat.topic_stats, + current_stat.timestamp_millis, + ); + + Ok(HandleControl::Continue) + } +} + +#[cfg(test)] +mod tests { + use common_meta::datanode::{Stat, TopicStat}; + use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS; + use common_time::util::current_time_millis; + + use super::*; + use crate::handler::test_utils::TestEnv; + + #[tokio::test] + async fn test_handle_collect_topic_stats() { + let env = TestEnv::new(); + let ctx = env.ctx(); + + let handler = CollectTopicStatsHandler; + let timestamp_millis = current_time_millis(); + let aligned_ts = timestamp_millis - timestamp_millis % 1000; + handle_request_many_times(ctx.clone(), &handler, 1, timestamp_millis, 10).await; + handle_request_many_times(ctx.clone(), &handler, 2, timestamp_millis, 10).await; + + // trigger the next window + let next_timestamp_millis = + timestamp_millis + (TOPIC_STATS_REPORT_INTERVAL_SECS * 1000) as i64; + handle_request_many_times(ctx.clone(), &handler, 1, next_timestamp_millis, 10).await; + + let latest_entry_id = ctx + .topic_stats_registry + .get_latest_entry_id("test") + .unwrap(); + assert_eq!(latest_entry_id, (15, aligned_ts)); + let latest_entry_id = ctx + .topic_stats_registry + .get_latest_entry_id("test2") + .unwrap(); + assert_eq!(latest_entry_id, (10, aligned_ts)); + assert!(ctx + .topic_stats_registry + .get_latest_entry_id("test3") + .is_none()); + } + + async fn handle_request_many_times( + mut ctx: Context, + handler: &CollectTopicStatsHandler, + datanode_id: u64, + timestamp_millis: i64, + loop_times: i32, + ) { + let req = HeartbeatRequest::default(); + for i in 1..=loop_times { + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + id: datanode_id, + region_num: i as _, + timestamp_millis, + topic_stats: vec![ + TopicStat { + topic: "test".to_string(), + latest_entry_id: 15, + record_size: 1024, + record_num: 2, + }, + TopicStat { + topic: "test2".to_string(), + latest_entry_id: 10, + record_size: 1024, + record_num: 2, + }, + ], + ..Default::default() + }), + ..Default::default() + }; + handler.handle(&req, &mut ctx, &mut acc).await.unwrap(); + } + } +} diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 007d328ef0..53273ce40e 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -127,51 +127,18 @@ async fn put_into_memory_store(ctx: &mut Context, key: Vec, value: Vec, #[cfg(test)] mod tests { use std::collections::HashMap; - use std::sync::Arc; use api::v1::meta::RequestHeader; - use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode::Stat; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::region_registry::LeaderRegionRegistry; - use common_meta::sequence::SequenceBuilder; use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::handler::test_utils::TestEnv; use crate::lease::find_datanode_lease_value; - use crate::service::store::cached_kv::LeaderCachedKvBackend; #[tokio::test] async fn test_put_into_memory_store() { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( - kv_backend.clone(), - )); - let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - let ctx = Context { - server_addr: "127.0.0.1:0000".to_string(), - in_memory, - kv_backend: kv_backend.clone(), - leader_cached_kv_backend, - meta_peer_client, - mailbox, - election: None, - is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), - cache_invalidator: Arc::new(DummyCacheInvalidator), - leader_region_registry: Arc::new(LeaderRegionRegistry::new()), - }; + let env = TestEnv::new(); + let ctx = env.ctx(); let handler = DatanodeKeepLeaseHandler; handle_request_many_times(ctx.clone(), &handler, 1).await; diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 3f7bdd5f02..5b017f9917 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -43,50 +43,16 @@ impl HeartbeatHandler for ResponseHeaderHandler { #[cfg(test)] mod tests { - use std::sync::Arc; - use api::v1::meta::RequestHeader; - use common_meta::cache_invalidator::DummyCacheInvalidator; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::region_registry::LeaderRegionRegistry; - use common_meta::sequence::SequenceBuilder; use common_telemetry::tracing_context::W3cTrace; use super::*; - use crate::cluster::MetaPeerClientBuilder; - use crate::handler::{Context, HeartbeatMailbox, Pushers}; - use crate::service::store::cached_kv::LeaderCachedKvBackend; + use crate::handler::test_utils::TestEnv; #[tokio::test] async fn test_handle_heartbeat_resp_header() { - let in_memory = Arc::new(MemoryKvBackend::new()); - let kv_backend = Arc::new(MemoryKvBackend::new()); - let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( - kv_backend.clone(), - )); - let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); - let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); - let meta_peer_client = MetaPeerClientBuilder::default() - .election(None) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(); - let mut ctx = Context { - server_addr: "127.0.0.1:0000".to_string(), - in_memory, - kv_backend: kv_backend.clone(), - leader_cached_kv_backend, - meta_peer_client, - mailbox, - election: None, - is_infancy: false, - table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), - cache_invalidator: Arc::new(DummyCacheInvalidator), - leader_region_registry: Arc::new(LeaderRegionRegistry::new()), - }; + let env = TestEnv::new(); + let mut ctx = env.ctx(); let req = HeartbeatRequest { header: Some(RequestHeader::new(2, Role::Datanode, W3cTrace::new())), diff --git a/src/meta-srv/src/handler/test_utils.rs b/src/meta-srv/src/handler/test_utils.rs new file mode 100644 index 0000000000..742aee4b23 --- /dev/null +++ b/src/meta-srv/src/handler/test_utils.rs @@ -0,0 +1,95 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; +use common_meta::region_registry::{LeaderRegionRegistry, LeaderRegionRegistryRef}; +use common_meta::sequence::SequenceBuilder; +use common_meta::stats::topic::{TopicStatsRegistry, TopicStatsRegistryRef}; + +use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; +use crate::handler::{HeartbeatMailbox, Pushers}; +use crate::metasrv::Context; +use crate::service::mailbox::MailboxRef; +use crate::service::store::cached_kv::LeaderCachedKvBackend; + +pub struct TestEnv { + in_memory: ResettableKvBackendRef, + kv_backend: KvBackendRef, + leader_cached_kv_backend: Arc, + mailbox: MailboxRef, + meta_peer_client: MetaPeerClientRef, + table_metadata_manager: TableMetadataManagerRef, + cache_invalidator: CacheInvalidatorRef, + leader_region_registry: LeaderRegionRegistryRef, + topic_stats_registry: TopicStatsRegistryRef, +} + +impl Default for TestEnv { + fn default() -> Self { + Self::new() + } +} + +impl TestEnv { + pub fn new() -> Self { + let in_memory = Arc::new(MemoryKvBackend::new()); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader( + kv_backend.clone(), + )); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build(); + let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); + Self { + in_memory, + kv_backend: kv_backend.clone(), + leader_cached_kv_backend, + mailbox, + meta_peer_client, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), + leader_region_registry: Arc::new(LeaderRegionRegistry::new()), + topic_stats_registry: Arc::new(TopicStatsRegistry::default()), + } + } + + /// Returns a new context for testing. + pub fn ctx(&self) -> Context { + Context { + in_memory: self.in_memory.clone(), + kv_backend: self.kv_backend.clone(), + leader_cached_kv_backend: self.leader_cached_kv_backend.clone(), + server_addr: "127.0.0.1:0000".to_string(), + meta_peer_client: self.meta_peer_client.clone(), + mailbox: self.mailbox.clone(), + election: None, + is_infancy: false, + table_metadata_manager: self.table_metadata_manager.clone(), + cache_invalidator: self.cache_invalidator.clone(), + leader_region_registry: self.leader_region_registry.clone(), + topic_stats_registry: self.topic_stats_registry.clone(), + } + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index a62d875954..081a86dbc0 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -40,6 +40,7 @@ use common_meta::reconciliation::manager::ReconciliationManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; use common_meta::sequence::SequenceRef; +use common_meta::stats::topic::TopicStatsRegistryRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; use common_options::datanode::DatanodeClientOptions; use common_options::memory::MemoryOptions; @@ -308,6 +309,7 @@ pub struct Context { pub table_metadata_manager: TableMetadataManagerRef, pub cache_invalidator: CacheInvalidatorRef, pub leader_region_registry: LeaderRegionRegistryRef, + pub topic_stats_registry: TopicStatsRegistryRef, } impl Context { @@ -458,6 +460,7 @@ pub struct Metasrv { region_supervisor_ticker: Option, cache_invalidator: CacheInvalidatorRef, leader_region_registry: LeaderRegionRegistryRef, + topic_stats_registry: TopicStatsRegistryRef, wal_prune_ticker: Option, table_id_sequence: SequenceRef, reconciliation_manager: ReconciliationManagerRef, @@ -756,6 +759,7 @@ impl Metasrv { let table_metadata_manager = self.table_metadata_manager.clone(); let cache_invalidator = self.cache_invalidator.clone(); let leader_region_registry = self.leader_region_registry.clone(); + let topic_stats_registry = self.topic_stats_registry.clone(); Context { server_addr, @@ -769,6 +773,7 @@ impl Metasrv { table_metadata_manager, cache_invalidator, leader_region_registry, + topic_stats_registry, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 29456317c5..68e9af3dba 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,7 +27,7 @@ use common_meta::ddl::{ DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, }; use common_meta::ddl_manager::DdlManager; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::{self}; use common_meta::key::flow::flow_state::FlowStateManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef}; @@ -40,6 +40,7 @@ use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; +use common_meta::stats::topic::TopicStatsRegistry; use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -372,6 +373,7 @@ impl MetasrvBuilder { }; let leader_region_registry = Arc::new(LeaderRegionRegistry::default()); + let topic_stats_registry = Arc::new(TopicStatsRegistry::default()); let ddl_context = DdlContext { node_manager: node_manager.clone(), @@ -507,6 +509,7 @@ impl MetasrvBuilder { wal_prune_ticker, table_id_sequence, reconciliation_manager, + topic_stats_registry, }) } }