From cbcfdf9d6535aa52bfd4be997bd43110dba1e044 Mon Sep 17 00:00:00 2001 From: Logic Date: Thu, 28 Aug 2025 17:24:14 +0800 Subject: [PATCH] feat: add optional schema for Postgres metadata tables (#6764) * feat(meta): add optional schema for Postgres metadata tables - Add `schema` option to specify a custom schema for metadata tables - Update `PgStore` and `PgElection` to support optional schema - Modify SQL templates to use schema when provided - Add tests for schema support in Postgres backend Signed-off-by: Logic * refactor(meta): remove unused `create_schema_statement` and simplify `PgSqlTemplateFactory` - Remove `create_schema_statement` from `PgSqlTemplateSet` struct - Simplify `PgSqlTemplateFactory` by removing `new` method and merging it with `with_schema` - Update related tests to reflect these changes Signed-off-by: Logic * refactor(meta-srv): remove unused imports - Remove unused import of BoxedError from common_error::ext- Remove unused import of TlsOption from servers::tls Signed-off-by: Logic * build(meta): update Postgres version and add error handling imports - Update Postgres version to 17 in docker-compose.yml - Add BoxedError import for error handling in meta-srv Signed-off-by: Logic * feat(postgres): add support for optional schema in PgElection and related components Signed-off-by: Logic * feat(postgres): add support for optional schema in PgElection and related components Signed-off-by: Logic * fix(develop): update Postgres schema commands to specify host Signed-off-by: Logic * refactor(postgres): simplify plugin options handling and update SQL examples Signed-off-by: Logic * refactor(postgres): simplify plugin options handling and update SQL examples Signed-off-by: Logic * fix(postgres): update meta_election_lock_id description for optional schema support Signed-off-by: Logic * fix(postgres): add health check and fallback wait for Postgres in CI setup * fix(postgres): update Docker setup for Postgres and add support for Postgres 15 * fix(postgres): remove redundant Postgres setup step in CI configuration * Update tests-integration/fixtures/postgres/init.sql Co-authored-by: Weny Xu * Update .github/workflows/develop.yml * Update tests-integration/fixtures/docker-compose.yml * Update src/common/meta/src/kv_backend/rds/postgres.rs Co-authored-by: Weny Xu * Update src/common/meta/src/kv_backend/rds/postgres.rs Co-authored-by: Weny Xu * Update src/common/meta/src/kv_backend/rds/postgres.rs Co-authored-by: Weny Xu * Update src/common/meta/src/kv_backend/rds/postgres.rs Co-authored-by: Weny Xu * fix: Refactor PostgreSQL backend to support optional schema in PgStore and related SQL templates * feat: Update PostgreSQL configuration and add PG15 specific integration tests * feat: Update PostgreSQL configuration and add PG15 specific integration tests * refactor(postgres): update test schemas from 'greptime_schema' to 'test_schema' * Update .github/workflows/develop.yml * refactor: minor factor Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * fix: fix unit test Signed-off-by: WenyXu --------- Signed-off-by: Logic Signed-off-by: WenyXu Co-authored-by: Weny Xu --- .github/workflows/develop.yml | 5 +- config/config.md | 1 + config/metasrv.example.toml | 8 + src/cli/src/bench.rs | 3 + src/cli/src/metadata/common.rs | 6 + src/cmd/tests/load_config_test.rs | 1 + src/common/meta/src/key.rs | 1 - .../meta/src/kv_backend/rds/postgres.rs | 169 ++++++++++++++---- src/common/meta/src/test_util.rs | 20 ++- src/meta-srv/src/bootstrap.rs | 12 +- 
src/meta-srv/src/election/rds/postgres.rs | 93 +++++++--- src/meta-srv/src/metasrv.rs | 7 + tests-integration/fixtures/docker-compose.yml | 13 ++ tests-integration/fixtures/postgres/init.sql | 10 ++ 14 files changed, 281 insertions(+), 68 deletions(-) create mode 100644 tests-integration/fixtures/postgres/init.sql diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index f01f8c145c..dfd2ccaab3 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -766,6 +766,8 @@ jobs: GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres + GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres + GT_POSTGRES15_SCHEMA: test_schema GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 @@ -819,6 +821,8 @@ jobs: GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378 GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres + GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres + GT_POSTGRES15_SCHEMA: test_schema GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql GT_KAFKA_ENDPOINTS: 127.0.0.1:9092 GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093 @@ -850,4 +854,3 @@ jobs: # mkdir -p ./bins/current # tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current # - run: ./tests/compat/test-compat.sh 0.6.0 - \ No newline at end of file diff --git a/config/config.md b/config/config.md index 1d92390926..847c18cf08 100644 --- a/config/config.md +++ b/config/config.md @@ -343,6 +343,7 @@ | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store`
- `mysql_store` | | `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.
**Only used when backend is `postgres_store`.** | +| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.
When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
GreptimeDB will NOT create the schema automatically; please ensure it exists and that the user has permission to use it.
**Only used when backend is `postgres_store`.** | | `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
Only used when backend is `postgres_store`. | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index d9817b0cb0..736f035949 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -23,6 +23,14 @@ backend = "etcd_store" ## **Only used when backend is `postgres_store`.** meta_table_name = "greptime_metakv" +## Optional PostgreSQL schema for metadata table and election table name qualification. +## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public), +## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`. +## GreptimeDB will NOT create the schema automatically; please ensure it exists and that the user has permission to use it. +## **Only used when backend is `postgres_store`.** + +meta_schema_name = "greptime_schema" + ## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend ## Only used when backend is `postgres_store`. meta_election_lock_id = 1 diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index bbd1895f36..34fbbe9fac 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -66,6 +66,9 @@ pub struct BenchTableMetadataCommand { #[cfg(feature = "pg_kvbackend")] #[clap(long)] postgres_addr: Option<String>, + #[cfg(feature = "pg_kvbackend")] + #[clap(long)] + postgres_schema: Option<String>, #[cfg(feature = "mysql_kvbackend")] #[clap(long)] mysql_addr: Option<String>, diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/metadata/common.rs index 960b13f065..2a9b2d1c4d 100644 --- a/src/cli/src/metadata/common.rs +++ b/src/cli/src/metadata/common.rs @@ -57,6 +57,10 @@ pub(crate) struct StoreConfig { #[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)] meta_table_name: String, +    /// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
+    #[cfg(feature = "pg_kvbackend")] +    #[clap(long)] +    meta_schema_name: Option<String>, /// TLS mode for backend store connections (etcd, PostgreSQL, MySQL) #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")] backend_tls_mode: TlsMode, @@ -110,8 +114,10 @@ impl StoreConfig { let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None) .await .map_err(BoxedError::new)?; + let schema_name = self.meta_schema_name.as_deref(); Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool( pool, + schema_name, table_name, max_txn_ops, ) diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index e8b85b7c2a..9058a579b0 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -199,6 +199,7 @@ fn test_load_metasrv_example_config() { ca_cert_path: String::new(), watch: false, }), + meta_schema_name: Some("greptime_schema".to_string()), ..Default::default() }, ..Default::default() diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 3688091843..ea1ca22971 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -628,7 +628,6 @@ impl TableMetadataManager { &self.topic_region_manager } - #[cfg(feature = "testing")] pub fn kv_backend(&self) -> &KvBackendRef { &self.kv_backend } diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index e5d2ca98d0..fa8abadffa 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -192,50 +192,61 @@ fn pg_generate_in_placeholders(from: usize, to: usize) -> Vec<String> { /// Factory for building sql templates. struct PgSqlTemplateFactory<'a> { + schema_name: Option<&'a str>, table_name: &'a str, } impl<'a> PgSqlTemplateFactory<'a> { - /// Creates a new [`SqlTemplateFactory`] with the given table name. - fn new(table_name: &'a str) -> Self { - Self { table_name } + /// Creates a new factory with optional schema. + fn new(schema_name: Option<&'a str>, table_name: &'a str) -> Self { + Self { + schema_name, + table_name, + } } /// Builds the template set for the given table name. fn build(&self) -> PgSqlTemplateSet { - let table_name = self.table_name; + let table_ident = Self::format_table_ident(self.schema_name, self.table_name); // Some of queries don't end with `;`, because we need to add `LIMIT` clause. PgSqlTemplateSet { - table_name: table_name.to_string(), + table_ident: table_ident.clone(), + // Do not attempt to create schema implicitly to avoid extra privileges requirement.
create_table_statement: format!( - "CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)", + "CREATE TABLE IF NOT EXISTS {table_ident}(k bytea PRIMARY KEY, v bytea)", ), range_template: RangeTemplate { - point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"), + point: format!("SELECT k, v FROM {table_ident} WHERE k = $1"), range: format!( - "SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k" + "SELECT k, v FROM {table_ident} WHERE k >= $1 AND k < $2 ORDER BY k" ), - full: format!("SELECT k, v FROM \"{table_name}\" ORDER BY k"), - left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"), - prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"), + full: format!("SELECT k, v FROM {table_ident} ORDER BY k"), + left_bounded: format!("SELECT k, v FROM {table_ident} WHERE k >= $1 ORDER BY k"), + prefix: format!("SELECT k, v FROM {table_ident} WHERE k LIKE $1 ORDER BY k"), }, delete_template: RangeTemplate { - point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"), - range: format!( - "DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;" - ), - full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"), - left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"), - prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"), + point: format!("DELETE FROM {table_ident} WHERE k = $1 RETURNING k,v;"), + range: format!("DELETE FROM {table_ident} WHERE k >= $1 AND k < $2 RETURNING k,v;"), + full: format!("DELETE FROM {table_ident} RETURNING k,v"), + left_bounded: format!("DELETE FROM {table_ident} WHERE k >= $1 RETURNING k,v;"), + prefix: format!("DELETE FROM {table_ident} WHERE k LIKE $1 RETURNING k,v;"), }, } } + + /// Formats the table reference with schema if provided. + fn format_table_ident(schema_name: Option<&str>, table_name: &str) -> String { + match schema_name { + Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, table_name), + _ => format!("\"{}\"", table_name), + } + } } /// Templates for the given table name. #[derive(Debug, Clone)] pub struct PgSqlTemplateSet { - table_name: String, + table_ident: String, create_table_statement: String, range_template: RangeTemplate, delete_template: RangeTemplate, @@ -244,27 +255,24 @@ pub struct PgSqlTemplateSet { impl PgSqlTemplateSet { /// Generates the sql for batch get. fn generate_batch_get_query(&self, key_len: usize) -> String { - let table_name = &self.table_name; let in_clause = pg_generate_in_placeholders(1, key_len).join(", "); format!( - "SELECT k, v FROM \"{table_name}\" WHERE k in ({});", - in_clause + "SELECT k, v FROM {} WHERE k in ({});", + self.table_ident, in_clause ) } /// Generates the sql for batch delete. fn generate_batch_delete_query(&self, key_len: usize) -> String { - let table_name = &self.table_name; let in_clause = pg_generate_in_placeholders(1, key_len).join(", "); format!( - "DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;", - in_clause + "DELETE FROM {} WHERE k in ({}) RETURNING k,v;", + self.table_ident, in_clause ) } /// Generates the sql for batch upsert. 
fn generate_batch_upsert_query(&self, kv_len: usize) -> String { - let table_name = &self.table_name; let in_placeholders: Vec = (1..=kv_len).map(|i| format!("${}", i)).collect(); let in_clause = in_placeholders.join(", "); let mut param_index = kv_len + 1; @@ -278,9 +286,9 @@ impl PgSqlTemplateSet { format!( r#" WITH prev AS ( - SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause}) + SELECT k,v FROM {table} WHERE k IN ({in_clause}) ), update AS ( - INSERT INTO "{table_name}" (k, v) VALUES + INSERT INTO {table} (k, v) VALUES {values_clause} ON CONFLICT ( k @@ -289,7 +297,10 @@ impl PgSqlTemplateSet { ) SELECT k, v FROM prev; - "# + "#, + table = self.table_ident, + in_clause = in_clause, + values_clause = values_clause ) } } @@ -835,7 +846,7 @@ impl PgStore { .context(CreatePostgresPoolSnafu)?, }; - Self::with_pg_pool(pool, table_name, max_txn_ops).await + Self::with_pg_pool(pool, None, table_name, max_txn_ops).await } /// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility). @@ -843,15 +854,14 @@ impl PgStore { Self::with_url_and_tls(url, table_name, max_txn_ops, None).await } - /// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool]. + /// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool] with optional schema. pub async fn with_pg_pool( pool: Pool, + schema_name: Option<&str>, table_name: &str, max_txn_ops: usize, ) -> Result { - // This step ensures the postgres metadata backend is ready to use. - // We check if greptime_metakv table exists, and we will create a new table - // if it does not exist. + // Ensure the postgres metadata backend is ready to use. let client = match pool.get().await { Ok(client) => client, Err(e) => { @@ -861,8 +871,9 @@ impl PgStore { .fail(); } }; - let template_factory = PgSqlTemplateFactory::new(table_name); + let template_factory = PgSqlTemplateFactory::new(schema_name, table_name); let sql_template_set = template_factory.build(); + // Do not attempt to create schema implicitly. client .execute(&sql_template_set.create_table_statement, &[]) .await @@ -890,7 +901,7 @@ mod tests { test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op, text_txn_multi_compare_op, unprepare_kv, }; - use crate::maybe_skip_postgres_integration_test; + use crate::{maybe_skip_postgres15_integration_test, maybe_skip_postgres_integration_test}; async fn build_pg_kv_backend(table_name: &str) -> Option { let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default(); @@ -905,8 +916,10 @@ mod tests { .context(CreatePostgresPoolSnafu) .unwrap(); let client = pool.get().await.unwrap(); - let template_factory = PgSqlTemplateFactory::new(table_name); + // use the default schema (i.e., public) + let template_factory = PgSqlTemplateFactory::new(None, table_name); let sql_templates = template_factory.build(); + // Do not attempt to create schema implicitly. 
client .execute(&sql_templates.create_table_statement, &[]) .await @@ -923,6 +936,61 @@ mod tests { }) } + async fn build_pg15_pool() -> Option { + let url = std::env::var("GT_POSTGRES15_ENDPOINTS").unwrap_or_default(); + if url.is_empty() { + return None; + } + let mut cfg = Config::new(); + cfg.url = Some(url); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .ok()?; + Some(pool) + } + + #[tokio::test] + async fn test_pg15_create_table_in_public_should_fail() { + maybe_skip_postgres15_integration_test!(); + let Some(pool) = build_pg15_pool().await else { + return; + }; + let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await; + assert!( + res.is_err(), + "creating table in public should fail for test_user" + ); + } + + #[tokio::test] + async fn test_pg15_create_table_in_test_schema_and_crud_should_succeed() { + maybe_skip_postgres15_integration_test!(); + let Some(pool) = build_pg15_pool().await else { + return; + }; + let schema_name = std::env::var("GT_POSTGRES15_SCHEMA").unwrap(); + let client = pool.get().await.unwrap(); + let factory = PgSqlTemplateFactory::new(Some(&schema_name), "pg15_ok"); + let templates = factory.build(); + client + .execute(&templates.create_table_statement, &[]) + .await + .unwrap(); + let kv = PgStore { + max_txn_ops: 128, + sql_template_set: templates, + txn_retry_count: RDS_STORE_TXN_RETRY_COUNT, + executor_factory: PgExecutorFactory { pool }, + _phantom: PhantomData, + }; + let prefix = b"pg15_crud/"; + prepare_kv_with_prefix(&kv, prefix.to_vec()).await; + test_kv_put_with_prefix(&kv, prefix.to_vec()).await; + test_kv_batch_get_with_prefix(&kv, prefix.to_vec()).await; + unprepare_kv(&kv, prefix).await; + } + #[tokio::test] async fn test_pg_put() { maybe_skip_postgres_integration_test!(); @@ -1024,4 +1092,31 @@ mod tests { test_txn_compare_less(&kv_backend).await; test_txn_compare_not_equal(&kv_backend).await; } + + #[test] + fn test_pg_template_with_schema() { + let factory = PgSqlTemplateFactory::new(Some("test_schema"), "greptime_metakv"); + let t = factory.build(); + assert!(t + .create_table_statement + .contains("\"test_schema\".\"greptime_metakv\"")); + let upsert = t.generate_batch_upsert_query(1); + assert!(upsert.contains("\"test_schema\".\"greptime_metakv\"")); + let get = t.generate_batch_get_query(1); + assert!(get.contains("\"test_schema\".\"greptime_metakv\"")); + let del = t.generate_batch_delete_query(1); + assert!(del.contains("\"test_schema\".\"greptime_metakv\"")); + } + + #[test] + fn test_format_table_ident() { + let t = PgSqlTemplateFactory::format_table_ident(None, "test_table"); + assert_eq!(t, "\"test_table\""); + + let t = PgSqlTemplateFactory::format_table_ident(Some("test_schema"), "test_table"); + assert_eq!(t, "\"test_schema\".\"test_table\""); + + let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table"); + assert_eq!(t, "\"test_table\""); + } } diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 9d8e9a6822..95a2d23800 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -260,7 +260,7 @@ pub async fn test_kafka_topic_pool( /// Skip the test if the environment variable `GT_POSTGRES_ENDPOINTS` is not set. /// /// The format of the environment variable is: -/// ``` +/// ```text /// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093 /// ``` macro_rules! maybe_skip_postgres_integration_test { @@ -276,7 +276,7 @@ macro_rules! 
maybe_skip_postgres_integration_test { /// Skip the test if the environment variable `GT_MYSQL_ENDPOINTS` is not set. /// /// The format of the environment variable is: -/// ``` +/// ```text /// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093 /// ``` macro_rules! maybe_skip_mysql_integration_test { @@ -287,3 +287,19 @@ macro_rules! maybe_skip_mysql_integration_test { } }; } + +#[macro_export] +/// Skip the test if the environment variable `GT_POSTGRES15_ENDPOINTS` is not set. +/// +/// The format of the environment variable is: +/// ```text +/// GT_POSTGRES15_ENDPOINTS=postgres://user:password@127.0.0.1:5433/postgres +/// ``` +macro_rules! maybe_skip_postgres15_integration_test { + () => { + if std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() { + common_telemetry::warn!("The PG15 endpoints is empty, skipping the test"); + return; + } + }; +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 86c917051d..6bd7b95b55 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -332,15 +332,21 @@ pub async fn metasrv_builder( opts.store_key_prefix.clone(), candidate_lease_ttl, meta_lease_ttl, + opts.meta_schema_name.as_deref(), &opts.meta_table_name, opts.meta_election_lock_id, ) .await?; let pool = create_postgres_pool(&opts.store_addrs, opts.backend_tls.clone()).await?; - let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops) - .await - .context(error::KvBackendSnafu)?; + let kv_backend = PgStore::with_pg_pool( + pool, + opts.meta_schema_name.as_deref(), + &opts.meta_table_name, + opts.max_txn_ops, + ) + .await + .context(error::KvBackendSnafu)?; (kv_backend, Some(election)) } diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index 4d50fe0867..870f5377e4 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -38,6 +38,7 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo}; struct ElectionSqlFactory<'a> { lock_id: u64, + schema_name: Option<&'a str>, table_name: &'a str, } @@ -88,13 +89,21 @@ struct ElectionSqlSet { } impl<'a> ElectionSqlFactory<'a> { - fn new(lock_id: u64, table_name: &'a str) -> Self { + fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self { Self { lock_id, + schema_name, table_name, } } + fn table_ident(&self) -> String { + match self.schema_name { + Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name), + _ => format!("\"{}\"", self.table_name), + } + } + fn build(self) -> ElectionSqlSet { ElectionSqlSet { campaign: self.campaign_sql(), @@ -116,47 +125,54 @@ impl<'a> ElectionSqlFactory<'a> { } fn put_value_with_lease_sql(&self) -> String { + let table = self.table_ident(); format!( r#"WITH prev AS ( - SELECT k, v FROM "{}" WHERE k = $1 + SELECT k, v FROM {table} WHERE k = $1 ), insert AS ( - INSERT INTO "{}" - VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) + INSERT INTO {table} + VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')) ON CONFLICT (k) DO NOTHING ) SELECT k, v FROM prev; "#, - self.table_name, self.table_name, LEASE_SEP + table = table, + lease_sep = LEASE_SEP ) } fn update_value_with_lease_sql(&self) -> String { + let table = self.table_ident(); format!( - r#"UPDATE "{}" - SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 
'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') + r#"UPDATE {table} + SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8') WHERE k = $1 AND v = $2"#, - self.table_name, LEASE_SEP + table = table, + lease_sep = LEASE_SEP ) } fn get_value_with_lease_sql(&self) -> String { + let table = self.table_ident(); format!( - r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#, - self.table_name + r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#, + table = table ) } fn get_value_with_lease_by_prefix_sql(&self) -> String { + let table = self.table_ident(); format!( - r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#, - self.table_name + r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#, + table = table ) } fn delete_value_sql(&self) -> String { + let table = self.table_ident(); format!( - "DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;", - self.table_name + "DELETE FROM {table} WHERE k = $1 RETURNING k,v;", + table = table ) } } @@ -299,16 +315,23 @@ impl PgElection { Ok(()) } + #[allow(clippy::too_many_arguments)] pub async fn with_pg_client( leader_value: String, pg_client: ElectionPgClient, store_key_prefix: String, candidate_lease_ttl: Duration, meta_lease_ttl: Duration, + schema_name: Option<&str>, table_name: &str, lock_id: u64, ) -> Result { - let sql_factory = ElectionSqlFactory::new(lock_id, table_name); + if let Some(s) = schema_name { + common_telemetry::info!("PgElection uses schema: {}", s); + } else { + common_telemetry::info!("PgElection uses default search_path (no schema provided)"); + } + let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name); let tx = listen_leader_change(leader_value.clone()); Ok(Arc::new(Self { @@ -638,8 +661,8 @@ impl PgElection { /// after a period of time during which other leaders have been elected and stepped down. /// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign. /// - /// - **Case 2**: If the current instance is not leader previously, it calls the - /// `elected` method as a newly elected leader. + /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method + /// as a newly elected leader. 
async fn leader_action(&self) -> Result<()> { let key = self.election_key(); // Case 1 @@ -881,7 +904,7 @@ mod tests { store_key_prefix: uuid, candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28319, table_name).build(), + sql_set: ElectionSqlFactory::new(28319, None, table_name).build(), }; let res = pg_election @@ -969,7 +992,7 @@ mod tests { store_key_prefix, candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28319, &table_name).build(), + sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(), }; let node_info = MetasrvNodeInfo { @@ -1026,7 +1049,7 @@ mod tests { store_key_prefix: uuid.clone(), candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28319, table_name).build(), + sql_set: ElectionSqlFactory::new(28319, None, table_name).build(), }; let candidates = pg_election.all_candidates().await.unwrap(); @@ -1081,7 +1104,7 @@ mod tests { store_key_prefix: uuid, candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28320, table_name).build(), + sql_set: ElectionSqlFactory::new(28320, None, table_name).build(), }; leader_pg_election.elected().await.unwrap(); @@ -1206,7 +1229,7 @@ mod tests { store_key_prefix: uuid, candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28321, table_name).build(), + sql_set: ElectionSqlFactory::new(28321, None, table_name).build(), }; // Step 1: No leader exists, campaign and elected. @@ -1473,7 +1496,7 @@ mod tests { store_key_prefix: uuid.clone(), candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28322, table_name).build(), + sql_set: ElectionSqlFactory::new(28322, None, table_name).build(), }; let leader_client = create_postgres_client( @@ -1494,7 +1517,7 @@ mod tests { store_key_prefix: uuid, candidate_lease_ttl, meta_lease_ttl, - sql_set: ElectionSqlFactory::new(28322, table_name).build(), + sql_set: ElectionSqlFactory::new(28322, None, table_name).build(), }; leader_pg_election @@ -1578,4 +1601,26 @@ mod tests { client.reset_client().await.unwrap(); let _ = client.query("SELECT 1", &[]).await.unwrap(); } + + #[test] + fn test_election_sql_with_schema() { + let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv"); + let s = f.build(); + assert!(s.campaign.contains("pg_try_advisory_lock")); + assert!(s + .put_value_with_lease + .contains("\"test_schema\".\"greptime_metakv\"")); + assert!(s + .update_value_with_lease + .contains("\"test_schema\".\"greptime_metakv\"")); + assert!(s + .get_value_with_lease + .contains("\"test_schema\".\"greptime_metakv\"")); + assert!(s + .get_value_with_lease_by_prefix + .contains("\"test_schema\".\"greptime_metakv\"")); + assert!(s + .delete_value + .contains("\"test_schema\".\"greptime_metakv\"")); + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 28cd2b13b9..3f44904d70 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -200,6 +200,9 @@ pub struct MetasrvOptions { #[cfg(feature = "pg_kvbackend")] /// Lock id for meta kv election. Only effect when using pg_kvbackend. pub meta_election_lock_id: u64, + #[cfg(feature = "pg_kvbackend")] + /// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty). + pub meta_schema_name: Option, #[serde(with = "humantime_serde")] pub node_max_idle_time: Duration, /// The event recorder options. 
@@ -244,6 +247,8 @@ impl fmt::Debug for MetasrvOptions { #[cfg(feature = "pg_kvbackend")] debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id); + #[cfg(feature = "pg_kvbackend")] + debug_struct.field("meta_schema_name", &self.meta_schema_name); debug_struct .field("node_max_idle_time", &self.node_max_idle_time) @@ -297,6 +302,8 @@ impl Default for MetasrvOptions { meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(), #[cfg(feature = "pg_kvbackend")] meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID, + #[cfg(feature = "pg_kvbackend")] + meta_schema_name: None, node_max_idle_time: Duration::from_secs(24 * 60 * 60), event_recorder: EventRecorderOptions::default(), stats_persistence: StatsPersistenceOptions::default(), diff --git a/tests-integration/fixtures/docker-compose.yml b/tests-integration/fixtures/docker-compose.yml index 8e435e47d4..164331f692 100644 --- a/tests-integration/fixtures/docker-compose.yml +++ b/tests-integration/fixtures/docker-compose.yml @@ -94,6 +94,19 @@ services: - POSTGRES_DB=postgres - POSTGRES_PASSWORD=admin + postgres15: + image: docker.io/postgres:15-alpine + ports: + - 5433:5432 + volumes: + - ~/apps/postgres15:/var/lib/postgresql/data + - ./postgres/init.sql:/docker-entrypoint-initdb.d/01-init.sql:ro + environment: + - POSTGRES_USER=greptimedb + - POSTGRES_DB=postgres + - POSTGRES_PASSWORD=admin + + mysql: image: bitnami/mysql:5.7 ports: diff --git a/tests-integration/fixtures/postgres/init.sql b/tests-integration/fixtures/postgres/init.sql new file mode 100644 index 0000000000..7d7327bff7 --- /dev/null +++ b/tests-integration/fixtures/postgres/init.sql @@ -0,0 +1,10 @@ +-- 1. Create a non-superuser +CREATE USER test_user WITH PASSWORD 'test_password'; + +-- 2. Prevent non-superusers from creating tables in the public schema +-- Explicitly revoke CREATE permission from PUBLIC to avoid inherited grants +REVOKE CREATE ON SCHEMA public FROM PUBLIC; +REVOKE CREATE ON SCHEMA public FROM test_user; + +-- 3. Create a separate schema owned by the non-superuser +CREATE SCHEMA test_schema AUTHORIZATION test_user;
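
A minimal sketch (not part of this patch) of how the new optional-schema parameter is threaded through when wiring up a PostgreSQL metadata backend. It only uses calls whose signatures appear in this diff (meta_srv::bootstrap::create_postgres_pool and PgStore::with_pg_pool); the endpoint, schema name, table name, and error handling are illustrative, and the schema is assumed to already exist with the right privileges, since GreptimeDB will not create it.

use common_meta::kv_backend::rds::PgStore;
use meta_srv::bootstrap::create_postgres_pool;

async fn pg_metadata_backend_with_schema() {
    // Illustrative endpoint; matches the PG15 fixture added in this patch.
    let store_addrs =
        vec!["postgres://test_user:test_password@127.0.0.1:5433/postgres".to_string()];

    // Same pool helper the CLI and metasrv bootstrap call in this diff (no TLS here).
    let pool = create_postgres_pool(&store_addrs, None)
        .await
        .expect("failed to create PostgreSQL pool");

    // `Some("test_schema")` makes every generated statement reference
    // "test_schema"."greptime_metakv"; `None` keeps the old, unqualified behavior.
    let _kv_backend = PgStore::with_pg_pool(pool, Some("test_schema"), "greptime_metakv", 128)
        .await
        .expect("failed to initialize the PostgreSQL metadata backend");
}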
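
Separately, a hypothetical pre-flight helper (also not part of this patch) sketches the one-time setup that the `meta_schema_name` documentation asks operators to perform, since GreptimeDB does not create the schema itself. It mirrors tests-integration/fixtures/postgres/init.sql; the function name, user, and schema are illustrative, and it assumes the same deadpool_postgres/tokio_postgres APIs already used in this diff.

use deadpool_postgres::{Config, Runtime};
use tokio_postgres::NoTls;

/// Hypothetical helper: run once as a privileged user before starting metasrv.
async fn prepare_metadata_schema(admin_url: &str, schema: &str, app_user: &str) {
    let mut cfg = Config::new();
    cfg.url = Some(admin_url.to_string());
    let pool = cfg
        .create_pool(Some(Runtime::Tokio1), NoTls)
        .expect("failed to create admin pool");
    let client = pool.get().await.expect("failed to get admin connection");

    // Keep `public` locked down and give the application user its own schema,
    // following the PG15 fixture added by this patch.
    let setup = format!(
        "REVOKE CREATE ON SCHEMA public FROM {app_user}; \
         CREATE SCHEMA IF NOT EXISTS {schema} AUTHORIZATION {app_user};"
    );
    client
        .batch_execute(&setup)
        .await
        .expect("failed to prepare metadata schema");
}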