From af1920defc3aebf86b4354f3e52fa0769f8b7db9 Mon Sep 17 00:00:00 2001 From: Yohan Wal Date: Wed, 12 Mar 2025 14:52:56 +0800 Subject: [PATCH] feat: add mysql kvbackend (#5528) * feat: add mysql kvbackend txn support * chore: error handling * chore: follow review comments * chore: follow review comments * chore: follow review comments * revert: mysql QAQ * revert: revert changes to sqls This reverts commit cf98c50dd9b69e443431ef1440588d610eed64fe. * chore: add comments --- .../actions/build-linux-artifacts/action.yml | 4 +- .github/workflows/develop.yml | 10 +- Cargo.lock | 1 + Cargo.toml | 4 + src/cli/Cargo.toml | 1 + src/cli/src/bench.rs | 15 + src/common/meta/Cargo.toml | 2 + src/common/meta/src/error.rs | 55 +- src/common/meta/src/kv_backend.rs | 2 +- src/common/meta/src/kv_backend/rds.rs | 16 +- src/common/meta/src/kv_backend/rds/mysql.rs | 650 ++++++++++++++++++ .../meta/src/kv_backend/rds/postgres.rs | 1 + tests-integration/fixtures/docker-compose.yml | 12 + 13 files changed, 761 insertions(+), 12 deletions(-) create mode 100644 src/common/meta/src/kv_backend/rds/mysql.rs diff --git a/.github/actions/build-linux-artifacts/action.yml b/.github/actions/build-linux-artifacts/action.yml index 4ed82f9ce9..e3db250f16 100644 --- a/.github/actions/build-linux-artifacts/action.yml +++ b/.github/actions/build-linux-artifacts/action.yml @@ -52,7 +52,7 @@ runs: uses: ./.github/actions/build-greptime-binary with: base-image: ubuntu - features: servers/dashboard,pg_kvbackend + features: servers/dashboard,pg_kvbackend,mysql_kvbackend cargo-profile: ${{ inputs.cargo-profile }} artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }} version: ${{ inputs.version }} @@ -70,7 +70,7 @@ runs: if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64. 
with: base-image: centos - features: servers/dashboard,pg_kvbackend + features: servers/dashboard,pg_kvbackend,mysql_kvbackend cargo-profile: ${{ inputs.cargo-profile }} artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }} version: ${{ inputs.version }} diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index e9267ebe72..473ca83a03 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -111,7 +111,7 @@ jobs: - name: Build greptime binaries shell: bash # `cargo gc` will invoke `cargo build` with specified args - run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend + run: cargo gc -- --bin greptime --bin sqlness-runner --features "pg_kvbackend,mysql_kvbackend" - name: Pack greptime binaries shell: bash run: | @@ -270,7 +270,7 @@ jobs: - name: Build greptime bianry shell: bash # `cargo gc` will invoke `cargo build` with specified args - run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend + run: cargo gc --profile ci -- --bin greptime --features "pg_kvbackend,mysql_kvbackend" - name: Pack greptime binary shell: bash run: | @@ -687,7 +687,7 @@ jobs: working-directory: tests-integration/fixtures run: docker compose up -d --wait - name: Run nextest cases - run: cargo nextest run --workspace -F dashboard -F pg_kvbackend + run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" RUST_BACKTRACE: 1 @@ -704,6 +704,7 @@ jobs: GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres + GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 UNITTEST_LOG_DIR: "__unittest_logs" @@ -739,7 +740,7 @@ jobs: working-directory: tests-integration/fixtures run: docker compose up -d --wait - name: Run nextest 
cases - run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend + run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold" RUST_BACKTRACE: 1 @@ -755,6 +756,7 @@ jobs: GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres + GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 UNITTEST_LOG_DIR: "__unittest_logs" diff --git a/Cargo.lock b/Cargo.lock index 923a28ea5b..dd196a7d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2198,6 +2198,7 @@ dependencies = [ "serde_with", "session", "snafu 0.8.5", + "sqlx", "store-api", "strum 0.25.0", "table", diff --git a/Cargo.toml b/Cargo.toml index 6d4127e412..467820c40a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,10 @@ shadow-rs = "0.38" similar-asserts = "1.6.0" smallvec = { version = "1", features = ["serde"] } snafu = "0.8" +sqlx = { version = "0.8", features = [ + "runtime-tokio-rustls", + "mysql", +] } sysinfo = "0.30" # on branch v0.52.x sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [ diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 8904c91935..302c3a292f 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [features] pg_kvbackend = ["common-meta/pg_kvbackend"] +mysql_kvbackend = ["common-meta/mysql_kvbackend"] [lints] workspace = true diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 5fc01db3c6..d57cc92685 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -23,6 +23,8 @@ use common_error::ext::BoxedError; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use 
common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; +#[cfg(feature = "mysql_kvbackend")] +use common_meta::kv_backend::rds::MySqlStore; #[cfg(feature = "pg_kvbackend")] use common_meta::kv_backend::rds::PgStore; use common_meta::peer::Peer; @@ -63,6 +65,9 @@ pub struct BenchTableMetadataCommand { #[cfg(feature = "pg_kvbackend")] #[clap(long)] postgres_addr: Option, + #[cfg(feature = "mysql_kvbackend")] + #[clap(long)] + mysql_addr: Option, #[clap(long)] count: u32, } @@ -86,6 +91,16 @@ impl BenchTableMetadataCommand { kv_backend }; + #[cfg(feature = "mysql_kvbackend")] + let kv_backend = if let Some(mysql_addr) = &self.mysql_addr { + info!("Using mysql as kv backend"); + MySqlStore::with_url(mysql_addr, "greptime_metakv", 128) + .await + .unwrap() + } else { + kv_backend + }; + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend)); let tool = BenchTableMetadata { diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 58ec2cfea8..3003b4408c 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [features] testing = [] pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"] +mysql_kvbackend = ["dep:sqlx", "dep:backon"] [lints] workspace = true @@ -57,6 +58,7 @@ serde_json.workspace = true serde_with.workspace = true session.workspace = true snafu.workspace = true +sqlx = { workspace = true, optional = true } store-api.workspace = true strum.workspace = true table.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index fb5edc1112..e83aefbe33 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -685,7 +685,36 @@ pub enum Error { operation: String, }, - #[cfg(feature = "pg_kvbackend")] + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to execute via MySql, sql: {}", sql))] + MySqlExecution { + sql: String, + 
#[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to create connection pool for MySql"))] + CreateMySqlPool { + #[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(feature = "mysql_kvbackend")] + #[snafu(display("Failed to {} MySql transaction", operation))] + MySqlTransaction { + #[snafu(source)] + error: sqlx::Error, + #[snafu(implicit)] + location: Location, + operation: String, + }, + + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] #[snafu(display("Rds transaction retry failed"))] RdsTransactionRetryFailed { #[snafu(implicit)] @@ -823,8 +852,13 @@ impl ErrorExt for Error { PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } - | PostgresTransaction { .. } - | RdsTransactionRetryFailed { .. } => StatusCode::Internal, + | PostgresTransaction { .. } => StatusCode::Internal, + #[cfg(feature = "mysql_kvbackend")] + MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => { + StatusCode::Internal + } + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] + RdsTransactionRetryFailed { .. } => StatusCode::Internal, Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal, } } @@ -835,16 +869,29 @@ impl ErrorExt for Error { } impl Error { - #[cfg(feature = "pg_kvbackend")] + #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] /// Check if the error is a serialization error. pub fn is_serialization_error(&self) -> bool { match self { + #[cfg(feature = "pg_kvbackend")] Error::PostgresTransaction { error, .. } => { error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) } + #[cfg(feature = "pg_kvbackend")] Error::PostgresExecution { error, .. 
} => { error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) } + #[cfg(feature = "mysql_kvbackend")] + Error::MySqlExecution { + error: sqlx::Error::Database(database_error), + .. + } => { + matches!( + database_error.message(), + "Deadlock found when trying to get lock; try restarting transaction" + | "can't serialize access for this transaction" + ) + } _ => false, } } diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index b8fd4f3e26..05c7348fa4 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -31,7 +31,7 @@ use crate::rpc::KeyValue; pub mod chroot; pub mod etcd; pub mod memory; -#[cfg(feature = "pg_kvbackend")] +#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))] pub mod rds; pub mod test; pub mod txn; diff --git a/src/common/meta/src/kv_backend/rds.rs b/src/common/meta/src/kv_backend/rds.rs index 15f4cdd390..dbd28c5d78 100644 --- a/src/common/meta/src/kv_backend/rds.rs +++ b/src/common/meta/src/kv_backend/rds.rs @@ -33,10 +33,16 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +#[cfg(feature = "pg_kvbackend")] mod postgres; - +#[cfg(feature = "pg_kvbackend")] pub use postgres::PgStore; +#[cfg(feature = "mysql_kvbackend")] +mod mysql; +#[cfg(feature = "mysql_kvbackend")] +pub use mysql::MySqlStore; + const RDS_STORE_TXN_RETRY_COUNT: usize = 3; /// Query executor for rds. It can execute queries or generate a transaction executor. 
@@ -106,6 +112,14 @@ impl ExecutorImpl<'_, T> { } } + #[allow(dead_code)] // Used in #[cfg(feature = "mysql_kvbackend")] + async fn execute(&mut self, query: &str, params: &Vec<&Vec>) -> Result<()> { + match self { + Self::Default(executor) => executor.execute(query, params).await, + Self::Txn(executor) => executor.execute(query, params).await, + } + } + async fn commit(self) -> Result<()> { + match self { + Self::Txn(executor) => executor.commit().await, diff --git a/src/common/meta/src/kv_backend/rds/mysql.rs b/src/common/meta/src/kv_backend/rds/mysql.rs new file mode 100644 index 0000000000..f38952e090 --- /dev/null +++ b/src/common/meta/src/kv_backend/rds/mysql.rs @@ -0,0 +1,650 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::marker::PhantomData; +use std::sync::Arc; + +use common_telemetry::debug; +use snafu::ResultExt; +use sqlx::mysql::MySqlRow; +use sqlx::pool::Pool; +use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction}; + +use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result}; +use crate::kv_backend::rds::{ + Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction, + RDS_STORE_TXN_RETRY_COUNT, +}; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +type MySqlClient = Arc>; +pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>); + +fn key_value_from_row(row: MySqlRow) -> KeyValue { + // Safety: key and value are the first two columns in the row + KeyValue { + key: row.get_unchecked(0), + value: row.get_unchecked(1), + } +} + +const EMPTY: &[u8] = &[0]; + +/// Type of range template. +#[derive(Debug, Clone, Copy)] +enum RangeTemplateType { + Point, + Range, + Full, + LeftBounded, + Prefix, +} + +/// Builds params for the given range template type. +impl RangeTemplateType { + fn build_params(&self, mut key: Vec, range_end: Vec) -> Vec> { + match self { + RangeTemplateType::Point => vec![key], + RangeTemplateType::Range => vec![key, range_end], + RangeTemplateType::Full => vec![], + RangeTemplateType::LeftBounded => vec![key], + RangeTemplateType::Prefix => { + key.push(b'%'); + vec![key] + } + } + } +} + +/// Templates for range request. +#[derive(Debug, Clone)] +struct RangeTemplate { + point: String, + range: String, + full: String, + left_bounded: String, + prefix: String, +} + +impl RangeTemplate { + /// Gets the template for the given type. 
+ fn get(&self, typ: RangeTemplateType) -> &str { + match typ { + RangeTemplateType::Point => &self.point, + RangeTemplateType::Range => &self.range, + RangeTemplateType::Full => &self.full, + RangeTemplateType::LeftBounded => &self.left_bounded, + RangeTemplateType::Prefix => &self.prefix, + } + } + + /// Adds limit to the template. + fn with_limit(template: &str, limit: i64) -> String { + if limit == 0 { + return format!("{};", template); + } + format!("{} LIMIT {};", template, limit) + } +} + +fn is_prefix_range(start: &[u8], end: &[u8]) -> bool { + if start.len() != end.len() { + return false; + } + let l = start.len(); + let same_prefix = start[0..l - 1] == end[0..l - 1]; + if let (Some(rhs), Some(lhs)) = (start.last(), end.last()) { + return same_prefix && (*rhs + 1) == *lhs; + } + false +} + +/// Determine the template type for range request. +fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType { + match (key, range_end) { + (_, &[]) => RangeTemplateType::Point, + (EMPTY, EMPTY) => RangeTemplateType::Full, + (_, EMPTY) => RangeTemplateType::LeftBounded, + (start, end) => { + if is_prefix_range(start, end) { + RangeTemplateType::Prefix + } else { + RangeTemplateType::Range + } + } + } +} + +/// Generate in placeholders for MySQL. +fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec { + (from..=to).map(|_| "?".to_string()).collect() +} + +/// Factory for building sql templates. +struct MySqlTemplateFactory<'a> { + table_name: &'a str, +} + +impl<'a> MySqlTemplateFactory<'a> { + /// Creates a new [`MySqlTemplateFactory`] with the given table name. + fn new(table_name: &'a str) -> Self { + Self { table_name } + } + + /// Builds the template set for the given table name. + fn build(&self) -> MySqlTemplateSet { + let table_name = self.table_name; + // Some of queries don't end with `;`, because we need to add `LIMIT` clause. 
+ MySqlTemplateSet { + table_name: table_name.to_string(), + create_table_statement: format!( + // Cannot be more than 3072 bytes in PRIMARY KEY + "CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);", + ), + range_template: RangeTemplate { + point: format!("SELECT k, v FROM {table_name} WHERE k = ?"), + range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"), + full: format!("SELECT k, v FROM {table_name} ORDER BY k"), + left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"), + prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"), + }, + delete_template: RangeTemplate { + point: format!("DELETE FROM {table_name} WHERE k = ?;"), + range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"), + full: format!("DELETE FROM {table_name}"), + left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"), + prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"), + }, + } + } +} + +/// Templates for the given table name. +#[derive(Debug, Clone)] +pub struct MySqlTemplateSet { + table_name: String, + create_table_statement: String, + range_template: RangeTemplate, + delete_template: RangeTemplate, +} + +impl MySqlTemplateSet { + /// Generates the sql for batch get. + fn generate_batch_get_query(&self, key_len: usize) -> String { + let table_name = &self.table_name; + let in_clause = mysql_generate_in_placeholders(1, key_len).join(", "); + format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause) + } + + /// Generates the sql for batch delete. + fn generate_batch_delete_query(&self, key_len: usize) -> String { + let table_name = &self.table_name; + let in_clause = mysql_generate_in_placeholders(1, key_len).join(", "); + format!("DELETE FROM {table_name} WHERE k in ({});", in_clause) + } + + /// Generates the sql for batch upsert. + /// For MySQL, it also generates a select query to get the previous values. 
+ fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) { + let table_name = &self.table_name; + let in_placeholders: Vec = (1..=kv_len).map(|_| "?".to_string()).collect(); + let in_clause = in_placeholders.join(", "); + let mut values_placeholders = Vec::new(); + for _ in 0..kv_len { + values_placeholders.push("(?, ?)".to_string()); + } + let values_clause = values_placeholders.join(", "); + + ( + format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,), + format!( + r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#, + ), + ) + } +} + +#[async_trait::async_trait] +impl Executor for MySqlClient { + type Transaction<'a> + = MySqlTxnClient + where + Self: 'a; + + fn name() -> &'static str { + "MySql" + } + + async fn query(&mut self, raw_query: &str, params: &[&Vec]) -> Result> { + let query = sqlx::query(raw_query); + let query = params.iter().fold(query, |query, param| query.bind(param)); + let rows = query + .fetch_all(&**self) + .await + .context(MySqlExecutionSnafu { sql: raw_query })?; + Ok(rows.into_iter().map(key_value_from_row).collect()) + } + + async fn execute(&mut self, raw_query: &str, params: &[&Vec]) -> Result<()> { + let query = sqlx::query(raw_query); + let query = params.iter().fold(query, |query, param| query.bind(param)); + query + .execute(&**self) + .await + .context(MySqlExecutionSnafu { sql: raw_query })?; + Ok(()) + } + + async fn txn_executor<'a>(&'a mut self) -> Result> { + // sqlx has no isolation level support for now, so we have to set it manually. + // TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this. 
+ sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE") + .execute(&**self) + .await + .context(MySqlExecutionSnafu { + sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE", + })?; + let txn = self + .begin() + .await + .context(MySqlExecutionSnafu { sql: "begin" })?; + Ok(MySqlTxnClient(txn)) + } +} + +#[async_trait::async_trait] +impl Transaction<'_> for MySqlTxnClient { + async fn query(&mut self, raw_query: &str, params: &[&Vec]) -> Result> { + let query = sqlx::query(raw_query); + let query = params.iter().fold(query, |query, param| query.bind(param)); + // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird. + let rows = query + .fetch_all(&mut *(self.0)) + .await + .context(MySqlExecutionSnafu { sql: raw_query })?; + Ok(rows.into_iter().map(key_value_from_row).collect()) + } + + async fn execute(&mut self, raw_query: &str, params: &[&Vec]) -> Result<()> { + let query = sqlx::query(raw_query); + let query = params.iter().fold(query, |query, param| query.bind(param)); + // As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird. + query + .execute(&mut *(self.0)) + .await + .context(MySqlExecutionSnafu { sql: raw_query })?; + Ok(()) + } + + /// Caution: sqlx will stuck on the query if two transactions conflict with each other. + /// Don't know if it's a feature or it depends on the database. Be careful. + async fn commit(self) -> Result<()> { + self.0.commit().await.context(MySqlTransactionSnafu { + operation: "commit", + })?; + Ok(()) + } +} + +pub struct MySqlExecutorFactory { + pool: Arc>, +} + +#[async_trait::async_trait] +impl ExecutorFactory for MySqlExecutorFactory { + async fn default_executor(&self) -> Result { + Ok(self.pool.clone()) + } + + async fn txn_executor<'a>( + &self, + default_executor: &'a mut MySqlClient, + ) -> Result { + default_executor.txn_executor().await + } +} + +/// A MySQL-backed key-value store. 
+/// It uses [sqlx::Pool] as the connection pool for [RdsStore]. +pub type MySqlStore = RdsStore; + +#[async_trait::async_trait] +impl KvQueryExecutor for MySqlStore { + async fn range_with_query_executor( + &self, + query_executor: &mut ExecutorImpl<'_, MySqlClient>, + req: RangeRequest, + ) -> Result { + let template_type = range_template(&req.key, &req.range_end); + let template = self.sql_template_set.range_template.get(template_type); + let params = template_type.build_params(req.key, req.range_end); + let params_ref = params.iter().collect::>(); + // Always add 1 to limit to check if there is more data + let query = + RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 }); + let limit = req.limit as usize; + debug!("query: {:?}, params: {:?}", query, params); + let mut kvs = query_executor.query(&query, ¶ms_ref).await?; + if req.keys_only { + kvs.iter_mut().for_each(|kv| kv.value = vec![]); + } + // If limit is 0, we always return all data + if limit == 0 || kvs.len() <= limit { + return Ok(RangeResponse { kvs, more: false }); + } + // If limit is greater than the number of rows, we remove the last row and set more to true + let removed = kvs.pop(); + debug_assert!(removed.is_some()); + Ok(RangeResponse { kvs, more: true }) + } + + async fn batch_put_with_query_executor( + &self, + query_executor: &mut ExecutorImpl<'_, MySqlClient>, + req: BatchPutRequest, + ) -> Result { + let mut in_params = Vec::with_capacity(req.kvs.len() * 3); + let mut values_params = Vec::with_capacity(req.kvs.len() * 2); + + for kv in &req.kvs { + let processed_key = &kv.key; + in_params.push(processed_key); + + let processed_value = &kv.value; + values_params.push(processed_key); + values_params.push(processed_value); + } + let in_params = in_params.iter().map(|x| x as _).collect::>(); + let values_params = values_params.iter().map(|x| x as _).collect::>(); + let (select, update) = self + .sql_template_set + .generate_batch_upsert_query(req.kvs.len()); + 
+ // Fast path: if we don't need previous kvs, we can just upsert the keys. + if !req.prev_kv { + query_executor.execute(&update, &values_params).await?; + return Ok(BatchPutResponse::default()); + } + // Should use transaction to ensure atomicity. + if let ExecutorImpl::Default(query_executor) = query_executor { + let txn = query_executor.txn_executor().await?; + let mut txn = ExecutorImpl::Txn(txn); + let res = self.batch_put_with_query_executor(&mut txn, req).await; + txn.commit().await?; + return res; + } + let prev_kvs = query_executor.query(&select, &in_params).await?; + query_executor.execute(&update, &values_params).await?; + Ok(BatchPutResponse { prev_kvs }) + } + + async fn batch_get_with_query_executor( + &self, + query_executor: &mut ExecutorImpl<'_, MySqlClient>, + req: BatchGetRequest, + ) -> Result { + if req.keys.is_empty() { + return Ok(BatchGetResponse { kvs: vec![] }); + } + let query = self + .sql_template_set + .generate_batch_get_query(req.keys.len()); + let params = req.keys.iter().map(|x| x as _).collect::>(); + let kvs = query_executor.query(&query, ¶ms).await?; + Ok(BatchGetResponse { kvs }) + } + + async fn delete_range_with_query_executor( + &self, + query_executor: &mut ExecutorImpl<'_, MySqlClient>, + req: DeleteRangeRequest, + ) -> Result { + // Since we need to know the number of deleted keys, we have no fast path here. + // Should use transaction to ensure atomicity. + if let ExecutorImpl::Default(query_executor) = query_executor { + let txn = query_executor.txn_executor().await?; + let mut txn = ExecutorImpl::Txn(txn); + let res = self.delete_range_with_query_executor(&mut txn, req).await; + txn.commit().await?; + return res; + } + let range_get_req = RangeRequest { + key: req.key.clone(), + range_end: req.range_end.clone(), + limit: 0, + keys_only: false, + }; + let prev_kvs = self + .range_with_query_executor(query_executor, range_get_req) + .await? 
+ .kvs; + let template_type = range_template(&req.key, &req.range_end); + let template = self.sql_template_set.delete_template.get(template_type); + let params = template_type.build_params(req.key, req.range_end); + let params_ref = params.iter().map(|x| x as _).collect::>(); + query_executor.execute(template, ¶ms_ref).await?; + let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64); + if req.prev_kv { + resp.with_prev_kvs(prev_kvs); + } + Ok(resp) + } + + async fn batch_delete_with_query_executor( + &self, + query_executor: &mut ExecutorImpl<'_, MySqlClient>, + req: BatchDeleteRequest, + ) -> Result { + if req.keys.is_empty() { + return Ok(BatchDeleteResponse::default()); + } + let query = self + .sql_template_set + .generate_batch_delete_query(req.keys.len()); + let params = req.keys.iter().map(|x| x as _).collect::>(); + // Fast path: if we don't need previous kvs, we can just delete the keys. + if !req.prev_kv { + query_executor.execute(&query, ¶ms).await?; + return Ok(BatchDeleteResponse::default()); + } + // Should use transaction to ensure atomicity. + if let ExecutorImpl::Default(query_executor) = query_executor { + let txn = query_executor.txn_executor().await?; + let mut txn = ExecutorImpl::Txn(txn); + let res = self.batch_delete_with_query_executor(&mut txn, req).await; + txn.commit().await?; + return res; + } + // Should get previous kvs first + let batch_get_req = BatchGetRequest { + keys: req.keys.clone(), + }; + let prev_kvs = self + .batch_get_with_query_executor(query_executor, batch_get_req) + .await? + .kvs; + // Pure `DELETE` has no return value, so we need to use `execute` instead of `query`. + query_executor.execute(&query, ¶ms).await?; + if req.prev_kv { + Ok(BatchDeleteResponse { prev_kvs }) + } else { + Ok(BatchDeleteResponse::default()) + } + } +} + +impl MySqlStore { + /// Create [MySqlStore] impl of [KvBackendRef] from url. 
+ pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result { + let pool = MySqlPool::connect(url) + .await + .context(CreateMySqlPoolSnafu)?; + Self::with_mysql_pool(pool, table_name, max_txn_ops).await + } + + /// Create [MySqlStore] impl of [KvBackendRef] from [sqlx::Pool]. + pub async fn with_mysql_pool( + pool: Pool, + table_name: &str, + max_txn_ops: usize, + ) -> Result { + // This step ensures the mysql metadata backend is ready to use. + // We check if greptime_metakv table exists, and we will create a new table + // if it does not exist. + let sql_template_set = MySqlTemplateFactory::new(table_name).build(); + sqlx::query(&sql_template_set.create_table_statement) + .execute(&pool) + .await + .context(MySqlExecutionSnafu { + sql: sql_template_set.create_table_statement.to_string(), + })?; + Ok(Arc::new(MySqlStore { + max_txn_ops, + sql_template_set, + txn_retry_count: RDS_STORE_TXN_RETRY_COUNT, + executor_factory: MySqlExecutorFactory { + pool: Arc::new(pool), + }, + _phantom: PhantomData, + })) + } +} + +#[cfg(test)] +mod tests { + use common_telemetry::init_default_ut_logging; + + use super::*; + use crate::kv_backend::test::{ + prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix, + test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix, + test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix, + test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less, + test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op, + unprepare_kv, + }; + + async fn build_mysql_kv_backend(table_name: &str) -> Option { + init_default_ut_logging(); + let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default(); + if endpoints.is_empty() { + return None; + } + let pool = MySqlPool::connect(&endpoints).await.unwrap(); + let sql_templates = MySqlTemplateFactory::new(table_name).build(); + 
sqlx::query(&sql_templates.create_table_statement) + .execute(&pool) + .await + .unwrap(); + Some(MySqlStore { + max_txn_ops: 128, + sql_template_set: sql_templates, + txn_retry_count: RDS_STORE_TXN_RETRY_COUNT, + executor_factory: MySqlExecutorFactory { + pool: Arc::new(pool), + }, + _phantom: PhantomData, + }) + } + + #[tokio::test] + async fn test_mysql_put() { + let kv_backend = build_mysql_kv_backend("put_test").await.unwrap(); + let prefix = b"put/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_range() { + let kv_backend = build_mysql_kv_backend("range_test").await.unwrap(); + let prefix = b"range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_range_2() { + let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap(); + let prefix = b"range2/"; + test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_batch_get() { + let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap(); + let prefix = b"batch_get/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_batch_delete() { + let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap(); + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_batch_delete_with_prefix() { + let kv_backend 
= build_mysql_kv_backend("batch_delete_with_prefix_test") + .await + .unwrap(); + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_delete_range() { + let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap(); + let prefix = b"delete_range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + + #[tokio::test] + async fn test_mysql_compare_and_put() { + let kv_backend = build_mysql_kv_backend("compare_and_put_test") + .await + .unwrap(); + let prefix = b"compare_and_put/"; + let kv_backend = Arc::new(kv_backend); + test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; + } + + #[tokio::test] + async fn test_mysql_txn() { + let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap(); + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + test_txn_compare_not_equal(&kv_backend).await; + } +} diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index ef325e8269..8502be8f3c 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -153,6 +153,7 @@ impl<'a> PgSqlTemplateFactory<'a> { /// Builds the template set for the given table name. fn build(&self) -> PgSqlTemplateSet { let table_name = self.table_name; + // Some of queries don't end with `;`, because we need to add `LIMIT` clause. 
PgSqlTemplateSet { table_name: table_name.to_string(), create_table_statement: format!( diff --git a/tests-integration/fixtures/docker-compose.yml b/tests-integration/fixtures/docker-compose.yml index 7b47c2ed97..349062522e 100644 --- a/tests-integration/fixtures/docker-compose.yml +++ b/tests-integration/fixtures/docker-compose.yml @@ -67,6 +67,18 @@ services: - POSTGRES_DB=postgres - POSTGRES_PASSWORD=admin + mysql: + image: bitnami/mysql:5.7 + ports: + - 3306:3306 + volumes: + - ~/apps/mysql:/var/lib/mysql + environment: + - MYSQL_DATABASE=mysql + - MYSQL_USER=greptimedb + - MYSQL_PASSWORD=admin + - MYSQL_ROOT_PASSWORD=admin + volumes: minio_data: driver: local