diff --git a/Cargo.lock b/Cargo.lock index 249066b9d4..0be1a8021a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3635,7 +3635,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=365922c3581ad954b3b3851af522b0ed0096970c#365922c3581ad954b3b3851af522b0ed0096970c" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1d7dc6f18b310355a4c16fb453d3ca7ed09f048d#1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" dependencies = [ "prost 0.12.1", "serde", @@ -5658,8 +5658,9 @@ checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc" [[package]] name = "orc-rust" -version = "0.2.42" -source = "git+https://github.com/WenyXu/orc-rs.git?rev=5f6399f759ba30cb46610d06027b507949a117ca#5f6399f759ba30cb46610d06027b507949a117ca" +version = "0.2.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900310981898f6e3877286f1272b75f5c4a604628594a0a7026311b93a2aa5e6" dependencies = [ "arrow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 2dcc975d6f..a018cdf484 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ derive_builder = "0.12" etcd-client = "0.11" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "365922c3581ad954b3b3851af522b0ed0096970c" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index ac63e5a0b2..a01650b7e5 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -25,8 +25,7 @@ use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; use common_telemetry::{debug, timer}; @@ -152,20 +151,6 @@ impl KvBackend for CachedMetaKvBackend { } } - async fn move_value(&self, req: MoveValueRequest) -> Result { - let from_key = &req.from_key.clone(); - let to_key = &req.to_key.clone(); - - let ret = self.kv_backend.move_value(req).await; - - if ret.is_ok() { - self.invalidate_key(from_key).await; - self.invalidate_key(to_key).await; - } - - ret - } - async fn get(&self, key: &[u8]) -> Result> { let _timer = timer!(METRIC_CATALOG_KV_GET); @@ -319,14 +304,6 @@ impl KvBackend for MetaKvBackend { .context(ExternalSnafu) } - async fn move_value(&self, req: MoveValueRequest) -> Result { - self.client - .move_value(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } - fn as_any(&self) -> &dyn Any { self } diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index efe6aec638..e6bff69538 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -25,7 +25,7 @@ derive_builder.workspace = true futures.workspace = true lazy_static.workspace = true object-store = { workspace = true } -orc-rust = { git = "https://github.com/WenyXu/orc-rs.git", rev = "5f6399f759ba30cb46610d06027b507949a117ca" } +orc-rust = "0.2" paste = "1.0" regex = "1.7" serde.workspace = true diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index d924b5b7f7..8a67ebd888 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -27,8 +27,7 @@ use crate::error::Error; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use crate::rpc::KeyValue; @@ -66,9 +65,6 @@ where req: BatchDeleteRequest, ) -> Result; - /// MoveValue atomically renames the key to the given updated key. - async fn move_value(&self, req: MoveValueRequest) -> Result; - // The following methods are implemented based on the above methods, // and a higher-level interface is provided for to simplify usage. diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index dd434fe850..484d520895 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -30,8 +30,7 @@ use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use crate::rpc::KeyValue; @@ -263,27 +262,6 @@ impl KvBackend for MemoryKvBackend { Ok(BatchDeleteResponse { prev_kvs }) } - - async fn move_value(&self, req: MoveValueRequest) -> Result { - let MoveValueRequest { from_key, to_key } = req; - - let mut kvs = self.kvs.write().unwrap(); - - let kv = if let Some(v) = kvs.remove(&from_key) { - kvs.insert(to_key, v.clone()); - Some(KeyValue { - key: from_key, - value: v, - }) - } else { - kvs.get(&to_key).map(|v| KeyValue { - key: to_key, - value: v.clone(), - }) - }; - - Ok(MoveValueResponse(kv)) - } } #[async_trait] @@ -358,7 +336,6 @@ mod tests { prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put, test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2, }; - use crate::kv_backend::KvBackend; async fn mock_mem_store_with_data() -> MemoryKvBackend { let kv_store = MemoryKvBackend::::new(); @@ -409,30 +386,6 @@ mod tests { test_kv_delete_range(kv_store).await; } - #[tokio::test] - async fn test_move_value() { - let kv_store = mock_mem_store_with_data().await; - - let req = MoveValueRequest { - from_key: b"key1".to_vec(), - to_key: b"key111".to_vec(), - }; - - let resp = kv_store.move_value(req).await.unwrap(); - assert_eq!(b"key1", resp.0.as_ref().unwrap().key()); - assert_eq!(b"val1", resp.0.as_ref().unwrap().value()); - - let kv_store = mock_mem_store_with_data().await; - - let req = MoveValueRequest { - from_key: b"notexistkey".to_vec(), - to_key: b"key222".to_vec(), - }; - - let resp = kv_store.move_value(req).await.unwrap(); - assert!(resp.0.is_none()); - } - #[tokio::test] async fn test_batch_delete() { let kv_store = mock_mem_store_with_data().await; diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index b307894337..73b7d0de7c 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -21,8 +21,7 @@ use api::v1::meta::{ BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, - DeleteRangeResponse as PbDeleteRangeResponse, MoveValueRequest as PbMoveValueRequest, - MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, + DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader as PbResponseHeader, }; @@ -794,71 +793,6 @@ impl DeleteRangeResponse { } } -#[derive(Debug, Clone, Default)] -pub struct MoveValueRequest { - /// If from_key dose not exist, return the value of to_key (if it exists). - /// If from_key exists, move the value of from_key to to_key (i.e. rename), - /// and return the value. - pub from_key: Vec, - pub to_key: Vec, -} - -impl From for PbMoveValueRequest { - fn from(req: MoveValueRequest) -> Self { - Self { - header: None, - from_key: req.from_key, - to_key: req.to_key, - } - } -} - -impl From for MoveValueRequest { - fn from(value: PbMoveValueRequest) -> Self { - Self { - from_key: value.from_key, - to_key: value.to_key, - } - } -} - -impl MoveValueRequest { - #[inline] - pub fn new(from_key: impl Into>, to_key: impl Into>) -> Self { - Self { - from_key: from_key.into(), - to_key: to_key.into(), - } - } -} - -#[derive(Debug, Clone)] -pub struct MoveValueResponse(pub Option); - -impl TryFrom for MoveValueResponse { - type Error = error::Error; - - fn try_from(pb: PbMoveValueResponse) -> Result { - util::check_response_header(pb.header.as_ref())?; - - Ok(Self(pb.kv.map(KeyValue::new))) - } -} - -impl MoveValueResponse { - pub fn to_proto_resp(self, header: PbResponseHeader) -> PbMoveValueResponse { - PbMoveValueResponse { - header: Some(header), - kv: self.0.map(Into::into), - } - } - - #[inline] - pub fn take_kv(&mut self) -> Option { - self.0.take() - } -} - #[cfg(test)] mod tests { use api::v1::meta::{ @@ -866,10 +800,8 @@ mod tests { CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, DeleteRangeResponse as PbDeleteRangeResponse, - KeyValue as PbKeyValue, MoveValueRequest as PbMoveValueRequest, - MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, - PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, - RangeResponse as PbRangeResponse, + KeyValue as PbKeyValue, PutRequest as PbPutRequest, PutResponse as PbPutResponse, + RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, }; use super::*; @@ -1172,34 +1104,4 @@ mod tests { assert_eq!(b"v2".to_vec(), kv1.value().to_vec()); assert_eq!(b"v2".to_vec(), kv1.take_value()); } - - #[test] - fn test_move_value_request_trans() { - let (from_key, to_key) = (b"test_key1".to_vec(), b"test_key2".to_vec()); - - let req = MoveValueRequest::new(from_key.clone(), to_key.clone()); - - let into_req: PbMoveValueRequest = req.into(); - assert!(into_req.header.is_none()); - assert_eq!(from_key, into_req.from_key); - assert_eq!(to_key, into_req.to_key); - } - - #[test] - fn test_move_value_response_trans() { - let pb_res = PbMoveValueResponse { - header: None, - kv: Some(PbKeyValue { - key: b"k1".to_vec(), - value: b"v1".to_vec(), - }), - }; - - let mut res: MoveValueResponse = pb_res.try_into().unwrap(); - let mut kv = res.take_kv().unwrap(); - assert_eq!(b"k1".to_vec(), kv.key().to_vec()); - assert_eq!(b"k1".to_vec(), kv.take_key()); - assert_eq!(b"v1".to_vec(), kv.value().to_vec()); - assert_eq!(b"v1".to_vec(), kv.take_value()); - } } diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index 184d6cdac9..f5e2b69c94 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -166,8 +166,7 @@ mod tests { use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; #[tokio::test] @@ -247,10 +246,6 @@ mod tests { async fn batch_delete(&self, _: BatchDeleteRequest) -> Result { unreachable!() } - - async fn move_value(&self, _: MoveValueRequest) -> Result { - unreachable!() - } } let kv_store = Arc::new(Noop {}); diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 8b27b99b14..b4320b1852 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -25,8 +25,7 @@ use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; use common_meta::util::get_next_prefix_key; @@ -343,10 +342,6 @@ impl KvBackend for RaftEngineBackend { Ok(BatchDeleteResponse { prev_kvs }) } - async fn move_value(&self, _req: MoveValueRequest) -> Result { - unimplemented!() - } - async fn get(&self, key: &[u8]) -> Result, Self::Error> { engine_get(&self.engine.read().unwrap(), key) } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 73e132353a..d39f6951f0 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -31,8 +31,7 @@ use common_meta::rpc::router::{RouteRequest, RouteResponse}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_telemetry::info; use ddl::Client as DdlClient; @@ -357,15 +356,6 @@ impl MetaClient { .context(ConvertMetaResponseSnafu) } - /// MoveValue atomically renames the key to the given updated key. - pub async fn move_value(&self, req: MoveValueRequest) -> Result { - self.store_client()? - .move_value(req.into()) - .await? - .try_into() - .context(ConvertMetaResponseSnafu) - } - pub async fn lock(&self, req: LockRequest) -> Result { self.lock_client()?.lock(req.into()).await.map(Into::into) } @@ -898,38 +888,4 @@ mod tests { ); } } - - #[tokio::test] - async fn test_move_value() { - let tc = new_client("test_move_value").await; - - let from_key = tc.key("from_key"); - let to_key = tc.key("to_key"); - - let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); - let res = tc.client.move_value(req).await; - assert!(res.unwrap().take_kv().is_none()); - - let req = PutRequest::new() - .with_key(to_key.as_slice()) - .with_value(b"value".to_vec()); - let _ = tc.client.put(req).await; - - let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); - let res = tc.client.move_value(req).await; - let mut kv = res.unwrap().take_kv().unwrap(); - assert_eq!(to_key.clone(), kv.take_key()); - assert_eq!(b"value".to_vec(), kv.take_value()); - - let req = PutRequest::new() - .with_key(from_key.as_slice()) - .with_value(b"value2".to_vec()); - let _ = tc.client.put(req).await; - - let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); - let res = tc.client.move_value(req).await; - let mut kv = res.unwrap().take_kv().unwrap(); - assert_eq!(from_key, kv.take_key()); - assert_eq!(b"value2".to_vec(), kv.take_value()); - } } diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index 15a93886eb..53219ae464 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -19,8 +19,7 @@ use api::v1::meta::store_client::StoreClient; use api::v1::meta::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, Role, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role, }; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; @@ -99,11 +98,6 @@ impl Client { let inner = self.inner.read().await; inner.delete_range(req).await } - - pub async fn move_value(&self, req: MoveValueRequest) -> Result { - let inner = self.inner.read().await; - inner.move_value(req).await - } } #[derive(Debug)] @@ -201,14 +195,6 @@ impl Inner { Ok(res.into_inner()) } - async fn move_value(&self, mut req: MoveValueRequest) -> Result { - let mut client = self.random_client()?; - req.set_header(self.id, self.role); - let res = client.move_value(req).await.map_err(error::Error::from)?; - - Ok(res.into_inner()) - } - fn random_client(&self) -> Result> { let len = self.peers.len(); let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index bafee9190b..7a74e4499b 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -24,14 +24,13 @@ use api::v1::meta::{ BatchGetResponse as PbBatchGetResponse, BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, - DeleteRangeResponse as PbDeleteRangeResponse, MoveValueRequest as PbMoveValueRequest, - MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, + DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, - MoveValueRequest, PutRequest, RangeRequest, + PutRequest, RangeRequest, }; use common_telemetry::timer; use snafu::OptionExt; @@ -234,35 +233,6 @@ impl store_server::Store for MetaSrv { let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); Ok(Response::new(res)) } - - async fn move_value( - &self, - req: Request, - ) -> GrpcResult { - let req = req.into_inner(); - - let cluster_id = req - .header - .as_ref() - .context(MissingRequestHeaderSnafu)? - .cluster_id; - - let _timer = timer!( - METRIC_META_KV_REQUEST, - &[ - ("target", self.kv_store().name().to_string()), - ("op", "move_value".to_string()), - ("cluster_id", cluster_id.to_string()), - ] - ); - - let req: MoveValueRequest = req.into(); - - let res = self.kv_store().move_value(req).await?; - - let res = res.to_proto_resp(ResponseHeader::success(cluster_id)); - Ok(Response::new(res)) - } } #[cfg(test)] @@ -361,15 +331,4 @@ mod tests { let _ = res.unwrap(); } - - #[tokio::test] - async fn test_move_value() { - let meta_srv = new_meta_srv().await; - - let mut req = MoveValueRequest::default(); - req.set_header((1, 1), Role::Datanode); - let res = meta_srv.move_value(req.into_request()).await; - - let _ = res.unwrap(); - } } diff --git a/src/meta-srv/src/service/store/cached_kv.rs b/src/meta-srv/src/service/store/cached_kv.rs index 080bd18f07..907ab6715a 100644 --- a/src/meta-srv/src/service/store/cached_kv.rs +++ b/src/meta-srv/src/service/store/cached_kv.rs @@ -22,8 +22,7 @@ use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; @@ -282,25 +281,6 @@ impl KvBackend for LeaderCachedKvStore { let _ = self.cache.batch_delete(req).await?; Ok(res) } - - async fn move_value(&self, req: MoveValueRequest) -> Result { - if !self.is_leader() { - return self.store.move_value(req).await; - } - - let _ = self.create_new_version(); - - let res = self.store.move_value(req.clone()).await?; - let MoveValueRequest { - from_key, to_key, .. - } = req; - // Delete all keys in the cache. - // - // Cache can not deal with the move operation, because it does - // not contain full data, so we need to delete both keys. - self.invalid_keys(vec![from_key, to_key]).await?; - Ok(res) - } } #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index a9d8f70713..c6c0d6be03 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -21,11 +21,10 @@ use common_meta::metrics::METRIC_META_TXN_REQUEST; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_meta::rpc::KeyValue; -use common_telemetry::{timer, warn}; +use common_telemetry::timer; use etcd_client::{ Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse, @@ -289,79 +288,6 @@ impl KvBackend for EtcdStore { Ok(BatchDeleteResponse { prev_kvs }) } - - async fn move_value(&self, req: MoveValueRequest) -> Result { - let MoveValue { - from_key, - to_key, - delete_options, - } = req.try_into()?; - - let mut client = self.client.kv_client(); - - // TODO(jiachun): Maybe it's better to let the users control it in the request - const MAX_RETRIES: usize = 8; - for _ in 0..MAX_RETRIES { - let from_key = from_key.as_slice(); - let to_key = to_key.as_slice(); - - let res = client - .get(from_key, None) - .await - .context(error::EtcdFailedSnafu)?; - - let txn = match res.kvs().first() { - None => { - // get `to_key` if `from_key` absent - // revision 0 means key was not exist - let compare = Compare::create_revision(from_key, CompareOp::Equal, 0); - let get = TxnOp::get(to_key, None); - Txn::new().when(vec![compare]).and_then(vec![get]) - } - Some(kv) => { - // compare `from_key` and move to `to_key` - let value = kv.value(); - let compare = Compare::value(from_key, CompareOp::Equal, value); - let delete = TxnOp::delete(from_key, delete_options.clone()); - let put = TxnOp::put(to_key, value, None); - Txn::new().when(vec![compare]).and_then(vec![delete, put]) - } - }; - - let txn_res = client.txn(txn).await.context(error::EtcdFailedSnafu)?; - - if !txn_res.succeeded() { - warn!( - "Failed to atomically move {:?} to {:?}, try again...", - String::from_utf8_lossy(from_key), - String::from_utf8_lossy(to_key) - ); - continue; - } - - // [`get_res'] or [`delete_res`, `put_res`], `put_res` will be ignored. - for op_res in txn_res.op_responses() { - match op_res { - TxnOpResponse::Get(res) => { - return Ok(MoveValueResponse( - res.kvs().first().map(KvPair::from_etcd_kv), - )); - } - TxnOpResponse::Delete(res) => { - return Ok(MoveValueResponse( - res.prev_kvs().first().map(KvPair::from_etcd_kv), - )); - } - _ => {} - } - } - } - - error::MoveValueSnafu { - key: String::from_utf8_lossy(&from_key), - } - .fail() - } } #[async_trait::async_trait] @@ -570,26 +496,6 @@ impl TryFrom for Delete { } } -struct MoveValue { - from_key: Vec, - to_key: Vec, - delete_options: Option, -} - -impl TryFrom for MoveValue { - type Error = Error; - - fn try_from(req: MoveValueRequest) -> Result { - let MoveValueRequest { from_key, to_key } = req; - - Ok(MoveValue { - from_key, - to_key, - delete_options: Some(DeleteOptions::default().with_prev_key()), - }) - } -} - #[cfg(test)] mod tests { use super::*; @@ -701,18 +607,4 @@ mod tests { assert_eq!(b"test_key".to_vec(), delete.key); let _ = delete.options.unwrap(); } - - #[test] - fn test_parse_move_value() { - let req = MoveValueRequest { - from_key: b"test_from_key".to_vec(), - to_key: b"test_to_key".to_vec(), - }; - - let move_value: MoveValue = req.try_into().unwrap(); - - assert_eq!(b"test_from_key".to_vec(), move_value.from_key); - assert_eq!(b"test_to_key".to_vec(), move_value.to_key); - let _ = move_value.delete_options.unwrap(); - } } diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index a661436039..3e60798605 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -23,8 +23,7 @@ use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use snafu::ResultExt; @@ -133,12 +132,4 @@ impl KvBackend for KvBackendAdapter { .map_err(BoxedError::new) .context(ExternalSnafu) } - - async fn move_value(&self, req: MoveValueRequest) -> Result { - self.0 - .move_value(req) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu) - } }