diff --git a/src/auth/src/error.rs b/src/auth/src/error.rs index a8dfe7f629..2b339b8655 100644 --- a/src/auth/src/error.rs +++ b/src/auth/src/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -111,6 +111,14 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn std::any::Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::Io { error, .. } => retry_hint_from_io_error(error), + Error::AuthBackend { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } pub type Result = std::result::Result; diff --git a/src/cli/src/data/export_v2/error.rs b/src/cli/src/data/export_v2/error.rs index e16e3a6176..078f5bd2e1 100644 --- a/src/cli/src/data/export_v2/error.rs +++ b/src/cli/src/data/export_v2/error.rs @@ -14,9 +14,10 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use object_store::error::retry_hint_from_opendal_error; use snafu::{Location, Snafu}; #[derive(Snafu)] @@ -243,4 +244,14 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::StorageOperation { error, .. } | Error::BuildObjectStore { error, .. } => { + retry_hint_from_opendal_error(error) + } + Error::Database { error, .. } => error.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/cli/src/data/import_v2/error.rs b/src/cli/src/data/import_v2/error.rs index 165f1c0118..967a62bd83 100644 --- a/src/cli/src/data/import_v2/error.rs +++ b/src/cli/src/data/import_v2/error.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -216,6 +216,25 @@ impl ErrorExt for Error { } } + fn retry_hint(&self) -> RetryHint { + match self { + #[cfg(test)] + Error::TestTaskFailed { retryable, .. } => { + if *retryable { + RetryHint::Retryable + } else { + RetryHint::NonRetryable + } + } + Error::Database { error, .. } => error.retry_hint(), + Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => { + error.retry_hint() + } + Error::ImportStateIo { error, .. } => retry_hint_from_io_error(error), + _ => RetryHint::NonRetryable, + } + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 6616962838..384a6ad4c2 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -868,13 +868,17 @@ mod tests { use api::v1::auth_header::AuthScheme; use api::v1::{AuthHeader, Basic}; + use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; + use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT}; use common_query::OutputData; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; use futures_util::StreamExt; + use tonic::codegen::http::{HeaderMap, HeaderValue}; + use tonic::metadata::MetadataMap; use tonic::{Code, Status}; use super::*; @@ -1008,6 +1012,7 @@ mod tests { code: StatusCode::Internal, msg: "blabla".to_string(), tonic_code: Code::Internal, + retry_hint: RetryHint::NonRetryable, } .build(); @@ -1015,6 +1020,52 @@ mod tests { let actual: Error = status.into(); assert_eq!(expected.to_string(), actual.to_string()); + assert_eq!(expected.retry_hint(), actual.retry_hint()); + assert_eq!(expected.should_retry(), actual.should_retry()); + } + + #[test] + fn test_from_tonic_status_with_retry_hint() { + let mut headers = HeaderMap::new(); + headers.insert( + GREPTIME_DB_HEADER_ERROR_CODE, + HeaderValue::from(StatusCode::Internal as u32), + ); + headers.insert( + GREPTIME_DB_HEADER_ERROR_RETRY_HINT, + HeaderValue::from_static(RetryHint::Retryable.as_str()), + ); + let status = + Status::with_metadata(Code::Internal, "blabla", MetadataMap::from_headers(headers)); + + let actual: Error = status.into(); + + assert_eq!(actual.retry_hint(), RetryHint::Retryable); + assert!(actual.should_retry()); + } + + #[test] + fn test_from_tonic_status_fallback() { + let mut headers = HeaderMap::new(); + headers.insert( + GREPTIME_DB_HEADER_ERROR_CODE, + HeaderValue::from(StatusCode::InvalidArguments as u32), + ); + let status = + Status::with_metadata(Code::Internal, "blabla", MetadataMap::from_headers(headers)); + + let actual: Error = status.into(); + + assert_eq!(actual.retry_hint(), RetryHint::NonRetryable); + assert!(!actual.should_retry()); + } + + #[test] + fn test_should_retry_preserves_transport_retry() { + let status = Status::new(Code::Unavailable, "blabla"); + let actual: Error = status.into(); + + assert!(actual.should_retry()); } #[tokio::test] diff --git a/src/client/src/error.rs b/src/client/src/error.rs index aa6940dd8b..1db0858f08 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use common_error::define_from_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -130,6 +130,7 @@ pub enum Error { code: StatusCode, msg: String, tonic_code: Code, + retry_hint: RetryHint, #[snafu(implicit)] location: Location, }, @@ -168,6 +169,21 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::Tonic { retry_hint, .. } => *retry_hint, + Error::FlightGet { source, .. } + | Error::RegionServer { source, .. } + | Error::FlowServer { source, .. } + | Error::External { source, .. } => source.retry_hint(), + Error::ConvertFlightData { source, .. } + | Error::CreateChannel { source, .. } + | Error::CreateTlsChannel { source, .. } => source.retry_hint(), + Error::ConvertSchema { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } define_from_tonic_status!(Error, Tonic); @@ -194,7 +210,8 @@ impl Error { } pub fn should_retry(&self) -> bool { - self.is_connection_error() + self.retry_hint().is_retryable() + || self.is_connection_error() || matches!( self.tonic_code(), Some(Code::Cancelled) | Some(Code::DeadlineExceeded) diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs index 4b514b5b20..3c68bbc147 100644 --- a/src/common/datasource/src/error.rs +++ b/src/common/datasource/src/error.rs @@ -15,10 +15,11 @@ use std::any::Any; use arrow_schema::ArrowError; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion::parquet::errors::ParquetError; +use object_store::error::retry_hint_from_opendal_error; use snafu::{Location, Snafu}; use url::ParseError; @@ -250,4 +251,18 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + BuildBackend { error, .. } + | ListObjects { error, .. } + | ReadObject { error, .. } + | WriteObject { error, .. } => retry_hint_from_opendal_error(error), + AsyncWrite { error, .. } => retry_hint_from_io_error(error), + WriteParquet { .. } => RetryHint::Retryable, + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index edd49fed35..aa3213c4d2 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -14,12 +14,83 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::io::ErrorKind; +use std::str::FromStr; use std::sync::Arc; use snafu::{FromString, Snafu}; use crate::status_code::StatusCode; +/// Describes whether an error instance is safe and useful to retry. +/// +/// This is intentionally separate from [`StatusCode`]: status code describes the +/// error category exposed to users, while retry hint describes retry policy for +/// this specific error instance. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RetryHint { + /// The operation may succeed if retried later. + Retryable, + /// Retrying the same operation is not expected to help. + /// + /// This is the default for errors that do not explicitly opt in to retry. + NonRetryable, +} + +const RETRY_HINT_RETRYABLE: &str = "retryable"; +const RETRY_HINT_NON_RETRYABLE: &str = "non_retryable"; + +impl RetryHint { + pub fn is_retryable(self) -> bool { + matches!(self, RetryHint::Retryable) + } + + pub fn as_str(self) -> &'static str { + match self { + RetryHint::Retryable => RETRY_HINT_RETRYABLE, + RetryHint::NonRetryable => RETRY_HINT_NON_RETRYABLE, + } + } +} + +impl FromStr for RetryHint { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + RETRY_HINT_RETRYABLE => Ok(RetryHint::Retryable), + RETRY_HINT_NON_RETRYABLE => Ok(RetryHint::NonRetryable), + _ => Err(()), + } + } +} + +/// Converts a [`std::io::Error`] into a conservative [`RetryHint`]. +/// +/// This helper classifies known transient I/O conditions as retryable and treats +/// request, permission, filesystem-capacity, and data-shape errors as +/// non-retryable. `std::io::ErrorKind` is non-exhaustive, so future or +/// unclassified kinds are considered non-retryable until reviewed explicitly. +pub fn retry_hint_from_io_error(error: &std::io::Error) -> RetryHint { + match error.kind() { + ErrorKind::ConnectionRefused + | ErrorKind::ConnectionReset + | ErrorKind::HostUnreachable + | ErrorKind::NetworkUnreachable + | ErrorKind::ConnectionAborted + | ErrorKind::NotConnected + | ErrorKind::NetworkDown + | ErrorKind::BrokenPipe + | ErrorKind::WouldBlock + | ErrorKind::StaleNetworkFileHandle + | ErrorKind::TimedOut + | ErrorKind::ResourceBusy + | ErrorKind::Interrupted => RetryHint::Retryable, + + _ => RetryHint::NonRetryable, + } +} + /// Extension to [`Error`](std::error::Error) in std. pub trait ErrorExt: StackError { /// Map this error to [StatusCode]. @@ -27,6 +98,23 @@ pub trait ErrorExt: StackError { StatusCode::Unknown } + /// Returns the retry hint for this error instance. + /// + /// Implementations should return [`RetryHint::Retryable`] only when retrying the + /// same operation may succeed without changing the request. The default is + /// [`RetryHint::NonRetryable`] to avoid accidental retry loops. + fn retry_hint(&self) -> RetryHint { + RetryHint::NonRetryable + } + + /// Returns whether this error instance is marked retryable. + /// + /// This is derived from [`Self::retry_hint`]. Transport-level retries, such as a + /// gRPC `Unavailable`, may still be handled separately by client code. + fn is_retryable(&self) -> bool { + self.retry_hint().is_retryable() + } + /// Returns the error as [Any](std::any::Any) so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; @@ -194,6 +282,10 @@ impl crate::ext::ErrorExt for BoxedError { self.inner.status_code() } + fn retry_hint(&self) -> RetryHint { + self.inner.retry_hint() + } + fn as_any(&self) -> &dyn std::any::Any { self.inner.as_any() } diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 9b6facda2c..eeedc63713 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -27,6 +27,7 @@ use crate::status_code::StatusCode; // please define in `src/servers/src/http/header.rs`. pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code"; pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg"; +pub const GREPTIME_DB_HEADER_ERROR_RETRY_HINT: &str = "x-greptime-err-retry-hint"; /// Create a http header map from error code and message. /// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys. diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 4dc5a0398e..a299b18994 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -134,54 +134,6 @@ impl StatusCode { Self::Success as u32 == code } - /// Returns `true` if the error with this code is retryable. - pub fn is_retryable(&self) -> bool { - match self { - StatusCode::StorageUnavailable - | StatusCode::RuntimeResourcesExhausted - | StatusCode::Internal - | StatusCode::RegionNotReady - | StatusCode::TableUnavailable - | StatusCode::RegionBusy => true, - - StatusCode::Success - | StatusCode::Unknown - | StatusCode::Unsupported - | StatusCode::IllegalState - | StatusCode::Unexpected - | StatusCode::InvalidArguments - | StatusCode::Cancelled - | StatusCode::DeadlineExceeded - | StatusCode::InvalidSyntax - | StatusCode::DatabaseAlreadyExists - | StatusCode::PlanQuery - | StatusCode::EngineExecuteQuery - | StatusCode::TableAlreadyExists - | StatusCode::TableNotFound - | StatusCode::RegionAlreadyExists - | StatusCode::RegionNotFound - | StatusCode::FlowAlreadyExists - | StatusCode::FlowNotFound - | StatusCode::TriggerAlreadyExists - | StatusCode::TriggerNotFound - | StatusCode::RegionReadonly - | StatusCode::TableColumnNotFound - | StatusCode::TableColumnExists - | StatusCode::DatabaseNotFound - | StatusCode::RateLimited - | StatusCode::UserNotFound - | StatusCode::UnsupportedPasswordType - | StatusCode::UserPasswordMismatch - | StatusCode::AuthHeaderNotFound - | StatusCode::InvalidAuthHeader - | StatusCode::AccessDenied - | StatusCode::PermissionDenied - | StatusCode::RequestOutdated - | StatusCode::External - | StatusCode::Suspended => false, - } - } - /// Returns `true` if we should print an error log for an error with /// this status code. pub fn should_log_error(&self) -> bool { @@ -271,12 +223,16 @@ macro_rules! define_from_tonic_status { let msg = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_MSG) .unwrap_or_else(|| e.message().to_string()); + let retry_hint = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_RETRY_HINT) + .and_then(|s| s.parse().ok()) + .unwrap_or($crate::ext::RetryHint::NonRetryable); // TODO(LFC): Make the error variant defined automatically. Self::$Variant { code, msg, tonic_code: e.code(), + retry_hint, location: location!(), } } @@ -291,11 +247,13 @@ macro_rules! define_into_tonic_status { fn from(err: $Error) -> Self { use tonic::codegen::http::{HeaderMap, HeaderValue}; use tonic::metadata::MetadataMap; - use $crate::GREPTIME_DB_HEADER_ERROR_CODE; + use $crate::{ + GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT, + }; common_telemetry::error!(err; "Failed to handle request"); - let mut headers = HeaderMap::::with_capacity(2); + let mut headers = HeaderMap::::with_capacity(3); // If either of the status_code or error msg cannot convert to valid HTTP header value // (which is a very rare case), just ignore. Client will use Tonic status code and message. @@ -304,6 +262,10 @@ macro_rules! define_into_tonic_status { GREPTIME_DB_HEADER_ERROR_CODE, HeaderValue::from(status_code as u32), ); + headers.insert( + GREPTIME_DB_HEADER_ERROR_RETRY_HINT, + HeaderValue::from_static(err.retry_hint().as_str()), + ); let root_error = err.output_msg(); let metadata = MetadataMap::from_headers(headers); diff --git a/src/common/error/tests/ext.rs b/src/common/error/tests/ext.rs index c9e6137729..8be8afa474 100644 --- a/src/common/error/tests/ext.rs +++ b/src/common/error/tests/ext.rs @@ -13,8 +13,14 @@ // limitations under the License. use std::any::Any; +use std::fmt::{Display, Formatter}; +use std::io::{Error as IoError, ErrorKind}; +use std::str::FromStr; -use common_error::ext::{ErrorExt, PlainError, StackError, WhateverResult}; +use common_error::ext::{ + BoxedError, ErrorExt, PlainError, RetryHint, StackError, WhateverResult, + retry_hint_from_io_error, +}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, ResultExt, Snafu}; @@ -70,8 +76,9 @@ fn test_into_whatever_error() { let whatever = f(normal_error).unwrap_err(); assert_eq!( normalize_path(&whatever.to_string()), + // The location points to the `NormalSnafu` context in `normal_error()`. format!( - r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22 + r#"0: A normal error with "display" attribute, message "blabla", at {}:61:22 1: PlainError {{ msg: "", status_code: Unexpected }}"#, normalize_path(file!()) ) @@ -80,8 +87,9 @@ fn test_into_whatever_error() { let whatever = f(transparent_error).unwrap_err(); assert_eq!( normalize_path(&whatever.to_string()), + // The location points to the transparent `?` return in `transparent_error()`. format!( - r#"0: , at {}:60:5 + r#"0: , at {}:66:5 1: PlainError {{ msg: "", status_code: Unexpected }}"#, normalize_path(file!()) ) @@ -123,8 +131,9 @@ fn test_debug_format() { assert_eq!( normalize_path(&debug_output), + // The location points to the `NormalSnafu` context in `normal_error()`. format!( - r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22 + r#"0: A normal error with "display" attribute, message "blabla", at {}:61:22 1: PlainError {{ msg: "", status_code: Unexpected }}"#, normalize_path(file!()) ) @@ -134,8 +143,9 @@ fn test_debug_format() { let debug_output = format!("{:?}", result.unwrap_err()); assert_eq!( normalize_path(&debug_output), + // The location points to the transparent `?` return in `transparent_error()`. format!( - r#"0: , at {}:60:5 + r#"0: , at {}:66:5 1: PlainError {{ msg: "", status_code: Unexpected }}"#, normalize_path(file!()) ) @@ -150,3 +160,134 @@ fn test_transparent_flag() { let result = transparent_error(); assert!(result.unwrap_err().transparent()); } + +#[test] +fn test_retry_hint_helpers() { + assert!(RetryHint::Retryable.is_retryable()); + assert!(!RetryHint::NonRetryable.is_retryable()); + + assert_eq!(RetryHint::Retryable.as_str(), "retryable"); + assert_eq!(RetryHint::NonRetryable.as_str(), "non_retryable"); + + assert_eq!( + RetryHint::from_str("retryable").unwrap(), + RetryHint::Retryable + ); + assert_eq!( + RetryHint::from_str("non_retryable").unwrap(), + RetryHint::NonRetryable + ); + assert!(RetryHint::from_str("unknown").is_err()); +} + +#[test] +fn test_retry_hint_default() { + let err = normal_error().unwrap_err(); + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + assert!(!err.is_retryable()); +} + +#[test] +fn test_boxed_error_retry_hint() { + let err = BoxedError::new(RetryableError); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + assert!(err.is_retryable()); +} + +#[test] +fn test_retry_hint_from_io_error() { + let retryable_kinds = [ + ErrorKind::ConnectionRefused, + ErrorKind::ConnectionReset, + ErrorKind::HostUnreachable, + ErrorKind::NetworkUnreachable, + ErrorKind::ConnectionAborted, + ErrorKind::NotConnected, + ErrorKind::NetworkDown, + ErrorKind::BrokenPipe, + ErrorKind::WouldBlock, + ErrorKind::StaleNetworkFileHandle, + ErrorKind::TimedOut, + ErrorKind::ResourceBusy, + ErrorKind::Interrupted, + ]; + + for kind in retryable_kinds { + let error = IoError::from(kind); + assert_eq!( + retry_hint_from_io_error(&error), + RetryHint::Retryable, + "{kind:?}" + ); + } + + let non_retryable_kinds = [ + ErrorKind::NotFound, + ErrorKind::PermissionDenied, + ErrorKind::AddrInUse, + ErrorKind::AddrNotAvailable, + ErrorKind::AlreadyExists, + ErrorKind::NotADirectory, + ErrorKind::IsADirectory, + ErrorKind::DirectoryNotEmpty, + ErrorKind::ReadOnlyFilesystem, + ErrorKind::InvalidInput, + ErrorKind::InvalidData, + ErrorKind::WriteZero, + ErrorKind::StorageFull, + ErrorKind::NotSeekable, + ErrorKind::QuotaExceeded, + ErrorKind::FileTooLarge, + ErrorKind::ExecutableFileBusy, + ErrorKind::Deadlock, + ErrorKind::CrossesDevices, + ErrorKind::TooManyLinks, + ErrorKind::InvalidFilename, + ErrorKind::ArgumentListTooLong, + ErrorKind::Unsupported, + ErrorKind::UnexpectedEof, + ErrorKind::OutOfMemory, + ErrorKind::Other, + ]; + + for kind in non_retryable_kinds { + let error = IoError::from(kind); + assert_eq!( + retry_hint_from_io_error(&error), + RetryHint::NonRetryable, + "{kind:?}" + ); + } +} + +#[derive(Debug)] +struct RetryableError; + +impl Display for RetryableError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "retryable error") + } +} + +impl std::error::Error for RetryableError {} + +impl StackError for RetryableError { + fn debug_fmt(&self, layer: usize, buf: &mut Vec) { + buf.push(format!("{}: retryable error", layer)) + } + + fn next(&self) -> Option<&dyn StackError> { + None + } +} + +impl ErrorExt for RetryableError { + fn retry_hint(&self) -> RetryHint { + RetryHint::Retryable + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/frontend/src/error.rs b/src/common/frontend/src/error.rs index 19fb29aeb7..9961ed1879 100644 --- a/src/common/frontend/src/error.rs +++ b/src/common/frontend/src/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -75,4 +75,15 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn std::any::Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + External { source, .. } => source.retry_hint(), + Meta { source, .. } => source.retry_hint(), + CreateChannel { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 1d987514df..26cef0902c 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::io; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datatypes::arrow::error::ArrowError; @@ -133,4 +133,20 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::CreateChannel { .. } => RetryHint::Retryable, + Error::Arrow { error, .. } => retry_hint_from_arrow_error(error), + Error::FileWatch { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } +} + +fn retry_hint_from_arrow_error(error: &ArrowError) -> RetryHint { + match error { + ArrowError::IoError(_, error) => retry_hint_from_io_error(error), + _ => RetryHint::NonRetryable, + } } diff --git a/src/common/mem-prof/src/jemalloc/error.rs b/src/common/mem-prof/src/jemalloc/error.rs index 79e4b8f9a6..5e6442e0cf 100644 --- a/src/common/mem-prof/src/jemalloc/error.rs +++ b/src/common/mem-prof/src/jemalloc/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::path::PathBuf; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -101,6 +101,14 @@ impl ErrorExt for Error { } } + fn retry_hint(&self) -> RetryHint { + match self { + Error::OpenTempFile { error, .. } => retry_hint_from_io_error(error), + Error::DumpProfileData { .. } => RetryHint::Retryable, + _ => RetryHint::NonRetryable, + } + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/common/memory-manager/src/error.rs b/src/common/memory-manager/src/error.rs index 455b3c6a6d..f164c2fbf2 100644 --- a/src/common/memory-manager/src/error.rs +++ b/src/common/memory-manager/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::time::Duration; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::Snafu; @@ -60,4 +60,13 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::MemoryLimitExceeded { .. } | Error::MemoryAcquireTimeout { .. } => { + RetryHint::Retryable + } + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index fe8067fa13..881887feff 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -15,10 +15,12 @@ use std::str::Utf8Error; use std::sync::Arc; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_procedure::ProcedureId; +use common_wal::kafka::rskafka_client_error_to_retry_hint; +use object_store::error::retry_hint_from_opendal_error; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; @@ -1267,6 +1269,47 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn std::any::Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + RetryLater { .. } + | GetLatestCacheRetryExceeded { .. } + | NoLeader { .. } + | ElectionNoLeader { .. } + | ElectionLeaderLeaseExpired { .. } + | ElectionLeaderLeaseChanged { .. } => RetryHint::Retryable, + WriteObject { error, .. } | ReadObject { error, .. } => { + retry_hint_from_opendal_error(error) + } + BuildKafkaClient { error, .. } + | BuildKafkaCtrlClient { error, .. } + | KafkaPartitionClient { error, .. } + | KafkaGetOffset { error, .. } + | ProduceRecord { error, .. } + | CreateKafkaWalTopic { error, .. } => rskafka_client_error_to_retry_hint(error), + SubmitProcedure { source, .. } + | QueryProcedure { source, .. } + | WaitProcedure { source, .. } + | StartProcedureManager { source, .. } + | StopProcedureManager { source, .. } + | RegisterProcedureLoader { source, .. } + | PutPoison { source, .. } + | ProcedureStateReceiver { source, .. } => source.retry_hint(), + External { source, .. } + | ResponseExceededSizeLimit { source, .. } + | OperateDatanode { source, .. } + | AbortProcedure { source, .. } + | RegisterRepartitionProcedureLoader { source, .. } + | CreateRepartitionProcedure { source, .. } => source.retry_hint(), + Table { source, .. } => source.retry_hint(), + ConvertAlterTableRequest { source, .. } => source.retry_hint(), + ConvertColumnDef { source, .. } => source.retry_hint(), + GetCache { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } impl Error { @@ -1336,3 +1379,44 @@ impl Error { } } } + +#[cfg(test)] +mod retry_hint_tests { + use std::sync::Arc; + + use common_error::mock::MockError; + + use super::*; + + #[test] + fn test_retry_later_hint_is_retryable() { + let err = Error::retry_later(MockError::new(StatusCode::Internal)); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_latest_cache_retry_exceeded_hint_is_retryable() { + let err = GetLatestCacheRetryExceededSnafu { attempts: 3_usize }.build(); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_get_cache_forwards_retry_hint() { + let source = Arc::new(Error::retry_later(MockError::new(StatusCode::Internal))); + let err = Error::GetCache { source }; + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_default_hint_is_non_retryable() { + let err = UnexpectedSnafu { + err_msg: "mock error", + } + .build(); + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + } +} diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index c8081a67fc..67fab54b15 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -15,9 +15,10 @@ use std::any::Any; use std::sync::Arc; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use object_store::error::retry_hint_from_opendal_error; use snafu::{Location, Snafu}; use crate::PoisonKey; @@ -293,6 +294,28 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::RetryLater { .. } => RetryHint::Retryable, + Error::External { source, .. } + | Error::PutState { source, .. } + | Error::DeleteStates { source, .. } + | Error::ListState { source, .. } + | Error::PutPoison { source, .. } + | Error::DeletePoison { source, .. } + | Error::GetPoison { source, .. } + | Error::CheckStatus { source, .. } => source.retry_hint(), + Error::ProcedureExec { source, .. } => source.retry_hint(), + Error::StartRemoveOutdatedMetaTask { source, .. } + | Error::StopRemoveOutdatedMetaTask { source, .. } => source.retry_hint(), + Error::DeleteState { error, .. } => retry_hint_from_opendal_error(error), + Error::RetryTimesExceeded { .. } | Error::RollbackTimesExceeded { .. } => { + RetryHint::NonRetryable + } + _ => RetryHint::NonRetryable, + } + } } impl Error { @@ -339,13 +362,70 @@ impl Error { || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons) } + #[cfg(test)] /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according - /// to its [StatusCode]. + /// to its [RetryHint]. pub fn from_error_ext(err: E) -> Self { - if err.status_code().is_retryable() { + if err.retry_hint().is_retryable() { Error::retry_later(err) } else { Error::external(err) } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_error::mock::MockError; + + use super::*; + + #[test] + fn test_retry_later_hint_is_retryable() { + let err = Error::retry_later(MockError::new(StatusCode::Internal)); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_external_forwards_retry_hint() { + let source = Error::retry_later(MockError::new(StatusCode::Internal)); + let err = Error::external(source); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_retry_exceeded_hint_is_non_retryable() { + let source = Arc::new(Error::retry_later(MockError::new(StatusCode::Internal))); + let err = Error::RetryTimesExceeded { + source: source.clone(), + procedure_id: ProcedureId::random(), + }; + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + + let err = Error::RollbackTimesExceeded { + source, + procedure_id: ProcedureId::random(), + }; + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + } + + #[test] + fn test_from_error_ext_uses_retry_hint() { + let err = Error::from_error_ext(Error::retry_later(MockError::new( + StatusCode::InvalidArguments, + ))); + assert!(err.is_retry_later()); + + let err = Error::from_error_ext(MockError::new(StatusCode::InvalidArguments)); + assert!(!err.is_retry_later()); + + let err = Error::from_error_ext(MockError::new(StatusCode::Internal)); + assert!(!err.is_retry_later()); + } +} diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 00d4291ead..469320c19c 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -15,7 +15,7 @@ //! Error of record batch. use std::any::Any; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::ScalarValue; @@ -230,4 +230,15 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::ExceedMemoryLimit { .. } => RetryHint::Retryable, + Error::External { source, .. } => source.retry_hint(), + Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => { + source.retry_hint() + } + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/common/wal/src/kafka.rs b/src/common/wal/src/kafka.rs new file mode 100644 index 0000000000..2586f0a007 --- /dev/null +++ b/src/common/wal/src/kafka.rs @@ -0,0 +1,96 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::RetryHint; + +/// Maps an rskafka client error to a conservative retry hint. +/// +/// rskafka already retries most transient Kafka/network errors internally. This helper only +/// marks errors retryable when rskafka has exhausted its own retry loop or returns an explicit +/// client timeout. +pub fn rskafka_client_error_to_retry_hint(error: &rskafka::client::error::Error) -> RetryHint { + use rskafka::client::error::Error; + + match error { + Error::RetryFailed(_) + | Error::Timeout + | Error::Connection(rskafka::ConnectionError::RetryFailed(_)) => RetryHint::Retryable, + _ => RetryHint::NonRetryable, + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use common_error::ext::RetryHint; + use rskafka::BackoffError; + use rskafka::client::error::{Error as KafkaClientError, ProtocolError, RequestContext}; + + use super::*; + + fn retry_failed_error() -> KafkaClientError { + KafkaClientError::RetryFailed(BackoffError::DeadlineExceded { + deadline: Duration::from_secs(1), + source: Box::new(std::io::Error::other("retry failed")), + }) + } + + #[test] + fn test_retry_failed_hint_is_retryable() { + assert_eq!( + rskafka_client_error_to_retry_hint(&retry_failed_error()), + RetryHint::Retryable + ); + } + + #[test] + fn test_timeout_hint_is_retryable() { + assert_eq!( + rskafka_client_error_to_retry_hint(&KafkaClientError::Timeout), + RetryHint::Retryable + ); + } + + #[test] + fn test_connection_retry_failed_hint_is_retryable() { + let err = KafkaClientError::Connection(rskafka::ConnectionError::RetryFailed( + BackoffError::DeadlineExceded { + deadline: Duration::from_secs(1), + source: Box::new(std::io::Error::other("retry failed")), + }, + )); + + assert_eq!( + rskafka_client_error_to_retry_hint(&err), + RetryHint::Retryable + ); + } + + #[test] + fn test_raw_protocol_error_hint_is_non_retryable() { + let err = KafkaClientError::ServerError { + protocol_error: ProtocolError::NetworkException, + error_message: None, + request: RequestContext::Topic("test_topic".to_string()), + response: None, + is_virtual: false, + }; + + assert_eq!( + rskafka_client_error_to_retry_hint(&err), + RetryHint::NonRetryable + ); + } +} diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs index a0b1dc99f9..cbf48a3e78 100644 --- a/src/common/wal/src/lib.rs +++ b/src/common/wal/src/lib.rs @@ -21,6 +21,7 @@ use tokio::net; pub mod config; pub mod error; +pub mod kafka; pub mod options; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index c09b43dcb4..0e2e1da959 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::sync::Arc; use common_error::define_into_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_runtime::JoinError; @@ -506,6 +506,69 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + RegionBusy { .. } + | RegionNotReady { .. } + | ConcurrentQueryLimiterClosed { .. } + | ConcurrentQueryLimiterTimeout { .. } => RetryHint::Retryable, + NewPlanDecoder { source, .. } | ExecuteLogicalPlan { source, .. } => { + source.retry_hint() + } + HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => { + source.retry_hint() + } + DecodeLogicalPlan { source, .. } => source.retry_hint(), + Delete { source, .. } => source.retry_hint(), + AsyncTaskExecute { source, .. } => source.retry_hint(), + StartServer { source, .. } | ShutdownServer { source, .. } => source.retry_hint(), + OpenLogStore { source, .. } => source.retry_hint(), + MetaClientInit { source, .. } => source.retry_hint(), + HandleRegionRequest { source, .. } + | GetRegionMetadata { source, .. } + | HandleBatchOpenRequest { source, .. } + | HandleBatchDdlRequest { source, .. } + | StopRegionEngine { source, .. } => source.retry_hint(), + FindLogicalRegions { source, .. } => source.retry_hint(), + BuildMitoEngine { source, .. } => source.retry_hint(), + GcMitoEngine { source, .. } => source.retry_hint(), + BuildMetricEngine { source, .. } => source.retry_hint(), + ListStorageSsts { source, .. } => source.retry_hint(), + ObjectStore { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } define_into_tonic_status!(Error); + +#[cfg(test)] +mod tests { + use common_error::ext::RetryHint; + + use super::*; + + #[test] + fn test_region_state_hints_are_retryable() { + let region_id = RegionId::new(1024, 1); + + let err = RegionBusySnafu { region_id }.build(); + assert_eq!(err.retry_hint(), RetryHint::Retryable); + + let err = RegionNotReadySnafu { region_id }.build(); + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + + #[test] + fn test_default_hint_is_non_retryable() { + let err = UnexpectedSnafu { + violated: "mock error", + } + .build(); + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + } +} diff --git a/src/file-engine/src/error.rs b/src/file-engine/src/error.rs index 877b493cc6..f40adfe65f 100644 --- a/src/file-engine/src/error.rs +++ b/src/file-engine/src/error.rs @@ -14,11 +14,12 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion::arrow::error::ArrowError; use datafusion::error::DataFusionError; +use object_store::error::retry_hint_from_opendal_error; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; @@ -234,10 +235,17 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } -} -impl From for common_procedure::Error { - fn from(e: Error) -> common_procedure::Error { - common_procedure::Error::from_error_ext(e) + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + BuildBackend { source, .. } | ParseFileFormat { source, .. } => source.retry_hint(), + CheckObject { error, .. } + | StoreRegionManifest { error, .. } + | LoadRegionManifest { error, .. } + | DeleteRegionManifest { error, .. } => retry_hint_from_opendal_error(error), + _ => RetryHint::NonRetryable, + } } } diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 825c1dc5ac..2dcefd95c8 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -18,12 +18,15 @@ use std::any::Any; use api::v1::CreateTableExpr; use arrow_schema::ArrowError; -use common_error::ext::BoxedError; -use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; +use common_error::ext::{BoxedError, RetryHint}; +use common_error::{ + GREPTIME_DB_HEADER_ERROR_RETRY_HINT, define_into_tonic_status, from_err_code_msg_to_header, +}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, ResultExt, Snafu}; +use tonic::codegen::http::HeaderValue; use tonic::metadata::MetadataMap; use crate::FlowId; @@ -308,7 +311,11 @@ pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status { let msg = err.to_string(); let last_err_msg = common_error::ext::StackError::last(&err).to_string(); let code = err.status_code() as u32; - let header = from_err_code_msg_to_header(code, &last_err_msg); + let mut header = from_err_code_msg_to_header(code, &last_err_msg); + header.insert( + GREPTIME_DB_HEADER_ERROR_RETRY_HINT, + HeaderValue::from_static(err.retry_hint().as_str()), + ); tonic::Status::with_metadata( tonic::Code::InvalidArgument, @@ -366,6 +373,35 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Self::FlowNotRecovered { .. } | Self::NoAvailableFrontend { .. } => { + RetryHint::Retryable + } + + Self::InsertIntoFlow { source, .. } + | Self::CreateFlow { source, .. } + | Self::CreateSinkTable { source, .. } + | Self::External { source, .. } => source.retry_hint(), + + Self::Time { source, .. } => source.retry_hint(), + Self::TableNotFoundMeta { source, .. } | Self::ListFlows { source, .. } => { + source.retry_hint() + } + Self::Datatypes { source, .. } => source.retry_hint(), + Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { + source.retry_hint() + } + Self::MetaClientInit { source, .. } => source.retry_hint(), + Self::InvalidRequest { source, .. } => source.retry_hint(), + Self::SubstraitEncodeLogicalPlan { source, .. } => source.retry_hint(), + Self::ConvertColumnSchema { source, .. } => source.retry_hint(), + Self::InvalidClientConfig { source, .. } => source.retry_hint(), + + _ => RetryHint::NonRetryable, + } + } } define_into_tonic_status!(Error); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 6f78d23e14..144fb96ab9 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -16,7 +16,7 @@ use std::any::Any; use common_datasource::file_format::Format; use common_error::define_into_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_query::error::datafusion_status_code; @@ -448,6 +448,46 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::InvalidateTableCache { source, .. } + | Error::HandleHeartbeatResponse { source, .. } + | Error::RequestQuery { source, .. } => source.retry_hint(), + + Error::External { source, .. } + | Error::SqlExecIntercepted { source, .. } + | Error::InitPlugin { source, .. } => source.retry_hint(), + + Error::StartServer { source, .. } + | Error::ShutdownServer { source, .. } + | Error::InvokeRegionServer { source, .. } + | Error::ExecutePromql { source, .. } + | Error::PromStoreRemoteQueryPlan { source, .. } + | Error::PrometheusMetricNamesQueryPlan { source, .. } => source.retry_hint(), + + Error::ParseSql { source, .. } => source.retry_hint(), + Error::Catalog { source, .. } => source.retry_hint(), + Error::CreateMetaHeartbeatStream { source, .. } => source.retry_hint(), + Error::FindRegionPeer { source, .. } => source.retry_hint(), + Error::Table { source, .. } => source.retry_hint(), + Error::CollectRecordbatch { source, .. } => source.retry_hint(), + Error::PlanStatement { source, .. } + | Error::ReadTable { source, .. } + | Error::ExecLogicalPlan { source, .. } + | Error::DescribeStatement { source, .. } => source.retry_hint(), + Error::PrometheusLabelValuesQueryPlan { source, .. } => source.retry_hint(), + Error::Insert { source, .. } => source.retry_hint(), + Error::Permission { source, .. } => source.retry_hint(), + Error::TableOperation { source, .. } => source.retry_hint(), + Error::IllegalAuthConfig { source, .. } => source.retry_hint(), + Error::TomlFormat { source, .. } => source.retry_hint(), + Error::InvalidTlsConfig { error, .. } => error.retry_hint(), + Error::SubstraitDecodeLogicalPlan { source, .. } => source.retry_hint(), + + _ => RetryHint::NonRetryable, + } + } } define_into_tonic_status!(Error); diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 4b4b1b6cfd..9672b05301 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -543,7 +543,12 @@ impl Instance { | StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan, StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk, StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate, - _ if status.is_retryable() => ChunkFailureReaction::Propagate, + StatusCode::StorageUnavailable + | StatusCode::RuntimeResourcesExhausted + | StatusCode::Internal + | StatusCode::RegionNotReady + | StatusCode::TableUnavailable + | StatusCode::RegionBusy => ChunkFailureReaction::Propagate, _ => ChunkFailureReaction::DiscardChunk, } } diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index b56334a01a..a83944d517 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -14,10 +14,12 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; +use common_wal::kafka::rskafka_client_error_to_retry_hint; +use object_store::error::retry_hint_from_opendal_error; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; @@ -375,4 +377,30 @@ impl ErrorExt for Error { | ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error), } } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + CreateWriter { error, .. } | WriteIndex { error, .. } | ReadIndex { error, .. } => { + retry_hint_from_opendal_error(error) + } + Io { error, .. } => retry_hint_from_io_error(error), + FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => RetryHint::Retryable, + ProduceRecord { error, .. } => match error { + rskafka::client::producer::Error::Client(error) => { + rskafka_client_error_to_retry_hint(error) + } + rskafka::client::producer::Error::Aggregator(_) + | rskafka::client::producer::Error::FlushError(_) + | rskafka::client::producer::Error::TooLarge => RetryHint::NonRetryable, + }, + BuildClient { error, .. } + | BuildPartitionClient { error, .. } + | BatchProduce { error, .. } + | GetOffset { error, .. } + | ConsumeRecord { error, .. } => rskafka_client_error_to_retry_hint(error), + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index 19577f672c..917559fffb 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_error::define_from_tonic_status; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; @@ -34,6 +34,7 @@ pub enum Error { code: StatusCode, msg: String, tonic_code: tonic::Code, + retry_hint: RetryHint, #[snafu(implicit)] location: Location, }, @@ -156,6 +157,18 @@ impl ErrorExt for Error { | Error::GetFlowStat { source, .. } => source.status_code(), } } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::MetaServer { retry_hint, .. } => *retry_hint, + Error::InvalidResponseHeader { source, .. } + | Error::ConvertMetaRequest { source, .. } + | Error::ConvertMetaResponse { source, .. } + | Error::GetFlowStat { source, .. } => source.retry_hint(), + Error::CreateChannel { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } impl Error { @@ -171,3 +184,62 @@ impl Error { } define_from_tonic_status!(Error, MetaServer); + +#[cfg(test)] +mod tests { + use common_error::ext::{ErrorExt, RetryHint}; + use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT}; + use tonic::codegen::http::{HeaderMap, HeaderValue}; + use tonic::metadata::MetadataMap; + + use super::*; + + #[test] + fn test_from_tonic_status_fallbacks_to_status_code() { + let status = tonic::Status::new(tonic::Code::Internal, "blabla"); + + let err: Error = status.into(); + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + } + + #[test] + fn test_from_tonic_status_fallback_can_be_non_retryable() { + let mut headers = HeaderMap::new(); + headers.insert( + GREPTIME_DB_HEADER_ERROR_CODE, + HeaderValue::from(StatusCode::InvalidArguments as u32), + ); + let status = tonic::Status::with_metadata( + tonic::Code::Internal, + "blabla", + MetadataMap::from_headers(headers), + ); + + let err: Error = status.into(); + + assert_eq!(err.retry_hint(), RetryHint::NonRetryable); + } + + #[test] + fn test_from_tonic_status_with_retry_hint() { + let mut headers = HeaderMap::new(); + headers.insert( + GREPTIME_DB_HEADER_ERROR_CODE, + HeaderValue::from(StatusCode::Internal as u32), + ); + headers.insert( + GREPTIME_DB_HEADER_ERROR_RETRY_HINT, + HeaderValue::from_static(RetryHint::Retryable.as_str()), + ); + let status = tonic::Status::with_metadata( + tonic::Code::Internal, + "blabla", + MetadataMap::from_headers(headers), + ); + + let err: Error = status.into(); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 505eff4b1d..66f95b0446 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -13,12 +13,13 @@ // limitations under the License. use common_error::define_into_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::DatanodeId; use common_procedure::ProcedureId; use common_runtime::JoinError; +use common_wal::kafka::rskafka_client_error_to_retry_hint; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::metadata::TableId; @@ -1124,29 +1125,7 @@ pub enum Error { impl Error { /// Returns `true` if the error is retryable. pub fn is_retryable(&self) -> bool { - matches!( - self, - Error::RetryLater { .. } - | Error::RetryLaterWithSource { .. } - | Error::MailboxTimeout { .. } - ) || matches!( - self, - Error::AllocateRegions { source, .. } if source.is_retry_later() - ) || matches!( - self, - Error::DeallocateRegions { source, .. } if source.is_retry_later() - ) || matches!( - self, - Error::DeleteRecords { error, .. } - | Error::BuildPartitionClient { error, .. } - | Error::GetOffset { error, .. } - if Self::is_retryable_kafka_client_error(error) - ) - } - - /// Returns `true` if the Kafka client has exhausted its internal retry. - fn is_retryable_kafka_client_error(err: &rskafka::client::error::Error) -> bool { - matches!(err, rskafka::client::error::Error::RetryFailed(_)) + self.retry_hint().is_retryable() } } @@ -1311,6 +1290,35 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn std::any::Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::RetryLater { .. } + | Error::RetryLaterWithSource { .. } + | Error::MailboxTimeout { .. } + | Error::NoEnoughAvailableNode { .. } + | Error::NoLeader { .. } + | Error::LeaderLeaseExpired { .. } + | Error::LeaderLeaseChanged { .. } + | Error::PeerUnavailable { .. } => RetryHint::Retryable, + + Error::AllocateRegions { source, .. } | Error::DeallocateRegions { source, .. } + if source.retry_hint().is_retryable() => + { + RetryHint::Retryable + } + + Error::DeleteRecords { error, .. } + | Error::BuildPartitionClient { error, .. } + | Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error), + + Error::PusherNotFound { .. } + | Error::PushMessage { .. } + | Error::ExceededDeadline { .. } => RetryHint::NonRetryable, + + _ => RetryHint::NonRetryable, + } + } } // for form tonic @@ -1338,6 +1346,7 @@ pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io: mod tests { use std::time::Duration; + use common_error::ext::ErrorExt; use common_error::mock::MockError; use common_error::status_code::StatusCode; use rskafka::BackoffError; @@ -1363,6 +1372,7 @@ mod tests { .unwrap_err(); assert!(err.is_retryable()); + assert!(err.retry_hint().is_retryable()); } #[test] @@ -1376,6 +1386,7 @@ mod tests { .unwrap_err(); assert!(!err.is_retryable()); + assert!(!err.retry_hint().is_retryable()); } #[test] @@ -1402,24 +1413,28 @@ mod tests { assert!(delete_records_err.is_retryable()); assert!(build_partition_client_err.is_retryable()); assert!(get_offset_err.is_retryable()); + assert!(delete_records_err.retry_hint().is_retryable()); + assert!(build_partition_client_err.retry_hint().is_retryable()); + assert!(get_offset_err.retry_hint().is_retryable()); } #[test] fn test_kafka_non_retry_failed_errors_are_not_retryable() { - let delete_records_err = Err::<(), _>(KafkaClientError::Timeout) + let delete_records_err = Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into())) .context(DeleteRecordsSnafu { topic: "test_topic", partition: 0, offset: 1024u64, }) .unwrap_err(); - let build_partition_client_err = Err::<(), _>(KafkaClientError::Timeout) - .context(BuildPartitionClientSnafu { - topic: "test_topic", - partition: 0, - }) - .unwrap_err(); - let get_offset_err = Err::<(), _>(KafkaClientError::Timeout) + let build_partition_client_err = + Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into())) + .context(BuildPartitionClientSnafu { + topic: "test_topic", + partition: 0, + }) + .unwrap_err(); + let get_offset_err = Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into())) .context(GetOffsetSnafu { topic: "test_topic", }) @@ -1428,5 +1443,8 @@ mod tests { assert!(!delete_records_err.is_retryable()); assert!(!build_partition_client_err.is_retryable()); assert!(!get_offset_err.is_retryable()); + assert!(!delete_records_err.retry_hint().is_retryable()); + assert!(!build_partition_client_err.retry_hint().is_retryable()); + assert!(!get_offset_err.retry_hint().is_retryable()); } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index f01737e764..59aee77b67 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::sync::Arc; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datatypes::prelude::ConcreteDataType; @@ -464,4 +464,30 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + CreateMitoRegion { source, .. } + | OpenMitoRegion { source, .. } + | BatchOpenMitoRegion { source, .. } + | BatchCatchupMitoRegion { source, .. } + | CloseMitoRegion { source, .. } + | MitoReadOperation { source, .. } + | MitoWriteOperation { source, .. } + | MitoFlushOperation { source, .. } + | MitoSyncOperation { source, .. } + | MitoEnterStagingOperation { source, .. } + | MitoCopyRegionFromOperation { source, .. } + | MitoEditRegion { source, .. } => source.retry_hint(), + + EncodePrimaryKey { source, .. } => source.retry_hint(), + CollectRecordBatchStream { source, .. } => source.retry_hint(), + StartRepeatedTask { source, .. } => source.retry_hint(), + CacheGet { source, .. } => source.retry_hint(), + + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2cbe54c6f4..219f376195 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -16,7 +16,7 @@ use std::any::Any; use std::sync::Arc; use common_datasource::compression::CompressionType; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_memory_manager; @@ -26,6 +26,7 @@ use common_time::timestamp::TimeUnit; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; use object_store::ErrorKind; +use object_store::error::retry_hint_from_opendal_error; use partition::error::Error as PartitionError; use prost::DecodeError; use snafu::{Location, Snafu}; @@ -1498,4 +1499,80 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + ReadParquet { .. } + | WriteParquet { .. } + | RejectWrite { .. } + | Download { .. } + | Upload { .. } + | RegionState { .. } + | UpdateManifest { .. } + | RegionStopped { .. } + | RegionBusy { .. } + | FlushableRegionState { .. } => RetryHint::Retryable, + + OpenDal { error, .. } + | DeleteSsts { error, .. } + | DeleteIndex { error, .. } + | DeleteIndexes { error, .. } => retry_hint_from_opendal_error(error), + + WriteWal { source, .. } + | ReadWal { source, .. } + | DeleteWal { source, .. } + | FetchManifests { source, .. } + | External { source, .. } => source.retry_hint(), + + OpenRegion { source, .. } + | WriteGroup { source, .. } + | FlushRegion { source, .. } + | BuildIndexAsync { source, .. } + | CompactRegion { source, .. } + | EditRegion { source, .. } + | ScanSeries { source, .. } + | PruneFile { source, .. } => source.retry_hint(), + + DataTypeMismatch { source, .. } + | ConvertVector { source, .. } + | ConvertValue { source, .. } + | IndexOptions { source, .. } + | CastVector { source, .. } => source.retry_hint(), + + BuildIndexApplier { source, .. } + | PushIndexValue { source, .. } + | ApplyInvertedIndex { source, .. } + | IndexFinish { source, .. } => source.retry_hint(), + + ApplyBloomFilterIndex { source, .. } + | PushBloomFilterValue { source, .. } + | BloomFilterFinish { source, .. } => source.retry_hint(), + + PuffinReadBlob { source, .. } + | PuffinAddBlob { source, .. } + | PuffinInitStager { source, .. } + | PuffinBuildReader { source, .. } + | PuffinPurgeStager { source, .. } => source.retry_hint(), + + CreateFulltextCreator { source, .. } + | FulltextPushText { source, .. } + | FulltextFinish { source, .. } + | ApplyFulltextIndex { source, .. } => source.retry_hint(), + + InvalidRegionRequest { source, .. } => source.retry_hint(), + InvalidPartitionExpr { source, .. } => source.retry_hint(), + RecordBatch { source, .. } => source.retry_hint(), + GetSchemaMetadata { source, .. } => source.retry_hint(), + CompactionMemoryExhausted { source, .. } => source.retry_hint(), + ConvertBulkWalEntry { source, .. } => source.retry_hint(), + Encode { source, .. } | Decode { source, .. } => source.retry_hint(), + + #[cfg(feature = "enterprise")] + ScanExternalRange { source, .. } => source.retry_hint(), + + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs index cb259ef5c8..14661e261c 100644 --- a/src/object-store/src/error.rs +++ b/src/object-store/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use common_macro::stack_trace_debug; -use common_telemetry::common_error::ext::ErrorExt; +use common_telemetry::common_error::ext::{ErrorExt, RetryHint}; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, Snafu}; @@ -56,6 +56,15 @@ pub enum Error { pub type Result = std::result::Result; +/// Converts an `opendal::Error` into a `RetryHint`. +pub fn retry_hint_from_opendal_error(error: &opendal::Error) -> RetryHint { + if error.is_temporary() { + RetryHint::Retryable + } else { + RetryHint::NonRetryable + } +} + impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; @@ -69,4 +78,11 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::InitBackend { error, .. } => retry_hint_from_opendal_error(error), + _ => RetryHint::NonRetryable, + } + } } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index da65cc8c2f..13b86140aa 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -16,13 +16,14 @@ use std::any::Any; use common_datasource::file_format::Format; use common_error::define_into_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_query::error::Error as QueryResult; use datafusion::parquet; use datafusion_common::DataFusionError; use datatypes::arrow::error::ArrowError; +use object_store::error::retry_hint_from_opendal_error; use snafu::{Location, Snafu}; use table::metadata::TableType; @@ -1062,6 +1063,72 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::ReadObject { error, .. } => retry_hint_from_opendal_error(error), + Error::ReadParquetMetadata { .. } => RetryHint::Retryable, + Error::InvalidateTableCache { source, .. } + | Error::ExecuteDdl { source, .. } + | Error::RequestInserts { source, .. } + | Error::RequestDeletes { source, .. } + | Error::RequestRegion { source, .. } + | Error::FindViewInfo { source, .. } + | Error::TableMetadataManager { source, .. } => source.retry_hint(), + + Error::ParseFileFormat { source, .. } + | Error::InferSchema { source, .. } + | Error::ListObjects { source, .. } + | Error::ParseUrl { source, .. } + | Error::BuildBackend { source, .. } + | Error::ReadOrc { source, .. } => source.retry_hint(), + + Error::ExtractTableNames { source, .. } + | Error::ExecuteStatement { source, .. } + | Error::PlanStatement { source, .. } + | Error::ParseQuery { source, .. } + | Error::ExecLogicalPlan { source, .. } + | Error::DescribeStatement { source, .. } => source.retry_hint(), + + Error::FindTablePartitionRule { source, .. } + | Error::SplitInsert { source, .. } + | Error::SplitDelete { source, .. } + | Error::FindRegionLeader { source, .. } => source.retry_hint(), + + Error::BuildCreateExprOnInsertion { source, .. } + | Error::FindNewColumnsOnInsertion { source, .. } + | Error::AlterExprToRequest { source, .. } => source.retry_hint(), + + Error::ConvertColumnDefaultConstraint { source, .. } + | Error::IntoVectors { source, .. } + | Error::ColumnDefaultValue { source, .. } => source.retry_hint(), + + Error::ColumnDataType { source, .. } + | Error::InvalidColumnDef { source, .. } + | Error::ColumnOptions { source, .. } => source.retry_hint(), + + Error::Table { source, .. } + | Error::Insert { source, .. } + | Error::MissingTimeIndexColumn { source, .. } => source.retry_hint(), + + Error::Cast { source, .. } => source.retry_hint(), + Error::ParseSql { source, .. } => source.retry_hint(), + Error::Catalog { source, .. } => source.retry_hint(), + Error::SubstraitCodec { source, .. } => source.retry_hint(), + Error::External { source, .. } => source.retry_hint(), + Error::BuildRecordBatch { source, .. } => source.retry_hint(), + Error::DecodeFlightData { source, .. } => source.retry_hint(), + Error::SqlCommon { source, .. } => source.retry_hint(), + Error::ConvertSchema { source, .. } => source.retry_hint(), + Error::WriteStreamToFile { source, .. } => source.retry_hint(), + Error::PrepareFileTable { source, .. } | Error::InferFileTableSchema { source, .. } => { + source.retry_hint() + } + #[cfg(feature = "enterprise")] + Error::TriggerQuerier { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } define_into_tonic_status!(Error); diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 569c8ff738..065ff9d525 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::ScalarValue; @@ -283,6 +283,17 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + match self { + Error::GetCache { .. } | Error::FindLeader { .. } => RetryHint::Retryable, + Error::TableRouteManager { source, .. } + | Error::GetPartitionInfo { source, .. } + | Error::UnexpectedLogicalRouteTable { source, .. } => source.retry_hint(), + Error::ConvertToVector { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } pub type Result = std::result::Result; diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 6d2a81bd73..d7b4a66703 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -15,7 +15,7 @@ use std::any::Any; use std::time::{Duration, TryFromFloatSecsError}; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_query::error::datafusion_status_code; @@ -462,6 +462,29 @@ impl ErrorExt for Error { fn as_any(&self) -> &dyn Any { self } + + fn retry_hint(&self) -> RetryHint { + use Error::*; + + match self { + BuildBackend { source, .. } | ListObjects { source, .. } => source.retry_hint(), + GetRegionMetadata { .. } => RetryHint::Retryable, + ParseFileFormat { source, .. } | InferSchema { source, .. } => source.retry_hint(), + Catalog { source, .. } => source.retry_hint(), + CreateRecordBatch { source, .. } => source.retry_hint(), + PartitionRuleManager { source, .. } => source.retry_hint(), + QueryExecution { source, .. } | QueryPlan { source, .. } => source.retry_hint(), + Sql { source, .. } => source.retry_hint(), + ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.retry_hint(), + RegionQuery { source, .. } => source.retry_hint(), + TableMutation { source, .. } => source.retry_hint(), + GetFulltextOptions { source, .. } + | GetSkippingIndexOptions { source, .. } + | GetVectorIndexOptions { source, .. } + | Datatypes { source, .. } => source.retry_hint(), + _ => RetryHint::NonRetryable, + } + } } pub type Result = std::result::Result; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 825e2296c0..4a390e2e5b 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -23,7 +23,7 @@ use axum::{Json, http}; use base64::DecodeError; use common_base::readable_size::ReadableSize; use common_error::define_into_tonic_status; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::{BoxedError, ErrorExt, RetryHint}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_telemetry::{error, warn}; @@ -853,6 +853,47 @@ impl ErrorExt for Error { } } + fn retry_hint(&self) -> RetryHint { + use Error::*; + match self { + ExecuteQuery { source, .. } + | ExecutePlan { source, .. } + | ExecuteGrpcQuery { source, .. } + | ExecuteGrpcRequest { source, .. } + | CheckDatabaseValidity { source, .. } + | DescribeStatement { source } => source.retry_hint(), + + Pipeline { source, .. } => source.retry_hint(), + CommonMeta { source, .. } => source.retry_hint(), + Catalog { source, .. } => source.retry_hint(), + RowWriter { source, .. } => source.retry_hint(), + Auth { source, .. } => source.retry_hint(), + + #[cfg(feature = "mem-prof")] + DumpProfileData { source, .. } => source.retry_hint(), + + ParsePromQL { source, .. } => source.retry_hint(), + Other { source, .. } => source.retry_hint(), + + #[cfg(feature = "pprof")] + DumpPprof { source, .. } => source.retry_hint(), + + ConvertScalarValue { source, .. } => source.retry_hint(), + ConvertSqlValue { source, .. } => source.retry_hint(), + GreptimeProto { source, .. } => source.retry_hint(), + Partition { source, .. } => source.retry_hint(), + MetricEngine { source, .. } => source.retry_hint(), + SubmitBatch { source, .. } => source.retry_hint(), + + MemoryLimitExceeded { source, .. } => source.retry_hint(), + CollectRecordbatch { source, .. } => source.retry_hint(), + + TooManyConcurrentRequests { .. } => RetryHint::Retryable, + + _ => RetryHint::NonRetryable, + } + } + fn as_any(&self) -> &dyn Any { self }