From e5930c53675bd111e9ca0c3430218ac1c4498320 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 18 Jun 2026 19:07:34 +0800 Subject: [PATCH] refactor(meta): centralize backend retry classification (#8333) * refactor(meta): centralize backend retry hint classification Signed-off-by: WenyXu * refactor(meta): refine backend retry classification Signed-off-by: WenyXu * refactor(meta): address mysql retry review Signed-off-by: WenyXu * refactor(meta): include mysql lock timeout in txn retry Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/error.rs | 55 +++- src/common/meta/src/error/retry_hint.rs | 28 ++ src/common/meta/src/error/retry_hint/etcd.rs | 74 +++++ src/common/meta/src/error/retry_hint/mysql.rs | 308 ++++++++++++++++++ .../meta/src/error/retry_hint/postgres.rs | 153 +++++++++ src/meta-srv/src/error.rs | 78 ++++- 6 files changed, 677 insertions(+), 19 deletions(-) create mode 100644 src/common/meta/src/error/retry_hint.rs create mode 100644 src/common/meta/src/error/retry_hint/etcd.rs create mode 100644 src/common/meta/src/error/retry_hint/mysql.rs create mode 100644 src/common/meta/src/error/retry_hint/postgres.rs diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 881887feff..1528f4f387 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -29,6 +29,14 @@ use table::metadata::TableId; use crate::DatanodeId; use crate::peer::Peer; +mod retry_hint; + +pub use retry_hint::retry_hint_from_etcd_error; +#[cfg(feature = "mysql_kvbackend")] +pub use retry_hint::retry_hint_from_sqlx_error; +#[cfg(feature = "pg_kvbackend")] +pub use retry_hint::{retry_hint_from_postgres_error, retry_hint_from_postgres_pool_error}; + #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -1280,6 +1288,9 @@ impl ErrorExt for Error { | ElectionNoLeader { .. } | ElectionLeaderLeaseExpired { .. } | ElectionLeaderLeaseChanged { .. } => RetryHint::Retryable, + ConnectEtcd { error, .. } | EtcdFailed { error, .. } | EtcdTxnFailed { error, .. } => { + retry_hint_from_etcd_error(error) + } WriteObject { error, .. } | ReadObject { error, .. } => { retry_hint_from_opendal_error(error) } @@ -1307,6 +1318,19 @@ impl ErrorExt for Error { ConvertAlterTableRequest { source, .. } => source.retry_hint(), ConvertColumnDef { source, .. } => source.retry_hint(), GetCache { source, .. } => source.retry_hint(), + #[cfg(feature = "pg_kvbackend")] + PostgresExecution { error, .. } | PostgresTransaction { error, .. } => { + retry_hint_from_postgres_error(error) + } + #[cfg(feature = "pg_kvbackend")] + GetPostgresClient { error, .. } => retry_hint_from_postgres_pool_error(error), + #[cfg(feature = "mysql_kvbackend")] + MySqlExecution { error, .. } + | CreateMySqlPool { error, .. } + | AcquireMySqlClient { error, .. } + | MySqlTransaction { error, .. } => retry_hint_from_sqlx_error(error), + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + SqlExecutionTimeout { .. } => RetryHint::Retryable, _ => RetryHint::NonRetryable, } } @@ -1318,23 +1342,12 @@ impl Error { pub fn is_serialization_error(&self) -> bool { match self { #[cfg(feature = "pg_kvbackend")] - Error::PostgresTransaction { error, .. } => { - error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) - } - #[cfg(feature = "pg_kvbackend")] - Error::PostgresExecution { error, .. } => { - error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) + Error::PostgresExecution { error, .. } | Error::PostgresTransaction { error, .. } => { + retry_hint::is_postgres_serialization_error(error) } #[cfg(feature = "mysql_kvbackend")] - Error::MySqlExecution { - error: sqlx::Error::Database(database_error), - .. - } => { - matches!( - database_error.message(), - "Deadlock found when trying to get lock; try restarting transaction" - | "can't serialize access for this transaction" - ) + Error::MySqlExecution { error, .. } | Error::MySqlTransaction { error, .. } => { + retry_hint::is_mysql_serialization_error(error) } _ => false, } @@ -1410,6 +1423,18 @@ mod retry_hint_tests { assert_eq!(err.retry_hint(), RetryHint::Retryable); } + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + #[test] + fn test_sql_execution_timeout_hint_is_retryable() { + let err = SqlExecutionTimeoutSnafu { + sql: "SELECT 1".to_string(), + duration: std::time::Duration::from_secs(1), + } + .build(); + + assert_eq!(err.retry_hint(), RetryHint::Retryable); + } + #[test] fn test_default_hint_is_non_retryable() { let err = UnexpectedSnafu { diff --git a/src/common/meta/src/error/retry_hint.rs b/src/common/meta/src/error/retry_hint.rs new file mode 100644 index 0000000000..fcf83c651c --- /dev/null +++ b/src/common/meta/src/error/retry_hint.rs @@ -0,0 +1,28 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod etcd; +#[cfg(feature = "mysql_kvbackend")] +mod mysql; +#[cfg(feature = "pg_kvbackend")] +mod postgres; + +pub use etcd::retry_hint_from_etcd_error; +#[cfg(feature = "mysql_kvbackend")] +pub use mysql::{is_mysql_serialization_error, retry_hint_from_sqlx_error}; +#[cfg(feature = "pg_kvbackend")] +pub use postgres::{ + is_postgres_serialization_error, retry_hint_from_postgres_error, + retry_hint_from_postgres_pool_error, +}; diff --git a/src/common/meta/src/error/retry_hint/etcd.rs b/src/common/meta/src/error/retry_hint/etcd.rs new file mode 100644 index 0000000000..761491b6bb --- /dev/null +++ b/src/common/meta/src/error/retry_hint/etcd.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::{RetryHint, retry_hint_from_io_error}; + +/// Converts an etcd client error into a conservative retry hint. +pub fn retry_hint_from_etcd_error(error: &etcd_client::Error) -> RetryHint { + match error { + etcd_client::Error::IoError(error) => retry_hint_from_io_error(error), + etcd_client::Error::TransportError(_) + | etcd_client::Error::EndpointError(_) + | etcd_client::Error::WatchError(_) + | etcd_client::Error::LeaseKeepAliveError(_) + | etcd_client::Error::ElectError(_) => RetryHint::Retryable, + etcd_client::Error::GRpcStatus(status) => retry_hint_from_etcd_grpc_code(status.code()), + etcd_client::Error::InvalidArgs(_) + | etcd_client::Error::InvalidUri(_) + | etcd_client::Error::Utf8Error(_) + | etcd_client::Error::InvalidHeaderValue(_) + | etcd_client::Error::EndpointsNotManaged => RetryHint::NonRetryable, + } +} + +/// Converts a tonic status code from an external backend into a retry hint. +fn retry_hint_from_etcd_grpc_code(code: tonic::Code) -> RetryHint { + match code { + tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Aborted => { + RetryHint::Retryable + } + _ => RetryHint::NonRetryable, + } +} + +#[cfg(test)] +mod tests { + use common_error::ext::RetryHint; + + use super::*; + + #[test] + fn test_etcd_grpc_status_retry_hint() { + assert_eq!( + retry_hint_from_etcd_grpc_code(tonic::Code::Unavailable), + RetryHint::Retryable + ); + assert_eq!( + retry_hint_from_etcd_grpc_code(tonic::Code::DeadlineExceeded), + RetryHint::Retryable + ); + assert_eq!( + retry_hint_from_etcd_grpc_code(tonic::Code::Aborted), + RetryHint::Retryable + ); + assert_eq!( + retry_hint_from_etcd_grpc_code(tonic::Code::ResourceExhausted), + RetryHint::NonRetryable + ); + assert_eq!( + retry_hint_from_etcd_grpc_code(tonic::Code::InvalidArgument), + RetryHint::NonRetryable + ); + } +} diff --git a/src/common/meta/src/error/retry_hint/mysql.rs b/src/common/meta/src/error/retry_hint/mysql.rs new file mode 100644 index 0000000000..76aec1ea13 --- /dev/null +++ b/src/common/meta/src/error/retry_hint/mysql.rs @@ -0,0 +1,308 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::{RetryHint, retry_hint_from_io_error}; + +// MySQL error reference: +// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html +// https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html +// MySQL 5.7 error reference: +// https://docs.oracle.com/cd/E17952_01/mysql-errors-5.7-en/server-error-reference.html +// Prefer errno over SQLSTATE because MySQL mixes transient connection failures +// and non-retryable protocol/configuration errors under SQLSTATE class 08. + +// ER_CON_COUNT_ERROR, SQLSTATE 08004: Too many connections. +const ER_CON_COUNT_ERROR: u16 = 1040; +// ER_BAD_HOST_ERROR, SQLSTATE 08S01: Can't get hostname for your address. +const ER_BAD_HOST_ERROR: u16 = 1042; +// ER_HANDSHAKE_ERROR, SQLSTATE 08S01: Bad handshake. +// Treat it as non-retryable because it is commonly caused by protocol, +// authentication, TLS, or configuration mismatches. +const ER_HANDSHAKE_ERROR: u16 = 1043; +// ER_UNKNOWN_COM_ERROR, SQLSTATE 08S01: Unknown command. +const ER_UNKNOWN_COM_ERROR: u16 = 1047; +// ER_ACCESS_DENIED_ERROR, SQLSTATE 28000: Access denied for user. +const ER_ACCESS_DENIED_ERROR: u16 = 1045; +// ER_BAD_DB_ERROR, SQLSTATE 42000: Unknown database. +const ER_BAD_DB_ERROR: u16 = 1049; +// ER_SERVER_SHUTDOWN, SQLSTATE 08S01: Server shutdown in progress. +const ER_SERVER_SHUTDOWN: u16 = 1053; +// ER_FORCING_CLOSE, SQLSTATE 08S01: Forcing close of thread. +const ER_FORCING_CLOSE: u16 = 1080; +// ER_DUP_ENTRY, SQLSTATE 23000: Duplicate entry for key. +const ER_DUP_ENTRY: u16 = 1062; +// ER_NO_SUCH_TABLE, SQLSTATE 42S02: Table doesn't exist. +const ER_NO_SUCH_TABLE: u16 = 1146; + +// ER_ABORTING_CONNECTION, SQLSTATE 08S01: Aborted connection. +const ER_ABORTING_CONNECTION: u16 = 1152; +// ER_NET_PACKET_TOO_LARGE, SQLSTATE 08S01: Packet bigger than max_allowed_packet. +const ER_NET_PACKET_TOO_LARGE: u16 = 1153; +// ER_NET_READ_ERROR_FROM_PIPE, SQLSTATE 08S01: Read error from connection pipe. +const ER_NET_READ_ERROR_FROM_PIPE: u16 = 1154; +// ER_NET_FCNTL_ERROR, SQLSTATE 08S01: Error from fcntl(). +const ER_NET_FCNTL_ERROR: u16 = 1155; +// ER_NET_PACKETS_OUT_OF_ORDER, SQLSTATE 08S01: Packets out of order. +const ER_NET_PACKETS_OUT_OF_ORDER: u16 = 1156; +// ER_NET_UNCOMPRESS_ERROR, SQLSTATE 08S01: Couldn't uncompress communication packet. +const ER_NET_UNCOMPRESS_ERROR: u16 = 1157; +// ER_NET_READ_ERROR, SQLSTATE 08S01: Error reading communication packets. +const ER_NET_READ_ERROR: u16 = 1158; +// ER_NET_READ_INTERRUPTED, SQLSTATE 08S01: Timeout reading communication packets. +const ER_NET_READ_INTERRUPTED: u16 = 1159; +// ER_NET_ERROR_ON_WRITE, SQLSTATE 08S01: Error writing communication packets. +const ER_NET_ERROR_ON_WRITE: u16 = 1160; +// ER_NET_WRITE_INTERRUPTED, SQLSTATE 08S01: Timeout writing communication packets. +const ER_NET_WRITE_INTERRUPTED: u16 = 1161; +// ER_NEW_ABORTING_CONNECTION, SQLSTATE 08S01: Aborted connection. +const ER_NEW_ABORTING_CONNECTION: u16 = 1184; +// ER_MASTER_NET_READ / ER_SOURCE_NET_READ, SQLSTATE 08S01: Net error reading from source. +const ER_MASTER_NET_READ: u16 = 1189; +// ER_MASTER_NET_WRITE / ER_SOURCE_NET_WRITE, SQLSTATE 08S01: Net error writing to source. +const ER_MASTER_NET_WRITE: u16 = 1190; + +// MySQL documents ER_LOCK_WAIT_TIMEOUT as SQLSTATE HY000, so classify it by +// the structured errno instead of SQLSTATE. +// ER_TOO_MANY_USER_CONNECTIONS, SQLSTATE 42000: Too many user connections. +const ER_TOO_MANY_USER_CONNECTIONS: u16 = 1203; +// ER_LOCK_WAIT_TIMEOUT, SQLSTATE HY000: Lock wait timeout exceeded; try restarting transaction. +const ER_LOCK_WAIT_TIMEOUT: u16 = 1205; +// ER_LOCK_DEADLOCK, SQLSTATE 40001: Deadlock found when trying to get lock; try restarting transaction. +const ER_LOCK_DEADLOCK: u16 = 1213; +// ER_CONNECT_TO_MASTER / ER_CONNECT_TO_SOURCE, SQLSTATE 08S01: Error connecting to source. +const ER_CONNECT_TO_MASTER: u16 = 1218; +// ER_USER_LIMIT_REACHED, SQLSTATE 42000: User limit reached. +const ER_USER_LIMIT_REACHED: u16 = 1226; +// ER_NOT_SUPPORTED_AUTH_MODE, SQLSTATE 08004: Client does not support authentication protocol. +const ER_NOT_SUPPORTED_AUTH_MODE: u16 = 1251; +// ER_NET_OK_PACKET_TOO_LARGE, SQLSTATE 08S01: OK packet too large. +const ER_NET_OK_PACKET_TOO_LARGE: u16 = 3068; + +// CR_SERVER_GONE_ERROR, client error: MySQL server has gone away. +const CR_SERVER_GONE_ERROR: u16 = 2006; +// CR_SERVER_LOST, client error: Lost connection to MySQL server during query. +const CR_SERVER_LOST: u16 = 2013; + +/// Converts MySQL database error details into a conservative retry hint. +fn retry_hint_from_mysql_database_error(number: Option, message: &str) -> RetryHint { + match number { + Some( + ER_CON_COUNT_ERROR + | ER_TOO_MANY_USER_CONNECTIONS + | self::ER_USER_LIMIT_REACHED + | ER_BAD_HOST_ERROR + | ER_SERVER_SHUTDOWN + | ER_FORCING_CLOSE + | ER_ABORTING_CONNECTION + | ER_NET_READ_ERROR_FROM_PIPE + | ER_NET_FCNTL_ERROR + | ER_NET_READ_ERROR + | ER_NET_READ_INTERRUPTED + | ER_NET_ERROR_ON_WRITE + | ER_NET_WRITE_INTERRUPTED + | ER_NEW_ABORTING_CONNECTION + | ER_MASTER_NET_READ + | ER_MASTER_NET_WRITE + | ER_LOCK_WAIT_TIMEOUT + | ER_LOCK_DEADLOCK + | ER_CONNECT_TO_MASTER + | CR_SERVER_GONE_ERROR + | CR_SERVER_LOST, + ) => return RetryHint::Retryable, + // These are explicit reviewed non-retryable MySQL errno values. + // Retrying the same request usually cannot fix protocol, + // authentication, payload-size, or schema issues. + Some( + ER_HANDSHAKE_ERROR + | ER_UNKNOWN_COM_ERROR + | ER_ACCESS_DENIED_ERROR + | ER_BAD_DB_ERROR + | ER_DUP_ENTRY + | ER_NO_SUCH_TABLE + | ER_NET_PACKET_TOO_LARGE + | ER_NET_PACKETS_OUT_OF_ORDER + | ER_NET_UNCOMPRESS_ERROR + | ER_NOT_SUPPORTED_AUTH_MODE + | ER_NET_OK_PACKET_TOO_LARGE, + ) => return RetryHint::NonRetryable, + _ => {} + } + + if is_mysql_serialization_database_error(message) { + RetryHint::Retryable + } else { + RetryHint::NonRetryable + } +} + +fn is_mysql_serialization_database_error(message: &str) -> bool { + matches!( + message, + "Deadlock found when trying to get lock; try restarting transaction" + | "can't serialize access for this transaction" + ) +} + +pub fn is_mysql_serialization_error(error: &sqlx::Error) -> bool { + match error { + sqlx::Error::Database(error) => { + let mysql_error = error + .as_error() + .downcast_ref::(); + matches!( + mysql_error.map(|error| error.number()), + Some(ER_LOCK_WAIT_TIMEOUT | ER_LOCK_DEADLOCK) + ) || is_mysql_serialization_database_error(error.message()) + } + _ => false, + } +} + +/// Converts a sqlx error into a conservative retry hint. +pub fn retry_hint_from_sqlx_error(error: &sqlx::Error) -> RetryHint { + match error { + sqlx::Error::Io(error) => retry_hint_from_io_error(error), + // SQLx exposes TLS errors as boxed errors and protocol errors as debug + // strings, so we cannot classify them reliably by structured details. + // TLS errors are often certificate/configuration failures, while + // protocol errors may indicate a driver bug or protocol mismatch. + // Keep them non-retryable to avoid retrying deterministic failures. + sqlx::Error::Tls(_) | sqlx::Error::Protocol(_) => RetryHint::NonRetryable, + sqlx::Error::PoolTimedOut | sqlx::Error::WorkerCrashed => RetryHint::Retryable, + sqlx::Error::Database(error) => { + let mysql_error = error + .as_error() + .downcast_ref::(); + retry_hint_from_mysql_database_error( + mysql_error.map(|error| error.number()), + error.message(), + ) + } + sqlx::Error::Configuration(_) + | sqlx::Error::InvalidArgument(_) + | sqlx::Error::RowNotFound + | sqlx::Error::TypeNotFound { .. } + | sqlx::Error::ColumnIndexOutOfBounds { .. } + | sqlx::Error::ColumnNotFound(_) + | sqlx::Error::ColumnDecode { .. } + | sqlx::Error::Encode(_) + | sqlx::Error::Decode(_) + | sqlx::Error::AnyDriverError(_) + | sqlx::Error::PoolClosed + | sqlx::Error::InvalidSavePointStatement + | sqlx::Error::BeginFailed => RetryHint::NonRetryable, + _ => RetryHint::NonRetryable, + } +} + +#[cfg(test)] +mod tests { + use common_error::ext::RetryHint; + + use super::*; + + #[test] + fn test_mysql_database_error_retry_hint() { + let retryable_numbers = [ + ER_CON_COUNT_ERROR, + ER_TOO_MANY_USER_CONNECTIONS, + ER_USER_LIMIT_REACHED, + ER_BAD_HOST_ERROR, + ER_SERVER_SHUTDOWN, + ER_FORCING_CLOSE, + ER_ABORTING_CONNECTION, + ER_NET_READ_ERROR_FROM_PIPE, + ER_NET_FCNTL_ERROR, + ER_NET_READ_ERROR, + ER_NET_READ_INTERRUPTED, + ER_NET_ERROR_ON_WRITE, + ER_NET_WRITE_INTERRUPTED, + ER_NEW_ABORTING_CONNECTION, + ER_MASTER_NET_READ, + ER_MASTER_NET_WRITE, + ER_LOCK_WAIT_TIMEOUT, + ER_LOCK_DEADLOCK, + ER_CONNECT_TO_MASTER, + CR_SERVER_GONE_ERROR, + CR_SERVER_LOST, + ]; + + for number in retryable_numbers { + assert_eq!( + retry_hint_from_mysql_database_error(Some(number), "retryable mysql error"), + RetryHint::Retryable, + "errno {number} should be retryable" + ); + } + + let non_retryable_numbers = [ + ER_HANDSHAKE_ERROR, + ER_UNKNOWN_COM_ERROR, + ER_ACCESS_DENIED_ERROR, + ER_BAD_DB_ERROR, + ER_DUP_ENTRY, + ER_NO_SUCH_TABLE, + ER_NET_PACKET_TOO_LARGE, + ER_NET_PACKETS_OUT_OF_ORDER, + ER_NET_UNCOMPRESS_ERROR, + ER_NOT_SUPPORTED_AUTH_MODE, + ER_NET_OK_PACKET_TOO_LARGE, + ]; + + for number in non_retryable_numbers { + assert_eq!( + retry_hint_from_mysql_database_error(Some(number), "non-retryable mysql error"), + RetryHint::NonRetryable, + "errno {number} should be non-retryable" + ); + } + } + + #[test] + fn test_mysql_database_error_message_fallback_retry_hint() { + assert_eq!( + retry_hint_from_mysql_database_error( + None, + "Deadlock found when trying to get lock; try restarting transaction", + ), + RetryHint::Retryable + ); + assert_eq!( + retry_hint_from_mysql_database_error( + None, + "can't serialize access for this transaction", + ), + RetryHint::Retryable + ); + assert_eq!( + retry_hint_from_mysql_database_error(None, "unknown mysql error"), + RetryHint::NonRetryable + ); + assert_eq!( + retry_hint_from_mysql_database_error(Some(9999), "unknown mysql error"), + RetryHint::NonRetryable + ); + } + + #[test] + fn test_mysql_serialization_database_error() { + assert!(is_mysql_serialization_database_error( + "Deadlock found when trying to get lock; try restarting transaction", + )); + assert!(is_mysql_serialization_database_error( + "can't serialize access for this transaction" + )); + assert!(!is_mysql_serialization_database_error("duplicate entry")); + } +} diff --git a/src/common/meta/src/error/retry_hint/postgres.rs b/src/common/meta/src/error/retry_hint/postgres.rs new file mode 100644 index 0000000000..0c59d2f7f4 --- /dev/null +++ b/src/common/meta/src/error/retry_hint/postgres.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::ext::RetryHint; + +/// Converts a tokio-postgres error into a conservative retry hint. +pub fn retry_hint_from_postgres_error(error: &tokio_postgres::Error) -> RetryHint { + if error.is_closed() { + return RetryHint::Retryable; + } + + retry_hint_from_postgres_sql_state(error.code()) +} + +/// Converts a Postgres SQLSTATE into a conservative retry hint. +fn retry_hint_from_postgres_sql_state( + state: Option<&tokio_postgres::error::SqlState>, +) -> RetryHint { + use tokio_postgres::error::SqlState; + + let Some(state) = state else { + return RetryHint::NonRetryable; + }; + + // PostgreSQL SQLSTATE reference: + // https://www.postgresql.org/docs/current/errcodes-appendix.html + match state { + // 40001 serialization_failure: concurrent transaction serialization conflict. + &SqlState::T_R_SERIALIZATION_FAILURE + // 40P01 deadlock_detected: transaction deadlock. + | &SqlState::T_R_DEADLOCK_DETECTED + // 55P03 lock_not_available: lock could not be acquired now. + | &SqlState::LOCK_NOT_AVAILABLE + // 53300 too_many_connections: backend connection capacity exhausted. + | &SqlState::TOO_MANY_CONNECTIONS + // 57P01 admin_shutdown: server is shutting down by administrator request. + | &SqlState::ADMIN_SHUTDOWN + // 57P02 crash_shutdown: server is shutting down after crash. + | &SqlState::CRASH_SHUTDOWN + // 57P03 cannot_connect_now: server is not accepting connections now. + | &SqlState::CANNOT_CONNECT_NOW + // 08000 connection_exception: generic connection exception. + | &SqlState::CONNECTION_EXCEPTION + // 08001 sqlclient_unable_to_establish_sqlconnection: client could not establish connection. + | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION + // 08003 connection_does_not_exist: connection does not exist. + | &SqlState::CONNECTION_DOES_NOT_EXIST + // 08004 sqlserver_rejected_establishment_of_sqlconnection: server rejected connection establishment. + | &SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION + // 08006 connection_failure: connection failure. + | &SqlState::CONNECTION_FAILURE => RetryHint::Retryable, + _ => RetryHint::NonRetryable, + } +} + +/// Converts a deadpool Postgres pool error into a conservative retry hint. +pub fn retry_hint_from_postgres_pool_error( + error: &deadpool::managed::PoolError, +) -> RetryHint { + match error { + deadpool::managed::PoolError::Timeout(_) => RetryHint::Retryable, + deadpool::managed::PoolError::Backend(error) => retry_hint_from_postgres_error(error), + deadpool::managed::PoolError::PostCreateHook(error) => match error { + deadpool::managed::HookError::Backend(error) => retry_hint_from_postgres_error(error), + deadpool::managed::HookError::Message(_) => RetryHint::NonRetryable, + }, + deadpool::managed::PoolError::Closed | deadpool::managed::PoolError::NoRuntimeSpecified => { + RetryHint::NonRetryable + } + } +} + +pub fn is_postgres_serialization_error(error: &tokio_postgres::Error) -> bool { + is_postgres_serialization_state(error.code()) +} + +fn is_postgres_serialization_state(state: Option<&tokio_postgres::error::SqlState>) -> bool { + use tokio_postgres::error::SqlState; + + matches!( + state, + Some(&SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_DEADLOCK_DETECTED) + ) +} + +#[cfg(test)] +mod tests { + use common_error::ext::RetryHint; + use tokio_postgres::error::SqlState; + + use super::*; + + #[test] + fn test_postgres_sql_state_retry_hint() { + let retryable_states = [ + &SqlState::T_R_SERIALIZATION_FAILURE, + &SqlState::T_R_DEADLOCK_DETECTED, + &SqlState::LOCK_NOT_AVAILABLE, + &SqlState::TOO_MANY_CONNECTIONS, + &SqlState::ADMIN_SHUTDOWN, + &SqlState::CRASH_SHUTDOWN, + &SqlState::CANNOT_CONNECT_NOW, + &SqlState::CONNECTION_EXCEPTION, + &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + &SqlState::CONNECTION_DOES_NOT_EXIST, + &SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION, + &SqlState::CONNECTION_FAILURE, + ]; + + for state in retryable_states { + assert_eq!( + retry_hint_from_postgres_sql_state(Some(state)), + RetryHint::Retryable, + "SQLSTATE {} should be retryable", + state.code() + ); + } + + assert_eq!( + retry_hint_from_postgres_sql_state(Some(&SqlState::UNDEFINED_TABLE)), + RetryHint::NonRetryable + ); + assert_eq!( + retry_hint_from_postgres_sql_state(None), + RetryHint::NonRetryable + ); + } + + #[test] + fn test_postgres_serialization_state() { + assert!(is_postgres_serialization_state(Some( + &SqlState::T_R_SERIALIZATION_FAILURE + ))); + assert!(is_postgres_serialization_state(Some( + &SqlState::T_R_DEADLOCK_DETECTED + ))); + assert!(!is_postgres_serialization_state(Some( + &SqlState::UNDEFINED_TABLE + ))); + assert!(!is_postgres_serialization_state(None)); + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 66f95b0446..003bee94d3 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -1302,12 +1302,82 @@ impl ErrorExt for Error { | Error::LeaderLeaseChanged { .. } | Error::PeerUnavailable { .. } => RetryHint::Retryable, - Error::AllocateRegions { source, .. } | Error::DeallocateRegions { source, .. } - if source.retry_hint().is_retryable() => - { - RetryHint::Retryable + Error::ConnectEtcd { error, .. } | Error::EtcdFailed { error, .. } => { + common_meta::error::retry_hint_from_etcd_error(error) } + #[cfg(feature = "pg_kvbackend")] + Error::PostgresExecution { error, .. } => { + common_meta::error::retry_hint_from_postgres_error(error) + } + #[cfg(feature = "pg_kvbackend")] + Error::GetPostgresClient { error, .. } => { + common_meta::error::retry_hint_from_postgres_pool_error(error) + } + #[cfg(feature = "mysql_kvbackend")] + Error::MySqlExecution { error, .. } + | Error::CreateMySqlPool { error, .. } + | Error::AcquireMySqlClient { error, .. } => { + common_meta::error::retry_hint_from_sqlx_error(error) + } + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + Error::SqlExecutionTimeout { .. } => RetryHint::Retryable, + + Error::ListActiveFrontends { source, .. } + | Error::ListActiveDatanodes { source, .. } + | Error::ListActiveFlownodes { source, .. } + | Error::InitDdlManager { source, .. } + | Error::InitReconciliationManager { source, .. } + | Error::InitMetadata { source, .. } + | Error::NextSequence { source, .. } + | Error::SetNextSequence { source, .. } + | Error::PeekSequence { source, .. } + | Error::SubmitDdlTask { source, .. } + | Error::SubmitReconcileProcedure { source, .. } + | Error::InvalidateTableCache { source, .. } + | Error::ConvertProtoData { source, .. } + | Error::TableMetadataManager { source, .. } + | Error::RuntimeSwitchManager { source, .. } + | Error::KvBackend { source, .. } + | Error::UnexpectedLogicalRouteTable { source, .. } + | Error::SaveClusterInfo { source, .. } + | Error::InvalidClusterInfoFormat { source, .. } + | Error::InvalidDatanodeStatFormat { source, .. } + | Error::InvalidNodeInfoFormat { source, .. } + | Error::FlowStateHandler { source, .. } + | Error::BuildWalProvider { source, .. } + | Error::BuildKafkaClient { error: source, .. } + | Error::UpdateTopicNameValue { source, .. } + | Error::BuildTlsOptions { source, .. } + | Error::AllocateRegions { source, .. } + | Error::DeallocateRegions { source, .. } + | Error::BuildCreateRequest { source, .. } + | Error::AllocateRegionRoutes { source, .. } + | Error::AllocateWalOptions { source, .. } => source.retry_hint(), + + Error::Other { source, .. } + | Error::ListCatalogs { source, .. } + | Error::ListSchemas { source, .. } + | Error::ListTables { source, .. } + | Error::DowngradeLeader { source, .. } => source.retry_hint(), + + Error::SubmitProcedure { source, .. } + | Error::WaitProcedure { source, .. } + | Error::QueryProcedure { source, .. } + | Error::StartProcedureManager { source, .. } + | Error::StopProcedureManager { source, .. } + | Error::RegisterProcedureLoader { source, .. } + | Error::RepartitionSubprocedureStateReceiver { source, .. } => source.retry_hint(), + + Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { + source.retry_hint() + } + Error::StartTelemetryTask { source, .. } => source.retry_hint(), + Error::CreateChannel { source, .. } => source.retry_hint(), + Error::RepartitionCreateSubtasks { source, .. } => source.retry_hint(), + Error::SerializePartitionExpr { source, .. } + | Error::DeserializePartitionExpr { source, .. } => source.retry_hint(), + Error::DeleteRecords { error, .. } | Error::BuildPartitionClient { error, .. } | Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error),