diff --git a/Cargo.lock b/Cargo.lock
index c4ba4cbf1e..e5a5b385a8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4072,7 +4072,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6d4131f57ece9a8d250a289b3af1567eab768c86#6d4131f57ece9a8d250a289b3af1567eab768c86"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ffc2bdfabea578b1d264a11b741df12395e89e87#ffc2bdfabea578b1d264a11b741df12395e89e87"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 6a8a500fe1..07e3fb6335 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
 derive_builder = "0.12"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6d4131f57ece9a8d250a289b3af1567eab768c86" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ffc2bdfabea578b1d264a11b741df12395e89e87" }
 humantime-serde = "1.1"
 itertools = "0.10"
 lazy_static = "1.4"
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index e0f4a4e860..3a284cef4a 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -17,11 +17,10 @@
 #![feature(try_blocks)]
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use api::v1::meta::{RegionStat, TableIdent, TableName};
+use api::v1::meta::RegionStat;
 use common_telemetry::{info, warn};
 use snafu::ResultExt;
 use table::engine::{EngineContext, TableEngineRef};
@@ -248,23 +247,13 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<RegionStat>) {
                 let stats = stats.into_iter().map(|stat| RegionStat {
                     region_id: stat.region_id,
-                    table_ident: Some(TableIdent {
-                        table_id,
-                        table_name: Some(TableName {
-                            catalog_name: catalog_name.clone(),
-                            schema_name: schema_name.clone(),
-                            table_name: table_name.clone(),
-                        }),
-                        engine: engine.clone(),
-                    }),
                     approximate_bytes: stat.disk_usage_bytes as i64,
-                    attrs: HashMap::from([("engine_name".to_owned(), engine.clone())]),
+                    engine: engine.clone(),
                     ..Default::default()
                 });
diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs
index dff500b0f1..fb7c6b151f 100644
--- a/src/catalog/src/remote/region_alive_keeper.rs
+++ b/src/catalog/src/remote/region_alive_keeper.rs
@@ -26,7 +26,7 @@ use common_meta::ident::TableIdent;
 use common_meta::RegionIdent;
 use common_telemetry::{debug, error, info, warn};
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::RegionNumber;
+use store_api::storage::{RegionId, RegionNumber};
 use table::engine::manager::TableEngineManagerRef;
 use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
 use table::metadata::TableId;
@@ -166,39 +166,35 @@ impl RegionAliveKeepers {
 #[async_trait]
 impl HeartbeatResponseHandler for RegionAliveKeepers {
     fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
-        !ctx.response.region_leases.is_empty()
+        ctx.response.region_lease.is_some()
     }
 
     async fn handle(
         &self,
         ctx: &mut HeartbeatResponseHandlerContext,
     ) -> common_meta::error::Result<HandleControl> {
-        let leases = ctx.response.region_leases.drain(..).collect::<Vec<_>>();
-        for lease in leases {
-            let table_ident: TableIdent = match lease
-                .table_ident
-                .context(InvalidProtoMsgSnafu {
-                    err_msg: "'table_ident' is missing in RegionLease",
-                })
-                .and_then(|x| x.try_into())
-            {
-                Ok(x) => x,
-                Err(e) => {
-                    error!(e; "");
-                    continue;
-                }
-            };
-
-            let table_id = table_ident.table_id;
+        let region_lease = ctx
+            .response
+            .region_lease
+            .as_ref()
+            .context(InvalidProtoMsgSnafu {
+                err_msg: "'region_lease' is missing in heartbeat response",
+            })?;
+        let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
+        let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
+        for raw_region_id in &region_lease.region_ids {
+            let region_id = RegionId::from_u64(*raw_region_id);
+            let table_id = region_id.table_id();
             let Some(keeper) = self.keepers.lock().await.get(&table_id).cloned() else {
                 // Alive keeper could be affected by lagging msg, just warn and ignore.
-                warn!("Alive keeper for table {table_ident} is not found!");
+                warn!("Alive keeper for table {table_id} is not found!");
                 continue;
             };
-            let start_instant = self.epoch + Duration::from_millis(lease.duration_since_epoch);
-            let deadline = start_instant + Duration::from_secs(lease.lease_seconds);
-            keeper.keep_lived(lease.regions, deadline).await;
+            // TODO(jeremy): refactor this, use region_id
+            keeper
+                .keep_lived(vec![region_id.region_number()], deadline)
+                .await;
         }
         Ok(HandleControl::Continue)
     }
@@ -602,12 +598,14 @@ mod test {
         let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _;
         let lease_seconds = 100;
         let response = HeartbeatResponse {
-            region_leases: vec![RegionLease {
-                table_ident: Some(table_ident.clone().into()),
-                regions: vec![1, 3], // Not extending region 2's lease time.
+            region_lease: Some(RegionLease {
+                region_ids: vec![
+                    RegionId::new(table_ident.table_id, 1).as_u64(),
+                    RegionId::new(table_ident.table_id, 3).as_u64(),
+                ], // Not extending region 2's lease time.
                 duration_since_epoch,
                 lease_seconds,
-            }],
+            }),
             ..Default::default()
         };
         let keep_alive_until = keepers.epoch
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index e6434e1e39..c10e02aab9 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use api::v1::meta::{HeartbeatRequest, NodeStat, Peer, RegionStat, TableIdent};
+use api::v1::meta::{HeartbeatRequest, Peer, RegionStat};
 use catalog::remote::region_alive_keeper::RegionAliveKeepers;
 use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
 use common_meta::heartbeat::handler::{
@@ -216,16 +216,13 @@ impl HeartbeatTask {
                     }
                 }
                 _ = &mut sleep => {
-                    let (region_num,region_stats) = Self::load_stats(&region_server_clone).await;
+                    // TODO(jeremy): refactor load_status
+                    let (_,region_stats) = Self::load_stats(&region_server_clone).await;
                     let req = HeartbeatRequest {
                         peer: Some(Peer {
                             id: node_id,
                             addr: addr.clone(),
                         }),
-                        node_stat: Some(NodeStat {
-                            region_num: region_num as _,
-                            ..Default::default()
-                        }),
                         region_stats,
                         duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
                         node_epoch,
@@ -268,14 +265,9 @@ impl HeartbeatTask {
         let region_stats = region_ids
             .into_iter()
             .map(|region_id| RegionStat {
-                // TODO: scratch more info
+                // TODO(ruihang): scratch more info
                 region_id: region_id.as_u64(),
-                table_ident: Some(TableIdent {
-                    table_id: region_id.table_id(),
-                    table_name: None,
-                    engine: "MitoEngine".to_string(),
-                }),
-
+                engine: "MitoEngine".to_string(),
                 ..Default::default()
             })
             .collect::<Vec<_>>();
diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs
index 9546827297..b75f74308c 100644
--- a/src/meta-srv/src/handler.rs
+++ b/src/meta-srv/src/handler.rs
@@ -81,7 +81,7 @@ pub struct HeartbeatAccumulator {
     pub header: Option<ResponseHeader>,
     pub instructions: Vec<Instruction>,
     pub stat: Option<Stat>,
-    pub region_leases: Vec<RegionLease>,
+    pub region_lease: Option<RegionLease>,
 }
 
 impl HeartbeatAccumulator {
@@ -256,7 +256,7 @@ impl HeartbeatHandlerGroup {
         let header = std::mem::take(&mut acc.header);
         let res = HeartbeatResponse {
             header,
-            region_leases: acc.region_leases,
+            region_lease: acc.region_lease,
             ..Default::default()
         };
         Ok(res)
diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index 2939d8790f..8676113fbc 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -81,18 +81,19 @@ impl HeartbeatHandler for RegionFailureHandler {
             region_idents: stat
                 .region_stats
                 .iter()
-                .map(|x| RegionIdent {
-                    cluster_id: stat.cluster_id,
-                    datanode_id: stat.id,
-                    table_ident: TableIdent {
-                        catalog: x.table_ident.catalog.clone(),
-                        schema: x.table_ident.schema.clone(),
-                        table: x.table_ident.table.clone(),
-                        table_id: RegionId::from(x.id).table_id(),
-                        // TODO(#1583): Use the actual table engine.
-                        engine: MITO_ENGINE.to_string(),
-                    },
-                    region_number: x.id as u32,
+                .map(|x| {
+                    let region_id = RegionId::from(x.id);
+                    RegionIdent {
+                        cluster_id: stat.cluster_id,
+                        datanode_id: stat.id,
+                        table_ident: TableIdent {
+                            table_id: region_id.table_id(),
+                            // TODO(#1583): Use the actual table engine.
+                            engine: MITO_ENGINE.to_string(),
+                            ..Default::default()
+                        },
+                        region_number: region_id.region_number(),
+                    }
                 })
                 .collect(),
             heartbeat_time: stat.timestamp_millis,
@@ -128,13 +129,6 @@ mod tests {
     fn new_region_stat(region_id: u64) -> RegionStat {
         RegionStat {
             id: region_id,
-            table_ident: TableIdent {
-                catalog: "a".to_string(),
-                schema: "b".to_string(),
-                table: "c".to_string(),
-                table_id: 0,
-                engine: "d".to_string(),
-            },
             rcus: 0,
             wcus: 0,
             approximate_bytes: 0,
diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs
index 7464bc8639..16db9329de 100644
--- a/src/meta-srv/src/handler/node_stat.rs
+++ b/src/meta-srv/src/handler/node_stat.rs
@@ -13,10 +13,8 @@
 // limitations under the License.
 
 use api::v1::meta::HeartbeatRequest;
-use common_meta::ident::TableIdent;
 use common_time::util as time_util;
 use serde::{Deserialize, Serialize};
-use snafu::OptionExt;
 
 use crate::error::{Error, InvalidHeartbeatRequestSnafu};
 use crate::keys::StatKey;
@@ -31,10 +29,8 @@ pub struct Stat {
     pub rcus: i64,
     /// The write capacity units during this period
    pub wcus: i64,
-    /// How many tables on this node
-    pub table_num: i64,
    /// How many regions on this node
-    pub region_num: Option<u64>,
+    pub region_num: u64,
     pub region_stats: Vec<RegionStat>,
     // The node epoch is used to check whether the node has restarted or redeployed.
     pub node_epoch: u64,
@@ -43,7 +39,6 @@
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct RegionStat {
     pub id: u64,
-    pub table_ident: TableIdent,
     /// The read capacity units during this period
     pub rcus: i64,
     /// The write capacity units during this period
@@ -70,19 +65,13 @@ impl TryFrom<HeartbeatRequest> for Stat {
         let HeartbeatRequest {
             header,
             peer,
-            node_stat,
             region_stats,
             node_epoch,
             ..
         } = value;
-        match (header, peer, node_stat) {
-            (Some(header), Some(peer), Some(node_stat)) => {
-                let region_num = if node_stat.region_num >= 0 {
-                    Some(node_stat.region_num as u64)
-                } else {
-                    None
-                };
+        match (header, peer) {
+            (Some(header), Some(peer)) => {
                 let region_stats = region_stats
                     .into_iter()
                     .map(RegionStat::try_from)
                     .collect::<Result<Vec<_>, _>>()?;
@@ -93,10 +82,9 @@
                     cluster_id: header.cluster_id,
                     id: peer.id,
                     addr: peer.addr,
-                    rcus: node_stat.rcus,
-                    wcus: node_stat.wcus,
-                    table_num: node_stat.table_num,
-                    region_num,
+                    rcus: region_stats.iter().map(|s| s.rcus).sum(),
+                    wcus: region_stats.iter().map(|s| s.wcus).sum(),
+                    region_num: region_stats.len() as u64,
                     region_stats,
                     node_epoch,
                 })
@@ -113,20 +101,8 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
     type Error = Error;
 
     fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
-        let table_ident = value.table_ident.context(InvalidHeartbeatRequestSnafu {
-            err_msg: "missing table_ident",
-        })?;
-        let table_ident_result = TableIdent::try_from(table_ident);
-        let Ok(table_ident) = table_ident_result else {
-            return InvalidHeartbeatRequestSnafu {
-                err_msg: format!("invalid table_ident: {:?}", table_ident_result.err()),
-            }
-            .fail();
-        };
-
         Ok(Self {
             id: value.region_id,
-            table_ident,
             rcus: value.rcus,
             wcus: value.wcus,
             approximate_bytes: value.approximate_bytes,
@@ -144,7 +120,7 @@ mod tests {
         let stat = Stat {
             cluster_id: 3,
             id: 101,
-            region_num: Some(10),
+            region_num: 10,
             ..Default::default()
         };
diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs
index e22bcec34a..93373218b4 100644
--- a/src/meta-srv/src/handler/persist_stats_handler.rs
+++ b/src/meta-srv/src/handler/persist_stats_handler.rs
@@ -198,7 +198,7 @@ mod tests {
         let val: StatValue = kv.value.try_into().unwrap();
         // first new stat must be set in kv store immediately
         assert_eq!(1, val.stats.len());
-        assert_eq!(Some(1), val.stats[0].region_num);
+        assert_eq!(1, val.stats[0].region_num);
 
         handle_request_many_times(ctx.clone(), &handler, 10).await;
@@ -221,7 +221,7 @@ mod tests {
             stat: Some(Stat {
                 cluster_id: 3,
                 id: 101,
-                region_num: Some(i as _),
+                region_num: i as _,
                 ..Default::default()
             }),
             ..Default::default()
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index 058e6a760f..1177c60416 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -49,36 +49,37 @@ impl HeartbeatHandler for RegionLeaseHandler {
         let mut table_region_leases = HashMap::new();
         stat.region_stats.iter().for_each(|region_stat| {
-            let table_ident = region_stat.table_ident.clone();
+            let region_id = RegionId::from(region_stat.id);
             table_region_leases
-                .entry(table_ident)
+                .entry(region_id.table_id())
                 .or_insert_with(Vec::new)
-                .push(RegionId::from(region_stat.id).region_number());
+                .push(region_id.region_number());
         });
 
         let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
-        for (table_ident, region_numbers) in table_region_leases.iter_mut() {
+        for (table_id, region_numbers) in table_region_leases.iter_mut() {
+            // TODO(jeremy): refactor this, use region_id
             inactive_node_manager
-                .retain_active_regions(
-                    stat.cluster_id,
-                    stat.id,
-                    table_ident.table_id,
-                    region_numbers,
-                )
+                .retain_active_regions(stat.cluster_id, stat.id, *table_id, region_numbers)
                 .await?;
         }
 
-        acc.region_leases = table_region_leases
+        let region_ids = table_region_leases
             .into_iter()
-            .filter(|(_, regions)| !regions.is_empty()) // filter out empty region_numbers
-            .map(|(table_ident, regions)| RegionLease {
-                table_ident: Some(table_ident.into()),
-                regions,
-                duration_since_epoch: req.duration_since_epoch,
-                lease_seconds: REGION_LEASE_SECONDS,
+            .filter(|(_, region_nums)| !region_nums.is_empty())
+            .flat_map(|(table_id, region_nums)| {
+                region_nums
+                    .into_iter()
+                    .map(move |region_num| RegionId::new(table_id, region_num).as_u64())
             })
             .collect();
 
+        acc.region_lease = Some(RegionLease {
+            region_ids,
+            duration_since_epoch: req.duration_since_epoch,
+            lease_seconds: REGION_LEASE_SECONDS,
+        });
+
         Ok(())
     }
 }
@@ -87,7 +88,6 @@
 mod test {
     use std::sync::Arc;
 
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_meta::ident::TableIdent;
     use common_meta::key::TableMetadataManager;
     use common_meta::RegionIdent;
@@ -119,14 +119,6 @@ mod test {
         )
         .await;
-        let table_ident = TableIdent {
-            catalog: DEFAULT_CATALOG_NAME.to_string(),
-            schema: DEFAULT_SCHEMA_NAME.to_string(),
-            table: table_name.to_string(),
-            table_id,
-            engine: "mito".to_string(),
-        };
-
         let req = HeartbeatRequest {
             duration_since_epoch: 1234,
             ..Default::default()
         };
@@ -141,13 +133,6 @@
             let region_id = RegionId::new(table_id, region_number);
             RegionStat {
                 id: region_id.as_u64(),
-                table_ident: TableIdent {
-                    catalog: DEFAULT_CATALOG_NAME.to_string(),
-                    schema: DEFAULT_SCHEMA_NAME.to_string(),
-                    table: table_name.to_string(),
-                    table_id: 1,
-                    engine: "mito".to_string(),
-                },
                 ..Default::default()
             }
         };
@@ -186,10 +171,9 @@
         RegionLeaseHandler.handle(&req, ctx, acc).await.unwrap();
 
-        assert_eq!(acc.region_leases.len(), 1);
-        let lease = acc.region_leases.remove(0);
-        assert_eq!(lease.table_ident.unwrap(), table_ident.into());
-        assert_eq!(lease.regions, vec![2]);
+        assert!(acc.region_lease.is_some());
+        let lease = acc.region_lease.as_ref().unwrap();
+        assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]);
         assert_eq!(lease.duration_since_epoch, 1234);
         assert_eq!(lease.lease_seconds, REGION_LEASE_SECONDS);
     }
diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs
index a64763cbfd..d01b704b9c 100644
--- a/src/meta-srv/src/keys.rs
+++ b/src/meta-srv/src/keys.rs
@@ -204,13 +204,7 @@ pub struct StatValue {
 impl StatValue {
     /// Get the latest number of regions.
     pub fn region_num(&self) -> Option<u64> {
-        for stat in self.stats.iter().rev() {
-            match stat.region_num {
-                Some(region_num) => return Some(region_num),
-                None => continue,
-            }
-        }
-        None
+        self.stats.last().map(|x| x.region_num)
     }
 }
@@ -297,7 +291,7 @@ mod tests {
         let stat = Stat {
             cluster_id: 0,
             id: 101,
-            region_num: Some(100),
+            region_num: 100,
             ..Default::default()
         };
@@ -312,7 +306,7 @@ mod tests {
         let stat = stats.get(0).unwrap();
         assert_eq!(0, stat.cluster_id);
         assert_eq!(101, stat.id);
-        assert_eq!(Some(100), stat.region_num);
+        assert_eq!(100, stat.region_num);
     }
 
     #[test]
@@ -349,25 +343,25 @@ mod tests {
         let wrong = StatValue {
             stats: vec![Stat {
-                region_num: None,
+                region_num: 0,
                 ..Default::default()
             }],
         };
 
         let right = wrong.region_num();
-        assert!(right.is_none());
+        assert_eq!(Some(0), right);
 
         let stat_val = StatValue {
             stats: vec![
                 Stat {
-                    region_num: Some(1),
+                    region_num: 1,
                     ..Default::default()
                 },
                 Stat {
-                    region_num: None,
+                    region_num: 0,
                     ..Default::default()
                 },
                 Stat {
-                    region_num: Some(2),
+                    region_num: 2,
                     ..Default::default()
                 },
             ],
diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs
index 4045d5a898..72306bd9ed 100644
--- a/src/meta-srv/src/selector/load_based.rs
+++ b/src/meta-srv/src/selector/load_based.rs
@@ -13,12 +13,10 @@
 // limitations under the License.
 
 use api::v1::meta::Peer;
-use common_meta::ident::TableIdent;
 use common_telemetry::warn;
 
 use crate::error::Result;
-use crate::handler::node_stat::RegionStat;
-use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue};
+use crate::keys::{LeaseKey, LeaseValue, StatKey};
 use crate::lease;
 use crate::metasrv::SelectorContext;
 use crate::selector::{Namespace, Selector};
@@ -45,17 +43,6 @@ impl Selector for LoadBasedSelector {
         let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
             .into_iter()
-            // The regions of a table need to be distributed on different datanode.
-            .filter(|(lease_k, _)| {
-                if let Some(stat_val) = stat_kvs.get(&lease_k.into()) {
-                    if let (Some(catalog), Some(schema), Some(table)) =
-                        (&ctx.catalog, &ctx.schema, &ctx.table)
-                    {
-                        return contains_table(stat_val, catalog, schema, table) != Some(true);
-                    }
-                }
-                true
-            })
             .map(|(lease_k, lease_v)| {
                 let stat_key: StatKey = (&lease_k).into();
@@ -86,160 +73,3 @@
             .collect())
     }
 }
-
-// Determine whether there is the table in datanode according to the heartbeats.
-//
-// Result:
-// None indicates no heartbeats in stat_val;
-// Some(true) indicates table exists in the datanode;
-// Some(false) indicates that table not exists in datanode.
-fn contains_table(
-    stat_val: &StatValue,
-    catalog_name: &str,
-    schema_name: &str,
-    table_name: &str,
-) -> Option<bool> {
-    let may_latest = stat_val.stats.last();
-
-    if let Some(latest) = may_latest {
-        for RegionStat {
-            table_ident:
-                TableIdent {
-                    catalog,
-                    schema,
-                    table,
-                    ..
-                },
-            ..
-        } in latest.region_stats.iter()
-        {
-            if catalog_name == catalog && schema_name == schema && table_name == table {
-                return Some(true);
-            }
-        }
-    } else {
-        return None;
-    }
-
-    Some(false)
-}
-
-#[cfg(test)]
-mod tests {
-    use common_meta::ident::TableIdent;
-
-    use crate::handler::node_stat::{RegionStat, Stat};
-    use crate::keys::StatValue;
-    use crate::selector::load_based::contains_table;
-
-    #[test]
-    fn test_contains_table_from_stat_val() {
-        let empty = StatValue { stats: vec![] };
-        assert!(contains_table(&empty, "greptime_4", "public_4", "demo_5").is_none());
-
-        let stat_val = StatValue {
-            stats: vec![
-                Stat {
-                    region_stats: vec![
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_1".to_string(),
-                                schema: "public_1".to_string(),
-                                table: "demo_1".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_2".to_string(),
-                                schema: "public_2".to_string(),
-                                table: "demo_2".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_3".to_string(),
-                                schema: "public_3".to_string(),
-                                table: "demo_3".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                    ],
-                    ..Default::default()
-                },
-                Stat {
-                    region_stats: vec![
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_1".to_string(),
-                                schema: "public_1".to_string(),
-                                table: "demo_1".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_2".to_string(),
-                                schema: "public_2".to_string(),
-                                table: "demo_2".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_3".to_string(),
-                                schema: "public_3".to_string(),
-                                table: "demo_3".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                    ],
-                    ..Default::default()
-                },
-                Stat {
-                    region_stats: vec![
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_1".to_string(),
-                                schema: "public_1".to_string(),
-                                table: "demo_1".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_2".to_string(),
-                                schema: "public_2".to_string(),
-                                table: "demo_2".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                        RegionStat {
-                            table_ident: TableIdent {
-                                catalog: "greptime_4".to_string(),
-                                schema: "public_4".to_string(),
-                                table: "demo_4".to_string(),
-                                ..Default::default()
-                            },
-                            ..Default::default()
-                        },
-                    ],
-                    ..Default::default()
-                },
-            ],
-        };
-        assert!(contains_table(&stat_val, "greptime_1", "public_1", "demo_1").unwrap());
-        assert!(contains_table(&stat_val, "greptime_2", "public_2", "demo_2").unwrap());
-        assert!(!contains_table(&stat_val, "greptime_3", "public_3", "demo_3").unwrap());
-        assert!(contains_table(&stat_val, "greptime_4", "public_4", "demo_4").unwrap());
-    }
-}
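Note on the refactor above: every handler now identifies a region solely by the u64 form of store_api::storage::RegionId (RegionId::new / as_u64 on the meta server side, RegionId::from_u64 / table_id / region_number on the datanode side), instead of carrying a full TableIdent per lease. The sketch below is illustrative only; it assumes the conventional packing of a 32-bit table id in the high bits and a 32-bit region number in the low bits, and PackedRegionId is a stand-in name, not the real RegionId type.

    // Illustrative stand-in for the RegionId round-trip the lease handler and
    // the region alive keeper rely on. Assumption: table id in the high 32 bits,
    // region number in the low 32 bits of a single u64.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct PackedRegionId(u64);

    impl PackedRegionId {
        fn new(table_id: u32, region_number: u32) -> Self {
            Self(((table_id as u64) << 32) | region_number as u64)
        }
        fn from_u64(raw: u64) -> Self {
            Self(raw)
        }
        fn as_u64(&self) -> u64 {
            self.0
        }
        fn table_id(&self) -> u32 {
            (self.0 >> 32) as u32
        }
        fn region_number(&self) -> u32 {
            self.0 as u32
        }
    }

    fn main() {
        // The meta server packs (table_id, region_number) into region_ids of a
        // RegionLease; the datanode recovers both halves from each raw u64.
        let raw = PackedRegionId::new(1024, 2).as_u64();
        let id = PackedRegionId::from_u64(raw);
        assert_eq!(id.table_id(), 1024);
        assert_eq!(id.region_number(), 2);
    }

If the real RegionId ever used a different bit layout, only the shift and mask would change; the round-trip property the handlers depend on stays the same.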