fix: can not create table in the local distributed environment (#1207)

fix: create table in local distribute env
This commit is contained in:
fys
2023-03-20 20:12:35 +08:00
committed by GitHub
parent e19c8fa2b6
commit 0596d20a3b
2 changed files with 23 additions and 18 deletions

View File

@@ -19,7 +19,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::v1::meta::{RegionStat, TableName};
use common_telemetry::info;
use common_telemetry::{info, warn};
use snafu::{OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
@@ -228,8 +228,10 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
/// The stat of regions in the datanode node.
/// The number of regions can be got from len of vec.
pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<RegionStat>> {
pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> Result<(u64, Vec<RegionStat>)> {
let mut region_number: u64 = 0;
let mut region_stats = Vec::new();
for catalog_name in catalog_manager.catalog_names()? {
let catalog =
catalog_manager
@@ -255,16 +257,12 @@ pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<Reg
table_info: &table_name,
})?;
region_stats.extend(
table
.region_stats()
.context(error::RegionStatsSnafu {
catalog: &catalog_name,
schema: &schema_name,
table: &table_name,
})?
.into_iter()
.map(|stat| RegionStat {
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;
match table.region_stats() {
Ok(stats) => {
let stats = stats.into_iter().map(|stat| RegionStat {
region_id: stat.region_id,
table_name: Some(TableName {
catalog_name: catalog_name.clone(),
@@ -273,10 +271,17 @@ pub async fn region_stats(catalog_manager: &CatalogManagerRef) -> Result<Vec<Reg
}),
approximate_bytes: stat.disk_usage_bytes as i64,
..Default::default()
}),
);
});
region_stats.extend(stats);
}
Err(e) => {
warn!("Failed to get region status, err: {:?}", e);
}
};
}
}
}
Ok(region_stats)
Ok((region_number, region_stats))
}

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, NodeStat, Peer};
use catalog::{region_stats, CatalogManagerRef};
use catalog::{datanode_stat, CatalogManagerRef};
use common_telemetry::{error, info, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::ResultExt;
@@ -106,8 +106,8 @@ impl HeartbeatTask {
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
common_runtime::spawn_bg(async move {
while running.load(Ordering::Acquire) {
let (region_num, region_stats) = match region_stats(&catalog_manager_clone).await {
Ok(region_stats) => (region_stats.len() as i64, region_stats),
let (region_num, region_stats) = match datanode_stat(&catalog_manager_clone).await {
Ok(datanode_stat) => (datanode_stat.0 as i64, datanode_stat.1),
Err(e) => {
error!("failed to get region status, err: {e:?}");
(-1, vec![])