diff --git a/Cargo.lock b/Cargo.lock index 991829a758..3d5783dcd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3202,6 +3202,7 @@ dependencies = [ "query", "reqwest", "serde", + "serde_json", "servers", "session", "snafu 0.8.4", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 14748a4695..dd7071b095 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -50,13 +50,20 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to list nodes in cluster: {source}"))] + #[snafu(display("Failed to list nodes in cluster"))] ListNodes { #[snafu(implicit)] location: Location, source: BoxedError, }, + #[snafu(display("Failed to region stats in cluster"))] + ListRegionStats { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to list flows in catalog {catalog}"))] ListFlows { #[snafu(implicit)] @@ -314,6 +321,7 @@ impl ErrorExt for Error { | Error::ListTables { source, .. } | Error::ListFlows { source, .. } | Error::ListProcedures { source, .. } + | Error::ListRegionStats { source, .. } | Error::ConvertProtoData { source, .. } => source.status_code(), Error::CreateTable { source, .. } => source.status_code(), diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 2495a95255..e3377afbe1 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -64,12 +64,17 @@ use crate::CatalogManager; #[derive(Clone)] pub struct KvBackendCatalogManager { mode: Mode, + /// Only available in `Distributed` mode. meta_client: Option>, + /// Manages partition rules. partition_manager: PartitionRuleManagerRef, + /// Manages table metadata. table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables system_catalog: SystemCatalog, + /// Cache registry for all caches. cache_registry: LayeredCacheRegistryRef, + /// Only available in `Standalone` mode. 
procedure_manager: Option, } diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 83f2ff4926..9fa31b85fd 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -20,6 +20,7 @@ pub mod key_column_usage; mod partitions; mod procedure_info; mod region_peers; +mod region_statistics; mod runtime_metrics; pub mod schemata; mod table_constraints; @@ -194,6 +195,11 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { self.catalog_manager.clone(), )) as _, ), + REGION_STATISTICS => Some(Arc::new( + region_statistics::InformationSchemaRegionStatistics::new( + self.catalog_manager.clone(), + ), + ) as _), _ => None, } } @@ -241,6 +247,14 @@ impl InformationSchemaProvider { CLUSTER_INFO.to_string(), self.build_table(CLUSTER_INFO).unwrap(), ); + tables.insert( + PROCEDURE_INFO.to_string(), + self.build_table(PROCEDURE_INFO).unwrap(), + ); + tables.insert( + REGION_STATISTICS.to_string(), + self.build_table(REGION_STATISTICS).unwrap(), + ); } tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); @@ -256,10 +270,6 @@ impl InformationSchemaProvider { self.build_table(TABLE_CONSTRAINTS).unwrap(), ); tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap()); - tables.insert( - PROCEDURE_INFO.to_string(), - self.build_table(PROCEDURE_INFO).unwrap(), - ); // Add memory tables for name in MEMORY_TABLES.iter() { tables.insert((*name).to_string(), self.build_table(name).expect(name)); diff --git a/src/catalog/src/system_schema/information_schema/region_statistics.rs b/src/catalog/src/system_schema/information_schema/region_statistics.rs new file mode 100644 index 0000000000..07b94ede54 --- /dev/null +++ b/src/catalog/src/system_schema/information_schema/region_statistics.rs @@ -0,0 +1,257 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not 
use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID; +use common_config::Mode; +use common_error::ext::BoxedError; +use common_meta::cluster::ClusterInfo; +use common_meta::datanode::RegionStat; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream}; +use common_telemetry::tracing::warn; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; +use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder}; +use snafu::ResultExt; +use store_api::storage::{ScanRequest, TableId}; + +use super::{InformationTable, REGION_STATISTICS}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListRegionStatsSnafu, Result}; +use crate::information_schema::Predicates; +use crate::system_schema::utils; +use crate::CatalogManager; + +const REGION_ID: &str = "region_id"; +const TABLE_ID: &str = "table_id"; +const REGION_NUMBER: &str = "region_number"; +const MEMTABLE_SIZE: &str = "memtable_size"; +const MANIFEST_SIZE: &str = "manifest_size"; +const SST_SIZE: 
&str = "sst_size"; +const ENGINE: &str = "engine"; +const REGION_ROLE: &str = "region_role"; + +const INIT_CAPACITY: usize = 42; + +/// The `REGION_STATISTICS` table provides information about the region statistics. Including fields: +/// +/// - `region_id`: The region id. +/// - `table_id`: The table id. +/// - `region_number`: The region number. +/// - `memtable_size`: The memtable size in bytes. +/// - `manifest_size`: The manifest size in bytes. +/// - `sst_size`: The sst size in bytes. +/// - `engine`: The engine type. +/// - `region_role`: The region role. +/// +pub(super) struct InformationSchemaRegionStatistics { + schema: SchemaRef, + catalog_manager: Weak, +} + +impl InformationSchemaRegionStatistics { + pub(super) fn new(catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_manager, + } + } + + pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false), + ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true), + ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true), + ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true), + ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true), + ])) + } + + fn builder(&self) -> InformationSchemaRegionStatisticsBuilder { + InformationSchemaRegionStatisticsBuilder::new( + self.schema.clone(), + self.catalog_manager.clone(), + ) + } +} + +impl InformationTable for InformationSchemaRegionStatistics { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + REGION_STATISTICS + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn 
to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_region_statistics(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaRegionStatisticsBuilder { + schema: SchemaRef, + catalog_manager: Weak, + + region_ids: UInt64VectorBuilder, + table_ids: UInt32VectorBuilder, + region_numbers: UInt32VectorBuilder, + memtable_sizes: UInt64VectorBuilder, + manifest_sizes: UInt64VectorBuilder, + sst_sizes: UInt64VectorBuilder, + engines: StringVectorBuilder, + region_roles: StringVectorBuilder, +} + +impl InformationSchemaRegionStatisticsBuilder { + fn new(schema: SchemaRef, catalog_manager: Weak) -> Self { + Self { + schema, + catalog_manager, + region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), + region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), + memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + engines: StringVectorBuilder::with_capacity(INIT_CAPACITY), + region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY), + } + } + + /// Construct a new `InformationSchemaRegionStatistics` from the collected data. 
+ async fn make_region_statistics( + &mut self, + request: Option, + ) -> Result { + let predicates = Predicates::from_scan_request(&request); + let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone); + + match mode { + Mode::Standalone => { + // TODO(weny): implement it + } + Mode::Distributed => { + if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? { + let region_stats = meta_client + .list_region_stats() + .await + .map_err(BoxedError::new) + .context(ListRegionStatsSnafu)?; + for region_stat in region_stats { + self.add_region_statistic(&predicates, region_stat); + } + } else { + warn!("Meta client is not available"); + } + } + } + + self.finish() + } + + fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) { + let row = [ + (REGION_ID, &Value::from(region_stat.id.as_u64())), + (TABLE_ID, &Value::from(region_stat.id.table_id())), + (REGION_NUMBER, &Value::from(region_stat.id.region_number())), + (MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)), + (MANIFEST_SIZE, &Value::from(region_stat.manifest_size)), + (SST_SIZE, &Value::from(region_stat.sst_size)), + (ENGINE, &Value::from(region_stat.engine.as_str())), + (REGION_ROLE, &Value::from(region_stat.role.to_string())), + ]; + + if !predicate.eval(&row) { + return; + } + + self.region_ids.push(Some(region_stat.id.as_u64())); + self.table_ids.push(Some(region_stat.id.table_id())); + self.region_numbers + .push(Some(region_stat.id.region_number())); + self.memtable_sizes.push(Some(region_stat.memtable_size)); + self.manifest_sizes.push(Some(region_stat.manifest_size)); + self.sst_sizes.push(Some(region_stat.sst_size)); + self.engines.push(Some(®ion_stat.engine)); + self.region_roles.push(Some(®ion_stat.role.to_string())); + } + + fn finish(&mut self) -> Result { + let columns: Vec = vec![ + Arc::new(self.region_ids.finish()), + Arc::new(self.table_ids.finish()), + Arc::new(self.region_numbers.finish()), + 
Arc::new(self.memtable_sizes.finish()), + Arc::new(self.manifest_sizes.finish()), + Arc::new(self.sst_sizes.finish()), + Arc::new(self.engines.finish()), + Arc::new(self.region_roles.finish()), + ]; + + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaRegionStatistics { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_region_statistics(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} diff --git a/src/catalog/src/system_schema/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs index a62f4ddb40..53541bf955 100644 --- a/src/catalog/src/system_schema/information_schema/table_names.rs +++ b/src/catalog/src/system_schema/information_schema/table_names.rs @@ -46,3 +46,4 @@ pub const CLUSTER_INFO: &str = "cluster_info"; pub const VIEWS: &str = "views"; pub const FLOWS: &str = "flows"; pub const PROCEDURE_INFO: &str = "procedure_info"; +pub const REGION_STATISTICS: &str = "region_statistics"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 2a8e2fc0e4..9cf02b81ee 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -100,6 +100,9 @@ pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32; pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33; /// id for information_schema.procedure_info pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34; +/// id for information_schema.region_statistics +pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35; + /// ----- End of information_schema tables ----- /// ----- Begin of pg_catalog tables ----- 
diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 0cf593f1ca..785a28f17d 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -20,6 +20,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use crate::datanode::RegionStat; use crate::error::{ DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu, InvalidRoleSnafu, ParseNumSnafu, Result, @@ -47,6 +48,9 @@ pub trait ClusterInfo { role: Option, ) -> std::result::Result, Self::Error>; + /// List all region stats in the cluster. + async fn list_region_stats(&self) -> std::result::Result, Self::Error>; + // TODO(jeremy): Other info, like region status, etc. } diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs new file mode 100644 index 0000000000..4551b8de2f --- /dev/null +++ b/src/common/meta/src/datanode.rs @@ -0,0 +1,413 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashSet; +use std::str::FromStr; + +use api::v1::meta::{HeartbeatRequest, RequestHeader}; +use common_time::util as time_util; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::region_engine::{RegionRole, RegionStatistic}; +use store_api::storage::RegionId; +use table::metadata::TableId; + +use crate::error::Result; +use crate::{error, ClusterId}; + +pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease"; +const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; + +const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat"; + +pub const REGION_STATISTIC_KEY: &str = "__region_statistic"; + +lazy_static! { + pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex = + Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); + static ref DATANODE_STAT_KEY_PATTERN: Regex = + Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); + static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!( + "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$" + )) + .unwrap(); +} + +/// The statistics of a datanode, built from a heartbeat request. +/// +/// Values of this type are collected in a `DatanodeStatValue`, which is keyed by `DatanodeStatKey` (`__meta_datanode_stat-{cluster_id}-{node_id}`). +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct Stat { + pub timestamp_millis: i64, + pub cluster_id: ClusterId, + // The datanode Id. + pub id: u64, + // The datanode address. + pub addr: String, + /// The read capacity units during this period + pub rcus: i64, + /// The write capacity units during this period + pub wcus: i64, + /// How many regions on this node + pub region_num: u64, + pub region_stats: Vec, + // The node epoch is used to check whether the node has restarted or redeployed. + pub node_epoch: u64, +} + +/// The statistics of a region. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RegionStat { + /// The region_id. 
+ pub id: RegionId, + /// The read capacity units during this period + pub rcus: i64, + /// The write capacity units during this period + pub wcus: i64, + /// Approximate bytes of this region + pub approximate_bytes: i64, + /// The engine name. + pub engine: String, + /// The region role. + pub role: RegionRole, + /// The size of the memtable in bytes. + pub memtable_size: u64, + /// The size of the manifest in bytes. + pub manifest_size: u64, + /// The size of the SST files in bytes. + pub sst_size: u64, +} + +impl Stat { + #[inline] + pub fn is_empty(&self) -> bool { + self.region_stats.is_empty() + } + + pub fn stat_key(&self) -> DatanodeStatKey { + DatanodeStatKey { + cluster_id: self.cluster_id, + node_id: self.id, + } + } + + /// Returns a tuple array containing [RegionId] and [RegionRole]. + pub fn regions(&self) -> Vec<(RegionId, RegionRole)> { + self.region_stats.iter().map(|s| (s.id, s.role)).collect() + } + + /// Returns all table ids in the region stats. + pub fn table_ids(&self) -> HashSet { + self.region_stats.iter().map(|s| s.id.table_id()).collect() + } + + /// Retains the active region stats and updates the rcus, wcus, and region_num. + pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { + if inactive_region_ids.is_empty() { + return; + } + + self.region_stats + .retain(|r| !inactive_region_ids.contains(&r.id)); + self.rcus = self.region_stats.iter().map(|s| s.rcus).sum(); + self.wcus = self.region_stats.iter().map(|s| s.wcus).sum(); + self.region_num = self.region_stats.len() as u64; + } +} + +impl TryFrom<&HeartbeatRequest> for Stat { + type Error = Option; + + fn try_from(value: &HeartbeatRequest) -> std::result::Result { + let HeartbeatRequest { + header, + peer, + region_stats, + node_epoch, + .. 
+ } = value; + + match (header, peer) { + (Some(header), Some(peer)) => { + let region_stats = region_stats + .iter() + .map(RegionStat::from) + .collect::>(); + + Ok(Self { + timestamp_millis: time_util::current_time_millis(), + cluster_id: header.cluster_id, + // datanode id + id: peer.id, + // datanode address + addr: peer.addr.clone(), + rcus: region_stats.iter().map(|s| s.rcus).sum(), + wcus: region_stats.iter().map(|s| s.wcus).sum(), + region_num: region_stats.len() as u64, + region_stats, + node_epoch: *node_epoch, + }) + } + (header, _) => Err(header.clone()), + } + } +} + +impl From<&api::v1::meta::RegionStat> for RegionStat { + fn from(value: &api::v1::meta::RegionStat) -> Self { + let region_stat = value + .extensions + .get(REGION_STATISTIC_KEY) + .and_then(|value| RegionStatistic::deserialize_from_slice(value)) + .unwrap_or_default(); + + Self { + id: RegionId::from_u64(value.region_id), + rcus: value.rcus, + wcus: value.wcus, + approximate_bytes: value.approximate_bytes, + engine: value.engine.to_string(), + role: RegionRole::from(value.role()), + memtable_size: region_stat.memtable_size, + manifest_size: region_stat.manifest_size, + sst_size: region_stat.sst_size, + } + } +} + +/// The key of the datanode stat in the memory store. +/// +/// The format is `__meta_datanode_stat-{cluster_id}-{node_id}`. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +pub struct DatanodeStatKey { + pub cluster_id: ClusterId, + pub node_id: u64, +} + +impl DatanodeStatKey { + /// The key prefix. + pub fn prefix_key() -> Vec { + format!("{DATANODE_STAT_PREFIX}-").into_bytes() + } + + /// The key prefix with the cluster id. 
+ pub fn key_prefix_with_cluster_id(cluster_id: ClusterId) -> String { + format!("{DATANODE_STAT_PREFIX}-{cluster_id}-") + } +} + +impl From for Vec { + fn from(value: DatanodeStatKey) -> Self { + format!( + "{}-{}-{}", + DATANODE_STAT_PREFIX, value.cluster_id, value.node_id + ) + .into_bytes() + } +} + +impl FromStr for DatanodeStatKey { + type Err = error::Error; + + fn from_str(key: &str) -> Result { + let caps = DATANODE_STAT_KEY_PATTERN + .captures(key) + .context(error::InvalidStatKeySnafu { key })?; + + ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key }); + + let cluster_id = caps[1].to_string(); + let node_id = caps[2].to_string(); + let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid cluster_id: {cluster_id}"), + })?; + let node_id: u64 = node_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid node_id: {node_id}"), + })?; + + Ok(Self { + cluster_id, + node_id, + }) + } +} + +impl TryFrom> for DatanodeStatKey { + type Error = error::Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(error::FromUtf8Snafu { + name: "DatanodeStatKey", + }) + .map(|x| x.parse())? + } +} + +/// The value of the datanode stat in the memory store. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(transparent)] +pub struct DatanodeStatValue { + pub stats: Vec, +} + +impl DatanodeStatValue { + /// Get the latest number of regions. + pub fn region_num(&self) -> Option { + self.stats.last().map(|x| x.region_num) + } + + /// Get the latest node addr. + pub fn node_addr(&self) -> Option { + self.stats.last().map(|x| x.addr.clone()) + } +} + +impl TryFrom for Vec { + type Error = error::Error; + + fn try_from(stats: DatanodeStatValue) -> Result { + Ok(serde_json::to_string(&stats) + .context(error::SerializeToJsonSnafu { + input: format!("{stats:?}"), + })? 
+ .into_bytes()) + } +} + +impl FromStr for DatanodeStatValue { + type Err = error::Error; + + fn from_str(value: &str) -> Result { + serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value }) + } +} + +impl TryFrom> for DatanodeStatValue { + type Error = error::Error; + + fn try_from(value: Vec) -> Result { + String::from_utf8(value) + .context(error::FromUtf8Snafu { + name: "DatanodeStatValue", + }) + .map(|x| x.parse())? + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_stat_key() { + let stat = Stat { + cluster_id: 3, + id: 101, + region_num: 10, + ..Default::default() + }; + + let stat_key = stat.stat_key(); + + assert_eq!(3, stat_key.cluster_id); + assert_eq!(101, stat_key.node_id); + } + + #[test] + fn test_stat_val_round_trip() { + let stat = Stat { + cluster_id: 0, + id: 101, + region_num: 100, + ..Default::default() + }; + + let stat_val = DatanodeStatValue { stats: vec![stat] }; + + let bytes: Vec = stat_val.try_into().unwrap(); + let stat_val: DatanodeStatValue = bytes.try_into().unwrap(); + let stats = stat_val.stats; + + assert_eq!(1, stats.len()); + + let stat = stats.first().unwrap(); + assert_eq!(0, stat.cluster_id); + assert_eq!(101, stat.id); + assert_eq!(100, stat.region_num); + } + + #[test] + fn test_get_addr_from_stat_val() { + let empty = DatanodeStatValue { stats: vec![] }; + let addr = empty.node_addr(); + assert!(addr.is_none()); + + let stat_val = DatanodeStatValue { + stats: vec![ + Stat { + addr: "1".to_string(), + ..Default::default() + }, + Stat { + addr: "2".to_string(), + ..Default::default() + }, + Stat { + addr: "3".to_string(), + ..Default::default() + }, + ], + }; + let addr = stat_val.node_addr().unwrap(); + assert_eq!("3", addr); + } + + #[test] + fn test_get_region_num_from_stat_val() { + let empty = DatanodeStatValue { stats: vec![] }; + let region_num = empty.region_num(); + assert!(region_num.is_none()); + + let wrong = DatanodeStatValue { + stats: vec![Stat { + 
region_num: 0, + ..Default::default() + }], + }; + let right = wrong.region_num(); + assert_eq!(Some(0), right); + + let stat_val = DatanodeStatValue { + stats: vec![ + Stat { + region_num: 1, + ..Default::default() + }, + Stat { + region_num: 0, + ..Default::default() + }, + Stat { + region_num: 2, + ..Default::default() + }, + ], + }; + let region_num = stat_val.region_num().unwrap(); + assert_eq!(2, region_num); + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index b244b1e310..849ee28948 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -218,6 +218,24 @@ pub enum Error { error: JsonError, }, + #[snafu(display("Failed to serialize to json: {}", input))] + SerializeToJson { + input: String, + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to deserialize from json: {}", input))] + DeserializeFromJson { + input: String, + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Payload not exist"))] PayloadNotExist { #[snafu(implicit)] @@ -531,13 +549,20 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid node info key: {}", key))] + #[snafu(display("Invalid node info key: {}", key))] InvalidNodeInfoKey { key: String, #[snafu(implicit)] location: Location, }, + #[snafu(display("Invalid node stat key: {}", key))] + InvalidStatKey { + key: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to parse number: {}", err_msg))] ParseNum { err_msg: String, @@ -627,7 +652,9 @@ impl ErrorExt for Error { | EtcdTxnFailed { .. } | ConnectEtcd { .. } | MoveValues { .. } - | GetCache { .. } => StatusCode::Internal, + | GetCache { .. } + | SerializeToJson { .. } + | DeserializeFromJson { .. } => StatusCode::Internal, ValueNotExist { .. } => StatusCode::Unexpected, @@ -700,6 +727,7 @@ impl ErrorExt for Error { | InvalidNumTopics { .. 
} | SchemaNotFound { .. } | InvalidNodeInfoKey { .. } + | InvalidStatKey { .. } | ParseNum { .. } | InvalidRole { .. } | EmptyDdlTasks { .. } => StatusCode::InvalidArguments, diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 78d111c479..1dd658890c 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -22,6 +22,7 @@ pub mod cache; pub mod cache_invalidator; pub mod cluster; +pub mod datanode; pub mod ddl; pub mod ddl_manager; pub mod distributed_time_constants; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 26a7ccb675..c7ef0a7242 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -53,6 +53,7 @@ prost.workspace = true query.workspace = true reqwest.workspace = true serde.workspace = true +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 6633d0ab62..d84552a8d2 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; +use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ @@ -320,16 +322,25 @@ impl HeartbeatTask { region_server .reportable_regions() .into_iter() - .map(|stat| RegionStat { - region_id: stat.region_id.as_u64(), - engine: stat.engine, - role: RegionRole::from(stat.role).into(), - // TODO(weny): w/rcus - rcus: 0, - wcus: 0, - approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0), - // TODO(weny): add extensions - extensions: Default::default(), + .map(|stat| { + let region_stat = region_server + .region_statistic(stat.region_id) + .unwrap_or_default(); + let mut extensions = HashMap::new(); + if let Some(serialized) = region_stat.serialize_to_vec() { + extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized); + } + + RegionStat { + region_id: stat.region_id.as_u64(), + engine: stat.engine, + role: RegionRole::from(stat.role).into(), + // TODO(weny): w/rcus + rcus: 0, + wcus: 0, + approximate_bytes: region_stat.estimated_disk_size() as i64, + extensions, + } }) .collect() } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 4fcdfb86af..aa80f52a5c 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -54,7 +54,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; -use store_api::region_engine::{RegionEngineRef, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{RegionEngineRef, RegionRole, RegionStatistic, SetReadonlyResponse}; use 
store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, }; @@ -312,9 +312,9 @@ impl RegionServer { self.inner.runtime.clone() } - pub fn region_disk_usage(&self, region_id: RegionId) -> Option { + pub fn region_statistic(&self, region_id: RegionId) -> Option { match self.inner.region_map.get(®ion_id) { - Some(e) => e.region_disk_usage(region_id), + Some(e) => e.region_statistic(region_id), None => None, } } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 8966dc4932..35f513bc83 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -31,7 +31,9 @@ use query::query_engine::{DescribeResult, QueryEngineState}; use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; +use store_api::region_engine::{ + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, +}; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; @@ -210,7 +212,7 @@ impl RegionEngine for MockRegionEngine { unimplemented!() } - fn region_disk_usage(&self, _region_id: RegionId) -> Option { + fn region_statistic(&self, _region_id: RegionId) -> Option { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index f8cbbe7ddc..32e1a1d58d 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -26,7 +26,8 @@ use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner, + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, + SinglePartitionScanner, }; use store_api::region_request::{ AffectedRows, 
RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, @@ -108,7 +109,7 @@ impl RegionEngine for FileRegionEngine { self.inner.stop().await.map_err(BoxedError::new) } - fn region_disk_usage(&self, _: RegionId) -> Option { + fn region_statistic(&self, _: RegionId) -> Option { None } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index d5f7c6aaaf..1c5b96a684 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -29,6 +29,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::cluster::{ ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole, }; +use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; @@ -327,6 +328,28 @@ impl ClusterInfo for MetaClient { Ok(nodes) } + + async fn list_region_stats(&self) -> Result> { + let cluster_client = self.cluster_client()?; + let range_prefix = DatanodeStatKey::key_prefix_with_cluster_id(self.id.0); + let req = RangeRequest::new().with_prefix(range_prefix); + let mut datanode_stats = cluster_client + .range(req) + .await? 
+ .kvs + .into_iter() + .map(|kv| DatanodeStatValue::try_from(kv.value).context(ConvertMetaRequestSnafu)) + .collect::>>()?; + let region_stats = datanode_stats + .iter_mut() + .flat_map(|datanode_stat| { + let last = datanode_stat.stats.pop(); + last.map(|stat| stat.region_stats).unwrap_or_default() + }) + .collect::>(); + + Ok(region_stats) + } } impl MetaClient { diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index ebad9e3e6b..9a6cecbd36 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -23,6 +23,7 @@ use api::v1::meta::{ RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader, }; use common_grpc::channel_manager::ChannelManager; +use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -37,7 +38,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::error::{match_for_io_error, Result}; -use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::metasrv::ElectionRef; pub type MetaPeerClientRef = Arc; @@ -217,7 +217,11 @@ impl MetaPeerClient { pub async fn get_node_cnt(&self) -> Result { let kvs = self.get_dn_key_value(true).await?; kvs.into_iter() - .map(|kv| kv.key.try_into()) + .map(|kv| { + kv.key + .try_into() + .context(error::InvalidDatanodeStatFormatSnafu {}) + }) .collect::>>() .map(|hash_set| hash_set.len() as i32) } @@ -319,7 +323,14 @@ impl MetaPeerClient { fn to_stat_kv_map(kvs: Vec) -> Result> { let mut map = HashMap::with_capacity(kvs.len()); for kv in kvs { - let _ = map.insert(kv.key.try_into()?, kv.value.try_into()?); + let _ = map.insert( + kv.key + .try_into() + .context(error::InvalidDatanodeStatFormatSnafu {})?, + kv.value + .try_into() + .context(error::InvalidDatanodeStatFormatSnafu {})?, + ); } Ok(map) } @@ -356,12 +367,11 @@ 
fn need_retry(error: &error::Error) -> bool { #[cfg(test)] mod tests { use api::v1::meta::{Error, ErrorCode, ResponseHeader}; + use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat}; use common_meta::rpc::KeyValue; use super::{check_resp_header, to_stat_kv_map, Context}; use crate::error; - use crate::handler::node_stat::Stat; - use crate::key::{DatanodeStatKey, DatanodeStatValue}; #[test] fn test_to_stat_kv_map() { diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 728f326871..7c8d17cbe0 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -285,22 +285,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse stat key from utf8"))] - StatKeyFromUtf8 { - #[snafu(source)] - error: std::string::FromUtf8Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to parse stat value from utf8"))] - StatValueFromUtf8 { - #[snafu(source)] - error: std::string::FromUtf8Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to parse invalid region key from utf8"))] InvalidRegionKeyFromUtf8 { #[snafu(source)] @@ -719,6 +703,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Invalid datanode stat format"))] + InvalidDatanodeStatFormat { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to serialize options to TOML"))] TomlFormat { #[snafu(implicit)] @@ -815,8 +806,6 @@ impl ErrorExt for Error { | Error::TomlFormat { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } - | Error::StatKeyFromUtf8 { .. } - | Error::StatValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } | Error::TableRouteNotFound { .. } | Error::TableInfoNotFound { .. } @@ -830,7 +819,8 @@ impl ErrorExt for Error { | Error::MigrationRunning { .. } => StatusCode::Unexpected, Error::TableNotFound { .. 
} => StatusCode::TableNotFound, Error::SaveClusterInfo { source, .. } - | Error::InvalidClusterInfoFormat { source, .. } => source.status_code(), + | Error::InvalidClusterInfoFormat { source, .. } + | Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(), Error::InvalidateTableCache { source, .. } => source.status_code(), Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index d74d49027d..063d3939c1 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -22,6 +22,7 @@ use api::v1::meta::{ HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader, ResponseHeader, Role, PROTOCOL_VERSION, }; +use common_meta::datanode::Stat; use common_meta::instruction::{Instruction, InstructionReply}; use common_meta::sequence::Sequence; use common_telemetry::{debug, info, warn}; @@ -32,7 +33,6 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::Sender; use tokio::sync::{oneshot, Notify, RwLock}; -use self::node_stat::Stat; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; @@ -48,7 +48,6 @@ pub mod failure_handler; pub mod filter_inactive_region_stats; pub mod keep_lease_handler; pub mod mailbox_handler; -pub mod node_stat; pub mod on_leader_start_handler; pub mod publish_heartbeat_handler; pub mod region_lease_handler; diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index ec9fa231e1..b399b961ee 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, Stat}; use 
common_meta::instruction::CacheIdent; use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue}; use common_meta::key::{MetadataKey, MetadataValue}; @@ -25,9 +26,7 @@ use dashmap::DashMap; use snafu::ResultExt; use crate::error::{self, Result}; -use crate::handler::node_stat::Stat; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; -use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::metasrv::Context; const MAX_CACHED_STATS_PER_KEY: usize = 10; @@ -138,7 +137,8 @@ impl HeartbeatHandler for CollectStatsHandler { let value: Vec = DatanodeStatValue { stats: epoch_stats.drain_all(), } - .try_into()?; + .try_into() + .context(error::InvalidDatanodeStatFormatSnafu {})?; let put = PutRequest { key, value, @@ -198,6 +198,7 @@ mod tests { use std::sync::Arc; use common_meta::cache_invalidator::DummyCacheInvalidator; + use common_meta::datanode::DatanodeStatKey; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::SequenceBuilder; @@ -205,7 +206,6 @@ mod tests { use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; - use crate::key::DatanodeStatKey; use crate::service::store::cached_kv::LeaderCachedKvBackend; #[tokio::test] diff --git a/src/meta-srv/src/handler/extract_stat_handler.rs b/src/meta-srv/src/handler/extract_stat_handler.rs index c23e78314f..b31f41e4ef 100644 --- a/src/meta-srv/src/handler/extract_stat_handler.rs +++ b/src/meta-srv/src/handler/extract_stat_handler.rs @@ -13,9 +13,9 @@ // limitations under the License. 
use api::v1::meta::{HeartbeatRequest, Role}; +use common_meta::datanode::Stat; use common_telemetry::{info, warn}; -use super::node_stat::Stat; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index ebeeaf6b7f..02f423c4b4 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -64,12 +64,12 @@ impl HeartbeatHandler for RegionFailureHandler { mod tests { use api::v1::meta::HeartbeatRequest; use common_catalog::consts::default_engine; + use common_meta::datanode::{RegionStat, Stat}; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use tokio::sync::oneshot; use crate::handler::failure_handler::RegionFailureHandler; - use crate::handler::node_stat::{RegionStat, Stat}; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::builder::MetasrvBuilder; use crate::region::supervisor::tests::new_test_supervisor; @@ -93,7 +93,9 @@ mod tests { approximate_bytes: 0, engine: default_engine().to_string(), role: RegionRole::Follower, - extensions: Default::default(), + memtable_size: 0, + manifest_size: 0, + sst_size: 0, } } acc.stat = Some(Stat { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs deleted file mode 100644 index 3a6c6355cd..0000000000 --- a/src/meta-srv/src/handler/node_stat.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::{HashMap, HashSet}; - -use api::v1::meta::{HeartbeatRequest, RequestHeader}; -use common_meta::ClusterId; -use common_time::util as time_util; -use serde::{Deserialize, Serialize}; -use store_api::region_engine::RegionRole; -use store_api::storage::RegionId; -use table::metadata::TableId; - -use crate::key::DatanodeStatKey; - -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct Stat { - pub timestamp_millis: i64, - pub cluster_id: ClusterId, - // The datanode Id. - pub id: u64, - // The datanode address. - pub addr: String, - /// The read capacity units during this period - pub rcus: i64, - /// The write capacity units during this period - pub wcus: i64, - /// How many regions on this node - pub region_num: u64, - pub region_stats: Vec, - // The node epoch is used to check whether the node has restarted or redeployed. - pub node_epoch: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RegionStat { - /// The region_id. - pub id: RegionId, - /// The read capacity units during this period - pub rcus: i64, - /// The write capacity units during this period - pub wcus: i64, - /// Approximate bytes of this region - pub approximate_bytes: i64, - /// The engine name. - pub engine: String, - /// The region role. 
- pub role: RegionRole, - /// The extension info of this region - pub extensions: HashMap>, -} - -impl Stat { - #[inline] - pub fn is_empty(&self) -> bool { - self.region_stats.is_empty() - } - - pub fn stat_key(&self) -> DatanodeStatKey { - DatanodeStatKey { - cluster_id: self.cluster_id, - node_id: self.id, - } - } - - /// Returns a tuple array containing [RegionId] and [RegionRole]. - pub fn regions(&self) -> Vec<(RegionId, RegionRole)> { - self.region_stats.iter().map(|s| (s.id, s.role)).collect() - } - - /// Returns all table ids in the region stats. - pub fn table_ids(&self) -> HashSet { - self.region_stats.iter().map(|s| s.id.table_id()).collect() - } - - pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet) { - if inactive_region_ids.is_empty() { - return; - } - - self.region_stats - .retain(|r| !inactive_region_ids.contains(&r.id)); - self.rcus = self.region_stats.iter().map(|s| s.rcus).sum(); - self.wcus = self.region_stats.iter().map(|s| s.wcus).sum(); - self.region_num = self.region_stats.len() as u64; - } -} - -impl TryFrom<&HeartbeatRequest> for Stat { - type Error = Option; - - fn try_from(value: &HeartbeatRequest) -> Result { - let HeartbeatRequest { - header, - peer, - region_stats, - node_epoch, - .. 
- } = value; - - match (header, peer) { - (Some(header), Some(peer)) => { - let region_stats = region_stats - .iter() - .map(RegionStat::from) - .collect::>(); - - Ok(Self { - timestamp_millis: time_util::current_time_millis(), - cluster_id: header.cluster_id, - // datanode id - id: peer.id, - // datanode address - addr: peer.addr.clone(), - rcus: region_stats.iter().map(|s| s.rcus).sum(), - wcus: region_stats.iter().map(|s| s.wcus).sum(), - region_num: region_stats.len() as u64, - region_stats, - node_epoch: *node_epoch, - }) - } - (header, _) => Err(header.clone()), - } - } -} - -impl From<&api::v1::meta::RegionStat> for RegionStat { - fn from(value: &api::v1::meta::RegionStat) -> Self { - Self { - id: RegionId::from_u64(value.region_id), - rcus: value.rcus, - wcus: value.wcus, - approximate_bytes: value.approximate_bytes, - engine: value.engine.to_string(), - role: RegionRole::from(value.role()), - extensions: value.extensions.clone(), - } - } -} - -#[cfg(test)] -mod tests { - use crate::handler::node_stat::Stat; - - #[test] - fn test_stat_key() { - let stat = Stat { - cluster_id: 3, - id: 101, - region_num: 10, - ..Default::default() - }; - - let stat_key = stat.stat_key(); - - assert_eq!(3, stat_key.cluster_id); - assert_eq!(101, stat_key.node_id); - } -} diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 28ddb436e0..06cf818d23 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -103,6 +103,7 @@ mod test { use std::collections::{HashMap, HashSet}; use std::sync::Arc; + use common_meta::datanode::{RegionStat, Stat}; use common_meta::distributed_time_constants; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; @@ -115,7 +116,6 @@ mod test { use store_api::storage::RegionId; use super::*; - use crate::handler::node_stat::{RegionStat, Stat}; use 
crate::metasrv::builder::MetasrvBuilder; fn new_test_keeper() -> RegionLeaseKeeper { @@ -135,7 +135,9 @@ mod test { wcus: 0, approximate_bytes: 0, engine: String::new(), - extensions: Default::default(), + memtable_size: 0, + manifest_size: 0, + sst_size: 0, } } diff --git a/src/meta-srv/src/key/datanode.rs b/src/meta-srv/src/key/datanode.rs index 4fe55685b2..1c4583c233 100644 --- a/src/meta-srv/src/key/datanode.rs +++ b/src/meta-srv/src/key/datanode.rs @@ -14,6 +14,7 @@ use std::str::FromStr; +use common_meta::datanode::DatanodeStatKey; use common_meta::ClusterId; use lazy_static::lazy_static; use regex::Regex; @@ -22,7 +23,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::error::Result; -use crate::handler::node_stat::Stat; pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease"; const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; @@ -52,18 +52,6 @@ impl DatanodeLeaseKey { } } -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -pub struct DatanodeStatKey { - pub cluster_id: ClusterId, - pub node_id: u64, -} - -impl DatanodeStatKey { - pub fn prefix_key() -> Vec { - format!("{DATANODE_STAT_PREFIX}-").into_bytes() - } -} - impl From<&DatanodeLeaseKey> for DatanodeStatKey { fn from(lease_key: &DatanodeLeaseKey) -> Self { DatanodeStatKey { @@ -73,100 +61,6 @@ impl From<&DatanodeLeaseKey> for DatanodeStatKey { } } -impl From for Vec { - fn from(value: DatanodeStatKey) -> Self { - format!( - "{}-{}-{}", - DATANODE_STAT_PREFIX, value.cluster_id, value.node_id - ) - .into_bytes() - } -} - -impl FromStr for DatanodeStatKey { - type Err = error::Error; - - fn from_str(key: &str) -> Result { - let caps = DATANODE_STAT_KEY_PATTERN - .captures(key) - .context(error::InvalidStatKeySnafu { key })?; - - ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key }); - - let cluster_id = caps[1].to_string(); - let node_id = caps[2].to_string(); - let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu { - err_msg: 
format!("invalid cluster_id: {cluster_id}"), - })?; - let node_id: u64 = node_id.parse().context(error::ParseNumSnafu { - err_msg: format!("invalid node_id: {node_id}"), - })?; - - Ok(Self { - cluster_id, - node_id, - }) - } -} - -impl TryFrom> for DatanodeStatKey { - type Error = error::Error; - - fn try_from(bytes: Vec) -> Result { - String::from_utf8(bytes) - .context(error::StatKeyFromUtf8Snafu {}) - .map(|x| x.parse())? - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(transparent)] -pub struct DatanodeStatValue { - pub stats: Vec, -} - -impl DatanodeStatValue { - /// Get the latest number of regions. - pub fn region_num(&self) -> Option { - self.stats.last().map(|x| x.region_num) - } - - /// Get the latest node addr. - pub fn node_addr(&self) -> Option { - self.stats.last().map(|x| x.addr.clone()) - } -} - -impl TryFrom for Vec { - type Error = error::Error; - - fn try_from(stats: DatanodeStatValue) -> Result { - Ok(serde_json::to_string(&stats) - .context(error::SerializeToJsonSnafu { - input: format!("{stats:?}"), - })? - .into_bytes()) - } -} - -impl FromStr for DatanodeStatValue { - type Err = error::Error; - - fn from_str(value: &str) -> Result { - serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value }) - } -} - -impl TryFrom> for DatanodeStatValue { - type Error = error::Error; - - fn try_from(value: Vec) -> Result { - String::from_utf8(value) - .context(error::StatValueFromUtf8Snafu {}) - .map(|x| x.parse())? 
- } -} - #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct InactiveRegionKey { pub cluster_id: ClusterId, @@ -253,29 +147,6 @@ mod tests { assert_eq!(1, new_key.node_id); } - #[test] - fn test_stat_val_round_trip() { - let stat = Stat { - cluster_id: 0, - id: 101, - region_num: 100, - ..Default::default() - }; - - let stat_val = DatanodeStatValue { stats: vec![stat] }; - - let bytes: Vec = stat_val.try_into().unwrap(); - let stat_val: DatanodeStatValue = bytes.try_into().unwrap(); - let stats = stat_val.stats; - - assert_eq!(1, stats.len()); - - let stat = stats.first().unwrap(); - assert_eq!(0, stat.cluster_id); - assert_eq!(101, stat.id); - assert_eq!(100, stat.region_num); - } - #[test] fn test_lease_key_round_trip() { let key = DatanodeLeaseKey { @@ -289,67 +160,6 @@ mod tests { assert_eq!(new_key, key); } - #[test] - fn test_get_addr_from_stat_val() { - let empty = DatanodeStatValue { stats: vec![] }; - let addr = empty.node_addr(); - assert!(addr.is_none()); - - let stat_val = DatanodeStatValue { - stats: vec![ - Stat { - addr: "1".to_string(), - ..Default::default() - }, - Stat { - addr: "2".to_string(), - ..Default::default() - }, - Stat { - addr: "3".to_string(), - ..Default::default() - }, - ], - }; - let addr = stat_val.node_addr().unwrap(); - assert_eq!("3", addr); - } - - #[test] - fn test_get_region_num_from_stat_val() { - let empty = DatanodeStatValue { stats: vec![] }; - let region_num = empty.region_num(); - assert!(region_num.is_none()); - - let wrong = DatanodeStatValue { - stats: vec![Stat { - region_num: 0, - ..Default::default() - }], - }; - let right = wrong.region_num(); - assert_eq!(Some(0), right); - - let stat_val = DatanodeStatValue { - stats: vec![ - Stat { - region_num: 1, - ..Default::default() - }, - Stat { - region_num: 0, - ..Default::default() - }, - Stat { - region_num: 2, - ..Default::default() - }, - ], - }; - let region_num = stat_val.region_num().unwrap(); - assert_eq!(2, region_num); - } - 
#[test] fn test_lease_key_to_stat_key() { let lease_key = DatanodeLeaseKey { diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 79b305bb80..8367acbb26 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; +use common_meta::datanode::Stat; use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; use common_meta::key::MAINTENANCE_KEY; use common_meta::kv_backend::KvBackendRef; @@ -32,7 +33,6 @@ use tokio::time::{interval, MissedTickBehavior}; use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; -use crate::handler::node_stat::Stat; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::region_migration::RegionMigrationProcedureTask; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 12dea5854b..d7b650deda 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::find_leaders; @@ -23,7 +24,7 @@ use snafu::ResultExt; use table::metadata::TableId; use crate::error::{self, Result}; -use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue}; +use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::common::choose_peers; @@ -162,7 +163,9 @@ async fn get_leader_peer_ids( mod tests { use std::collections::HashMap; - use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue}; + use common_meta::datanode::{DatanodeStatKey, 
DatanodeStatValue}; + + use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::selector::load_based::filter_out_expired_datanode; #[test] diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index c8c555d204..09d8833e2e 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -14,10 +14,10 @@ use std::collections::HashMap; +use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; use common_meta::peer::Peer; use itertools::{Itertools, MinMaxResult}; -use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::selector::weighted_choose::WeightedItem; /// The [`WeightCompute`] trait is used to compute the weight array by heartbeats. @@ -95,13 +95,12 @@ impl WeightCompute for RegionNumsBasedWeightCompute { mod tests { use std::collections::HashMap; + use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat, Stat}; use common_meta::peer::Peer; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use super::{RegionNumsBasedWeightCompute, WeightCompute}; - use crate::handler::node_stat::{RegionStat, Stat}; - use crate::key::{DatanodeStatKey, DatanodeStatValue}; #[test] fn test_weight_compute() { @@ -199,7 +198,9 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, - extensions: Default::default(), + memtable_size: 0, + manifest_size: 0, + sst_size: 0, }], ..Default::default() } @@ -216,7 +217,9 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, - extensions: Default::default(), + memtable_size: 0, + manifest_size: 0, + sst_size: 0, }], ..Default::default() } @@ -233,7 +236,9 @@ mod tests { approximate_bytes: 1, engine: "mito2".to_string(), role: RegionRole::Leader, - extensions: Default::default(), + memtable_size: 0, + manifest_size: 0, + sst_size: 0, }], ..Default::default() } diff --git a/src/meta-srv/src/service/admin/heartbeat.rs 
b/src/meta-srv/src/service/admin/heartbeat.rs index aa4b7cc96d..078d6cdc77 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -14,13 +14,13 @@ use std::collections::HashMap; +use common_meta::datanode::DatanodeStatValue; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; -use crate::key::DatanodeStatValue; use crate::service::admin::{util, HttpHandler}; #[derive(Clone)] @@ -85,8 +85,8 @@ fn filter_by_addr(stat_vals: Vec, addr: &str) -> Vec Option { + fn region_statistic(&self, region_id: RegionId) -> Option { if self.inner.is_physical_region(region_id) { - self.inner.mito.region_disk_usage(region_id) + self.inner.mito.region_statistic(region_id) } else { None } @@ -377,7 +379,7 @@ mod test { let logical_region_id = env.default_logical_region_id(); let physical_region_id = env.default_physical_region_id(); - assert!(env.metric().region_disk_usage(logical_region_id).is_none()); - assert!(env.metric().region_disk_usage(physical_region_id).is_some()); + assert!(env.metric().region_statistic(logical_region_id).is_none()); + assert!(env.metric().region_statistic(physical_region_id).is_some()); } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c0655eb690..e9177d40bf 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -76,7 +76,8 @@ use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ - BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, + BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, + SetReadonlyResponse, }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; @@ -89,7 +90,6 @@ use crate::error::{ use 
crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner}; -use crate::region::RegionUsage; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, @@ -133,15 +133,12 @@ impl MitoEngine { self.inner.workers.is_region_opening(region_id) } - /// Returns the region disk/memory usage information. - pub fn get_region_usage(&self, region_id: RegionId) -> Result { - let region = self - .inner + /// Returns the region disk/memory statistic. + pub fn get_region_statistic(&self, region_id: RegionId) -> Option { + self.inner .workers .get_region(region_id) - .context(RegionNotFoundSnafu { region_id })?; - - Ok(region.region_usage()) + .map(|region| region.region_statistic()) } /// Handle substrait query and return a stream of record batches @@ -546,12 +543,8 @@ impl RegionEngine for MitoEngine { self.inner.stop().await.map_err(BoxedError::new) } - fn region_disk_usage(&self, region_id: RegionId) -> Option { - let size = self - .get_region_usage(region_id) - .map(|usage| usage.disk_usage()) - .ok()?; - size.try_into().ok() + fn region_statistic(&self, region_id: RegionId) -> Option { + self.get_region_statistic(region_id) } fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<(), BoxedError> { diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 9179d8a074..533b6a2ea1 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -552,8 +552,8 @@ async fn test_region_usage() { .unwrap(); // region is empty now, check manifest size let region = engine.get_region(region_id).unwrap(); - let region_stat = region.region_usage(); - assert_eq!(region_stat.manifest_usage, 686); + let region_stat = region.region_statistic(); + assert_eq!(region_stat.manifest_size, 686); // put some rows let rows = 
Rows { @@ -563,8 +563,8 @@ async fn test_region_usage() { put_rows(&engine, region_id, rows).await; - let region_stat = region.region_usage(); - assert!(region_stat.wal_usage > 0); + let region_stat = region.region_statistic(); + assert!(region_stat.wal_size > 0); // delete some rows let rows = Rows { @@ -573,18 +573,18 @@ async fn test_region_usage() { }; delete_rows(&engine, region_id, rows).await; - let region_stat = region.region_usage(); - assert!(region_stat.wal_usage > 0); + let region_stat = region.region_statistic(); + assert!(region_stat.wal_size > 0); // flush region flush_region(&engine, region_id, None).await; - let region_stat = region.region_usage(); - assert_eq!(region_stat.sst_usage, 3010); + let region_stat = region.region_statistic(); + assert_eq!(region_stat.sst_size, 3010); // region total usage // Some memtables may share items. - assert!(region_stat.disk_usage() >= 4028); + assert!(region_stat.estimated_disk_size() >= 4028); } #[tokio::test] diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index e2a4801643..8fc9095ae5 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -28,6 +28,7 @@ use crossbeam_utils::atomic::AtomicCell; use snafu::{ensure, OptionExt}; use store_api::logstore::provider::Provider; use store_api::metadata::RegionMetadataRef; +use store_api::region_engine::RegionStatistic; use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; @@ -252,10 +253,8 @@ impl MitoRegion { } } - /// Returns the region usage in bytes. - pub(crate) fn region_usage(&self) -> RegionUsage { - let region_id = self.region_id; - + /// Returns the region statistic. 
+ pub(crate) fn region_statistic(&self) -> RegionStatistic { let version = self.version(); let memtables = &version.memtables; let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64; @@ -265,11 +264,11 @@ impl MitoRegion { let wal_usage = self.estimated_wal_usage(memtable_usage); let manifest_usage = self.stats.total_manifest_size(); - RegionUsage { - region_id, - wal_usage, - sst_usage, - manifest_usage, + RegionStatistic { + memtable_size: memtable_usage, + wal_size: wal_usage, + manifest_size: manifest_usage, + sst_size: sst_usage, } } diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 773270351f..6b14a5af52 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -27,7 +27,9 @@ use datatypes::schema::ColumnSchema; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; -use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; +use store_api::region_engine::{ + RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetReadonlyResponse, +}; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest}; @@ -79,7 +81,7 @@ impl RegionEngine for MetaRegionEngine { }) } - fn region_disk_usage(&self, _region_id: RegionId) -> Option { + fn region_statistic(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 84555a595b..483d3cc1ad 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -242,6 +242,42 @@ pub type RegionScannerRef = Box; pub type BatchResponses = Vec<(RegionId, Result)>; +/// Represents the statistics of a region. +#[derive(Debug, Deserialize, Serialize, Default)] +pub struct RegionStatistic { + /// The size of memtable in bytes. 
+ pub memtable_size: u64, + /// The size of WAL in bytes. + pub wal_size: u64, + /// The size of manifest in bytes. + pub manifest_size: u64, + /// The size of SST files in bytes. + pub sst_size: u64, +} + +impl RegionStatistic { + /// Deserializes the region statistic from a byte slice. + /// + /// Returns None if the deserialization fails. + pub fn deserialize_from_slice(value: &[u8]) -> Option { + serde_json::from_slice(value).ok() + } + + /// Serializes the region statistic to a byte array. + /// + /// Returns None if the serialization fails. + pub fn serialize_to_vec(&self) -> Option> { + serde_json::to_vec(self).ok() + } +} + +impl RegionStatistic { + /// Returns the estimated disk size of the region. + pub fn estimated_disk_size(&self) -> u64 { + self.wal_size + self.sst_size + self.manifest_size + } +} + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -289,8 +325,8 @@ pub trait RegionEngine: Send + Sync { /// Retrieves region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result; - /// Retrieves region's disk usage. - fn region_disk_usage(&self, region_id: RegionId) -> Option; + /// Retrieves region's statistic. 
+ fn region_statistic(&self, region_id: RegionId) -> Option; /// Stops the engine async fn stop(&self) -> Result<(), BoxedError>; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index c92a5b33c8..6dc267b61b 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -49,6 +49,7 @@ SHOW TABLES; | profiling | | referential_constraints | | region_peers | +| region_statistics | | routines | | runtime_metrics | | schema_privileges | @@ -96,6 +97,7 @@ SHOW FULL TABLES; | profiling | LOCAL TEMPORARY | | referential_constraints | LOCAL TEMPORARY | | region_peers | LOCAL TEMPORARY | +| region_statistics | LOCAL TEMPORARY | | routines | LOCAL TEMPORARY | | runtime_metrics | LOCAL TEMPORARY | | schema_privileges | LOCAL TEMPORARY | @@ -137,6 +139,7 @@ SHOW TABLE STATUS; |profiling||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |referential_constraints||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| +|region_statistics||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |routines||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |runtime_metrics||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| |schema_privileges||11|Fixed|0|0|0|0|0|0|0|DATETIME|||utf8_bin|0||| diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 03b65cf390..882607f842 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -36,6 +36,7 @@ order by table_schema, table_name; |greptime|information_schema|profiling|LOCALTEMPORARY|19|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| 
|greptime|information_schema|referential_constraints|LOCALTEMPORARY|20|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| +|greptime|information_schema|region_statistics|LOCALTEMPORARY|35|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|routines|LOCALTEMPORARY|21|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|runtime_metrics|LOCALTEMPORARY|27|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| |greptime|information_schema|schema_privileges|LOCALTEMPORARY|22|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|||utf8_bin|0|||Y| @@ -298,6 +299,14 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | region_peers | peer_id | 2 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | | greptime | information_schema | region_peers | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | No | bigint unsigned | | | | greptime | information_schema | region_peers | status | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_statistics | engine | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_statistics | manifest_size | 5 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | memtable_size | 4 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | region_id | 1 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint 
unsigned | FIELD | | No | bigint unsigned | | | +| greptime | information_schema | region_statistics | region_number | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | +| greptime | information_schema | region_statistics | region_role | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | +| greptime | information_schema | region_statistics | sst_size | 6 | | | 20 | 0 | | | | | | select,insert | | UInt64 | bigint unsigned | FIELD | | Yes | bigint unsigned | | | +| greptime | information_schema | region_statistics | table_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | | | greptime | information_schema | routines | character_maximum_length | 7 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_octet_length | 8 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | | | greptime | information_schema | routines | character_set_client | 29 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result index 4fd2f59356..06d892d189 100644 --- a/tests/cases/standalone/common/view/create.result +++ b/tests/cases/standalone/common/view/create.result @@ -110,6 +110,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; |greptime|information_schema|profiling|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|referential_constraints|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| 
+|greptime|information_schema|region_statistics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|routines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|runtime_metrics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y| |greptime|information_schema|schema_privileges|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|||utf8_bin|ID|||Y|