feat: add mysql election logic (#5694)

* feat: add mysql election

* feat: add mysql election

* chore: fix deps

* chore: fix deps

* fix: duplicate container

* fix: duplicate setup for sqlness

* fix: call once

* fix: do not use NOWAIT for mysql 5.7

* chore: apply comments

* fix: no parallel sqlness for mysql

* chore: comments and minor revert

* chore: apply comments

* chore: apply comments

* chore: add  to table name

* ci: use 2 metasrv to detect election bugs

* refactor: better election logic

* chore: apply comments

* chore: apply comments

* feat: version check before startup
This commit is contained in:
Yohan Wal
2025-03-19 19:31:18 +08:00
committed by GitHub
parent 2431cd3bdf
commit 1dc4a196bf
17 changed files with 1059 additions and 44 deletions

View File

@@ -8,7 +8,7 @@ inputs:
default: 2 default: 2
description: "Number of Datanode replicas" description: "Number of Datanode replicas"
meta-replicas: meta-replicas:
default: 1 default: 2
description: "Number of Metasrv replicas" description: "Number of Metasrv replicas"
image-registry: image-registry:
default: "docker.io" default: "docker.io"

View File

@@ -576,9 +576,12 @@ jobs:
- name: "Remote WAL" - name: "Remote WAL"
opts: "-w kafka -k 127.0.0.1:9092" opts: "-w kafka -k 127.0.0.1:9092"
kafka: true kafka: true
- name: "Pg Kvbackend" - name: "PostgreSQL KvBackend"
opts: "--setup-pg" opts: "--setup-pg"
kafka: false kafka: false
- name: "MySQL Kvbackend"
opts: "--setup-mysql"
kafka: false
timeout-minutes: 60 timeout-minutes: 60
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

1
Cargo.lock generated
View File

@@ -6720,6 +6720,7 @@ dependencies = [
"servers", "servers",
"session", "session",
"snafu 0.8.5", "snafu 0.8.5",
"sqlx",
"store-api", "store-api",
"strum 0.25.0", "strum 0.25.0",
"table", "table",

View File

@@ -191,6 +191,8 @@ snafu = "0.8"
sqlx = { version = "0.8", features = [ sqlx = { version = "0.8", features = [
"runtime-tokio-rustls", "runtime-tokio-rustls",
"mysql", "mysql",
"postgres",
"chrono",
] } ] }
sysinfo = "0.30" sysinfo = "0.30"
# on branch v0.52.x # on branch v0.52.x

View File

@@ -155,7 +155,7 @@ impl<'a> MySqlTemplateFactory<'a> {
table_name: table_name.to_string(), table_name: table_name.to_string(),
create_table_statement: format!( create_table_statement: format!(
// Cannot be more than 3072 bytes in PRIMARY KEY // Cannot be more than 3072 bytes in PRIMARY KEY
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);", "CREATE TABLE IF NOT EXISTS `{table_name}`(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
), ),
range_template: RangeTemplate { range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"), point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),

View File

@@ -6,7 +6,8 @@ license.workspace = true
[features] [features]
mock = [] mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"] pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "common-meta/mysql_kvbackend"]
[lints] [lints]
workspace = true workspace = true
@@ -38,8 +39,8 @@ common-version.workspace = true
common-wal.workspace = true common-wal.workspace = true
dashmap.workspace = true dashmap.workspace = true
datatypes.workspace = true datatypes.workspace = true
deadpool.workspace = true deadpool = { workspace = true, optional = true }
deadpool-postgres.workspace = true deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true derive_builder.workspace = true
etcd-client.workspace = true etcd-client.workspace = true
futures.workspace = true futures.workspace = true
@@ -60,6 +61,7 @@ serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
servers.workspace = true servers.workspace = true
snafu.workspace = true snafu.workspace = true
sqlx = { workspace = true, optional = true }
store-api.workspace = true store-api.workspace = true
strum.workspace = true strum.workspace = true
table.workspace = true table.workspace = true

View File

@@ -23,6 +23,8 @@ use common_config::Configurable;
use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "mysql_kvbackend")]
use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore; use common_meta::kv_backend::rds::PgStore;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
@@ -38,9 +40,15 @@ use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder}; use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler; use servers::metrics_handler::MetricsHandler;
use servers::server::Server; use servers::server::Server;
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use snafu::OptionExt; use snafu::OptionExt;
use snafu::ResultExt; use snafu::ResultExt;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::MySqlConnectOptions;
#[cfg(feature = "mysql_kvbackend")]
use sqlx::mysql::{MySqlConnection, MySqlPool};
#[cfg(feature = "mysql_kvbackend")]
use sqlx::Connection;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::mpsc::{self, Receiver, Sender};
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
@@ -49,9 +57,11 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::{Router, TcpIncoming}; use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection; use crate::election::etcd::EtcdElection;
#[cfg(feature = "mysql_kvbackend")]
use crate::election::mysql::MySqlElection;
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
use crate::election::postgres::PgElection; use crate::election::postgres::PgElection;
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS; use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
@@ -229,7 +239,6 @@ pub async fn metasrv_builder(
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
(None, BackendImpl::PostgresStore) => { (None, BackendImpl::PostgresStore) => {
let pool = create_postgres_pool(opts).await?; let pool = create_postgres_pool(opts).await?;
// TODO(CookiePie): use table name from config.
let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await .await
.context(error::KvBackendSnafu)?; .context(error::KvBackendSnafu)?;
@@ -246,6 +255,26 @@ pub async fn metasrv_builder(
.await?; .await?;
(kv_backend, Some(election)) (kv_backend, Some(election))
} }
#[cfg(feature = "mysql_kvbackend")]
(None, BackendImpl::MysqlStore) => {
let pool = create_mysql_pool(opts).await?;
let kv_backend =
MySqlStore::with_mysql_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
.await
.context(error::KvBackendSnafu)?;
// Since election will acquire a lock of the table, we need a separate table for election.
let election_table_name = opts.meta_table_name.clone() + "_election";
let election_client = create_mysql_client(opts).await?;
let election = MySqlElection::with_mysql_client(
opts.server_addr.clone(),
election_client,
opts.store_key_prefix.clone(),
CANDIDATE_LEASE_SECS,
&election_table_name,
)
.await?;
(kv_backend, Some(election))
}
}; };
if !opts.store_key_prefix.is_empty() { if !opts.store_key_prefix.is_empty() {
@@ -323,3 +352,41 @@ async fn create_postgres_pool(opts: &MetasrvOptions) -> Result<deadpool_postgres
.context(error::CreatePostgresPoolSnafu)?; .context(error::CreatePostgresPoolSnafu)?;
Ok(pool) Ok(pool)
} }
#[cfg(feature = "mysql_kvbackend")]
async fn setup_mysql_options(opts: &MetasrvOptions) -> Result<MySqlConnectOptions> {
let mysql_url = opts
.store_addrs
.first()
.context(error::InvalidArgumentsSnafu {
err_msg: "empty store addrs",
})?;
// Avoid `SET` commands in sqlx
let opts: MySqlConnectOptions = mysql_url
.parse()
.context(error::ParseMySqlUrlSnafu { mysql_url })?;
let opts = opts
.no_engine_substitution(false)
.pipes_as_concat(false)
.timezone(None)
.set_names(false);
Ok(opts)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_pool(opts: &MetasrvOptions) -> Result<MySqlPool> {
let opts = setup_mysql_options(opts).await?;
let pool = MySqlPool::connect_with(opts)
.await
.context(error::CreateMySqlPoolSnafu)?;
Ok(pool)
}
#[cfg(feature = "mysql_kvbackend")]
async fn create_mysql_client(opts: &MetasrvOptions) -> Result<MySqlConnection> {
let opts = setup_mysql_options(opts).await?;
let client = MySqlConnection::connect_with(&opts)
.await
.context(error::ConnectMySqlSnafu)?;
Ok(client)
}

View File

@@ -13,6 +13,8 @@
// limitations under the License. // limitations under the License.
pub mod etcd; pub mod etcd;
#[cfg(feature = "mysql_kvbackend")]
pub mod mysql;
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
pub mod postgres; pub mod postgres;

View File

@@ -0,0 +1,800 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, warn};
use common_time::Timestamp;
use itertools::Itertools;
use snafu::{ensure, OptionExt, ResultExt};
use sqlx::mysql::{MySqlArguments, MySqlRow};
use sqlx::query::Query;
use sqlx::{MySql, MySqlConnection, MySqlTransaction, Row};
use tokio::sync::{broadcast, Mutex, MutexGuard};
use tokio::time::{Interval, MissedTickBehavior};
use crate::election::{
listen_leader_change, Election, LeaderChangeMessage, LeaderKey, CANDIDATES_ROOT, ELECTION_KEY,
};
use crate::error::{
DeserializeFromJsonSnafu, MySqlExecutionSnafu, NoLeaderSnafu, Result, SerializeToJsonSnafu,
UnexpectedSnafu,
};
use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
// Separator between value and expire time.
const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
/// Lease information.
/// TODO(CookiePie): PgElection can also use this struct. Refactor it to a common module.
#[derive(Default, Clone)]
struct Lease {
leader_value: String,
expire_time: Timestamp,
current: Timestamp,
// origin is the origin value of the lease, used for CAS.
origin: String,
}
struct ElectionSqlFactory<'a> {
table_name: &'a str,
}
struct ElectionSqlSet {
campaign: String,
// SQL to put a value with expire time.
//
// Parameters for the query:
// `$1`: key,
// `$2`: value,
// `$3`: lease time in seconds
//
// Returns:
// If the key already exists, return the previous value.
put_value_with_lease: String,
// SQL to update a value with expire time.
//
// Parameters for the query:
// `$1`: updated value,
// `$2`: lease time in seconds
// `$3`: key,
// `$4`: previous value,
update_value_with_lease: String,
// SQL to get a value with expire time.
//
// Parameters:
// `$1`: key
get_value_with_lease: String,
// SQL to get all values with expire time with the given key prefix.
//
// Parameters:
// `$1`: key prefix like 'prefix%'
//
// Returns:
// column 0: value,
// column 1: current timestamp
get_value_with_lease_by_prefix: String,
// SQL to delete a value.
//
// Parameters:
// `?`: key
//
// Returns:
// Rows affected
delete_value: String,
}
impl<'a> ElectionSqlFactory<'a> {
fn new(table_name: &'a str) -> Self {
Self { table_name }
}
fn build(self) -> ElectionSqlSet {
ElectionSqlSet {
campaign: self.campaign_sql(),
put_value_with_lease: self.put_value_with_lease_sql(),
update_value_with_lease: self.update_value_with_lease_sql(),
get_value_with_lease: self.get_value_with_lease_sql(),
get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
delete_value: self.delete_value_sql(),
}
}
// Currently the session timeout is longer than the leader lease time.
// So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> String {
format!("SET SESSION wait_timeout = {};", META_LEASE_SECS + 1)
}
fn set_lock_wait_timeout_sql(&self) -> &str {
"SET SESSION innodb_lock_wait_timeout = 1;"
}
fn create_table_sql(&self) -> String {
format!(
r#"
CREATE TABLE IF NOT EXISTS `{}` (
k VARBINARY(3072) PRIMARY KEY,
v BLOB
);
"#,
self.table_name
)
}
fn insert_once(&self) -> String {
format!(
"INSERT IGNORE INTO `{}` (k, v) VALUES ('__place_holder_for_lock', '');",
self.table_name
)
}
fn check_version(&self) -> &str {
"SELECT @@version;"
}
fn campaign_sql(&self) -> String {
format!("SELECT * FROM `{}` FOR UPDATE;", self.table_name)
}
fn put_value_with_lease_sql(&self) -> String {
format!(
r#"
INSERT INTO `{}` (k, v) VALUES (
?,
CONCAT(
?,
'{}',
DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f')
)
)
ON DUPLICATE KEY UPDATE v = VALUES(v);
"#,
self.table_name, LEASE_SEP
)
}
fn update_value_with_lease_sql(&self) -> String {
format!(
r#"UPDATE `{}`
SET v = CONCAT(?, '{}', DATE_FORMAT(DATE_ADD(NOW(4), INTERVAL ? SECOND), '%Y-%m-%d %T.%f'))
WHERE k = ? AND v = ?"#,
self.table_name, LEASE_SEP
)
}
fn get_value_with_lease_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k = ?"#,
self.table_name
)
}
fn get_value_with_lease_by_prefix_sql(&self) -> String {
format!(
r#"SELECT v, DATE_FORMAT(NOW(4), '%Y-%m-%d %T.%f') FROM `{}` WHERE k LIKE ?"#,
self.table_name
)
}
fn delete_value_sql(&self) -> String {
format!("DELETE FROM {} WHERE k = ?;", self.table_name)
}
}
/// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
fn parse_value_and_expire_time(value: &str) -> Result<(String, Timestamp)> {
let (value, expire_time) =
value
.split(LEASE_SEP)
.collect_tuple()
.with_context(|| UnexpectedSnafu {
violated: format!(
"Invalid value {}, expect node info || {} || expire time",
value, LEASE_SEP
),
})?;
// Given expire_time is in the format 'YYYY-MM-DD HH24:MI:SS.MS'
let expire_time = match Timestamp::from_str(expire_time, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", expire_time),
}
.fail()?,
};
Ok((value.to_string(), expire_time))
}
#[derive(Debug, Clone, Default)]
struct MySqlLeaderKey {
name: Vec<u8>,
key: Vec<u8>,
rev: i64,
lease: i64,
}
impl LeaderKey for MySqlLeaderKey {
fn name(&self) -> &[u8] {
&self.name
}
fn key(&self) -> &[u8] {
&self.key
}
fn revision(&self) -> i64 {
self.rev
}
fn lease_id(&self) -> i64 {
self.lease
}
}
enum Executor<'a> {
Default(MutexGuard<'a, MySqlConnection>),
Txn(MySqlTransaction<'a>),
}
impl Executor<'_> {
async fn query(
&mut self,
query: Query<'_, MySql, MySqlArguments>,
sql: &str,
) -> Result<Vec<MySqlRow>> {
match self {
Executor::Default(client) => {
let res = query
.fetch_all(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
Executor::Txn(txn) => {
let res = query
.fetch_all(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res)
}
}
}
async fn execute(&mut self, query: Query<'_, MySql, MySqlArguments>, sql: &str) -> Result<u64> {
match self {
Executor::Default(client) => {
let res = query
.execute(&mut **client)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
Executor::Txn(txn) => {
let res = query
.execute(&mut **txn)
.await
.context(MySqlExecutionSnafu { sql })?;
Ok(res.rows_affected())
}
}
}
async fn commit(self) -> Result<()> {
match self {
Executor::Txn(txn) => {
txn.commit()
.await
.context(MySqlExecutionSnafu { sql: "COMMIT" })?;
Ok(())
}
_ => Ok(()),
}
}
}
/// MySQL implementation of Election.
pub struct MySqlElection {
leader_value: String,
client: Mutex<MySqlConnection>,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
sql_set: ElectionSqlSet,
}
impl MySqlElection {
pub async fn with_mysql_client(
leader_value: String,
mut client: sqlx::MySqlConnection,
store_key_prefix: String,
candidate_lease_ttl_secs: u64,
table_name: &str,
) -> Result<ElectionRef> {
let sql_factory = ElectionSqlFactory::new(table_name);
sqlx::query(&sql_factory.create_table_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.create_table_sql(),
})?;
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead lock.
sqlx::query(&sql_factory.set_idle_session_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.set_idle_session_timeout_sql(),
})?;
// Set lock wait timeout to LOCK_WAIT_TIMEOUT to avoid waiting too long.
sqlx::query(sql_factory.set_lock_wait_timeout_sql())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: sql_factory.set_lock_wait_timeout_sql(),
})?;
// Insert at least one row for `SELECT * FOR UPDATE` to work.
sqlx::query(&sql_factory.insert_once())
.execute(&mut client)
.await
.context(MySqlExecutionSnafu {
sql: &sql_factory.insert_once(),
})?;
// Check MySQL version
Self::check_version(&mut client, sql_factory.check_version()).await?;
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
leader_value,
client: Mutex::new(client),
is_leader: AtomicBool::new(false),
leader_infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
candidate_lease_ttl_secs,
sql_set: sql_factory.build(),
}))
}
fn election_key(&self) -> String {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
fn candidate_root(&self) -> String {
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}
fn candidate_key(&self) -> String {
format!("{}{}", self.candidate_root(), self.leader_value)
}
}
#[async_trait::async_trait]
impl Election for MySqlElection {
type Leader = LeaderValue;
fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Relaxed)
}
fn in_leader_infancy(&self) -> bool {
self.leader_infancy
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
}
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
let key = self.candidate_key();
let node_info =
serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
input: format!("{node_info:?}"),
})?;
{
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let res = self
.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
// May registered before, just update the lease.
if !res {
warn!("Candidate already registered, update the lease");
self.delete_value(&key, &mut executor).await?;
self.put_value_with_lease(
&key,
&node_info,
self.candidate_lease_ttl_secs,
&mut executor,
)
.await?;
}
}
// Check if the current lease has expired and renew the lease.
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(self.candidate_lease_ttl_secs / 2));
loop {
let _ = keep_alive_interval.tick().await;
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.unwrap_or_default();
ensure!(
lease.expire_time > lease.current,
UnexpectedSnafu {
violated: format!(
"Candidate lease expired at {:?} (current time: {:?}), key: {:?}",
lease.expire_time,
lease.current,
String::from_utf8_lossy(&key.into_bytes())
),
}
);
self.update_value_with_lease(&key, &lease.origin, &node_info, &mut executor)
.await?;
std::mem::drop(executor);
}
}
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
let key_prefix = self.candidate_root();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
let (mut candidates, current) = self
.get_value_with_lease_by_prefix(&key_prefix, &mut executor)
.await?;
// Remove expired candidates
candidates.retain(|c| c.1 > current);
let mut valid_candidates = Vec::with_capacity(candidates.len());
for (c, _) in candidates {
let node_info: MetasrvNodeInfo =
serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
input: format!("{:?}", c),
})?;
valid_candidates.push(node_info);
}
Ok(valid_candidates)
}
async fn campaign(&self) -> Result<()> {
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let _ = self.do_campaign(&mut keep_alive_interval).await;
}
}
async fn leader(&self) -> Result<Self::Leader> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(self.leader_value.as_bytes().into())
} else {
let key = self.election_key();
let client = self.client.lock().await;
let mut executor = Executor::Default(client);
if let Some(lease) = self.get_value_with_lease(&key, &mut executor).await? {
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
Ok(lease.leader_value.as_bytes().into())
} else {
NoLeaderSnafu.fail()
}
}
}
async fn resign(&self) -> Result<()> {
todo!()
}
fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
self.leader_watcher.subscribe()
}
}
impl MySqlElection {
/// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
async fn get_value_with_lease(
&self,
key: &str,
executor: &mut Executor<'_>,
) -> Result<Option<Lease>> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.get_value_with_lease).bind(key);
let res = executor
.query(query, &self.sql_set.get_value_with_lease)
.await?;
if res.is_empty() {
return Ok(None);
}
// Safety: Checked if res is empty above.
let current_time_str = String::from_utf8_lossy(res[0].try_get(1).unwrap());
let current_time = match Timestamp::from_str(&current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
// Safety: Checked if res is empty above.
let value_and_expire_time = String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
Ok(Some(Lease {
leader_value: value,
expire_time,
current: current_time,
origin: value_and_expire_time.to_string(),
}))
}
/// Returns all values and expire time with the given key prefix. Also returns the current time.
async fn get_value_with_lease_by_prefix(
&self,
key_prefix: &str,
executor: &mut Executor<'_>,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
let query = sqlx::query(&self.sql_set.get_value_with_lease_by_prefix).bind(key_prefix);
let res = executor
.query(query, &self.sql_set.get_value_with_lease_by_prefix)
.await?;
let mut values_with_leases = vec![];
let mut current = Timestamp::default();
for row in res {
let current_time_str = row.try_get(1).unwrap_or_default();
current = match Timestamp::from_str(current_time_str, None) {
Ok(ts) => ts,
Err(_) => UnexpectedSnafu {
violated: format!("Invalid timestamp: {}", current_time_str),
}
.fail()?,
};
let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
values_with_leases.push((value, expire_time));
}
Ok((values_with_leases, current))
}
async fn update_value_with_lease(
&self,
key: &str,
prev: &str,
updated: &str,
executor: &mut Executor<'_>,
) -> Result<()> {
let key = key.as_bytes();
let prev = prev.as_bytes();
let updated = updated.as_bytes();
let query = sqlx::query(&self.sql_set.update_value_with_lease)
.bind(updated)
.bind(self.candidate_lease_ttl_secs as f64)
.bind(key)
.bind(prev);
let res = executor
.execute(query, &self.sql_set.update_value_with_lease)
.await?;
ensure!(
res == 1,
UnexpectedSnafu {
violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
}
);
Ok(())
}
/// Returns `true` if the insertion is successful
async fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl_secs: u64,
executor: &mut Executor<'_>,
) -> Result<bool> {
let key = key.as_bytes();
let lease_ttl_secs = lease_ttl_secs as f64;
let query = sqlx::query(&self.sql_set.put_value_with_lease)
.bind(key)
.bind(value)
.bind(lease_ttl_secs);
let res = executor
.query(query, &self.sql_set.put_value_with_lease)
.await?;
Ok(res.is_empty())
}
/// Returns `true` if the deletion is successful.
/// Caution: Should only delete the key if the lease is expired.
async fn delete_value(&self, key: &str, executor: &mut Executor<'_>) -> Result<bool> {
let key = key.as_bytes();
let query = sqlx::query(&self.sql_set.delete_value).bind(key);
let res = executor.execute(query, &self.sql_set.delete_value).await?;
Ok(res == 1)
}
/// Attempts to acquire leadership by executing a campaign. This function continuously checks
/// if the current lease is still valid.
async fn do_campaign(&self, interval: &mut Interval) -> Result<()> {
// Need to restrict the scope of the client to avoid ambiguous overloads.
use sqlx::Acquire;
loop {
let client = self.client.lock().await;
let executor = Executor::Default(client);
let mut lease = Lease::default();
match (
self.lease_check(executor, &mut lease).await,
self.is_leader(),
) {
// If the leader lease is valid and I'm the leader, renew the lease.
(Ok(_), true) => {
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.renew_lease(executor, lease).await?;
}
// If the leader lease expires and I'm the leader, notify the leader watcher and step down.
// Another instance should be elected as the leader in this case.
(Err(_), true) => {
warn!("Leader lease expired, re-initiate the campaign");
self.step_down_without_lock().await?;
}
// If the leader lease expires and I'm not the leader, elect myself.
(Err(_), false) => {
warn!("Leader lease expired, re-initiate the campaign");
let mut client = self.client.lock().await;
let txn = client
.begin()
.await
.context(MySqlExecutionSnafu { sql: "BEGIN" })?;
let mut executor = Executor::Txn(txn);
let query = sqlx::query(&self.sql_set.campaign);
executor.query(query, &self.sql_set.campaign).await?;
self.elected(&mut executor).await?;
executor.commit().await?;
}
// If the leader lease is valid and I'm not the leader, do nothing.
(Ok(_), false) => {}
}
interval.tick().await;
}
}
/// Renew the lease
async fn renew_lease(&self, mut executor: Executor<'_>, lease: Lease) -> Result<()> {
let key = self.election_key();
self.update_value_with_lease(&key, &lease.origin, &self.leader_value, &mut executor)
.await?;
executor.commit().await?;
Ok(())
}
/// Performs a lease check during the election process.
///
/// This function performs the following checks and actions:
///
/// - **Case 1**: If the current instance is not the leader but the lease has expired, it raises an error
/// to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
/// will be released.
/// - **Case 2**: If all checks pass, the function returns without performing any actions.
async fn lease_check(&self, mut executor: Executor<'_>, lease: &mut Lease) -> Result<()> {
let key = self.election_key();
let check_lease = self
.get_value_with_lease(&key, &mut executor)
.await?
.context(NoLeaderSnafu)?;
*lease = check_lease;
// Case 1: Lease expired
ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
// Case 2: Everything is fine
Ok(())
}
/// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
async fn step_down_without_lock(&self) -> Result<()> {
let key = self.election_key().into_bytes();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone(),
..Default::default()
};
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Elected as leader. The leader should put the key and notify the leader watcher.
/// Caution: Should only elected while holding the lock.
async fn elected(&self, executor: &mut Executor<'_>) -> Result<()> {
let key = self.election_key();
let leader_key = MySqlLeaderKey {
name: self.leader_value.clone().into_bytes(),
key: key.clone().into_bytes(),
..Default::default()
};
self.delete_value(&key, executor).await?;
self.put_value_with_lease(&key, &self.leader_value, META_LEASE_SECS, executor)
.await?;
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.leader_infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
{
error!(e; "Failed to send leader change message");
}
}
Ok(())
}
/// Check if the MySQL version is supported.
async fn check_version(client: &mut MySqlConnection, sql: &str) -> Result<()> {
let query = sqlx::query(sql);
match query.fetch_one(client).await {
Ok(row) => {
let version: String = row.try_get(0).unwrap();
if !version.starts_with("8.0") || !version.starts_with("5.7") {
warn!(
"Unsupported MySQL version: {}, expected: [5.7, 8.0]",
version
);
}
}
Err(e) => {
warn!(e; "Failed to check MySQL version through sql: {}", sql);
}
}
Ok(())
}
}

View File

@@ -109,10 +109,10 @@ impl<'a> ElectionSqlFactory<'a> {
} }
} }
// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. // Currently the session timeout is longer than the leader lease time.
// Either the leader reconnects and step down or the session expires and the lock is released. // So the leader will renew the lease twice before the session timeout if everything goes well.
fn set_idle_session_timeout_sql(&self) -> &str { fn set_idle_session_timeout_sql(&self) -> String {
"SET idle_session_timeout = '10s';" format!("SET idle_session_timeout = '{}s';", META_LEASE_SECS + 1)
} }
fn campaign_sql(&self) -> String { fn campaign_sql(&self) -> String {
@@ -241,7 +241,7 @@ impl PgElection {
let sql_factory = ElectionSqlFactory::new(lock_id, table_name); let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
// Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock. // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
client client
.execute(sql_factory.set_idle_session_timeout_sql(), &[]) .execute(&sql_factory.set_idle_session_timeout_sql(), &[])
.await .await
.context(PostgresExecutionSnafu)?; .context(PostgresExecutionSnafu)?;
@@ -317,7 +317,9 @@ impl Election for PgElection {
prev_expire_time > current_time, prev_expire_time > current_time,
UnexpectedSnafu { UnexpectedSnafu {
violated: format!( violated: format!(
"Candidate lease expired, key: {:?}", "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
prev_expire_time,
current_time,
String::from_utf8_lossy(&key.into_bytes()) String::from_utf8_lossy(&key.into_bytes())
), ),
} }
@@ -369,23 +371,19 @@ impl Election for PgElection {
.query(&self.sql_set.campaign, &[]) .query(&self.sql_set.campaign, &[])
.await .await
.context(PostgresExecutionSnafu)?; .context(PostgresExecutionSnafu)?;
if let Some(row) = res.first() { let row = res.first().context(UnexpectedSnafu {
match row.try_get(0) { violated: "Failed to get the result of acquiring advisory lock",
Ok(true) => self.leader_action().await?, })?;
Ok(false) => self.follower_action().await?, let is_leader = row.try_get(0).map_err(|_| {
Err(_) => { UnexpectedSnafu {
return UnexpectedSnafu { violated: "Failed to get the result of get lock",
violated: "Failed to get the result of acquiring advisory lock"
.to_string(),
}
.fail();
}
} }
.build()
})?;
if is_leader {
self.leader_action().await?;
} else { } else {
return UnexpectedSnafu { self.follower_action().await?;
violated: "Failed to get the result of acquiring advisory lock".to_string(),
}
.fail();
} }
let _ = keep_alive_interval.tick().await; let _ = keep_alive_interval.tick().await;
} }

View File

@@ -343,6 +343,16 @@ pub enum Error {
location: Location, location: Location,
}, },
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to parse mysql url: {}", mysql_url))]
ParseMySqlUrl {
#[snafu(source)]
error: sqlx::error::Error,
mysql_url: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find table route for {table_id}"))] #[snafu(display("Failed to find table route for {table_id}"))]
TableRouteNotFound { TableRouteNotFound {
table_id: TableId, table_id: TableId,
@@ -729,6 +739,34 @@ pub enum Error {
location: Location, location: Location,
}, },
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to execute via mysql, sql: {}", sql))]
MySqlExecution {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
sql: String,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to create mysql pool"))]
CreateMySqlPool {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to connect to mysql"))]
ConnectMySql {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Handler not found: {}", name))] #[snafu(display("Handler not found: {}", name))]
HandlerNotFound { HandlerNotFound {
name: String, name: String,
@@ -911,6 +949,11 @@ impl ErrorExt for Error {
| Error::GetPostgresConnection { .. } | Error::GetPostgresConnection { .. }
| Error::PostgresExecution { .. } | Error::PostgresExecution { .. }
| Error::ConnectPostgres { .. } => StatusCode::Internal, | Error::ConnectPostgres { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution { .. }
| Error::CreateMySqlPool { .. }
| Error::ConnectMySql { .. }
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
} }
} }

View File

@@ -72,9 +72,9 @@ pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id"; pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv"; pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv"; pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1; pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
// The datastores that implements metadata kvbackend. // The datastores that implements metadata kvbackend.
@@ -89,6 +89,9 @@ pub enum BackendImpl {
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
// Postgres as metadata storage. // Postgres as metadata storage.
PostgresStore, PostgresStore,
#[cfg(feature = "mysql_kvbackend")]
// MySql as metadata storage.
MysqlStore,
} }
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -146,7 +149,7 @@ pub struct MetasrvOptions {
pub tracing: TracingOptions, pub tracing: TracingOptions,
/// The datastore for kv metadata. /// The datastore for kv metadata.
pub backend: BackendImpl, pub backend: BackendImpl,
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Table name of rds kv backend. /// Table name of rds kv backend.
pub meta_table_name: String, pub meta_table_name: String,
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
@@ -191,7 +194,7 @@ impl Default for MetasrvOptions {
flush_stats_factor: 3, flush_stats_factor: 3,
tracing: TracingOptions::default(), tracing: TracingOptions::default(),
backend: BackendImpl::EtcdStore, backend: BackendImpl::EtcdStore,
#[cfg(feature = "pg_kvbackend")] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
meta_table_name: DEFAULT_META_TABLE_NAME.to_string(), meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")] #[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID, meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,

View File

@@ -53,12 +53,7 @@ serde_yaml = "0.9"
snafu = { workspace = true } snafu = { workspace = true }
sql = { workspace = true } sql = { workspace = true }
sqlparser.workspace = true sqlparser.workspace = true
sqlx = { version = "0.8", features = [ sqlx.workspace = true
"runtime-tokio-rustls",
"mysql",
"postgres",
"chrono",
] }
store-api = { workspace = true } store-api = { workspace = true }
strum.workspace = true strum.workspace = true
tinytemplate = "1.2" tinytemplate = "1.2"

View File

@@ -72,6 +72,7 @@ pub struct StoreConfig {
pub store_addrs: Vec<String>, pub store_addrs: Vec<String>,
pub setup_etcd: bool, pub setup_etcd: bool,
pub setup_pg: bool, pub setup_pg: bool,
pub setup_mysql: bool,
} }
#[derive(Clone)] #[derive(Clone)]
@@ -146,7 +147,6 @@ impl Env {
} else { } else {
self.build_db(); self.build_db();
self.setup_wal(); self.setup_wal();
let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
let server_mode = ServerMode::random_standalone(); let server_mode = ServerMode::random_standalone();
@@ -171,7 +171,7 @@ impl Env {
self.setup_wal(); self.setup_wal();
self.setup_etcd(); self.setup_etcd();
self.setup_pg(); self.setup_pg();
self.setup_mysql().await;
let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone()); let mut db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
// start a distributed GreptimeDB // start a distributed GreptimeDB
@@ -526,6 +526,23 @@ impl Env {
} }
} }
/// Setup MySql if needed.
async fn setup_mysql(&self) {
if self.store_config.setup_mysql {
let client_ports = self
.store_config
.store_addrs
.iter()
.map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
.collect::<Vec<_>>();
let client_port = client_ports.first().unwrap_or(&3306);
util::setup_mysql(*client_port, None);
// Docker of MySQL starts slowly, so we need to wait for a while
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
/// Build the DB with `cargo build --bin greptime` /// Build the DB with `cargo build --bin greptime`
fn build_db(&self) { fn build_db(&self) {
if self.bins_dir.lock().unwrap().is_some() { if self.bins_dir.lock().unwrap().is_some() {
@@ -535,7 +552,13 @@ impl Env {
println!("Going to build the DB..."); println!("Going to build the DB...");
let output = Command::new("cargo") let output = Command::new("cargo")
.current_dir(util::get_workspace_root()) .current_dir(util::get_workspace_root())
.args(["build", "--bin", "greptime"]) .args([
"build",
"--bin",
"greptime",
"--features",
"pg_kvbackend,mysql_kvbackend",
])
.output() .output()
.expect("Failed to start GreptimeDB"); .expect("Failed to start GreptimeDB");
if !output.status.success() { if !output.status.success() {

View File

@@ -112,6 +112,10 @@ struct Args {
#[clap(long, default_value = "false")] #[clap(long, default_value = "false")]
setup_pg: bool, setup_pg: bool,
/// Whether to setup mysql, by default it is false.
#[clap(long, default_value = "false")]
setup_mysql: bool,
/// The number of jobs to run in parallel. Default to half of the cores. /// The number of jobs to run in parallel. Default to half of the cores.
#[clap(short, long, default_value = "0")] #[clap(short, long, default_value = "0")]
jobs: usize, jobs: usize,
@@ -143,13 +147,15 @@ async fn main() {
} }
// normalize parallelism to 1 if any of the following conditions are met: // normalize parallelism to 1 if any of the following conditions are met:
// Note: parallelism in pg and mysql is possible, but need configuration.
if args.server_addr.server_addr.is_some() if args.server_addr.server_addr.is_some()
|| args.setup_etcd || args.setup_etcd
|| args.setup_pg || args.setup_pg
|| args.setup_mysql
|| args.kafka_wal_broker_endpoints.is_some() || args.kafka_wal_broker_endpoints.is_some()
{ {
args.jobs = 1; args.jobs = 1;
println!("Normalizing parallelism to 1 due to server addresses or etcd/pg setup"); println!("Normalizing parallelism to 1 due to server addresses or etcd/pg/mysql setup");
} }
let config = ConfigBuilder::default() let config = ConfigBuilder::default()
@@ -179,6 +185,7 @@ async fn main() {
store_addrs: args.store_addrs.clone(), store_addrs: args.store_addrs.clone(),
setup_etcd: args.setup_etcd, setup_etcd: args.setup_etcd,
setup_pg: args.setup_pg, setup_pg: args.setup_pg,
setup_mysql: args.setup_mysql,
}; };
let runner = Runner::new( let runner = Runner::new(

View File

@@ -436,6 +436,18 @@ impl ServerMode {
); );
args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]); args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
args.extend(vec!["--store-addrs".to_string(), pg_server_addr]); args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
} else if db_ctx.store_config().setup_mysql {
let client_ports = db_ctx
.store_config()
.store_addrs
.iter()
.map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
.collect::<Vec<_>>();
let client_port = client_ports.first().unwrap_or(&3306);
let mysql_server_addr =
format!("mysql://greptimedb:admin@127.0.0.1:{}/mysql", client_port);
args.extend(vec!["--backend".to_string(), "mysql-store".to_string()]);
args.extend(vec!["--store-addrs".to_string(), mysql_server_addr]);
} else if db_ctx.store_config().store_addrs.is_empty() { } else if db_ctx.store_config().store_addrs.is_empty() {
args.extend(vec!["--backend".to_string(), "memory-store".to_string()]) args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
} }

View File

@@ -352,6 +352,63 @@ pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
} }
} }
/// Set up a MySql server in docker.
pub fn setup_mysql(mysql_port: u16, mysql_version: Option<&str>) {
if std::process::Command::new("docker")
.args(["-v"])
.status()
.is_err()
{
panic!("Docker is not installed");
}
let mysql_image = if let Some(mysql_version) = mysql_version {
format!("bitnami/mysql:{mysql_version}")
} else {
"bitnami/mysql:5.7".to_string()
};
let mysql_password = "admin";
let mysql_user = "greptimedb";
let mut arg_list = vec![];
arg_list.extend(["run", "-d"]);
let mysql_password_env = format!("MYSQL_PASSWORD={mysql_password}");
let mysql_user_env = format!("MYSQL_USER={mysql_user}");
let mysql_root_password_env = format!("MYSQL_ROOT_PASSWORD={mysql_password}");
let mysql_port_forward = format!("{mysql_port}:3306");
arg_list.extend([
"-e",
&mysql_password_env,
"-e",
&mysql_user_env,
"-e",
&mysql_root_password_env,
"-e",
"MYSQL_DATABASE=mysql",
]);
arg_list.extend(["-p", &mysql_port_forward]);
arg_list.extend(["--name", "greptimedb_mysql", &mysql_image]);
let mut cmd = std::process::Command::new("docker");
cmd.args(arg_list);
println!("Starting MySQL with command: {:?}", cmd);
let status = cmd.status();
if status.is_err() {
panic!("Failed to start MySQL: {:?}", status);
} else if let Ok(status) = status {
if status.success() {
println!("Started MySQL with port {}", mysql_port);
} else {
panic!("Failed to start MySQL: {:?}", status);
}
}
}
/// Get the dir of test cases. This function only works when the runner is run /// Get the dir of test cases. This function only works when the runner is run
/// under the project's dir because it depends on some envs set by cargo. /// under the project's dir because it depends on some envs set by cargo.
pub fn get_case_dir(case_dir: Option<PathBuf>) -> String { pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {