diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 8db9ee6477..98d3e29649 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -26,7 +26,7 @@ pub const LEASE_SECS: i64 = 5; pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2; pub const ELECTION_KEY: &str = "__meta_srv_election"; -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum LeaderChangeMessage { Elected(Arc), StepDown(Arc), diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs index c65ff412b6..b770e7f4b4 100644 --- a/src/meta-srv/src/handler/on_leader_start_handler.rs +++ b/src/meta-srv/src/handler/on_leader_start_handler.rs @@ -37,6 +37,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { if election.in_infancy() { ctx.is_infancy = true; ctx.reset_in_memory(); + ctx.reset_leader_cached_kv_store(); } } Ok(()) diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index f8701fef5c..8ab335d9bf 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -146,6 +146,7 @@ mod tests { use crate::handler::{HeartbeatMailbox, Pushers}; use crate::keys::StatKey; use crate::sequence::Sequence; + use crate::service::store::cached_kv::LeaderCachedKvStore; use crate::service::store::ext::KvStoreExt; use crate::service::store::memory::MemStore; @@ -153,6 +154,8 @@ mod tests { async fn test_handle_datanode_stats() { let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); + let leader_cached_kv_store = + Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() @@ -166,6 +169,7 @@ mod tests { server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + leader_cached_kv_store, meta_peer_client, mailbox, election: None, diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 7db6ca8684..1210e6436c 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -56,12 +56,15 @@ mod tests { use crate::cluster::MetaPeerClientBuilder; use crate::handler::{Context, HeartbeatMailbox, Pushers}; use crate::sequence::Sequence; + use crate::service::store::cached_kv::LeaderCachedKvStore; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_handle_heartbeat_resp_header() { let in_memory = Arc::new(MemStore::new()); let kv_store = Arc::new(MemStore::new()); + let leader_cached_kv_store = + Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() @@ -75,6 +78,7 @@ mod tests { server_addr: "127.0.0.1:0000".to_string(), in_memory, kv_store, + leader_cached_kv_store, meta_peer_client, mailbox, election: None, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 69d314bd28..7280c0a751 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -75,6 +75,7 @@ pub struct Context { pub server_addr: String, pub in_memory: ResettableKvStoreRef, pub kv_store: KvStoreRef, + pub leader_cached_kv_store: 
ResettableKvStoreRef, pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, @@ -94,6 +95,10 @@ impl Context { pub fn reset_in_memory(&self) { self.in_memory.reset(); } + + pub fn reset_leader_cached_kv_store(&self) { + self.leader_cached_kv_store.reset(); + } } pub struct LeaderValue(pub String); @@ -120,6 +125,7 @@ pub struct MetaSrv { // store some data that will not be persisted. in_memory: ResettableKvStoreRef, kv_store: KvStoreRef, + leader_cached_kv_store: ResettableKvStoreRef, table_id_sequence: SequenceRef, meta_peer_client: MetaPeerClientRef, selector: SelectorRef, @@ -146,20 +152,30 @@ impl MetaSrv { if let Some(election) = self.election() { let procedure_manager = self.procedure_manager.clone(); + let in_memory = self.in_memory.clone(); + let leader_cached_kv_store = self.leader_cached_kv_store.clone(); let mut rx = election.subscribe_leader_change(); let _handle = common_runtime::spawn_bg(async move { loop { match rx.recv().await { - Ok(msg) => match msg { - LeaderChangeMessage::Elected(_) => { - if let Err(e) = procedure_manager.recover().await { - error!("Failed to recover procedures, error: {e}"); + Ok(msg) => { + in_memory.reset(); + leader_cached_kv_store.reset(); + info!( + "Leader's cache has been cleared on leader change: {:?}", + msg + ); + match msg { + LeaderChangeMessage::Elected(_) => { + if let Err(e) = procedure_manager.recover().await { + error!("Failed to recover procedures, error: {e}"); + } + } + LeaderChangeMessage::StepDown(leader) => { + error!("Leader :{:?} step down", leader); + } } - } - LeaderChangeMessage::StepDown(leader) => { - error!("Leader :{:?} step down", leader); - } - }, + } Err(RecvError::Closed) => { error!("Not expected, is leader election loop still running?"); break; @@ -219,6 +235,11 @@ impl MetaSrv { self.kv_store.clone() } + #[inline] + pub fn leader_cached_kv_store(&self) -> ResettableKvStoreRef { + self.leader_cached_kv_store.clone() + } + #[inline] pub fn meta_peer_client(&self) -> MetaPeerClientRef { self.meta_peer_client.clone() @@ -254,6 +275,7 @@ impl MetaSrv { self.mailbox.clone() } + #[inline] pub fn procedure_manager(&self) -> &ProcedureManagerRef { &self.procedure_manager } @@ -263,6 +285,7 @@ impl MetaSrv { let server_addr = self.options().server_addr.clone(); let in_memory = self.in_memory(); let kv_store = self.kv_store(); + let leader_cached_kv_store = self.leader_cached_kv_store(); let meta_peer_client = self.meta_peer_client(); let mailbox = self.mailbox(); let election = self.election(); @@ -271,6 +294,7 @@ server_addr, in_memory, kv_store, + leader_cached_kv_store, meta_peer_client, mailbox, election, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 2a252fdd7c..e75b8cacbc 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -36,6 +36,7 @@ use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; +use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; @@ -131,6 +132,10 @@ impl MetaSrvBuilder { let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default())); let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default())); + let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::new(
Arc::new(CheckLeaderByElection(election.clone())), + kv_store.clone(), + )); let meta_peer_client = meta_peer_client.unwrap_or_else(|| { MetaPeerClientBuilder::default() .election(election.clone()) @@ -146,6 +151,9 @@ impl MetaSrvBuilder { let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); + let metadata_service = metadata_service + .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let handler_group = match handler_group { @@ -202,16 +210,12 @@ impl MetaSrvBuilder { } }; - let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); - - let metadata_service = metadata_service - .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); - Ok(MetaSrv { started, options, in_memory, kv_store, + leader_cached_kv_store, meta_peer_client, table_id_sequence, selector, @@ -230,3 +234,13 @@ impl Default for MetaSrvBuilder { Self::new() } } + +struct CheckLeaderByElection(Option); + +impl CheckLeader for CheckLeaderByElection { + fn check(&self) -> bool { + self.0 + .as_ref() + .map_or(false, |election| election.is_leader()) + } +} diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 97f50e29b9..8b184f10df 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod cached_kv; pub mod etcd; pub(crate) mod etcd_util; pub mod ext; diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs new file mode 100644 index 0000000000..8637670a74 --- /dev/null +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -0,0 +1,474 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
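The `cached_kv.rs` module that follows caches key-value data only while this node is the elected leader, and it guards cache fills with a monotonically increasing version counter: a read snapshots the version, loads from the backing store, populates the cache, and then drops the freshly cached entry if a concurrent write bumped the version in the meantime. Below is a minimal, self-contained sketch of that version-check idea only; `VersionedCache`, the `String` keys, and the plain `HashMap` maps are simplified stand-ins, not the module's real `KvStore` types.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Simplified stand-in for `LeaderCachedKvStore`: a read-through cache in
/// front of a backing map, with a version counter used to detect writes
/// that race with a cache fill.
#[derive(Default)]
struct VersionedCache {
    backing: Mutex<HashMap<String, String>>,
    cache: Mutex<HashMap<String, String>>,
    version: AtomicUsize,
}

impl VersionedCache {
    /// Writes bump the version first, then go to the backing store and the cache.
    fn put(&self, key: &str, value: &str) {
        let ver = self.version.fetch_add(1, Ordering::Relaxed) + 1;
        self.backing
            .lock()
            .unwrap()
            .insert(key.to_string(), value.to_string());
        self.cache
            .lock()
            .unwrap()
            .insert(key.to_string(), value.to_string());
        // If another write created a newer version while we were filling the
        // cache, the entry may be stale; drop it rather than serve it later.
        if ver != self.version.load(Ordering::Relaxed) {
            self.cache.lock().unwrap().remove(key);
        }
    }

    /// Reads hit the cache first and fall back to the backing store,
    /// populating the cache on a miss.
    fn get(&self, key: &str) -> Option<String> {
        if let Some(v) = self.cache.lock().unwrap().get(key) {
            return Some(v.clone());
        }
        // Snapshot the version before the backing read, re-check it after the fill.
        let ver = self.version.load(Ordering::Relaxed);
        let value = self.backing.lock().unwrap().get(key)?.clone();
        self.cache
            .lock()
            .unwrap()
            .insert(key.to_string(), value.clone());
        if ver != self.version.load(Ordering::Relaxed) {
            self.cache.lock().unwrap().remove(key);
        }
        Some(value)
    }
}

fn main() {
    let store = VersionedCache::default();
    store.put("greeting", "hello");
    assert_eq!(store.get("greeting").as_deref(), Some("hello"));
}
```

The same reasoning appears in `range` and `put` below, where `get_version`, `create_new_version`, and `validate_version` play the roles of the snapshot, the bump, and the re-check.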
+ +use std::collections::HashSet; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use api::v1::meta::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, +}; + +use crate::error::Result; +use crate::service::store::ext::KvStoreExt; +use crate::service::store::kv::{KvStore, KvStoreRef, ResettableKvStore, ResettableKvStoreRef}; +use crate::service::store::memory::MemStore; +use crate::service::store::txn::{Txn, TxnOp, TxnRequest, TxnResponse, TxnService}; + +pub type CheckLeaderRef = Arc; + +pub trait CheckLeader: Sync + Send { + fn check(&self) -> bool; +} + +struct AlwaysLeader; + +impl CheckLeader for AlwaysLeader { + fn check(&self) -> bool { + true + } +} + +/// A cache dedicated to the leader node, used to cache metadata. +/// +/// To use this cache, the following constraints must be followed: +/// 1. The leader node can create this metadata. +/// 2. The follower node can also create this metadata. The leader node can lazily retrieve +/// the corresponding data through the cache loading mechanism. +/// 3. Only the leader node can update this metadata, as the cache cannot detect +/// modifications made to the data on the follower node. +/// 4. Only the leader node can delete this metadata, for the same reason mentioned above. +pub struct LeaderCachedKvStore { + check_leader: CheckLeaderRef, + store: KvStoreRef, + cache: ResettableKvStoreRef, + version: AtomicUsize, +} + +impl LeaderCachedKvStore { + pub fn new(check_leader: CheckLeaderRef, store: KvStoreRef) -> Self { + Self { + check_leader, + store, + cache: Arc::new(MemStore::new()), + version: AtomicUsize::new(0), + } + } + + /// Creates a store with a leader checker that always returns true; + /// mainly used in test scenarios. + pub fn with_always_leader(store: KvStoreRef) -> Self { + Self::new(Arc::new(AlwaysLeader), store) + } + + #[inline] + fn is_leader(&self) -> bool { + self.check_leader.check() + } + + #[inline] + async fn invalid_key(&self, key: Vec) -> Result<()> { + let _ = self.cache.delete(key, false).await?; + Ok(()) + } + + #[inline] + async fn invalid_keys(&self, keys: Vec>) -> Result<()> { + let txn = Txn::new().and_then(keys.into_iter().map(TxnOp::Delete).collect::>()); + let _ = self.cache.txn(txn).await?; + Ok(()) + } + + #[inline] + fn get_version(&self) -> usize { + self.version.load(Ordering::Relaxed) + } + + #[inline] + fn create_new_version(&self) -> usize { + self.version.fetch_add(1, Ordering::Relaxed) + 1 + } + + #[inline] + fn validate_version(&self, version: usize) -> bool { + version == self.version.load(Ordering::Relaxed) + } +} + +#[async_trait::async_trait] +impl KvStore for LeaderCachedKvStore { + async fn range(&self, req: RangeRequest) -> Result { + if !self.is_leader() { + return self.store.range(req).await; + } + + // We can only cache for exact key queries (i.e. get requests) + // because we cannot confirm if a range response is complete.
+ if !req.range_end.is_empty() { + return self.store.range(req).await; + } + + let res = self.cache.range(req.clone()).await?; + if !res.kvs.is_empty() { + return Ok(res); + } + + let ver = self.get_version(); + + let res = self.store.range(req.clone()).await?; + if !res.kvs.is_empty() { + let KeyValue { key, value } = res.kvs[0].clone(); + let put_req = PutRequest { + key: key.clone(), + value, + ..Default::default() + }; + let _ = self.cache.put(put_req).await?; + + if !self.validate_version(ver) { + self.invalid_key(key).await?; + } + } + + return Ok(res); + } + + async fn put(&self, req: PutRequest) -> Result { + if !self.is_leader() { + return self.store.put(req).await; + } + + let ver = self.create_new_version(); + + let res = self.store.put(req.clone()).await?; + let _ = self.cache.put(req.clone()).await?; + + if !self.validate_version(ver) { + self.invalid_key(req.key).await?; + } + + Ok(res) + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + if !self.is_leader() { + return self.store.batch_get(req).await; + } + + let cached_res = self.cache.batch_get(req.clone()).await?; + // The cache hit all keys + if cached_res.kvs.len() == req.keys.len() { + return Ok(cached_res); + } + + let hit_keys = cached_res + .kvs + .iter() + .map(|kv| kv.key.clone()) + .collect::>(); + let missed_keys = req + .keys + .iter() + .filter(|key| !hit_keys.contains(*key)) + .cloned() + .collect::>(); + let remote_req = BatchGetRequest { + keys: missed_keys, + ..Default::default() + }; + + let ver = self.get_version(); + + let remote_res = self.store.batch_get(remote_req).await?; + let put_req = BatchPutRequest { + kvs: remote_res.kvs.clone(), + ..Default::default() + }; + let _ = self.cache.batch_put(put_req).await?; + + if !self.validate_version(ver) { + let keys = remote_res + .kvs + .iter() + .map(|kv| kv.key.clone()) + .collect::>(); + self.invalid_keys(keys).await?; + } + + let mut merged_res = cached_res; + merged_res.kvs.extend(remote_res.kvs); + Ok(merged_res) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + if !self.is_leader() { + return self.store.batch_put(req).await; + } + + let ver = self.create_new_version(); + + let res = self.store.batch_put(req.clone()).await?; + let _ = self.cache.batch_put(req.clone()).await?; + + if !self.validate_version(ver) { + let keys = req.kvs.into_iter().map(|kv| kv.key).collect::>(); + self.invalid_keys(keys).await?; + } + + Ok(res) + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + if !self.is_leader() { + return self.store.batch_delete(req).await; + } + + let _ = self.create_new_version(); + + let res = self.store.batch_delete(req.clone()).await?; + let _ = self.cache.batch_delete(req).await?; + Ok(res) + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + if !self.is_leader() { + return self.store.compare_and_put(req).await; + } + + let _ = self.create_new_version(); + + let key = req.key.clone(); + let res = self.store.compare_and_put(req).await?; + // Delete key in the cache. + // + // Cache can not deal with the CAS operation, because it does + // not contain full data, so we need to delete the key. 
+ self.invalid_key(key).await?; + Ok(res) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + if !self.is_leader() { + return self.store.delete_range(req).await; + } + + let _ = self.create_new_version(); + + let res = self.store.delete_range(req.clone()).await?; + let _ = self.cache.delete_range(req).await?; + Ok(res) + } + + async fn move_value(&self, req: MoveValueRequest) -> Result { + if !self.is_leader() { + return self.store.move_value(req).await; + } + + let _ = self.create_new_version(); + + let res = self.store.move_value(req.clone()).await?; + let MoveValueRequest { + from_key, to_key, .. + } = req; + // Delete all keys in the cache. + // + // Cache can not deal with the move operation, because it does + // not contain full data, so we need to delete both keys. + self.invalid_keys(vec![from_key, to_key]).await?; + Ok(res) + } +} + +#[async_trait::async_trait] +impl TxnService for LeaderCachedKvStore { + async fn txn(&self, txn: Txn) -> Result { + if !self.is_leader() { + return self.store.txn(txn).await; + } + + let _ = self.create_new_version(); + + let res = self.store.txn(txn.clone()).await?; + let TxnRequest { + success, failure, .. + } = txn.into(); + let mut all = success; + all.extend(failure); + // Delete all keys in the cache. + // + // Cache can not deal with the txn operation, because it does + // not contain full data, so we need to delete both keys. + let mut keys = Vec::with_capacity(all.len()); + for txn_op in all { + match txn_op { + TxnOp::Put(key, _) => { + keys.push(key); + } + TxnOp::Delete(key) => { + keys.push(key); + } + TxnOp::Get(_) => {} + } + } + self.invalid_keys(keys).await?; + + Ok(res) + } +} + +impl ResettableKvStore for LeaderCachedKvStore { + fn reset(&self) { + self.cache.reset() + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::KeyValue; + + use super::*; + use crate::service::store::memory::MemStore; + + fn create_leader_cached_kv_store() -> LeaderCachedKvStore { + let store = Arc::new(MemStore::new()); + LeaderCachedKvStore::with_always_leader(store) + } + + #[tokio::test] + async fn test_get_put_delete() { + let cached_store = create_leader_cached_kv_store(); + let inner_store = cached_store.store.clone(); + let inner_cache = cached_store.cache.clone(); + + let key = "test_key".to_owned().into_bytes(); + let value = "value".to_owned().into_bytes(); + + let put_req = PutRequest { + key: key.clone(), + value: value.clone(), + ..Default::default() + }; + let _ = inner_store.put(put_req).await.unwrap(); + + let cached_value = inner_cache.get(key.clone()).await.unwrap(); + assert!(cached_value.is_none()); + + let cached_value = cached_store.get(key.clone()).await.unwrap().unwrap(); + assert_eq!(cached_value.value, value); + + let cached_value = inner_cache.get(key.clone()).await.unwrap().unwrap(); + assert_eq!(cached_value.value, value); + + let res = cached_store + .delete(key.clone(), true) + .await + .unwrap() + .unwrap(); + assert_eq!(res.value, value); + + let cached_value = inner_cache.get(key.clone()).await.unwrap(); + assert!(cached_value.is_none()); + } + + #[tokio::test] + async fn test_batch_get_put_delete() { + let cached_store = create_leader_cached_kv_store(); + let inner_store = cached_store.store.clone(); + let inner_cache = cached_store.cache.clone(); + + let kvs = (1..3) + .map(|i| { + let key = format!("test_key_{}", i).into_bytes(); + let value = format!("value_{}", i).into_bytes(); + KeyValue { key, value } + }) + .collect::>(); + + let batch_put_req = BatchPutRequest { + kvs: kvs.clone(), + 
..Default::default() + }; + + let _ = inner_store.batch_put(batch_put_req).await.unwrap(); + + let keys = (1..5) + .map(|i| format!("test_key_{}", i).into_bytes()) + .collect::>(); + + let batch_get_req = BatchGetRequest { + keys, + ..Default::default() + }; + + let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap(); + assert!(cached_values.kvs.is_empty()); + + let cached_values = cached_store.batch_get(batch_get_req.clone()).await.unwrap(); + assert_eq!(cached_values.kvs.len(), 2); + + let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap(); + assert_eq!(cached_values.kvs.len(), 2); + + cached_store.reset(); + + let cached_values = inner_cache.batch_get(batch_get_req).await.unwrap(); + assert!(cached_values.kvs.is_empty()); + } + + #[tokio::test] + async fn test_txn() { + let cached_store = create_leader_cached_kv_store(); + let inner_cache = cached_store.cache.clone(); + + let kvs = (1..5) + .map(|i| { + let key = format!("test_key_{}", i).into_bytes(); + let value = format!("value_{}", i).into_bytes(); + KeyValue { key, value } + }) + .collect::>(); + + let batch_put_req = BatchPutRequest { + kvs: kvs.clone(), + ..Default::default() + }; + let _ = cached_store.batch_put(batch_put_req).await.unwrap(); + + let keys = (1..5) + .map(|i| format!("test_key_{}", i).into_bytes()) + .collect::>(); + let batch_get_req = BatchGetRequest { + keys, + ..Default::default() + }; + let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap(); + assert_eq!(cached_values.kvs.len(), 4); + + let put_ops = (1..5) + .map(|i| { + let key = format!("test_key_{}", i).into_bytes(); + let value = format!("value_{}", i).into_bytes(); + TxnOp::Put(key, value) + }) + .collect::>(); + let txn = Txn::new().and_then(put_ops); + let _ = cached_store.txn(txn).await.unwrap(); + + let cached_values = inner_cache.batch_get(batch_get_req).await.unwrap(); + assert!(cached_values.kvs.is_empty()); + } +}
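Tying the pieces of this change together: `MetaSrvBuilder::build` wraps the backing `kv_store` in a `LeaderCachedKvStore` guarded by `CheckLeaderByElection`, and both the leader-change loop in `MetaSrv::start` and `OnLeaderStartHandler` call `reset()` so the cache never outlives a term of leadership. Below is a rough sketch of equivalent wiring inside this crate; `FlagLeader`, `build_cached_store`, and `on_leader_change` are illustrative names rather than part of the change, and the real server drives the reset from `election.subscribe_leader_change()` instead of a manual call.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore};
use crate::service::store::kv::ResettableKvStore;
use crate::service::store::memory::MemStore;

/// Illustrative leader flag; the builder in this change wraps its optional
/// election handle in `CheckLeaderByElection` instead.
struct FlagLeader(AtomicBool);

impl CheckLeader for FlagLeader {
    fn check(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

/// Builds a cached store over an in-memory backing store, the same shape as
/// `MetaSrvBuilder::build` but with the election replaced by a plain flag.
fn build_cached_store() -> (Arc<FlagLeader>, Arc<LeaderCachedKvStore>) {
    let is_leader = Arc::new(FlagLeader(AtomicBool::new(false)));
    let backing = Arc::new(MemStore::new());
    let cached = Arc::new(LeaderCachedKvStore::new(is_leader.clone(), backing));
    (is_leader, cached)
}

/// Mirrors what `MetaSrv::start` and `Context::reset_leader_cached_kv_store`
/// do on a leader-change message: flip leadership and drop the whole cache.
fn on_leader_change(is_leader: &FlagLeader, cached: &LeaderCachedKvStore, elected: bool) {
    is_leader.0.store(elected, Ordering::Relaxed);
    cached.reset();
}
```

Because `check` is consulted on every operation, a node that is not the leader falls straight through to the backing store for both reads and writes, and the `reset()` on a leadership change discards whatever the previous leader had cached.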