From f712f978cfccb7ef0b95486bc49a2155d1a9d502 Mon Sep 17 00:00:00 2001 From: Zheming Li Date: Wed, 15 Mar 2023 11:11:32 +0800 Subject: [PATCH] feat: Report disk usage stats to metasrv thru heartbeat (#1167) * feat: Report disk usage stats to metasrv thru heartbeat Signed-off-by: Zheming Li * Update src/catalog/src/error.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * Update src/catalog/src/lib.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * Update src/mito/src/table.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> --------- Signed-off-by: Zheming Li Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> --- src/catalog/src/error.rs | 18 +++++++++++++++++- src/catalog/src/lib.rs | 33 ++++++++++++++++++++++++++------- src/datanode/src/heartbeat.rs | 11 ++++++----- src/mito/src/table.rs | 13 ++++++++++++- src/table/src/table.rs | 14 ++++++++++++++ 5 files changed, 75 insertions(+), 14 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index f527aed4f9..137286207d 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -204,6 +204,21 @@ pub enum Error { #[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))] QueryAccessDenied { catalog: String, schema: String }, + + #[snafu(display( + "Failed to get region stats, catalog: {}, schema: {}, table: {}, source: {}", + catalog, + schema, + table, + source + ))] + RegionStats { + catalog: String, + schema: String, + table: String, + #[snafu(backtrace)] + source: table::error::Error, + }, } pub type Result = std::result::Result; @@ -238,7 +253,8 @@ impl ErrorExt for Error { | Error::InsertCatalogRecord { source, .. } | Error::OpenTable { source, .. } | Error::CreateTable { source, .. } - | Error::DeregisterTable { source, .. } => source.status_code(), + | Error::DeregisterTable { source, .. } + | Error::RegionStats { source, .. } => source.status_code(), Error::MetaSrv { source, .. } => source.status_code(), Error::SystemCatalogTableScan { source } => source.status_code(), diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index faad0000a8..3b0e0bd8b7 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -18,6 +18,7 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use api::v1::meta::{RegionStat, TableName}; use common_telemetry::info; use snafu::{OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; @@ -225,10 +226,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( Ok(()) } -/// The number of regions in the datanode node. -pub async fn region_number(catalog_manager: &CatalogManagerRef) -> Result { - let mut region_number: u64 = 0; - +/// The stat of regions in the datanode node. +/// The number of regions can be got from len of vec. +pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result> { + let mut region_stats = Vec::new(); for catalog_name in catalog_manager.catalog_names()? { let catalog = catalog_manager @@ -254,10 +255,28 @@ pub async fn region_number(catalog_manager: &CatalogManagerRef) -> Result { table_info: &table_name, })?; - let region_numbers = &table.table_info().meta.region_numbers; - region_number += region_numbers.len() as u64; + region_stats.extend( + table + .region_stats() + .context(error::RegionStatsSnafu { + catalog: &catalog_name, + schema: &schema_name, + table: &table_name, + })? + .into_iter() + .map(|stat| RegionStat { + region_id: stat.region_id, + table_name: Some(TableName { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + }), + approximate_bytes: stat.disk_usage_bytes as i64, + ..Default::default() + }), + ); } } } - Ok(region_number) + Ok(region_stats) } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 842cbe6eed..539af7f819 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer}; -use catalog::{region_number, CatalogManagerRef}; +use catalog::{region_stats, CatalogManagerRef}; use common_telemetry::{error, info, warn}; use meta_client::client::{HeartbeatSender, MetaClient}; use snafu::ResultExt; @@ -106,11 +106,11 @@ impl HeartbeatTask { let mut tx = Self::create_streams(&meta_client, running.clone()).await?; common_runtime::spawn_bg(async move { while running.load(Ordering::Acquire) { - let region_num = match region_number(&catalog_manager_clone).await { - Ok(region_num) => region_num as i64, + let (region_num, region_stats) = match region_stats(&catalog_manager_clone).await { + Ok(region_stats) => (region_stats.len() as i64, region_stats), Err(e) => { - error!("failed to get region number, err: {e:?}"); - -1 + error!("failed to get region status, err: {e:?}"); + (-1, vec![]) } }; @@ -123,6 +123,7 @@ impl HeartbeatTask { region_num, ..Default::default() }), + region_stats, ..Default::default() }; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 8bf9a568ec..2e837a04db 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -47,7 +47,7 @@ use table::requests::{ AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest, }; use table::table::scan::SimpleTableScan; -use table::table::{AlterContext, Table}; +use table::table::{AlterContext, RegionStat, Table}; use tokio::sync::Mutex; use crate::error; @@ -350,6 +350,17 @@ impl Table for MitoTable { Ok(()) } + + fn region_stats(&self) -> TableResult> { + Ok(self + .regions + .values() + .map(|region| RegionStat { + region_id: region.id(), + disk_usage_bytes: region.disk_usage_bytes(), + }) + .collect()) + } } struct ChunkStream { diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 223f132dda..844fd8af18 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -103,6 +103,14 @@ pub trait Table: Send + Sync { async fn close(&self) -> Result<()> { Ok(()) } + + /// Get region stats in this table. + fn region_stats(&self) -> Result> { + UnsupportedSnafu { + operation: "REGION_STATS", + } + .fail()? + } } pub type TableRef = Arc; @@ -113,3 +121,9 @@ pub trait TableIdProvider { } pub type TableIdProviderRef = Arc; + +#[derive(Default, Debug)] +pub struct RegionStat { + pub region_id: u64, + pub disk_usage_bytes: u64, +}