feat: add load_based selector in meta (#874)

* fix: wrong error info

* add derive hash for StatKey

* add a attrs field in Context

* add load_based selector

* add license

* make Nodestat module public

* add meta startup config item about selector

* cr: remove attrs, add concrete type in context

* cr: change region_number type to Option<u64>

* cr: add comment in example.toml

* cr
This commit is contained in:
fys
2023-01-17 10:25:00 +08:00
committed by GitHub
parent 1e9d09099e
commit 6f5639fccd
16 changed files with 462 additions and 33 deletions

1
Cargo.lock generated
View File

@@ -3818,6 +3818,7 @@ dependencies = [
name = "meta-srv"
version = "0.1.0"
dependencies = [
"anymap",
"api",
"async-trait",
"catalog",

View File

@@ -2,3 +2,5 @@ bind_addr = '127.0.0.1:3002'
server_addr = '127.0.0.1:3002'
store_addr = '127.0.0.1:2379'
datanode_lease_secs = 15
# selector: 'LeaseBased', 'LoadBased'
selector = 'LeaseBased'

View File

@@ -61,6 +61,13 @@ pub enum Error {
#[snafu(backtrace)]
source: servers::auth::Error,
},
#[snafu(display("Unsupported selector type, {} source: {}", selector_type, source))]
UnsupportedSelectorType {
selector_type: String,
#[snafu(backtrace)]
source: meta_srv::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -71,6 +78,7 @@ impl ErrorExt for Error {
Error::StartDatanode { source } => source.status_code(),
Error::StartFrontend { source } => source.status_code(),
Error::StartMetaServer { source } => source.status_code(),
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
Error::ReadConfig { .. } | Error::ParseConfig { .. } | Error::MissingConfig { .. } => {
StatusCode::InvalidArguments
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use clap::Parser;
use common_telemetry::logging;
use common_telemetry::{info, logging};
use meta_srv::bootstrap;
use meta_srv::metasrv::MetaSrvOptions;
use snafu::ResultExt;
@@ -56,6 +56,8 @@ struct StartCommand {
store_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
#[clap(short, long)]
selector: Option<String>,
}
impl StartCommand {
@@ -91,6 +93,12 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
if let Some(addr) = cmd.store_addr {
opts.store_addr = addr;
}
if let Some(selector_type) = &cmd.selector {
opts.selector = selector_type[..]
.try_into()
.context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
info!("Using {} selector", selector_type);
}
Ok(opts)
}
@@ -98,6 +106,8 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
#[cfg(test)]
mod tests {
use meta_srv::selector::SelectorType;
use super::*;
#[test]
@@ -107,11 +117,13 @@ mod tests {
server_addr: Some("127.0.0.1:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
selector: Some("LoadBased".to_string()),
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(SelectorType::LoadBased, options.selector);
}
#[test]
@@ -120,6 +132,7 @@ mod tests {
bind_addr: None,
server_addr: None,
store_addr: None,
selector: None,
config_file: Some(format!(
"{}/../../config/metasrv.example.toml",
std::env::current_dir().unwrap().as_path().to_str().unwrap()
@@ -130,5 +143,6 @@ mod tests {
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
assert_eq!(15, options.datanode_lease_secs);
assert_eq!(SelectorType::LeaseBased, options.selector);
}
}

View File

@@ -8,6 +8,7 @@ license.workspace = true
mock = []
[dependencies]
anymap = "1.0.0-beta.2"
api = { path = "../api" }
async-trait = "0.1"
catalog = { path = "../catalog" }

View File

@@ -60,7 +60,8 @@ pub fn router(meta_srv: MetaSrv) -> Router {
pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?;
let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?;
let meta_srv = MetaSrv::new(opts, kv_store, None, Some(election), None).await;
let selector = opts.selector.clone().into();
let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), Some(election), None).await;
meta_srv.start().await;
Ok(meta_srv)
}

View File

@@ -55,12 +55,33 @@ pub enum Error {
#[snafu(display("Invalid datanode lease key: {}", key))]
InvalidLeaseKey { key: String, backtrace: Backtrace },
#[snafu(display("Invalid datanode stat key: {}", key))]
InvalidStatKey { key: String, backtrace: Backtrace },
#[snafu(display("Failed to parse datanode lease key from utf8: {}", source))]
LeaseKeyFromUtf8 {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse datanode lease value from utf8: {}", source))]
LeaseValueFromUtf8 {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse datanode stat key from utf8: {}", source))]
StatKeyFromUtf8 {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse datanode stat value from utf8: {}", source))]
StatValueFromUtf8 {
source: std::string::FromUtf8Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to serialize to json: {}", input))]
SerializeToJson {
input: String,
@@ -132,6 +153,12 @@ pub enum Error {
key
))]
MoveValue { key: String, backtrace: Backtrace },
#[snafu(display("Unsupported selector type, {}", selector_type))]
UnsupportedSelectorType {
selector_type: String,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -165,9 +192,14 @@ impl ErrorExt for Error {
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
| Error::InvalidLeaseKey { .. }
| Error::InvalidStatKey { .. }
| Error::ParseNum { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::StatKeyFromUtf8 { .. }
| Error::StatValueFromUtf8 { .. }
| Error::UnexceptedSequenceValue { .. }
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }

View File

@@ -23,7 +23,7 @@ mod check_leader_handler;
mod collect_stats_handler;
mod instruction;
mod keep_lease_handler;
pub(crate) mod node_stat;
pub mod node_stat;
mod on_leader_start;
mod persist_stats_handler;
mod response_header_handler;

View File

@@ -33,7 +33,7 @@ pub struct Stat {
/// How many tables on this node
pub table_num: i64,
/// How many regions on this node
pub region_num: i64,
pub region_num: Option<u64>,
pub cpu_usage: f64,
pub load: f64,
/// Read disk IO on this node
@@ -83,22 +83,29 @@ impl TryFrom<HeartbeatRequest> for Stat {
} = value;
match (header, peer, node_stat) {
(Some(header), Some(peer), Some(node_stat)) => Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
id: peer.id,
addr: peer.addr,
is_leader,
rcus: node_stat.rcus,
wcus: node_stat.wcus,
table_num: node_stat.table_num,
region_num: node_stat.region_num,
cpu_usage: node_stat.cpu_usage,
load: node_stat.load,
read_io_rate: node_stat.read_io_rate,
write_io_rate: node_stat.write_io_rate,
region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
}),
(Some(header), Some(peer), Some(node_stat)) => {
let region_num = if node_stat.region_num >= 0 {
Some(node_stat.region_num as u64)
} else {
None
};
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
id: peer.id,
addr: peer.addr,
is_leader,
rcus: node_stat.rcus,
wcus: node_stat.wcus,
table_num: node_stat.table_num,
region_num,
cpu_usage: node_stat.cpu_usage,
load: node_stat.load,
read_io_rate: node_stat.read_io_rate,
write_io_rate: node_stat.write_io_rate,
region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
})
}
_ => Err(()),
}
}
@@ -129,7 +136,7 @@ mod tests {
let stat = Stat {
cluster_id: 3,
id: 101,
region_num: 10,
region_num: Some(10),
..Default::default()
};

View File

@@ -80,6 +80,9 @@ mod tests {
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
catalog: None,
schema: None,
table: None,
};
let req = HeartbeatRequest::default();
@@ -87,7 +90,7 @@ mod tests {
stats: vec![Stat {
cluster_id: 3,
id: 101,
region_num: 100,
region_num: Some(100),
..Default::default()
}],
..Default::default()
@@ -122,6 +125,6 @@ mod tests {
let val: StatValue = kv.value.clone().try_into().unwrap();
assert_eq!(1, val.stats.len());
assert_eq!(100, val.stats[0].region_num);
assert_eq!(Some(100), val.stats[0].region_num);
}
}

View File

@@ -62,6 +62,9 @@ mod tests {
kv_store,
election: None,
skip_all: Arc::new(AtomicBool::new(false)),
catalog: None,
schema: None,
table: None,
};
let req = HeartbeatRequest {

View File

@@ -112,7 +112,7 @@ impl TryFrom<Vec<u8>> for LeaseValue {
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.context(error::LeaseValueFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
@@ -174,7 +174,7 @@ impl<'a> TableRouteKey<'a> {
}
}
#[derive(Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug, Hash)]
pub struct StatKey {
pub cluster_id: u64,
pub node_id: u64,
@@ -194,7 +194,7 @@ impl FromStr for StatKey {
.captures(key)
.context(error::InvalidLeaseKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
ensure!(caps.len() == 3, error::InvalidStatKeySnafu { key });
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
@@ -217,7 +217,7 @@ impl TryFrom<Vec<u8>> for StatKey {
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.context(error::StatKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
@@ -228,6 +228,19 @@ pub struct StatValue {
pub stats: Vec<Stat>,
}
impl StatValue {
/// Get the region number from stat value.
pub fn region_num(&self) -> Option<u64> {
for stat in self.stats.iter() {
match stat.region_num {
Some(region_num) => return Some(region_num),
None => continue,
}
}
None
}
}
impl TryFrom<StatValue> for Vec<u8> {
type Error = error::Error;
@@ -253,7 +266,7 @@ impl TryFrom<Vec<u8>> for StatValue {
fn try_from(value: Vec<u8>) -> Result<Self> {
String::from_utf8(value)
.context(error::LeaseKeyFromUtf8Snafu {})
.context(error::StatValueFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
@@ -282,7 +295,7 @@ mod tests {
cluster_id: 0,
id: 101,
is_leader: false,
region_num: 100,
region_num: Some(100),
..Default::default()
};
@@ -298,7 +311,7 @@ mod tests {
assert_eq!(0, stat.cluster_id);
assert_eq!(101, stat.id);
assert!(!stat.is_leader);
assert_eq!(100, stat.region_num);
assert_eq!(Some(100), stat.region_num);
}
#[test]
@@ -326,4 +339,39 @@ mod tests {
assert_eq!(new_value, value);
}
#[test]
fn test_get_region_num_from_stat_val() {
let empty = StatValue { stats: vec![] };
let region_num = empty.region_num();
assert!(region_num.is_none());
let wrong = StatValue {
stats: vec![Stat {
region_num: None,
..Default::default()
}],
};
let right = wrong.region_num();
assert!(right.is_none());
let stat_val = StatValue {
stats: vec![
Stat {
region_num: Some(1),
..Default::default()
},
Stat {
region_num: None,
..Default::default()
},
Stat {
region_num: Some(2),
..Default::default()
},
],
};
let region_num = stat_val.region_num().unwrap();
assert_eq!(1, region_num);
}
}

View File

@@ -25,7 +25,7 @@ use crate::handler::{
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::Selector;
use crate::selector::{Selector, SelectorType};
use crate::sequence::{Sequence, SequenceRef};
use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef};
use crate::service::store::memory::MemStore;
@@ -39,6 +39,7 @@ pub struct MetaSrvOptions {
pub server_addr: String,
pub store_addr: String,
pub datanode_lease_secs: i64,
pub selector: SelectorType,
}
impl Default for MetaSrvOptions {
@@ -48,6 +49,7 @@ impl Default for MetaSrvOptions {
server_addr: "127.0.0.1:3002".to_string(),
store_addr: "127.0.0.1:2379".to_string(),
datanode_lease_secs: 15,
selector: SelectorType::default(),
}
}
}
@@ -60,6 +62,9 @@ pub struct Context {
pub kv_store: KvStoreRef,
pub election: Option<ElectionRef>,
pub skip_all: Arc<AtomicBool>,
pub catalog: Option<String>,
pub schema: Option<String>,
pub table: Option<String>,
}
impl Context {
@@ -219,6 +224,9 @@ impl MetaSrv {
kv_store,
election,
skip_all,
catalog: None,
schema: None,
table: None,
}
}
}

View File

@@ -13,8 +13,17 @@
// limitations under the License.
pub mod lease_based;
pub mod load_based;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use self::lease_based::LeaseBasedSelector;
use self::load_based::LoadBasedSelector;
use crate::error;
use crate::error::Result;
use crate::metasrv::SelectorRef;
pub type Namespace = u64;
@@ -25,3 +34,65 @@ pub trait Selector: Send + Sync {
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output>;
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SelectorType {
LoadBased,
LeaseBased,
}
impl From<SelectorType> for SelectorRef {
fn from(selector_type: SelectorType) -> Self {
match selector_type {
SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
}
}
}
impl Default for SelectorType {
fn default() -> Self {
SelectorType::LeaseBased
}
}
impl TryFrom<&str> for SelectorType {
type Error = error::Error;
fn try_from(value: &str) -> Result<Self> {
match value {
"LoadBased" => Ok(SelectorType::LoadBased),
"LeaseBased" => Ok(SelectorType::LeaseBased),
other => error::UnsupportedSelectorTypeSnafu {
selector_type: other,
}
.fail(),
}
}
}
#[cfg(test)]
mod tests {
use super::SelectorType;
use crate::error::Result;
#[test]
fn test_default_selector_type() {
assert_eq!(SelectorType::LeaseBased, SelectorType::default());
}
#[test]
fn test_convert_str_to_selector_type() {
let leasebased = "LeaseBased";
let selector_type = leasebased.try_into().unwrap();
assert_eq!(SelectorType::LeaseBased, selector_type);
let loadbased = "LoadBased";
let selector_type = loadbased.try_into().unwrap();
assert_eq!(SelectorType::LoadBased, selector_type);
let unknow = "unknow";
let selector_type: Result<SelectorType> = unknow.try_into();
assert!(selector_type.is_err());
}
}

View File

@@ -0,0 +1,210 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::{Peer, RangeRequest};
use common_time::util as time_util;
use super::{Namespace, Selector};
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::Context;
use crate::service::store::kv::KvStoreRef;
use crate::{lease, util};
pub struct LoadBasedSelector;
#[async_trait::async_trait]
impl Selector for LoadBasedSelector {
type Context = Context;
type Output = Vec<Peer>;
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
// get alive datanodes
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs * 1000
};
let lease_kvs: HashMap<LeaseKey, LeaseValue> =
lease::alive_datanodes(ns, &ctx.kv_store, lease_filter)
.await?
.into_iter()
.collect();
// get stats of datanodes
let stat_kvs = all_stat_kvs(ns, &ctx.kv_store).await?;
// filter out expired datanodes and nodes that cannot get region number
let mut tuples: Vec<_> = stat_kvs
.iter()
.filter_map(|(stat_key, stat_val)| {
match (
lease_kvs.get(&to_lease_key(stat_key)),
stat_val.region_num(),
) {
(Some(lease_val), Some(region_num)) => Some((stat_key, lease_val, region_num)),
_ => None,
}
})
.collect();
// sort the datanodes according to the number of regions
tuples.sort_by(|a, b| a.2.cmp(&b.2));
Ok(tuples
.into_iter()
.map(|(stat_key, lease_val, _)| Peer {
id: stat_key.node_id,
addr: lease_val.node_addr.clone(),
})
.collect())
}
}
// get all stat kvs from store
pub async fn all_stat_kvs(
cluster_id: u64,
kv_store: &KvStoreRef,
) -> Result<Vec<(StatKey, StatValue)>> {
let key = stat_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let kvs = kv_store.range(req).await?.kvs;
let mut stat_kvs = Vec::with_capacity(kvs.len());
for kv in kvs {
let key: StatKey = kv.key.try_into()?;
let value: StatValue = kv.value.try_into()?;
stat_kvs.push((key, value));
}
Ok(stat_kvs)
}
fn to_lease_key(k: &StatKey) -> LeaseKey {
LeaseKey {
cluster_id: k.cluster_id,
node_id: k.node_id,
}
}
fn stat_prefix(cluster_id: u64) -> Vec<u8> {
format!("{DN_STAT_PREFIX}-{cluster_id}").into_bytes()
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::PutRequest;
use super::{all_stat_kvs, to_lease_key};
use crate::handler::node_stat::Stat;
use crate::keys::{StatKey, StatValue};
use crate::selector::load_based::stat_prefix;
use crate::service::store::kv::{KvStore, KvStoreRef};
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_all_stat_kvs() {
let kv_store = Arc::new(MemStore::new()) as Arc<dyn KvStore>;
let kvs = all_stat_kvs(0, &kv_store).await.unwrap();
assert!(kvs.is_empty());
let mut kv_store = Arc::new(MemStore::new()) as Arc<dyn KvStore>;
put_stats_to_store(&mut kv_store).await;
let kvs = all_stat_kvs(0, &kv_store).await.unwrap();
assert_eq!(2, kvs.len());
let kvs = all_stat_kvs(1, &kv_store).await.unwrap();
assert_eq!(1, kvs.len());
}
#[test]
fn test_to_lease_key() {
let statkey = StatKey {
cluster_id: 1,
node_id: 101,
};
let lease_key = to_lease_key(&statkey);
assert_eq!(1, lease_key.cluster_id);
assert_eq!(101, lease_key.node_id);
}
#[test]
fn test_stat_prefix() {
assert_eq!(stat_prefix(1), b"__meta_dnstat-1");
}
async fn put_stats_to_store(store: &mut KvStoreRef) {
let put1 = PutRequest {
key: StatKey {
cluster_id: 0,
node_id: 101,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(100),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put1).await.unwrap();
let put2 = PutRequest {
key: StatKey {
cluster_id: 0,
node_id: 102,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(99),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put2).await.unwrap();
let put3 = PutRequest {
key: StatKey {
cluster_id: 1,
node_id: 103,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(98),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put3).await.unwrap();
}
}

View File

@@ -15,7 +15,7 @@
use api::v1::meta::{
router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict,
PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse,
Table, TableRoute, TableRouteValue,
Table, TableName, TableRoute, TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
@@ -34,9 +34,14 @@ use crate::service::GrpcResult;
impl router_server::Router for MetaSrv {
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = self.new_ctx();
let CreateRequest { table_name, .. } = &req;
let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?;
let ctx = self.create_ctx(table_name);
let selector = self.selector();
let table_id_sequence = self.table_id_sequence();
let res = handle_create(req, ctx, selector, table_id_sequence).await?;
Ok(Response::new(res))
@@ -59,6 +64,21 @@ impl router_server::Router for MetaSrv {
}
}
impl MetaSrv {
fn create_ctx(&self, table_name: TableName) -> Context {
let mut ctx = self.new_ctx();
let TableName {
catalog_name,
schema_name,
table_name,
} = table_name;
ctx.catalog = Some(catalog_name);
ctx.schema = Some(schema_name);
ctx.table = Some(table_name);
ctx
}
}
async fn handle_create(
req: CreateRequest,
ctx: Context,