diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index b91ca9b22c..005e5d79ec 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -17,7 +17,7 @@ pub mod etcd; use crate::error::Result; pub const LEASE_SECS: i64 = 3; -pub const PROCLAIM_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3; +pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3; pub const ELECTION_KEY: &str = "__meta_srv_election"; #[async_trait::async_trait] @@ -27,6 +27,13 @@ pub trait Election: Send + Sync { /// Returns `true` if current node is the leader. fn is_leader(&self) -> bool; + /// When a new leader is born, it may need some initialization + /// operations (asynchronous), this method tells us when these + /// initialization operations can be performed. + /// + /// note: a new leader will only return true on the first call. + fn in_infancy(&self) -> bool; + /// Campaign waits to acquire leadership in an election. /// /// Multiple sessions can participate in the election, diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 59fca5e313..fc4ceef578 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -20,7 +20,7 @@ use common_telemetry::{info, warn}; use etcd_client::Client; use snafu::{OptionExt, ResultExt}; -use crate::election::{Election, ELECTION_KEY, LEASE_SECS, PROCLAIM_PERIOD_SECS}; +use crate::election::{Election, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue}; @@ -29,6 +29,7 @@ pub struct EtcdElection { leader_value: String, client: Client, is_leader: AtomicBool, + infancy: AtomicBool, } impl EtcdElection { @@ -46,6 +47,7 @@ impl EtcdElection { leader_value, client, is_leader: AtomicBool::new(false), + infancy: AtomicBool::new(false), })) } } @@ -58,6 +60,12 @@ impl Election for EtcdElection { self.is_leader.load(Ordering::Relaxed) } + fn in_infancy(&self) -> bool { + self.infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + async fn campaign(&self) -> Result<()> { let mut lease_client = self.client.lease_client(); let mut election_client = self.client.election_client(); @@ -67,22 +75,21 @@ impl Election for EtcdElection { .context(error::EtcdFailedSnafu)?; let lease_id = res.id(); - info!("Election grant ttl: {:?}, id: {:?}", res.ttl(), lease_id); + info!("Election grant ttl: {:?}, lease: {:?}", res.ttl(), lease_id); - // campaign + // Campaign, waits to acquire leadership in an election, returning + // a LeaderKey representing the leadership if successful. + // + // The method will be blocked until the election is won, and after + // passing the method, it is necessary to execute `keep_alive` immediately + // to confirm that it is a valid leader, because it is possible that the + // election's lease expires. let res = election_client .campaign(ELECTION_KEY, self.leader_value.clone(), lease_id) .await .context(error::EtcdFailedSnafu)?; if let Some(leader) = res.leader() { - info!( - "[{}] becoming leader: {:?}, lease: {}", - &self.leader_value, - leader.name_str(), - leader.lease() - ); - let (mut keeper, mut receiver) = self .client .lease_client() @@ -90,17 +97,31 @@ impl Election for EtcdElection { .await .context(error::EtcdFailedSnafu)?; - let mut interval = tokio::time::interval(Duration::from_secs(PROCLAIM_PERIOD_SECS)); + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS)); loop { - interval.tick().await; + keep_alive_interval.tick().await; keeper.keep_alive().await.context(error::EtcdFailedSnafu)?; if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? { if res.ttl() > 0 { - self.is_leader.store(true, Ordering::Relaxed); + // Only after a successful `keep_alive` is the leader considered official. + if self + .is_leader + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.infancy.store(true, Ordering::Relaxed); + info!( + "[{}] becoming leader: {:?}, lease: {}", + &self.leader_value, + leader.name_str(), + leader.lease() + ); + } } else { warn!( - "Already lost leader status, lease: {}, will re-initiate election", + "Failed to keep-alive, lease: {}, will re-initiate election", leader.lease() ); break; diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 4b7b39e3ac..6c146e517a 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -15,6 +15,7 @@ pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; pub use keep_lease_handler::KeepLeaseHandler; +pub use on_leader_start::OnLeaderStartHandler; pub use persist_stats_handler::PersistStatsHandler; pub use response_header_handler::ResponseHeaderHandler; @@ -23,6 +24,7 @@ mod collect_stats_handler; mod instruction; mod keep_lease_handler; pub(crate) mod node_stat; +mod on_leader_start; mod persist_stats_handler; mod response_header_handler; @@ -44,7 +46,7 @@ pub trait HeartbeatHandler: Send + Sync { async fn handle( &self, req: &HeartbeatRequest, - ctx: &Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()>; } @@ -91,11 +93,15 @@ impl HeartbeatHandlerGroup { pushers.remove(key) } - pub async fn handle(&self, req: HeartbeatRequest, ctx: Context) -> Result { + pub async fn handle( + &self, + req: HeartbeatRequest, + mut ctx: Context, + ) -> Result { let mut acc = HeartbeatAccumulator::default(); let handlers = self.handlers.read().await; for h in handlers.iter() { - h.handle(&req, &ctx, &mut acc).await?; + h.handle(&req, &mut ctx, &mut acc).await?; } let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { diff --git a/src/meta-srv/src/handler/check_leader_handler.rs b/src/meta-srv/src/handler/check_leader_handler.rs index c35c7f822c..c30f41fb18 100644 --- a/src/meta-srv/src/handler/check_leader_handler.rs +++ b/src/meta-srv/src/handler/check_leader_handler.rs @@ -26,7 +26,7 @@ impl HeartbeatHandler for CheckLeaderHandler { async fn handle( &self, _req: &HeartbeatRequest, - ctx: &Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { if let Some(election) = &ctx.election { diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 7374415846..d5c34222d8 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -51,14 +51,14 @@ impl HeartbeatHandler for CollectStatsHandler { async fn handle( &self, req: &HeartbeatRequest, - ctx: &Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { if ctx.is_skip_all() { return Ok(()); } - match Stat::try_from(req) { + match Stat::try_from(req.clone()) { Ok(stat) => { let key = (stat.cluster_id, stat.id); match self.cache.entry(key) { diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index e3bfcd056a..820ee6b061 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -58,7 +58,7 @@ impl HeartbeatHandler for KeepLeaseHandler { async fn handle( &self, req: &HeartbeatRequest, - ctx: &Context, + ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result<()> { if ctx.is_skip_all() { diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 9431b01a53..3c2ad1d943 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -69,10 +69,10 @@ impl Stat { } } -impl TryFrom<&HeartbeatRequest> for Stat { +impl TryFrom for Stat { type Error = (); - fn try_from(value: &HeartbeatRequest) -> Result { + fn try_from(value: HeartbeatRequest) -> Result { let HeartbeatRequest { header, peer, @@ -87,8 +87,8 @@ impl TryFrom<&HeartbeatRequest> for Stat { timestamp_millis: time_util::current_time_millis(), cluster_id: header.cluster_id, id: peer.id, - addr: peer.addr.clone(), - is_leader: *is_leader, + addr: peer.addr, + is_leader, rcus: node_stat.rcus, wcus: node_stat.wcus, table_num: node_stat.table_num, @@ -97,15 +97,15 @@ impl TryFrom<&HeartbeatRequest> for Stat { load: node_stat.load, read_io_rate: node_stat.read_io_rate, write_io_rate: node_stat.write_io_rate, - region_stats: region_stats.iter().map(RegionStat::from).collect(), + region_stats: region_stats.into_iter().map(RegionStat::from).collect(), }), _ => Err(()), } } } -impl From<&api::v1::meta::RegionStat> for RegionStat { - fn from(value: &api::v1::meta::RegionStat) -> Self { +impl From for RegionStat { + fn from(value: api::v1::meta::RegionStat) -> Self { let table = value.table_name.as_ref(); Self { id: value.region_id, diff --git a/src/meta-srv/src/handler/on_leader_start.rs b/src/meta-srv/src/handler/on_leader_start.rs new file mode 100644 index 0000000000..8e733e9744 --- /dev/null +++ b/src/meta-srv/src/handler/on_leader_start.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::HeartbeatRequest; + +use crate::error::Result; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::Context; + +#[derive(Default)] +pub struct OnLeaderStartHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for OnLeaderStartHandler { + async fn handle( + &self, + _req: &HeartbeatRequest, + ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + if let Some(election) = &ctx.election { + if election.in_infancy() { + ctx.reset_in_memory(); + } + } + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index d3c6b21751..146dc8be30 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -27,7 +27,7 @@ impl HeartbeatHandler for PersistStatsHandler { async fn handle( &self, _req: &HeartbeatRequest, - ctx: &Context, + ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { if ctx.is_skip_all() || acc.stats.is_empty() { @@ -43,7 +43,7 @@ impl HeartbeatHandler for PersistStatsHandler { // take stats from &mut acc.stats, avoid clone of vec let stats = std::mem::take(stats); - let val = &StatValue { stats }; + let val = StatValue { stats }; let put = PutRequest { key: key.into(), @@ -71,10 +71,12 @@ mod tests { #[tokio::test] async fn test_handle_datanode_stats() { + let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); - let ctx = Context { + let mut ctx = Context { datanode_lease_secs: 30, server_addr: "127.0.0.1:0000".to_string(), + in_memory, kv_store, election: None, skip_all: Arc::new(AtomicBool::new(false)), @@ -92,7 +94,10 @@ mod tests { }; let stats_handler = PersistStatsHandler; - stats_handler.handle(&req, &ctx, &mut acc).await.unwrap(); + stats_handler + .handle(&req, &mut ctx, &mut acc) + .await + .unwrap(); let key = StatKey { cluster_id: 3, diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index c81f3e013c..f4158a0e5d 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -26,7 +26,7 @@ impl HeartbeatHandler for ResponseHeaderHandler { async fn handle( &self, req: &HeartbeatRequest, - _ctx: &Context, + _ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { let HeartbeatRequest { header, .. } = req; @@ -53,10 +53,12 @@ mod tests { #[tokio::test] async fn test_handle_heartbeat_resp_header() { + let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); - let ctx = Context { + let mut ctx = Context { datanode_lease_secs: 30, server_addr: "127.0.0.1:0000".to_string(), + in_memory, kv_store, election: None, skip_all: Arc::new(AtomicBool::new(false)), @@ -69,7 +71,10 @@ mod tests { let mut acc = HeartbeatAccumulator::default(); let response_handler = ResponseHeaderHandler {}; - response_handler.handle(&req, &ctx, &mut acc).await.unwrap(); + response_handler + .handle(&req, &mut ctx, &mut acc) + .await + .unwrap(); let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index b4f5b22ab3..2f3283fe9e 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -228,11 +228,11 @@ pub struct StatValue { pub stats: Vec, } -impl TryFrom<&StatValue> for Vec { +impl TryFrom for Vec { type Error = error::Error; - fn try_from(stats: &StatValue) -> Result { - Ok(serde_json::to_string(stats) + fn try_from(stats: StatValue) -> Result { + Ok(serde_json::to_string(&stats) .context(crate::error::SerializeToJsonSnafu { input: format!("{stats:?}"), })? @@ -286,7 +286,7 @@ mod tests { ..Default::default() }; - let stat_val = &StatValue { stats: vec![stat] }; + let stat_val = StatValue { stats: vec![stat] }; let bytes: Vec = stat_val.try_into().unwrap(); let stat_val: StatValue = bytes.try_into().unwrap(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0a042ed582..91cae7f8ee 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,12 +22,13 @@ use serde::{Deserialize, Serialize}; use crate::election::Election; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, - PersistStatsHandler, ResponseHeaderHandler, + OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, }; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::Selector; use crate::sequence::{Sequence, SequenceRef}; -use crate::service::store::kv::KvStoreRef; +use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef}; +use crate::service::store::memory::MemStore; pub const TABLE_ID_SEQ: &str = "table_id"; @@ -55,6 +56,7 @@ impl Default for MetaSrvOptions { pub struct Context { pub datanode_lease_secs: i64, pub server_addr: String, + pub in_memory: ResetableKvStoreRef, pub kv_store: KvStoreRef, pub election: Option, pub skip_all: Arc, @@ -68,6 +70,10 @@ impl Context { pub fn set_skip_all(&self) { self.skip_all.store(true, Ordering::Relaxed); } + + pub fn reset_in_memory(&self) { + self.in_memory.reset(); + } } pub struct LeaderValue(pub String); @@ -79,6 +85,9 @@ pub type ElectionRef = Arc>; pub struct MetaSrv { started: Arc, options: MetaSrvOptions, + // It is only valid at the leader node and is used to temporarily + // store some data that will not be persisted. + in_memory: ResetableKvStoreRef, kv_store: KvStoreRef, table_id_sequence: SequenceRef, selector: SelectorRef, @@ -97,26 +106,29 @@ impl MetaSrv { let started = Arc::new(AtomicBool::new(false)); let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {})); + let in_memory = Arc::new(MemStore::default()); let handler_group = match handler_group { Some(hg) => hg, None => { - let hg = HeartbeatHandlerGroup::default(); - let kv_store = kv_store.clone(); - hg.add_handler(ResponseHeaderHandler::default()).await; + let group = HeartbeatHandlerGroup::default(); + let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); + group.add_handler(ResponseHeaderHandler::default()).await; // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, // because even if the current meta-server node is no longer the leader it can // still help the datanode to keep lease. - hg.add_handler(KeepLeaseHandler::new(kv_store)).await; - hg.add_handler(CheckLeaderHandler::default()).await; - hg.add_handler(CollectStatsHandler::default()).await; - hg.add_handler(PersistStatsHandler::default()).await; - hg + group.add_handler(keep_lease_handler).await; + group.add_handler(CheckLeaderHandler::default()).await; + group.add_handler(OnLeaderStartHandler::default()).await; + group.add_handler(CollectStatsHandler::default()).await; + group.add_handler(PersistStatsHandler::default()).await; + group } }; Self { started, options, + in_memory, kv_store, table_id_sequence, selector, @@ -162,6 +174,11 @@ impl MetaSrv { &self.options } + #[inline] + pub fn in_memory(&self) -> ResetableKvStoreRef { + self.in_memory.clone() + } + #[inline] pub fn kv_store(&self) -> KvStoreRef { self.kv_store.clone() @@ -191,12 +208,14 @@ impl MetaSrv { pub fn new_ctx(&self) -> Context { let datanode_lease_secs = self.options().datanode_lease_secs; let server_addr = self.options().server_addr.clone(); + let in_memory = self.in_memory(); let kv_store = self.kv_store(); let election = self.election(); let skip_all = Arc::new(AtomicBool::new(false)); Context { datanode_lease_secs, server_addr, + in_memory, kv_store, election, skip_all, diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index fb1d6edeb4..38dd9220f4 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -29,7 +29,6 @@ use crate::error; use crate::error::Result; use crate::service::store::kv::{KvStore, KvStoreRef}; -#[derive(Clone)] pub struct EtcdStore { client: Client, } diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index 17962abd7b..532fe28124 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -23,6 +23,7 @@ use api::v1::meta::{ use crate::error::Result; pub type KvStoreRef = Arc; +pub type ResetableKvStoreRef = Arc; #[async_trait::async_trait] pub trait KvStore: Send + Sync { @@ -38,3 +39,7 @@ pub trait KvStore: Send + Sync { async fn move_value(&self, req: MoveValueRequest) -> Result; } + +pub trait ResetableKvStore: KvStore { + fn reset(&self); +} diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index f6a6fe4701..29dc59a990 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -15,7 +15,6 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::ops::Range; -use std::sync::Arc; use api::v1::meta::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, @@ -25,12 +24,10 @@ use api::v1::meta::{ use parking_lot::RwLock; use crate::error::Result; -use crate::service::store::kv::KvStore; +use crate::service::store::kv::{KvStore, ResetableKvStore}; -/// Only for mock test -#[derive(Clone)] pub struct MemStore { - inner: Arc, Vec>>>, + inner: RwLock, Vec>>, } impl Default for MemStore { @@ -42,11 +39,17 @@ impl Default for MemStore { impl MemStore { pub fn new() -> Self { Self { - inner: Arc::new(RwLock::new(Default::default())), + inner: RwLock::new(Default::default()), } } } +impl ResetableKvStore for MemStore { + fn reset(&self) { + self.inner.write().clear(); + } +} + #[async_trait::async_trait] impl KvStore for MemStore { async fn range(&self, req: RangeRequest) -> Result {