From e4de63625f0e288eda81c3615bdc9b82e26ce86d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 6 Sep 2023 15:35:13 +0800 Subject: [PATCH] refactor: refactor raft engine backend and state store (#2336) * refactor: remove redundant code * refactor: refactor RaftEngineBackend Error to common_meta::error::Error * refactor: refactor state store * chore: apply suggestions from CR --- src/catalog/src/remote/client.rs | 20 ++-- src/client/src/region.rs | 2 +- src/common/meta/src/error.rs | 30 +---- src/common/meta/src/lib.rs | 2 + .../meta/src}/state_store.rs | 110 +++++++++--------- src/log-store/src/raft_engine/backend.rs | 71 ++++++++--- src/meta-client/src/client.rs | 2 +- src/meta-srv/src/cache_invalidator.rs | 2 +- src/meta-srv/src/metasrv/builder.rs | 4 +- src/meta-srv/src/procedure.rs | 1 - src/meta-srv/src/service/store/kv.rs | 20 ++-- src/meta-srv/src/table_creator.rs | 2 +- src/meta-srv/src/test_util.rs | 4 +- 13 files changed, 141 insertions(+), 129 deletions(-) rename src/{meta-srv/src/procedure => common/meta/src}/state_store.rs (61%) diff --git a/src/catalog/src/remote/client.rs b/src/catalog/src/remote/client.rs index 981460cb74..aed8be54f1 100644 --- a/src/catalog/src/remote/client.rs +++ b/src/catalog/src/remote/client.rs @@ -19,7 +19,7 @@ use std::time::Duration; use common_error::ext::BoxedError; use common_meta::error::Error::{CacheNotGet, GetKvCache}; -use common_meta::error::{CacheNotGetSnafu, Error, MetaSrvSnafu, Result}; +use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, Result}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -251,7 +251,7 @@ impl KvBackend for MetaKvBackend { .range(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn get(&self, key: &[u8]) -> Result> { @@ -260,7 +260,7 @@ impl KvBackend for MetaKvBackend { .range(RangeRequest::new().with_key(key)) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu)?; + .context(ExternalSnafu)?; Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue { key: kv.take_key(), value: kv.take_value(), @@ -272,7 +272,7 @@ impl KvBackend for MetaKvBackend { .batch_put(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn put(&self, req: PutRequest) -> Result { @@ -280,7 +280,7 @@ impl KvBackend for MetaKvBackend { .put(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn delete_range(&self, req: DeleteRangeRequest) -> Result { @@ -288,7 +288,7 @@ impl KvBackend for MetaKvBackend { .delete_range(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { @@ -296,7 +296,7 @@ impl KvBackend for MetaKvBackend { .batch_delete(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn batch_get(&self, req: BatchGetRequest) -> Result { @@ -304,7 +304,7 @@ impl KvBackend for MetaKvBackend { .batch_get(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn compare_and_put( @@ -315,7 +315,7 @@ impl KvBackend for MetaKvBackend { .compare_and_put(request) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn move_value(&self, req: MoveValueRequest) -> Result { @@ -323,7 +323,7 @@ impl KvBackend for MetaKvBackend { .move_value(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } fn as_any(&self) -> &dyn Any { diff --git a/src/client/src/region.rs b/src/client/src/region.rs index f8dc4f599f..44e5ce107f 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -49,7 +49,7 @@ impl Datanode for RegionRequester { source: BoxedError::new(err), } } else { - meta_error::Error::OperateRegion { + meta_error::Error::External { source: BoxedError::new(err), location: location!(), } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index c7090cb5c1..c37b2b64ca 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -206,12 +206,6 @@ pub enum Error { #[snafu(display("Get null from cache, key: {}", key))] CacheNotGet { key: String, location: Location }, - #[snafu(display("{source}"))] - MetaSrv { - source: BoxedError, - location: Location, - }, - #[snafu(display("Etcd txn error: {err_msg}"))] EtcdTxnOpResponse { err_msg: String, location: Location }, @@ -234,24 +228,15 @@ pub enum Error { location: Location, }, - #[snafu(display("External error: {}", err_msg))] - External { location: Location, err_msg: String }, + #[snafu(display("{}", source))] + External { + location: Location, + source: BoxedError, + }, #[snafu(display("Invalid heartbeat response, location: {}", location))] InvalidHeartbeatResponse { location: Location }, - #[snafu(display("{}", source))] - OperateRegion { - location: Location, - source: BoxedError, - }, - - #[snafu(display("{}", source))] - ExecuteDdl { - location: Location, - source: BoxedError, - }, - #[snafu(display("Failed to operate on datanode: {}, source: {}", peer, source))] OperateDatanode { location: Location, @@ -278,7 +263,6 @@ impl ErrorExt for Error { | InvalidTableMetadata { .. } | MoveRegion { .. } | Unexpected { .. } - | External { .. } | TableInfoNotFound { .. } | NextSequence { .. } | SequenceOutOfRange { .. } @@ -309,12 +293,10 @@ impl ErrorExt for Error { SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(), RegisterProcedureLoader { source, .. } => source.status_code(), + External { source, .. } => source.status_code(), OperateDatanode { source, .. } => source.status_code(), Table { source, .. } => source.status_code(), RetryLater { source, .. } => source.status_code(), - OperateRegion { source, .. } => source.status_code(), - ExecuteDdl { source, .. } => source.status_code(), - MetaSrv { source, .. } => source.status_code(), InvalidCatalogValue { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index fb002207d6..4387ceaeaf 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(btree_extract_if)] +#![feature(async_closure)] pub mod cache_invalidator; pub mod datanode_manager; @@ -32,6 +33,7 @@ pub mod peer; pub mod range_stream; pub mod rpc; pub mod sequence; +pub mod state_store; pub mod table_name; pub mod util; diff --git a/src/meta-srv/src/procedure/state_store.rs b/src/common/meta/src/state_store.rs similarity index 61% rename from src/meta-srv/src/procedure/state_store.rs rename to src/common/meta/src/state_store.rs index c2af6b3259..28e5d88273 100644 --- a/src/meta-srv/src/procedure/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -12,19 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use async_stream::try_stream; +use std::sync::Arc; + use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest}; -use common_meta::util; -use common_procedure::error::{ - CorruptedDataSnafu, DeleteStatesSnafu, ListStateSnafu, PutStateSnafu, -}; +use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu}; use common_procedure::store::state_store::{KeyValueStream, StateStore}; -use common_procedure::Result; +use common_procedure::Result as ProcedureResult; +use futures::StreamExt; use snafu::ResultExt; -use crate::service::store::kv::KvStoreRef; +use crate::error::Result; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::PaginationStream; +use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest}; +use crate::rpc::KeyValue; + +const DELIMITER: &str = "/"; const PROCEDURE_PREFIX: &str = "/__procedure__/"; @@ -36,25 +40,35 @@ fn strip_prefix(key: &str) -> String { key.trim_start_matches(PROCEDURE_PREFIX).to_string() } -pub(crate) struct MetaStateStore { - kv_store: KvStoreRef, - max_size_per_range: i64, +pub struct KvStateStore { + kv_backend: KvBackendRef, + // limit is set to 0, it is treated as no limit. + max_size_per_range: usize, } -impl MetaStateStore { - pub(crate) fn new(kv_store: KvStoreRef) -> Self { +impl KvStateStore { + // `max_size_per_range` is set to 0, it is treated as no limit. + pub fn new(kv_backend: KvBackendRef) -> Self { Self { - kv_store, - max_size_per_range: -1, + kv_backend, + max_size_per_range: 0, } } } +fn decode_kv(kv: KeyValue) -> Result<(String, Vec)> { + let key = String::from_utf8_lossy(&kv.key); + let key = strip_prefix(&key); + let value = kv.value; + + Ok((key, value)) +} + #[async_trait] -impl StateStore for MetaStateStore { - async fn put(&self, key: &str, value: Vec) -> Result<()> { +impl StateStore for KvStateStore { + async fn put(&self, key: &str, value: Vec) -> ProcedureResult<()> { let _ = self - .kv_store + .kv_backend .put(PutRequest { key: with_prefix(key).into_bytes(), value, @@ -66,52 +80,32 @@ impl StateStore for MetaStateStore { Ok(()) } - async fn walk_top_down(&self, path: &str) -> Result { + async fn walk_top_down(&self, path: &str) -> ProcedureResult { // extend their lifetimes to be used in the stream let path = path.to_string(); - let kv_store = self.kv_store.clone(); - let limit = self.max_size_per_range; - let stream = try_stream! { - let mut key = with_prefix(path.trim_start_matches('/')).into_bytes(); - let range_end = util::get_prefix_end_key(&key); - loop { - let req = RangeRequest { - key: key.clone(), - range_end: range_end.clone(), - limit, - ..Default::default() - }; - let resp = kv_store.range(req).await.map_err(BoxedError::new).with_context(|_| - ListStateSnafu { path: path.clone() } - )?; + let key = with_prefix(path.trim_start_matches(DELIMITER)).into_bytes(); + let req = RangeRequest::new().with_prefix(key); - let mut no_more_data = true; - if resp.more { - if let Some(last) = resp.kvs.last() { - key = util::get_prefix_end_key(&last.key); - no_more_data = false; - } - } + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + self.max_size_per_range, + Arc::new(decode_kv), + ); - for kv in resp.kvs { - let key = String::from_utf8(kv.key).context(CorruptedDataSnafu)?; - let key = strip_prefix(&key); - let value = kv.value; - yield (key, value) - } + let stream = stream.map(move |r| { + let path = path.clone(); + r.map_err(BoxedError::new) + .with_context(|_| ListStateSnafu { path }) + }); - if no_more_data { - break; - } - } - }; Ok(Box::pin(stream)) } - async fn batch_delete(&self, keys: &[String]) -> Result<()> { + async fn batch_delete(&self, keys: &[String]) -> ProcedureResult<()> { let _ = self - .kv_store + .kv_backend .batch_delete(BatchDeleteRequest { keys: keys .iter() @@ -127,7 +121,7 @@ impl StateStore for MetaStateStore { Ok(()) } - async fn delete(&self, key: &str) -> Result<()> { + async fn delete(&self, key: &str) -> ProcedureResult<()> { self.batch_delete(&[key.to_string()]).await } } @@ -140,12 +134,12 @@ mod tests { use futures::TryStreamExt; use super::*; - use crate::service::store::memory::MemStore; + use crate::kv_backend::memory::MemoryKvBackend; #[tokio::test] async fn test_meta_state_store() { - let store = &MetaStateStore { - kv_store: Arc::new(MemStore::new()), + let store = &KvStateStore { + kv_backend: Arc::new(MemoryKvBackend::new()), max_size_per_range: 1, // for testing "more" in range }; diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 090f9fc55f..d7479799b5 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -17,6 +17,8 @@ use std::any::Any; use std::sync::RwLock; +use common_error::ext::BoxedError; +use common_meta::error as meta_error; use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse}; use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ @@ -29,7 +31,6 @@ use common_meta::rpc::KeyValue; use raft_engine::{Engine, LogBatch}; use snafu::ResultExt; -use crate::error; use crate::error::RaftEngineSnafu; pub(crate) const SYSTEM_NAMESPACE: u64 = 0; @@ -41,9 +42,9 @@ pub struct RaftEngineBackend { #[async_trait::async_trait] impl TxnService for RaftEngineBackend { - type Error = error::Error; + type Error = meta_error::Error; - async fn txn(&self, txn: Txn) -> Result { + async fn txn(&self, txn: Txn) -> meta_error::Result { let TxnRequest { compare, success, @@ -65,7 +66,9 @@ impl TxnService for RaftEngineBackend { TxnOp::Put(key, value) => { batch .put(SYSTEM_NAMESPACE, key.clone(), value) - .context(RaftEngineSnafu)?; + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None })) } @@ -98,9 +101,13 @@ impl TxnService for RaftEngineBackend { let responses = if succeeded { success } else { failure } .into_iter() .map(do_txn) - .collect::>()?; + .collect::>()?; - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(TxnResponse { succeeded, @@ -137,7 +144,9 @@ impl KvBackend for RaftEngineBackend { true }, ) - .context(RaftEngineSnafu)?; + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(RangeResponse { kvs: res, more: false, @@ -179,10 +188,16 @@ impl KvBackend for RaftEngineBackend { } batch .put(SYSTEM_NAMESPACE, kv.key, kv.value) - .context(RaftEngineSnafu)?; + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; } - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(BatchPutResponse { prev_kvs }) } @@ -222,8 +237,14 @@ impl KvBackend for RaftEngineBackend { if eq { batch .put(SYSTEM_NAMESPACE, key, value) - .context(RaftEngineSnafu)?; - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; } Ok(CompareAndPutResponse { success: eq, @@ -284,7 +305,11 @@ impl KvBackend for RaftEngineBackend { batch.delete(SYSTEM_NAMESPACE, key); } let engine = self.engine.read().unwrap(); - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(BatchDeleteResponse { prev_kvs }) } @@ -312,7 +337,7 @@ impl KvBackend for RaftEngineBackend { } } -fn engine_get(engine: &Engine, key: &[u8]) -> error::Result> { +fn engine_get(engine: &Engine, key: &[u8]) -> meta_error::Result> { let res = engine.get(SYSTEM_NAMESPACE, key); Ok(res.map(|value| KeyValue { key: key.to_vec(), @@ -320,19 +345,29 @@ fn engine_get(engine: &Engine, key: &[u8]) -> error::Result> { })) } -fn engine_put(engine: &Engine, key: Vec, value: Vec) -> error::Result<()> { +fn engine_put(engine: &Engine, key: Vec, value: Vec) -> meta_error::Result<()> { let mut batch = LogBatch::with_capacity(1); batch .put(SYSTEM_NAMESPACE, key, value) - .context(RaftEngineSnafu)?; - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(()) } -fn engine_delete(engine: &Engine, key: &[u8]) -> error::Result<()> { +fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> { let mut batch = LogBatch::with_capacity(1); batch.delete(SYSTEM_NAMESPACE, key.to_vec()); - engine.write(&mut batch, false).context(RaftEngineSnafu)?; + engine + .write(&mut batch, false) + .context(RaftEngineSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; Ok(()) } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 0648ff282f..015e87c9bb 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -187,7 +187,7 @@ impl DdlExecutor for MetaClient { self.submit_ddl_task(request) .await .map_err(BoxedError::new) - .context(meta_error::ExecuteDdlSnafu) + .context(meta_error::ExternalSnafu) } } diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index be3cab10ec..fa130ef2d1 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -59,6 +59,6 @@ impl CacheInvalidator for MetasrvCacheInvalidator { .broadcast(&BroadcastChannel::Frontend, msg) .await .map_err(BoxedError::new) - .context(meta_error::MetaSrvSnafu) + .context(meta_error::ExternalSnafu) } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 93882563bd..196c1ce14b 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,6 +21,7 @@ use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::sequence::{Sequence, SequenceRef}; +use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -47,7 +48,6 @@ use crate::metasrv::{ ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; use crate::procedure::region_failover::RegionFailoverManager; -use crate::procedure::state_store::MetaStateStore; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::mailbox::MailboxRef; @@ -329,7 +329,7 @@ fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> P retry_delay: options.procedure.retry_delay, ..Default::default() }; - let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); + let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); Arc::new(LocalManager::new(manager_config, state_store)) } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 358fe3c46c..8e82b7cf16 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -13,7 +13,6 @@ // limitations under the License. pub mod region_failover; -pub(crate) mod state_store; #[cfg(test)] mod tests; mod utils; diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index ebe3b27e0e..a661436039 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_meta::error::MetaSrvSnafu; +use common_meta::error::ExternalSnafu; use common_meta::kv_backend::txn::{Txn, TxnResponse}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::store::{ @@ -55,7 +55,7 @@ impl TxnService for KvBackendAdapter { .txn(txn) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } } @@ -74,7 +74,7 @@ impl KvBackend for KvBackendAdapter { .range(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn put(&self, req: PutRequest) -> Result { @@ -82,7 +82,7 @@ impl KvBackend for KvBackendAdapter { .put(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn batch_put(&self, req: BatchPutRequest) -> Result { @@ -90,7 +90,7 @@ impl KvBackend for KvBackendAdapter { .batch_put(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn batch_get(&self, req: BatchGetRequest) -> Result { @@ -98,7 +98,7 @@ impl KvBackend for KvBackendAdapter { .batch_get(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn compare_and_put( @@ -109,7 +109,7 @@ impl KvBackend for KvBackendAdapter { .compare_and_put(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn delete_range( @@ -120,7 +120,7 @@ impl KvBackend for KvBackendAdapter { .delete_range(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn batch_delete( @@ -131,7 +131,7 @@ impl KvBackend for KvBackendAdapter { .batch_delete(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } async fn move_value(&self, req: MoveValueRequest) -> Result { @@ -139,6 +139,6 @@ impl KvBackend for KvBackendAdapter { .move_value(req) .await .map_err(BoxedError::new) - .context(MetaSrvSnafu) + .context(ExternalSnafu) } } diff --git a/src/meta-srv/src/table_creator.rs b/src/meta-srv/src/table_creator.rs index af417dfa96..1c56dd07aa 100644 --- a/src/meta-srv/src/table_creator.rs +++ b/src/meta-srv/src/table_creator.rs @@ -65,7 +65,7 @@ impl TableCreator for MetaSrvTableCreator { ) .await .map_err(BoxedError::new) - .context(meta_error::MetaSrvSnafu) + .context(meta_error::ExternalSnafu) } } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 5d2f928dc3..e45ed3cbd7 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -18,6 +18,7 @@ use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::Sequence; +use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::MetaPeerClientBuilder; @@ -25,7 +26,6 @@ use crate::handler::{HeartbeatMailbox, Pushers}; use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; use crate::procedure::region_failover::RegionFailoverManager; -use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; @@ -57,7 +57,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { ); let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); - let state_store = Arc::new(MetaStateStore::new(kv_store.clone())); + let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone()))); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); let in_memory = Arc::new(MemStore::new());