refactor: unify KvBackend and KvStore (#1890)

* refactor: unify KvBackend and KvStore
This commit is contained in:
LFC
2023-07-07 19:53:49 +08:00
committed by GitHub
parent 4cc42e2ba6
commit f19498f73e
54 changed files with 2116 additions and 2156 deletions

View File

@@ -13,6 +13,7 @@ common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
etcd-client.workspace = true
futures.workspace = true
lazy_static.workspace = true
prost.workspace = true

View File

@@ -15,6 +15,8 @@
use common_error::prelude::*;
use serde_json::error::Error as JsonError;
use snafu::Location;
use store_api::storage::RegionNumber;
use table::metadata::TableId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -73,6 +75,22 @@ pub enum Error {
source: BoxedError,
location: Location,
},
#[snafu(display("Etcd txn error: {err_msg}"))]
EtcdTxnOpResponse { err_msg: String, location: Location },
#[snafu(display(
"Failed to move region {} in table {}, err: {}",
region,
table_id,
err_msg
))]
MoveRegion {
table_id: TableId,
region: RegionNumber,
err_msg: String,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -81,12 +99,13 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
IllegalServerState { .. } => StatusCode::Internal,
IllegalServerState { .. } | EtcdTxnOpResponse { .. } => StatusCode::Internal,
SerdeJson { .. }
| RouteInfoCorrupted { .. }
| InvalidProtoMsg { .. }
| InvalidTableMetadata { .. } => StatusCode::Unexpected,
| InvalidTableMetadata { .. }
| MoveRegion { .. } => StatusCode::Unexpected,
SendMessage { .. }
| GetKvCache { .. }

View File

@@ -12,16 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use super::{DATANODE_TABLE_KEY_PATTERN, DATANODE_TABLE_KEY_PREFIX};
use crate::error::{ConcurrentModifyRegionsPlacementSnafu, InvalidTableMetadataSnafu, Result};
use crate::error::{
ConcurrentModifyRegionsPlacementSnafu, InvalidTableMetadataSnafu, MoveRegionSnafu, Result,
};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest};
use crate::DatanodeId;
struct DatanodeTableKey {
@@ -100,7 +103,7 @@ impl DatanodeTableManager {
self.kv_backend
.get(&key.as_raw_key())
.await?
.map(|kv| DatanodeTableValue::try_from_raw_value(kv.1))
.map(|kv| DatanodeTableValue::try_from_raw_value(kv.value))
.transpose()
}
@@ -112,72 +115,123 @@ impl DatanodeTableManager {
) -> Result<()> {
let key = DatanodeTableKey::new(datanode_id, table_id).as_raw_key();
let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?;
self.kv_backend
.compare_and_set(&key, &[], &val)
.await?
.map_err(|curr| {
let curr = if let Some(curr) = curr {
DatanodeTableValue::try_from_raw_value(curr).map_or_else(
|e| format!("invalid DatanodeTableValue for Datanode {datanode_id}: {e}"),
let req = CompareAndPutRequest::new().with_key(key).with_value(val);
let resp = self.kv_backend.compare_and_put(req).await?;
if !resp.success {
let curr = resp.prev_kv.map_or_else(
|| "empty".to_string(),
|kv| {
DatanodeTableValue::try_from_raw_value(kv.value).map_or_else(
|e| format!("Invalid DatanodeTableValue for Datanode {datanode_id}: {e}"),
|v| format!("{v:?}"),
)
} else {
"empty".to_string()
};
ConcurrentModifyRegionsPlacementSnafu {
err_msg: format!("Datanode {datanode_id} already existed {curr}"),
}
.build()
})
},
);
return ConcurrentModifyRegionsPlacementSnafu {
err_msg: format!("Datanode {datanode_id} already existed {curr}"),
}
.fail();
}
Ok(())
}
pub async fn remove(&self, datanode_id: DatanodeId, table_id: TableId) -> Result<()> {
let key = DatanodeTableKey::new(datanode_id, table_id);
let removed_key = to_removed_key(&String::from_utf8_lossy(&key.as_raw_key()));
self.kv_backend
.move_value(&key.as_raw_key(), removed_key.as_bytes())
.await
let req = MoveValueRequest::new(key.as_raw_key(), removed_key.as_bytes());
let _ = self.kv_backend.move_value(req).await?;
Ok(())
}
// TODO(LFC): Use transaction to move region once the KvBackend and KvStore are merged into one.
pub async fn move_region(
&self,
from_datanode: DatanodeId,
to_datanode: DatanodeId,
table_id: TableId,
region: RegionNumber,
) -> Result<()> {
) -> Result<bool> {
let from_key = DatanodeTableKey::new(from_datanode, table_id);
let from_value = self.get(&from_key).await?;
if let Some(mut from_value) = from_value {
from_value.regions.retain(|x| *x != region);
from_value.version += 1;
self.kv_backend
.set(&from_key.as_raw_key(), &from_value.try_as_raw_value()?)
.await?;
}
let mut from_value = self.get(&from_key).await?.context(MoveRegionSnafu {
table_id,
region,
err_msg: format!("DatanodeTableKey not found for Datanode {from_datanode}"),
})?;
ensure!(
from_value.regions.contains(&region),
MoveRegionSnafu {
table_id,
region,
err_msg: format!("target region not found in Datanode {from_datanode}"),
}
);
let to_key = DatanodeTableKey::new(to_datanode, table_id);
let to_value = self.get(&to_key).await?;
if let Some(mut to_value) = to_value {
to_value.regions.push(region);
to_value.version += 1;
self.kv_backend
.set(&to_key.as_raw_key(), &to_value.try_as_raw_value()?)
.await?;
if let Some(v) = to_value.as_ref() {
ensure!(
!v.regions.contains(&region),
MoveRegionSnafu {
table_id,
region,
err_msg: format!("target region already existed in Datanode {to_datanode}"),
}
);
}
Ok(())
let compares = vec![
Compare::with_value(
from_key.as_raw_key(),
CompareOp::Equal,
from_value.try_as_raw_value()?,
),
Compare::new(
to_key.as_raw_key(),
CompareOp::Equal,
to_value
.as_ref()
.map(|x| x.try_as_raw_value())
.transpose()?,
),
];
let mut operations = Vec::with_capacity(2);
from_value.regions.retain(|x| *x != region);
if from_value.regions.is_empty() {
operations.push(TxnOp::Delete(from_key.as_raw_key()));
} else {
from_value.version += 1;
operations.push(TxnOp::Put(
from_key.as_raw_key(),
from_value.try_as_raw_value()?,
));
}
if let Some(mut v) = to_value {
v.regions.push(region);
v.version += 1;
operations.push(TxnOp::Put(to_key.as_raw_key(), v.try_as_raw_value()?));
} else {
let v = DatanodeTableValue::new(table_id, vec![region]);
operations.push(TxnOp::Put(to_key.as_raw_key(), v.try_as_raw_value()?));
}
let txn = Txn::new().when(compares).and_then(operations);
let resp = self.kv_backend.txn(txn).await?;
Ok(resp.succeeded)
}
pub async fn tables(&self, datanode_id: DatanodeId) -> Result<Vec<DatanodeTableValue>> {
let prefix = DatanodeTableKey::prefix(datanode_id);
let table_ids = self
.kv_backend
.range(prefix.as_bytes())
.map(|result| result.map(|kv| DatanodeTableValue::try_from_raw_value(kv.1)))
.try_collect::<Vec<_>>()
.await?
let req = RangeRequest::new().with_prefix(prefix.as_bytes());
let resp = self.kv_backend.range(req).await?;
let table_ids = resp
.kvs
.into_iter()
.map(|kv| DatanodeTableValue::try_from_raw_value(kv.value))
.collect::<Result<Vec<_>>>()?;
Ok(table_ids)
}
@@ -195,11 +249,20 @@ mod tests {
async fn test_move_region() {
let manager = DatanodeTableManager::new(Arc::new(MemoryKvBackend::default()));
assert!(manager.create(1, 1, vec![1, 2]).await.is_ok());
assert!(manager.create(2, 1, vec![3, 4]).await.is_ok());
let result = manager.move_region(1, 2, 1, 1).await;
assert!(result.unwrap_err().to_string().contains(
"Failed to move region 1 in table 1, err: DatanodeTableKey not found for Datanode 1"
));
assert!(manager.move_region(1, 2, 1, 1).await.is_ok());
assert!(manager.create(1, 1, vec![1, 2, 3]).await.is_ok());
let result = manager.move_region(1, 2, 1, 100).await;
assert!(result.unwrap_err().to_string().contains(
"Failed to move region 100 in table 1, err: target region not found in Datanode 1"
));
// Move region 1 from datanode 1 to datanode 2.
// Note that the DatanodeTableValue is not existed for datanode 2 now.
assert!(manager.move_region(1, 2, 1, 1).await.unwrap());
let value = manager
.get(&DatanodeTableKey::new(1, 1))
.await
@@ -209,11 +272,10 @@ mod tests {
value,
DatanodeTableValue {
table_id: 1,
regions: vec![2],
regions: vec![2, 3],
version: 1,
}
);
let value = manager
.get(&DatanodeTableKey::new(2, 1))
.await
@@ -223,10 +285,57 @@ mod tests {
value,
DatanodeTableValue {
table_id: 1,
regions: vec![3, 4, 1],
regions: vec![1],
version: 0,
}
);
// Move region 2 from datanode 1 to datanode 2.
assert!(manager.move_region(1, 2, 1, 2).await.is_ok());
let value = manager
.get(&DatanodeTableKey::new(1, 1))
.await
.unwrap()
.unwrap();
assert_eq!(
value,
DatanodeTableValue {
table_id: 1,
regions: vec![3],
version: 2,
}
);
let value = manager
.get(&DatanodeTableKey::new(2, 1))
.await
.unwrap()
.unwrap();
assert_eq!(
value,
DatanodeTableValue {
table_id: 1,
regions: vec![1, 2],
version: 1,
}
);
// Move region 3 (the last region) from datanode 1 to datanode 2.
assert!(manager.move_region(1, 2, 1, 3).await.is_ok());
let value = manager.get(&DatanodeTableKey::new(1, 1)).await.unwrap();
assert!(value.is_none());
let value = manager
.get(&DatanodeTableKey::new(2, 1))
.await
.unwrap()
.unwrap();
assert_eq!(
value,
DatanodeTableValue {
table_id: 1,
regions: vec![1, 2, 3],
version: 2,
}
);
}
#[tokio::test]
@@ -262,8 +371,8 @@ mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(b"__removed-__dn_table/2/1", kv.0.as_slice());
let value = DatanodeTableValue::try_from_raw_value(kv.1).unwrap();
assert_eq!(b"__removed-__dn_table/2/1", kv.key());
let value = DatanodeTableValue::try_from_raw_value(kv.value).unwrap();
assert_eq!(value, expected_value);
let values = manager.tables(1).await.unwrap();

View File

@@ -19,6 +19,7 @@ use super::TABLE_INFO_KEY_PREFIX;
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
pub struct TableInfoKey {
table_id: TableId,
@@ -57,11 +58,18 @@ impl TableInfoManager {
self.kv_backend
.get(&raw_key)
.await?
.map(|x| TableInfoValue::try_from_raw_value(x.1))
.map(|x| TableInfoValue::try_from_raw_value(x.value))
.transpose()
}
pub async fn compare_and_set(
/// Compare and put value of key. `expect` is the expected value; if the backend's current value
/// associated with key is the same as `expect`, the value will be updated to the new value.
///
/// - If the compare-and-put operation successfully updated the value, this method returns `Ok(Ok(()))`.
/// - If the associated value is not the same as `expect`, no value is updated and
///   `Ok(Err(Option<Vec<u8>>))` is returned; the inner `Option<Vec<u8>>` carries the value currently
///   associated with the key, if the backend reported one.
/// - If any error happens during the operation, an `Err(Error)` is returned.
pub async fn compare_and_put(
&self,
table_id: TableId,
expect: Option<TableInfoValue>,
@@ -82,17 +90,24 @@ impl TableInfoManager {
};
let raw_value = value.try_as_raw_value()?;
self.kv_backend
.compare_and_set(&raw_key, &expect, &raw_value)
.await
let req = CompareAndPutRequest::new()
.with_key(raw_key)
.with_expect(expect)
.with_value(raw_value);
let resp = self.kv_backend.compare_and_put(req).await?;
Ok(if resp.success {
Ok(())
} else {
Err(resp.prev_kv.map(|x| x.value))
})
}
pub async fn remove(&self, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
let removed_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice()));
self.kv_backend
.move_value(&key.as_raw_key(), removed_key.as_bytes())
.await
let key = TableInfoKey::new(table_id).as_raw_key();
let removed_key = to_removed_key(&String::from_utf8_lossy(&key));
let req = MoveValueRequest::new(key, removed_key.as_bytes());
self.kv_backend.move_value(req).await?;
Ok(())
}
}
@@ -107,6 +122,7 @@ mod tests {
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[tokio::test]
async fn test_table_info_manager() {
@@ -120,7 +136,8 @@ mod tests {
}
.try_as_raw_value()
.unwrap();
backend.set(&key, &val).await.unwrap();
let req = PutRequest::new().with_key(key).with_value(val);
backend.put(req).await.unwrap();
}
let manager = TableInfoManager::new(backend.clone());
@@ -137,7 +154,7 @@ mod tests {
let table_info = new_table_info(4);
let result = manager
.compare_and_set(4, None, table_info.clone())
.compare_and_put(4, None, table_info.clone())
.await
.unwrap();
assert!(result.is_ok());
@@ -145,7 +162,7 @@ mod tests {
// test cas failed, the new table info is not set
let new_table_info = new_table_info(4);
let result = manager
.compare_and_set(4, None, new_table_info.clone())
.compare_and_put(4, None, new_table_info.clone())
.await
.unwrap();
let actual = TableInfoValue::try_from_raw_value(result.unwrap_err().unwrap()).unwrap();
@@ -159,7 +176,7 @@ mod tests {
// test cas success
let result = manager
.compare_and_set(4, Some(actual), new_table_info.clone())
.compare_and_put(4, Some(actual), new_table_info.clone())
.await
.unwrap();
assert!(result.is_ok());
@@ -171,8 +188,8 @@ mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(b"__removed-__table_info/4", kv.0.as_slice());
let value = TableInfoValue::try_from_raw_value(kv.1).unwrap();
assert_eq!(b"__removed-__table_info/4", kv.key.as_slice());
let value = TableInfoValue::try_from_raw_value(kv.value).unwrap();
assert_eq!(value.table_info, new_table_info);
assert_eq!(value.version, 1);
}

View File

@@ -14,7 +14,6 @@
use std::sync::Arc;
use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
@@ -24,6 +23,7 @@ use crate::error::{InvalidTableMetadataSnafu, Result};
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest};
#[derive(Debug)]
pub struct TableNameKey<'a> {
@@ -111,11 +111,11 @@ impl TableNameManager {
let raw_key = key.as_raw_key();
let value = TableNameValue::new(table_id);
let raw_value = value.try_as_raw_value()?;
let result = self
.kv_backend
.compare_and_set(&raw_key, &[], &raw_value)
.await?;
Ok(matches!(result, Ok(())))
let req = CompareAndPutRequest::new()
.with_key(raw_key)
.with_value(raw_value);
let result = self.kv_backend.compare_and_put(req).await?;
Ok(result.success)
}
pub async fn get(&self, key: &TableNameKey<'_>) -> Result<Option<TableNameValue>> {
@@ -123,19 +123,18 @@ impl TableNameManager {
self.kv_backend
.get(&raw_key)
.await?
.map(|x| TableNameValue::try_from_raw_value(x.1))
.map(|x| TableNameValue::try_from_raw_value(x.value))
.transpose()
}
pub async fn tables(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
let key = TableNameKey::prefix_to_table(catalog, schema).into_bytes();
let table_names = self
.kv_backend
.range(&key)
.map(|x| x.map(|kv| TableNameKey::strip_table_name(&kv.0)))
.try_collect::<Vec<_>>()
.await?
let req = RangeRequest::new().with_prefix(key);
let resp = self.kv_backend.range(req).await?;
let table_names = resp
.kvs
.into_iter()
.map(|kv| TableNameKey::strip_table_name(kv.key()))
.collect::<Result<Vec<_>>>()?;
Ok(table_names)
}
@@ -143,9 +142,9 @@ impl TableNameManager {
pub async fn remove(&self, key: &TableNameKey<'_>) -> Result<()> {
let raw_key = key.as_raw_key();
let removed_key = to_removed_key(&String::from_utf8_lossy(&raw_key));
self.kv_backend
.move_value(&raw_key, removed_key.as_bytes())
.await
let req = MoveValueRequest::new(raw_key, removed_key.as_bytes());
let _ = self.kv_backend.move_value(req).await?;
Ok(())
}
}
@@ -183,7 +182,7 @@ mod tests {
.await
.unwrap()
.unwrap();
let value = TableNameValue::try_from_raw_value(kv.1).unwrap();
let value = TableNameValue::try_from_raw_value(kv.value).unwrap();
assert_eq!(value.table_id(), 99);
let tables = manager.tables("my_catalog", "my_schema").await.unwrap();

View File

@@ -22,6 +22,7 @@ use super::TABLE_REGION_KEY_PREFIX;
use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest};
use crate::DatanodeId;
pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
@@ -63,11 +64,18 @@ impl TableRegionManager {
self.kv_backend
.get(&raw_key)
.await?
.map(|x| TableRegionValue::try_from_raw_value(x.1))
.map(|x| TableRegionValue::try_from_raw_value(x.value))
.transpose()
}
pub async fn compare_and_set(
/// Compare and put value of key. `expect` is the expected value; if the backend's current value
/// associated with key is the same as `expect`, the value will be updated to the new value.
///
/// - If the compare-and-put operation successfully updated the value, this method returns `Ok(Ok(()))`.
/// - If the associated value is not the same as `expect`, no value is updated and
///   `Ok(Err(Option<Vec<u8>>))` is returned; the inner `Option<Vec<u8>>` carries the value currently
///   associated with the key, if the backend reported one.
/// - If any error happens during the operation, an `Err(Error)` is returned.
pub async fn compare_and_put(
&self,
table_id: TableId,
expect: Option<TableRegionValue>,
@@ -88,17 +96,24 @@ impl TableRegionManager {
};
let raw_value = value.try_as_raw_value()?;
self.kv_backend
.compare_and_set(&raw_key, &expect, &raw_value)
.await
let req = CompareAndPutRequest::new()
.with_key(raw_key)
.with_expect(expect)
.with_value(raw_value);
let resp = self.kv_backend.compare_and_put(req).await?;
Ok(if resp.success {
Ok(())
} else {
Err(resp.prev_kv.map(|x| x.value))
})
}
pub async fn remove(&self, table_id: TableId) -> Result<()> {
let key = TableRegionKey::new(table_id);
let remove_key = to_removed_key(&String::from_utf8_lossy(key.as_raw_key().as_slice()));
self.kv_backend
.move_value(&key.as_raw_key(), remove_key.as_bytes())
.await
let key = TableRegionKey::new(table_id).as_raw_key();
let remove_key = to_removed_key(&String::from_utf8_lossy(&key));
let req = MoveValueRequest::new(key, remove_key.as_bytes());
self.kv_backend.move_value(req).await?;
Ok(())
}
}
@@ -118,7 +133,7 @@ mod tests {
let region_distribution =
RegionDistribution::from([(1, vec![1, 2, 3]), (2, vec![4, 5, 6])]);
let result = manager
.compare_and_set(1, None, region_distribution.clone())
.compare_and_put(1, None, region_distribution.clone())
.await
.unwrap();
assert!(result.is_ok());
@@ -126,7 +141,7 @@ mod tests {
let new_region_distribution =
RegionDistribution::from([(1, vec![4, 5, 6]), (2, vec![1, 2, 3])]);
let curr = manager
.compare_and_set(1, None, new_region_distribution.clone())
.compare_and_put(1, None, new_region_distribution.clone())
.await
.unwrap()
.unwrap_err()
@@ -141,7 +156,7 @@ mod tests {
);
assert!(manager
.compare_and_set(1, Some(curr), new_region_distribution.clone())
.compare_and_put(1, Some(curr), new_region_distribution.clone())
.await
.unwrap()
.is_ok());
@@ -163,8 +178,8 @@ mod tests {
.await
.unwrap()
.unwrap();
assert_eq!(b"__removed-__table_region/1", kv.0.as_slice());
let value = TableRegionValue::try_from_raw_value(kv.1).unwrap();
assert_eq!(b"__removed-__table_region/1", kv.key.as_slice());
let value = TableRegionValue::try_from_raw_value(kv.value).unwrap();
assert_eq!(value.region_distribution, new_region_distribution);
assert_eq!(value.version, 1);
}

View File

@@ -13,68 +13,84 @@
// limitations under the License.
pub mod memory;
pub mod txn;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use futures::{Stream, StreamExt};
pub use txn::TxnService;
use crate::error::Error;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
#[derive(Debug, Clone, PartialEq)]
pub struct Kv(pub Vec<u8>, pub Vec<u8>);
pub type ValueIter<'a, E> = Pin<Box<dyn Stream<Item = Result<Kv, E>> + Send + 'a>>;
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error>>;
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error> + Send + Sync>;
#[async_trait]
pub trait KvBackend: Send + Sync {
type Error: ErrorExt;
pub trait KvBackend: TxnService
where
Self::Error: ErrorExt,
{
fn name(&self) -> &str;
fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Self::Error>
where
'a: 'b;
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Self::Error>;
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
/// Compare and set value of key. `expect` is the expected value, if backend's current value associated
/// with key is the same as `expect`, the value will be updated to `val`.
///
/// - If the compare-and-set operation successfully updated value, this method will return an `Ok(Ok())`
/// - If associated value is not the same as `expect`, no value will be updated and an `Ok(Err(Vec<u8>))`
/// will be returned, the `Err(Vec<u8>)` indicates the current associated value of key.
/// - If any error happens during operation, an `Err(Error)` will be returned.
async fn compare_and_set(
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
async fn compare_and_put(
&self,
key: &[u8],
expect: &[u8],
val: &[u8],
) -> Result<Result<(), Option<Vec<u8>>>, Self::Error>;
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error>;
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Self::Error>;
async fn delete_range(
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error>;
async fn delete(&self, key: &[u8]) -> Result<(), Self::Error> {
self.delete_range(key, &[]).await
/// Issues a `delete_range` request that carries only `key`.
///
/// When `prev_kv` is `true` the request asks for previous key-values and the first one in
/// the response (if any) is returned; when it is `false` the result is always `None`.
async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
    let base = DeleteRangeRequest::new().with_key(key.to_vec());
    let req = if prev_kv { base.with_prev_kv() } else { base };

    let resp = self.delete_range(req).await?;

    // The caller did not ask for the previous entry, so nothing is handed back.
    if !prev_kv {
        return Ok(None);
    }
    Ok(resp.prev_kvs.into_iter().next())
}
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error>;
/// Default get is implemented based on `range` method.
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Self::Error> {
let mut iter = self.range(key);
while let Some(r) = iter.next().await {
let kv = r?;
if kv.0 == key {
return Ok(Some(kv));
}
}
return Ok(None);
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
    // A range request carrying only `key` is sent; the first returned entry, if there is
    // one, is the answer.
    let lookup = RangeRequest::new().with_key(key.to_vec());
    let resp = self.range(lookup).await?;
    Ok(resp.kvs.into_iter().next())
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
/// MoveValue atomically renames the key to the given updated key.
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Self::Error>;
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error>;
fn as_any(&self) -> &dyn Any;
}

View File

@@ -16,20 +16,32 @@ use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::ops::Range;
use std::sync::RwLock;
use async_stream::stream;
use async_trait::async_trait;
use common_error::prelude::ErrorExt;
use common_telemetry::timer;
use serde::Serializer;
use crate::error::Error;
use crate::kv_backend::{Kv, KvBackend, ValueIter};
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
use crate::metrics::METRIC_META_TXN_REQUEST;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
pub struct MemoryKvBackend {
pub struct MemoryKvBackend<T> {
kvs: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
_phantom: PhantomData<T>,
}
impl Display for MemoryKvBackend {
impl<T> Display for MemoryKvBackend<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let kvs = self.kvs.read().unwrap();
for (k, v) in kvs.iter() {
@@ -42,93 +54,239 @@ impl Display for MemoryKvBackend {
}
}
impl Default for MemoryKvBackend {
impl<T> Default for MemoryKvBackend<T> {
fn default() -> Self {
Self {
kvs: RwLock::new(BTreeMap::new()),
_phantom: PhantomData,
}
}
}
impl<T> MemoryKvBackend<T> {
    /// Creates a backend through its `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Removes every entry from the map while holding its write lock.
    pub fn clear(&self) {
        self.kvs.write().unwrap().clear();
    }
}
#[async_trait]
impl KvBackend for MemoryKvBackend {
type Error = Error;
impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
/// Returns the fixed label `"Memory"` for this backend.
fn name(&self) -> &str {
"Memory"
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
let RangeRequest {
key,
range_end,
limit,
keys_only,
} = req;
fn range<'a, 'b>(&'a self, prefix: &[u8]) -> ValueIter<'b, Error>
where
'a: 'b,
{
let kvs = self.kvs.read().unwrap();
let kvs = kvs.clone();
let prefix = prefix.to_vec();
Box::pin(stream!({
for (k, v) in kvs.range(prefix.clone()..) {
if !k.starts_with(&prefix) {
break;
let iter: Box<dyn Iterator<Item = (&Vec<u8>, &Vec<u8>)>> = if range_end.is_empty() {
Box::new(kvs.get_key_value(&key).into_iter())
} else {
Box::new(kvs.range(key..range_end))
};
let mut kvs = iter
.map(|(k, v)| {
let key = k.clone();
let value = if keys_only { vec![] } else { v.clone() };
KeyValue { key, value }
})
.collect::<Vec<_>>();
let more = if limit > 0 && kvs.len() > limit as usize {
kvs.truncate(limit as usize);
true
} else {
false
};
Ok(RangeResponse { kvs, more })
}
/// Stores `value` under `key` in the map, under the write lock.
///
/// The response carries the replaced key-value only when the request set `prev_kv` and an
/// entry already existed for `key`; otherwise its `prev_kv` is `None`.
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
    let PutRequest {
        key,
        value,
        prev_kv: want_prev,
    } = req;

    let mut store = self.kvs.write().unwrap();

    if !want_prev {
        store.insert(key, value);
        return Ok(PutResponse { prev_kv: None });
    }

    // The key is cloned because it is needed both for the map and for the returned entry.
    let replaced = store.insert(key.clone(), value);
    Ok(PutResponse {
        prev_kv: replaced.map(|value| KeyValue { key, value }),
    })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
let mut kvs = self.kvs.write().unwrap();
let mut prev_kvs = if req.prev_kv {
Vec::with_capacity(req.kvs.len())
} else {
vec![]
};
for kv in req.kvs {
if req.prev_kv {
if let Some(value) = kvs.insert(kv.key.clone(), kv.value) {
prev_kvs.push(KeyValue { key: kv.key, value });
}
yield Ok(Kv(k.clone(), v.clone()));
} else {
kvs.insert(kv.key, kv.value);
}
}))
}
Ok(BatchPutResponse { prev_kvs })
}
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
let mut kvs = self.kvs.write().unwrap();
let _ = kvs.insert(key.to_vec(), val.to_vec());
Ok(())
}
async fn compare_and_set(
async fn compare_and_put(
&self,
key: &[u8],
expect: &[u8],
val: &[u8],
) -> Result<Result<(), Option<Vec<u8>>>, Error> {
let key = key.to_vec();
let val = val.to_vec();
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;
let mut kvs = self.kvs.write().unwrap();
let existed = kvs.entry(key);
Ok(match existed {
let (success, prev_kv) = match existed {
Entry::Vacant(e) => {
if expect.is_empty() {
let _ = e.insert(val);
Ok(())
} else {
Err(None)
let expected = expect.is_empty();
if expected {
let _ = e.insert(value);
}
(expected, None)
}
Entry::Occupied(mut existed) => {
if existed.get() == expect {
let _ = existed.insert(val);
Ok(())
let expected = existed.get() == &expect;
let prev_kv = if expected {
let _ = existed.insert(value);
None
} else {
Err(Some(existed.get().clone()))
}
Some(KeyValue {
key: existed.key().clone(),
value: existed.get().clone(),
})
};
(expected, prev_kv)
}
};
Ok(CompareAndPutResponse { success, prev_kv })
}
async fn delete_range(
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
let DeleteRangeRequest {
key,
range_end,
prev_kv,
} = req;
let mut kvs = self.kvs.write().unwrap();
let prev_kvs = if range_end.is_empty() {
kvs.remove(&key)
.into_iter()
.map(|value| KeyValue {
key: key.clone(),
value,
})
.collect::<Vec<_>>()
} else {
let range = Range {
start: key,
end: range_end,
};
kvs.drain_filter(|key, _| range.contains(key))
.map(Into::into)
.collect::<Vec<_>>()
};
Ok(DeleteRangeResponse {
deleted: prev_kvs.len() as i64,
prev_kvs: if prev_kv { prev_kvs } else { vec![] },
})
}
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error> {
let mut kvs = self.kvs.write().unwrap();
if end.is_empty() {
let _ = kvs.remove(key);
} else {
let start = key.to_vec();
let end = end.to_vec();
let range = start..end;
kvs.retain(|k, _| !range.contains(k));
let mut prev_kvs = if req.prev_kv {
Vec::with_capacity(req.keys.len())
} else {
vec![]
};
for key in req.keys {
if req.prev_kv {
if let Some(value) = kvs.remove(&key) {
prev_kvs.push(KeyValue { key, value });
}
} else {
kvs.remove(&key);
}
}
Ok(())
Ok(BatchDeleteResponse { prev_kvs })
}
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<(), Error> {
/// Looks up each requested key under the read lock and returns the entries that exist,
/// in request order. Keys with no entry are left out of the response.
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
    let store = self.kvs.read().unwrap();

    let mut found = Vec::new();
    for wanted in req.keys {
        if let Some((k, v)) = store.get_key_value(&wanted) {
            found.push(KeyValue {
                key: k.clone(),
                value: v.clone(),
            });
        }
    }

    Ok(BatchGetResponse { kvs: found })
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
let MoveValueRequest { from_key, to_key } = req;
let mut kvs = self.kvs.write().unwrap();
if let Some(v) = kvs.remove(from_key) {
let _ = kvs.insert(to_key.to_vec(), v);
}
Ok(())
let kv = if let Some(v) = kvs.remove(&from_key) {
kvs.insert(to_key, v.clone());
Some(KeyValue {
key: from_key,
value: v,
})
} else {
kvs.get(&to_key).map(|v| KeyValue {
key: to_key,
value: v.clone(),
})
};
Ok(MoveValueResponse(kv))
}
fn as_any(&self) -> &dyn Any {
@@ -136,62 +294,383 @@ impl KvBackend for MemoryKvBackend {
}
}
#[cfg(test)]
mod tests {
use futures::TryStreamExt;
#[async_trait]
impl<T: ErrorExt + Send + Sync> TxnService for MemoryKvBackend<T> {
type Error = T;
use super::*;
#[tokio::test]
async fn test_memory_kv_backend() {
let backend = MemoryKvBackend::default();
for i in 1..10 {
let key = format!("key{}", i);
let val = format!("val{}", i);
assert!(backend.set(key.as_bytes(), val.as_bytes()).await.is_ok());
}
let result = backend
.compare_and_set(b"hello", b"what", b"world")
.await
.unwrap();
assert!(result.unwrap_err().is_none());
let result = backend
.compare_and_set(b"hello", b"", b"world")
.await
.unwrap();
assert!(result.is_ok());
let result = backend
.compare_and_set(b"hello", b"world", b"greptime")
.await
.unwrap();
assert!(result.is_ok());
let result = backend
.compare_and_set(b"hello", b"world", b"what")
.await
.unwrap();
assert_eq!(result.unwrap_err().unwrap(), b"greptime");
assert!(backend.delete_range(b"key1", &[]).await.is_ok());
assert!(backend.delete_range(b"key3", b"key9").await.is_ok());
assert!(backend.move_value(b"key9", b"key10").await.is_ok());
assert_eq!(
backend.to_string(),
r#"hello -> greptime
key10 -> val9
key2 -> val2
"#
async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
let _timer = timer!(
METRIC_META_TXN_REQUEST,
&[("target", "memory"), ("op", "txn")]
);
let range = backend.range(b"key").try_collect::<Vec<_>>().await.unwrap();
assert_eq!(range.len(), 2);
assert_eq!(range[0], Kv(b"key10".to_vec(), b"val9".to_vec()));
assert_eq!(range[1], Kv(b"key2".to_vec(), b"val2".to_vec()));
let TxnRequest {
compare,
success,
failure,
} = txn.into();
let mut kvs = self.kvs.write().unwrap();
let succeeded = compare
.iter()
.all(|x| x.compare_with_value(kvs.get(&x.key)));
// Applies one transaction operation to the locked map and builds the matching response.
let do_txn = |txn_op| match txn_op {
    TxnOp::Put(key, value) => {
        // `insert` hands back the value that was overwritten, if any.
        let prev_value = kvs.insert(key.clone(), value);
        let prev_kv = prev_value.map(|value| KeyValue { key, value });
        TxnOpResponse::ResponsePut(PutResponse { prev_kv })
    }

    TxnOp::Get(key) => {
        // A single-key lookup yields at most one pair.
        let value = kvs.get(&key);
        let kvs = value
            .into_iter()
            .map(|value| KeyValue {
                key: key.clone(),
                value: value.clone(),
            })
            .collect();
        TxnOpResponse::ResponseGet(RangeResponse { kvs, more: false })
    }

    TxnOp::Delete(key) => {
        let prev_value = kvs.remove(&key);
        // `deleted` is the number of keys removed (0 or 1 for a single-key delete),
        // not the byte length of the removed value.
        let deleted = i64::from(prev_value.is_some());
        let prev_kvs = prev_value
            .into_iter()
            .map(|value| KeyValue {
                key: key.clone(),
                value,
            })
            .collect();
        TxnOpResponse::ResponseDelete(DeleteRangeResponse { deleted, prev_kvs })
    }
};
let responses: Vec<_> = if succeeded { success } else { failure }
.into_iter()
.map(do_txn)
.collect();
Ok(TxnResponse {
succeeded,
responses,
})
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use super::*;
use crate::error::Error;
use crate::kv_backend::KvBackend;
use crate::rpc::store::{BatchGetRequest, BatchPutRequest};
use crate::rpc::KeyValue;
use crate::util;
async fn mock_mem_store_with_data() -> MemoryKvBackend<Error> {
let kv_store = MemoryKvBackend::<Error>::new();
let kvs = mock_kvs();
assert!(kv_store
.batch_put(BatchPutRequest {
kvs,
..Default::default()
})
.await
.is_ok());
assert!(kv_store
.put(PutRequest {
key: b"key11".to_vec(),
value: b"val11".to_vec(),
..Default::default()
})
.await
.is_ok());
kv_store
}
/// Returns the three seed pairs `key1 -> val1`, `key2 -> val2` and `key3 -> val3`, in that order.
fn mock_kvs() -> Vec<KeyValue> {
    (1..=3)
        .map(|i| KeyValue {
            key: format!("key{i}").into_bytes(),
            value: format!("val{i}").into_bytes(),
        })
        .collect()
}
#[tokio::test]
async fn test_put() {
    let backend = mock_mem_store_with_data().await;

    // Overwrite `key11` without asking for the previous pair: none is returned.
    let overwrite = PutRequest {
        key: b"key11".to_vec(),
        value: b"val12".to_vec(),
        prev_kv: false,
    };
    let resp = backend.put(overwrite).await.unwrap();
    assert!(resp.prev_kv.is_none());

    // Overwrite again, this time asking for the previous pair.
    let overwrite_with_prev = PutRequest {
        key: b"key11".to_vec(),
        value: b"val13".to_vec(),
        prev_kv: true,
    };
    let resp = backend.put(overwrite_with_prev).await.unwrap();
    let previous = resp.prev_kv.unwrap();
    assert_eq!(b"key11", previous.key());
    assert_eq!(b"val12", previous.value());
}
#[tokio::test]
async fn test_range() {
    let store = mock_mem_store_with_data().await;

    let start = b"key1".to_vec();
    let end = util::get_prefix_end_key(b"key1");

    // Prefix scan returning keys and values.
    let with_values = RangeRequest {
        key: start.clone(),
        range_end: end.clone(),
        limit: 0,
        keys_only: false,
    };
    let resp = store.range(with_values).await.unwrap();
    assert_eq!(2, resp.kvs.len());
    assert_eq!(b"key1", resp.kvs[0].key());
    assert_eq!(b"val1", resp.kvs[0].value());
    assert_eq!(b"key11", resp.kvs[1].key());
    assert_eq!(b"val11", resp.kvs[1].value());

    // Same scan with `keys_only`: the values come back empty.
    let keys_only = RangeRequest {
        key: start.clone(),
        range_end: end.clone(),
        limit: 0,
        keys_only: true,
    };
    let resp = store.range(keys_only).await.unwrap();
    assert_eq!(2, resp.kvs.len());
    assert_eq!(b"key1", resp.kvs[0].key());
    assert_eq!(b"", resp.kvs[0].value());
    assert_eq!(b"key11", resp.kvs[1].key());
    assert_eq!(b"", resp.kvs[1].value());

    // Default (empty) `range_end`: only `key1` itself is expected.
    let single_key = RangeRequest {
        key: start.clone(),
        limit: 0,
        keys_only: false,
        ..Default::default()
    };
    let resp = store.range(single_key).await.unwrap();
    assert_eq!(1, resp.kvs.len());
    assert_eq!(b"key1", resp.kvs[0].key());
    assert_eq!(b"val1", resp.kvs[0].value());

    // Prefix scan capped at one result.
    let limited = RangeRequest {
        key: start,
        range_end: end,
        limit: 1,
        keys_only: false,
    };
    let resp = store.range(limited).await.unwrap();
    assert_eq!(1, resp.kvs.len());
    assert_eq!(b"key1", resp.kvs[0].key());
    assert_eq!(b"val1", resp.kvs[0].value());
}
#[tokio::test]
async fn test_batch_get() {
    let store = mock_mem_store_with_data().await;

    // No keys requested.
    let none_requested = BatchGetRequest { keys: vec![] };
    let resp = store.batch_get(none_requested).await.unwrap();
    assert!(resp.kvs.is_empty());

    // A key that was never written.
    let missing = BatchGetRequest {
        keys: vec![b"key10".to_vec()],
    };
    let resp = store.batch_get(missing).await.unwrap();
    assert!(resp.kvs.is_empty());

    // Two existing keys and one missing key: only the existing ones come back.
    let mixed = BatchGetRequest {
        keys: vec![b"key1".to_vec(), b"key3".to_vec(), b"key4".to_vec()],
    };
    let resp = store.batch_get(mixed).await.unwrap();
    assert_eq!(2, resp.kvs.len());
    assert_eq!(b"key1", resp.kvs[0].key());
    assert_eq!(b"val1", resp.kvs[0].value());
    assert_eq!(b"key3", resp.kvs[1].key());
    assert_eq!(b"val3", resp.kvs[1].value());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemoryKvBackend::<Error>::new());
let success = Arc::new(AtomicU8::new(0));
let mut joins = vec![];
for _ in 0..20 {
let kv_store_clone = kv_store.clone();
let success_clone = success.clone();
let join = tokio::spawn(async move {
let req = CompareAndPutRequest {
key: b"key".to_vec(),
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_store_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
});
joins.push(join);
}
for join in joins {
join.await.unwrap();
}
assert_eq!(1, success.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_delete_range() {
    let store = mock_mem_store_with_data().await;

    // Single-key delete that asks for the removed pair.
    let delete_key3 = DeleteRangeRequest {
        key: b"key3".to_vec(),
        range_end: vec![],
        prev_kv: true,
    };
    let resp = store.delete_range(delete_key3).await.unwrap();
    assert_eq!(1, resp.prev_kvs.len());
    assert_eq!(b"key3", resp.prev_kvs[0].key());
    assert_eq!(b"val3", resp.prev_kvs[0].value());
    assert!(store.get(b"key3").await.unwrap().is_none());

    // Single-key delete without `prev_kv`: nothing is reported back, but the key is gone.
    let delete_key2 = DeleteRangeRequest {
        key: b"key2".to_vec(),
        range_end: vec![],
        prev_kv: false,
    };
    let resp = store.delete_range(delete_key2).await.unwrap();
    assert!(resp.prev_kvs.is_empty());
    assert!(store.get(b"key2").await.unwrap().is_none());

    // Prefix delete over `key1`: two pairs are expected to be removed.
    let start = b"key1".to_vec();
    let end = util::get_prefix_end_key(b"key1");
    let delete_prefix = DeleteRangeRequest {
        key: start.clone(),
        range_end: end.clone(),
        prev_kv: true,
    };
    let resp = store.delete_range(delete_prefix).await.unwrap();
    assert_eq!(2, resp.prev_kvs.len());

    // The same range must now be empty.
    let scan = RangeRequest {
        key: start,
        range_end: end,
        ..Default::default()
    };
    let resp = store.range(scan).await.unwrap();
    assert!(resp.kvs.is_empty());
}
#[tokio::test]
async fn test_move_value() {
    // Moving an existing key reports the source pair.
    let store = mock_mem_store_with_data().await;
    let move_existing = MoveValueRequest {
        from_key: b"key1".to_vec(),
        to_key: b"key111".to_vec(),
    };
    let resp = store.move_value(move_existing).await.unwrap();
    let moved = resp.0.as_ref().unwrap();
    assert_eq!(b"key1", moved.key());
    assert_eq!(b"val1", moved.value());

    // Moving a key that does not exist (into a target that does not exist either) reports nothing.
    let store = mock_mem_store_with_data().await;
    let move_missing = MoveValueRequest {
        from_key: b"notexistkey".to_vec(),
        to_key: b"key222".to_vec(),
    };
    let resp = store.move_value(move_missing).await.unwrap();
    assert!(resp.0.is_none());
}
#[tokio::test]
async fn test_batch_delete() {
    let store = mock_mem_store_with_data().await;

    assert!(store.get(b"key1").await.unwrap().is_some());
    assert!(store.get(b"key100").await.unwrap().is_none());

    // One existing and one missing key, with `prev_kv`: only the existing pair is reported.
    let with_prev = BatchDeleteRequest {
        keys: vec![b"key1".to_vec(), b"key100".to_vec()],
        prev_kv: true,
    };
    let resp = store.batch_delete(with_prev).await.unwrap();
    assert_eq!(1, resp.prev_kvs.len());
    let expected = vec![KeyValue {
        key: b"key1".to_vec(),
        value: b"val1".to_vec(),
    }];
    assert_eq!(expected, resp.prev_kvs);
    assert!(store.get(b"key1").await.unwrap().is_none());

    assert!(store.get(b"key2").await.unwrap().is_some());
    assert!(store.get(b"key3").await.unwrap().is_some());

    // Two existing keys, without `prev_kv`: nothing is reported, both keys are gone.
    let without_prev = BatchDeleteRequest {
        keys: vec![b"key2".to_vec(), b"key3".to_vec()],
        prev_kv: false,
    };
    let resp = store.batch_delete(without_prev).await.unwrap();
    assert!(resp.prev_kvs.is_empty());
    assert!(store.get(b"key2").await.unwrap().is_none());
    assert!(store.get(b"key3").await.unwrap().is_none());
}
}

View File

@@ -0,0 +1,471 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod etcd;
use common_error::prelude::ErrorExt;
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
/// A service that can execute a [`Txn`] and report the outcome as a [`TxnResponse`].
#[async_trait::async_trait]
pub trait TxnService: Sync + Send {
/// Error type returned when a transaction cannot be executed.
type Error: ErrorExt;
/// Executes the given transaction.
///
/// # Panics
///
/// The default implementation panics with `unimplemented!`.
async fn txn(&self, _txn: Txn) -> Result<TxnResponse, Self::Error> {
unimplemented!("txn is not implemented")
}
}
/// Comparison operator used by [`Compare`] to relate a stored value to its target.
#[derive(Debug, Clone, PartialEq)]
pub enum CompareOp {
/// The stored value equals the target.
Equal,
/// The stored value is greater than the target.
Greater,
/// The stored value is less than the target.
Less,
/// The stored value differs from the target.
NotEqual,
}
/// A single transaction guard: compares the value stored under `key` with `target` using `cmp`.
#[derive(Debug, Clone, PartialEq)]
pub struct Compare {
/// Key whose stored value is compared.
pub key: Vec<u8>,
/// Operator applied between the stored value and `target`.
pub cmp: CompareOp,
/// None means the key does not exist.
pub target: Option<Vec<u8>>,
}
impl Compare {
pub fn new(key: Vec<u8>, cmp: CompareOp, target: Option<Vec<u8>>) -> Self {
Self { key, cmp, target }
}
pub fn with_value(key: Vec<u8>, cmp: CompareOp, target: Vec<u8>) -> Self {
Self::new(key, cmp, Some(target))
}
pub fn with_not_exist_value(key: Vec<u8>, cmp: CompareOp) -> Self {
Self::new(key, cmp, None)
}
pub fn compare_with_value(&self, value: Option<&Vec<u8>>) -> bool {
match (value, &self.target) {
(Some(value), Some(target)) => match self.cmp {
CompareOp::Equal => *value == *target,
CompareOp::Greater => *value > *target,
CompareOp::Less => *value < *target,
CompareOp::NotEqual => *value != *target,
},
(Some(_), None) => match self.cmp {
CompareOp::Equal => false,
CompareOp::Greater => true,
CompareOp::Less => false,
CompareOp::NotEqual => true,
},
(None, Some(_)) => match self.cmp {
CompareOp::Equal => false,
CompareOp::Greater => false,
CompareOp::Less => true,
CompareOp::NotEqual => true,
},
(None, None) => match self.cmp {
CompareOp::Equal => true,
CompareOp::Greater => false,
CompareOp::Less => false,
CompareOp::NotEqual => false,
},
}
}
}
/// A single operation that can be executed inside a transaction branch.
#[derive(Debug, Clone, PartialEq)]
pub enum TxnOp {
/// Stores a value under a key; the fields are `(key, value)`.
Put(Vec<u8>, Vec<u8>),
/// Reads the value stored under the given key.
Get(Vec<u8>),
/// Removes the given key.
Delete(Vec<u8>),
}
/// The raw content of a transaction: its guards and the two operation branches.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TxnRequest {
/// Comparisons that decide which branch is executed.
pub compare: Vec<Compare>,
/// Operations executed when the comparisons succeed.
pub success: Vec<TxnOp>,
/// Operations executed when the comparisons fail.
pub failure: Vec<TxnOp>,
}
/// The result of one executed [`TxnOp`].
#[derive(Debug, Clone, PartialEq)]
pub enum TxnOpResponse {
/// Result of a put operation.
ResponsePut(PutResponse),
/// Result of a get operation.
ResponseGet(RangeResponse),
/// Result of a delete operation.
ResponseDelete(DeleteRangeResponse),
}
/// The outcome of an executed transaction.
#[derive(Debug, Clone, PartialEq)]
pub struct TxnResponse {
    /// Whether the transaction's comparisons succeeded.
    pub succeeded: bool,
    /// One response per operation of the branch that was executed.
    pub responses: Vec<TxnOpResponse>,
}
/// Builder for a transaction request, configured through `when`, `and_then` and `or_else`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Txn {
// The request being assembled.
req: TxnRequest,
// Set once `when` has been called.
c_when: bool,
// Set once `and_then` has been called.
c_then: bool,
// Set once `or_else` has been called.
c_else: bool,
}
impl Txn {
pub fn new() -> Self {
Txn::default()
}
/// Takes a list of comparison. If all comparisons passed in succeed,
/// the operations passed into `and_then()` will be executed. Or the operations
/// passed into `or_else()` will be executed.
#[inline]
pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
assert!(!self.c_when, "cannot call `when` twice");
assert!(!self.c_then, "cannot call `when` after `and_then`");
assert!(!self.c_else, "cannot call `when` after `or_else`");
self.c_when = true;
self.req.compare = compares.into();
self
}
/// Takes a list of operations. The operations list will be executed, if the
/// comparisons passed into `when()` succeed.
#[inline]
pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_then, "cannot call `and_then` twice");
assert!(!self.c_else, "cannot call `and_then` after `or_else`");
self.c_then = true;
self.req.success = operations.into();
self
}
/// Takes a list of operations. The operations list will be executed, if the
/// comparisons passed into `when()` fail.
#[inline]
pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_else, "cannot call `or_else` twice");
self.c_else = true;
self.req.failure = operations.into();
self
}
}
impl From<Txn> for TxnRequest {
fn from(txn: Txn) -> Self {
txn.req
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::error::Error;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::rpc::KeyValue;
#[test]
fn test_compare() {
    // Each row: (comparison, stored value, expected result).
    let cases: Vec<(Compare, Option<Vec<u8>>, bool)> = vec![
        // Equal
        (Compare::with_value(vec![1], CompareOp::Equal, vec![1]), Some(vec![1]), true),
        (Compare::with_value(vec![1], CompareOp::Equal, vec![1]), None, false),
        (Compare::with_not_exist_value(vec![1], CompareOp::Equal), None, true),
        // Greater
        (Compare::with_value(vec![1], CompareOp::Greater, vec![1]), Some(vec![2]), true),
        (Compare::with_value(vec![1], CompareOp::Greater, vec![1]), None, false),
        (Compare::with_not_exist_value(vec![1], CompareOp::Greater), None, false),
        (Compare::with_not_exist_value(vec![1], CompareOp::Greater), Some(vec![1]), true),
        // Less
        (Compare::with_value(vec![1], CompareOp::Less, vec![1]), Some(vec![0]), true),
        (Compare::with_value(vec![1], CompareOp::Less, vec![1]), None, true),
        (Compare::with_not_exist_value(vec![1], CompareOp::Less), None, false),
        (Compare::with_not_exist_value(vec![1], CompareOp::Less), Some(vec![1]), false),
        // NotEqual
        (Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]), Some(vec![1]), false),
        (Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]), Some(vec![2]), true),
        (Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]), None, true),
        (Compare::with_not_exist_value(vec![1], CompareOp::NotEqual), None, false),
        (Compare::with_not_exist_value(vec![1], CompareOp::NotEqual), Some(vec![1]), true),
    ];

    for (compare, stored, expected) in cases {
        assert_eq!(
            expected,
            compare.compare_with_value(stored.as_ref()),
            "{compare:?} against {stored:?}"
        );
    }
}
#[test]
fn test_txn() {
    let guard = Compare::with_value(vec![1], CompareOp::Equal, vec![1]);
    let on_success = vec![TxnOp::Put(vec![1], vec![1])];
    let on_failure = vec![TxnOp::Put(vec![1], vec![2])];

    let txn = Txn::new()
        .when(vec![guard.clone()])
        .and_then(on_success.clone())
        .or_else(on_failure.clone());

    // The builder must record all three parts and flag each builder step as used.
    let expected = Txn {
        req: TxnRequest {
            compare: vec![guard],
            success: on_success,
            failure: on_failure,
        },
        c_when: true,
        c_then: true,
        c_else: true,
    };
    assert_eq!(txn, expected);
}
#[tokio::test]
async fn test_txn_one_compare_op() {
    let store = create_kv_store().await;

    // Seed key [11] with value [3].
    let seed = PutRequest {
        key: vec![11],
        value: vec![3],
        ..Default::default()
    };
    let _ = store.put(seed).await.unwrap();

    // [3] > [1], so the success branch with its single op is expected to run.
    let guard = Compare::with_value(vec![11], CompareOp::Greater, vec![1]);
    let txn = Txn::new()
        .when(vec![guard])
        .and_then(vec![TxnOp::Put(vec![11], vec![1])])
        .or_else(vec![TxnOp::Put(vec![11], vec![2])]);

    let resp = store.txn(txn).await.unwrap();
    assert!(resp.succeeded);
    assert_eq!(resp.responses.len(), 1);
}
#[tokio::test]
async fn test_txn_multi_compare_op() {
    let store = create_kv_store().await;

    // Seed keys [1] and [2], each holding its own key byte as the value.
    for i in 1..3 {
        let seed = PutRequest {
            key: vec![i],
            value: vec![i],
            ..Default::default()
        };
        let _ = store.put(seed).await.unwrap();
    }

    // Both guards hold, so both success ops are expected to run.
    let guards: Vec<_> = (1..3u8)
        .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
        .collect();
    let on_success = vec![TxnOp::Put(vec![1], vec![10]), TxnOp::Put(vec![2], vec![20])];
    let on_failure = vec![TxnOp::Put(vec![1], vec![11])];

    let txn = Txn::new()
        .when(guards)
        .and_then(on_success)
        .or_else(on_failure);

    let resp = store.txn(txn).await.unwrap();
    assert!(resp.succeeded);
    assert_eq!(resp.responses.len(), 2);
}
#[tokio::test]
async fn test_txn_compare_equal() {
    let store = create_kv_store().await;
    let key = vec![101u8];
    store.delete(&key, false).await.unwrap();

    // Guard: "key does not exist". Success writes [1], failure writes [2].
    let create_if_absent = Txn::new()
        .when(vec![Compare::with_not_exist_value(
            key.clone(),
            CompareOp::Equal,
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);

    // First run: the key is absent, so the guard holds.
    let resp = store.txn(create_if_absent.clone()).await.unwrap();
    assert!(resp.succeeded);

    // Second run: the key now exists, so the guard fails and [2] is written.
    let resp = store.txn(create_if_absent).await.unwrap();
    assert!(!resp.succeeded);

    // The stored value is now [2], so an equality guard on [2] holds.
    let equals_two = Txn::new()
        .when(vec![Compare::with_value(
            key.clone(),
            CompareOp::Equal,
            vec![2],
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
        .or_else(vec![TxnOp::Put(key, vec![4])]);
    let resp = store.txn(equals_two).await.unwrap();
    assert!(resp.succeeded);
}
#[tokio::test]
async fn test_txn_compare_greater() {
    let store = create_kv_store().await;
    let key = vec![102u8];
    store.delete(&key, false).await.unwrap();

    // Guard: stored value > "does not exist". Success writes [1], failure writes [2].
    let greater_than_absent = Txn::new()
        .when(vec![Compare::with_not_exist_value(
            key.clone(),
            CompareOp::Greater,
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);

    // First run: the key is absent, so the guard fails and [2] is written.
    let resp = store.txn(greater_than_absent.clone()).await.unwrap();
    assert!(!resp.succeeded);

    // Second run: the key exists, so the guard holds and [1] is written.
    let resp = store.txn(greater_than_absent).await.unwrap();
    assert!(resp.succeeded);

    // [1] is not greater than [1]: the failure branch reads the key back.
    let greater_than_one = Txn::new()
        .when(vec![Compare::with_value(
            key.clone(),
            CompareOp::Greater,
            vec![1],
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
        .or_else(vec![TxnOp::Get(key.clone())]);
    let mut resp = store.txn(greater_than_one).await.unwrap();
    assert!(!resp.succeeded);

    let last = resp.responses.pop().unwrap();
    let expected = TxnOpResponse::ResponseGet(RangeResponse {
        kvs: vec![KeyValue {
            key,
            value: vec![1],
        }],
        more: false,
    });
    assert_eq!(last, expected);
}
#[tokio::test]
async fn test_txn_compare_less() {
    let kv_store = create_kv_store().await;
    let key = vec![103u8];
    // Clear the key this test actually uses (the original cleared `[3]`), so that a
    // leftover value from a previous run against a persistent store cannot skew the
    // first comparison.
    kv_store.delete(&key, false).await.unwrap();

    // Guard: stored value < "does not exist". This can never hold, so the failure
    // branch runs both times and writes [2].
    let txn = Txn::new()
        .when(vec![Compare::with_not_exist_value(
            key.clone(),
            CompareOp::Less,
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);

    let txn_response = kv_store.txn(txn.clone()).await.unwrap();
    assert!(!txn_response.succeeded);

    let txn_response = kv_store.txn(txn).await.unwrap();
    assert!(!txn_response.succeeded);

    // The stored value is [2], which is not less than [2]: the failure branch reads it back.
    let txn = Txn::new()
        .when(vec![Compare::with_value(
            key.clone(),
            CompareOp::Less,
            vec![2],
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
        .or_else(vec![TxnOp::Get(key.clone())]);
    let mut txn_response = kv_store.txn(txn).await.unwrap();
    assert!(!txn_response.succeeded);

    let res = txn_response.responses.pop().unwrap();
    assert_eq!(
        res,
        TxnOpResponse::ResponseGet(RangeResponse {
            kvs: vec![KeyValue {
                key,
                value: vec![2]
            }],
            more: false,
        })
    );
}
#[tokio::test]
async fn test_txn_compare_not_equal() {
    let store = create_kv_store().await;
    let key = vec![104u8];
    store.delete(&key, false).await.unwrap();

    // Guard: stored value != "does not exist". Success writes [1], failure writes [2].
    let differs_from_absent = Txn::new()
        .when(vec![Compare::with_not_exist_value(
            key.clone(),
            CompareOp::NotEqual,
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);

    // First run: the key is absent, so the guard fails and [2] is written.
    let resp = store.txn(differs_from_absent.clone()).await.unwrap();
    assert!(!resp.succeeded);

    // Second run: the key exists, so the guard holds and [1] is written.
    let resp = store.txn(differs_from_absent).await.unwrap();
    assert!(resp.succeeded);

    // The stored value is [1], so an equality guard on [2] fails and the key is read back.
    let equals_two = Txn::new()
        .when(vec![Compare::with_value(
            key.clone(),
            CompareOp::Equal,
            vec![2],
        )])
        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
        .or_else(vec![TxnOp::Get(key.clone())]);
    let mut resp = store.txn(equals_two).await.unwrap();
    assert!(!resp.succeeded);

    let last = resp.responses.pop().unwrap();
    let expected = TxnOpResponse::ResponseGet(RangeResponse {
        kvs: vec![KeyValue {
            key,
            value: vec![1],
        }],
        more: false,
    });
    assert_eq!(last, expected);
}
async fn create_kv_store() -> KvBackendRef {
Arc::new(MemoryKvBackend::<Error>::new())
// TODO(jiachun): Add a feature to test against etcd in github CI
//
// The same test can be run against etcd by uncommenting the following line
// crate::service::store::etcd::EtcdStore::with_endpoints(["127.0.0.1:2379"])
// .await
// .unwrap()
}
}

View File

@@ -0,0 +1,135 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use etcd_client::{
Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp,
TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse,
};
use super::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
use crate::error::{self, Result};
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
impl From<Txn> for EtcdTxn {
fn from(txn: Txn) -> Self {
let mut etcd_txn = EtcdTxn::new();
if txn.c_when {
let compares = txn
.req
.compare
.into_iter()
.map(EtcdCompare::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.when(compares);
}
if txn.c_then {
let success = txn
.req
.success
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.and_then(success);
}
if txn.c_else {
let failure = txn
.req
.failure
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.or_else(failure);
}
etcd_txn
}
}
impl From<Compare> for EtcdCompare {
fn from(cmp: Compare) -> Self {
let etcd_cmp = match cmp.cmp {
CompareOp::Equal => EtcdCompareOp::Equal,
CompareOp::Greater => EtcdCompareOp::Greater,
CompareOp::Less => EtcdCompareOp::Less,
CompareOp::NotEqual => EtcdCompareOp::NotEqual,
};
match cmp.target {
Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target),
// create revision 0 means key was not exist
None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0),
}
}
}
impl From<TxnOp> for EtcdTxnOp {
    /// Converts a [`TxnOp`] into the matching etcd operation, passing no extra options.
    fn from(op: TxnOp) -> Self {
        match op {
            TxnOp::Put(k, v) => Self::put(k, v, None),
            TxnOp::Get(k) => Self::get(k, None),
            TxnOp::Delete(k) => Self::delete(k, None),
        }
    }
}
impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {
type Error = error::Error;
fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
match op_resp {
EtcdTxnOpResponse::Put(res) => {
let prev_kv = res.prev_key().cloned().map(Into::into);
let put_res = PutResponse { prev_kv };
Ok(TxnOpResponse::ResponsePut(put_res))
}
EtcdTxnOpResponse::Get(res) => {
let kvs = res.kvs().iter().cloned().map(Into::into).collect();
let range_res = RangeResponse { kvs, more: false };
Ok(TxnOpResponse::ResponseGet(range_res))
}
EtcdTxnOpResponse::Delete(res) => {
let prev_kvs = res
.prev_kvs()
.iter()
.cloned()
.map(Into::into)
.collect::<Vec<_>>();
let delete_res = DeleteRangeResponse {
prev_kvs,
deleted: res.deleted(),
};
Ok(TxnOpResponse::ResponseDelete(delete_res))
}
EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
err_msg: "nested txn is not supported",
}
.fail(),
}
}
}
impl TryFrom<EtcdTxnResponse> for TxnResponse {
type Error = error::Error;
fn try_from(resp: EtcdTxnResponse) -> Result<Self> {
let succeeded = resp.succeeded();
let responses = resp
.op_responses()
.into_iter()
.map(TxnOpResponse::try_from)
.collect::<Result<Vec<_>>>()?;
Ok(Self {
succeeded,
responses,
})
}
}

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(btree_drain_filter)]
pub mod error;
pub mod heartbeat;
pub mod ident;
pub mod instruction;
pub mod key;
pub mod kv_backend;
pub mod metrics;
pub mod peer;
pub mod rpc;
pub mod table_name;
pub mod util;
pub type ClusterId = u64;
pub type DatanodeId = u64;

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Metric name under which meta transaction requests are timed.
pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";

View File

@@ -18,17 +18,14 @@ pub mod router;
pub mod store;
pub mod util;
use std::fmt::{Display, Formatter};
use api::v1::meta::{KeyValue as PbKeyValue, ResponseHeader as PbResponseHeader};
#[derive(Debug, Clone)]
pub struct ResponseHeader(PbResponseHeader);
impl ResponseHeader {
#[inline]
pub(crate) fn new(header: PbResponseHeader) -> Self {
Self(header)
}
#[inline]
pub fn protocol_version(&self) -> u64 {
self.0.protocol_version
@@ -56,33 +53,83 @@ impl ResponseHeader {
}
}
#[derive(Debug, Clone)]
pub struct KeyValue(PbKeyValue);
/// A key together with its value, both stored as raw bytes.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyValue {
/// The key bytes.
pub key: Vec<u8>,
/// The value bytes.
pub value: Vec<u8>,
}
impl From<KeyValue> for PbKeyValue {
fn from(kv: KeyValue) -> Self {
Self {
key: kv.key,
value: kv.value,
}
}
}
impl From<etcd_client::KeyValue> for KeyValue {
fn from(kv: etcd_client::KeyValue) -> Self {
Self {
key: kv.key().to_vec(),
value: kv.value().to_vec(),
}
}
}
impl From<KeyValue> for (Vec<u8>, Vec<u8>) {
fn from(kv: KeyValue) -> Self {
(kv.key, kv.value)
}
}
impl From<(Vec<u8>, Vec<u8>)> for KeyValue {
fn from(kv: (Vec<u8>, Vec<u8>)) -> Self {
Self {
key: kv.0,
value: kv.1,
}
}
}
impl Display for KeyValue {
    /// Renders the pair as `(key, value)`, decoding both sides as lossy UTF-8.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let key = String::from_utf8_lossy(&self.key);
        let value = String::from_utf8_lossy(&self.value);
        write!(f, "({}, {})", key, value)
    }
}
impl KeyValue {
#[inline]
pub(crate) fn new(kv: PbKeyValue) -> Self {
Self(kv)
pub fn new(kv: PbKeyValue) -> Self {
Self {
key: kv.key,
value: kv.value,
}
}
#[inline]
pub fn key(&self) -> &[u8] {
&self.0.key
&self.key
}
#[inline]
pub fn take_key(&mut self) -> Vec<u8> {
std::mem::take(&mut self.0.key)
std::mem::take(&mut self.key)
}
#[inline]
pub fn value(&self) -> &[u8] {
&self.0.value
&self.value
}
#[inline]
pub fn take_value(&mut self) -> Vec<u8> {
std::mem::take(&mut self.0.value)
std::mem::take(&mut self.value)
}
}
@@ -103,7 +150,7 @@ mod tests {
}),
};
let header = ResponseHeader::new(pb_header);
let header = ResponseHeader(pb_header);
assert_eq!(101, header.protocol_version());
assert_eq!(1, header.cluster_id());
assert_eq!(100, header.error_code());

View File

@@ -12,21 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{Display, Formatter};
use api::v1::meta::{
BatchDeleteRequest as PbBatchDeleteRequest, BatchDeleteResponse as PbBatchDeleteResponse,
BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
BatchPutRequest as PbBatchPutRequest, BatchPutResponse as PbBatchPutResponse,
CompareAndPutRequest as PbCompareAndPutRequest,
CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
DeleteRangeResponse as PbDeleteRangeResponse, KeyValue as PbKeyValue,
MoveValueRequest as PbMoveValueRequest, MoveValueResponse as PbMoveValueResponse,
PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse,
DeleteRangeResponse as PbDeleteRangeResponse, MoveValueRequest as PbMoveValueRequest,
MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest,
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
ResponseHeader as PbResponseHeader,
};
use crate::error;
use crate::error::Result;
use crate::rpc::{util, KeyValue, ResponseHeader};
use crate::rpc::{util, KeyValue};
#[derive(Debug, Clone, Default)]
pub struct RangeRequest {
@@ -59,6 +61,30 @@ impl From<RangeRequest> for PbRangeRequest {
}
}
impl From<PbRangeRequest> for RangeRequest {
fn from(value: PbRangeRequest) -> Self {
Self {
key: value.key,
range_end: value.range_end,
limit: value.limit,
keys_only: value.keys_only,
}
}
}
impl Display for RangeRequest {
    /// Renders the request with its key and range end decoded as lossy UTF-8.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let key = String::from_utf8_lossy(&self.key);
        let range_end = String::from_utf8_lossy(&self.range_end);
        write!(
            f,
            "RangeRequest{{key: '{}', range_end: '{}', limit: {}, keys_only: {}}}",
            key, range_end, self.limit, self.keys_only
        )
    }
}
impl RangeRequest {
#[inline]
pub fn new() -> Self {
@@ -119,8 +145,26 @@ impl RangeRequest {
}
}
#[derive(Debug, Clone)]
pub struct RangeResponse(PbRangeResponse);
/// The result of a range (read) request.
#[derive(Debug, Clone, PartialEq)]
pub struct RangeResponse {
/// The key-value pairs returned by the request.
pub kvs: Vec<KeyValue>,
// NOTE(review): presumably tells whether more pairs exist beyond this response — confirm.
pub more: bool,
}
impl Display for RangeResponse {
    /// Renders the response as its comma-separated pairs followed by the `more` flag.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let rendered: Vec<String> = self.kvs.iter().map(ToString::to_string).collect();
        write!(
            f,
            "RangeResponse{{kvs: [{}], more: {}}}",
            rendered.join(", "),
            self.more
        )
    }
}
impl TryFrom<PbRangeResponse> for RangeResponse {
type Error = error::Error;
@@ -128,29 +172,25 @@ impl TryFrom<PbRangeResponse> for RangeResponse {
fn try_from(pb: PbRangeResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
kvs: pb.kvs.into_iter().map(KeyValue::new).collect(),
more: pb.more,
})
}
}
impl RangeResponse {
#[inline]
pub fn new(res: PbRangeResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbRangeResponse {
PbRangeResponse {
header: Some(header),
kvs: self.kvs.into_iter().map(Into::into).collect(),
more: self.more,
}
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
self.0.kvs.drain(..).map(KeyValue::new).collect()
}
#[inline]
pub fn more(&self) -> bool {
self.0.more
self.kvs.drain(..).collect()
}
}
@@ -177,6 +217,16 @@ impl From<PutRequest> for PbPutRequest {
}
}
impl From<PbPutRequest> for PutRequest {
fn from(value: PbPutRequest) -> Self {
Self {
key: value.key,
value: value.value,
prev_kv: value.prev_kv,
}
}
}
impl PutRequest {
#[inline]
pub fn new() -> Self {
@@ -211,8 +261,10 @@ impl PutRequest {
}
}
#[derive(Debug, Clone)]
pub struct PutResponse(PbPutResponse);
/// The result of a put request.
#[derive(Debug, Clone, PartialEq)]
pub struct PutResponse {
/// The previous key-value pair, when one is returned.
pub prev_kv: Option<KeyValue>,
}
impl TryFrom<PbPutResponse> for PutResponse {
type Error = error::Error;
@@ -220,27 +272,22 @@ impl TryFrom<PbPutResponse> for PutResponse {
fn try_from(pb: PbPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
prev_kv: pb.prev_kv.map(KeyValue::new),
})
}
}
impl PutResponse {
#[inline]
pub fn new(res: PbPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_prev_kv(&mut self) -> Option<KeyValue> {
self.0.prev_kv.take().map(KeyValue::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbPutResponse {
PbPutResponse {
header: Some(header),
prev_kv: self.prev_kv.map(Into::into),
}
}
}
/// A request to read several keys at once.
#[derive(Debug, Clone)]
pub struct BatchGetRequest {
    /// The keys to look up.
    pub keys: Vec<Vec<u8>>,
}
@@ -254,6 +301,12 @@ impl From<BatchGetRequest> for PbBatchGetRequest {
}
}
impl From<PbBatchGetRequest> for BatchGetRequest {
fn from(value: PbBatchGetRequest) -> Self {
Self { keys: value.keys }
}
}
impl Default for BatchGetRequest {
fn default() -> Self {
Self::new()
@@ -274,7 +327,23 @@ impl BatchGetRequest {
}
#[derive(Debug, Clone)]
pub struct BatchGetResponse(PbBatchGetResponse);
pub struct BatchGetResponse {
pub kvs: Vec<KeyValue>,
}
impl Display for BatchGetResponse {
    /// Renders the response as a bracketed, comma-separated list of its pairs.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let rendered: Vec<String> = self.kvs.iter().map(ToString::to_string).collect();
        write!(f, "[{}]", rendered.join(", "))
    }
}
impl TryFrom<PbBatchGetResponse> for BatchGetResponse {
type Error = error::Error;
@@ -282,30 +351,24 @@ impl TryFrom<PbBatchGetResponse> for BatchGetResponse {
fn try_from(pb: PbBatchGetResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self(pb))
Ok(Self {
kvs: pb.kvs.into_iter().map(KeyValue::new).collect(),
})
}
}
impl BatchGetResponse {
#[inline]
pub fn new(res: PbBatchGetResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
self.0.kvs.drain(..).map(KeyValue::new).collect()
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbBatchGetResponse {
PbBatchGetResponse {
header: Some(header),
kvs: self.kvs.into_iter().map(Into::into).collect(),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct BatchPutRequest {
pub kvs: Vec<PbKeyValue>,
pub kvs: Vec<KeyValue>,
/// If prev_kv is set, gets the previous key-value pairs before changing it.
/// The previous key-value pairs will be returned in the batch put response.
pub prev_kv: bool,
@@ -315,12 +378,21 @@ impl From<BatchPutRequest> for PbBatchPutRequest {
fn from(req: BatchPutRequest) -> Self {
Self {
header: None,
kvs: req.kvs,
kvs: req.kvs.into_iter().map(Into::into).collect(),
prev_kv: req.prev_kv,
}
}
}
/// Builds a `BatchPutRequest` out of its protobuf counterpart.
///
/// Every protobuf key-value is wrapped with `KeyValue::new`; the `prev_kv`
/// flag is copied unchanged.
impl From<PbBatchPutRequest> for BatchPutRequest {
    fn from(value: PbBatchPutRequest) -> Self {
        let prev_kv = value.prev_kv;
        let kvs = value.kvs.into_iter().map(KeyValue::new).collect();
        Self { kvs, prev_kv }
    }
}
impl BatchPutRequest {
#[inline]
pub fn new() -> Self {
@@ -332,7 +404,7 @@ impl BatchPutRequest {
#[inline]
pub fn add_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
self.kvs.push(PbKeyValue {
self.kvs.push(KeyValue {
key: key.into(),
value: value.into(),
});
@@ -349,7 +421,9 @@ impl BatchPutRequest {
}
#[derive(Debug, Clone)]
pub struct BatchPutResponse(PbBatchPutResponse);
pub struct BatchPutResponse {
pub prev_kvs: Vec<KeyValue>,
}
impl TryFrom<PbBatchPutResponse> for BatchPutResponse {
type Error = error::Error;
@@ -357,24 +431,23 @@ impl TryFrom<PbBatchPutResponse> for BatchPutResponse {
fn try_from(pb: PbBatchPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
prev_kvs: pb.prev_kvs.into_iter().map(KeyValue::new).collect(),
})
}
}
impl BatchPutResponse {
#[inline]
pub fn new(res: PbBatchPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbBatchPutResponse {
PbBatchPutResponse {
header: Some(header),
prev_kvs: self.prev_kvs.into_iter().map(Into::into).collect(),
}
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
self.prev_kvs.drain(..).collect()
}
}
@@ -396,6 +469,15 @@ impl From<BatchDeleteRequest> for PbBatchDeleteRequest {
}
}
/// Builds a `BatchDeleteRequest` out of its protobuf counterpart.
///
/// The `keys` and `prev_kv` fields are carried over as they are.
impl From<PbBatchDeleteRequest> for BatchDeleteRequest {
    fn from(value: PbBatchDeleteRequest) -> Self {
        let prev_kv = value.prev_kv;
        let keys = value.keys;
        Self { keys, prev_kv }
    }
}
impl BatchDeleteRequest {
#[inline]
pub fn new() -> Self {
@@ -421,7 +503,9 @@ impl BatchDeleteRequest {
}
#[derive(Debug, Clone)]
pub struct BatchDeleteResponse(PbBatchDeleteResponse);
pub struct BatchDeleteResponse {
pub prev_kvs: Vec<KeyValue>,
}
impl TryFrom<PbBatchDeleteResponse> for BatchDeleteResponse {
type Error = error::Error;
@@ -429,24 +513,18 @@ impl TryFrom<PbBatchDeleteResponse> for BatchDeleteResponse {
fn try_from(pb: PbBatchDeleteResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
prev_kvs: pb.prev_kvs.into_iter().map(KeyValue::new).collect(),
})
}
}
impl BatchDeleteResponse {
#[inline]
pub fn new(res: PbBatchDeleteResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbBatchDeleteResponse {
PbBatchDeleteResponse {
header: Some(header),
prev_kvs: self.prev_kvs.into_iter().map(Into::into).collect(),
}
}
}
@@ -471,6 +549,16 @@ impl From<CompareAndPutRequest> for PbCompareAndPutRequest {
}
}
/// Builds a `CompareAndPutRequest` out of its protobuf counterpart.
///
/// The `key`, `expect` and `value` fields are carried over as they are.
impl From<PbCompareAndPutRequest> for CompareAndPutRequest {
    fn from(value: PbCompareAndPutRequest) -> Self {
        // `new_value` avoids clashing with the parameter, which is itself named `value`.
        let (key, expect, new_value) = (value.key, value.expect, value.value);
        Self {
            key,
            expect,
            value: new_value,
        }
    }
}
impl CompareAndPutRequest {
#[inline]
pub fn new() -> Self {
@@ -504,8 +592,11 @@ impl CompareAndPutRequest {
}
}
#[derive(Debug, Clone)]
pub struct CompareAndPutResponse(PbCompareAndPutResponse);
#[derive(Debug, Clone, Default)]
pub struct CompareAndPutResponse {
pub success: bool,
pub prev_kv: Option<KeyValue>,
}
impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
type Error = error::Error;
@@ -513,29 +604,30 @@ impl TryFrom<PbCompareAndPutResponse> for CompareAndPutResponse {
fn try_from(pb: PbCompareAndPutResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
success: pb.success,
prev_kv: pb.prev_kv.map(KeyValue::new),
})
}
}
impl CompareAndPutResponse {
#[inline]
pub fn new(res: PbCompareAndPutResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbCompareAndPutResponse {
PbCompareAndPutResponse {
header: Some(header),
success: self.success,
prev_kv: self.prev_kv.map(Into::into),
}
}
#[inline]
pub fn is_success(&self) -> bool {
self.0.success
self.success
}
#[inline]
pub fn take_prev_kv(&mut self) -> Option<KeyValue> {
self.0.prev_kv.take().map(KeyValue::new)
self.prev_kv.take()
}
}
@@ -571,6 +663,16 @@ impl From<DeleteRangeRequest> for PbDeleteRangeRequest {
}
}
/// Builds a `DeleteRangeRequest` out of its protobuf counterpart.
///
/// The `key`, `range_end` and `prev_kv` fields are carried over as they are.
impl From<PbDeleteRangeRequest> for DeleteRangeRequest {
    fn from(value: PbDeleteRangeRequest) -> Self {
        let prev_kv = value.prev_kv;
        let (key, range_end) = (value.key, value.range_end);
        Self {
            key,
            range_end,
            prev_kv,
        }
    }
}
impl DeleteRangeRequest {
#[inline]
pub fn new() -> Self {
@@ -624,8 +726,11 @@ impl DeleteRangeRequest {
}
}
#[derive(Debug, Clone)]
pub struct DeleteRangeResponse(PbDeleteRangeResponse);
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteRangeResponse {
pub deleted: i64,
pub prev_kvs: Vec<KeyValue>,
}
impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse {
type Error = error::Error;
@@ -633,29 +738,30 @@ impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse {
fn try_from(pb: PbDeleteRangeResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self {
deleted: pb.deleted,
prev_kvs: pb.prev_kvs.into_iter().map(KeyValue::new).collect(),
})
}
}
impl DeleteRangeResponse {
#[inline]
pub fn new(res: PbDeleteRangeResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbDeleteRangeResponse {
PbDeleteRangeResponse {
header: Some(header),
deleted: self.deleted,
prev_kvs: self.prev_kvs.into_iter().map(Into::into).collect(),
}
}
#[inline]
pub fn deleted(&self) -> i64 {
self.0.deleted
self.deleted
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.0.prev_kvs.drain(..).map(KeyValue::new).collect()
self.prev_kvs.drain(..).collect()
}
}
@@ -678,6 +784,15 @@ impl From<MoveValueRequest> for PbMoveValueRequest {
}
}
/// Builds a `MoveValueRequest` out of its protobuf counterpart.
///
/// The `from_key` and `to_key` fields are carried over as they are.
impl From<PbMoveValueRequest> for MoveValueRequest {
    fn from(value: PbMoveValueRequest) -> Self {
        let (from_key, to_key) = (value.from_key, value.to_key);
        Self { from_key, to_key }
    }
}
impl MoveValueRequest {
#[inline]
pub fn new(from_key: impl Into<Vec<u8>>, to_key: impl Into<Vec<u8>>) -> Self {
@@ -689,7 +804,7 @@ impl MoveValueRequest {
}
#[derive(Debug, Clone)]
pub struct MoveValueResponse(PbMoveValueResponse);
pub struct MoveValueResponse(pub Option<KeyValue>);
impl TryFrom<PbMoveValueResponse> for MoveValueResponse {
type Error = error::Error;
@@ -697,24 +812,21 @@ impl TryFrom<PbMoveValueResponse> for MoveValueResponse {
fn try_from(pb: PbMoveValueResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
Ok(Self(pb.kv.map(KeyValue::new)))
}
}
impl MoveValueResponse {
#[inline]
pub fn new(res: PbMoveValueResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
pub fn to_proto_resp(self, header: PbResponseHeader) -> PbMoveValueResponse {
PbMoveValueResponse {
header: Some(header),
kv: self.0.map(Into::into),
}
}
#[inline]
pub fn take_kv(&mut self) -> Option<KeyValue> {
self.0.kv.take().map(KeyValue::new)
self.0.take()
}
}
@@ -784,9 +896,8 @@ mod tests {
more: true,
};
let mut res = RangeResponse::new(pb_res);
assert!(res.take_header().is_none());
assert!(res.more());
let mut res: RangeResponse = pb_res.try_into().unwrap();
assert!(res.more);
let mut kvs = res.take_kvs();
let kv0 = kvs.get_mut(0).unwrap();
assert_eq!(b"k1".to_vec(), kv0.key().to_vec());
@@ -826,9 +937,8 @@ mod tests {
}),
};
let mut res = PutResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut kv = res.take_prev_kv().unwrap();
let res: PutResponse = pb_res.try_into().unwrap();
let mut kv = res.prev_kv.unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());
assert_eq!(b"v1".to_vec(), kv.value().to_vec());
@@ -859,12 +969,8 @@ mod tests {
value: b"test_value1".to_vec(),
}],
};
let mut res = BatchGetResponse::new(pb_res);
assert!(res.take_header().is_none());
let kvs = res.take_kvs();
let res: BatchGetResponse = pb_res.try_into().unwrap();
let kvs = res.kvs;
assert_eq!(b"test_key1".as_slice(), kvs[0].key());
assert_eq!(b"test_value1".as_slice(), kvs[0].value());
}
@@ -898,8 +1004,7 @@ mod tests {
}],
};
let mut res = BatchPutResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut res: BatchPutResponse = pb_res.try_into().unwrap();
let kvs = res.take_prev_kvs();
assert_eq!(b"k1".to_vec(), kvs[0].key().to_vec());
assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec());
@@ -931,9 +1036,8 @@ mod tests {
}],
};
let mut res = BatchDeleteResponse::new(pb_res);
assert!(res.take_header().is_none());
let kvs = res.take_prev_kvs();
let res: BatchDeleteResponse = pb_res.try_into().unwrap();
let kvs = res.prev_kvs;
assert_eq!(b"k1".to_vec(), kvs[0].key().to_vec());
assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec());
}
@@ -969,8 +1073,7 @@ mod tests {
}),
};
let mut res = CompareAndPutResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut res: CompareAndPutResponse = pb_res.try_into().unwrap();
let mut kv = res.take_prev_kv().unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());
@@ -1026,7 +1129,6 @@ mod tests {
};
let mut res: DeleteRangeResponse = pb_res.try_into().unwrap();
assert!(res.take_header().is_none());
assert_eq!(2, res.deleted());
let mut kvs = res.take_prev_kvs();
let kv0 = kvs.get_mut(0).unwrap();
@@ -1064,8 +1166,7 @@ mod tests {
}),
};
let mut res = MoveValueResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut res: MoveValueResponse = pb_res.try_into().unwrap();
let mut kv = res.take_kv().unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());

View File

@@ -0,0 +1,48 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Get prefix end key of `key`.
///
/// The right-most byte that is smaller than `0xFF` is incremented by one and
/// every byte after it is dropped. If the key holds no such byte (it is empty
/// or consists solely of `0xFF` bytes), the single-byte key `[0]` is returned.
#[inline]
pub fn get_prefix_end_key(key: &[u8]) -> Vec<u8> {
    match key.iter().rposition(|&byte| byte < 0xFF) {
        Some(last) => {
            // Keep everything up to and including the byte being bumped.
            let mut end = key[..=last].to_vec();
            end[last] += 1;
            end
        }
        // next prefix does not exist (e.g., 0xffff);
        None => vec![0],
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `get_prefix_end_key` on the regular increment path, on keys with
    /// trailing `0xFF` bytes, and on keys for which no byte can be incremented.
    #[test]
    fn test_get_prefix() {
        let key = b"testa";
        assert_eq!(b"testb".to_vec(), get_prefix_end_key(key));

        let key = vec![0, 0, 26];
        assert_eq!(vec![0, 0, 27], get_prefix_end_key(&key));

        // Trailing 0xFF bytes are dropped and the byte before them is bumped.
        let key = vec![0, 0, 255];
        assert_eq!(vec![0, 1], get_prefix_end_key(&key));

        let key = vec![0, 255, 255];
        assert_eq!(vec![1], get_prefix_end_key(&key));

        // No byte can be incremented: the function falls back to `[0]`.
        let key = vec![255, 255];
        assert_eq!(vec![0], get_prefix_end_key(&key));

        // An empty key takes the same fallback.
        let key: Vec<u8> = vec![];
        assert_eq!(vec![0], get_prefix_end_key(&key));
    }
}