refactor: refactor PeerLookupService and simplify Selector implementations (#6939)

* refactor: move `lease` into `discovery` dir

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce discovery components

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: simplify selector

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: remove duplicate peer allocator trait

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-09-10 11:43:46 +08:00
Committed by: GitHub
Parent: 2f6da3b718
Commit: e0ce0a6446
31 changed files with 976 additions and 1059 deletions

View File

@@ -14,11 +14,9 @@
use std::sync::Arc;
use tonic::async_trait;
use crate::error::Result;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::peer::{NoopPeerAllocator, Peer, PeerAllocatorRef};
use crate::sequence::SequenceRef;
/// The reference of [FlowMetadataAllocator].
@@ -30,25 +28,25 @@ pub type FlowMetadataAllocatorRef = Arc<FlowMetadataAllocator>;
#[derive(Clone)]
pub struct FlowMetadataAllocator {
flow_id_sequence: SequenceRef,
partition_peer_allocator: PartitionPeerAllocatorRef,
peer_allocator: PeerAllocatorRef,
}
impl FlowMetadataAllocator {
/// Returns the [FlowMetadataAllocator] with [NoopPartitionPeerAllocator].
/// Returns the [FlowMetadataAllocator] with [NoopPeerAllocator].
pub fn with_noop_peer_allocator(flow_id_sequence: SequenceRef) -> Self {
Self {
flow_id_sequence,
partition_peer_allocator: Arc::new(NoopPartitionPeerAllocator),
peer_allocator: Arc::new(NoopPeerAllocator),
}
}
pub fn with_peer_allocator(
flow_id_sequence: SequenceRef,
peer_allocator: Arc<dyn PartitionPeerAllocator>,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
flow_id_sequence,
partition_peer_allocator: peer_allocator,
peer_allocator,
}
}
@@ -61,27 +59,8 @@ impl FlowMetadataAllocator {
/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.partition_peer_allocator.alloc(partitions).await?;
let peers = self.peer_allocator.alloc(partitions).await?;
Ok((flow_id, peers))
}
}
/// Allocates [Peer]s for partitions.
#[async_trait]
pub trait PartitionPeerAllocator: Send + Sync {
/// Allocates [Peer] nodes for storing partitions.
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>>;
}
/// [PartitionPeerAllocatorRef] allocates [Peer]s for partitions.
pub type PartitionPeerAllocatorRef = Arc<dyn PartitionPeerAllocator>;
struct NoopPartitionPeerAllocator;
#[async_trait]
impl PartitionPeerAllocator for NoopPartitionPeerAllocator {
async fn alloc(&self, partitions: usize) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); partitions])
}
}
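The simplified allocator above now delegates peer selection to the shared `PeerAllocatorRef`. A minimal usage sketch, assuming a `SequenceRef` named `flow_id_sequence` is already built elsewhere; the helper function itself is illustrative, not part of the diff:

use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::error::Result;
use common_meta::sequence::SequenceRef;

// Hypothetical helper showing the call flow of the simplified allocator.
async fn allocate_flow_metadata(flow_id_sequence: SequenceRef) -> Result<()> {
    let allocator = FlowMetadataAllocator::with_noop_peer_allocator(flow_id_sequence);
    // NoopPeerAllocator yields one default Peer per requested partition.
    let (_flow_id, peers) = allocator.create(2).await?;
    assert_eq!(peers.len(), 2);
    Ok(())
}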

View File

@@ -15,7 +15,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};
@@ -23,7 +22,7 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::ddl::TableMetadata;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::Peer;
use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
@@ -186,21 +185,3 @@ impl TableMetadataAllocator {
self.table_id_sequence.clone()
}
}
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
/// [`PeerAllocator`] allocates [`Peer`]s for creating regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
/// Allocates `regions` size [`Peer`]s.
async fn alloc(&self, regions: usize) -> Result<Vec<Peer>>;
}
struct NoopPeerAllocator;
#[async_trait]
impl PeerAllocator for NoopPeerAllocator {
async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); regions])
}
}

View File

@@ -15,21 +15,77 @@
use std::sync::Arc;
pub use api::v1::meta::Peer;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
/// PeerLookupService is a service that can lookup peers.
/// [`PeerResolver`] provides methods to look up peer information by node ID.
///
/// This trait allows resolving both datanode and flownode peers, regardless of their current activity status.
/// Returned peers may be inactive (i.e., not currently alive in the cluster).
#[async_trait::async_trait]
pub trait PeerLookupService {
/// Returns the datanode with the given id. It may return inactive peers.
pub trait PeerResolver: Send + Sync {
/// Looks up a datanode peer by its ID.
///
/// Returns `Some(Peer)` if the datanode exists, or `None` if not found.
/// The returned peer may be inactive.
async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>, Error>;
/// Returns the flownode with the given id. It may return inactive peers.
/// Looks up a flownode peer by its ID.
///
/// Returns `Some(Peer)` if the flownode exists, or `None` if not found.
/// The returned peer may be inactive.
async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>, Error>;
/// Returns all currently active frontend nodes that have reported a heartbeat within the most recent heartbeat interval from the in-memory backend.
async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
}
pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;
pub type PeerResolverRef = Arc<dyn PeerResolver>;
/// [`PeerDiscovery`] is a service for discovering active peers in the cluster.
#[async_trait::async_trait]
pub trait PeerDiscovery: Send + Sync {
/// Returns all currently active frontend nodes.
///
/// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval,
/// as determined by the in-memory backend.
async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
/// Returns all currently active datanodes, optionally filtered by a predicate on their workloads.
///
/// A datanode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
/// as determined by the in-memory backend.
/// The optional `filter` allows further selection based on the node's workloads.
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>, Error>;
/// Returns all currently active flownodes, optionally filtered by a predicate on their workloads.
///
/// A flownode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
/// as determined by the in-memory backend.
/// The optional `filter` allows further selection based on the node's workloads.
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>, Error>;
}
pub type PeerDiscoveryRef = Arc<dyn PeerDiscovery>;
/// [`PeerAllocator`] allocates [`Peer`]s for creating regions or flows.
#[async_trait::async_trait]
pub trait PeerAllocator: Send + Sync {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error>;
}
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
pub struct NoopPeerAllocator;
#[async_trait::async_trait]
impl PeerAllocator for NoopPeerAllocator {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
Ok(vec![Peer::default(); num])
}
}
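Since the trait now lives in `common_meta::peer`, any component can plug in its own allocator. A sketch of a custom implementation from a consuming crate's perspective; `FixedPeerAllocator` is hypothetical, while the trait, the `PeerAllocatorRef` alias, and `Peer` are the ones added above:

use common_meta::error::Result;
use common_meta::peer::{Peer, PeerAllocator};

// Hypothetical allocator that hands out a fixed peer set in round-robin order.
// Assumes `peers` is non-empty; an empty set would yield no peers.
struct FixedPeerAllocator {
    peers: Vec<Peer>,
}

#[async_trait::async_trait]
impl PeerAllocator for FixedPeerAllocator {
    async fn alloc(&self, num: usize) -> Result<Vec<Peer>> {
        // Cycle through the configured peers until `num` peers are produced.
        Ok(self.peers.iter().cloned().cycle().take(num).collect())
    }
}

Wrapped in an `Arc`, such a type can be passed wherever a `PeerAllocatorRef` is expected, for example `FlowMetadataAllocator::with_peer_allocator`.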

View File

@@ -35,7 +35,7 @@ use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{
Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef,
};
use crate::peer::{Peer, PeerLookupService};
use crate::peer::{Peer, PeerResolver};
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
use crate::sequence::SequenceBuilder;
@@ -208,10 +208,10 @@ pub fn new_ddl_context_with_kv_backend(
}
}
pub struct NoopPeerLookupService;
pub struct NoopPeerResolver;
#[async_trait::async_trait]
impl PeerLookupService for NoopPeerLookupService {
impl PeerResolver for NoopPeerResolver {
async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}
@@ -219,10 +219,6 @@ impl PeerLookupService for NoopPeerLookupService {
async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}
async fn active_frontends(&self) -> Result<Vec<Peer>> {
Ok(vec![])
}
}
/// Create a kafka topic pool for testing.

View File

@@ -52,6 +52,29 @@ pub fn find_tz_from_env() -> Option<Tz> {
.and_then(|tz| Tz::from_str(&tz).ok())
}
/// A trait for types that provide the current system time.
pub trait SystemTimer {
/// Returns the time duration since UNIX_EPOCH in milliseconds.
fn current_time_millis(&self) -> i64;
/// Returns the current time in rfc3339 format.
fn current_time_rfc3339(&self) -> String;
}
/// Default implementation of [`SystemTimer`]
#[derive(Debug, Default, Clone, Copy)]
pub struct DefaultSystemTimer;
impl SystemTimer for DefaultSystemTimer {
fn current_time_millis(&self) -> i64 {
current_time_millis()
}
fn current_time_rfc3339(&self) -> String {
current_time_rfc3339()
}
}
/// Returns the time duration since UNIX_EPOCH in milliseconds.
pub fn current_time_millis() -> i64 {
chrono::Utc::now().timestamp_millis()
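The trait makes the clock injectable, so lease-expiry logic can be tested against a pinned time. A fixed-clock sketch; `MockTimer` is hypothetical, `SystemTimer` is the trait introduced above:

// Hypothetical fixed-clock timer for deterministic tests.
struct MockTimer {
    now_millis: i64,
}

impl SystemTimer for MockTimer {
    fn current_time_millis(&self) -> i64 {
        self.now_millis
    }

    fn current_time_rfc3339(&self) -> String {
        // Render the fixed instant; chrono is already a dependency of this module.
        chrono::DateTime::<chrono::Utc>::from_timestamp_millis(self.now_millis)
            .map(|dt| dt.to_rfc3339())
            .unwrap_or_default()
    }
}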

View File

@@ -63,6 +63,7 @@ use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming};
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::election::etcd::EtcdElection;
@@ -71,8 +72,9 @@ use crate::election::rds::mysql::MySqlElection;
#[cfg(feature = "pg_kvbackend")]
use crate::election::rds::postgres::PgElection;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
use crate::node_excluder::NodeExcluderRef;
use crate::metasrv::{
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::selector::SelectorType;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
@@ -405,10 +407,8 @@ pub async fn metasrv_builder(
}
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
let node_excluder = plugins
.get::<NodeExcluderRef>()
.unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
info!("Using selector from plugins");
selector
@@ -416,15 +416,12 @@ pub async fn metasrv_builder(
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
RegionNumsBasedWeightCompute,
node_excluder,
meta_peer_client.clone(),
)) as SelectorRef,
SelectorType::LeaseBased => {
Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
SelectorType::RoundRobin => {
Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
}
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
SelectTarget::Datanode,
node_excluder,
)) as SelectorRef,
};
info!(
"Using selector from options, selector type: {}",
@@ -439,9 +436,23 @@ pub async fn metasrv_builder(
.in_memory(in_memory)
.selector(selector)
.election(election)
.meta_peer_client(meta_peer_client)
.plugins(plugins))
}
pub(crate) fn build_default_meta_peer_client(
election: &Option<ElectionRef>,
in_memory: &ResettableKvBackendRef,
) -> MetaPeerClientRef {
MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap()
}
pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
create_etcd_client_with_tls(store_addrs, None).await
}

View File

@@ -36,8 +36,7 @@ use common_telemetry::warn;
use derive_builder::Builder;
use snafu::{OptionExt, ResultExt, ensure};
use crate::error;
use crate::error::{Result, match_for_io_error};
use crate::error::{self, Result, match_for_io_error};
use crate::metasrv::ElectionRef;
pub type MetaPeerClientRef = Arc<MetaPeerClient>;

View File

@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod lease;
pub mod node_info;
pub mod utils;
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::{
DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
};
use common_meta::error::Result;
use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
use common_meta::{DatanodeId, FlownodeId};
use snafu::ResultExt;
use crate::cluster::MetaPeerClient;
use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
#[async_trait::async_trait]
impl PeerDiscovery for MetaPeerClient {
async fn active_frontends(&self) -> Result<Vec<Peer>> {
utils::alive_frontends(
self,
Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_datanodes(self, Duration::from_secs(DATANODE_LEASE_SECS), filter)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_flownodes(self, Duration::from_secs(FLOWNODE_LEASE_SECS), filter)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
#[async_trait::async_trait]
impl PeerResolver for MetaPeerClient {
async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
let peer = self
.lease_value(LeaseValueType::Datanode, id)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)?
.map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
Ok(peer)
}
async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
let peer = self
.lease_value(LeaseValueType::Flownode, id)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)?
.map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
Ok(peer)
}
}
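Callers interact with these implementations through the trait objects. A sketch of a workload-filtered lookup, assuming this file's imports plus a `MetaPeerClientRef` handle; the helper function name is illustrative:

use common_meta::peer::PeerDiscoveryRef;

// Hypothetical helper: list alive datanodes that accept ingest workloads.
async fn ingest_targets(meta_peer_client: crate::cluster::MetaPeerClientRef) -> Result<Vec<Peer>> {
    // Arc<MetaPeerClient> coerces to the PeerDiscovery trait object.
    let discovery: PeerDiscoveryRef = meta_peer_client;
    discovery
        .active_datanodes(Some(utils::is_datanode_accept_ingest_workload))
        .await
}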

View File

@@ -0,0 +1,326 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::KeyValue;
use common_meta::rpc::store::RangeRequest;
use crate::cluster::MetaPeerClient;
use crate::error::Result;
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
#[derive(Clone, Copy)]
pub enum LeaseValueType {
Flownode,
Datanode,
}
#[async_trait::async_trait]
pub trait LeaseValueAccessor: Send + Sync {
/// Returns all peer ids and lease values of the given lease type.
async fn lease_values(
&self,
lease_value_type: LeaseValueType,
) -> Result<Vec<(u64, LeaseValue)>>;
async fn lease_value(
&self,
lease_value_type: LeaseValueType,
node_id: u64,
) -> Result<Option<(u64, LeaseValue)>>;
}
fn decoder(lease_value_type: LeaseValueType, kv: KeyValue) -> Result<(u64, LeaseValue)> {
match lease_value_type {
LeaseValueType::Flownode => {
let lease_key: FlownodeLeaseKey = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
Ok((lease_key.node_id, lease_value))
}
LeaseValueType::Datanode => {
let lease_key: DatanodeLeaseKey = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
Ok((lease_key.node_id, lease_value))
}
}
}
#[async_trait::async_trait]
impl LeaseValueAccessor for MetaPeerClient {
async fn lease_values(
&self,
lease_value_type: LeaseValueType,
) -> Result<Vec<(u64, LeaseValue)>> {
let prefix = match lease_value_type {
LeaseValueType::Flownode => FlownodeLeaseKey::prefix_key_by_cluster(),
LeaseValueType::Datanode => DatanodeLeaseKey::prefix_key(),
};
let range_request = RangeRequest::new().with_prefix(prefix);
let response = self.range(range_request).await?;
response
.kvs
.into_iter()
.map(|kv| {
let (lease_key, lease_value) = decoder(lease_value_type, kv)?;
Ok((lease_key, lease_value))
})
.collect::<Result<Vec<_>>>()
}
async fn lease_value(
&self,
lease_value_type: LeaseValueType,
node_id: u64,
) -> Result<Option<(u64, LeaseValue)>> {
let key: Vec<u8> = match lease_value_type {
LeaseValueType::Flownode => FlownodeLeaseKey { node_id }.try_into()?,
LeaseValueType::Datanode => DatanodeLeaseKey { node_id }.try_into()?,
};
let response = self.get(&key).await?;
response.map(|kv| decoder(lease_value_type, kv)).transpose()
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use api::v1::meta::DatanodeWorkloads;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::rpc::store::PutRequest;
use common_time::util::current_time_millis;
use common_workload::DatanodeWorkloadType;
use crate::discovery::utils::{self, is_datanode_accept_ingest_workload};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::test_util::create_meta_peer_client;
async fn put_lease_value(
kv_backend: &ResettableKvBackendRef,
key: DatanodeLeaseKey,
value: LeaseValue,
) {
kv_backend
.put(PutRequest {
key: key.try_into().unwrap(),
value: value.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
}
#[tokio::test]
async fn test_alive_datanodes() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
// put a stale lease value for node 1
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 2
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_datanodes(
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_datanodes_with_condition() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
// put a stale lease value for node 1 (refreshed 20s ago)
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - 20 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 2 with an ingest-capable workload
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 3 with an unrecognized workload type
let key = DatanodeLeaseKey { node_id: 3 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20203".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 4 with an unrecognized workload type
let key = DatanodeLeaseKey { node_id: 4 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20204".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
let peers = utils::alive_datanodes(
client.as_ref(),
Duration::from_secs(lease_secs),
Some(is_datanode_accept_ingest_workload),
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
}
#[tokio::test]
async fn test_lookup_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let inactive_frontend_node = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis() - 20 * 1000,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
};
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "1").into(),
value: inactive_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let peers = utils::alive_frontends(client.as_ref(), Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
}
#[tokio::test]
async fn test_no_active_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let last_activity_ts =
current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: last_activity_ts as u64,
cpus: 0,
memory_bytes: 0,
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let peers = client.active_frontends().await.unwrap();
assert_eq!(peers.len(), 0);
}
}
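Beyond the batch listing exercised in the tests above, the accessor also resolves a single lease. A small sketch assuming this file's imports; the helper is illustrative:

// Hypothetical helper: fetch the advertised address of one datanode, if it holds a lease.
async fn datanode_addr(client: &MetaPeerClient, node_id: u64) -> Result<Option<String>> {
    Ok(client
        .lease_value(LeaseValueType::Datanode, node_id)
        .await?
        .map(|(_, lease)| lease.node_addr))
}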

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::RangeRequest;
use snafu::ResultExt;
use crate::cluster::MetaPeerClient;
use crate::error::{InvalidNodeInfoFormatSnafu, Result};
#[derive(Clone, Copy)]
pub enum NodeInfoType {
Frontend,
Datanode,
Flownode,
}
impl From<NodeInfoType> for Role {
fn from(node_info_type: NodeInfoType) -> Self {
match node_info_type {
NodeInfoType::Frontend => Role::Frontend,
NodeInfoType::Datanode => Role::Datanode,
NodeInfoType::Flownode => Role::Flownode,
}
}
}
/// Trait for types that can access node info.
#[async_trait::async_trait]
pub trait NodeInfoAccessor: Send + Sync {
/// Returns the peer id and node info.
async fn node_infos(&self, node_info_type: NodeInfoType) -> Result<Vec<(u64, NodeInfo)>>;
}
#[async_trait::async_trait]
impl NodeInfoAccessor for MetaPeerClient {
async fn node_infos(&self, node_info_type: NodeInfoType) -> Result<Vec<(u64, NodeInfo)>> {
let range_request = RangeRequest::new()
.with_prefix(NodeInfoKey::key_prefix_with_role(node_info_type.into()));
let response = self.range(range_request).await?;
response
.kvs
.into_iter()
.map(|kv| {
let node_info = NodeInfo::try_from(kv.value).context(InvalidNodeInfoFormatSnafu)?;
Ok((node_info.peer.id, node_info))
})
.collect::<Result<Vec<_>>>()
}
}
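The node-info accessor follows the same pattern as the lease accessor. A sketch assuming this file's imports; the helper is illustrative:

// Hypothetical helper: peer id and last activity timestamp of every registered frontend.
async fn frontend_activity(client: &MetaPeerClient) -> Result<Vec<(u64, i64)>> {
    Ok(client
        .node_infos(NodeInfoType::Frontend)
        .await?
        .into_iter()
        .map(|(peer_id, info)| (peer_id, info.last_activity_ts))
        .collect())
}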

View File

@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::DatanodeId;
use common_meta::cluster::NodeInfo;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_time::util::{DefaultSystemTimer, SystemTimer};
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;
use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
use crate::error::{KvBackendSnafu, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
impl LastActiveTs for LeaseValue {
fn last_active_ts(&self) -> i64 {
self.timestamp_millis
}
}
impl LastActiveTs for NodeInfo {
fn last_active_ts(&self) -> i64 {
self.last_activity_ts
}
}
/// Trait for types that have a last active timestamp.
pub trait LastActiveTs {
/// Returns the last active timestamp in milliseconds.
fn last_active_ts(&self) -> i64;
}
/// Builds a filter closure that checks whether a [`LastActiveTs`] item
/// is still within the specified active duration, relative to the
/// current time provided by the given [`SystemTimer`].
///
/// The returned closure uses the timestamp at the time of building,
/// so the "now" reference point is fixed when this function is called.
pub fn build_active_filter<T: LastActiveTs>(
timer: impl SystemTimer,
active_duration: Duration,
) -> impl Fn(&T) -> bool {
let now = timer.current_time_millis();
let active_duration = active_duration.as_millis() as u64;
move |item: &T| {
let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
elapsed < active_duration
}
}
/// Returns the alive datanodes.
pub async fn alive_datanodes(
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let condition = condition.unwrap_or(|_| true);
Ok(accessor
.lease_values(LeaseValueType::Datanode)
.await?
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(&lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
}
})
.collect::<Vec<_>>())
}
/// Returns the alive flownodes.
pub async fn alive_flownodes(
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let condition = condition.unwrap_or(|_| true);
Ok(accessor
.lease_values(LeaseValueType::Flownode)
.await?
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(&lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
}
})
.collect::<Vec<_>>())
}
/// Returns the alive frontends.
pub async fn alive_frontends(
lister: &impl NodeInfoAccessor,
active_duration: Duration,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
Ok(lister
.node_infos(NodeInfoType::Frontend)
.await?
.into_iter()
.filter_map(|(_, node_info)| {
if active_filter(&node_info) {
Some(node_info.peer)
} else {
None
}
})
.collect::<Vec<_>>())
}
/// Returns the alive datanode peer.
pub async fn alive_datanode(
lister: &impl LeaseValueAccessor,
peer_id: u64,
active_duration: Duration,
) -> Result<Option<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let v = lister
.lease_value(LeaseValueType::Datanode, peer_id)
.await?
.filter(|(_, lease)| active_filter(lease))
.map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
Ok(v)
}
/// Returns true if the datanode can accept ingest workload based on its workload types.
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
pub fn is_datanode_accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
match &datanode_workloads {
NodeWorkloads::Datanode(workloads) => workloads
.types
.iter()
.filter_map(|w| DatanodeWorkloadType::from_i32(*w))
.any(|w| w.accept_ingest()),
_ => false,
}
}
/// Returns the lease value of the given datanode id; returns `None` if the datanode is not found.
pub async fn find_datanode_lease_value(
in_memory: &KvBackendRef,
datanode_id: DatanodeId,
) -> Result<Option<LeaseValue>> {
let lease_key = DatanodeLeaseKey {
node_id: datanode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
let Some(kv) = in_memory
.get(&lease_key_bytes)
.await
.context(KvBackendSnafu)?
else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
Ok(Some(lease_value))
}
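`build_active_filter` fixes "now" at construction time, so all liveness checks within one call share the same reference point. A sketch of the semantics, assuming this file's imports plus `DatanodeWorkloads` and `current_time_millis`; the demo function is illustrative:

use api::v1::meta::DatanodeWorkloads;
use common_time::util::current_time_millis;

fn demo_active_filter() {
    let filter = build_active_filter(DefaultSystemTimer, Duration::from_secs(10));
    let fresh = LeaseValue {
        timestamp_millis: current_time_millis(),
        node_addr: "127.0.0.1:20201".to_string(),
        workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
            types: vec![DatanodeWorkloadType::Hybrid as i32],
        }),
    };
    // Refreshed just now: within the 10s window, so it counts as active.
    assert!(filter(&fresh));
    let stale = LeaseValue {
        timestamp_millis: current_time_millis() - 20_000,
        ..fresh
    };
    // Refreshed 20s ago: outside the window, so it is filtered out.
    assert!(!filter(&stale));
}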

View File

@@ -54,8 +54,22 @@ pub enum Error {
peer_id: u64,
},
#[snafu(display("Failed to lookup frontends"))]
LookupFrontends {
#[snafu(display("Failed to list active frontends"))]
ListActiveFrontends {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to list active datanodes"))]
ListActiveDatanodes {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to list active flownodes"))]
ListActiveFlownodes {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
@@ -796,6 +810,13 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Invalid node info format"))]
InvalidNodeInfoFormat {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(implicit)]
@@ -1055,7 +1076,8 @@ impl ErrorExt for Error {
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. }
| Error::InvalidDatanodeStatFormat { source, .. } => source.status_code(),
| Error::InvalidDatanodeStatFormat { source, .. }
| Error::InvalidNodeInfoFormat { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::SubmitProcedure { source, .. }
| Error::WaitProcedure { source, .. }
@@ -1085,7 +1107,9 @@ impl ErrorExt for Error {
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. }
| Error::ParseWalOptions { source, .. } => source.status_code(),
Error::LookupFrontends { source, .. } => source.status_code(),
Error::ListActiveFrontends { source, .. }
| Error::ListActiveDatanodes { source, .. }
| Error::ListActiveFlownodes { source, .. } => source.status_code(),
Error::NoAvailableFrontend { .. } => StatusCode::IllegalState,
Error::InitMetadata { source, .. }

View File

@@ -1,52 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_error::ext::BoxedError;
use common_meta::ddl::flow_meta::PartitionPeerAllocator;
use common_meta::peer::Peer;
use snafu::ResultExt;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::selector::SelectorOptions;
pub struct FlowPeerAllocator {
ctx: SelectorContext,
selector: SelectorRef,
}
impl FlowPeerAllocator {
pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
Self { ctx, selector }
}
}
#[async_trait::async_trait]
impl PartitionPeerAllocator for FlowPeerAllocator {
async fn alloc(&self, partitions: usize) -> common_meta::error::Result<Vec<Peer>> {
self.selector
.select(
&self.ctx,
SelectorOptions {
min_required_items: partitions,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}

View File

@@ -132,8 +132,8 @@ mod tests {
use common_meta::datanode::Stat;
use super::*;
use crate::discovery::utils::find_datanode_lease_value;
use crate::handler::test_utils::TestEnv;
use crate::lease::find_datanode_lease_value;
#[tokio::test]
async fn test_put_into_memory_store() {
@@ -143,7 +143,8 @@ mod tests {
let handler = DatanodeKeepLeaseHandler;
handle_request_many_times(ctx.clone(), &handler, 1).await;
let lease_value = find_datanode_lease_value(1, &ctx.in_memory)
let in_memory = ctx.in_memory.clone() as _;
let lease_value = find_datanode_lease_value(&in_memory, 1)
.await
.unwrap()
.unwrap();

View File

@@ -1,552 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
use common_meta::{DatanodeId, FlownodeId, util};
use common_time::util as time_util;
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
use crate::error::{Error, KvBackendSnafu, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
enum Value<'a> {
LeaseValue(&'a LeaseValue),
NodeInfo(&'a NodeInfo),
}
fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool {
move |value: Value| {
let active_time = match value {
Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
Value::NodeInfo(node_info) => node_info.last_activity_ts,
};
((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64
}
}
/// Returns true if the datanode can accept ingest workload based on its workload types.
///
/// A datanode is considered to accept ingest workload if it supports either:
/// - Hybrid workload (both ingest and query workloads)
/// - Ingest workload (only ingest workload)
pub fn is_datanode_accept_ingest_workload(lease_value: &LeaseValue) -> bool {
match &lease_value.workloads {
NodeWorkloads::Datanode(workloads) => workloads
.types
.iter()
.filter_map(|w| DatanodeWorkloadType::from_i32(*w))
.any(|w| w.accept_ingest()),
_ => false,
}
}
/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
pub async fn find_datanode_lease_value(
datanode_id: DatanodeId,
in_memory_key: &ResettableKvBackendRef,
) -> Result<Option<LeaseValue>> {
let lease_key = DatanodeLeaseKey {
node_id: datanode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
let Some(kv) = in_memory_key
.get(&lease_key_bytes)
.await
.context(KvBackendSnafu)?
else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
Ok(Some(lease_value))
}
/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], will only return if it's alive under given `lease_secs`
pub async fn lookup_datanode_peer(
datanode_id: DatanodeId,
meta_peer_client: &MetaPeerClientRef,
lease: Duration,
) -> Result<Option<Peer>> {
let lease_filter = build_lease_filter(lease);
let lease_key = DatanodeLeaseKey {
node_id: datanode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
let is_alive = lease_filter(Value::LeaseValue(&lease_value));
if is_alive {
Ok(Some(Peer {
id: lease_key.node_id,
addr: lease_value.node_addr,
}))
} else {
Ok(None)
}
}
type LeaseFilterFuture<'a, K> =
Pin<Box<dyn Future<Output = Result<HashMap<K, LeaseValue>>> + Send + 'a>>;
pub struct LeaseFilter<'a, K>
where
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
lease: Duration,
key_prefix: Vec<u8>,
meta_peer_client: &'a MetaPeerClientRef,
condition: Option<fn(&LeaseValue) -> bool>,
inner_future: Option<LeaseFilterFuture<'a, K>>,
}
impl<'a, K> LeaseFilter<'a, K>
where
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
pub fn new(
lease: Duration,
key_prefix: Vec<u8>,
meta_peer_client: &'a MetaPeerClientRef,
) -> Self {
Self {
lease,
key_prefix,
meta_peer_client,
condition: None,
inner_future: None,
}
}
/// Set the condition for the lease filter.
pub fn with_condition(mut self, condition: fn(&LeaseValue) -> bool) -> Self {
self.condition = Some(condition);
self
}
}
impl<'a, K> Future for LeaseFilter<'a, K>
where
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
{
type Output = Result<HashMap<K, LeaseValue>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.inner_future.is_none() {
let lease_filter = build_lease_filter(this.lease);
let condition = this.condition;
let key_prefix = std::mem::take(&mut this.key_prefix);
let fut = filter(key_prefix, this.meta_peer_client, move |v| {
lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
});
this.inner_future = Some(Box::pin(fut));
}
let fut = this.inner_future.as_mut().unwrap();
let result = futures::ready!(fut.as_mut().poll(cx))?;
Poll::Ready(Ok(result))
}
}
/// Find all alive datanodes
pub fn alive_datanodes(
meta_peer_client: &MetaPeerClientRef,
lease: Duration,
) -> LeaseFilter<'_, DatanodeLeaseKey> {
LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client)
}
/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs`
pub async fn lookup_flownode_peer(
flownode_id: FlownodeId,
meta_peer_client: &MetaPeerClientRef,
lease: Duration,
) -> Result<Option<Peer>> {
let lease_filter = build_lease_filter(lease);
let lease_key = FlownodeLeaseKey {
node_id: flownode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.clone().try_into()?;
let Some(kv) = meta_peer_client.get(&lease_key_bytes).await? else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
let is_alive = lease_filter(Value::LeaseValue(&lease_value));
if is_alive {
Ok(Some(Peer {
id: lease_key.node_id,
addr: lease_value.node_addr,
}))
} else {
Ok(None)
}
}
/// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`.
pub async fn lookup_frontends(
meta_peer_client: &MetaPeerClientRef,
lease: Duration,
) -> Result<Vec<Peer>> {
let range_request =
RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));
let response = meta_peer_client.range(range_request).await?;
let lease_filter = build_lease_filter(lease);
let mut peers = Vec::with_capacity(response.kvs.len());
for kv in response.kvs {
let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
let is_alive = lease_filter(Value::NodeInfo(&node_info));
if is_alive {
peers.push(node_info.peer);
}
}
Ok(peers)
}
/// Find all alive flownodes
pub fn alive_flownodes(
meta_peer_client: &MetaPeerClientRef,
lease: Duration,
) -> LeaseFilter<'_, FlownodeLeaseKey> {
LeaseFilter::new(
lease,
FlownodeLeaseKey::prefix_key_by_cluster(),
meta_peer_client,
)
}
pub async fn filter<P, K>(
key: Vec<u8>,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<K, LeaseValue>>
where
P: Fn(&LeaseValue) -> bool,
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
{
let range_end = util::get_prefix_end_key(&key);
let range_req = common_meta::rpc::store::RangeRequest {
key,
range_end,
keys_only: false,
..Default::default()
};
let kvs = meta_peer_client.range(range_req).await?.kvs;
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: K = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
if !predicate(&lease_value) {
continue;
}
let _ = lease_kvs.insert(lease_key, lease_value);
}
Ok(lease_kvs)
}
#[derive(Clone)]
pub struct MetaPeerLookupService {
pub meta_peer_client: MetaPeerClientRef,
}
impl MetaPeerLookupService {
pub fn new(meta_peer_client: MetaPeerClientRef) -> Self {
Self { meta_peer_client }
}
}
#[async_trait::async_trait]
impl PeerLookupService for MetaPeerLookupService {
async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
// Get the active frontends within the last heartbeat interval.
lookup_frontends(
&self.meta_peer_client,
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use api::v1::meta::DatanodeWorkloads;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::PutRequest;
use common_time::util::current_time_millis;
use common_workload::DatanodeWorkloadType;
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease::{
ClusterRole, MetaPeerLookupService, alive_datanodes, is_datanode_accept_ingest_workload,
lookup_frontends,
};
use crate::test_util::create_meta_peer_client;
async fn put_lease_value(
kv_backend: &ResettableKvBackendRef,
key: DatanodeLeaseKey,
value: LeaseValue,
) {
kv_backend
.put(PutRequest {
key: key.try_into().unwrap(),
value: value.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
}
#[tokio::test]
async fn test_alive_datanodes() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
// put a stale lease value for node 1
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 2
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64))
.await
.unwrap();
assert_eq!(leases.len(), 1);
assert_eq!(leases.get(&key), Some(&value));
}
#[tokio::test]
async fn test_alive_datanodes_with_condition() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
// put a lease value for node 1 without mode info
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - 20 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 2 with mode info
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 3 with mode info
let key = DatanodeLeaseKey { node_id: 3 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20203".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 3 with mode info
let key = DatanodeLeaseKey { node_id: 4 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:20204".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
let leases = alive_datanodes(&client, Duration::from_secs(lease_secs))
.with_condition(is_datanode_accept_ingest_workload)
.await
.unwrap();
assert_eq!(leases.len(), 1);
assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
}
#[tokio::test]
async fn test_lookup_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
};
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let inactive_frontend_node = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis() - 20 * 1000,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
cpus: 0,
memory_bytes: 0,
};
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "1").into(),
value: inactive_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let peers = lookup_frontends(&client, Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
}
#[tokio::test]
async fn test_no_active_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let last_activity_ts =
current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: last_activity_ts as u64,
cpus: 0,
memory_bytes: 0,
};
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let service = MetaPeerLookupService::new(client);
let peers = service.active_frontends().await.unwrap();
assert_eq!(peers.len(), 0);
}
}

View File

@@ -18,30 +18,29 @@
#![feature(let_chains)]
#![feature(duration_constructors_lite)]
#![feature(duration_constructors)]
#![feature(string_from_utf8_lossy_owned)]
pub mod bootstrap;
pub mod cache_invalidator;
pub mod cluster;
pub mod discovery;
pub mod election;
pub mod error;
pub mod events;
mod failure_detector;
pub mod flow_meta_alloc;
pub mod handler;
pub mod key;
pub mod lease;
pub mod metasrv;
pub mod metrics;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod node_excluder;
pub mod peer;
pub mod procedure;
pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
pub mod utils;
pub use crate::error::Result;

View File

@@ -36,7 +36,7 @@ use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
@@ -57,10 +57,10 @@ use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
@@ -68,7 +68,6 @@ use crate::error::{
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
@@ -414,12 +413,7 @@ impl Display for SelectTarget {
#[derive(Clone)]
pub struct SelectorContext {
pub server_addr: String,
pub datanode_lease_secs: u64,
pub flownode_lease_secs: u64,
pub kv_backend: KvBackendRef,
pub meta_peer_client: MetaPeerClientRef,
pub table_id: Option<TableId>,
pub peer_discovery: PeerDiscoveryRef,
}
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
@@ -708,9 +702,9 @@ impl Metasrv {
/// Looks up a datanode peer by peer_id, returning it only when it's alive.
/// A datanode is considered alive when it's still within the lease period.
pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
lookup_datanode_peer(
discovery::utils::alive_datanode(
self.meta_peer_client.as_ref(),
peer_id,
&self.meta_peer_client,
Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
)
.await

View File

@@ -48,23 +48,24 @@ use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_telemetry::{info, warn};
use snafu::{ResultExt, ensure};
use store_api::storage::MAX_REGION_SEQ;
use crate::bootstrap::build_default_meta_peer_client;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::flow_state_handler::FlowStateHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
use crate::lease::MetaPeerLookupService;
use crate::metasrv::{
ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::peer::MetasrvPeerAllocator;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::wal_prune::Context as WalPruneContext;
@@ -80,7 +81,6 @@ use crate::selector::round_robin::RoundRobinSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::State;
use crate::table_meta_alloc::MetasrvPeerAllocator;
use crate::utils::insert_forwarder::InsertForwarder;
/// The time window for twcs compaction of the region stats table.
@@ -203,10 +203,9 @@ impl MetasrvBuilder {
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
let event_inserter = Box::new(InsertForwarder::new(
peer_lookup_service.clone(),
meta_peer_client.clone(),
Some(InsertOptions {
ttl: options.event_recorder.ttl,
append_mode: true,
@@ -218,7 +217,7 @@ impl MetasrvBuilder {
event_inserter,
))));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
let pushers = Pushers::default();
let mailbox = build_mailbox(&kv_backend, &pushers);
let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
@@ -237,12 +236,7 @@ impl MetasrvBuilder {
));
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
peer_discovery: meta_peer_client.clone(),
};
let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
@@ -257,10 +251,10 @@ impl MetasrvBuilder {
.step(10)
.build(),
);
let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
selector_ctx.clone(),
selector.clone(),
));
let peer_allocator = Arc::new(
MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
.with_max_items(MAX_REGION_SEQ),
);
Arc::new(TableMetadataAllocator::with_peer_allocator(
sequence,
wal_options_allocator.clone(),
@@ -269,15 +263,13 @@ impl MetasrvBuilder {
});
let table_id_sequence = table_metadata_allocator.table_id_sequence();
let flow_selector = Arc::new(RoundRobinSelector::new(
SelectTarget::Flownode,
Arc::new(Vec::new()),
)) as SelectorRef;
let flow_selector =
Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
let flow_metadata_allocator = {
// for now flownode just use round-robin selector
let flow_selector_ctx = selector_ctx.clone();
let peer_allocator = Arc::new(FlowPeerAllocator::new(
let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
flow_selector_ctx,
flow_selector.clone(),
));
@@ -378,7 +370,7 @@ impl MetasrvBuilder {
supervisor_selector,
region_migration_manager.clone(),
runtime_switch_manager.clone(),
peer_lookup_service.clone(),
meta_peer_client.clone(),
leader_cached_kv_backend.clone(),
);
@@ -470,7 +462,7 @@ impl MetasrvBuilder {
let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
let inserter = Box::new(InsertForwarder::new(
peer_lookup_service.clone(),
meta_peer_client.clone(),
Some(InsertOptions {
ttl: options.stats_persistence.ttl,
append_mode: true,
@@ -569,19 +561,6 @@ impl MetasrvBuilder {
}
}
fn build_default_meta_peer_client(
election: &Option<ElectionRef>,
in_memory: &ResettableKvBackendRef,
) -> MetaPeerClientRef {
MetaPeerClientBuilder::default()
.election(election.clone())
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap()
}
fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
.initial(1)

View File

@@ -1,32 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::DatanodeId;
pub type NodeExcluderRef = Arc<dyn NodeExcluder>;
/// [NodeExcluder] is used to help decide whether some nodes should be excluded (out of consideration)
/// in certain situations. For example, in some node selectors.
pub trait NodeExcluder: Send + Sync {
/// Returns the excluded datanode ids.
fn excluded_datanode_ids(&self) -> &Vec<DatanodeId>;
}
impl NodeExcluder for Vec<DatanodeId> {
fn excluded_datanode_ids(&self) -> &Vec<DatanodeId> {
self
}
}
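
With this file deleted, the static exclusion list that a NodeExcluder used to carry is now expressed through the per-call `exclude_peer_ids` field on `SelectorOptions` (the round-robin selector test further below now passes the previously excluded id directly in the options). A minimal, self-contained sketch of that shape, using illustrative stand-in types rather than the real metasrv API:

```rust
use std::collections::HashSet;

#[derive(Clone, Debug, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

// Mirrors the options the selectors receive; exclusions now travel here per call.
struct SelectorOptions {
    min_required_items: usize,
    allow_duplication: bool,
    exclude_peer_ids: HashSet<u64>,
}

// Stand-in for selector::common::filter_out_excluded_peers.
fn filter_out_excluded_peers(candidates: &mut Vec<Peer>, exclude_peer_ids: &HashSet<u64>) {
    candidates.retain(|peer| !exclude_peer_ids.contains(&peer.id));
}

fn main() {
    let mut candidates = vec![
        Peer { id: 2, addr: "node1".to_string() },
        Peer { id: 5, addr: "node2".to_string() },
    ];
    let opts = SelectorOptions {
        min_required_items: 1,
        allow_duplication: true,
        exclude_peer_ids: HashSet::from([5]),
    };
    filter_out_excluded_peers(&mut candidates, &opts.exclude_peer_ids);
    assert_eq!(candidates, vec![Peer { id: 2, addr: "node1".to_string() }]);
    assert!(candidates.len() >= opts.min_required_items);
    let _ = opts.allow_duplication; // not exercised in this sketch
}
```

Pushing exclusions into the options keeps each selector stateless with respect to excluded nodes, which is what lets `LeaseBasedSelector` and `RoundRobinSelector` drop their constructor arguments later in this diff.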

View File

@@ -16,25 +16,36 @@ use std::collections::HashSet;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::ddl::table_meta::PeerAllocator;
use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::peer::Peer;
use common_meta::peer::{Peer, PeerAllocator};
use snafu::{ResultExt, ensure};
use store_api::storage::MAX_REGION_SEQ;
use crate::error::{self, Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectTarget, SelectorContext, SelectorRef};
use crate::error::{Result, TooManyPartitionsSnafu};
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::selector::SelectorOptions;
pub struct MetasrvPeerAllocator {
ctx: SelectorContext,
selector: SelectorRef,
max_items: Option<u32>,
}
impl MetasrvPeerAllocator {
/// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
Self { ctx, selector }
Self {
ctx,
selector,
max_items: None,
}
}
pub fn with_max_items(self, max_items: u32) -> Self {
Self {
ctx: self.ctx,
selector: self.selector,
max_items: Some(max_items),
}
}
/// Allocates a specified number (by `min_required_items`) of [`Peer`] instances based on the number of
@@ -43,33 +54,24 @@ impl MetasrvPeerAllocator {
/// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. It makes
/// no guarantee about how the returned peers are used, for example whether they belong to
/// the same table, so this method isn't idempotent.
async fn alloc(&self, regions: usize) -> Result<Vec<Peer>> {
ensure!(regions <= MAX_REGION_SEQ as usize, TooManyPartitionsSnafu);
async fn alloc(&self, min_required_items: usize) -> Result<Vec<Peer>> {
if let Some(max_items) = self.max_items {
ensure!(
min_required_items <= max_items as usize,
TooManyPartitionsSnafu
);
}
let mut peers = self
.selector
self.selector
.select(
&self.ctx,
SelectorOptions {
min_required_items: regions,
min_required_items,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await?;
ensure!(
peers.len() >= regions,
error::NoEnoughAvailableNodeSnafu {
required: regions,
available: peers.len(),
select_target: SelectTarget::Datanode
}
);
peers.truncate(regions);
Ok(peers)
.await
}
}
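
The reshaped `alloc` above is now an optional-cap check followed by a straight delegation to the selector; the old post-selection length check and truncation are gone, and the selector is trusted to return at least `min_required_items` peers or fail. A self-contained sketch of that contract, with illustrative stand-in types (not the real crate API):

```rust
#[derive(Clone, Debug)]
struct Peer {
    id: u64,
}

// The selector is expected to return at least `min_required_items` peers or fail.
trait Selector {
    fn select(&self, min_required_items: usize, allow_duplication: bool) -> Result<Vec<Peer>, String>;
}

struct RoundRobin {
    peers: Vec<Peer>,
}

impl Selector for RoundRobin {
    fn select(&self, min_required_items: usize, allow_duplication: bool) -> Result<Vec<Peer>, String> {
        if self.peers.is_empty() || (!allow_duplication && self.peers.len() < min_required_items) {
            return Err("not enough available nodes".to_string());
        }
        Ok((0..min_required_items)
            .map(|i| self.peers[i % self.peers.len()].clone())
            .collect())
    }
}

struct PeerAllocator<S> {
    selector: S,
    max_items: Option<u32>,
}

impl<S: Selector> PeerAllocator<S> {
    fn new(selector: S) -> Self {
        Self { selector, max_items: None }
    }

    fn with_max_items(self, max_items: u32) -> Self {
        Self { max_items: Some(max_items), ..self }
    }

    fn alloc(&self, min_required_items: usize) -> Result<Vec<Peer>, String> {
        // Optional cap, set only by callers that need it (e.g. table region allocation).
        if let Some(max_items) = self.max_items {
            if min_required_items > max_items as usize {
                return Err("too many partitions".to_string());
            }
        }
        // Straight delegation: no post-selection length check or truncation anymore.
        self.selector.select(min_required_items, true)
    }
}

fn main() {
    let allocator = PeerAllocator::new(RoundRobin { peers: vec![Peer { id: 1 }, Peer { id: 2 }] })
        .with_max_items(4);
    assert_eq!(allocator.alloc(3).unwrap().len(), 3);
    assert!(allocator.alloc(5).is_err());
}
```

In the builder wiring earlier in this commit, the table-metadata path constructs the allocator with `.with_max_items(MAX_REGION_SEQ)` while the flow-metadata path leaves the cap unset, which is why `max_items` is optional here.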

View File

@@ -40,7 +40,7 @@ use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
@@ -336,7 +336,7 @@ impl ContextFactory for DefaultContextFactory {
pub struct Context {
persistent_ctx: Arc<PersistentContext>,
volatile_ctx: VolatileContext,
in_memory: ResettableKvBackendRef,
in_memory: KvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,

View File

@@ -28,9 +28,9 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};
use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
@@ -241,7 +241,7 @@ impl DowngradeLeaderRegion {
async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
let leader = &ctx.persistent_ctx.from_peer;
let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
Err(err) => {
error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);

View File

@@ -26,7 +26,7 @@ use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
@@ -285,8 +285,8 @@ pub struct RegionSupervisor {
region_migration_manager: RegionMigrationManagerRef,
/// The maintenance mode manager.
runtime_switch_manager: RuntimeSwitchManagerRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
/// Peer resolver
peer_resolver: PeerResolverRef,
/// The kv backend.
kv_backend: KvBackendRef,
}
@@ -359,7 +359,7 @@ impl RegionSupervisor {
selector: RegionSupervisorSelector,
region_migration_manager: RegionMigrationManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
peer_lookup: PeerLookupServiceRef,
peer_resolver: PeerResolverRef,
kv_backend: KvBackendRef,
) -> Self {
Self {
@@ -370,7 +370,7 @@ impl RegionSupervisor {
selector,
region_migration_manager,
runtime_switch_manager,
peer_lookup,
peer_resolver,
kv_backend,
}
}
@@ -633,7 +633,7 @@ impl RegionSupervisor {
) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
let mut tasks = Vec::with_capacity(regions.len());
let from_peer = self
.peer_lookup
.peer_resolver
.datanode(from_peer_id)
.await
.ok()
@@ -778,7 +778,7 @@ pub(crate) mod tests {
use common_meta::key::{TableMetadataManager, runtime_switch};
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::NoopPeerLookupService;
use common_meta::test_util::NoopPeerResolver;
use common_telemetry::info;
use common_time::util::current_time_millis;
use rand::Rng;
@@ -807,7 +807,7 @@ pub(crate) mod tests {
));
let runtime_switch_manager =
Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
let peer_lookup = Arc::new(NoopPeerLookupService);
let peer_resolver = Arc::new(NoopPeerResolver);
let (tx, rx) = RegionSupervisor::channel();
let kv_backend = env.kv_backend();
@@ -819,7 +819,7 @@ pub(crate) mod tests {
RegionSupervisorSelector::NaiveSelector(selector),
region_migration_manager,
runtime_switch_manager,
peer_lookup,
peer_resolver,
kv_backend,
),
tx,

View File

@@ -12,38 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use common_meta::peer::Peer;
use snafu::ResultExt;
use crate::error::Result;
use crate::lease;
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{ListActiveDatanodesSnafu, Result};
use crate::metasrv::SelectorContext;
use crate::node_excluder::NodeExcluderRef;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Selector, SelectorOptions};
/// Selects alive datanodes using a random weighted choice.
pub struct LeaseBasedSelector {
node_excluder: NodeExcluderRef,
}
impl LeaseBasedSelector {
pub fn new(node_excluder: NodeExcluderRef) -> Self {
Self { node_excluder }
}
}
impl Default for LeaseBasedSelector {
fn default() -> Self {
Self {
node_excluder: Arc::new(Vec::new()),
}
}
}
#[derive(Default)]
pub struct LeaseBasedSelector;
#[async_trait::async_trait]
impl Selector for LeaseBasedSelector {
@@ -52,34 +33,23 @@ impl Selector for LeaseBasedSelector {
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
// 1. get alive datanodes.
let lease_kvs = lease::alive_datanodes(
&ctx.meta_peer_client,
Duration::from_secs(ctx.datanode_lease_secs),
)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.await
.context(ListActiveDatanodesSnafu)?;
// 2. compute the weight array; every item gets the same weight.
let mut weight_array = lease_kvs
let mut weight_array = alive_datanodes
.into_iter()
.map(|(k, v)| WeightedItem {
item: Peer {
id: k.node_id,
addr: v.node_addr.clone(),
},
.map(|p| WeightedItem {
item: p,
weight: 1.0,
})
.collect();
// 3. choose peers by weight_array.
let mut exclude_peer_ids = self
.node_excluder
.excluded_datanode_ids()
.iter()
.cloned()
.collect::<HashSet<_>>();
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;
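
The selection flow above reduces to: list active datanodes from the discovery handle, give every node the same weight, drop the peers excluded via the options, and pick with a weighted random choose. A compact, self-contained sketch of that flow with stand-in types; the real code goes through `PeerDiscovery`, `WeightedItem`, and `RandomWeightedChoose`:

```rust
use std::collections::HashSet;

#[derive(Clone, Debug)]
struct Peer {
    id: u64,
    addr: String,
}

struct WeightedItem {
    item: Peer,
    weight: f64,
}

fn select_lease_based(
    alive_datanodes: Vec<Peer>,      // what `peer_discovery.active_datanodes(..)` yields
    exclude_peer_ids: &HashSet<u64>, // comes straight from SelectorOptions
    min_required_items: usize,
) -> Result<Vec<Peer>, String> {
    // Every alive datanode gets the same weight.
    let mut weight_array: Vec<WeightedItem> = alive_datanodes
        .into_iter()
        .map(|p| WeightedItem { item: p, weight: 1.0 })
        .collect();
    // Drop peers excluded via the options.
    weight_array.retain(|w| !exclude_peer_ids.contains(&w.item.id));
    if weight_array.is_empty() {
        return Err("not enough available nodes".to_string());
    }
    // With equal weights and duplication allowed, cycling over the survivors is an
    // adequate stand-in for the random weighted choice used by the real selector.
    Ok((0..min_required_items)
        .map(|i| weight_array[i % weight_array.len()].item.clone())
        .collect())
}

fn main() {
    let alive = vec![
        Peer { id: 1, addr: "n1".to_string() },
        Peer { id: 2, addr: "n2".to_string() },
    ];
    let picked = select_lease_based(alive, &HashSet::from([2]), 2).unwrap();
    assert!(picked.iter().all(|p| p.id == 1));
}
```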

View File

@@ -12,47 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use std::collections::HashMap;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::find_leaders;
use common_telemetry::{debug, info};
use common_telemetry::debug;
use snafu::ResultExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::cluster::MetaPeerClientRef;
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{ListActiveDatanodesSnafu, Result};
use crate::metasrv::SelectorContext;
use crate::node_excluder::NodeExcluderRef;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weight_compute::WeightCompute;
use crate::selector::weighted_choose::RandomWeightedChoose;
use crate::selector::{Selector, SelectorOptions};
pub struct LoadBasedSelector<C> {
weight_compute: C,
node_excluder: NodeExcluderRef,
meta_peer_client: MetaPeerClientRef,
}
impl<C> LoadBasedSelector<C> {
pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
pub fn new(weight_compute: C, meta_peer_client: MetaPeerClientRef) -> Self {
Self {
weight_compute,
node_excluder,
}
}
}
impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
fn default() -> Self {
Self {
weight_compute: RegionNumsBasedWeightCompute,
node_excluder: Arc::new(Vec::new()),
meta_peer_client,
}
}
}
@@ -67,50 +52,28 @@ where
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
// 1. get alive datanodes.
let lease_kvs = lease::alive_datanodes(
&ctx.meta_peer_client,
Duration::from_secs(ctx.datanode_lease_secs),
)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.await
.context(ListActiveDatanodesSnafu)?;
// 2. get stat kvs and filter out expired datanodes.
let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
let stat_keys = alive_datanodes
.iter()
.map(|k| DatanodeStatKey { node_id: k.id })
.collect();
let stat_kvs = filter_out_expired_datanode(
ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
&lease_kvs,
self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?,
&alive_datanodes,
);
// 3. try to make the regions of a table distributed on different datanodes as much as possible.
let stat_kvs = if let Some(table_id) = ctx.table_id {
let table_metadata_manager = TableMetadataManager::new(ctx.kv_backend.clone());
let leader_peer_ids = get_leader_peer_ids(&table_metadata_manager, table_id).await?;
let filter_result = filter_out_datanode_by_table(&stat_kvs, &leader_peer_ids);
if filter_result.is_empty() {
info!(
"The regions of the table cannot be allocated to completely different datanodes, table id: {}.",
table_id
);
stat_kvs
} else {
filter_result
}
} else {
stat_kvs
};
// 4. compute weight array.
// 3. compute weight array.
let mut weight_array = self.weight_compute.compute(&stat_kvs);
// 4. filter out excluded peers.
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
// 5. choose peers by weight_array.
let mut exclude_peer_ids = self
.node_excluder
.excluded_datanode_ids()
.iter()
.cloned()
.collect::<HashSet<_>>();
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;
@@ -125,61 +88,21 @@ where
fn filter_out_expired_datanode(
mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
datanodes: &[Peer],
) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
lease_kvs
datanodes
.iter()
.filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
.filter_map(|p| stat_kvs.remove_entry(&DatanodeStatKey { node_id: p.id }))
.collect()
}
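
`filter_out_expired_datanode` now joins liveness (a plain `Vec<Peer>` from discovery) against the stat map by node id instead of by lease key, and the table-aware spreading step that used `ctx.table_id` is removed, so weights come purely from the remaining stats. A tiny self-contained sketch of that join with hypothetical stand-ins for the stat key and value types:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct StatKey {
    node_id: u64,
}

#[derive(Debug)]
struct StatValue {
    region_num: usize,
}

#[derive(Clone, Debug)]
struct Peer {
    id: u64,
}

// Keep only the stat entries whose node id also appears in the list of alive peers.
fn filter_out_expired(
    mut stats: HashMap<StatKey, StatValue>,
    alive: &[Peer],
) -> HashMap<StatKey, StatValue> {
    alive
        .iter()
        .filter_map(|p| stats.remove_entry(&StatKey { node_id: p.id }))
        .collect()
}

fn main() {
    let mut stats = HashMap::new();
    stats.insert(StatKey { node_id: 1 }, StatValue { region_num: 3 });
    stats.insert(StatKey { node_id: 9 }, StatValue { region_num: 7 }); // no live lease
    let alive = vec![Peer { id: 1 }];
    let filtered = filter_out_expired(stats, &alive);
    assert_eq!(filtered.len(), 1);
    assert!(filtered.contains_key(&StatKey { node_id: 1 }));
}
```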
fn filter_out_datanode_by_table(
stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
leader_peer_ids: &[u64],
) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
stat_kvs
.iter()
.filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
.map(|(stat_k, stat_v)| (*stat_k, stat_v.clone()))
.collect()
}
async fn get_leader_peer_ids(
table_metadata_manager: &TableMetadataManager,
table_id: TableId,
) -> Result<Vec<u64>> {
table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map(|route| {
route.map_or_else(
|| Ok(Vec::new()),
|route| {
let region_routes = route
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?;
Ok(find_leaders(region_routes)
.into_iter()
.map(|peer| peer.id)
.collect())
},
)
})?
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::meta::DatanodeWorkloads;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
use common_workload::DatanodeWorkloadType;
use common_meta::peer::Peer;
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::selector::load_based::filter_out_expired_datanode;
#[test]
@@ -198,18 +121,7 @@ mod tests {
DatanodeStatValue { stats: vec![] },
);
let mut lease_kvs = HashMap::new();
lease_kvs.insert(
DatanodeLeaseKey { node_id: 1 },
LeaseValue {
timestamp_millis: 0,
node_addr: "127.0.0.1:3002".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
}),
},
);
let lease_kvs = vec![Peer::new(1, "127.0.0.1:3002".to_string())];
let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);
assert_eq!(1, alive_stat_kvs.len());

View File

@@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use common_meta::peer::Peer;
use snafu::ensure;
use snafu::{ResultExt, ensure};
use crate::error::{NoEnoughAvailableNodeSnafu, Result};
use crate::lease;
use crate::discovery::utils::is_datanode_accept_ingest_workload;
use crate::error::{
ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result,
};
use crate::metasrv::{SelectTarget, SelectorContext};
use crate::node_excluder::NodeExcluderRef;
use crate::selector::{Selector, SelectorOptions};
/// Round-robin selector that returns the next peer in the list in sequence.
@@ -36,7 +34,6 @@ use crate::selector::{Selector, SelectorOptions};
pub struct RoundRobinSelector {
select_target: SelectTarget,
counter: AtomicUsize,
node_excluder: NodeExcluderRef,
}
impl Default for RoundRobinSelector {
@@ -44,16 +41,14 @@ impl Default for RoundRobinSelector {
Self {
select_target: SelectTarget::Datanode,
counter: AtomicUsize::new(0),
node_excluder: Arc::new(Vec::new()),
}
}
}
impl RoundRobinSelector {
pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
pub fn new(select_target: SelectTarget) -> Self {
Self {
select_target,
node_excluder,
..Default::default()
}
}
@@ -62,41 +57,23 @@ impl RoundRobinSelector {
let mut peers = match self.select_target {
SelectTarget::Datanode => {
// 1. get alive datanodes.
let lease_kvs = lease::alive_datanodes(
&ctx.meta_peer_client,
Duration::from_secs(ctx.datanode_lease_secs),
)
.with_condition(lease::is_datanode_accept_ingest_workload)
.await?;
let alive_datanodes = ctx
.peer_discovery
.active_datanodes(Some(is_datanode_accept_ingest_workload))
.await
.context(ListActiveDatanodesSnafu)?;
let mut exclude_peer_ids = self
.node_excluder
.excluded_datanode_ids()
.iter()
.cloned()
.collect::<HashSet<_>>();
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
// 2. map into peers
lease_kvs
// 2. filter out excluded datanodes.
alive_datanodes
.into_iter()
.filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.collect::<Vec<_>>()
}
SelectTarget::Flownode => {
// 1. get alive flownodes.
let lease_kvs = lease::alive_flownodes(
&ctx.meta_peer_client,
Duration::from_secs(ctx.flownode_lease_secs),
)
.await?;
// 2. map into peers
lease_kvs
.into_iter()
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.filter(|p| !opts.exclude_peer_ids.contains(&p.id))
.collect::<Vec<_>>()
}
SelectTarget::Flownode => ctx
.peer_discovery
.active_flownodes(None)
.await
.context(ListActiveFlownodesSnafu)?,
};
ensure!(
@@ -141,12 +118,15 @@ mod test {
use std::collections::HashSet;
use super::*;
use crate::test_util::{create_selector_context, put_datanodes};
use crate::test_util::{create_meta_peer_client, put_datanodes};
#[tokio::test]
async fn test_round_robin_selector() {
let selector = RoundRobinSelector::default();
let ctx = create_selector_context();
let meta_peer_client = create_meta_peer_client();
let ctx = SelectorContext {
peer_discovery: meta_peer_client.clone(),
};
// add three nodes
let peer1 = Peer {
id: 2,
@@ -161,7 +141,7 @@ mod test {
addr: "node3".to_string(),
};
let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
put_datanodes(&ctx.meta_peer_client, peers).await;
put_datanodes(&meta_peer_client, peers).await;
let peers = selector
.select(
@@ -197,8 +177,11 @@ mod test {
#[tokio::test]
async fn test_round_robin_selector_with_exclude_peer_ids() {
let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
let ctx = create_selector_context();
let selector = RoundRobinSelector::new(SelectTarget::Datanode);
let meta_peer_client = create_meta_peer_client();
let ctx = SelectorContext {
peer_discovery: meta_peer_client.clone(),
};
// add three nodes
let peer1 = Peer {
id: 2,
@@ -213,7 +196,7 @@ mod test {
addr: "node3".to_string(),
};
put_datanodes(
&ctx.meta_peer_client,
&meta_peer_client,
vec![peer1.clone(), peer2.clone(), peer3.clone()],
)
.await;
@@ -224,7 +207,7 @@ mod test {
SelectorOptions {
min_required_items: 1,
allow_duplication: true,
exclude_peer_ids: HashSet::from([2]),
exclude_peer_ids: HashSet::from([2, 5]),
},
)
.await
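
The hunks above cover the selector's candidate gathering; the rotation itself (kept from the previous version and not visible in this diff) advances the `counter` field so that consecutive calls start from different offsets. A plausible minimal version of such a rotation, as a reading aid only and not the selector's actual code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone, Debug, PartialEq)]
struct Peer {
    id: u64,
}

struct RoundRobin {
    counter: AtomicUsize,
}

impl RoundRobin {
    // Pick `n` peers, advancing the shared counter so later calls continue the rotation.
    fn pick(&self, candidates: &[Peer], n: usize) -> Vec<Peer> {
        (0..n)
            .map(|_| {
                let i = self.counter.fetch_add(1, Ordering::Relaxed) % candidates.len();
                candidates[i].clone()
            })
            .collect()
    }
}

fn main() {
    let rr = RoundRobin { counter: AtomicUsize::new(0) };
    let candidates = vec![Peer { id: 2 }, Peer { id: 3 }, Peer { id: 4 }];
    assert_eq!(rr.pick(&candidates, 2), vec![Peer { id: 2 }, Peer { id: 3 }]);
    assert_eq!(rr.pick(&candidates, 2), vec![Peer { id: 4 }, Peer { id: 2 }]);
}
```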

View File

@@ -14,7 +14,6 @@
use std::sync::Arc;
use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use rand::prelude::SliceRandom;
@@ -35,12 +34,7 @@ pub fn new_test_selector_context() -> SelectorContext {
.unwrap();
SelectorContext {
server_addr: "127.0.0.1:3002".to_string(),
datanode_lease_secs: REGION_LEASE_SECS,
flownode_lease_secs: FLOWNODE_LEASE_SECS,
kv_backend,
meta_peer_client,
table_id: None,
peer_discovery: meta_peer_client,
}
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use axum::Json;
use axum::extract::State;
@@ -23,9 +22,9 @@ use snafu::ResultExt;
use tonic::codegen::http;
use crate::cluster::MetaPeerClientRef;
use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::service::admin::HttpHandler;
use crate::service::admin::util::ErrorHandler;
@@ -36,12 +35,17 @@ pub struct NodeLeaseHandler {
impl NodeLeaseHandler {
async fn get_node_lease(&self) -> Result<LeaseValues> {
let leases =
lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?;
let leases = self
.meta_peer_client
.lease_values(LeaseValueType::Datanode)
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let leases = leases
.into_iter()
.map(|(k, v)| HumanLease {
name: k,
name: DatanodeLeaseKey { node_id: k },
human_time: common_time::Timestamp::new_millisecond(v.timestamp_millis)
.to_local_string(),
lease: v,

View File

@@ -24,7 +24,6 @@ use common_workload::DatanodeWorkloadType;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::metasrv::SelectorContext;
pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute {
let region = Region {
@@ -54,22 +53,6 @@ pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef {
.unwrap()
}
/// Builds and returns a [`SelectorContext`]. To access its inner state,
/// use `memory_backend` on [`MetaPeerClientRef`].
pub(crate) fn create_selector_context() -> SelectorContext {
let meta_peer_client = create_meta_peer_client();
let in_memory = meta_peer_client.memory_backend();
SelectorContext {
datanode_lease_secs: 10,
flownode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: in_memory,
meta_peer_client,
table_id: None,
}
}
pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanodes: Vec<Peer>) {
let backend = meta_peer_client.memory_backend();
for datanode in datanodes {

View File

@@ -19,28 +19,28 @@ use client::error::{ExternalSnafu, Result as ClientResult};
use client::inserter::{Context, InsertOptions, Inserter};
use client::{Client, Database};
use common_error::ext::BoxedError;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::peer::PeerDiscoveryRef;
use common_telemetry::{debug, warn};
use snafu::{ResultExt, ensure};
use tokio::sync::RwLock;
use crate::error::{LookupFrontendsSnafu, NoAvailableFrontendSnafu};
use crate::error::{ListActiveFrontendsSnafu, NoAvailableFrontendSnafu};
pub type InsertForwarderRef = Arc<InsertForwarder>;
/// [`InsertForwarder`] is the inserter for the metasrv.
/// It forwards insert requests to available frontend instances.
pub struct InsertForwarder {
peer_lookup_service: PeerLookupServiceRef,
peer_discovery: PeerDiscoveryRef,
client: RwLock<Option<Client>>,
options: Option<InsertOptions>,
}
impl InsertForwarder {
/// Creates a new InsertForwarder with the given peer discovery service.
pub fn new(peer_lookup_service: PeerLookupServiceRef, options: Option<InsertOptions>) -> Self {
pub fn new(peer_discovery: PeerDiscoveryRef, options: Option<InsertOptions>) -> Self {
Self {
peer_lookup_service,
peer_discovery,
client: RwLock::new(None),
options,
}
@@ -49,10 +49,10 @@ impl InsertForwarder {
/// Builds a new client.
async fn build_client(&self) -> crate::error::Result<Client> {
let frontends = self
.peer_lookup_service
.peer_discovery
.active_frontends()
.await
.context(LookupFrontendsSnafu)?;
.context(ListActiveFrontendsSnafu)?;
ensure!(!frontends.is_empty(), NoAvailableFrontendSnafu);
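
`build_client` above lists active frontends through the discovery handle and bails out when none are available; the forwarder then caches the built client behind its `RwLock` for later requests. A synchronous, self-contained sketch of that lazy resolve-and-cache pattern, with illustrative types only:

```rust
#[derive(Clone, Debug)]
struct Peer {
    id: u64,
    addr: String,
}

struct Client {
    endpoint: String,
}

struct Forwarder {
    frontends: Vec<Peer>,   // stands in for peer_discovery.active_frontends()
    client: Option<Client>, // the real forwarder keeps this behind an RwLock
}

impl Forwarder {
    // Build the client on first use and reuse it afterwards.
    fn client(&mut self) -> Result<&Client, String> {
        if self.client.is_none() {
            let frontend = self
                .frontends
                .first()
                .ok_or_else(|| "no available frontend".to_string())?;
            self.client = Some(Client { endpoint: frontend.addr.clone() });
        }
        Ok(self.client.as_ref().expect("just set"))
    }
}

fn main() {
    let mut forwarder = Forwarder {
        frontends: vec![Peer { id: 1, addr: "127.0.0.1:4001".to_string() }],
        client: None,
    };
    assert_eq!(forwarder.client().unwrap().endpoint, "127.0.0.1:4001");
}
```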

View File

@@ -56,6 +56,7 @@ use frontend::server::Services;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::discovery;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use object_store::config::ObjectStoreConfig;
@@ -316,11 +317,14 @@ impl GreptimeDbClusterBuilder {
expected_datanodes: usize,
) {
for _ in 0..100 {
let alive_datanodes =
meta_srv::lease::alive_datanodes(meta_peer_client, Duration::from_secs(u64::MAX))
.await
.unwrap()
.len();
let alive_datanodes = discovery::utils::alive_datanodes(
meta_peer_client.as_ref(),
Duration::from_secs(u64::MAX),
None,
)
.await
.unwrap()
.len();
if alive_datanodes == expected_datanodes {
return;
}
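
The test polls `discovery::utils::alive_datanodes` with an effectively infinite lease window and no extra condition, so every datanode that has ever written a lease counts as alive. As a reading aid, a hedged sketch of what such a lease-window filter typically looks like, using a plain map of node id to last heartbeat as a stand-in for the real lease storage:

```rust
use std::collections::HashMap;
use std::time::Duration;

#[derive(Clone, Debug, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

struct Lease {
    addr: String,
    last_heartbeat_ms: u64,
}

// Keep nodes whose last heartbeat falls inside the lease window and which pass the
// optional condition; `None` means "no extra condition", as in the test above.
fn alive_datanodes(
    leases: &HashMap<u64, Lease>,
    now_ms: u64,
    lease_window: Duration,
    condition: Option<fn(&Lease) -> bool>,
) -> Vec<Peer> {
    leases
        .iter()
        .filter(|&(_, lease)| {
            let age_ms = now_ms.saturating_sub(lease.last_heartbeat_ms);
            u128::from(age_ms) <= lease_window.as_millis() && condition.map_or(true, |c| c(lease))
        })
        .map(|(id, lease)| Peer { id: *id, addr: lease.addr.clone() })
        .collect()
}

fn main() {
    let mut leases = HashMap::new();
    leases.insert(1, Lease { addr: "n1".to_string(), last_heartbeat_ms: 9_000 });
    leases.insert(2, Lease { addr: "n2".to_string(), last_heartbeat_ms: 1_000 });
    // A 5s window at t = 10s keeps only node 1; an effectively infinite window keeps both.
    assert_eq!(alive_datanodes(&leases, 10_000, Duration::from_secs(5), None).len(), 1);
    assert_eq!(alive_datanodes(&leases, 10_000, Duration::from_secs(u64::MAX), None).len(), 2);
}
```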