feat: Report disk usage stats to metasrv thru heartbeat (#1167)

* feat: Report disk usage stats to metasrv thru heartbeat

Signed-off-by: Zheming Li <nkdudu@126.com>

* Update src/catalog/src/error.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* Update src/catalog/src/lib.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* Update src/mito/src/table.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

---------

Signed-off-by: Zheming Li <nkdudu@126.com>
Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>
This commit is contained in:
Zheming Li
2023-03-15 11:11:32 +08:00
committed by GitHub
parent cbf64e65b9
commit f712f978cf
5 changed files with 75 additions and 14 deletions

View File

@@ -204,6 +204,21 @@ pub enum Error {
#[snafu(display("Illegal access to catalog: {} and schema: {}", catalog, schema))]
QueryAccessDenied { catalog: String, schema: String },
#[snafu(display(
"Failed to get region stats, catalog: {}, schema: {}, table: {}, source: {}",
catalog,
schema,
table,
source
))]
RegionStats {
catalog: String,
schema: String,
table: String,
#[snafu(backtrace)]
source: table::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -238,7 +253,8 @@ impl ErrorExt for Error {
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. } => source.status_code(),
| Error::DeregisterTable { source, .. }
| Error::RegionStats { source, .. } => source.status_code(),
Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),

View File

@@ -18,6 +18,7 @@ use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::v1::meta::{RegionStat, TableName};
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
@@ -225,10 +226,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
Ok(())
}
/// The number of regions in the datanode node.
pub async fn region_number(catalog_manager: &CatalogManagerRef) -> Result<u64> {
let mut region_number: u64 = 0;
/// The stat of regions in the datanode node.
/// The number of regions can be got from len of vec.
pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<RegionStat>> {
let mut region_stats = Vec::new();
for catalog_name in catalog_manager.catalog_names()? {
let catalog =
catalog_manager
@@ -254,10 +255,28 @@ pub async fn region_number(catalog_manager: &CatalogManagerRef) -> Result<u64> {
table_info: &table_name,
})?;
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;
region_stats.extend(
table
.region_stats()
.context(error::RegionStatsSnafu {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
})?
.into_iter()
.map(|stat| RegionStat {
region_id: stat.region_id,
table_name: Some(TableName {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
}),
approximate_bytes: stat.disk_usage_bytes as i64,
..Default::default()
}),
);
}
}
}
Ok(region_number)
Ok(region_stats)
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
use catalog::{region_number, CatalogManagerRef};
use catalog::{region_stats, CatalogManagerRef};
use common_telemetry::{error, info, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
@@ -106,11 +106,11 @@ impl HeartbeatTask {
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
common_runtime::spawn_bg(async move {
while running.load(Ordering::Acquire) {
let region_num = match region_number(&catalog_manager_clone).await {
Ok(region_num) => region_num as i64,
let (region_num, region_stats) = match region_stats(&catalog_manager_clone).await {
Ok(region_stats) => (region_stats.len() as i64, region_stats),
Err(e) => {
error!("failed to get region number, err: {e:?}");
-1
error!("failed to get region status, err: {e:?}");
(-1, vec![])
}
};
@@ -123,6 +123,7 @@ impl HeartbeatTask {
region_num,
..Default::default()
}),
region_stats,
..Default::default()
};

View File

@@ -47,7 +47,7 @@ use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use table::table::scan::SimpleTableScan;
use table::table::{AlterContext, Table};
use table::table::{AlterContext, RegionStat, Table};
use tokio::sync::Mutex;
use crate::error;
@@ -350,6 +350,17 @@ impl<R: Region> Table for MitoTable<R> {
Ok(())
}
fn region_stats(&self) -> TableResult<Vec<RegionStat>> {
Ok(self
.regions
.values()
.map(|region| RegionStat {
region_id: region.id(),
disk_usage_bytes: region.disk_usage_bytes(),
})
.collect())
}
}
struct ChunkStream {

View File

@@ -103,6 +103,14 @@ pub trait Table: Send + Sync {
async fn close(&self) -> Result<()> {
Ok(())
}
/// Get region stats in this table.
fn region_stats(&self) -> Result<Vec<RegionStat>> {
UnsupportedSnafu {
operation: "REGION_STATS",
}
.fail()?
}
}
pub type TableRef = Arc<dyn Table>;
@@ -113,3 +121,9 @@ pub trait TableIdProvider {
}
pub type TableIdProviderRef = Arc<dyn TableIdProvider + Send + Sync>;
#[derive(Default, Debug)]
pub struct RegionStat {
pub region_id: u64,
pub disk_usage_bytes: u64,
}