diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index ea55b06e77..f0da4120b2 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -30,7 +30,6 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::server::Router; -use crate::cluster::MetaPeerClientBuilder; use crate::election::etcd::EtcdElection; use crate::lock::etcd::EtcdLock; use crate::lock::memory::MemLock; @@ -172,17 +171,8 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { let in_memory = Arc::new(MemStore::default()) as ResettableKvStoreRef; - let meta_peer_client = MetaPeerClientBuilder::default() - .election(election.clone()) - .in_memory(in_memory.clone()) - .build() - // Safety: all required fields set at initialization - .unwrap(); - let selector = match opts.selector { - SelectorType::LoadBased => Arc::new(LoadBasedSelector { - meta_peer_client: meta_peer_client.clone(), - }) as SelectorRef, + SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef, SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, }; @@ -192,7 +182,6 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { .in_memory(in_memory) .selector(selector) .election(election) - .meta_peer_client(meta_peer_client) .lock(lock) .build() .await diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 856cca567c..fa25a21eab 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use api::v1::meta::cluster_client::ClusterClient; @@ -30,7 +31,9 @@ use crate::metasrv::ElectionRef; use crate::service::store::kv::ResettableKvStoreRef; use crate::{error, util}; -#[derive(Builder, Clone)] +pub type MetaPeerClientRef = Arc; + +#[derive(Builder)] pub struct MetaPeerClient { election: Option, in_memory: ResettableKvStoreRef, diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 77401e1278..8db9ee6477 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -21,8 +21,9 @@ use tokio::sync::broadcast::Receiver; use crate::error::Result; -pub const LEASE_SECS: i64 = 3; -pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3; +pub const LEASE_SECS: i64 = 5; +// In a lease, there are two opportunities for renewal. +pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2; pub const ELECTION_KEY: &str = "__meta_srv_election"; #[derive(Clone)] diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index c62f2e65d7..1e5c56f809 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -221,9 +221,6 @@ pub enum Error { location: Location, }, - #[snafu(display("MetaSrv has no meta peer client"))] - NoMetaPeerClient { location: Location }, - #[snafu(display("Invalid http body, source: {}", source))] InvalidHttpBody { source: http::Error, @@ -399,7 +396,6 @@ impl ErrorExt for Error { | Error::Range { .. } | Error::ResponseHeaderNotFound { .. } | Error::IsNotLeader { .. } - | Error::NoMetaPeerClient { .. } | Error::InvalidHttpBody { .. } | Error::Lock { .. } | Error::Unlock { .. } diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index cc8a67500d..3d1a1ad307 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -12,46 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue, Role}; +use api::v1::meta::{HeartbeatRequest, PutRequest, Role}; use common_telemetry::{trace, warn}; use common_time::util as time_util; -use tokio::sync::mpsc::{self, Sender}; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::keys::{LeaseKey, LeaseValue}; use crate::metasrv::Context; -use crate::service::store::kv::KvStoreRef; -pub struct KeepLeaseHandler { - tx: Sender, -} - -impl KeepLeaseHandler { - pub fn new(kv_store: KvStoreRef) -> Self { - let (tx, mut rx) = mpsc::channel(1024); - let _handle = common_runtime::spawn_bg(async move { - while let Some(kv) = rx.recv().await { - let mut kvs = vec![kv]; - - while let Ok(kv) = rx.try_recv() { - kvs.push(kv); - } - - let batch_put = BatchPutRequest { - kvs, - ..Default::default() - }; - - if let Err(err) = kv_store.batch_put(batch_put).await { - warn!("Failed to write lease KVs, {err}"); - } - } - }); - - Self { tx } - } -} +#[derive(Default)] +pub struct KeepLeaseHandler; #[async_trait::async_trait] impl HeartbeatHandler for KeepLeaseHandler { @@ -62,28 +33,35 @@ impl HeartbeatHandler for KeepLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - _ctx: &mut Context, + ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result<()> { let HeartbeatRequest { header, peer, .. } = req; - if let Some(peer) = &peer { - let key = LeaseKey { - cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), - node_id: peer.id, - }; - let value = LeaseValue { - timestamp_millis: time_util::current_time_millis(), - node_addr: peer.addr.clone(), - }; + let Some(peer) = &peer else { return Ok(()); }; - trace!("Receive a heartbeat: {key:?}, {value:?}"); + let key = LeaseKey { + cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), + node_id: peer.id, + }; + let value = LeaseValue { + timestamp_millis: time_util::current_time_millis(), + node_addr: peer.addr.clone(), + }; - let key = key.try_into()?; - let value = value.try_into()?; + trace!("Receive a heartbeat: {key:?}, {value:?}"); - if let Err(err) = self.tx.send(KeyValue { key, value }).await { - warn!("Failed to send lease KV to writer, peer: {peer:?}, {err}"); - } + let key = key.try_into()?; + let value = value.try_into()?; + let put_req = PutRequest { + key, + value, + ..Default::default() + }; + + let res = ctx.in_memory.put(put_req).await; + + if let Err(err) = res { + warn!("Failed to update lease KV, peer: {peer:?}, {err}"); } Ok(()) diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 7bc040c798..f8701fef5c 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; + use api::v1::meta::{HeartbeatRequest, PutRequest, Role}; +use common_telemetry::warn; use dashmap::DashMap; use crate::error::Result; @@ -90,13 +93,18 @@ impl HeartbeatHandler for PersistStatsHandler { let epoch_stats = entry.value_mut(); let refresh = if let Some(epoch) = epoch_stats.epoch() { - // This node may have been redeployed. - if current_stat.node_epoch > epoch { - epoch_stats.set_epoch(current_stat.node_epoch); - epoch_stats.clear(); - true - } else { - false + match current_stat.node_epoch.cmp(&epoch) { + Ordering::Greater => { + // This node may have been redeployed. + epoch_stats.set_epoch(current_stat.node_epoch); + epoch_stats.clear(); + true + } + Ordering::Less => { + warn!("Ignore stale heartbeat: {:?}", current_stat); + false + } + Ordering::Equal => false, } } else { epoch_stats.set_epoch(current_stat.node_epoch); @@ -134,6 +142,7 @@ mod tests { use std::sync::Arc; use super::*; + use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::keys::StatKey; use crate::sequence::Sequence; @@ -146,10 +155,18 @@ mod tests { let kv_store = Arc::new(MemStore::new()); let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); let ctx = Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 6a0934b220..7db6ca8684 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -53,6 +53,7 @@ mod tests { use api::v1::meta::{HeartbeatResponse, RequestHeader}; use super::*; + use crate::cluster::MetaPeerClientBuilder; use crate::handler::{Context, HeartbeatMailbox, Pushers}; use crate::sequence::Sequence; use crate::service::store::memory::MemStore; @@ -63,10 +64,18 @@ mod tests { let kv_store = Arc::new(MemStore::new()); let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); let mut ctx = Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 89b216dd73..377bb7ffb9 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -14,29 +14,28 @@ use std::collections::HashMap; -use api::v1::meta::RangeRequest; use common_time::util as time_util; +use crate::cluster::MetaPeerClientRef; use crate::error::Result; use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; -use crate::service::store::kv::KvStoreRef; use crate::util; pub async fn alive_datanodes( cluster_id: u64, - kv_store: &KvStoreRef, + meta_peer_client: &MetaPeerClientRef, lease_secs: i64, ) -> Result> { let lease_filter = |_: &LeaseKey, v: &LeaseValue| { time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000 }; - filter_datanodes(cluster_id, kv_store, lease_filter).await + filter_datanodes(cluster_id, meta_peer_client, lease_filter).await } pub async fn filter_datanodes

( cluster_id: u64, - kv_store: &KvStoreRef, + meta_peer_client: &MetaPeerClientRef, predicate: P, ) -> Result> where @@ -44,15 +43,8 @@ where { let key = get_lease_prefix(cluster_id); let range_end = util::get_prefix_end_key(&key); - let req = RangeRequest { - key, - range_end, - ..Default::default() - }; - let res = kv_store.range(req).await?; - - let kvs = res.kvs; + let kvs = meta_peer_client.range(key, range_end).await?; let mut lease_kvs = HashMap::new(); for kv in kvs { let lease_key: LeaseKey = kv.key.try_into()?; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index f231a0ad9f..69d314bd28 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,7 +27,7 @@ use servers::http::HttpOptions; use snafu::ResultExt; use tokio::sync::broadcast::error::RecvError; -use crate::cluster::MetaPeerClient; +use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; @@ -75,6 +75,7 @@ pub struct Context { pub server_addr: String, pub in_memory: ResettableKvStoreRef, pub kv_store: KvStoreRef, + pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, pub skip_all: Arc, @@ -102,6 +103,7 @@ pub struct SelectorContext { pub datanode_lease_secs: i64, pub server_addr: String, pub kv_store: KvStoreRef, + pub meta_peer_client: MetaPeerClientRef, pub catalog: Option, pub schema: Option, pub table: Option, @@ -119,10 +121,10 @@ pub struct MetaSrv { in_memory: ResettableKvStoreRef, kv_store: KvStoreRef, table_id_sequence: SequenceRef, + meta_peer_client: MetaPeerClientRef, selector: SelectorRef, handler_group: HeartbeatHandlerGroup, election: Option, - meta_peer_client: Option, lock: DistLockRef, procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, @@ -217,6 +219,11 @@ impl MetaSrv { self.kv_store.clone() } + #[inline] + pub fn meta_peer_client(&self) -> MetaPeerClientRef { + self.meta_peer_client.clone() + } + #[inline] pub fn table_id_sequence(&self) -> SequenceRef { self.table_id_sequence.clone() @@ -237,11 +244,6 @@ impl MetaSrv { self.election.clone() } - #[inline] - pub fn meta_peer_client(&self) -> Option { - self.meta_peer_client.clone() - } - #[inline] pub fn lock(&self) -> &DistLockRef { &self.lock @@ -261,6 +263,7 @@ impl MetaSrv { let server_addr = self.options().server_addr.clone(); let in_memory = self.in_memory(); let kv_store = self.kv_store(); + let meta_peer_client = self.meta_peer_client(); let mailbox = self.mailbox(); let election = self.election(); let skip_all = Arc::new(AtomicBool::new(false)); @@ -268,6 +271,7 @@ impl MetaSrv { server_addr, in_memory, kv_store, + meta_peer_client, mailbox, election, skip_all, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index f10bae9a69..2a252fdd7c 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use common_procedure::local::{LocalManager, ManagerConfig}; -use crate::cluster::MetaPeerClient; +use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; @@ -47,7 +47,7 @@ pub struct MetaSrvBuilder { selector: Option, handler_group: Option, election: Option, - meta_peer_client: Option, + meta_peer_client: Option, lock: Option, metadata_service: Option, } @@ -92,7 +92,7 @@ impl MetaSrvBuilder { self } - pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClient) -> Self { + pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self { self.meta_peer_client = Some(meta_peer_client); self } @@ -130,18 +130,22 @@ impl MetaSrvBuilder { let options = options.unwrap_or_default(); let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default())); - let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default())); - + let meta_peer_client = meta_peer_client.unwrap_or_else(|| { + MetaPeerClientBuilder::default() + .election(election.clone()) + .in_memory(in_memory.clone()) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap() + }); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); - let pushers = Pushers::default(); let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); - let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let handler_group = match handler_group { @@ -158,6 +162,7 @@ impl MetaSrvBuilder { server_addr: options.server_addr.clone(), datanode_lease_secs: options.datanode_lease_secs, kv_store: kv_store.clone(), + meta_peer_client: meta_peer_client.clone(), catalog: None, schema: None, table: None, @@ -179,12 +184,11 @@ impl MetaSrvBuilder { ); let group = HeartbeatHandlerGroup::new(pushers); - let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); group.add_handler(ResponseHeaderHandler::default()).await; // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, // because even if the current meta-server node is no longer the leader it can // still help the datanode to keep lease. - group.add_handler(keep_lease_handler).await; + group.add_handler(KeepLeaseHandler::default()).await; group.add_handler(CheckLeaderHandler::default()).await; group.add_handler(OnLeaderStartHandler::default()).await; group.add_handler(CollectStatsHandler::default()).await; @@ -208,11 +212,11 @@ impl MetaSrvBuilder { options, in_memory, kv_store, + meta_peer_client, table_id_sequence, selector, handler_group, election, - meta_peer_client, lock, procedure_manager, metadata_service, diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 0dcb213aa5..f291748515 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -381,6 +381,7 @@ mod tests { use tokio::sync::mpsc::Receiver; use super::*; + use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::lock::memory::MemLock; use crate::selector::{Namespace, Selector}; @@ -469,6 +470,13 @@ mod tests { pub async fn build(self) -> TestingEnv { let kv_store = Arc::new(MemStore::new()) as _; + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(Arc::new(MemStore::new())) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); let table = "my_table"; let (_, table_global_value) = @@ -505,6 +513,7 @@ mod tests { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), kv_store, + meta_peer_client, catalog: Some(DEFAULT_CATALOG_NAME.to_string()), schema: Some(DEFAULT_SCHEMA_NAME.to_string()), table: Some(table.to_string()), diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 74c0116521..3e45e76c61 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -29,7 +29,7 @@ impl Selector for LeaseBasedSelector { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { // filter out the nodes out lease let mut lease_kvs: Vec<_> = - lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs) + lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs) .await? .into_iter() .collect(); diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 6417a4590a..0e420e1e47 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -15,7 +15,6 @@ use api::v1::meta::Peer; use common_telemetry::warn; -use crate::cluster::MetaPeerClient; use crate::error::Result; use crate::handler::node_stat::RegionStat; use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; @@ -25,9 +24,7 @@ use crate::selector::{Namespace, Selector}; const MAX_REGION_NUMBER: u64 = u64::MAX; -pub struct LoadBasedSelector { - pub meta_peer_client: MetaPeerClient, -} +pub struct LoadBasedSelector; #[async_trait::async_trait] impl Selector for LoadBasedSelector { @@ -36,13 +33,14 @@ impl Selector for LoadBasedSelector { async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { // get alive datanodes - let lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs).await?; + let lease_kvs = + lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?; if lease_kvs.is_empty() { return Ok(vec![]); } let stat_keys: Vec = lease_kvs.keys().map(|k| k.into()).collect(); - let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; + let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs .into_iter() diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs index 1f83efe8de..f881ec1616 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -15,16 +15,16 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use tonic::codegen::http; -use crate::cluster::MetaPeerClient; +use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; use crate::keys::StatValue; use crate::service::admin::HttpHandler; pub struct HeartBeatHandler { - pub meta_peer_client: Option, + pub meta_peer_client: MetaPeerClientRef, } #[async_trait::async_trait] @@ -34,12 +34,7 @@ impl HttpHandler for HeartBeatHandler { _: &str, params: &HashMap, ) -> Result> { - let meta_peer_client = self - .meta_peer_client - .as_ref() - .context(error::NoMetaPeerClientSnafu)?; - - let stat_kvs = meta_peer_client.get_all_dn_stat_kvs().await?; + let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?; let mut stat_vals: Vec = stat_kvs.into_values().collect(); if let Some(addr) = params.get("addr") { diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 949fe6a7d9..ae6a9ccba5 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -84,6 +84,7 @@ impl router_server::Router for MetaSrv { datanode_lease_secs: self.options().datanode_lease_secs, server_addr: self.options().server_addr.clone(), kv_store: self.kv_store(), + meta_peer_client: self.meta_peer_client(), catalog: Some(table_name.catalog_name.clone()), schema: Some(table_name.schema_name.clone()), table: Some(table_name.table_name.clone()), diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 67b7900b3c..fbb2433ae0 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_procedure::local::{LocalManager, ManagerConfig}; +use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; @@ -35,11 +36,21 @@ pub(crate) fn create_region_failover_manager() -> Arc { let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + let in_memory = Arc::new(MemStore::new()); + let meta_peer_client = MetaPeerClientBuilder::default() + .election(None) + .in_memory(in_memory) + .build() + .map(Arc::new) + // Safety: all required fields set at initialization + .unwrap(); + let selector = Arc::new(LeaseBasedSelector); let selector_ctx = SelectorContext { datanode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), kv_store, + meta_peer_client, catalog: None, schema: None, table: None, diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 7425f55a2d..1943edc7f6 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -31,6 +31,7 @@ use datanode::heartbeat::HeartbeatTask; use datanode::instance::Instance as DatanodeInstance; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; +use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; use meta_srv::service::store::kv::KvStoreRef; @@ -93,7 +94,8 @@ impl GreptimeDbClusterBuilder { let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await; - self.wait_datanodes_alive(datanodes).await; + self.wait_datanodes_alive(&meta_srv.meta_srv.meta_peer_client(), datanodes) + .await; let frontend = self .build_frontend(meta_srv.clone(), datanode_clients) @@ -162,13 +164,17 @@ impl GreptimeDbClusterBuilder { (instances, heartbeat_tasks, storage_guards, wal_guards) } - async fn wait_datanodes_alive(&self, expected_datanodes: u32) { - let kv_store = self.kv_store(); + async fn wait_datanodes_alive( + &self, + meta_peer_client: &MetaPeerClientRef, + expected_datanodes: u32, + ) { for _ in 0..10 { - let alive_datanodes = meta_srv::lease::filter_datanodes(1000, &kv_store, |_, _| true) - .await - .unwrap() - .len() as u32; + let alive_datanodes = + meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true) + .await + .unwrap() + .len() as u32; if alive_datanodes == expected_datanodes { return; } @@ -225,10 +231,6 @@ impl GreptimeDbClusterBuilder { .unwrap(), ) } - - fn kv_store(&self) -> KvStoreRef { - self.kv_store.clone() - } } async fn build_datanode_clients( diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index c09a24738e..2bde845075 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -337,6 +337,7 @@ async fn run_region_failover_procedure( datanode_lease_secs: meta_srv.options().datanode_lease_secs, server_addr: meta_srv.options().server_addr.clone(), kv_store: meta_srv.kv_store(), + meta_peer_client: meta_srv.meta_peer_client(), catalog: None, schema: None, table: None,