refactor(meta): Ensure all moving values remain unchanged between two transactions (#3727)

* feat: implement `move_values`

* refactor: using `move_values`

* refactor: refactor executor

* chore: fix clippy

* refactor: remove `CasKeyChanged` error

* refactor: refactor `move_values`

* chore: update comments

* refactor: do not compare `dest_key`

* chore: update comments

* chore: apply suggestions from CR

* chore: remove `#[inline]`

* chore: check length of keys and dest_key
This commit is contained in:
Weny Xu
2024-04-18 13:35:54 +08:00
committed by GitHub
parent 4248dfcf36
commit 2a2a44883f
6 changed files with 427 additions and 484 deletions

View File

@@ -106,19 +106,6 @@ impl DropTableExecutor {
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if !ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.delete_table_metadata(self.table_id, &self.table, table_route_value)
.await
@@ -152,19 +139,6 @@ impl DropTableExecutor {
ctx: &DdlContext,
table_route_value: &TableRouteValue,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.table.catalog_name,
&self.table.schema_name,
&self.table.table_name,
);
if ctx
.table_metadata_manager
.table_name_manager()
.exists(table_name_key)
.await?
{
return Ok(());
}
ctx.table_metadata_manager
.restore_table_metadata(self.table_id, &self.table, table_route_value)
.await

View File

@@ -421,8 +421,8 @@ pub enum Error {
#[snafu(display("Invalid role: {}", role))]
InvalidRole { role: i32, location: Location },
#[snafu(display("Atomic key changed: {err_msg}"))]
CasKeyChanged { err_msg: String, location: Location },
#[snafu(display("Failed to move values: {err_msg}"))]
MoveValues { err_msg: String, location: Location },
#[snafu(display("Failed to parse {} from utf8", name))]
FromUtf8 {
@@ -444,7 +444,7 @@ impl ErrorExt for Error {
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. }
| CasKeyChanged { .. } => StatusCode::Internal,
| MoveValues { .. } => StatusCode::Internal,
SerdeJson { .. }
| ParseOption { .. }

View File

@@ -88,13 +88,13 @@ use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue};
use self::table_route::{TableRouteManager, TableRouteValue};
use self::tombstone::TombstoneManager;
use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu, UnexpectedSnafu};
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::key::table_route::TableRouteKey;
use crate::key::tombstone::Key;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::rpc::store::BatchDeleteRequest;
use crate::table_name::TableName;
use crate::DatanodeId;
@@ -582,7 +582,7 @@ impl TableMetadataManager {
table_id: TableId,
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<Vec<Key>> {
) -> Result<Vec<Vec<u8>>> {
// Builds keys
let datanode_ids = if table_route_value.is_physical() {
region_distribution(table_route_value.region_routes()?)
@@ -604,11 +604,11 @@ impl TableMetadataManager {
.map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id))
.collect::<HashSet<_>>();
keys.push(Key::compare_and_swap(table_name.as_raw_key()));
keys.push(Key::new(table_info_key.as_raw_key()));
keys.push(Key::new(table_route_key.as_raw_key()));
keys.push(table_name.as_raw_key());
keys.push(table_info_key.as_raw_key());
keys.push(table_route_key.as_raw_key());
for key in &datanode_table_keys {
keys.push(Key::new(key.as_raw_key()));
keys.push(key.as_raw_key());
}
Ok(keys)
}
@@ -622,8 +622,7 @@ impl TableMetadataManager {
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.create(keys).await?;
Ok(())
self.tombstone_manager.create(keys).await
}
/// Deletes metadata tombstone for table **permanently**.
@@ -634,11 +633,7 @@ impl TableMetadataManager {
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| key.into_bytes())
.collect::<Vec<_>>();
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.delete(keys).await
}
@@ -651,8 +646,7 @@ impl TableMetadataManager {
table_route_value: &TableRouteValue,
) -> Result<()> {
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
self.tombstone_manager.restore(keys).await?;
Ok(())
self.tombstone_manager.restore(keys).await
}
/// Deletes metadata for table **permanently**.
@@ -663,21 +657,11 @@ impl TableMetadataManager {
table_name: &TableName,
table_route_value: &TableRouteValue,
) -> Result<()> {
let operations = self
.table_metadata_keys(table_id, table_name, table_route_value)?
.into_iter()
.map(|key| TxnOp::Delete(key.into_bytes()))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
let resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
UnexpectedSnafu {
err_msg: format!("Failed to destroy table metadata: {table_id}")
}
);
let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?;
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(())
}
@@ -1286,14 +1270,17 @@ mod tests {
.delete_table_metadata(table_id, &table_name, table_route_value)
.await
.unwrap();
// Should be ignored.
table_metadata_manager
.delete_table_metadata(table_id, &table_name, table_route_value)
.await
.unwrap();
assert!(table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.unwrap()
.is_none());
assert!(table_metadata_manager
.table_route_manager()
.table_route_storage()
@@ -1301,7 +1288,6 @@ mod tests {
.await
.unwrap()
.is_none());
assert!(table_metadata_manager
.datanode_table_manager()
.tables(datanode_id)
@@ -1316,7 +1302,6 @@ mod tests {
.await
.unwrap();
assert!(table_info.is_none());
let table_route = table_metadata_manager
.table_route_manager()
.table_route_storage()
@@ -1792,5 +1777,12 @@ mod tests {
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
// Should be ignored.
table_metadata_manager
.restore_table_metadata(table_id, &table_name, &table_route_value)
.await
.unwrap();
let kvs = mem_kv.dump();
assert_eq!(kvs, expected_result);
}
}

View File

@@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ensure, OptionExt};
use std::collections::HashMap;
use snafu::ensure;
use super::TableMetaKeyGetTxnOp;
use crate::error::{self, Result};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
/// [TombstoneManager] provides the ability to:
/// - logically delete values
@@ -29,307 +31,160 @@ pub(crate) struct TombstoneManager {
const TOMBSTONE_PREFIX: &str = "__tombstone/";
pub(crate) struct TombstoneKey<T>(T);
pub(crate) struct TombstoneKeyValue {
pub(crate) origin_key: Vec<u8>,
pub(crate) tombstone_key: Vec<u8>,
pub(crate) value: Vec<u8>,
}
fn to_tombstone(key: &[u8]) -> Vec<u8> {
[TOMBSTONE_PREFIX.as_bytes(), key].concat()
}
impl TombstoneKey<&Vec<u8>> {
/// Returns the origin key and tombstone key.
fn to_keys(&self) -> (Vec<u8>, Vec<u8>) {
let key = self.0;
let tombstone_key = to_tombstone(key);
(key.clone(), tombstone_key)
}
/// Returns the origin key and tombstone key.
fn into_keys(self) -> (Vec<u8>, Vec<u8>) {
self.to_keys()
}
/// Returns the tombstone key.
fn to_tombstone_key(&self) -> Vec<u8> {
let key = self.0;
to_tombstone(key)
}
}
impl TableMetaKeyGetTxnOp for TombstoneKey<&Vec<u8>> {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
TxnOpGetResponseSet::build_get_op(to_tombstone(self.0))
}
}
/// The key used in the [TombstoneManager].
pub(crate) struct Key {
bytes: Vec<u8>,
// Atomic Key:
// The value corresponding to the key remains consistent between two transactions.
atomic: bool,
}
impl Key {
/// Returns a new atomic key.
pub(crate) fn compare_and_swap<T: Into<Vec<u8>>>(key: T) -> Self {
Self {
bytes: key.into(),
atomic: true,
}
}
/// Returns a new normal key.
pub(crate) fn new<T: Into<Vec<u8>>>(key: T) -> Self {
Self {
bytes: key.into(),
atomic: false,
}
}
/// Into bytes
pub(crate) fn into_bytes(self) -> Vec<u8> {
self.bytes
}
fn get_inner(&self) -> &Vec<u8> {
&self.bytes
}
fn is_atomic(&self) -> bool {
self.atomic
}
}
impl TableMetaKeyGetTxnOp for Key {
fn build_get_op(
&self,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = self.get_inner().clone();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
}
fn format_on_failure_error_message<F: FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>>(
mut set: TxnOpGetResponseSet,
on_failure_kv_and_filters: Vec<(Vec<u8>, Vec<u8>, F)>,
) -> String {
on_failure_kv_and_filters
.into_iter()
.flat_map(|(key, value, mut filter)| {
let got = filter(&mut set);
let Some(got) = got else {
return Some(format!(
"For key: {} was expected: {}, but value does not exists",
String::from_utf8_lossy(&key),
String::from_utf8_lossy(&value),
));
};
if got != value {
Some(format!(
"For key: {} was expected: {}, but got: {}",
String::from_utf8_lossy(&key),
String::from_utf8_lossy(&value),
String::from_utf8_lossy(&got),
))
} else {
None
}
})
.collect::<Vec<_>>()
.join("; ")
}
fn format_keys(keys: &[Key]) -> String {
keys.iter()
.map(|key| String::from_utf8_lossy(&key.bytes))
.collect::<Vec<_>>()
.join(", ")
}
impl TombstoneManager {
/// Returns [TombstoneManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
Self { kv_backend }
}
/// Moves value to `dest_key`.
///
/// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
///
/// Otherwise retrieves the value of `src_key`.
fn build_move_value_txn(
&self,
src_key: Vec<u8>,
value: Vec<u8>,
dest_key: Vec<u8>,
) -> (Txn, impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>) {
let txn = Txn::new()
.when(vec![Compare::with_value(
src_key.clone(),
CompareOp::Equal,
value.clone(),
)])
.and_then(vec![
TxnOp::Put(dest_key.clone(), value.clone()),
TxnOp::Delete(src_key.clone()),
])
.or_else(vec![TxnOp::Get(src_key.clone())]);
(txn, TxnOpGetResponseSet::filter(src_key))
}
async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> {
ensure!(
keys.len() == dest_keys.len(),
error::UnexpectedSnafu {
err_msg: "The length of keys does not match the length of dest_keys."
}
);
// The key -> dest key mapping.
let lookup_table = keys.iter().zip(dest_keys.iter()).collect::<HashMap<_, _>>();
let resp = self
.kv_backend
.batch_get(BatchGetRequest::new().with_keys(keys.to_vec()))
.await?;
let mut results = resp
.kvs
.into_iter()
.map(|kv| (kv.key, kv.value))
.collect::<HashMap<_, _>>();
const MAX_RETRIES: usize = 8;
for _ in 0..MAX_RETRIES {
let (txns, (keys, filters)): (Vec<_>, (Vec<_>, Vec<_>)) = results
.iter()
.map(|(key, value)| {
let (txn, filter) = self.build_move_value_txn(
key.clone(),
value.clone(),
lookup_table[&key].clone(),
);
(txn, (key.clone(), filter))
})
.unzip();
let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?;
if resp.succeeded {
return Ok(());
}
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Updates results.
for (idx, mut filter) in filters.into_iter().enumerate() {
if let Some(value) = filter(&mut set) {
results.insert(keys[idx].clone(), value);
} else {
results.remove(&keys[idx]);
}
}
}
error::MoveValuesSnafu {
err_msg: format!(
"keys: {:?}",
keys.iter().map(|key| String::from_utf8_lossy(key)),
),
}
.fail()
}
/// Moves values to `dest_key`.
async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> {
let chunk_size = self.kv_backend.max_txn_ops() / 2;
if keys.len() > chunk_size {
let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
self.move_values_inner(keys, dest_keys).await?;
}
Ok(())
} else {
self.move_values_inner(&keys, &dest_keys).await
}
}
/// Creates tombstones for keys.
///
/// Preforms to:
/// - retrieve all values corresponding `keys`.
/// - deletes origin values.
/// - stores tombstone values.
pub(crate) async fn create(&self, keys: Vec<Key>) -> Result<()> {
// Builds transaction to retrieve all values
let (operations, mut filters): (Vec<_>, Vec<_>) =
keys.iter().map(|key| key.build_get_op()).unzip();
pub(crate) async fn create(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
(key, tombstone_key)
})
.unzip();
let txn = Txn::new().and_then(operations);
let mut resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
error::UnexpectedSnafu {
err_msg: format!(
"Failed to retrieves the metadata, keys: {}",
format_keys(&keys)
),
}
);
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Builds the create tombstone transaction.
let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
let mut tombstone_comparison = vec![];
let mut on_failure_operations = vec![];
let mut on_failure_kv_and_filters = vec![];
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = TombstoneKey(key.get_inner()).into_keys();
// Compares the atomic key.
if key.is_atomic() {
tombstone_comparison.push(Compare::with_not_exist_value(
tombstone_key.clone(),
CompareOp::Equal,
));
tombstone_comparison.push(Compare::with_value(
origin_key.clone(),
CompareOp::Equal,
value.clone(),
));
let (op, filter) = TxnOpGetResponseSet::build_get_op(origin_key.clone());
on_failure_operations.push(op);
on_failure_kv_and_filters.push((origin_key.clone(), value.clone(), filter));
}
tombstone_operations.push(TxnOp::Delete(origin_key));
tombstone_operations.push(TxnOp::Put(tombstone_key, value));
}
let txn = if !tombstone_comparison.is_empty() {
Txn::new().when(tombstone_comparison)
} else {
Txn::new()
}
.and_then(tombstone_operations);
let txn = if !on_failure_operations.is_empty() {
txn.or_else(on_failure_operations)
} else {
txn
};
let mut resp = self.kv_backend.txn(txn).await?;
// TODO(weny): add tests for atomic key changed.
if !resp.succeeded {
let set = TxnOpGetResponseSet::from(&mut resp.responses);
let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
return error::CasKeyChangedSnafu { err_msg }.fail();
}
Ok(())
self.move_values(keys, dest_keys).await
}
/// Restores tombstones for keys.
///
/// Preforms to:
/// - retrieve all tombstone values corresponding `keys`.
/// - stores tombstone values.
pub(crate) async fn restore(&self, keys: Vec<Key>) -> Result<()> {
// Builds transaction to retrieve all tombstone values
let tombstone_keys = keys
.iter()
.map(|key| TombstoneKey(key.get_inner()))
.collect::<Vec<_>>();
let (operations, mut filters): (Vec<_>, Vec<_>) =
tombstone_keys.iter().map(|key| key.build_get_op()).unzip();
/// - restore origin value.
/// - deletes tombstone values.
pub(crate) async fn restore(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let (keys, dest_keys): (Vec<_>, Vec<_>) = keys
.into_iter()
.map(|key| {
let tombstone_key = to_tombstone(&key);
(tombstone_key, key)
})
.unzip();
let txn = Txn::new().and_then(operations);
let mut resp = self.kv_backend.txn(txn).await?;
ensure!(
resp.succeeded,
error::UnexpectedSnafu {
err_msg: format!(
"Failed to retrieves the metadata, keys: {}",
format_keys(&keys)
),
}
);
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
// Builds the restore tombstone transaction.
let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
let mut tombstone_comparison = vec![];
let mut on_failure_operations = vec![];
let mut on_failure_kv_and_filters = vec![];
for (idx, key) in keys.iter().enumerate() {
let filter = &mut filters[idx];
let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
err_msg: format!(
"Missing value, key: {}",
String::from_utf8_lossy(key.get_inner())
),
})?;
let (origin_key, tombstone_key) = tombstone_keys[idx].to_keys();
// Compares the atomic key.
if key.is_atomic() {
tombstone_comparison.push(Compare::with_not_exist_value(
origin_key.clone(),
CompareOp::Equal,
));
tombstone_comparison.push(Compare::with_value(
tombstone_key.clone(),
CompareOp::Equal,
value.clone(),
));
let (op, filter) = tombstone_keys[idx].build_get_op();
on_failure_operations.push(op);
on_failure_kv_and_filters.push((tombstone_key.clone(), value.clone(), filter));
}
tombstone_operations.push(TxnOp::Delete(tombstone_key));
tombstone_operations.push(TxnOp::Put(origin_key, value));
}
let txn = if !tombstone_comparison.is_empty() {
Txn::new().when(tombstone_comparison)
} else {
Txn::new()
}
.and_then(tombstone_operations);
let txn = if !on_failure_operations.is_empty() {
txn.or_else(on_failure_operations)
} else {
txn
};
let mut resp = self.kv_backend.txn(txn).await?;
// TODO(weny): add tests for atomic key changed.
if !resp.succeeded {
let set = TxnOpGetResponseSet::from(&mut resp.responses);
let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
return error::CasKeyChangedSnafu { err_msg }.fail();
}
Ok(())
self.move_values(keys, dest_keys).await
}
/// Deletes tombstones for keys.
/// Deletes tombstones values for the specified `keys`.
pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
let operations = keys
.iter()
.map(|key| TxnOp::Delete(TombstoneKey(key).to_tombstone_key()))
.map(|key| TxnOp::Delete(to_tombstone(key)))
.collect::<Vec<_>>();
let txn = Txn::new().and_then(operations);
@@ -342,13 +197,41 @@ impl TombstoneManager {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use crate::key::tombstone::{Key, TombstoneKey, TombstoneManager};
use super::to_tombstone;
use crate::error::Error;
use crate::key::tombstone::TombstoneManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[derive(Debug, Clone)]
struct MoveValue {
key: Vec<u8>,
dest_key: Vec<u8>,
value: Vec<u8>,
}
async fn check_moved_values(
kv_backend: Arc<MemoryKvBackend<Error>>,
move_values: &[MoveValue],
) {
for MoveValue {
key,
dest_key,
value,
} in move_values
{
assert!(kv_backend.get(key).await.unwrap().is_none());
assert_eq!(
&kv_backend.get(dest_key).await.unwrap().unwrap().value,
value,
);
}
}
#[tokio::test]
async fn test_create_tombstone() {
let kv_backend = Arc::new(MemoryKvBackend::default());
@@ -362,14 +245,14 @@ mod tests {
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.create(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
assert!(!kv_backend.exists(b"bar").await.unwrap());
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.get(&to_tombstone(b"bar"))
.await
.unwrap()
.unwrap()
@@ -378,7 +261,7 @@ mod tests {
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.get(&to_tombstone(b"foo"))
.await
.unwrap()
.unwrap()
@@ -389,9 +272,10 @@ mod tests {
}
#[tokio::test]
async fn test_create_tombstone_without_atomic_key() {
async fn test_create_tombstone_with_non_exist_values() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
@@ -400,52 +284,20 @@ mod tests {
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::new("bar"), Key::new("foo")])
.create(vec![b"bar".to_vec(), b"baz".to_vec()])
.await
.unwrap();
assert!(!kv_backend.exists(b"bar").await.unwrap());
assert!(!kv_backend.exists(b"foo").await.unwrap());
assert_eq!(
kv_backend
.get(&TombstoneKey(&"bar".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"baz"
);
assert_eq!(
kv_backend
.get(&TombstoneKey(&"foo".into()).to_tombstone_key())
.await
.unwrap()
.unwrap()
.value,
b"hi"
);
assert_eq!(kv_backend.len(), 2);
}
#[tokio::test]
async fn test_create_tombstone_origin_value_not_found_err() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
let err = tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("baz")])
.await
.unwrap_err();
assert!(err.to_string().contains("Missing value"));
check_moved_values(
kv_backend.clone(),
&[MoveValue {
key: b"bar".to_vec(),
dest_key: to_tombstone(b"bar"),
value: b"baz".to_vec(),
}],
)
.await;
}
#[tokio::test]
@@ -462,63 +314,16 @@ mod tests {
.unwrap();
let expected_kvs = kv_backend.dump();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.create(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
tombstone_manager
.restore(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.restore(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
assert_eq!(expected_kvs, kv_backend.dump());
}
#[tokio::test]
async fn test_restore_tombstone_without_atomic_key() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
let expected_kvs = kv_backend.dump();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
tombstone_manager
.restore(vec![Key::new("bar"), Key::new("foo")])
.await
.unwrap();
assert_eq!(expected_kvs, kv_backend.dump());
}
#[tokio::test]
async fn test_restore_tombstone_origin_value_not_found_err() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
kv_backend
.put(PutRequest::new().with_key("bar").with_value("baz"))
.await
.unwrap();
kv_backend
.put(PutRequest::new().with_key("foo").with_value("hi"))
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.await
.unwrap();
let err = tombstone_manager
.restore(vec![Key::new("bar"), Key::new("baz")])
.await
.unwrap_err();
assert!(err.to_string().contains("Missing value"));
}
#[tokio::test]
async fn test_delete_tombstone() {
let kv_backend = Arc::new(MemoryKvBackend::default());
@@ -532,7 +337,7 @@ mod tests {
.await
.unwrap();
tombstone_manager
.create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
.create(vec![b"bar".to_vec(), b"foo".to_vec()])
.await
.unwrap();
tombstone_manager
@@ -541,4 +346,216 @@ mod tests {
.unwrap();
assert!(kv_backend.is_empty());
}
#[tokio::test]
async fn test_move_values() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_with_non_exists_values() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (mut keys, mut dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
keys.push(b"non-exists".to_vec());
dest_keys.push(b"hi/non-exists".to_vec());
tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
// Moves again
tombstone_manager
.move_values(keys.clone(), dest_keys.clone())
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_move_values_changed() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
kv_backend
.put(PutRequest::new().with_key("baz").with_value("changed"))
.await
.unwrap();
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
}
#[tokio::test]
async fn test_move_values_overwrite_dest_values() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let tombstone_manager = TombstoneManager::new(kv_backend.clone());
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
// Prepares
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
// Overwrites existing dest keys.
let kvs = HashMap::from([
(b"bar".to_vec(), b"new baz".to_vec()),
(b"foo".to_vec(), b"new hi".to_vec()),
(b"baz".to_vec(), b"new baz".to_vec()),
]);
for (key, value) in &kvs {
kv_backend
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let move_values = kvs
.iter()
.map(|(key, value)| MoveValue {
key: key.clone(),
dest_key: to_tombstone(key),
value: value.clone(),
})
.collect::<Vec<_>>();
let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
.clone()
.into_iter()
.map(|kv| (kv.key, kv.dest_key))
.unzip();
tombstone_manager
.move_values(keys, dest_keys)
.await
.unwrap();
check_moved_values(kv_backend.clone(), &move_values).await;
}
}

View File

@@ -24,18 +24,6 @@ use crate::rpc::KeyValue;
pub(crate) struct TxnOpGetResponseSet(Vec<KeyValue>);
impl TxnOpGetResponseSet {
/// Returns a [TxnOp] to retrieve the value corresponding `key` and
/// a filter to consume corresponding [KeyValue] from [TxnOpGetResponseSet].
pub(crate) fn build_get_op<T: Into<Vec<u8>>>(
key: T,
) -> (
TxnOp,
impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
) {
let key = key.into();
(TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
}
/// Returns a filter to consume a [KeyValue] where the key equals `key`.
pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
move |set| {

View File

@@ -97,7 +97,6 @@ impl Display for RangeRequest {
}
impl RangeRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
@@ -114,7 +113,6 @@ impl RangeRequest {
/// key is the first key for the range, If range_end is not given, the
/// request only looks up key.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
@@ -129,7 +127,6 @@ impl RangeRequest {
/// then the range request gets all keys prefixed with key.
/// If both key and range_end are '\0', then the range request returns all
/// keys.
#[inline]
pub fn with_range(mut self, key: impl Into<Vec<u8>>, range_end: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = range_end.into();
@@ -138,7 +135,6 @@ impl RangeRequest {
/// Gets all keys prefixed with key.
/// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"),
#[inline]
pub fn with_prefix(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = util::get_prefix_end_key(&self.key);
@@ -147,14 +143,12 @@ impl RangeRequest {
/// limit is a limit on the number of keys returned for the request. When
/// limit is set to 0, it is treated as no limit.
#[inline]
pub fn with_limit(mut self, limit: i64) -> Self {
self.limit = limit;
self
}
/// keys_only when set returns only the keys and not the values.
#[inline]
pub fn with_keys_only(mut self) -> Self {
self.keys_only = true;
self
@@ -204,7 +198,6 @@ impl RangeResponse {
}
}
#[inline]
pub fn take_kvs(&mut self) -> Vec<KeyValue> {
self.kvs.drain(..).collect()
}
@@ -244,7 +237,6 @@ impl From<PbPutRequest> for PutRequest {
}
impl PutRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
@@ -254,7 +246,6 @@ impl PutRequest {
}
/// key is the key, in bytes, to put into the key-value store.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
@@ -262,7 +253,6 @@ impl PutRequest {
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
#[inline]
pub fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
self.value = value.into();
self
@@ -270,7 +260,6 @@ impl PutRequest {
/// If prev_kv is set, gets the previous key-value pair before changing it.
/// The previous key-value pair will be returned in the put response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
@@ -330,18 +319,15 @@ impl Default for BatchGetRequest {
}
impl BatchGetRequest {
#[inline]
pub fn new() -> Self {
Self { keys: vec![] }
}
#[inline]
pub fn with_keys(mut self, keys: Vec<Vec<u8>>) -> Self {
self.keys = keys;
self
}
#[inline]
pub fn add_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.keys.push(key.into());
self
@@ -416,7 +402,6 @@ impl From<PbBatchPutRequest> for BatchPutRequest {
}
impl BatchPutRequest {
#[inline]
pub fn new() -> Self {
Self {
kvs: vec![],
@@ -424,7 +409,6 @@ impl BatchPutRequest {
}
}
#[inline]
pub fn add_kv(mut self, key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
self.kvs.push(KeyValue {
key: key.into(),
@@ -435,7 +419,6 @@ impl BatchPutRequest {
/// If prev_kv is set, gets the previous key-value pair before changing it.
/// The previous key-value pair will be returned in the put response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
@@ -467,7 +450,6 @@ impl BatchPutResponse {
}
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.prev_kvs.drain(..).collect()
}
@@ -501,7 +483,6 @@ impl From<PbBatchDeleteRequest> for BatchDeleteRequest {
}
impl BatchDeleteRequest {
#[inline]
pub fn new() -> Self {
Self {
keys: vec![],
@@ -509,7 +490,12 @@ impl BatchDeleteRequest {
}
}
#[inline]
/// Sets `keys`.
pub fn with_keys(mut self, keys: Vec<Vec<u8>>) -> Self {
self.keys = keys;
self
}
pub fn add_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.keys.push(key.into());
self
@@ -517,7 +503,6 @@ impl BatchDeleteRequest {
/// If prev_kv is set, gets the previous key-value pair before deleting it.
/// The previous key-value pair will be returned in the batch delete response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
@@ -582,7 +567,6 @@ impl From<PbCompareAndPutRequest> for CompareAndPutRequest {
}
impl CompareAndPutRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
@@ -592,14 +576,12 @@ impl CompareAndPutRequest {
}
/// key is the key, in bytes, to put into the key-value store.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
}
/// expect is the previous value, in bytes
#[inline]
pub fn with_expect(mut self, expect: impl Into<Vec<u8>>) -> Self {
self.expect = expect.into();
self
@@ -607,7 +589,6 @@ impl CompareAndPutRequest {
/// value is the value, in bytes, to associate with the key in the
/// key-value store.
#[inline]
pub fn with_value(mut self, value: impl Into<Vec<u8>>) -> Self {
self.value = value.into();
self
@@ -649,12 +630,10 @@ impl CompareAndPutResponse {
}
}
#[inline]
pub fn is_success(&self) -> bool {
self.success
}
#[inline]
pub fn take_prev_kv(&mut self) -> Option<KeyValue> {
self.prev_kv.take()
}
@@ -703,7 +682,6 @@ impl From<PbDeleteRangeRequest> for DeleteRangeRequest {
}
impl DeleteRangeRequest {
#[inline]
pub fn new() -> Self {
Self {
key: vec![],
@@ -719,7 +697,6 @@ impl DeleteRangeRequest {
/// key is the first key to delete in the range. If range_end is not given,
/// the range is defined to contain only the key argument.
#[inline]
pub fn with_key(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self
@@ -735,7 +712,6 @@ impl DeleteRangeRequest {
/// the keys with the prefix (the given key).
/// If range_end is '\0', the range is all keys greater than or equal to the
/// key argument.
#[inline]
pub fn with_range(mut self, key: impl Into<Vec<u8>>, range_end: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = range_end.into();
@@ -744,7 +720,6 @@ impl DeleteRangeRequest {
/// Deletes all keys prefixed with key.
/// range_end is one bit larger than the given key.
#[inline]
pub fn with_prefix(mut self, key: impl Into<Vec<u8>>) -> Self {
self.key = key.into();
self.range_end = util::get_prefix_end_key(&self.key);
@@ -753,7 +728,6 @@ impl DeleteRangeRequest {
/// If prev_kv is set, gets the previous key-value pairs before deleting it.
/// The previous key-value pairs will be returned in the delete response.
#[inline]
pub fn with_prev_kv(mut self) -> Self {
self.prev_kv = true;
self
@@ -788,12 +762,10 @@ impl DeleteRangeResponse {
}
}
#[inline]
pub fn deleted(&self) -> i64 {
self.deleted
}
#[inline]
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
self.prev_kvs.drain(..).collect()
}