From 6f5639fccd175816e0e17cc537f2d6d749690b85 Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Tue, 17 Jan 2023 10:25:00 +0800 Subject: [PATCH] feat: add load_based selector in meta (#874) * fix: wrong error info * add derive hash for StatKey * add a attrs field in Context * add load_based selector * add license * make Nodestat module public * add meta startup config item about selector * cr: remove attrs, add concrete type in context * cr: change region_number type to Option * cr: add comment in example.toml * cr --- Cargo.lock | 1 + config/metasrv.example.toml | 2 + src/cmd/src/error.rs | 8 + src/cmd/src/metasrv.rs | 16 +- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/bootstrap.rs | 3 +- src/meta-srv/src/error.rs | 32 +++ src/meta-srv/src/handler.rs | 2 +- src/meta-srv/src/handler/node_stat.rs | 43 ++-- .../src/handler/persist_stats_handler.rs | 7 +- .../src/handler/response_header_handler.rs | 3 + src/meta-srv/src/keys.rs | 62 +++++- src/meta-srv/src/metasrv.rs | 10 +- src/meta-srv/src/selector.rs | 71 ++++++ src/meta-srv/src/selector/load_based.rs | 210 ++++++++++++++++++ src/meta-srv/src/service/router.rs | 24 +- 16 files changed, 462 insertions(+), 33 deletions(-) create mode 100644 src/meta-srv/src/selector/load_based.rs diff --git a/Cargo.lock b/Cargo.lock index b0d8d8d832..96b40dbb4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3818,6 +3818,7 @@ dependencies = [ name = "meta-srv" version = "0.1.0" dependencies = [ + "anymap", "api", "async-trait", "catalog", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 5531e6e4a6..63b5dbc7f4 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -2,3 +2,5 @@ bind_addr = '127.0.0.1:3002' server_addr = '127.0.0.1:3002' store_addr = '127.0.0.1:2379' datanode_lease_secs = 15 +# selector: 'LeaseBased', 'LoadBased' +selector = 'LeaseBased' diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 1b996f1ff9..fd2eb7d1ae 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -61,6 +61,13 @@ pub enum Error { #[snafu(backtrace)] source: servers::auth::Error, }, + + #[snafu(display("Unsupported selector type, {} source: {}", selector_type, source))] + UnsupportedSelectorType { + selector_type: String, + #[snafu(backtrace)] + source: meta_srv::error::Error, + }, } pub type Result = std::result::Result; @@ -71,6 +78,7 @@ impl ErrorExt for Error { Error::StartDatanode { source } => source.status_code(), Error::StartFrontend { source } => source.status_code(), Error::StartMetaServer { source } => source.status_code(), + Error::UnsupportedSelectorType { source, .. } => source.status_code(), Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => { StatusCode::InvalidArguments } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 22d4729f90..a7c726e2fb 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -13,7 +13,7 @@ // limitations under the License. use clap::Parser; -use common_telemetry::logging; +use common_telemetry::{info, logging}; use meta_srv::bootstrap; use meta_srv::metasrv::MetaSrvOptions; use snafu::ResultExt; @@ -56,6 +56,8 @@ struct StartCommand { store_addr: Option, #[clap(short, long)] config_file: Option, + #[clap(short, long)] + selector: Option, } impl StartCommand { @@ -91,6 +93,12 @@ impl TryFrom for MetaSrvOptions { if let Some(addr) = cmd.store_addr { opts.store_addr = addr; } + if let Some(selector_type) = &cmd.selector { + opts.selector = selector_type[..] + .try_into() + .context(error::UnsupportedSelectorTypeSnafu { selector_type })?; + info!("Using {} selector", selector_type); + } Ok(opts) } @@ -98,6 +106,8 @@ impl TryFrom for MetaSrvOptions { #[cfg(test)] mod tests { + use meta_srv::selector::SelectorType; + use super::*; #[test] @@ -107,11 +117,13 @@ mod tests { server_addr: Some("127.0.0.1:3002".to_string()), store_addr: Some("127.0.0.1:2380".to_string()), config_file: None, + selector: Some("LoadBased".to_string()), }; let options: MetaSrvOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("127.0.0.1:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); + assert_eq!(SelectorType::LoadBased, options.selector); } #[test] @@ -120,6 +132,7 @@ mod tests { bind_addr: None, server_addr: None, store_addr: None, + selector: None, config_file: Some(format!( "{}/../../config/metasrv.example.toml", std::env::current_dir().unwrap().as_path().to_str().unwrap() @@ -130,5 +143,6 @@ mod tests { assert_eq!("127.0.0.1:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2379".to_string(), options.store_addr); assert_eq!(15, options.datanode_lease_secs); + assert_eq!(SelectorType::LeaseBased, options.selector); } } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 003bec65f6..0ccdeba585 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true mock = [] [dependencies] +anymap = "1.0.0-beta.2" api = { path = "../api" } async-trait = "0.1" catalog = { path = "../catalog" } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index c03582d693..1ddb5944df 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -60,7 +60,8 @@ pub fn router(meta_srv: MetaSrv) -> Router { pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?; - let meta_srv = MetaSrv::new(opts, kv_store, None, Some(election), None).await; + let selector = opts.selector.clone().into(); + let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), Some(election), None).await; meta_srv.start().await; Ok(meta_srv) } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7037078184..23acdacc91 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -55,12 +55,33 @@ pub enum Error { #[snafu(display("Invalid datanode lease key: {}", key))] InvalidLeaseKey { key: String, backtrace: Backtrace }, + #[snafu(display("Invalid datanode stat key: {}", key))] + InvalidStatKey { key: String, backtrace: Backtrace }, + #[snafu(display("Failed to parse datanode lease key from utf8: {}", source))] LeaseKeyFromUtf8 { source: std::string::FromUtf8Error, backtrace: Backtrace, }, + #[snafu(display("Failed to parse datanode lease value from utf8: {}", source))] + LeaseValueFromUtf8 { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to parse datanode stat key from utf8: {}", source))] + StatKeyFromUtf8 { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to parse datanode stat value from utf8: {}", source))] + StatValueFromUtf8 { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + #[snafu(display("Failed to serialize to json: {}", input))] SerializeToJson { input: String, @@ -132,6 +153,12 @@ pub enum Error { key ))] MoveValue { key: String, backtrace: Backtrace }, + + #[snafu(display("Unsupported selector type, {}", selector_type))] + UnsupportedSelectorType { + selector_type: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -165,9 +192,14 @@ impl ErrorExt for Error { Error::EmptyKey { .. } | Error::EmptyTableName { .. } | Error::InvalidLeaseKey { .. } + | Error::InvalidStatKey { .. } | Error::ParseNum { .. } + | Error::UnsupportedSelectorType { .. } | Error::InvalidArguments { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } + | Error::LeaseValueFromUtf8 { .. } + | Error::StatKeyFromUtf8 { .. } + | Error::StatValueFromUtf8 { .. } | Error::UnexceptedSequenceValue { .. } | Error::TableRouteNotFound { .. } | Error::NextSequence { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 6c146e517a..62a3d26483 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -23,7 +23,7 @@ mod check_leader_handler; mod collect_stats_handler; mod instruction; mod keep_lease_handler; -pub(crate) mod node_stat; +pub mod node_stat; mod on_leader_start; mod persist_stats_handler; mod response_header_handler; diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 3c2ad1d943..1cdb49c1c8 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -33,7 +33,7 @@ pub struct Stat { /// How many tables on this node pub table_num: i64, /// How many regions on this node - pub region_num: i64, + pub region_num: Option, pub cpu_usage: f64, pub load: f64, /// Read disk IO on this node @@ -83,22 +83,29 @@ impl TryFrom for Stat { } = value; match (header, peer, node_stat) { - (Some(header), Some(peer), Some(node_stat)) => Ok(Self { - timestamp_millis: time_util::current_time_millis(), - cluster_id: header.cluster_id, - id: peer.id, - addr: peer.addr, - is_leader, - rcus: node_stat.rcus, - wcus: node_stat.wcus, - table_num: node_stat.table_num, - region_num: node_stat.region_num, - cpu_usage: node_stat.cpu_usage, - load: node_stat.load, - read_io_rate: node_stat.read_io_rate, - write_io_rate: node_stat.write_io_rate, - region_stats: region_stats.into_iter().map(RegionStat::from).collect(), - }), + (Some(header), Some(peer), Some(node_stat)) => { + let region_num = if node_stat.region_num >= 0 { + Some(node_stat.region_num as u64) + } else { + None + }; + Ok(Self { + timestamp_millis: time_util::current_time_millis(), + cluster_id: header.cluster_id, + id: peer.id, + addr: peer.addr, + is_leader, + rcus: node_stat.rcus, + wcus: node_stat.wcus, + table_num: node_stat.table_num, + region_num, + cpu_usage: node_stat.cpu_usage, + load: node_stat.load, + read_io_rate: node_stat.read_io_rate, + write_io_rate: node_stat.write_io_rate, + region_stats: region_stats.into_iter().map(RegionStat::from).collect(), + }) + } _ => Err(()), } } @@ -129,7 +136,7 @@ mod tests { let stat = Stat { cluster_id: 3, id: 101, - region_num: 10, + region_num: Some(10), ..Default::default() }; diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 146dc8be30..c1e36ba9dc 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -80,6 +80,9 @@ mod tests { kv_store, election: None, skip_all: Arc::new(AtomicBool::new(false)), + catalog: None, + schema: None, + table: None, }; let req = HeartbeatRequest::default(); @@ -87,7 +90,7 @@ mod tests { stats: vec![Stat { cluster_id: 3, id: 101, - region_num: 100, + region_num: Some(100), ..Default::default() }], ..Default::default() @@ -122,6 +125,6 @@ mod tests { let val: StatValue = kv.value.clone().try_into().unwrap(); assert_eq!(1, val.stats.len()); - assert_eq!(100, val.stats[0].region_num); + assert_eq!(Some(100), val.stats[0].region_num); } } diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index f4158a0e5d..dbbec2db59 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -62,6 +62,9 @@ mod tests { kv_store, election: None, skip_all: Arc::new(AtomicBool::new(false)), + catalog: None, + schema: None, + table: None, }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 2f3283fe9e..d4b0040b20 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -112,7 +112,7 @@ impl TryFrom> for LeaseValue { fn try_from(bytes: Vec) -> Result { String::from_utf8(bytes) - .context(error::LeaseKeyFromUtf8Snafu {}) + .context(error::LeaseValueFromUtf8Snafu {}) .map(|x| x.parse())? } } @@ -174,7 +174,7 @@ impl<'a> TableRouteKey<'a> { } } -#[derive(Eq, PartialEq, Debug)] +#[derive(Eq, PartialEq, Debug, Hash)] pub struct StatKey { pub cluster_id: u64, pub node_id: u64, @@ -194,7 +194,7 @@ impl FromStr for StatKey { .captures(key) .context(error::InvalidLeaseKeySnafu { key })?; - ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key }); + ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key }); let cluster_id = caps[1].to_string(); let node_id = caps[2].to_string(); @@ -217,7 +217,7 @@ impl TryFrom> for StatKey { fn try_from(bytes: Vec) -> Result { String::from_utf8(bytes) - .context(error::LeaseKeyFromUtf8Snafu {}) + .context(error::StatKeyFromUtf8Snafu {}) .map(|x| x.parse())? } } @@ -228,6 +228,19 @@ pub struct StatValue { pub stats: Vec, } +impl StatValue { + /// Get the region number from stat value. + pub fn region_num(&self) -> Option { + for stat in self.stats.iter() { + match stat.region_num { + Some(region_num) => return Some(region_num), + None => continue, + } + } + None + } +} + impl TryFrom for Vec { type Error = error::Error; @@ -253,7 +266,7 @@ impl TryFrom> for StatValue { fn try_from(value: Vec) -> Result { String::from_utf8(value) - .context(error::LeaseKeyFromUtf8Snafu {}) + .context(error::StatValueFromUtf8Snafu {}) .map(|x| x.parse())? } } @@ -282,7 +295,7 @@ mod tests { cluster_id: 0, id: 101, is_leader: false, - region_num: 100, + region_num: Some(100), ..Default::default() }; @@ -298,7 +311,7 @@ mod tests { assert_eq!(0, stat.cluster_id); assert_eq!(101, stat.id); assert!(!stat.is_leader); - assert_eq!(100, stat.region_num); + assert_eq!(Some(100), stat.region_num); } #[test] @@ -326,4 +339,39 @@ mod tests { assert_eq!(new_value, value); } + + #[test] + fn test_get_region_num_from_stat_val() { + let empty = StatValue { stats: vec![] }; + let region_num = empty.region_num(); + assert!(region_num.is_none()); + + let wrong = StatValue { + stats: vec![Stat { + region_num: None, + ..Default::default() + }], + }; + let right = wrong.region_num(); + assert!(right.is_none()); + + let stat_val = StatValue { + stats: vec![ + Stat { + region_num: Some(1), + ..Default::default() + }, + Stat { + region_num: None, + ..Default::default() + }, + Stat { + region_num: Some(2), + ..Default::default() + }, + ], + }; + let region_num = stat_val.region_num().unwrap(); + assert_eq!(1, region_num); + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 91cae7f8ee..4b05605ae3 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -25,7 +25,7 @@ use crate::handler::{ OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, }; use crate::selector::lease_based::LeaseBasedSelector; -use crate::selector::Selector; +use crate::selector::{Selector, SelectorType}; use crate::sequence::{Sequence, SequenceRef}; use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef}; use crate::service::store::memory::MemStore; @@ -39,6 +39,7 @@ pub struct MetaSrvOptions { pub server_addr: String, pub store_addr: String, pub datanode_lease_secs: i64, + pub selector: SelectorType, } impl Default for MetaSrvOptions { @@ -48,6 +49,7 @@ impl Default for MetaSrvOptions { server_addr: "127.0.0.1:3002".to_string(), store_addr: "127.0.0.1:2379".to_string(), datanode_lease_secs: 15, + selector: SelectorType::default(), } } } @@ -60,6 +62,9 @@ pub struct Context { pub kv_store: KvStoreRef, pub election: Option, pub skip_all: Arc, + pub catalog: Option, + pub schema: Option, + pub table: Option, } impl Context { @@ -219,6 +224,9 @@ impl MetaSrv { kv_store, election, skip_all, + catalog: None, + schema: None, + table: None, } } } diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 2404565146..065f8f95bf 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -13,8 +13,17 @@ // limitations under the License. pub mod lease_based; +pub mod load_based; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use self::lease_based::LeaseBasedSelector; +use self::load_based::LoadBasedSelector; +use crate::error; use crate::error::Result; +use crate::metasrv::SelectorRef; pub type Namespace = u64; @@ -25,3 +34,65 @@ pub trait Selector: Send + Sync { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result; } + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum SelectorType { + LoadBased, + LeaseBased, +} + +impl From for SelectorRef { + fn from(selector_type: SelectorType) -> Self { + match selector_type { + SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef, + SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, + } + } +} + +impl Default for SelectorType { + fn default() -> Self { + SelectorType::LeaseBased + } +} + +impl TryFrom<&str> for SelectorType { + type Error = error::Error; + + fn try_from(value: &str) -> Result { + match value { + "LoadBased" => Ok(SelectorType::LoadBased), + "LeaseBased" => Ok(SelectorType::LeaseBased), + other => error::UnsupportedSelectorTypeSnafu { + selector_type: other, + } + .fail(), + } + } +} + +#[cfg(test)] +mod tests { + use super::SelectorType; + use crate::error::Result; + + #[test] + fn test_default_selector_type() { + assert_eq!(SelectorType::LeaseBased, SelectorType::default()); + } + + #[test] + fn test_convert_str_to_selector_type() { + let leasebased = "LeaseBased"; + let selector_type = leasebased.try_into().unwrap(); + assert_eq!(SelectorType::LeaseBased, selector_type); + + let loadbased = "LoadBased"; + let selector_type = loadbased.try_into().unwrap(); + assert_eq!(SelectorType::LoadBased, selector_type); + + let unknow = "unknow"; + let selector_type: Result = unknow.try_into(); + assert!(selector_type.is_err()); + } +} diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs new file mode 100644 index 0000000000..15ec2754a7 --- /dev/null +++ b/src/meta-srv/src/selector/load_based.rs @@ -0,0 +1,210 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::meta::{Peer, RangeRequest}; +use common_time::util as time_util; + +use super::{Namespace, Selector}; +use crate::error::Result; +use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue, DN_STAT_PREFIX}; +use crate::metasrv::Context; +use crate::service::store::kv::KvStoreRef; +use crate::{lease, util}; + +pub struct LoadBasedSelector; + +#[async_trait::async_trait] +impl Selector for LoadBasedSelector { + type Context = Context; + type Output = Vec; + + async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { + // get alive datanodes + let lease_filter = |_: &LeaseKey, v: &LeaseValue| { + time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000 + }; + let lease_kvs: HashMap = + lease::alive_datanodes(ns, &ctx.kv_store, lease_filter) + .await? + .into_iter() + .collect(); + + // get stats of datanodes + let stat_kvs = all_stat_kvs(ns, &ctx.kv_store).await?; + + // filter out expired datanodes and nodes that cannot get region number + let mut tuples: Vec<_> = stat_kvs + .iter() + .filter_map(|(stat_key, stat_val)| { + match ( + lease_kvs.get(&to_lease_key(stat_key)), + stat_val.region_num(), + ) { + (Some(lease_val), Some(region_num)) => Some((stat_key, lease_val, region_num)), + _ => None, + } + }) + .collect(); + + // sort the datanodes according to the number of regions + tuples.sort_by(|a, b| a.2.cmp(&b.2)); + + Ok(tuples + .into_iter() + .map(|(stat_key, lease_val, _)| Peer { + id: stat_key.node_id, + addr: lease_val.node_addr.clone(), + }) + .collect()) + } +} + +// get all stat kvs from store +pub async fn all_stat_kvs( + cluster_id: u64, + kv_store: &KvStoreRef, +) -> Result> { + let key = stat_prefix(cluster_id); + let range_end = util::get_prefix_end_key(&key); + let req = RangeRequest { + key, + range_end, + ..Default::default() + }; + + let kvs = kv_store.range(req).await?.kvs; + + let mut stat_kvs = Vec::with_capacity(kvs.len()); + + for kv in kvs { + let key: StatKey = kv.key.try_into()?; + let value: StatValue = kv.value.try_into()?; + stat_kvs.push((key, value)); + } + + Ok(stat_kvs) +} + +fn to_lease_key(k: &StatKey) -> LeaseKey { + LeaseKey { + cluster_id: k.cluster_id, + node_id: k.node_id, + } +} + +fn stat_prefix(cluster_id: u64) -> Vec { + format!("{DN_STAT_PREFIX}-{cluster_id}").into_bytes() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::PutRequest; + + use super::{all_stat_kvs, to_lease_key}; + use crate::handler::node_stat::Stat; + use crate::keys::{StatKey, StatValue}; + use crate::selector::load_based::stat_prefix; + use crate::service::store::kv::{KvStore, KvStoreRef}; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_all_stat_kvs() { + let kv_store = Arc::new(MemStore::new()) as Arc; + let kvs = all_stat_kvs(0, &kv_store).await.unwrap(); + assert!(kvs.is_empty()); + + let mut kv_store = Arc::new(MemStore::new()) as Arc; + put_stats_to_store(&mut kv_store).await; + let kvs = all_stat_kvs(0, &kv_store).await.unwrap(); + assert_eq!(2, kvs.len()); + let kvs = all_stat_kvs(1, &kv_store).await.unwrap(); + assert_eq!(1, kvs.len()); + } + + #[test] + fn test_to_lease_key() { + let statkey = StatKey { + cluster_id: 1, + node_id: 101, + }; + let lease_key = to_lease_key(&statkey); + assert_eq!(1, lease_key.cluster_id); + assert_eq!(101, lease_key.node_id); + } + + #[test] + fn test_stat_prefix() { + assert_eq!(stat_prefix(1), b"__meta_dnstat-1"); + } + + async fn put_stats_to_store(store: &mut KvStoreRef) { + let put1 = PutRequest { + key: StatKey { + cluster_id: 0, + node_id: 101, + } + .into(), + value: StatValue { + stats: vec![Stat { + region_num: Some(100), + ..Default::default() + }], + } + .try_into() + .unwrap(), + ..Default::default() + }; + store.put(put1).await.unwrap(); + + let put2 = PutRequest { + key: StatKey { + cluster_id: 0, + node_id: 102, + } + .into(), + value: StatValue { + stats: vec![Stat { + region_num: Some(99), + ..Default::default() + }], + } + .try_into() + .unwrap(), + ..Default::default() + }; + store.put(put2).await.unwrap(); + + let put3 = PutRequest { + key: StatKey { + cluster_id: 1, + node_id: 103, + } + .into(), + value: StatValue { + stats: vec![Stat { + region_num: Some(98), + ..Default::default() + }], + } + .try_into() + .unwrap(), + ..Default::default() + }; + store.put(put3).await.unwrap(); + } +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 29c94b8a4e..ffe102d294 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -15,7 +15,7 @@ use api::v1::meta::{ router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict, PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, - Table, TableRoute, TableRouteValue, + Table, TableName, TableRoute, TableRouteValue, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_telemetry::warn; @@ -34,9 +34,14 @@ use crate::service::GrpcResult; impl router_server::Router for MetaSrv { async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let ctx = self.new_ctx(); + + let CreateRequest { table_name, .. } = &req; + let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?; + let ctx = self.create_ctx(table_name); + let selector = self.selector(); let table_id_sequence = self.table_id_sequence(); + let res = handle_create(req, ctx, selector, table_id_sequence).await?; Ok(Response::new(res)) @@ -59,6 +64,21 @@ impl router_server::Router for MetaSrv { } } +impl MetaSrv { + fn create_ctx(&self, table_name: TableName) -> Context { + let mut ctx = self.new_ctx(); + let TableName { + catalog_name, + schema_name, + table_name, + } = table_name; + ctx.catalog = Some(catalog_name); + ctx.schema = Some(schema_name); + ctx.table = Some(table_name); + ctx + } +} + async fn handle_create( req: CreateRequest, ctx: Context,