From 9e58311ecd2838761dbcfcccc85ae126b9422e88 Mon Sep 17 00:00:00 2001
From: fys <40801205+Fengys123@users.noreply.github.com>
Date: Mon, 9 Jan 2023 16:13:53 +0800
Subject: [PATCH] feat: datanode support report number of regions to meta
 (#838)

* feat: dn support report number of regions to meta

* put the heartbeat batch to store

* cr: change region_number's parameter to &CatalogManagerRef

* cr: when dn failed to get region number, report region_num = -1 to meta
---
 src/api/greptime/v1/meta/heartbeat.proto      |  16 +-
 src/catalog/src/lib.rs                        |  37 ++++-
 src/datanode/src/heartbeat.rs                 |  27 +++-
 src/datanode/src/instance.rs                  |   1 +
 src/datanode/src/mock.rs                      |   1 +
 src/meta-srv/src/handler.rs                   |   2 +-
 src/meta-srv/src/handler/node_stat.rs         |  52 +++++--
 .../src/handler/persist_stats_handler.rs      |  86 ++++++++++-
 src/meta-srv/src/keys.rs                      | 138 +++++++++++++++++-
 src/meta-srv/src/lib.rs                       |   4 +-
 10 files changed, 332 insertions(+), 32 deletions(-)

diff --git a/src/api/greptime/v1/meta/heartbeat.proto b/src/api/greptime/v1/meta/heartbeat.proto
index 6d37806a7a..91a8bcae55 100644
--- a/src/api/greptime/v1/meta/heartbeat.proto
+++ b/src/api/greptime/v1/meta/heartbeat.proto
@@ -34,13 +34,13 @@ message HeartbeatRequest {
 
 message NodeStat {
   // The read capacity units during this period
-  uint64 rcus = 1;
+  int64 rcus = 1;
   // The write capacity units during this period
-  uint64 wcus = 2;
+  int64 wcus = 2;
   // How many tables on this node
-  uint64 table_num = 3;
+  int64 table_num = 3;
   // How many regions on this node
-  uint64 region_num = 4;
+  int64 region_num = 4;
 
   double cpu_usage = 5;
   double load = 6;
@@ -57,13 +57,13 @@ message RegionStat {
   uint64 region_id = 1;
   TableName table_name = 2;
   // The read capacity units during this period
-  uint64 rcus = 3;
+  int64 rcus = 3;
   // The write capacity units during this period
-  uint64 wcus = 4;
+  int64 wcus = 4;
   // Approximate bytes of this region
-  uint64 approximate_bytes = 5;
+  int64 approximate_bytes = 5;
   // Approximate number of rows in this region
-  uint64 approximate_rows = 6;
+  int64 approximate_rows = 6;
 
   // Others
   map<string, string> attrs = 100;
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index a7c53ffe9e..5f1d2d89fb 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -19,7 +19,7 @@ use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
 use common_telemetry::info;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use table::engine::{EngineContext, TableEngineRef};
 use table::metadata::TableId;
 use table::requests::CreateTableRequest;
@@ -208,3 +208,38 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
     }
     Ok(())
 }
+
+/// The number of regions in the datanode node.
+pub fn region_number(catalog_manager: &CatalogManagerRef) -> Result<u64> {
+    let mut region_number: u64 = 0;
+
+    for catalog_name in catalog_manager.catalog_names()? {
+        let catalog =
+            catalog_manager
+                .catalog(&catalog_name)?
+                .context(error::CatalogNotFoundSnafu {
+                    catalog_name: &catalog_name,
+                })?;
+
+        for schema_name in catalog.schema_names()? {
+            let schema = catalog
+                .schema(&schema_name)?
+                .context(error::SchemaNotFoundSnafu {
+                    catalog: &catalog_name,
+                    schema: &schema_name,
+                })?;
+
+            for table_name in schema.table_names()? {
+                let table = schema
+                    .table(&table_name)?
+                    .context(error::TableNotFoundSnafu {
+                        table_info: &table_name,
+                    })?;
+
+                let region_numbers = &table.table_info().meta.region_numbers;
+                region_number += region_numbers.len() as u64;
+            }
+        }
+    }
+    Ok(region_number)
+}
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index bf17ef67ed..c791b8e69e 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -16,19 +16,20 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, Peer};
+use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
+use catalog::{region_number, CatalogManagerRef};
 use common_telemetry::{error, info, warn};
 use meta_client::client::{HeartbeatSender, MetaClient};
 use snafu::ResultExt;
 
 use crate::error::{MetaClientInitSnafu, Result};
 
-#[derive(Debug, Clone, Default)]
 pub struct HeartbeatTask {
     node_id: u64,
     server_addr: String,
     running: Arc<AtomicBool>,
     meta_client: Arc<MetaClient>,
+    catalog_manager: CatalogManagerRef,
     interval: u64,
 }
 
@@ -40,12 +41,18 @@ impl Drop for HeartbeatTask {
 impl HeartbeatTask {
     /// Create a new heartbeat task instance.
-    pub fn new(node_id: u64, server_addr: String, meta_client: Arc<MetaClient>) -> Self {
+    pub fn new(
+        node_id: u64,
+        server_addr: String,
+        meta_client: Arc<MetaClient>,
+        catalog_manager: CatalogManagerRef,
+    ) -> Self {
         Self {
             node_id,
             server_addr,
             running: Arc::new(AtomicBool::new(false)),
             meta_client,
+            catalog_manager,
             interval: 5_000, // default interval is set to 5 secs
         }
     }
 
@@ -92,16 +99,30 @@ impl HeartbeatTask {
 
         let server_addr = self.server_addr.clone();
         let meta_client = self.meta_client.clone();
+        let catalog_manager_clone = self.catalog_manager.clone();
         let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
         common_runtime::spawn_bg(async move {
             while running.load(Ordering::Acquire) {
+                let region_num = match region_number(&catalog_manager_clone) {
+                    Ok(region_num) => region_num as i64,
+                    Err(e) => {
+                        error!("failed to get region number, err: {e:?}");
+                        -1
+                    }
+                };
+
                 let req = HeartbeatRequest {
                     peer: Some(Peer {
                         id: node_id,
                         addr: server_addr.clone(),
                     }),
+                    node_stat: Some(NodeStat {
+                        region_num,
+                        ..Default::default()
+                    }),
                    ..Default::default()
                 };
+
                 if let Err(e) = tx.send(req).await {
                     error!("Failed to send heartbeat to metasrv, error: {:?}", e);
                     match Self::create_streams(&meta_client, running.clone()).await {
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 8a41520d3f..c2a3802253 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -146,6 +146,7 @@ impl Instance {
                 opts.node_id.context(MissingNodeIdSnafu)?,
                 opts.rpc_addr.clone(),
                 meta_client.as_ref().unwrap().clone(),
+                catalog_manager.clone(),
             )),
         };
         Ok(Self {
diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs
index 211ef99ad9..053ab289fe 100644
--- a/src/datanode/src/mock.rs
+++ b/src/datanode/src/mock.rs
@@ -71,6 +71,7 @@ impl Instance {
             opts.node_id.unwrap_or(42),
             opts.rpc_addr.clone(),
             meta_client.clone(),
+            catalog_manager.clone(),
         );
         Ok(Self {
             query_engine: query_engine.clone(),
diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs
index 8f98913ee0..4b7b39e3ac 100644
--- a/src/meta-srv/src/handler.rs
+++ b/src/meta-srv/src/handler.rs
@@ -22,7 +22,7 @@ mod check_leader_handler;
 mod collect_stats_handler;
 mod instruction;
 mod keep_lease_handler;
-mod node_stat;
+pub(crate) mod node_stat;
 mod persist_stats_handler;
 mod response_header_handler;
 
diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs
index e530469ace..9431b01a53 100644
--- a/src/meta-srv/src/handler/node_stat.rs
+++ b/src/meta-srv/src/handler/node_stat.rs
@@ -14,8 +14,11 @@
 
 use api::v1::meta::HeartbeatRequest;
 use common_time::util as time_util;
+use serde::{Deserialize, Serialize};
 
-#[derive(Debug)]
+use crate::keys::StatKey;
+
+#[derive(Debug, Default, Serialize, Deserialize)]
 pub struct Stat {
     pub timestamp_millis: i64,
     pub cluster_id: u64,
@@ -24,13 +27,13 @@ pub struct Stat {
     /// Leader node
     pub is_leader: bool,
     /// The read capacity units during this period
-    pub rcus: u64,
+    pub rcus: i64,
     /// The write capacity units during this period
-    pub wcus: u64,
+    pub wcus: i64,
     /// How many tables on this node
-    pub table_num: u64,
+    pub table_num: i64,
     /// How many regions on this node
-    pub region_num: u64,
+    pub region_num: i64,
     pub cpu_usage: f64,
     pub load: f64,
     /// Read disk IO on this node
@@ -41,20 +44,29 @@ pub struct Stat {
     pub region_stats: Vec<RegionStat>,
 }
 
-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct RegionStat {
     pub id: u64,
     pub catalog: String,
     pub schema: String,
     pub table: String,
     /// The read capacity units during this period
-    pub rcus: u64,
+    pub rcus: i64,
     /// The write capacity units during this period
-    pub wcus: u64,
+    pub wcus: i64,
     /// Approximate bytes of this region
-    pub approximate_bytes: u64,
+    pub approximate_bytes: i64,
     /// Approximate number of rows in this region
-    pub approximate_rows: u64,
+    pub approximate_rows: i64,
+}
+
+impl Stat {
+    pub fn stat_key(&self) -> StatKey {
+        StatKey {
+            cluster_id: self.cluster_id,
+            node_id: self.id,
+        }
+    }
 }
 
 impl TryFrom<&HeartbeatRequest> for Stat {
@@ -107,3 +119,23 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::handler::node_stat::Stat;
+
+    #[test]
+    fn test_stat_key() {
+        let stat = Stat {
+            cluster_id: 3,
+            id: 101,
+            region_num: 10,
+            ..Default::default()
+        };
+
+        let stat_key = stat.stat_key();
+
+        assert_eq!(3, stat_key.cluster_id);
+        assert_eq!(101, stat_key.node_id);
+    }
+}
diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs
index df3010a80f..d3c6b21751 100644
--- a/src/meta-srv/src/handler/persist_stats_handler.rs
+++ b/src/meta-srv/src/handler/persist_stats_handler.rs
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::meta::HeartbeatRequest;
+use api::v1::meta::{HeartbeatRequest, PutRequest};
 
 use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::keys::StatValue;
 use crate::metasrv::Context;
 
 #[derive(Default)]
@@ -33,8 +34,89 @@ impl HeartbeatHandler for PersistStatsHandler {
             return Ok(());
         }
 
-        // TODO(jiachun): remove stats from `acc` and persist to store
+        let stats = &mut acc.stats;
+        let key = match stats.get(0) {
+            Some(stat) => stat.stat_key(),
+            None => return Ok(()),
+        };
+
+        // take stats from &mut acc.stats, avoid clone of vec
+        let stats = std::mem::take(stats);
+
+        let val = &StatValue { stats };
+
+        let put = PutRequest {
+            key: key.into(),
+            value: val.try_into()?,
+            ..Default::default()
+        };
+
+        ctx.kv_store.put(put).await?;
 
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::atomic::AtomicBool;
+    use std::sync::Arc;
+
+    use api::v1::meta::RangeRequest;
+
+    use super::*;
+    use crate::handler::node_stat::Stat;
+    use crate::keys::StatKey;
+    use crate::service::store::memory::MemStore;
+
+    #[tokio::test]
+    async fn test_handle_datanode_stats() {
+        let kv_store = Arc::new(MemStore::new());
+        let ctx = Context {
+            datanode_lease_secs: 30,
+            server_addr: "127.0.0.1:0000".to_string(),
+            kv_store,
+            election: None,
+            skip_all: Arc::new(AtomicBool::new(false)),
+        };
+
+        let req = HeartbeatRequest::default();
+        let mut acc = HeartbeatAccumulator {
+            stats: vec![Stat {
+                cluster_id: 3,
+                id: 101,
+                region_num: 100,
+                ..Default::default()
+            }],
+            ..Default::default()
+        };
+
+        let stats_handler = PersistStatsHandler;
+        stats_handler.handle(&req, &ctx, &mut acc).await.unwrap();
+
+        let key = StatKey {
+            cluster_id: 3,
+            node_id: 101,
+        };
+
+        let req = RangeRequest {
+            key: key.try_into().unwrap(),
+            ..Default::default()
+        };
+
+        let res = ctx.kv_store.range(req).await.unwrap();
+
+        assert_eq!(1, res.kvs.len());
+
+        let kv = &res.kvs[0];
+
+        let key: StatKey = kv.key.clone().try_into().unwrap();
+        assert_eq!(3, key.cluster_id);
+        assert_eq!(101, key.node_id);
+
+        let val: StatValue = kv.value.clone().try_into().unwrap();
+
+        assert_eq!(1, val.stats.len());
+        assert_eq!(100, val.stats[0].region_num);
+    }
+}
diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs
index 1efec0c187..b4f5b22ab3 100644
--- a/src/meta-srv/src/keys.rs
+++ b/src/meta-srv/src/keys.rs
@@ -23,17 +23,22 @@ use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error;
 use crate::error::Result;
+use crate::handler::node_stat::Stat;
 
 pub(crate) const REMOVED_PREFIX: &str = "__removed";
 
 pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
 pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
 pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
 
+pub const DN_STAT_PREFIX: &str = "__meta_dnstat";
+
 lazy_static! {
-    static ref DATANODE_KEY_PATTERN: Regex =
+    static ref DATANODE_LEASE_KEY_PATTERN: Regex =
         Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
+    static ref DATANODE_STAT_KEY_PATTERN: Regex =
+        Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
 }
 
-#[derive(Debug, Clone, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, Hash, PartialEq)]
 pub struct LeaseKey {
     pub cluster_id: u64,
     pub node_id: u64,
@@ -43,7 +48,7 @@ impl FromStr for LeaseKey {
     type Err = error::Error;
 
     fn from_str(key: &str) -> Result<Self> {
-        let caps = DATANODE_KEY_PATTERN
+        let caps = DATANODE_LEASE_KEY_PATTERN
             .captures(key)
             .context(error::InvalidLeaseKeySnafu { key })?;
 
@@ -169,12 +174,135 @@ impl<'a> TableRouteKey<'a> {
     }
 }
 
+#[derive(Eq, PartialEq, Debug)]
+pub struct StatKey {
+    pub cluster_id: u64,
+    pub node_id: u64,
+}
+
+impl From<StatKey> for Vec<u8> {
+    fn from(value: StatKey) -> Self {
+        format!("{}-{}-{}", DN_STAT_PREFIX, value.cluster_id, value.node_id).into_bytes()
+    }
+}
+
+impl FromStr for StatKey {
+    type Err = error::Error;
+
+    fn from_str(key: &str) -> Result<Self> {
+        let caps = DATANODE_STAT_KEY_PATTERN
+            .captures(key)
+            .context(error::InvalidLeaseKeySnafu { key })?;
+
+        ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
+
+        let cluster_id = caps[1].to_string();
+        let node_id = caps[2].to_string();
+        let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
+            err_msg: format!("invalid cluster_id: {cluster_id}"),
+        })?;
+        let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
+            err_msg: format!("invalid node_id: {node_id}"),
+        })?;
+
+        Ok(Self {
+            cluster_id,
+            node_id,
+        })
+    }
+}
+
+impl TryFrom<Vec<u8>> for StatKey {
+    type Error = error::Error;
+
+    fn try_from(bytes: Vec<u8>) -> Result<Self> {
+        String::from_utf8(bytes)
+            .context(error::LeaseKeyFromUtf8Snafu {})
+            .map(|x| x.parse())?
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct StatValue {
+    pub stats: Vec<Stat>,
+}
+
+impl TryFrom<&StatValue> for Vec<u8> {
+    type Error = error::Error;
+
+    fn try_from(stats: &StatValue) -> Result<Self> {
+        Ok(serde_json::to_string(stats)
+            .context(crate::error::SerializeToJsonSnafu {
+                input: format!("{stats:?}"),
+            })?
+            .into_bytes())
+    }
+}
+
+impl FromStr for StatValue {
+    type Err = error::Error;
+
+    fn from_str(value: &str) -> Result<Self> {
+        serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
+    }
+}
+
+impl TryFrom<Vec<u8>> for StatValue {
+    type Error = error::Error;
+
+    fn try_from(value: Vec<u8>) -> Result<Self> {
+        String::from_utf8(value)
+            .context(error::LeaseKeyFromUtf8Snafu {})
+            .map(|x| x.parse())?
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
     #[test]
-    fn test_datanode_lease_key() {
+    fn test_stat_key_round_trip() {
+        let key = StatKey {
+            cluster_id: 0,
+            node_id: 1,
+        };
+
+        let key_bytes: Vec<u8> = key.try_into().unwrap();
+        let new_key: StatKey = key_bytes.try_into().unwrap();
+
+        assert_eq!(0, new_key.cluster_id);
+        assert_eq!(1, new_key.node_id);
+    }
+
+    #[test]
+    fn test_stat_val_round_trip() {
+        let stat = Stat {
+            cluster_id: 0,
+            id: 101,
+            is_leader: false,
+            region_num: 100,
+            ..Default::default()
+        };
+
+        let stat_val = &StatValue { stats: vec![stat] };
+
+        let bytes: Vec<u8> = stat_val.try_into().unwrap();
+        let stat_val: StatValue = bytes.try_into().unwrap();
+        let stats = stat_val.stats;
+
+        assert_eq!(1, stats.len());
+
+        let stat = stats.get(0).unwrap();
+        assert_eq!(0, stat.cluster_id);
+        assert_eq!(101, stat.id);
+        assert!(!stat.is_leader);
+        assert_eq!(100, stat.region_num);
+    }
+
+    #[test]
+    fn test_lease_key_round_trip() {
         let key = LeaseKey {
             cluster_id: 0,
             node_id: 1,
@@ -187,7 +315,7 @@ mod tests {
     }
 
     #[test]
-    fn test_datanode_lease_value() {
+    fn test_lease_value_round_trip() {
         let value = LeaseValue {
             timestamp_millis: 111,
             node_addr: "127.0.0.1:3002".to_string(),
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index b0424b4ae2..02e671de62 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -17,7 +17,7 @@ pub mod bootstrap;
 pub mod election;
 pub mod error;
 pub mod handler;
-mod keys;
+pub mod keys;
 pub mod lease;
 pub mod metasrv;
 #[cfg(feature = "mock")]
@@ -25,6 +25,6 @@ pub mod mocks;
 pub mod selector;
 mod sequence;
 pub mod service;
-mod util;
+pub mod util;
 
 pub use crate::error::Result;
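
Reviewer note, not part of the patch: a minimal sketch of how another metasrv component could read the persisted stats back, using only the StatKey/StatValue conversions and the RangeRequest lookup that the new persist_stats_handler test exercises. The read_datanode_stats helper name and the KvStoreRef import path are assumptions for illustration.

use api::v1::meta::RangeRequest;

use crate::error::Result;
use crate::keys::{StatKey, StatValue};
use crate::service::store::kv::KvStoreRef; // assumed location of the kv-store handle type

/// Hypothetical helper: fetch the latest stats reported by one datanode, if any.
async fn read_datanode_stats(
    kv_store: &KvStoreRef,
    cluster_id: u64,
    node_id: u64,
) -> Result<Option<StatValue>> {
    // StatKey -> Vec<u8> produces "__meta_dnstat-{cluster_id}-{node_id}".
    let key = StatKey {
        cluster_id,
        node_id,
    };

    let req = RangeRequest {
        key: key.into(),
        ..Default::default()
    };

    let res = kv_store.range(req).await?;

    // The stored value is the JSON-encoded list of Stat entries written by
    // PersistStatsHandler, so Vec<u8> -> StatValue is a serde_json round trip.
    res.kvs
        .first()
        .map(|kv| StatValue::try_from(kv.value.clone()))
        .transpose()
}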