feat: enable metasrv leader cached kv (#2629)

* feat: enable metasrv leader cached kv

* fix: fix cached kv caching the empty value bug

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor: change DEFAULT_PAGE_SIZE to 1536
Weny Xu
2023-11-03 12:56:28 +09:00
committed by GitHub
parent fb8d0c6ce5
commit 39d52f25bf
9 changed files with 274 additions and 28 deletions

View File

@@ -91,12 +91,20 @@ pub const REMOVED_PREFIX: &str = "__removed";
const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
pub const CACHE_KEY_PREFIXES: [&str; 4] = [
TABLE_NAME_KEY_PREFIX,
CATALOG_NAME_KEY_PREFIX,
SCHEMA_NAME_KEY_PREFIX,
TABLE_ROUTE_PREFIX,
];
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
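For illustration, a minimal sketch of how the newly exported CACHE_KEY_PREFIXES is meant to be consumed: only keys under one of these prefixes belong in the leader cache. The is_cacheable helper is hypothetical, not part of this commit (the array is inlined to keep the sketch self-contained):

// The four prefixes exported above, inlined for a standalone example.
const CACHE_KEY_PREFIXES: [&str; 4] =
    ["__table_name", "__catalog_name", "__schema_name", "__table_route"];

// Hypothetical helper: a key is cacheable only if it falls under a cached prefix.
fn is_cacheable(key: &[u8]) -> bool {
    CACHE_KEY_PREFIXES.iter().any(|p| key.starts_with(p.as_bytes()))
}

fn main() {
    assert!(is_cacheable(b"__table_name/greptime/public/my_table"));
    assert!(!is_cacheable(b"__dn_table/1/42")); // datanode table keys stay uncached
}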

View File

@@ -39,7 +39,16 @@ enum PaginationStreamState<K, V> {
Error,
}
pub const DEFAULT_PAGE_SIZE: usize = 512;
/// The default page size for a range request.
///
/// It depends on the gRPC message size limit of the upstream KvStore
/// server (e.g., etcd's default gRPC message size limit is 4 MiB).
///
/// Generally, almost all metadata entries are smaller than 2700 bytes,
/// so we can statically set [DEFAULT_PAGE_SIZE] to 1536.
///
/// TODO(weny): Consider updating the default page size dynamically.
pub const DEFAULT_PAGE_SIZE: usize = 1536;
struct PaginationStreamFactory {
kv: KvBackendRef,
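As a sanity check on the new constant, a compile-time sketch using the 2700-byte bound and etcd's 4 MiB default quoted in the doc comment above:

const MAX_ENTRY_SIZE: usize = 2700; // upper bound per metadata entry (from the doc above)
const GRPC_LIMIT: usize = 4 * 1024 * 1024; // etcd's default gRPC message size limit
const PAGE_SIZE: usize = 1536;

// 1536 * 2700 = 4_147_200 bytes ≈ 3.96 MiB, just under the 4 MiB limit,
// whereas the next power-of-two step (2048 * 2700 ≈ 5.3 MiB) would exceed it.
const _: () = assert!(PAGE_SIZE * MAX_ENTRY_SIZE < GRPC_LIMIT);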

View File

@@ -35,8 +35,10 @@ impl HeartbeatHandler for OnLeaderStartHandler {
if let Some(election) = &ctx.election {
if election.in_infancy() {
ctx.is_infancy = true;
// TODO(weny): Unify the multiple leader states between Context and MetaSrv.
// We can't ensure that the in-memory kv has already been reset in the outer loop,
// so we still use heartbeat requests to trigger resetting the in-memory kv.
ctx.reset_in_memory();
ctx.reset_leader_cached_kv_backend();
}
}
Ok(())
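A condensed, self-contained restatement of the pattern above (toy Context; in_infancy is assumed to report true exactly once per newly won election):

struct Context {
    is_infancy: bool,
    in_memory: Vec<(Vec<u8>, Vec<u8>)>,     // stand-in for the in-memory kv
    leader_cached: Vec<(Vec<u8>, Vec<u8>)>, // stand-in for the leader cached kv
}

fn on_leader_start(ctx: &mut Context, newly_elected: bool) {
    if newly_elected {
        ctx.is_infancy = true;
        // Reset both backends on the first heartbeat after the election,
        // because the outer election loop cannot guarantee when (or whether)
        // a reset has already happened.
        ctx.in_memory.clear();
        ctx.leader_cached.clear();
    }
}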

View File

@@ -14,6 +14,7 @@
#![feature(async_closure)]
#![feature(result_flattening)]
#![feature(assert_matches)]
pub mod bootstrap;
mod cache_invalidator;
@@ -34,6 +35,7 @@ pub mod pubsub;
pub mod region;
pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;
pub use crate::error::Result;

View File

@@ -24,7 +24,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::sequence::SequenceRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
@@ -39,7 +39,7 @@ use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
StopProcedureManagerSnafu,
};
use crate::handler::HeartbeatHandlerGroup;
@@ -47,6 +47,9 @@ use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";
@@ -176,10 +179,20 @@ pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
subscribe_manager: Option<SubscribeManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
state: StateRef,
}
impl MetaStateHandler {
pub async fn on_become_leader(&self) {
self.state.write().unwrap().next_state(become_leader(false));
if let Err(e) = self.leader_cached_kv_backend.load().await {
error!(e; "Failed to load kv into leader cache kv store");
} else {
self.state.write().unwrap().next_state(become_leader(true));
}
if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}
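The ordering in on_become_leader is the core of this change: the node first leads with the cache disabled, warms the cache, and only then flips the flag. A dependency-free restatement with synchronous stand-ins for the async calls:

#[derive(Debug)]
enum State {
    Follower,
    Leader { cache_enabled: bool },
}

fn load_cache() -> Result<(), ()> {
    Ok(()) // stand-in for LeaderCachedKvBackend::load().await
}

fn on_become_leader(state: &mut State) {
    *state = State::Leader { cache_enabled: false }; // lead, but bypass the cache
    if load_cache().is_ok() {
        *state = State::Leader { cache_enabled: true }; // cache may now serve reads
    } // on failure the cache stays disabled and reads fall through to the store
}

fn main() {
    let mut state = State::Follower;
    on_become_leader(&mut state);
    println!("{state:?}"); // Leader { cache_enabled: true }
}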
@@ -187,6 +200,8 @@ impl MetaStateHandler {
}
pub async fn on_become_follower(&self) {
self.state.write().unwrap().next_state(become_follower());
// Stops the procedures.
if let Err(e) = self.procedure_manager.stop().await {
error!(e; "Failed to stop procedure manager");
@@ -205,13 +220,14 @@ impl MetaStateHandler {
#[derive(Clone)]
pub struct MetaSrv {
state: StateRef,
started: Arc<AtomicBool>,
options: MetaSrvOptions,
// It is only valid at the leader node and is used to temporarily
// store some data that will not be persisted.
in_memory: ResettableKvBackendRef,
kv_backend: KvBackendRef,
leader_cached_kv_backend: ResettableKvBackendRef,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
table_id_sequence: SequenceRef,
meta_peer_client: MetaPeerClientRef,
selector: SelectorRef,
@@ -254,6 +270,8 @@ impl MetaSrv {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
};
let _handle = common_runtime::spawn_bg(async move {
loop {
@@ -299,6 +317,11 @@ impl MetaSrv {
info!("MetaSrv stopped");
});
} else {
// Always load kv into cached kv store.
self.leader_cached_kv_backend
.load()
.await
.context(error::KvBackendSnafu)?;
self.procedure_manager
.start()
.await
@@ -337,10 +360,6 @@ impl MetaSrv {
&self.kv_backend
}
pub fn leader_cached_kv_backend(&self) -> &ResettableKvBackendRef {
&self.leader_cached_kv_backend
}
pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
&self.meta_peer_client
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use client::client_manager::DatanodeClients;
@@ -55,6 +55,7 @@ use crate::pubsub::PublishRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
use crate::state::State;
use crate::table_meta_alloc::MetaSrvTableMetadataAllocator;
// TODO(fys): try use derive_builder macro
@@ -157,7 +158,18 @@ impl MetaSrvBuilder {
let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
let leader_cached_kv_backend = build_leader_cached_kv_backend(&election, &kv_backend);
let state = Arc::new(RwLock::new(match election {
None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::follower(options.server_addr.to_string()),
}));
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
state.clone(),
kv_backend.clone(),
));
let kv_backend = leader_cached_kv_backend.clone() as _;
let meta_peer_client = meta_peer_client
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
@@ -241,6 +253,7 @@ impl MetaSrvBuilder {
let metasrv_home = options.data_home.to_string();
Ok(MetaSrv {
state,
started,
options,
in_memory,
@@ -267,16 +280,6 @@ impl MetaSrvBuilder {
}
}
fn build_leader_cached_kv_backend(
election: &Option<ElectionRef>,
kv_backend: &KvBackendRef,
) -> Arc<LeaderCachedKvBackend> {
Arc::new(LeaderCachedKvBackend::new(
Arc::new(CheckLeaderByElection(election.clone())),
kv_backend.clone(),
))
}
fn build_default_meta_peer_client(
election: &Option<ElectionRef>,
in_memory: &ResettableKvBackendRef,

View File

@@ -31,4 +31,7 @@ lazy_static! {
register_histogram_vec!("meta_handler_execute", "meta handler execute", &["name"]).unwrap();
pub static ref METRIC_META_INACTIVE_REGIONS: IntGauge =
register_int_gauge!("meta_inactive_regions", "meta inactive regions").unwrap();
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
}
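For reference, the usual prometheus idiom for timing with this histogram is a scoped timer that records on drop; the label value below is only an example:

// Observes the elapsed seconds into the "__table_name" series once `_timer`
// goes out of scope.
let _timer = METRIC_META_LEADER_CACHED_KV_LOAD
    .with_label_values(&["__table_name"])
    .start_timer();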

View File

@@ -15,20 +15,26 @@
use std::any::Any;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use common_meta::error::{Error, Result};
use common_meta::key::CACHE_KEY_PREFIXES;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
use common_meta::kv_backend::{
KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use futures::TryStreamExt;
use crate::metrics;
use crate::state::State;
pub type CheckLeaderRef = Arc<dyn CheckLeader>;
@@ -44,6 +50,12 @@ impl CheckLeader for AlwaysLeader {
}
}
impl CheckLeader for RwLock<State> {
fn check(&self) -> bool {
self.read().unwrap().enable_leader_cache()
}
}
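This impl is what ties the new state machine to the cache: check() is true only for a leader whose cache has finished loading. A self-contained illustration with a toy State (the real one lives in src/meta-srv/src/state.rs, added below):

use std::sync::RwLock;

enum State {
    Leader { enable_leader_cache: bool },
    Follower,
}

trait CheckLeader {
    fn check(&self) -> bool;
}

impl CheckLeader for RwLock<State> {
    fn check(&self) -> bool {
        match &*self.read().unwrap() {
            State::Leader { enable_leader_cache } => *enable_leader_cache,
            State::Follower => false,
        }
    }
}

fn main() {
    let state = RwLock::new(State::Leader { enable_leader_cache: false });
    assert!(!state.check()); // leader, but the cache is still loading: bypass it
    *state.write().unwrap() = State::Leader { enable_leader_cache: true };
    assert!(state.check()); // cache warmed: reads may be served from it
}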
/// A cache dedicated to the leader node, used to cache some metadata.
///
/// To use this cache, the following constraints must be followed:
@@ -79,6 +91,37 @@ impl LeaderCachedKvBackend {
Self::new(Arc::new(AlwaysLeader), store)
}
/// The caller MUST ensure that no mutation requests reach the `LeaderCachedKvBackend` while the load is in progress.
pub async fn load(&self) -> Result<()> {
for prefix in &CACHE_KEY_PREFIXES[..] {
let _timer = metrics::METRIC_META_LEADER_CACHED_KV_LOAD.with_label_values(&[prefix]).start_timer();
// TODO(weny): Refactor PaginationStream's output into a unary output.
let stream = PaginationStream::new(
self.store.clone(),
RangeRequest::new().with_prefix(prefix.as_bytes()),
DEFAULT_PAGE_SIZE,
Arc::new(|kv| Ok((kv, ()))),
);
let kvs = stream
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|(kv, _)| kv)
.collect();
self.cache
.batch_put(BatchPutRequest {
kvs,
prev_kv: false,
})
.await?;
}
Ok(())
}
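The Arc::new(|kv| Ok((kv, ()))) argument is an identity decoder: load() wants the raw KeyValues back, but PaginationStream expects a decoder yielding a (K, V) pair, hence the () placeholder that .map(|(kv, _)| kv) strips off again. For comparison, a hypothetical typed decoder for the same stream:

// Hypothetical: decode entries while streaming instead of passing raw
// KeyValues through (string conversion chosen only for illustration).
let stream = PaginationStream::new(
    self.store.clone(),
    RangeRequest::new().with_prefix(TABLE_INFO_KEY_PREFIX.as_bytes()),
    DEFAULT_PAGE_SIZE,
    Arc::new(|kv| {
        let key = String::from_utf8_lossy(&kv.key).into_owned();
        let value = String::from_utf8_lossy(&kv.value).into_owned();
        Ok((key, value))
    }),
);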
#[inline]
fn is_leader(&self) -> bool {
self.check_leader.check()
@@ -141,7 +184,14 @@ impl KvBackend for LeaderCachedKvBackend {
let ver = self.get_version();
let res = self.store.range(req.clone()).await?;
let res = self
.store
.range(RangeRequest {
// Ignore `keys_only`: the cache must store real values, so always fetch them from the store.
keys_only: false,
..req.clone()
})
.await?;
if !res.kvs.is_empty() {
let KeyValue { key, value } = res.kvs[0].clone();
let put_req = PutRequest {
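The keys_only override above is plausibly the "empty value" bug fix named in the commit message: a keys_only response carries entries whose values are empty, and writing such a response through to the cache would make later cache hits observe empty values. A toy illustration of the failure mode (assumed semantics):

// With keys_only = true the store strips values from the response:
let store_response: Vec<(Vec<u8>, Vec<u8>)> =
    vec![(b"__table_name/t".to_vec(), Vec::new())];
// Caching this response as-is would persist the empty value, and a later
// cache hit would then return an empty value for a key that has a real
// value in the backing store.
assert!(store_response[0].1.is_empty());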

src/meta-srv/src/state.rs (new file, +150 lines)
View File

@@ -0,0 +1,150 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
pub type StateRef = Arc<RwLock<State>>;
/// State transition.
/// ```text
/// +------------------------------+
/// | |
/// | |
/// | |
/// +-------------------v--------------------+ |
/// | LeaderState{enable_leader_cache:false} | |
/// +-------------------+--------------------+ |
/// | |
/// | |
/// +---------v---------+ |
/// | Init Leader Cache | |
/// +---------+---------+ |
/// | |
/// | |
/// +-------------------v-------------------+ |
/// | LeaderState{enable_leader_cache:true} | |
/// +-------------------+-------------------+ |
/// | |
/// | |
/// +-------v-------+ |
/// | FollowerState | |
/// +-------+-------+ |
/// | |
/// | |
/// +------------------------------+
///```
#[derive(Debug, Clone)]
pub enum State {
Leader(LeaderState),
Follower(FollowerState),
}
#[derive(Debug, Clone)]
pub struct LeaderState {
// The leader cache is disabled while initialization is in progress.
pub enable_leader_cache: bool,
pub server_addr: String,
}
#[derive(Debug, Clone)]
pub struct FollowerState {
pub server_addr: String,
}
impl State {
pub fn follower(server_addr: String) -> State {
Self::Follower(FollowerState { server_addr })
}
pub fn leader(server_addr: String, enable_leader_cache: bool) -> State {
Self::Leader(LeaderState {
enable_leader_cache,
server_addr,
})
}
pub fn enable_leader_cache(&self) -> bool {
match &self {
State::Leader(leader) => leader.enable_leader_cache,
State::Follower(_) => false,
}
}
pub fn next_state<F>(&mut self, f: F)
where
F: FnOnce(&State) -> State,
{
*self = f(self);
}
}
pub fn become_leader(enable_leader_cache: bool) -> impl FnOnce(&State) -> State {
move |prev| match prev {
State::Leader(leader) => State::Leader(LeaderState {
enable_leader_cache,
..leader.clone()
}),
State::Follower(follower) => State::Leader(LeaderState {
server_addr: follower.server_addr.to_string(),
enable_leader_cache,
}),
}
}
pub fn become_follower() -> impl FnOnce(&State) -> State {
move |prev| match prev {
State::Leader(leader) => State::Follower(FollowerState {
server_addr: leader.server_addr.to_string(),
}),
State::Follower(follower) => State::Follower(FollowerState { ..follower.clone() }),
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use crate::state::{become_follower, become_leader, FollowerState, LeaderState, State};
#[tokio::test]
async fn test_next_state() {
let mut state = State::follower("test".to_string());
state.next_state(become_leader(false));
assert_matches!(
state,
State::Leader(LeaderState {
enable_leader_cache: false,
..
})
);
state.next_state(become_leader(false));
assert_matches!(
state,
State::Leader(LeaderState {
enable_leader_cache: false,
..
})
);
state.next_state(become_follower());
assert_matches!(state, State::Follower(FollowerState { .. }));
state.next_state(become_follower());
assert_matches!(state, State::Follower(FollowerState { .. }));
}
}