From 1dc4a196bf3f34bffa02f23dcc231020de1a2af0 Mon Sep 17 00:00:00 2001 From: Yohan Wal Date: Wed, 19 Mar 2025 19:31:18 +0800 Subject: [PATCH] feat: add mysql election logic (#5694) * feat: add mysql election * feat: add mysql election * chore: fix deps * chore: fix deps * fix: duplicate container * fix: duplicate setup for sqlness * fix: call once * fix: do not use NOWAIT for mysql 5.7 * chore: apply comments * fix: no parallel sqlness for mysql * chore: comments and minor revert * chore: apply comments * chore: apply comments * chore: add to table name * ci: use 2 metasrv to detect election bugs * refactor: better election logic * chore: apply comments * chore: apply comments * feat: version check before startup --- .../setup-greptimedb-cluster/action.yml | 2 +- .github/workflows/develop.yml | 5 +- Cargo.lock | 1 + Cargo.toml | 2 + src/common/meta/src/kv_backend/rds/mysql.rs | 2 +- src/meta-srv/Cargo.toml | 8 +- src/meta-srv/src/bootstrap.rs | 73 +- src/meta-srv/src/election.rs | 2 + src/meta-srv/src/election/mysql.rs | 800 ++++++++++++++++++ src/meta-srv/src/election/postgres.rs | 40 +- src/meta-srv/src/error.rs | 43 + src/meta-srv/src/metasrv.rs | 11 +- tests-fuzz/Cargo.toml | 7 +- tests/runner/src/env.rs | 29 +- tests/runner/src/main.rs | 9 +- tests/runner/src/server_mode.rs | 12 + tests/runner/src/util.rs | 57 ++ 17 files changed, 1059 insertions(+), 44 deletions(-) create mode 100644 src/meta-srv/src/election/mysql.rs diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml index 7c385c43a9..1af7796c50 100644 --- a/.github/actions/setup-greptimedb-cluster/action.yml +++ b/.github/actions/setup-greptimedb-cluster/action.yml @@ -8,7 +8,7 @@ inputs: default: 2 description: "Number of Datanode replicas" meta-replicas: - default: 1 + default: 2 description: "Number of Metasrv replicas" image-registry: default: "docker.io" diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 
473ca83a03..29ac08f979 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -576,9 +576,12 @@ jobs: - name: "Remote WAL" opts: "-w kafka -k 127.0.0.1:9092" kafka: true - - name: "Pg Kvbackend" + - name: "PostgreSQL KvBackend" opts: "--setup-pg" kafka: false + - name: "MySQL Kvbackend" + opts: "--setup-mysql" + kafka: false timeout-minutes: 60 steps: - uses: actions/checkout@v4 diff --git a/Cargo.lock b/Cargo.lock index 266d5a0940..27d829ba7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6720,6 +6720,7 @@ dependencies = [ "servers", "session", "snafu 0.8.5", + "sqlx", "store-api", "strum 0.25.0", "table", diff --git a/Cargo.toml b/Cargo.toml index e08b37f436..2aaf58006e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -191,6 +191,8 @@ snafu = "0.8" sqlx = { version = "0.8", features = [ "runtime-tokio-rustls", "mysql", + "postgres", + "chrono", ] } sysinfo = "0.30" # on branch v0.52.x diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs index f38952e090..ef27dfcb1a 100644 --- a/src/common/meta/src/kv_backend/rds/mysql.rs +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -155,7 +155,7 @@ impl<'a> MySqlTemplateFactory<'a> { table_name: table_name.to_string(), create_table_statement: format!( // Cannot be more than 3072 bytes in PRIMARY KEY - "CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);", + "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);", ), range_template: RangeTemplate { point: format!("SELECT k, v FROM {table_name} WHERE k = ?"), diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index d864fa5c7d..fcea00b3eb 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -6,7 +6,8 @@ license.workspace = true [features] mock = [] -pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"] +pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"] 
+mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"] [lints] workspace = true @@ -38,8 +39,8 @@ common-version.workspace = true common-wal.workspace = true dashmap.workspace = true datatypes.workspace = true -deadpool.workspace = true -deadpool-postgres.workspace = true +deadpool = { workspace = true, optional = true } +deadpool-postgres = { workspace = true, optional = true } derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true @@ -60,6 +61,7 @@ serde.workspace = true serde_json.workspace = true servers.workspace = true snafu.workspace = true +sqlx = { workspace = true, optional = true } store-api.workspace = true strum.workspace = true table.workspace = true diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index ef7eeda95b..9f78257e38 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -23,6 +23,8 @@ use common_config::Configurable; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "mysql_kvbackend")] +use common_meta::kv_backend::rds::MySqlStore; #[cfg(feature = "pg_kvbackend")] use common_meta::kv_backend::rds::PgStore; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -38,9 +40,15 @@ use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; -#[cfg(feature = "pg_kvbackend")] +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use snafu::OptionExt; use snafu::ResultExt; +#[cfg(feature = "mysql_kvbackend")] +use sqlx::mysql::MySqlConnectOptions; +#[cfg(feature = "mysql_kvbackend")] +use sqlx::mysql::{MySqlConnection, MySqlPool}; +#[cfg(feature = "mysql_kvbackend")] +use sqlx::Connection; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; #[cfg(feature = 
"pg_kvbackend")] @@ -49,9 +57,11 @@ use tonic::codec::CompressionEncoding; use tonic::transport::server::{Router, TcpIncoming}; use crate::election::etcd::EtcdElection; +#[cfg(feature = "mysql_kvbackend")] +use crate::election::mysql::MySqlElection; #[cfg(feature = "pg_kvbackend")] use crate::election::postgres::PgElection; -#[cfg(feature = "pg_kvbackend")] +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; @@ -229,7 +239,6 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pool = create_postgres_pool(opts).await?; - // TODO(CookiePie): use table name from config. let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; @@ -246,6 +255,26 @@ pub async fn metasrv_builder( .await?; (kv_backend, Some(election)) } + #[cfg(feature = "mysql_kvbackend")] + (None, BackendImpl::MysqlStore) => { + let pool = create_mysql_pool(opts).await?; + let kv_backend = + MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops) + .await + .context(error::KvBackendSnafu)?; + // Since election will acquire a lock of the table, we need a separate table for election. 
+ let election_table_name = opts.meta_table_name.clone() + "_election"; + let election_client = create_mysql_client(opts).await?; + let election = MySqlElection::with_mysql_client( + opts.server_addr.clone(), + election_client, + opts.store_key_prefix.clone(), + CANDIDATE_LEASE_SECS, + &election_table_name, + ) + .await?; + (kv_backend, Some(election)) + } }; if !opts.store_key_prefix.is_empty() { @@ -323,3 +352,41 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result Result { + let mysql_url = opts + .store_addrs + .first() + .context(error::InvalidArgumentsSnafu { + err_msg: "empty store addrs", + })?; + // Avoid `SET` commands in sqlx + let opts: MySqlConnectOptions = mysql_url + .parse() + .context(error::ParseMySqlUrlSnafu { mysql_url })?; + let opts = opts + .no_engine_substitution(false) + .pipes_as_concat(false) + .timezone(None) + .set_names(false); + Ok(opts) +} + +#[cfg(feature = "mysql_kvbackend")] +async fn create_mysql_pool(opts: &MetasrvOptions) -> Result { + let opts = setup_mysql_options(opts).await?; + let pool = MySqlPool::connect_with(opts) + .await + .context(error::CreateMySqlPoolSnafu)?; + Ok(pool) +} + +#[cfg(feature = "mysql_kvbackend")] +async fn create_mysql_client(opts: &MetasrvOptions) -> Result { + let opts = setup_mysql_options(opts).await?; + let client = MySqlConnection::connect_with(&opts) + .await + .context(error::ConnectMySqlSnafu)?; + Ok(client) +} diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index a414c0cd1c..235a0b25ce 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -13,6 +13,8 @@ // limitations under the License. 
pub mod etcd; +#[cfg(feature = "mysql_kvbackend")] +pub mod mysql; #[cfg(feature = "pg_kvbackend")] pub mod postgres; diff --git a/src/meta-srv/src/election/mysql.rs b/src/meta-srv/src/election/mysql.rs new file mode 100644 index 0000000000..6ccf46cc4c --- /dev/null +++ b/src/meta-srv/src/election/mysql.rs @@ -0,0 +1,800 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; +use common_telemetry::{error, warn}; +use common_time::Timestamp; +use itertools::Itertools; +use snafu::{ensure, OptionExt, ResultExt}; +use sqlx::mysql::{MySqlArguments, MySqlRow}; +use sqlx::query::Query; +use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row}; +use tokio::sync::{broadcast, Mutex, MutexGuard}; +use tokio::time::{Interval, MissedTickBehavior}; + +use crate::election::{ + listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY, +}; +use crate::error::{ + DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu, + UnexpectedSnafu, +}; +use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; + +// Separator between value and expire time. +const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#; + +/// Lease information. 
+/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module. +#[derive(Default, Clone)] +struct Lease { + leader_value: String, + expire_time: Timestamp, + current: Timestamp, + // origin is the origin value of the lease, used for CAS. + origin: String, +} + +struct ElectionSqlFactory<'a> { + table_name: &'a str, +} + +struct ElectionSqlSet { + campaign: String, + // SQL to put a value with expire time. + // + // Parameters for the query: + // `$1`: key, + // `$2`: value, + // `$3`: lease time in seconds + // + // Returns: + // If the key already exists, return the previous value. + put_value_with_lease: String, + // SQL to update a value with expire time. + // + // Parameters for the query: + // `$1`: updated value, + // `$2`: lease time in seconds + // `$3`: key, + // `$4`: previous value, + update_value_with_lease: String, + // SQL to get a value with expire time. + // + // Parameters: + // `$1`: key + get_value_with_lease: String, + // SQL to get all values with expire time with the given key prefix. + // + // Parameters: + // `$1`: key prefix like 'prefix%' + // + // Returns: + // column 0: value, + // column 1: current timestamp + get_value_with_lease_by_prefix: String, + // SQL to delete a value. + // + // Parameters: + // `?`: key + // + // Returns: + // Rows affected + delete_value: String, +} + +impl<'a> ElectionSqlFactory<'a> { + fn new(table_name: &'a str) -> Self { + Self { table_name } + } + + fn build(self) -> ElectionSqlSet { + ElectionSqlSet { + campaign: self.campaign_sql(), + put_value_with_lease: self.put_value_with_lease_sql(), + update_value_with_lease: self.update_value_with_lease_sql(), + get_value_with_lease: self.get_value_with_lease_sql(), + get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(), + delete_value: self.delete_value_sql(), + } + } + + // Currently the session timeout is longer than the leader lease time. 
+ // So the leader will renew the lease twice before the session timeout if everything goes well. + fn set_idle_session_timeout_sql(&self) -> String { + format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1) + } + + fn set_lock_wait_timeout_sql(&self) -> &str { + "SET SESSION innodb_lock_wait_timeout = 1;" + } + + fn create_table_sql(&self) -> String { + format!( + r#" + CREATE TABLE IF NOT EXISTS `{}` ( + k VARBINARY(3072) PRIMARY KEY, + v BLOB + ); + "#, + self.table_name + ) + } + + fn insert_once(&self) -> String { + format!( + "INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');", + self.table_name + ) + } + + fn check_version(&self) -> &str { + "SELECT @@version;" + } + + fn campaign_sql(&self) -> String { + format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name) + } + + fn put_value_with_lease_sql(&self) -> String { + format!( + r#" + INSERT INTO `{}` (k, v) VALUES ( + ?, + CONCAT( + ?, + '{}', + DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f') + ) + ) + ON DUPLICATE KEY UPDATE v = VALUES(v); + "#, + self.table_name, LEASE_SEP + ) + } + + fn update_value_with_lease_sql(&self) -> String { + format!( + r#"UPDATE `{}` + SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')) + WHERE k = ? AND v = ?"#, + self.table_name, LEASE_SEP + ) + } + + fn get_value_with_lease_sql(&self) -> String { + format!( + r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#, + self.table_name + ) + } + + fn get_value_with_lease_by_prefix_sql(&self) -> String { + format!( + r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#, + self.table_name + ) + } + + fn delete_value_sql(&self) -> String { + format!("DELETE FROM `{}` WHERE k = ?;", self.table_name) + } +} + +/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time". 
+fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> { + let (value, expire_time) = + value + .split(LEASE_SEP) + .collect_tuple() + .with_context(|| UnexpectedSnafu { + violated: format!( + "Invalid value {}, expect node info || {} || expire time", + value, LEASE_SEP + ), + })?; + // Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS' + let expire_time = match Timestamp::from_str(expire_time, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", expire_time), + } + .fail()?, + }; + Ok((value.to_string(), expire_time)) +} + +#[derive(Debug, Clone, Default)] +struct MySqlLeaderKey { + name: Vec<u8>, + key: Vec<u8>, + rev: i64, + lease: i64, +} + +impl LeaderKey for MySqlLeaderKey { + fn name(&self) -> &[u8] { + &self.name + } + + fn key(&self) -> &[u8] { + &self.key + } + + fn revision(&self) -> i64 { + self.rev + } + + fn lease_id(&self) -> i64 { + self.lease + } +} + +enum Executor<'a> { + Default(MutexGuard<'a, MySqlConnection>), + Txn(MySqlTransaction<'a>), +} + +impl Executor<'_> { + async fn query( + &mut self, + query: Query<'_, MySql, MySqlArguments>, + sql: &str, + ) -> Result<Vec<MySqlRow>> { + match self { + Executor::Default(client) => { + let res = query + .fetch_all(&mut **client) + .await + .context(MySqlExecutionSnafu { sql })?; + Ok(res) + } + Executor::Txn(txn) => { + let res = query + .fetch_all(&mut **txn) + .await + .context(MySqlExecutionSnafu { sql })?; + Ok(res) + } + } + } + + async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> { + match self { + Executor::Default(client) => { + let res = query + .execute(&mut **client) + .await + .context(MySqlExecutionSnafu { sql })?; + Ok(res.rows_affected()) + } + Executor::Txn(txn) => { + let res = query + .execute(&mut **txn) + .await + .context(MySqlExecutionSnafu { sql })?; + Ok(res.rows_affected()) + } + } + } + + async fn commit(self) -> Result<()> { + match self { + Executor::Txn(txn) => { + txn.commit() 
+ .await + .context(MySqlExecutionSnafu { sql: "COMMIT" })?; + Ok(()) + } + _ => Ok(()), + } + } +} + +/// MySQL implementation of Election. +pub struct MySqlElection { + leader_value: String, + client: Mutex<MySqlConnection>, + is_leader: AtomicBool, + leader_infancy: AtomicBool, + leader_watcher: broadcast::Sender<LeaderChangeMessage>, + store_key_prefix: String, + candidate_lease_ttl_secs: u64, + sql_set: ElectionSqlSet, +} + +impl MySqlElection { + pub async fn with_mysql_client( + leader_value: String, + mut client: sqlx::MySqlConnection, + store_key_prefix: String, + candidate_lease_ttl_secs: u64, + table_name: &str, + ) -> Result<ElectionRef> { + let sql_factory = ElectionSqlFactory::new(table_name); + sqlx::query(&sql_factory.create_table_sql()) + .execute(&mut client) + .await + .context(MySqlExecutionSnafu { + sql: &sql_factory.create_table_sql(), + })?; + // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead lock. + sqlx::query(&sql_factory.set_idle_session_timeout_sql()) + .execute(&mut client) + .await + .context(MySqlExecutionSnafu { + sql: &sql_factory.set_idle_session_timeout_sql(), + })?; + // Set lock wait timeout to LOCK_WAIT_TIMEOUT to avoid waiting too long. + sqlx::query(sql_factory.set_lock_wait_timeout_sql()) + .execute(&mut client) + .await + .context(MySqlExecutionSnafu { + sql: sql_factory.set_lock_wait_timeout_sql(), + })?; + // Insert at least one row for `SELECT * FOR UPDATE` to work. 
+ sqlx::query(&sql_factory.insert_once()) + .execute(&mut client) + .await + .context(MySqlExecutionSnafu { + sql: &sql_factory.insert_once(), + })?; + // Check MySQL version + Self::check_version(&mut client, sql_factory.check_version()).await?; + let tx = listen_leader_change(leader_value.clone()); + Ok(Arc::new(Self { + leader_value, + client: Mutex::new(client), + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(false), + leader_watcher: tx, + store_key_prefix, + candidate_lease_ttl_secs, + sql_set: sql_factory.build(), + })) + } + + fn election_key(&self) -> String { + format!("{}{}", self.store_key_prefix, ELECTION_KEY) + } + + fn candidate_root(&self) -> String { + format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT) + } + + fn candidate_key(&self) -> String { + format!("{}{}", self.candidate_root(), self.leader_value) + } +} + +#[async_trait::async_trait] +impl Election for MySqlElection { + type Leader = LeaderValue; + + fn is_leader(&self) -> bool { + self.is_leader.load(Ordering::Relaxed) + } + + fn in_leader_infancy(&self) -> bool { + self.leader_infancy + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + } + + async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> { + let key = self.candidate_key(); + let node_info = + serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu { + input: format!("{node_info:?}"), + })?; + + { + let client = self.client.lock().await; + let mut executor = Executor::Default(client); + let res = self + .put_value_with_lease( + &key, + &node_info, + self.candidate_lease_ttl_secs, + &mut executor, + ) + .await?; + // May registered before, just update the lease. 
+ if !res { + warn!("Candidate already registered, update the lease"); + self.delete_value(&key, &mut executor).await?; + self.put_value_with_lease( + &key, + &node_info, + self.candidate_lease_ttl_secs, + &mut executor, + ) + .await?; + } + } + + // Check if the current lease has expired and renew the lease. + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2)); + loop { + let _ = keep_alive_interval.tick().await; + let client = self.client.lock().await; + let mut executor = Executor::Default(client); + let lease = self + .get_value_with_lease(&key, &mut executor) + .await? + .unwrap_or_default(); + + ensure!( + lease.expire_time > lease.current, + UnexpectedSnafu { + violated: format!( + "Candidate lease expired at {:?} (current time: {:?}), key: {:?}", + lease.expire_time, + lease.current, + String::from_utf8_lossy(&key.into_bytes()) + ), + } + ); + + self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor) + .await?; + std::mem::drop(executor); + } + } + + async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> { + let key_prefix = self.candidate_root(); + let client = self.client.lock().await; + let mut executor = Executor::Default(client); + let (mut candidates, current) = self + .get_value_with_lease_by_prefix(&key_prefix, &mut executor) + .await?; + // Remove expired candidates + candidates.retain(|c| c.1 > current); + let mut valid_candidates = Vec::with_capacity(candidates.len()); + for (c, _) in candidates { + let node_info: MetasrvNodeInfo = + serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu { + input: format!("{:?}", c), + })?; + valid_candidates.push(node_info); + } + Ok(valid_candidates) + } + + async fn campaign(&self) -> Result<()> { + let mut keep_alive_interval = + tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + let _ = self.do_campaign(&mut 
keep_alive_interval).await; + } + } + + async fn leader(&self) -> Result<Self::Leader> { + if self.is_leader.load(Ordering::Relaxed) { + Ok(self.leader_value.as_bytes().into()) + } else { + let key = self.election_key(); + + let client = self.client.lock().await; + let mut executor = Executor::Default(client); + if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? { + ensure!(lease.expire_time > lease.current, NoLeaderSnafu); + Ok(lease.leader_value.as_bytes().into()) + } else { + NoLeaderSnafu.fail() + } + } + } + + async fn resign(&self) -> Result<()> { + todo!() + } + + fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> { + self.leader_watcher.subscribe() + } +} + +impl MySqlElection { + /// Returns the value, expire time, current time and origin string stored under `key`, or `None` if the key does not exist. + async fn get_value_with_lease( + &self, + key: &str, + executor: &mut Executor<'_>, + ) -> Result<Option<Lease>> { + let key = key.as_bytes(); + let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key); + let res = executor + .query(query, &self.sql_set.get_value_with_lease) + .await?; + + if res.is_empty() { + return Ok(None); + } + // Safety: Checked if res is empty above. + let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap()); + let current_time = match Timestamp::from_str(&current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + // Safety: Checked if res is empty above. + let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default()); + let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?; + + Ok(Some(Lease { + leader_value: value, + expire_time, + current: current_time, + origin: value_and_expire_time.to_string(), + })) + } + + /// Returns all values and expire time with the given key prefix. Also returns the current time. 
+ async fn get_value_with_lease_by_prefix( + &self, + key_prefix: &str, + executor: &mut Executor<'_>, + ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> { + let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec(); + let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix); + let res = executor + .query(query, &self.sql_set.get_value_with_lease_by_prefix) + .await?; + + let mut values_with_leases = vec![]; + let mut current = Timestamp::default(); + for row in res { + let current_time_str = row.try_get(1).unwrap_or_default(); + current = match Timestamp::from_str(current_time_str, None) { + Ok(ts) => ts, + Err(_) => UnexpectedSnafu { + violated: format!("Invalid timestamp: {}", current_time_str), + } + .fail()?, + }; + + let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default()); + let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?; + + values_with_leases.push((value, expire_time)); + } + Ok((values_with_leases, current)) + } + + async fn update_value_with_lease( + &self, + key: &str, + prev: &str, + updated: &str, + executor: &mut Executor<'_>, + ) -> Result<()> { + let key = key.as_bytes(); + let prev = prev.as_bytes(); + let updated = updated.as_bytes(); + + let query = sqlx::query(&self.sql_set.update_value_with_lease) + .bind(updated) + .bind(self.candidate_lease_ttl_secs as f64) + .bind(key) + .bind(prev); + let res = executor + .execute(query, &self.sql_set.update_value_with_lease) + .await?; + + ensure!( + res == 1, + UnexpectedSnafu { + violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)), + } + ); + + Ok(()) + } + + /// Returns `true` if the insertion is successful + async fn put_value_with_lease( + &self, + key: &str, + value: &str, + lease_ttl_secs: u64, + executor: &mut Executor<'_>, + ) -> Result<bool> { + let key = key.as_bytes(); + let lease_ttl_secs = lease_ttl_secs as f64; + let query = sqlx::query(&self.sql_set.put_value_with_lease) 
+ .bind(key) + .bind(value) + .bind(lease_ttl_secs); + let res = executor + .query(query, &self.sql_set.put_value_with_lease) + .await?; + Ok(res.is_empty()) + } + + /// Returns `true` if the deletion is successful. + /// Caution: Should only delete the key if the lease is expired. + async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> { + let key = key.as_bytes(); + let query = sqlx::query(&self.sql_set.delete_value).bind(key); + let res = executor.execute(query, &self.sql_set.delete_value).await?; + + Ok(res == 1) + } + + /// Attempts to acquire leadership by executing a campaign. This function continuously checks + /// if the current lease is still valid. + async fn do_campaign(&self, interval: &mut Interval) -> Result<()> { + // Need to restrict the scope of the client to avoid ambiguous overloads. + use sqlx::Acquire; + + loop { + let client = self.client.lock().await; + let executor = Executor::Default(client); + let mut lease = Lease::default(); + match ( + self.lease_check(executor, &mut lease).await, + self.is_leader(), + ) { + // If the leader lease is valid and I'm the leader, renew the lease. + (Ok(_), true) => { + let mut client = self.client.lock().await; + let txn = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let mut executor = Executor::Txn(txn); + let query = sqlx::query(&self.sql_set.campaign); + executor.query(query, &self.sql_set.campaign).await?; + self.renew_lease(executor, lease).await?; + } + // If the leader lease expires and I'm the leader, notify the leader watcher and step down. + // Another instance should be elected as the leader in this case. + (Err(_), true) => { + warn!("Leader lease expired, re-initiate the campaign"); + self.step_down_without_lock().await?; + } + // If the leader lease expires and I'm not the leader, elect myself. 
+ (Err(_), false) => { + warn!("Leader lease expired, re-initiate the campaign"); + let mut client = self.client.lock().await; + let txn = client + .begin() + .await + .context(MySqlExecutionSnafu { sql: "BEGIN" })?; + let mut executor = Executor::Txn(txn); + let query = sqlx::query(&self.sql_set.campaign); + executor.query(query, &self.sql_set.campaign).await?; + self.elected(&mut executor).await?; + executor.commit().await?; + } + // If the leader lease is valid and I'm not the leader, do nothing. + (Ok(_), false) => {} + } + interval.tick().await; + } + } + + /// Renew the lease + async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> { + let key = self.election_key(); + self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor) + .await?; + executor.commit().await?; + Ok(()) + } + + /// Performs a lease check during the election process. + /// + /// This function performs the following checks and actions: + /// + /// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error + /// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock + /// will be released. + /// - **Case 2**: If all checks pass, the function returns without performing any actions. + async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> { + let key = self.election_key(); + let check_lease = self + .get_value_with_lease(&key, &mut executor) + .await? + .context(NoLeaderSnafu)?; + *lease = check_lease; + // Case 1: Lease expired + ensure!(lease.expire_time > lease.current, NoLeaderSnafu); + // Case 2: Everything is fine + Ok(()) + } + + /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key. 
+ async fn step_down_without_lock(&self) -> Result<()> { + let key = self.election_key().into_bytes(); + let leader_key = MySqlLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone(), + ..Default::default() + }; + if self + .is_leader + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } + + /// Elected as leader. The leader should put the key and notify the leader watcher. + /// Caution: Should only elected while holding the lock. + async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> { + let key = self.election_key(); + let leader_key = MySqlLeaderKey { + name: self.leader_value.clone().into_bytes(), + key: key.clone().into_bytes(), + ..Default::default() + }; + self.delete_value(&key, executor).await?; + self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor) + .await?; + + if self + .is_leader + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + self.leader_infancy.store(true, Ordering::Relaxed); + + if let Err(e) = self + .leader_watcher + .send(LeaderChangeMessage::Elected(Arc::new(leader_key))) + { + error!(e; "Failed to send leader change message"); + } + } + Ok(()) + } + + /// Check if the MySQL version is supported. 
+ async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> { + let query = sqlx::query(sql); + match query.fetch_one(client).await { + Ok(row) => { + let version: String = row.try_get(0).unwrap(); + if !version.starts_with("8.0") && !version.starts_with("5.7") { + warn!( + "Unsupported MySQL version: {}, expected: [5.7, 8.0]", + version + ); + } + } + Err(e) => { + warn!(e; "Failed to check MySQL version through sql: {}", sql); + } + } + Ok(()) + } +} diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 5ef5ad3cd8..2106103f92 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -109,10 +109,10 @@ impl<'a> ElectionSqlFactory<'a> { } } - // Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. - // Either the leader reconnects and step down or the session expires and the lock is released. - fn set_idle_session_timeout_sql(&self) -> &str { - "SET idle_session_timeout = '10s';" + // Currently the session timeout is longer than the leader lease time. + // So the leader will renew the lease twice before the session timeout if everything goes well. + fn set_idle_session_timeout_sql(&self) -> String { + format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1) } fn campaign_sql(&self) -> String { @@ -241,7 +241,7 @@ impl PgElection { let sql_factory = ElectionSqlFactory::new(lock_id, table_name); // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. 
client - .execute(sql_factory.set_idle_session_timeout_sql(), &[]) + .execute(&sql_factory.set_idle_session_timeout_sql(), &[]) .await .context(PostgresExecutionSnafu)?; @@ -317,7 +317,9 @@ impl Election for PgElection { prev_expire_time > current_time, UnexpectedSnafu { violated: format!( - "Candidate lease expired, key: {:?}", + "Candidate lease expired at {:?} (current time {:?}), key: {:?}", + prev_expire_time, + current_time, String::from_utf8_lossy(&key.into_bytes()) ), } @@ -369,23 +371,19 @@ impl Election for PgElection { .query(&self.sql_set.campaign, &[]) .await .context(PostgresExecutionSnafu)?; - if let Some(row) = res.first() { - match row.try_get(0) { - Ok(true) => self.leader_action().await?, - Ok(false) => self.follower_action().await?, - Err(_) => { - return UnexpectedSnafu { - violated: "Failed to get the result of acquiring advisory lock" - .to_string(), - } - .fail(); - } + let row = res.first().context(UnexpectedSnafu { + violated: "Failed to get the result of acquiring advisory lock", + })?; + let is_leader = row.try_get(0).map_err(|_| { + UnexpectedSnafu { + violated: "Failed to get the result of get lock", } + .build() + })?; + if is_leader { + self.leader_action().await?; } else { - return UnexpectedSnafu { - violated: "Failed to get the result of acquiring advisory lock".to_string(), - } - .fail(); + self.follower_action().await?; } let _ = keep_alive_interval.tick().await; } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index d309cb164c..1b43748baa 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -343,6 +343,16 @@ pub enum Error { location: Location, }, + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to parse mysql url: {}", mysql_url))] + ParseMySqlUrl { + #[snafu(source)] + error: sqlx::error::Error, + mysql_url: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to find table route for {table_id}"))] TableRouteNotFound { table_id: TableId, @@ 
-729,6 +739,34 @@ pub enum Error { location: Location, }, + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to execute via mysql, sql: {}", sql))] + MySqlExecution { + #[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + sql: String, + }, + + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to create mysql pool"))] + CreateMySqlPool { + #[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to connect to mysql"))] + ConnectMySql { + #[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Handler not found: {}", name))] HandlerNotFound { name: String, @@ -911,6 +949,11 @@ impl ErrorExt for Error { | Error::GetPostgresConnection { .. } | Error::PostgresExecution { .. } | Error::ConnectPostgres { .. } => StatusCode::Internal, + #[cfg(feature = "mysql_kvbackend")] + Error::MySqlExecution { .. } + | Error::CreateMySqlPool { .. } + | Error::ConnectMySql { .. } + | Error::ParseMySqlUrl { .. } => StatusCode::Internal, } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index bffe354772..ca9f3ad87d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -72,9 +72,9 @@ pub const TABLE_ID_SEQ: &str = "table_id"; pub const FLOW_ID_SEQ: &str = "flow_id"; pub const METASRV_HOME: &str = "./greptimedb_data/metasrv"; -#[cfg(feature = "pg_kvbackend")] +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; -#[cfg(feature = "pg_kvbackend")] +#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1; // The datastores that implements metadata kvbackend. @@ -89,6 +89,9 @@ pub enum BackendImpl { #[cfg(feature = "pg_kvbackend")] // Postgres as metadata storage. 
PostgresStore, + #[cfg(feature = "mysql_kvbackend")] + // MySql as metadata storage. + MysqlStore, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -146,7 +149,7 @@ pub struct MetasrvOptions { pub tracing: TracingOptions, /// The datastore for kv metadata. pub backend: BackendImpl, - #[cfg(feature = "pg_kvbackend")] + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] /// Table name of rds kv backend. pub meta_table_name: String, #[cfg(feature = "pg_kvbackend")] @@ -191,7 +194,7 @@ impl Default for MetasrvOptions { flush_stats_factor: 3, tracing: TracingOptions::default(), backend: BackendImpl::EtcdStore, - #[cfg(feature = "pg_kvbackend")] + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), #[cfg(feature = "pg_kvbackend")] meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID, diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index eab760c60d..1bb148f7f6 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -53,12 +53,7 @@ serde_yaml = "0.9" snafu = { workspace = true } sql = { workspace = true } sqlparser.workspace = true -sqlx = { version = "0.8", features = [ - "runtime-tokio-rustls", - "mysql", - "postgres", - "chrono", -] } +sqlx.workspace = true store-api = { workspace = true } strum.workspace = true tinytemplate = "1.2" diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 00d9816984..3432b80840 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -72,6 +72,7 @@ pub struct StoreConfig { pub store_addrs: Vec, pub setup_etcd: bool, pub setup_pg: bool, + pub setup_mysql: bool, } #[derive(Clone)] @@ -146,7 +147,6 @@ impl Env { } else { self.build_db(); self.setup_wal(); - let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); let server_mode = ServerMode::random_standalone(); @@ -171,7 +171,7 @@ impl Env { self.setup_wal(); self.setup_etcd(); self.setup_pg(); - + 
self.setup_mysql().await; let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); // start a distributed GreptimeDB @@ -526,6 +526,23 @@ impl Env { } }
+    /// Start a MySQL container if `setup_mysql` is set (port of the first store addr, else 3306).
+    async fn setup_mysql(&self) {
+        if self.store_config.setup_mysql {
+            let client_ports = self
+                .store_config
+                .store_addrs
+                .iter()
+                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
+                .collect::<Vec<_>>();
+            let client_port = client_ports.first().unwrap_or(&3306);
+            util::setup_mysql(*client_port, None);
+
+            // Docker of MySQL starts slowly, so we need to wait for a while
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    }
+
 /// Build the DB with `cargo build --bin greptime` fn build_db(&self) { if self.bins_dir.lock().unwrap().is_some() { @@ -535,7 +552,13 @@ impl Env { println!("Going to build the DB..."); let output = Command::new("cargo") .current_dir(util::get_workspace_root()) - .args(["build", "--bin", "greptime"]) + .args([ + "build", + "--bin", + "greptime", + "--features", + "pg_kvbackend,mysql_kvbackend", + ]) .output() .expect("Failed to start GreptimeDB"); if !output.status.success() { diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index b8ce851f1f..d2c0d7bdaf 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -112,6 +112,10 @@ struct Args { #[clap(long, default_value = "false")] setup_pg: bool, + /// Whether to setup mysql, by default it is false. + #[clap(long, default_value = "false")] + setup_mysql: bool, + /// The number of jobs to run in parallel. Default to half of the cores. #[clap(short, long, default_value = "0")] jobs: usize, @@ -143,13 +147,15 @@ async fn main() { } // normalize parallelism to 1 if any of the following conditions are met: + // Note: parallelism in pg and mysql is possible, but need configuration.
if args.server_addr.server_addr.is_some() || args.setup_etcd || args.setup_pg + || args.setup_mysql || args.kafka_wal_broker_endpoints.is_some() { args.jobs = 1; - println!("Normalizing parallelism to 1 due to server addresses or etcd/pg setup"); + println!("Normalizing parallelism to 1 due to server addresses or etcd/pg/mysql setup"); } let config = ConfigBuilder::default() @@ -179,6 +185,7 @@ async fn main() { store_addrs: args.store_addrs.clone(), setup_etcd: args.setup_etcd, setup_pg: args.setup_pg, + setup_mysql: args.setup_mysql, }; let runner = Runner::new( diff --git a/tests/runner/src/server_mode.rs b/tests/runner/src/server_mode.rs index 39f47a5424..e7971dc73a 100644 --- a/tests/runner/src/server_mode.rs +++ b/tests/runner/src/server_mode.rs @@ -436,6 +436,18 @@ impl ServerMode { ); args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]); args.extend(vec!["--store-addrs".to_string(), pg_server_addr]); + } else if db_ctx.store_config().setup_mysql { + let client_ports = db_ctx + .store_config() + .store_addrs + .iter() + .map(|s| s.split(':').nth(1).unwrap().parse::().unwrap()) + .collect::>(); + let client_port = client_ports.first().unwrap_or(&3306); + let mysql_server_addr = + format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port); + args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]); + args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]); } else if db_ctx.store_config().store_addrs.is_empty() { args.extend(vec!["--backend".to_string(), "memory-store".to_string()]) } diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index b7c688bec8..7f7de8500a 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -352,6 +352,63 @@ pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) { } } +/// Set up a MySql server in docker. 
+///
+/// Runs `docker run -d` with the `bitnami/mysql` image (tag `mysql_version`, or `5.7` when
+/// `None`), publishing container port 3306 on host port `mysql_port`.
+///
+/// # Panics
+///
+/// Panics if `docker` cannot be spawned or if the `docker run` command does not succeed.
+pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
+    // Only a failure to spawn `docker` is treated as "not installed"; the exit code is ignored.
+    let docker_probe = std::process::Command::new("docker").args(["-v"]).status();
+    if docker_probe.is_err() {
+        panic!("Docker is not installed");
+    }
+
+    let image = match mysql_version {
+        Some(mysql_version) => format!("bitnami/mysql:{mysql_version}"),
+        None => "bitnami/mysql:5.7".to_string(),
+    };
+    let password = "admin";
+    let user = "greptimedb";
+
+    // Environment passed to the container, in the order it appears on the command line.
+    let env_vars = [
+        format!("MYSQL_PASSWORD={password}"),
+        format!("MYSQL_USER={user}"),
+        format!("MYSQL_ROOT_PASSWORD={password}"),
+        "MYSQL_DATABASE=mysql".to_string(),
+    ];
+    let port_forward = format!("{mysql_port}:3306");
+
+    let mut cmd = std::process::Command::new("docker");
+    cmd.args(["run", "-d"]);
+    for env_var in &env_vars {
+        cmd.args(["-e", env_var.as_str()]);
+    }
+    cmd.args(["-p", port_forward.as_str()]);
+    cmd.args(["--name", "greptimedb_mysql", image.as_str()]);
+
+    println!("Starting MySQL with command: {:?}", cmd);
+
+    match cmd.status() {
+        Ok(status) if status.success() => println!("Started MySQL with port {}", mysql_port),
+        Ok(status) => panic!("Failed to start MySQL: {:?}", status),
+        // Bind the whole `Result` so the panic message shows the `Err(..)` wrapper.
+        status => panic!("Failed to start MySQL: {:?}", status),
+    }
+}
+
 /// Get the dir of test cases. This function only works when the runner is run /// under the project's dir because it depends on some envs set by cargo. pub fn get_case_dir(case_dir: Option) -> String {