refactor: refactor raft engine backend and state store (#2336)

* refactor: remove redundant code

* refactor: refactor RaftEngineBackend Error to common_meta::error::Error

* refactor: refactor state store

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-09-06 15:35:13 +08:00
committed by Ruihang Xia
parent 4b2b59c31b
commit e4de63625f
13 changed files with 141 additions and 129 deletions

View File

@@ -19,7 +19,7 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::error::Error::{CacheNotGet, GetKvCache};
use common_meta::error::{CacheNotGetSnafu, Error, MetaSrvSnafu, Result};
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, Result};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -251,7 +251,7 @@ impl KvBackend for MetaKvBackend {
.range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
@@ -260,7 +260,7 @@ impl KvBackend for MetaKvBackend {
.range(RangeRequest::new().with_key(key))
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)?;
.context(ExternalSnafu)?;
Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
key: kv.take_key(),
value: kv.take_value(),
@@ -272,7 +272,7 @@ impl KvBackend for MetaKvBackend {
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
@@ -280,7 +280,7 @@ impl KvBackend for MetaKvBackend {
.put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
@@ -288,7 +288,7 @@ impl KvBackend for MetaKvBackend {
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
@@ -296,7 +296,7 @@ impl KvBackend for MetaKvBackend {
.batch_delete(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
@@ -304,7 +304,7 @@ impl KvBackend for MetaKvBackend {
.batch_get(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn compare_and_put(
@@ -315,7 +315,7 @@ impl KvBackend for MetaKvBackend {
.compare_and_put(request)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
@@ -323,7 +323,7 @@ impl KvBackend for MetaKvBackend {
.move_value(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
fn as_any(&self) -> &dyn Any {

View File

@@ -49,7 +49,7 @@ impl Datanode for RegionRequester {
source: BoxedError::new(err),
}
} else {
meta_error::Error::OperateRegion {
meta_error::Error::External {
source: BoxedError::new(err),
location: location!(),
}

View File

@@ -206,12 +206,6 @@ pub enum Error {
#[snafu(display("Get null from cache, key: {}", key))]
CacheNotGet { key: String, location: Location },
#[snafu(display("{source}"))]
MetaSrv {
source: BoxedError,
location: Location,
},
#[snafu(display("Etcd txn error: {err_msg}"))]
EtcdTxnOpResponse { err_msg: String, location: Location },
@@ -234,24 +228,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("External error: {}", err_msg))]
External { location: Location, err_msg: String },
#[snafu(display("{}", source))]
External {
location: Location,
source: BoxedError,
},
#[snafu(display("Invalid heartbeat response, location: {}", location))]
InvalidHeartbeatResponse { location: Location },
#[snafu(display("{}", source))]
OperateRegion {
location: Location,
source: BoxedError,
},
#[snafu(display("{}", source))]
ExecuteDdl {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to operate on datanode: {}, source: {}", peer, source))]
OperateDatanode {
location: Location,
@@ -278,7 +263,6 @@ impl ErrorExt for Error {
| InvalidTableMetadata { .. }
| MoveRegion { .. }
| Unexpected { .. }
| External { .. }
| TableInfoNotFound { .. }
| NextSequence { .. }
| SequenceOutOfRange { .. }
@@ -309,12 +293,10 @@ impl ErrorExt for Error {
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
OperateRegion { source, .. } => source.status_code(),
ExecuteDdl { source, .. } => source.status_code(),
MetaSrv { source, .. } => source.status_code(),
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(btree_extract_if)]
#![feature(async_closure)]
pub mod cache_invalidator;
pub mod datanode_manager;
@@ -32,6 +33,7 @@ pub mod peer;
pub mod range_stream;
pub mod rpc;
pub mod sequence;
pub mod state_store;
pub mod table_name;
pub mod util;

View File

@@ -12,19 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use async_stream::try_stream;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
use common_meta::util;
use common_procedure::error::{
CorruptedDataSnafu, DeleteStatesSnafu, ListStateSnafu, PutStateSnafu,
};
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::store::state_store::{KeyValueStream, StateStore};
use common_procedure::Result;
use common_procedure::Result as ProcedureResult;
use futures::StreamExt;
use snafu::ResultExt;
use crate::service::store::kv::KvStoreRef;
use crate::error::Result;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::PaginationStream;
use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
use crate::rpc::KeyValue;
const DELIMITER: &str = "/";
const PROCEDURE_PREFIX: &str = "/__procedure__/";
@@ -36,25 +40,35 @@ fn strip_prefix(key: &str) -> String {
key.trim_start_matches(PROCEDURE_PREFIX).to_string()
}
pub(crate) struct MetaStateStore {
kv_store: KvStoreRef,
max_size_per_range: i64,
pub struct KvStateStore {
kv_backend: KvBackendRef,
// limit is set to 0, it is treated as no limit.
max_size_per_range: usize,
}
impl MetaStateStore {
pub(crate) fn new(kv_store: KvStoreRef) -> Self {
impl KvStateStore {
// `max_size_per_range` is set to 0, it is treated as no limit.
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_store,
max_size_per_range: -1,
kv_backend,
max_size_per_range: 0,
}
}
}
fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
let key = String::from_utf8_lossy(&kv.key);
let key = strip_prefix(&key);
let value = kv.value;
Ok((key, value))
}
#[async_trait]
impl StateStore for MetaStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
impl StateStore for KvStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
let _ = self
.kv_store
.kv_backend
.put(PutRequest {
key: with_prefix(key).into_bytes(),
value,
@@ -66,52 +80,32 @@ impl StateStore for MetaStateStore {
Ok(())
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
// extend their lifetimes to be used in the stream
let path = path.to_string();
let kv_store = self.kv_store.clone();
let limit = self.max_size_per_range;
let stream = try_stream! {
let mut key = with_prefix(path.trim_start_matches('/')).into_bytes();
let range_end = util::get_prefix_end_key(&key);
loop {
let req = RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit,
..Default::default()
};
let resp = kv_store.range(req).await.map_err(BoxedError::new).with_context(|_|
ListStateSnafu { path: path.clone() }
)?;
let key = with_prefix(path.trim_start_matches(DELIMITER)).into_bytes();
let req = RangeRequest::new().with_prefix(key);
let mut no_more_data = true;
if resp.more {
if let Some(last) = resp.kvs.last() {
key = util::get_prefix_end_key(&last.key);
no_more_data = false;
}
}
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
self.max_size_per_range,
Arc::new(decode_kv),
);
for kv in resp.kvs {
let key = String::from_utf8(kv.key).context(CorruptedDataSnafu)?;
let key = strip_prefix(&key);
let value = kv.value;
yield (key, value)
}
let stream = stream.map(move |r| {
let path = path.clone();
r.map_err(BoxedError::new)
.with_context(|_| ListStateSnafu { path })
});
if no_more_data {
break;
}
}
};
Ok(Box::pin(stream))
}
async fn batch_delete(&self, keys: &[String]) -> Result<()> {
async fn batch_delete(&self, keys: &[String]) -> ProcedureResult<()> {
let _ = self
.kv_store
.kv_backend
.batch_delete(BatchDeleteRequest {
keys: keys
.iter()
@@ -127,7 +121,7 @@ impl StateStore for MetaStateStore {
Ok(())
}
async fn delete(&self, key: &str) -> Result<()> {
async fn delete(&self, key: &str) -> ProcedureResult<()> {
self.batch_delete(&[key.to_string()]).await
}
}
@@ -140,12 +134,12 @@ mod tests {
use futures::TryStreamExt;
use super::*;
use crate::service::store::memory::MemStore;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_meta_state_store() {
let store = &MetaStateStore {
kv_store: Arc::new(MemStore::new()),
let store = &KvStateStore {
kv_backend: Arc::new(MemoryKvBackend::new()),
max_size_per_range: 1, // for testing "more" in range
};

View File

@@ -17,6 +17,8 @@
use std::any::Any;
use std::sync::RwLock;
use common_error::ext::BoxedError;
use common_meta::error as meta_error;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
@@ -29,7 +31,6 @@ use common_meta::rpc::KeyValue;
use raft_engine::{Engine, LogBatch};
use snafu::ResultExt;
use crate::error;
use crate::error::RaftEngineSnafu;
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
@@ -41,9 +42,9 @@ pub struct RaftEngineBackend {
#[async_trait::async_trait]
impl TxnService for RaftEngineBackend {
type Error = error::Error;
type Error = meta_error::Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
async fn txn(&self, txn: Txn) -> meta_error::Result<TxnResponse> {
let TxnRequest {
compare,
success,
@@ -65,7 +66,9 @@ impl TxnService for RaftEngineBackend {
TxnOp::Put(key, value) => {
batch
.put(SYSTEM_NAMESPACE, key.clone(), value)
.context(RaftEngineSnafu)?;
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }))
}
@@ -98,9 +101,13 @@ impl TxnService for RaftEngineBackend {
let responses = if succeeded { success } else { failure }
.into_iter()
.map(do_txn)
.collect::<error::Result<_>>()?;
.collect::<meta_error::Result<_>>()?;
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(TxnResponse {
succeeded,
@@ -137,7 +144,9 @@ impl KvBackend for RaftEngineBackend {
true
},
)
.context(RaftEngineSnafu)?;
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(RangeResponse {
kvs: res,
more: false,
@@ -179,10 +188,16 @@ impl KvBackend for RaftEngineBackend {
}
batch
.put(SYSTEM_NAMESPACE, kv.key, kv.value)
.context(RaftEngineSnafu)?;
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
}
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(BatchPutResponse { prev_kvs })
}
@@ -222,8 +237,14 @@ impl KvBackend for RaftEngineBackend {
if eq {
batch
.put(SYSTEM_NAMESPACE, key, value)
.context(RaftEngineSnafu)?;
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
}
Ok(CompareAndPutResponse {
success: eq,
@@ -284,7 +305,11 @@ impl KvBackend for RaftEngineBackend {
batch.delete(SYSTEM_NAMESPACE, key);
}
let engine = self.engine.read().unwrap();
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(BatchDeleteResponse { prev_kvs })
}
@@ -312,7 +337,7 @@ impl KvBackend for RaftEngineBackend {
}
}
fn engine_get(engine: &Engine, key: &[u8]) -> error::Result<Option<KeyValue>> {
fn engine_get(engine: &Engine, key: &[u8]) -> meta_error::Result<Option<KeyValue>> {
let res = engine.get(SYSTEM_NAMESPACE, key);
Ok(res.map(|value| KeyValue {
key: key.to_vec(),
@@ -320,19 +345,29 @@ fn engine_get(engine: &Engine, key: &[u8]) -> error::Result<Option<KeyValue>> {
}))
}
fn engine_put(engine: &Engine, key: Vec<u8>, value: Vec<u8>) -> error::Result<()> {
fn engine_put(engine: &Engine, key: Vec<u8>, value: Vec<u8>) -> meta_error::Result<()> {
let mut batch = LogBatch::with_capacity(1);
batch
.put(SYSTEM_NAMESPACE, key, value)
.context(RaftEngineSnafu)?;
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(())
}
fn engine_delete(engine: &Engine, key: &[u8]) -> error::Result<()> {
fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
let mut batch = LogBatch::with_capacity(1);
batch.delete(SYSTEM_NAMESPACE, key.to_vec());
engine.write(&mut batch, false).context(RaftEngineSnafu)?;
engine
.write(&mut batch, false)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
Ok(())
}

View File

@@ -187,7 +187,7 @@ impl DdlExecutor for MetaClient {
self.submit_ddl_task(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExecuteDdlSnafu)
.context(meta_error::ExternalSnafu)
}
}

View File

@@ -59,6 +59,6 @@ impl CacheInvalidator for MetasrvCacheInvalidator {
.broadcast(&BroadcastChannel::Frontend, msg)
.await
.map_err(BoxedError::new)
.context(meta_error::MetaSrvSnafu)
.context(meta_error::ExternalSnafu)
}
}

View File

@@ -21,6 +21,7 @@ use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl_manager::{DdlManager, DdlManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
@@ -47,7 +48,6 @@ use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::mailbox::MailboxRef;
@@ -329,7 +329,7 @@ fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> P
retry_delay: options.procedure.retry_delay,
..Default::default()
};
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone())));
Arc::new(LocalManager::new(manager_config, state_store))
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub mod region_failover;
pub(crate) mod state_store;
#[cfg(test)]
mod tests;
mod utils;

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::error::MetaSrvSnafu;
use common_meta::error::ExternalSnafu;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
@@ -55,7 +55,7 @@ impl TxnService for KvBackendAdapter {
.txn(txn)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
}
@@ -74,7 +74,7 @@ impl KvBackend for KvBackendAdapter {
.range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
@@ -82,7 +82,7 @@ impl KvBackend for KvBackendAdapter {
.put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
@@ -90,7 +90,7 @@ impl KvBackend for KvBackendAdapter {
.batch_put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
@@ -98,7 +98,7 @@ impl KvBackend for KvBackendAdapter {
.batch_get(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn compare_and_put(
@@ -109,7 +109,7 @@ impl KvBackend for KvBackendAdapter {
.compare_and_put(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn delete_range(
@@ -120,7 +120,7 @@ impl KvBackend for KvBackendAdapter {
.delete_range(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn batch_delete(
@@ -131,7 +131,7 @@ impl KvBackend for KvBackendAdapter {
.batch_delete(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
@@ -139,6 +139,6 @@ impl KvBackend for KvBackendAdapter {
.move_value(req)
.await
.map_err(BoxedError::new)
.context(MetaSrvSnafu)
.context(ExternalSnafu)
}
}

View File

@@ -65,7 +65,7 @@ impl TableCreator for MetaSrvTableCreator {
)
.await
.map_err(BoxedError::new)
.context(meta_error::MetaSrvSnafu)
.context(meta_error::ExternalSnafu)
}
}

View File

@@ -18,6 +18,7 @@ use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::Sequence;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClientBuilder;
@@ -25,7 +26,6 @@ use crate::handler::{HeartbeatMailbox, Pushers};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::state_store::MetaStateStore;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::service::store::kv::KvBackendAdapter;
use crate::service::store::memory::MemStore;
@@ -57,7 +57,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
);
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let state_store = Arc::new(KvStateStore::new(KvBackendAdapter::wrap(kv_store.clone())));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let in_memory = Arc::new(MemStore::new());