fix: correct the range behavior in MemoryKvBackend & RaftEngineBackend (#2615)

* fix: correct the range behavior in MemoryKvBackend & RaftEngineBackend

* refactor: migrate tests from MemoryKvBackend

* chore: apply suggestions from CR

* fix: fix license header

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* fix: fix range bugs
Weny Xu committed 2023-10-20 11:30:47 +09:00 (committed by GitHub)
parent b5d9d635eb
commit e1dcf83326
6 changed files with 528 additions and 293 deletions


@@ -13,6 +13,7 @@
// limitations under the License.
pub mod memory;
pub mod test;
pub mod txn;
use std::any::Any;


@@ -17,7 +17,6 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
- use std::ops::Range;
use std::sync::RwLock;
use async_trait::async_trait;
@@ -85,21 +84,25 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
+ let range = req.range();
let RangeRequest {
- key,
- range_end,
- limit,
- keys_only,
+ limit, keys_only, ..
} = req;
let kvs = self.kvs.read().unwrap();
+ let values = kvs.range(range);
- let iter: Box<dyn Iterator<Item = (&Vec<u8>, &Vec<u8>)>> = if range_end.is_empty() {
- Box::new(kvs.get_key_value(&key).into_iter())
- } else {
- Box::new(kvs.range(key..range_end))
- };
- let mut kvs = iter
+ let mut more = false;
+ let mut iter: i64 = 0;
+ let kvs = values
+ .take_while(|_| {
+ let take = limit == 0 || iter != limit;
+ iter += 1;
+ more = limit > 0 && iter > limit;
+ take
+ })
.map(|(k, v)| {
let key = k.clone();
let value = if keys_only { vec![] } else { v.clone() };
@@ -107,13 +110,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
})
.collect::<Vec<_>>();
- let more = if limit > 0 && kvs.len() > limit as usize {
- kvs.truncate(limit as usize);
- true
- } else {
- false
- };
Ok(RangeResponse { kvs, more })
}
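Aside, not part of the diff: a minimal standalone sketch of the limit/`more` accounting the new `range` uses, assuming `limit == 0` means "no limit". The function and values below are illustrative only, not code from the repository.

fn take_with_limit(values: Vec<i32>, limit: i64) -> (Vec<i32>, bool) {
    let mut more = false;
    let mut iter: i64 = 0;
    let taken = values
        .into_iter()
        .take_while(|_| {
            // Keep taking while we are under the limit (or when there is no limit at all).
            let take = limit == 0 || iter != limit;
            iter += 1;
            // `more` turns true only if the iterator yielded another element after the
            // limit was hit, i.e. some results were actually left behind.
            more = limit > 0 && iter > limit;
            take
        })
        .collect::<Vec<_>>();
    (taken, more)
}

fn main() {
    assert_eq!(take_with_limit(vec![1, 2, 3], 0), (vec![1, 2, 3], false));
    assert_eq!(take_with_limit(vec![1, 2, 3], 2), (vec![1, 2], true));
    assert_eq!(take_with_limit(vec![1, 2, 3], 3), (vec![1, 2, 3], false));
}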
@@ -215,36 +211,32 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
- let DeleteRangeRequest {
- key,
- range_end,
- prev_kv,
- } = req;
+ let range = req.range();
+ let DeleteRangeRequest { prev_kv, .. } = req;
let mut kvs = self.kvs.write().unwrap();
- let prev_kvs = if range_end.is_empty() {
- kvs.remove(&key)
- .into_iter()
- .map(|value| KeyValue {
- key: key.clone(),
- value,
- })
- .collect::<Vec<_>>()
- } else {
- let range = Range {
- start: key,
- end: range_end,
- };
- kvs.extract_if(|key, _| range.contains(key))
- .map(Into::into)
- .collect::<Vec<_>>()
- };
+ let keys = kvs
+ .range(range)
+ .map(|(key, _)| key.clone())
+ .collect::<Vec<_>>();
- Ok(DeleteRangeResponse {
- deleted: prev_kvs.len() as i64,
- prev_kvs: if prev_kv { prev_kvs } else { vec![] },
- })
+ let mut prev_kvs = if prev_kv {
+ Vec::with_capacity(keys.len())
+ } else {
+ vec![]
+ };
+ let deleted = keys.len() as i64;
+ for key in keys {
+ if let Some(value) = kvs.remove(&key) {
+ if prev_kv {
+ prev_kvs.push((key.clone(), value).into())
+ }
+ }
+ }
+ Ok(DeleteRangeResponse { deleted, prev_kvs })
}
async fn batch_delete(
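Aside, not part of the diff: the reworked `delete_range` first snapshots the keys that fall in the range and only then mutates the map, so a single code path covers both the single-key and the range case. A minimal sketch over a plain BTreeMap, with illustrative names:

use std::collections::BTreeMap;
use std::ops::Bound;

fn delete_range_sketch(
    kvs: &mut BTreeMap<Vec<u8>, Vec<u8>>,
    range: (Bound<Vec<u8>>, Bound<Vec<u8>>),
    prev_kv: bool,
) -> (i64, Vec<(Vec<u8>, Vec<u8>)>) {
    // Phase 1: collect the matching keys up front so the removal loop below does not
    // iterate over the map and mutate it at the same time.
    let keys: Vec<Vec<u8>> = kvs.range(range).map(|(k, _)| k.clone()).collect();
    let deleted = keys.len() as i64;
    // Phase 2: remove them, optionally recording the previous key-value pairs.
    let mut prev_kvs = Vec::new();
    for key in keys {
        if let Some(value) = kvs.remove(&key) {
            if prev_kv {
                prev_kvs.push((key, value));
            }
        }
    }
    (deleted, prev_kvs)
}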
@@ -358,254 +350,63 @@ impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::*;
use crate::error::Error;
use crate::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use crate::kv_backend::KvBackend;
use crate::rpc::store::{BatchGetRequest, BatchPutRequest};
use crate::rpc::KeyValue;
use crate::util;
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
let kv_store = MemoryKvBackend::<Error>::new();
let kvs = mock_kvs();
assert!(kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.is_ok());
assert!(kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.is_ok());
prepare_kv(&kv_store).await;
kv_store
}
fn mock_kvs() -> Vec<KeyValue> {
vec![
KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec(),
},
KeyValue {
key: b"key2".to_vec(),
value: b"val2".to_vec(),
},
KeyValue {
key: b"key3".to_vec(),
value: b"val3".to_vec(),
},
]
}
#[tokio::test]
async fn test_put() {
let kv_store = mock_mem_store_with_data().await;
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val12".to_vec(),
prev_kv: false,
})
.await
.unwrap();
assert!(resp.prev_kv.is_none());
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val13".to_vec(),
prev_kv: true,
})
.await
.unwrap();
let prev_kv = resp.prev_kv.unwrap();
assert_eq!(b"key11", prev_kv.key());
assert_eq!(b"val12", prev_kv.value());
test_kv_put(kv_store).await;
}
#[tokio::test]
async fn test_range() {
let kv_store = mock_mem_store_with_data().await;
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
test_kv_range(kv_store).await;
}
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: false,
})
.await
.unwrap();
#[tokio::test]
async fn test_range_2() {
let kv = MemoryKvBackend::<Error>::new();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"val11", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: true,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
let resp = kv_store
.range(RangeRequest {
key,
range_end,
limit: 1,
keys_only: false,
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
test_kv_range_2(kv).await;
}
#[tokio::test]
async fn test_batch_get() {
let kv_store = mock_mem_store_with_data().await;
let keys = vec![];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key10".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key3", resp.kvs[1].key());
assert_eq!(b"val3", resp.kvs[1].value());
test_kv_batch_get(kv_store).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
test_kv_compare_and_put(kv_store).await;
}
#[tokio::test]
async fn test_delete_range() {
let kv_store = mock_mem_store_with_data().await;
let req = DeleteRangeRequest {
key: b"key3".to_vec(),
range_end: vec![],
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(b"key3", resp.prev_kvs[0].key());
assert_eq!(b"val3", resp.prev_kvs[0].value());
let resp = kv_store.get(b"key3").await.unwrap();
assert!(resp.is_none());
let req = DeleteRangeRequest {
key: b"key2".to_vec(),
range_end: vec![],
prev_kv: false,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
let resp = kv_store.get(b"key2").await.unwrap();
assert!(resp.is_none());
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let req = DeleteRangeRequest {
key: key.clone(),
range_end: range_end.clone(),
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(2, resp.prev_kvs.len());
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let resp = kv_store.range(req).await.unwrap();
assert!(resp.kvs.is_empty());
test_kv_delete_range(kv_store).await;
}
#[tokio::test]
@@ -636,35 +437,6 @@ mod tests {
async fn test_batch_delete() {
let kv_store = mock_mem_store_with_data().await;
assert!(kv_store.get(b"key1").await.unwrap().is_some());
assert!(kv_store.get(b"key100").await.unwrap().is_none());
let req = BatchDeleteRequest {
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
prev_kv: true,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(
vec![KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec()
}],
resp.prev_kvs
);
assert!(kv_store.get(b"key1").await.unwrap().is_none());
assert!(kv_store.get(b"key2").await.unwrap().is_some());
assert!(kv_store.get(b"key3").await.unwrap().is_some());
let req = BatchDeleteRequest {
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
prev_kv: false,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
assert!(kv_store.get(b"key2").await.unwrap().is_none());
assert!(kv_store.get(b"key3").await.unwrap().is_none());
test_kv_batch_delete(kv_store).await;
}
}


@@ -0,0 +1,352 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::{KvBackend, *};
use crate::error::Error;
use crate::rpc::store::{BatchGetRequest, PutRequest};
use crate::rpc::KeyValue;
use crate::util;
pub fn mock_kvs() -> Vec<KeyValue> {
vec![
KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec(),
},
KeyValue {
key: b"key2".to_vec(),
value: b"val2".to_vec(),
},
KeyValue {
key: b"key3".to_vec(),
value: b"val3".to_vec(),
},
]
}
pub async fn prepare_kv(kv_store: &impl KvBackend) {
let kvs = mock_kvs();
assert!(kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.is_ok());
assert!(kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.is_ok());
}
pub async fn test_kv_put(kv_store: impl KvBackend) {
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val12".to_vec(),
prev_kv: false,
})
.await
.unwrap();
assert!(resp.prev_kv.is_none());
let resp = kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val13".to_vec(),
prev_kv: true,
})
.await
.unwrap();
let prev_kv = resp.prev_kv.unwrap();
assert_eq!(b"key11", prev_kv.key());
assert_eq!(b"val12", prev_kv.value());
}
pub async fn test_kv_range(kv_store: impl KvBackend) {
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: false,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"val11", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
range_end: range_end.clone(),
limit: 0,
keys_only: true,
})
.await
.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"", resp.kvs[0].value());
assert_eq!(b"key11", resp.kvs[1].key());
assert_eq!(b"", resp.kvs[1].value());
let resp = kv_store
.range(RangeRequest {
key: key.clone(),
limit: 0,
keys_only: false,
..Default::default()
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
let resp = kv_store
.range(RangeRequest {
key,
range_end,
limit: 1,
keys_only: false,
})
.await
.unwrap();
assert_eq!(1, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
}
pub async fn test_kv_range_2(kv_store: impl KvBackend) {
kv_store
.put(PutRequest::new().with_key("atest").with_value("value"))
.await
.unwrap();
kv_store
.put(PutRequest::new().with_key("test").with_value("value"))
.await
.unwrap();
// If both key and range_end are \0, then range represents all keys.
let result = kv_store
.range(RangeRequest::new().with_range(b"\0".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
// If range_end is \0, the range is all keys greater than or equal to the key argument.
let result = kv_store
.range(RangeRequest::new().with_range(b"a".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
let result = kv_store
.range(RangeRequest::new().with_range(b"b".to_vec(), b"\0".to_vec()))
.await
.unwrap();
assert_eq!(result.kvs.len(), 1);
assert_eq!(result.kvs[0].key, b"test");
// Fetches the keys >= "a", set limit to 1, the `more` should be true.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(1),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 1);
assert!(result.more);
// Fetches the keys >= "a", set limit to 2, the `more` should be false.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(2),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
// Fetches the keys >= "a", set limit to 3, the `more` should be false.
let result = kv_store
.range(
RangeRequest::new()
.with_range(b"a".to_vec(), b"\0".to_vec())
.with_limit(3),
)
.await
.unwrap();
assert_eq!(result.kvs.len(), 2);
assert!(!result.more);
}
pub async fn test_kv_batch_get(kv_store: impl KvBackend) {
let keys = vec![];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key10".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert!(resp.kvs.is_empty());
let keys = vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()];
let resp = kv_store.batch_get(BatchGetRequest { keys }).await.unwrap();
assert_eq!(2, resp.kvs.len());
assert_eq!(b"key1", resp.kvs[0].key());
assert_eq!(b"val1", resp.kvs[0].value());
assert_eq!(b"key3", resp.kvs[1].key());
assert_eq!(b"val3", resp.kvs[1].value());
}
pub async fn test_kv_compare_and_put(kv_store: Arc<dyn KvBackend<Error = Error>>) {
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
}
pub async fn test_kv_delete_range(kv_store: impl KvBackend) {
let req = DeleteRangeRequest {
key: b"key3".to_vec(),
range_end: vec![],
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(1, resp.deleted);
assert_eq!(b"key3", resp.prev_kvs[0].key());
assert_eq!(b"val3", resp.prev_kvs[0].value());
let resp = kv_store.get(b"key3").await.unwrap();
assert!(resp.is_none());
let req = DeleteRangeRequest {
key: b"key2".to_vec(),
range_end: vec![],
prev_kv: false,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(1, resp.deleted);
assert!(resp.prev_kvs.is_empty());
let resp = kv_store.get(b"key2").await.unwrap();
assert!(resp.is_none());
let key = b"key1".to_vec();
let range_end = util::get_prefix_end_key(b"key1");
let req = DeleteRangeRequest {
key: key.clone(),
range_end: range_end.clone(),
prev_kv: true,
};
let resp = kv_store.delete_range(req).await.unwrap();
assert_eq!(2, resp.prev_kvs.len());
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let resp = kv_store.range(req).await.unwrap();
assert!(resp.kvs.is_empty());
}
pub async fn test_kv_batch_delete(kv_store: impl KvBackend) {
assert!(kv_store.get(b"key1").await.unwrap().is_some());
assert!(kv_store.get(b"key100").await.unwrap().is_none());
let req = BatchDeleteRequest {
keys: vec![b"key1".to_vec(), b"key100".to_vec()],
prev_kv: true,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert_eq!(1, resp.prev_kvs.len());
assert_eq!(
vec![KeyValue {
key: b"key1".to_vec(),
value: b"val1".to_vec()
}],
resp.prev_kvs
);
assert!(kv_store.get(b"key1").await.unwrap().is_none());
assert!(kv_store.get(b"key2").await.unwrap().is_some());
assert!(kv_store.get(b"key3").await.unwrap().is_some());
let req = BatchDeleteRequest {
keys: vec![b"key2".to_vec(), b"key3".to_vec()],
prev_kv: false,
};
let resp = kv_store.batch_delete(req).await.unwrap();
assert!(resp.prev_kvs.is_empty());
assert!(kv_store.get(b"key2").await.unwrap().is_none());
assert!(kv_store.get(b"key3").await.unwrap().is_none());
}


@@ -13,6 +13,7 @@
// limitations under the License.
use std::fmt::{Display, Formatter};
use std::ops::Bound;
use api::v1::meta::{
BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse,
@@ -30,6 +31,17 @@ use crate::error;
use crate::error::Result;
use crate::rpc::{util, KeyValue};
pub fn to_range(key: Vec<u8>, range_end: Vec<u8>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
match (&key[..], &range_end[..]) {
(_, []) => (Bound::Included(key.clone()), Bound::Included(key)),
// If both key and range_end are \0, then range represents all keys.
([0], [0]) => (Bound::Unbounded, Bound::Unbounded),
// If range_end is \0, the range is all keys greater than or equal to the key argument.
(_, [0]) => (Bound::Included(key), Bound::Unbounded),
(_, _) => (Bound::Included(key), Bound::Excluded(range_end)),
}
}
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
/// key is the first key for the range, If range_end is not given, the
@@ -96,6 +108,11 @@ impl RangeRequest {
}
}
/// Returns the `RangeBounds`.
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
to_range(self.key.clone(), self.range_end.clone())
}
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
#[inline]
@@ -690,6 +707,11 @@ impl DeleteRangeRequest {
}
}
/// Returns the `RangeBounds`.
pub fn range(&self) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
to_range(self.key.clone(), self.range_end.clone())
}
/// key is the first key to delete in the range. If range_end is not given,
/// the range is defined to contain only the key argument.
#[inline]
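Aside, not part of the diff: how the new `to_range` above maps the etcd-style (key, range_end) pair onto Rust bounds, written as assertions. This is a usage illustration that assumes `to_range` from the hunk above is in scope; it is not new API.

use std::ops::Bound;

fn main() {
    // Empty range_end: a point lookup on `key` only.
    assert_eq!(
        to_range(b"a".to_vec(), vec![]),
        (Bound::Included(b"a".to_vec()), Bound::Included(b"a".to_vec()))
    );
    // Both key and range_end are "\0": the whole keyspace.
    assert_eq!(
        to_range(b"\0".to_vec(), b"\0".to_vec()),
        (Bound::Unbounded, Bound::Unbounded)
    );
    // Only range_end is "\0": every key greater than or equal to `key`.
    assert_eq!(
        to_range(b"a".to_vec(), b"\0".to_vec()),
        (Bound::Included(b"a".to_vec()), Bound::Unbounded)
    );
    // Otherwise: the half-open interval [key, range_end).
    assert_eq!(
        to_range(b"a".to_vec(), b"b".to_vec()),
        (Bound::Included(b"a".to_vec()), Bound::Excluded(b"b".to_vec()))
    );
}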


@@ -19,7 +19,7 @@ common-base = { workspace = true }
common-config = { workspace = true }
common-error = { workspace = true }
common-macro = { workspace = true }
- common-meta = { workspace = true }
+ common-meta = { workspace = true, features = ["testing"] }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
futures-util.workspace = true


@@ -15,6 +15,7 @@
//! [KvBackend] implementation based on [raft_engine::Engine].
use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::sync::RwLock;
use common_error::ext::BoxedError;
@@ -28,6 +29,7 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
use raft_engine::{Config, Engine, LogBatch};
use snafu::ResultExt;
@@ -137,29 +139,48 @@ impl KvBackend for RaftEngineBackend {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
let mut res = vec![];
+ let (start, end) = req.range();
+ let RangeRequest {
+ keys_only, limit, ..
+ } = req;
+ let (start_key, end_key) = match (start, end) {
+ (Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))),
+ (Unbounded, Unbounded) => (None, None),
+ (Included(start), Excluded(end)) => (Some(start), Some(end)),
+ (Included(start), Unbounded) => (Some(start), None),
+ _ => unreachable!(),
+ };
+ let mut more = false;
+ let mut iter = 0;
self.engine
.read()
.unwrap()
.scan_raw_messages(
SYSTEM_NAMESPACE,
- Some(&req.key),
- Some(&req.range_end),
+ start_key.as_deref(),
+ end_key.as_deref(),
false,
|key, value| {
- res.push(KeyValue {
- key: key.to_vec(),
- value: value.to_vec(),
- });
- true
+ let take = limit == 0 || iter != limit;
+ iter += 1;
+ more = limit > 0 && iter > limit;
+ if take {
+ res.push(KeyValue {
+ key: key.to_vec(),
+ value: if keys_only { vec![] } else { value.to_vec() },
+ });
+ }
+ take
},
)
.context(RaftEngineSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?;
- Ok(RangeResponse {
- kvs: res,
- more: false,
- })
+ Ok(RangeResponse { kvs: res, more })
}
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
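Aside, not part of the diff: the scan call above wants optional, end-exclusive byte keys, while `req.range()` yields `Bound`s, hence the match that widens an inclusive end bound. A sketch of that translation under those assumptions; `next_key` below is an illustrative stand-in for `common_meta::util::get_next_prefix_key` and simply returns the smallest key strictly greater than its input.

use std::ops::Bound::{self, Excluded, Included, Unbounded};

// Illustrative stand-in: appending a 0 byte yields the immediate successor in byte order.
fn next_key(mut key: Vec<u8>) -> Vec<u8> {
    key.push(0);
    key
}

fn to_scan_keys(
    start: Bound<Vec<u8>>,
    end: Bound<Vec<u8>>,
) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
    match (start, end) {
        // An inclusive end is widened to the next key so an end-exclusive scan still covers it.
        (Included(start), Included(end)) => (Some(start), Some(next_key(end))),
        (Unbounded, Unbounded) => (None, None),
        (Included(start), Excluded(end)) => (Some(start), Some(end)),
        (Included(start), Unbounded) => (Some(start), None),
        // `to_range` never produces the remaining combinations.
        _ => unreachable!(),
    }
}

fn main() {
    let (start, end) = to_scan_keys(Included(b"a".to_vec()), Excluded(b"b".to_vec()));
    assert_eq!(start.as_deref(), Some(&b"a"[..]));
    assert_eq!(end.as_deref(), Some(&b"b"[..]));
    assert_eq!(to_scan_keys(Unbounded, Unbounded), (None, None));
}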
@@ -275,7 +296,7 @@ impl KvBackend for RaftEngineBackend {
key,
range_end,
limit: 0,
- keys_only: true,
+ keys_only: false,
};
let range_resp = self.range(range).await?;
@@ -383,7 +404,12 @@ fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use common_meta::kv_backend::test::{
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};
@@ -615,4 +641,66 @@ mod tests {
keys
);
}
#[tokio::test]
async fn test_range() {
let dir = create_temp_dir("range");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_range(backend).await;
}
#[tokio::test]
async fn test_range_2() {
let dir = create_temp_dir("range2");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
test_kv_range_2(backend).await;
}
#[tokio::test]
async fn test_put() {
let dir = create_temp_dir("put");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_put(backend).await;
}
#[tokio::test]
async fn test_batch_get() {
let dir = create_temp_dir("batch_get");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_batch_get(backend).await;
}
#[tokio::test]
async fn test_batch_delete() {
let dir = create_temp_dir("batch_delete");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_batch_delete(backend).await;
}
#[tokio::test]
async fn test_delete_range() {
let dir = create_temp_dir("delete_range");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_delete_range(backend).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put_2() {
let dir = create_temp_dir("compare_and_put");
let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
prepare_kv(&backend).await;
test_kv_compare_and_put(Arc::new(backend)).await;
}
}