From e0ce0a64464d6569b03c0e4b801df0e22cd2463b Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Wed, 10 Sep 2025 11:43:46 +0800
Subject: [PATCH] refactor: refactor `PeerLookupService` and simplify `Selector` implementations (#6939)

* refactor: move `lease` into `discovery` dir

Signed-off-by: WenyXu

* feat: introduce discovery components

Signed-off-by: WenyXu

* refactor: simplify selector

Signed-off-by: WenyXu

* refactor: remove duplicate peer allocator trait

Signed-off-by: WenyXu

* chore: apply suggestions

Signed-off-by: WenyXu

* refactor: minor refactor

Signed-off-by: WenyXu

* chore: refine comments

Signed-off-by: WenyXu

---------

Signed-off-by: WenyXu
---
 src/common/meta/src/ddl/flow_meta.rs | 35 +-
 src/common/meta/src/ddl/table_meta.rs | 21 +-
 src/common/meta/src/peer.rs | 72 ++-
 src/common/meta/src/test_util.rs | 10 +-
 src/common/time/src/util.rs | 23 +
 src/meta-srv/src/bootstrap.rs | 35 +-
 src/meta-srv/src/cluster.rs | 3 +-
 src/meta-srv/src/discovery.rs | 88 +++
 src/meta-srv/src/discovery/lease.rs | 326 +++++++++++
 src/meta-srv/src/discovery/node_info.rs | 63 ++
 src/meta-srv/src/discovery/utils.rs | 182 ++++++
 src/meta-srv/src/error.rs | 32 +-
 src/meta-srv/src/flow_meta_alloc.rs | 52 --
 .../src/handler/keep_lease_handler.rs | 5 +-
 src/meta-srv/src/lease.rs | 552 ------------------
 src/meta-srv/src/lib.rs | 7 +-
 src/meta-srv/src/metasrv.rs | 16 +-
 src/meta-srv/src/metasrv/builder.rs | 53 +-
 src/meta-srv/src/node_excluder.rs | 32 -
 .../src/{table_meta_alloc.rs => peer.rs} | 52 +-
 .../src/procedure/region_migration.rs | 4 +-
 .../downgrade_leader_region.rs | 4 +-
 src/meta-srv/src/region/supervisor.rs | 18 +-
 src/meta-srv/src/selector/lease_based.rs | 58 +-
 src/meta-srv/src/selector/load_based.rs | 144 +----
 src/meta-srv/src/selector/round_robin.rs | 81 +--
 src/meta-srv/src/selector/test_utils.rs | 8 +-
 src/meta-srv/src/service/admin/node_lease.rs | 14 +-
 src/meta-srv/src/test_util.rs | 17 -
 src/meta-srv/src/utils/insert_forwarder.rs | 14 +-
 tests-integration/src/cluster.rs | 14 +-
 31 files changed, 976 insertions(+), 1059 deletions(-)
 create mode 100644 src/meta-srv/src/discovery.rs
 create mode 100644 src/meta-srv/src/discovery/lease.rs
 create mode 100644 src/meta-srv/src/discovery/node_info.rs
 create mode 100644 src/meta-srv/src/discovery/utils.rs
 delete mode 100644 src/meta-srv/src/flow_meta_alloc.rs
 delete mode 100644 src/meta-srv/src/lease.rs
 delete mode 100644 src/meta-srv/src/node_excluder.rs
 rename src/meta-srv/src/{table_meta_alloc.rs => peer.rs} (70%)

diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs
index 2e71c97217..85c1f3e989 100644
--- a/src/common/meta/src/ddl/flow_meta.rs
+++ b/src/common/meta/src/ddl/flow_meta.rs
@@ -14,11 +14,9 @@
 
 use std::sync::Arc;
 
-use tonic::async_trait;
-
 use crate::error::Result;
 use crate::key::FlowId;
-use crate::peer::Peer;
+use crate::peer::{NoopPeerAllocator, Peer, PeerAllocatorRef};
 use crate::sequence::SequenceRef;
 
 /// The reference of [FlowMetadataAllocator].
@@ -30,25 +28,25 @@ pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
 #[derive(Clone)]
 pub struct FlowMetadataAllocator {
     flow_id_sequence: SequenceRef,
-    partition_peer_allocator: PartitionPeerAllocatorRef,
+    peer_allocator: PeerAllocatorRef,
 }
 
 impl FlowMetadataAllocator {
-    /// Returns the [FlowMetadataAllocator] with [NoopPartitionPeerAllocator].
+    /// Returns the [FlowMetadataAllocator] with [NoopPeerAllocator].
     pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
         Self {
             flow_id_sequence,
-            partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
+            peer_allocator: Arc::new(NoopPeerAllocator),
         }
     }
 
     pub fn with_peer_allocator(
         flow_id_sequence: SequenceRef,
-        peer_allocator: Arc<dyn PartitionPeerAllocator>,
+        peer_allocator: PeerAllocatorRef,
     ) -> Self {
         Self {
             flow_id_sequence,
-            partition_peer_allocator: peer_allocator,
+            peer_allocator,
         }
     }
 
@@ -61,27 +59,8 @@ impl FlowMetadataAllocator {
     /// Allocates the [FlowId] and [Peer]s.
     pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
         let flow_id = self.allocate_flow_id().await?;
-        let peers = self.partition_peer_allocator.alloc(partitions).await?;
+        let peers = self.peer_allocator.alloc(partitions).await?;
 
         Ok((flow_id, peers))
     }
 }
-
-/// Allocates [Peer]s for partitions.
-#[async_trait]
-pub trait PartitionPeerAllocator: Send + Sync {
-    /// Allocates [Peer] nodes for storing partitions.
-    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
-}
-
-/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
-pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;
-
-struct NoopPartitionPeerAllocator;
-
-#[async_trait]
-impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
-    async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
-        Ok(vec![Peer::default(); partitions])
-    }
-}
diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs
index b2906e8438..42cc893ee6 100644
--- a/src/common/meta/src/ddl/table_meta.rs
+++ b/src/common/meta/src/ddl/table_meta.rs
@@ -15,7 +15,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use async_trait::async_trait;
 use common_telemetry::{debug, info};
 use snafu::ensure;
 use store_api::storage::{RegionId, RegionNumber, TableId};
@@ -23,7 +22,7 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
 use crate::ddl::TableMetadata;
 use crate::error::{Result, UnsupportedSnafu};
 use crate::key::table_route::PhysicalTableRouteValue;
-use crate::peer::Peer;
+use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
 use crate::rpc::ddl::CreateTableTask;
 use crate::rpc::router::{Region, RegionRoute};
 use crate::sequence::SequenceRef;
@@ -186,21 +185,3 @@ impl TableMetadataAllocator {
         self.table_id_sequence.clone()
     }
 }
-
-pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
-
-/// [`PeerAllocator`] allocates [`Peer`]s for creating regions.
-#[async_trait]
-pub trait PeerAllocator: Send + Sync {
-    /// Allocates `regions` size [`Peer`]s.
-    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
-}
-
-struct NoopPeerAllocator;
-
-#[async_trait]
-impl PeerAllocator for NoopPeerAllocator {
-    async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
-        Ok(vec![Peer::default(); regions])
-    }
-}
diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs
index 834573b98b..0c11c49075 100644
--- a/src/common/meta/src/peer.rs
+++ b/src/common/meta/src/peer.rs
@@ -15,21 +15,77 @@
 use std::sync::Arc;
 
 pub use api::v1::meta::Peer;
+use api::v1::meta::heartbeat_request::NodeWorkloads;
 
 use crate::error::Error;
 use crate::{DatanodeId, FlownodeId};
 
-/// PeerLookupService is a service that can lookup peers.
+/// [`PeerResolver`] provides methods to look up peer information by node ID.
+///
+/// This trait allows resolving both datanode and flownode peers, regardless of their current activity status.
+/// Returned peers may be inactive (i.e., not currently alive in the cluster).
 #[async_trait::async_trait]
-pub trait PeerLookupService {
-    /// Returns the datanode with the given id. It may return inactive peers.
+pub trait PeerResolver: Send + Sync {
+    /// Looks up a datanode peer by its ID.
+    ///
+    /// Returns `Some(Peer)` if the datanode exists, or `None` if not found.
+    /// The returned peer may be inactive.
     async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>, Error>;
 
-    /// Returns the flownode with the given id. It may return inactive peers.
+    /// Looks up a flownode peer by its ID.
+    ///
+    /// Returns `Some(Peer)` if the flownode exists, or `None` if not found.
+    /// The returned peer may be inactive.
     async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>, Error>;
-
-    /// Returns all currently active frontend nodes that have reported a heartbeat within the most recent heartbeat interval from the in-memory backend.
-    async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
 }
 
-pub type PeerLookupServiceRef = Arc<dyn PeerLookupService>;
+pub type PeerResolverRef = Arc<dyn PeerResolver>;
+
+/// [`PeerDiscovery`] is a service for discovering active peers in the cluster.
+#[async_trait::async_trait]
+pub trait PeerDiscovery: Send + Sync {
+    /// Returns all currently active frontend nodes.
+    ///
+    /// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval,
+    /// as determined by the in-memory backend.
+    async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
+
+    /// Returns all currently active datanodes, optionally filtered by a predicate on their workloads.
+    ///
+    /// A datanode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
+    /// as determined by the in-memory backend.
+    /// The optional `filter` allows further selection based on the node's workloads.
+    async fn active_datanodes(
+        &self,
+        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+    ) -> Result<Vec<Peer>, Error>;
+
+    /// Returns all currently active flownodes, optionally filtered by a predicate on their workloads.
+    ///
+    /// A flownode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
+    /// as determined by the in-memory backend.
+    /// The optional `filter` allows further selection based on the node's workloads.
+    async fn active_flownodes(
+        &self,
+        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+    ) -> Result<Vec<Peer>, Error>;
+}
+
+pub type PeerDiscoveryRef = Arc<dyn PeerDiscovery>;
+
+/// [`PeerAllocator`] allocates [`Peer`]s for creating regions or flows.
+#[async_trait::async_trait] +pub trait PeerAllocator: Send + Sync { + async fn alloc(&self, num: usize) -> Result, Error>; +} + +pub type PeerAllocatorRef = Arc; + +pub struct NoopPeerAllocator; + +#[async_trait::async_trait] +impl PeerAllocator for NoopPeerAllocator { + async fn alloc(&self, num: usize) -> Result, Error> { + Ok(vec![Peer::default(); num]) + } +} diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 110e43dca2..09af2ad8d0 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -35,7 +35,7 @@ use crate::kv_backend::memory::MemoryKvBackend; use crate::node_manager::{ Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef, }; -use crate::peer::{Peer, PeerLookupService}; +use crate::peer::{Peer, PeerResolver}; use crate::region_keeper::MemoryRegionKeeper; use crate::region_registry::LeaderRegionRegistry; use crate::sequence::SequenceBuilder; @@ -208,10 +208,10 @@ pub fn new_ddl_context_with_kv_backend( } } -pub struct NoopPeerLookupService; +pub struct NoopPeerResolver; #[async_trait::async_trait] -impl PeerLookupService for NoopPeerLookupService { +impl PeerResolver for NoopPeerResolver { async fn datanode(&self, id: DatanodeId) -> Result> { Ok(Some(Peer::empty(id))) } @@ -219,10 +219,6 @@ impl PeerLookupService for NoopPeerLookupService { async fn flownode(&self, id: FlownodeId) -> Result> { Ok(Some(Peer::empty(id))) } - - async fn active_frontends(&self) -> Result> { - Ok(vec![]) - } } /// Create a kafka topic pool for testing. diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs index ca02bb897c..1be4d2c55b 100644 --- a/src/common/time/src/util.rs +++ b/src/common/time/src/util.rs @@ -52,6 +52,29 @@ pub fn find_tz_from_env() -> Option { .and_then(|tz| Tz::from_str(&tz).ok()) } +/// A trait for types that provide the current system time. +pub trait SystemTimer { + /// Returns the time duration since UNIX_EPOCH in milliseconds. + fn current_time_millis(&self) -> i64; + + /// Returns the current time in rfc3339 format. + fn current_time_rfc3339(&self) -> String; +} + +/// Default implementation of [`SystemTimer`] +#[derive(Debug, Default, Clone, Copy)] +pub struct DefaultSystemTimer; + +impl SystemTimer for DefaultSystemTimer { + fn current_time_millis(&self) -> i64 { + current_time_millis() + } + + fn current_time_rfc3339(&self) -> String { + current_time_rfc3339() + } +} + /// Returns the time duration since UNIX_EPOCH in milliseconds. 
pub fn current_time_millis() -> i64 { chrono::Utc::now().timestamp_millis() diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index d9bf45cd81..3f10a0bcd4 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -63,6 +63,7 @@ use tokio_postgres::NoTls; use tonic::codec::CompressionEncoding; use tonic::transport::server::{Router, TcpIncoming}; +use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::election::etcd::EtcdElection; @@ -71,8 +72,9 @@ use crate::election::rds::mysql::MySqlElection; #[cfg(feature = "pg_kvbackend")] use crate::election::rds::postgres::PgElection; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef}; -use crate::node_excluder::NodeExcluderRef; +use crate::metasrv::{ + BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef, +}; use crate::selector::SelectorType; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; @@ -405,10 +407,8 @@ pub async fn metasrv_builder( } let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; + let meta_peer_client = build_default_meta_peer_client(&election, &in_memory); - let node_excluder = plugins - .get::() - .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef); let selector = if let Some(selector) = plugins.get::() { info!("Using selector from plugins"); selector @@ -416,15 +416,12 @@ pub async fn metasrv_builder( let selector = match opts.selector { SelectorType::LoadBased => Arc::new(LoadBasedSelector::new( RegionNumsBasedWeightCompute, - node_excluder, + meta_peer_client.clone(), )) as SelectorRef, - SelectorType::LeaseBased => { - Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef + SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, + SelectorType::RoundRobin => { + Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef } - SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new( - SelectTarget::Datanode, - node_excluder, - )) as SelectorRef, }; info!( "Using selector from options, selector type: {}", @@ -439,9 +436,23 @@ pub async fn metasrv_builder( .in_memory(in_memory) .selector(selector) .election(election) + .meta_peer_client(meta_peer_client) .plugins(plugins)) } +pub(crate) fn build_default_meta_peer_client( + election: &Option, + in_memory: &ResettableKvBackendRef, +) -> MetaPeerClientRef { + MetaPeerClientBuilder::default() + .election(election.clone()) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap() +} + pub async fn create_etcd_client(store_addrs: &[String]) -> Result { create_etcd_client_with_tls(store_addrs, None).await } diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 0114cced6f..35b15b3b29 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -36,8 +36,7 @@ use common_telemetry::warn; use derive_builder::Builder; use snafu::{OptionExt, ResultExt, ensure}; -use crate::error; -use crate::error::{Result, match_for_io_error}; +use crate::error::{self, Result, match_for_io_error}; use crate::metasrv::ElectionRef; pub type MetaPeerClientRef = Arc; diff --git a/src/meta-srv/src/discovery.rs b/src/meta-srv/src/discovery.rs new file mode 100644 index 0000000000..6151e7afbd --- 
/dev/null +++ b/src/meta-srv/src/discovery.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod lease; +pub mod node_info; +pub mod utils; + +use std::time::Duration; + +use api::v1::meta::heartbeat_request::NodeWorkloads; +use common_error::ext::BoxedError; +use common_meta::distributed_time_constants::{ + DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS, +}; +use common_meta::error::Result; +use common_meta::peer::{Peer, PeerDiscovery, PeerResolver}; +use common_meta::{DatanodeId, FlownodeId}; +use snafu::ResultExt; + +use crate::cluster::MetaPeerClient; +use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType}; + +#[async_trait::async_trait] +impl PeerDiscovery for MetaPeerClient { + async fn active_frontends(&self) -> Result> { + utils::alive_frontends( + self, + Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS), + ) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } + + async fn active_datanodes( + &self, + filter: Option fn(&'a NodeWorkloads) -> bool>, + ) -> Result> { + utils::alive_datanodes(self, Duration::from_secs(DATANODE_LEASE_SECS), filter) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } + + async fn active_flownodes( + &self, + filter: Option fn(&'a NodeWorkloads) -> bool>, + ) -> Result> { + utils::alive_flownodes(self, Duration::from_secs(FLOWNODE_LEASE_SECS), filter) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } +} + +#[async_trait::async_trait] +impl PeerResolver for MetaPeerClient { + async fn datanode(&self, id: DatanodeId) -> Result> { + let peer = self + .lease_value(LeaseValueType::Datanode, id) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu)? + .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr)); + Ok(peer) + } + + async fn flownode(&self, id: FlownodeId) -> Result> { + let peer = self + .lease_value(LeaseValueType::Flownode, id) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu)? + .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr)); + Ok(peer) + } +} diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs new file mode 100644 index 0000000000..8f078d5ddf --- /dev/null +++ b/src/meta-srv/src/discovery/lease.rs @@ -0,0 +1,326 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_meta::kv_backend::KvBackend; +use common_meta::rpc::KeyValue; +use common_meta::rpc::store::RangeRequest; + +use crate::cluster::MetaPeerClient; +use crate::error::Result; +use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; + +#[derive(Clone, Copy)] +pub enum LeaseValueType { + Flownode, + Datanode, +} + +#[async_trait::async_trait] +pub trait LeaseValueAccessor: Send + Sync { + /// Returns the peer id and lease value. + async fn lease_values( + &self, + lease_value_type: LeaseValueType, + ) -> Result>; + + async fn lease_value( + &self, + lease_value_type: LeaseValueType, + node_id: u64, + ) -> Result>; +} + +fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> { + match lease_value_type { + LeaseValueType::Flownode => { + let lease_key: FlownodeLeaseKey = kv.key.try_into()?; + let lease_value: LeaseValue = kv.value.try_into()?; + Ok((lease_key.node_id, lease_value)) + } + LeaseValueType::Datanode => { + let lease_key: DatanodeLeaseKey = kv.key.try_into()?; + let lease_value: LeaseValue = kv.value.try_into()?; + Ok((lease_key.node_id, lease_value)) + } + } +} + +#[async_trait::async_trait] +impl LeaseValueAccessor for MetaPeerClient { + async fn lease_values( + &self, + lease_value_type: LeaseValueType, + ) -> Result> { + let prefix = match lease_value_type { + LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(), + LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(), + }; + let range_request = RangeRequest::new().with_prefix(prefix); + let response = self.range(range_request).await?; + response + .kvs + .into_iter() + .map(|kv| { + let (lease_key, lease_value) = decoder(lease_value_type, kv)?; + Ok((lease_key, lease_value)) + }) + .collect::>>() + } + + async fn lease_value( + &self, + lease_value_type: LeaseValueType, + node_id: u64, + ) -> Result> { + let key: Vec = match lease_value_type { + LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?, + LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?, + }; + + let response = self.get(&key).await?; + response.map(|kv| decoder(lease_value_type, kv)).transpose() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use api::v1::meta::DatanodeWorkloads; + use api::v1::meta::heartbeat_request::NodeWorkloads; + use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role}; + use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; + use common_meta::kv_backend::ResettableKvBackendRef; + use common_meta::peer::{Peer, PeerDiscovery}; + use common_meta::rpc::store::PutRequest; + use common_time::util::current_time_millis; + use common_workload::DatanodeWorkloadType; + + use crate::discovery::utils::{self, is_datanode_accept_ingest_workload}; + use crate::key::{DatanodeLeaseKey, LeaseValue}; + use crate::test_util::create_meta_peer_client; + + async fn put_lease_value( + kv_backend: &ResettableKvBackendRef, + key: DatanodeLeaseKey, + value: LeaseValue, + ) { + kv_backend + .put(PutRequest { + key: key.try_into().unwrap(), + value: value.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_alive_datanodes() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + let lease_secs = 10; + + // put a stale lease value for node 1 + let key = DatanodeLeaseKey { node_id: 1 }; + let value = LeaseValue { + // 20s ago + timestamp_millis: current_time_millis() - lease_secs * 2 * 1000, + node_addr: 
"127.0.0.1:20201".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }; + put_lease_value(&in_memory, key, value).await; + + // put a fresh lease value for node 2 + let key = DatanodeLeaseKey { node_id: 2 }; + let value = LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:20202".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }; + put_lease_value(&in_memory, key.clone(), value.clone()).await; + let peers = utils::alive_datanodes( + client.as_ref(), + Duration::from_secs(lease_secs as u64), + None, + ) + .await + .unwrap(); + assert_eq!(peers.len(), 1); + assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]); + } + + #[tokio::test] + async fn test_alive_datanodes_with_condition() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + let lease_secs = 10; + + // put a lease value for node 1 without mode info + let key = DatanodeLeaseKey { node_id: 1 }; + let value = LeaseValue { + // 20s ago + timestamp_millis: current_time_millis() - 20 * 1000, + node_addr: "127.0.0.1:20201".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }; + put_lease_value(&in_memory, key, value).await; + + // put a lease value for node 2 with mode info + let key = DatanodeLeaseKey { node_id: 2 }; + let value = LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:20202".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }; + put_lease_value(&in_memory, key, value).await; + + // put a lease value for node 3 with mode info + let key = DatanodeLeaseKey { node_id: 3 }; + let value = LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:20203".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![i32::MAX], + }), + }; + put_lease_value(&in_memory, key, value).await; + + // put a lease value for node 3 with mode info + let key = DatanodeLeaseKey { node_id: 4 }; + let value = LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:20204".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![i32::MAX], + }), + }; + put_lease_value(&in_memory, key, value).await; + + let peers = utils::alive_datanodes( + client.as_ref(), + Duration::from_secs(lease_secs), + Some(is_datanode_accept_ingest_workload), + ) + .await + .unwrap(); + assert_eq!(peers.len(), 1); + assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string()))); + } + + #[tokio::test] + async fn test_lookup_frontends() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + let lease_secs = 10; + + let active_frontend_node = NodeInfo { + peer: Peer { + id: 0, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts: current_time_millis(), + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: current_time_millis() as u64, + cpus: 0, + memory_bytes: 0, + }; + + let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "0").into(), + value: active_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let 
inactive_frontend_node = NodeInfo { + peer: Peer { + id: 1, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts: current_time_millis() - 20 * 1000, + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: current_time_millis() as u64, + cpus: 0, + memory_bytes: 0, + }; + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "1").into(), + value: inactive_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let peers = utils::alive_frontends(client.as_ref(), Duration::from_secs(lease_secs)) + .await + .unwrap(); + assert_eq!(peers.len(), 1); + assert_eq!(peers[0].id, 0); + } + + #[tokio::test] + async fn test_no_active_frontends() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + + let last_activity_ts = + current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000; + let active_frontend_node = NodeInfo { + peer: Peer { + id: 0, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts, + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: last_activity_ts as u64, + cpus: 0, + memory_bytes: 0, + }; + + let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "0").into(), + value: active_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let peers = client.active_frontends().await.unwrap(); + assert_eq!(peers.len(), 0); + } +} diff --git a/src/meta-srv/src/discovery/node_info.rs b/src/meta-srv/src/discovery/node_info.rs new file mode 100644 index 0000000000..03a19173c1 --- /dev/null +++ b/src/meta-srv/src/discovery/node_info.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; +use common_meta::kv_backend::KvBackend; +use common_meta::rpc::store::RangeRequest; +use snafu::ResultExt; + +use crate::cluster::MetaPeerClient; +use crate::error::{InvalidNodeInfoFormatSnafu, Result}; + +#[derive(Clone, Copy)] +pub enum NodeInfoType { + Frontend, + Datanode, + Flownode, +} + +impl From for Role { + fn from(node_info_type: NodeInfoType) -> Self { + match node_info_type { + NodeInfoType::Frontend => Role::Frontend, + NodeInfoType::Datanode => Role::Datanode, + NodeInfoType::Flownode => Role::Flownode, + } + } +} + +/// Trait for types that can access node info. +#[async_trait::async_trait] +pub trait NodeInfoAccessor: Send + Sync { + /// Returns the peer id and node info. 
+ async fn node_infos(&self, node_info_type: NodeInfoType) -> Result>; +} + +#[async_trait::async_trait] +impl NodeInfoAccessor for MetaPeerClient { + async fn node_infos(&self, node_info_type: NodeInfoType) -> Result> { + let range_request = RangeRequest::new() + .with_prefix(NodeInfoKey::key_prefix_with_role(node_info_type.into())); + let response = self.range(range_request).await?; + + response + .kvs + .into_iter() + .map(|kv| { + let node_info = NodeInfo::try_from(kv.value).context(InvalidNodeInfoFormatSnafu)?; + Ok((node_info.peer.id, node_info)) + }) + .collect::>>() + } +} diff --git a/src/meta-srv/src/discovery/utils.rs b/src/meta-srv/src/discovery/utils.rs new file mode 100644 index 0000000000..ace6adc5da --- /dev/null +++ b/src/meta-srv/src/discovery/utils.rs @@ -0,0 +1,182 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use api::v1::meta::heartbeat_request::NodeWorkloads; +use common_meta::DatanodeId; +use common_meta::cluster::NodeInfo; +use common_meta::kv_backend::KvBackendRef; +use common_meta::peer::Peer; +use common_time::util::{DefaultSystemTimer, SystemTimer}; +use common_workload::DatanodeWorkloadType; +use snafu::ResultExt; + +use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType}; +use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType}; +use crate::error::{KvBackendSnafu, Result}; +use crate::key::{DatanodeLeaseKey, LeaseValue}; + +impl LastActiveTs for LeaseValue { + fn last_active_ts(&self) -> i64 { + self.timestamp_millis + } +} + +impl LastActiveTs for NodeInfo { + fn last_active_ts(&self) -> i64 { + self.last_activity_ts + } +} + +/// Trait for types that have a last active timestamp. +pub trait LastActiveTs { + /// Returns the last active timestamp in milliseconds. + fn last_active_ts(&self) -> i64; +} + +/// Builds a filter closure that checks whether a [`LastActiveTs`] item +/// is still within the specified active duration, relative to the +/// current time provided by the given [`SystemTimer`]. +/// +/// The returned closure uses the timestamp at the time of building, +/// so the "now" reference point is fixed when this function is called. +pub fn build_active_filter( + timer: impl SystemTimer, + active_duration: Duration, +) -> impl Fn(&T) -> bool { + let now = timer.current_time_millis(); + let active_duration = active_duration.as_millis() as u64; + move |item: &T| { + let elapsed = now.saturating_sub(item.last_active_ts()) as u64; + elapsed < active_duration + } +} + +/// Returns the alive datanodes. +pub async fn alive_datanodes( + accessor: &impl LeaseValueAccessor, + active_duration: Duration, + condition: Option bool>, +) -> Result> { + let active_filter = build_active_filter(DefaultSystemTimer, active_duration); + let condition = condition.unwrap_or(|_| true); + Ok(accessor + .lease_values(LeaseValueType::Datanode) + .await? 
+ .into_iter() + .filter_map(|(peer_id, lease_value)| { + if active_filter(&lease_value) && condition(&lease_value.workloads) { + Some(Peer::new(peer_id, lease_value.node_addr)) + } else { + None + } + }) + .collect::>()) +} + +/// Returns the alive flownodes. +pub async fn alive_flownodes( + accessor: &impl LeaseValueAccessor, + active_duration: Duration, + condition: Option bool>, +) -> Result> { + let active_filter = build_active_filter(DefaultSystemTimer, active_duration); + let condition = condition.unwrap_or(|_| true); + Ok(accessor + .lease_values(LeaseValueType::Flownode) + .await? + .into_iter() + .filter_map(|(peer_id, lease_value)| { + if active_filter(&lease_value) && condition(&lease_value.workloads) { + Some(Peer::new(peer_id, lease_value.node_addr)) + } else { + None + } + }) + .collect::>()) +} + +/// Returns the alive frontends. +pub async fn alive_frontends( + lister: &impl NodeInfoAccessor, + active_duration: Duration, +) -> Result> { + let active_filter = build_active_filter(DefaultSystemTimer, active_duration); + Ok(lister + .node_infos(NodeInfoType::Frontend) + .await? + .into_iter() + .filter_map(|(_, node_info)| { + if active_filter(&node_info) { + Some(node_info.peer) + } else { + None + } + }) + .collect::>()) +} + +/// Returns the alive datanode peer. +pub async fn alive_datanode( + lister: &impl LeaseValueAccessor, + peer_id: u64, + active_duration: Duration, +) -> Result> { + let active_filter = build_active_filter(DefaultSystemTimer, active_duration); + let v = lister + .lease_value(LeaseValueType::Datanode, peer_id) + .await? + .filter(|(_, lease)| active_filter(lease)) + .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr)); + + Ok(v) +} + +/// Returns true if the datanode can accept ingest workload based on its workload types. +/// +/// A datanode is considered to accept ingest workload if it supports either: +/// - Hybrid workload (both ingest and query workloads) +/// - Ingest workload (only ingest workload) +pub fn is_datanode_accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool { + match &datanode_workloads { + NodeWorkloads::Datanode(workloads) => workloads + .types + .iter() + .filter_map(|w| DatanodeWorkloadType::from_i32(*w)) + .any(|w| w.accept_ingest()), + _ => false, + } +} + +/// Returns the lease value of the given datanode id, if the datanode is not found, returns None. +pub async fn find_datanode_lease_value( + in_memory: &KvBackendRef, + datanode_id: DatanodeId, +) -> Result> { + let lease_key = DatanodeLeaseKey { + node_id: datanode_id, + }; + let lease_key_bytes: Vec = lease_key.try_into()?; + let Some(kv) = in_memory + .get(&lease_key_bytes) + .await + .context(KvBackendSnafu)? 
+ else { + return Ok(None); + }; + + let lease_value: LeaseValue = kv.value.try_into()?; + Ok(Some(lease_value)) +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 29174a97f2..f35497b60d 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -54,8 +54,22 @@ pub enum Error { peer_id: u64, }, - #[snafu(display("Failed to lookup frontends"))] - LookupFrontends { + #[snafu(display("Failed to list active frontends"))] + ListActiveFrontends { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to list active datanodes"))] + ListActiveDatanodes { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to list active flownodes"))] + ListActiveFlownodes { #[snafu(implicit)] location: Location, source: common_meta::error::Error, @@ -796,6 +810,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Invalid node info format"))] + InvalidNodeInfoFormat { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to serialize options to TOML"))] TomlFormat { #[snafu(implicit)] @@ -1055,7 +1076,8 @@ impl ErrorExt for Error { Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::SaveClusterInfo { source, .. } | Error::InvalidClusterInfoFormat { source, .. } - | Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(), + | Error::InvalidDatanodeStatFormat { source, .. } + | Error::InvalidNodeInfoFormat { source, .. } => source.status_code(), Error::InvalidateTableCache { source, .. } => source.status_code(), Error::SubmitProcedure { source, .. } | Error::WaitProcedure { source, .. } @@ -1085,7 +1107,9 @@ impl ErrorExt for Error { | Error::UnexpectedLogicalRouteTable { source, .. } | Error::UpdateTopicNameValue { source, .. } | Error::ParseWalOptions { source, .. } => source.status_code(), - Error::LookupFrontends { source, .. } => source.status_code(), + Error::ListActiveFrontends { source, .. } + | Error::ListActiveDatanodes { source, .. } + | Error::ListActiveFlownodes { source, .. } => source.status_code(), Error::NoAvailableFrontend { .. } => StatusCode::IllegalState, Error::InitMetadata { source, .. } diff --git a/src/meta-srv/src/flow_meta_alloc.rs b/src/meta-srv/src/flow_meta_alloc.rs deleted file mode 100644 index 5c9dd82301..0000000000 --- a/src/meta-srv/src/flow_meta_alloc.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashSet; - -use common_error::ext::BoxedError; -use common_meta::ddl::flow_meta::PartitionPeerAllocator; -use common_meta::peer::Peer; -use snafu::ResultExt; - -use crate::metasrv::{SelectorContext, SelectorRef}; -use crate::selector::SelectorOptions; - -pub struct FlowPeerAllocator { - ctx: SelectorContext, - selector: SelectorRef, -} - -impl FlowPeerAllocator { - pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self { - Self { ctx, selector } - } -} - -#[async_trait::async_trait] -impl PartitionPeerAllocator for FlowPeerAllocator { - async fn alloc(&self, partitions: usize) -> common_meta::error::Result> { - self.selector - .select( - &self.ctx, - SelectorOptions { - min_required_items: partitions, - allow_duplication: true, - exclude_peer_ids: HashSet::new(), - }, - ) - .await - .map_err(BoxedError::new) - .context(common_meta::error::ExternalSnafu) - } -} diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 53273ce40e..45570a831d 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -132,8 +132,8 @@ mod tests { use common_meta::datanode::Stat; use super::*; + use crate::discovery::utils::find_datanode_lease_value; use crate::handler::test_utils::TestEnv; - use crate::lease::find_datanode_lease_value; #[tokio::test] async fn test_put_into_memory_store() { @@ -143,7 +143,8 @@ mod tests { let handler = DatanodeKeepLeaseHandler; handle_request_many_times(ctx.clone(), &handler, 1).await; - let lease_value = find_datanode_lease_value(1, &ctx.in_memory) + let in_memory = ctx.in_memory.clone() as _; + let lease_value = find_datanode_lease_value(&in_memory, 1) .await .unwrap() .unwrap(); diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs deleted file mode 100644 index 6fcd4d9fb9..0000000000 --- a/src/meta-srv/src/lease.rs +++ /dev/null @@ -1,552 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; -use std::future::Future; -use std::hash::Hash; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use api::v1::meta::heartbeat_request::NodeWorkloads; -use common_error::ext::BoxedError; -use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole}; -use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; -use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef}; -use common_meta::peer::{Peer, PeerLookupService}; -use common_meta::rpc::store::RangeRequest; -use common_meta::{DatanodeId, FlownodeId, util}; -use common_time::util as time_util; -use common_workload::DatanodeWorkloadType; -use snafu::ResultExt; - -use crate::cluster::MetaPeerClientRef; -use crate::error::{Error, KvBackendSnafu, Result}; -use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; - -enum Value<'a> { - LeaseValue(&'a LeaseValue), - NodeInfo(&'a NodeInfo), -} - -fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool { - move |value: Value| { - let active_time = match value { - Value::LeaseValue(lease_value) => lease_value.timestamp_millis, - Value::NodeInfo(node_info) => node_info.last_activity_ts, - }; - - ((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64 - } -} - -/// Returns true if the datanode can accept ingest workload based on its workload types. -/// -/// A datanode is considered to accept ingest workload if it supports either: -/// - Hybrid workload (both ingest and query workloads) -/// - Ingest workload (only ingest workload) -pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool { - match &lease_value.workloads { - NodeWorkloads::Datanode(workloads) => workloads - .types - .iter() - .filter_map(|w| DatanodeWorkloadType::from_i32(*w)) - .any(|w| w.accept_ingest()), - _ => false, - } -} - -/// Returns the lease value of the given datanode id, if the datanode is not found, returns None. -pub async fn find_datanode_lease_value( - datanode_id: DatanodeId, - in_memory_key: &ResettableKvBackendRef, -) -> Result> { - let lease_key = DatanodeLeaseKey { - node_id: datanode_id, - }; - let lease_key_bytes: Vec = lease_key.try_into()?; - let Some(kv) = in_memory_key - .get(&lease_key_bytes) - .await - .context(KvBackendSnafu)? - else { - return Ok(None); - }; - - let lease_value: LeaseValue = kv.value.try_into()?; - - Ok(Some(lease_value)) -} - -/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], will only return if it's alive under given `lease_secs` -pub async fn lookup_datanode_peer( - datanode_id: DatanodeId, - meta_peer_client: &MetaPeerClientRef, - lease: Duration, -) -> Result> { - let lease_filter = build_lease_filter(lease); - let lease_key = DatanodeLeaseKey { - node_id: datanode_id, - }; - let lease_key_bytes: Vec = lease_key.clone().try_into()?; - let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? 
else { - return Ok(None); - }; - let lease_value: LeaseValue = kv.value.try_into()?; - let is_alive = lease_filter(Value::LeaseValue(&lease_value)); - if is_alive { - Ok(Some(Peer { - id: lease_key.node_id, - addr: lease_value.node_addr, - })) - } else { - Ok(None) - } -} - -type LeaseFilterFuture<'a, K> = - Pin>> + Send + 'a>>; - -pub struct LeaseFilter<'a, K> -where - K: Eq + Hash + TryFrom, Error = Error> + 'a, -{ - lease: Duration, - key_prefix: Vec, - meta_peer_client: &'a MetaPeerClientRef, - condition: Option bool>, - inner_future: Option>, -} - -impl<'a, K> LeaseFilter<'a, K> -where - K: Eq + Hash + TryFrom, Error = Error> + 'a, -{ - pub fn new( - lease: Duration, - key_prefix: Vec, - meta_peer_client: &'a MetaPeerClientRef, - ) -> Self { - Self { - lease, - key_prefix, - meta_peer_client, - condition: None, - inner_future: None, - } - } - - /// Set the condition for the lease filter. - pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self { - self.condition = Some(condition); - self - } -} - -impl<'a, K> Future for LeaseFilter<'a, K> -where - K: Eq + Hash + TryFrom, Error = Error> + 'a, -{ - type Output = Result>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - if this.inner_future.is_none() { - let lease_filter = build_lease_filter(this.lease); - let condition = this.condition; - let key_prefix = std::mem::take(&mut this.key_prefix); - let fut = filter(key_prefix, this.meta_peer_client, move |v| { - lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v) - }); - - this.inner_future = Some(Box::pin(fut)); - } - - let fut = this.inner_future.as_mut().unwrap(); - let result = futures::ready!(fut.as_mut().poll(cx))?; - - Poll::Ready(Ok(result)) - } -} - -/// Find all alive datanodes -pub fn alive_datanodes( - meta_peer_client: &MetaPeerClientRef, - lease: Duration, -) -> LeaseFilter<'_, DatanodeLeaseKey> { - LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client) -} - -/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs` -pub async fn lookup_flownode_peer( - flownode_id: FlownodeId, - meta_peer_client: &MetaPeerClientRef, - lease: Duration, -) -> Result> { - let lease_filter = build_lease_filter(lease); - let lease_key = FlownodeLeaseKey { - node_id: flownode_id, - }; - let lease_key_bytes: Vec = lease_key.clone().try_into()?; - let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else { - return Ok(None); - }; - let lease_value: LeaseValue = kv.value.try_into()?; - - let is_alive = lease_filter(Value::LeaseValue(&lease_value)); - if is_alive { - Ok(Some(Peer { - id: lease_key.node_id, - addr: lease_value.node_addr, - })) - } else { - Ok(None) - } -} - -/// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`. 
-pub async fn lookup_frontends( - meta_peer_client: &MetaPeerClientRef, - lease: Duration, -) -> Result> { - let range_request = - RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend)); - - let response = meta_peer_client.range(range_request).await?; - let lease_filter = build_lease_filter(lease); - - let mut peers = Vec::with_capacity(response.kvs.len()); - for kv in response.kvs { - let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?; - let is_alive = lease_filter(Value::NodeInfo(&node_info)); - if is_alive { - peers.push(node_info.peer); - } - } - - Ok(peers) -} - -/// Find all alive flownodes -pub fn alive_flownodes( - meta_peer_client: &MetaPeerClientRef, - lease: Duration, -) -> LeaseFilter<'_, FlownodeLeaseKey> { - LeaseFilter::new( - lease, - FlownodeLeaseKey::prefix_key_by_cluster(), - meta_peer_client, - ) -} - -pub async fn filter( - key: Vec, - meta_peer_client: &MetaPeerClientRef, - predicate: P, -) -> Result> -where - P: Fn(&LeaseValue) -> bool, - K: Eq + Hash + TryFrom, Error = Error>, -{ - let range_end = util::get_prefix_end_key(&key); - let range_req = common_meta::rpc::store::RangeRequest { - key, - range_end, - keys_only: false, - ..Default::default() - }; - let kvs = meta_peer_client.range(range_req).await?.kvs; - let mut lease_kvs = HashMap::new(); - for kv in kvs { - let lease_key: K = kv.key.try_into()?; - let lease_value: LeaseValue = kv.value.try_into()?; - if !predicate(&lease_value) { - continue; - } - let _ = lease_kvs.insert(lease_key, lease_value); - } - - Ok(lease_kvs) -} - -#[derive(Clone)] -pub struct MetaPeerLookupService { - pub meta_peer_client: MetaPeerClientRef, -} - -impl MetaPeerLookupService { - pub fn new(meta_peer_client: MetaPeerClientRef) -> Self { - Self { meta_peer_client } - } -} - -#[async_trait::async_trait] -impl PeerLookupService for MetaPeerLookupService { - async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result> { - lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX)) - .await - .map_err(BoxedError::new) - .context(common_meta::error::ExternalSnafu) - } - - async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result> { - lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX)) - .await - .map_err(BoxedError::new) - .context(common_meta::error::ExternalSnafu) - } - - async fn active_frontends(&self) -> common_meta::error::Result> { - // Get the active frontends within the last heartbeat interval. - lookup_frontends( - &self.meta_peer_client, - // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval? 
- Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS), - ) - .await - .map_err(BoxedError::new) - .context(common_meta::error::ExternalSnafu) - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use api::v1::meta::DatanodeWorkloads; - use api::v1::meta::heartbeat_request::NodeWorkloads; - use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; - use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; - use common_meta::kv_backend::ResettableKvBackendRef; - use common_meta::peer::{Peer, PeerLookupService}; - use common_meta::rpc::store::PutRequest; - use common_time::util::current_time_millis; - use common_workload::DatanodeWorkloadType; - - use crate::key::{DatanodeLeaseKey, LeaseValue}; - use crate::lease::{ - ClusterRole, MetaPeerLookupService, alive_datanodes, is_datanode_accept_ingest_workload, - lookup_frontends, - }; - use crate::test_util::create_meta_peer_client; - - async fn put_lease_value( - kv_backend: &ResettableKvBackendRef, - key: DatanodeLeaseKey, - value: LeaseValue, - ) { - kv_backend - .put(PutRequest { - key: key.try_into().unwrap(), - value: value.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - } - - #[tokio::test] - async fn test_alive_datanodes() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - - // put a stale lease value for node 1 - let key = DatanodeLeaseKey { node_id: 1 }; - let value = LeaseValue { - // 20s ago - timestamp_millis: current_time_millis() - lease_secs * 2 * 1000, - node_addr: "127.0.0.1:20201".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a fresh lease value for node 2 - let key = DatanodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key.clone(), value.clone()).await; - let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64)) - .await - .unwrap(); - assert_eq!(leases.len(), 1); - assert_eq!(leases.get(&key), Some(&value)); - } - - #[tokio::test] - async fn test_alive_datanodes_with_condition() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - - // put a lease value for node 1 without mode info - let key = DatanodeLeaseKey { node_id: 1 }; - let value = LeaseValue { - // 20s ago - timestamp_millis: current_time_millis() - 20 * 1000, - node_addr: "127.0.0.1:20201".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 2 with mode info - let key = DatanodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 3 with mode info - let key = DatanodeLeaseKey { node_id: 3 }; - let value = LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:20203".to_string(), - workloads: 
NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![i32::MAX], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 3 with mode info - let key = DatanodeLeaseKey { node_id: 4 }; - let value = LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:20204".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![i32::MAX], - }), - }; - put_lease_value(&in_memory, key, value).await; - - let leases = alive_datanodes(&client, Duration::from_secs(lease_secs)) - .with_condition(is_datanode_accept_ingest_workload) - .await - .unwrap(); - assert_eq!(leases.len(), 1); - assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 })); - } - - #[tokio::test] - async fn test_lookup_frontends() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - - let active_frontend_node = NodeInfo { - peer: Peer { - id: 0, - addr: "127.0.0.1:20201".to_string(), - }, - last_activity_ts: current_time_millis(), - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: current_time_millis() as u64, - cpus: 0, - memory_bytes: 0, - }; - - let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend); - - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "0").into(), - value: active_frontend_node.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - - let inactive_frontend_node = NodeInfo { - peer: Peer { - id: 1, - addr: "127.0.0.1:20201".to_string(), - }, - last_activity_ts: current_time_millis() - 20 * 1000, - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: current_time_millis() as u64, - cpus: 0, - memory_bytes: 0, - }; - - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "1").into(), - value: inactive_frontend_node.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - - let peers = lookup_frontends(&client, Duration::from_secs(lease_secs)) - .await - .unwrap(); - - assert_eq!(peers.len(), 1); - assert_eq!(peers[0].id, 0); - } - - #[tokio::test] - async fn test_no_active_frontends() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - - let last_activity_ts = - current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000; - let active_frontend_node = NodeInfo { - peer: Peer { - id: 0, - addr: "127.0.0.1:20201".to_string(), - }, - last_activity_ts, - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: last_activity_ts as u64, - cpus: 0, - memory_bytes: 0, - }; - - let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend); - - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "0").into(), - value: active_frontend_node.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - - let service = MetaPeerLookupService::new(client); - let peers = service.active_frontends().await.unwrap(); - assert_eq!(peers.len(), 0); - } -} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 7123598ec4..c2a3de9544 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -18,30 +18,29 @@ #![feature(let_chains)] #![feature(duration_constructors_lite)] #![feature(duration_constructors)] +#![feature(string_from_utf8_lossy_owned)] pub mod bootstrap; pub mod 
cache_invalidator; pub mod cluster; +pub mod discovery; pub mod election; pub mod error; pub mod events; mod failure_detector; -pub mod flow_meta_alloc; pub mod handler; pub mod key; -pub mod lease; pub mod metasrv; pub mod metrics; #[cfg(feature = "mock")] pub mod mocks; -pub mod node_excluder; +pub mod peer; pub mod procedure; pub mod pubsub; pub mod region; pub mod selector; pub mod service; pub mod state; -pub mod table_meta_alloc; pub mod utils; pub use crate::error::Result; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 73c4909e9a..82780ea721 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -36,7 +36,7 @@ use common_meta::leadership_notifier::{ LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef, }; use common_meta::node_expiry_listener::NodeExpiryListener; -use common_meta::peer::Peer; +use common_meta::peer::{Peer, PeerDiscoveryRef}; use common_meta::reconciliation::manager::ReconciliationManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; @@ -57,10 +57,10 @@ use servers::http::HttpOptions; use servers::tls::TlsOption; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use table::metadata::TableId; use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; +use crate::discovery; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{ self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, @@ -68,7 +68,6 @@ use crate::error::{ }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef}; -use crate::lease::lookup_datanode_peer; use crate::procedure::ProcedureManagerListenerAdapter; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::wal_prune::manager::WalPruneTickerRef; @@ -414,12 +413,7 @@ impl Display for SelectTarget { #[derive(Clone)] pub struct SelectorContext { - pub server_addr: String, - pub datanode_lease_secs: u64, - pub flownode_lease_secs: u64, - pub kv_backend: KvBackendRef, - pub meta_peer_client: MetaPeerClientRef, - pub table_id: Option, + pub peer_discovery: PeerDiscoveryRef, } pub type SelectorRef = Arc>>; @@ -708,9 +702,9 @@ impl Metasrv { /// Looks up a datanode peer by peer_id, returning it only when it's alive. /// A datanode is considered alive when it's still within the lease period. 
pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result> { - lookup_datanode_peer( + discovery::utils::alive_datanode( + self.meta_peer_client.as_ref(), peer_id, - &self.meta_peer_client, Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS), ) .await diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 87529fdfbf..d55e2bb7bf 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -48,23 +48,24 @@ use common_procedure::ProcedureManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; use common_telemetry::{info, warn}; use snafu::{ResultExt, ensure}; +use store_api::storage::MAX_REGION_SEQ; +use crate::bootstrap::build_default_meta_peer_client; use crate::cache_invalidator::MetasrvCacheInvalidator; -use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; +use crate::cluster::MetaPeerClientRef; use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result}; use crate::events::EventHandlerImpl; -use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::flow_state_handler::FlowStateHandler; use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler}; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers}; -use crate::lease::MetaPeerLookupService; use crate::metasrv::{ ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; +use crate::peer::MetasrvPeerAllocator; use crate::procedure::region_migration::DefaultContextFactory; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::wal_prune::Context as WalPruneContext; @@ -80,7 +81,6 @@ use crate::selector::round_robin::RoundRobinSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::State; -use crate::table_meta_alloc::MetasrvPeerAllocator; use crate::utils::insert_forwarder::InsertForwarder; /// The time window for twcs compaction of the region stats table. 
@@ -203,10 +203,9 @@ impl MetasrvBuilder { let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); - let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); let event_inserter = Box::new(InsertForwarder::new( - peer_lookup_service.clone(), + meta_peer_client.clone(), Some(InsertOptions { ttl: options.event_recorder.ttl, append_mode: true, @@ -218,7 +217,7 @@ impl MetasrvBuilder { event_inserter, )))); - let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default())); + let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_backend, &pushers); let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone())); @@ -237,12 +236,7 @@ impl MetasrvBuilder { )); let selector_ctx = SelectorContext { - server_addr: options.grpc.server_addr.clone(), - datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, - flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS, - kv_backend: kv_backend.clone(), - meta_peer_client: meta_peer_client.clone(), - table_id: None, + peer_discovery: meta_peer_client.clone(), }; let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone()) @@ -257,10 +251,10 @@ impl MetasrvBuilder { .step(10) .build(), ); - let peer_allocator = Arc::new(MetasrvPeerAllocator::new( - selector_ctx.clone(), - selector.clone(), - )); + let peer_allocator = Arc::new( + MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone()) + .with_max_items(MAX_REGION_SEQ), + ); Arc::new(TableMetadataAllocator::with_peer_allocator( sequence, wal_options_allocator.clone(), @@ -269,15 +263,13 @@ impl MetasrvBuilder { }); let table_id_sequence = table_metadata_allocator.table_id_sequence(); - let flow_selector = Arc::new(RoundRobinSelector::new( - SelectTarget::Flownode, - Arc::new(Vec::new()), - )) as SelectorRef; + let flow_selector = + Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef; let flow_metadata_allocator = { // for now flownode just use round-robin selector let flow_selector_ctx = selector_ctx.clone(); - let peer_allocator = Arc::new(FlowPeerAllocator::new( + let peer_allocator = Arc::new(MetasrvPeerAllocator::new( flow_selector_ctx, flow_selector.clone(), )); @@ -378,7 +370,7 @@ impl MetasrvBuilder { supervisor_selector, region_migration_manager.clone(), runtime_switch_manager.clone(), - peer_lookup_service.clone(), + meta_peer_client.clone(), leader_cached_kv_backend.clone(), ); @@ -470,7 +462,7 @@ impl MetasrvBuilder { let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() { let inserter = Box::new(InsertForwarder::new( - peer_lookup_service.clone(), + meta_peer_client.clone(), Some(InsertOptions { ttl: options.stats_persistence.ttl, append_mode: true, @@ -569,19 +561,6 @@ impl MetasrvBuilder { } } -fn build_default_meta_peer_client( - election: &Option, - in_memory: &ResettableKvBackendRef, -) -> MetaPeerClientRef { - MetaPeerClientBuilder::default() - .election(election.clone()) - .in_memory(in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap() -} - fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef { let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone()) .initial(1) diff --git a/src/meta-srv/src/node_excluder.rs b/src/meta-srv/src/node_excluder.rs deleted file 
mode 100644
index a7bc6e0f69..0000000000
--- a/src/meta-srv/src/node_excluder.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::sync::Arc;
-
-use common_meta::DatanodeId;
-
-pub type NodeExcluderRef = Arc<dyn NodeExcluder>;
-
-/// [NodeExcluder] is used to help decide whether some nodes should be excluded (out of consideration)
-/// in certain situations. For example, in some node selectors.
-pub trait NodeExcluder: Send + Sync {
-    /// Returns the excluded datanode ids.
-    fn excluded_datanode_ids(&self) -> &Vec<DatanodeId>;
-}
-
-impl NodeExcluder for Vec<DatanodeId> {
-    fn excluded_datanode_ids(&self) -> &Vec<DatanodeId> {
-        self
-    }
-}
diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/peer.rs
similarity index 70%
rename from src/meta-srv/src/table_meta_alloc.rs
rename to src/meta-srv/src/peer.rs
index 250fac63c2..f7ea784548 100644
--- a/src/meta-srv/src/table_meta_alloc.rs
+++ b/src/meta-srv/src/peer.rs
@@ -16,25 +16,36 @@ use std::collections::HashSet;
 
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
-use common_meta::ddl::table_meta::PeerAllocator;
 use common_meta::error::{ExternalSnafu, Result as MetaResult};
-use common_meta::peer::Peer;
+use common_meta::peer::{Peer, PeerAllocator};
 use snafu::{ResultExt, ensure};
-use store_api::storage::MAX_REGION_SEQ;
 
-use crate::error::{self, Result, TooManyPartitionsSnafu};
-use crate::metasrv::{SelectTarget, SelectorContext, SelectorRef};
+use crate::error::{Result, TooManyPartitionsSnafu};
+use crate::metasrv::{SelectorContext, SelectorRef};
 use crate::selector::SelectorOptions;
 
 pub struct MetasrvPeerAllocator {
     ctx: SelectorContext,
     selector: SelectorRef,
+    max_items: Option<u32>,
 }
 
 impl MetasrvPeerAllocator {
     /// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
     pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
-        Self { ctx, selector }
+        Self {
+            ctx,
+            selector,
+            max_items: None,
+        }
+    }
+
+    /// Sets the maximum number of peers that may be requested in a single allocation.
+    pub fn with_max_items(self, max_items: u32) -> Self {
+        Self {
+            ctx: self.ctx,
+            selector: self.selector,
+            max_items: Some(max_items),
+        }
     }
 
     /// Allocates a specified number (by `regions`) of [`Peer`] instances based on the number of
     /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
     /// no guarantee about how the returned peers are used, like whether they are from the same
     /// table or not. So this method isn't idempotent.
- async fn alloc(&self, regions: usize) -> Result> { - ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu); + async fn alloc(&self, min_required_items: usize) -> Result> { + if let Some(max_items) = self.max_items { + ensure!( + min_required_items <= max_items as usize, + TooManyPartitionsSnafu + ); + } - let mut peers = self - .selector + self.selector .select( &self.ctx, SelectorOptions { - min_required_items: regions, + min_required_items, allow_duplication: true, exclude_peer_ids: HashSet::new(), }, ) - .await?; - - ensure!( - peers.len() >= regions, - error::NoEnoughAvailableNodeSnafu { - required: regions, - available: peers.len(), - select_target: SelectTarget::Datanode - } - ); - - peers.truncate(regions); - - Ok(peers) + .await } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 2e4ce0c043..cfef9158ef 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -40,7 +40,7 @@ use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey}; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use common_meta::kv_backend::ResettableKvBackendRef; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; @@ -336,7 +336,7 @@ impl ContextFactory for DefaultContextFactory { pub struct Context { persistent_ctx: Arc, volatile_ctx: VolatileContext, - in_memory: ResettableKvBackendRef, + in_memory: KvBackendRef, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, region_failure_detector_controller: RegionFailureDetectorControllerRef, diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 6a23cc92d1..ad805ae680 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -28,9 +28,9 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use tokio::time::{Instant, sleep}; +use crate::discovery::utils::find_datanode_lease_value; use crate::error::{self, Result}; use crate::handler::HeartbeatMailbox; -use crate::lease::find_datanode_lease_value; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion; use crate::procedure::region_migration::{Context, State}; @@ -241,7 +241,7 @@ impl DowngradeLeaderRegion { async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) { let leader = &ctx.persistent_ctx.from_peer; - let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await { + let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await { Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis), Err(err) => { error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 719bded6e8..48cc7ae092 100644 --- 
a/src/meta-srv/src/region/supervisor.rs
+++ b/src/meta-srv/src/region/supervisor.rs
@@ -26,7 +26,7 @@ use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
 use common_meta::key::{MetadataKey, MetadataValue};
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::leadership_notifier::LeadershipChangeListener;
-use common_meta::peer::{Peer, PeerLookupServiceRef};
+use common_meta::peer::{Peer, PeerResolverRef};
 use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
 use common_meta::rpc::store::RangeRequest;
 use common_runtime::JoinHandle;
@@ -285,8 +285,8 @@ pub struct RegionSupervisor {
     region_migration_manager: RegionMigrationManagerRef,
     /// The maintenance mode manager.
     runtime_switch_manager: RuntimeSwitchManagerRef,
-    /// Peer lookup service
-    peer_lookup: PeerLookupServiceRef,
+    /// Peer resolver
+    peer_resolver: PeerResolverRef,
     /// The kv backend.
     kv_backend: KvBackendRef,
 }
@@ -359,7 +359,7 @@ impl RegionSupervisor {
         selector: RegionSupervisorSelector,
         region_migration_manager: RegionMigrationManagerRef,
         runtime_switch_manager: RuntimeSwitchManagerRef,
-        peer_lookup: PeerLookupServiceRef,
+        peer_resolver: PeerResolverRef,
         kv_backend: KvBackendRef,
     ) -> Self {
         Self {
@@ -370,7 +370,7 @@ impl RegionSupervisor {
             selector,
             region_migration_manager,
             runtime_switch_manager,
-            peer_lookup,
+            peer_resolver,
             kv_backend,
         }
     }
@@ -633,7 +633,7 @@ impl RegionSupervisor {
     ) -> Result> {
         let mut tasks = Vec::with_capacity(regions.len());
         let from_peer = self
-            .peer_lookup
+            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
@@ -778,7 +778,7 @@ pub(crate) mod tests {
     use common_meta::key::{TableMetadataManager, runtime_switch};
     use common_meta::peer::Peer;
     use common_meta::rpc::router::{Region, RegionRoute};
-    use common_meta::test_util::NoopPeerLookupService;
+    use common_meta::test_util::NoopPeerResolver;
     use common_telemetry::info;
     use common_time::util::current_time_millis;
     use rand::Rng;
@@ -807,7 +807,7 @@ pub(crate) mod tests {
         ));
         let runtime_switch_manager =
             Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
-        let peer_lookup = Arc::new(NoopPeerLookupService);
+        let peer_resolver = Arc::new(NoopPeerResolver);
         let (tx, rx) = RegionSupervisor::channel();
         let kv_backend = env.kv_backend();
 
@@ -819,7 +819,7 @@ pub(crate) mod tests {
             RegionSupervisorSelector::NaiveSelector(selector),
             region_migration_manager,
             runtime_switch_manager,
-            peer_lookup,
+            peer_resolver,
             kv_backend,
         ),
         tx,
diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs
index 5ddf3881d6..c91aeeee04 100644
--- a/src/meta-srv/src/selector/lease_based.rs
+++ b/src/meta-srv/src/selector/lease_based.rs
@@ -12,38 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::time::Duration;
-
 use common_meta::peer::Peer;
+use snafu::ResultExt;
 
-use crate::error::Result;
-use crate::lease;
+use crate::discovery::utils::is_datanode_accept_ingest_workload;
+use crate::error::{ListActiveDatanodesSnafu, Result};
 use crate::metasrv::SelectorContext;
-use crate::node_excluder::NodeExcluderRef;
 use crate::selector::common::{choose_items, filter_out_excluded_peers};
 use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
 use crate::selector::{Selector, SelectorOptions};
 
 /// Select all alive datanodes using a random weighted choice.
-pub struct LeaseBasedSelector { - node_excluder: NodeExcluderRef, -} - -impl LeaseBasedSelector { - pub fn new(node_excluder: NodeExcluderRef) -> Self { - Self { node_excluder } - } -} - -impl Default for LeaseBasedSelector { - fn default() -> Self { - Self { - node_excluder: Arc::new(Vec::new()), - } - } -} +#[derive(Default)] +pub struct LeaseBasedSelector; #[async_trait::async_trait] impl Selector for LeaseBasedSelector { @@ -52,34 +33,23 @@ impl Selector for LeaseBasedSelector { async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result { // 1. get alive datanodes. - let lease_kvs = lease::alive_datanodes( - &ctx.meta_peer_client, - Duration::from_secs(ctx.datanode_lease_secs), - ) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let alive_datanodes = ctx + .peer_discovery + .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .await + .context(ListActiveDatanodesSnafu)?; // 2. compute weight array, but the weight of each item is the same. - let mut weight_array = lease_kvs + let mut weight_array = alive_datanodes .into_iter() - .map(|(k, v)| WeightedItem { - item: Peer { - id: k.node_id, - addr: v.node_addr.clone(), - }, + .map(|p| WeightedItem { + item: p, weight: 1.0, }) .collect(); // 3. choose peers by weight_array. - let mut exclude_peer_ids = self - .node_excluder - .excluded_datanode_ids() - .iter() - .cloned() - .collect::>(); - exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); - filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids); + filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index b00e3a9461..b08e52ede8 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -12,47 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::{HashMap, HashSet}; -use std::sync::Arc; -use std::time::Duration; +use std::collections::HashMap; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; -use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; -use common_meta::rpc::router::find_leaders; -use common_telemetry::{debug, info}; +use common_telemetry::debug; use snafu::ResultExt; -use table::metadata::TableId; -use crate::error::{self, Result}; -use crate::key::{DatanodeLeaseKey, LeaseValue}; -use crate::lease; +use crate::cluster::MetaPeerClientRef; +use crate::discovery::utils::is_datanode_accept_ingest_workload; +use crate::error::{ListActiveDatanodesSnafu, Result}; use crate::metasrv::SelectorContext; -use crate::node_excluder::NodeExcluderRef; use crate::selector::common::{choose_items, filter_out_excluded_peers}; -use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute}; +use crate::selector::weight_compute::WeightCompute; use crate::selector::weighted_choose::RandomWeightedChoose; use crate::selector::{Selector, SelectorOptions}; pub struct LoadBasedSelector { weight_compute: C, - node_excluder: NodeExcluderRef, + meta_peer_client: MetaPeerClientRef, } impl LoadBasedSelector { - pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self { + pub fn new(weight_compute: C, meta_peer_client: MetaPeerClientRef) -> Self { Self { weight_compute, - node_excluder, - } - } -} - -impl Default for LoadBasedSelector { - fn default() -> Self { - Self { - weight_compute: RegionNumsBasedWeightCompute, - node_excluder: Arc::new(Vec::new()), + meta_peer_client, } } } @@ -67,50 +52,28 @@ where async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result { // 1. get alive datanodes. - let lease_kvs = lease::alive_datanodes( - &ctx.meta_peer_client, - Duration::from_secs(ctx.datanode_lease_secs), - ) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let alive_datanodes = ctx + .peer_discovery + .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .await + .context(ListActiveDatanodesSnafu)?; // 2. get stat kvs and filter out expired datanodes. - let stat_keys = lease_kvs.keys().map(|k| k.into()).collect(); + let stat_keys = alive_datanodes + .iter() + .map(|k| DatanodeStatKey { node_id: k.id }) + .collect(); let stat_kvs = filter_out_expired_datanode( - ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?, - &lease_kvs, + self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?, + &alive_datanodes, ); - // 3. try to make the regions of a table distributed on different datanodes as much as possible. - let stat_kvs = if let Some(table_id) = ctx.table_id { - let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone()); - let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?; - let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids); - if filter_result.is_empty() { - info!( - "The regions of the table cannot be allocated to completely different datanodes, table id: {}.", - table_id - ); - stat_kvs - } else { - filter_result - } - } else { - stat_kvs - }; - - // 4. compute weight array. + // 3. compute weight array. let mut weight_array = self.weight_compute.compute(&stat_kvs); + // 4. filter out excluded peers. + filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); // 5. choose peers by weight_array. 
- let mut exclude_peer_ids = self - .node_excluder - .excluded_datanode_ids() - .iter() - .cloned() - .collect::>(); - exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); - filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; @@ -125,61 +88,21 @@ where fn filter_out_expired_datanode( mut stat_kvs: HashMap, - lease_kvs: &HashMap, + datanodes: &[Peer], ) -> HashMap { - lease_kvs + datanodes .iter() - .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into())) + .filter_map(|p| stat_kvs.remove_entry(&DatanodeStatKey { node_id: p.id })) .collect() } -fn filter_out_datanode_by_table( - stat_kvs: &HashMap, - leader_peer_ids: &[u64], -) -> HashMap { - stat_kvs - .iter() - .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id)) - .map(|(stat_k, stat_v)| (*stat_k, stat_v.clone())) - .collect() -} - -async fn get_leader_peer_ids( - table_metadata_manager: &TableMetadataManager, - table_id: TableId, -) -> Result> { - table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map(|route| { - route.map_or_else( - || Ok(Vec::new()), - |route| { - let region_routes = route - .region_routes() - .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?; - Ok(find_leaders(region_routes) - .into_iter() - .map(|peer| peer.id) - .collect()) - }, - ) - })? -} - #[cfg(test)] mod tests { use std::collections::HashMap; - use api::v1::meta::DatanodeWorkloads; - use api::v1::meta::heartbeat_request::NodeWorkloads; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; - use common_workload::DatanodeWorkloadType; + use common_meta::peer::Peer; - use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::selector::load_based::filter_out_expired_datanode; #[test] @@ -198,18 +121,7 @@ mod tests { DatanodeStatValue { stats: vec![] }, ); - let mut lease_kvs = HashMap::new(); - lease_kvs.insert( - DatanodeLeaseKey { node_id: 1 }, - LeaseValue { - timestamp_millis: 0, - node_addr: "127.0.0.1:3002".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid.to_i32()], - }), - }, - ); - + let lease_kvs = vec![Peer::new(1, "127.0.0.1:3002".to_string())]; let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs); assert_eq!(1, alive_stat_kvs.len()); diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 801301659f..853173b874 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -12,18 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; -use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use std::time::Duration; use common_meta::peer::Peer; -use snafu::ensure; +use snafu::{ResultExt, ensure}; -use crate::error::{NoEnoughAvailableNodeSnafu, Result}; -use crate::lease; +use crate::discovery::utils::is_datanode_accept_ingest_workload; +use crate::error::{ + ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result, +}; use crate::metasrv::{SelectTarget, SelectorContext}; -use crate::node_excluder::NodeExcluderRef; use crate::selector::{Selector, SelectorOptions}; /// Round-robin selector that returns the next peer in the list in sequence. 
@@ -36,7 +34,6 @@ use crate::selector::{Selector, SelectorOptions}; pub struct RoundRobinSelector { select_target: SelectTarget, counter: AtomicUsize, - node_excluder: NodeExcluderRef, } impl Default for RoundRobinSelector { @@ -44,16 +41,14 @@ impl Default for RoundRobinSelector { Self { select_target: SelectTarget::Datanode, counter: AtomicUsize::new(0), - node_excluder: Arc::new(Vec::new()), } } } impl RoundRobinSelector { - pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self { + pub fn new(select_target: SelectTarget) -> Self { Self { select_target, - node_excluder, ..Default::default() } } @@ -62,41 +57,23 @@ impl RoundRobinSelector { let mut peers = match self.select_target { SelectTarget::Datanode => { // 1. get alive datanodes. - let lease_kvs = lease::alive_datanodes( - &ctx.meta_peer_client, - Duration::from_secs(ctx.datanode_lease_secs), - ) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let alive_datanodes = ctx + .peer_discovery + .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .await + .context(ListActiveDatanodesSnafu)?; - let mut exclude_peer_ids = self - .node_excluder - .excluded_datanode_ids() - .iter() - .cloned() - .collect::>(); - exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); - // 2. map into peers - lease_kvs + // 2. filter out excluded datanodes. + alive_datanodes .into_iter() - .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id)) - .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) - .collect::>() - } - SelectTarget::Flownode => { - // 1. get alive flownodes. - let lease_kvs = lease::alive_flownodes( - &ctx.meta_peer_client, - Duration::from_secs(ctx.flownode_lease_secs), - ) - .await?; - - // 2. map into peers - lease_kvs - .into_iter() - .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) + .filter(|p| !opts.exclude_peer_ids.contains(&p.id)) .collect::>() } + SelectTarget::Flownode => ctx + .peer_discovery + .active_flownodes(None) + .await + .context(ListActiveFlownodesSnafu)?, }; ensure!( @@ -141,12 +118,15 @@ mod test { use std::collections::HashSet; use super::*; - use crate::test_util::{create_selector_context, put_datanodes}; + use crate::test_util::{create_meta_peer_client, put_datanodes}; #[tokio::test] async fn test_round_robin_selector() { let selector = RoundRobinSelector::default(); - let ctx = create_selector_context(); + let meta_peer_client = create_meta_peer_client(); + let ctx = SelectorContext { + peer_discovery: meta_peer_client.clone(), + }; // add three nodes let peer1 = Peer { id: 2, @@ -161,7 +141,7 @@ mod test { addr: "node3".to_string(), }; let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()]; - put_datanodes(&ctx.meta_peer_client, peers).await; + put_datanodes(&meta_peer_client, peers).await; let peers = selector .select( @@ -197,8 +177,11 @@ mod test { #[tokio::test] async fn test_round_robin_selector_with_exclude_peer_ids() { - let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5])); - let ctx = create_selector_context(); + let selector = RoundRobinSelector::new(SelectTarget::Datanode); + let meta_peer_client = create_meta_peer_client(); + let ctx = SelectorContext { + peer_discovery: meta_peer_client.clone(), + }; // add three nodes let peer1 = Peer { id: 2, @@ -213,7 +196,7 @@ mod test { addr: "node3".to_string(), }; put_datanodes( - &ctx.meta_peer_client, + &meta_peer_client, vec![peer1.clone(), peer2.clone(), peer3.clone()], ) .await; @@ -224,7 +207,7 @@ mod test { SelectorOptions { min_required_items: 1, 
allow_duplication: true, - exclude_peer_ids: HashSet::from([2]), + exclude_peer_ids: HashSet::from([2, 5]), }, ) .await diff --git a/src/meta-srv/src/selector/test_utils.rs b/src/meta-srv/src/selector/test_utils.rs index 966b6733e0..6a992e967f 100644 --- a/src/meta-srv/src/selector/test_utils.rs +++ b/src/meta-srv/src/selector/test_utils.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use rand::prelude::SliceRandom; @@ -35,12 +34,7 @@ pub fn new_test_selector_context() -> SelectorContext { .unwrap(); SelectorContext { - server_addr: "127.0.0.1:3002".to_string(), - datanode_lease_secs: REGION_LEASE_SECS, - flownode_lease_secs: FLOWNODE_LEASE_SECS, - kv_backend, - meta_peer_client, - table_id: None, + peer_discovery: meta_peer_client, } } diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index 0e3ff73919..f49de2b476 100644 --- a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::HashMap; -use std::time::Duration; use axum::Json; use axum::extract::State; @@ -23,9 +22,9 @@ use snafu::ResultExt; use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; +use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType}; use crate::error::{self, Result}; use crate::key::{DatanodeLeaseKey, LeaseValue}; -use crate::lease; use crate::service::admin::HttpHandler; use crate::service::admin::util::ErrorHandler; @@ -36,12 +35,17 @@ pub struct NodeLeaseHandler { impl NodeLeaseHandler { async fn get_node_lease(&self) -> Result { - let leases = - lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?; + let leases = self + .meta_peer_client + .lease_values(LeaseValueType::Datanode) + .await? + .into_iter() + .collect::>(); + let leases = leases .into_iter() .map(|(k, v)| HumanLease { - name: k, + name: DatanodeLeaseKey { node_id: k }, human_time: common_time::Timestamp::new_millisecond(v.timestamp_millis) .to_local_string(), lease: v, diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 01be0fd63b..20ba6e2091 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -24,7 +24,6 @@ use common_workload::DatanodeWorkloadType; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::key::{DatanodeLeaseKey, LeaseValue}; -use crate::metasrv::SelectorContext; pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute { let region = Region { @@ -54,22 +53,6 @@ pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef { .unwrap() } -/// Builds and returns a [`SelectorContext`]. To access its inner state, -/// use `memory_backend` on [`MetaPeerClientRef`]. 
-pub(crate) fn create_selector_context() -> SelectorContext {
-    let meta_peer_client = create_meta_peer_client();
-    let in_memory = meta_peer_client.memory_backend();
-
-    SelectorContext {
-        datanode_lease_secs: 10,
-        flownode_lease_secs: 10,
-        server_addr: "127.0.0.1:3002".to_string(),
-        kv_backend: in_memory,
-        meta_peer_client,
-        table_id: None,
-    }
-}
-
 pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanodes: Vec<Peer>) {
     let backend = meta_peer_client.memory_backend();
     for datanode in datanodes {
diff --git a/src/meta-srv/src/utils/insert_forwarder.rs b/src/meta-srv/src/utils/insert_forwarder.rs
index 6b6353de57..78c7c454d4 100644
--- a/src/meta-srv/src/utils/insert_forwarder.rs
+++ b/src/meta-srv/src/utils/insert_forwarder.rs
@@ -19,28 +19,28 @@ use client::error::{ExternalSnafu, Result as ClientResult};
 use client::inserter::{Context, InsertOptions, Inserter};
 use client::{Client, Database};
 use common_error::ext::BoxedError;
-use common_meta::peer::PeerLookupServiceRef;
+use common_meta::peer::PeerDiscoveryRef;
 use common_telemetry::{debug, warn};
 use snafu::{ResultExt, ensure};
 use tokio::sync::RwLock;
 
-use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu};
+use crate::error::{ListActiveFrontendsSnafu, NoAvailableFrontendSnafu};
 
 pub type InsertForwarderRef = Arc<InsertForwarder>;
 
 /// [`InsertForwarder`] is the inserter for the metasrv.
 /// It forwards insert requests to available frontend instances.
 pub struct InsertForwarder {
-    peer_lookup_service: PeerLookupServiceRef,
+    peer_discovery: PeerDiscoveryRef,
     client: RwLock<Option<Database>>,
     options: Option<InsertOptions>,
 }
 
 impl InsertForwarder {
     /// Creates a new InsertForwarder with the given peer discovery service.
-    pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option<InsertOptions>) -> Self {
+    pub fn new(peer_discovery: PeerDiscoveryRef, options: Option<InsertOptions>) -> Self {
         Self {
-            peer_lookup_service,
+            peer_discovery,
             client: RwLock::new(None),
             options,
         }
@@ -49,10 +49,10 @@ impl InsertForwarder {
     /// Builds a new client.
     async fn build_client(&self) -> crate::error::Result<Database> {
         let frontends = self
-            .peer_lookup_service
+            .peer_discovery
             .active_frontends()
             .await
-            .context(LookupFrontendsSnafu)?;
+            .context(ListActiveFrontendsSnafu)?;
 
         ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
 
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 1676d40e99..6be94cbcd4 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -56,6 +56,7 @@ use frontend::server::Services;
 use hyper_util::rt::TokioIo;
 use meta_client::client::MetaClientBuilder;
 use meta_srv::cluster::MetaPeerClientRef;
+use meta_srv::discovery;
 use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
 use meta_srv::mocks::MockInfo;
 use object_store::config::ObjectStoreConfig;
@@ -316,11 +317,14 @@ impl GreptimeDbClusterBuilder {
         expected_datanodes: usize,
     ) {
         for _ in 0..100 {
-            let alive_datanodes =
-                meta_srv::lease::alive_datanodes(meta_peer_client, Duration::from_secs(u64::MAX))
-                    .await
-                    .unwrap()
-                    .len();
+            let alive_datanodes = discovery::utils::alive_datanodes(
+                meta_peer_client.as_ref(),
+                Duration::from_secs(u64::MAX),
+                None,
+            )
+            .await
+            .unwrap()
+            .len();
             if alive_datanodes == expected_datanodes {
                 return;
             }