feat: start LocalManager in Metasrv (#1279)

* feat: procedure store in Metasrv, backed by Etcd; start `LocalManager` in Metasrv leader

* fix: resolve PR comments

* fix: resolve PR comments
This commit is contained in:
LFC
2023-03-31 15:32:59 +08:00
committed by GitHub
parent dee20144d7
commit eb77f9aafd
18 changed files with 457 additions and 65 deletions

2
Cargo.lock generated
View File

@@ -4111,12 +4111,14 @@ version = "0.1.1"
dependencies = [
"anymap",
"api",
"async-stream",
"async-trait",
"catalog",
"common-base",
"common-catalog",
"common-error",
"common-grpc",
"common-procedure",
"common-runtime",
"common-telemetry",
"common-time",

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::string::FromUtf8Error;
use std::sync::Arc;
use common_error::prelude::*;
@@ -47,10 +48,11 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to put {}, source: {}", key, source))]
#[snafu(display("Failed to put state, key: '{key}', source: {source}"))]
PutState {
key: String,
source: object_store::Error,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to delete {}, source: {}", key, source))]
@@ -59,10 +61,18 @@ pub enum Error {
source: object_store::Error,
},
#[snafu(display("Failed to list {}, source: {}", path, source))]
#[snafu(display("Failed to delete keys: '{keys}', source: {source}"))]
DeleteStates {
keys: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to list state, path: '{path}', source: {source}"))]
ListState {
path: String,
source: object_store::Error,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to read {}, source: {}", key, source))]
@@ -107,6 +117,9 @@ pub enum Error {
source: Arc<Error>,
procedure_id: ProcedureId,
},
#[snafu(display("Corrupted data, error: {source}"))]
CorruptedData { source: FromUtf8Error },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -114,11 +127,13 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::External { source } => source.status_code(),
Error::External { source }
| Error::PutState { source, .. }
| Error::DeleteStates { source, .. }
| Error::ListState { source, .. } => source.status_code(),
Error::ToJson { .. }
| Error::PutState { .. }
| Error::DeleteState { .. }
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
@@ -127,7 +142,7 @@ impl ErrorExt for Error {
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Error::ProcedurePanic { .. } => StatusCode::Unexpected,
Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
}
}

View File

@@ -17,7 +17,7 @@
pub mod error;
pub mod local;
mod procedure;
mod store;
pub mod store;
pub mod watcher;
pub use crate::error::{Error, Result};

View File

@@ -22,7 +22,6 @@ use std::time::Duration;
use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_telemetry::logging;
use object_store::ObjectStore;
use snafu::ensure;
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::Notify;
@@ -31,7 +30,7 @@ use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::store::{ObjectStateStore, ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Watcher,
@@ -291,12 +290,19 @@ impl ManagerContext {
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
/// Object store
pub object_store: ObjectStore,
pub max_retry_times: usize,
pub retry_delay: Duration,
}
impl Default for ManagerConfig {
fn default() -> Self {
Self {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
@@ -307,10 +313,10 @@ pub struct LocalManager {
impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(config: ManagerConfig) -> LocalManager {
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
state_store: Arc::new(ObjectStateStore::new(config.object_store)),
state_store,
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
}
@@ -423,6 +429,7 @@ impl ProcedureManager for LocalManager {
mod test_util {
use common_test_util::temp_dir::TempDir;
use object_store::services::Fs as Builder;
use object_store::ObjectStore;
use super::*;
@@ -446,6 +453,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::store::ObjectStateStore;
use crate::{Context, Procedure, Status};
#[test]
@@ -554,11 +562,11 @@ mod tests {
fn test_register_loader() {
let dir = create_temp_dir("register");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
manager
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
@@ -575,11 +583,11 @@ mod tests {
let dir = create_temp_dir("recover");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
object_store: object_store.clone(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
manager
.register_loader("ProcedureToLoad", ProcedureToLoad::loader())
@@ -621,11 +629,11 @@ mod tests {
async fn test_submit_procedure() {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let procedure_id = ProcedureId::random();
assert!(manager
@@ -669,11 +677,11 @@ mod tests {
async fn test_state_changed_on_err() {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
object_store: test_util::new_object_store(&dir),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
};
let manager = LocalManager::new(config);
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
#[derive(Debug)]
struct MockProcedure {

View File

@@ -26,7 +26,7 @@ use crate::error::{Result, ToJsonSnafu};
pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef};
use crate::{BoxedProcedure, ProcedureId};
mod state_store;
pub mod state_store;
/// Serialized data of a procedure.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -17,21 +17,23 @@ use std::sync::Arc;
use async_stream::try_stream;
use async_trait::async_trait;
use common_error::ext::PlainError;
use common_error::prelude::{BoxedError, StatusCode};
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use snafu::ResultExt;
use crate::error::{DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, Result};
use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
/// Key value from state store.
type KeyValue = (String, Vec<u8>);
pub type KeyValue = (String, Vec<u8>);
/// Stream that yields [KeyValue].
type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
pub type KeyValueStream = Pin<Box<dyn Stream<Item = Result<KeyValue>> + Send>>;
/// Storage layer for persisting procedure's state.
#[async_trait]
pub(crate) trait StateStore: Send + Sync {
pub trait StateStore: Send + Sync {
/// Puts `key` and `value` into the store.
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()>;
@@ -51,13 +53,13 @@ pub(crate) type StateStoreRef = Arc<dyn StateStore>;
/// [StateStore] based on [ObjectStore].
#[derive(Debug)]
pub(crate) struct ObjectStateStore {
pub struct ObjectStateStore {
store: ObjectStore,
}
impl ObjectStateStore {
/// Returns a new [ObjectStateStore] with specific `store`.
pub(crate) fn new(store: ObjectStore) -> ObjectStateStore {
pub fn new(store: ObjectStore) -> ObjectStateStore {
ObjectStateStore { store }
}
}
@@ -68,31 +70,65 @@ impl StateStore for ObjectStateStore {
self.store
.write(key, value)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(PutStateSnafu { key })
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let path_string = path.to_string();
let mut lister = self.store.scan(path).await.map_err(|e| Error::ListState {
path: path_string.clone(),
source: e,
})?;
let mut lister = self
.store
.scan(path)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.with_context(|_| ListStateSnafu {
path: path_string.clone(),
})?;
let store = self.store.clone();
let stream = try_stream!({
while let Some(res) = lister.next().await {
let entry = res.context(ListStateSnafu { path: &path_string })?;
let entry = res
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {
let value = store
.read(key)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
yield (key.to_string(), value);
}

View File

@@ -23,6 +23,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::info;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -522,11 +523,15 @@ pub(crate) async fn create_procedure_manager(
);
let object_store = new_object_store(&procedure_config.store).await?;
let state_store = Arc::new(ObjectStateStore::new(object_store));
let manager_config = ManagerConfig {
object_store,
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
};
Ok(Some(Arc::new(LocalManager::new(manager_config))))
Ok(Some(Arc::new(LocalManager::new(
manager_config,
state_store,
))))
}

View File

@@ -10,12 +10,14 @@ mock = []
[dependencies]
anymap = "1.0.0-beta.2"
api = { path = "../api" }
async-stream.workspace = true
async-trait = "0.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-procedure = { path = "../common/procedure" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }

View File

@@ -61,7 +61,8 @@ impl MetaSrvInstance {
}
pub async fn start(&mut self) -> Result<()> {
self.meta_srv.start().await;
self.meta_srv.try_start().await?;
let (tx, mut rx) = mpsc::channel::<()>(1);
self.signal_sender = Some(tx);
@@ -131,10 +132,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
.context(error::ConnectEtcdSnafu)?;
(
EtcdStore::with_etcd_client(etcd_client.clone())?,
Some(EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
)?),
Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
Some(EtcdLock::with_etcd_client(etcd_client)?),
)
};
@@ -172,7 +170,7 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
pub async fn make_meta_srv(opts: &MetaSrvOptions) -> Result<MetaSrv> {
let meta_srv = build_meta_srv(opts).await?;
meta_srv.start().await;
meta_srv.try_start().await?;
Ok(meta_srv)
}

View File

@@ -14,12 +14,23 @@
pub mod etcd;
use std::sync::Arc;
use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;
use crate::error::Result;
pub const LEASE_SECS: i64 = 3;
pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 * 2 / 3;
pub const ELECTION_KEY: &str = "__meta_srv_election";
/// Message delivered to receivers obtained from `Election::subscribe_leader_change`.
///
/// Both variants carry the etcd [LeaderKey] shared behind an [Arc], so cloning a
/// message does not clone the key itself.
#[derive(Clone)]
pub enum LeaderChangeMessage {
/// The node has been elected as leader.
Elected(Arc<LeaderKey>),
/// The node is stepping down from leadership.
StepDown(Arc<LeaderKey>),
}
#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;
@@ -46,4 +57,6 @@ pub trait Election: Send + Sync {
/// Releases election leadership so other campaigners may
/// acquire leadership on the election.
async fn resign(&self) -> Result<()>;
fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>;
}

View File

@@ -16,11 +16,16 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use common_telemetry::{error, info, warn};
use etcd_client::Client;
use snafu::{OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use crate::election::{Election, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS};
use crate::election::{
Election, LeaderChangeMessage, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS,
};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
@@ -30,6 +35,7 @@ pub struct EtcdElection {
client: Client,
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
}
impl EtcdElection {
@@ -42,20 +48,50 @@ impl EtcdElection {
.await
.context(error::ConnectEtcdSnafu)?;
Self::with_etcd_client(leader_value, client)
Self::with_etcd_client(leader_value, client).await
}
pub fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
pub async fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
where
E: AsRef<str>,
{
let leader_value = leader_value.as_ref().into();
let leader_value: String = leader_value.as_ref().into();
let leader_ident = leader_value.clone();
let (tx, mut rx) = broadcast::channel(100);
common_runtime::spawn_bg(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
LeaderChangeMessage::Elected(key) => {
info!(
"[{leader_ident}] is elected as leader: {:?}, lease: {}",
key.name_str(),
key.lease()
);
}
LeaderChangeMessage::StepDown(key) => {
warn!(
"[{leader_ident}] is stepping down: {:?}, lease: {}",
key.name_str(),
key.lease()
);
}
},
Err(RecvError::Lagged(_)) => {
warn!("Log printing is too slow or leader changed too fast!");
}
Err(RecvError::Closed) => break,
}
}
});
Ok(Arc::new(Self {
leader_value,
client,
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
leader_watcher: tx,
}))
}
}
@@ -120,18 +156,21 @@ impl Election for EtcdElection {
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);
info!(
"[{}] becoming leader: {:?}, lease: {}",
&self.leader_value,
leader.name_str(),
leader.lease()
);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!("Failed to send leader change message, error: {e}");
}
}
} else {
warn!(
"Failed to keep-alive, lease: {}, will re-initiate election",
leader.lease()
);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!("Failed to send leader change message, error: {e}");
}
break;
}
}
@@ -162,4 +201,8 @@ impl Election for EtcdElection {
async fn resign(&self) -> Result<()> {
todo!()
}
/// Returns a new receiver subscribed to this election's `leader_watcher`
/// broadcast sender, on which [LeaderChangeMessage]s are sent.
fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage> {
self.leader_watcher.subscribe()
}
}

View File

@@ -274,6 +274,12 @@ pub enum Error {
#[snafu(display("Missing required parameter, param: {:?}", param))]
MissingRequiredParameter { param: String },
#[snafu(display("Failed to recover procedure, source: {source}"))]
RecoverProcedure {
#[snafu(backtrace)]
source: common_procedure::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -341,6 +347,7 @@ impl ErrorExt for Error {
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::MetaInternal { source } => source.status_code(),
Error::RecoverProcedure { source } => source.status_code(),
}
}
}

View File

@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(async_closure)]
#![feature(btree_drain_filter)]
pub mod bootstrap;
pub mod cluster;
pub mod election;
@@ -25,6 +27,7 @@ pub mod lock;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;
mod procedure;
pub mod selector;
mod sequence;
pub mod service;

View File

@@ -18,11 +18,15 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::Peer;
use common_telemetry::{info, warn};
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClient;
use crate::election::Election;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::selector::{Selector, SelectorType};
@@ -102,20 +106,51 @@ pub struct MetaSrv {
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
lock: Option<DistLockRef>,
procedure_manager: ProcedureManagerRef,
}
impl MetaSrv {
pub async fn start(&self) {
pub async fn try_start(&self) -> Result<()> {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_err()
{
warn!("MetaSrv already started");
return;
return Ok(());
}
if let Some(election) = self.election() {
let procedure_manager = self.procedure_manager.clone();
let mut rx = election.subscribe_leader_change();
common_runtime::spawn_bg(async move {
loop {
match rx.recv().await {
Ok(msg) => {
match msg {
LeaderChangeMessage::Elected(_) => {
if let Err(e) = procedure_manager.recover().await {
error!("Failed to recover procedures, error: {e}");
}
}
LeaderChangeMessage::StepDown(_) => {
// TODO(LFC): TBC
unimplemented!()
}
}
}
Err(RecvError::Closed) => {
error!("Not expected, is leader election loop still running?");
break;
}
Err(RecvError::Lagged(_)) => {
// TODO(LFC): TBC
break;
}
}
}
});
let election = election.clone();
let started = self.started.clone();
common_runtime::spawn_bg(async move {
@@ -128,9 +163,15 @@ impl MetaSrv {
}
info!("MetaSrv stopped");
});
} else {
self.procedure_manager
.recover()
.await
.context(RecoverProcedureSnafu)?;
}
info!("MetaSrv started");
Ok(())
}
pub fn shutdown(&self) {

View File

@@ -15,6 +15,8 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClient;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
@@ -22,6 +24,7 @@ use crate::handler::{
};
use crate::lock::DistLockRef;
use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ};
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
@@ -139,6 +142,10 @@ impl MetaSrvBuilder {
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let config = ManagerConfig::default();
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
MetaSrv {
started,
options,
@@ -150,6 +157,7 @@ impl MetaSrvBuilder {
election,
meta_peer_client,
lock,
procedure_manager,
}
}
}

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod state_store;

View File

@@ -0,0 +1,194 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{BatchDeleteRequest, PutRequest, RangeRequest};
use async_stream::try_stream;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_procedure::error::{
CorruptedDataSnafu, DeleteStatesSnafu, ListStateSnafu, PutStateSnafu,
};
use common_procedure::store::state_store::{KeyValueStream, StateStore};
use common_procedure::Result;
use snafu::ResultExt;
use crate::service::store::kv::KvStoreRef;
use crate::util;
/// Namespace prepended to every procedure key before it is written to the kv store.
const PROCEDURE_PREFIX: &str = "/__procedure__/";

/// Returns `key` with [PROCEDURE_PREFIX] prepended.
fn with_prefix(key: &str) -> String {
    format!("{PROCEDURE_PREFIX}{key}")
}

/// Returns `key` with a single leading [PROCEDURE_PREFIX] removed.
///
/// This is the exact inverse of [with_prefix]: only one occurrence of the prefix is
/// stripped, so a key that itself begins with the prefix text survives a round trip.
/// A key that does not start with the prefix is returned unchanged.
fn strip_prefix(key: &str) -> String {
    // `trim_start_matches` would strip *every* repeated leading occurrence of the
    // prefix; `str::strip_prefix` removes it at most once.
    key.strip_prefix(PROCEDURE_PREFIX).unwrap_or(key).to_string()
}
/// Procedure state store that keeps its data in the Metasrv key-value store.
///
/// Keys are namespaced with [PROCEDURE_PREFIX] when written and have the prefix
/// removed again when they are listed.
pub(crate) struct MetaStateStore {
/// Backing key-value store that serves the put, range and batch-delete requests.
kv_store: KvStoreRef,
/// Value sent as `limit` in every range request issued while listing keys.
/// NOTE(review): `new` sets this to -1, presumably meaning "no limit" — confirm
/// against the kv store implementation.
max_size_per_range: i64,
}
impl MetaStateStore {
pub(crate) fn new(kv_store: KvStoreRef) -> Self {
Self {
kv_store,
max_size_per_range: -1,
}
}
}
// [StateStore] implementation on top of the Metasrv key-value store.
//
// Every key is written under `PROCEDURE_PREFIX` (see `with_prefix`) and handed back
// to callers with the prefix removed (see `strip_prefix`).
#[async_trait]
impl StateStore for MetaStateStore {
    /// Stores `value` under the prefixed form of `key`.
    ///
    /// # Errors
    ///
    /// Returns a `PutState` error wrapping the kv store failure.
    async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
        let _ = self
            .kv_store
            .put(PutRequest {
                key: with_prefix(key).into_bytes(),
                value,
                ..Default::default()
            })
            .await
            .map_err(BoxedError::new)
            .context(PutStateSnafu { key })?;
        Ok(())
    }

    /// Returns a stream over all key-values whose prefixed key starts with `path`
    /// (leading `/` characters of `path` are ignored).
    ///
    /// The range is fetched page by page, each request carrying
    /// `max_size_per_range` as its `limit`; paging continues while the response
    /// reports `more`.
    ///
    /// # Errors
    ///
    /// The stream yields a `ListState` error when a range request fails and a
    /// `CorruptedData` error when a returned key is not valid UTF-8.
    async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
        // Own everything the stream needs so it does not borrow from `self` or `path`.
        let path = path.to_string();
        let kv_store = self.kv_store.clone();
        let limit = self.max_size_per_range;

        let stream = try_stream! {
            let mut key = with_prefix(path.trim_start_matches('/')).into_bytes();
            let range_end = util::get_prefix_end_key(&key);
            loop {
                let req = RangeRequest {
                    key: key.clone(),
                    range_end: range_end.clone(),
                    limit,
                    ..Default::default()
                };
                let resp = kv_store
                    .range(req)
                    .await
                    .map_err(BoxedError::new)
                    .with_context(|_| ListStateSnafu { path: path.clone() })?;

                let mut no_more_data = true;
                if resp.more {
                    if let Some(last) = resp.kvs.last() {
                        // Resume from the smallest key strictly greater than the last
                        // one returned (`last.key` followed by a zero byte). Jumping to
                        // a "prefix end" of `last.key` instead could skip in-range keys
                        // that extend it (e.g. `a/10` after `a/1`).
                        let mut next_key = last.key.clone();
                        next_key.push(0);
                        key = next_key;
                        no_more_data = false;
                    }
                }

                for kv in resp.kvs {
                    let key = String::from_utf8(kv.key).context(CorruptedDataSnafu)?;
                    let key = strip_prefix(&key);
                    let value = kv.value;
                    yield (key, value)
                }

                if no_more_data {
                    break;
                }
            }
        };
        Ok(Box::pin(stream))
    }

    /// Deletes all `keys` (prefixed) with a single batch-delete request.
    ///
    /// # Errors
    ///
    /// Returns a `DeleteStates` error wrapping the kv store failure; the error
    /// carries the Debug rendering of `keys`.
    async fn delete(&self, keys: &[String]) -> Result<()> {
        let _ = self
            .kv_store
            .batch_delete(BatchDeleteRequest {
                keys: keys
                    .iter()
                    .map(|x| with_prefix(x).into_bytes())
                    .collect::<Vec<_>>(),
                ..Default::default()
            })
            .await
            .map_err(BoxedError::new)
            .with_context(|_| DeleteStatesSnafu {
                // A slice formats exactly like a `Vec` under `{:?}`; no copy needed.
                keys: format!("{keys:?}"),
            })?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_procedure::store::state_store::KeyValue;
use futures::TryStreamExt;
use super::*;
use crate::service::store::memory::MemStore;
/// Exercises put / walk_top_down / delete of [MetaStateStore] against an
/// in-memory kv store.
#[tokio::test]
async fn test_meta_state_store() {
let store = &MetaStateStore {
kv_store: Arc::new(MemStore::new()),
max_size_per_range: 1, // for testing "more" in range
};
// Helper: collect the whole stream for `path` and sort it by key so the
// assertions below do not depend on the order the store returns entries in.
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
let mut data = store
.walk_top_down(path)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
data
};
// Nothing has been written yet.
let data = walk_top_down("/").await;
assert!(data.is_empty());
store.put("a/1", b"v1".to_vec()).await.unwrap();
store.put("a/2", b"v2".to_vec()).await.unwrap();
store.put("b/1", b"v3".to_vec()).await.unwrap();
// Walking from the root returns every key, with the storage prefix removed.
let data = walk_top_down("/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("b/1".to_string(), b"v3".to_vec())
],
data
);
// Walking a sub-path only returns the keys below it.
let data = walk_top_down("a/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
],
data
);
// Batch delete removes exactly the listed keys.
store
.delete(&["a/2".to_string(), "b/1".to_string()])
.await
.unwrap();
let data = walk_top_down("a/").await;
assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data);
}
}

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use log_store::NoopLogStore;
@@ -62,11 +63,12 @@ impl TestEnv {
builder.root(&procedure_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig {
object_store,
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_secs(500),
}));
};
let state_store = Arc::new(ObjectStateStore::new(object_store));
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
let catalog_manager = Arc::new(MemoryCatalogManager::default());