diff --git a/Cargo.lock b/Cargo.lock
index d17d0c05bd..67ec748de3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4097,7 +4097,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5d5eb65bb985ff47b3a417fb2505e315e2f5c319#5d5eb65bb985ff47b3a417fb2505e315e2f5c319"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b#7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index a183bbddfe..4a5d984538 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
 datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5d5eb65bb985ff47b3a417fb2505e315e2f5c319" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" }
 itertools = "0.10"
 parquet = "40.0"
 paste = "1.0"
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 87275b6613..efd0c1ec20 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -36,6 +36,7 @@ pub(crate) mod handler;
 
 pub struct HeartbeatTask {
     node_id: u64,
+    node_epoch: u64,
     server_addr: String,
     server_hostname: Option<String>,
     running: Arc<AtomicBool>,
@@ -65,6 +66,8 @@ impl HeartbeatTask {
     ) -> Self {
         Self {
             node_id,
+            // We use the datanode's start time (in milliseconds) as the node's epoch.
+            node_epoch: common_time::util::current_time_millis() as u64,
             server_addr,
             server_hostname,
             running: Arc::new(AtomicBool::new(false)),
@@ -133,6 +136,7 @@ impl HeartbeatTask {
         }
 
         let interval = self.interval;
         let node_id = self.node_id;
+        let node_epoch = self.node_epoch;
         let addr = resolve_addr(&self.server_addr, &self.server_hostname);
         info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
@@ -201,6 +205,7 @@ impl HeartbeatTask {
                     }),
                     region_stats,
                     duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
+                    node_epoch,
                     ..Default::default()
                 };
                 sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));
diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs
index ef9289470a..f0f17eefa9 100644
--- a/src/meta-srv/src/handler/node_stat.rs
+++ b/src/meta-srv/src/handler/node_stat.rs
@@ -42,6 +42,8 @@ pub struct Stat {
     pub write_io_rate: f64,
     /// Region stats on this node
     pub region_stats: Vec<RegionStat>,
+    /// The node epoch is used to check whether the node has restarted or been redeployed.
+    pub node_epoch: u64,
 }
 
 #[derive(Debug, Default, Serialize, Deserialize)]
@@ -79,6 +81,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
             is_leader,
             node_stat,
             region_stats,
+            node_epoch,
             ..
         } = value;
 
@@ -104,6 +107,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
                 read_io_rate: node_stat.read_io_rate,
                 write_io_rate: node_stat.write_io_rate,
                 region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
+                node_epoch,
             })
         }
         _ => Err(()),
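
A note on the epoch scheme above: the datanode derives `node_epoch` from its own start time, so a restarted or redeployed process reports a strictly larger epoch than its previous incarnation (assuming the wall clock moves forward across restarts). A minimal self-contained sketch of that idea; `node_epoch_at_startup` is a hypothetical stand-in for `common_time::util::current_time_millis`, which is not reproduced here:

use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical stand-in for common_time::util::current_time_millis():
// the current wall-clock time in milliseconds since the Unix epoch.
fn node_epoch_at_startup() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_millis() as u64
}

fn main() {
    // Epoch captured by a previous incarnation of the node.
    let old_epoch: u64 = 1_600_000_000_000;
    // Epoch captured when the (restarted) process comes up now.
    let new_epoch = node_epoch_at_startup();
    // Metasrv treats a larger epoch as "this node restarted or was redeployed".
    assert!(new_epoch > old_epoch);
}
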
diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs
index 09751c32ee..2b1ad61e11 100644
--- a/src/meta-srv/src/handler/persist_stats_handler.rs
+++ b/src/meta-srv/src/handler/persist_stats_handler.rs
@@ -23,9 +23,47 @@ use crate::metasrv::Context;
 
 const MAX_CACHED_STATS_PER_KEY: usize = 10;
 
+#[derive(Default)]
+struct EpochStats {
+    stats: Vec<Stat>,
+    epoch: Option<u64>,
+}
+
+impl EpochStats {
+    #[inline]
+    fn drain_all(&mut self) -> Vec<Stat> {
+        self.stats.drain(..).collect()
+    }
+
+    #[inline]
+    fn clear(&mut self) {
+        self.stats.clear();
+    }
+
+    #[inline]
+    fn push(&mut self, stat: Stat) {
+        self.stats.push(stat);
+    }
+
+    #[inline]
+    fn len(&self) -> usize {
+        self.stats.len()
+    }
+
+    #[inline]
+    fn epoch(&self) -> Option<u64> {
+        self.epoch
+    }
+
+    #[inline]
+    fn set_epoch(&mut self, epoch: u64) {
+        self.epoch = Some(epoch);
+    }
+}
+
 #[derive(Default)]
 pub struct PersistStatsHandler {
-    stats_cache: DashMap<StatKey, Vec<Stat>>,
+    stats_cache: DashMap<StatKey, EpochStats>,
 }
 
 #[async_trait::async_trait]
@@ -40,26 +78,47 @@ impl HeartbeatHandler for PersistStatsHandler {
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
     ) -> Result<()> {
-        let Some(stat) = acc.stat.take() else { return Ok(()) };
+        let Some(current_stat) = acc.stat.take() else { return Ok(()) };
 
-        let key = stat.stat_key();
+        let key = current_stat.stat_key();
         let mut entry = self
             .stats_cache
             .entry(key)
-            .or_insert_with(|| Vec::with_capacity(MAX_CACHED_STATS_PER_KEY));
-        let stats = entry.value_mut();
-        stats.push(stat);
+            .or_insert_with(EpochStats::default);
 
-        if stats.len() < MAX_CACHED_STATS_PER_KEY {
+        let key: Vec<u8> = key.into();
+        let epoch_stats = entry.value_mut();
+
+        let refresh = if let Some(epoch) = epoch_stats.epoch() {
+            // A greater epoch means this node may have been redeployed.
+            if current_stat.node_epoch > epoch {
+                epoch_stats.set_epoch(current_stat.node_epoch);
+                epoch_stats.clear();
+                true
+            } else {
+                false
+            }
+        } else {
+            epoch_stats.set_epoch(current_stat.node_epoch);
+            // An empty epoch indicates that this node is sending a heartbeat to the
+            // current meta leader for the first time, so its stats should be persisted
+            // to the KV store as soon as possible.
+            true
+        };
+
+        epoch_stats.push(current_stat);
+
+        if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
             return Ok(());
         }
 
-        let stats = stats.drain(..).collect();
-        let val = StatValue { stats };
-
+        let value: Vec<u8> = StatValue {
+            stats: epoch_stats.drain_all(),
+        }
+        .try_into()?;
         let put = PutRequest {
-            key: key.into(),
-            value: val.try_into()?,
+            key,
+            value,
             ..Default::default()
         };
 
@@ -74,12 +133,11 @@ mod tests {
     use std::sync::atomic::AtomicBool;
     use std::sync::Arc;
 
-    use api::v1::meta::RangeRequest;
-
     use super::*;
     use crate::handler::{HeartbeatMailbox, Pushers};
     use crate::keys::StatKey;
     use crate::sequence::Sequence;
+    use crate::service::store::ext::KvStoreExt;
     use crate::service::store::memory::MemStore;
 
     #[tokio::test]
@@ -88,7 +146,7 @@
         let kv_store = Arc::new(MemStore::new());
         let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
         let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
-        let mut ctx = Context {
+        let ctx = Context {
             server_addr: "127.0.0.1:0000".to_string(),
             in_memory,
             kv_store,
@@ -98,9 +156,40 @@
             is_infancy: false,
         };
 
-        let req = HeartbeatRequest::default();
         let handler = PersistStatsHandler::default();
-        for i in 1..=MAX_CACHED_STATS_PER_KEY {
+        handle_request_many_times(ctx.clone(), &handler, 1).await;
+
+        let key = StatKey {
+            cluster_id: 3,
+            node_id: 101,
+        };
+        let res = ctx.in_memory.get(key.try_into().unwrap()).await.unwrap();
+        assert!(res.is_some());
+        let kv = res.unwrap();
+        let key: StatKey = kv.key.clone().try_into().unwrap();
+        assert_eq!(3, key.cluster_id);
+        assert_eq!(101, key.node_id);
+        let val: StatValue = kv.value.try_into().unwrap();
+        // The first stat from a newly seen node must be persisted to the KV store immediately.
+        assert_eq!(1, val.stats.len());
+        assert_eq!(Some(1), val.stats[0].region_num);
+
+        handle_request_many_times(ctx.clone(), &handler, 10).await;
+        let res = ctx.in_memory.get(key.try_into().unwrap()).await.unwrap();
+        assert!(res.is_some());
+        let kv = res.unwrap();
+        let val: StatValue = kv.value.try_into().unwrap();
+        // After that, cached stats are flushed every 10 heartbeats.
+        assert_eq!(10, val.stats.len());
+    }
+
+    async fn handle_request_many_times(
+        mut ctx: Context,
+        handler: &PersistStatsHandler,
+        loop_times: i32,
+    ) {
+        let req = HeartbeatRequest::default();
+        for i in 1..=loop_times {
             let mut acc = HeartbeatAccumulator {
                 stat: Some(Stat {
                     cluster_id: 3,
@@ -112,30 +201,5 @@
             };
             handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
         }
-
-        let key = StatKey {
-            cluster_id: 3,
-            node_id: 101,
-        };
-
-        let req = RangeRequest {
-            key: key.try_into().unwrap(),
-            ..Default::default()
-        };
-
-        let res = ctx.in_memory.range(req).await.unwrap();
-
-        assert_eq!(1, res.kvs.len());
-
-        let kv = &res.kvs[0];
-
-        let key: StatKey = kv.key.clone().try_into().unwrap();
-        assert_eq!(3, key.cluster_id);
-        assert_eq!(101, key.node_id);
-
-        let val: StatValue = kv.value.clone().try_into().unwrap();
-
-        assert_eq!(10, val.stats.len());
-        assert_eq!(Some(1), val.stats[0].region_num);
     }
 }
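
Condensed, the handler's flush policy is: persist immediately on the first heartbeat seen from a node (or from a new incarnation of it, discarding the stale cache), otherwise buffer and flush every MAX_CACHED_STATS_PER_KEY heartbeats. Below is a self-contained sketch of just that decision, assuming a plain u64 payload in place of the crate's Stat type:

const MAX_CACHED_STATS_PER_KEY: usize = 10;

/// Stand-alone model of EpochStats plus the flush decision in
/// PersistStatsHandler::handle; the u64 payload stands in for Stat.
#[derive(Default)]
struct EpochStats {
    stats: Vec<u64>,
    epoch: Option<u64>,
}

impl EpochStats {
    /// Buffers one stat and returns the batch to persist, if it is time to flush.
    fn push(&mut self, node_epoch: u64, stat: u64) -> Option<Vec<u64>> {
        let refresh = match self.epoch {
            // A newer epoch: the node restarted, so stale stats are dropped.
            Some(epoch) if node_epoch > epoch => {
                self.epoch = Some(node_epoch);
                self.stats.clear();
                true
            }
            Some(_) => false,
            // First heartbeat seen from this node: flush immediately.
            None => {
                self.epoch = Some(node_epoch);
                true
            }
        };
        self.stats.push(stat);
        if refresh || self.stats.len() >= MAX_CACHED_STATS_PER_KEY {
            Some(self.stats.drain(..).collect())
        } else {
            None
        }
    }
}

fn main() {
    let mut cache = EpochStats::default();
    // The very first stat is flushed immediately.
    assert_eq!(Some(vec![0]), cache.push(1, 0));
    // The next nine heartbeats are buffered...
    for i in 1..=9 {
        assert_eq!(None, cache.push(1, i));
    }
    // ...and the tenth since the last flush drains the whole batch.
    assert_eq!(10, cache.push(1, 10).unwrap().len());
    // A heartbeat with a newer epoch discards the buffer and flushes at once.
    cache.push(1, 11);
    assert_eq!(Some(vec![99]), cache.push(2, 99));
}
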
diff --git a/src/meta-srv/src/service/store/ext.rs b/src/meta-srv/src/service/store/ext.rs
index 9e629ef176..2cbe2c18ca 100644
--- a/src/meta-srv/src/service/store/ext.rs
+++ b/src/meta-srv/src/service/store/ext.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::meta::{KeyValue, RangeRequest};
+use api::v1::meta::{DeleteRangeRequest, KeyValue, RangeRequest};
 
 use crate::error::Result;
 use crate::service::store::kv::KvStore;
@@ -24,6 +24,10 @@ pub trait KvStoreExt {
 
     /// Check if a key exists, it does not return the value.
     async fn exists(&self, key: Vec<u8>) -> Result<bool>;
+
+    /// Delete the value by the given key. If `prev_kv` is true,
+    /// the previous key-value pair will be returned.
+    async fn delete(&self, key: Vec<u8>, prev_kv: bool) -> Result<Option<KeyValue>>;
 }
 
 #[async_trait::async_trait]
@@ -53,6 +57,18 @@ where
 
         Ok(!kvs.is_empty())
     }
+
+    async fn delete(&self, key: Vec<u8>, prev_kv: bool) -> Result<Option<KeyValue>> {
+        let req = DeleteRangeRequest {
+            key,
+            prev_kv,
+            ..Default::default()
+        };
+
+        let mut prev_kvs = self.delete_range(req).await?.prev_kvs;
+
+        Ok(prev_kvs.pop())
+    }
 }
 
 #[cfg(test)]
@@ -115,6 +131,31 @@ mod tests {
         assert!(!in_mem.exists("test_key".as_bytes().to_vec()).await.unwrap());
     }
 
+    #[tokio::test]
+    async fn test_delete() {
+        let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef;
+
+        let mut prev_kv = in_mem
+            .delete("test_key1".as_bytes().to_vec(), true)
+            .await
+            .unwrap();
+        assert!(prev_kv.is_none());
+
+        put_stats_to_store(&mut in_mem).await;
+
+        assert!(in_mem
+            .exists("test_key1".as_bytes().to_vec())
+            .await
+            .unwrap());
+
+        prev_kv = in_mem
+            .delete("test_key1".as_bytes().to_vec(), true)
+            .await
+            .unwrap();
+        assert!(prev_kv.is_some());
+        assert_eq!("test_key1".as_bytes(), prev_kv.unwrap().key);
+    }
+
     async fn put_stats_to_store(store: &mut KvStoreRef) {
         store
             .put(PutRequest {
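
For reference, a sketch of how a caller might use the new single-key delete from KvStoreExt, written as if inside the meta-srv crate; the remove_node_stats helper is hypothetical and not part of this patch:

use crate::error::Result;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;

/// Hypothetical helper: remove a node's persisted stats and report whether
/// a value was actually deleted. Passing prev_kv = true makes the store
/// return the overwritten key-value pair, as in the test_delete case above.
async fn remove_node_stats(store: &KvStoreRef, key: Vec<u8>) -> Result<bool> {
    let prev_kv = store.delete(key, true).await?;
    Ok(prev_kv.is_some())
}
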