feat: decouple error retryability from status codes (#8301)

* feat: add retry hint to common error

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: propagate retry hints in core errors

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: propagate retry hint through RPC metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fallback retry hints to status codes

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: add explicit retry hints for retryable errors

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: remove status code retry fallback

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: classify io retry hints

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: preserve retry hints across error wrappers

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: minior

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: preserve datasource retry hints in query

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: preserve metric engine retry hints

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: preserve flow and frontend retry hints

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: share rskafka retry hint mapping

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-06-17 16:27:53 +08:00
committed by GitHub
parent a3cbfdb983
commit e520ff8300
34 changed files with 1286 additions and 122 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -111,6 +111,14 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::Io { error, .. } => retry_hint_from_io_error(error),
Error::AuthBackend { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -14,9 +14,10 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use object_store::error::retry_hint_from_opendal_error;
use snafu::{Location, Snafu};
#[derive(Snafu)]
@@ -243,4 +244,14 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::StorageOperation { error, .. } | Error::BuildObjectStore { error, .. } => {
retry_hint_from_opendal_error(error)
}
Error::Database { error, .. } => error.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -216,6 +216,25 @@ impl ErrorExt for Error {
}
}
fn retry_hint(&self) -> RetryHint {
match self {
#[cfg(test)]
Error::TestTaskFailed { retryable, .. } => {
if *retryable {
RetryHint::Retryable
} else {
RetryHint::NonRetryable
}
}
Error::Database { error, .. } => error.retry_hint(),
Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => {
error.retry_hint()
}
Error::ImportStateIo { error, .. } => retry_hint_from_io_error(error),
_ => RetryHint::NonRetryable,
}
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -868,13 +868,17 @@ mod tests {
use api::v1::auth_header::AuthScheme;
use api::v1::{AuthHeader, Basic};
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT};
use common_query::OutputData;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use tonic::{Code, Status};
use super::*;
@@ -1008,6 +1012,7 @@ mod tests {
code: StatusCode::Internal,
msg: "blabla".to_string(),
tonic_code: Code::Internal,
retry_hint: RetryHint::NonRetryable,
}
.build();
@@ -1015,6 +1020,52 @@ mod tests {
let actual: Error = status.into();
assert_eq!(expected.to_string(), actual.to_string());
assert_eq!(expected.retry_hint(), actual.retry_hint());
assert_eq!(expected.should_retry(), actual.should_retry());
}
#[test]
fn test_from_tonic_status_with_retry_hint() {
let mut headers = HeaderMap::new();
headers.insert(
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(StatusCode::Internal as u32),
);
headers.insert(
GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
HeaderValue::from_static(RetryHint::Retryable.as_str()),
);
let status =
Status::with_metadata(Code::Internal, "blabla", MetadataMap::from_headers(headers));
let actual: Error = status.into();
assert_eq!(actual.retry_hint(), RetryHint::Retryable);
assert!(actual.should_retry());
}
#[test]
fn test_from_tonic_status_fallback() {
let mut headers = HeaderMap::new();
headers.insert(
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(StatusCode::InvalidArguments as u32),
);
let status =
Status::with_metadata(Code::Internal, "blabla", MetadataMap::from_headers(headers));
let actual: Error = status.into();
assert_eq!(actual.retry_hint(), RetryHint::NonRetryable);
assert!(!actual.should_retry());
}
#[test]
fn test_should_retry_preserves_transport_retry() {
let status = Status::new(Code::Unavailable, "blabla");
let actual: Error = status.into();
assert!(actual.should_retry());
}
#[tokio::test]

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use common_error::define_from_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -130,6 +130,7 @@ pub enum Error {
code: StatusCode,
msg: String,
tonic_code: Code,
retry_hint: RetryHint,
#[snafu(implicit)]
location: Location,
},
@@ -168,6 +169,21 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::Tonic { retry_hint, .. } => *retry_hint,
Error::FlightGet { source, .. }
| Error::RegionServer { source, .. }
| Error::FlowServer { source, .. }
| Error::External { source, .. } => source.retry_hint(),
Error::ConvertFlightData { source, .. }
| Error::CreateChannel { source, .. }
| Error::CreateTlsChannel { source, .. } => source.retry_hint(),
Error::ConvertSchema { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
define_from_tonic_status!(Error, Tonic);
@@ -194,7 +210,8 @@ impl Error {
}
pub fn should_retry(&self) -> bool {
self.is_connection_error()
self.retry_hint().is_retryable()
|| self.is_connection_error()
|| matches!(
self.tonic_code(),
Some(Code::Cancelled) | Some(Code::DeadlineExceeded)

View File

@@ -15,10 +15,11 @@
use std::any::Any;
use arrow_schema::ArrowError;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::parquet::errors::ParquetError;
use object_store::error::retry_hint_from_opendal_error;
use snafu::{Location, Snafu};
use url::ParseError;
@@ -250,4 +251,18 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
BuildBackend { error, .. }
| ListObjects { error, .. }
| ReadObject { error, .. }
| WriteObject { error, .. } => retry_hint_from_opendal_error(error),
AsyncWrite { error, .. } => retry_hint_from_io_error(error),
WriteParquet { .. } => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -14,12 +14,83 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::io::ErrorKind;
use std::str::FromStr;
use std::sync::Arc;
use snafu::{FromString, Snafu};
use crate::status_code::StatusCode;
/// Describes whether an error instance is safe and useful to retry.
///
/// This is intentionally separate from [`StatusCode`]: status code describes the
/// error category exposed to users, while retry hint describes retry policy for
/// this specific error instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryHint {
/// The operation may succeed if retried later.
Retryable,
/// Retrying the same operation is not expected to help.
///
/// This is the default for errors that do not explicitly opt in to retry.
NonRetryable,
}
const RETRY_HINT_RETRYABLE: &str = "retryable";
const RETRY_HINT_NON_RETRYABLE: &str = "non_retryable";
impl RetryHint {
pub fn is_retryable(self) -> bool {
matches!(self, RetryHint::Retryable)
}
pub fn as_str(self) -> &'static str {
match self {
RetryHint::Retryable => RETRY_HINT_RETRYABLE,
RetryHint::NonRetryable => RETRY_HINT_NON_RETRYABLE,
}
}
}
impl FromStr for RetryHint {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
RETRY_HINT_RETRYABLE => Ok(RetryHint::Retryable),
RETRY_HINT_NON_RETRYABLE => Ok(RetryHint::NonRetryable),
_ => Err(()),
}
}
}
/// Converts a [`std::io::Error`] into a conservative [`RetryHint`].
///
/// This helper classifies known transient I/O conditions as retryable and treats
/// request, permission, filesystem-capacity, and data-shape errors as
/// non-retryable. `std::io::ErrorKind` is non-exhaustive, so future or
/// unclassified kinds are considered non-retryable until reviewed explicitly.
pub fn retry_hint_from_io_error(error: &std::io::Error) -> RetryHint {
match error.kind() {
ErrorKind::ConnectionRefused
| ErrorKind::ConnectionReset
| ErrorKind::HostUnreachable
| ErrorKind::NetworkUnreachable
| ErrorKind::ConnectionAborted
| ErrorKind::NotConnected
| ErrorKind::NetworkDown
| ErrorKind::BrokenPipe
| ErrorKind::WouldBlock
| ErrorKind::StaleNetworkFileHandle
| ErrorKind::TimedOut
| ErrorKind::ResourceBusy
| ErrorKind::Interrupted => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
/// Extension to [`Error`](std::error::Error) in std.
pub trait ErrorExt: StackError {
/// Map this error to [StatusCode].
@@ -27,6 +98,23 @@ pub trait ErrorExt: StackError {
StatusCode::Unknown
}
/// Returns the retry hint for this error instance.
///
/// Implementations should return [`RetryHint::Retryable`] only when retrying the
/// same operation may succeed without changing the request. The default is
/// [`RetryHint::NonRetryable`] to avoid accidental retry loops.
fn retry_hint(&self) -> RetryHint {
RetryHint::NonRetryable
}
/// Returns whether this error instance is marked retryable.
///
/// This is derived from [`Self::retry_hint`]. Transport-level retries, such as a
/// gRPC `Unavailable`, may still be handled separately by client code.
fn is_retryable(&self) -> bool {
self.retry_hint().is_retryable()
}
/// Returns the error as [Any](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
@@ -194,6 +282,10 @@ impl crate::ext::ErrorExt for BoxedError {
self.inner.status_code()
}
fn retry_hint(&self) -> RetryHint {
self.inner.retry_hint()
}
fn as_any(&self) -> &dyn std::any::Any {
self.inner.as_any()
}

View File

@@ -27,6 +27,7 @@ use crate::status_code::StatusCode;
// please define in `src/servers/src/http/header.rs`.
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = "x-greptime-err-code";
pub const GREPTIME_DB_HEADER_ERROR_MSG: &str = "x-greptime-err-msg";
pub const GREPTIME_DB_HEADER_ERROR_RETRY_HINT: &str = "x-greptime-err-retry-hint";
/// Create a http header map from error code and message.
/// using `GREPTIME_DB_HEADER_ERROR_CODE` and `GREPTIME_DB_HEADER_ERROR_MSG` as keys.

View File

@@ -134,54 +134,6 @@ impl StatusCode {
Self::Success as u32 == code
}
/// Returns `true` if the error with this code is retryable.
pub fn is_retryable(&self) -> bool {
match self {
StatusCode::StorageUnavailable
| StatusCode::RuntimeResourcesExhausted
| StatusCode::Internal
| StatusCode::RegionNotReady
| StatusCode::TableUnavailable
| StatusCode::RegionBusy => true,
StatusCode::Success
| StatusCode::Unknown
| StatusCode::Unsupported
| StatusCode::IllegalState
| StatusCode::Unexpected
| StatusCode::InvalidArguments
| StatusCode::Cancelled
| StatusCode::DeadlineExceeded
| StatusCode::InvalidSyntax
| StatusCode::DatabaseAlreadyExists
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::FlowAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::TriggerAlreadyExists
| StatusCode::TriggerNotFound
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
| StatusCode::RateLimited
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied
| StatusCode::PermissionDenied
| StatusCode::RequestOutdated
| StatusCode::External
| StatusCode::Suspended => false,
}
}
/// Returns `true` if we should print an error log for an error with
/// this status code.
pub fn should_log_error(&self) -> bool {
@@ -271,12 +223,16 @@ macro_rules! define_from_tonic_status {
let msg = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_MSG)
.unwrap_or_else(|| e.message().to_string());
let retry_hint = metadata_value(&e, $crate::GREPTIME_DB_HEADER_ERROR_RETRY_HINT)
.and_then(|s| s.parse().ok())
.unwrap_or($crate::ext::RetryHint::NonRetryable);
// TODO(LFC): Make the error variant defined automatically.
Self::$Variant {
code,
msg,
tonic_code: e.code(),
retry_hint,
location: location!(),
}
}
@@ -291,11 +247,13 @@ macro_rules! define_into_tonic_status {
fn from(err: $Error) -> Self {
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use $crate::GREPTIME_DB_HEADER_ERROR_CODE;
use $crate::{
GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
};
common_telemetry::error!(err; "Failed to handle request");
let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
let mut headers = HeaderMap::<HeaderValue>::with_capacity(3);
// If either of the status_code or error msg cannot convert to valid HTTP header value
// (which is a very rare case), just ignore. Client will use Tonic status code and message.
@@ -304,6 +262,10 @@ macro_rules! define_into_tonic_status {
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(status_code as u32),
);
headers.insert(
GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
HeaderValue::from_static(err.retry_hint().as_str()),
);
let root_error = err.output_msg();
let metadata = MetadataMap::from_headers(headers);

View File

@@ -13,8 +13,14 @@
// limitations under the License.
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::io::{Error as IoError, ErrorKind};
use std::str::FromStr;
use common_error::ext::{ErrorExt, PlainError, StackError, WhateverResult};
use common_error::ext::{
BoxedError, ErrorExt, PlainError, RetryHint, StackError, WhateverResult,
retry_hint_from_io_error,
};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, ResultExt, Snafu};
@@ -70,8 +76,9 @@ fn test_into_whatever_error() {
let whatever = f(normal_error).unwrap_err();
assert_eq!(
normalize_path(&whatever.to_string()),
// The location points to the `NormalSnafu` context in `normal_error()`.
format!(
r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22
r#"0: A normal error with "display" attribute, message "blabla", at {}:61:22
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
normalize_path(file!())
)
@@ -80,8 +87,9 @@ fn test_into_whatever_error() {
let whatever = f(transparent_error).unwrap_err();
assert_eq!(
normalize_path(&whatever.to_string()),
// The location points to the transparent `?` return in `transparent_error()`.
format!(
r#"0: <transparent>, at {}:60:5
r#"0: <transparent>, at {}:66:5
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
normalize_path(file!())
)
@@ -123,8 +131,9 @@ fn test_debug_format() {
assert_eq!(
normalize_path(&debug_output),
// The location points to the `NormalSnafu` context in `normal_error()`.
format!(
r#"0: A normal error with "display" attribute, message "blabla", at {}:55:22
r#"0: A normal error with "display" attribute, message "blabla", at {}:61:22
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
normalize_path(file!())
)
@@ -134,8 +143,9 @@ fn test_debug_format() {
let debug_output = format!("{:?}", result.unwrap_err());
assert_eq!(
normalize_path(&debug_output),
// The location points to the transparent `?` return in `transparent_error()`.
format!(
r#"0: <transparent>, at {}:60:5
r#"0: <transparent>, at {}:66:5
1: PlainError {{ msg: "<root cause>", status_code: Unexpected }}"#,
normalize_path(file!())
)
@@ -150,3 +160,134 @@ fn test_transparent_flag() {
let result = transparent_error();
assert!(result.unwrap_err().transparent());
}
#[test]
fn test_retry_hint_helpers() {
assert!(RetryHint::Retryable.is_retryable());
assert!(!RetryHint::NonRetryable.is_retryable());
assert_eq!(RetryHint::Retryable.as_str(), "retryable");
assert_eq!(RetryHint::NonRetryable.as_str(), "non_retryable");
assert_eq!(
RetryHint::from_str("retryable").unwrap(),
RetryHint::Retryable
);
assert_eq!(
RetryHint::from_str("non_retryable").unwrap(),
RetryHint::NonRetryable
);
assert!(RetryHint::from_str("unknown").is_err());
}
#[test]
fn test_retry_hint_default() {
let err = normal_error().unwrap_err();
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
assert!(!err.is_retryable());
}
#[test]
fn test_boxed_error_retry_hint() {
let err = BoxedError::new(RetryableError);
assert_eq!(err.retry_hint(), RetryHint::Retryable);
assert!(err.is_retryable());
}
#[test]
fn test_retry_hint_from_io_error() {
let retryable_kinds = [
ErrorKind::ConnectionRefused,
ErrorKind::ConnectionReset,
ErrorKind::HostUnreachable,
ErrorKind::NetworkUnreachable,
ErrorKind::ConnectionAborted,
ErrorKind::NotConnected,
ErrorKind::NetworkDown,
ErrorKind::BrokenPipe,
ErrorKind::WouldBlock,
ErrorKind::StaleNetworkFileHandle,
ErrorKind::TimedOut,
ErrorKind::ResourceBusy,
ErrorKind::Interrupted,
];
for kind in retryable_kinds {
let error = IoError::from(kind);
assert_eq!(
retry_hint_from_io_error(&error),
RetryHint::Retryable,
"{kind:?}"
);
}
let non_retryable_kinds = [
ErrorKind::NotFound,
ErrorKind::PermissionDenied,
ErrorKind::AddrInUse,
ErrorKind::AddrNotAvailable,
ErrorKind::AlreadyExists,
ErrorKind::NotADirectory,
ErrorKind::IsADirectory,
ErrorKind::DirectoryNotEmpty,
ErrorKind::ReadOnlyFilesystem,
ErrorKind::InvalidInput,
ErrorKind::InvalidData,
ErrorKind::WriteZero,
ErrorKind::StorageFull,
ErrorKind::NotSeekable,
ErrorKind::QuotaExceeded,
ErrorKind::FileTooLarge,
ErrorKind::ExecutableFileBusy,
ErrorKind::Deadlock,
ErrorKind::CrossesDevices,
ErrorKind::TooManyLinks,
ErrorKind::InvalidFilename,
ErrorKind::ArgumentListTooLong,
ErrorKind::Unsupported,
ErrorKind::UnexpectedEof,
ErrorKind::OutOfMemory,
ErrorKind::Other,
];
for kind in non_retryable_kinds {
let error = IoError::from(kind);
assert_eq!(
retry_hint_from_io_error(&error),
RetryHint::NonRetryable,
"{kind:?}"
);
}
}
#[derive(Debug)]
struct RetryableError;
impl Display for RetryableError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "retryable error")
}
}
impl std::error::Error for RetryableError {}
impl StackError for RetryableError {
fn debug_fmt(&self, layer: usize, buf: &mut Vec<String>) {
buf.push(format!("{}: retryable error", layer))
}
fn next(&self) -> Option<&dyn StackError> {
None
}
}
impl ErrorExt for RetryableError {
fn retry_hint(&self) -> RetryHint {
RetryHint::Retryable
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -75,4 +75,15 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
External { source, .. } => source.retry_hint(),
Meta { source, .. } => source.retry_hint(),
CreateChannel { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::io;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
@@ -133,4 +133,20 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::CreateChannel { .. } => RetryHint::Retryable,
Error::Arrow { error, .. } => retry_hint_from_arrow_error(error),
Error::FileWatch { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
fn retry_hint_from_arrow_error(error: &ArrowError) -> RetryHint {
match error {
ArrowError::IoError(_, error) => retry_hint_from_io_error(error),
_ => RetryHint::NonRetryable,
}
}

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::path::PathBuf;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -101,6 +101,14 @@ impl ErrorExt for Error {
}
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::OpenTempFile { error, .. } => retry_hint_from_io_error(error),
Error::DumpProfileData { .. } => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::Snafu;
@@ -60,4 +60,13 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::MemoryLimitExceeded { .. } | Error::MemoryAcquireTimeout { .. } => {
RetryHint::Retryable
}
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -15,10 +15,12 @@
use std::str::Utf8Error;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_procedure::ProcedureId;
use common_wal::kafka::rskafka_client_error_to_retry_hint;
use object_store::error::retry_hint_from_opendal_error;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
@@ -1267,6 +1269,47 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
RetryLater { .. }
| GetLatestCacheRetryExceeded { .. }
| NoLeader { .. }
| ElectionNoLeader { .. }
| ElectionLeaderLeaseExpired { .. }
| ElectionLeaderLeaseChanged { .. } => RetryHint::Retryable,
WriteObject { error, .. } | ReadObject { error, .. } => {
retry_hint_from_opendal_error(error)
}
BuildKafkaClient { error, .. }
| BuildKafkaCtrlClient { error, .. }
| KafkaPartitionClient { error, .. }
| KafkaGetOffset { error, .. }
| ProduceRecord { error, .. }
| CreateKafkaWalTopic { error, .. } => rskafka_client_error_to_retry_hint(error),
SubmitProcedure { source, .. }
| QueryProcedure { source, .. }
| WaitProcedure { source, .. }
| StartProcedureManager { source, .. }
| StopProcedureManager { source, .. }
| RegisterProcedureLoader { source, .. }
| PutPoison { source, .. }
| ProcedureStateReceiver { source, .. } => source.retry_hint(),
External { source, .. }
| ResponseExceededSizeLimit { source, .. }
| OperateDatanode { source, .. }
| AbortProcedure { source, .. }
| RegisterRepartitionProcedureLoader { source, .. }
| CreateRepartitionProcedure { source, .. } => source.retry_hint(),
Table { source, .. } => source.retry_hint(),
ConvertAlterTableRequest { source, .. } => source.retry_hint(),
ConvertColumnDef { source, .. } => source.retry_hint(),
GetCache { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
impl Error {
@@ -1336,3 +1379,44 @@ impl Error {
}
}
}
#[cfg(test)]
mod retry_hint_tests {
use std::sync::Arc;
use common_error::mock::MockError;
use super::*;
#[test]
fn test_retry_later_hint_is_retryable() {
let err = Error::retry_later(MockError::new(StatusCode::Internal));
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_latest_cache_retry_exceeded_hint_is_retryable() {
let err = GetLatestCacheRetryExceededSnafu { attempts: 3_usize }.build();
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_get_cache_forwards_retry_hint() {
let source = Arc::new(Error::retry_later(MockError::new(StatusCode::Internal)));
let err = Error::GetCache { source };
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_default_hint_is_non_retryable() {
let err = UnexpectedSnafu {
err_msg: "mock error",
}
.build();
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
}
}

View File

@@ -15,9 +15,10 @@
use std::any::Any;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use object_store::error::retry_hint_from_opendal_error;
use snafu::{Location, Snafu};
use crate::PoisonKey;
@@ -293,6 +294,28 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::RetryLater { .. } => RetryHint::Retryable,
Error::External { source, .. }
| Error::PutState { source, .. }
| Error::DeleteStates { source, .. }
| Error::ListState { source, .. }
| Error::PutPoison { source, .. }
| Error::DeletePoison { source, .. }
| Error::GetPoison { source, .. }
| Error::CheckStatus { source, .. } => source.retry_hint(),
Error::ProcedureExec { source, .. } => source.retry_hint(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.retry_hint(),
Error::DeleteState { error, .. } => retry_hint_from_opendal_error(error),
Error::RetryTimesExceeded { .. } | Error::RollbackTimesExceeded { .. } => {
RetryHint::NonRetryable
}
_ => RetryHint::NonRetryable,
}
}
}
impl Error {
@@ -339,13 +362,70 @@ impl Error {
|| matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
}
#[cfg(test)]
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
/// to its [StatusCode].
/// to its [RetryHint].
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
if err.status_code().is_retryable() {
if err.retry_hint().is_retryable() {
Error::retry_later(err)
} else {
Error::external(err)
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_error::mock::MockError;
use super::*;
#[test]
fn test_retry_later_hint_is_retryable() {
let err = Error::retry_later(MockError::new(StatusCode::Internal));
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_external_forwards_retry_hint() {
let source = Error::retry_later(MockError::new(StatusCode::Internal));
let err = Error::external(source);
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_retry_exceeded_hint_is_non_retryable() {
let source = Arc::new(Error::retry_later(MockError::new(StatusCode::Internal)));
let err = Error::RetryTimesExceeded {
source: source.clone(),
procedure_id: ProcedureId::random(),
};
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
let err = Error::RollbackTimesExceeded {
source,
procedure_id: ProcedureId::random(),
};
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
}
#[test]
fn test_from_error_ext_uses_retry_hint() {
let err = Error::from_error_ext(Error::retry_later(MockError::new(
StatusCode::InvalidArguments,
)));
assert!(err.is_retry_later());
let err = Error::from_error_ext(MockError::new(StatusCode::InvalidArguments));
assert!(!err.is_retry_later());
let err = Error::from_error_ext(MockError::new(StatusCode::Internal));
assert!(!err.is_retry_later());
}
}

View File

@@ -15,7 +15,7 @@
//! Error of record batch.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::ScalarValue;
@@ -230,4 +230,15 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::ExceedMemoryLimit { .. } => RetryHint::Retryable,
Error::External { source, .. } => source.retry_hint(),
Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
source.retry_hint()
}
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -0,0 +1,96 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::RetryHint;
/// Maps an rskafka client error to a conservative retry hint.
///
/// rskafka already retries most transient Kafka/network errors internally. This helper only
/// marks errors retryable when rskafka has exhausted its own retry loop or returns an explicit
/// client timeout.
pub fn rskafka_client_error_to_retry_hint(error: &rskafka::client::error::Error) -> RetryHint {
use rskafka::client::error::Error;
match error {
Error::RetryFailed(_)
| Error::Timeout
| Error::Connection(rskafka::ConnectionError::RetryFailed(_)) => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_error::ext::RetryHint;
use rskafka::BackoffError;
use rskafka::client::error::{Error as KafkaClientError, ProtocolError, RequestContext};
use super::*;
fn retry_failed_error() -> KafkaClientError {
KafkaClientError::RetryFailed(BackoffError::DeadlineExceded {
deadline: Duration::from_secs(1),
source: Box::new(std::io::Error::other("retry failed")),
})
}
#[test]
fn test_retry_failed_hint_is_retryable() {
assert_eq!(
rskafka_client_error_to_retry_hint(&retry_failed_error()),
RetryHint::Retryable
);
}
#[test]
fn test_timeout_hint_is_retryable() {
assert_eq!(
rskafka_client_error_to_retry_hint(&KafkaClientError::Timeout),
RetryHint::Retryable
);
}
#[test]
fn test_connection_retry_failed_hint_is_retryable() {
let err = KafkaClientError::Connection(rskafka::ConnectionError::RetryFailed(
BackoffError::DeadlineExceded {
deadline: Duration::from_secs(1),
source: Box::new(std::io::Error::other("retry failed")),
},
));
assert_eq!(
rskafka_client_error_to_retry_hint(&err),
RetryHint::Retryable
);
}
#[test]
fn test_raw_protocol_error_hint_is_non_retryable() {
let err = KafkaClientError::ServerError {
protocol_error: ProtocolError::NetworkException,
error_message: None,
request: RequestContext::Topic("test_topic".to_string()),
response: None,
is_virtual: false,
};
assert_eq!(
rskafka_client_error_to_retry_hint(&err),
RetryHint::NonRetryable
);
}
}

View File

@@ -21,6 +21,7 @@ use tokio::net;
pub mod config;
pub mod error;
pub mod kafka;
pub mod options;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::sync::Arc;
use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::JoinError;
@@ -506,6 +506,69 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
RegionBusy { .. }
| RegionNotReady { .. }
| ConcurrentQueryLimiterClosed { .. }
| ConcurrentQueryLimiterTimeout { .. } => RetryHint::Retryable,
NewPlanDecoder { source, .. } | ExecuteLogicalPlan { source, .. } => {
source.retry_hint()
}
HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => {
source.retry_hint()
}
DecodeLogicalPlan { source, .. } => source.retry_hint(),
Delete { source, .. } => source.retry_hint(),
AsyncTaskExecute { source, .. } => source.retry_hint(),
StartServer { source, .. } | ShutdownServer { source, .. } => source.retry_hint(),
OpenLogStore { source, .. } => source.retry_hint(),
MetaClientInit { source, .. } => source.retry_hint(),
HandleRegionRequest { source, .. }
| GetRegionMetadata { source, .. }
| HandleBatchOpenRequest { source, .. }
| HandleBatchDdlRequest { source, .. }
| StopRegionEngine { source, .. } => source.retry_hint(),
FindLogicalRegions { source, .. } => source.retry_hint(),
BuildMitoEngine { source, .. } => source.retry_hint(),
GcMitoEngine { source, .. } => source.retry_hint(),
BuildMetricEngine { source, .. } => source.retry_hint(),
ListStorageSsts { source, .. } => source.retry_hint(),
ObjectStore { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
define_into_tonic_status!(Error);
#[cfg(test)]
mod tests {
use common_error::ext::RetryHint;
use super::*;
#[test]
fn test_region_state_hints_are_retryable() {
let region_id = RegionId::new(1024, 1);
let err = RegionBusySnafu { region_id }.build();
assert_eq!(err.retry_hint(), RetryHint::Retryable);
let err = RegionNotReadySnafu { region_id }.build();
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_default_hint_is_non_retryable() {
let err = UnexpectedSnafu {
violated: "mock error",
}
.build();
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
}
}

View File

@@ -14,11 +14,12 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
use object_store::error::retry_hint_from_opendal_error;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
@@ -234,10 +235,17 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::from_error_ext(e)
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
BuildBackend { source, .. } | ParseFileFormat { source, .. } => source.retry_hint(),
CheckObject { error, .. }
| StoreRegionManifest { error, .. }
| LoadRegionManifest { error, .. }
| DeleteRegionManifest { error, .. } => retry_hint_from_opendal_error(error),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -18,12 +18,15 @@ use std::any::Any;
use api::v1::CreateTableExpr;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_error::ext::{BoxedError, RetryHint};
use common_error::{
GREPTIME_DB_HEADER_ERROR_RETRY_HINT, define_into_tonic_status, from_err_code_msg_to_header,
};
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use snafu::{Location, ResultExt, Snafu};
use tonic::codegen::http::HeaderValue;
use tonic::metadata::MetadataMap;
use crate::FlowId;
@@ -308,7 +311,11 @@ pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status {
let msg = err.to_string();
let last_err_msg = common_error::ext::StackError::last(&err).to_string();
let code = err.status_code() as u32;
let header = from_err_code_msg_to_header(code, &last_err_msg);
let mut header = from_err_code_msg_to_header(code, &last_err_msg);
header.insert(
GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
HeaderValue::from_static(err.retry_hint().as_str()),
);
tonic::Status::with_metadata(
tonic::Code::InvalidArgument,
@@ -366,6 +373,35 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Self::FlowNotRecovered { .. } | Self::NoAvailableFrontend { .. } => {
RetryHint::Retryable
}
Self::InsertIntoFlow { source, .. }
| Self::CreateFlow { source, .. }
| Self::CreateSinkTable { source, .. }
| Self::External { source, .. } => source.retry_hint(),
Self::Time { source, .. } => source.retry_hint(),
Self::TableNotFoundMeta { source, .. } | Self::ListFlows { source, .. } => {
source.retry_hint()
}
Self::Datatypes { source, .. } => source.retry_hint(),
Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => {
source.retry_hint()
}
Self::MetaClientInit { source, .. } => source.retry_hint(),
Self::InvalidRequest { source, .. } => source.retry_hint(),
Self::SubstraitEncodeLogicalPlan { source, .. } => source.retry_hint(),
Self::ConvertColumnSchema { source, .. } => source.retry_hint(),
Self::InvalidClientConfig { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
define_into_tonic_status!(Error);

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use common_datasource::file_format::Format;
use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
@@ -448,6 +448,46 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::InvalidateTableCache { source, .. }
| Error::HandleHeartbeatResponse { source, .. }
| Error::RequestQuery { source, .. } => source.retry_hint(),
Error::External { source, .. }
| Error::SqlExecIntercepted { source, .. }
| Error::InitPlugin { source, .. } => source.retry_hint(),
Error::StartServer { source, .. }
| Error::ShutdownServer { source, .. }
| Error::InvokeRegionServer { source, .. }
| Error::ExecutePromql { source, .. }
| Error::PromStoreRemoteQueryPlan { source, .. }
| Error::PrometheusMetricNamesQueryPlan { source, .. } => source.retry_hint(),
Error::ParseSql { source, .. } => source.retry_hint(),
Error::Catalog { source, .. } => source.retry_hint(),
Error::CreateMetaHeartbeatStream { source, .. } => source.retry_hint(),
Error::FindRegionPeer { source, .. } => source.retry_hint(),
Error::Table { source, .. } => source.retry_hint(),
Error::CollectRecordbatch { source, .. } => source.retry_hint(),
Error::PlanStatement { source, .. }
| Error::ReadTable { source, .. }
| Error::ExecLogicalPlan { source, .. }
| Error::DescribeStatement { source, .. } => source.retry_hint(),
Error::PrometheusLabelValuesQueryPlan { source, .. } => source.retry_hint(),
Error::Insert { source, .. } => source.retry_hint(),
Error::Permission { source, .. } => source.retry_hint(),
Error::TableOperation { source, .. } => source.retry_hint(),
Error::IllegalAuthConfig { source, .. } => source.retry_hint(),
Error::TomlFormat { source, .. } => source.retry_hint(),
Error::InvalidTlsConfig { error, .. } => error.retry_hint(),
Error::SubstraitDecodeLogicalPlan { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
define_into_tonic_status!(Error);

View File

@@ -543,7 +543,12 @@ impl Instance {
| StatusCode::TableColumnNotFound => ChunkFailureReaction::RetryPerSpan,
StatusCode::DatabaseNotFound => ChunkFailureReaction::DiscardChunk,
StatusCode::Cancelled | StatusCode::DeadlineExceeded => ChunkFailureReaction::Propagate,
_ if status.is_retryable() => ChunkFailureReaction::Propagate,
StatusCode::StorageUnavailable
| StatusCode::RuntimeResourcesExhausted
| StatusCode::Internal
| StatusCode::RegionNotReady
| StatusCode::TableUnavailable
| StatusCode::RegionBusy => ChunkFailureReaction::Propagate,
_ => ChunkFailureReaction::DiscardChunk,
}
}

View File

@@ -14,10 +14,12 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint, retry_hint_from_io_error};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use common_wal::kafka::rskafka_client_error_to_retry_hint;
use object_store::error::retry_hint_from_opendal_error;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
@@ -375,4 +377,30 @@ impl ErrorExt for Error {
| ConsumeRecord { error, .. } => rskafka_client_error_to_status_code(error),
}
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
CreateWriter { error, .. } | WriteIndex { error, .. } | ReadIndex { error, .. } => {
retry_hint_from_opendal_error(error)
}
Io { error, .. } => retry_hint_from_io_error(error),
FetchEntry { .. } | RaftEngine { .. } | AddEntryLogBatch { .. } => RetryHint::Retryable,
ProduceRecord { error, .. } => match error {
rskafka::client::producer::Error::Client(error) => {
rskafka_client_error_to_retry_hint(error)
}
rskafka::client::producer::Error::Aggregator(_)
| rskafka::client::producer::Error::FlushError(_)
| rskafka::client::producer::Error::TooLarge => RetryHint::NonRetryable,
},
BuildClient { error, .. }
| BuildPartitionClient { error, .. }
| BatchProduce { error, .. }
| GetOffset { error, .. }
| ConsumeRecord { error, .. } => rskafka_client_error_to_retry_hint(error),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_error::define_from_tonic_status;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -34,6 +34,7 @@ pub enum Error {
code: StatusCode,
msg: String,
tonic_code: tonic::Code,
retry_hint: RetryHint,
#[snafu(implicit)]
location: Location,
},
@@ -156,6 +157,18 @@ impl ErrorExt for Error {
| Error::GetFlowStat { source, .. } => source.status_code(),
}
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::MetaServer { retry_hint, .. } => *retry_hint,
Error::InvalidResponseHeader { source, .. }
| Error::ConvertMetaRequest { source, .. }
| Error::ConvertMetaResponse { source, .. }
| Error::GetFlowStat { source, .. } => source.retry_hint(),
Error::CreateChannel { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
impl Error {
@@ -171,3 +184,62 @@ impl Error {
}
define_from_tonic_status!(Error, MetaServer);
#[cfg(test)]
mod tests {
use common_error::ext::{ErrorExt, RetryHint};
use common_error::{GREPTIME_DB_HEADER_ERROR_CODE, GREPTIME_DB_HEADER_ERROR_RETRY_HINT};
use tonic::codegen::http::{HeaderMap, HeaderValue};
use tonic::metadata::MetadataMap;
use super::*;
#[test]
fn test_from_tonic_status_fallbacks_to_status_code() {
let status = tonic::Status::new(tonic::Code::Internal, "blabla");
let err: Error = status.into();
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
}
#[test]
fn test_from_tonic_status_fallback_can_be_non_retryable() {
let mut headers = HeaderMap::new();
headers.insert(
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(StatusCode::InvalidArguments as u32),
);
let status = tonic::Status::with_metadata(
tonic::Code::Internal,
"blabla",
MetadataMap::from_headers(headers),
);
let err: Error = status.into();
assert_eq!(err.retry_hint(), RetryHint::NonRetryable);
}
#[test]
fn test_from_tonic_status_with_retry_hint() {
let mut headers = HeaderMap::new();
headers.insert(
GREPTIME_DB_HEADER_ERROR_CODE,
HeaderValue::from(StatusCode::Internal as u32),
);
headers.insert(
GREPTIME_DB_HEADER_ERROR_RETRY_HINT,
HeaderValue::from_static(RetryHint::Retryable.as_str()),
);
let status = tonic::Status::with_metadata(
tonic::Code::Internal,
"blabla",
MetadataMap::from_headers(headers),
);
let err: Error = status.into();
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
}

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::DatanodeId;
use common_procedure::ProcedureId;
use common_runtime::JoinError;
use common_wal::kafka::rskafka_client_error_to_retry_hint;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
@@ -1124,29 +1125,7 @@ pub enum Error {
impl Error {
/// Returns `true` if the error is retryable.
pub fn is_retryable(&self) -> bool {
matches!(
self,
Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::MailboxTimeout { .. }
) || matches!(
self,
Error::AllocateRegions { source, .. } if source.is_retry_later()
) || matches!(
self,
Error::DeallocateRegions { source, .. } if source.is_retry_later()
) || matches!(
self,
Error::DeleteRecords { error, .. }
| Error::BuildPartitionClient { error, .. }
| Error::GetOffset { error, .. }
if Self::is_retryable_kafka_client_error(error)
)
}
/// Returns `true` if the Kafka client has exhausted its internal retry.
fn is_retryable_kafka_client_error(err: &rskafka::client::error::Error) -> bool {
matches!(err, rskafka::client::error::Error::RetryFailed(_))
self.retry_hint().is_retryable()
}
}
@@ -1311,6 +1290,35 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::MailboxTimeout { .. }
| Error::NoEnoughAvailableNode { .. }
| Error::NoLeader { .. }
| Error::LeaderLeaseExpired { .. }
| Error::LeaderLeaseChanged { .. }
| Error::PeerUnavailable { .. } => RetryHint::Retryable,
Error::AllocateRegions { source, .. } | Error::DeallocateRegions { source, .. }
if source.retry_hint().is_retryable() =>
{
RetryHint::Retryable
}
Error::DeleteRecords { error, .. }
| Error::BuildPartitionClient { error, .. }
| Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error),
Error::PusherNotFound { .. }
| Error::PushMessage { .. }
| Error::ExceededDeadline { .. } => RetryHint::NonRetryable,
_ => RetryHint::NonRetryable,
}
}
}
// for form tonic
@@ -1338,6 +1346,7 @@ pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io:
mod tests {
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use rskafka::BackoffError;
@@ -1363,6 +1372,7 @@ mod tests {
.unwrap_err();
assert!(err.is_retryable());
assert!(err.retry_hint().is_retryable());
}
#[test]
@@ -1376,6 +1386,7 @@ mod tests {
.unwrap_err();
assert!(!err.is_retryable());
assert!(!err.retry_hint().is_retryable());
}
#[test]
@@ -1402,24 +1413,28 @@ mod tests {
assert!(delete_records_err.is_retryable());
assert!(build_partition_client_err.is_retryable());
assert!(get_offset_err.is_retryable());
assert!(delete_records_err.retry_hint().is_retryable());
assert!(build_partition_client_err.retry_hint().is_retryable());
assert!(get_offset_err.retry_hint().is_retryable());
}
#[test]
fn test_kafka_non_retry_failed_errors_are_not_retryable() {
let delete_records_err = Err::<(), _>(KafkaClientError::Timeout)
let delete_records_err = Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into()))
.context(DeleteRecordsSnafu {
topic: "test_topic",
partition: 0,
offset: 1024u64,
})
.unwrap_err();
let build_partition_client_err = Err::<(), _>(KafkaClientError::Timeout)
.context(BuildPartitionClientSnafu {
topic: "test_topic",
partition: 0,
})
.unwrap_err();
let get_offset_err = Err::<(), _>(KafkaClientError::Timeout)
let build_partition_client_err =
Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into()))
.context(BuildPartitionClientSnafu {
topic: "test_topic",
partition: 0,
})
.unwrap_err();
let get_offset_err = Err::<(), _>(KafkaClientError::InvalidResponse("invalid".into()))
.context(GetOffsetSnafu {
topic: "test_topic",
})
@@ -1428,5 +1443,8 @@ mod tests {
assert!(!delete_records_err.is_retryable());
assert!(!build_partition_client_err.is_retryable());
assert!(!get_offset_err.is_retryable());
assert!(!delete_records_err.retry_hint().is_retryable());
assert!(!build_partition_client_err.retry_hint().is_retryable());
assert!(!get_offset_err.retry_hint().is_retryable());
}
}

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
@@ -464,4 +464,30 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
CreateMitoRegion { source, .. }
| OpenMitoRegion { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. }
| CloseMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| MitoCopyRegionFromOperation { source, .. }
| MitoEditRegion { source, .. } => source.retry_hint(),
EncodePrimaryKey { source, .. } => source.retry_hint(),
CollectRecordBatchStream { source, .. } => source.retry_hint(),
StartRepeatedTask { source, .. } => source.retry_hint(),
CacheGet { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_memory_manager;
@@ -26,6 +26,7 @@ use common_time::timestamp::TimeUnit;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use object_store::ErrorKind;
use object_store::error::retry_hint_from_opendal_error;
use partition::error::Error as PartitionError;
use prost::DecodeError;
use snafu::{Location, Snafu};
@@ -1498,4 +1499,80 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
ReadParquet { .. }
| WriteParquet { .. }
| RejectWrite { .. }
| Download { .. }
| Upload { .. }
| RegionState { .. }
| UpdateManifest { .. }
| RegionStopped { .. }
| RegionBusy { .. }
| FlushableRegionState { .. } => RetryHint::Retryable,
OpenDal { error, .. }
| DeleteSsts { error, .. }
| DeleteIndex { error, .. }
| DeleteIndexes { error, .. } => retry_hint_from_opendal_error(error),
WriteWal { source, .. }
| ReadWal { source, .. }
| DeleteWal { source, .. }
| FetchManifests { source, .. }
| External { source, .. } => source.retry_hint(),
OpenRegion { source, .. }
| WriteGroup { source, .. }
| FlushRegion { source, .. }
| BuildIndexAsync { source, .. }
| CompactRegion { source, .. }
| EditRegion { source, .. }
| ScanSeries { source, .. }
| PruneFile { source, .. } => source.retry_hint(),
DataTypeMismatch { source, .. }
| ConvertVector { source, .. }
| ConvertValue { source, .. }
| IndexOptions { source, .. }
| CastVector { source, .. } => source.retry_hint(),
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyInvertedIndex { source, .. }
| IndexFinish { source, .. } => source.retry_hint(),
ApplyBloomFilterIndex { source, .. }
| PushBloomFilterValue { source, .. }
| BloomFilterFinish { source, .. } => source.retry_hint(),
PuffinReadBlob { source, .. }
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }
| PuffinBuildReader { source, .. }
| PuffinPurgeStager { source, .. } => source.retry_hint(),
CreateFulltextCreator { source, .. }
| FulltextPushText { source, .. }
| FulltextFinish { source, .. }
| ApplyFulltextIndex { source, .. } => source.retry_hint(),
InvalidRegionRequest { source, .. } => source.retry_hint(),
InvalidPartitionExpr { source, .. } => source.retry_hint(),
RecordBatch { source, .. } => source.retry_hint(),
GetSchemaMetadata { source, .. } => source.retry_hint(),
CompactionMemoryExhausted { source, .. } => source.retry_hint(),
ConvertBulkWalEntry { source, .. } => source.retry_hint(),
Encode { source, .. } | Decode { source, .. } => source.retry_hint(),
#[cfg(feature = "enterprise")]
ScanExternalRange { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::ext::{ErrorExt, RetryHint};
use common_telemetry::common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
@@ -56,6 +56,15 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
/// Converts an `opendal::Error` into a `RetryHint`.
pub fn retry_hint_from_opendal_error(error: &opendal::Error) -> RetryHint {
if error.is_temporary() {
RetryHint::Retryable
} else {
RetryHint::NonRetryable
}
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
@@ -69,4 +78,11 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::InitBackend { error, .. } => retry_hint_from_opendal_error(error),
_ => RetryHint::NonRetryable,
}
}
}

View File

@@ -16,13 +16,14 @@ use std::any::Any;
use common_datasource::file_format::Format;
use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::Error as QueryResult;
use datafusion::parquet;
use datafusion_common::DataFusionError;
use datatypes::arrow::error::ArrowError;
use object_store::error::retry_hint_from_opendal_error;
use snafu::{Location, Snafu};
use table::metadata::TableType;
@@ -1062,6 +1063,72 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::ReadObject { error, .. } => retry_hint_from_opendal_error(error),
Error::ReadParquetMetadata { .. } => RetryHint::Retryable,
Error::InvalidateTableCache { source, .. }
| Error::ExecuteDdl { source, .. }
| Error::RequestInserts { source, .. }
| Error::RequestDeletes { source, .. }
| Error::RequestRegion { source, .. }
| Error::FindViewInfo { source, .. }
| Error::TableMetadataManager { source, .. } => source.retry_hint(),
Error::ParseFileFormat { source, .. }
| Error::InferSchema { source, .. }
| Error::ListObjects { source, .. }
| Error::ParseUrl { source, .. }
| Error::BuildBackend { source, .. }
| Error::ReadOrc { source, .. } => source.retry_hint(),
Error::ExtractTableNames { source, .. }
| Error::ExecuteStatement { source, .. }
| Error::PlanStatement { source, .. }
| Error::ParseQuery { source, .. }
| Error::ExecLogicalPlan { source, .. }
| Error::DescribeStatement { source, .. } => source.retry_hint(),
Error::FindTablePartitionRule { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. }
| Error::FindRegionLeader { source, .. } => source.retry_hint(),
Error::BuildCreateExprOnInsertion { source, .. }
| Error::FindNewColumnsOnInsertion { source, .. }
| Error::AlterExprToRequest { source, .. } => source.retry_hint(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::IntoVectors { source, .. }
| Error::ColumnDefaultValue { source, .. } => source.retry_hint(),
Error::ColumnDataType { source, .. }
| Error::InvalidColumnDef { source, .. }
| Error::ColumnOptions { source, .. } => source.retry_hint(),
Error::Table { source, .. }
| Error::Insert { source, .. }
| Error::MissingTimeIndexColumn { source, .. } => source.retry_hint(),
Error::Cast { source, .. } => source.retry_hint(),
Error::ParseSql { source, .. } => source.retry_hint(),
Error::Catalog { source, .. } => source.retry_hint(),
Error::SubstraitCodec { source, .. } => source.retry_hint(),
Error::External { source, .. } => source.retry_hint(),
Error::BuildRecordBatch { source, .. } => source.retry_hint(),
Error::DecodeFlightData { source, .. } => source.retry_hint(),
Error::SqlCommon { source, .. } => source.retry_hint(),
Error::ConvertSchema { source, .. } => source.retry_hint(),
Error::WriteStreamToFile { source, .. } => source.retry_hint(),
Error::PrepareFileTable { source, .. } | Error::InferFileTableSchema { source, .. } => {
source.retry_hint()
}
#[cfg(feature = "enterprise")]
Error::TriggerQuerier { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
define_into_tonic_status!(Error);

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::ext::{ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::ScalarValue;
@@ -283,6 +283,17 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
match self {
Error::GetCache { .. } | Error::FindLeader { .. } => RetryHint::Retryable,
Error::TableRouteManager { source, .. }
| Error::GetPartitionInfo { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. } => source.retry_hint(),
Error::ConvertToVector { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use std::time::{Duration, TryFromFloatSecsError};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
@@ -462,6 +462,29 @@ impl ErrorExt for Error {
fn as_any(&self) -> &dyn Any {
self
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
BuildBackend { source, .. } | ListObjects { source, .. } => source.retry_hint(),
GetRegionMetadata { .. } => RetryHint::Retryable,
ParseFileFormat { source, .. } | InferSchema { source, .. } => source.retry_hint(),
Catalog { source, .. } => source.retry_hint(),
CreateRecordBatch { source, .. } => source.retry_hint(),
PartitionRuleManager { source, .. } => source.retry_hint(),
QueryExecution { source, .. } | QueryPlan { source, .. } => source.retry_hint(),
Sql { source, .. } => source.retry_hint(),
ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.retry_hint(),
RegionQuery { source, .. } => source.retry_hint(),
TableMutation { source, .. } => source.retry_hint(),
GetFulltextOptions { source, .. }
| GetSkippingIndexOptions { source, .. }
| GetVectorIndexOptions { source, .. }
| Datatypes { source, .. } => source.retry_hint(),
_ => RetryHint::NonRetryable,
}
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -23,7 +23,7 @@ use axum::{Json, http};
use base64::DecodeError;
use common_base::readable_size::ReadableSize;
use common_error::define_into_tonic_status;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::{BoxedError, ErrorExt, RetryHint};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_telemetry::{error, warn};
@@ -853,6 +853,47 @@ impl ErrorExt for Error {
}
}
fn retry_hint(&self) -> RetryHint {
use Error::*;
match self {
ExecuteQuery { source, .. }
| ExecutePlan { source, .. }
| ExecuteGrpcQuery { source, .. }
| ExecuteGrpcRequest { source, .. }
| CheckDatabaseValidity { source, .. }
| DescribeStatement { source } => source.retry_hint(),
Pipeline { source, .. } => source.retry_hint(),
CommonMeta { source, .. } => source.retry_hint(),
Catalog { source, .. } => source.retry_hint(),
RowWriter { source, .. } => source.retry_hint(),
Auth { source, .. } => source.retry_hint(),
#[cfg(feature = "mem-prof")]
DumpProfileData { source, .. } => source.retry_hint(),
ParsePromQL { source, .. } => source.retry_hint(),
Other { source, .. } => source.retry_hint(),
#[cfg(feature = "pprof")]
DumpPprof { source, .. } => source.retry_hint(),
ConvertScalarValue { source, .. } => source.retry_hint(),
ConvertSqlValue { source, .. } => source.retry_hint(),
GreptimeProto { source, .. } => source.retry_hint(),
Partition { source, .. } => source.retry_hint(),
MetricEngine { source, .. } => source.retry_hint(),
SubmitBatch { source, .. } => source.retry_hint(),
MemoryLimitExceeded { source, .. } => source.retry_hint(),
CollectRecordbatch { source, .. } => source.retry_hint(),
TooManyConcurrentRequests { .. } => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
fn as_any(&self) -> &dyn Any {
self
}