diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index c9522a9d64..e9e2b0804a 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -91,12 +91,20 @@ pub const REMOVED_PREFIX: &str = "__removed";
 const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
 
 const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
-const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
-const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
 const TABLE_REGION_KEY_PREFIX: &str = "__table_region";
-const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
-const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
-const TABLE_ROUTE_PREFIX: &str = "__table_route";
+
+pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";
+pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name";
+pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name";
+pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name";
+pub const TABLE_ROUTE_PREFIX: &str = "__table_route";
+
+pub const CACHE_KEY_PREFIXES: [&str; 4] = [
+    TABLE_NAME_KEY_PREFIX,
+    CATALOG_NAME_KEY_PREFIX,
+    SCHEMA_NAME_KEY_PREFIX,
+    TABLE_ROUTE_PREFIX,
+];
 
 pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
diff --git a/src/common/meta/src/range_stream.rs b/src/common/meta/src/range_stream.rs
index 5fb70d1b70..4aa60b00c3 100644
--- a/src/common/meta/src/range_stream.rs
+++ b/src/common/meta/src/range_stream.rs
@@ -39,7 +39,16 @@ enum PaginationStreamState {
     Error,
 }
 
-pub const DEFAULT_PAGE_SIZE: usize = 512;
+/// The default page size of a range request.
+///
+/// It depends on the gRPC message size limit of the upstream KvStore server
+/// (e.g., etcd's default gRPC message size limit is 4 MiB).
+///
+/// Generally, almost all metadata entries are smaller than 2700 bytes, so we
+/// can statically set [DEFAULT_PAGE_SIZE] to 1536 (1536 * 2700 B ≈ 4 MiB).
+///
+/// TODO(weny): Consider updating the default page size dynamically.
+pub const DEFAULT_PAGE_SIZE: usize = 1536;
 
 struct PaginationStreamFactory {
     kv: KvBackendRef,
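For reference, a minimal sketch of how `PaginationStream` pages a prefix scan with the new `DEFAULT_PAGE_SIZE`. It mirrors the call shape of `LeaderCachedKvBackend::load` in the `cached_kv.rs` hunk below; the `collect_prefix` helper itself is hypothetical:

```rust
use std::sync::Arc;

use common_meta::error::Result;
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use common_meta::rpc::KeyValue;
use futures::TryStreamExt;

/// Hypothetical helper: collects all key-values under `prefix`, fetching at
/// most `DEFAULT_PAGE_SIZE` entries per underlying range request.
async fn collect_prefix(kv: KvBackendRef, prefix: &str) -> Result<Vec<KeyValue>> {
    let stream = PaginationStream::new(
        kv,
        RangeRequest::new().with_prefix(prefix.as_bytes()),
        DEFAULT_PAGE_SIZE,
        // Identity decoder, as used by `LeaderCachedKvBackend::load` below.
        Arc::new(|kv| Ok((kv, ()))),
    );
    let kvs = stream.try_collect::<Vec<_>>().await?;
    Ok(kvs.into_iter().map(|(kv, _)| kv).collect())
}
```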
diff --git a/src/meta-srv/src/handler/on_leader_start_handler.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs
index 325bcf42a4..9f32b443b7 100644
--- a/src/meta-srv/src/handler/on_leader_start_handler.rs
+++ b/src/meta-srv/src/handler/on_leader_start_handler.rs
@@ -35,8 +35,10 @@ impl HeartbeatHandler for OnLeaderStartHandler {
         if let Some(election) = &ctx.election {
             if election.in_infancy() {
                 ctx.is_infancy = true;
+                // TODO(weny): Unify the duplicated leader states of the Context and the MetaSrv.
+                // We can't ensure the in-memory kv has already been reset in the outer loop,
+                // so we still use heartbeat requests to trigger resetting the in-memory kv.
                 ctx.reset_in_memory();
-                ctx.reset_leader_cached_kv_backend();
             }
         }
         Ok(())
diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs
index 77f8f3f821..14af05f8c0 100644
--- a/src/meta-srv/src/lib.rs
+++ b/src/meta-srv/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![feature(async_closure)]
 #![feature(result_flattening)]
+#![feature(assert_matches)]
 
 pub mod bootstrap;
 mod cache_invalidator;
@@ -34,6 +35,7 @@ pub mod pubsub;
 pub mod region;
 pub mod selector;
 pub mod service;
+pub mod state;
 pub mod table_meta_alloc;
 
 pub use crate::error::Result;
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index c72e2d59e7..a810a55019 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -24,7 +24,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
 use common_grpc::channel_manager;
 use common_meta::ddl::DdlTaskExecutorRef;
 use common_meta::key::TableMetadataManagerRef;
-use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
+use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
 use common_meta::sequence::SequenceRef;
 use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
@@ -39,7 +39,7 @@ use tokio::sync::broadcast::error::RecvError;
 use crate::cluster::MetaPeerClientRef;
 use crate::election::{Election, LeaderChangeMessage};
 use crate::error::{
-    InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
+    self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
     StopProcedureManagerSnafu,
 };
 use crate::handler::HeartbeatHandlerGroup;
@@ -47,6 +47,9 @@ use crate::lock::DistLockRef;
 use crate::pubsub::{PublishRef, SubscribeManagerRef};
 use crate::selector::{Selector, SelectorType};
 use crate::service::mailbox::MailboxRef;
+use crate::service::store::cached_kv::LeaderCachedKvBackend;
+use crate::state::{become_follower, become_leader, StateRef};
+
 
 pub const TABLE_ID_SEQ: &str = "table_id";
 pub const METASRV_HOME: &str = "/tmp/metasrv";
@@ -176,10 +179,20 @@ pub struct MetaStateHandler {
     procedure_manager: ProcedureManagerRef,
     subscribe_manager: Option<SubscribeManagerRef>,
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
+    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
+    state: StateRef,
 }
 
 impl MetaStateHandler {
     pub async fn on_become_leader(&self) {
+        self.state.write().unwrap().next_state(become_leader(false));
+
+        if let Err(e) = self.leader_cached_kv_backend.load().await {
+            error!(e; "Failed to load kv into the leader cached kv backend");
+        } else {
+            self.state.write().unwrap().next_state(become_leader(true));
+        }
+
         if let Err(e) = self.procedure_manager.start().await {
             error!(e; "Failed to start procedure manager");
         }
     }
@@ -187,6 +200,8 @@ }
 
     pub async fn on_become_follower(&self) {
+        self.state.write().unwrap().next_state(become_follower());
+
         // Stops the procedures.
         if let Err(e) = self.procedure_manager.stop().await {
             error!(e; "Failed to stop procedure manager");
         }
@@ -205,13 +220,14 @@
 
 #[derive(Clone)]
 pub struct MetaSrv {
+    state: StateRef,
     started: Arc<AtomicBool>,
     options: MetaSrvOptions,
     // It is only valid at the leader node and is used to temporarily
     // store some data that will not be persisted.
     in_memory: ResettableKvBackendRef,
     kv_backend: KvBackendRef,
-    leader_cached_kv_backend: ResettableKvBackendRef,
+    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
     table_id_sequence: SequenceRef,
     meta_peer_client: MetaPeerClientRef,
     selector: SelectorRef,
@@ -254,6 +270,8 @@ impl MetaSrv {
             greptimedb_telemetry_task,
             subscribe_manager,
             procedure_manager,
+            state: self.state.clone(),
+            leader_cached_kv_backend: leader_cached_kv_backend.clone(),
         };
 
         let _handle = common_runtime::spawn_bg(async move {
             loop {
@@ -299,6 +317,11 @@ impl MetaSrv {
                 info!("MetaSrv stopped");
             });
         } else {
+            // Always loads kv into the cached kv backend.
+            self.leader_cached_kv_backend
+                .load()
+                .await
+                .context(error::KvBackendSnafu)?;
             self.procedure_manager
                 .start()
                 .await
@@ -337,10 +360,6 @@ impl MetaSrv {
         &self.kv_backend
     }
 
-    pub fn leader_cached_kv_backend(&self) -> &ResettableKvBackendRef {
-        &self.leader_cached_kv_backend
-    }
-
     pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
         &self.meta_peer_client
     }
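The ordering in `on_become_leader` is the point of this change: the node takes leadership with the cache disabled, and only a successful `load()` enables it, so a failed warm-up degrades to pass-through reads rather than serving an empty cache. A condensed sketch of that sequence, assuming the `meta_srv` crate paths introduced in this PR (error handling elided):

```rust
use std::sync::Arc;

use meta_srv::service::store::cached_kv::LeaderCachedKvBackend;
use meta_srv::state::{become_leader, StateRef};

/// Condensed from `MetaStateHandler::on_become_leader` above.
async fn promote(state: StateRef, cache: Arc<LeaderCachedKvBackend>) {
    // 1. Become leader, but keep the leader cache disabled: reads still
    //    fall through to the backing store while the cache is cold.
    state.write().unwrap().next_state(become_leader(false));

    // 2. Warm the cache with every `CACHE_KEY_PREFIXES` range.
    if cache.load().await.is_ok() {
        // 3. Only a successful load enables cached reads.
        state.write().unwrap().next_state(become_leader(true));
    }
}
```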
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 0f2dc0f78d..90d3404662 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::sync::atomic::AtomicBool;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
 use client::client_manager::DatanodeClients;
@@ -55,6 +55,7 @@ use crate::pubsub::PublishRef;
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::service::mailbox::MailboxRef;
 use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvBackend};
+use crate::state::State;
 use crate::table_meta_alloc::MetaSrvTableMetadataAllocator;
 
 // TODO(fys): try use derive_builder macro
@@ -157,7 +158,18 @@ impl MetaSrvBuilder {
         let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
         let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
 
-        let leader_cached_kv_backend = build_leader_cached_kv_backend(&election, &kv_backend);
+
+        let state = Arc::new(RwLock::new(match election {
+            None => State::leader(options.server_addr.to_string(), true),
+            Some(_) => State::follower(options.server_addr.to_string()),
+        }));
+
+        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
+            state.clone(),
+            kv_backend.clone(),
+        ));
+        let kv_backend = leader_cached_kv_backend.clone() as _;
+
         let meta_peer_client = meta_peer_client
             .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
         let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
@@ -241,6 +253,7 @@ impl MetaSrvBuilder {
         let metasrv_home = options.data_home.to_string();
 
         Ok(MetaSrv {
+            state,
            started,
            options,
            in_memory,
@@ -267,16 +280,6 @@ impl MetaSrvBuilder {
     }
 }
 
-fn build_leader_cached_kv_backend(
-    election: &Option<ElectionRef>,
-    kv_backend: &KvBackendRef,
-) -> Arc<LeaderCachedKvBackend> {
-    Arc::new(LeaderCachedKvBackend::new(
-        Arc::new(CheckLeaderByElection(election.clone())),
-        kv_backend.clone(),
-    ))
-}
-
 fn build_default_meta_peer_client(
     election: &Option<ElectionRef>,
     in_memory: &ResettableKvBackendRef,
diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs
index 57d4893995..8c73c9a7c1 100644
--- a/src/meta-srv/src/metrics.rs
+++ b/src/meta-srv/src/metrics.rs
@@ -31,4 +31,7 @@ lazy_static! {
         register_histogram_vec!("meta_handler_execute", "meta handler execute", &["name"]).unwrap();
     pub static ref METRIC_META_INACTIVE_REGIONS: IntGauge =
         register_int_gauge!("meta_inactive_regions", "meta inactive regions").unwrap();
+    pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
+        register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
+            .unwrap();
 }
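Note why the builder can simply shadow `kv_backend` with the wrapper: `LeaderCachedKvBackend` implements `KvBackend`, so downstream consumers keep receiving a plain `KvBackendRef`, while the shared `State` (through the `CheckLeader` impl in the next hunk) decides whether a read may be served from the cache. A self-contained sketch of the same wiring; the `wire` function is hypothetical:

```rust
use std::sync::{Arc, RwLock};

use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::service::store::cached_kv::LeaderCachedKvBackend;
use meta_srv::state::{State, StateRef};

/// Hypothetical: wires a raw backend behind the leader cache, as the builder does.
fn wire(server_addr: String) -> (StateRef, KvBackendRef) {
    let raw: KvBackendRef = Arc::new(MemoryKvBackend::new());
    // Standalone mode (no election): the node is the leader from the start,
    // so the builder passes `true` and the cache is usable immediately.
    let state: StateRef = Arc::new(RwLock::new(State::leader(server_addr, true)));
    let cached = Arc::new(LeaderCachedKvBackend::new(state.clone(), raw));
    // The wrapper is handed out as an ordinary KvBackendRef.
    let kv_backend: KvBackendRef = cached;
    (state, kv_backend)
}
```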
diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs
index f92bd1fb61..450a967df1 100644
--- a/src/meta-srv/src/service/store/cached_kv.rs
+++ b/src/meta-srv/src/service/store/cached_kv.rs
@@ -15,20 +15,26 @@
 use std::any::Any;
 use std::collections::HashSet;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 
 use common_meta::error::{Error, Result};
+use common_meta::key::CACHE_KEY_PREFIXES;
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
 use common_meta::kv_backend::{
     KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
 };
+use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
 use common_meta::rpc::store::{
     BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
     BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
     DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
 };
 use common_meta::rpc::KeyValue;
+use futures::TryStreamExt;
+
+use crate::metrics;
+use crate::state::State;
 
 pub type CheckLeaderRef = Arc<dyn CheckLeader>;
@@ -44,6 +50,12 @@ impl CheckLeader for AlwaysLeader {
     }
 }
 
+impl CheckLeader for RwLock<State> {
+    fn check(&self) -> bool {
+        self.read().unwrap().enable_leader_cache()
+    }
+}
+
 /// A cache dedicated to a Leader node, in order to cache some metadata.
 ///
 /// To use this cache, the following constraints must be followed:
@@ -79,6 +91,37 @@ impl LeaderCachedKvBackend {
         Self::new(Arc::new(AlwaysLeader), store)
     }
 
+    /// The caller MUST ensure that no mutation requests reach the `LeaderCachedKvBackend` during loading.
+    pub async fn load(&self) -> Result<()> {
+        for prefix in &CACHE_KEY_PREFIXES[..] {
+            let _timer = metrics::METRIC_META_LEADER_CACHED_KV_LOAD
+                .with_label_values(&[prefix])
+                .start_timer();
+
+            // TODO(weny): Refactor the `PaginationStream`'s output into a unary output.
+            let stream = PaginationStream::new(
+                self.store.clone(),
+                RangeRequest::new().with_prefix(prefix.as_bytes()),
+                DEFAULT_PAGE_SIZE,
+                Arc::new(|kv| Ok((kv, ()))),
+            );
+
+            let kvs = stream
+                .try_collect::<Vec<_>>()
+                .await?
+                .into_iter()
+                .map(|(kv, _)| kv)
+                .collect();
+
+            self.cache
+                .batch_put(BatchPutRequest {
+                    kvs,
+                    prev_kv: false,
+                })
+                .await?;
+        }
+
+        Ok(())
+    }
+
     #[inline]
     fn is_leader(&self) -> bool {
         self.check_leader.check()
@@ -141,7 +184,14 @@ impl KvBackend for LeaderCachedKvBackend {
 
         let ver = self.get_version();
 
-        let res = self.store.range(req.clone()).await?;
+        let res = self
+            .store
+            .range(RangeRequest {
+                // Ignores `keys_only`, since the value fetched here is written back into the cache.
+                keys_only: false,
+                ..req.clone()
+            })
+            .await?;
         if !res.kvs.is_empty() {
             let KeyValue { key, value } = res.kvs[0].clone();
             let put_req = PutRequest {
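The small `CheckLeader` impl above is what ties the cache to the state machine: a follower, or a leader whose cache is still cold, answers `false`, and the wrapper then behaves as a pure pass-through. A tiny sketch of that gate, assuming the crate paths from this PR:

```rust
use std::sync::{Arc, RwLock};

use meta_srv::service::store::cached_kv::CheckLeaderRef;
use meta_srv::state::{become_leader, State};

fn main() {
    let state = Arc::new(RwLock::new(State::follower("127.0.0.1:3002".to_string())));
    let check: CheckLeaderRef = state.clone();

    // Follower: the leader cache must not be consulted.
    assert!(!check.check());

    // Leader with a warm cache: cached reads are allowed.
    state.write().unwrap().next_state(become_leader(true));
    assert!(check.check());
}
```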
diff --git a/src/meta-srv/src/state.rs b/src/meta-srv/src/state.rs
new file mode 100644
index 0000000000..0466644c67
--- /dev/null
+++ b/src/meta-srv/src/state.rs
@@ -0,0 +1,150 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::{Arc, RwLock};
+
+pub type StateRef = Arc<RwLock<State>>;
+
+/// State transitions:
+/// ```text
+///                       +------------------------------+
+///                       |                              |
+///                       |                              |
+///                       |                              |
+///   +-------------------v--------------------+         |
+///   | LeaderState{enable_leader_cache:false} |         |
+///   +-------------------+--------------------+         |
+///                       |                              |
+///                       |                              |
+///             +---------v---------+                    |
+///             | Init Leader Cache |                    |
+///             +---------+---------+                    |
+///                       |                              |
+///                       |                              |
+///   +-------------------v-------------------+          |
+///   | LeaderState{enable_leader_cache:true} |          |
+///   +-------------------+-------------------+          |
+///                       |                              |
+///                       |                              |
+///               +-------v-------+                      |
+///               | FollowerState |                      |
+///               +-------+-------+                      |
+///                       |                              |
+///                       |                              |
+///                       +------------------------------+
+/// ```
+#[derive(Debug, Clone)]
+pub enum State {
+    Leader(LeaderState),
+    Follower(FollowerState),
+}
+
+#[derive(Debug, Clone)]
+pub struct LeaderState {
+    // The leader cache is disabled while it is still being initialized (loaded).
+    pub enable_leader_cache: bool,
+
+    pub server_addr: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct FollowerState {
+    pub server_addr: String,
+}
+
+impl State {
+    pub fn follower(server_addr: String) -> State {
+        Self::Follower(FollowerState { server_addr })
+    }
+
+    pub fn leader(server_addr: String, enable_leader_cache: bool) -> State {
+        Self::Leader(LeaderState {
+            enable_leader_cache,
+            server_addr,
+        })
+    }
+
+    pub fn enable_leader_cache(&self) -> bool {
+        match &self {
+            State::Leader(leader) => leader.enable_leader_cache,
+            State::Follower(_) => false,
+        }
+    }
+
+    pub fn next_state<F>(&mut self, f: F)
+    where
+        F: FnOnce(&State) -> State,
+    {
+        *self = f(self);
+    }
+}
+
+pub fn become_leader(enable_leader_cache: bool) -> impl FnOnce(&State) -> State {
+    move |prev| match prev {
+        State::Leader(leader) => State::Leader(LeaderState {
+            enable_leader_cache,
+            ..leader.clone()
+        }),
+        State::Follower(follower) => State::Leader(LeaderState {
+            server_addr: follower.server_addr.to_string(),
+            enable_leader_cache,
+        }),
+    }
+}
+
+pub fn become_follower() -> impl FnOnce(&State) -> State {
+    move |prev| match prev {
+        State::Leader(leader) => State::Follower(FollowerState {
+            server_addr: leader.server_addr.to_string(),
+        }),
+        State::Follower(follower) => State::Follower(FollowerState { ..follower.clone() }),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+
+    use crate::state::{become_follower, become_leader, FollowerState, LeaderState, State};
+
+    #[tokio::test]
+    async fn test_next_state() {
+        let mut state = State::follower("test".to_string());
+
+        state.next_state(become_leader(false));
+
+        assert_matches!(
+            state,
+            State::Leader(LeaderState {
+                enable_leader_cache: false,
+                ..
+            })
+        );
+
+        state.next_state(become_leader(false));
+
+        assert_matches!(
+            state,
+            State::Leader(LeaderState {
+                enable_leader_cache: false,
+                ..
+            })
+        );
+
+        state.next_state(become_follower());
+
+        assert_matches!(state, State::Follower(FollowerState { .. }));
+
+        state.next_state(become_follower());
+
+        assert_matches!(state, State::Follower(FollowerState { .. }));
+    }
+}
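To make the diagram concrete, a short walk through one full cycle of the state machine; it mirrors the unit test above but also passes through the cold-cache intermediate state:

```rust
use meta_srv::state::{become_follower, become_leader, State};

fn main() {
    let mut state = State::follower("127.0.0.1:3002".to_string());

    // Follower -> Leader{enable_leader_cache: false}: election won, cache cold.
    state.next_state(become_leader(false));
    assert!(!state.enable_leader_cache());

    // "Init Leader Cache" succeeded -> Leader{enable_leader_cache: true}.
    state.next_state(become_leader(true));
    assert!(state.enable_leader_cache());

    // Leader -> Follower: stepping down disables cached reads again.
    state.next_state(become_follower());
    assert!(!state.enable_leader_cache());
}
```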