From 9557b76224d17ee1fc1cfed1c529a71ba1bf2ea2 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 24 Apr 2025 00:57:54 +0800 Subject: [PATCH] fix: try prune one less (#5965) * try prune one less * test: also not add one * ci: use longer fuzz time * revert fuzz time&per review * chore: no ( * docs: add explain to offset used in delete records * test: fix test_procedure_execution --- src/meta-srv/src/procedure/wal_prune.rs | 27 ++++++++++++------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index 0247c928ec..e9b6403942 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -335,22 +335,21 @@ impl WalPruneProcedure { })?; partition_client .delete_records( - (self.data.prunable_entry_id + 1) as i64, + // Note: no "+1" is needed here because the offset arg is exclusive; omitting it is also defensive programming in case an off-by-one error exists somewhere else. See https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) which we use to get the end offset from the high watermark. + self.data.prunable_entry_id as i64, DELETE_RECORDS_TIMEOUT.as_millis() as i32, ) .await .context(DeleteRecordsSnafu { topic: &self.data.topic, partition: DEFAULT_PARTITION, - offset: (self.data.prunable_entry_id + 1), + offset: self.data.prunable_entry_id, }) .map_err(BoxedError::new) .with_context(|_| error::RetryLaterWithSourceSnafu { reason: format!( "Failed to delete records for topic: {}, partition: {}, offset: {}", - self.data.topic, - DEFAULT_PARTITION, - self.data.prunable_entry_id + 1 + self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id ), })?; info!( @@ -605,19 +604,19 @@ mod tests { // Step 3: Test `on_prune`. 
let status = procedure.on_prune().await.unwrap(); assert_matches!(status, Status::Done { output: None }); - // Check if the entry ids after `prunable_entry_id` still exist. - check_entry_id_existence( - procedure.context.client.clone(), - &topic_name, - procedure.data.prunable_entry_id as i64 + 1, - true, - ) - .await; - // Check if the entry s before `prunable_entry_id` are deleted. + // Check that the entry ids at or after `prunable_entry_id` still exist. check_entry_id_existence( procedure.context.client.clone(), &topic_name, procedure.data.prunable_entry_id as i64, + true, + ) + .await; + // Check if the entry ids before `prunable_entry_id` are deleted. + check_entry_id_existence( + procedure.context.client.clone(), + &topic_name, + procedure.data.prunable_entry_id as i64 - 1, false, ) .await;