From 8d9afc83e332d1e90ded907aa212e5435690b908 Mon Sep 17 00:00:00 2001 From: Lanqing Yang Date: Tue, 23 Dec 2025 16:55:24 +0800 Subject: [PATCH] feat: allow auto schema creation for pg (#7459) Signed-off-by: lyang24 --- config/config.md | 3 +- config/metasrv.example.toml | 11 +- src/cli/src/common/store.rs | 7 + .../meta/src/kv_backend/rds/postgres.rs | 266 +++++++++++++++++- src/meta-srv/src/bootstrap.rs | 1 + src/meta-srv/src/metasrv.rs | 5 + 6 files changed, 287 insertions(+), 6 deletions(-) diff --git a/config/config.md b/config/config.md index ecbc767708..66b345b5ef 100644 --- a/config/config.md +++ b/config/config.md @@ -344,7 +344,8 @@ | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store`
- `mysql_store` | | `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.
**Only used when backend is `postgres_store`.** | -| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.
When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
**Only used when backend is `postgres_store`.** | +| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.
When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
**Only used when backend is `postgres_store`.** | +| `auto_create_schema` | Bool | `true` | Automatically create PostgreSQL schema if it doesn't exist.
When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS `
before creating metadata tables. This is useful in production environments where
manual schema creation may be restricted.
Default is true.
Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work.
**Only used when backend is `postgres_store`.** | | `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
Only used when backend is `postgres_store`. | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index a640de2588..aedd670740 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -34,11 +34,18 @@ meta_table_name = "greptime_metakv" ## Optional PostgreSQL schema for metadata table and election table name qualification. ## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public), ## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`. -## GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission. ## **Only used when backend is `postgres_store`.** - meta_schema_name = "greptime_schema" +## Automatically create PostgreSQL schema if it doesn't exist. +## When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS ` +## before creating metadata tables. This is useful in production environments where +## manual schema creation may be restricted. +## Default is true. +## Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work. +## **Only used when backend is `postgres_store`.** +auto_create_schema = true + ## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend ## Only used when backend is `postgres_store`. meta_election_lock_id = 1 diff --git a/src/cli/src/common/store.rs b/src/cli/src/common/store.rs index 03415b93ee..932cbe2eb4 100644 --- a/src/cli/src/common/store.rs +++ b/src/cli/src/common/store.rs @@ -61,6 +61,12 @@ pub struct StoreConfig { #[cfg(feature = "pg_kvbackend")] #[clap(long)] pub meta_schema_name: Option, + + /// Automatically create PostgreSQL schema if it doesn't exist (default: true). + #[cfg(feature = "pg_kvbackend")] + #[clap(long, default_value_t = true)] + pub auto_create_schema: bool, + /// TLS mode for backend store connections (etcd, PostgreSQL, MySQL) #[clap(long = "backend-tls-mode", value_enum, default_value = "disable")] pub backend_tls_mode: TlsMode, @@ -138,6 +144,7 @@ impl StoreConfig { schema_name, table_name, max_txn_ops, + self.auto_create_schema, ) .await .map_err(BoxedError::new)?) diff --git a/src/common/meta/src/kv_backend/rds/postgres.rs b/src/common/meta/src/kv_backend/rds/postgres.rs index 72c250d8d8..5d03637b7c 100644 --- a/src/common/meta/src/kv_backend/rds/postgres.rs +++ b/src/common/meta/src/kv_backend/rds/postgres.rs @@ -848,7 +848,7 @@ impl PgStore { .context(CreatePostgresPoolSnafu)?, }; - Self::with_pg_pool(pool, None, table_name, max_txn_ops).await + Self::with_pg_pool(pool, None, table_name, max_txn_ops, false).await } /// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility). @@ -862,6 +862,7 @@ impl PgStore { schema_name: Option<&str>, table_name: &str, max_txn_ops: usize, + auto_create_schema: bool, ) -> Result { // Ensure the postgres metadata backend is ready to use. let client = match pool.get().await { @@ -873,9 +874,23 @@ impl PgStore { .fail(); } }; + + // Automatically create schema if enabled and schema_name is provided. + if auto_create_schema + && let Some(schema) = schema_name + && !schema.is_empty() + { + let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema); + client + .execute(&create_schema_sql, &[]) + .await + .with_context(|_| PostgresExecutionSnafu { + sql: create_schema_sql.clone(), + })?; + } + let template_factory = PgSqlTemplateFactory::new(schema_name, table_name); let sql_template_set = template_factory.build(); - // Do not attempt to create schema implicitly. client .execute(&sql_template_set.create_table_statement, &[]) .await @@ -959,7 +974,7 @@ mod tests { let Some(pool) = build_pg15_pool().await else { return; }; - let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await; + let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128, false).await; assert!( res.is_err(), "creating table in public should fail for test_user" @@ -1214,4 +1229,249 @@ mod tests { let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table"); assert_eq!(t, "\"test_table\""); } + + #[tokio::test] + async fn test_auto_create_schema_enabled() { + common_telemetry::init_default_ut_logging(); + maybe_skip_postgres_integration_test!(); + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap(); + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + + let schema_name = "test_auto_create_enabled"; + let table_name = "test_table"; + + // Drop the schema if it exists to start clean + let client = pool.get().await.unwrap(); + let _ = client + .execute( + &format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name), + &[], + ) + .await; + + // Create store with auto_create_schema enabled + let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true) + .await + .unwrap(); + + // Verify schema was created + let row = client + .query_one( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1", + &[&schema_name], + ) + .await + .unwrap(); + let created_schema: String = row.get(0); + assert_eq!(created_schema, schema_name); + + // Verify table was created in the schema + let row = client + .query_one( + "SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2", + &[&schema_name, &table_name], + ) + .await + .unwrap(); + let created_table_schema: String = row.get(0); + let created_table_name: String = row.get(1); + assert_eq!(created_table_schema, schema_name); + assert_eq!(created_table_name, table_name); + + // Cleanup + let _ = client + .execute( + &format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name), + &[], + ) + .await; + } + + #[tokio::test] + async fn test_auto_create_schema_disabled() { + common_telemetry::init_default_ut_logging(); + maybe_skip_postgres_integration_test!(); + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap(); + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + + let schema_name = "test_auto_create_disabled"; + let table_name = "test_table"; + + // Drop the schema if it exists to start clean + let client = pool.get().await.unwrap(); + let _ = client + .execute( + &format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name), + &[], + ) + .await; + + // Try to create store with auto_create_schema disabled (should fail) + let result = + PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, false).await; + + // Verify it failed because schema doesn't exist + assert!( + result.is_err(), + "Expected error when schema doesn't exist and auto_create_schema is disabled" + ); + } + + #[tokio::test] + async fn test_auto_create_schema_already_exists() { + common_telemetry::init_default_ut_logging(); + maybe_skip_postgres_integration_test!(); + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap(); + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + + let schema_name = "test_auto_create_existing"; + let table_name = "test_table"; + + // Manually create the schema first + let client = pool.get().await.unwrap(); + let _ = client + .execute( + &format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name), + &[], + ) + .await; + client + .execute(&format!("CREATE SCHEMA \"{}\"", schema_name), &[]) + .await + .unwrap(); + + // Create store with auto_create_schema enabled (should succeed idempotently) + let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true) + .await + .unwrap(); + + // Verify schema still exists + let row = client + .query_one( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1", + &[&schema_name], + ) + .await + .unwrap(); + let created_schema: String = row.get(0); + assert_eq!(created_schema, schema_name); + + // Verify table was created in the schema + let row = client + .query_one( + "SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2", + &[&schema_name, &table_name], + ) + .await + .unwrap(); + let created_table_schema: String = row.get(0); + let created_table_name: String = row.get(1); + assert_eq!(created_table_schema, schema_name); + assert_eq!(created_table_name, table_name); + + // Cleanup + let _ = client + .execute( + &format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name), + &[], + ) + .await; + } + + #[tokio::test] + async fn test_auto_create_schema_no_schema_name() { + common_telemetry::init_default_ut_logging(); + maybe_skip_postgres_integration_test!(); + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap(); + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + + let table_name = "test_table_no_schema"; + + // Create store with auto_create_schema enabled but no schema name (should succeed) + // This should create the table in the default schema (public) + let _ = PgStore::with_pg_pool(pool.clone(), None, table_name, 128, true) + .await + .unwrap(); + + // Verify table was created in public schema + let client = pool.get().await.unwrap(); + let row = client + .query_one( + "SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1", + &[&table_name], + ) + .await + .unwrap(); + let created_table_schema: String = row.get(0); + let created_table_name: String = row.get(1); + assert_eq!(created_table_name, table_name); + // Verify it's in public schema (or whichever is the default) + assert!(created_table_schema == "public" || !created_table_schema.is_empty()); + + // Cleanup + let _ = client + .execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[]) + .await; + } + + #[tokio::test] + async fn test_auto_create_schema_with_empty_schema_name() { + common_telemetry::init_default_ut_logging(); + maybe_skip_postgres_integration_test!(); + let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap(); + let mut cfg = Config::new(); + cfg.url = Some(endpoints); + let pool = cfg + .create_pool(Some(Runtime::Tokio1), NoTls) + .context(CreatePostgresPoolSnafu) + .unwrap(); + + let table_name = "test_table_empty_schema"; + + // Create store with auto_create_schema enabled but empty schema name (should succeed) + // This should create the table in the default schema (public) + let _ = PgStore::with_pg_pool(pool.clone(), Some(""), table_name, 128, true) + .await + .unwrap(); + + // Verify table was created in public schema + let client = pool.get().await.unwrap(); + let row = client + .query_one( + "SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1", + &[&table_name], + ) + .await + .unwrap(); + let created_table_schema: String = row.get(0); + let created_table_name: String = row.get(1); + assert_eq!(created_table_name, table_name); + // Verify it's in public schema (or whichever is the default) + assert!(created_table_schema == "public" || !created_table_schema.is_empty()); + + // Cleanup + let _ = client + .execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[]) + .await; + } } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 8c60623b47..2a5ec494a4 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -339,6 +339,7 @@ pub async fn metasrv_builder( opts.meta_schema_name.as_deref(), &opts.meta_table_name, opts.max_txn_ops, + opts.auto_create_schema, ) .await .context(error::KvBackendSnafu)?; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9e6eacf9b4..4fc2ebf3dd 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -231,6 +231,9 @@ pub struct MetasrvOptions { #[cfg(feature = "pg_kvbackend")] /// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty). pub meta_schema_name: Option, + #[cfg(feature = "pg_kvbackend")] + /// Automatically create PostgreSQL schema if it doesn't exist (default: true). + pub auto_create_schema: bool, #[serde(with = "humantime_serde")] pub node_max_idle_time: Duration, /// The event recorder options. @@ -333,6 +336,8 @@ impl Default for MetasrvOptions { meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID, #[cfg(feature = "pg_kvbackend")] meta_schema_name: None, + #[cfg(feature = "pg_kvbackend")] + auto_create_schema: true, node_max_idle_time: Duration::from_secs(24 * 60 * 60), event_recorder: EventRecorderOptions::default(), stats_persistence: StatsPersistenceOptions::default(),