diff --git a/Cargo.lock b/Cargo.lock index 2703420dcb..c560f4686a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4111,12 +4111,14 @@ version = "0.1.1" dependencies = [ "anymap", "api", + "async-stream", "async-trait", "catalog", "common-base", "common-catalog", "common-error", "common-grpc", + "common-procedure", "common-runtime", "common-telemetry", "common-time", diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index eca8c9f938..8f9515dd9d 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::string::FromUtf8Error; use std::sync::Arc; use common_error::prelude::*; @@ -47,10 +48,11 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to put {}, source: {}", key, source))] + #[snafu(display("Failed to put state, key: '{key}', source: {source}"))] PutState { key: String, - source: object_store::Error, + #[snafu(backtrace)] + source: BoxedError, }, #[snafu(display("Failed to delete {}, source: {}", key, source))] @@ -59,10 +61,18 @@ pub enum Error { source: object_store::Error, }, - #[snafu(display("Failed to list {}, source: {}", path, source))] + #[snafu(display("Failed to delete keys: '{keys}', source: {source}"))] + DeleteStates { + keys: String, + #[snafu(backtrace)] + source: BoxedError, + }, + + #[snafu(display("Failed to list state, path: '{path}', source: {source}"))] ListState { path: String, - source: object_store::Error, + #[snafu(backtrace)] + source: BoxedError, }, #[snafu(display("Failed to read {}, source: {}", key, source))] @@ -107,6 +117,9 @@ pub enum Error { source: Arc, procedure_id: ProcedureId, }, + + #[snafu(display("Corrupted data, error: {source}"))] + CorruptedData { source: FromUtf8Error }, } pub type Result = std::result::Result; @@ -114,11 +127,13 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::External { source } => source.status_code(), + Error::External { source } + | Error::PutState { source, .. } + | Error::DeleteStates { source, .. } + | Error::ListState { source, .. } => source.status_code(), + Error::ToJson { .. } - | Error::PutState { .. } | Error::DeleteState { .. } - | Error::ListState { .. } | Error::ReadState { .. } | Error::FromJson { .. } | Error::RetryTimesExceeded { .. } @@ -127,7 +142,7 @@ impl ErrorExt for Error { Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } - Error::ProcedurePanic { .. } => StatusCode::Unexpected, + Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected, Error::ProcedureExec { source, .. } => source.status_code(), } } diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 50134e2a69..a124f94f6e 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -17,7 +17,7 @@ pub mod error; pub mod local; mod procedure; -mod store; +pub mod store; pub mod watcher; pub use crate::error::{Error, Result}; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index df14e70b0f..0c59b85b9c 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -22,7 +22,6 @@ use std::time::Duration; use async_trait::async_trait; use backon::ExponentialBuilder; use common_telemetry::logging; -use object_store::ObjectStore; use snafu::ensure; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; @@ -31,7 +30,7 @@ use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result}; use crate::local::lock::LockMap; use crate::local::runner::Runner; use crate::procedure::BoxedProcedureLoader; -use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef}; +use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId, Watcher, @@ -291,12 +290,19 @@ impl ManagerContext { /// Config for [LocalManager]. #[derive(Debug)] pub struct ManagerConfig { - /// Object store - pub object_store: ObjectStore, pub max_retry_times: usize, pub retry_delay: Duration, } +impl Default for ManagerConfig { + fn default() -> Self { + Self { + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + } + } +} + /// A [ProcedureManager] that maintains procedure states locally. pub struct LocalManager { manager_ctx: Arc, @@ -307,10 +313,10 @@ pub struct LocalManager { impl LocalManager { /// Create a new [LocalManager] with specific `config`. - pub fn new(config: ManagerConfig) -> LocalManager { + pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { LocalManager { manager_ctx: Arc::new(ManagerContext::new()), - state_store: Arc::new(ObjectStateStore::new(config.object_store)), + state_store, max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, } @@ -423,6 +429,7 @@ impl ProcedureManager for LocalManager { mod test_util { use common_test_util::temp_dir::TempDir; use object_store::services::Fs as Builder; + use object_store::ObjectStore; use super::*; @@ -446,6 +453,7 @@ mod tests { use super::*; use crate::error::Error; + use crate::store::ObjectStateStore; use crate::{Context, Procedure, Status}; #[test] @@ -554,11 +562,11 @@ mod tests { fn test_register_loader() { let dir = create_temp_dir("register"); let config = ManagerConfig { - object_store: test_util::new_object_store(&dir), max_retry_times: 3, retry_delay: Duration::from_millis(500), }; - let manager = LocalManager::new(config); + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -575,11 +583,11 @@ mod tests { let dir = create_temp_dir("recover"); let object_store = test_util::new_object_store(&dir); let config = ManagerConfig { - object_store: object_store.clone(), max_retry_times: 3, retry_delay: Duration::from_millis(500), }; - let manager = LocalManager::new(config); + let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); + let manager = LocalManager::new(config, state_store); manager .register_loader("ProcedureToLoad", ProcedureToLoad::loader()) @@ -621,11 +629,11 @@ mod tests { async fn test_submit_procedure() { let dir = create_temp_dir("submit"); let config = ManagerConfig { - object_store: test_util::new_object_store(&dir), max_retry_times: 3, retry_delay: Duration::from_millis(500), }; - let manager = LocalManager::new(config); + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); let procedure_id = ProcedureId::random(); assert!(manager @@ -669,11 +677,11 @@ mod tests { async fn test_state_changed_on_err() { let dir = create_temp_dir("on_err"); let config = ManagerConfig { - object_store: test_util::new_object_store(&dir), max_retry_times: 3, retry_delay: Duration::from_millis(500), }; - let manager = LocalManager::new(config); + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let manager = LocalManager::new(config, state_store); #[derive(Debug)] struct MockProcedure { diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 07d0fb8eb9..8b1472e9f7 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -26,7 +26,7 @@ use crate::error::{Result, ToJsonSnafu}; pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef}; use crate::{BoxedProcedure, ProcedureId}; -mod state_store; +pub mod state_store; /// Serialized data of a procedure. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 450940a1ad..88db83cf75 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -17,21 +17,23 @@ use std::sync::Arc; use async_stream::try_stream; use async_trait::async_trait; +use common_error::ext::PlainError; +use common_error::prelude::{BoxedError, StatusCode}; use futures::{Stream, StreamExt}; use object_store::{EntryMode, Metakey, ObjectStore}; use snafu::ResultExt; -use crate::error::{DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, Result}; +use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result}; /// Key value from state store. -type KeyValue = (String, Vec); +pub type KeyValue = (String, Vec); /// Stream that yields [KeyValue]. -type KeyValueStream = Pin> + Send>>; +pub type KeyValueStream = Pin> + Send>>; /// Storage layer for persisting procedure's state. #[async_trait] -pub(crate) trait StateStore: Send + Sync { +pub trait StateStore: Send + Sync { /// Puts `key` and `value` into the store. async fn put(&self, key: &str, value: Vec) -> Result<()>; @@ -51,13 +53,13 @@ pub(crate) type StateStoreRef = Arc; /// [StateStore] based on [ObjectStore]. #[derive(Debug)] -pub(crate) struct ObjectStateStore { +pub struct ObjectStateStore { store: ObjectStore, } impl ObjectStateStore { /// Returns a new [ObjectStateStore] with specific `store`. - pub(crate) fn new(store: ObjectStore) -> ObjectStateStore { + pub fn new(store: ObjectStore) -> ObjectStateStore { ObjectStateStore { store } } } @@ -68,31 +70,65 @@ impl StateStore for ObjectStateStore { self.store .write(key, value) .await + .map_err(|e| { + BoxedError::new(PlainError::new( + e.to_string(), + StatusCode::StorageUnavailable, + )) + }) .context(PutStateSnafu { key }) } async fn walk_top_down(&self, path: &str) -> Result { let path_string = path.to_string(); - let mut lister = self.store.scan(path).await.map_err(|e| Error::ListState { - path: path_string.clone(), - source: e, - })?; + let mut lister = self + .store + .scan(path) + .await + .map_err(|e| { + BoxedError::new(PlainError::new( + e.to_string(), + StatusCode::StorageUnavailable, + )) + }) + .with_context(|_| ListStateSnafu { + path: path_string.clone(), + })?; let store = self.store.clone(); let stream = try_stream!({ while let Some(res) = lister.next().await { - let entry = res.context(ListStateSnafu { path: &path_string })?; + let entry = res + .map_err(|e| { + BoxedError::new(PlainError::new( + e.to_string(), + StatusCode::StorageUnavailable, + )) + }) + .context(ListStateSnafu { path: &path_string })?; let key = entry.path(); let metadata = store .metadata(&entry, Metakey::Mode) .await + .map_err(|e| { + BoxedError::new(PlainError::new( + e.to_string(), + StatusCode::StorageUnavailable, + )) + }) .context(ListStateSnafu { path: key })?; if let EntryMode::FILE = metadata.mode() { let value = store .read(key) .await + .map_err(|e| { + BoxedError::new(PlainError::new( + e.to_string(), + StatusCode::StorageUnavailable, + )) + }) .context(ListStateSnafu { path: key })?; yield (key.to_string(), value); } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 7e55add6eb..b8153104ea 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -23,6 +23,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::info; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -522,11 +523,15 @@ pub(crate) async fn create_procedure_manager( ); let object_store = new_object_store(&procedure_config.store).await?; + let state_store = Arc::new(ObjectStateStore::new(object_store)); + let manager_config = ManagerConfig { - object_store, max_retry_times: procedure_config.max_retry_times, retry_delay: procedure_config.retry_delay, }; - Ok(Some(Arc::new(LocalManager::new(manager_config)))) + Ok(Some(Arc::new(LocalManager::new( + manager_config, + state_store, + )))) } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index baa599dbda..db451b8fae 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -10,12 +10,14 @@ mock = [] [dependencies] anymap = "1.0.0-beta.2" api = { path = "../api" } +async-stream.workspace = true async-trait = "0.1" catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } +common-procedure = { path = "../common/procedure" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 96bdd5fb77..77b89961f0 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -61,7 +61,8 @@ impl MetaSrvInstance { } pub async fn start(&mut self) -> Result<()> { - self.meta_srv.start().await; + self.meta_srv.try_start().await?; + let (tx, mut rx) = mpsc::channel::<()>(1); self.signal_sender = Some(tx); @@ -131,10 +132,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { .context(error::ConnectEtcdSnafu)?; ( EtcdStore::with_etcd_client(etcd_client.clone())?, - Some(EtcdElection::with_etcd_client( - &opts.server_addr, - etcd_client.clone(), - )?), + Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?), Some(EtcdLock::with_etcd_client(etcd_client)?), ) }; @@ -172,7 +170,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result { let meta_srv = build_meta_srv(opts).await?; - meta_srv.start().await; + meta_srv.try_start().await?; Ok(meta_srv) } diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 005e5d79ec..77401e1278 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -14,12 +14,23 @@ pub mod etcd; +use std::sync::Arc; + +use etcd_client::LeaderKey; +use tokio::sync::broadcast::Receiver; + use crate::error::Result; pub const LEASE_SECS: i64 = 3; pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3; pub const ELECTION_KEY: &str = "__meta_srv_election"; +#[derive(Clone)] +pub enum LeaderChangeMessage { + Elected(Arc), + StepDown(Arc), +} + #[async_trait::async_trait] pub trait Election: Send + Sync { type Leader; @@ -46,4 +57,6 @@ pub trait Election: Send + Sync { /// Releases election leadership so other campaigners may /// acquire leadership on the election. async fn resign(&self) -> Result<()>; + + fn subscribe_leader_change(&self) -> Receiver; } diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index ee19eede97..8e49ccdc36 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -16,11 +16,16 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use common_telemetry::{info, warn}; +use common_telemetry::{error, info, warn}; use etcd_client::Client; use snafu::{OptionExt, ResultExt}; +use tokio::sync::broadcast; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; -use crate::election::{Election, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS}; +use crate::election::{ + Election, LeaderChangeMessage, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS, +}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue}; @@ -30,6 +35,7 @@ pub struct EtcdElection { client: Client, is_leader: AtomicBool, infancy: AtomicBool, + leader_watcher: broadcast::Sender, } impl EtcdElection { @@ -42,20 +48,50 @@ impl EtcdElection { .await .context(error::ConnectEtcdSnafu)?; - Self::with_etcd_client(leader_value, client) + Self::with_etcd_client(leader_value, client).await } - pub fn with_etcd_client(leader_value: E, client: Client) -> Result + pub async fn with_etcd_client(leader_value: E, client: Client) -> Result where E: AsRef, { - let leader_value = leader_value.as_ref().into(); + let leader_value: String = leader_value.as_ref().into(); + + let leader_ident = leader_value.clone(); + let (tx, mut rx) = broadcast::channel(100); + common_runtime::spawn_bg(async move { + loop { + match rx.recv().await { + Ok(msg) => match msg { + LeaderChangeMessage::Elected(key) => { + info!( + "[{leader_ident}] is elected as leader: {:?}, lease: {}", + key.name_str(), + key.lease() + ); + } + LeaderChangeMessage::StepDown(key) => { + warn!( + "[{leader_ident}] is stepping down: {:?}, lease: {}", + key.name_str(), + key.lease() + ); + } + }, + Err(RecvError::Lagged(_)) => { + warn!("Log printing is too slow or leader changed too fast!"); + } + Err(RecvError::Closed) => break, + } + } + }); Ok(Arc::new(Self { leader_value, client, is_leader: AtomicBool::new(false), infancy: AtomicBool::new(false), + leader_watcher: tx, })) } } @@ -120,18 +156,21 @@ impl Election for EtcdElection { .is_ok() { self.infancy.store(true, Ordering::Relaxed); - info!( - "[{}] becoming leader: {:?}, lease: {}", - &self.leader_value, - leader.name_str(), - leader.lease() - ); + + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader.clone()))) + { + error!("Failed to send leader change message, error: {e}"); + } } } else { - warn!( - "Failed to keep-alive, lease: {}, will re-initiate election", - leader.lease() - ); + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader.clone()))) + { + error!("Failed to send leader change message, error: {e}"); + } break; } } @@ -162,4 +201,8 @@ impl Election for EtcdElection { async fn resign(&self) -> Result<()> { todo!() } + + fn subscribe_leader_change(&self) -> Receiver { + self.leader_watcher.subscribe() + } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 09661f1b7f..d311e5bb8f 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -274,6 +274,12 @@ pub enum Error { #[snafu(display("Missing required parameter, param: {:?}", param))] MissingRequiredParameter { param: String }, + + #[snafu(display("Failed to recover procedure, source: {source}"))] + RecoverProcedure { + #[snafu(backtrace)] + source: common_procedure::Error, + }, } pub type Result = std::result::Result; @@ -341,6 +347,7 @@ impl ErrorExt for Error { Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidCatalogValue { source, .. } => source.status_code(), Error::MetaInternal { source } => source.status_code(), + Error::RecoverProcedure { source } => source.status_code(), } } } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index af28e19da8..a03022e624 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(async_closure)] #![feature(btree_drain_filter)] + pub mod bootstrap; pub mod cluster; pub mod election; @@ -25,6 +27,7 @@ pub mod lock; pub mod metasrv; #[cfg(feature = "mock")] pub mod mocks; +mod procedure; pub mod selector; mod sequence; pub mod service; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 608f40822f..951dd393d2 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -18,11 +18,15 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use api::v1::meta::Peer; -use common_telemetry::{info, warn}; +use common_procedure::ProcedureManagerRef; +use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClient; -use crate::election::Election; +use crate::election::{Election, LeaderChangeMessage}; +use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::selector::{Selector, SelectorType}; @@ -102,20 +106,51 @@ pub struct MetaSrv { election: Option, meta_peer_client: Option, lock: Option, + procedure_manager: ProcedureManagerRef, } impl MetaSrv { - pub async fn start(&self) { + pub async fn try_start(&self) -> Result<()> { if self .started .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) .is_err() { warn!("MetaSrv already started"); - return; + return Ok(()); } if let Some(election) = self.election() { + let procedure_manager = self.procedure_manager.clone(); + let mut rx = election.subscribe_leader_change(); + common_runtime::spawn_bg(async move { + loop { + match rx.recv().await { + Ok(msg) => { + match msg { + LeaderChangeMessage::Elected(_) => { + if let Err(e) = procedure_manager.recover().await { + error!("Failed to recover procedures, error: {e}"); + } + } + LeaderChangeMessage::StepDown(_) => { + // TODO(LFC): TBC + unimplemented!() + } + } + } + Err(RecvError::Closed) => { + error!("Not expected, is leader election loop still running?"); + break; + } + Err(RecvError::Lagged(_)) => { + // TODO(LFC): TBC + break; + } + } + } + }); + let election = election.clone(); let started = self.started.clone(); common_runtime::spawn_bg(async move { @@ -128,9 +163,15 @@ impl MetaSrv { } info!("MetaSrv stopped"); }); + } else { + self.procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; } info!("MetaSrv started"); + Ok(()) } pub fn shutdown(&self) { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index c39254df48..d8ec581adf 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -15,6 +15,8 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; +use common_procedure::local::{LocalManager, ManagerConfig}; + use crate::cluster::MetaPeerClient; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, @@ -22,6 +24,7 @@ use crate::handler::{ }; use crate::lock::DistLockRef; use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ}; +use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; @@ -139,6 +142,10 @@ impl MetaSrvBuilder { let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); + let config = ManagerConfig::default(); + let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); + let procedure_manager = Arc::new(LocalManager::new(config, state_store)); + MetaSrv { started, options, @@ -150,6 +157,7 @@ impl MetaSrvBuilder { election, meta_peer_client, lock, + procedure_manager, } } } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs new file mode 100644 index 0000000000..0439dacd75 --- /dev/null +++ b/src/meta-srv/src/procedure.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod state_store; diff --git a/src/meta-srv/src/procedure/state_store.rs b/src/meta-srv/src/procedure/state_store.rs new file mode 100644 index 0000000000..19bb226b01 --- /dev/null +++ b/src/meta-srv/src/procedure/state_store.rs @@ -0,0 +1,194 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{BatchDeleteRequest, PutRequest, RangeRequest}; +use async_stream::try_stream; +use async_trait::async_trait; +use common_error::prelude::BoxedError; +use common_procedure::error::{ + CorruptedDataSnafu, DeleteStatesSnafu, ListStateSnafu, PutStateSnafu, +}; +use common_procedure::store::state_store::{KeyValueStream, StateStore}; +use common_procedure::Result; +use snafu::ResultExt; + +use crate::service::store::kv::KvStoreRef; +use crate::util; + +const PROCEDURE_PREFIX: &str = "/__procedure__/"; + +fn with_prefix(key: &str) -> String { + format!("{PROCEDURE_PREFIX}{key}") +} + +fn strip_prefix(key: &str) -> String { + key.trim_start_matches(PROCEDURE_PREFIX).to_string() +} + +pub(crate) struct MetaStateStore { + kv_store: KvStoreRef, + max_size_per_range: i64, +} + +impl MetaStateStore { + pub(crate) fn new(kv_store: KvStoreRef) -> Self { + Self { + kv_store, + max_size_per_range: -1, + } + } +} + +#[async_trait] +impl StateStore for MetaStateStore { + async fn put(&self, key: &str, value: Vec) -> Result<()> { + let _ = self + .kv_store + .put(PutRequest { + key: with_prefix(key).into_bytes(), + value, + ..Default::default() + }) + .await + .map_err(BoxedError::new) + .context(PutStateSnafu { key })?; + Ok(()) + } + + async fn walk_top_down(&self, path: &str) -> Result { + // extend their lifetimes to be used in the stream + let path = path.to_string(); + let kv_store = self.kv_store.clone(); + let limit = self.max_size_per_range; + + let stream = try_stream! { + let mut key = with_prefix(path.trim_start_matches('/')).into_bytes(); + let range_end = util::get_prefix_end_key(&key); + loop { + let req = RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit, + ..Default::default() + }; + let resp = kv_store.range(req).await.map_err(BoxedError::new).with_context(|_| + ListStateSnafu { path: path.clone() } + )?; + + let mut no_more_data = true; + if resp.more { + if let Some(last) = resp.kvs.last() { + key = util::get_prefix_end_key(&last.key); + no_more_data = false; + } + } + + for kv in resp.kvs { + let key = String::from_utf8(kv.key).context(CorruptedDataSnafu)?; + let key = strip_prefix(&key); + let value = kv.value; + yield (key, value) + } + + if no_more_data { + break; + } + } + }; + Ok(Box::pin(stream)) + } + + async fn delete(&self, keys: &[String]) -> Result<()> { + let _ = self + .kv_store + .batch_delete(BatchDeleteRequest { + keys: keys + .iter() + .map(|x| with_prefix(x).into_bytes()) + .collect::>(), + ..Default::default() + }) + .await + .map_err(BoxedError::new) + .with_context(|_| DeleteStatesSnafu { + keys: format!("{:?}", keys.to_vec()), + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_procedure::store::state_store::KeyValue; + use futures::TryStreamExt; + + use super::*; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_meta_state_store() { + let store = &MetaStateStore { + kv_store: Arc::new(MemStore::new()), + max_size_per_range: 1, // for testing "more" in range + }; + + let walk_top_down = async move |path: &str| -> Vec { + let mut data = store + .walk_top_down(path) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + data + }; + + let data = walk_top_down("/").await; + assert!(data.is_empty()); + + store.put("a/1", b"v1".to_vec()).await.unwrap(); + store.put("a/2", b"v2".to_vec()).await.unwrap(); + store.put("b/1", b"v3".to_vec()).await.unwrap(); + + let data = walk_top_down("/").await; + assert_eq!( + vec![ + ("a/1".to_string(), b"v1".to_vec()), + ("a/2".to_string(), b"v2".to_vec()), + ("b/1".to_string(), b"v3".to_vec()) + ], + data + ); + + let data = walk_top_down("a/").await; + assert_eq!( + vec![ + ("a/1".to_string(), b"v1".to_vec()), + ("a/2".to_string(), b"v2".to_vec()), + ], + data + ); + + store + .delete(&["a/2".to_string(), "b/1".to_string()]) + .await + .unwrap(); + + let data = walk_top_down("a/").await; + assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data); + } +} diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index 2789c4c45e..c86433284e 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -18,6 +18,7 @@ use std::time::Duration; use catalog::local::MemoryCatalogManager; use catalog::CatalogManagerRef; use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use log_store::NoopLogStore; @@ -62,11 +63,12 @@ impl TestEnv { builder.root(&procedure_dir); let object_store = ObjectStore::new(builder).unwrap().finish(); - let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { - object_store, + let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_secs(500), - })); + }; + let state_store = Arc::new(ObjectStateStore::new(object_store)); + let procedure_manager = Arc::new(LocalManager::new(config, state_store)); let catalog_manager = Arc::new(MemoryCatalogManager::default());