diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 99f110f7da..8f2bfefb92 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -32,9 +32,10 @@ use crate::error::Result; use crate::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; use crate::rpc::router::DeleteRequest; use crate::rpc::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest, - DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, - PutResponse, RangeRequest, RangeResponse, RouteRequest, RouteResponse, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, CreateRequest, DeleteRangeRequest, DeleteRangeResponse, + MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + RouteRequest, RouteResponse, }; pub type Id = (u64, u64); @@ -245,6 +246,11 @@ impl MetaClient { self.store_client()?.put(req.into()).await?.try_into() } + /// BatchGet atomically get values by the given keys from the key-value store. + pub async fn batch_get(&self, req: BatchGetRequest) -> Result { + self.store_client()?.batch_get(req.into()).await?.try_into() + } + /// BatchPut atomically puts the given keys into the key-value store. pub async fn batch_put(&self, req: BatchPutRequest) -> Result { self.store_client()?.batch_put(req.into()).await?.try_into() @@ -713,6 +719,26 @@ mod tests { assert_eq!(2, kvs.len()); } + #[tokio::test] + async fn test_batch_get() { + let tc = new_client("test_batch_get").await; + tc.gen_data().await; + + let req = BatchGetRequest::default() + .add_key(tc.key("key-1")) + .add_key(tc.key("key-2")); + let mut res = tc.client.batch_get(req).await.unwrap(); + + assert_eq!(2, res.take_kvs().len()); + + let req = BatchGetRequest::default() + .add_key(tc.key("key-1")) + .add_key(tc.key("key-222")); + let mut res = tc.client.batch_get(req).await.unwrap(); + + assert_eq!(1, res.take_kvs().len()); + } + #[tokio::test] async fn test_batch_put_with_prev_kv() { let tc = new_client("test_batch_put_with_prev_kv").await; diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index b7b890ec28..2b01009f3e 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use api::v1::meta::store_client::StoreClient; use api::v1::meta::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, - PutResponse, RangeRequest, RangeResponse, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, + MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; @@ -70,6 +70,11 @@ impl Client { inner.put(req).await } + pub async fn batch_get(&self, req: BatchGetRequest) -> Result { + let inner = self.inner.read().await; + inner.batch_get(req).await + } + pub async fn batch_put(&self, req: BatchPutRequest) -> Result { let inner = self.inner.read().await; inner.batch_put(req).await @@ -141,6 +146,18 @@ impl Inner { Ok(res.into_inner()) } + async fn batch_get(&self, mut req: BatchGetRequest) -> Result { + let mut client = self.random_client()?; + req.set_header(self.id); + + let res = client + .batch_get(req) + .await + .context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + async fn batch_put(&self, mut req: BatchPutRequest) -> Result { let mut client = self.random_client()?; req.set_header(self.id); diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index 2264f0033b..09a479cf71 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -28,9 +28,9 @@ pub use router::{ }; use serde::{Deserialize, Serialize}; pub use store::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, - PutResponse, RangeRequest, RangeResponse, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, + MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; #[derive(Debug, Clone)] diff --git a/src/meta-client/src/rpc/store.rs b/src/meta-client/src/rpc/store.rs index 50e126148d..abb98d3ba0 100644 --- a/src/meta-client/src/rpc/store.rs +++ b/src/meta-client/src/rpc/store.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::meta::{ + BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, @@ -239,6 +240,68 @@ impl PutResponse { } } +pub struct BatchGetRequest { + pub keys: Vec>, +} + +impl From for PbBatchGetRequest { + fn from(req: BatchGetRequest) -> Self { + Self { + header: None, + keys: req.keys, + } + } +} + +impl Default for BatchGetRequest { + fn default() -> Self { + Self::new() + } +} + +impl BatchGetRequest { + #[inline] + pub fn new() -> Self { + Self { keys: vec![] } + } + + #[inline] + pub fn add_key(mut self, key: impl Into>) -> Self { + self.keys.push(key.into()); + self + } +} + +#[derive(Debug, Clone)] +pub struct BatchGetResponse(PbBatchGetResponse); + +impl TryFrom for BatchGetResponse { + type Error = error::Error; + + fn try_from(pb: PbBatchGetResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self(pb)) + } +} + +impl BatchGetResponse { + #[inline] + pub fn new(res: PbBatchGetResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn take_kvs(&mut self) -> Vec { + self.0.kvs.drain(..).map(KeyValue::new).collect() + } +} + #[derive(Debug, Clone, Default)] pub struct BatchPutRequest { pub kvs: Vec, @@ -699,6 +762,40 @@ mod tests { assert_eq!(b"v1".to_vec(), kv.take_value()); } + #[test] + fn test_batch_get_request_trans() { + let req = BatchGetRequest::default() + .add_key(b"test_key1".to_vec()) + .add_key(b"test_key2".to_vec()) + .add_key(b"test_key3".to_vec()); + + let into_req: PbBatchGetRequest = req.into(); + + assert!(into_req.header.is_none()); + assert_eq!(b"test_key1".as_slice(), into_req.keys.get(0).unwrap()); + assert_eq!(b"test_key2".as_slice(), into_req.keys.get(1).unwrap()); + assert_eq!(b"test_key3".as_slice(), into_req.keys.get(2).unwrap()); + } + + #[test] + fn test_batch_get_response_trans() { + let pb_res = PbBatchGetResponse { + header: None, + kvs: vec![PbKeyValue { + key: b"test_key1".to_vec(), + value: b"test_value1".to_vec(), + }], + }; + let mut res = BatchGetResponse::new(pb_res); + + assert!(res.take_header().is_none()); + + let kvs = res.take_kvs(); + + assert_eq!(b"test_key1".as_slice(), kvs[0].key()); + assert_eq!(b"test_value1".as_slice(), kvs[0].value()); + } + #[test] fn test_batch_put_request_trans() { let req = BatchPutRequest::new() diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 01c9bce7f0..297cda860a 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -27,7 +27,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{match_for_io_error, Result}; use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX}; use crate::metasrv::ElectionRef; -use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::ResettableKvStoreRef; use crate::{error, util}; @@ -130,7 +129,12 @@ impl MetaPeerClient { // Get kv information from the leader's in_mem kv store pub async fn batch_get(&self, keys: Vec>) -> Result> { if self.is_leader() { - return self.in_memory.batch_get(keys).await; + let request = BatchGetRequest { + keys, + ..Default::default() + }; + + return self.in_memory.batch_get(request).await.map(|resp| resp.kvs); } let max_retry_count = self.max_retry_count; diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index 4b87f6b032..b6c9737c4f 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -150,6 +150,8 @@ impl Inner { mod tests { use std::sync::Arc; + use api::v1::meta::{BatchGetRequest, BatchGetResponse}; + use super::*; use crate::service::store::kv::KvStore; use crate::service::store::memory::MemStore; @@ -192,6 +194,10 @@ mod tests { unreachable!() } + async fn batch_get(&self, _: BatchGetRequest) -> Result { + unreachable!() + } + async fn compare_and_put( &self, _: CompareAndPutRequest, diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index 9ba265b666..257cf36d15 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -20,7 +20,6 @@ use common_telemetry::warn; use tonic::{Request, Response}; use crate::metasrv::MetaSrv; -use crate::service::store::ext::KvStoreExt; use crate::service::GrpcResult; #[async_trait::async_trait] @@ -37,8 +36,8 @@ impl cluster_server::Cluster for MetaSrv { return Ok(Response::new(resp)); } - let req = req.into_inner(); - let kvs = self.in_memory().batch_get(req.keys).await?; + let kvs = self.in_memory().batch_get(req.into_inner()).await?.kvs; + let success = ResponseHeader::success(0); let get_resp = BatchGetResponse { diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index ae7d1d22a2..92d3409816 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -22,7 +22,7 @@ use api::v1::meta::{ CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; -use tonic::{Request, Response, Status}; +use tonic::{Request, Response}; use crate::metasrv::MetaSrv; use crate::service::GrpcResult; @@ -43,12 +43,11 @@ impl store_server::Store for MetaSrv { Ok(Response::new(res)) } - async fn batch_get( - &self, - _request: Request, - ) -> Result, Status> { - // TODO(fys): please fix this - unimplemented!() + async fn batch_get(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().batch_get(req).await?; + + Ok(Response::new(res)) } async fn batch_put(&self, req: Request) -> GrpcResult { @@ -121,6 +120,18 @@ mod tests { assert!(res.is_ok()); } + #[tokio::test] + async fn test_batch_get() { + let kv_store = Arc::new(MemStore::new()); + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + + let req = BatchGetRequest::default(); + let res = meta_srv.batch_get(req.into_request()).await; + + assert!(res.is_ok()); + } + #[tokio::test] async fn test_batch_put() { let kv_store = Arc::new(MemStore::new()); diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 91cd1f7c16..5c0f1b0200 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use api::v1::meta::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, - PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, + MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, }; use common_error::prelude::*; use common_telemetry::warn; @@ -98,6 +98,40 @@ impl KvStore for EtcdStore { Ok(PutResponse { header, prev_kv }) } + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let BatchGet { + cluster_id, + keys, + options, + } = req.try_into()?; + + let get_ops: Vec<_> = keys + .into_iter() + .map(|k| TxnOp::get(k, options.clone())) + .collect(); + let txn = Txn::new().and_then(get_ops); + + let txn_res = self + .client + .kv_client() + .txn(txn) + .await + .context(error::EtcdFailedSnafu)?; + + let mut kvs = vec![]; + for op_res in txn_res.op_responses() { + let get_res = match op_res { + TxnOpResponse::Get(get_res) => get_res, + _ => unreachable!(), + }; + + kvs.extend(get_res.kvs().iter().map(KvPair::to_kv)); + } + + let header = Some(ResponseHeader::success(cluster_id)); + Ok(BatchGetResponse { header, kvs }) + } + async fn batch_put(&self, req: BatchPutRequest) -> Result { let BatchPut { cluster_id, @@ -360,6 +394,28 @@ impl TryFrom for Put { } } +struct BatchGet { + cluster_id: u64, + keys: Vec>, + options: Option, +} + +impl TryFrom for BatchGet { + type Error = error::Error; + + fn try_from(req: BatchGetRequest) -> Result { + let BatchGetRequest { header, keys } = req; + + let options = GetOptions::default().with_keys_only(); + + Ok(BatchGet { + cluster_id: header.map_or(0, |h| h.cluster_id), + keys, + options: Some(options), + }) + } +} + struct BatchPut { cluster_id: u64, kvs: Vec, @@ -539,6 +595,21 @@ mod tests { assert!(put.options.is_some()); } + #[test] + fn test_parse_batch_get() { + let req = BatchGetRequest { + keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()], + ..Default::default() + }; + + let batch_get: BatchGet = req.try_into().unwrap(); + let keys = batch_get.keys; + + assert_eq!(b"k1".to_vec(), keys.get(0).unwrap().to_vec()); + assert_eq!(b"k2".to_vec(), keys.get(1).unwrap().to_vec()); + assert_eq!(b"k3".to_vec(), keys.get(2).unwrap().to_vec()); + } + #[test] fn test_parse_batch_put() { let req = BatchPutRequest { diff --git a/src/meta-srv/src/service/store/ext.rs b/src/meta-srv/src/service/store/ext.rs index ba0cce2a46..30b28c9a2f 100644 --- a/src/meta-srv/src/service/store/ext.rs +++ b/src/meta-srv/src/service/store/ext.rs @@ -21,8 +21,6 @@ use crate::service::store::kv::KvStore; #[async_trait::async_trait] pub trait KvStoreExt { async fn get(&self, key: Vec) -> Result>; - - async fn batch_get(&self, key: Vec>) -> Result>; } #[async_trait::async_trait] @@ -53,18 +51,6 @@ where // Safety: the length check has been performed before using unwrap() Ok(Some(kvs.pop().unwrap())) } - - async fn batch_get(&self, keys: Vec>) -> Result> { - let mut kvs = Vec::with_capacity(keys.len()); - - for key in keys { - if let Some(kv) = self.get(key).await? { - kvs.push(kv); - } - } - - Ok(kvs) - } } #[cfg(test)] @@ -106,31 +92,6 @@ mod tests { assert!(may_kv.is_none()); } - #[tokio::test] - async fn test_batch_get() { - let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef; - - put_stats_to_store(&mut in_mem).await; - - let keys = vec![ - "test_key1".as_bytes().to_vec(), - "test_key1".as_bytes().to_vec(), - "test_key2".as_bytes().to_vec(), - ]; - - let kvs = in_mem.batch_get(keys).await.unwrap(); - - assert_eq!(3, kvs.len()); - - assert_eq!("test_key1".as_bytes(), kvs[0].key); - assert_eq!("test_key1".as_bytes(), kvs[1].key); - assert_eq!("test_key2".as_bytes(), kvs[2].key); - - assert_eq!("test_val1".as_bytes(), kvs[0].value); - assert_eq!("test_val1".as_bytes(), kvs[1].value); - assert_eq!("test_val2".as_bytes(), kvs[2].value); - } - async fn put_stats_to_store(store: &mut KvStoreRef) { store .put(PutRequest { diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index 30e236e19e..1824922f13 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use api::v1::meta::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, - PutResponse, RangeRequest, RangeResponse, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, + MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; use crate::error::Result; @@ -31,6 +31,8 @@ pub trait KvStore: Send + Sync { async fn put(&self, req: PutRequest) -> Result; + async fn batch_get(&self, req: BatchGetRequest) -> Result; + async fn batch_put(&self, req: BatchPutRequest) -> Result; async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result; diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index 2769065ae2..1d5ce93dd2 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -17,12 +17,13 @@ use std::collections::BTreeMap; use std::ops::Range; use api::v1::meta::{ - BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, - PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, + BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, + CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, + MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, }; use parking_lot::RwLock; +use super::ext::KvStoreExt; use crate::error::Result; use crate::service::store::kv::{KvStore, ResettableKvStore}; @@ -117,6 +118,22 @@ impl KvStore for MemStore { Ok(PutResponse { header, prev_kv }) } + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let keys = req.keys; + + let mut kvs = Vec::with_capacity(keys.len()); + for key in keys { + if let Some(kv) = self.get(key).await? { + kvs.push(kv); + } + } + + Ok(BatchGetResponse { + kvs, + ..Default::default() + }) + } + async fn batch_put(&self, req: BatchPutRequest) -> Result { let BatchPutRequest { header, @@ -247,3 +264,310 @@ impl KvStore for MemStore { Ok(MoveValueResponse { header, kv }) } } + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicU8, Ordering}; + use std::sync::Arc; + + use api::v1::meta::{ + BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, KeyValue, + MoveValueRequest, PutRequest, RangeRequest, + }; + + use super::MemStore; + use crate::service::store::ext::KvStoreExt; + use crate::service::store::kv::KvStore; + use crate::util; + + async fn mock_mem_store_with_data() -> MemStore { + let kv_store = MemStore::new(); + let kvs = mock_kvs(); + + kv_store + .batch_put(BatchPutRequest { + kvs, + ..Default::default() + }) + .await + .unwrap(); + + kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val11".to_vec(), + ..Default::default() + }) + .await + .unwrap(); + + kv_store + } + + fn mock_kvs() -> Vec { + vec![ + KeyValue { + key: b"key1".to_vec(), + value: b"val1".to_vec(), + }, + KeyValue { + key: b"key2".to_vec(), + value: b"val2".to_vec(), + }, + KeyValue { + key: b"key3".to_vec(), + value: b"val3".to_vec(), + }, + ] + } + + #[tokio::test] + async fn test_put() { + let kv_store = mock_mem_store_with_data().await; + + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val12".to_vec(), + ..Default::default() + }) + .await + .unwrap(); + assert!(resp.prev_kv.is_none()); + + let resp = kv_store + .put(PutRequest { + key: b"key11".to_vec(), + value: b"val13".to_vec(), + prev_kv: true, + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(b"key11".as_slice(), resp.prev_kv.as_ref().unwrap().key); + assert_eq!(b"val12".as_slice(), resp.prev_kv.as_ref().unwrap().value); + } + + #[tokio::test] + async fn test_range() { + let kv_store = mock_mem_store_with_data().await; + + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: false, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1".as_slice(), resp.kvs[0].key); + assert_eq!(b"val1".as_slice(), resp.kvs[0].value); + assert_eq!(b"key11".as_slice(), resp.kvs[1].key); + assert_eq!(b"val11".as_slice(), resp.kvs[1].value); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + range_end: range_end.clone(), + limit: 0, + keys_only: true, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(2, resp.kvs.len()); + assert_eq!(b"key1".as_slice(), resp.kvs[0].key); + assert_eq!(b"".as_slice(), resp.kvs[0].value); + assert_eq!(b"key11".as_slice(), resp.kvs[1].key); + assert_eq!(b"".as_slice(), resp.kvs[1].value); + + let resp = kv_store + .range(RangeRequest { + key: key.clone(), + limit: 0, + keys_only: false, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1".as_slice(), resp.kvs[0].key); + assert_eq!(b"val1".as_slice(), resp.kvs[0].value); + + let resp = kv_store + .range(RangeRequest { + key, + range_end, + limit: 1, + keys_only: false, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(1, resp.kvs.len()); + assert_eq!(b"key1".as_slice(), resp.kvs[0].key); + assert_eq!(b"val1".as_slice(), resp.kvs[0].value); + } + + #[tokio::test] + async fn test_batch_get() { + let kv_store = mock_mem_store_with_data().await; + + let keys = vec![]; + let batch_resp = kv_store + .batch_get(BatchGetRequest { + keys, + ..Default::default() + }) + .await + .unwrap(); + + assert!(batch_resp.kvs.is_empty()); + + let keys = vec![b"key10".to_vec()]; + let batch_resp = kv_store + .batch_get(BatchGetRequest { + keys, + ..Default::default() + }) + .await + .unwrap(); + + assert!(batch_resp.kvs.is_empty()); + + let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()]; + let batch_resp = kv_store + .batch_get(BatchGetRequest { + keys, + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(2, batch_resp.kvs.len()); + assert_eq!(b"key1".as_slice(), batch_resp.kvs[0].key); + assert_eq!(b"val1".as_slice(), batch_resp.kvs[0].value); + assert_eq!(b"key3".as_slice(), batch_resp.kvs[1].key); + assert_eq!(b"val3".as_slice(), batch_resp.kvs[1].value); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_compare_and_put() { + let kv_store = Arc::new(MemStore::new()); + let success = Arc::new(AtomicU8::new(0)); + + let mut joins = vec![]; + for _ in 0..20 { + let kv_store_clone = kv_store.clone(); + let success_clone = success.clone(); + let join = tokio::spawn(async move { + let req = CompareAndPutRequest { + key: b"key".to_vec(), + expect: vec![], + value: b"val_new".to_vec(), + ..Default::default() + }; + let resp = kv_store_clone.compare_and_put(req).await.unwrap(); + if resp.success { + success_clone.fetch_add(1, Ordering::SeqCst); + } + }); + joins.push(join); + } + + for join in joins { + join.await.unwrap(); + } + + assert_eq!(1, success.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn test_delete_range() { + let kv_store = mock_mem_store_with_data().await; + + let req = DeleteRangeRequest { + key: b"key3".to_vec(), + range_end: vec![], + prev_kv: true, + ..Default::default() + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(1, resp.prev_kvs.len()); + assert_eq!(b"key3".as_slice(), resp.prev_kvs[0].key); + assert_eq!(b"val3".as_slice(), resp.prev_kvs[0].value); + + let get_resp = kv_store.get(b"key3".to_vec()).await.unwrap(); + assert!(get_resp.is_none()); + + let req = DeleteRangeRequest { + key: b"key2".to_vec(), + range_end: vec![], + prev_kv: false, + ..Default::default() + }; + + let resp = kv_store.delete_range(req).await.unwrap(); + assert!(resp.prev_kvs.is_empty()); + + let get_resp = kv_store.get(b"key2".to_vec()).await.unwrap(); + assert!(get_resp.is_none()); + + let key = b"key1".to_vec(); + let range_end = util::get_prefix_end_key(b"key1"); + + let req = DeleteRangeRequest { + key: key.clone(), + range_end: range_end.clone(), + prev_kv: true, + ..Default::default() + }; + let resp = kv_store.delete_range(req).await.unwrap(); + assert_eq!(2, resp.prev_kvs.len()); + + let req = RangeRequest { + key, + range_end, + ..Default::default() + }; + let resp = kv_store.range(req).await.unwrap(); + assert!(resp.kvs.is_empty()); + } + + #[tokio::test] + async fn test_move_value() { + let kv_store = mock_mem_store_with_data().await; + + let req = MoveValueRequest { + from_key: b"key1".to_vec(), + to_key: b"key111".to_vec(), + ..Default::default() + }; + + let resp = kv_store.move_value(req).await.unwrap(); + assert_eq!(b"key1".as_slice(), resp.kv.as_ref().unwrap().key); + assert_eq!(b"val1".as_slice(), resp.kv.as_ref().unwrap().value); + + let kv_store = mock_mem_store_with_data().await; + + let req = MoveValueRequest { + from_key: b"notexistkey".to_vec(), + to_key: b"key222".to_vec(), + ..Default::default() + }; + + let resp = kv_store.move_value(req).await.unwrap(); + assert!(resp.kv.is_none()); + } +}