feat: add optional schema for Postgres metadata tables (#6764)

* feat(meta): add optional schema for Postgres metadata tables

- Add `schema` option to specify a custom schema for metadata tables
- Update `PgStore` and `PgElection` to support optional schema
- Modify SQL templates to use schema when provided
- Add tests for schema support in Postgres backend

Signed-off-by: Logic <zqr10159@dromara.org>

* refactor(meta): remove unused `create_schema_statement` and simplify `PgSqlTemplateFactory`

- Remove `create_schema_statement` from `PgSqlTemplateSet` struct
- Simplify `PgSqlTemplateFactory` by removing `new` method and merging it with `with_schema`
- Update related tests to reflect these changes

Signed-off-by: Logic <zqr10159@dromara.org>

* refactor(meta-srv): remove unused imports

- Remove unused import of BoxedError from common_error::ext- Remove unused import of TlsOption from servers::tls

Signed-off-by: Logic <zqr10159@dromara.org>

* build(meta): update Postgres version and add error handling imports

- Update Postgres version to 17 in docker-compose.yml
- Add BoxedError import for error handling in meta-srv

Signed-off-by: Logic <zqr10159@dromara.org>

* feat(postgres): add support for optional schema in PgElection and related components

Signed-off-by: Logic <zqr10159@dromara.org>

* feat(postgres): add support for optional schema in PgElection and related components

Signed-off-by: Logic <zqr10159@dromara.org>

* fix(develop): update Postgres schema commands to specify host

Signed-off-by: Logic <zqr10159@dromara.org>

* refactor(postgres): simplify plugin options handling and update SQL examples

Signed-off-by: Logic <zqr10159@dromara.org>

* refactor(postgres): simplify plugin options handling and update SQL examples

Signed-off-by: Logic <zqr10159@dromara.org>

* fix(postgres): update meta_election_lock_id description for optional schema support

Signed-off-by: Logic <zqr10159@dromara.org>

* fix(postgres): add health check and fallback wait for Postgres in CI setup

* fix(postgres): update Docker setup for Postgres and add support for Postgres 15

* fix(postgres): remove redundant Postgres setup step in CI configuration

* Update tests-integration/fixtures/postgres/init.sql

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update .github/workflows/develop.yml

* Update tests-integration/fixtures/docker-compose.yml

* Update src/common/meta/src/kv_backend/rds/postgres.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/common/meta/src/kv_backend/rds/postgres.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/common/meta/src/kv_backend/rds/postgres.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/common/meta/src/kv_backend/rds/postgres.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: Refactor PostgreSQL backend to support optional schema in PgStore and related SQL templates

* feat: Update PostgreSQL configuration and add PG15 specific integration tests

* feat: Update PostgreSQL configuration and add PG15 specific integration tests

* refactor(postgres): update test schemas from 'greptime_schema' to 'test_schema'

* Update .github/workflows/develop.yml

* refactor: minor factor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: Logic <zqr10159@dromara.org>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
Logic
2025-08-28 17:24:14 +08:00
committed by GitHub
parent bacd9c7d15
commit cbcfdf9d65
14 changed files with 281 additions and 68 deletions

View File

@@ -628,7 +628,6 @@ impl TableMetadataManager {
&self.topic_region_manager
}
#[cfg(feature = "testing")]
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}

View File

@@ -192,50 +192,61 @@ fn pg_generate_in_placeholders(from: usize, to: usize) -> Vec<String> {
/// Factory for building sql templates.
struct PgSqlTemplateFactory<'a> {
schema_name: Option<&'a str>,
table_name: &'a str,
}
impl<'a> PgSqlTemplateFactory<'a> {
/// Creates a new [`SqlTemplateFactory`] with the given table name.
fn new(table_name: &'a str) -> Self {
Self { table_name }
/// Creates a new factory with optional schema.
fn new(schema_name: Option<&'a str>, table_name: &'a str) -> Self {
Self {
schema_name,
table_name,
}
}
/// Builds the template set for the given table name.
fn build(&self) -> PgSqlTemplateSet {
let table_name = self.table_name;
let table_ident = Self::format_table_ident(self.schema_name, self.table_name);
// Some of queries don't end with `;`, because we need to add `LIMIT` clause.
PgSqlTemplateSet {
table_name: table_name.to_string(),
table_ident: table_ident.clone(),
// Do not attempt to create schema implicitly to avoid extra privileges requirement.
create_table_statement: format!(
"CREATE TABLE IF NOT EXISTS \"{table_name}\"(k bytea PRIMARY KEY, v bytea)",
"CREATE TABLE IF NOT EXISTS {table_ident}(k bytea PRIMARY KEY, v bytea)",
),
range_template: RangeTemplate {
point: format!("SELECT k, v FROM \"{table_name}\" WHERE k = $1"),
point: format!("SELECT k, v FROM {table_ident} WHERE k = $1"),
range: format!(
"SELECT k, v FROM \"{table_name}\" WHERE k >= $1 AND k < $2 ORDER BY k"
"SELECT k, v FROM {table_ident} WHERE k >= $1 AND k < $2 ORDER BY k"
),
full: format!("SELECT k, v FROM \"{table_name}\" ORDER BY k"),
left_bounded: format!("SELECT k, v FROM \"{table_name}\" WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM \"{table_name}\" WHERE k LIKE $1 ORDER BY k"),
full: format!("SELECT k, v FROM {table_ident} ORDER BY k"),
left_bounded: format!("SELECT k, v FROM {table_ident} WHERE k >= $1 ORDER BY k"),
prefix: format!("SELECT k, v FROM {table_ident} WHERE k LIKE $1 ORDER BY k"),
},
delete_template: RangeTemplate {
point: format!("DELETE FROM \"{table_name}\" WHERE k = $1 RETURNING k,v;"),
range: format!(
"DELETE FROM \"{table_name}\" WHERE k >= $1 AND k < $2 RETURNING k,v;"
),
full: format!("DELETE FROM \"{table_name}\" RETURNING k,v"),
left_bounded: format!("DELETE FROM \"{table_name}\" WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM \"{table_name}\" WHERE k LIKE $1 RETURNING k,v;"),
point: format!("DELETE FROM {table_ident} WHERE k = $1 RETURNING k,v;"),
range: format!("DELETE FROM {table_ident} WHERE k >= $1 AND k < $2 RETURNING k,v;"),
full: format!("DELETE FROM {table_ident} RETURNING k,v"),
left_bounded: format!("DELETE FROM {table_ident} WHERE k >= $1 RETURNING k,v;"),
prefix: format!("DELETE FROM {table_ident} WHERE k LIKE $1 RETURNING k,v;"),
},
}
}
/// Formats the table reference with schema if provided.
fn format_table_ident(schema_name: Option<&str>, table_name: &str) -> String {
match schema_name {
Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, table_name),
_ => format!("\"{}\"", table_name),
}
}
}
/// Templates for the given table name.
#[derive(Debug, Clone)]
pub struct PgSqlTemplateSet {
table_name: String,
table_ident: String,
create_table_statement: String,
range_template: RangeTemplate,
delete_template: RangeTemplate,
@@ -244,27 +255,24 @@ pub struct PgSqlTemplateSet {
impl PgSqlTemplateSet {
/// Generates the sql for batch get.
fn generate_batch_get_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
"SELECT k, v FROM \"{table_name}\" WHERE k in ({});",
in_clause
"SELECT k, v FROM {} WHERE k in ({});",
self.table_ident, in_clause
)
}
/// Generates the sql for batch delete.
fn generate_batch_delete_query(&self, key_len: usize) -> String {
let table_name = &self.table_name;
let in_clause = pg_generate_in_placeholders(1, key_len).join(", ");
format!(
"DELETE FROM \"{table_name}\" WHERE k in ({}) RETURNING k,v;",
in_clause
"DELETE FROM {} WHERE k in ({}) RETURNING k,v;",
self.table_ident, in_clause
)
}
/// Generates the sql for batch upsert.
fn generate_batch_upsert_query(&self, kv_len: usize) -> String {
let table_name = &self.table_name;
let in_placeholders: Vec<String> = (1..=kv_len).map(|i| format!("${}", i)).collect();
let in_clause = in_placeholders.join(", ");
let mut param_index = kv_len + 1;
@@ -278,9 +286,9 @@ impl PgSqlTemplateSet {
format!(
r#"
WITH prev AS (
SELECT k,v FROM "{table_name}" WHERE k IN ({in_clause})
SELECT k,v FROM {table} WHERE k IN ({in_clause})
), update AS (
INSERT INTO "{table_name}" (k, v) VALUES
INSERT INTO {table} (k, v) VALUES
{values_clause}
ON CONFLICT (
k
@@ -289,7 +297,10 @@ impl PgSqlTemplateSet {
)
SELECT k, v FROM prev;
"#
"#,
table = self.table_ident,
in_clause = in_clause,
values_clause = values_clause
)
}
}
@@ -835,7 +846,7 @@ impl PgStore {
.context(CreatePostgresPoolSnafu)?,
};
Self::with_pg_pool(pool, table_name, max_txn_ops).await
Self::with_pg_pool(pool, None, table_name, max_txn_ops).await
}
/// Create [PgStore] impl of [KvBackendRef] from url (backward compatibility).
@@ -843,15 +854,14 @@ impl PgStore {
Self::with_url_and_tls(url, table_name, max_txn_ops, None).await
}
/// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool].
/// Create [PgStore] impl of [KvBackendRef] from [deadpool_postgres::Pool] with optional schema.
pub async fn with_pg_pool(
pool: Pool,
schema_name: Option<&str>,
table_name: &str,
max_txn_ops: usize,
) -> Result<KvBackendRef> {
// This step ensures the postgres metadata backend is ready to use.
// We check if greptime_metakv table exists, and we will create a new table
// if it does not exist.
// Ensure the postgres metadata backend is ready to use.
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
@@ -861,8 +871,9 @@ impl PgStore {
.fail();
}
};
let template_factory = PgSqlTemplateFactory::new(table_name);
let template_factory = PgSqlTemplateFactory::new(schema_name, table_name);
let sql_template_set = template_factory.build();
// Do not attempt to create schema implicitly.
client
.execute(&sql_template_set.create_table_statement, &[])
.await
@@ -890,7 +901,7 @@ mod tests {
test_txn_compare_less, test_txn_compare_not_equal, test_txn_one_compare_op,
text_txn_multi_compare_op, unprepare_kv,
};
use crate::maybe_skip_postgres_integration_test;
use crate::{maybe_skip_postgres15_integration_test, maybe_skip_postgres_integration_test};
async fn build_pg_kv_backend(table_name: &str) -> Option<PgStore> {
let endpoints = std::env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
@@ -905,8 +916,10 @@ mod tests {
.context(CreatePostgresPoolSnafu)
.unwrap();
let client = pool.get().await.unwrap();
let template_factory = PgSqlTemplateFactory::new(table_name);
// use the default schema (i.e., public)
let template_factory = PgSqlTemplateFactory::new(None, table_name);
let sql_templates = template_factory.build();
// Do not attempt to create schema implicitly.
client
.execute(&sql_templates.create_table_statement, &[])
.await
@@ -923,6 +936,61 @@ mod tests {
})
}
async fn build_pg15_pool() -> Option<Pool> {
let url = std::env::var("GT_POSTGRES15_ENDPOINTS").unwrap_or_default();
if url.is_empty() {
return None;
}
let mut cfg = Config::new();
cfg.url = Some(url);
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context(CreatePostgresPoolSnafu)
.ok()?;
Some(pool)
}
#[tokio::test]
async fn test_pg15_create_table_in_public_should_fail() {
maybe_skip_postgres15_integration_test!();
let Some(pool) = build_pg15_pool().await else {
return;
};
let res = PgStore::with_pg_pool(pool, None, "pg15_public_should_fail", 128).await;
assert!(
res.is_err(),
"creating table in public should fail for test_user"
);
}
#[tokio::test]
async fn test_pg15_create_table_in_test_schema_and_crud_should_succeed() {
maybe_skip_postgres15_integration_test!();
let Some(pool) = build_pg15_pool().await else {
return;
};
let schema_name = std::env::var("GT_POSTGRES15_SCHEMA").unwrap();
let client = pool.get().await.unwrap();
let factory = PgSqlTemplateFactory::new(Some(&schema_name), "pg15_ok");
let templates = factory.build();
client
.execute(&templates.create_table_statement, &[])
.await
.unwrap();
let kv = PgStore {
max_txn_ops: 128,
sql_template_set: templates,
txn_retry_count: RDS_STORE_TXN_RETRY_COUNT,
executor_factory: PgExecutorFactory { pool },
_phantom: PhantomData,
};
let prefix = b"pg15_crud/";
prepare_kv_with_prefix(&kv, prefix.to_vec()).await;
test_kv_put_with_prefix(&kv, prefix.to_vec()).await;
test_kv_batch_get_with_prefix(&kv, prefix.to_vec()).await;
unprepare_kv(&kv, prefix).await;
}
#[tokio::test]
async fn test_pg_put() {
maybe_skip_postgres_integration_test!();
@@ -1024,4 +1092,31 @@ mod tests {
test_txn_compare_less(&kv_backend).await;
test_txn_compare_not_equal(&kv_backend).await;
}
#[test]
fn test_pg_template_with_schema() {
let factory = PgSqlTemplateFactory::new(Some("test_schema"), "greptime_metakv");
let t = factory.build();
assert!(t
.create_table_statement
.contains("\"test_schema\".\"greptime_metakv\""));
let upsert = t.generate_batch_upsert_query(1);
assert!(upsert.contains("\"test_schema\".\"greptime_metakv\""));
let get = t.generate_batch_get_query(1);
assert!(get.contains("\"test_schema\".\"greptime_metakv\""));
let del = t.generate_batch_delete_query(1);
assert!(del.contains("\"test_schema\".\"greptime_metakv\""));
}
#[test]
fn test_format_table_ident() {
let t = PgSqlTemplateFactory::format_table_ident(None, "test_table");
assert_eq!(t, "\"test_table\"");
let t = PgSqlTemplateFactory::format_table_ident(Some("test_schema"), "test_table");
assert_eq!(t, "\"test_schema\".\"test_table\"");
let t = PgSqlTemplateFactory::format_table_ident(Some(""), "test_table");
assert_eq!(t, "\"test_table\"");
}
}

View File

@@ -260,7 +260,7 @@ pub async fn test_kafka_topic_pool(
/// Skip the test if the environment variable `GT_POSTGRES_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// ```text
/// GT_POSTGRES_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_postgres_integration_test {
@@ -276,7 +276,7 @@ macro_rules! maybe_skip_postgres_integration_test {
/// Skip the test if the environment variable `GT_MYSQL_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```
/// ```text
/// GT_MYSQL_ENDPOINTS=localhost:9092,localhost:9093
/// ```
macro_rules! maybe_skip_mysql_integration_test {
@@ -287,3 +287,19 @@ macro_rules! maybe_skip_mysql_integration_test {
}
};
}
#[macro_export]
/// Skip the test if the environment variable `GT_POSTGRES15_ENDPOINTS` is not set.
///
/// The format of the environment variable is:
/// ```text
/// GT_POSTGRES15_ENDPOINTS=postgres://user:password@127.0.0.1:5433/postgres
/// ```
macro_rules! maybe_skip_postgres15_integration_test {
() => {
if std::env::var("GT_POSTGRES15_ENDPOINTS").is_err() {
common_telemetry::warn!("The PG15 endpoints is empty, skipping the test");
return;
}
};
}