From 66a784b58a3315010ce64f60536bea03e7760b6f Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 2 Jul 2025 21:21:57 -0700 Subject: [PATCH] fix: fix dest_keys chunks bug in TombstoneManager (#6432) (#6448) * fix(meta): fix dest_keys_chunks bug in TombstoneManager * chore: fix typo * fix: fix sqlness tests --------- Signed-off-by: WenyXu Co-authored-by: Weny Xu --- src/common/meta/src/key.rs | 5 +- src/common/meta/src/key/tombstone.rs | 189 ++++++++++++++++++++++----- src/flow/src/adapter.rs | 2 +- 3 files changed, 163 insertions(+), 33 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 72a93a2d94..f44787fce6 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -875,7 +875,10 @@ impl TableMetadataManager { ) -> Result<()> { let table_metadata_keys = self.table_metadata_keys(table_id, table_name, table_route_value, region_wal_options)?; - self.tombstone_manager.delete(table_metadata_keys).await + self.tombstone_manager + .delete(table_metadata_keys) + .await + .map(|_| ()) } /// Restores metadata for table. diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs index 9aa2dd69ee..273ab0a895 100644 --- a/src/common/meta/src/key/tombstone.rs +++ b/src/common/meta/src/key/tombstone.rs @@ -14,19 +14,23 @@ use std::collections::HashMap; +use common_telemetry::debug; use snafu::ensure; use crate::error::{self, Result}; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; -use crate::rpc::store::BatchGetRequest; +use crate::rpc::store::{BatchDeleteRequest, BatchGetRequest}; /// [TombstoneManager] provides the ability to: /// - logically delete values /// - restore the deleted values pub(crate) struct TombstoneManager { kv_backend: KvBackendRef, + // Only used for testing. 
+ #[cfg(test)] + max_txn_ops: Option<usize>, } const TOMBSTONE_PREFIX: &str = "__tombstone/"; @@ -38,7 +42,16 @@ fn to_tombstone(key: &[u8]) -> Vec<u8> { impl TombstoneManager { /// Returns [TombstoneManager]. pub fn new(kv_backend: KvBackendRef) -> Self { - Self { kv_backend } + Self { + kv_backend, + #[cfg(test)] + max_txn_ops: None, + } + } + + #[cfg(test)] + pub fn set_max_txn_ops(&mut self, max_txn_ops: usize) { + self.max_txn_ops = Some(max_txn_ops); } /// Moves value to `dest_key`. @@ -67,11 +80,15 @@ impl TombstoneManager { (txn, TxnOpGetResponseSet::filter(src_key)) } - async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<()> { + async fn move_values_inner(&self, keys: &[Vec<u8>], dest_keys: &[Vec<u8>]) -> Result<usize> { ensure!( keys.len() == dest_keys.len(), error::UnexpectedSnafu { - err_msg: "The length of keys does not match the length of dest_keys." + err_msg: format!( + "The length of keys({}) does not match the length of dest_keys({}).", + keys.len(), + dest_keys.len() + ), } ); // The key -> dest key mapping. @@ -102,7 +119,7 @@ impl TombstoneManager { .unzip(); let mut resp = self.kv_backend.txn(Txn::merge_all(txns)).await?; if resp.succeeded { - return Ok(()); + return Ok(keys.len()); } let mut set = TxnOpGetResponseSet::from(&mut resp.responses); // Updates results. @@ -124,17 +141,45 @@ impl TombstoneManager { .fail() } - /// Moves values to `dest_key`. 
- async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<()> { - let chunk_size = self.kv_backend.max_txn_ops() / 2; - if keys.len() > chunk_size { - let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>(); - let dest_keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>(); - for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) { - self.move_values_inner(keys, dest_keys).await?; - } + fn max_txn_ops(&self) -> usize { + #[cfg(test)] + if let Some(max_txn_ops) = self.max_txn_ops { + return max_txn_ops; + } + self.kv_backend.max_txn_ops() + } - Ok(()) + /// Moves values to `dest_key`. + /// + /// Returns the number of keys that were moved. + async fn move_values(&self, keys: Vec<Vec<u8>>, dest_keys: Vec<Vec<u8>>) -> Result<usize> { + ensure!( + keys.len() == dest_keys.len(), + error::UnexpectedSnafu { + err_msg: format!( + "The length of keys({}) does not match the length of dest_keys({}).", + keys.len(), + dest_keys.len() + ), + } + ); + if keys.is_empty() { + return Ok(0); + } + let chunk_size = self.max_txn_ops() / 2; + if keys.len() > chunk_size { + debug!( + "Moving values with multiple chunks, keys len: {}, chunk_size: {}", + keys.len(), + chunk_size + ); + let mut moved_keys = 0; + let keys_chunks = keys.chunks(chunk_size).collect::<Vec<_>>(); + let dest_keys_chunks = dest_keys.chunks(chunk_size).collect::<Vec<_>>(); + for (keys, dest_keys) in keys_chunks.into_iter().zip(dest_keys_chunks) { + moved_keys += self.move_values_inner(keys, dest_keys).await?; + } + Ok(moved_keys) } else { self.move_values_inner(&keys, &dest_keys).await } @@ -154,7 +199,7 @@ impl TombstoneManager { }) .unzip(); - self.move_values(keys, dest_keys).await + self.move_values(keys, dest_keys).await.map(|_| ()) } /// Restores tombstones for keys. @@ -171,20 +216,22 @@ impl TombstoneManager { }) .unzip(); - self.move_values(keys, dest_keys).await + self.move_values(keys, dest_keys).await.map(|_| ()) } /// Deletes tombstones values for the specified `keys`. 
- pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> { - let operations = keys - .iter() - .map(|key| TxnOp::Delete(to_tombstone(key))) - .collect::<Vec<_>>(); + /// + /// Returns the number of keys that were deleted. + pub async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<usize> { + let keys = keys.iter().map(|key| to_tombstone(key)).collect::<Vec<_>>(); - let txn = Txn::new().and_then(operations); - // Always success. - let _ = self.kv_backend.txn(txn).await?; - Ok(()) + let num_keys = keys.len(); + let _ = self + .kv_backend + .batch_delete(BatchDeleteRequest::new().with_keys(keys)) + .await?; + + Ok(num_keys) } } @@ -373,16 +420,73 @@ mod tests { .into_iter() .map(|kv| (kv.key, kv.dest_key)) .unzip(); - tombstone_manager + let moved_keys = tombstone_manager .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); + assert_eq!(kvs.len(), moved_keys); check_moved_values(kv_backend.clone(), &move_values).await; // Moves again - tombstone_manager + let moved_keys = tombstone_manager .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); + assert_eq!(0, moved_keys); + check_moved_values(kv_backend.clone(), &move_values).await; + } + + #[tokio::test] + async fn test_move_values_with_max_txn_ops() { + common_telemetry::init_default_ut_logging(); + let kv_backend = Arc::new(MemoryKvBackend::default()); + let mut tombstone_manager = TombstoneManager::new(kv_backend.clone()); + tombstone_manager.set_max_txn_ops(4); + let kvs = HashMap::from([ + (b"bar".to_vec(), b"baz".to_vec()), + (b"foo".to_vec(), b"hi".to_vec()), + (b"baz".to_vec(), b"hello".to_vec()), + (b"qux".to_vec(), b"world".to_vec()), + (b"quux".to_vec(), b"world".to_vec()), + (b"quuux".to_vec(), b"world".to_vec()), + (b"quuuux".to_vec(), b"world".to_vec()), + (b"quuuuux".to_vec(), b"world".to_vec()), + (b"quuuuuux".to_vec(), b"world".to_vec()), + ]); + for (key, value) in &kvs { + kv_backend + .put( + PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()), + ) + .await + .unwrap(); + } + let 
move_values = kvs + .iter() + .map(|(key, value)| MoveValue { + key: key.clone(), + dest_key: to_tombstone(key), + value: value.clone(), + }) + .collect::<Vec<_>>(); + let (keys, dest_keys): (Vec<_>, Vec<_>) = move_values + .clone() + .into_iter() + .map(|kv| (kv.key, kv.dest_key)) + .unzip(); + let moved_keys = tombstone_manager + .move_values(keys.clone(), dest_keys.clone()) + .await + .unwrap(); + assert_eq!(kvs.len(), moved_keys); + check_moved_values(kv_backend.clone(), &move_values).await; + // Moves again + let moved_keys = tombstone_manager + .move_values(keys.clone(), dest_keys.clone()) + .await + .unwrap(); + assert_eq!(0, moved_keys); check_moved_values(kv_backend.clone(), &move_values).await; } @@ -420,17 +524,19 @@ mod tests { .unzip(); keys.push(b"non-exists".to_vec()); dest_keys.push(b"hi/non-exists".to_vec()); - tombstone_manager + let moved_keys = tombstone_manager .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); check_moved_values(kv_backend.clone(), &move_values).await; + assert_eq!(3, moved_keys); // Moves again - tombstone_manager + let moved_keys = tombstone_manager .move_values(keys.clone(), dest_keys.clone()) .await .unwrap(); check_moved_values(kv_backend.clone(), &move_values).await; + assert_eq!(0, moved_keys); } #[tokio::test] @@ -471,10 +577,11 @@ mod tests { .into_iter() .map(|kv| (kv.key, kv.dest_key)) .unzip(); - tombstone_manager + let moved_keys = tombstone_manager .move_values(keys, dest_keys) .await .unwrap(); + assert_eq!(kvs.len(), moved_keys); } #[tokio::test] @@ -552,4 +659,24 @@ mod tests { .unwrap(); check_moved_values(kv_backend.clone(), &move_values).await; } + + #[tokio::test] + async fn test_move_values_with_different_lengths() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + + let keys = vec![b"bar".to_vec(), b"foo".to_vec()]; + let dest_keys = vec![b"bar".to_vec(), b"foo".to_vec(), b"baz".to_vec()]; + + let err = tombstone_manager 
+ .move_values(keys, dest_keys) + .await + .unwrap_err(); + assert!(err + .to_string() + .contains("The length of keys(2) does not match the length of dest_keys(3)."),); + + let moved_keys = tombstone_manager.move_values(vec![], vec![]).await.unwrap(); + assert_eq!(0, moved_keys); + } } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index a613e3f83a..558328ae37 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -897,7 +897,7 @@ impl StreamingEngine { let rows_send = self.run_available(true).await?; let row = self.send_writeback_requests().await?; debug!( - "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed", + "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed", flow_id, flushed_input_rows, rows_send, row ); Ok(row)