feat: add region_statistics table (#4771)

* refactor: introduce `region_statistic`

* refactor: move DatanodeStat related structs to common_meta

* chore: add comments

* feat: implement `list_region_stats` for `ClusterInfo` trait

* feat: add `region_statistics` table

* feat: add table_id and region_number fields

* chore: rename unused snafu

* chore: udpate sqlness results

* chore: avoid to print source in error msg

* chore: move `procedure_info` under `greptime` catalog

* chore: apply suggestions from CR

* Update src/common/meta/src/datanode.rs

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

---------

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Weny Xu
2024-09-27 17:54:52 +08:00
committed by GitHub
parent cc4106cbd2
commit 4045298cb2
39 changed files with 939 additions and 474 deletions

View File

@@ -100,6 +100,9 @@ pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// id for information_schema.procedure_info
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// id for information_schema.region_statistics
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----

View File

@@ -20,6 +20,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::datanode::RegionStat;
use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
InvalidRoleSnafu, ParseNumSnafu, Result,
@@ -47,6 +48,9 @@ pub trait ClusterInfo {
role: Option<Role>,
) -> std::result::Result<Vec<NodeInfo>, Self::Error>;
/// List all region stats in the cluster.
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;
// TODO(jeremy): Other info, like region status, etc.
}

View File

@@ -0,0 +1,413 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::str::FromStr;
use api::v1::meta::{HeartbeatRequest, RequestHeader};
use common_time::util as time_util;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::region_engine::{RegionRole, RegionStatistic};
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::error::Result;
use crate::{error, ClusterId};
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
lazy_static! {
pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref DATANODE_STAT_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
"^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
))
.unwrap();
}
/// The key of the datanode stat in the storage.
///
/// The format is `__meta_datanode_stat-{cluster_id}-{node_id}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: ClusterId,
// The datanode Id.
pub id: u64,
// The datanode address.
pub addr: String,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
pub wcus: i64,
/// How many regions on this node
pub region_num: u64,
pub region_stats: Vec<RegionStat>,
// The node epoch is used to check whether the node has restarted or redeployed.
pub node_epoch: u64,
}
/// The statistics of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: RegionId,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
pub wcus: i64,
/// Approximate bytes of this region
pub approximate_bytes: i64,
/// The engine name.
pub engine: String,
/// The region role.
pub role: RegionRole,
/// The size of the memtable in bytes.
pub memtable_size: u64,
/// The size of the manifest in bytes.
pub manifest_size: u64,
/// The size of the SST files in bytes.
pub sst_size: u64,
}
impl Stat {
#[inline]
pub fn is_empty(&self) -> bool {
self.region_stats.is_empty()
}
pub fn stat_key(&self) -> DatanodeStatKey {
DatanodeStatKey {
cluster_id: self.cluster_id,
node_id: self.id,
}
}
/// Returns a tuple array containing [RegionId] and [RegionRole].
pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
self.region_stats.iter().map(|s| (s.id, s.role)).collect()
}
/// Returns all table ids in the region stats.
pub fn table_ids(&self) -> HashSet<TableId> {
self.region_stats.iter().map(|s| s.id.table_id()).collect()
}
/// Retains the active region stats and updates the rcus, wcus, and region_num.
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
if inactive_region_ids.is_empty() {
return;
}
self.region_stats
.retain(|r| !inactive_region_ids.contains(&r.id));
self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
self.region_num = self.region_stats.len() as u64;
}
}
impl TryFrom<&HeartbeatRequest> for Stat {
type Error = Option<RequestHeader>;
fn try_from(value: &HeartbeatRequest) -> std::result::Result<Self, Self::Error> {
let HeartbeatRequest {
header,
peer,
region_stats,
node_epoch,
..
} = value;
match (header, peer) {
(Some(header), Some(peer)) => {
let region_stats = region_stats
.iter()
.map(RegionStat::from)
.collect::<Vec<_>>();
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
// datanode id
id: peer.id,
// datanode address
addr: peer.addr.clone(),
rcus: region_stats.iter().map(|s| s.rcus).sum(),
wcus: region_stats.iter().map(|s| s.wcus).sum(),
region_num: region_stats.len() as u64,
region_stats,
node_epoch: *node_epoch,
})
}
(header, _) => Err(header.clone()),
}
}
}
impl From<&api::v1::meta::RegionStat> for RegionStat {
fn from(value: &api::v1::meta::RegionStat) -> Self {
let region_stat = value
.extensions
.get(REGION_STATISTIC_KEY)
.and_then(|value| RegionStatistic::deserialize_from_slice(value))
.unwrap_or_default();
Self {
id: RegionId::from_u64(value.region_id),
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,
engine: value.engine.to_string(),
role: RegionRole::from(value.role()),
memtable_size: region_stat.memtable_size,
manifest_size: region_stat.manifest_size,
sst_size: region_stat.sst_size,
}
}
}
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-{cluster_id}-{node_id}`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DatanodeStatKey {
pub cluster_id: ClusterId,
pub node_id: u64,
}
impl DatanodeStatKey {
/// The key prefix.
pub fn prefix_key() -> Vec<u8> {
format!("{DATANODE_STAT_PREFIX}-").into_bytes()
}
/// The key prefix with the cluster id.
pub fn key_prefix_with_cluster_id(cluster_id: ClusterId) -> String {
format!("{DATANODE_STAT_PREFIX}-{cluster_id}-")
}
}
impl From<DatanodeStatKey> for Vec<u8> {
fn from(value: DatanodeStatKey) -> Self {
format!(
"{}-{}-{}",
DATANODE_STAT_PREFIX, value.cluster_id, value.node_id
)
.into_bytes()
}
}
impl FromStr for DatanodeStatKey {
type Err = error::Error;
fn from_str(key: &str) -> Result<Self> {
let caps = DATANODE_STAT_KEY_PATTERN
.captures(key)
.context(error::InvalidStatKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;
Ok(Self {
cluster_id,
node_id,
})
}
}
impl TryFrom<Vec<u8>> for DatanodeStatKey {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::FromUtf8Snafu {
name: "DatanodeStatKey",
})
.map(|x| x.parse())?
}
}
/// The value of the datanode stat in the memory store.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct DatanodeStatValue {
pub stats: Vec<Stat>,
}
impl DatanodeStatValue {
/// Get the latest number of regions.
pub fn region_num(&self) -> Option<u64> {
self.stats.last().map(|x| x.region_num)
}
/// Get the latest node addr.
pub fn node_addr(&self) -> Option<String> {
self.stats.last().map(|x| x.addr.clone())
}
}
impl TryFrom<DatanodeStatValue> for Vec<u8> {
type Error = error::Error;
fn try_from(stats: DatanodeStatValue) -> Result<Self> {
Ok(serde_json::to_string(&stats)
.context(error::SerializeToJsonSnafu {
input: format!("{stats:?}"),
})?
.into_bytes())
}
}
impl FromStr for DatanodeStatValue {
type Err = error::Error;
fn from_str(value: &str) -> Result<Self> {
serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
}
}
impl TryFrom<Vec<u8>> for DatanodeStatValue {
type Error = error::Error;
fn try_from(value: Vec<u8>) -> Result<Self> {
String::from_utf8(value)
.context(error::FromUtf8Snafu {
name: "DatanodeStatValue",
})
.map(|x| x.parse())?
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stat_key() {
let stat = Stat {
cluster_id: 3,
id: 101,
region_num: 10,
..Default::default()
};
let stat_key = stat.stat_key();
assert_eq!(3, stat_key.cluster_id);
assert_eq!(101, stat_key.node_id);
}
#[test]
fn test_stat_val_round_trip() {
let stat = Stat {
cluster_id: 0,
id: 101,
region_num: 100,
..Default::default()
};
let stat_val = DatanodeStatValue { stats: vec![stat] };
let bytes: Vec<u8> = stat_val.try_into().unwrap();
let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
let stats = stat_val.stats;
assert_eq!(1, stats.len());
let stat = stats.first().unwrap();
assert_eq!(0, stat.cluster_id);
assert_eq!(101, stat.id);
assert_eq!(100, stat.region_num);
}
#[test]
fn test_get_addr_from_stat_val() {
let empty = DatanodeStatValue { stats: vec![] };
let addr = empty.node_addr();
assert!(addr.is_none());
let stat_val = DatanodeStatValue {
stats: vec![
Stat {
addr: "1".to_string(),
..Default::default()
},
Stat {
addr: "2".to_string(),
..Default::default()
},
Stat {
addr: "3".to_string(),
..Default::default()
},
],
};
let addr = stat_val.node_addr().unwrap();
assert_eq!("3", addr);
}
#[test]
fn test_get_region_num_from_stat_val() {
let empty = DatanodeStatValue { stats: vec![] };
let region_num = empty.region_num();
assert!(region_num.is_none());
let wrong = DatanodeStatValue {
stats: vec![Stat {
region_num: 0,
..Default::default()
}],
};
let right = wrong.region_num();
assert_eq!(Some(0), right);
let stat_val = DatanodeStatValue {
stats: vec![
Stat {
region_num: 1,
..Default::default()
},
Stat {
region_num: 0,
..Default::default()
},
Stat {
region_num: 2,
..Default::default()
},
],
};
let region_num = stat_val.region_num().unwrap();
assert_eq!(2, region_num);
}
}

View File

@@ -218,6 +218,24 @@ pub enum Error {
error: JsonError,
},
#[snafu(display("Failed to serialize to json: {}", input))]
SerializeToJson {
input: String,
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to deserialize from json: {}", input))]
DeserializeFromJson {
input: String,
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Payload not exist"))]
PayloadNotExist {
#[snafu(implicit)]
@@ -531,13 +549,20 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid node info key: {}", key))]
#[snafu(display("Invalid node info key: {}", key))]
InvalidNodeInfoKey {
key: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid node stat key: {}", key))]
InvalidStatKey {
key: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse number: {}", err_msg))]
ParseNum {
err_msg: String,
@@ -627,7 +652,9 @@ impl ErrorExt for Error {
| EtcdTxnFailed { .. }
| ConnectEtcd { .. }
| MoveValues { .. }
| GetCache { .. } => StatusCode::Internal,
| GetCache { .. }
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
ValueNotExist { .. } => StatusCode::Unexpected,
@@ -700,6 +727,7 @@ impl ErrorExt for Error {
| InvalidNumTopics { .. }
| SchemaNotFound { .. }
| InvalidNodeInfoKey { .. }
| InvalidStatKey { .. }
| ParseNum { .. }
| InvalidRole { .. }
| EmptyDdlTasks { .. } => StatusCode::InvalidArguments,

View File

@@ -22,6 +22,7 @@
pub mod cache;
pub mod cache_invalidator;
pub mod cluster;
pub mod datanode;
pub mod ddl;
pub mod ddl_manager;
pub mod distributed_time_constants;