diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs
index a0f800f981..ba45b45b49 100644
--- a/src/meta-srv/src/error.rs
+++ b/src/meta-srv/src/error.rs
@@ -1142,8 +1142,19 @@ impl Error {
         ) || matches!(
             self,
             Error::DeallocateRegions { source, .. } if source.is_retry_later()
+        ) || matches!(
+            self,
+            Error::DeleteRecords { error, .. }
+                | Error::BuildPartitionClient { error, .. }
+                | Error::GetOffset { error, .. }
+                if Self::is_retryable_kafka_client_error(error)
         )
     }
+
+    /// Returns `true` if the Kafka client has exhausted its internal retries.
+    fn is_retryable_kafka_client_error(err: &rskafka::client::error::Error) -> bool {
+        matches!(err, rskafka::client::error::Error::RetryFailed(_))
+    }
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -1333,11 +1344,24 @@ pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io::Error> {
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use common_error::mock::MockError;
     use common_error::status_code::StatusCode;
+    use rskafka::BackoffError;
+    use rskafka::client::error::Error as KafkaClientError;
     use snafu::ResultExt;
 
-    use super::DeallocateRegionsSnafu;
+    use super::{
+        BuildPartitionClientSnafu, DeallocateRegionsSnafu, DeleteRecordsSnafu, GetOffsetSnafu,
+    };
+
+    fn retry_failed_kafka_error() -> KafkaClientError {
+        KafkaClientError::RetryFailed(BackoffError::DeadlineExceded {
+            deadline: Duration::from_secs(1),
+            source: Box::new(std::io::Error::other("retry failed")),
+        })
+    }
 
     #[test]
     fn test_deallocate_regions_is_retryable_when_source_is_retry_later() {
@@ -1361,4 +1385,56 @@ mod tests {
 
         assert!(!err.is_retryable());
     }
+
+    #[test]
+    fn test_kafka_retry_failed_errors_are_retryable() {
+        let delete_records_err = Err::<(), _>(retry_failed_kafka_error())
+            .context(DeleteRecordsSnafu {
+                topic: "test_topic",
+                partition: 0,
+                offset: 1024u64,
+            })
+            .unwrap_err();
+        let build_partition_client_err = Err::<(), _>(retry_failed_kafka_error())
+            .context(BuildPartitionClientSnafu {
+                topic: "test_topic",
+                partition: 0,
+            })
+            .unwrap_err();
+        let get_offset_err = Err::<(), _>(retry_failed_kafka_error())
+            .context(GetOffsetSnafu {
+                topic: "test_topic",
+            })
+            .unwrap_err();
+
+        assert!(delete_records_err.is_retryable());
+        assert!(build_partition_client_err.is_retryable());
+        assert!(get_offset_err.is_retryable());
+    }
+
+    #[test]
+    fn test_kafka_non_retry_failed_errors_are_not_retryable() {
+        let delete_records_err = Err::<(), _>(KafkaClientError::Timeout)
+            .context(DeleteRecordsSnafu {
+                topic: "test_topic",
+                partition: 0,
+                offset: 1024u64,
+            })
+            .unwrap_err();
+        let build_partition_client_err = Err::<(), _>(KafkaClientError::Timeout)
+            .context(BuildPartitionClientSnafu {
+                topic: "test_topic",
+                partition: 0,
+            })
+            .unwrap_err();
+        let get_offset_err = Err::<(), _>(KafkaClientError::Timeout)
+            .context(GetOffsetSnafu {
+                topic: "test_topic",
+            })
+            .unwrap_err();
+
+        assert!(!delete_records_err.is_retryable());
+        assert!(!build_partition_client_err.is_retryable());
+        assert!(!get_offset_err.is_retryable());
+    }
 }
diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs
index f5e74ef543..6f64a141e9 100644
--- a/src/meta-srv/src/procedure/wal_prune.rs
+++ b/src/meta-srv/src/procedure/wal_prune.rs
@@ -105,8 +105,8 @@ impl WalPruneProcedure {
     /// Prune the WAL and persist the minimum prunable entry id.
     ///
    /// Retry:
-    /// - Failed to update the minimum prunable entry id in kvbackend.
-    /// - Failed to delete records.
+    /// - Kafka client errors that have exhausted rskafka's internal retries.
+    /// - Failed to update the pruned entry id in the table metadata manager.
     pub async fn on_prune(&mut self) -> Result<Status> {
         let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
         let (earliest_offset, latest_offset) =
@@ -125,14 +125,7 @@
             &self.data.topic,
             self.data.prunable_entry_id,
         )
-        .await
-        .map_err(BoxedError::new)
-        .with_context(|_| error::RetryLaterWithSourceSnafu {
-            reason: format!(
-                "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
-                self.data.topic, self.data.prunable_entry_id, latest_offset
-            ),
-        })?;
+        .await?;
 
         // Update the pruned entry id for the topic.
         update_pruned_entry_id(
diff --git a/typos.toml b/typos.toml
index c2ce77b826..bb14c40bab 100644
--- a/typos.toml
+++ b/typos.toml
@@ -8,6 +8,9 @@ typs = "typs"
 unqualifed = "unqualifed"
 excluder = "excluder"
 consts = "consts"
+# The upstream rskafka crate defines a `DeadlineExceded` error variant whose name is misspelled.
+# Extend the word list so the typo checker does not flag our references to that variant.
+Exceded = "Exceded"
 
 [files]
 extend-exclude = [