mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 17:00:37 +00:00
refactor: remove move_value (#2661)
* chore: bump orc-rust to 0.2.42 * refactor: remove move_value
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -3635,7 +3635,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=365922c3581ad954b3b3851af522b0ed0096970c#365922c3581ad954b3b3851af522b0ed0096970c"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1d7dc6f18b310355a4c16fb453d3ca7ed09f048d#1d7dc6f18b310355a4c16fb453d3ca7ed09f048d"
|
||||
dependencies = [
|
||||
"prost 0.12.1",
|
||||
"serde",
|
||||
@@ -5658,8 +5658,9 @@ checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc"
|
||||
|
||||
[[package]]
|
||||
name = "orc-rust"
|
||||
version = "0.2.42"
|
||||
source = "git+https://github.com/WenyXu/orc-rs.git?rev=5f6399f759ba30cb46610d06027b507949a117ca#5f6399f759ba30cb46610d06027b507949a117ca"
|
||||
version = "0.2.43"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "900310981898f6e3877286f1272b75f5c4a604628594a0a7026311b93a2aa5e6"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"bytes",
|
||||
|
||||
@@ -79,7 +79,7 @@ derive_builder = "0.12"
|
||||
etcd-client = "0.11"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "365922c3581ad954b3b3851af522b0ed0096970c" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1d7dc6f18b310355a4c16fb453d3ca7ed09f048d" }
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
|
||||
@@ -25,8 +25,7 @@ use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_telemetry::{debug, timer};
|
||||
@@ -152,20 +151,6 @@ impl KvBackend for CachedMetaKvBackend {
|
||||
}
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
let from_key = &req.from_key.clone();
|
||||
let to_key = &req.to_key.clone();
|
||||
|
||||
let ret = self.kv_backend.move_value(req).await;
|
||||
|
||||
if ret.is_ok() {
|
||||
self.invalidate_key(from_key).await;
|
||||
self.invalidate_key(to_key).await;
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
|
||||
let _timer = timer!(METRIC_CATALOG_KV_GET);
|
||||
|
||||
@@ -319,14 +304,6 @@ impl KvBackend for MetaKvBackend {
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
self.client
|
||||
.move_value(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ derive_builder.workspace = true
|
||||
futures.workspace = true
|
||||
lazy_static.workspace = true
|
||||
object-store = { workspace = true }
|
||||
orc-rust = { git = "https://github.com/WenyXu/orc-rs.git", rev = "5f6399f759ba30cb46610d06027b507949a117ca" }
|
||||
orc-rust = "0.2"
|
||||
paste = "1.0"
|
||||
regex = "1.7"
|
||||
serde.workspace = true
|
||||
|
||||
@@ -27,8 +27,7 @@ use crate::error::Error;
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
@@ -66,9 +65,6 @@ where
|
||||
req: BatchDeleteRequest,
|
||||
) -> Result<BatchDeleteResponse, Self::Error>;
|
||||
|
||||
/// MoveValue atomically renames the key to the given updated key.
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error>;
|
||||
|
||||
// The following methods are implemented based on the above methods,
|
||||
// and a higher-level interface is provided for to simplify usage.
|
||||
|
||||
|
||||
@@ -30,8 +30,7 @@ use crate::metrics::METRIC_META_TXN_REQUEST;
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
@@ -263,27 +262,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
|
||||
|
||||
Ok(BatchDeleteResponse { prev_kvs })
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
|
||||
let MoveValueRequest { from_key, to_key } = req;
|
||||
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
|
||||
let kv = if let Some(v) = kvs.remove(&from_key) {
|
||||
kvs.insert(to_key, v.clone());
|
||||
Some(KeyValue {
|
||||
key: from_key,
|
||||
value: v,
|
||||
})
|
||||
} else {
|
||||
kvs.get(&to_key).map(|v| KeyValue {
|
||||
key: to_key,
|
||||
value: v.clone(),
|
||||
})
|
||||
};
|
||||
|
||||
Ok(MoveValueResponse(kv))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -358,7 +336,6 @@ mod tests {
|
||||
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
|
||||
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
|
||||
};
|
||||
use crate::kv_backend::KvBackend;
|
||||
|
||||
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
|
||||
let kv_store = MemoryKvBackend::<Error>::new();
|
||||
@@ -409,30 +386,6 @@ mod tests {
|
||||
test_kv_delete_range(kv_store).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_move_value() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let req = MoveValueRequest {
|
||||
from_key: b"key1".to_vec(),
|
||||
to_key: b"key111".to_vec(),
|
||||
};
|
||||
|
||||
let resp = kv_store.move_value(req).await.unwrap();
|
||||
assert_eq!(b"key1", resp.0.as_ref().unwrap().key());
|
||||
assert_eq!(b"val1", resp.0.as_ref().unwrap().value());
|
||||
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
let req = MoveValueRequest {
|
||||
from_key: b"notexistkey".to_vec(),
|
||||
to_key: b"key222".to_vec(),
|
||||
};
|
||||
|
||||
let resp = kv_store.move_value(req).await.unwrap();
|
||||
assert!(resp.0.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch_delete() {
|
||||
let kv_store = mock_mem_store_with_data().await;
|
||||
|
||||
@@ -21,8 +21,7 @@ use api::v1::meta::{
|
||||
BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse,
|
||||
CompareAndPutRequest as PbCompareAndPutRequest,
|
||||
CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
|
||||
DeleteRangeResponse as PbDeleteRangeResponse, MoveValueRequest as PbMoveValueRequest,
|
||||
MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest,
|
||||
DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest,
|
||||
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
|
||||
ResponseHeader as PbResponseHeader,
|
||||
};
|
||||
@@ -794,71 +793,6 @@ impl DeleteRangeResponse {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct MoveValueRequest {
|
||||
/// If from_key dose not exist, return the value of to_key (if it exists).
|
||||
/// If from_key exists, move the value of from_key to to_key (i.e. rename),
|
||||
/// and return the value.
|
||||
pub from_key: Vec<u8>,
|
||||
pub to_key: Vec<u8>,
|
||||
}
|
||||
|
||||
impl From<MoveValueRequest> for PbMoveValueRequest {
|
||||
fn from(req: MoveValueRequest) -> Self {
|
||||
Self {
|
||||
header: None,
|
||||
from_key: req.from_key,
|
||||
to_key: req.to_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PbMoveValueRequest> for MoveValueRequest {
|
||||
fn from(value: PbMoveValueRequest) -> Self {
|
||||
Self {
|
||||
from_key: value.from_key,
|
||||
to_key: value.to_key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MoveValueRequest {
|
||||
#[inline]
|
||||
pub fn new(from_key: impl Into<Vec<u8>>, to_key: impl Into<Vec<u8>>) -> Self {
|
||||
Self {
|
||||
from_key: from_key.into(),
|
||||
to_key: to_key.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MoveValueResponse(pub Option<KeyValue>);
|
||||
|
||||
impl TryFrom<PbMoveValueResponse> for MoveValueResponse {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(pb: PbMoveValueResponse) -> Result<Self> {
|
||||
util::check_response_header(pb.header.as_ref())?;
|
||||
|
||||
Ok(Self(pb.kv.map(KeyValue::new)))
|
||||
}
|
||||
}
|
||||
|
||||
impl MoveValueResponse {
|
||||
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbMoveValueResponse {
|
||||
PbMoveValueResponse {
|
||||
header: Some(header),
|
||||
kv: self.0.map(Into::into),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn take_kv(&mut self) -> Option<KeyValue> {
|
||||
self.0.take()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::meta::{
|
||||
@@ -866,10 +800,8 @@ mod tests {
|
||||
CompareAndPutRequest as PbCompareAndPutRequest,
|
||||
CompareAndPutResponse as PbCompareAndPutResponse,
|
||||
DeleteRangeRequest as PbDeleteRangeRequest, DeleteRangeResponse as PbDeleteRangeResponse,
|
||||
KeyValue as PbKeyValue, MoveValueRequest as PbMoveValueRequest,
|
||||
MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest,
|
||||
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest,
|
||||
RangeResponse as PbRangeResponse,
|
||||
KeyValue as PbKeyValue, PutRequest as PbPutRequest, PutResponse as PbPutResponse,
|
||||
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -1172,34 +1104,4 @@ mod tests {
|
||||
assert_eq!(b"v2".to_vec(), kv1.value().to_vec());
|
||||
assert_eq!(b"v2".to_vec(), kv1.take_value());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_move_value_request_trans() {
|
||||
let (from_key, to_key) = (b"test_key1".to_vec(), b"test_key2".to_vec());
|
||||
|
||||
let req = MoveValueRequest::new(from_key.clone(), to_key.clone());
|
||||
|
||||
let into_req: PbMoveValueRequest = req.into();
|
||||
assert!(into_req.header.is_none());
|
||||
assert_eq!(from_key, into_req.from_key);
|
||||
assert_eq!(to_key, into_req.to_key);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_move_value_response_trans() {
|
||||
let pb_res = PbMoveValueResponse {
|
||||
header: None,
|
||||
kv: Some(PbKeyValue {
|
||||
key: b"k1".to_vec(),
|
||||
value: b"v1".to_vec(),
|
||||
}),
|
||||
};
|
||||
|
||||
let mut res: MoveValueResponse = pb_res.try_into().unwrap();
|
||||
let mut kv = res.take_kv().unwrap();
|
||||
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
|
||||
assert_eq!(b"k1".to_vec(), kv.take_key());
|
||||
assert_eq!(b"v1".to_vec(), kv.value().to_vec());
|
||||
assert_eq!(b"v1".to_vec(), kv.take_value());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,8 +166,7 @@ mod tests {
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
|
||||
BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
@@ -247,10 +246,6 @@ mod tests {
|
||||
async fn batch_delete(&self, _: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
async fn move_value(&self, _: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
let kv_store = Arc::new(Noop {});
|
||||
|
||||
@@ -25,8 +25,7 @@ use common_meta::kv_backend::{KvBackend, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_meta::util::get_next_prefix_key;
|
||||
@@ -343,10 +342,6 @@ impl KvBackend for RaftEngineBackend {
|
||||
Ok(BatchDeleteResponse { prev_kvs })
|
||||
}
|
||||
|
||||
async fn move_value(&self, _req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
|
||||
engine_get(&self.engine.read().unwrap(), key)
|
||||
}
|
||||
|
||||
@@ -31,8 +31,7 @@ use common_meta::rpc::router::{RouteRequest, RouteResponse};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use ddl::Client as DdlClient;
|
||||
@@ -357,15 +356,6 @@ impl MetaClient {
|
||||
.context(ConvertMetaResponseSnafu)
|
||||
}
|
||||
|
||||
/// MoveValue atomically renames the key to the given updated key.
|
||||
pub async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
self.store_client()?
|
||||
.move_value(req.into())
|
||||
.await?
|
||||
.try_into()
|
||||
.context(ConvertMetaResponseSnafu)
|
||||
}
|
||||
|
||||
pub async fn lock(&self, req: LockRequest) -> Result<LockResponse> {
|
||||
self.lock_client()?.lock(req.into()).await.map(Into::into)
|
||||
}
|
||||
@@ -898,38 +888,4 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_move_value() {
|
||||
let tc = new_client("test_move_value").await;
|
||||
|
||||
let from_key = tc.key("from_key");
|
||||
let to_key = tc.key("to_key");
|
||||
|
||||
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
|
||||
let res = tc.client.move_value(req).await;
|
||||
assert!(res.unwrap().take_kv().is_none());
|
||||
|
||||
let req = PutRequest::new()
|
||||
.with_key(to_key.as_slice())
|
||||
.with_value(b"value".to_vec());
|
||||
let _ = tc.client.put(req).await;
|
||||
|
||||
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
|
||||
let res = tc.client.move_value(req).await;
|
||||
let mut kv = res.unwrap().take_kv().unwrap();
|
||||
assert_eq!(to_key.clone(), kv.take_key());
|
||||
assert_eq!(b"value".to_vec(), kv.take_value());
|
||||
|
||||
let req = PutRequest::new()
|
||||
.with_key(from_key.as_slice())
|
||||
.with_value(b"value2".to_vec());
|
||||
let _ = tc.client.put(req).await;
|
||||
|
||||
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
|
||||
let res = tc.client.move_value(req).await;
|
||||
let mut kv = res.unwrap().take_kv().unwrap();
|
||||
assert_eq!(from_key, kv.take_key());
|
||||
assert_eq!(b"value2".to_vec(), kv.take_value());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,8 +19,7 @@ use api::v1::meta::store_client::StoreClient;
|
||||
use api::v1::meta::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse, Role,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, Role,
|
||||
};
|
||||
use common_grpc::channel_manager::ChannelManager;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
@@ -99,11 +98,6 @@ impl Client {
|
||||
let inner = self.inner.read().await;
|
||||
inner.delete_range(req).await
|
||||
}
|
||||
|
||||
pub async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.move_value(req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -201,14 +195,6 @@ impl Inner {
|
||||
Ok(res.into_inner())
|
||||
}
|
||||
|
||||
async fn move_value(&self, mut req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
let mut client = self.random_client()?;
|
||||
req.set_header(self.id, self.role);
|
||||
let res = client.move_value(req).await.map_err(error::Error::from)?;
|
||||
|
||||
Ok(res.into_inner())
|
||||
}
|
||||
|
||||
fn random_client(&self) -> Result<StoreClient<Channel>> {
|
||||
let len = self.peers.len();
|
||||
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(
|
||||
|
||||
@@ -24,14 +24,13 @@ use api::v1::meta::{
|
||||
BatchGetResponse as PbBatchGetResponse, BatchPutRequest as PbBatchPutRequest,
|
||||
BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest,
|
||||
CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
|
||||
DeleteRangeResponse as PbDeleteRangeResponse, MoveValueRequest as PbMoveValueRequest,
|
||||
MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest,
|
||||
DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest,
|
||||
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
|
||||
ResponseHeader,
|
||||
};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
|
||||
MoveValueRequest, PutRequest, RangeRequest,
|
||||
PutRequest, RangeRequest,
|
||||
};
|
||||
use common_telemetry::timer;
|
||||
use snafu::OptionExt;
|
||||
@@ -234,35 +233,6 @@ impl store_server::Store for MetaSrv {
|
||||
let res = res.to_proto_resp(ResponseHeader::success(cluster_id));
|
||||
Ok(Response::new(res))
|
||||
}
|
||||
|
||||
async fn move_value(
|
||||
&self,
|
||||
req: Request<PbMoveValueRequest>,
|
||||
) -> GrpcResult<PbMoveValueResponse> {
|
||||
let req = req.into_inner();
|
||||
|
||||
let cluster_id = req
|
||||
.header
|
||||
.as_ref()
|
||||
.context(MissingRequestHeaderSnafu)?
|
||||
.cluster_id;
|
||||
|
||||
let _timer = timer!(
|
||||
METRIC_META_KV_REQUEST,
|
||||
&[
|
||||
("target", self.kv_store().name().to_string()),
|
||||
("op", "move_value".to_string()),
|
||||
("cluster_id", cluster_id.to_string()),
|
||||
]
|
||||
);
|
||||
|
||||
let req: MoveValueRequest = req.into();
|
||||
|
||||
let res = self.kv_store().move_value(req).await?;
|
||||
|
||||
let res = res.to_proto_resp(ResponseHeader::success(cluster_id));
|
||||
Ok(Response::new(res))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -361,15 +331,4 @@ mod tests {
|
||||
|
||||
let _ = res.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_move_value() {
|
||||
let meta_srv = new_meta_srv().await;
|
||||
|
||||
let mut req = MoveValueRequest::default();
|
||||
req.set_header((1, 1), Role::Datanode);
|
||||
let res = meta_srv.move_value(req.into_request()).await;
|
||||
|
||||
let _ = res.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,7 @@ use common_meta::kv_backend::{KvBackend, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
|
||||
@@ -282,25 +281,6 @@ impl KvBackend for LeaderCachedKvStore {
|
||||
let _ = self.cache.batch_delete(req).await?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
if !self.is_leader() {
|
||||
return self.store.move_value(req).await;
|
||||
}
|
||||
|
||||
let _ = self.create_new_version();
|
||||
|
||||
let res = self.store.move_value(req.clone()).await?;
|
||||
let MoveValueRequest {
|
||||
from_key, to_key, ..
|
||||
} = req;
|
||||
// Delete all keys in the cache.
|
||||
//
|
||||
// Cache can not deal with the move operation, because it does
|
||||
// not contain full data, so we need to delete both keys.
|
||||
self.invalid_keys(vec![from_key, to_key]).await?;
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -21,11 +21,10 @@ use common_meta::metrics::METRIC_META_TXN_REQUEST;
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_telemetry::{timer, warn};
|
||||
use common_telemetry::timer;
|
||||
use etcd_client::{
|
||||
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
|
||||
TxnResponse,
|
||||
@@ -289,79 +288,6 @@ impl KvBackend for EtcdStore {
|
||||
|
||||
Ok(BatchDeleteResponse { prev_kvs })
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
|
||||
let MoveValue {
|
||||
from_key,
|
||||
to_key,
|
||||
delete_options,
|
||||
} = req.try_into()?;
|
||||
|
||||
let mut client = self.client.kv_client();
|
||||
|
||||
// TODO(jiachun): Maybe it's better to let the users control it in the request
|
||||
const MAX_RETRIES: usize = 8;
|
||||
for _ in 0..MAX_RETRIES {
|
||||
let from_key = from_key.as_slice();
|
||||
let to_key = to_key.as_slice();
|
||||
|
||||
let res = client
|
||||
.get(from_key, None)
|
||||
.await
|
||||
.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
let txn = match res.kvs().first() {
|
||||
None => {
|
||||
// get `to_key` if `from_key` absent
|
||||
// revision 0 means key was not exist
|
||||
let compare = Compare::create_revision(from_key, CompareOp::Equal, 0);
|
||||
let get = TxnOp::get(to_key, None);
|
||||
Txn::new().when(vec![compare]).and_then(vec![get])
|
||||
}
|
||||
Some(kv) => {
|
||||
// compare `from_key` and move to `to_key`
|
||||
let value = kv.value();
|
||||
let compare = Compare::value(from_key, CompareOp::Equal, value);
|
||||
let delete = TxnOp::delete(from_key, delete_options.clone());
|
||||
let put = TxnOp::put(to_key, value, None);
|
||||
Txn::new().when(vec![compare]).and_then(vec![delete, put])
|
||||
}
|
||||
};
|
||||
|
||||
let txn_res = client.txn(txn).await.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
if !txn_res.succeeded() {
|
||||
warn!(
|
||||
"Failed to atomically move {:?} to {:?}, try again...",
|
||||
String::from_utf8_lossy(from_key),
|
||||
String::from_utf8_lossy(to_key)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// [`get_res'] or [`delete_res`, `put_res`], `put_res` will be ignored.
|
||||
for op_res in txn_res.op_responses() {
|
||||
match op_res {
|
||||
TxnOpResponse::Get(res) => {
|
||||
return Ok(MoveValueResponse(
|
||||
res.kvs().first().map(KvPair::from_etcd_kv),
|
||||
));
|
||||
}
|
||||
TxnOpResponse::Delete(res) => {
|
||||
return Ok(MoveValueResponse(
|
||||
res.prev_kvs().first().map(KvPair::from_etcd_kv),
|
||||
));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error::MoveValueSnafu {
|
||||
key: String::from_utf8_lossy(&from_key),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -570,26 +496,6 @@ impl TryFrom<DeleteRangeRequest> for Delete {
|
||||
}
|
||||
}
|
||||
|
||||
struct MoveValue {
|
||||
from_key: Vec<u8>,
|
||||
to_key: Vec<u8>,
|
||||
delete_options: Option<DeleteOptions>,
|
||||
}
|
||||
|
||||
impl TryFrom<MoveValueRequest> for MoveValue {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(req: MoveValueRequest) -> Result<Self> {
|
||||
let MoveValueRequest { from_key, to_key } = req;
|
||||
|
||||
Ok(MoveValue {
|
||||
from_key,
|
||||
to_key,
|
||||
delete_options: Some(DeleteOptions::default().with_prev_key()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -701,18 +607,4 @@ mod tests {
|
||||
assert_eq!(b"test_key".to_vec(), delete.key);
|
||||
let _ = delete.options.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_move_value() {
|
||||
let req = MoveValueRequest {
|
||||
from_key: b"test_from_key".to_vec(),
|
||||
to_key: b"test_to_key".to_vec(),
|
||||
};
|
||||
|
||||
let move_value: MoveValue = req.try_into().unwrap();
|
||||
|
||||
assert_eq!(b"test_from_key".to_vec(), move_value.from_key);
|
||||
assert_eq!(b"test_to_key".to_vec(), move_value.to_key);
|
||||
let _ = move_value.delete_options.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,7 @@ use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -133,12 +132,4 @@ impl KvBackend for KvBackendAdapter {
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
|
||||
self.0
|
||||
.move_value(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user