From 8dca448baf64c2c7c9b93119f5cd6c7f05495139 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 9 May 2025 18:16:21 +0800
Subject: [PATCH] feat: add datanode workloads support (#6055)

* feat: add datanode workload type support
* refactor: enhance datanode lease filtering with mode conditions
* chore: update config.md
* fix: fix clippy
* chore: apply suggestions from CR
* feat: add feature gate
* fix: fmt and clippy
* refactor: minor refactor
* chore: apply suggestions from CR
* chore: apply suggestions from CR
* refactor: minor refactor
* test: fix unit test
---
 Cargo.lock                                      |  14 +-
 Cargo.toml                                      |   4 +-
 src/cmd/src/datanode.rs                         |   1 +
 src/common/meta/Cargo.toml                      |   1 +
 src/common/meta/src/cluster.rs                  |  10 +-
 src/common/meta/src/datanode.rs                 |   8 +-
 src/common/meta/src/heartbeat/utils.rs          |  41 +++-
 src/common/workload/Cargo.toml                  |  13 +
 src/common/workload/src/lib.rs                  |  68 ++++++
 src/datanode/Cargo.toml                         |   1 +
 src/datanode/src/config.rs                      |  10 +
 src/datanode/src/heartbeat.rs                   |  10 +-
 src/meta-srv/Cargo.toml                         |   1 +
 .../handler/collect_cluster_info_handler.rs     |   1 +
 src/meta-srv/src/handler/keep_lease_handler.rs  |  14 +-
 src/meta-srv/src/key.rs                         |   7 +-
 src/meta-srv/src/lease.rs                       | 228 +++++++++++++++++-
 src/meta-srv/src/selector/lease_based.rs        |   5 +-
 src/meta-srv/src/selector/load_based.rs         |  11 +-
 src/meta-srv/src/selector/round_robin.rs        |   4 +-
 src/meta-srv/src/test_util.rs                   |  23 +-
 21 files changed, 442 insertions(+), 33 deletions(-)
 create mode 100644 src/common/workload/Cargo.toml
 create mode 100644 src/common/workload/src/lib.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7c6863cbe4..f3c6810bd9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2322,6 +2322,7 @@ dependencies = [
  "common-telemetry",
  "common-time",
  "common-wal",
+ "common-workload",
  "datafusion-common",
  "datafusion-expr",
  "datatypes",
@@ -2593,6 +2594,15 @@ dependencies = [
  "toml 0.8.19",
 ]
 
+[[package]]
+name = "common-workload"
+version = "0.15.0"
+dependencies = [
+ "api",
+ "common-telemetry",
+ "serde",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.5.0"
@@ -3520,6 +3530,7 @@ dependencies = [
  "common-time",
  "common-version",
  "common-wal",
+ "common-workload",
  "dashmap",
  "datafusion",
  "datafusion-common",
@@ -4842,7 +4853,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17a3550751c8b1e02ec16be40101d5f24dc255c3#17a3550751c8b1e02ec16be40101d5f24dc255c3"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
 dependencies = [
  "prost 0.13.5",
  "serde",
@@ -6771,6 +6782,7 @@ dependencies = [
  "common-time",
  "common-version",
  "common-wal",
+ "common-workload",
  "dashmap",
  "datatypes",
  "deadpool",
diff --git a/Cargo.toml b/Cargo.toml
index b6a76c4309..c655e42356 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@ members = [
     "src/common/time",
     "src/common/version",
     "src/common/wal",
+    "src/common/workload",
     "src/datanode",
     "src/datatypes",
     "src/file-engine",
@@ -130,7 +131,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
@@ -256,6 +257,7 @@ common-test-util = { path = "src/common/test-util" }
 common-time = { path = "src/common/time" }
 common-version = { path = "src/common/version" }
 common-wal = { path = "src/common/wal" }
+common-workload = { path = "src/common/workload" }
 datanode = { path = "src/datanode" }
 datatypes = { path = "src/datatypes" }
 file-engine = { path = "src/file-engine" }
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 6603fbc2bf..a5ccabe08d 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -156,6 +156,7 @@ impl StartCommand {
             .context(LoadLayeredConfigSnafu)?;
 
         self.merge_with_cli_options(global_options, &mut opts)?;
+        opts.component.sanitize();
 
         Ok(opts)
     }
diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index d83956903e..bc2facd703 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -34,6 +34,7 @@ common-recordbatch.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 common-wal.workspace = true
+common-workload.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datatypes.workspace = true
diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs
index 9d9bc50362..f334734e7b 100644
--- a/src/common/meta/src/cluster.rs
+++ b/src/common/meta/src/cluster.rs
@@ -15,7 +15,7 @@
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::str::FromStr;
 
-use api::v1::meta::HeartbeatRequest;
+use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
 use common_error::ext::ErrorExt;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -161,6 +161,8 @@ pub struct DatanodeStatus {
     pub leader_regions: usize,
     /// How many follower regions on this node.
     pub follower_regions: usize,
+    /// The workloads of the datanode.
+    pub workloads: DatanodeWorkloads,
 }
 
 /// The status of a frontend.
@@ -281,6 +283,8 @@ impl TryFrom<i32> for Role {
 mod tests {
     use std::assert_matches::assert_matches;
 
+    use common_workload::DatanodeWorkloadType;
+
     use super::*;
     use crate::cluster::Role::{Datanode, Frontend};
     use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
@@ -313,6 +317,9 @@ mod tests {
             wcus: 2,
             leader_regions: 3,
             follower_regions: 4,
+            workloads: DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
+            },
         }),
         version: "".to_string(),
         git_commit: "".to_string(),
@@ -332,6 +339,7 @@ mod tests {
                 wcus: 2,
                 leader_regions: 3,
                 follower_regions: 4,
+                ..
             }),
             start_time_ms: 1,
             ..
diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs
index 499ed865a2..bee7f989c6 100644
--- a/src/common/meta/src/datanode.rs
+++ b/src/common/meta/src/datanode.rs
@@ -15,7 +15,7 @@
 use std::collections::HashSet;
 use std::str::FromStr;
 
-use api::v1::meta::{HeartbeatRequest, RequestHeader};
+use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
 use common_time::util as time_util;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -27,6 +27,7 @@ use table::metadata::TableId;
 
 use crate::error;
 use crate::error::Result;
+use crate::heartbeat::utils::get_datanode_workloads;
 
 pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
 const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
@@ -65,6 +66,8 @@ pub struct Stat {
     pub region_stats: Vec<RegionStat>,
     // The node epoch is used to check whether the node has restarted or redeployed.
     pub node_epoch: u64,
+    /// The datanode workloads.
+    pub datanode_workloads: DatanodeWorkloads,
 }
 
 /// The statistics of a region.
@@ -197,6 +200,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                     peer,
                     region_stats,
                     node_epoch,
+                    node_workloads,
                     ..
                 } = value;
 
@@ -207,6 +211,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                     .map(RegionStat::from)
                     .collect::<Vec<_>>();
 
+                let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
                 Ok(Self {
                     timestamp_millis: time_util::current_time_millis(),
                     // datanode id
@@ -218,6 +223,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
                     region_num: region_stats.len() as u64,
                     region_stats,
                     node_epoch: *node_epoch,
+                    datanode_workloads,
                 })
             }
             (header, _) => Err(header.clone()),
diff --git a/src/common/meta/src/heartbeat/utils.rs b/src/common/meta/src/heartbeat/utils.rs
index afee357372..df438a01de 100644
--- a/src/common/meta/src/heartbeat/utils.rs
+++ b/src/common/meta/src/heartbeat/utils.rs
@@ -12,9 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use api::v1::meta::heartbeat_request::NodeWorkloads;
 use api::v1::meta::mailbox_message::Payload;
-use api::v1::meta::MailboxMessage;
+use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
+use common_telemetry::warn;
 use common_time::util::current_time_millis;
+use common_workload::DatanodeWorkloadType;
 use snafu::{OptionExt, ResultExt};
 
 use crate::error::{self, Result};
@@ -56,3 +59,39 @@ pub fn outgoing_message_to_mailbox_message(
         )),
     })
 }
+
+/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
+///
+/// Returns default datanode workloads if the input is `None`.
+pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
+    match node_workloads {
+        Some(NodeWorkloads::Datanode(datanode_workloads)) => {
+            let mut datanode_workloads = datanode_workloads.clone();
+            let unexpected_workloads = datanode_workloads
+                .types
+                .extract_if(.., |t| DatanodeWorkloadType::from_i32(*t).is_none())
+                .collect::<Vec<_>>();
+            if !unexpected_workloads.is_empty() {
+                warn!("Unexpected datanode workloads: {:?}", unexpected_workloads);
+            }
+            datanode_workloads
+        }
+        _ => DatanodeWorkloads {
+            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
+        },
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_get_datanode_workloads() {
+        let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
+            types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
+        }));
+        let workloads = get_datanode_workloads(node_workloads.as_ref());
+        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
+    }
+}
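Editor's note on the helper above: `get_datanode_workloads` has exactly two behaviors worth keeping in mind when reviewing the metasrv side — a missing payload falls back to `Hybrid`, and unrecognized workload ids are dropped with a warning. A minimal sketch of both cases (the `main` function is illustrative only; all types are the ones introduced by this patch):

```rust
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::DatanodeWorkloads;
use common_meta::heartbeat::utils::get_datanode_workloads;
use common_workload::DatanodeWorkloadType;

fn main() {
    // Case 1: no workloads reported (e.g. a datanode from an older release)
    // => the Hybrid default is assumed.
    let defaulted = get_datanode_workloads(None);
    assert_eq!(defaulted.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);

    // Case 2: unknown ids (100 here) are stripped with a warning; known ids survive.
    let reported = NodeWorkloads::Datanode(DatanodeWorkloads {
        types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
    });
    let sanitized = get_datanode_workloads(Some(&reported));
    assert_eq!(sanitized.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
}
```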
diff --git a/src/common/workload/Cargo.toml b/src/common/workload/Cargo.toml
new file mode 100644
index 0000000000..b85692e30d
--- /dev/null
+++ b/src/common/workload/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "common-workload"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+api.workspace = true
+common-telemetry.workspace = true
+serde.workspace = true
diff --git a/src/common/workload/src/lib.rs b/src/common/workload/src/lib.rs
new file mode 100644
index 0000000000..41fb9bd59e
--- /dev/null
+++ b/src/common/workload/src/lib.rs
@@ -0,0 +1,68 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_telemetry::info;
+use serde::{Deserialize, Serialize};
+
+/// The workload type of the datanode.
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+#[serde(rename_all = "snake_case")]
+pub enum DatanodeWorkloadType {
+    /// The datanode can handle all workloads.
+    Hybrid = 0,
+}
+
+impl DatanodeWorkloadType {
+    /// Convert from `i32` to `DatanodeWorkloadType`.
+    pub fn from_i32(value: i32) -> Option<Self> {
+        match value {
+            v if v == Self::Hybrid as i32 => Some(Self::Hybrid),
+            _ => None,
+        }
+    }
+
+    /// Convert from `DatanodeWorkloadType` to `i32`.
+    pub fn to_i32(self) -> i32 {
+        self as i32
+    }
+
+    pub fn accept_ingest(&self) -> bool {
+        matches!(self, Self::Hybrid)
+    }
+}
+
+/// Sanitize the workload types.
+pub fn sanitize_workload_types(workload_types: &mut Vec<DatanodeWorkloadType>) {
+    if workload_types.is_empty() {
+        info!("The workload types is empty, use Hybrid workload type");
+        workload_types.push(DatanodeWorkloadType::Hybrid);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_sanitize_workload_types() {
+        let hybrid = DatanodeWorkloadType::Hybrid;
+        assert_eq!(hybrid as i32, 0);
+        let hybrid_i32 = hybrid.to_i32();
+        assert_eq!(hybrid_i32, 0);
+        assert_eq!(DatanodeWorkloadType::from_i32(hybrid_i32), Some(hybrid));
+
+        let unexpected_i32 = 100;
+        assert_eq!(DatanodeWorkloadType::from_i32(unexpected_i32), None);
+    }
+}
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 3d0bfdda9b..15823fb498 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -33,6 +33,7 @@ common-telemetry.workspace = true
 common-time.workspace = true
 common-version.workspace = true
 common-wal.workspace = true
+common-workload.workspace = true
 dashmap.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true
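A note on the serde attributes in `lib.rs` above: with `rename_all = "snake_case"`, the variant round-trips through human-readable formats as the string `hybrid`, which is what lets operators spell workload types by name in config files. A minimal sketch of that round trip plus the `sanitize_workload_types` fallback (`serde_json` is used purely for illustration here; the real config path goes through TOML):

```rust
use common_workload::{sanitize_workload_types, DatanodeWorkloadType};

fn main() {
    // `rename_all = "snake_case"` => the variant serializes as a lowercase string.
    let json = serde_json::to_string(&DatanodeWorkloadType::Hybrid).unwrap();
    assert_eq!(json, "\"hybrid\"");

    let parsed: DatanodeWorkloadType = serde_json::from_str("\"hybrid\"").unwrap();
    assert_eq!(parsed, DatanodeWorkloadType::Hybrid);

    // An empty list is repaired back to the Hybrid default.
    let mut types: Vec<DatanodeWorkloadType> = vec![];
    sanitize_workload_types(&mut types);
    assert_eq!(types, vec![DatanodeWorkloadType::Hybrid]);
}
```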
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 7d63057a72..9c845c5553 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -22,6 +22,7 @@ use common_config::Configurable;
 pub use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
 use common_wal::config::DatanodeWalConfig;
+use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
 use file_engine::config::EngineConfig as FileEngineConfig;
 use meta_client::MetaClientOptions;
 use metric_engine::config::EngineConfig as MetricEngineConfig;
@@ -360,6 +361,7 @@ impl Default for ObjectStoreConfig {
 #[serde(default)]
 pub struct DatanodeOptions {
     pub node_id: Option<u64>,
+    pub workload_types: Vec<DatanodeWorkloadType>,
     pub require_lease_before_startup: bool,
     pub init_regions_in_background: bool,
     pub init_regions_parallelism: usize,
@@ -391,11 +393,19 @@ pub struct DatanodeOptions {
     pub rpc_max_send_message_size: Option<ReadableSize>,
 }
 
+impl DatanodeOptions {
+    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
+    pub fn sanitize(&mut self) {
+        sanitize_workload_types(&mut self.workload_types);
+    }
+}
+
 impl Default for DatanodeOptions {
     #[allow(deprecated)]
     fn default() -> Self {
         Self {
             node_id: None,
+            workload_types: vec![DatanodeWorkloadType::Hybrid],
             require_lease_before_startup: false,
             init_regions_in_background: false,
             init_regions_parallelism: 16,
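With `#[serde(default)]` on `DatanodeOptions` plus the `sanitize` hook wired into `StartCommand` earlier in this patch, a config file may either spell out `workload_types = ["hybrid"]` or omit the key entirely. A minimal sketch of that behavior, using a cut-down stand-in struct rather than the real `DatanodeOptions` (the `MiniOptions` type is hypothetical; `toml` crate assumed available, as it is in the workspace):

```rust
use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
use serde::Deserialize;

// A cut-down stand-in for `DatanodeOptions`, showing only the field under review.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct MiniOptions {
    workload_types: Vec<DatanodeWorkloadType>,
}

fn main() {
    // Spelled out in TOML, the snake_case serde name applies: "hybrid".
    let opts: MiniOptions = toml::from_str(r#"workload_types = ["hybrid"]"#).unwrap();
    assert_eq!(opts.workload_types, vec![DatanodeWorkloadType::Hybrid]);

    // Omitted entirely: the empty default is repaired by sanitize().
    let mut opts: MiniOptions = toml::from_str("").unwrap();
    sanitize_workload_types(&mut opts.workload_types);
    assert_eq!(opts.workload_types, vec![DatanodeWorkloadType::Hybrid]);
}
```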
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 606144ee38..590095bee2 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
-use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
+use api::v1::meta::heartbeat_request::NodeWorkloads;
+use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
 use common_base::Plugins;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::datanode::REGION_STATISTIC_KEY;
@@ -30,6 +31,7 @@ use common_meta::heartbeat::handler::{
 use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
 use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
 use common_telemetry::{debug, error, info, trace, warn};
+use common_workload::DatanodeWorkloadType;
 use meta_client::client::{HeartbeatSender, MetaClient};
 use meta_client::MetaClientRef;
 use servers::addrs;
@@ -51,6 +53,7 @@ pub(crate) mod task_tracker;
 /// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
 pub struct HeartbeatTask {
     node_id: u64,
+    workload_types: Vec<DatanodeWorkloadType>,
     node_epoch: u64,
     peer_addr: String,
     running: Arc<AtomicBool>,
@@ -91,6 +94,7 @@ impl HeartbeatTask {
 
         Ok(Self {
             node_id: opts.node_id.unwrap_or(0),
+            workload_types: opts.workload_types.clone(),
             // We use datanode's start time millis as the node's epoch.
             node_epoch: common_time::util::current_time_millis() as u64,
             peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
@@ -221,6 +225,7 @@ impl HeartbeatTask {
                 addr: addr.clone(),
             });
         let epoch = self.region_alive_keeper.epoch();
+        let workload_types = self.workload_types.clone();
         self.region_alive_keeper.start(Some(event_receiver)).await?;
         let mut last_sent = Instant::now();
 
@@ -239,6 +244,9 @@ impl HeartbeatTask {
                             start_time_ms: node_epoch,
                             cpus: num_cpus::get() as u32,
                         }),
+                        node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
+                            types: workload_types.iter().map(|w| w.to_i32()).collect(),
+                        })),
                         ..Default::default()
                     };
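The net effect of the heartbeat change above is that every outgoing `HeartbeatRequest` now carries the node's declared workloads. A minimal sketch of the payload shape, using only the `api` types that appear in this patch (standalone `main` for illustration):

```rust
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
use common_workload::DatanodeWorkloadType;

fn main() {
    let workload_types = vec![DatanodeWorkloadType::Hybrid];

    // Mirrors the construction inside HeartbeatTask above.
    let req = HeartbeatRequest {
        node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
            types: workload_types.iter().map(|w| w.to_i32()).collect(),
        })),
        ..Default::default()
    };

    assert!(req.node_workloads.is_some());
}
```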
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 0e0f3e5cde..6c61ffd546 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -38,6 +38,7 @@ common-telemetry.workspace = true
 common-time.workspace = true
 common-version.workspace = true
 common-wal.workspace = true
+common-workload.workspace = true
 dashmap.workspace = true
 datatypes.workspace = true
 deadpool = { workspace = true, optional = true }
diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
index 7e470377c0..1a76b4a8cd 100644
--- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs
+++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
@@ -131,6 +131,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
                 wcus: stat.wcus,
                 leader_regions,
                 follower_regions,
+                workloads: stat.datanode_workloads.clone(),
             }),
             version: info.version,
             git_commit: info.git_commit,
diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs
index e57e0307f0..007d328ef0 100644
--- a/src/meta-srv/src/handler/keep_lease_handler.rs
+++ b/src/meta-srv/src/handler/keep_lease_handler.rs
@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::meta::{HeartbeatRequest, Peer, Role};
+use api::v1::meta::heartbeat_request::NodeWorkloads;
+use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
+use common_meta::heartbeat::utils::get_datanode_workloads;
 use common_meta::rpc::store::PutRequest;
 use common_telemetry::{trace, warn};
 use common_time::util as time_util;
@@ -37,7 +39,12 @@ impl HeartbeatHandler for DatanodeKeepLeaseHandler {
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
     ) -> Result<HandleControl> {
-        let HeartbeatRequest { header, peer, .. } = req;
+        let HeartbeatRequest {
+            header,
+            peer,
+            node_workloads,
+            ..
+        } = req;
         let Some(_header) = &header else {
             return Ok(HandleControl::Continue);
         };
@@ -45,10 +52,12 @@ impl HeartbeatHandler for DatanodeKeepLeaseHandler {
             return Ok(HandleControl::Continue);
         };
 
+        let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
         let key = DatanodeLeaseKey { node_id: peer.id };
         let value = LeaseValue {
             timestamp_millis: time_util::current_time_millis(),
             node_addr: peer.addr.clone(),
+            workloads: NodeWorkloads::Datanode(datanode_workloads),
         };
 
         trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
@@ -88,6 +97,7 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
         let value = LeaseValue {
             timestamp_millis: time_util::current_time_millis(),
             node_addr: peer.addr.clone(),
+            workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
         };
 
         trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
diff --git a/src/meta-srv/src/key.rs b/src/meta-srv/src/key.rs
index aabebb8bbc..33f40c87df 100644
--- a/src/meta-srv/src/key.rs
+++ b/src/meta-srv/src/key.rs
@@ -17,6 +17,7 @@ mod flownode;
 
 use std::str::FromStr;
 
+use api::v1::meta::heartbeat_request::NodeWorkloads;
 pub use datanode::*;
 pub use flownode::*;
 use serde::{Deserialize, Serialize};
@@ -74,11 +75,12 @@ macro_rules! impl_try_from_lease_key {
 impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX);
 impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX);
 
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct LeaseValue {
     // last activity
     pub timestamp_millis: i64,
     pub node_addr: String,
+    pub workloads: NodeWorkloads,
 }
 
 impl FromStr for LeaseValue {
@@ -113,6 +115,8 @@ impl TryFrom<LeaseValue> for Vec<u8> {
 
 #[cfg(test)]
 mod tests {
+    use api::v1::meta::DatanodeWorkloads;
+
     use super::*;
 
     #[test]
@@ -120,6 +124,7 @@ mod tests {
         let value = LeaseValue {
             timestamp_millis: 111,
             node_addr: "127.0.0.1:3002".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types: vec![] }),
         };
 
         let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
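Note the derive change in `key.rs` above: `Eq` is dropped from `LeaseValue` because the prost-generated `NodeWorkloads` oneof only implements `PartialEq`. The existing serialization contract is otherwise unchanged; a minimal round-trip sketch mirroring the test (crate-internal paths; this assumes the serialized bytes remain UTF-8 text, which the paired `FromStr` impl implies but this patch does not show):

```rust
use std::str::FromStr;

use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::DatanodeWorkloads;

use crate::key::LeaseValue;

fn demo() {
    let value = LeaseValue {
        timestamp_millis: 111,
        node_addr: "127.0.0.1:3002".to_string(),
        workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types: vec![] }),
    };

    // Bytes out via `TryFrom<LeaseValue> for Vec<u8>`, back in via `FromStr`.
    let bytes: Vec<u8> = value.clone().try_into().unwrap();
    let parsed = LeaseValue::from_str(&String::from_utf8(bytes).unwrap()).unwrap();
    assert_eq!(parsed, value);
}
```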
diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs
index 53f1ecd8ed..e9254c2d4f 100644
--- a/src/meta-srv/src/lease.rs
+++ b/src/meta-srv/src/lease.rs
@@ -13,13 +13,18 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::future::Future;
 use std::hash::Hash;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
+use api::v1::meta::heartbeat_request::NodeWorkloads;
 use common_error::ext::BoxedError;
 use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
 use common_meta::peer::{Peer, PeerLookupService};
 use common_meta::{util, DatanodeId, FlownodeId};
 use common_time::util as time_util;
+use common_workload::DatanodeWorkloadType;
 use snafu::ResultExt;
 
 use crate::cluster::MetaPeerClientRef;
@@ -33,6 +38,22 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
     }
 }
 
+/// Returns true if the datanode can accept ingest workload based on its workload types.
+///
+/// A datanode is considered to accept ingest workload if it supports either:
+/// - Hybrid workload (both ingest and query workloads)
+/// - Ingest workload (only ingest workload)
+pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
+    match &lease_value.workloads {
+        NodeWorkloads::Datanode(workloads) => workloads
+            .types
+            .iter()
+            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
+            .any(|w| w.accept_ingest()),
+        _ => false,
+    }
+}
+
 /// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
 pub async fn find_datanode_lease_value(
     datanode_id: DatanodeId,
@@ -81,16 +102,78 @@ pub async fn lookup_datanode_peer(
     }
 }
 
+type LeaseFilterFuture<'a, K> =
+    Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
+
+pub struct LeaseFilter<'a, K>
+where
+    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
+{
+    lease_secs: u64,
+    key_prefix: Vec<u8>,
+    meta_peer_client: &'a MetaPeerClientRef,
+    condition: Option<fn(&LeaseValue) -> bool>,
+    inner_future: Option<LeaseFilterFuture<'a, K>>,
+}
+
+impl<'a, K> LeaseFilter<'a, K>
+where
+    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
+{
+    pub fn new(
+        lease_secs: u64,
+        key_prefix: Vec<u8>,
+        meta_peer_client: &'a MetaPeerClientRef,
+    ) -> Self {
+        Self {
+            lease_secs,
+            key_prefix,
+            meta_peer_client,
+            condition: None,
+            inner_future: None,
+        }
+    }
+
+    /// Set the condition for the lease filter.
+    pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
+        self.condition = Some(condition);
+        self
+    }
+}
+
+impl<'a, K> Future for LeaseFilter<'a, K>
+where
+    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
+{
+    type Output = Result<HashMap<K, LeaseValue>>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        if this.inner_future.is_none() {
+            let lease_filter = build_lease_filter(this.lease_secs);
+            let condition = this.condition;
+            let key_prefix = std::mem::take(&mut this.key_prefix);
+            let fut = filter(key_prefix, this.meta_peer_client, move |v| {
+                lease_filter(v) && condition.unwrap_or(|_| true)(v)
+            });
+
+            this.inner_future = Some(Box::pin(fut));
+        }
+
+        let fut = this.inner_future.as_mut().unwrap();
+        let result = futures::ready!(fut.as_mut().poll(cx))?;
+
+        Poll::Ready(Ok(result))
+    }
+}
+
 /// Find all alive datanodes
-pub async fn alive_datanodes(
+pub fn alive_datanodes(
     meta_peer_client: &MetaPeerClientRef,
     lease_secs: u64,
-) -> Result<HashMap<DatanodeLeaseKey, LeaseValue>> {
-    let predicate = build_lease_filter(lease_secs);
-    filter(DatanodeLeaseKey::prefix_key(), meta_peer_client, |v| {
-        predicate(v)
-    })
-    .await
+) -> LeaseFilter<'_, DatanodeLeaseKey> {
+    LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
 }
 
 /// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs`
@@ -121,17 +204,15 @@ pub async fn lookup_flownode_peer(
 }
 
 /// Find all alive flownodes
-pub async fn alive_flownodes(
+pub fn alive_flownodes(
     meta_peer_client: &MetaPeerClientRef,
     lease_secs: u64,
-) -> Result<HashMap<FlownodeLeaseKey, LeaseValue>> {
-    let predicate = build_lease_filter(lease_secs);
-    filter(
+) -> LeaseFilter<'_, FlownodeLeaseKey> {
+    LeaseFilter::new(
+        lease_secs,
         FlownodeLeaseKey::prefix_key_by_cluster(),
         meta_peer_client,
-        |v| predicate(v),
     )
-    .await
 }
 
 pub async fn filter(
@@ -190,3 +271,124 @@ impl PeerLookupService for MetaPeerLookupService {
             .context(common_meta::error::ExternalSnafu)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use api::v1::meta::heartbeat_request::NodeWorkloads;
+    use api::v1::meta::DatanodeWorkloads;
+    use common_meta::kv_backend::ResettableKvBackendRef;
+    use common_meta::rpc::store::PutRequest;
+    use common_time::util::current_time_millis;
+    use common_workload::DatanodeWorkloadType;
+
+    use crate::key::{DatanodeLeaseKey, LeaseValue};
+    use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
+    use crate::test_util::create_meta_peer_client;
+
+    async fn put_lease_value(
+        kv_backend: &ResettableKvBackendRef,
+        key: DatanodeLeaseKey,
+        value: LeaseValue,
+    ) {
+        kv_backend
+            .put(PutRequest {
+                key: key.try_into().unwrap(),
+                value: value.try_into().unwrap(),
+                prev_kv: false,
+            })
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_alive_datanodes() {
+        let client = create_meta_peer_client();
+        let in_memory = client.memory_backend();
+        let lease_secs = 10;
+
+        // put a stale lease value for node 1
+        let key = DatanodeLeaseKey { node_id: 1 };
+        let value = LeaseValue {
+            // 20s ago
+            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
+            node_addr: "127.0.0.1:20201".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid as i32],
+            }),
+        };
+        put_lease_value(&in_memory, key, value).await;
+
+        // put a fresh lease value for node 2
+        let key = DatanodeLeaseKey { node_id: 2 };
+        let value = LeaseValue {
+            timestamp_millis: current_time_millis(),
+            node_addr: "127.0.0.1:20202".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid as i32],
+            }),
+        };
+        put_lease_value(&in_memory, key.clone(), value.clone()).await;
+        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
+        assert_eq!(leases.len(), 1);
+        assert_eq!(leases.get(&key), Some(&value));
+    }
+
+    #[tokio::test]
+    async fn test_alive_datanodes_with_condition() {
+        let client = create_meta_peer_client();
+        let in_memory = client.memory_backend();
+        let lease_secs = 10;
+
+        // put a stale lease value for node 1; it is filtered out by the lease check
+        let key = DatanodeLeaseKey { node_id: 1 };
+        let value = LeaseValue {
+            // 20s ago
+            timestamp_millis: current_time_millis() - 20 * 1000,
+            node_addr: "127.0.0.1:20201".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid as i32],
+            }),
+        };
+        put_lease_value(&in_memory, key, value).await;
+
+        // put a fresh lease value for node 2 with an accepted (Hybrid) workload type
+        let key = DatanodeLeaseKey { node_id: 2 };
+        let value = LeaseValue {
+            timestamp_millis: current_time_millis(),
+            node_addr: "127.0.0.1:20202".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid as i32],
+            }),
+        };
+        put_lease_value(&in_memory, key, value).await;
+
+        // put a fresh lease value for node 3 with an unknown workload type;
+        // it is filtered out by the condition
+        let key = DatanodeLeaseKey { node_id: 3 };
+        let value = LeaseValue {
+            timestamp_millis: current_time_millis(),
+            node_addr: "127.0.0.1:20203".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![i32::MAX],
+            }),
+        };
+        put_lease_value(&in_memory, key, value).await;
+
+        // put a fresh lease value for node 4 with an unknown workload type;
+        // it is filtered out by the condition
+        let key = DatanodeLeaseKey { node_id: 4 };
+        let value = LeaseValue {
+            timestamp_millis: current_time_millis(),
+            node_addr: "127.0.0.1:20204".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![i32::MAX],
+            }),
+        };
+        put_lease_value(&in_memory, key, value).await;
+
+        let leases = alive_datanodes(&client, lease_secs as u64)
+            .with_condition(is_datanode_accept_ingest_workload)
+            .await
+            .unwrap();
+        assert_eq!(leases.len(), 1);
+        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
+    }
+}
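The `LeaseFilter` future makes the call sites read like a small builder API: awaiting the bare value preserves the old `alive_datanodes` behavior, while chaining `with_condition` narrows the result before the keys are decoded. A minimal usage sketch (crate-internal paths assumed in scope; `is_local_node` is a hypothetical predicate, since any `fn(&LeaseValue) -> bool` qualifies):

```rust
use crate::cluster::MetaPeerClientRef;
use crate::error::Result;
use crate::key::LeaseValue;
use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};

// Any fn pointer over &LeaseValue works as a condition; this one is illustrative.
fn is_local_node(lease_value: &LeaseValue) -> bool {
    lease_value.node_addr.starts_with("127.0.0.1")
}

async fn demo(meta_peer_client: &MetaPeerClientRef) -> Result<()> {
    // Old behavior: every datanode with a fresh lease.
    let all = alive_datanodes(meta_peer_client, 10).await?;

    // New behavior: additionally require an ingest-accepting workload type.
    let ingest_capable = alive_datanodes(meta_peer_client, 10)
        .with_condition(is_datanode_accept_ingest_workload)
        .await?;

    assert!(ingest_capable.len() <= all.len());
    let _ = is_local_node; // a custom condition would plug in the same way
    Ok(())
}
```

The design choice worth noting: `condition` is a plain `fn` pointer rather than a closure, which keeps `LeaseFilter` `Send` and cheaply copyable into the lazily-built inner future.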
diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs
index 448c26b08e..53a0dcceb5 100644
--- a/src/meta-srv/src/selector/lease_based.rs
+++ b/src/meta-srv/src/selector/lease_based.rs
@@ -51,8 +51,9 @@ impl Selector for LeaseBasedSelector {
 
     async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
         // 1. get alive datanodes.
-        let lease_kvs =
-            lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
+        let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
+            .with_condition(lease::is_datanode_accept_ingest_workload)
+            .await?;
 
         // 2. compute weight array, but the weight of each item is the same.
         let mut weight_array = lease_kvs
diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs
index 4f33245a28..70af63f085 100644
--- a/src/meta-srv/src/selector/load_based.rs
+++ b/src/meta-srv/src/selector/load_based.rs
@@ -66,8 +66,9 @@ where
 
     async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
         // 1. get alive datanodes.
-        let lease_kvs =
-            lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
+        let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
+            .with_condition(lease::is_datanode_accept_ingest_workload)
+            .await?;
 
         // 2. get stat kvs and filter out expired datanodes.
         let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
@@ -166,7 +167,10 @@ async fn get_leader_peer_ids(
 mod tests {
     use std::collections::HashMap;
 
+    use api::v1::meta::heartbeat_request::NodeWorkloads;
+    use api::v1::meta::DatanodeWorkloads;
     use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
+    use common_workload::DatanodeWorkloadType;
 
     use crate::key::{DatanodeLeaseKey, LeaseValue};
     use crate::selector::load_based::filter_out_expired_datanode;
@@ -193,6 +197,9 @@ mod tests {
         LeaseValue {
             timestamp_millis: 0,
             node_addr: "127.0.0.1:3002".to_string(),
+            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+                types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
+            }),
         },
     );
 
diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs
index d930ca06cb..536892dfca 100644
--- a/src/meta-srv/src/selector/round_robin.rs
+++ b/src/meta-srv/src/selector/round_robin.rs
@@ -62,7 +62,9 @@ impl RoundRobinSelector {
             SelectTarget::Datanode => {
                 // 1. get alive datanodes.
                 let lease_kvs =
-                    lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
+                    lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
+                        .with_condition(lease::is_datanode_accept_ingest_workload)
+                        .await?;
 
                 let mut exclude_peer_ids = self
                     .node_excluder
diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs
index b12e11fd19..01db2d00d9 100644
--- a/src/meta-srv/src/test_util.rs
+++ b/src/meta-srv/src/test_util.rs
@@ -14,10 +14,13 @@
 
 use std::sync::Arc;
 
+use api::v1::meta::heartbeat_request::NodeWorkloads;
+use api::v1::meta::DatanodeWorkloads;
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::peer::Peer;
 use common_meta::rpc::router::{Region, RegionRoute};
 use common_time::util as time_util;
+use common_workload::DatanodeWorkloadType;
 
 use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
 use crate::key::{DatanodeLeaseKey, LeaseValue};
@@ -40,17 +43,22 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
     }
 }
 
-/// Builds and returns a [`SelectorContext`]. To access its inner state,
-/// use `memory_backend` on [`MetaPeerClientRef`].
-pub(crate) fn create_selector_context() -> SelectorContext {
+pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef {
     let in_memory = Arc::new(MemoryKvBackend::new());
-    let meta_peer_client = MetaPeerClientBuilder::default()
+    MetaPeerClientBuilder::default()
         .election(None)
-        .in_memory(in_memory.clone())
+        .in_memory(in_memory)
         .build()
         .map(Arc::new)
         // Safety: all required fields set at initialization
-        .unwrap();
+        .unwrap()
+}
+
+/// Builds and returns a [`SelectorContext`]. To access its inner state,
+/// use `memory_backend` on [`MetaPeerClientRef`].
+pub(crate) fn create_selector_context() -> SelectorContext {
+    let meta_peer_client = create_meta_peer_client();
+    let in_memory = meta_peer_client.memory_backend();
 
     SelectorContext {
         datanode_lease_secs: 10,
@@ -71,6 +79,9 @@ pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanodes: Vec<Peer>) {
     let lease_value = LeaseValue {
         timestamp_millis: time_util::current_time_millis(),
         node_addr: datanode.addr,
+        workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
+            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
+        }),
    };
     let lease_key_bytes: Vec<u8> = lease_key.try_into().unwrap();
     let lease_value_bytes: Vec<u8> = lease_value.try_into().unwrap();
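Taken together, the test utilities give an end-to-end picture of the feature: seed leases through `put_datanodes` (which now stamps every peer with a fresh Hybrid workload), then observe that only workload-accepting, fresh-leased nodes come back through the filter. A minimal smoke-test sketch under those assumptions (`Peer::new`-style construction is assumed; exact `Peer` helpers may differ):

```rust
use common_meta::peer::Peer;

use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
use crate::test_util::{create_meta_peer_client, put_datanodes};

#[tokio::test]
async fn workload_aware_selection_smoke() {
    let meta_peer_client = create_meta_peer_client();

    // Seed two datanodes; the helper stamps them with a fresh Hybrid lease.
    put_datanodes(
        &meta_peer_client,
        vec![Peer::new(1, "127.0.0.1:20201"), Peer::new(2, "127.0.0.2:20202")],
    )
    .await;

    // Both nodes are alive and Hybrid, so both pass the ingest condition.
    let leases = alive_datanodes(&meta_peer_client, 10)
        .with_condition(is_datanode_accept_ingest_workload)
        .await
        .unwrap();
    assert_eq!(leases.len(), 2);
}
```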