fix(meta): enhance mysql election client with timeouts and reconnection (#6341)

* fix(meta): enhance mysql election client with timeouts and reconnection

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: improve MySQL election client lease management and error handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: adjust timeout configurations for election clients

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: remove unused error

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-06-19 20:44:23 +08:00
committed by GitHub
parent d4826b998d
commit 89e3c8edab
3 changed files with 711 additions and 286 deletions

View File

@@ -46,9 +46,7 @@ use snafu::ResultExt;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::MySqlConnectOptions;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::{MySqlConnection, MySqlPool};
#[cfg(feature = "mysql_kvbackend")]
use sqlx::Connection;
use sqlx::mysql::MySqlPool;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
@@ -278,6 +276,7 @@ pub async fn metasrv_builder(
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
@@ -286,8 +285,12 @@ pub async fn metasrv_builder(
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_postgres_pool_with(&opts.store_addrs, cfg).await?;
let election_client =
ElectionPgClient::new(pool, execution_timeout, meta_lease_ttl, statement_timeout)?;
let election_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
@@ -308,6 +311,10 @@ pub async fn metasrv_builder(
}
#[cfg(feature = "mysql_kvbackend")]
(None, BackendImpl::MysqlStore) => {
use std::time::Duration;
use crate::election::rds::mysql::ElectionMysqlClient;
let pool = create_mysql_pool(&opts.store_addrs).await?;
let kv_backend =
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
@@ -315,13 +322,29 @@ pub async fn metasrv_builder(
.context(error::KvBackendSnafu)?;
// Since election will acquire a lock of the table, we need a separate table for election.
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_mysql_pool(&opts.store_addrs).await?;
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let election_client = ElectionMysqlClient::new(
pool,
execution_timeout,
statement_timeout,
innode_lock_wait_timeout,
idle_session_timeout,
&election_table_name,
);
let election = MySqlElection::with_mysql_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
META_LEASE_SECS,
candidate_lease_ttl,
meta_lease_ttl,
&election_table_name,
)
.await?;
@@ -438,14 +461,6 @@ pub async fn create_mysql_pool(store_addrs: &[String]) -> Result<MySqlPool> {
let pool = MySqlPool::connect_with(opts)
.await
.context(error::CreateMySqlPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
let opts = setup_mysql_options(&opts.store_addrs).await?;
let client = MySqlConnection::connect_with(&opts)
.await
.context(error::ConnectMySqlSnafu)?;
Ok(client)
}

File diff suppressed because it is too large Load Diff

View File

@@ -382,6 +382,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to decode sql value"))]
DecodeSqlValue {
#[snafu(source)]
error: sqlx::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find table route for {table_id}"))]
TableRouteNotFound {
table_id: TableId,
@@ -417,6 +425,18 @@ pub enum Error {
location: Location,
},
#[snafu(display("Leader lease expired"))]
LeaderLeaseExpired {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Leader lease changed during election"))]
LeaderLeaseChanged {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Table {} not found", name))]
TableNotFound {
name: String,
@@ -766,7 +786,7 @@ pub enum Error {
error: deadpool::managed::PoolError<tokio_postgres::Error>,
},
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[snafu(display("Sql execution timeout, sql: {}, duration: {:?}", sql, duration))]
SqlExecutionTimeout {
#[snafu(implicit)]
@@ -812,8 +832,8 @@ pub enum Error {
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to connect to mysql"))]
ConnectMySql {
#[snafu(display("Failed to acquire mysql client from pool"))]
AcquireMySqlClient {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
@@ -911,6 +931,8 @@ impl ErrorExt for Error {
| Error::SerializeToJson { .. }
| Error::DeserializeFromJson { .. }
| Error::NoLeader { .. }
| Error::LeaderLeaseExpired { .. }
| Error::LeaderLeaseChanged { .. }
| Error::CreateChannel { .. }
| Error::BatchGet { .. }
| Error::Range { .. }
@@ -1012,19 +1034,21 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
#[cfg(feature = "pg_kvbackend")]
Error::CreatePostgresPool { .. }
| Error::GetPostgresClient { .. }
| Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. }
| Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
| Error::PostgresExecution { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }
| Error::ConnectMySql { .. }
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
| Error::ParseMySqlUrl { .. }
| Error::DecodeSqlValue { .. }
| Error::AcquireMySqlClient { .. } => StatusCode::Internal,
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
Error::SqlExecutionTimeout { .. } => StatusCode::Internal,
}
}