feat: add datanode workloads support (#6055)

* feat: add datanode workload type support

* refactor: enhance datanode lease filtering with mode conditions

* chore: update config.md

* fix: fix clippy

* chore: apply suggestions from CR

* feat: add feature gate

* fix: fmt and clippy

* refactor: minor refactor

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: minor refactor

* test: fix unit test
This commit is contained in:
Weny Xu
2025-05-09 18:16:21 +08:00
committed by GitHub
parent 828f69a562
commit 8dca448baf
21 changed files with 442 additions and 33 deletions

14
Cargo.lock generated
View File

@@ -2322,6 +2322,7 @@ dependencies = [
"common-telemetry",
"common-time",
"common-wal",
"common-workload",
"datafusion-common",
"datafusion-expr",
"datatypes",
@@ -2593,6 +2594,15 @@ dependencies = [
"toml 0.8.19",
]
[[package]]
name = "common-workload"
version = "0.15.0"
dependencies = [
"api",
"common-telemetry",
"serde",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@@ -3520,6 +3530,7 @@ dependencies = [
"common-time",
"common-version",
"common-wal",
"common-workload",
"dashmap",
"datafusion",
"datafusion-common",
@@ -4842,7 +4853,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17a3550751c8b1e02ec16be40101d5f24dc255c3#17a3550751c8b1e02ec16be40101d5f24dc255c3"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7668a882d57ca6a2333146e0574b8f0c9d5008ae#7668a882d57ca6a2333146e0574b8f0c9d5008ae"
dependencies = [
"prost 0.13.5",
"serde",
@@ -6771,6 +6782,7 @@ dependencies = [
"common-time",
"common-version",
"common-wal",
"common-workload",
"dashmap",
"datatypes",
"deadpool",

View File

@@ -36,6 +36,7 @@ members = [
"src/common/time",
"src/common/version",
"src/common/wal",
"src/common/workload",
"src/datanode",
"src/datatypes",
"src/file-engine",
@@ -130,7 +131,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17a3550751c8b1e02ec16be40101d5f24dc255c3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7668a882d57ca6a2333146e0574b8f0c9d5008ae" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -256,6 +257,7 @@ common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }
common-version = { path = "src/common/version" }
common-wal = { path = "src/common/wal" }
common-workload = { path = "src/common/workload" }
datanode = { path = "src/datanode" }
datatypes = { path = "src/datatypes" }
file-engine = { path = "src/file-engine" }

View File

@@ -156,6 +156,7 @@ impl StartCommand {
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
opts.component.sanitize();
Ok(opts)
}

View File

@@ -34,6 +34,7 @@ common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
common-workload.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true

View File

@@ -15,7 +15,7 @@
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
@@ -161,6 +161,8 @@ pub struct DatanodeStatus {
pub leader_regions: usize,
/// How many follower regions on this node.
pub follower_regions: usize,
/// The workloads of the datanode.
pub workloads: DatanodeWorkloads,
}
/// The status of a frontend.
@@ -281,6 +283,8 @@ impl TryFrom<i32> for Role {
mod tests {
use std::assert_matches::assert_matches;
use common_workload::DatanodeWorkloadType;
use super::*;
use crate::cluster::Role::{Datanode, Frontend};
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
@@ -313,6 +317,9 @@ mod tests {
wcus: 2,
leader_regions: 3,
follower_regions: 4,
workloads: DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
},
}),
version: "".to_string(),
git_commit: "".to_string(),
@@ -332,6 +339,7 @@ mod tests {
wcus: 2,
leader_regions: 3,
follower_regions: 4,
..
}),
start_time_ms: 1,
..

View File

@@ -15,7 +15,7 @@
use std::collections::HashSet;
use std::str::FromStr;
use api::v1::meta::{HeartbeatRequest, RequestHeader};
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
use common_time::util as time_util;
use lazy_static::lazy_static;
use regex::Regex;
@@ -27,6 +27,7 @@ use table::metadata::TableId;
use crate::error;
use crate::error::Result;
use crate::heartbeat::utils::get_datanode_workloads;
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
@@ -65,6 +66,8 @@ pub struct Stat {
pub region_stats: Vec<RegionStat>,
// The node epoch is used to check whether the node has restarted or redeployed.
pub node_epoch: u64,
/// The datanode workloads.
pub datanode_workloads: DatanodeWorkloads,
}
/// The statistics of a region.
@@ -197,6 +200,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
peer,
region_stats,
node_epoch,
node_workloads,
..
} = value;
@@ -207,6 +211,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
.map(RegionStat::from)
.collect::<Vec<_>>();
let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
// datanode id
@@ -218,6 +223,7 @@ impl TryFrom<&HeartbeatRequest> for Stat {
region_num: region_stats.len() as u64,
region_stats,
node_epoch: *node_epoch,
datanode_workloads,
})
}
(header, _) => Err(header.clone()),

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::MailboxMessage;
use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
use common_telemetry::warn;
use common_time::util::current_time_millis;
use common_workload::DatanodeWorkloadType;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
@@ -56,3 +59,39 @@ pub fn outgoing_message_to_mailbox_message(
)),
})
}
/// Extracts datanode workloads from the provided optional `NodeWorkloads`.
///
/// Unknown workload ids are dropped (with a warning); if the input is `None`
/// or not a datanode workload, the `Hybrid` default is returned.
pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> DatanodeWorkloads {
    let Some(NodeWorkloads::Datanode(workloads)) = node_workloads else {
        // Nothing (or a non-datanode workload) was reported: fall back to Hybrid.
        return DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
        };
    };
    // Split the reported ids into unknown and known ones, keeping only the known.
    let (unexpected, valid): (Vec<i32>, Vec<i32>) = workloads
        .types
        .iter()
        .copied()
        .partition(|t| DatanodeWorkloadType::from_i32(*t).is_none());
    if !unexpected.is_empty() {
        warn!("Unexpected datanode workloads: {:?}", unexpected);
    }
    let mut sanitized = workloads.clone();
    sanitized.types = valid;
    sanitized
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_datanode_workloads() {
        // Unknown workload ids (100 here) must be filtered out.
        let node_workloads = Some(NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid.to_i32(), 100],
        }));
        let workloads = get_datanode_workloads(node_workloads.as_ref());
        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);

        // A missing `NodeWorkloads` falls back to the documented `Hybrid` default.
        let workloads = get_datanode_workloads(None);
        assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
    }
}

View File

@@ -0,0 +1,13 @@
[package]
name = "common-workload"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
api.workspace = true
common-telemetry.workspace = true
serde.workspace = true

View File

@@ -0,0 +1,68 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::info;
use serde::{Deserialize, Serialize};
/// The workload type of the datanode.
///
/// Serialized in `snake_case` (e.g. `hybrid`), so the variant names can be
/// used directly as configuration values.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum DatanodeWorkloadType {
    /// The datanode can handle all workloads.
    Hybrid = 0,
}
impl DatanodeWorkloadType {
    /// Converts an `i32` into a `DatanodeWorkloadType`, returning `None`
    /// for values that do not correspond to a known variant.
    pub fn from_i32(value: i32) -> Option<Self> {
        if value == Self::Hybrid as i32 {
            Some(Self::Hybrid)
        } else {
            None
        }
    }

    /// Returns the `i32` representation of this workload type.
    pub fn to_i32(self) -> i32 {
        self as i32
    }

    /// Returns true if this workload type accepts ingest workload.
    pub fn accept_ingest(&self) -> bool {
        *self == Self::Hybrid
    }
}
/// Sanitize the workload types.
///
/// Ensures the list is non-empty: an empty configuration falls back to the
/// `Hybrid` workload type.
pub fn sanitize_workload_types(workload_types: &mut Vec<DatanodeWorkloadType>) {
    if workload_types.is_empty() {
        info!("The workload types is empty, use Hybrid workload type");
        workload_types.push(DatanodeWorkloadType::Hybrid);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The previous version of this test was named `test_sanitize_workload_types`
    // but only exercised the i32 conversions; the sanitize behavior is now
    // covered by its own test below.
    #[test]
    fn test_workload_type_conversions() {
        let hybrid = DatanodeWorkloadType::Hybrid;
        assert_eq!(hybrid as i32, 0);
        let hybrid_i32 = hybrid.to_i32();
        assert_eq!(hybrid_i32, 0);
        assert_eq!(DatanodeWorkloadType::from_i32(hybrid_i32), Some(hybrid));

        let unexpected_i32 = 100;
        assert_eq!(DatanodeWorkloadType::from_i32(unexpected_i32), None);
    }

    #[test]
    fn test_sanitize_workload_types() {
        // An empty list is replaced with the `Hybrid` default.
        let mut workload_types = vec![];
        sanitize_workload_types(&mut workload_types);
        assert_eq!(workload_types, vec![DatanodeWorkloadType::Hybrid]);

        // A non-empty list is left untouched.
        let mut workload_types = vec![DatanodeWorkloadType::Hybrid];
        sanitize_workload_types(&mut workload_types);
        assert_eq!(workload_types, vec![DatanodeWorkloadType::Hybrid]);
    }
}

View File

@@ -33,6 +33,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
common-workload.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true

View File

@@ -22,6 +22,7 @@ use common_config::Configurable;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
use file_engine::config::EngineConfig as FileEngineConfig;
use meta_client::MetaClientOptions;
use metric_engine::config::EngineConfig as MetricEngineConfig;
@@ -360,6 +361,7 @@ impl Default for ObjectStoreConfig {
#[serde(default)]
pub struct DatanodeOptions {
pub node_id: Option<u64>,
pub workload_types: Vec<DatanodeWorkloadType>,
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
@@ -391,11 +393,19 @@ pub struct DatanodeOptions {
pub rpc_max_send_message_size: Option<ReadableSize>,
}
impl DatanodeOptions {
    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
    pub fn sanitize(&mut self) {
        // An empty `workload_types` list falls back to the `Hybrid` workload type.
        sanitize_workload_types(&mut self.workload_types);
    }
}
impl Default for DatanodeOptions {
#[allow(deprecated)]
fn default() -> Self {
Self {
node_id: None,
workload_types: vec![DatanodeWorkloadType::Hybrid],
require_lease_before_startup: false,
init_regions_in_background: false,
init_regions_parallelism: 16,

View File

@@ -17,7 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
@@ -30,6 +31,7 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use common_workload::DatanodeWorkloadType;
use meta_client::client::{HeartbeatSender, MetaClient};
use meta_client::MetaClientRef;
use servers::addrs;
@@ -51,6 +53,7 @@ pub(crate) mod task_tracker;
/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
pub struct HeartbeatTask {
node_id: u64,
workload_types: Vec<DatanodeWorkloadType>,
node_epoch: u64,
peer_addr: String,
running: Arc<AtomicBool>,
@@ -91,6 +94,7 @@ impl HeartbeatTask {
Ok(Self {
node_id: opts.node_id.unwrap_or(0),
workload_types: opts.workload_types.clone(),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
@@ -221,6 +225,7 @@ impl HeartbeatTask {
addr: addr.clone(),
});
let epoch = self.region_alive_keeper.epoch();
let workload_types = self.workload_types.clone();
self.region_alive_keeper.start(Some(event_receiver)).await?;
let mut last_sent = Instant::now();
@@ -239,6 +244,9 @@ impl HeartbeatTask {
start_time_ms: node_epoch,
cpus: num_cpus::get() as u32,
}),
node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
types: workload_types.iter().map(|w| w.to_i32()).collect(),
})),
..Default::default()
};

View File

@@ -38,6 +38,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
common-workload.workspace = true
dashmap.workspace = true
datatypes.workspace = true
deadpool = { workspace = true, optional = true }

View File

@@ -131,6 +131,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
wcus: stat.wcus,
leader_regions,
follower_regions,
workloads: stat.datanode_workloads.clone(),
}),
version: info.version,
git_commit: info.git_commit,

View File

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
use common_meta::heartbeat::utils::get_datanode_workloads;
use common_meta::rpc::store::PutRequest;
use common_telemetry::{trace, warn};
use common_time::util as time_util;
@@ -37,7 +39,12 @@ impl HeartbeatHandler for DatanodeKeepLeaseHandler {
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
let HeartbeatRequest {
header,
peer,
node_workloads,
..
} = req;
let Some(_header) = &header else {
return Ok(HandleControl::Continue);
};
@@ -45,10 +52,12 @@ impl HeartbeatHandler for DatanodeKeepLeaseHandler {
return Ok(HandleControl::Continue);
};
let datanode_workloads = get_datanode_workloads(node_workloads.as_ref());
let key = DatanodeLeaseKey { node_id: peer.id };
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
workloads: NodeWorkloads::Datanode(datanode_workloads),
};
trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
@@ -88,6 +97,7 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");

View File

@@ -17,6 +17,7 @@ mod flownode;
use std::str::FromStr;
use api::v1::meta::heartbeat_request::NodeWorkloads;
pub use datanode::*;
pub use flownode::*;
use serde::{Deserialize, Serialize};
@@ -74,11 +75,12 @@ macro_rules! impl_try_from_lease_key {
impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX);
impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX);
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct LeaseValue {
// last activity
pub timestamp_millis: i64,
pub node_addr: String,
pub workloads: NodeWorkloads,
}
impl FromStr for LeaseValue {
@@ -113,6 +115,8 @@ impl TryFrom<LeaseValue> for Vec<u8> {
#[cfg(test)]
mod tests {
use api::v1::meta::DatanodeWorkloads;
use super::*;
#[test]
@@ -120,6 +124,7 @@ mod tests {
let value = LeaseValue {
timestamp_millis: 111,
node_addr: "127.0.0.1:3002".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads { types: vec![] }),
};
let value_bytes: Vec<u8> = value.clone().try_into().unwrap();

View File

@@ -13,13 +13,18 @@
// limitations under the License.
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
@@ -33,6 +38,22 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
}
}
/// Returns true if the datanode can accept ingest workload based on its workload types.
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
    // Non-datanode workloads (e.g. flownode) never accept datanode ingest.
    let NodeWorkloads::Datanode(workloads) = &lease_value.workloads else {
        return false;
    };
    workloads
        .types
        .iter()
        .any(|t| DatanodeWorkloadType::from_i32(*t).is_some_and(|w| w.accept_ingest()))
}
/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
pub async fn find_datanode_lease_value(
datanode_id: DatanodeId,
@@ -81,16 +102,78 @@ pub async fn lookup_datanode_peer(
}
}
// Boxed future type produced internally when a `LeaseFilter` is first polled.
type LeaseFilterFuture<'a, K> =
    Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;

/// A lazily-evaluated lease scan: collects lease values under `key_prefix`
/// that were renewed within `lease_secs`, optionally narrowed by an extra
/// `condition` (see `with_condition`).
pub struct LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    lease_secs: u64,
    key_prefix: Vec<u8>,
    meta_peer_client: &'a MetaPeerClientRef,
    // Optional extra predicate applied on top of the lease-liveness check.
    condition: Option<fn(&LeaseValue) -> bool>,
    // The underlying scan future; built lazily on first poll.
    inner_future: Option<LeaseFilterFuture<'a, K>>,
}
impl<'a, K> LeaseFilter<'a, K>
where
    K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
    /// Creates a [`LeaseFilter`] that keeps leases under `key_prefix`
    /// renewed within the last `lease_secs` seconds, with no extra condition.
    pub fn new(
        lease_secs: u64,
        key_prefix: Vec<u8>,
        meta_peer_client: &'a MetaPeerClientRef,
    ) -> Self {
        Self {
            lease_secs,
            key_prefix,
            meta_peer_client,
            condition: None,
            inner_future: None,
        }
    }

    /// Set the condition for the lease filter.
    ///
    /// The condition is applied in addition to the lease-liveness check;
    /// only values satisfying both are returned.
    pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
        self.condition = Some(condition);
        self
    }
}
impl<'a, K> Future for LeaseFilter<'a, K>
where
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
type Output = Result<HashMap<K, LeaseValue>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.inner_future.is_none() {
let lease_filter = build_lease_filter(this.lease_secs);
let condition = this.condition;
let key_prefix = std::mem::take(&mut this.key_prefix);
let fut = filter(key_prefix, this.meta_peer_client, move |v| {
lease_filter(v) && condition.unwrap_or(|_| true)(v)
});
this.inner_future = Some(Box::pin(fut));
}
let fut = this.inner_future.as_mut().unwrap();
let result = futures::ready!(fut.as_mut().poll(cx))?;
Poll::Ready(Ok(result))
}
}
/// Find all alive datanodes
pub async fn alive_datanodes(
pub fn alive_datanodes(
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<HashMap<DatanodeLeaseKey, LeaseValue>> {
let predicate = build_lease_filter(lease_secs);
filter(DatanodeLeaseKey::prefix_key(), meta_peer_client, |v| {
predicate(v)
})
.await
) -> LeaseFilter<'_, DatanodeLeaseKey> {
LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
}
/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs`
@@ -121,17 +204,15 @@ pub async fn lookup_flownode_peer(
}
/// Find all alive flownodes
pub async fn alive_flownodes(
pub fn alive_flownodes(
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<HashMap<FlownodeLeaseKey, LeaseValue>> {
let predicate = build_lease_filter(lease_secs);
filter(
) -> LeaseFilter<'_, FlownodeLeaseKey> {
LeaseFilter::new(
lease_secs,
FlownodeLeaseKey::prefix_key_by_cluster(),
meta_peer_client,
|v| predicate(v),
)
.await
}
pub async fn filter<P, K>(
@@ -190,3 +271,124 @@ impl PeerLookupService for MetaPeerLookupService {
.context(common_meta::error::ExternalSnafu)
}
}
#[cfg(test)]
mod tests {
    use api::v1::meta::heartbeat_request::NodeWorkloads;
    use api::v1::meta::DatanodeWorkloads;
    use common_meta::kv_backend::ResettableKvBackendRef;
    use common_meta::rpc::store::PutRequest;
    use common_time::util::current_time_millis;
    use common_workload::DatanodeWorkloadType;

    use crate::key::{DatanodeLeaseKey, LeaseValue};
    use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
    use crate::test_util::create_meta_peer_client;

    /// Stores a lease key/value pair into the in-memory kv backend.
    async fn put_lease_value(
        kv_backend: &ResettableKvBackendRef,
        key: DatanodeLeaseKey,
        value: LeaseValue,
    ) {
        kv_backend
            .put(PutRequest {
                key: key.try_into().unwrap(),
                value: value.try_into().unwrap(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_alive_datanodes() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // put a stale lease value for node 1; it must be filtered out as expired
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // put a fresh lease value for node 2; it is the only one expected to survive
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key.clone(), value.clone()).await;

        let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
        assert_eq!(leases.len(), 1);
        assert_eq!(leases.get(&key), Some(&value));
    }

    #[tokio::test]
    async fn test_alive_datanodes_with_condition() {
        let client = create_meta_peer_client();
        let in_memory = client.memory_backend();
        let lease_secs = 10;

        // node 1: expired lease (20s old) with a Hybrid workload —
        // filtered out by the liveness check
        let key = DatanodeLeaseKey { node_id: 1 };
        let value = LeaseValue {
            // 20s ago
            timestamp_millis: current_time_millis() - 20 * 1000,
            node_addr: "127.0.0.1:20201".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // node 2: fresh lease with a Hybrid workload — the only node expected to pass
        let key = DatanodeLeaseKey { node_id: 2 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20202".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![DatanodeWorkloadType::Hybrid as i32],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // node 3: fresh lease but an unknown workload type —
        // rejected by the ingest-workload condition
        let key = DatanodeLeaseKey { node_id: 3 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20203".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        // node 4: same as node 3 — fresh lease, unknown workload type, rejected
        let key = DatanodeLeaseKey { node_id: 4 };
        let value = LeaseValue {
            timestamp_millis: current_time_millis(),
            node_addr: "127.0.0.1:20204".to_string(),
            workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
                types: vec![i32::MAX],
            }),
        };
        put_lease_value(&in_memory, key, value).await;

        let leases = alive_datanodes(&client, lease_secs as u64)
            .with_condition(is_datanode_accept_ingest_workload)
            .await
            .unwrap();
        assert_eq!(leases.len(), 1);
        assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
    }
}

View File

@@ -51,8 +51,9 @@ impl Selector for LeaseBasedSelector {
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
// 2. compute weight array, but the weight of each item is the same.
let mut weight_array = lease_kvs

View File

@@ -66,8 +66,9 @@ where
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
// 2. get stat kvs and filter out expired datanodes.
let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
@@ -166,7 +167,10 @@ async fn get_leader_peer_ids(
mod tests {
use std::collections::HashMap;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::DatanodeWorkloads;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_workload::DatanodeWorkloadType;
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::selector::load_based::filter_out_expired_datanode;
@@ -193,6 +197,9 @@ mod tests {
LeaseValue {
timestamp_millis: 0,
node_addr: "127.0.0.1:3002".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
}),
},
);

View File

@@ -62,7 +62,9 @@ impl RoundRobinSelector {
SelectTarget::Datanode => {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
let mut exclude_peer_ids = self
.node_excluder

View File

@@ -14,10 +14,13 @@
use std::sync::Arc;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::DatanodeWorkloads;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_time::util as time_util;
use common_workload::DatanodeWorkloadType;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::key::{DatanodeLeaseKey, LeaseValue};
@@ -40,17 +43,22 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
}
}
/// Builds and returns a [`SelectorContext`]. To access its inner state,
/// use `memory_backend` on [`MetaPeerClientRef`].
pub(crate) fn create_selector_context() -> SelectorContext {
pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef {
let in_memory = Arc::new(MemoryKvBackend::new());
let meta_peer_client = MetaPeerClientBuilder::default()
MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.in_memory(in_memory)
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
.unwrap()
}
/// Builds and returns a [`SelectorContext`]. To access its inner state,
/// use `memory_backend` on [`MetaPeerClientRef`].
pub(crate) fn create_selector_context() -> SelectorContext {
let meta_peer_client = create_meta_peer_client();
let in_memory = meta_peer_client.memory_backend();
SelectorContext {
datanode_lease_secs: 10,
@@ -71,6 +79,9 @@ pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanode
let lease_value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: datanode.addr,
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
}),
};
let lease_key_bytes: Vec<u8> = lease_key.try_into().unwrap();
let lease_value_bytes: Vec<u8> = lease_value.try_into().unwrap();