feat: deal with node epoch (#1795)

* feat: deal with node epoch

* feat: dn send node_epoch

* Update src/meta-srv/src/handler/persist_stats_handler.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/meta-srv/src/service/store/ext.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* chore: by cr

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Author: JeremyHi
Date: 2023-06-20 15:07:05 +08:00
Committed by: GitHub
Parent: 30472cebae
Commit: 4fdee5ea3c
6 changed files with 159 additions and 45 deletions
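
Summary: the datanode records its start time in milliseconds as a node_epoch and sends it with every heartbeat. On the Metasrv side, PersistStatsHandler keeps the last epoch seen per stat key; a larger incoming epoch (the node was restarted or redeployed) or a missing cached epoch (first heartbeat to the current meta leader) causes the cached stats to be flushed to the KV store immediately instead of waiting for MAX_CACHED_STATS_PER_KEY entries to accumulate. KvStoreExt also gains a single-key delete built on delete_range.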

Cargo.lock (generated)

@@ -4097,7 +4097,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5d5eb65bb985ff47b3a417fb2505e315e2f5c319#5d5eb65bb985ff47b3a417fb2505e315e2f5c319"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b#7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b"
dependencies = [
"prost",
"serde",

Cargo.toml

@@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5d5eb65bb985ff47b3a417fb2505e315e2f5c319" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7aeaeaba1e0ca6a5c736b6ab2eb63144ae3d284b" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"


@@ -36,6 +36,7 @@ pub(crate) mod handler;
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
server_addr: String,
server_hostname: Option<String>,
running: Arc<AtomicBool>,
@@ -65,6 +66,8 @@ impl HeartbeatTask {
) -> Self {
Self {
node_id,
// We use the datanode's start time in milliseconds as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr,
server_hostname,
running: Arc::new(AtomicBool::new(false)),
@@ -133,6 +136,7 @@ impl HeartbeatTask {
}
let interval = self.interval;
let node_id = self.node_id;
let node_epoch = self.node_epoch;
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
@@ -201,6 +205,7 @@ impl HeartbeatTask {
}),
region_stats,
duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
node_epoch,
..Default::default()
};
sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));
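
A minimal, std-only sketch of how such a start-time epoch can be derived (the real code uses common_time::util::current_time_millis(); the helper name below is illustrative):

// Sketch only: the node epoch is the process start time in Unix milliseconds,
// captured once when the heartbeat task is created and reused for every request.
use std::time::{SystemTime, UNIX_EPOCH};

fn node_epoch_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as u64
}

fn main() {
    // A restarted process yields a strictly larger epoch than its previous run.
    println!("node_epoch = {}", node_epoch_millis());
}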


@@ -42,6 +42,8 @@ pub struct Stat {
pub write_io_rate: f64,
/// Region stats on this node
pub region_stats: Vec<RegionStat>,
/// The node epoch, used to check whether the node has been restarted or redeployed.
pub node_epoch: u64,
}
#[derive(Debug, Default, Serialize, Deserialize)]
@@ -79,6 +81,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
is_leader,
node_stat,
region_stats,
node_epoch,
..
} = value;
@@ -104,6 +107,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
read_io_rate: node_stat.read_io_rate,
write_io_rate: node_stat.write_io_rate,
region_stats: region_stats.into_iter().map(RegionStat::from).collect(),
node_epoch,
})
}
_ => Err(()),

src/meta-srv/src/handler/persist_stats_handler.rs

@@ -23,9 +23,47 @@ use crate::metasrv::Context;
const MAX_CACHED_STATS_PER_KEY: usize = 10;
#[derive(Default)]
struct EpochStats {
stats: Vec<Stat>,
epoch: Option<u64>,
}
impl EpochStats {
#[inline]
fn drain_all(&mut self) -> Vec<Stat> {
self.stats.drain(..).collect()
}
#[inline]
fn clear(&mut self) {
self.stats.clear();
}
#[inline]
fn push(&mut self, stat: Stat) {
self.stats.push(stat);
}
#[inline]
fn len(&self) -> usize {
self.stats.len()
}
#[inline]
fn epoch(&self) -> Option<u64> {
self.epoch
}
#[inline]
fn set_epoch(&mut self, epoch: u64) {
self.epoch = Some(epoch);
}
}
#[derive(Default)]
pub struct PersistStatsHandler {
stats_cache: DashMap<StatKey, Vec<Stat>>,
stats_cache: DashMap<StatKey, EpochStats>,
}
#[async_trait::async_trait]
@@ -40,26 +78,47 @@ impl HeartbeatHandler for PersistStatsHandler {
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.take() else { return Ok(()) };
let Some(current_stat) = acc.stat.take() else { return Ok(()) };
let key = stat.stat_key();
let key = current_stat.stat_key();
let mut entry = self
.stats_cache
.entry(key)
.or_insert_with(|| Vec::with_capacity(MAX_CACHED_STATS_PER_KEY));
let stats = entry.value_mut();
stats.push(stat);
.or_insert_with(EpochStats::default);
if stats.len() < MAX_CACHED_STATS_PER_KEY {
let key: Vec<u8> = key.into();
let epoch_stats = entry.value_mut();
let refresh = if let Some(epoch) = epoch_stats.epoch() {
// This node may have been redeployed.
if current_stat.node_epoch > epoch {
epoch_stats.set_epoch(current_stat.node_epoch);
epoch_stats.clear();
true
} else {
false
}
} else {
epoch_stats.set_epoch(current_stat.node_epoch);
// If the epoch is empty, the current node is sending its first heartbeat to the
// current meta leader, so it is necessary to persist the stats to the KV store
// as soon as possible.
true
};
epoch_stats.push(current_stat);
if !refresh && epoch_stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(());
}
let stats = stats.drain(..).collect();
let val = StatValue { stats };
let value: Vec<u8> = StatValue {
stats: epoch_stats.drain_all(),
}
.try_into()?;
let put = PutRequest {
key: key.into(),
value: val.try_into()?,
key,
value,
..Default::default()
};
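
The refresh decision above can be isolated into a small, self-contained sketch (type and field names are simplified stand-ins, with u64 in place of Stat), making the three cases explicit:

// Simplified stand-in for the handler's per-key cache entry.
#[derive(Default)]
struct EpochStatsSketch {
    stats: Vec<u64>, // placeholder for Vec<Stat>
    epoch: Option<u64>,
}

/// Returns true when cached stats must be flushed to the KV store right away:
/// either no epoch has been recorded yet (first heartbeat to this meta leader),
/// or the incoming epoch is larger (the node was restarted or redeployed).
/// Otherwise stats keep accumulating until MAX_CACHED_STATS_PER_KEY are buffered.
fn needs_immediate_flush(cache: &mut EpochStatsSketch, incoming_epoch: u64) -> bool {
    match cache.epoch {
        None => {
            cache.epoch = Some(incoming_epoch);
            true
        }
        Some(epoch) if incoming_epoch > epoch => {
            cache.epoch = Some(incoming_epoch);
            cache.stats.clear(); // drop stats from the previous incarnation
            true
        }
        Some(_) => false,
    }
}

fn main() {
    let mut cache = EpochStatsSketch::default();
    assert!(needs_immediate_flush(&mut cache, 1_000)); // first heartbeat: flush now
    assert!(!needs_immediate_flush(&mut cache, 1_000)); // same epoch: keep batching
    assert!(needs_immediate_flush(&mut cache, 2_000)); // larger epoch: restart detected
}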
@@ -74,12 +133,11 @@ mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::meta::RangeRequest;
use super::*;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::StatKey;
use crate::sequence::Sequence;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::memory::MemStore;
#[tokio::test]
@@ -88,7 +146,7 @@ mod tests {
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 0, 10, kv_store.clone());
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let mut ctx = Context {
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_store,
@@ -98,9 +156,40 @@ mod tests {
is_infancy: false,
};
let req = HeartbeatRequest::default();
let handler = PersistStatsHandler::default();
for i in 1..=MAX_CACHED_STATS_PER_KEY {
handle_request_many_times(ctx.clone(), &handler, 1).await;
let key = StatKey {
cluster_id: 3,
node_id: 101,
};
let res = ctx.in_memory.get(key.try_into().unwrap()).await.unwrap();
assert!(res.is_some());
let kv = res.unwrap();
let key: StatKey = kv.key.clone().try_into().unwrap();
assert_eq!(3, key.cluster_id);
assert_eq!(101, key.node_id);
let val: StatValue = kv.value.try_into().unwrap();
// The first stat under a new epoch must be persisted to the KV store immediately.
assert_eq!(1, val.stats.len());
assert_eq!(Some(1), val.stats[0].region_num);
handle_request_many_times(ctx.clone(), &handler, 10).await;
let res = ctx.in_memory.get(key.try_into().unwrap()).await.unwrap();
assert!(res.is_some());
let kv = res.unwrap();
let val: StatValue = kv.value.try_into().unwrap();
// Subsequent stats are flushed in batches of MAX_CACHED_STATS_PER_KEY (10).
assert_eq!(10, val.stats.len());
}
async fn handle_request_many_times(
mut ctx: Context,
handler: &PersistStatsHandler,
loop_times: i32,
) {
let req = HeartbeatRequest::default();
for i in 1..=loop_times {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
cluster_id: 3,
@@ -112,30 +201,5 @@ mod tests {
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
let key = StatKey {
cluster_id: 3,
node_id: 101,
};
let req = RangeRequest {
key: key.try_into().unwrap(),
..Default::default()
};
let res = ctx.in_memory.range(req).await.unwrap();
assert_eq!(1, res.kvs.len());
let kv = &res.kvs[0];
let key: StatKey = kv.key.clone().try_into().unwrap();
assert_eq!(3, key.cluster_id);
assert_eq!(101, key.node_id);
let val: StatValue = kv.value.clone().try_into().unwrap();
assert_eq!(10, val.stats.len());
assert_eq!(Some(1), val.stats[0].region_num);
}
}

src/meta-srv/src/service/store/ext.rs

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{KeyValue, RangeRequest};
use api::v1::meta::{DeleteRangeRequest, KeyValue, RangeRequest};
use crate::error::Result;
use crate::service::store::kv::KvStore;
@@ -24,6 +24,10 @@ pub trait KvStoreExt {
/// Checks whether a key exists without returning its value.
async fn exists(&self, key: Vec<u8>) -> Result<bool>;
/// Deletes the value for the given key. If `prev_kv` is true,
/// the previous key-value pair is returned.
async fn delete(&self, key: Vec<u8>, prev_kv: bool) -> Result<Option<KeyValue>>;
}
#[async_trait::async_trait]
@@ -53,6 +57,18 @@ where
Ok(!kvs.is_empty())
}
async fn delete(&self, key: Vec<u8>, prev_kv: bool) -> Result<Option<KeyValue>> {
let req = DeleteRangeRequest {
key,
prev_kv,
..Default::default()
};
let mut prev_kvs = self.delete_range(req).await?.prev_kvs;
Ok(prev_kvs.pop())
}
}
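
The new extension method is a thin point-delete wrapper over the existing range-delete primitive. A std-only sketch of the same pattern (the types and method names here are illustrative, not the crate's API):

use std::collections::BTreeMap;

struct MemKv {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MemKv {
    /// Range-style primitive: deletes `key` and, when `prev_kv` is set, reports
    /// the removed pairs (at most one here, since only a single key is targeted).
    fn delete_range(&mut self, key: Vec<u8>, prev_kv: bool) -> Vec<(Vec<u8>, Vec<u8>)> {
        match self.data.remove(&key) {
            Some(value) if prev_kv => vec![(key, value)],
            _ => Vec::new(),
        }
    }

    /// Point delete built on the range primitive, mirroring KvStoreExt::delete:
    /// pop the single previous pair so callers get an Option instead of a Vec.
    fn delete(&mut self, key: Vec<u8>, prev_kv: bool) -> Option<(Vec<u8>, Vec<u8>)> {
        self.delete_range(key, prev_kv).pop()
    }
}

fn main() {
    let mut kv = MemKv { data: BTreeMap::new() };
    kv.data.insert(b"test_key1".to_vec(), b"v1".to_vec());
    assert!(kv.delete(b"missing".to_vec(), true).is_none());
    let prev = kv.delete(b"test_key1".to_vec(), true);
    assert_eq!(prev.unwrap().0, b"test_key1".to_vec());
}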
#[cfg(test)]
@@ -115,6 +131,31 @@ mod tests {
assert!(!in_mem.exists("test_key".as_bytes().to_vec()).await.unwrap());
}
#[tokio::test]
async fn test_delete() {
let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef;
let mut prev_kv = in_mem
.delete("test_key1".as_bytes().to_vec(), true)
.await
.unwrap();
assert!(prev_kv.is_none());
put_stats_to_store(&mut in_mem).await;
assert!(in_mem
.exists("test_key1".as_bytes().to_vec())
.await
.unwrap());
prev_kv = in_mem
.delete("test_key1".as_bytes().to_vec(), true)
.await
.unwrap();
assert!(prev_kv.is_some());
assert_eq!("test_key1".as_bytes(), prev_kv.unwrap().key);
}
async fn put_stats_to_store(store: &mut KvStoreRef) {
store
.put(PutRequest {