diff --git a/config/config.md b/config/config.md index a1f8b1e362..5a0e46763f 100644 --- a/config/config.md +++ b/config/config.md @@ -296,6 +296,8 @@ | `store_addrs` | Array | -- | Store server address default to etcd store.
For postgres store, the format is:
"password=password dbname=postgres user=postgres host=localhost port=5432"
For etcd store, the format is:
"127.0.0.1:2379" | | `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. | | `backend` | String | `etcd_store` | The datastore for meta server.
Available values:
- `etcd_store` (default value)
- `memory_store`
- `postgres_store` | +| `meta_table_name` | String | `greptime_metakv` | Table name in RDS to store metadata. Takes effect only when using an RDS kvbackend.
**Only used when backend is `postgres_store`.** | +| `meta_election_lock_id` | Integer | `1` | Advisory lock id in PostgreSQL for election. Takes effect only when using PostgreSQL as the kvbackend.
Only used when backend is `postgres_store`. | | `selector` | String | `round_robin` | Datanode selector type.
- `round_robin` (default value)
- `lease_based`
- `load_based`
For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". | | `use_memory_store` | Bool | `false` | Store data in memory. | | `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index aaf5569a6d..b2b748c7f6 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -24,6 +24,14 @@ store_key_prefix = ""
 ## - `postgres_store`
 backend = "etcd_store"
 
+## Table name in RDS to store metadata. Takes effect only when using an RDS kvbackend.
+## **Only used when backend is `postgres_store`.**
+meta_table_name = "greptime_metakv"
+
+## Advisory lock id in PostgreSQL for election. Takes effect only when using PostgreSQL as the kvbackend.
+## Only used when backend is `postgres_store`.
+meta_election_lock_id = 1
+
 ## Datanode selector type.
 ## - `round_robin` (default value)
 ## - `lease_based`
diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs
index 9ed6ed66c3..aa06205aa4 100644
--- a/src/meta-srv/src/bootstrap.rs
+++ b/src/meta-srv/src/bootstrap.rs
@@ -230,7 +230,7 @@ pub async fn metasrv_builder(
         (None, BackendImpl::PostgresStore) => {
             let pool = create_postgres_pool(opts).await?;
             // TODO(CookiePie): use table name from config.
-            let kv_backend = PgStore::with_pg_pool(pool, "greptime_metakv", opts.max_txn_ops)
+            let kv_backend = PgStore::with_pg_pool(pool, &opts.meta_table_name, opts.max_txn_ops)
                 .await
                 .context(error::KvBackendSnafu)?;
             // Client for election should be created separately since we need a different session keep-alive idle time.
@@ -240,6 +240,8 @@ pub async fn metasrv_builder(
                 election_client,
                 opts.store_key_prefix.clone(),
                 CANDIDATE_LEASE_SECS,
+                &opts.meta_table_name,
+                opts.meta_election_lock_id,
             )
             .await?;
             (kv_backend, Some(election))
diff --git a/src/meta-srv/src/election/postgres.rs b/src/meta-srv/src/election/postgres.rs
index 192fa682bf..2aee20d0f9 100644
--- a/src/meta-srv/src/election/postgres.rs
+++ b/src/meta-srv/src/election/postgres.rs
@@ -35,50 +35,138 @@ use crate::error::{
 };
 use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
 
-// TODO(CookiePie): The lock id should be configurable.
-const CAMPAIGN: &str = "SELECT pg_try_advisory_lock({})";
-const STEP_DOWN: &str = "SELECT pg_advisory_unlock({})";
-// Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
-// Either the leader reconnects and step down or the session expires and the lock is released.
-const SET_IDLE_SESSION_TIMEOUT: &str = "SET idle_session_timeout = '10s';";
-
 // Separator between value and expire time.
 const LEASE_SEP: &str = r#"||__metadata_lease_sep||"#;
 
-// SQL to put a value with expire time. Parameters: key, value, LEASE_SEP, expire_time
-const PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME: &str = r#"
-WITH prev AS (
-    SELECT k, v FROM greptime_metakv WHERE k = $1
-), insert AS (
-    INSERT INTO greptime_metakv
-    VALUES($1, convert_to($2 || $3 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
-    ON CONFLICT (k) DO NOTHING
-)
-
-SELECT k, v FROM prev;
-"#;
-
-// SQL to update a value with expire time. Parameters: key, prev_value_with_lease, updated_value, LEASE_SEP, expire_time
-const CAS_WITH_EXPIRE_TIME: &str = r#"
-UPDATE greptime_metakv
-SET k=$1,
-v=convert_to($3 || $4 || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $5, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
-WHERE
-    k=$1 AND v=$2
-"#;
-
-const GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k = $1"#;
-
-const PREFIX_GET_WITH_CURRENT_TIMESTAMP: &str = r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM greptime_metakv WHERE k LIKE $1"#;
-
-const POINT_DELETE: &str = "DELETE FROM greptime_metakv WHERE k = $1 RETURNING k,v;";
-
-fn campaign_sql(lock_id: u64) -> String {
-    CAMPAIGN.replace("{}", &lock_id.to_string())
+struct ElectionSqlFactory<'a> {
+    lock_id: u64,
+    table_name: &'a str,
 }
 
-fn step_down_sql(lock_id: u64) -> String {
-    STEP_DOWN.replace("{}", &lock_id.to_string())
+struct ElectionSqlSet {
+    campaign: String,
+    step_down: String,
+    // SQL to put a value with expire time.
+    //
+    // Parameters for the query:
+    // `$1`: key,
+    // `$2`: value,
+    // `$3`: lease time in seconds
+    //
+    // Returns:
+    // If the key already exists, return the previous value.
+    put_value_with_lease: String,
+    // SQL to update a value with expire time.
+    //
+    // Parameters for the query:
+    // `$1`: key,
+    // `$2`: previous value,
+    // `$3`: updated value,
+    // `$4`: lease time in seconds
+    update_value_with_lease: String,
+    // SQL to get a value with expire time.
+    //
+    // Parameters:
+    // `$1`: key
+    get_value_with_lease: String,
+    // SQL to get all values with expire time with the given key prefix.
+    //
+    // Parameters:
+    // `$1`: key prefix like 'prefix%'
+    //
+    // Returns:
+    // column 0: value,
+    // column 1: current timestamp
+    get_value_with_lease_by_prefix: String,
+    // SQL to delete a value.
+    //
+    // Parameters:
+    // `$1`: key
+    //
+    // Returns:
+    // column 0: key deleted,
+    // column 1: value deleted
+    delete_value: String,
+}
+
+impl<'a> ElectionSqlFactory<'a> {
+    fn new(lock_id: u64, table_name: &'a str) -> Self {
+        Self {
+            lock_id,
+            table_name,
+        }
+    }
+
+    fn build(self) -> ElectionSqlSet {
+        ElectionSqlSet {
+            campaign: self.campaign_sql(),
+            step_down: self.step_down_sql(),
+            put_value_with_lease: self.put_value_with_lease_sql(),
+            update_value_with_lease: self.update_value_with_lease_sql(),
+            get_value_with_lease: self.get_value_with_lease_sql(),
+            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
+            delete_value: self.delete_value_sql(),
+        }
+    }
+
+    // Currently the session timeout is longer than the leader lease time, so the leader lease may expire while the session is still alive.
+    // Either the leader reconnects and steps down, or the session expires and the lock is released.
+    fn set_idle_session_timeout_sql(&self) -> &str {
+        "SET idle_session_timeout = '10s';"
+    }
+
+    fn campaign_sql(&self) -> String {
+        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
+    }
+
+    fn step_down_sql(&self) -> String {
+        format!("SELECT pg_advisory_unlock({})", self.lock_id)
+    }
+
+    fn put_value_with_lease_sql(&self) -> String {
+        format!(
+            r#"WITH prev AS (
+                SELECT k, v FROM {} WHERE k = $1
+            ), insert AS (
+                INSERT INTO {}
+                VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
+                ON CONFLICT (k) DO NOTHING
+            )
+            SELECT k, v FROM prev;
+            "#,
+            self.table_name, self.table_name, LEASE_SEP
+        )
+    }
+
+    fn update_value_with_lease_sql(&self) -> String {
+        format!(
+            r#"UPDATE {}
+            SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
+            WHERE k = $1 AND v = $2"#,
+            self.table_name, LEASE_SEP
+        )
+    }
+
+    fn get_value_with_lease_sql(&self) -> String {
+        format!(
+            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k = $1"#,
+            self.table_name
+        )
+    }
+
+    fn get_value_with_lease_by_prefix_sql(&self) -> String {
+        format!(
+            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {} WHERE k LIKE $1"#,
+            self.table_name
+        )
+    }
+
+    fn delete_value_sql(&self) -> String {
+        format!(
+            "DELETE FROM {} WHERE k = $1 RETURNING k,v;",
+            self.table_name
+        )
+    }
 }
 
 /// Parse the value and expire time from the given string. The value should be in the format "value || LEASE_SEP || expire_time".
@@ -138,7 +226,7 @@ pub struct PgElection {
     leader_watcher: broadcast::Sender<LeaderChangeMessage>,
     store_key_prefix: String,
     candidate_lease_ttl_secs: u64,
-    lock_id: u64,
+    sql_set: ElectionSqlSet,
 }
 
 impl PgElection {
@@ -147,10 +235,13 @@ impl PgElection {
         client: Client,
         store_key_prefix: String,
         candidate_lease_ttl_secs: u64,
+        table_name: &str,
+        lock_id: u64,
     ) -> Result<ElectionRef> {
+        let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
         // Set idle session timeout to IDLE_SESSION_TIMEOUT to avoid dead advisory lock.
         client
-            .execute(SET_IDLE_SESSION_TIMEOUT, &[])
+            .execute(sql_factory.set_idle_session_timeout_sql(), &[])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -163,8 +254,7 @@ impl PgElection {
             leader_watcher: tx,
             store_key_prefix,
             candidate_lease_ttl_secs,
-            // TODO(CookiePie): The lock id should be configurable.
-            lock_id: 28319,
+            sql_set: sql_factory.build(),
         }))
     }
 
@@ -276,7 +366,7 @@ impl Election for PgElection {
         loop {
             let res = self
                 .client
-                .query(&campaign_sql(self.lock_id), &[])
+                .query(&self.sql_set.campaign, &[])
                 .await
                 .context(PostgresExecutionSnafu)?;
             if let Some(row) = res.first() {
@@ -333,10 +423,10 @@ impl PgElection {
         key: &str,
         with_origin: bool,
    ) -> Result<Option<(String, Timestamp, Timestamp, Option<String>)>> {
-        let key = key.as_bytes().to_vec();
+        let key = key.as_bytes();
         let res = self
             .client
-            .query(GET_WITH_CURRENT_TIMESTAMP, &[&key as &(dyn ToSql + Sync)])
+            .query(&self.sql_set.get_value_with_lease, &[&key])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -378,10 +468,7 @@ impl PgElection {
         let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
         let res = self
             .client
-            .query(
-                PREFIX_GET_WITH_CURRENT_TIMESTAMP,
-                &[(&key_prefix as &(dyn ToSql + Sync))],
-            )
+            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -406,17 +493,16 @@ impl PgElection {
     }
 
     async fn update_value_with_lease(&self, key: &str, prev: &str, updated: &str) -> Result<()> {
-        let key = key.as_bytes().to_vec();
-        let prev = prev.as_bytes().to_vec();
+        let key = key.as_bytes();
+        let prev = prev.as_bytes();
         let res = self
             .client
             .execute(
-                CAS_WITH_EXPIRE_TIME,
+                &self.sql_set.update_value_with_lease,
                 &[
-                    &key as &(dyn ToSql + Sync),
-                    &prev as &(dyn ToSql + Sync),
+                    &key,
+                    &prev,
                     &updated,
-                    &LEASE_SEP,
                     &(self.candidate_lease_ttl_secs as f64),
                 ],
             )
@@ -426,7 +512,7 @@ impl PgElection {
         ensure!(
             res == 1,
             UnexpectedSnafu {
-                violated: format!("Failed to update key: {}", String::from_utf8_lossy(&key)),
+                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
            }
         );
 
@@ -440,17 +526,12 @@ impl PgElection {
         value: &str,
         lease_ttl_secs: u64,
     ) -> Result<bool> {
-        let key = key.as_bytes().to_vec();
+        let key = key.as_bytes();
         let lease_ttl_secs = lease_ttl_secs as f64;
-        let params: Vec<&(dyn ToSql + Sync)> = vec![
-            &key as &(dyn ToSql + Sync),
-            &value as &(dyn ToSql + Sync),
-            &LEASE_SEP,
-            &lease_ttl_secs,
-        ];
+        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
         let res = self
             .client
-            .query(PUT_IF_NOT_EXISTS_WITH_EXPIRE_TIME, &params)
+            .query(&self.sql_set.put_value_with_lease, &params)
             .await
             .context(PostgresExecutionSnafu)?;
         Ok(res.is_empty())
@@ -459,10 +540,10 @@ impl PgElection {
     /// Returns `true` if the deletion is successful.
     /// Caution: Should only delete the key if the lease is expired.
     async fn delete_value(&self, key: &str) -> Result<bool> {
-        let key = key.as_bytes().to_vec();
+        let key = key.as_bytes();
         let res = self
             .client
-            .query(POINT_DELETE, &[&key as &(dyn ToSql + Sync)])
+            .query(&self.sql_set.delete_value, &[&key])
             .await
             .context(PostgresExecutionSnafu)?;
 
@@ -574,7 +655,7 @@ impl PgElection {
         {
             self.delete_value(&key).await?;
             self.client
-                .query(&step_down_sql(self.lock_id), &[])
+                .query(&self.sql_set.step_down, &[])
                 .await
                 .context(PostgresExecutionSnafu)?;
             if let Err(e) = self
@@ -686,7 +767,7 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix: uuid::Uuid::new_v4().to_string(),
            candidate_lease_ttl_secs: 10,
-            lock_id: 28319,
+            sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
         };
 
         let res = pg_election
@@ -760,7 +841,7 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix,
             candidate_lease_ttl_secs,
-            lock_id: 28319,
+            sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
         };
 
         let node_info = MetasrvNodeInfo {
@@ -802,7 +883,7 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix: store_key_prefix.clone(),
             candidate_lease_ttl_secs,
-            lock_id: 28319,
+            sql_set: ElectionSqlFactory::new(28319, "greptime_metakv").build(),
         };
 
         let candidates = pg_election.all_candidates().await.unwrap();
@@ -843,7 +924,7 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix: uuid::Uuid::new_v4().to_string(),
             candidate_lease_ttl_secs,
-            lock_id: 28320,
+            sql_set: ElectionSqlFactory::new(28320, "greptime_metakv").build(),
         };
 
         leader_pg_election.elected().await.unwrap();
@@ -952,13 +1033,13 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix,
             candidate_lease_ttl_secs,
-            lock_id: 28321,
+            sql_set: ElectionSqlFactory::new(28321, "greptime_metakv").build(),
         };
 
         // Step 1: No leader exists, campaign and elected.
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
            .unwrap();
         let res: bool = res[0].get(0);
@@ -989,7 +1070,7 @@
         // Step 2: As a leader, renew the lease.
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         let res: bool = res[0].get(0);
@@ -1009,7 +1090,7 @@
 
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         let res: bool = res[0].get(0);
@@ -1037,7 +1118,7 @@
         // Step 4: Re-campaign and elected.
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         let res: bool = res[0].get(0);
@@ -1094,7 +1175,7 @@
         // Step 6: Re-campaign and elected.
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         let res: bool = res[0].get(0);
@@ -1125,7 +1206,7 @@
         // Step 7: Something wrong, the leader key changed by others.
         let res = leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         let res: bool = res[0].get(0);
@@ -1162,7 +1243,7 @@
         // Clean up
         leader_pg_election
             .client
-            .query(&step_down_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.step_down, &[])
             .await
             .unwrap();
     }
@@ -1183,7 +1264,7 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix: store_key_prefix.clone(),
             candidate_lease_ttl_secs,
-            lock_id: 28322,
+            sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(),
         };
 
         let leader_client = create_postgres_client().await.unwrap();
@@ -1196,12 +1277,12 @@ mod tests {
             leader_watcher: tx,
             store_key_prefix,
             candidate_lease_ttl_secs,
-            lock_id: 28322,
+            sql_set: ElectionSqlFactory::new(28322, "greptime_metakv").build(),
         };
 
         leader_pg_election
             .client
-            .query(&campaign_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.campaign, &[])
             .await
             .unwrap();
         leader_pg_election.elected().await.unwrap();
@@ -1242,7 +1323,7 @@
         // Clean up
         leader_pg_election
             .client
-            .query(&step_down_sql(leader_pg_election.lock_id), &[])
+            .query(&leader_pg_election.sql_set.step_down, &[])
             .await
             .unwrap();
     }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 93921cbdd6..029b049d68 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -71,6 +71,11 @@ pub const TABLE_ID_SEQ: &str = "table_id";
 pub const FLOW_ID_SEQ: &str = "flow_id";
 pub const METASRV_HOME: &str = "/tmp/metasrv";
 
+#[cfg(feature = "pg_kvbackend")]
+pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
+#[cfg(feature = "pg_kvbackend")]
+pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
+
 // The datastores that implements metadata kvbackend.
 #[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
 #[serde(rename_all = "snake_case")]
@@ -140,6 +145,12 @@ pub struct MetasrvOptions {
     pub tracing: TracingOptions,
     /// The datastore for kv metadata.
     pub backend: BackendImpl,
+    #[cfg(feature = "pg_kvbackend")]
+    /// Table name of the RDS kv backend.
+    pub meta_table_name: String,
+    #[cfg(feature = "pg_kvbackend")]
+    /// Lock id for meta kv election. Only takes effect when using pg_kvbackend.
+    pub meta_election_lock_id: u64,
 }
 
 const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
@@ -177,6 +188,10 @@ impl Default for MetasrvOptions {
             flush_stats_factor: 3,
             tracing: TracingOptions::default(),
             backend: BackendImpl::EtcdStore,
+            #[cfg(feature = "pg_kvbackend")]
+            meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
+            #[cfg(feature = "pg_kvbackend")]
+            meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
         }
     }
 }
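Note (not part of the patch): the two new options flow from `MetasrvOptions` through `metasrv_builder` into `ElectionSqlFactory`, which stamps the configured table name and advisory lock id into every election statement. A hypothetical unit test along these lines would pin that behavior down; the lock id `42` and table name `my_metakv` are made-up values (the defaults are `1` and `greptime_metakv`), and the test assumes it lives in the same module as the crate-private factory:

```rust
#[test]
fn election_sql_respects_configured_table_and_lock_id() {
    // Hypothetical configuration values, standing in for
    // `meta_election_lock_id` and `meta_table_name`.
    let sql_set = ElectionSqlFactory::new(42, "my_metakv").build();

    // The advisory-lock statements embed the configured lock id, so two
    // metasrv clusters sharing one PostgreSQL instance can elect leaders
    // independently by choosing different lock ids.
    assert_eq!(sql_set.campaign, "SELECT pg_try_advisory_lock(42)");
    assert_eq!(sql_set.step_down, "SELECT pg_advisory_unlock(42)");

    // Every key-value statement targets the configured table instead of the
    // previously hard-coded `greptime_metakv`.
    assert!(sql_set.get_value_with_lease.contains("FROM my_metakv"));
    assert!(sql_set.delete_value.starts_with("DELETE FROM my_metakv"));
}
```

Because PostgreSQL advisory locks are keyed only by their numeric id, distinct `meta_election_lock_id` values never contend with each other, while distinct `meta_table_name` values keep each cluster's metadata in its own table.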