diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs index 470482c75b..22039142f5 100644 --- a/src/common/meta/src/key/tombstone.rs +++ b/src/common/meta/src/key/tombstone.rs @@ -59,6 +59,8 @@ pub struct TombstoneManager { } const TOMBSTONE_PREFIX: &str = "__tombstone/"; +const MOVE_VALUE_TXN_OPS_PER_KEY: usize = 4; +const RESTORE_VALUE_TXN_OPS_PER_KEY: usize = 6; impl TombstoneManager { /// Returns [TombstoneManager]. @@ -292,7 +294,12 @@ impl TombstoneManager { if keys.is_empty() { return Ok(0); } - let chunk_size = self.max_txn_ops() / 2; + let txn_ops_per_key = if require_dest_not_exists { + RESTORE_VALUE_TXN_OPS_PER_KEY + } else { + MOVE_VALUE_TXN_OPS_PER_KEY + }; + let chunk_size = (self.max_txn_ops() / txn_ops_per_key).max(1); if keys.len() > chunk_size { debug!( "Moving values with multiple chunks, keys len: {}, chunk_size: {}", @@ -374,14 +381,84 @@ impl TombstoneManager { #[cfg(test)] mod tests { + use std::any::Any; use std::collections::HashMap; use std::sync::Arc; - use crate::error::Error; + use crate::error::{Error, Result}; use crate::key::tombstone::TombstoneManager; - use crate::kv_backend::KvBackend; use crate::kv_backend::memory::MemoryKvBackend; - use crate::rpc::store::PutRequest; + use crate::kv_backend::txn::{Txn, TxnRequest, TxnResponse}; + use crate::kv_backend::{KvBackend, TxnService}; + use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, + BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, + }; + + struct TxnOpLimitKvBackend { + inner: Arc>, + max_txn_ops: usize, + } + + #[async_trait::async_trait] + impl TxnService for TxnOpLimitKvBackend { + type Error = Error; + + async fn txn(&self, txn: Txn) -> Result { + let TxnRequest { + compare, + success, + failure, + } = txn.req(); + let txn_ops = compare.len() + success.len() + failure.len(); + assert!( + txn_ops <= self.max_txn_ops, + "txn ops {txn_ops} exceeds limit {}", + self.max_txn_ops + ); + self.inner.txn(txn).await + } + + fn max_txn_ops(&self) -> usize { + self.max_txn_ops + } + } + + #[async_trait::async_trait] + impl KvBackend for TxnOpLimitKvBackend { + fn name(&self) -> &str { + "txn_op_limit" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + self.inner.range(req).await + } + + async fn put(&self, req: PutRequest) -> Result { + self.inner.put(req).await + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + self.inner.batch_put(req).await + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + self.inner.batch_get(req).await + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + self.inner.delete_range(req).await + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + self.inner.batch_delete(req).await + } + } #[derive(Debug, Clone)] struct MoveValue { @@ -654,6 +731,70 @@ mod tests { check_moved_values(kv_backend.clone(), &move_values).await; } + #[tokio::test] + async fn test_restore_chunks_by_total_txn_ops_limit() { + let inner = Arc::new(MemoryKvBackend::default()); + let kv_backend = Arc::new(TxnOpLimitKvBackend { + inner: inner.clone(), + max_txn_ops: 6, + }); + let tombstone_manager = TombstoneManager::new(kv_backend); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + inner + .put( + PutRequest::new() + .with_key(tombstone_manager.to_tombstone(key)) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + + let restored = tombstone_manager + .restore(kvs.keys().cloned().collect()) + .await + .unwrap(); + + assert_eq!(kvs.len(), restored); + } + + #[tokio::test] + async fn test_create_chunks_by_total_txn_ops_limit() { + let inner = Arc::new(MemoryKvBackend::default()); + let kv_backend = Arc::new(TxnOpLimitKvBackend { + inner: inner.clone(), + max_txn_ops: 4, + }); + let tombstone_manager = TombstoneManager::new(kv_backend); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + ]); + for (key, value) in &kvs { + inner + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + + let moved = tombstone_manager + .create(kvs.keys().cloned().collect()) + .await + .unwrap(); + + assert_eq!(kvs.len(), moved); + } + #[tokio::test] async fn test_move_values_with_non_exists_values() { let kv_backend = Arc::new(MemoryKvBackend::default());