feat: expose workload filter to selector options (#6951)

* feat: add workload filtering support to selector options

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-09-11 11:11:13 +08:00
committed by GitHub
parent 2bddbe8c47
commit 6a15e62719
9 changed files with 26 additions and 12 deletions

View File

@@ -107,7 +107,7 @@ mod tests {
use common_time::util::current_time_millis;
use common_workload::DatanodeWorkloadType;
use crate::discovery::utils::{self, is_datanode_accept_ingest_workload};
use crate::discovery::utils::{self, accept_ingest_workload};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::test_util::create_meta_peer_client;
@@ -219,7 +219,7 @@ mod tests {
let peers = utils::alive_datanodes(
client.as_ref(),
Duration::from_secs(lease_secs),
Some(is_datanode_accept_ingest_workload),
Some(accept_ingest_workload),
)
.await
.unwrap();

View File

@@ -144,19 +144,22 @@ pub async fn alive_datanode(
Ok(v)
}
/// Returns true if the datanode can accept ingest workload based on its workload types.
/// Determines if a datanode is capable of accepting ingest workloads.
/// Returns `true` if the datanode's workload types include ingest capability,
/// or if the node is not of type [NodeWorkloads::Datanode].
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
pub fn is_datanode_accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
match &datanode_workloads {
NodeWorkloads::Datanode(workloads) => workloads
.types
.iter()
.filter_map(|w| DatanodeWorkloadType::from_i32(*w))
.any(|w| w.accept_ingest()),
_ => false,
// If the [NodeWorkloads] type is not [NodeWorkloads::Datanode], returns true.
_ => true,
}
}

View File

@@ -20,6 +20,7 @@ use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::peer::{Peer, PeerAllocator};
use snafu::{ResultExt, ensure};
use crate::discovery::utils::accept_ingest_workload;
use crate::error::{Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::selector::SelectorOptions;
@@ -69,6 +70,7 @@ impl MetasrvPeerAllocator {
min_required_items,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: Some(accept_ingest_workload),
},
)
.await

View File

@@ -40,6 +40,7 @@ use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};
use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
@@ -584,6 +585,7 @@ impl RegionSupervisor {
min_required_items: regions.len(),
allow_duplication: true,
exclude_peer_ids,
workload_filter: Some(accept_ingest_workload),
};
let peers = selector.select(&self.selector_context, opt).await?;
ensure!(

View File

@@ -22,6 +22,7 @@ pub mod weight_compute;
pub mod weighted_choose;
use std::collections::HashSet;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use strum::AsRefStr;
@@ -63,6 +64,8 @@ pub struct SelectorOptions {
pub allow_duplication: bool,
/// The peers to exclude from the selection.
pub exclude_peer_ids: HashSet<u64>,
/// The filter to select the peers based on their workloads.
pub workload_filter: Option<fn(&NodeWorkloads) -> bool>,
}
impl Default for SelectorOptions {
@@ -71,6 +74,7 @@ impl Default for SelectorOptions {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
}
}
}

View File

@@ -139,6 +139,7 @@ mod tests {
min_required_items: i,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
};
let selected_peers: HashSet<_> =
@@ -154,6 +155,7 @@ mod tests {
min_required_items: 6,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
};
let selected_result =
@@ -165,6 +167,7 @@ mod tests {
min_required_items: i,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
};
let selected_peers =

View File

@@ -15,7 +15,6 @@
use common_meta::peer::Peer;
use snafu::ResultExt;
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{ListActiveDatanodesSnafu, Result};
use crate::metasrv::SelectorContext;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
@@ -35,7 +34,7 @@ impl Selector for LeaseBasedSelector {
// 1. get alive datanodes.
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.active_datanodes(opts.workload_filter)
.await
.context(ListActiveDatanodesSnafu)?;

View File

@@ -20,7 +20,6 @@ use common_telemetry::debug;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{ListActiveDatanodesSnafu, Result};
use crate::metasrv::SelectorContext;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
@@ -54,7 +53,7 @@ where
// 1. get alive datanodes.
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.active_datanodes(opts.workload_filter)
.await
.context(ListActiveDatanodesSnafu)?;

View File

@@ -17,7 +17,6 @@ use std::sync::atomic::AtomicUsize;
use common_meta::peer::Peer;
use snafu::{ResultExt, ensure};
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{
ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
};
@@ -59,7 +58,7 @@ impl RoundRobinSelector {
// 1. get alive datanodes.
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.active_datanodes(opts.workload_filter)
.await
.context(ListActiveDatanodesSnafu)?;
@@ -71,7 +70,7 @@ impl RoundRobinSelector {
}
SelectTarget::Flownode => ctx
.peer_discovery
.active_flownodes(None)
.active_flownodes(opts.workload_filter)
.await
.context(ListActiveFlownodesSnafu)?,
};
@@ -150,6 +149,7 @@ mod test {
min_required_items: 4,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
},
)
.await
@@ -167,6 +167,7 @@ mod test {
min_required_items: 2,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
},
)
.await
@@ -208,6 +209,7 @@ mod test {
min_required_items: 1,
allow_duplication: true,
exclude_peer_ids: HashSet::from([2, 5]),
workload_filter: None,
},
)
.await