fix(tombstone): chunk values by per-key txn ops instead of fixed divisor

Replaced the fixed `max_txn_ops() / 2` chunk size with operation-aware
constants (`MOVE_VALUE_TXN_OPS_PER_KEY=4`, `RESTORE_VALUE_TXN_OPS_PER_KEY=6`)
to correctly account for per-key transaction operations. Added
`TxnOpLimitKvBackend` test helper and two new tests
(`test_restore_chunks_by_total_txn_ops_limit`,
`test_create_chunks_by_total_txn_ops_limit`) verifying chunking under
tight txn op limits.

Affected file:
- `src/common/meta/src/key/tombstone.rs` — chunk size fix,
  `TxnOpLimitKvBackend` helper, two new tests

Signed-off-by: Lei, HUANG <ratuthomm@gmail.com>
Signed-off-by: Lei, HUANG <ratuthomm@gmail.com>
This commit is contained in:
Lei, HUANG
2026-07-02 14:17:54 +08:00
parent fb72cbd216
commit a6e116a3db

View File

@@ -59,6 +59,8 @@ pub struct TombstoneManager {
}
const TOMBSTONE_PREFIX: &str = "__tombstone/";
const MOVE_VALUE_TXN_OPS_PER_KEY: usize = 4;
const RESTORE_VALUE_TXN_OPS_PER_KEY: usize = 6;
impl TombstoneManager {
/// Returns [TombstoneManager].
@@ -292,7 +294,12 @@ impl TombstoneManager {
if keys.is_empty() {
return Ok(0);
}
let chunk_size = self.max_txn_ops() / 2;
let txn_ops_per_key = if require_dest_not_exists {
RESTORE_VALUE_TXN_OPS_PER_KEY
} else {
MOVE_VALUE_TXN_OPS_PER_KEY
};
let chunk_size = (self.max_txn_ops() / txn_ops_per_key).max(1);
if keys.len() > chunk_size {
debug!(
"Moving values with multiple chunks, keys len: {}, chunk_size: {}",
@@ -374,14 +381,84 @@ impl TombstoneManager {
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use crate::error::Error;
use crate::error::{Error, Result};
use crate::key::tombstone::TombstoneManager;
use crate::kv_backend::KvBackend;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::rpc::store::PutRequest;
use crate::kv_backend::txn::{Txn, TxnRequest, TxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
struct TxnOpLimitKvBackend {
inner: Arc<MemoryKvBackend<Error>>,
max_txn_ops: usize,
}
#[async_trait::async_trait]
impl TxnService for TxnOpLimitKvBackend {
type Error = Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
let TxnRequest {
compare,
success,
failure,
} = txn.req();
let txn_ops = compare.len() + success.len() + failure.len();
assert!(
txn_ops <= self.max_txn_ops,
"txn ops {txn_ops} exceeds limit {}",
self.max_txn_ops
);
self.inner.txn(txn).await
}
fn max_txn_ops(&self) -> usize {
self.max_txn_ops
}
}
#[async_trait::async_trait]
impl KvBackend for TxnOpLimitKvBackend {
fn name(&self) -> &str {
"txn_op_limit"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.inner.range(req).await
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
self.inner.put(req).await
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
self.inner.batch_put(req).await
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.inner.batch_get(req).await
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
self.inner.delete_range(req).await
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
self.inner.batch_delete(req).await
}
}
#[derive(Debug, Clone)]
struct MoveValue {
@@ -654,6 +731,70 @@ mod tests {
check_moved_values(kv_backend.clone(), &move_values).await;
}
#[tokio::test]
async fn test_restore_chunks_by_total_txn_ops_limit() {
let inner = Arc::new(MemoryKvBackend::default());
let kv_backend = Arc::new(TxnOpLimitKvBackend {
inner: inner.clone(),
max_txn_ops: 6,
});
let tombstone_manager = TombstoneManager::new(kv_backend);
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
inner
.put(
PutRequest::new()
.with_key(tombstone_manager.to_tombstone(key))
.with_value(value.clone()),
)
.await
.unwrap();
}
let restored = tombstone_manager
.restore(kvs.keys().cloned().collect())
.await
.unwrap();
assert_eq!(kvs.len(), restored);
}
#[tokio::test]
async fn test_create_chunks_by_total_txn_ops_limit() {
let inner = Arc::new(MemoryKvBackend::default());
let kv_backend = Arc::new(TxnOpLimitKvBackend {
inner: inner.clone(),
max_txn_ops: 4,
});
let tombstone_manager = TombstoneManager::new(kv_backend);
let kvs = HashMap::from([
(b"bar".to_vec(), b"baz".to_vec()),
(b"foo".to_vec(), b"hi".to_vec()),
(b"baz".to_vec(), b"hello".to_vec()),
]);
for (key, value) in &kvs {
inner
.put(
PutRequest::new()
.with_key(key.clone())
.with_value(value.clone()),
)
.await
.unwrap();
}
let moved = tombstone_manager
.create(kvs.keys().cloned().collect())
.await
.unwrap();
assert_eq!(kvs.len(), moved);
}
#[tokio::test]
async fn test_move_values_with_non_exists_values() {
let kv_backend = Arc::new(MemoryKvBackend::default());