From da68d8ce4b0e3f6f5314de5ef361268812632098 Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Mon, 20 Nov 2023 14:47:42 +0800 Subject: [PATCH] feat: add random weighted choose in load_based selector (#2234) * feat: add random weigted choose in load_based selector * fix: meta cannot save heartbeats when cluster have no region * chore: print some log * chore: remove unused code * cr * add some logs when filter result is empty --- Cargo.lock | 1 + src/meta-client/src/client.rs | 9 +- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/bootstrap.rs | 2 +- src/meta-srv/src/error.rs | 19 +- .../src/handler/collect_stats_handler.rs | 5 +- src/meta-srv/src/handler/mailbox_handler.rs | 1 + src/meta-srv/src/handler/node_stat.rs | 4 +- src/meta-srv/src/keys.rs | 33 ++- src/meta-srv/src/procedure/region_failover.rs | 16 +- .../region_failover/failover_start.rs | 4 +- src/meta-srv/src/selector.rs | 27 +- src/meta-srv/src/selector/common.rs | 192 ++++++++++++++ src/meta-srv/src/selector/lease_based.rs | 45 ++-- src/meta-srv/src/selector/load_based.rs | 208 +++++++++++---- src/meta-srv/src/selector/weight_compute.rs | 237 ++++++++++++++++++ src/meta-srv/src/selector/weighted_choose.rs | 176 +++++++++++++ src/meta-srv/src/table_meta_alloc.rs | 14 +- tests-integration/src/grpc.rs | 2 +- tests-integration/src/instance.rs | 2 +- tests-integration/tests/region_failover.rs | 9 +- 21 files changed, 912 insertions(+), 95 deletions(-) create mode 100644 src/meta-srv/src/selector/common.rs create mode 100644 src/meta-srv/src/selector/weight_compute.rs create mode 100644 src/meta-srv/src/selector/weighted_choose.rs diff --git a/Cargo.lock b/Cargo.lock index e08c498772..eb8bac66e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4723,6 +4723,7 @@ dependencies = [ "h2", "http-body", "humantime-serde", + "itertools 0.10.5", "lazy_static", "once_cell", "parking_lot 0.12.1", diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 641eace2fb..a5e9d35c8a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -385,7 +385,7 @@ impl MetaClient { mod tests { use api::v1::meta::{HeartbeatRequest, Peer}; use meta_srv::metasrv::SelectorContext; - use meta_srv::selector::{Namespace, Selector}; + use meta_srv::selector::{Namespace, Selector, SelectorOptions}; use meta_srv::Result as MetaResult; use super::*; @@ -547,7 +547,12 @@ mod tests { type Context = SelectorContext; type Output = Vec; - async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult { + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> MetaResult { Ok(vec![ Peer { id: 0, diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 829c315708..f38f82be64 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -34,6 +34,7 @@ futures.workspace = true h2 = "0.3" http-body = "0.4" humantime-serde.workspace = true +itertools.workspace = true lazy_static.workspace = true once_cell.workspace = true parking_lot = "0.12" diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 975a3fb66f..dcf112e2e7 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -188,7 +188,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result Arc::new(LoadBasedSelector) as SelectorRef, + SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef, SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, }; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 649383f06d..600c188e3a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -18,6 +18,7 @@ use common_macro::stack_trace_debug; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::JoinError; +use rand::distributions::WeightedError; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; use store_api::storage::RegionId; @@ -114,13 +115,13 @@ pub enum Error { }, #[snafu(display( - "Failed to request Datanode, expected: {}, but only {} available", - expected, + "Failed to request Datanode, required: {}, but only {} available", + required, available ))] NoEnoughAvailableDatanode { location: Location, - expected: usize, + required: usize, available: usize, }, @@ -562,6 +563,16 @@ pub enum Error { operation: String, location: Location, }, + + #[snafu(display("Failed to set weight array"))] + WeightArray { + #[snafu(source)] + error: WeightedError, + location: Location, + }, + + #[snafu(display("Weight array is not set"))] + NotSetWeightArray { location: Location }, } impl Error { @@ -611,6 +622,8 @@ impl ErrorExt for Error { | Error::NoEnoughAvailableDatanode { .. } | Error::PublishMessage { .. } | Error::Join { .. } + | Error::WeightArray { .. } + | Error::NotSetWeightArray { .. } | Error::Unsupported { .. } => StatusCode::Internal, Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, Error::EmptyKey { .. } diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index a52a69be6e..536748aaaf 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -42,10 +42,7 @@ impl HeartbeatHandler for CollectStatsHandler { match Stat::try_from(req.clone()) { Ok(stat) => { - // If stat is empty, it means the request is a mailbox response - if !stat.is_empty() { - let _ = acc.stat.insert(stat); - } + let _ = acc.stat.insert(stat); } Err(err) => { warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err); diff --git a/src/meta-srv/src/handler/mailbox_handler.rs b/src/meta-srv/src/handler/mailbox_handler.rs index 8ffbf9aa21..4bc3b543ba 100644 --- a/src/meta-srv/src/handler/mailbox_handler.rs +++ b/src/meta-srv/src/handler/mailbox_handler.rs @@ -34,6 +34,7 @@ impl HeartbeatHandler for MailboxHandler { ) -> Result<()> { if let Some(message) = &req.mailbox_message { ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?; + ctx.set_skip_all(); } Ok(()) diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 609e806296..4e9343da35 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -23,7 +23,7 @@ use store_api::storage::RegionId; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; use crate::keys::StatKey; -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Stat { pub timestamp_millis: i64, pub cluster_id: u64, @@ -42,7 +42,7 @@ pub struct Stat { pub node_epoch: u64, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegionStat { /// The region_id. pub id: u64, diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 0bfd3bb733..cb67af41cb 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -187,7 +187,7 @@ impl TryFrom> for StatKey { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct StatValue { pub stats: Vec, @@ -198,6 +198,11 @@ impl StatValue { pub fn region_num(&self) -> Option { self.stats.last().map(|x| x.region_num) } + + /// Get the latest node addr. + pub fn node_addr(&self) -> Option { + self.stats.last().map(|x| x.addr.clone()) + } } impl TryFrom for Vec { @@ -365,6 +370,32 @@ mod tests { assert_eq!(new_value, value); } + #[test] + fn test_get_addr_from_stat_val() { + let empty = StatValue { stats: vec![] }; + let addr = empty.node_addr(); + assert!(addr.is_none()); + + let stat_val = StatValue { + stats: vec![ + Stat { + addr: "1".to_string(), + ..Default::default() + }, + Stat { + addr: "2".to_string(), + ..Default::default() + }, + Stat { + addr: "3".to_string(), + ..Default::default() + }, + ], + }; + let addr = stat_val.node_addr().unwrap(); + assert_eq!("3", addr); + } + #[test] fn test_get_region_num_from_stat_val() { let empty = StatValue { stats: vec![] }; diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 1ab93ce8e6..2694f8c88a 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -400,7 +400,7 @@ mod tests { use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::lock::memory::MemLock; - use crate::selector::{Namespace, Selector}; + use crate::selector::{Namespace, Selector, SelectorOptions}; use crate::service::mailbox::Channel; use crate::test_util; @@ -413,7 +413,12 @@ mod tests { type Context = SelectorContext; type Output = Vec; - async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result { + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> Result { let mut rng = rand::thread_rng(); let mut nodes = self.nodes.clone(); nodes.shuffle(&mut rng); @@ -711,7 +716,12 @@ mod tests { type Context = SelectorContext; type Output = Vec; - async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result { + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> Result { let mut peers = self.peers.lock().unwrap(); Ok(if let Some(Some(peer)) = peers.pop() { vec![peer] diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index e31de684ca..d3c2dea8f2 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -24,6 +24,7 @@ use snafu::ensure; use super::deactivate_region::DeactivateRegion; use super::{RegionFailoverContext, State}; use crate::error::{RegionFailoverCandidatesNotFoundSnafu, Result, RetryLaterSnafu}; +use crate::selector::SelectorOptions; #[derive(Serialize, Deserialize, Debug)] pub(super) struct RegionFailoverStart { @@ -50,9 +51,10 @@ impl RegionFailoverStart { selector_ctx.table_id = Some(failed_region.table_id); let cluster_id = failed_region.cluster_id; + let opts = SelectorOptions::default(); let candidates = ctx .selector - .select(cluster_id, &selector_ctx) + .select(cluster_id, &selector_ctx, opts) .await? .iter() .filter_map(|p| { diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 58a83429c7..140f4e5711 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod common; pub mod lease_based; pub mod load_based; +mod weight_compute; +mod weighted_choose; use serde::{Deserialize, Serialize}; @@ -27,7 +30,29 @@ pub trait Selector: Send + Sync { type Context; type Output; - async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result; + async fn select( + &self, + ns: Namespace, + ctx: &Self::Context, + opts: SelectorOptions, + ) -> Result; +} + +#[derive(Debug)] +pub struct SelectorOptions { + /// Minimum number of selected results. + pub min_required_items: usize, + /// Whether duplicates are allowed in the selected result, default false. + pub allow_duplication: bool, +} + +impl Default for SelectorOptions { + fn default() -> Self { + Self { + min_required_items: 1, + allow_duplication: false, + } + } } #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs new file mode 100644 index 0000000000..8d807b7eb2 --- /dev/null +++ b/src/meta-srv/src/selector/common.rs @@ -0,0 +1,192 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use api::v1::meta::Peer; +use snafu::ensure; + +use super::weighted_choose::{WeightedChoose, WeightedItem}; +use crate::error; +use crate::error::Result; +use crate::selector::SelectorOptions; + +/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`. +pub fn choose_peers( + mut weight_array: Vec>, + opts: &SelectorOptions, + weighted_choose: &mut W, +) -> Result> +where + W: WeightedChoose, +{ + let min_required_items = opts.min_required_items; + ensure!( + !weight_array.is_empty(), + error::NoEnoughAvailableDatanodeSnafu { + required: min_required_items, + available: 0_usize, + } + ); + + if opts.allow_duplication { + weighted_choose.set_weight_array(weight_array)?; + (0..min_required_items) + .map(|_| weighted_choose.choose_one()) + .collect::>() + } else { + let weight_array_len = weight_array.len(); + + // When opts.allow_duplication is false, we need to check that the length of the weighted array is greater than + // or equal to min_required_items, otherwise it may cause an infinite loop. + ensure!( + weight_array_len >= min_required_items, + error::NoEnoughAvailableDatanodeSnafu { + required: min_required_items, + available: weight_array_len, + } + ); + + if weight_array_len == min_required_items { + return Ok(weight_array.into_iter().map(|item| item.item).collect()); + } + + weighted_choose.set_weight_array(weight_array.clone())?; + + // Assume min_required_items is 3, weight_array_len is 100, then we can choose 3 items from the weight array + // and return. But assume min_required_items is 99, weight_array_len is 100. It's not cheap to choose 99 items + // from the weight array. So we can reverse choose 1 item from the weight array, and return the remaining 99 + // items. + if min_required_items * 2 > weight_array_len { + let select_num = weight_array_len - min_required_items; + let mut selected = HashSet::with_capacity(select_num); + while selected.len() < select_num { + let item = weighted_choose.reverse_choose_one()?; + selected.insert(item); + } + weight_array.retain(|item| !selected.contains(&item.item)); + Ok(weight_array.into_iter().map(|item| item.item).collect()) + } else { + let mut selected = HashSet::with_capacity(min_required_items); + while selected.len() < min_required_items { + let item = weighted_choose.choose_one()?; + selected.insert(item); + } + Ok(selected.into_iter().collect()) + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use api::v1::meta::Peer; + + use crate::selector::common::choose_peers; + use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; + use crate::selector::SelectorOptions; + + #[test] + fn test_choose_peers() { + let weight_array = vec![ + WeightedItem { + item: Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1, + reverse_weight: 1, + }, + WeightedItem { + item: Peer { + id: 2, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1, + reverse_weight: 1, + }, + WeightedItem { + item: Peer { + id: 3, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1, + reverse_weight: 1, + }, + WeightedItem { + item: Peer { + id: 4, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1, + reverse_weight: 1, + }, + WeightedItem { + item: Peer { + id: 5, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1, + reverse_weight: 1, + }, + ]; + + for i in 1..=5 { + let opts = SelectorOptions { + min_required_items: i, + allow_duplication: false, + }; + + let selected_peers: HashSet<_> = choose_peers( + weight_array.clone(), + &opts, + &mut RandomWeightedChoose::default(), + ) + .unwrap() + .into_iter() + .collect(); + + assert_eq!(i, selected_peers.len()); + } + + let opts = SelectorOptions { + min_required_items: 6, + allow_duplication: false, + }; + + let selected_result = choose_peers( + weight_array.clone(), + &opts, + &mut RandomWeightedChoose::default(), + ); + assert!(selected_result.is_err()); + + for i in 1..=50 { + let opts = SelectorOptions { + min_required_items: i, + allow_duplication: true, + }; + + let selected_peers = choose_peers( + weight_array.clone(), + &opts, + &mut RandomWeightedChoose::default(), + ) + .unwrap(); + + assert_eq!(i, selected_peers.len()); + } + } +} diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 83a4260510..268371a003 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -13,13 +13,13 @@ // limitations under the License. use api::v1::meta::Peer; -use rand::seq::SliceRandom; -use rand::thread_rng; use crate::error::Result; use crate::lease; use crate::metasrv::SelectorContext; -use crate::selector::{Namespace, Selector}; +use crate::selector::common::choose_peers; +use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; +use crate::selector::{Namespace, Selector, SelectorOptions}; pub struct LeaseBasedSelector; @@ -28,24 +28,33 @@ impl Selector for LeaseBasedSelector { type Context = SelectorContext; type Output = Vec; - async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { - // filter out the nodes out lease - let mut lease_kvs: Vec<_> = - lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs) - .await? - .into_iter() - .collect(); + async fn select( + &self, + ns: Namespace, + ctx: &Self::Context, + opts: SelectorOptions, + ) -> Result { + // 1. get alive datanodes. + let lease_kvs = + lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?; - lease_kvs.shuffle(&mut thread_rng()); - - let peers = lease_kvs + // 2. compute weight array, but the weight of each item is the same. + let weight_array = lease_kvs .into_iter() - .map(|(k, v)| Peer { - id: k.node_id, - addr: v.node_addr, + .map(|(k, v)| WeightedItem { + item: Peer { + id: k.node_id, + addr: v.node_addr.clone(), + }, + weight: 1, + reverse_weight: 1, }) - .collect::>(); + .collect(); - Ok(peers) + // 3. choose peers by weight_array. + let weighted_choose = &mut RandomWeightedChoose::default(); + let selected = choose_peers(weight_array, &opts, weighted_choose)?; + + Ok(selected) } } diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index e816cc5cdc..114a48beff 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -12,22 +12,125 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::meta::Peer; use common_meta::key::TableMetadataManager; use common_meta::rpc::router::find_leaders; -use common_telemetry::warn; +use common_telemetry::{debug, info}; +use parking_lot::RwLock; use snafu::ResultExt; use table::metadata::TableId; use crate::error::{self, Result}; -use crate::keys::{LeaseKey, LeaseValue, StatKey}; +use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; use crate::lease; use crate::metasrv::SelectorContext; -use crate::selector::{Namespace, Selector}; +use crate::selector::common::choose_peers; +use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute}; +use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedChoose}; +use crate::selector::{Namespace, Selector, SelectorOptions}; -const MAX_REGION_NUMBER: u64 = u64::MAX; +pub struct LoadBasedSelector { + weighted_choose: RwLock, + weight_compute: C, +} -pub struct LoadBasedSelector; +impl LoadBasedSelector { + pub fn new(weighted_choose: W, weight_compute: C) -> Self { + Self { + weighted_choose: RwLock::new(weighted_choose), + weight_compute, + } + } +} + +impl Default for LoadBasedSelector, RegionNumsBasedWeightCompute> { + fn default() -> Self { + Self { + weighted_choose: RwLock::new(RandomWeightedChoose::default()), + weight_compute: RegionNumsBasedWeightCompute, + } + } +} + +#[async_trait::async_trait] +impl Selector for LoadBasedSelector +where + W: WeightedChoose, + C: WeightCompute>, +{ + type Context = SelectorContext; + type Output = Vec; + + async fn select( + &self, + ns: Namespace, + ctx: &Self::Context, + opts: SelectorOptions, + ) -> Result { + // 1. get alive datanodes. + let lease_kvs = + lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?; + + // 2. get stat kvs and filter out expired datanodes. + let stat_keys = lease_kvs.keys().map(|k| k.into()).collect(); + let stat_kvs = filter_out_expired_datanode( + ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?, + &lease_kvs, + ); + + // 3. try to make the regions of a table distributed on different datanodes as much as possible. + let stat_kvs = if let Some(table_id) = ctx.table_id { + let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone()); + let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?; + let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids); + if filter_result.is_empty() { + info!("The regions of the table cannot be allocated to completely different datanodes, table id: {}.", table_id); + stat_kvs + } else { + filter_result + } + } else { + stat_kvs + }; + + // 4. compute weight array. + let weight_array = self.weight_compute.compute(&stat_kvs); + + // 5. choose peers by weight_array. + let mut weighted_choose = self.weighted_choose.write(); + let selected = choose_peers(weight_array, &opts, &mut *weighted_choose)?; + + debug!( + "LoadBasedSelector select peers: {:?}, namespace: {}, opts: {:?}.", + selected, ns, opts, + ); + + Ok(selected) + } +} + +fn filter_out_expired_datanode( + mut stat_kvs: HashMap, + lease_kvs: &HashMap, +) -> HashMap { + lease_kvs + .iter() + .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into())) + .collect() +} + +fn filter_out_datanode_by_table( + stat_kvs: &HashMap, + leader_peer_ids: &[u64], +) -> HashMap { + stat_kvs + .iter() + .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id)) + .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone())) + .collect() +} async fn get_leader_peer_ids( table_metadata_manager: &TableMetadataManager, @@ -48,59 +151,58 @@ async fn get_leader_peer_ids( }) } -#[async_trait::async_trait] -impl Selector for LoadBasedSelector { - type Context = SelectorContext; - type Output = Vec; +#[cfg(test)] +mod tests { + use std::collections::HashMap; - async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { - // get alive datanodes - let lease_kvs = - lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?; - if lease_kvs.is_empty() { - return Ok(vec![]); - } + use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; + use crate::selector::load_based::filter_out_expired_datanode; - let stat_keys: Vec = lease_kvs.keys().map(|k| k.into()).collect(); - let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; + #[test] + fn test_filter_out_expired_datanode() { + let mut stat_kvs = HashMap::new(); + stat_kvs.insert( + StatKey { + cluster_id: 1, + node_id: 0, + }, + StatValue { stats: vec![] }, + ); + stat_kvs.insert( + StatKey { + cluster_id: 1, + node_id: 1, + }, + StatValue { stats: vec![] }, + ); + stat_kvs.insert( + StatKey { + cluster_id: 1, + node_id: 2, + }, + StatValue { stats: vec![] }, + ); - let leader_peer_ids = if let Some(table_id) = ctx.table_id { - let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone()); + let mut lease_kvs = HashMap::new(); + lease_kvs.insert( + LeaseKey { + cluster_id: 1, + node_id: 1, + }, + LeaseValue { + timestamp_millis: 0, + node_addr: "127.0.0.1:3002".to_string(), + }, + ); - get_leader_peer_ids(&table_metadata_manager, table_id).await? - } else { - Vec::new() - }; + let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs); - let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs - .into_iter() - .filter(|(lease_k, _)| !leader_peer_ids.contains(&lease_k.node_id)) - .map(|(lease_k, lease_v)| { - let stat_key: StatKey = (&lease_k).into(); - let region_num = match stat_kvs - .get(&stat_key) - .and_then(|stat_val| stat_val.region_num()) - { - Some(region_num) => region_num, - None => { - warn!("Failed to get stat_val by stat_key {:?}", stat_key); - MAX_REGION_NUMBER - } - }; - - (lease_k, lease_v, region_num) + assert_eq!(1, alive_stat_kvs.len()); + assert!(alive_stat_kvs + .get(&StatKey { + cluster_id: 1, + node_id: 1 }) - .collect(); - - // sort the datanodes according to the number of regions - tuples.sort_by(|a, b| a.2.cmp(&b.2)); - - Ok(tuples - .into_iter() - .map(|(lease_key, lease_val, _)| Peer { - id: lease_key.node_id, - addr: lease_val.node_addr, - }) - .collect()) + .is_some()); } } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs new file mode 100644 index 0000000000..8971f73acc --- /dev/null +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -0,0 +1,237 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::meta::Peer; +use itertools::{Itertools, MinMaxResult}; + +use crate::keys::{StatKey, StatValue}; +use crate::selector::weighted_choose::WeightedItem; + +/// The [`WeightCompute`] trait is used to compute the weight array by heartbeats. +pub trait WeightCompute: Send + Sync { + type Source; + + fn compute(&self, stat_kvs: &Self::Source) -> Vec>; +} + +/// The ['RegionNumsBasedWeightCompute'] calculates weighted list based on region number obtained from the heartbeat. +/// +/// # How to calculate the weighted list? +/// weight = max_region_num - current_region_num + (max_region_num - min_region_num); +/// +/// # How to calculate the reverse weighted list? +/// reverse_weight = region_num - min_region_num + (max_region_num - min_region_num); +pub struct RegionNumsBasedWeightCompute; + +impl WeightCompute for RegionNumsBasedWeightCompute { + type Source = HashMap; + + fn compute(&self, stat_kvs: &HashMap) -> Vec> { + let mut region_nums = Vec::with_capacity(stat_kvs.len()); + let mut peers = Vec::with_capacity(stat_kvs.len()); + + for (stat_k, stat_v) in stat_kvs { + let Some(region_num) = stat_v.region_num() else { + continue; + }; + let Some(node_addr) = stat_v.node_addr() else { + continue; + }; + + let peer = Peer { + id: stat_k.node_id, + addr: node_addr, + }; + + region_nums.push(region_num); + peers.push(peer); + } + + if region_nums.is_empty() { + return vec![]; + } + + let (min_weight, max_weight) = match region_nums.iter().minmax() { + // unreachable safety: region_nums is not empty + MinMaxResult::NoElements => unreachable!(), + MinMaxResult::OneElement(minmax) => (*minmax, *minmax), + MinMaxResult::MinMax(min, max) => (*min, *max), + }; + + let base_weight = match max_weight - min_weight { + 0 => 1, + x => x, + }; + + peers + .into_iter() + .zip(region_nums) + .map(|(peer, region_num)| WeightedItem { + item: peer, + weight: (max_weight - region_num + base_weight) as usize, + reverse_weight: (region_num - min_weight + base_weight) as usize, + }) + .collect() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use api::v1::meta::Peer; + use store_api::region_engine::RegionRole; + + use super::{RegionNumsBasedWeightCompute, WeightCompute}; + use crate::handler::node_stat::{RegionStat, Stat}; + use crate::keys::{StatKey, StatValue}; + + #[test] + fn test_weight_compute() { + let mut stat_kvs: HashMap = HashMap::default(); + let stat_key = StatKey { + cluster_id: 1, + node_id: 1, + }; + let stat_val = StatValue { + stats: vec![mock_stat_1()], + }; + stat_kvs.insert(stat_key, stat_val); + let stat_key = StatKey { + cluster_id: 1, + node_id: 2, + }; + let stat_val = StatValue { + stats: vec![mock_stat_2()], + }; + stat_kvs.insert(stat_key, stat_val); + let stat_key = StatKey { + cluster_id: 1, + node_id: 3, + }; + let stat_val = StatValue { + stats: vec![mock_stat_3()], + }; + stat_kvs.insert(stat_key, stat_val); + + let compute = RegionNumsBasedWeightCompute; + let weight_array = compute.compute(&stat_kvs); + + let mut expected = HashMap::new(); + expected.insert( + Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + 4, + ); + expected.insert( + Peer { + id: 2, + addr: "127.0.0.1:3002".to_string(), + }, + 3, + ); + expected.insert( + Peer { + id: 3, + addr: "127.0.0.1:3003".to_string(), + }, + 2, + ); + for weight in weight_array.iter() { + assert_eq!(*expected.get(&weight.item).unwrap(), weight.weight,); + } + + let mut expected = HashMap::new(); + expected.insert( + Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + 2, + ); + expected.insert( + Peer { + id: 2, + addr: "127.0.0.1:3002".to_string(), + }, + 3, + ); + expected.insert( + Peer { + id: 3, + addr: "127.0.0.1:3003".to_string(), + }, + 4, + ); + + for weight in weight_array.iter() { + assert_eq!(weight.reverse_weight, *expected.get(&weight.item).unwrap()); + } + } + + fn mock_stat_1() -> Stat { + Stat { + addr: "127.0.0.1:3001".to_string(), + region_num: 11, + region_stats: vec![RegionStat { + id: 111, + rcus: 1, + wcus: 1, + approximate_bytes: 1, + approximate_rows: 1, + engine: "mito2".to_string(), + role: RegionRole::Leader, + }], + ..Default::default() + } + } + + fn mock_stat_2() -> Stat { + Stat { + addr: "127.0.0.1:3002".to_string(), + region_num: 12, + region_stats: vec![RegionStat { + id: 112, + rcus: 1, + wcus: 1, + approximate_bytes: 1, + approximate_rows: 1, + engine: "mito2".to_string(), + role: RegionRole::Leader, + }], + ..Default::default() + } + } + + fn mock_stat_3() -> Stat { + Stat { + addr: "127.0.0.1:3003".to_string(), + region_num: 13, + region_stats: vec![RegionStat { + id: 113, + rcus: 1, + wcus: 1, + approximate_bytes: 1, + approximate_rows: 1, + engine: "mito2".to_string(), + role: RegionRole::Leader, + }], + ..Default::default() + } + } +} diff --git a/src/meta-srv/src/selector/weighted_choose.rs b/src/meta-srv/src/selector/weighted_choose.rs new file mode 100644 index 0000000000..9e9f63abad --- /dev/null +++ b/src/meta-srv/src/selector/weighted_choose.rs @@ -0,0 +1,176 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use rand::distributions::WeightedIndex; +use rand::prelude::Distribution; +use rand::thread_rng; +use snafu::{ensure, ResultExt}; + +use crate::error; +use crate::error::Result; + +/// A common trait for weighted balance algorithm. +pub trait WeightedChoose: Send + Sync { + /// The method will re-set weight array. + /// + /// Note: + /// 1. make sure weight_array is not empty. + /// 2. the total weight is greater than 0. + /// Otherwise an error will be returned. + fn set_weight_array(&mut self, weight_array: Vec>) -> Result<()>; + + /// The method will choose one item. + /// + /// If not set weight_array before, an error will be returned. + fn choose_one(&mut self) -> Result; + + /// The method will reverse choose one item. + /// + /// If not set weight_array before, an error will be returned. + fn reverse_choose_one(&mut self) -> Result; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WeightedItem { + pub item: Item, + pub weight: usize, + pub reverse_weight: usize, +} + +/// A implementation of weighted balance: random weighted choose. +/// +/// The algorithm is as follows: +/// +/// ```text +/// random value +/// ─────────────────────────────────▶ +/// │ +/// ▼ +/// ┌─────────────────┬─────────┬──────────────────────┬─────┬─────────────────┐ +/// │element_0 │element_1│element_2 │... │element_n │ +/// └─────────────────┴─────────┴──────────────────────┴─────┴─────────────────┘ +/// ``` +pub struct RandomWeightedChoose { + items: Vec>, + weighted_index: Option>, + reverse_weighted_index: Option>, +} + +impl Default for RandomWeightedChoose { + fn default() -> Self { + Self { + items: Vec::default(), + weighted_index: None, + reverse_weighted_index: None, + } + } +} + +impl WeightedChoose for RandomWeightedChoose +where + Item: Clone + Send + Sync, +{ + fn set_weight_array(&mut self, weight_array: Vec>) -> Result<()> { + self.weighted_index = Some( + WeightedIndex::new(weight_array.iter().map(|item| item.weight)) + .context(error::WeightArraySnafu)?, + ); + + self.reverse_weighted_index = Some( + WeightedIndex::new(weight_array.iter().map(|item| item.reverse_weight)) + .context(error::WeightArraySnafu)?, + ); + + self.items = weight_array; + + Ok(()) + } + + fn choose_one(&mut self) -> Result { + ensure!( + !self.items.is_empty() && self.weighted_index.is_some(), + error::NotSetWeightArraySnafu + ); + + // unwrap safety: whether weighted_index is none has been checked before. + let weighted_index = self.weighted_index.as_ref().unwrap(); + + Ok(self.items[weighted_index.sample(&mut thread_rng())] + .item + .clone()) + } + + fn reverse_choose_one(&mut self) -> Result { + ensure!( + !self.items.is_empty() && self.reverse_weighted_index.is_some(), + error::NotSetWeightArraySnafu + ); + + // unwrap safety: whether reverse_weighted_index is none has been checked before. + let reverse_weighted_index = self.reverse_weighted_index.as_ref().unwrap(); + + Ok(self.items[reverse_weighted_index.sample(&mut thread_rng())] + .item + .clone()) + } +} + +#[cfg(test)] +mod tests { + use super::{RandomWeightedChoose, WeightedChoose, WeightedItem}; + + #[test] + fn test_random_weighted_choose() { + let mut choose = RandomWeightedChoose::default(); + choose + .set_weight_array(vec![ + WeightedItem { + item: 1, + weight: 100, + reverse_weight: 0, + }, + WeightedItem { + item: 2, + weight: 0, + reverse_weight: 100, + }, + ]) + .unwrap(); + for _ in 0..100 { + let ret = choose.choose_one().unwrap(); + assert_eq!(1, ret); + } + + for _ in 0..100 { + let ret = choose.reverse_choose_one().unwrap(); + assert_eq!(2, ret); + } + } + + #[test] + #[should_panic] + fn test_random_weighted_choose_should_panic() { + let mut choose: RandomWeightedChoose = RandomWeightedChoose::default(); + choose.set_weight_array(vec![]).unwrap(); + let _ = choose.choose_one().unwrap(); + } + + #[test] + #[should_panic] + fn test_random_reverse_weighted_choose_should_panic() { + let mut choose: RandomWeightedChoose = RandomWeightedChoose::default(); + choose.set_weight_array(vec![]).unwrap(); + let _ = choose.reverse_choose_one().unwrap(); + } +} diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index cfefbba0c2..60fba75390 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -26,6 +26,7 @@ use table::metadata::RawTableInfo; use crate::error::{self, Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; +use crate::selector::SelectorOptions; pub struct MetaSrvTableMetadataAllocator { ctx: SelectorContext, @@ -78,12 +79,21 @@ async fn handle_create_region_routes( selector: &SelectorRef, table_id_sequence: &SequenceRef, ) -> Result<(TableId, Vec)> { - let mut peers = selector.select(cluster_id, ctx).await?; + let mut peers = selector + .select( + cluster_id, + ctx, + SelectorOptions { + min_required_items: partitions.len(), + allow_duplication: true, + }, + ) + .await?; if peers.len() < partitions.len() { warn!("Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}", format_full_table_name(&table_info.catalog_name,&table_info.schema_name,&table_info.name), partitions.len(), peers.len()); return error::NoEnoughAvailableDatanodeSnafu { - expected: partitions.len(), + required: partitions.len(), available: peers.len(), } .fail(); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 127598468c..123e864dd5 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -524,7 +524,7 @@ CREATE TABLE {table_name} ( .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); - assert_eq!(region_to_dn_map.len(), expected_distribution.len()); + assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql(&format!( "SELECT ts, a, b FROM {table_name} ORDER BY ts" diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 3bff7bf6b9..ac5a2e4b3c 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -221,7 +221,7 @@ mod tests { .iter() .map(|(k, v)| (v[0], *k)) .collect::>(); - assert_eq!(region_to_dn_map.len(), expected_distribution.len()); + assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap(); let LogicalPlan::DfPlan(plan) = instance diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index de60fd8f84..612a1fd4ab 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -30,7 +30,7 @@ use futures::TryStreamExt; use meta_srv::error::Result as MetaResult; use meta_srv::metasrv::{SelectorContext, SelectorRef}; use meta_srv::procedure::region_failover::{RegionFailoverContext, RegionFailoverProcedure}; -use meta_srv::selector::{Namespace, Selector}; +use meta_srv::selector::{Namespace, Selector, SelectorOptions}; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; use table::metadata::TableId; @@ -325,7 +325,12 @@ impl Selector for ForeignNodeSelector { type Context = SelectorContext; type Output = Vec; - async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult { + async fn select( + &self, + _ns: Namespace, + _ctx: &Self::Context, + _opts: SelectorOptions, + ) -> MetaResult { Ok(vec![self.foreign.clone()]) } }