diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index c8f8c681b2..f446ccfc6c 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -100,7 +100,7 @@ jobs: - name: Build greptime binaries shell: bash # `cargo gc` will invoke `cargo build` with specified args - run: cargo gc -- --bin greptime --bin sqlness-runner + run: cargo gc -- --bin greptime --bin sqlness-runner --features pg_kvbackend - name: Pack greptime binaries shell: bash run: | @@ -261,7 +261,7 @@ jobs: - name: Build greptime binary shell: bash # `cargo gc` will invoke `cargo build` with specified args - run: cargo gc --profile ci -- --bin greptime + run: cargo gc --profile ci -- --bin greptime --features pg_kvbackend - name: Pack greptime binary shell: bash run: | @@ -573,6 +573,9 @@ jobs: - name: "Remote WAL" opts: "-w kafka -k 127.0.0.1:9092" kafka: true + - name: "Pg Kvbackend" + opts: "--setup-pg" + kafka: false timeout-minutes: 60 steps: - uses: actions/checkout@v4 diff --git a/Cargo.lock b/Cargo.lock index 1c5b578345..d0503c3b9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2174,6 +2174,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", + "backon", "base64 0.21.7", "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 73571315fe..91b05578d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,6 +99,7 @@ arrow-schema = { version = "51.0", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" axum = { version = "0.6", features = ["headers"] } +backon = "1" base64 = "0.21" bigdecimal = "0.4.2" bitflags = "2.4.1" diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index c045125480..c948859b52 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -78,7 +78,9 @@ impl BenchTableMetadataCommand { #[cfg(feature = "pg_kvbackend")] let kv_backend = if let Some(postgres_addr) = &self.postgres_addr { info!("Using postgres as kv backend"); - PgStore::with_url(postgres_addr, 128).await.unwrap() + PgStore::with_url(postgres_addr, "greptime_metakv", 128) + .await + .unwrap() } else { kv_backend }; diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 00da3cacca..231399a578 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] testing = [] -pg_kvbackend = ["dep:tokio-postgres"] +pg_kvbackend = ["dep:tokio-postgres", "dep:backon"] [lints] workspace = true @@ -17,6 +17,7 @@ api.workspace = true async-recursion = "1.0" async-stream = "0.3" async-trait.workspace = true +backon = { workspace = true, optional = true } base64.workspace = true bytes.workspace = true chrono.workspace = true diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 8c92146a46..0fc483879d 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -639,15 +639,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse {} from str to utf8", name))] - StrFromUtf8 { - name: String, - #[snafu(source)] - error: std::str::Utf8Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Value not exists"))] ValueNotExist { #[snafu(implicit)] location: Location, }, @@ -658,8 +649,9 @@ pub enum Error { GetCache { source: Arc<Error> }, #[cfg(feature = "pg_kvbackend")] - #[snafu(display("Failed to execute via Postgres"))] + #[snafu(display("Failed to execute via Postgres, sql: {}", sql))] PostgresExecution { + sql: String, #[snafu(source)] error: tokio_postgres::Error, #[snafu(implicit)] location: Location, }, @@ -693,6 +685,13 @@ pub enum Error { operation: String,
}, + #[cfg(feature = "pg_kvbackend")] + #[snafu(display("Postgres transaction retry failed"))] + PostgresTransactionRetryFailed { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Datanode table info not found, table id: {}, datanode id: {}", table_id, @@ -756,8 +755,7 @@ impl ErrorExt for Error { | UnexpectedLogicalRouteTable { .. } | ProcedureOutput { .. } | FromUtf8 { .. } - | MetadataCorruption { .. } - | StrFromUtf8 { .. } => StatusCode::Unexpected, + | MetadataCorruption { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } | CacheNotGet { .. } => StatusCode::Internal, @@ -807,7 +805,8 @@ impl ErrorExt for Error { PostgresExecution { .. } | CreatePostgresPool { .. } | GetPostgresConnection { .. } - | PostgresTransaction { .. } => StatusCode::Internal, + | PostgresTransaction { .. } + | PostgresTransactionRetryFailed { .. } => StatusCode::Internal, Error::DatanodeTableInfoNotFound { .. } => StatusCode::Internal, } } @@ -818,6 +817,20 @@ impl ErrorExt for Error { } impl Error { + #[cfg(feature = "pg_kvbackend")] + /// Check if the error is a serialization error. + pub fn is_serialization_error(&self) -> bool { + match self { + Error::PostgresTransaction { error, .. } => { + error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) + } + Error::PostgresExecution { error, .. } => { + error.code() == Some(&tokio_postgres::error::SqlState::T_R_SERIALIZATION_FAILURE) + } + _ => false, + } + } + /// Creates a new [Error::RetryLater] error from source `err`. pub fn retry_later(err: E) -> Error { Error::RetryLater { diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index 213489a583..17c4b7db71 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -591,7 +591,7 @@ mod tests { #[tokio::test] async fn test_range_2() { if let Some(kv_backend) = build_kv_backend().await { - test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; + test_kv_range_2_with_prefix(&kv_backend, b"range2/".to_vec()).await; } } @@ -618,7 +618,8 @@ mod tests { if let Some(kv_backend) = build_kv_backend().await { let prefix = b"deleteRange/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } } @@ -627,20 +628,20 @@ mod tests { if let Some(kv_backend) = build_kv_backend().await { let prefix = b"batchDelete/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } } #[tokio::test] async fn test_etcd_txn() { if let Some(kv_backend) = build_kv_backend().await { - let kv_backend_ref = Arc::new(kv_backend); - test_txn_one_compare_op(kv_backend_ref.clone()).await; - text_txn_multi_compare_op(kv_backend_ref.clone()).await; - test_txn_compare_equal(kv_backend_ref.clone()).await; - test_txn_compare_greater(kv_backend_ref.clone()).await; - test_txn_compare_less(kv_backend_ref.clone()).await; - test_txn_compare_not_equal(kv_backend_ref).await; + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + 
test_txn_compare_not_equal(&kv_backend).await; } } } diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs index b236d7b576..989a91ea16 100644 --- a/src/common/meta/src/kv_backend/memory.rs +++ b/src/common/meta/src/kv_backend/memory.rs @@ -355,7 +355,7 @@ mod tests { async fn test_range_2() { let kv = MemoryKvBackend::<Error>::new(); - test_kv_range_2(kv).await; + test_kv_range_2(&kv).await; } #[tokio::test] @@ -376,24 +376,24 @@ mod tests { async fn test_delete_range() { let kv_backend = mock_mem_store_with_data().await; - test_kv_delete_range(kv_backend).await; + test_kv_delete_range(&kv_backend).await; } #[tokio::test] async fn test_batch_delete() { let kv_backend = mock_mem_store_with_data().await; - test_kv_batch_delete(kv_backend).await; + test_kv_batch_delete(&kv_backend).await; } #[tokio::test] async fn test_memory_txn() { - let kv_backend = Arc::new(MemoryKvBackend::<Error>::new()); - test_txn_one_compare_op(kv_backend.clone()).await; - text_txn_multi_compare_op(kv_backend.clone()).await; - test_txn_compare_equal(kv_backend.clone()).await; - test_txn_compare_greater(kv_backend.clone()).await; - test_txn_compare_less(kv_backend.clone()).await; - test_txn_compare_not_equal(kv_backend).await; + let kv_backend = MemoryKvBackend::<Error>::new(); + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + test_txn_compare_not_equal(&kv_backend).await; } } diff --git a/src/common/meta/src/kv_backend/postgres.rs b/src/common/meta/src/kv_backend/postgres.rs index b75f045314..7f3333575a 100644 --- a/src/common/meta/src/kv_backend/postgres.rs +++ b/src/common/meta/src/kv_backend/postgres.rs @@ -13,17 +13,21 @@ // limitations under the License.
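For orientation on the API change that runs through this diff: `PgStore::with_url` now takes the metadata table name explicitly (previously hard-coded as `greptime_metakv`), as seen in the `bench.rs` hunk above. A minimal usage sketch, assuming the `common_meta::kv_backend::postgres` module path shown in this patch and a reachable Postgres instance; the DSN below is a placeholder, not a value from the patch:

// Hedged sketch, not patch text: builds the Postgres-backed kv store with an
// explicit metadata table name, as introduced by this diff.
use common_meta::kv_backend::postgres::PgStore;

async fn build_kv_backend() -> Result<(), Box<dyn std::error::Error>> {
    // with_url(url, table_name, max_txn_ops) creates the metadata table
    // (k bytea PRIMARY KEY, v bytea) if it does not already exist and
    // returns a KvBackendRef wrapping the pooled store.
    let kv_backend = PgStore::with_url(
        "postgres://user:pass@127.0.0.1:5432/postgres", // placeholder DSN
        "greptime_metakv",                              // metadata table name
        128,                                            // max_txn_ops
    )
    .await?;
    let _ = kv_backend;
    Ok(())
}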
use std::any::Any; -use std::borrow::Cow; +use std::collections::HashMap; +use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; +use backon::{BackoffBuilder, ExponentialBuilder}; +use common_telemetry::debug; use deadpool_postgres::{Config, Pool, Runtime}; use snafu::ResultExt; use tokio_postgres::types::ToSql; -use tokio_postgres::NoTls; +use tokio_postgres::{IsolationLevel, NoTls, Row}; use crate::error::{ CreatePostgresPoolSnafu, Error, GetPostgresConnectionSnafu, PostgresExecutionSnafu, - PostgresTransactionSnafu, Result, StrFromUtf8Snafu, + PostgresTransactionRetryFailedSnafu, PostgresTransactionSnafu, Result, }; use crate::kv_backend::txn::{ Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse, @@ -32,8 +36,8 @@ use crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; use crate::metrics::METRIC_META_TXN_REQUEST; use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, - BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, - DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, + RangeRequest, RangeResponse, }; use crate::rpc::KeyValue; @@ -54,11 +58,11 @@ impl PgQueryExecutor<'_> { PgQueryExecutor::Client(client) => client .query(query, params) .await - .context(PostgresExecutionSnafu), + .context(PostgresExecutionSnafu { sql: query }), PgQueryExecutor::Transaction(txn) => txn .query(query, params) .await - .context(PostgresExecutionSnafu), + .context(PostgresExecutionSnafu { sql: query }), } } @@ -74,78 +78,188 @@ impl PgQueryExecutor<'_> { } } +const PG_STORE_TXN_RETRY_COUNT: usize = 3; + /// Postgres backend store for metasrv -pub struct PgStore { +pub struct PgStore<T> { pool: Pool, max_txn_ops: usize, + sql_template_set: SqlTemplateSet<T>, + txn_retry_count: usize, } const EMPTY: &[u8] = &[0]; -// TODO: allow users to configure metadata table name. -const METADKV_CREATION: &str = - "CREATE TABLE IF NOT EXISTS greptime_metakv(k varchar PRIMARY KEY, v varchar)"; +/// Factory for building sql templates. +struct SqlTemplateFactory<'a, T> { + table_name: &'a str, + _phantom: PhantomData<T>, +} -const FULL_TABLE_SCAN: &str = "SELECT k, v FROM greptime_metakv $1 ORDER BY K"; +impl<'a, T> SqlTemplateFactory<'a, T> { + /// Creates a new [`SqlTemplateFactory`] with the given table name. + fn new(table_name: &'a str) -> Self { + Self { + table_name, + _phantom: PhantomData, + } + } -const POINT_GET: &str = "SELECT k, v FROM greptime_metakv WHERE k = $1"; + /// Builds the template set for the given table name.
+ fn build( + &self, + key_value_from_row: fn(Row) -> T, + key_value_from_row_key_only: fn(Row) -> T, + ) -> SqlTemplateSet<T> { + let table_name = self.table_name; + SqlTemplateSet { + table_name: table_name.to_string(), + create_table_statement: format!( + "CREATE TABLE IF NOT EXISTS {table_name}(k bytea PRIMARY KEY, v bytea)", + ), + range_template: RangeTemplate { + point: format!("SELECT k, v FROM {table_name} WHERE k = $1"), + range: format!("SELECT k, v FROM {table_name} WHERE k >= $1 AND k < $2 ORDER BY k"), + full: format!("SELECT k, v FROM {table_name} $1 ORDER BY k"), + left_bounded: format!("SELECT k, v FROM {table_name} WHERE k >= $1 ORDER BY k"), + prefix: format!("SELECT k, v FROM {table_name} WHERE k LIKE $1 ORDER BY k"), + }, + delete_template: RangeTemplate { + point: format!("DELETE FROM {table_name} WHERE k = $1 RETURNING k,v;"), + range: format!("DELETE FROM {table_name} WHERE k >= $1 AND k < $2 RETURNING k,v;"), + full: format!("DELETE FROM {table_name} RETURNING k,v"), + left_bounded: format!("DELETE FROM {table_name} WHERE k >= $1 RETURNING k,v;"), + prefix: format!("DELETE FROM {table_name} WHERE k LIKE $1 RETURNING k,v;"), + }, + key_value_from_row, + key_value_from_row_key_only, + } + } +} -const PREFIX_SCAN: &str = "SELECT k, v FROM greptime_metakv WHERE k LIKE $1 ORDER BY K"; +/// Templates for the given table name. +#[derive(Debug, Clone)] +pub struct SqlTemplateSet<T> { + table_name: String, + create_table_statement: String, + range_template: RangeTemplate, + delete_template: RangeTemplate, + key_value_from_row: fn(Row) -> T, + key_value_from_row_key_only: fn(Row) -> T, +} -const RANGE_SCAN_LEFT_BOUNDED: &str = "SELECT k, v FROM greptime_metakv WHERE k >= $1 ORDER BY K"; +impl<T> SqlTemplateSet<T> { + /// Converts a row to a [`KeyValue`] with options. + fn key_value_from_row_with_opts(&self, keys_only: bool) -> impl Fn(Row) -> T { + if keys_only { + self.key_value_from_row_key_only + } else { + self.key_value_from_row + } + } -const RANGE_SCAN_FULL_RANGE: &str = - "SELECT k, v FROM greptime_metakv WHERE k >= $1 AND K < $2 ORDER BY K"; + /// Generates the sql for batch get. + fn generate_batch_get_query(&self, key_len: usize) -> String { + let table_name = &self.table_name; + let in_clause = generate_in_placeholders(1, key_len).join(", "); + format!("SELECT k, v FROM {table_name} WHERE k in ({});", in_clause) + } -const FULL_TABLE_DELETE: &str = "DELETE FROM greptime_metakv RETURNING k,v"; + /// Generates the sql for batch delete. + fn generate_batch_delete_query(&self, key_len: usize) -> String { + let table_name = &self.table_name; + let in_clause = generate_in_placeholders(1, key_len).join(", "); + format!( + "DELETE FROM {table_name} WHERE k in ({}) RETURNING k,v;", + in_clause + ) + } -const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE K = $1 RETURNING k,v;"; + /// Generates the sql for batch upsert.
+ fn generate_batch_upsert_query(&self, kv_len: usize) -> String { + let table_name = &self.table_name; + let in_placeholders: Vec<String> = (1..=kv_len).map(|i| format!("${}", i)).collect(); + let in_clause = in_placeholders.join(", "); + let mut param_index = kv_len + 1; + let mut values_placeholders = Vec::new(); + for _ in 0..kv_len { + values_placeholders.push(format!("(${0}, ${1})", param_index, param_index + 1)); + param_index += 2; + } + let values_clause = values_placeholders.join(", "); -const PREFIX_DELETE: &str = "DELETE FROM greptime_metakv WHERE k LIKE $1 RETURNING k,v;"; + format!( + r#" + WITH prev AS ( + SELECT k,v FROM {table_name} WHERE k IN ({in_clause}) + ), update AS ( + INSERT INTO {table_name} (k, v) VALUES + {values_clause} + ON CONFLICT ( + k + ) DO UPDATE SET + v = excluded.v + ) -const RANGE_DELETE_LEFT_BOUNDED: &str = "DELETE FROM greptime_metakv WHERE k >= $1 RETURNING k,v;"; + SELECT k, v FROM prev; + "# + ) + } +} -const RANGE_DELETE_FULL_RANGE: &str = - "DELETE FROM greptime_metakv WHERE k >= $1 AND K < $2 RETURNING k,v;"; +/// Default sql template set for [`KeyValue`]. +pub type DefaultSqlTemplateSet = SqlTemplateSet<KeyValue>; -const CAS: &str = r#" -WITH prev AS ( - SELECT k,v FROM greptime_metakv WHERE k = $1 AND v = $2 -), update AS ( -UPDATE greptime_metakv -SET k=$1, -v=$2 -WHERE - k=$1 AND v=$3 -) +/// Default pg store for [`KeyValue`]. +pub type DefaultPgStore = PgStore<KeyValue>; -SELECT k, v FROM prev; -"#; +impl<T> PgStore<T> { + async fn client(&self) -> Result<PgClient> { + match self.pool.get().await { + Ok(client) => Ok(client), + Err(e) => GetPostgresConnectionSnafu { + reason: e.to_string(), + } + .fail(), + } + } -const PUT_IF_NOT_EXISTS: &str = r#" -WITH prev AS ( - select k,v from greptime_metakv where k = $1 -), insert AS ( - INSERT INTO greptime_metakv - VALUES ($1, $2) - ON CONFLICT (k) DO NOTHING -) + async fn client_executor(&self) -> Result<PgQueryExecutor<'_>> { + let client = self.client().await?; + Ok(PgQueryExecutor::Client(client)) + } -SELECT k, v FROM prev;"#; + async fn txn_executor<'a>(&self, client: &'a mut PgClient) -> Result<PgQueryExecutor<'a>> { + let txn = client + .build_transaction() + .isolation_level(IsolationLevel::Serializable) + .start() + .await + .context(PostgresTransactionSnafu { + operation: "start".to_string(), + })?; + Ok(PgQueryExecutor::Transaction(txn)) + } +} -impl PgStore { +impl DefaultPgStore { /// Create pgstore impl of KvBackendRef from url. - pub async fn with_url(url: &str, max_txn_ops: usize) -> Result<KvBackendRef> { + pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> { let mut cfg = Config::new(); cfg.url = Some(url.to_string()); + // TODO(weny, CookiePie): add tls support let pool = cfg .create_pool(Some(Runtime::Tokio1), NoTls) .context(CreatePostgresPoolSnafu)?; - Self::with_pg_pool(pool, max_txn_ops).await + Self::with_pg_pool(pool, table_name, max_txn_ops).await } /// Create pgstore impl of KvBackendRef from tokio-postgres client. - pub async fn with_pg_pool(pool: Pool, max_txn_ops: usize) -> Result<KvBackendRef> { + pub async fn with_pg_pool( + pool: Pool, + table_name: &str, + max_txn_ops: usize, + ) -> Result<KvBackendRef> { // This step ensures the postgres metadata backend is ready to use. // We check if greptime_metakv table exists, and we will create a new table // if it does not exist.
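To make the generated statement concrete: for two key-value pairs, `generate_batch_upsert_query` above expands to the shape below. This is an illustration of the template expansion, not text from the patch; `$1`/`$2` feed the `prev` lookup and `$3`..`$6` feed the `VALUES` list, so a single round trip returns the previous values while upserting the new ones:

// Illustrative expansion of generate_batch_upsert_query(2) for the default
// table name; parameter numbering follows the builder above.
const BATCH_UPSERT_2: &str = r#"
WITH prev AS (
    SELECT k,v FROM greptime_metakv WHERE k IN ($1, $2)
), update AS (
    INSERT INTO greptime_metakv (k, v) VALUES
    ($3, $4), ($5, $6)
    ON CONFLICT (
        k
    ) DO UPDATE SET
        v = excluded.v
)

SELECT k, v FROM prev;
"#;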
@@ -158,144 +272,104 @@ impl PgStore { .fail(); } }; + let template_factory = SqlTemplateFactory::new(table_name); + let sql_template_set = + template_factory.build(key_value_from_row, key_value_from_row_key_only); client - .execute(METADKV_CREATION, &[]) + .execute(&sql_template_set.create_table_statement, &[]) .await - .context(PostgresExecutionSnafu)?; - Ok(Arc::new(Self { pool, max_txn_ops })) + .with_context(|_| PostgresExecutionSnafu { + sql: sql_template_set.create_table_statement.to_string(), + })?; + Ok(Arc::new(Self { + pool, + max_txn_ops, + sql_template_set, + txn_retry_count: PG_STORE_TXN_RETRY_COUNT, + })) } +} - async fn get_client(&self) -> Result<PgClient> { - match self.pool.get().await { - Ok(client) => Ok(client), - Err(e) => GetPostgresConnectionSnafu { - reason: e.to_string(), +/// Type of range template. +#[derive(Debug, Clone, Copy)] +enum RangeTemplateType { + Point, + Range, + Full, + LeftBounded, + Prefix, +} + +/// Builds params for the given range template type. +impl RangeTemplateType { + fn build_params(&self, mut key: Vec<u8>, range_end: Vec<u8>) -> Vec<Vec<u8>> { + match self { + RangeTemplateType::Point => vec![key], + RangeTemplateType::Range => vec![key, range_end], + RangeTemplateType::Full => vec![], + RangeTemplateType::LeftBounded => vec![key], + RangeTemplateType::Prefix => { + key.push(b'%'); + vec![key] } - .fail(), + } + } +} + +/// Templates for range request. +#[derive(Debug, Clone)] +struct RangeTemplate { + point: String, + range: String, + full: String, + left_bounded: String, + prefix: String, +} + +impl RangeTemplate { + /// Gets the template for the given type. + fn get(&self, typ: RangeTemplateType) -> &str { + match typ { + RangeTemplateType::Point => &self.point, + RangeTemplateType::Range => &self.range, + RangeTemplateType::Full => &self.full, + RangeTemplateType::LeftBounded => &self.left_bounded, + RangeTemplateType::Prefix => &self.prefix, } } - async fn get_client_executor(&self) -> Result<PgQueryExecutor<'_>> { - let client = self.get_client().await?; - Ok(PgQueryExecutor::Client(client)) - } - - async fn get_txn_executor<'a>(&self, client: &'a mut PgClient) -> Result<PgQueryExecutor<'a>> { - let txn = client - .transaction() - .await - .context(PostgresTransactionSnafu { - operation: "start".to_string(), - })?; - Ok(PgQueryExecutor::Transaction(txn)) - } - - async fn put_if_not_exists_with_query_executor( - &self, - query_executor: &PgQueryExecutor<'_>, - key: &str, - value: &str, - ) -> Result<bool> { - let res = query_executor - .query(PUT_IF_NOT_EXISTS, &[&key, &value]) - .await?; - Ok(res.is_empty()) + /// Adds limit to the template. + fn with_limit(template: &str, limit: i64) -> String { + if limit == 0 { + return format!("{};", template); + } + format!("{} LIMIT {};", template, limit) + } } -fn select_range_template(req: &RangeRequest) -> &str { - if req.range_end.is_empty() { - return POINT_GET; - } - if req.key == EMPTY && req.range_end == EMPTY { - FULL_TABLE_SCAN - } else if req.range_end == EMPTY { - RANGE_SCAN_LEFT_BOUNDED - } else if is_prefix_range(&req.key, &req.range_end) { - PREFIX_SCAN - } else { - RANGE_SCAN_FULL_RANGE +/// Determines the template type for a range request.
+fn range_template(key: &[u8], range_end: &[u8]) -> RangeTemplateType { + match (key, range_end) { + (_, &[]) => RangeTemplateType::Point, + (EMPTY, EMPTY) => RangeTemplateType::Full, + (_, EMPTY) => RangeTemplateType::LeftBounded, + (start, end) => { + if is_prefix_range(start, end) { + RangeTemplateType::Prefix + } else { + RangeTemplateType::Range + } + } } } -fn select_range_delete_template(req: &DeleteRangeRequest) -> &str { - if req.range_end.is_empty() { - return POINT_DELETE; - } - if req.key == EMPTY && req.range_end == EMPTY { - FULL_TABLE_DELETE - } else if req.range_end == EMPTY { - RANGE_DELETE_LEFT_BOUNDED - } else if is_prefix_range(&req.key, &req.range_end) { - PREFIX_DELETE - } else { - RANGE_DELETE_FULL_RANGE - } -} - -// Generate dynamic parameterized sql for batch get. -fn generate_batch_get_query(key_len: usize) -> String { - let in_placeholders: Vec<String> = (1..=key_len).map(|i| format!("${}", i)).collect(); - let in_clause = in_placeholders.join(", "); - format!( - "SELECT k, v FROM greptime_metakv WHERE k in ({});", - in_clause - ) -} - -// Generate dynamic parameterized sql for batch delete. -fn generate_batch_delete_query(key_len: usize) -> String { - let in_placeholders: Vec<String> = (1..=key_len).map(|i| format!("${}", i)).collect(); - let in_clause = in_placeholders.join(", "); - format!( - "DELETE FROM greptime_metakv WHERE k in ({}) RETURNING k, v;", - in_clause - ) -} - -// Generate dynamic parameterized sql for batch upsert. -fn generate_batch_upsert_query(kv_len: usize) -> String { - let in_placeholders: Vec<String> = (1..=kv_len).map(|i| format!("${}", i)).collect(); - let in_clause = in_placeholders.join(", "); - let mut param_index = kv_len + 1; - let mut values_placeholders = Vec::new(); - for _ in 0..kv_len { - values_placeholders.push(format!("(${0}, ${1})", param_index, param_index + 1)); - param_index += 2; - } - let values_clause = values_placeholders.join(", "); - - format!( - r#" - WITH prev AS ( - SELECT k,v FROM greptime_metakv WHERE k IN ({in_clause}) - ), update AS ( - INSERT INTO greptime_metakv (k, v) VALUES - {values_clause} - ON CONFLICT ( - k - ) DO UPDATE SET - v = excluded.v - ) - - SELECT k, v FROM prev; - "# - ) -} - -// Trim null byte at the end and convert bytes to string. -fn process_bytes<'a>(data: &'a [u8], name: &str) -> Result<&'a str> { - let mut len = data.len(); - // remove trailing null bytes to avoid error in postgres encoding. - while len > 0 && data[len - 1] == 0 { - len -= 1; - } - let res = std::str::from_utf8(&data[0..len]).context(StrFromUtf8Snafu { name })?; - Ok(res) +/// Generates the `$n` placeholders for a parameterized sql `IN` clause.
+fn generate_in_placeholders(from: usize, to: usize) -> Vec<String> { + (from..=to).map(|i| format!("${}", i)).collect() +} #[async_trait::async_trait] -impl KvBackend for PgStore { +impl KvBackend for DefaultPgStore { fn name(&self) -> &str { "Postgres" } @@ -305,101 +379,83 @@ impl KvBackend for PgStore { } async fn range(&self, req: RangeRequest) -> Result<RangeResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.range_with_query_executor(&client, req).await } async fn put(&self, req: PutRequest) -> Result<PutResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.put_with_query_executor(&client, req).await } async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.batch_put_with_query_executor(&client, req).await } async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.batch_get_with_query_executor(&client, req).await } async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.delete_range_with_query_executor(&client, req).await } async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> { - let client = self.get_client_executor().await?; + let client = self.client_executor().await?; self.batch_delete_with_query_executor(&client, req).await } - - async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> { - let client = self.get_client_executor().await?; - self.compare_and_put_with_query_executor(&client, req).await - } } -impl PgStore { +/// Converts a row to a [`KeyValue`] with key only. +fn key_value_from_row_key_only(r: Row) -> KeyValue { + KeyValue { + key: r.get(0), + value: vec![], + } +} + +/// Converts a row to a [`KeyValue`].
+fn key_value_from_row(r: Row) -> KeyValue { + KeyValue { + key: r.get(0), + value: r.get(1), + } +} + +impl DefaultPgStore { async fn range_with_query_executor( &self, query_executor: &PgQueryExecutor<'_>, req: RangeRequest, ) -> Result<RangeResponse> { - let mut params = vec![]; - let template = select_range_template(&req); - if req.key != EMPTY { - let key = process_bytes(&req.key, "rangeKey")?; - if template == PREFIX_SCAN { - let prefix = format!("{key}%"); - params.push(Cow::Owned(prefix)) - } else { - params.push(Cow::Borrowed(key)) - } - } - if template == RANGE_SCAN_FULL_RANGE && req.range_end != EMPTY { - let range_end = process_bytes(&req.range_end, "rangeEnd")?; - params.push(Cow::Borrowed(range_end)); - } + let template_type = range_template(&req.key, &req.range_end); + let template = self.sql_template_set.range_template.get(template_type); + let params = template_type.build_params(req.key, req.range_end); + let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>(); + // Always add 1 to limit to check if there is more data + let query = + RangeTemplate::with_limit(template, if req.limit == 0 { 0 } else { req.limit + 1 }); let limit = req.limit as usize; - let limit_cause = match limit > 0 { - true => format!(" LIMIT {};", limit + 1), - false => ";".to_string(), - }; - let template = format!("{}{}", template, limit_cause); - let params: Vec<&(dyn ToSql + Sync)> = params - .iter() - .map(|x| match x { - Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync), - Cow::Owned(owned) => owned as &(dyn ToSql + Sync), - }) - .collect(); - let res = query_executor.query(&template, &params).await?; - let kvs: Vec<KeyValue> = res + debug!("query: {:?}, params: {:?}", query, params); + let res = query_executor.query(&query, &params_ref).await?; + let mut kvs: Vec<KeyValue> = res .into_iter() - .map(|r| { - let key: String = r.get(0); - if req.keys_only { - return KeyValue { - key: key.into_bytes(), - value: vec![], - }; - } - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) + .map( + self.sql_template_set + .key_value_from_row_with_opts(req.keys_only), + ) .collect(); - if limit == 0 || limit > kvs.len() { + // If limit is 0, we always return all data + if limit == 0 || kvs.len() <= limit { return Ok(RangeResponse { kvs, more: false }); } - let (filtered_kvs, _) = kvs.split_at(limit); - Ok(RangeResponse { - kvs: filtered_kvs.to_vec(), - more: kvs.len() > limit, - }) + // Otherwise the query returned limit + 1 rows: drop the extra row and set more to true + let removed = kvs.pop(); + debug_assert!(removed.is_some()); + Ok(RangeResponse { kvs, more: true }) } async fn put_with_query_executor( @@ -422,11 +478,12 @@ impl PgStore { .await?; if !res.prev_kvs.is_empty() { + debug_assert!(req.prev_kv); return Ok(PutResponse { prev_kv: Some(res.prev_kvs.remove(0)), }); } - Ok(PutResponse { prev_kv: None }) + Ok(PutResponse::default()) } async fn batch_put_with_query_executor( @@ -434,41 +491,33 @@ impl PgStore { query_executor: &PgQueryExecutor<'_>, req: BatchPutRequest, ) -> Result<BatchPutResponse> { - let mut in_params = Vec::with_capacity(req.kvs.len()); + let mut in_params = Vec::with_capacity(req.kvs.len() * 3); let mut values_params = Vec::with_capacity(req.kvs.len() * 2); for kv in &req.kvs { - let processed_key = process_bytes(&kv.key, "BatchPutRequestKey")?; + let processed_key = &kv.key; in_params.push(processed_key); - let processed_value = process_bytes(&kv.value, "BatchPutRequestValue")?; + let processed_value = &kv.value; values_params.push(processed_key);
values_params.push(processed_value); } in_params.extend(values_params); - let params: Vec<&(dyn ToSql + Sync)> = - in_params.iter().map(|x| x as &(dyn ToSql + Sync)).collect(); - - let query = generate_batch_upsert_query(req.kvs.len()); - + let params = in_params.iter().map(|x| x as _).collect::<Vec<_>>(); + let query = self + .sql_template_set + .generate_batch_upsert_query(req.kvs.len()); let res = query_executor.query(&query, &params).await?; if req.prev_kv { - let kvs: Vec<KeyValue> = res - .into_iter() - .map(|r| { - let key: String = r.get(0); - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) - .collect(); - if !kvs.is_empty() { - return Ok(BatchPutResponse { prev_kvs: kvs }); - } + Ok(BatchPutResponse { + prev_kvs: res + .into_iter() + .map(self.sql_template_set.key_value_from_row) + .collect(), + }) + } else { + Ok(BatchPutResponse::default()) } - Ok(BatchPutResponse { prev_kvs: vec![] }) } /// Batch get with certain client. It's needed for a client with transaction. @@ -480,30 +529,17 @@ impl PgStore { if req.keys.is_empty() { return Ok(BatchGetResponse { kvs: vec![] }); } - let query = generate_batch_get_query(req.keys.len()); - let value_params = req - .keys - .iter() - .map(|k| process_bytes(k, "BatchGetRequestKey")) - .collect::<Result<Vec<&str>>>()?; - let params: Vec<&(dyn ToSql + Sync)> = value_params - .iter() - .map(|x| x as &(dyn ToSql + Sync)) - .collect(); - + let query = self + .sql_template_set + .generate_batch_get_query(req.keys.len()); + let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>(); let res = query_executor.query(&query, &params).await?; - let kvs: Vec<KeyValue> = res - .into_iter() - .map(|r| { - let key: String = r.get(0); - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) - .collect(); - Ok(BatchGetResponse { kvs }) + Ok(BatchGetResponse { + kvs: res + .into_iter() + .map(self.sql_template_set.key_value_from_row) + .collect(), + }) } async fn delete_range_with_query_executor( &self, query_executor: &PgQueryExecutor<'_>, req: DeleteRangeRequest, ) -> Result<DeleteRangeResponse> { - let mut params = vec![]; - let template = select_range_delete_template(&req); - if req.key != EMPTY { - let key = process_bytes(&req.key, "deleteRangeKey")?; - if template == PREFIX_DELETE { - let prefix = format!("{key}%"); - params.push(Cow::Owned(prefix)); - } else { - params.push(Cow::Borrowed(key)); - } + let template_type = range_template(&req.key, &req.range_end); + let template = self.sql_template_set.delete_template.get(template_type); + let params = template_type.build_params(req.key, req.range_end); + let params_ref = params.iter().map(|x| x as _).collect::<Vec<_>>(); + let res = query_executor.query(template, &params_ref).await?; + let mut resp = DeleteRangeResponse::new(res.len() as i64); + if req.prev_kv { + resp.with_prev_kvs( + res.into_iter() + .map(self.sql_template_set.key_value_from_row) + .collect(), + ); } - if template == RANGE_DELETE_FULL_RANGE && req.range_end != EMPTY { - let range_end = process_bytes(&req.range_end, "deleteRangeEnd")?; - params.push(Cow::Borrowed(range_end)); - } - let params: Vec<&(dyn ToSql + Sync)> = params - .iter() - .map(|x| match x { - Cow::Borrowed(borrowed) => borrowed as &(dyn ToSql + Sync), - Cow::Owned(owned) => owned as &(dyn ToSql + Sync), - }) - .collect(); - - let res = query_executor.query(template, &params).await?; - let deleted = res.len() as i64; - if !req.prev_kv { - return Ok({ - DeleteRangeResponse { - deleted, - prev_kvs: vec![], - } - }); - } - let
kvs: Vec<KeyValue> = res - .into_iter() - .map(|r| { - let key: String = r.get(0); - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) - .collect(); - Ok(DeleteRangeResponse { - deleted, - prev_kvs: kvs, - }) + Ok(resp) } async fn batch_delete_with_query_executor( @@ -567,78 +569,22 @@ impl PgStore { req: BatchDeleteRequest, ) -> Result<BatchDeleteResponse> { if req.keys.is_empty() { - return Ok(BatchDeleteResponse { prev_kvs: vec![] }); + return Ok(BatchDeleteResponse::default()); } - let query = generate_batch_delete_query(req.keys.len()); - let value_params = req - .keys - .iter() - .map(|k| process_bytes(k, "BatchDeleteRequestKey")) - .collect::<Result<Vec<&str>>>()?; - let params: Vec<&(dyn ToSql + Sync)> = value_params - .iter() - .map(|x| x as &(dyn ToSql + Sync)) - .collect(); - + let query = self + .sql_template_set + .generate_batch_delete_query(req.keys.len()); + let params = req.keys.iter().map(|x| x as _).collect::<Vec<_>>(); let res = query_executor.query(&query, &params).await?; - if !req.prev_kv { - return Ok(BatchDeleteResponse { prev_kvs: vec![] }); - } - let kvs: Vec<KeyValue> = res - .into_iter() - .map(|r| { - let key: String = r.get(0); - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) - .collect(); - Ok(BatchDeleteResponse { prev_kvs: kvs }) - } - - async fn compare_and_put_with_query_executor( - &self, - query_executor: &PgQueryExecutor<'_>, - req: CompareAndPutRequest, - ) -> Result<CompareAndPutResponse> { - let key = process_bytes(&req.key, "CASKey")?; - let value = process_bytes(&req.value, "CASValue")?; - if req.expect.is_empty() { - let put_res = self - .put_if_not_exists_with_query_executor(query_executor, key, value) - .await?; - return Ok(CompareAndPutResponse { - success: put_res, - prev_kv: None, - }); - } - let expect = process_bytes(&req.expect, "CASExpect")?; - - let res = query_executor.query(CAS, &[&key, &value, &expect]).await?; - match res.is_empty() { - true => Ok(CompareAndPutResponse { - success: false, - prev_kv: None, - }), - false => { - let mut kvs: Vec<KeyValue> = res + if req.prev_kv { + Ok(BatchDeleteResponse { + prev_kvs: res .into_iter() - .map(|r| { - let key: String = r.get(0); - let value: String = r.get(1); - KeyValue { - key: key.into_bytes(), - value: value.into_bytes(), - } - }) - .collect(); - Ok(CompareAndPutResponse { - success: true, - prev_kv: Some(kvs.remove(0)), - }) - } + .map(self.sql_template_set.key_value_from_row) + .collect(), + }) + } else { + Ok(BatchDeleteResponse::default()) } } @@ -653,11 +599,12 @@ impl PgStore { let res = self .batch_get_with_query_executor(query_executor, batch_get_req) .await?; + debug!("batch get res: {:?}", res); let res_map = res .kvs .into_iter() .map(|kv| (kv.key, kv.value)) - .collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>(); + .collect::<HashMap<Vec<u8>, Vec<u8>>>(); for c in cmp { let value = res_map.get(&c.key); if !c.compare_value(value) { @@ -676,130 +623,121 @@ impl PgStore { if !check_txn_ops(txn_ops)?
{ return Ok(None); } - match txn_ops.first() { - Some(TxnOp::Delete(_)) => { - let mut batch_del_req = BatchDeleteRequest { - keys: vec![], - prev_kv: false, - }; - for op in txn_ops { - if let TxnOp::Delete(key) = op { - batch_del_req.keys.push(key.clone()); - } - } - let res = self - .batch_delete_with_query_executor(query_executor, batch_del_req) - .await?; - let res_map = res - .prev_kvs - .into_iter() - .map(|kv| (kv.key, kv.value)) - .collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>(); - let mut resps = Vec::with_capacity(txn_ops.len()); - for op in txn_ops { - if let TxnOp::Delete(key) = op { - let value = res_map.get(key); - resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse { - deleted: if value.is_some() { 1 } else { 0 }, - prev_kvs: value - .map(|v| { - vec![KeyValue { - key: key.clone(), - value: v.clone(), - }] - }) - .unwrap_or_default(), - })); - } - } - Ok(Some(resps)) - } - Some(TxnOp::Put(_, _)) => { - let mut batch_put_req = BatchPutRequest { - kvs: vec![], - prev_kv: false, - }; - for op in txn_ops { - if let TxnOp::Put(key, value) = op { - batch_put_req.kvs.push(KeyValue { - key: key.clone(), - value: value.clone(), - }); - } - } - let res = self - .batch_put_with_query_executor(query_executor, batch_put_req) - .await?; - let res_map = res - .prev_kvs - .into_iter() - .map(|kv| (kv.key, kv.value)) - .collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>(); - let mut resps = Vec::with_capacity(txn_ops.len()); - for op in txn_ops { - if let TxnOp::Put(key, _) = op { - let prev_kv = res_map.get(key); - match prev_kv { - Some(v) => { - resps.push(TxnOpResponse::ResponsePut(PutResponse { - prev_kv: Some(KeyValue { - key: key.clone(), - value: v.clone(), - }), - })); - } - None => { - resps.push(TxnOpResponse::ResponsePut(PutResponse { - prev_kv: None, - })); - } - } - } - } - Ok(Some(resps)) - } - Some(TxnOp::Get(_)) => { - let mut batch_get_req = BatchGetRequest { keys: vec![] }; - for op in txn_ops { - if let TxnOp::Get(key) = op { - batch_get_req.keys.push(key.clone()); - } - } - let res = self - .batch_get_with_query_executor(query_executor, batch_get_req) - .await?; - let res_map = res - .kvs - .into_iter() - .map(|kv| (kv.key, kv.value)) - .collect::<std::collections::HashMap<Vec<u8>, Vec<u8>>>(); - let mut resps = Vec::with_capacity(txn_ops.len()); - for op in txn_ops { - if let TxnOp::Get(key) = op { - let value = res_map.get(key); - resps.push(TxnOpResponse::ResponseGet(RangeResponse { - kvs: value - .map(|v| { - vec![KeyValue { - key: key.clone(), - value: v.clone(), - }] - }) - .unwrap_or_default(), - more: false, - })); - } - } - Ok(Some(resps)) - } - None => Ok(Some(vec![])), + // Safety: txn_ops is not empty + match txn_ops.first().unwrap() { + TxnOp::Delete(_) => self.handle_batch_delete(query_executor, txn_ops).await, + TxnOp::Put(_, _) => self.handle_batch_put(query_executor, txn_ops).await, + TxnOp::Get(_) => self.handle_batch_get(query_executor, txn_ops).await, } } + async fn handle_batch_delete( + &self, + query_executor: &PgQueryExecutor<'_>, + txn_ops: &[TxnOp], + ) -> Result<Option<Vec<TxnOpResponse>>> { + let mut batch_del_req = BatchDeleteRequest { + keys: vec![], + prev_kv: true, + }; + for op in txn_ops { + if let TxnOp::Delete(key) = op { + batch_del_req.keys.push(key.clone()); + } + } + let res = self + .batch_delete_with_query_executor(query_executor, batch_del_req) + .await?; + let res_map = res + .prev_kvs + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect::<HashMap<Vec<u8>, Vec<u8>>>(); + let mut resps = Vec::with_capacity(txn_ops.len()); + for op in txn_ops { + if let TxnOp::Delete(key) = op { + let value = res_map.get(key); +
resps.push(TxnOpResponse::ResponseDelete(DeleteRangeResponse { + deleted: if value.is_some() { 1 } else { 0 }, + prev_kvs: vec![], + })); + } + } + Ok(Some(resps)) + } + + async fn handle_batch_put( + &self, + query_executor: &PgQueryExecutor<'_>, + txn_ops: &[TxnOp], + ) -> Result<Option<Vec<TxnOpResponse>>> { + let mut batch_put_req = BatchPutRequest { + kvs: vec![], + prev_kv: false, + }; + for op in txn_ops { + if let TxnOp::Put(key, value) = op { + batch_put_req.kvs.push(KeyValue { + key: key.clone(), + value: value.clone(), + }); + } + } + let _ = self + .batch_put_with_query_executor(query_executor, batch_put_req) + .await?; + let mut resps = Vec::with_capacity(txn_ops.len()); + for op in txn_ops { + if let TxnOp::Put(_, _) = op { + resps.push(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None })); + } + } + Ok(Some(resps)) + } + + async fn handle_batch_get( + &self, + query_executor: &PgQueryExecutor<'_>, + txn_ops: &[TxnOp], + ) -> Result<Option<Vec<TxnOpResponse>>> { + let mut batch_get_req = BatchGetRequest { keys: vec![] }; + for op in txn_ops { + if let TxnOp::Get(key) = op { + batch_get_req.keys.push(key.clone()); + } + } + let res = self + .batch_get_with_query_executor(query_executor, batch_get_req) + .await?; + let res_map = res + .kvs + .into_iter() + .map(|kv| (kv.key, kv.value)) + .collect::<HashMap<Vec<u8>, Vec<u8>>>(); + let mut resps = Vec::with_capacity(txn_ops.len()); + for op in txn_ops { + if let TxnOp::Get(key) = op { + let value = res_map.get(key); + resps.push(TxnOpResponse::ResponseGet(RangeResponse { + kvs: value + .map(|v| { + vec![KeyValue { + key: key.clone(), + value: v.clone(), + }] + }) + .unwrap_or_default(), + more: false, + })); + } + } + Ok(Some(resps)) + } + async fn execute_txn_op( &self, query_executor: &PgQueryExecutor<'_>, - op: TxnOp, + op: &TxnOp, ) -> Result<TxnOpResponse> { match op { TxnOp::Put(key, value) => { @@ -807,8 +745,8 @@ impl PgStore { .put_with_query_executor( query_executor, PutRequest { - key, - value, + key: key.clone(), + value: value.clone(), prev_kv: false, }, ) @@ -820,7 +758,7 @@ impl PgStore { .range_with_query_executor( query_executor, RangeRequest { - key, + key: key.clone(), range_end: vec![], limit: 1, keys_only: false, @@ -834,7 +772,7 @@ impl PgStore { .delete_range_with_query_executor( query_executor, DeleteRangeRequest { - key, + key: key.clone(), range_end: vec![], prev_kv: false, }, ) @@ -844,19 +782,10 @@ impl PgStore { } } } -} -#[async_trait::async_trait] -impl TxnService for PgStore { - type Error = Error; - - async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> { - let _timer = METRIC_META_TXN_REQUEST - .with_label_values(&["postgres", "txn"]) - .start_timer(); - - let mut client = self.get_client().await?; - let pg_txn = self.get_txn_executor(&mut client).await?; + async fn txn_inner(&self, txn: &KvTxn) -> Result<KvTxnResponse> { + let mut client = self.client().await?; + let pg_txn = self.txn_executor(&mut client).await?; let mut success = true; if txn.c_when { success = self.execute_txn_cmp(&pg_txn, &txn.req.compare).await?; } @@ -866,7 +795,7 @@ impl TxnService for PgStore { match self.try_batch_txn(&pg_txn, &txn.req.success).await? { Some(res) => responses.extend(res), None => { - for txnop in txn.req.success { + for txnop in &txn.req.success { let res = self.execute_txn_op(&pg_txn, txnop).await?; responses.push(res); } } } @@ -876,7 +805,7 @@ match self.try_batch_txn(&pg_txn, &txn.req.failure).await?
{ Some(res) => responses.extend(res), None => { - for txnop in txn.req.failure { + for txnop in &txn.req.failure { let res = self.execute_txn_op(&pg_txn, txnop).await?; responses.push(res); } @@ -890,6 +819,43 @@ succeeded: success, }) } +} + +#[async_trait::async_trait] +impl TxnService for DefaultPgStore { + type Error = Error; + + async fn txn(&self, txn: KvTxn) -> Result<KvTxnResponse> { + let _timer = METRIC_META_TXN_REQUEST + .with_label_values(&["postgres", "txn"]) + .start_timer(); + + let mut backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(10)) + .with_max_delay(Duration::from_millis(200)) + .with_max_times(self.txn_retry_count) + .build(); + + loop { + match self.txn_inner(&txn).await { + Ok(res) => return Ok(res), + Err(e) => { + if e.is_serialization_error() { + let d = backoff.next(); + if let Some(d) = d { + tokio::time::sleep(d).await; + continue; + } + break; + } else { + return Err(e); + } + } + } + } + + PostgresTransactionRetryFailedSnafu {}.fail() + } fn max_txn_ops(&self) -> usize { self.max_txn_ops @@ -908,23 +874,20 @@ fn is_prefix_range(start: &[u8], end: &[u8]) -> bool { false } -/// Check if the transaction operations are the same type. +/// Checks if the transaction operations are the same type. fn check_txn_ops(txn_ops: &[TxnOp]) -> Result<bool> { if txn_ops.is_empty() { return Ok(false); } - let first_op = &txn_ops[0]; - for op in txn_ops { - match (op, first_op) { - (TxnOp::Put(_, _), TxnOp::Put(_, _)) => {} - (TxnOp::Get(_), TxnOp::Get(_)) => {} - (TxnOp::Delete(_), TxnOp::Delete(_)) => {} - _ => { - return Ok(false); - } - } - } - Ok(true) + let same = txn_ops.windows(2).all(|a| { + matches!( + (&a[0], &a[1]), + (TxnOp::Put(_, _), TxnOp::Put(_, _)) + | (TxnOp::Get(_), TxnOp::Get(_)) + | (TxnOp::Delete(_), TxnOp::Delete(_)) + ) + }); + Ok(same) } #[cfg(test)] @@ -939,7 +902,7 @@ mod tests { unprepare_kv, }; - async fn build_pg_kv_backend() -> Option<PgStore> { + async fn build_pg_kv_backend(table_name: &str) -> Option<DefaultPgStore> { let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); if endpoints.is_empty() { return None; }  let mut cfg = Config::new(); cfg.url = Some(endpoints); let pool = cfg .create_pool(Some(Runtime::Tokio1), NoTls) .context(CreatePostgresPoolSnafu) .unwrap(); let client = pool.get().await.unwrap(); + let template_factory = SqlTemplateFactory::new(table_name); + let sql_templates = template_factory.build(key_value_from_row, key_value_from_row_key_only); client - .execute(METADKV_CREATION, &[]) + .execute(&sql_templates.create_table_statement, &[]) .await - .context(PostgresExecutionSnafu) + .context(PostgresExecutionSnafu { + sql: sql_templates.create_table_statement.to_string(), + }) .unwrap(); Some(PgStore { pool, max_txn_ops: 128, + sql_template_set: sql_templates, + txn_retry_count: PG_STORE_TXN_RETRY_COUNT, }) } #[tokio::test] - async fn test_pg_crud() { + async fn test_pg_put() { + if let Some(kv_backend) = build_pg_kv_backend("put_test").await { let prefix = b"put/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_put_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; + } + } + #[tokio::test] + async fn test_pg_range() { + if let Some(kv_backend) = build_pg_kv_backend("range_test").await { let prefix = b"range/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_range_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; + } + } - let prefix = b"batchGet/"; + #[tokio::test] + async fn test_pg_range_2()
{ + if let Some(kv_backend) = build_pg_kv_backend("range2_test").await { + let prefix = b"range2/"; + test_kv_range_2_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_pg_batch_get() { + if let Some(kv_backend) = build_pg_kv_backend("batch_get_test").await { + let prefix = b"batch_get/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; test_kv_batch_get_with_prefix(&kv_backend, prefix.to_vec()).await; unprepare_kv(&kv_backend, prefix).await; + } + } - let prefix = b"deleteRange/"; + #[tokio::test] + async fn test_pg_batch_delete() { + if let Some(kv_backend) = build_pg_kv_backend("batch_delete_test").await { + let prefix = b"batch_delete/"; prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_delete_range_with_prefix(kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } + } - if let Some(kv_backend) = build_pg_kv_backend().await { - test_kv_range_2_with_prefix(kv_backend, b"range2/".to_vec()).await; + #[tokio::test] + async fn test_pg_batch_delete_with_prefix() { + if let Some(kv_backend) = build_pg_kv_backend("batch_delete_prefix_test").await { + let prefix = b"batch_delete/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_batch_delete_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; } + } - if let Some(kv_backend) = build_pg_kv_backend().await { + #[tokio::test] + async fn test_pg_delete_range() { + if let Some(kv_backend) = build_pg_kv_backend("delete_range_test").await { + let prefix = b"delete_range/"; + prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; + test_kv_delete_range_with_prefix(&kv_backend, prefix.to_vec()).await; + unprepare_kv(&kv_backend, prefix).await; + } + } + + #[tokio::test] + async fn test_pg_compare_and_put() { + if let Some(kv_backend) = build_pg_kv_backend("compare_and_put_test").await { + let prefix = b"compare_and_put/"; let kv_backend = Arc::new(kv_backend); - test_kv_compare_and_put_with_prefix(kv_backend, b"compareAndPut/".to_vec()).await; + test_kv_compare_and_put_with_prefix(kv_backend.clone(), prefix.to_vec()).await; } + } - if let Some(kv_backend) = build_pg_kv_backend().await { - let prefix = b"batchDelete/"; - prepare_kv_with_prefix(&kv_backend, prefix.to_vec()).await; - test_kv_batch_delete_with_prefix(kv_backend, prefix.to_vec()).await; - } - - if let Some(kv_backend) = build_pg_kv_backend().await { - let kv_backend_ref = Arc::new(kv_backend); - test_txn_one_compare_op(kv_backend_ref.clone()).await; - text_txn_multi_compare_op(kv_backend_ref.clone()).await; - test_txn_compare_equal(kv_backend_ref.clone()).await; - test_txn_compare_greater(kv_backend_ref.clone()).await; - test_txn_compare_less(kv_backend_ref.clone()).await; - test_txn_compare_not_equal(kv_backend_ref.clone()).await; - // Clean up - kv_backend_ref - .get_client() - .await - .unwrap() - .execute("DELETE FROM greptime_metakv", &[]) - .await - .unwrap(); + #[tokio::test] + async fn test_pg_txn() { + if let Some(kv_backend) = build_pg_kv_backend("txn_test").await { + test_txn_one_compare_op(&kv_backend).await; + text_txn_multi_compare_op(&kv_backend).await; + test_txn_compare_equal(&kv_backend).await; + test_txn_compare_greater(&kv_backend).await; + test_txn_compare_less(&kv_backend).await; + test_txn_compare_not_equal(&kv_backend).await; } } } diff --git a/src/common/meta/src/kv_backend/test.rs 
b/src/common/meta/src/kv_backend/test.rs index d428b6ed22..bb38c5a205 100644 --- a/src/common/meta/src/kv_backend/test.rs +++ b/src/common/meta/src/kv_backend/test.rs @@ -61,14 +61,18 @@ pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8> pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) { let range_end = util::get_prefix_end_key(prefix); - assert!(kv_backend - .delete_range(DeleteRangeRequest { - key: prefix.to_vec(), - range_end, - ..Default::default() - }) - .await - .is_ok()); + assert!( + kv_backend + .delete_range(DeleteRangeRequest { + key: prefix.to_vec(), + range_end, + ..Default::default() + }) + .await + .is_ok(), + "prefix: {:?}", + std::str::from_utf8(prefix).unwrap() + ); } pub async fn test_kv_put(kv_backend: &impl KvBackend) { @@ -170,11 +174,11 @@ pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8> assert_eq!(b"val1", resp.kvs[0].value()); } -pub async fn test_kv_range_2(kv_backend: impl KvBackend) { +pub async fn test_kv_range_2(kv_backend: &impl KvBackend) { test_kv_range_2_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_range_2_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) { +pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) { let atest = [prefix.clone(), b"atest".to_vec()].concat(); let test = [prefix.clone(), b"test".to_vec()].concat(); @@ -348,11 +352,11 @@ pub async fn test_kv_compare_and_put_with_prefix( assert!(resp.is_none()); } -pub async fn test_kv_delete_range(kv_backend: impl KvBackend) { +pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) { test_kv_delete_range_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) { +pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) { let key3 = [prefix.clone(), b"key3".to_vec()].concat(); let req = DeleteRangeRequest { key: key3.clone(), @@ -403,11 +407,11 @@ pub async fn test_kv_delete_range_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8> assert!(resp.kvs.is_empty()); } -pub async fn test_kv_batch_delete(kv_backend: impl KvBackend) { +pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) { test_kv_batch_delete_with_prefix(kv_backend, vec![]).await; } -pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8>) { +pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) { let key1 = [prefix.clone(), b"key1".to_vec()].concat(); let key100 = [prefix.clone(), b"key100".to_vec()].concat(); assert!(kv_backend.get(&key1).await.unwrap().is_some()); @@ -447,7 +451,7 @@ pub async fn test_kv_batch_delete_with_prefix(kv_backend: impl KvBackend, prefix: Vec<u8> assert!(kv_backend.get(&key11).await.unwrap().is_none()); } -pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) { +pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) { let _ = kv_backend .put(PutRequest { key: vec![11], @@ -472,7 +476,7 @@ pub async fn test_txn_one_compare_op(kv_backend: KvBackendRef) { assert_eq!(txn_response.responses.len(), 1); } -pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) { +pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) { for i in 1..3 { let _ = kv_backend .put(PutRequest { @@ -502,7 +506,7 @@ pub async fn text_txn_multi_compare_op(kv_backend: KvBackendRef) { assert_eq!(txn_response.responses.len(), 2); } -pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) { +pub async fn
test_txn_compare_equal(kv_backend: &impl KvBackend) { let key = vec![101u8]; kv_backend.delete(&key, false).await.unwrap(); @@ -531,7 +535,7 @@ pub async fn test_txn_compare_equal(kv_backend: KvBackendRef) { assert!(txn_response.succeeded); } -pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) { +pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) { let key = vec![102u8]; kv_backend.delete(&key, false).await.unwrap(); @@ -571,7 +575,7 @@ pub async fn test_txn_compare_greater(kv_backend: KvBackendRef) { ); } -pub async fn test_txn_compare_less(kv_backend: KvBackendRef) { +pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) { let key = vec![103u8]; kv_backend.delete(&[3], false).await.unwrap(); @@ -611,7 +615,7 @@ pub async fn test_txn_compare_less(kv_backend: KvBackendRef) { ); } -pub async fn test_txn_compare_not_equal(kv_backend: KvBackendRef) { +pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) { let key = vec![104u8]; kv_backend.delete(&key, false).await.unwrap(); diff --git a/src/common/meta/src/rpc/store.rs b/src/common/meta/src/rpc/store.rs index f763d6b443..2553699397 100644 --- a/src/common/meta/src/rpc/store.rs +++ b/src/common/meta/src/rpc/store.rs @@ -266,7 +266,7 @@ impl PutRequest { } } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Default)] pub struct PutResponse { pub prev_kv: Option<KeyValue>, } @@ -425,7 +425,7 @@ impl BatchPutRequest { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct BatchPutResponse { pub prev_kvs: Vec<KeyValue>, } @@ -509,7 +509,7 @@ impl BatchDeleteRequest { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct BatchDeleteResponse { pub prev_kvs: Vec<KeyValue>, } @@ -754,6 +754,19 @@ impl TryFrom<PbDeleteRangeResponse> for DeleteRangeResponse { } impl DeleteRangeResponse { + /// Creates a new [`DeleteRangeResponse`] with the given deleted count. + pub fn new(deleted: i64) -> Self { + Self { + deleted, + prev_kvs: vec![], + } + } + + /// Sets the previous key-value pairs of the response.
+ pub fn with_prev_kvs(&mut self, prev_kvs: Vec<KeyValue>) { + self.prev_kvs = prev_kvs; + } + pub fn to_proto_resp(self, header: PbResponseHeader) -> PbDeleteRangeResponse { PbDeleteRangeResponse { header: Some(header), diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index 1d8c6736e3..99ed2e5617 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -13,7 +13,7 @@ workspace = true [dependencies] async-stream.workspace = true async-trait.workspace = true -backon = "1" +backon.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 33cb64a2e8..37a9af18a4 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -644,7 +644,7 @@ mod tests { let dir = create_temp_dir("range2"); let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); - test_kv_range_2(backend).await; + test_kv_range_2(&backend).await; } #[tokio::test] @@ -671,7 +671,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_batch_delete(backend).await; + test_kv_batch_delete(&backend).await; } #[tokio::test] @@ -680,7 +680,7 @@ mod tests { let backend = build_kv_backend(dir.path().to_str().unwrap().to_string()); prepare_kv(&backend).await; - test_kv_delete_range(backend).await; + test_kv_delete_range(&backend).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 91a58e7d5b..9ed6ed66c3 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -229,7 +229,8 @@ pub async fn metasrv_builder( #[cfg(feature = "pg_kvbackend")] (None, BackendImpl::PostgresStore) => { let pool = create_postgres_pool(opts).await?; - let kv_backend = PgStore::with_pg_pool(pool, opts.max_txn_ops) + // TODO(CookiePie): use table name from config. + let kv_backend = PgStore::with_pg_pool(pool, "greptime_metakv", opts.max_txn_ops) .await .context(error::KvBackendSnafu)?; // Client for election should be created separately since we need a different session keep-alive idle time. diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs index 94de90bde9..192fa682bf 100644 --- a/src/meta-srv/src/election/postgres.rs +++ b/src/meta-srv/src/election/postgres.rs @@ -23,6 +23,7 @@ use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::broadcast; use tokio::time::MissedTickBehavior; +use tokio_postgres::types::ToSql; use tokio_postgres::Client; use crate::election::{ @@ -39,7 +40,7 @@ const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})"; const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})"; // Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive. // Either the leader reconnects and step down or the session expires and the lock is released. -const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_in_transaction_session_timeout = '10s';"; +const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_session_timeout = '10s';"; // Separator between value and expire time.
 // Separator between value and expire time.
 const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
@@ -50,7 +51,7 @@ WITH prev AS (
     SELECT k, v FROM greptime_metakv WHERE k = $1
 ), insert AS (
     INSERT INTO greptime_metakv
-    VALUES($1, $2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'))
+    VALUES($1, convert_to($2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
     ON CONFLICT (k) DO NOTHING
 )
 
@@ -61,7 +62,7 @@ SELECT k, v FROM prev;
 const CAS_WITH_EXPIRE_TIME: &str = r#"
 UPDATE greptime_metakv
 SET k=$1,
-v=$3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS')
+v=convert_to($3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
 WHERE k=$1 AND v=$2
 "#;
 
@@ -329,12 +330,13 @@ impl PgElection {
     /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
     async fn get_value_with_lease(
         &self,
-        key: &String,
+        key: &str,
         with_origin: bool,
     ) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
+        let key = key.as_bytes().to_vec();
         let res = self
             .client
-            .query(GET_WITH_CURRENT_TIMESTAMP, &[&key])
+            .query(GET_WITH_CURRENT_TIMESTAMP, &[&key as &(dyn ToSql + Sync)])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -342,7 +344,7 @@ impl PgElection {
             Ok(None)
         } else {
             // Safety: Checked if res is empty above.
-            let current_time_str = res[0].get(1);
+            let current_time_str = res[0].try_get(1).unwrap_or_default();
             let current_time = match Timestamp::from_str(current_time_str, None) {
                 Ok(ts) => ts,
                 Err(_) => UnexpectedSnafu {
@@ -351,8 +353,9 @@
                 .fail()?,
             };
             // Safety: Checked if res is empty above.
-            let value_and_expire_time = res[0].get(0);
-            let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;
+            let value_and_expire_time =
+                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
+            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
 
             if with_origin {
                 Ok(Some((
@@ -372,17 +375,20 @@ impl PgElection {
         &self,
         key_prefix: &str,
     ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
-        let key_prefix = format!("{}%", key_prefix);
+        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
         let res = self
             .client
-            .query(PREFIX_GET_WITH_CURRENT_TIMESTAMP, &[&key_prefix])
+            .query(
+                PREFIX_GET_WITH_CURRENT_TIMESTAMP,
+                &[(&key_prefix as &(dyn ToSql + Sync))],
+            )
             .await
             .context(PostgresExecutionSnafu)?;
 
         let mut values_with_leases = vec![];
         let mut current = Timestamp::default();
         for row in res {
-            let current_time_str = row.get(1);
+            let current_time_str = row.try_get(1).unwrap_or_default();
             current = match Timestamp::from_str(current_time_str, None) {
                 Ok(ts) => ts,
                 Err(_) => UnexpectedSnafu {
@@ -391,8 +397,8 @@
                 .fail()?,
             };
 
-            let value_and_expire_time = row.get(0);
-            let (value, expire_time) = parse_value_and_expire_time(value_and_expire_time)?;
+            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
+            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
 
             values_with_leases.push((value, expire_time));
         }
 
@@ -400,13 +406,15 @@ impl PgElection {
     }
 
     async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
+        let key = key.as_bytes().to_vec();
+        let prev = prev.as_bytes().to_vec();
         let res = self
             .client
             .execute(
                 CAS_WITH_EXPIRE_TIME,
                 &[
-                    &key,
-                    &prev,
+                    &key as &(dyn ToSql + Sync),
+                    &prev as &(dyn ToSql + Sync),
                     &updated,
                     &LEASE_SEP,
                     &(self.candidate_lease_ttl_secs as f64),
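The repeated `as &(dyn ToSql + Sync)` casts follow from the schema moving `k` and `v` to `bytea` columns: tokio-postgres maps `&str`/`String` to `TEXT` but `Vec<u8>`/`&[u8]` to `BYTEA`, so keys bound against the new column must be converted to bytes first, and a parameter slice mixing types needs the explicit trait-object cast. A standalone sketch, assuming a connected `client` and the `greptime_metakv` table from this patch:

    use tokio_postgres::types::ToSql;

    let key: Vec<u8> = "candidate/1".as_bytes().to_vec();
    // Vec<u8> binds as BYTEA; the annotation coerces &Vec<u8> to the trait object.
    let params: Vec<&(dyn ToSql + Sync)> = vec![&key];
    let rows = client
        .query("SELECT v FROM greptime_metakv WHERE k = $1", &params)
        .await?;
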
@@ -418,7 +426,7 @@ impl PgElection {
         ensure!(
             res == 1,
             UnexpectedSnafu {
-                violated: format!("Failed to update key: {}", key),
+                violated: format!("Failed to update key: {}", String::from_utf8_lossy(&key)),
             }
         );
 
@@ -432,12 +440,17 @@ impl PgElection {
         value: &str,
         lease_ttl_secs: u64,
     ) -> Result<bool> {
+        let key = key.as_bytes().to_vec();
+        let lease_ttl_secs = lease_ttl_secs as f64;
+        let params: Vec<&(dyn ToSql + Sync)> = vec![
+            &key as &(dyn ToSql + Sync),
+            &value as &(dyn ToSql + Sync),
+            &LEASE_SEP,
+            &lease_ttl_secs,
+        ];
         let res = self
             .client
-            .query(
-                PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME,
-                &[&key, &value, &LEASE_SEP, &(lease_ttl_secs as f64)],
-            )
+            .query(PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, &params)
             .await
             .context(PostgresExecutionSnafu)?;
         Ok(res.is_empty())
@@ -445,10 +458,11 @@ impl PgElection {
 
     /// Returns `true` if the deletion is successful.
     /// Caution: Should only delete the key if the lease is expired.
-    async fn delete_value(&self, key: &String) -> Result<bool> {
+    async fn delete_value(&self, key: &str) -> Result<bool> {
+        let key = key.as_bytes().to_vec();
         let res = self
             .client
-            .query(POINT_DELETE, &[&key])
+            .query(POINT_DELETE, &[&key as &(dyn ToSql + Sync)])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -635,6 +649,8 @@ mod tests {
     use super::*;
     use crate::error::PostgresExecutionSnafu;
 
+    const CREATE_TABLE: &str =
+        "CREATE TABLE IF NOT EXISTS greptime_metakv(k bytea PRIMARY KEY, v bytea);";
 
     async fn create_postgres_client() -> Result<Client> {
         let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
@@ -650,6 +666,7 @@ mod tests {
         tokio::spawn(async move {
             connection.await.context(PostgresExecutionSnafu).unwrap();
         });
+        client.execute(CREATE_TABLE, &[]).await.unwrap();
         Ok(client)
     }
 
@@ -1152,6 +1169,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_follower_action() {
+        common_telemetry::init_default_ut_logging();
         let candidate_lease_ttl_secs = 5;
         let store_key_prefix = uuid::Uuid::new_v4().to_string();
 
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index 81bbe2fb0b..2782661c25 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -70,6 +70,7 @@ pub enum WalConfig {
 pub struct StoreConfig {
     pub store_addrs: Vec<String>,
     pub setup_etcd: bool,
+    pub setup_pg: bool,
 }
 
 #[derive(Clone)]
@@ -159,6 +160,7 @@ impl Env {
         self.build_db();
         self.setup_wal();
         self.setup_etcd();
+        self.setup_pg();
 
         let db_ctx = GreptimeDBContext::new(self.wal.clone(), self.store_config.clone());
 
@@ -383,7 +385,21 @@ impl Env {
                     "-c".to_string(),
                     self.generate_config_file(subcommand, db_ctx),
                 ];
-                if db_ctx.store_config().store_addrs.is_empty() {
+                if db_ctx.store_config().setup_pg {
+                    let client_ports = self
+                        .store_config
+                        .store_addrs
+                        .iter()
+                        .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
+                        .collect::<Vec<_>>();
+                    let client_port = client_ports.first().unwrap_or(&5432);
+                    let pg_server_addr = format!(
+                        "postgresql://greptimedb:admin@127.0.0.1:{}/postgres",
+                        client_port
+                    );
+                    args.extend(vec!["--backend".to_string(), "postgres-store".to_string()]);
+                    args.extend(vec!["--store-addrs".to_string(), pg_server_addr]);
+                } else if db_ctx.store_config().store_addrs.is_empty() {
                     args.extend(vec!["--backend".to_string(), "memory-store".to_string()])
                 }
                 (args, vec![METASRV_ADDR.to_string()])
@@ -570,6 +586,20 @@ impl Env {
         }
     }
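For reference, the address-to-DSN mapping above is mechanical; a hypothetical walkthrough with one runner flag value (the address is illustrative, and the `unwrap` assumes a `host:port` form):

    // Hypothetical input: sqlness-runner invoked with
    //   --setup-pg --store-addrs 127.0.0.1:5433
    let addr = "127.0.0.1:5433";
    let port: u16 = addr.split(':').nth(1).unwrap().parse().unwrap();
    assert_eq!(port, 5433);
    // metasrv is then launched with:
    //   --backend postgres-store
    //   --store-addrs postgresql://greptimedb:admin@127.0.0.1:5433/postgres
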
+
+    /// Setup PostgreSQL if needed.
+    fn setup_pg(&self) {
+        if self.store_config.setup_pg {
+            let client_ports = self
+                .store_config
+                .store_addrs
+                .iter()
+                .map(|s| s.split(':').nth(1).unwrap().parse::<u16>().unwrap())
+                .collect::<Vec<_>>();
+            let client_port = client_ports.first().unwrap_or(&5432);
+            util::setup_pg(*client_port, None);
+        }
+    }
+
     /// Generate config file to `/tmp/{subcommand}-{current_time}.toml`
     fn generate_config_file(&self, subcommand: &str, db_ctx: &GreptimeDBContext) -> String {
         let mut tt = TinyTemplate::new();
diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs
index 2e3158e195..9701765d0a 100644
--- a/tests/runner/src/main.rs
+++ b/tests/runner/src/main.rs
@@ -106,6 +106,10 @@ struct Args {
     /// Whether to setup etcd, by default it is false.
     #[clap(long, default_value = "false")]
     setup_etcd: bool,
+
+    /// Whether to setup pg, by default it is false.
+    #[clap(long, default_value = "false")]
+    setup_pg: bool,
 }
 
 #[tokio::main]
@@ -154,6 +158,7 @@ async fn main() {
     let store = StoreConfig {
         store_addrs: args.store_addrs.clone(),
         setup_etcd: args.setup_etcd,
+        setup_pg: args.setup_pg,
     };
 
     let runner = Runner::new(
diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs
index 4bcd482a26..5baa0cd80e 100644
--- a/tests/runner/src/util.rs
+++ b/tests/runner/src/util.rs
@@ -305,6 +305,53 @@ pub fn stop_rm_etcd() {
     }
 }
 
+/// Set up a PostgreSQL server in docker.
+pub fn setup_pg(pg_port: u16, pg_version: Option<&str>) {
+    if std::process::Command::new("docker")
+        .args(["-v"])
+        .status()
+        .is_err()
+    {
+        panic!("Docker is not installed");
+    }
+
+    let pg_image = if let Some(pg_version) = pg_version {
+        format!("postgres:{pg_version}")
+    } else {
+        "postgres:latest".to_string()
+    };
+    let pg_password = "admin";
+    let pg_user = "greptimedb";
+
+    let mut arg_list = vec![];
+    arg_list.extend(["run", "-d"]);
+
+    let pg_password_env = format!("POSTGRES_PASSWORD={pg_password}");
+    let pg_user_env = format!("POSTGRES_USER={pg_user}");
+    let pg_port_forward = format!("{pg_port}:5432");
+    arg_list.extend(["-e", &pg_password_env, "-e", &pg_user_env]);
+    arg_list.extend(["-p", &pg_port_forward]);
+
+    arg_list.extend(["--name", "greptimedb_pg", &pg_image]);
+
+    let mut cmd = std::process::Command::new("docker");
+
+    cmd.args(arg_list);
+
+    println!("Starting PostgreSQL with command: {:?}", cmd);
+
+    let status = cmd.status();
+    if status.is_err() {
+        panic!("Failed to start PostgreSQL: {:?}", status);
+    } else if let Ok(status) = status {
+        if status.success() {
+            println!("Started PostgreSQL with port {}", pg_port);
+        } else {
+            panic!("Failed to start PostgreSQL: {:?}", status);
+        }
+    }
+}
+
 /// Get the dir of test cases. This function only works when the runner is run
 /// under the project's dir because it depends on some envs set by cargo.
 pub fn get_case_dir(case_dir: Option<PathBuf>) -> String {
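Taken together, the runner-side flow is: `setup_pg` starts the container, and the metasrv DSN shown earlier points at it. A hedged usage sketch of the new helper (the cleanup command is an assumption; the patch itself does not remove the container):

    util::setup_pg(5433, None);           // postgres:latest on host port 5433
    // util::setup_pg(5433, Some("16"));  // or pin an image tag: postgres:16
    // The container is named "greptimedb_pg"; removing it afterwards
    // (e.g. `docker rm -f greptimedb_pg`) is left to the caller.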