mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 00:10:38 +00:00
feat: save node lease into memory (#1841)
* feat: lease secs = 5
* feat: set lease data into memory of leader
* fix: ignore stale heartbeat
* Update src/meta-srv/src/election.rs

Co-authored-by: LFC <bayinamine@gmail.com>

---------

Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
@@ -30,7 +30,6 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tonic::transport::server::Router;
|
||||
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::election::etcd::EtcdElection;
|
||||
use crate::lock::etcd::EtcdLock;
|
||||
use crate::lock::memory::MemLock;
|
||||
@@ -172,17 +171,8 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
|
||||
|
||||
let in_memory = Arc::new(MemStore::default()) as ResettableKvStoreRef;
|
||||
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(election.clone())
|
||||
.in_memory(in_memory.clone())
|
||||
.build()
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
|
||||
let selector = match opts.selector {
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
}) as SelectorRef,
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef,
|
||||
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
|
||||
};
|
||||
|
||||
@@ -192,7 +182,6 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
|
||||
.in_memory(in_memory)
|
||||
.selector(selector)
|
||||
.election(election)
|
||||
.meta_peer_client(meta_peer_client)
|
||||
.lock(lock)
|
||||
.build()
|
||||
.await
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::cluster_client::ClusterClient;
|
||||
@@ -30,7 +31,9 @@ use crate::metasrv::ElectionRef;
|
||||
use crate::service::store::kv::ResettableKvStoreRef;
|
||||
use crate::{error, util};
|
||||
|
||||
#[derive(Builder, Clone)]
|
||||
pub type MetaPeerClientRef = Arc<MetaPeerClient>;
|
||||
|
||||
#[derive(Builder)]
|
||||
pub struct MetaPeerClient {
|
||||
election: Option<ElectionRef>,
|
||||
in_memory: ResettableKvStoreRef,
|
||||
|
||||
@@ -21,8 +21,9 @@ use tokio::sync::broadcast::Receiver;
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
pub const LEASE_SECS: i64 = 3;
|
||||
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3;
|
||||
pub const LEASE_SECS: i64 = 5;
|
||||
// In a lease, there are two opportunities for renewal.
|
||||
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;
|
||||
pub const ELECTION_KEY: &str = "__meta_srv_election";
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -221,9 +221,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("MetaSrv has no meta peer client"))]
|
||||
NoMetaPeerClient { location: Location },
|
||||
|
||||
#[snafu(display("Invalid http body, source: {}", source))]
|
||||
InvalidHttpBody {
|
||||
source: http::Error,
|
||||
@@ -399,7 +396,6 @@ impl ErrorExt for Error {
|
||||
| Error::Range { .. }
|
||||
| Error::ResponseHeaderNotFound { .. }
|
||||
| Error::IsNotLeader { .. }
|
||||
| Error::NoMetaPeerClient { .. }
|
||||
| Error::InvalidHttpBody { .. }
|
||||
| Error::Lock { .. }
|
||||
| Error::Unlock { .. }
|
||||
|
||||
@@ -12,46 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::{BatchPutRequest, HeartbeatRequest, KeyValue, Role};
|
||||
use api::v1::meta::{HeartbeatRequest, PutRequest, Role};
|
||||
use common_telemetry::{trace, warn};
|
||||
use common_time::util as time_util;
|
||||
use tokio::sync::mpsc::{self, Sender};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
|
||||
use crate::keys::{LeaseKey, LeaseValue};
|
||||
use crate::metasrv::Context;
|
||||
use crate::service::store::kv::KvStoreRef;
|
||||
|
||||
pub struct KeepLeaseHandler {
|
||||
tx: Sender<KeyValue>,
|
||||
}
|
||||
|
||||
impl KeepLeaseHandler {
|
||||
pub fn new(kv_store: KvStoreRef) -> Self {
|
||||
let (tx, mut rx) = mpsc::channel(1024);
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
while let Some(kv) = rx.recv().await {
|
||||
let mut kvs = vec![kv];
|
||||
|
||||
while let Ok(kv) = rx.try_recv() {
|
||||
kvs.push(kv);
|
||||
}
|
||||
|
||||
let batch_put = BatchPutRequest {
|
||||
kvs,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Err(err) = kv_store.batch_put(batch_put).await {
|
||||
warn!("Failed to write lease KVs, {err}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self { tx }
|
||||
}
|
||||
}
|
||||
#[derive(Default)]
|
||||
pub struct KeepLeaseHandler;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HeartbeatHandler for KeepLeaseHandler {
|
||||
@@ -62,28 +33,35 @@ impl HeartbeatHandler for KeepLeaseHandler {
|
||||
async fn handle(
|
||||
&self,
|
||||
req: &HeartbeatRequest,
|
||||
_ctx: &mut Context,
|
||||
ctx: &mut Context,
|
||||
_acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<()> {
|
||||
let HeartbeatRequest { header, peer, .. } = req;
|
||||
if let Some(peer) = &peer {
|
||||
let key = LeaseKey {
|
||||
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
|
||||
node_id: peer.id,
|
||||
};
|
||||
let value = LeaseValue {
|
||||
timestamp_millis: time_util::current_time_millis(),
|
||||
node_addr: peer.addr.clone(),
|
||||
};
|
||||
let Some(peer) = &peer else { return Ok(()); };
|
||||
|
||||
trace!("Receive a heartbeat: {key:?}, {value:?}");
|
||||
let key = LeaseKey {
|
||||
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
|
||||
node_id: peer.id,
|
||||
};
|
||||
let value = LeaseValue {
|
||||
timestamp_millis: time_util::current_time_millis(),
|
||||
node_addr: peer.addr.clone(),
|
||||
};
|
||||
|
||||
let key = key.try_into()?;
|
||||
let value = value.try_into()?;
|
||||
trace!("Receive a heartbeat: {key:?}, {value:?}");
|
||||
|
||||
if let Err(err) = self.tx.send(KeyValue { key, value }).await {
|
||||
warn!("Failed to send lease KV to writer, peer: {peer:?}, {err}");
|
||||
}
|
||||
let key = key.try_into()?;
|
||||
let value = value.try_into()?;
|
||||
let put_req = PutRequest {
|
||||
key,
|
||||
value,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let res = ctx.in_memory.put(put_req).await;
|
||||
|
||||
if let Err(err) = res {
|
||||
warn!("Failed to update lease KV, peer: {peer:?}, {err}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -12,7 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, PutRequest, Role};
|
||||
use common_telemetry::warn;
|
||||
use dashmap::DashMap;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -90,13 +93,18 @@ impl HeartbeatHandler for PersistStatsHandler {
|
||||
let epoch_stats = entry.value_mut();
|
||||
|
||||
let refresh = if let Some(epoch) = epoch_stats.epoch() {
|
||||
// This node may have been redeployed.
|
||||
if current_stat.node_epoch > epoch {
|
||||
epoch_stats.set_epoch(current_stat.node_epoch);
|
||||
epoch_stats.clear();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
match current_stat.node_epoch.cmp(&epoch) {
|
||||
Ordering::Greater => {
|
||||
// This node may have been redeployed.
|
||||
epoch_stats.set_epoch(current_stat.node_epoch);
|
||||
epoch_stats.clear();
|
||||
true
|
||||
}
|
||||
Ordering::Less => {
|
||||
warn!("Ignore stale heartbeat: {:?}", current_stat);
|
||||
false
|
||||
}
|
||||
Ordering::Equal => false,
|
||||
}
|
||||
} else {
|
||||
epoch_stats.set_epoch(current_stat.node_epoch);
|
||||
@@ -134,6 +142,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::handler::{HeartbeatMailbox, Pushers};
|
||||
use crate::keys::StatKey;
|
||||
use crate::sequence::Sequence;
|
||||
@@ -146,10 +155,18 @@ mod tests {
|
||||
let kv_store = Arc::new(MemStore::new());
|
||||
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
|
||||
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(in_memory.clone())
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
let ctx = Context {
|
||||
server_addr: "127.0.0.1:0000".to_string(),
|
||||
in_memory,
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
mailbox,
|
||||
election: None,
|
||||
skip_all: Arc::new(AtomicBool::new(false)),
|
||||
|
||||
@@ -53,6 +53,7 @@ mod tests {
|
||||
use api::v1::meta::{HeartbeatResponse, RequestHeader};
|
||||
|
||||
use super::*;
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::handler::{Context, HeartbeatMailbox, Pushers};
|
||||
use crate::sequence::Sequence;
|
||||
use crate::service::store::memory::MemStore;
|
||||
@@ -63,10 +64,18 @@ mod tests {
|
||||
let kv_store = Arc::new(MemStore::new());
|
||||
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
|
||||
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(in_memory.clone())
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
let mut ctx = Context {
|
||||
server_addr: "127.0.0.1:0000".to_string(),
|
||||
in_memory,
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
mailbox,
|
||||
election: None,
|
||||
skip_all: Arc::new(AtomicBool::new(false)),
|
||||
|
||||
@@ -14,29 +14,28 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::RangeRequest;
|
||||
use common_time::util as time_util;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::Result;
|
||||
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
|
||||
use crate::service::store::kv::KvStoreRef;
|
||||
use crate::util;
|
||||
|
||||
pub async fn alive_datanodes(
|
||||
cluster_id: u64,
|
||||
kv_store: &KvStoreRef,
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: i64,
|
||||
) -> Result<HashMap<LeaseKey, LeaseValue>> {
|
||||
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
|
||||
time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000
|
||||
};
|
||||
|
||||
filter_datanodes(cluster_id, kv_store, lease_filter).await
|
||||
filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
|
||||
}
|
||||
|
||||
pub async fn filter_datanodes<P>(
|
||||
cluster_id: u64,
|
||||
kv_store: &KvStoreRef,
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
predicate: P,
|
||||
) -> Result<HashMap<LeaseKey, LeaseValue>>
|
||||
where
|
||||
@@ -44,15 +43,8 @@ where
|
||||
{
|
||||
let key = get_lease_prefix(cluster_id);
|
||||
let range_end = util::get_prefix_end_key(&key);
|
||||
let req = RangeRequest {
|
||||
key,
|
||||
range_end,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let res = kv_store.range(req).await?;
|
||||
|
||||
let kvs = res.kvs;
|
||||
let kvs = meta_peer_client.range(key, range_end).await?;
|
||||
let mut lease_kvs = HashMap::new();
|
||||
for kv in kvs {
|
||||
let lease_key: LeaseKey = kv.key.try_into()?;
|
||||
|
||||
@@ -27,7 +27,7 @@ use servers::http::HttpOptions;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::election::{Election, LeaderChangeMessage};
|
||||
use crate::error::{RecoverProcedureSnafu, Result};
|
||||
use crate::handler::HeartbeatHandlerGroup;
|
||||
@@ -75,6 +75,7 @@ pub struct Context {
|
||||
pub server_addr: String,
|
||||
pub in_memory: ResettableKvStoreRef,
|
||||
pub kv_store: KvStoreRef,
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
pub mailbox: MailboxRef,
|
||||
pub election: Option<ElectionRef>,
|
||||
pub skip_all: Arc<AtomicBool>,
|
||||
@@ -102,6 +103,7 @@ pub struct SelectorContext {
|
||||
pub datanode_lease_secs: i64,
|
||||
pub server_addr: String,
|
||||
pub kv_store: KvStoreRef,
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
pub catalog: Option<String>,
|
||||
pub schema: Option<String>,
|
||||
pub table: Option<String>,
|
||||
@@ -119,10 +121,10 @@ pub struct MetaSrv {
|
||||
in_memory: ResettableKvStoreRef,
|
||||
kv_store: KvStoreRef,
|
||||
table_id_sequence: SequenceRef,
|
||||
meta_peer_client: MetaPeerClientRef,
|
||||
selector: SelectorRef,
|
||||
handler_group: HeartbeatHandlerGroup,
|
||||
election: Option<ElectionRef>,
|
||||
meta_peer_client: Option<MetaPeerClient>,
|
||||
lock: DistLockRef,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
metadata_service: MetadataServiceRef,
|
||||
@@ -217,6 +219,11 @@ impl MetaSrv {
|
||||
self.kv_store.clone()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn meta_peer_client(&self) -> MetaPeerClientRef {
|
||||
self.meta_peer_client.clone()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn table_id_sequence(&self) -> SequenceRef {
|
||||
self.table_id_sequence.clone()
|
||||
@@ -237,11 +244,6 @@ impl MetaSrv {
|
||||
self.election.clone()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn meta_peer_client(&self) -> Option<MetaPeerClient> {
|
||||
self.meta_peer_client.clone()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn lock(&self) -> &DistLockRef {
|
||||
&self.lock
|
||||
@@ -261,6 +263,7 @@ impl MetaSrv {
|
||||
let server_addr = self.options().server_addr.clone();
|
||||
let in_memory = self.in_memory();
|
||||
let kv_store = self.kv_store();
|
||||
let meta_peer_client = self.meta_peer_client();
|
||||
let mailbox = self.mailbox();
|
||||
let election = self.election();
|
||||
let skip_all = Arc::new(AtomicBool::new(false));
|
||||
@@ -268,6 +271,7 @@ impl MetaSrv {
|
||||
server_addr,
|
||||
in_memory,
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
mailbox,
|
||||
election,
|
||||
skip_all,
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::sync::Arc;
|
||||
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
|
||||
use crate::error::Result;
|
||||
use crate::handler::mailbox_handler::MailboxHandler;
|
||||
use crate::handler::region_lease_handler::RegionLeaseHandler;
|
||||
@@ -47,7 +47,7 @@ pub struct MetaSrvBuilder {
|
||||
selector: Option<SelectorRef>,
|
||||
handler_group: Option<HeartbeatHandlerGroup>,
|
||||
election: Option<ElectionRef>,
|
||||
meta_peer_client: Option<MetaPeerClient>,
|
||||
meta_peer_client: Option<MetaPeerClientRef>,
|
||||
lock: Option<DistLockRef>,
|
||||
metadata_service: Option<MetadataServiceRef>,
|
||||
}
|
||||
@@ -92,7 +92,7 @@ impl MetaSrvBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClient) -> Self {
|
||||
pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
|
||||
self.meta_peer_client = Some(meta_peer_client);
|
||||
self
|
||||
}
|
||||
@@ -130,18 +130,22 @@ impl MetaSrvBuilder {
|
||||
let options = options.unwrap_or_default();
|
||||
|
||||
let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default()));
|
||||
|
||||
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default()));
|
||||
|
||||
let meta_peer_client = meta_peer_client.unwrap_or_else(|| {
|
||||
MetaPeerClientBuilder::default()
|
||||
.election(election.clone())
|
||||
.in_memory(in_memory.clone())
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap()
|
||||
});
|
||||
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
|
||||
|
||||
let pushers = Pushers::default();
|
||||
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
|
||||
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
|
||||
|
||||
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
|
||||
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
|
||||
|
||||
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
|
||||
|
||||
let handler_group = match handler_group {
|
||||
@@ -158,6 +162,7 @@ impl MetaSrvBuilder {
|
||||
server_addr: options.server_addr.clone(),
|
||||
datanode_lease_secs: options.datanode_lease_secs,
|
||||
kv_store: kv_store.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
@@ -179,12 +184,11 @@ impl MetaSrvBuilder {
|
||||
);
|
||||
|
||||
let group = HeartbeatHandlerGroup::new(pushers);
|
||||
let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
|
||||
group.add_handler(ResponseHeaderHandler::default()).await;
|
||||
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
|
||||
// because even if the current meta-server node is no longer the leader it can
|
||||
// still help the datanode to keep lease.
|
||||
group.add_handler(keep_lease_handler).await;
|
||||
group.add_handler(KeepLeaseHandler::default()).await;
|
||||
group.add_handler(CheckLeaderHandler::default()).await;
|
||||
group.add_handler(OnLeaderStartHandler::default()).await;
|
||||
group.add_handler(CollectStatsHandler::default()).await;
|
||||
@@ -208,11 +212,11 @@ impl MetaSrvBuilder {
|
||||
options,
|
||||
in_memory,
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
table_id_sequence,
|
||||
selector,
|
||||
handler_group,
|
||||
election,
|
||||
meta_peer_client,
|
||||
lock,
|
||||
procedure_manager,
|
||||
metadata_service,
|
||||
|
||||
@@ -381,6 +381,7 @@ mod tests {
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
|
||||
use super::*;
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
|
||||
use crate::lock::memory::MemLock;
|
||||
use crate::selector::{Namespace, Selector};
|
||||
@@ -469,6 +470,13 @@ mod tests {
|
||||
|
||||
pub async fn build(self) -> TestingEnv {
|
||||
let kv_store = Arc::new(MemStore::new()) as _;
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(Arc::new(MemStore::new()))
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
|
||||
let table = "my_table";
|
||||
let (_, table_global_value) =
|
||||
@@ -505,6 +513,7 @@ mod tests {
|
||||
datanode_lease_secs: 10,
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
|
||||
schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
|
||||
table: Some(table.to_string()),
|
||||
|
||||
@@ -29,7 +29,7 @@ impl Selector for LeaseBasedSelector {
|
||||
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
|
||||
// filter out the nodes out lease
|
||||
let mut lease_kvs: Vec<_> =
|
||||
lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs)
|
||||
lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use api::v1::meta::Peer;
|
||||
use common_telemetry::warn;
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
use crate::error::Result;
|
||||
use crate::handler::node_stat::RegionStat;
|
||||
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue};
|
||||
@@ -25,9 +24,7 @@ use crate::selector::{Namespace, Selector};
|
||||
|
||||
const MAX_REGION_NUMBER: u64 = u64::MAX;
|
||||
|
||||
pub struct LoadBasedSelector {
|
||||
pub meta_peer_client: MetaPeerClient,
|
||||
}
|
||||
pub struct LoadBasedSelector;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Selector for LoadBasedSelector {
|
||||
@@ -36,13 +33,14 @@ impl Selector for LoadBasedSelector {
|
||||
|
||||
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
|
||||
// get alive datanodes
|
||||
let lease_kvs = lease::alive_datanodes(ns, &ctx.kv_store, ctx.datanode_lease_secs).await?;
|
||||
let lease_kvs =
|
||||
lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
|
||||
if lease_kvs.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
|
||||
let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
|
||||
let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
|
||||
|
||||
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
|
||||
.into_iter()
|
||||
|
||||
@@ -15,16 +15,16 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use snafu::ResultExt;
|
||||
use tonic::codegen::http;
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{self, Result};
|
||||
use crate::keys::StatValue;
|
||||
use crate::service::admin::HttpHandler;
|
||||
|
||||
pub struct HeartBeatHandler {
|
||||
pub meta_peer_client: Option<MetaPeerClient>,
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -34,12 +34,7 @@ impl HttpHandler for HeartBeatHandler {
|
||||
_: &str,
|
||||
params: &HashMap<String, String>,
|
||||
) -> Result<http::Response<String>> {
|
||||
let meta_peer_client = self
|
||||
.meta_peer_client
|
||||
.as_ref()
|
||||
.context(error::NoMetaPeerClientSnafu)?;
|
||||
|
||||
let stat_kvs = meta_peer_client.get_all_dn_stat_kvs().await?;
|
||||
let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
|
||||
let mut stat_vals: Vec<StatValue> = stat_kvs.into_values().collect();
|
||||
|
||||
if let Some(addr) = params.get("addr") {
|
||||
|
||||
@@ -84,6 +84,7 @@ impl router_server::Router for MetaSrv {
|
||||
datanode_lease_secs: self.options().datanode_lease_secs,
|
||||
server_addr: self.options().server_addr.clone(),
|
||||
kv_store: self.kv_store(),
|
||||
meta_peer_client: self.meta_peer_client(),
|
||||
catalog: Some(table_name.catalog_name.clone()),
|
||||
schema: Some(table_name.schema_name.clone()),
|
||||
table: Some(table_name.table_name.clone()),
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::sync::Arc;
|
||||
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::handler::{HeartbeatMailbox, Pushers};
|
||||
use crate::lock::memory::MemLock;
|
||||
use crate::metasrv::SelectorContext;
|
||||
@@ -35,11 +36,21 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
|
||||
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
|
||||
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
|
||||
|
||||
let in_memory = Arc::new(MemStore::new());
|
||||
let meta_peer_client = MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(in_memory)
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap();
|
||||
|
||||
let selector = Arc::new(LeaseBasedSelector);
|
||||
let selector_ctx = SelectorContext {
|
||||
datanode_lease_secs: 10,
|
||||
server_addr: "127.0.0.1:3002".to_string(),
|
||||
kv_store,
|
||||
meta_peer_client,
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
|
||||
@@ -31,6 +31,7 @@ use datanode::heartbeat::HeartbeatTask;
|
||||
use datanode::instance::Instance as DatanodeInstance;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance};
|
||||
use meta_client::client::MetaClientBuilder;
|
||||
use meta_srv::cluster::MetaPeerClientRef;
|
||||
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
|
||||
use meta_srv::mocks::MockInfo;
|
||||
use meta_srv::service::store::kv::KvStoreRef;
|
||||
@@ -93,7 +94,8 @@ impl GreptimeDbClusterBuilder {
|
||||
|
||||
let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await;
|
||||
|
||||
self.wait_datanodes_alive(datanodes).await;
|
||||
self.wait_datanodes_alive(&meta_srv.meta_srv.meta_peer_client(), datanodes)
|
||||
.await;
|
||||
|
||||
let frontend = self
|
||||
.build_frontend(meta_srv.clone(), datanode_clients)
|
||||
@@ -162,13 +164,17 @@ impl GreptimeDbClusterBuilder {
|
||||
(instances, heartbeat_tasks, storage_guards, wal_guards)
|
||||
}
|
||||
|
||||
async fn wait_datanodes_alive(&self, expected_datanodes: u32) {
|
||||
let kv_store = self.kv_store();
|
||||
async fn wait_datanodes_alive(
|
||||
&self,
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
expected_datanodes: u32,
|
||||
) {
|
||||
for _ in 0..10 {
|
||||
let alive_datanodes = meta_srv::lease::filter_datanodes(1000, &kv_store, |_, _| true)
|
||||
.await
|
||||
.unwrap()
|
||||
.len() as u32;
|
||||
let alive_datanodes =
|
||||
meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true)
|
||||
.await
|
||||
.unwrap()
|
||||
.len() as u32;
|
||||
if alive_datanodes == expected_datanodes {
|
||||
return;
|
||||
}
|
||||
@@ -225,10 +231,6 @@ impl GreptimeDbClusterBuilder {
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
fn kv_store(&self) -> KvStoreRef {
|
||||
self.kv_store.clone()
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_datanode_clients(
|
||||
|
||||
@@ -337,6 +337,7 @@ async fn run_region_failover_procedure(
|
||||
datanode_lease_secs: meta_srv.options().datanode_lease_secs,
|
||||
server_addr: meta_srv.options().server_addr.clone(),
|
||||
kv_store: meta_srv.kv_store(),
|
||||
meta_peer_client: meta_srv.meta_peer_client(),
|
||||
catalog: None,
|
||||
schema: None,
|
||||
table: None,
|
||||
|
||||
Reference in New Issue
Block a user