mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
feat: allow auto schema creation for pg (#7459)
Signed-off-by: lyang24 <lanqingy93@gmail.com>
This commit is contained in:
@@ -344,7 +344,8 @@
|
||||
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
|
||||
| `backend` | String | `etcd_store` | The datastore for meta server.<br/>Available values:<br/>- `etcd_store` (default value)<br/>- `memory_store`<br/>- `postgres_store`<br/>- `mysql_store` |
|
||||
| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Effect when using a RDS kvbackend.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.<br/>When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),<br/>set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.<br/>GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_schema_name` | String | `greptime_schema` | Optional PostgreSQL schema for metadata table and election table name qualification.<br/>When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),<br/>set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `auto_create_schema` | Bool | `true` | Automatically create PostgreSQL schema if it doesn't exist.<br/>When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS <schema_name>`<br/>before creating metadata tables. This is useful in production environments where<br/>manual schema creation may be restricted.<br/>Default is true.<br/>Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work.<br/>**Only used when backend is `postgres_store`.** |
|
||||
| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend<br/>Only used when backend is `postgres_store`. |
|
||||
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
|
||||
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
|
||||
|
||||
@@ -34,11 +34,18 @@ meta_table_name = "greptime_metakv"
|
||||
## Optional PostgreSQL schema for metadata table and election table name qualification.
|
||||
## When PostgreSQL public schema is not writable (e.g., PostgreSQL 15+ with restricted public),
|
||||
## set this to a writable schema. GreptimeDB will use `meta_schema_name`.`meta_table_name`.
|
||||
## GreptimeDB will NOT create the schema automatically; please ensure it exists or the user has permission.
|
||||
## **Only used when backend is `postgres_store`.**
|
||||
|
||||
meta_schema_name = "greptime_schema"
|
||||
|
||||
## Automatically create PostgreSQL schema if it doesn't exist.
|
||||
## When enabled, the system will execute `CREATE SCHEMA IF NOT EXISTS <schema_name>`
|
||||
## before creating metadata tables. This is useful in production environments where
|
||||
## manual schema creation may be restricted.
|
||||
## Default is true.
|
||||
## Note: The PostgreSQL user must have CREATE SCHEMA permission for this to work.
|
||||
## **Only used when backend is `postgres_store`.**
|
||||
auto_create_schema = true
|
||||
|
||||
## Advisory lock id in PostgreSQL for election. Effect when using PostgreSQL as kvbackend
|
||||
## Only used when backend is `postgres_store`.
|
||||
meta_election_lock_id = 1
|
||||
|
||||
@@ -61,6 +61,12 @@ pub struct StoreConfig {
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[clap(long)]
|
||||
pub meta_schema_name: Option<String>,
|
||||
|
||||
/// Automatically create PostgreSQL schema if it doesn't exist (default: true).
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
#[clap(long, default_value_t = true)]
|
||||
pub auto_create_schema: bool,
|
||||
|
||||
/// TLS mode for backend store connections (etcd, PostgreSQL, MySQL)
|
||||
#[clap(long = "backend-tls-mode", value_enum, default_value = "disable")]
|
||||
pub backend_tls_mode: TlsMode,
|
||||
@@ -138,6 +144,7 @@ impl StoreConfig {
|
||||
schema_name,
|
||||
table_name,
|
||||
max_txn_ops,
|
||||
self.auto_create_schema,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)?)
|
||||
|
||||
@@ -848,7 +848,7 @@ impl PgStore {
|
||||
.context(CreatePostgresPoolSnafu)?,
|
||||
};
|
||||
|
||||
Self::with_pg_pool(pool, None, table_name, max_txn_ops).await
|
||||
Self::with_pg_pool(pool, None, table_name, max_txn_ops, false).await
|
||||
}
|
||||
|
||||
/// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility).
|
||||
@@ -862,6 +862,7 @@ impl PgStore {
|
||||
schema_name: Option<&str>,
|
||||
table_name: &str,
|
||||
max_txn_ops: usize,
|
||||
auto_create_schema: bool,
|
||||
) -> Result<KvBackendRef> {
|
||||
// Ensure the postgres metadata backend is ready to use.
|
||||
let client = match pool.get().await {
|
||||
@@ -873,9 +874,23 @@ impl PgStore {
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
// Automatically create schema if enabled and schema_name is provided.
|
||||
if auto_create_schema
|
||||
&& let Some(schema) = schema_name
|
||||
&& !schema.is_empty()
|
||||
{
|
||||
let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS \"{}\"", schema);
|
||||
client
|
||||
.execute(&create_schema_sql, &[])
|
||||
.await
|
||||
.with_context(|_| PostgresExecutionSnafu {
|
||||
sql: create_schema_sql.clone(),
|
||||
})?;
|
||||
}
|
||||
|
||||
let template_factory = PgSqlTemplateFactory::new(schema_name, table_name);
|
||||
let sql_template_set = template_factory.build();
|
||||
// Do not attempt to create schema implicitly.
|
||||
client
|
||||
.execute(&sql_template_set.create_table_statement, &[])
|
||||
.await
|
||||
@@ -959,7 +974,7 @@ mod tests {
|
||||
let Some(pool) = build_pg15_pool().await else {
|
||||
return;
|
||||
};
|
||||
let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await;
|
||||
let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128, false).await;
|
||||
assert!(
|
||||
res.is_err(),
|
||||
"creating table in public should fail for test_user"
|
||||
@@ -1214,4 +1229,249 @@ mod tests {
|
||||
let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table");
|
||||
assert_eq!(t, "\"test_table\"");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_create_schema_enabled() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
|
||||
let schema_name = "test_auto_create_enabled";
|
||||
let table_name = "test_table";
|
||||
|
||||
// Drop the schema if it exists to start clean
|
||||
let client = pool.get().await.unwrap();
|
||||
let _ = client
|
||||
.execute(
|
||||
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Create store with auto_create_schema enabled
|
||||
let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify schema was created
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1",
|
||||
&[&schema_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_schema: String = row.get(0);
|
||||
assert_eq!(created_schema, schema_name);
|
||||
|
||||
// Verify table was created in the schema
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2",
|
||||
&[&schema_name, &table_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_table_schema: String = row.get(0);
|
||||
let created_table_name: String = row.get(1);
|
||||
assert_eq!(created_table_schema, schema_name);
|
||||
assert_eq!(created_table_name, table_name);
|
||||
|
||||
// Cleanup
|
||||
let _ = client
|
||||
.execute(
|
||||
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_create_schema_disabled() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
|
||||
let schema_name = "test_auto_create_disabled";
|
||||
let table_name = "test_table";
|
||||
|
||||
// Drop the schema if it exists to start clean
|
||||
let client = pool.get().await.unwrap();
|
||||
let _ = client
|
||||
.execute(
|
||||
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Try to create store with auto_create_schema disabled (should fail)
|
||||
let result =
|
||||
PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, false).await;
|
||||
|
||||
// Verify it failed because schema doesn't exist
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"Expected error when schema doesn't exist and auto_create_schema is disabled"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_create_schema_already_exists() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
|
||||
let schema_name = "test_auto_create_existing";
|
||||
let table_name = "test_table";
|
||||
|
||||
// Manually create the schema first
|
||||
let client = pool.get().await.unwrap();
|
||||
let _ = client
|
||||
.execute(
|
||||
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
client
|
||||
.execute(&format!("CREATE SCHEMA \"{}\"", schema_name), &[])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create store with auto_create_schema enabled (should succeed idempotently)
|
||||
let _ = PgStore::with_pg_pool(pool.clone(), Some(schema_name), table_name, 128, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify schema still exists
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1",
|
||||
&[&schema_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_schema: String = row.get(0);
|
||||
assert_eq!(created_schema, schema_name);
|
||||
|
||||
// Verify table was created in the schema
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2",
|
||||
&[&schema_name, &table_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_table_schema: String = row.get(0);
|
||||
let created_table_name: String = row.get(1);
|
||||
assert_eq!(created_table_schema, schema_name);
|
||||
assert_eq!(created_table_name, table_name);
|
||||
|
||||
// Cleanup
|
||||
let _ = client
|
||||
.execute(
|
||||
&format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", schema_name),
|
||||
&[],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_create_schema_no_schema_name() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
|
||||
let table_name = "test_table_no_schema";
|
||||
|
||||
// Create store with auto_create_schema enabled but no schema name (should succeed)
|
||||
// This should create the table in the default schema (public)
|
||||
let _ = PgStore::with_pg_pool(pool.clone(), None, table_name, 128, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify table was created in public schema
|
||||
let client = pool.get().await.unwrap();
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1",
|
||||
&[&table_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_table_schema: String = row.get(0);
|
||||
let created_table_name: String = row.get(1);
|
||||
assert_eq!(created_table_name, table_name);
|
||||
// Verify it's in public schema (or whichever is the default)
|
||||
assert!(created_table_schema == "public" || !created_table_schema.is_empty());
|
||||
|
||||
// Cleanup
|
||||
let _ = client
|
||||
.execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[])
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_create_schema_with_empty_schema_name() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
maybe_skip_postgres_integration_test!();
|
||||
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap();
|
||||
let mut cfg = Config::new();
|
||||
cfg.url = Some(endpoints);
|
||||
let pool = cfg
|
||||
.create_pool(Some(Runtime::Tokio1), NoTls)
|
||||
.context(CreatePostgresPoolSnafu)
|
||||
.unwrap();
|
||||
|
||||
let table_name = "test_table_empty_schema";
|
||||
|
||||
// Create store with auto_create_schema enabled but empty schema name (should succeed)
|
||||
// This should create the table in the default schema (public)
|
||||
let _ = PgStore::with_pg_pool(pool.clone(), Some(""), table_name, 128, true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify table was created in public schema
|
||||
let client = pool.get().await.unwrap();
|
||||
let row = client
|
||||
.query_one(
|
||||
"SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = $1",
|
||||
&[&table_name],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let created_table_schema: String = row.get(0);
|
||||
let created_table_name: String = row.get(1);
|
||||
assert_eq!(created_table_name, table_name);
|
||||
// Verify it's in public schema (or whichever is the default)
|
||||
assert!(created_table_schema == "public" || !created_table_schema.is_empty());
|
||||
|
||||
// Cleanup
|
||||
let _ = client
|
||||
.execute(&format!("DROP TABLE IF EXISTS \"{}\"", table_name), &[])
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,6 +339,7 @@ pub async fn metasrv_builder(
|
||||
opts.meta_schema_name.as_deref(),
|
||||
&opts.meta_table_name,
|
||||
opts.max_txn_ops,
|
||||
opts.auto_create_schema,
|
||||
)
|
||||
.await
|
||||
.context(error::KvBackendSnafu)?;
|
||||
|
||||
@@ -231,6 +231,9 @@ pub struct MetasrvOptions {
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
/// Optional PostgreSQL schema for metadata table (defaults to current search_path if empty).
|
||||
pub meta_schema_name: Option<String>,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
/// Automatically create PostgreSQL schema if it doesn't exist (default: true).
|
||||
pub auto_create_schema: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub node_max_idle_time: Duration,
|
||||
/// The event recorder options.
|
||||
@@ -333,6 +336,8 @@ impl Default for MetasrvOptions {
|
||||
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
meta_schema_name: None,
|
||||
#[cfg(feature = "pg_kvbackend")]
|
||||
auto_create_schema: true,
|
||||
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
|
||||
event_recorder: EventRecorderOptions::default(),
|
||||
stats_persistence: StatsPersistenceOptions::default(),
|
||||
|
||||
Reference in New Issue
Block a user