diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index f01f8c145c..dfd2ccaab3 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -766,6 +766,8 @@ jobs:
GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
+ GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres
+ GT_POSTGRES15_SCHEMA: test_schema
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
@@ -819,6 +821,8 @@ jobs:
GT_ETCD_TLS_ENDPOINTS: https://127.0.0.1:2378
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_POSTGRES_ENDPOINTS: postgres://greptimedb:admin@127.0.0.1:5432/postgres
+ GT_POSTGRES15_ENDPOINTS: postgres://test_user:test_password@127.0.0.1:5433/postgres
+ GT_POSTGRES15_SCHEMA: test_schema
GT_MYSQL_ENDPOINTS: mysql://greptimedb:admin@127.0.0.1:3306/mysql
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
@@ -850,4 +854,3 @@ jobs:
# mkdir -p ./bins/current
# tar -xvf ./bins.tar.gz --strip-components=1 -C ./bins/current
# - run: ./tests/compat/test-compat.sh 0.6.0
-
\ No newline at end of file
diff --git a/config/config.md b/config/config.md
index 1d92390926..847c18cf08 100644
--- a/config/config.md
+++ b/config/config.md
@@ -343,6 +343,7 @@
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store`
- `mysql_store` |
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.
**Only used when backend is `postgres_store`.** |
+| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.
When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
**Only used when backend is `postgres_store`.** |
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
Only used when backend is `postgres_store`. |
| `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index d9817b0cb0..736f035949 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -23,6 +23,14 @@ backend = "etcd_store"
## **Only used when backend is `postgres_store`.**
meta_table_name = "greptime_metakv"
+## Optional PostgreSQL schema for metadata table and election table name qualification.
+## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
+## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
+## GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
+## **Only used when backend is `postgres_store`.**
+
+meta_schema_name = "greptime_schema"
+
## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
## Only used when backend is `postgres_store`.
meta_election_lock_id = 1
diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs
index bbd1895f36..34fbbe9fac 100644
--- a/src/cli/src/bench.rs
+++ b/src/cli/src/bench.rs
@@ -66,6 +66,9 @@ pub struct BenchTableMetadataCommand {
#[cfg(feature = "pg_kvbackend")]
#[clap(long)]
postgres_addr: Option,
+ #[cfg(feature = "pg_kvbackend")]
+ #[clap(long)]
+ postgres_schema: Option,
#[cfg(feature = "mysql_kvbackend")]
#[clap(long)]
mysql_addr: Option,
diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/metadata/common.rs
index 960b13f065..2a9b2d1c4d 100644
--- a/src/cli/src/metadata/common.rs
+++ b/src/cli/src/metadata/common.rs
@@ -57,6 +57,10 @@ pub(crate) struct StoreConfig {
#[clap(long, default_value = common_meta::kv_backend::DEFAULT_META_TABLE_NAME)]
meta_table_name: String,
+ /// Optional PostgreSQL schema for metadata table (defaults to current search_path if unset).
+ #[cfg(feature = "pg_kvbackend")]
+ #[clap(long)]
+ meta_schema_name: Option,
/// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
#[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
backend_tls_mode: TlsMode,
@@ -110,8 +114,10 @@ impl StoreConfig {
let pool = meta_srv::bootstrap::create_postgres_pool(store_addrs, None)
.await
.map_err(BoxedError::new)?;
+ let schema_name = self.meta_schema_name.as_deref();
Ok(common_meta::kv_backend::rds::PgStore::with_pg_pool(
pool,
+ schema_name,
table_name,
max_txn_ops,
)
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index e8b85b7c2a..9058a579b0 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -199,6 +199,7 @@ fn test_load_metasrv_example_config() {
ca_cert_path: String::new(),
watch: false,
}),
+ meta_schema_name: Some("greptime_schema".to_string()),
..Default::default()
},
..Default::default()
diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs
index 3688091843..ea1ca22971 100644
--- a/src/common/meta/src/key.rs
+++ b/src/common/meta/src/key.rs
@@ -628,7 +628,6 @@ impl TableMetadataManager {
&self.topic_region_manager
}
- #[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}
diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs
index e5d2ca98d0..fa8abadffa 100644
--- a/src/common/meta/src/kv_backend/rds/postgres.rs
+++ b/src/common/meta/src/kv_backend/rds/postgres.rs
@@ -192,50 +192,61 @@ fn pg_generate_in_placeholders(from: usize, to: usize) -> Vec {
/// Factory for building sql templates.
struct PgSqlTemplateFactory<'a> {
+ schema_name: Option<&'a str>,
table_name: &'a str,
}
impl<'a> PgSqlTemplateFactory<'a> {
- /// Creates a new [`SqlTemplateFactory`] with the given table name.
- fn new(table_name: &'a str) -> Self {
- Self { table_name }
+ /// Creates a new factory with optional schema.
+ fn new(schema_name: Option<&'a str>, table_name: &'a str) -> Self {
+ Self {
+ schema_name,
+ table_name,
+ }
}
/// Builds the template set for the given table name.
fn build(&self) -> PgSqlTemplateSet {
- let table_name = self.table_name;
+ let table_ident = Self::format_table_ident(self.schema_name, self.table_name);
// Some of queries don't end with `;`, because we need to add `LIMIT` clause.
PgSqlTemplateSet {
- table_name: table_name.to_string(),
+ table_ident: table_ident.clone(),
+ // Do not attempt to create schema implicitly to avoid extra privileges requirement.
create_table_statement: format!(
- "CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
+ "CREATE TABLE IF NOT EXISTS {table_ident}(k bytea PRIMARY KEY, v bytea)",
),
range_template: RangeTemplate {
- point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
+ point: format!("SELECT k, v FROM {table_ident} WHERE k = $1"),
range: format!(
- "SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
+ "SELECT k, v FROM {table_ident} WHERE k >= $1 AND k < $2 ORDER BY k"
),
- full: format!("SELECT k, v FROM \"{table_name}\" ORDER BY k"),
- left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
- prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
+ full: format!("SELECT k, v FROM {table_ident} ORDER BY k"),
+ left_bounded: format!("SELECT k, v FROM {table_ident} WHERE k >= $1 ORDER BY k"),
+ prefix: format!("SELECT k, v FROM {table_ident} WHERE k LIKE $1 ORDER BY k"),
},
delete_template: RangeTemplate {
- point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
- range: format!(
- "DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
- ),
- full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
- left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
- prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
+ point: format!("DELETE FROM {table_ident} WHERE k = $1 RETURNING k,v;"),
+ range: format!("DELETE FROM {table_ident} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
+ full: format!("DELETE FROM {table_ident} RETURNING k,v"),
+ left_bounded: format!("DELETE FROM {table_ident} WHERE k >= $1 RETURNING k,v;"),
+ prefix: format!("DELETE FROM {table_ident} WHERE k LIKE $1 RETURNING k,v;"),
},
}
}
+
+ /// Formats the table reference with schema if provided.
+ fn format_table_ident(schema_name: Option<&str>, table_name: &str) -> String {
+ match schema_name {
+ Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, table_name),
+ _ => format!("\"{}\"", table_name),
+ }
+ }
}
/// Templates for the given table name.
#[derive(Debug, Clone)]
pub struct PgSqlTemplateSet {
- table_name: String,
+ table_ident: String,
create_table_statement: String,
range_template: RangeTemplate,
delete_template: RangeTemplate,
@@ -244,27 +255,24 @@ pub struct PgSqlTemplateSet {
impl PgSqlTemplateSet {
/// Generates the sql for batch get.
fn generate_batch_get_query(&self, key_len: usize) -> String {
- let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
- "SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
- in_clause
+ "SELECT k, v FROM {} WHERE k in ({});",
+ self.table_ident, in_clause
)
}
/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
- let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
- "DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
- in_clause
+ "DELETE FROM {} WHERE k in ({}) RETURNING k,v;",
+ self.table_ident, in_clause
)
}
/// Generates the sql for batch upsert.
fn generate_batch_upsert_query(&self, kv_len: usize) -> String {
- let table_name = &self.table_name;
let in_placeholders: Vec = (1..=kv_len).map(|i| format!("${}", i)).collect();
let in_clause = in_placeholders.join(", ");
let mut param_index = kv_len + 1;
@@ -278,9 +286,9 @@ impl PgSqlTemplateSet {
format!(
r#"
WITH prev AS (
- SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
+ SELECT k,v FROM {table} WHERE k IN ({in_clause})
), update AS (
- INSERT INTO "{table_name}" (k, v) VALUES
+ INSERT INTO {table} (k, v) VALUES
{values_clause}
ON CONFLICT (
k
@@ -289,7 +297,10 @@ impl PgSqlTemplateSet {
)
SELECT k, v FROM prev;
- "#
+ "#,
+ table = self.table_ident,
+ in_clause = in_clause,
+ values_clause = values_clause
)
}
}
@@ -835,7 +846,7 @@ impl PgStore {
.context(CreatePostgresPoolSnafu)?,
};
- Self::with_pg_pool(pool, table_name, max_txn_ops).await
+ Self::with_pg_pool(pool, None, table_name, max_txn_ops).await
}
/// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility).
@@ -843,15 +854,14 @@ impl PgStore {
Self::with_url_and_tls(url, table_name, max_txn_ops, None).await
}
- /// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool].
+ /// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool] with optional schema.
pub async fn with_pg_pool(
pool: Pool,
+ schema_name: Option<&str>,
table_name: &str,
max_txn_ops: usize,
) -> Result {
- // This step ensures the postgres metadata backend is ready to use.
- // We check if greptime_metakv table exists, and we will create a new table
- // if it does not exist.
+ // Ensure the postgres metadata backend is ready to use.
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
@@ -861,8 +871,9 @@ impl PgStore {
.fail();
}
};
- let template_factory = PgSqlTemplateFactory::new(table_name);
+ let template_factory = PgSqlTemplateFactory::new(schema_name, table_name);
let sql_template_set = template_factory.build();
+ // Do not attempt to create schema implicitly.
client
.execute(&sql_template_set.create_table_statement, &[])
.await
@@ -890,7 +901,7 @@ mod tests {
test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
text_txn_multi_compare_op, unprepare_kv,
};
- use crate::maybe_skip_postgres_integration_test;
+ use crate::{maybe_skip_postgres15_integration_test, maybe_skip_postgres_integration_test};
async fn build_pg_kv_backend(table_name: &str) -> Option {
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
@@ -905,8 +916,10 @@ mod tests {
.context(CreatePostgresPoolSnafu)
.unwrap();
let client = pool.get().await.unwrap();
- let template_factory = PgSqlTemplateFactory::new(table_name);
+ // use the default schema (i.e., public)
+ let template_factory = PgSqlTemplateFactory::new(None, table_name);
let sql_templates = template_factory.build();
+ // Do not attempt to create schema implicitly.
client
.execute(&sql_templates.create_table_statement, &[])
.await
@@ -923,6 +936,61 @@ mod tests {
})
}
+ async fn build_pg15_pool() -> Option {
+ let url = std::env::var("GT_POSTGRES15_ENDPOINTS").unwrap_or_default();
+ if url.is_empty() {
+ return None;
+ }
+ let mut cfg = Config::new();
+ cfg.url = Some(url);
+ let pool = cfg
+ .create_pool(Some(Runtime::Tokio1), NoTls)
+ .context(CreatePostgresPoolSnafu)
+ .ok()?;
+ Some(pool)
+ }
+
+ #[tokio::test]
+ async fn test_pg15_create_table_in_public_should_fail() {
+ maybe_skip_postgres15_integration_test!();
+ let Some(pool) = build_pg15_pool().await else {
+ return;
+ };
+ let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await;
+ assert!(
+ res.is_err(),
+ "creating table in public should fail for test_user"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_pg15_create_table_in_test_schema_and_crud_should_succeed() {
+ maybe_skip_postgres15_integration_test!();
+ let Some(pool) = build_pg15_pool().await else {
+ return;
+ };
+ let schema_name = std::env::var("GT_POSTGRES15_SCHEMA").unwrap();
+ let client = pool.get().await.unwrap();
+ let factory = PgSqlTemplateFactory::new(Some(&schema_name), "pg15_ok");
+ let templates = factory.build();
+ client
+ .execute(&templates.create_table_statement, &[])
+ .await
+ .unwrap();
+ let kv = PgStore {
+ max_txn_ops: 128,
+ sql_template_set: templates,
+ txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
+ executor_factory: PgExecutorFactory { pool },
+ _phantom: PhantomData,
+ };
+ let prefix = b"pg15_crud/";
+ prepare_kv_with_prefix(&kv, prefix.to_vec()).await;
+ test_kv_put_with_prefix(&kv, prefix.to_vec()).await;
+ test_kv_batch_get_with_prefix(&kv, prefix.to_vec()).await;
+ unprepare_kv(&kv, prefix).await;
+ }
+
#[tokio::test]
async fn test_pg_put() {
maybe_skip_postgres_integration_test!();
@@ -1024,4 +1092,31 @@ mod tests {
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
+
+ #[test]
+ fn test_pg_template_with_schema() {
+ let factory = PgSqlTemplateFactory::new(Some("test_schema"), "greptime_metakv");
+ let t = factory.build();
+ assert!(t
+ .create_table_statement
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ let upsert = t.generate_batch_upsert_query(1);
+ assert!(upsert.contains("\"test_schema\".\"greptime_metakv\""));
+ let get = t.generate_batch_get_query(1);
+ assert!(get.contains("\"test_schema\".\"greptime_metakv\""));
+ let del = t.generate_batch_delete_query(1);
+ assert!(del.contains("\"test_schema\".\"greptime_metakv\""));
+ }
+
+ #[test]
+ fn test_format_table_ident() {
+ let t = PgSqlTemplateFactory::format_table_ident(None, "test_table");
+ assert_eq!(t, "\"test_table\"");
+
+ let t = PgSqlTemplateFactory::format_table_ident(Some("test_schema"), "test_table");
+ assert_eq!(t, "\"test_schema\".\"test_table\"");
+
+ let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table");
+ assert_eq!(t, "\"test_table\"");
+ }
}
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 9d8e9a6822..95a2d23800 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -260,7 +260,7 @@ pub async fn test_kafka_topic_pool(
/// Skip the test if the environment variable `GT_POSTGRES_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
-/// ```
+/// ```text
/// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_postgres_integration_test {
@@ -276,7 +276,7 @@ macro_rules! maybe_skip_postgres_integration_test {
/// Skip the test if the environment variable `GT_MYSQL_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
-/// ```
+/// ```text
/// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_mysql_integration_test {
@@ -287,3 +287,19 @@ macro_rules! maybe_skip_mysql_integration_test {
}
};
}
+
+#[macro_export]
+/// Skip the test if the environment variable `GT_POSTGRES15_ENDPOINTS` is not set.
+///
+/// The format of the environment variable is:
+/// ```text
+/// GT_POSTGRES15_ENDPOINTS=postgres://user:password@127.0.0.1:5433/postgres
+/// ```
+macro_rules! maybe_skip_postgres15_integration_test {
+ () => {
+ if std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() {
+ common_telemetry::warn!("The PG15 endpoints is empty, skipping the test");
+ return;
+ }
+ };
+}
diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs
index 86c917051d..6bd7b95b55 100644
--- a/src/meta-srv/src/bootstrap.rs
+++ b/src/meta-srv/src/bootstrap.rs
@@ -332,15 +332,21 @@ pub async fn metasrv_builder(
opts.store_key_prefix.clone(),
candidate_lease_ttl,
meta_lease_ttl,
+ opts.meta_schema_name.as_deref(),
&opts.meta_table_name,
opts.meta_election_lock_id,
)
.await?;
let pool = create_postgres_pool(&opts.store_addrs, opts.backend_tls.clone()).await?;
- let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
- .await
- .context(error::KvBackendSnafu)?;
+ let kv_backend = PgStore::with_pg_pool(
+ pool,
+ opts.meta_schema_name.as_deref(),
+ &opts.meta_table_name,
+ opts.max_txn_ops,
+ )
+ .await
+ .context(error::KvBackendSnafu)?;
(kv_backend, Some(election))
}
diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs
index 4d50fe0867..870f5377e4 100644
--- a/src/meta-srv/src/election/rds/postgres.rs
+++ b/src/meta-srv/src/election/rds/postgres.rs
@@ -38,6 +38,7 @@ use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
struct ElectionSqlFactory<'a> {
lock_id: u64,
+ schema_name: Option<&'a str>,
table_name: &'a str,
}
@@ -88,13 +89,21 @@ struct ElectionSqlSet {
}
impl<'a> ElectionSqlFactory<'a> {
- fn new(lock_id: u64, table_name: &'a str) -> Self {
+ fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
Self {
lock_id,
+ schema_name,
table_name,
}
}
+ fn table_ident(&self) -> String {
+ match self.schema_name {
+ Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
+ _ => format!("\"{}\"", self.table_name),
+ }
+ }
+
fn build(self) -> ElectionSqlSet {
ElectionSqlSet {
campaign: self.campaign_sql(),
@@ -116,47 +125,54 @@ impl<'a> ElectionSqlFactory<'a> {
}
fn put_value_with_lease_sql(&self) -> String {
+ let table = self.table_ident();
format!(
r#"WITH prev AS (
- SELECT k, v FROM "{}" WHERE k = $1
+ SELECT k, v FROM {table} WHERE k = $1
), insert AS (
- INSERT INTO "{}"
- VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
+ INSERT INTO {table}
+ VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
ON CONFLICT (k) DO NOTHING
)
SELECT k, v FROM prev;
"#,
- self.table_name, self.table_name, LEASE_SEP
+ table = table,
+ lease_sep = LEASE_SEP
)
}
fn update_value_with_lease_sql(&self) -> String {
+ let table = self.table_ident();
format!(
- r#"UPDATE "{}"
- SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
+ r#"UPDATE {table}
+ SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
WHERE k = $1 AND v = $2"#,
- self.table_name, LEASE_SEP
+ table = table,
+ lease_sep = LEASE_SEP
)
}
fn get_value_with_lease_sql(&self) -> String {
+ let table = self.table_ident();
format!(
- r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
- self.table_name
+ r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
+ table = table
)
}
fn get_value_with_lease_by_prefix_sql(&self) -> String {
+ let table = self.table_ident();
format!(
- r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
- self.table_name
+ r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
+ table = table
)
}
fn delete_value_sql(&self) -> String {
+ let table = self.table_ident();
format!(
- "DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
- self.table_name
+ "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
+ table = table
)
}
}
@@ -299,16 +315,23 @@ impl PgElection {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
pub async fn with_pg_client(
leader_value: String,
pg_client: ElectionPgClient,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
+ schema_name: Option<&str>,
table_name: &str,
lock_id: u64,
) -> Result {
- let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
+ if let Some(s) = schema_name {
+ common_telemetry::info!("PgElection uses schema: {}", s);
+ } else {
+ common_telemetry::info!("PgElection uses default search_path (no schema provided)");
+ }
+ let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
let tx = listen_leader_change(leader_value.clone());
Ok(Arc::new(Self {
@@ -638,8 +661,8 @@ impl PgElection {
/// after a period of time during which other leaders have been elected and stepped down.
/// - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
///
- /// - **Case 2**: If the current instance is not leader previously, it calls the
- /// `elected` method as a newly elected leader.
+ /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
+ /// as a newly elected leader.
async fn leader_action(&self) -> Result<()> {
let key = self.election_key();
// Case 1
@@ -881,7 +904,7 @@ mod tests {
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28319, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
};
let res = pg_election
@@ -969,7 +992,7 @@ mod tests {
store_key_prefix,
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
+ sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
};
let node_info = MetasrvNodeInfo {
@@ -1026,7 +1049,7 @@ mod tests {
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28319, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
};
let candidates = pg_election.all_candidates().await.unwrap();
@@ -1081,7 +1104,7 @@ mod tests {
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28320, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
};
leader_pg_election.elected().await.unwrap();
@@ -1206,7 +1229,7 @@ mod tests {
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28321, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
};
// Step 1: No leader exists, campaign and elected.
@@ -1473,7 +1496,7 @@ mod tests {
store_key_prefix: uuid.clone(),
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28322, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
};
let leader_client = create_postgres_client(
@@ -1494,7 +1517,7 @@ mod tests {
store_key_prefix: uuid,
candidate_lease_ttl,
meta_lease_ttl,
- sql_set: ElectionSqlFactory::new(28322, table_name).build(),
+ sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
};
leader_pg_election
@@ -1578,4 +1601,26 @@ mod tests {
client.reset_client().await.unwrap();
let _ = client.query("SELECT 1", &[]).await.unwrap();
}
+
+ #[test]
+ fn test_election_sql_with_schema() {
+ let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
+ let s = f.build();
+ assert!(s.campaign.contains("pg_try_advisory_lock"));
+ assert!(s
+ .put_value_with_lease
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ assert!(s
+ .update_value_with_lease
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ assert!(s
+ .get_value_with_lease
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ assert!(s
+ .get_value_with_lease_by_prefix
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ assert!(s
+ .delete_value
+ .contains("\"test_schema\".\"greptime_metakv\""));
+ }
}
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 28cd2b13b9..3f44904d70 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -200,6 +200,9 @@ pub struct MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
/// Lock id for meta kv election. Only effect when using pg_kvbackend.
pub meta_election_lock_id: u64,
+ #[cfg(feature = "pg_kvbackend")]
+ /// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty).
+ pub meta_schema_name: Option,
#[serde(with = "humantime_serde")]
pub node_max_idle_time: Duration,
/// The event recorder options.
@@ -244,6 +247,8 @@ impl fmt::Debug for MetasrvOptions {
#[cfg(feature = "pg_kvbackend")]
debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
+ #[cfg(feature = "pg_kvbackend")]
+ debug_struct.field("meta_schema_name", &self.meta_schema_name);
debug_struct
.field("node_max_idle_time", &self.node_max_idle_time)
@@ -297,6 +302,8 @@ impl Default for MetasrvOptions {
meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
#[cfg(feature = "pg_kvbackend")]
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
+ #[cfg(feature = "pg_kvbackend")]
+ meta_schema_name: None,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
stats_persistence: StatsPersistenceOptions::default(),
diff --git a/tests-integration/fixtures/docker-compose.yml b/tests-integration/fixtures/docker-compose.yml
index 8e435e47d4..164331f692 100644
--- a/tests-integration/fixtures/docker-compose.yml
+++ b/tests-integration/fixtures/docker-compose.yml
@@ -94,6 +94,19 @@ services:
- POSTGRES_DB=postgres
- POSTGRES_PASSWORD=admin
+ postgres15:
+ image: docker.io/postgres:15-alpine
+ ports:
+ - 5433:5432
+ volumes:
+ - ~/apps/postgres15:/var/lib/postgresql/data
+ - ./postgres/init.sql:/docker-entrypoint-initdb.d/01-init.sql:ro
+ environment:
+ - POSTGRES_USER=greptimedb
+ - POSTGRES_DB=postgres
+ - POSTGRES_PASSWORD=admin
+
+
mysql:
image: bitnami/mysql:5.7
ports:
diff --git a/tests-integration/fixtures/postgres/init.sql b/tests-integration/fixtures/postgres/init.sql
new file mode 100644
index 0000000000..7d7327bff7
--- /dev/null
+++ b/tests-integration/fixtures/postgres/init.sql
@@ -0,0 +1,10 @@
+-- 1. Create a non-superuser
+CREATE USER test_user WITH PASSWORD 'test_password';
+
+-- 2. Prevent non-superusers from creating tables in the public schema
+-- Explicitly revoke CREATE permission from PUBLIC to avoid inherited grants
+REVOKE CREATE ON SCHEMA public FROM PUBLIC;
+REVOKE CREATE ON SCHEMA public FROM test_user;
+
+-- 3. Create a separate schema owned by the non-superuser
+CREATE SCHEMA test_schema AUTHORIZATION test_user;