fix: try prune one less (#5965)

* try prune one less

* test: also not add one

* ci: use longer fuzz time

* revert fuzz time&per review

* chore: no (

* docs: add explain to offset used in delete records

* test: fix test_procedure_execution
This commit is contained in:
discord9
2025-04-24 00:57:54 +08:00
committed by GitHub
parent a0900f5b90
commit 9557b76224

View File

@@ -335,22 +335,21 @@ impl WalPruneProcedure {
})?;
partition_client
.delete_records(
(self.data.prunable_entry_id + 1) as i64,
// notice here no "+1" is needed because the offset arg is exclusive, and it's defensive programming just in case somewhere else have a off by one error, see https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) which we use to get the end offset from high watermark
self.data.prunable_entry_id as i64,
DELETE_RECORDS_TIMEOUT.as_millis() as i32,
)
.await
.context(DeleteRecordsSnafu {
topic: &self.data.topic,
partition: DEFAULT_PARTITION,
offset: (self.data.prunable_entry_id + 1),
offset: self.data.prunable_entry_id,
})
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to delete records for topic: {}, partition: {}, offset: {}",
self.data.topic,
DEFAULT_PARTITION,
self.data.prunable_entry_id + 1
self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id
),
})?;
info!(
@@ -605,19 +604,19 @@ mod tests {
// Step 3: Test `on_prune`.
let status = procedure.on_prune().await.unwrap();
assert_matches!(status, Status::Done { output: None });
// Check if the entry ids after `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64 + 1,
true,
)
.await;
// Check if the entry s before `prunable_entry_id` are deleted.
// Check if the entry ids after(include) `prunable_entry_id` still exist.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64,
true,
)
.await;
// Check if the entry ids before `prunable_entry_id` are deleted.
check_entry_id_existence(
procedure.context.client.clone(),
&topic_name,
procedure.data.prunable_entry_id as i64 - 1,
false,
)
.await;