refactor(meta): centralize backend retry classification (#8333)

* refactor(meta): centralize backend retry hint classification

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): refine backend retry classification

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): address mysql retry review

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(meta): include mysql lock timeout in txn retry

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-06-18 19:07:34 +08:00
committed by GitHub
parent a3e01e0018
commit e5930c5367
6 changed files with 677 additions and 19 deletions

View File

@@ -29,6 +29,14 @@ use table::metadata::TableId;
use crate::DatanodeId;
use crate::peer::Peer;
mod retry_hint;
pub use retry_hint::retry_hint_from_etcd_error;
#[cfg(feature = "mysql_kvbackend")]
pub use retry_hint::retry_hint_from_sqlx_error;
#[cfg(feature = "pg_kvbackend")]
pub use retry_hint::{retry_hint_from_postgres_error, retry_hint_from_postgres_pool_error};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -1280,6 +1288,9 @@ impl ErrorExt for Error {
| ElectionNoLeader { .. }
| ElectionLeaderLeaseExpired { .. }
| ElectionLeaderLeaseChanged { .. } => RetryHint::Retryable,
ConnectEtcd { error, .. } | EtcdFailed { error, .. } | EtcdTxnFailed { error, .. } => {
retry_hint_from_etcd_error(error)
}
WriteObject { error, .. } | ReadObject { error, .. } => {
retry_hint_from_opendal_error(error)
}
@@ -1307,6 +1318,19 @@ impl ErrorExt for Error {
ConvertAlterTableRequest { source, .. } => source.retry_hint(),
ConvertColumnDef { source, .. } => source.retry_hint(),
GetCache { source, .. } => source.retry_hint(),
#[cfg(feature = "pg_kvbackend")]
PostgresExecution { error, .. } | PostgresTransaction { error, .. } => {
retry_hint_from_postgres_error(error)
}
#[cfg(feature = "pg_kvbackend")]
GetPostgresClient { error, .. } => retry_hint_from_postgres_pool_error(error),
#[cfg(feature = "mysql_kvbackend")]
MySqlExecution { error, .. }
| CreateMySqlPool { error, .. }
| AcquireMySqlClient { error, .. }
| MySqlTransaction { error, .. } => retry_hint_from_sqlx_error(error),
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
SqlExecutionTimeout { .. } => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
@@ -1318,23 +1342,12 @@ impl Error {
pub fn is_serialization_error(&self) -> bool {
match self {
#[cfg(feature = "pg_kvbackend")]
Error::PostgresTransaction { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
}
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
Error::PostgresExecution { error, .. } | Error::PostgresTransaction { error, .. } => {
retry_hint::is_postgres_serialization_error(error)
}
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution {
error: sqlx::Error::Database(database_error),
..
} => {
matches!(
database_error.message(),
"Deadlock found when trying to get lock; try restarting transaction"
| "can't serialize access for this transaction"
)
Error::MySqlExecution { error, .. } | Error::MySqlTransaction { error, .. } => {
retry_hint::is_mysql_serialization_error(error)
}
_ => false,
}
@@ -1410,6 +1423,18 @@ mod retry_hint_tests {
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[test]
fn test_sql_execution_timeout_hint_is_retryable() {
let err = SqlExecutionTimeoutSnafu {
sql: "SELECT 1".to_string(),
duration: std::time::Duration::from_secs(1),
}
.build();
assert_eq!(err.retry_hint(), RetryHint::Retryable);
}
#[test]
fn test_default_hint_is_non_retryable() {
let err = UnexpectedSnafu {

View File

@@ -0,0 +1,28 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod etcd;
#[cfg(feature = "mysql_kvbackend")]
mod mysql;
#[cfg(feature = "pg_kvbackend")]
mod postgres;
pub use etcd::retry_hint_from_etcd_error;
#[cfg(feature = "mysql_kvbackend")]
pub use mysql::{is_mysql_serialization_error, retry_hint_from_sqlx_error};
#[cfg(feature = "pg_kvbackend")]
pub use postgres::{
is_postgres_serialization_error, retry_hint_from_postgres_error,
retry_hint_from_postgres_pool_error,
};

View File

@@ -0,0 +1,74 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::{RetryHint, retry_hint_from_io_error};
/// Converts an etcd client error into a conservative retry hint.
pub fn retry_hint_from_etcd_error(error: &etcd_client::Error) -> RetryHint {
match error {
etcd_client::Error::IoError(error) => retry_hint_from_io_error(error),
etcd_client::Error::TransportError(_)
| etcd_client::Error::EndpointError(_)
| etcd_client::Error::WatchError(_)
| etcd_client::Error::LeaseKeepAliveError(_)
| etcd_client::Error::ElectError(_) => RetryHint::Retryable,
etcd_client::Error::GRpcStatus(status) => retry_hint_from_etcd_grpc_code(status.code()),
etcd_client::Error::InvalidArgs(_)
| etcd_client::Error::InvalidUri(_)
| etcd_client::Error::Utf8Error(_)
| etcd_client::Error::InvalidHeaderValue(_)
| etcd_client::Error::EndpointsNotManaged => RetryHint::NonRetryable,
}
}
/// Converts a tonic status code from an external backend into a retry hint.
fn retry_hint_from_etcd_grpc_code(code: tonic::Code) -> RetryHint {
match code {
tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Aborted => {
RetryHint::Retryable
}
_ => RetryHint::NonRetryable,
}
}
#[cfg(test)]
mod tests {
use common_error::ext::RetryHint;
use super::*;
#[test]
fn test_etcd_grpc_status_retry_hint() {
assert_eq!(
retry_hint_from_etcd_grpc_code(tonic::Code::Unavailable),
RetryHint::Retryable
);
assert_eq!(
retry_hint_from_etcd_grpc_code(tonic::Code::DeadlineExceeded),
RetryHint::Retryable
);
assert_eq!(
retry_hint_from_etcd_grpc_code(tonic::Code::Aborted),
RetryHint::Retryable
);
assert_eq!(
retry_hint_from_etcd_grpc_code(tonic::Code::ResourceExhausted),
RetryHint::NonRetryable
);
assert_eq!(
retry_hint_from_etcd_grpc_code(tonic::Code::InvalidArgument),
RetryHint::NonRetryable
);
}
}

View File

@@ -0,0 +1,308 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::{RetryHint, retry_hint_from_io_error};
// MySQL error reference:
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
// https://dev.mysql.com/doc/mysql-errors/8.0/en/client-error-reference.html
// MySQL 5.7 error reference:
// https://docs.oracle.com/cd/E17952_01/mysql-errors-5.7-en/server-error-reference.html
// Prefer errno over SQLSTATE because MySQL mixes transient connection failures
// and non-retryable protocol/configuration errors under SQLSTATE class 08.
// ER_CON_COUNT_ERROR, SQLSTATE 08004: Too many connections.
const ER_CON_COUNT_ERROR: u16 = 1040;
// ER_BAD_HOST_ERROR, SQLSTATE 08S01: Can't get hostname for your address.
const ER_BAD_HOST_ERROR: u16 = 1042;
// ER_HANDSHAKE_ERROR, SQLSTATE 08S01: Bad handshake.
// Treat it as non-retryable because it is commonly caused by protocol,
// authentication, TLS, or configuration mismatches.
const ER_HANDSHAKE_ERROR: u16 = 1043;
// ER_UNKNOWN_COM_ERROR, SQLSTATE 08S01: Unknown command.
const ER_UNKNOWN_COM_ERROR: u16 = 1047;
// ER_ACCESS_DENIED_ERROR, SQLSTATE 28000: Access denied for user.
const ER_ACCESS_DENIED_ERROR: u16 = 1045;
// ER_BAD_DB_ERROR, SQLSTATE 42000: Unknown database.
const ER_BAD_DB_ERROR: u16 = 1049;
// ER_SERVER_SHUTDOWN, SQLSTATE 08S01: Server shutdown in progress.
const ER_SERVER_SHUTDOWN: u16 = 1053;
// ER_FORCING_CLOSE, SQLSTATE 08S01: Forcing close of thread.
const ER_FORCING_CLOSE: u16 = 1080;
// ER_DUP_ENTRY, SQLSTATE 23000: Duplicate entry for key.
const ER_DUP_ENTRY: u16 = 1062;
// ER_NO_SUCH_TABLE, SQLSTATE 42S02: Table doesn't exist.
const ER_NO_SUCH_TABLE: u16 = 1146;
// ER_ABORTING_CONNECTION, SQLSTATE 08S01: Aborted connection.
const ER_ABORTING_CONNECTION: u16 = 1152;
// ER_NET_PACKET_TOO_LARGE, SQLSTATE 08S01: Packet bigger than max_allowed_packet.
const ER_NET_PACKET_TOO_LARGE: u16 = 1153;
// ER_NET_READ_ERROR_FROM_PIPE, SQLSTATE 08S01: Read error from connection pipe.
const ER_NET_READ_ERROR_FROM_PIPE: u16 = 1154;
// ER_NET_FCNTL_ERROR, SQLSTATE 08S01: Error from fcntl().
const ER_NET_FCNTL_ERROR: u16 = 1155;
// ER_NET_PACKETS_OUT_OF_ORDER, SQLSTATE 08S01: Packets out of order.
const ER_NET_PACKETS_OUT_OF_ORDER: u16 = 1156;
// ER_NET_UNCOMPRESS_ERROR, SQLSTATE 08S01: Couldn't uncompress communication packet.
const ER_NET_UNCOMPRESS_ERROR: u16 = 1157;
// ER_NET_READ_ERROR, SQLSTATE 08S01: Error reading communication packets.
const ER_NET_READ_ERROR: u16 = 1158;
// ER_NET_READ_INTERRUPTED, SQLSTATE 08S01: Timeout reading communication packets.
const ER_NET_READ_INTERRUPTED: u16 = 1159;
// ER_NET_ERROR_ON_WRITE, SQLSTATE 08S01: Error writing communication packets.
const ER_NET_ERROR_ON_WRITE: u16 = 1160;
// ER_NET_WRITE_INTERRUPTED, SQLSTATE 08S01: Timeout writing communication packets.
const ER_NET_WRITE_INTERRUPTED: u16 = 1161;
// ER_NEW_ABORTING_CONNECTION, SQLSTATE 08S01: Aborted connection.
const ER_NEW_ABORTING_CONNECTION: u16 = 1184;
// ER_MASTER_NET_READ / ER_SOURCE_NET_READ, SQLSTATE 08S01: Net error reading from source.
const ER_MASTER_NET_READ: u16 = 1189;
// ER_MASTER_NET_WRITE / ER_SOURCE_NET_WRITE, SQLSTATE 08S01: Net error writing to source.
const ER_MASTER_NET_WRITE: u16 = 1190;
// MySQL documents ER_LOCK_WAIT_TIMEOUT as SQLSTATE HY000, so classify it by
// the structured errno instead of SQLSTATE.
// ER_TOO_MANY_USER_CONNECTIONS, SQLSTATE 42000: Too many user connections.
const ER_TOO_MANY_USER_CONNECTIONS: u16 = 1203;
// ER_LOCK_WAIT_TIMEOUT, SQLSTATE HY000: Lock wait timeout exceeded; try restarting transaction.
const ER_LOCK_WAIT_TIMEOUT: u16 = 1205;
// ER_LOCK_DEADLOCK, SQLSTATE 40001: Deadlock found when trying to get lock; try restarting transaction.
const ER_LOCK_DEADLOCK: u16 = 1213;
// ER_CONNECT_TO_MASTER / ER_CONNECT_TO_SOURCE, SQLSTATE 08S01: Error connecting to source.
const ER_CONNECT_TO_MASTER: u16 = 1218;
// ER_USER_LIMIT_REACHED, SQLSTATE 42000: User limit reached.
const ER_USER_LIMIT_REACHED: u16 = 1226;
// ER_NOT_SUPPORTED_AUTH_MODE, SQLSTATE 08004: Client does not support authentication protocol.
const ER_NOT_SUPPORTED_AUTH_MODE: u16 = 1251;
// ER_NET_OK_PACKET_TOO_LARGE, SQLSTATE 08S01: OK packet too large.
const ER_NET_OK_PACKET_TOO_LARGE: u16 = 3068;
// CR_SERVER_GONE_ERROR, client error: MySQL server has gone away.
const CR_SERVER_GONE_ERROR: u16 = 2006;
// CR_SERVER_LOST, client error: Lost connection to MySQL server during query.
const CR_SERVER_LOST: u16 = 2013;
/// Converts MySQL database error details into a conservative retry hint.
fn retry_hint_from_mysql_database_error(number: Option<u16>, message: &str) -> RetryHint {
match number {
Some(
ER_CON_COUNT_ERROR
| ER_TOO_MANY_USER_CONNECTIONS
| self::ER_USER_LIMIT_REACHED
| ER_BAD_HOST_ERROR
| ER_SERVER_SHUTDOWN
| ER_FORCING_CLOSE
| ER_ABORTING_CONNECTION
| ER_NET_READ_ERROR_FROM_PIPE
| ER_NET_FCNTL_ERROR
| ER_NET_READ_ERROR
| ER_NET_READ_INTERRUPTED
| ER_NET_ERROR_ON_WRITE
| ER_NET_WRITE_INTERRUPTED
| ER_NEW_ABORTING_CONNECTION
| ER_MASTER_NET_READ
| ER_MASTER_NET_WRITE
| ER_LOCK_WAIT_TIMEOUT
| ER_LOCK_DEADLOCK
| ER_CONNECT_TO_MASTER
| CR_SERVER_GONE_ERROR
| CR_SERVER_LOST,
) => return RetryHint::Retryable,
// These are explicit reviewed non-retryable MySQL errno values.
// Retrying the same request usually cannot fix protocol,
// authentication, payload-size, or schema issues.
Some(
ER_HANDSHAKE_ERROR
| ER_UNKNOWN_COM_ERROR
| ER_ACCESS_DENIED_ERROR
| ER_BAD_DB_ERROR
| ER_DUP_ENTRY
| ER_NO_SUCH_TABLE
| ER_NET_PACKET_TOO_LARGE
| ER_NET_PACKETS_OUT_OF_ORDER
| ER_NET_UNCOMPRESS_ERROR
| ER_NOT_SUPPORTED_AUTH_MODE
| ER_NET_OK_PACKET_TOO_LARGE,
) => return RetryHint::NonRetryable,
_ => {}
}
if is_mysql_serialization_database_error(message) {
RetryHint::Retryable
} else {
RetryHint::NonRetryable
}
}
fn is_mysql_serialization_database_error(message: &str) -> bool {
matches!(
message,
"Deadlock found when trying to get lock; try restarting transaction"
| "can't serialize access for this transaction"
)
}
pub fn is_mysql_serialization_error(error: &sqlx::Error) -> bool {
match error {
sqlx::Error::Database(error) => {
let mysql_error = error
.as_error()
.downcast_ref::<sqlx::mysql::MySqlDatabaseError>();
matches!(
mysql_error.map(|error| error.number()),
Some(ER_LOCK_WAIT_TIMEOUT | ER_LOCK_DEADLOCK)
) || is_mysql_serialization_database_error(error.message())
}
_ => false,
}
}
/// Converts a sqlx error into a conservative retry hint.
pub fn retry_hint_from_sqlx_error(error: &sqlx::Error) -> RetryHint {
match error {
sqlx::Error::Io(error) => retry_hint_from_io_error(error),
// SQLx exposes TLS errors as boxed errors and protocol errors as debug
// strings, so we cannot classify them reliably by structured details.
// TLS errors are often certificate/configuration failures, while
// protocol errors may indicate a driver bug or protocol mismatch.
// Keep them non-retryable to avoid retrying deterministic failures.
sqlx::Error::Tls(_) | sqlx::Error::Protocol(_) => RetryHint::NonRetryable,
sqlx::Error::PoolTimedOut | sqlx::Error::WorkerCrashed => RetryHint::Retryable,
sqlx::Error::Database(error) => {
let mysql_error = error
.as_error()
.downcast_ref::<sqlx::mysql::MySqlDatabaseError>();
retry_hint_from_mysql_database_error(
mysql_error.map(|error| error.number()),
error.message(),
)
}
sqlx::Error::Configuration(_)
| sqlx::Error::InvalidArgument(_)
| sqlx::Error::RowNotFound
| sqlx::Error::TypeNotFound { .. }
| sqlx::Error::ColumnIndexOutOfBounds { .. }
| sqlx::Error::ColumnNotFound(_)
| sqlx::Error::ColumnDecode { .. }
| sqlx::Error::Encode(_)
| sqlx::Error::Decode(_)
| sqlx::Error::AnyDriverError(_)
| sqlx::Error::PoolClosed
| sqlx::Error::InvalidSavePointStatement
| sqlx::Error::BeginFailed => RetryHint::NonRetryable,
_ => RetryHint::NonRetryable,
}
}
#[cfg(test)]
mod tests {
use common_error::ext::RetryHint;
use super::*;
#[test]
fn test_mysql_database_error_retry_hint() {
let retryable_numbers = [
ER_CON_COUNT_ERROR,
ER_TOO_MANY_USER_CONNECTIONS,
ER_USER_LIMIT_REACHED,
ER_BAD_HOST_ERROR,
ER_SERVER_SHUTDOWN,
ER_FORCING_CLOSE,
ER_ABORTING_CONNECTION,
ER_NET_READ_ERROR_FROM_PIPE,
ER_NET_FCNTL_ERROR,
ER_NET_READ_ERROR,
ER_NET_READ_INTERRUPTED,
ER_NET_ERROR_ON_WRITE,
ER_NET_WRITE_INTERRUPTED,
ER_NEW_ABORTING_CONNECTION,
ER_MASTER_NET_READ,
ER_MASTER_NET_WRITE,
ER_LOCK_WAIT_TIMEOUT,
ER_LOCK_DEADLOCK,
ER_CONNECT_TO_MASTER,
CR_SERVER_GONE_ERROR,
CR_SERVER_LOST,
];
for number in retryable_numbers {
assert_eq!(
retry_hint_from_mysql_database_error(Some(number), "retryable mysql error"),
RetryHint::Retryable,
"errno {number} should be retryable"
);
}
let non_retryable_numbers = [
ER_HANDSHAKE_ERROR,
ER_UNKNOWN_COM_ERROR,
ER_ACCESS_DENIED_ERROR,
ER_BAD_DB_ERROR,
ER_DUP_ENTRY,
ER_NO_SUCH_TABLE,
ER_NET_PACKET_TOO_LARGE,
ER_NET_PACKETS_OUT_OF_ORDER,
ER_NET_UNCOMPRESS_ERROR,
ER_NOT_SUPPORTED_AUTH_MODE,
ER_NET_OK_PACKET_TOO_LARGE,
];
for number in non_retryable_numbers {
assert_eq!(
retry_hint_from_mysql_database_error(Some(number), "non-retryable mysql error"),
RetryHint::NonRetryable,
"errno {number} should be non-retryable"
);
}
}
#[test]
fn test_mysql_database_error_message_fallback_retry_hint() {
assert_eq!(
retry_hint_from_mysql_database_error(
None,
"Deadlock found when trying to get lock; try restarting transaction",
),
RetryHint::Retryable
);
assert_eq!(
retry_hint_from_mysql_database_error(
None,
"can't serialize access for this transaction",
),
RetryHint::Retryable
);
assert_eq!(
retry_hint_from_mysql_database_error(None, "unknown mysql error"),
RetryHint::NonRetryable
);
assert_eq!(
retry_hint_from_mysql_database_error(Some(9999), "unknown mysql error"),
RetryHint::NonRetryable
);
}
#[test]
fn test_mysql_serialization_database_error() {
assert!(is_mysql_serialization_database_error(
"Deadlock found when trying to get lock; try restarting transaction",
));
assert!(is_mysql_serialization_database_error(
"can't serialize access for this transaction"
));
assert!(!is_mysql_serialization_database_error("duplicate entry"));
}
}

View File

@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::RetryHint;
/// Converts a tokio-postgres error into a conservative retry hint.
pub fn retry_hint_from_postgres_error(error: &tokio_postgres::Error) -> RetryHint {
if error.is_closed() {
return RetryHint::Retryable;
}
retry_hint_from_postgres_sql_state(error.code())
}
/// Converts a Postgres SQLSTATE into a conservative retry hint.
fn retry_hint_from_postgres_sql_state(
state: Option<&tokio_postgres::error::SqlState>,
) -> RetryHint {
use tokio_postgres::error::SqlState;
let Some(state) = state else {
return RetryHint::NonRetryable;
};
// PostgreSQL SQLSTATE reference:
// https://www.postgresql.org/docs/current/errcodes-appendix.html
match state {
// 40001 serialization_failure: concurrent transaction serialization conflict.
&SqlState::T_R_SERIALIZATION_FAILURE
// 40P01 deadlock_detected: transaction deadlock.
| &SqlState::T_R_DEADLOCK_DETECTED
// 55P03 lock_not_available: lock could not be acquired now.
| &SqlState::LOCK_NOT_AVAILABLE
// 53300 too_many_connections: backend connection capacity exhausted.
| &SqlState::TOO_MANY_CONNECTIONS
// 57P01 admin_shutdown: server is shutting down by administrator request.
| &SqlState::ADMIN_SHUTDOWN
// 57P02 crash_shutdown: server is shutting down after crash.
| &SqlState::CRASH_SHUTDOWN
// 57P03 cannot_connect_now: server is not accepting connections now.
| &SqlState::CANNOT_CONNECT_NOW
// 08000 connection_exception: generic connection exception.
| &SqlState::CONNECTION_EXCEPTION
// 08001 sqlclient_unable_to_establish_sqlconnection: client could not establish connection.
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
// 08003 connection_does_not_exist: connection does not exist.
| &SqlState::CONNECTION_DOES_NOT_EXIST
// 08004 sqlserver_rejected_establishment_of_sqlconnection: server rejected connection establishment.
| &SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION
// 08006 connection_failure: connection failure.
| &SqlState::CONNECTION_FAILURE => RetryHint::Retryable,
_ => RetryHint::NonRetryable,
}
}
/// Converts a deadpool Postgres pool error into a conservative retry hint.
pub fn retry_hint_from_postgres_pool_error(
error: &deadpool::managed::PoolError<tokio_postgres::Error>,
) -> RetryHint {
match error {
deadpool::managed::PoolError::Timeout(_) => RetryHint::Retryable,
deadpool::managed::PoolError::Backend(error) => retry_hint_from_postgres_error(error),
deadpool::managed::PoolError::PostCreateHook(error) => match error {
deadpool::managed::HookError::Backend(error) => retry_hint_from_postgres_error(error),
deadpool::managed::HookError::Message(_) => RetryHint::NonRetryable,
},
deadpool::managed::PoolError::Closed | deadpool::managed::PoolError::NoRuntimeSpecified => {
RetryHint::NonRetryable
}
}
}
pub fn is_postgres_serialization_error(error: &tokio_postgres::Error) -> bool {
is_postgres_serialization_state(error.code())
}
fn is_postgres_serialization_state(state: Option<&tokio_postgres::error::SqlState>) -> bool {
use tokio_postgres::error::SqlState;
matches!(
state,
Some(&SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_DEADLOCK_DETECTED)
)
}
#[cfg(test)]
mod tests {
use common_error::ext::RetryHint;
use tokio_postgres::error::SqlState;
use super::*;
#[test]
fn test_postgres_sql_state_retry_hint() {
let retryable_states = [
&SqlState::T_R_SERIALIZATION_FAILURE,
&SqlState::T_R_DEADLOCK_DETECTED,
&SqlState::LOCK_NOT_AVAILABLE,
&SqlState::TOO_MANY_CONNECTIONS,
&SqlState::ADMIN_SHUTDOWN,
&SqlState::CRASH_SHUTDOWN,
&SqlState::CANNOT_CONNECT_NOW,
&SqlState::CONNECTION_EXCEPTION,
&SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
&SqlState::CONNECTION_DOES_NOT_EXIST,
&SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION,
&SqlState::CONNECTION_FAILURE,
];
for state in retryable_states {
assert_eq!(
retry_hint_from_postgres_sql_state(Some(state)),
RetryHint::Retryable,
"SQLSTATE {} should be retryable",
state.code()
);
}
assert_eq!(
retry_hint_from_postgres_sql_state(Some(&SqlState::UNDEFINED_TABLE)),
RetryHint::NonRetryable
);
assert_eq!(
retry_hint_from_postgres_sql_state(None),
RetryHint::NonRetryable
);
}
#[test]
fn test_postgres_serialization_state() {
assert!(is_postgres_serialization_state(Some(
&SqlState::T_R_SERIALIZATION_FAILURE
)));
assert!(is_postgres_serialization_state(Some(
&SqlState::T_R_DEADLOCK_DETECTED
)));
assert!(!is_postgres_serialization_state(Some(
&SqlState::UNDEFINED_TABLE
)));
assert!(!is_postgres_serialization_state(None));
}
}

View File

@@ -1302,12 +1302,82 @@ impl ErrorExt for Error {
| Error::LeaderLeaseChanged { .. }
| Error::PeerUnavailable { .. } => RetryHint::Retryable,
Error::AllocateRegions { source, .. } | Error::DeallocateRegions { source, .. }
if source.retry_hint().is_retryable() =>
{
RetryHint::Retryable
Error::ConnectEtcd { error, .. } | Error::EtcdFailed { error, .. } => {
common_meta::error::retry_hint_from_etcd_error(error)
}
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { error, .. } => {
common_meta::error::retry_hint_from_postgres_error(error)
}
#[cfg(feature = "pg_kvbackend")]
Error::GetPostgresClient { error, .. } => {
common_meta::error::retry_hint_from_postgres_pool_error(error)
}
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { error, .. }
| Error::CreateMySqlPool { error, .. }
| Error::AcquireMySqlClient { error, .. } => {
common_meta::error::retry_hint_from_sqlx_error(error)
}
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
Error::SqlExecutionTimeout { .. } => RetryHint::Retryable,
Error::ListActiveFrontends { source, .. }
| Error::ListActiveDatanodes { source, .. }
| Error::ListActiveFlownodes { source, .. }
| Error::InitDdlManager { source, .. }
| Error::InitReconciliationManager { source, .. }
| Error::InitMetadata { source, .. }
| Error::NextSequence { source, .. }
| Error::SetNextSequence { source, .. }
| Error::PeekSequence { source, .. }
| Error::SubmitDdlTask { source, .. }
| Error::SubmitReconcileProcedure { source, .. }
| Error::InvalidateTableCache { source, .. }
| Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::RuntimeSwitchManager { source, .. }
| Error::KvBackend { source, .. }
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::SaveClusterInfo { source, .. }
| Error::InvalidClusterInfoFormat { source, .. }
| Error::InvalidDatanodeStatFormat { source, .. }
| Error::InvalidNodeInfoFormat { source, .. }
| Error::FlowStateHandler { source, .. }
| Error::BuildWalProvider { source, .. }
| Error::BuildKafkaClient { error: source, .. }
| Error::UpdateTopicNameValue { source, .. }
| Error::BuildTlsOptions { source, .. }
| Error::AllocateRegions { source, .. }
| Error::DeallocateRegions { source, .. }
| Error::BuildCreateRequest { source, .. }
| Error::AllocateRegionRoutes { source, .. }
| Error::AllocateWalOptions { source, .. } => source.retry_hint(),
Error::Other { source, .. }
| Error::ListCatalogs { source, .. }
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. }
| Error::DowngradeLeader { source, .. } => source.retry_hint(),
Error::SubmitProcedure { source, .. }
| Error::WaitProcedure { source, .. }
| Error::QueryProcedure { source, .. }
| Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. }
| Error::RegisterProcedureLoader { source, .. }
| Error::RepartitionSubprocedureStateReceiver { source, .. } => source.retry_hint(),
Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
source.retry_hint()
}
Error::StartTelemetryTask { source, .. } => source.retry_hint(),
Error::CreateChannel { source, .. } => source.retry_hint(),
Error::RepartitionCreateSubtasks { source, .. } => source.retry_hint(),
Error::SerializePartitionExpr { source, .. }
| Error::DeserializePartitionExpr { source, .. } => source.retry_hint(),
Error::DeleteRecords { error, .. }
| Error::BuildPartitionClient { error, .. }
| Error::GetOffset { error, .. } => rskafka_client_error_to_retry_hint(error),