From bdaad9db77e0e19209a6d0c4c5840869dd96fae2 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Thu, 7 May 2026 16:48:49 +0800 Subject: [PATCH] refactor: extract functions for building mysql/pg's kvbackend and electionref (#8076) chore: extract functions for building mysql/pg's kvbackend and electionref Signed-off-by: shuiyisong --- src/cli/src/common/store.rs | 19 ++------ src/meta-srv/src/bootstrap.rs | 75 +++++++++------------------- src/meta-srv/src/utils/mysql.rs | 64 ++++++++++++++++++++++++ src/meta-srv/src/utils/postgres.rs | 78 ++++++++++++++++++++++++++++++ 4 files changed, 169 insertions(+), 67 deletions(-) diff --git a/src/cli/src/common/store.rs b/src/cli/src/common/store.rs index 373c96a37a..34baaba4ff 100644 --- a/src/cli/src/common/store.rs +++ b/src/cli/src/common/store.rs @@ -153,17 +153,11 @@ impl StoreConfig { BackendImpl::PostgresStore => { let table_name = &self.meta_table_name; let tls_config = self.tls_config(); - let pool = meta_srv::utils::postgres::create_postgres_pool( + Ok(meta_srv::utils::postgres::build_postgres_kv_backend( store_addrs, None, tls_config, - ) - .await - .map_err(BoxedError::new)?; - let schema_name = self.meta_schema_name.as_deref(); - Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool( - pool, - schema_name, + self.meta_schema_name.as_deref(), table_name, max_txn_ops, self.auto_create_schema, @@ -175,12 +169,9 @@ impl StoreConfig { BackendImpl::MysqlStore => { let table_name = &self.meta_table_name; let tls_config = self.tls_config(); - let pool = - meta_srv::utils::mysql::create_mysql_pool(store_addrs, tls_config.as_ref()) - .await - .map_err(BoxedError::new)?; - Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool( - pool, + Ok(meta_srv::utils::mysql::build_mysql_kv_backend( + store_addrs, + tls_config.as_ref(), table_name, max_txn_ops, ) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 51d2b4d37b..a9cc72ec53 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -290,16 +290,11 @@ pub async fn metasrv_builder( use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS; use common_meta::election::CANDIDATE_LEASE_SECS; - use common_meta::election::rds::postgres::{ElectionPgClient, PgElection}; - use common_meta::kv_backend::rds::PgStore; use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod}; - use crate::utils::postgres::create_postgres_pool; + use crate::utils::postgres::{build_postgres_election, build_postgres_kv_backend}; let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS); - let execution_timeout = Duration::from_secs(META_LEASE_SECS); - let statement_timeout = Duration::from_secs(META_LEASE_SECS); - let idle_session_timeout = Duration::from_secs(META_LEASE_SECS); let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS); let mut cfg = Config::new(); @@ -308,24 +303,12 @@ pub async fn metasrv_builder( cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Verified, }); - // Use a dedicated pool for the election client to allow customized session settings. - let pool = create_postgres_pool( + + let election = build_postgres_election( &opts.store_addrs, Some(cfg.clone()), opts.backend_tls.clone(), - ) - .await?; - - let election_client = ElectionPgClient::new( - pool, - execution_timeout, - idle_session_timeout, - statement_timeout, - ) - .context(error::KvBackendSnafu)?; - let election = PgElection::with_pg_client( opts.grpc.server_addr.clone(), - election_client, opts.store_key_prefix.clone(), candidate_lease_ttl, meta_lease_ttl, @@ -333,20 +316,18 @@ pub async fn metasrv_builder( &opts.meta_table_name, opts.meta_election_lock_id, ) - .await - .context(error::KvBackendSnafu)?; + .await?; - let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone()) - .await?; - let kv_backend = PgStore::with_pg_pool( - pool, + let kv_backend = build_postgres_kv_backend( + &opts.store_addrs, + Some(cfg), + opts.backend_tls.clone(), opts.meta_schema_name.as_deref(), &opts.meta_table_name, opts.max_txn_ops, opts.auto_create_schema, ) - .await - .context(error::KvBackendSnafu)?; + .await?; (kv_backend, Some(election)) } @@ -355,45 +336,33 @@ pub async fn metasrv_builder( use std::time::Duration; use common_meta::election::CANDIDATE_LEASE_SECS; - use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection}; - use common_meta::kv_backend::rds::MySqlStore; - use crate::utils::mysql::create_mysql_pool; + use crate::utils::mysql::{build_mysql_election, build_mysql_kv_backend}; - let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?; - let kv_backend = - MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops) - .await - .context(error::KvBackendSnafu)?; + let kv_backend = build_mysql_kv_backend( + &opts.store_addrs, + opts.backend_tls.as_ref(), + &opts.meta_table_name, + opts.max_txn_ops, + ) + .await?; // Since election will acquire a lock of the table, we need a separate table for election. let election_table_name = opts.meta_table_name.clone() + "_election"; - // We use a separate pool for election since we need a different session keep-alive idle time. - let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?; - let execution_timeout = Duration::from_secs(META_LEASE_SECS); - let statement_timeout = Duration::from_secs(META_LEASE_SECS); - let idle_session_timeout = Duration::from_secs(META_LEASE_SECS); let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2); let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS); let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS); - let election_client = ElectionMysqlClient::new( - pool, - execution_timeout, - statement_timeout, - innode_lock_wait_timeout, - idle_session_timeout, - &election_table_name, - ); - let election = MySqlElection::with_mysql_client( + let election = build_mysql_election( + &opts.store_addrs, + opts.backend_tls.as_ref(), opts.grpc.server_addr.clone(), - election_client, opts.store_key_prefix.clone(), candidate_lease_ttl, meta_lease_ttl, &election_table_name, + innode_lock_wait_timeout, ) - .await - .context(error::KvBackendSnafu)?; + .await?; (kv_backend, Some(election)) } }; diff --git a/src/meta-srv/src/utils/mysql.rs b/src/meta-srv/src/utils/mysql.rs index 1daa36a361..326f3880f4 100644 --- a/src/meta-srv/src/utils/mysql.rs +++ b/src/meta-srv/src/utils/mysql.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_meta::election::ElectionRef; +use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::kv_backend::rds::MySqlStore; use common_telemetry::info; use servers::tls::{TlsMode, TlsOption}; use snafu::{OptionExt, ResultExt}; @@ -83,3 +87,63 @@ pub async fn create_mysql_pool( Ok(pool) } + +/// Builds a MySQL-backed metadata [`KvBackendRef`]. +/// +/// * `store_addrs` - MySQL connection URLs; only the first address is used. +/// * `tls_config` - optional TLS settings for the MySQL connection. +/// * `table_name` - metadata KV table name. +/// * `max_txn_ops` - maximum operations allowed in one metadata transaction. +pub async fn build_mysql_kv_backend( + store_addrs: &[String], + tls_config: Option<&TlsOption>, + table_name: &str, + max_txn_ops: usize, +) -> Result { + let pool = create_mysql_pool(store_addrs, tls_config).await?; + MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops) + .await + .context(error::KvBackendSnafu) +} + +/// Builds a MySQL-backed election implementation. +/// +/// * `store_addrs` - MySQL connection URLs; only the first address is used. +/// * `tls_config` - optional TLS settings for the MySQL connection. +/// * `leader_value` - advertised address of this election candidate. +/// * `store_key_prefix` - prefix for election and candidate keys. +/// * `candidate_lease_ttl` - TTL for registered candidate metadata. +/// * `meta_lease_ttl` - TTL for the elected leader metadata. +/// * `election_table_name` - dedicated table used for election locking and records. +/// * `innodb_lock_wait_timeout` - session lock wait timeout for election transactions. +#[allow(clippy::too_many_arguments)] +pub async fn build_mysql_election( + store_addrs: &[String], + tls_config: Option<&TlsOption>, + leader_value: String, + store_key_prefix: String, + candidate_lease_ttl: std::time::Duration, + meta_lease_ttl: std::time::Duration, + election_table_name: &str, + innodb_lock_wait_timeout: std::time::Duration, +) -> Result { + let pool = create_mysql_pool(store_addrs, tls_config).await?; + let election_client = ElectionMysqlClient::new( + pool, + meta_lease_ttl, + meta_lease_ttl, + innodb_lock_wait_timeout, + meta_lease_ttl, + election_table_name, + ); + MySqlElection::with_mysql_client( + leader_value, + election_client, + store_key_prefix, + candidate_lease_ttl, + meta_lease_ttl, + election_table_name, + ) + .await + .context(error::KvBackendSnafu) +} diff --git a/src/meta-srv/src/utils/postgres.rs b/src/meta-srv/src/utils/postgres.rs index e7dfd59383..ec46c076e2 100644 --- a/src/meta-srv/src/utils/postgres.rs +++ b/src/meta-srv/src/utils/postgres.rs @@ -13,6 +13,10 @@ // limitations under the License. use common_error::ext::BoxedError; +use common_meta::election::ElectionRef; +use common_meta::election::rds::postgres::{ElectionPgClient, PgElection}; +use common_meta::kv_backend::KvBackendRef; +use common_meta::kv_backend::rds::PgStore; use common_meta::kv_backend::rds::postgres::{ TlsMode as PgTlsMode, TlsOption as PgTlsOption, create_postgres_tls_connector, }; @@ -72,3 +76,77 @@ pub async fn create_postgres_pool( Ok(pool) } + +/// Builds a Postgres-backed metadata [`KvBackendRef`]. +/// +/// * `store_addrs` - Postgres connection URLs; only the first address is used. +/// * `cfg` - optional deadpool config to customize pool/session behavior. +/// * `tls_config` - optional TLS settings for the Postgres connection. +/// * `schema_name` - optional schema containing the metadata table. +/// * `table_name` - metadata KV table name. +/// * `max_txn_ops` - maximum operations allowed in one metadata transaction. +/// * `auto_create_schema` - whether to create `schema_name` when it is missing. +#[allow(clippy::too_many_arguments)] +pub async fn build_postgres_kv_backend( + store_addrs: &[String], + cfg: Option, + tls_config: Option, + schema_name: Option<&str>, + table_name: &str, + max_txn_ops: usize, + auto_create_schema: bool, +) -> Result { + let pool = create_postgres_pool(store_addrs, cfg, tls_config).await?; + PgStore::with_pg_pool( + pool, + schema_name, + table_name, + max_txn_ops, + auto_create_schema, + ) + .await + .context(error::KvBackendSnafu) +} + +/// Builds a Postgres-backed election implementation. +/// +/// * `store_addrs` - Postgres connection URLs; only the first address is used. +/// * `cfg` - optional deadpool config to customize pool/session behavior. +/// * `tls_config` - optional TLS settings for the Postgres connection. +/// * `leader_value` - advertised address of this election candidate. +/// * `store_key_prefix` - prefix for election and candidate keys. +/// * `candidate_lease_ttl` - TTL for registered candidate metadata. +/// * `meta_lease_ttl` - TTL for the elected leader metadata. +/// * `schema_name` - optional schema containing the metadata table. +/// * `table_name` - metadata KV table name used for election records. +/// * `lock_id` - Postgres advisory lock id used by the election. +#[allow(clippy::too_many_arguments)] +pub async fn build_postgres_election( + store_addrs: &[String], + cfg: Option, + tls_config: Option, + leader_value: String, + store_key_prefix: String, + candidate_lease_ttl: std::time::Duration, + meta_lease_ttl: std::time::Duration, + schema_name: Option<&str>, + table_name: &str, + lock_id: u64, +) -> Result { + let pool = create_postgres_pool(store_addrs, cfg, tls_config).await?; + let election_client = + ElectionPgClient::new(pool, meta_lease_ttl, meta_lease_ttl, meta_lease_ttl) + .context(error::KvBackendSnafu)?; + PgElection::with_pg_client( + leader_value, + election_client, + store_key_prefix, + candidate_lease_ttl, + meta_lease_ttl, + schema_name, + table_name, + lock_id, + ) + .await + .context(error::KvBackendSnafu) +}