feat: add mysql kvbackend txn support

This commit is contained in:
CookiePieWw
2025-03-03 15:28:37 +08:00
parent 0d19e8f089
commit 9513195eb0
13 changed files with 760 additions and 15 deletions

View File

@@ -52,7 +52,7 @@ runs:
uses: ./.github/actions/build-greptime-binary
with:
base-image: ubuntu
features: servers/dashboard,pg_kvbackend
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-${{ inputs.version }}
version: ${{ inputs.version }}
@@ -70,7 +70,7 @@ runs:
if: ${{ inputs.arch == 'amd64' && inputs.dev-mode == 'false' }} # Builds greptime for centos if the host machine is amd64.
with:
base-image: centos
features: servers/dashboard,pg_kvbackend
features: servers/dashboard,pg_kvbackend,mysql_kvbackend
cargo-profile: ${{ inputs.cargo-profile }}
artifacts-dir: greptime-linux-${{ inputs.arch }}-centos-${{ inputs.version }}
version: ${{ inputs.version }}

View File

@@ -111,7 +111,7 @@ jobs:
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend
run: cargo gc -- --bin greptime --bin sqlness-runner --features "pg_kvbackend,mysql_kvbackend"
- name: Pack greptime binaries
shell: bash
run: |
@@ -270,7 +270,7 @@ jobs:
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend
run: cargo gc --profile ci -- --bin greptime --features "pg_kvbackend,mysql_kvbackend"
- name: Pack greptime binary
shell: bash
run: |
@@ -687,7 +687,7 @@ jobs:
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend
run: cargo nextest run --workspace -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
@@ -704,6 +704,7 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
@@ -739,7 +740,7 @@ jobs:
working-directory: tests-integration/fixtures
run: docker compose up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F dashboard -F pg_kvbackend -F mysql_kvbackend
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=mold"
RUST_BACKTRACE: 1
@@ -755,6 +756,7 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"

1
Cargo.lock generated
View File

@@ -2190,6 +2190,7 @@ dependencies = [
"serde_with",
"session",
"snafu 0.8.5",
"sqlx",
"store-api",
"strum 0.25.0",
"table",

View File

@@ -188,6 +188,10 @@ shadow-rs = "0.38"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
] }
sysinfo = "0.30"
# on branch v0.52.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [

View File

@@ -6,6 +6,7 @@ license.workspace = true
[features]
pg_kvbackend = ["common-meta/pg_kvbackend"]
mysql_kvbackend = ["common-meta/mysql_kvbackend"]
[lints]
workspace = true

View File

@@ -23,6 +23,8 @@ use common_error::ext::BoxedError;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
#[cfg(feature = "mysql_kvbackend")]
use common_meta::kv_backend::rds::MySqlStore;
#[cfg(feature = "pg_kvbackend")]
use common_meta::kv_backend::rds::PgStore;
use common_meta::peer::Peer;
@@ -63,6 +65,9 @@ pub struct BenchTableMetadataCommand {
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option<String>,
#[cfg(feature = "mysql_kvbackend")]
#[clap(long)]
mysql_addr: Option<String>,
#[clap(long)]
count: u32,
}
@@ -86,6 +91,16 @@ impl BenchTableMetadataCommand {
kv_backend
};
#[cfg(feature = "mysql_kvbackend")]
let kv_backend = if let Some(mysql_addr) = &self.mysql_addr {
info!("Using mysql as kv backend");
MySqlStore::with_url(mysql_addr, "greptime_metakv", 128)
.await
.unwrap()
} else {
kv_backend
};
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
let tool = BenchTableMetadata {

View File

@@ -7,6 +7,7 @@ license.workspace = true
[features]
testing = []
pg_kvbackend = ["dep:tokio-postgres", "dep:backon", "dep:deadpool-postgres", "dep:deadpool"]
mysql_kvbackend = ["dep:sqlx", "dep:backon"]
[lints]
workspace = true
@@ -57,6 +58,7 @@ serde_json.workspace = true
serde_with.workspace = true
session.workspace = true
snafu.workspace = true
sqlx = { workspace = true, optional = true }
store-api.workspace = true
strum.workspace = true
table.workspace = true

View File

@@ -685,7 +685,36 @@ pub enum Error {
operation: String,
},
#[cfg(feature = "pg_kvbackend")]
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to execute via MySql, sql: {}", sql))]
MySqlExecution {
sql: String,
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to create connection pool for MySql"))]
CreateMySqlPool {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "mysql_kvbackend")]
#[snafu(display("Failed to {} MySql transaction", operation))]
MySqlTransaction {
#[snafu(source)]
error: sqlx::Error,
#[snafu(implicit)]
location: Location,
operation: String,
},
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
#[snafu(display("Rds transaction retry failed"))]
RdsTransactionRetryFailed {
#[snafu(implicit)]
@@ -823,8 +852,13 @@ impl ErrorExt for Error {
PostgresExecution { .. }
| CreatePostgresPool { .. }
| GetPostgresConnection { .. }
| PostgresTransaction { .. }
| RdsTransactionRetryFailed { .. } => StatusCode::Internal,
| PostgresTransaction { .. } => StatusCode::Internal,
#[cfg(feature = "mysql_kvbackend")]
MySqlExecution { .. } | CreateMySqlPool { .. } | MySqlTransaction { .. } => {
StatusCode::Internal
}
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
RdsTransactionRetryFailed { .. } => StatusCode::Internal,
Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal,
}
}
@@ -835,16 +869,26 @@ impl ErrorExt for Error {
}
impl Error {
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Check if the error is a serialization error.
pub fn is_serialization_error(&self) -> bool {
match self {
#[cfg(feature = "pg_kvbackend")]
Error::PostgresTransaction { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
}
#[cfg(feature = "pg_kvbackend")]
Error::PostgresExecution { error, .. } => {
error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE)
}
#[cfg(feature = "mysql_kvbackend")]
Error::MySqlExecution {
error: sqlx::Error::Database(database_error),
..
} => {
database_error.message()
== "Deadlock found when trying to get lock; try restarting transaction"
}
_ => false,
}
}

View File

@@ -31,7 +31,7 @@ use crate::rpc::KeyValue;
pub mod chroot;
pub mod etcd;
pub mod memory;
#[cfg(feature = "pg_kvbackend")]
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod test;
pub mod txn;

View File

@@ -33,10 +33,16 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
#[cfg(feature = "pg_kvbackend")]
mod postgres;
#[cfg(feature = "pg_kvbackend")]
pub use postgres::PgStore;
#[cfg(feature = "mysql_kvbackend")]
mod mysql;
#[cfg(feature = "mysql_kvbackend")]
pub use mysql::MySqlStore;
const RDS_STORE_TXN_RETRY_COUNT: usize = 3;
/// Query executor for rds. It can execute queries or generate a transaction executor.
@@ -106,6 +112,14 @@ impl<T: Executor> ExecutorImpl<'_, T> {
}
}
#[warn(dead_code)] // Used in #[cfg(feature = "mysql_kvbackend")]
async fn execute(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<()> {
match self {
Self::Default(executor) => executor.execute(query, params).await,
Self::Txn(executor) => executor.execute(query, params).await,
}
}
async fn commit(self) -> Result<()> {
match self {
Self::Txn(executor) => executor.commit().await,

View File

@@ -0,0 +1,648 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::marker::PhantomData;
use std::sync::Arc;
use common_telemetry::debug;
use snafu::ResultExt;
use sqlx::mysql::MySqlRow;
use sqlx::pool::Pool;
use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};
use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
type MySqlClient = Arc<Pool<MySql>>;
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);
fn key_value_from_row(row: MySqlRow) -> KeyValue {
// Safety: key and value are the first two columns in the row
KeyValue {
key: row.get_unchecked(0),
value: row.get_unchecked(1),
}
}
const EMPTY: &[u8] = &[0];
/// Type of range template.
#[derive(Debug, Clone, Copy)]
enum RangeTemplateType {
Point,
Range,
Full,
LeftBounded,
Prefix,
}
/// Builds params for the given range template type.
impl RangeTemplateType {
fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> {
match self {
RangeTemplateType::Point => vec![key],
RangeTemplateType::Range => vec![key, range_end],
RangeTemplateType::Full => vec![],
RangeTemplateType::LeftBounded => vec![key],
RangeTemplateType::Prefix => {
key.push(b'%');
vec![key]
}
}
}
}
/// Templates for range request.
#[derive(Debug, Clone)]
struct RangeTemplate {
point: String,
range: String,
full: String,
left_bounded: String,
prefix: String,
}
impl RangeTemplate {
/// Gets the template for the given type.
fn get(&self, typ: RangeTemplateType) -> &str {
match typ {
RangeTemplateType::Point => &self.point,
RangeTemplateType::Range => &self.range,
RangeTemplateType::Full => &self.full,
RangeTemplateType::LeftBounded => &self.left_bounded,
RangeTemplateType::Prefix => &self.prefix,
}
}
/// Adds limit to the template.
fn with_limit(template: &str, limit: i64) -> String {
if limit == 0 {
return format!("{};", template);
}
format!("{} LIMIT {};", template, limit)
}
}
fn is_prefix_range(start: &[u8], end: &[u8]) -> bool {
if start.len() != end.len() {
return false;
}
let l = start.len();
let same_prefix = start[0..l - 1] == end[0..l - 1];
if let (Some(rhs), Some(lhs)) = (start.last(), end.last()) {
return same_prefix && (*rhs + 1) == *lhs;
}
false
}
/// Determine the template type for range request.
fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType {
match (key, range_end) {
(_, &[]) => RangeTemplateType::Point,
(EMPTY, EMPTY) => RangeTemplateType::Full,
(_, EMPTY) => RangeTemplateType::LeftBounded,
(start, end) => {
if is_prefix_range(start, end) {
RangeTemplateType::Prefix
} else {
RangeTemplateType::Range
}
}
}
}
/// Generate in placeholders for MySQL.
fn mysql_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
(from..=to).map(|_| "?".to_string()).collect()
}
/// Factory for building sql templates.
struct MySqlTemplateFactory<'a> {
table_name: &'a str,
}
impl<'a> MySqlTemplateFactory<'a> {
/// Creates a new [`SqlTemplateFactory`] with the given table name.
fn new(table_name: &'a str) -> Self {
Self { table_name }
}
/// Builds the template set for the given table name.
fn build(&self) -> MySqlTemplateSet {
let table_name = self.table_name;
MySqlTemplateSet {
table_name: table_name.to_string(),
create_table_statement: format!(
"CREATE TABLE IF NOT EXISTS {table_name}(k VARBINARY(3072) PRIMARY KEY, v BLOB);",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM {table_name} WHERE k = ?"),
range: format!("SELECT k, v FROM {table_name} WHERE k >= ? AND k < ? ORDER BY k"),
full: format!("SELECT k, v FROM {table_name} ? ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= ? ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE ? ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM {table_name} WHERE k = ?;"),
range: format!("DELETE FROM {table_name} WHERE k >= ? AND k < ?;"),
full: format!("DELETE FROM {table_name}"),
left_bounded: format!("DELETE FROM {table_name} WHERE k >= ?;"),
prefix: format!("DELETE FROM {table_name} WHERE k LIKE ?;"),
},
}
}
}
/// Templates for the given table name.
#[derive(Debug, Clone)]
pub struct MySqlTemplateSet {
table_name: String,
create_table_statement: String,
range_template: RangeTemplate,
delete_template: RangeTemplate,
}
impl MySqlTemplateSet {
/// Generates the sql for batch get.
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause)
}
/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = mysql_generate_in_placeholders(1, key_len).join(", ");
format!("DELETE FROM {table_name} WHERE k in ({});", in_clause)
}
/// Generates the sql for batch upsert.
/// For MySQL, it also generates a select query to get the previous values.
fn generate_batch_upsert_query(&self, kv_len: usize) -> (String, String) {
let table_name = &self.table_name;
let in_placeholders: Vec<String> = (1..=kv_len).map(|_| "?".to_string()).collect();
let in_clause = in_placeholders.join(", ");
let mut values_placeholders = Vec::new();
for _ in 0..kv_len {
values_placeholders.push("(?, ?)".to_string());
}
let values_clause = values_placeholders.join(", ");
(
format!(r#"SELECT k, v FROM {table_name} WHERE k IN ({in_clause})"#,),
format!(
r#"INSERT INTO {table_name} (k, v) VALUES {values_clause} ON DUPLICATE KEY UPDATE v = VALUES(v);"#,
),
)
}
}
#[async_trait::async_trait]
impl Executor for MySqlClient {
type Transaction<'a>
= MySqlTxnClient
where
Self: 'a;
fn name() -> &'static str {
"MySql"
}
async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
let rows = query
.fetch_all(&**self)
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(rows.into_iter().map(key_value_from_row).collect())
}
async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
query
.execute(&**self)
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(())
}
async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
// sqlx has no isolation level support for now, so we have to set it manually.
// TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this.
sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
.execute(&**self)
.await
.context(MySqlExecutionSnafu {
sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE",
})?;
let txn = self
.begin()
.await
.context(MySqlExecutionSnafu { sql: "begin" })?;
Ok(MySqlTxnClient(txn))
}
}
#[async_trait::async_trait]
impl Transaction<'_> for MySqlTxnClient {
async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
// As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
let rows = query
.fetch_all(&mut *(self.0))
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(rows.into_iter().map(key_value_from_row).collect())
}
async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
// As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
query
.execute(&mut *(self.0))
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(())
}
/// Caution: sqlx will stuck on the query if two transactions conflict with each other.
/// Don't know if it's a feature or it depends on the database. Be careful.
async fn commit(self) -> Result<()> {
self.0.commit().await.context(MySqlTransactionSnafu {
operation: "commit",
})?;
Ok(())
}
}
pub struct MySqlExecutorFactory {
pool: Arc<Pool<MySql>>,
}
#[async_trait::async_trait]
impl ExecutorFactory<MySqlClient> for MySqlExecutorFactory {
async fn default_executor(&self) -> Result<MySqlClient> {
Ok(self.pool.clone())
}
async fn txn_executor<'a>(
&self,
default_executor: &'a mut MySqlClient,
) -> Result<MySqlTxnClient> {
default_executor.txn_executor().await
}
}
/// A MySQL-backed key-value store.
/// It uses [sqlx::Pool<MySql>] as the connection pool for [RdsStore].
pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateSet>;
#[async_trait::async_trait]
impl KvQueryExecutor<MySqlClient> for MySqlStore {
async fn range_with_query_executor(
&self,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: RangeRequest,
) -> Result<RangeResponse> {
let template_type = range_template(&req.key, &req.range_end);
let template = self.sql_template_set.range_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().collect::<Vec<_>>();
// Always add 1 to limit to check if there is more data
let query =
RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 });
let limit = req.limit as usize;
debug!("query: {:?}, params: {:?}", query, params);
let mut kvs = query_executor.query(&query, &params_ref).await?;
if req.keys_only {
kvs.iter_mut().for_each(|kv| kv.value = vec![]);
}
// If limit is 0, we always return all data
if limit == 0 || kvs.len() <= limit {
return Ok(RangeResponse { kvs, more: false });
}
// If limit is greater than the number of rows, we remove the last row and set more to true
let removed = kvs.pop();
debug_assert!(removed.is_some());
Ok(RangeResponse { kvs, more: true })
}
async fn batch_put_with_query_executor(
&self,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchPutRequest,
) -> Result<BatchPutResponse> {
let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
let mut values_params = Vec::with_capacity(req.kvs.len() * 2);
for kv in &req.kvs {
let processed_key = &kv.key;
in_params.push(processed_key);
let processed_value = &kv.value;
values_params.push(processed_key);
values_params.push(processed_value);
}
let in_params = in_params.iter().map(|x| x as _).collect::<Vec<_>>();
let values_params = values_params.iter().map(|x| x as _).collect::<Vec<_>>();
let (select, update) = self
.sql_template_set
.generate_batch_upsert_query(req.kvs.len());
// Fast path: if we don't need previous kvs, we can just upsert the keys.
if !req.prev_kv {
query_executor.execute(&update, &values_params).await?;
return Ok(BatchPutResponse::default());
}
// Should use transaction to ensure atomicity.
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = ExecutorImpl::Txn(txn);
let res = self.batch_put_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
}
let prev_kvs = query_executor.query(&select, &in_params).await?;
query_executor.execute(&update, &values_params).await?;
Ok(BatchPutResponse { prev_kvs })
}
async fn batch_get_with_query_executor(
&self,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchGetRequest,
) -> Result<BatchGetResponse> {
if req.keys.is_empty() {
return Ok(BatchGetResponse { kvs: vec![] });
}
let query = self
.sql_template_set
.generate_batch_get_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
let kvs = query_executor.query(&query, &params).await?;
Ok(BatchGetResponse { kvs })
}
async fn delete_range_with_query_executor(
&self,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse> {
// Since we need to know the number of deleted keys, we have no fast path here.
// Should use transaction to ensure atomicity.
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = ExecutorImpl::Txn(txn);
let res = self.delete_range_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
}
let range_get_req = RangeRequest {
key: req.key.clone(),
range_end: req.range_end.clone(),
limit: 0,
keys_only: false,
};
let prev_kvs = self
.range_with_query_executor(query_executor, range_get_req)
.await?
.kvs;
let template_type = range_template(&req.key, &req.range_end);
let template = self.sql_template_set.delete_template.get(template_type);
let params = template_type.build_params(req.key, req.range_end);
let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>();
query_executor.execute(template, &params_ref).await?;
let mut resp = DeleteRangeResponse::new(prev_kvs.len() as i64);
if req.prev_kv {
resp.with_prev_kvs(prev_kvs);
}
Ok(resp)
}
async fn batch_delete_with_query_executor(
&self,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse> {
if req.keys.is_empty() {
return Ok(BatchDeleteResponse::default());
}
let query = self
.sql_template_set
.generate_batch_delete_query(req.keys.len());
let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>();
// Fast path: if we don't need previous kvs, we can just delete the keys.
if !req.prev_kv {
query_executor.execute(&query, &params).await?;
return Ok(BatchDeleteResponse::default());
}
// Should use transaction to ensure atomicity.
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = ExecutorImpl::Txn(txn);
let res = self.batch_delete_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
}
// Should get previous kvs first
let batch_get_req = BatchGetRequest {
keys: req.keys.clone(),
};
let prev_kvs = self
.batch_get_with_query_executor(query_executor, batch_get_req)
.await?
.kvs;
// Pure `DELETE` has no return value, so we need to use `execute` instead of `query`.
query_executor.execute(&query, &params).await?;
if req.prev_kv {
Ok(BatchDeleteResponse { prev_kvs })
} else {
Ok(BatchDeleteResponse::default())
}
}
}
impl MySqlStore {
/// Create [MySqlStore] impl of [KvBackendRef] from url.
pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
let pool = MySqlPool::connect(url)
.await
.context(CreateMySqlPoolSnafu)?;
Self::with_mysql_pool(pool, table_name, max_txn_ops).await
}
/// Create [MySqlStore] impl of [KvBackendRef] from [sqlx::Pool<MySql>].
pub async fn with_mysql_pool(
pool: Pool<MySql>,
table_name: &str,
max_txn_ops: usize,
) -> Result<KvBackendRef> {
// This step ensures the mysql metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
let sql_template_set = MySqlTemplateFactory::new(table_name).build();
sqlx::query(&sql_template_set.create_table_statement)
.execute(&pool)
.await
.context(MySqlExecutionSnafu {
sql: sql_template_set.create_table_statement.to_string(),
})?;
Ok(Arc::new(MySqlStore {
max_txn_ops,
sql_template_set,
txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
executor_factory: MySqlExecutorFactory {
pool: Arc::new(pool),
},
_phantom: PhantomData,
}))
}
}
#[cfg(test)]
mod tests {
use common_telemetry::init_default_ut_logging;
use super::*;
use crate::kv_backend::test::{
prepare_kv_with_prefix, test_kv_batch_delete_with_prefix, test_kv_batch_get_with_prefix,
test_kv_compare_and_put_with_prefix, test_kv_delete_range_with_prefix,
test_kv_put_with_prefix, test_kv_range_2_with_prefix, test_kv_range_with_prefix,
test_txn_compare_equal, test_txn_compare_greater, test_txn_compare_less,
test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op,
unprepare_kv,
};
async fn build_mysql_kv_backend(table_name: &str) -> Option<MySqlStore> {
init_default_ut_logging();
let endpoints = std::env::var("GT_MYSQL_ENDPOINTS").unwrap_or_default();
if endpoints.is_empty() {
return None;
}
let pool = MySqlPool::connect(&endpoints).await.unwrap();
let sql_templates = MySqlTemplateFactory::new(table_name).build();
sqlx::query(&sql_templates.create_table_statement)
.execute(&pool)
.await
.unwrap();
Some(MySqlStore {
max_txn_ops: 128,
sql_template_set: sql_templates,
txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
executor_factory: MySqlExecutorFactory {
pool: Arc::new(pool),
},
_phantom: PhantomData,
})
}
#[tokio::test]
async fn test_mysql_put() {
let kv_backend = build_mysql_kv_backend("put_test").await.unwrap();
let prefix = b"put/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_range() {
let kv_backend = build_mysql_kv_backend("range_test").await.unwrap();
let prefix = b"range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_range_2() {
let kv_backend = build_mysql_kv_backend("range2_test").await.unwrap();
let prefix = b"range2/";
test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_batch_get() {
let kv_backend = build_mysql_kv_backend("batch_get_test").await.unwrap();
let prefix = b"batch_get/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_batch_delete() {
let kv_backend = build_mysql_kv_backend("batch_delete_test").await.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_batch_delete_with_prefix() {
let kv_backend = build_mysql_kv_backend("batch_delete_with_prefix_test")
.await
.unwrap();
let prefix = b"batch_delete/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_delete_range() {
let kv_backend = build_mysql_kv_backend("delete_range_test").await.unwrap();
let prefix = b"delete_range/";
prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await;
test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await;
unprepare_kv(&kv_backend, prefix).await;
}
#[tokio::test]
async fn test_mysql_compare_and_put() {
let kv_backend = build_mysql_kv_backend("compare_and_put_test")
.await
.unwrap();
let prefix = b"compare_and_put/";
let kv_backend = Arc::new(kv_backend);
test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await;
}
#[tokio::test]
async fn test_mysql_txn() {
let kv_backend = build_mysql_kv_backend("txn_test").await.unwrap();
test_txn_one_compare_op(&kv_backend).await;
text_txn_multi_compare_op(&kv_backend).await;
test_txn_compare_equal(&kv_backend).await;
test_txn_compare_greater(&kv_backend).await;
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
}

View File

@@ -334,9 +334,11 @@ pub async fn test_kv_compare_and_put_with_prefix(
expect: vec![],
value: b"val_new".to_vec(),
};
let resp = kv_backend_clone.compare_and_put(req).await.unwrap();
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
let resp = kv_backend_clone.compare_and_put(req).await;
if let Ok(resp) = resp {
if resp.success {
success_clone.fetch_add(1, Ordering::SeqCst);
}
}
});
joins.push(join);

View File

@@ -67,6 +67,18 @@ services:
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin
mysql:
image: bitnami/mysql:5.7
ports:
- 3306:3306
volumes:
- ~/apps/mysql:/var/lib/mysql
environment:
- MYSQL_DATABASE=mysql
- MYSQL_USER=greptimedb
- MYSQL_PASSWORD=admin
- MYSQL_ROOT_PASSWORD=admin
volumes:
minio_data:
driver: local