refactor: move the batch_get to KvStore trait (#1029)

* move batch_get from KvStoreExt to KvStore

* add some unit tests

* add some unit test

* add some unit tests

* expose batch_get grpc method
This commit is contained in:
fys
2023-03-06 17:35:43 +08:00
committed by GitHub
parent 5a397917c0
commit ff6cfe8e70
12 changed files with 587 additions and 69 deletions

View File

@@ -32,9 +32,10 @@ use crate::error::Result;
use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use crate::rpc::router::DeleteRequest;
use crate::rpc::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse, RouteRequest, RouteResponse,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, CreateRequest, DeleteRangeRequest, DeleteRangeResponse,
MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
RouteRequest, RouteResponse,
};
pub type Id = (u64, u64);
@@ -245,6 +246,11 @@ impl MetaClient {
self.store_client()?.put(req.into()).await?.try_into()
}
/// BatchGet atomically get values by the given keys from the key-value store.
pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.store_client()?.batch_get(req.into()).await?.try_into()
}
/// BatchPut atomically puts the given keys into the key-value store.
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.store_client()?.batch_put(req.into()).await?.try_into()
@@ -713,6 +719,26 @@ mod tests {
assert_eq!(2, kvs.len());
}
#[tokio::test]
async fn test_batch_get() {
let tc = new_client("test_batch_get").await;
tc.gen_data().await;
let req = BatchGetRequest::default()
.add_key(tc.key("key-1"))
.add_key(tc.key("key-2"));
let mut res = tc.client.batch_get(req).await.unwrap();
assert_eq!(2, res.take_kvs().len());
let req = BatchGetRequest::default()
.add_key(tc.key("key-1"))
.add_key(tc.key("key-222"));
let mut res = tc.client.batch_get(req).await.unwrap();
assert_eq!(1, res.take_kvs().len());
}
#[tokio::test]
async fn test_batch_put_with_prev_kv() {
let tc = new_client("test_batch_put_with_prev_kv").await;

View File

@@ -17,9 +17,9 @@ use std::sync::Arc;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
@@ -70,6 +70,11 @@ impl Client {
inner.put(req).await
}
pub async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let inner = self.inner.read().await;
inner.batch_get(req).await
}
pub async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let inner = self.inner.read().await;
inner.batch_put(req).await
@@ -141,6 +146,18 @@ impl Inner {
Ok(res.into_inner())
}
async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client
.batch_get(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);

View File

@@ -28,9 +28,9 @@ pub use router::{
};
use serde::{Deserialize, Serialize};
pub use store::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
#[derive(Debug, Clone)]

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::{
BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse,
CompareAndPutRequest as PbCompareAndPutRequest,
CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
@@ -239,6 +240,68 @@ impl PutResponse {
}
}
pub struct BatchGetRequest {
pub keys: Vec<Vec<u8>>,
}
impl From<BatchGetRequest> for PbBatchGetRequest {
fn from(req: BatchGetRequest) -> Self {
Self {
header: None,
keys: req.keys,
}
}
}
impl Default for BatchGetRequest {
fn default() -> Self {
Self::new()
}
}
impl BatchGetRequest {
#[inline]
pub fn new() -> Self {
Self { keys: vec![] }
}
#[inline]
pub fn add_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.keys.push(key.into());
self
}
}
#[derive(Debug, Clone)]
pub struct BatchGetResponse(PbBatchGetResponse);
impl TryFrom<PbBatchGetResponse> for BatchGetResponse {
type Error = error::Error;
fn try_from(pb: PbBatchGetResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self(pb))
}
}
impl BatchGetResponse {
#[inline]
pub fn new(res: PbBatchGetResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
self.0.kvs.drain(..).map(KeyValue::new).collect()
}
}
#[derive(Debug, Clone, Default)]
pub struct BatchPutRequest {
pub kvs: Vec<PbKeyValue>,
@@ -699,6 +762,40 @@ mod tests {
assert_eq!(b"v1".to_vec(), kv.take_value());
}
#[test]
fn test_batch_get_request_trans() {
let req = BatchGetRequest::default()
.add_key(b"test_key1".to_vec())
.add_key(b"test_key2".to_vec())
.add_key(b"test_key3".to_vec());
let into_req: PbBatchGetRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(b"test_key1".as_slice(), into_req.keys.get(0).unwrap());
assert_eq!(b"test_key2".as_slice(), into_req.keys.get(1).unwrap());
assert_eq!(b"test_key3".as_slice(), into_req.keys.get(2).unwrap());
}
#[test]
fn test_batch_get_response_trans() {
let pb_res = PbBatchGetResponse {
header: None,
kvs: vec![PbKeyValue {
key: b"test_key1".to_vec(),
value: b"test_value1".to_vec(),
}],
};
let mut res = BatchGetResponse::new(pb_res);
assert!(res.take_header().is_none());
let kvs = res.take_kvs();
assert_eq!(b"test_key1".as_slice(), kvs[0].key());
assert_eq!(b"test_value1".as_slice(), kvs[0].value());
}
#[test]
fn test_batch_put_request_trans() {
let req = BatchPutRequest::new()

View File

@@ -27,7 +27,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{match_for_io_error, Result};
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::ElectionRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::ResettableKvStoreRef;
use crate::{error, util};
@@ -130,7 +129,12 @@ impl MetaPeerClient {
// Get kv information from the leader's in_mem kv store
pub async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
if self.is_leader() {
return self.in_memory.batch_get(keys).await;
let request = BatchGetRequest {
keys,
..Default::default()
};
return self.in_memory.batch_get(request).await.map(|resp| resp.kvs);
}
let max_retry_count = self.max_retry_count;

View File

@@ -150,6 +150,8 @@ impl Inner {
mod tests {
use std::sync::Arc;
use api::v1::meta::{BatchGetRequest, BatchGetResponse};
use super::*;
use crate::service::store::kv::KvStore;
use crate::service::store::memory::MemStore;
@@ -192,6 +194,10 @@ mod tests {
unreachable!()
}
async fn batch_get(&self, _: BatchGetRequest) -> Result<BatchGetResponse> {
unreachable!()
}
async fn compare_and_put(
&self,
_: CompareAndPutRequest,

View File

@@ -20,7 +20,6 @@ use common_telemetry::warn;
use tonic::{Request, Response};
use crate::metasrv::MetaSrv;
use crate::service::store::ext::KvStoreExt;
use crate::service::GrpcResult;
#[async_trait::async_trait]
@@ -37,8 +36,8 @@ impl cluster_server::Cluster for MetaSrv {
return Ok(Response::new(resp));
}
let req = req.into_inner();
let kvs = self.in_memory().batch_get(req.keys).await?;
let kvs = self.in_memory().batch_get(req.into_inner()).await?.kvs;
let success = ResponseHeader::success(0);
let get_resp = BatchGetResponse {

View File

@@ -22,7 +22,7 @@ use api::v1::meta::{
CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse,
MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use tonic::{Request, Response, Status};
use tonic::{Request, Response};
use crate::metasrv::MetaSrv;
use crate::service::GrpcResult;
@@ -43,12 +43,11 @@ impl store_server::Store for MetaSrv {
Ok(Response::new(res))
}
async fn batch_get(
&self,
_request: Request<BatchGetRequest>,
) -> Result<Response<BatchGetResponse>, Status> {
// TODO(fys): please fix this
unimplemented!()
async fn batch_get(&self, req: Request<BatchGetRequest>) -> GrpcResult<BatchGetResponse> {
let req = req.into_inner();
let res = self.kv_store().batch_get(req).await?;
Ok(Response::new(res))
}
async fn batch_put(&self, req: Request<BatchPutRequest>) -> GrpcResult<BatchPutResponse> {
@@ -121,6 +120,18 @@ mod tests {
assert!(res.is_ok());
}
#[tokio::test]
async fn test_batch_get() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = BatchGetRequest::default();
let res = meta_srv.batch_get(req.into_request()).await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_batch_put() {
let kv_store = Arc::new(MemStore::new());

View File

@@ -15,9 +15,9 @@
use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse,
PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
};
use common_error::prelude::*;
use common_telemetry::warn;
@@ -98,6 +98,40 @@ impl KvStore for EtcdStore {
Ok(PutResponse { header, prev_kv })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let BatchGet {
cluster_id,
keys,
options,
} = req.try_into()?;
let get_ops: Vec<_> = keys
.into_iter()
.map(|k| TxnOp::get(k, options.clone()))
.collect();
let txn = Txn::new().and_then(get_ops);
let txn_res = self
.client
.kv_client()
.txn(txn)
.await
.context(error::EtcdFailedSnafu)?;
let mut kvs = vec![];
for op_res in txn_res.op_responses() {
let get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
kvs.extend(get_res.kvs().iter().map(KvPair::to_kv));
}
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchGetResponse { header, kvs })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let BatchPut {
cluster_id,
@@ -360,6 +394,28 @@ impl TryFrom<PutRequest> for Put {
}
}
struct BatchGet {
cluster_id: u64,
keys: Vec<Vec<u8>>,
options: Option<GetOptions>,
}
impl TryFrom<BatchGetRequest> for BatchGet {
type Error = error::Error;
fn try_from(req: BatchGetRequest) -> Result<Self> {
let BatchGetRequest { header, keys } = req;
let options = GetOptions::default().with_keys_only();
Ok(BatchGet {
cluster_id: header.map_or(0, |h| h.cluster_id),
keys,
options: Some(options),
})
}
}
struct BatchPut {
cluster_id: u64,
kvs: Vec<KeyValue>,
@@ -539,6 +595,21 @@ mod tests {
assert!(put.options.is_some());
}
#[test]
fn test_parse_batch_get() {
let req = BatchGetRequest {
keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
..Default::default()
};
let batch_get: BatchGet = req.try_into().unwrap();
let keys = batch_get.keys;
assert_eq!(b"k1".to_vec(), keys.get(0).unwrap().to_vec());
assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().to_vec());
assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().to_vec());
}
#[test]
fn test_parse_batch_put() {
let req = BatchPutRequest {

View File

@@ -21,8 +21,6 @@ use crate::service::store::kv::KvStore;
#[async_trait::async_trait]
pub trait KvStoreExt {
async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>>;
async fn batch_get(&self, key: Vec<Vec<u8>>) -> Result<Vec<KeyValue>>;
}
#[async_trait::async_trait]
@@ -53,18 +51,6 @@ where
// Safety: the length check has been performed before using unwrap()
Ok(Some(kvs.pop().unwrap()))
}
async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
let mut kvs = Vec::with_capacity(keys.len());
for key in keys {
if let Some(kv) = self.get(key).await? {
kvs.push(kv);
}
}
Ok(kvs)
}
}
#[cfg(test)]
@@ -106,31 +92,6 @@ mod tests {
assert!(may_kv.is_none());
}
#[tokio::test]
async fn test_batch_get() {
let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef;
put_stats_to_store(&mut in_mem).await;
let keys = vec![
"test_key1".as_bytes().to_vec(),
"test_key1".as_bytes().to_vec(),
"test_key2".as_bytes().to_vec(),
];
let kvs = in_mem.batch_get(keys).await.unwrap();
assert_eq!(3, kvs.len());
assert_eq!("test_key1".as_bytes(), kvs[0].key);
assert_eq!("test_key1".as_bytes(), kvs[1].key);
assert_eq!("test_key2".as_bytes(), kvs[2].key);
assert_eq!("test_val1".as_bytes(), kvs[0].value);
assert_eq!("test_val1".as_bytes(), kvs[1].value);
assert_eq!("test_val2".as_bytes(), kvs[2].value);
}
async fn put_stats_to_store(store: &mut KvStoreRef) {
store
.put(PutRequest {

View File

@@ -15,9 +15,9 @@
use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::error::Result;
@@ -31,6 +31,8 @@ pub trait KvStore: Send + Sync {
async fn put(&self, req: PutRequest) -> Result<PutResponse>;
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse>;
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse>;
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse>;

View File

@@ -17,12 +17,13 @@ use std::collections::BTreeMap;
use std::ops::Range;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse,
PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest,
CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest,
MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
};
use parking_lot::RwLock;
use super::ext::KvStoreExt;
use crate::error::Result;
use crate::service::store::kv::{KvStore, ResettableKvStore};
@@ -117,6 +118,22 @@ impl KvStore for MemStore {
Ok(PutResponse { header, prev_kv })
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
let keys = req.keys;
let mut kvs = Vec::with_capacity(keys.len());
for key in keys {
if let Some(kv) = self.get(key).await? {
kvs.push(kv);
}
}
Ok(BatchGetResponse {
kvs,
..Default::default()
})
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let BatchPutRequest {
header,
@@ -247,3 +264,310 @@ impl KvStore for MemStore {
Ok(MoveValueResponse { header, kv })
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use api::v1::meta::{
BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, KeyValue,
MoveValueRequest, PutRequest, RangeRequest,
};
use super::MemStore;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStore;
use crate::util;
async fn mock_mem_store_with_data() -> MemStore {
let kv_store = MemStore::new();
let kvs = mock_kvs();
kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.unwrap();
kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.unwrap();
kv_store
}
fn mock_kvs() -> Vec<KeyValue> {
vec![
KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec(),
},
KeyValue {
key: b"key2".to_vec(),
value: b"val2".to_vec(),
},
KeyValue {
key: b"key3".to_vec(),
value: b"val3".to_vec(),
},
]
}
#[tokio::test]
async fn test_put() {
let kv_store = mock_mem_store_with_data().await;
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val12".to_vec(),
..Default::default()
})
.await
.unwrap();
assert!(resp.prev_kv.is_none());
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val13".to_vec(),
prev_kv: true,
..Default::default()
})
.await
.unwrap();
assert_eq!(b"key11".as_slice(), resp.prev_kv.as_ref().unwrap().key);
assert_eq!(b"val12".as_slice(), resp.prev_kv.as_ref().unwrap().value);
}
#[tokio::test]
async fn test_range() {
let kv_store = mock_mem_store_with_data().await;
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1".as_slice(), resp.kvs[0].key);
assert_eq!(b"val1".as_slice(), resp.kvs[0].value);
assert_eq!(b"key11".as_slice(), resp.kvs[1].key);
assert_eq!(b"val11".as_slice(), resp.kvs[1].value);
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: true,
..Default::default()
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1".as_slice(), resp.kvs[0].key);
assert_eq!(b"".as_slice(), resp.kvs[0].value);
assert_eq!(b"key11".as_slice(), resp.kvs[1].key);
assert_eq!(b"".as_slice(), resp.kvs[1].value);
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1".as_slice(), resp.kvs[0].key);
assert_eq!(b"val1".as_slice(), resp.kvs[0].value);
let resp = kv_store
.range(RangeRequest {
key,
range_end,
limit: 1,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1".as_slice(), resp.kvs[0].key);
assert_eq!(b"val1".as_slice(), resp.kvs[0].value);
}
#[tokio::test]
async fn test_batch_get() {
let kv_store = mock_mem_store_with_data().await;
let keys = vec![];
let batch_resp = kv_store
.batch_get(BatchGetRequest {
keys,
..Default::default()
})
.await
.unwrap();
assert!(batch_resp.kvs.is_empty());
let keys = vec![b"key10".to_vec()];
let batch_resp = kv_store
.batch_get(BatchGetRequest {
keys,
..Default::default()
})
.await
.unwrap();
assert!(batch_resp.kvs.is_empty());
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
let batch_resp = kv_store
.batch_get(BatchGetRequest {
keys,
..Default::default()
})
.await
.unwrap();
assert_eq!(2, batch_resp.kvs.len());
assert_eq!(b"key1".as_slice(), batch_resp.kvs[0].key);
assert_eq!(b"val1".as_slice(), batch_resp.kvs[0].value);
assert_eq!(b"key3".as_slice(), batch_resp.kvs[1].key);
assert_eq!(b"val3".as_slice(), batch_resp.kvs[1].value);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemStore::new());
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
..Default::default()
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_delete_range() {
let kv_store = mock_mem_store_with_data().await;
let req = DeleteRangeRequest {
key: b"key3".to_vec(),
range_end: vec![],
prev_kv: true,
..Default::default()
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(b"key3".as_slice(), resp.prev_kvs[0].key);
assert_eq!(b"val3".as_slice(), resp.prev_kvs[0].value);
let get_resp = kv_store.get(b"key3".to_vec()).await.unwrap();
assert!(get_resp.is_none());
let req = DeleteRangeRequest {
key: b"key2".to_vec(),
range_end: vec![],
prev_kv: false,
..Default::default()
};
let resp = kv_store.delete_range(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
let get_resp = kv_store.get(b"key2".to_vec()).await.unwrap();
assert!(get_resp.is_none());
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let req = DeleteRangeRequest {
key: key.clone(),
range_end: range_end.clone(),
prev_kv: true,
..Default::default()
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(2, resp.prev_kvs.len());
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let resp = kv_store.range(req).await.unwrap();
assert!(resp.kvs.is_empty());
}
#[tokio::test]
async fn test_move_value() {
let kv_store = mock_mem_store_with_data().await;
let req = MoveValueRequest {
from_key: b"key1".to_vec(),
to_key: b"key111".to_vec(),
..Default::default()
};
let resp = kv_store.move_value(req).await.unwrap();
assert_eq!(b"key1".as_slice(), resp.kv.as_ref().unwrap().key);
assert_eq!(b"val1".as_slice(), resp.kv.as_ref().unwrap().value);
let kv_store = mock_mem_store_with_data().await;
let req = MoveValueRequest {
from_key: b"notexistkey".to_vec(),
to_key: b"key222".to_vec(),
..Default::default()
};
let resp = kv_store.move_value(req).await.unwrap();
assert!(resp.kv.is_none());
}
}