diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs
index 44f17bc794..3048d14318 100644
--- a/src/common/meta/src/key/tombstone.rs
+++ b/src/common/meta/src/key/tombstone.rs
@@ -14,13 +14,14 @@
 
 use std::collections::HashMap;
 
+use common_telemetry::debug;
 use snafu::ensure;
 
 use crate::error::{self, Result};
 use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
 use crate::kv_backend::KvBackendRef;
-use crate::rpc::store::BatchGetRequest;
+use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest};
 
 /// [TombstoneManager] provides the ability to:
 /// - logically delete values
@@ -28,6 +29,9 @@ use crate::rpc::store::BatchGetRequest;
 pub struct TombstoneManager {
     kv_backend: KvBackendRef,
     tombstone_prefix: String,
+    // Only used for testing.
+    #[cfg(test)]
+    max_txn_ops: Option<usize>,
 }
 
 const TOMBSTONE_PREFIX: &str = "__tombstone/";
@@ -35,10 +39,7 @@ const TOMBSTONE_PREFIX: &str = "__tombstone/";
 impl TombstoneManager {
     /// Returns [TombstoneManager].
     pub fn new(kv_backend: KvBackendRef) -> Self {
-        Self {
-            kv_backend,
-            tombstone_prefix: TOMBSTONE_PREFIX.to_string(),
-        }
+        Self::new_with_prefix(kv_backend, TOMBSTONE_PREFIX)
     }
 
     /// Returns [TombstoneManager] with a custom tombstone prefix.
@@ -46,6 +47,8 @@ impl TombstoneManager {
         Self {
             kv_backend,
            tombstone_prefix: prefix.to_string(),
+            #[cfg(test)]
+            max_txn_ops: None,
         }
     }
 
@@ -53,6 +56,11 @@ impl TombstoneManager {
         [self.tombstone_prefix.as_bytes(), key].concat()
     }
 
+    #[cfg(test)]
+    pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) {
+        self.max_txn_ops = Some(max_txn_ops);
+    }
+
     /// Moves value to `dest_key`.
     ///
     /// Puts `value` to `dest_key` if the value of `src_key` equals `value`.
@@ -83,7 +91,11 @@ impl TombstoneManager {
         ensure!(
             keys.len() == dest_keys.len(),
             error::UnexpectedSnafu {
-                err_msg: "The length of keys does not match the length of dest_keys."
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
             }
         );
         // The key -> dest key mapping.
@@ -136,19 +148,45 @@ impl TombstoneManager {
             .fail()
     }
 
+    fn max_txn_ops(&self) -> usize {
+        #[cfg(test)]
+        if let Some(max_txn_ops) = self.max_txn_ops {
+            return max_txn_ops;
+        }
+        self.kv_backend.max_txn_ops()
+    }
+
     /// Moves values to `dest_key`.
     ///
     /// Returns the number of keys that were moved.
     async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> {
-        let chunk_size = self.kv_backend.max_txn_ops() / 2;
-        if keys.len() > chunk_size {
-            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
-            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
-                self.move_values_inner(keys, dest_keys).await?;
+        ensure!(
+            keys.len() == dest_keys.len(),
+            error::UnexpectedSnafu {
+                err_msg: format!(
+                    "The length of keys({}) does not match the length of dest_keys({}).",
+                    keys.len(),
+                    dest_keys.len()
+                ),
             }
-
-            Ok(keys.len())
+        );
+        if keys.is_empty() {
+            return Ok(0);
+        }
+        let chunk_size = self.max_txn_ops() / 2;
+        if keys.len() > chunk_size {
+            debug!(
+                "Moving values with multiple chunks, keys len: {}, chunk_size: {}",
+                keys.len(),
+                chunk_size
+            );
+            let mut moved_keys = 0;
+            let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>();
+            let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>();
+            for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) {
+                moved_keys += self.move_values_inner(keys, dest_keys).await?;
+            }
+            Ok(moved_keys)
         } else {
             self.move_values_inner(&keys, &dest_keys).await
         }
@@ -196,15 +234,18 @@ impl TombstoneManager {
     ///
     /// Returns the number of keys that were deleted.
     pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> {
-        let operations = keys
+        let keys = keys
             .iter()
-            .map(|key| TxnOp::Delete(self.to_tombstone(key)))
+            .map(|key| self.to_tombstone(key))
             .collect::<Vec<_>>();
 
-        let txn = Txn::new().and_then(operations);
-        // Always success.
-        let _ = self.kv_backend.txn(txn).await?;
-        Ok(keys.len())
+        let num_keys = keys.len();
+        let _ = self
+            .kv_backend
+            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
+            .await?;
+
+        Ok(num_keys)
     }
 }
 
@@ -392,16 +433,73 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
+        assert_eq!(0, moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+    }
+
+    #[tokio::test]
+    async fn test_move_values_with_max_txn_ops() {
+        common_telemetry::init_default_ut_logging();
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let mut tombstone_manager = TombstoneManager::new(kv_backend.clone());
+        tombstone_manager.set_max_txn_ops(4);
+        let kvs = HashMap::from([
+            (b"bar".to_vec(), b"baz".to_vec()),
+            (b"foo".to_vec(), b"hi".to_vec()),
+            (b"baz".to_vec(), b"hello".to_vec()),
+            (b"qux".to_vec(), b"world".to_vec()),
+            (b"quux".to_vec(), b"world".to_vec()),
+            (b"quuux".to_vec(), b"world".to_vec()),
+            (b"quuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuux".to_vec(), b"world".to_vec()),
+            (b"quuuuuux".to_vec(), b"world".to_vec()),
+        ]);
+        for (key, value) in &kvs {
+            kv_backend
+                .put(
+                    PutRequest::new()
+                        .with_key(key.clone())
+                        .with_value(value.clone()),
+                )
+                .await
+                .unwrap();
+        }
+        let move_values = kvs
+            .iter()
+            .map(|(key, value)| MoveValue {
+                key: key.clone(),
+                dest_key: tombstone_manager.to_tombstone(key),
+                value: value.clone(),
+            })
+            .collect::<Vec<_>>();
+        let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values
+            .clone()
+            .into_iter()
+            .map(|kv| (kv.key, kv.dest_key))
+            .unzip();
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
+        check_moved_values(kv_backend.clone(), &move_values).await;
+        // Moves again
+        let moved_keys = tombstone_manager
+            .move_values(keys.clone(), dest_keys.clone())
+            .await
+            .unwrap();
+        assert_eq!(0, moved_keys);
         check_moved_values(kv_backend.clone(), &move_values).await;
     }
 
@@ -439,17 +537,19 @@ mod tests {
             .unzip();
         keys.push(b"non-exists".to_vec());
         dest_keys.push(b"hi/non-exists".to_vec());
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(3, moved_keys);
         // Moves again
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys.clone(), dest_keys.clone())
             .await
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
+        assert_eq!(0, moved_keys);
     }
 
     #[tokio::test]
@@ -490,10 +590,11 @@ mod tests {
             .into_iter()
             .map(|kv| (kv.key, kv.dest_key))
             .unzip();
-        tombstone_manager
+        let moved_keys = tombstone_manager
             .move_values(keys, dest_keys)
             .await
             .unwrap();
+        assert_eq!(kvs.len(), moved_keys);
     }
 
     #[tokio::test]
@@ -571,4 +672,24 @@ mod tests {
             .unwrap();
         check_moved_values(kv_backend.clone(), &move_values).await;
     }
+
+    #[tokio::test]
+    async fn test_move_values_with_different_lengths() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
+
+        let keys = vec![b"bar".to_vec(), b"foo".to_vec()];
+        let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()];
+
+        let err = tombstone_manager
+            .move_values(keys, dest_keys)
+            .await
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("The length of keys(2) does not match the length of dest_keys(3)."),);
+
+        let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap();
+        assert_eq!(0, moved_keys);
+    }
 }
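Note on the chunking logic in the rewritten `move_values`: each moved key presumably needs two transaction operations (a guarded put to the tombstone key plus a delete of the source), which is why the batch size is capped at `max_txn_ops() / 2`, and the fix chunks `dest_keys` itself rather than chunking `keys` twice as the old code did, so source/destination pairs stay aligned across batches. A minimal, self-contained sketch of that scheme follows; the `chunked_pairs` helper and the sample keys are illustrative only and are not part of the crate's API.

```rust
// Standalone sketch of the chunking scheme used by the rewritten `move_values`;
// `chunked_pairs` and the sample keys are illustrative, not part of the crate's API.
fn chunked_pairs(
    keys: Vec<Vec<u8>>,
    dest_keys: Vec<Vec<u8>>,
    max_txn_ops: usize,
) -> Vec<(Vec<Vec<u8>>, Vec<Vec<u8>>)> {
    assert_eq!(keys.len(), dest_keys.len(), "lengths must match");
    // Each moved key is assumed to need two txn operations (a put to the tombstone
    // key and a delete of the source), so one transaction carries max_txn_ops / 2 keys.
    let chunk_size = max_txn_ops / 2;
    keys.chunks(chunk_size)
        .zip(dest_keys.chunks(chunk_size))
        .map(|(k, d)| (k.to_vec(), d.to_vec()))
        .collect()
}

fn main() {
    let keys: Vec<Vec<u8>> = (0..9).map(|i| vec![i]).collect();
    let dest_keys: Vec<Vec<u8>> = (0..9).map(|i| vec![b't', i]).collect();
    // With max_txn_ops = 4, as in test_move_values_with_max_txn_ops, chunk_size is 2,
    // so nine keys split into five aligned batches: 2 + 2 + 2 + 2 + 1.
    let batches = chunked_pairs(keys, dest_keys, 4);
    assert_eq!(batches.len(), 5);
    assert_eq!(batches[4].0.len(), 1);
}
```

With `set_max_txn_ops(4)`, the nine keys in `test_move_values_with_max_txn_ops` above are likewise processed two at a time, which is what exercises the multi-chunk path.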