refactor: extract functions for building mysql/pg's kvbackend and electionref (#8076)

chore: extract functions for building mysql/pg's kvbackend and electionref

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-05-07 16:48:49 +08:00
committed by GitHub
parent 796aae3d9f
commit bdaad9db77
4 changed files with 169 additions and 67 deletions

View File

@@ -153,17 +153,11 @@ impl StoreConfig {
BackendImpl::PostgresStore => {
let table_name = &self.meta_table_name;
let tls_config = self.tls_config();
let pool = meta_srv::utils::postgres::create_postgres_pool(
Ok(meta_srv::utils::postgres::build_postgres_kv_backend(
store_addrs,
None,
tls_config,
)
.await
.map_err(BoxedError::new)?;
let schema_name = self.meta_schema_name.as_deref();
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
schema_name,
self.meta_schema_name.as_deref(),
table_name,
max_txn_ops,
self.auto_create_schema,
@@ -175,12 +169,9 @@ impl StoreConfig {
BackendImpl::MysqlStore => {
let table_name = &self.meta_table_name;
let tls_config = self.tls_config();
let pool =
meta_srv::utils::mysql::create_mysql_pool(store_addrs, tls_config.as_ref())
.await
.map_err(BoxedError::new)?;
Ok(common_meta::kv_backend::rds::MySqlStore::with_mysql_pool(
pool,
Ok(meta_srv::utils::mysql::build_mysql_kv_backend(
store_addrs,
tls_config.as_ref(),
table_name,
max_txn_ops,
)

View File

@@ -290,16 +290,11 @@ pub async fn metasrv_builder(
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use common_meta::election::CANDIDATE_LEASE_SECS;
use common_meta::election::rds::postgres::{ElectionPgClient, PgElection};
use common_meta::kv_backend::rds::PgStore;
use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
use crate::utils::postgres::create_postgres_pool;
use crate::utils::postgres::{build_postgres_election, build_postgres_kv_backend};
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let mut cfg = Config::new();
@@ -308,24 +303,12 @@ pub async fn metasrv_builder(
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Verified,
});
// Use a dedicated pool for the election client to allow customized session settings.
let pool = create_postgres_pool(
let election = build_postgres_election(
&opts.store_addrs,
Some(cfg.clone()),
opts.backend_tls.clone(),
)
.await?;
let election_client = ElectionPgClient::new(
pool,
execution_timeout,
idle_session_timeout,
statement_timeout,
)
.context(error::KvBackendSnafu)?;
let election = PgElection::with_pg_client(
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
@@ -333,20 +316,18 @@ pub async fn metasrv_builder(
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await
.context(error::KvBackendSnafu)?;
.await?;
let pool = create_postgres_pool(&opts.store_addrs, Some(cfg), opts.backend_tls.clone())
.await?;
let kv_backend = PgStore::with_pg_pool(
pool,
let kv_backend = build_postgres_kv_backend(
&opts.store_addrs,
Some(cfg),
opts.backend_tls.clone(),
opts.meta_schema_name.as_deref(),
&opts.meta_table_name,
opts.max_txn_ops,
opts.auto_create_schema,
)
.await
.context(error::KvBackendSnafu)?;
.await?;
(kv_backend, Some(election))
}
@@ -355,45 +336,33 @@ pub async fn metasrv_builder(
use std::time::Duration;
use common_meta::election::CANDIDATE_LEASE_SECS;
use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
use common_meta::kv_backend::rds::MySqlStore;
use crate::utils::mysql::create_mysql_pool;
use crate::utils::mysql::{build_mysql_election, build_mysql_kv_backend};
let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
let kv_backend =
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
let kv_backend = build_mysql_kv_backend(
&opts.store_addrs,
opts.backend_tls.as_ref(),
&opts.meta_table_name,
opts.max_txn_ops,
)
.await?;
// Since election will acquire a lock of the table, we need a separate table for election.
let election_table_name = opts.meta_table_name.clone() + "_election";
// We use a separate pool for election since we need a different session keep-alive idle time.
let pool = create_mysql_pool(&opts.store_addrs, opts.backend_tls.as_ref()).await?;
let execution_timeout = Duration::from_secs(META_LEASE_SECS);
let statement_timeout = Duration::from_secs(META_LEASE_SECS);
let idle_session_timeout = Duration::from_secs(META_LEASE_SECS);
let innode_lock_wait_timeout = Duration::from_secs(META_LEASE_SECS / 2);
let meta_lease_ttl = Duration::from_secs(META_LEASE_SECS);
let candidate_lease_ttl = Duration::from_secs(CANDIDATE_LEASE_SECS);
let election_client = ElectionMysqlClient::new(
pool,
execution_timeout,
statement_timeout,
innode_lock_wait_timeout,
idle_session_timeout,
&election_table_name,
);
let election = MySqlElection::with_mysql_client(
let election = build_mysql_election(
&opts.store_addrs,
opts.backend_tls.as_ref(),
opts.grpc.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
&election_table_name,
innode_lock_wait_timeout,
)
.await
.context(error::KvBackendSnafu)?;
.await?;
(kv_backend, Some(election))
}
};

View File

@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::election::ElectionRef;
use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::rds::MySqlStore;
use common_telemetry::info;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
@@ -83,3 +87,63 @@ pub async fn create_mysql_pool(
Ok(pool)
}
/// Builds a MySQL-backed metadata [`KvBackendRef`].
///
/// * `store_addrs` - MySQL connection URLs; only the first address is used.
/// * `tls_config` - optional TLS settings for the MySQL connection.
/// * `table_name` - metadata KV table name.
/// * `max_txn_ops` - maximum operations allowed in one metadata transaction.
pub async fn build_mysql_kv_backend(
store_addrs: &[String],
tls_config: Option<&TlsOption>,
table_name: &str,
max_txn_ops: usize,
) -> Result<KvBackendRef> {
let pool = create_mysql_pool(store_addrs, tls_config).await?;
MySqlStore::with_mysql_pool(pool, table_name, max_txn_ops)
.await
.context(error::KvBackendSnafu)
}
/// Builds a MySQL-backed election implementation.
///
/// * `store_addrs` - MySQL connection URLs; only the first address is used.
/// * `tls_config` - optional TLS settings for the MySQL connection.
/// * `leader_value` - advertised address of this election candidate.
/// * `store_key_prefix` - prefix for election and candidate keys.
/// * `candidate_lease_ttl` - TTL for registered candidate metadata.
/// * `meta_lease_ttl` - TTL for the elected leader metadata.
/// * `election_table_name` - dedicated table used for election locking and records.
/// * `innodb_lock_wait_timeout` - session lock wait timeout for election transactions.
#[allow(clippy::too_many_arguments)]
pub async fn build_mysql_election(
store_addrs: &[String],
tls_config: Option<&TlsOption>,
leader_value: String,
store_key_prefix: String,
candidate_lease_ttl: std::time::Duration,
meta_lease_ttl: std::time::Duration,
election_table_name: &str,
innodb_lock_wait_timeout: std::time::Duration,
) -> Result<ElectionRef> {
let pool = create_mysql_pool(store_addrs, tls_config).await?;
let election_client = ElectionMysqlClient::new(
pool,
meta_lease_ttl,
meta_lease_ttl,
innodb_lock_wait_timeout,
meta_lease_ttl,
election_table_name,
);
MySqlElection::with_mysql_client(
leader_value,
election_client,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
election_table_name,
)
.await
.context(error::KvBackendSnafu)
}

View File

@@ -13,6 +13,10 @@
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::election::ElectionRef;
use common_meta::election::rds::postgres::{ElectionPgClient, PgElection};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::rds::postgres::{
TlsMode as PgTlsMode, TlsOption as PgTlsOption, create_postgres_tls_connector,
};
@@ -72,3 +76,77 @@ pub async fn create_postgres_pool(
Ok(pool)
}
/// Builds a Postgres-backed metadata [`KvBackendRef`].
///
/// * `store_addrs` - Postgres connection URLs; only the first address is used.
/// * `cfg` - optional deadpool config to customize pool/session behavior.
/// * `tls_config` - optional TLS settings for the Postgres connection.
/// * `schema_name` - optional schema containing the metadata table.
/// * `table_name` - metadata KV table name.
/// * `max_txn_ops` - maximum operations allowed in one metadata transaction.
/// * `auto_create_schema` - whether to create `schema_name` when it is missing.
#[allow(clippy::too_many_arguments)]
pub async fn build_postgres_kv_backend(
store_addrs: &[String],
cfg: Option<Config>,
tls_config: Option<TlsOption>,
schema_name: Option<&str>,
table_name: &str,
max_txn_ops: usize,
auto_create_schema: bool,
) -> Result<KvBackendRef> {
let pool = create_postgres_pool(store_addrs, cfg, tls_config).await?;
PgStore::with_pg_pool(
pool,
schema_name,
table_name,
max_txn_ops,
auto_create_schema,
)
.await
.context(error::KvBackendSnafu)
}
/// Builds a Postgres-backed election implementation.
///
/// * `store_addrs` - Postgres connection URLs; only the first address is used.
/// * `cfg` - optional deadpool config to customize pool/session behavior.
/// * `tls_config` - optional TLS settings for the Postgres connection.
/// * `leader_value` - advertised address of this election candidate.
/// * `store_key_prefix` - prefix for election and candidate keys.
/// * `candidate_lease_ttl` - TTL for registered candidate metadata.
/// * `meta_lease_ttl` - TTL for the elected leader metadata.
/// * `schema_name` - optional schema containing the metadata table.
/// * `table_name` - metadata KV table name used for election records.
/// * `lock_id` - Postgres advisory lock id used by the election.
#[allow(clippy::too_many_arguments)]
pub async fn build_postgres_election(
store_addrs: &[String],
cfg: Option<Config>,
tls_config: Option<TlsOption>,
leader_value: String,
store_key_prefix: String,
candidate_lease_ttl: std::time::Duration,
meta_lease_ttl: std::time::Duration,
schema_name: Option<&str>,
table_name: &str,
lock_id: u64,
) -> Result<ElectionRef> {
let pool = create_postgres_pool(store_addrs, cfg, tls_config).await?;
let election_client =
ElectionPgClient::new(pool, meta_lease_ttl, meta_lease_ttl, meta_lease_ttl)
.context(error::KvBackendSnafu)?;
PgElection::with_pg_client(
leader_value,
election_client,
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
schema_name,
table_name,
lock_id,
)
.await
.context(error::KvBackendSnafu)
}