diff --git a/Cargo.lock b/Cargo.lock index 9f55fb71ee..7f30bf1e58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2159,6 +2159,7 @@ dependencies = [ "postgres", "postgres-protocol", "postgres-types", + "postgres_connection", "postgres_ffi", "pprof", "pq_proto", @@ -2411,6 +2412,7 @@ name = "postgres_connection" version = "0.1.0" dependencies = [ "anyhow", + "itertools", "once_cell", "postgres", "tokio-postgres", diff --git a/libs/postgres_connection/Cargo.toml b/libs/postgres_connection/Cargo.toml index e7b5b49077..314f3c6f1c 100644 --- a/libs/postgres_connection/Cargo.toml +++ b/libs/postgres_connection/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0" +itertools = "0.10.3" postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } url = "2.2.2" diff --git a/libs/postgres_connection/src/lib.rs b/libs/postgres_connection/src/lib.rs index 7edd5b7be6..35344a9168 100644 --- a/libs/postgres_connection/src/lib.rs +++ b/libs/postgres_connection/src/lib.rs @@ -1,4 +1,6 @@ use anyhow::{bail, Context}; +use itertools::Itertools; +use std::borrow::Cow; use std::fmt; use url::Host; @@ -59,10 +61,12 @@ mod tests_parse_host_port { } } +#[derive(Clone)] pub struct PgConnectionConfig { host: Host, port: u16, password: Option, + options: Vec, } /// A simplified PostgreSQL connection configuration. Supports only a subset of possible @@ -75,6 +79,7 @@ impl PgConnectionConfig { host, port, password: None, + options: vec![], } } @@ -101,6 +106,11 @@ impl PgConnectionConfig { self } + pub fn extend_options, S: Into>(mut self, i: I) -> Self { + self.options.extend(i.into_iter().map(|s| s.into())); + self + } + /// Return a `:` string. pub fn raw_address(&self) -> String { format!("{}:{}", self.host(), self.port()) @@ -117,6 +127,36 @@ impl PgConnectionConfig { if let Some(password) = &self.password { config.password(password); } + if !self.options.is_empty() { + // These options are command-line options and should be escaped before being passed + // as an 'options' connection string parameter, see + // https://www.postgresql.org/docs/15/libpq-connect.html#LIBPQ-CONNECT-OPTIONS + // + // They will be space-separated, so each space inside an option should be escaped, + // and all backslashes should be escaped before that. Although we don't expect options + // with spaces at the moment, they're supported by PostgreSQL. Hence we support them + // in this typesafe interface. + // + // We use `Cow` to avoid allocations in the best case (no escaping). A fully imperative + // solution would require 1-2 allocations in the worst case as well, but it's harder to + // implement and this function is hardly a bottleneck. The function is only called around + // establishing a new connection. + #[allow(unstable_name_collisions)] + config.options( + &self + .options + .iter() + .map(|s| { + if s.contains(['\\', ' ']) { + Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ ")) + } else { + Cow::Borrowed(s.as_str()) + } + }) + .intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized + .collect::(), + ); + } config } @@ -193,4 +233,21 @@ mod tests_pg_connection_config { "PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }" ); } + + #[test] + fn test_with_options() { + let cfg = PgConnectionConfig::new_host_port(STUB_HOST.clone(), 123).extend_options([ + "hello", + "world", + "with space", + "and \\ backslashes", + ]); + assert_eq!(cfg.host(), &*STUB_HOST); + assert_eq!(cfg.port(), 123); + assert_eq!(cfg.raw_address(), "stub.host.example:123"); + assert_eq!( + cfg.to_tokio_postgres_config().get_options(), + Some("hello world with\\ space and\\ \\\\\\ backslashes") + ); + } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 71ea68071f..c0d068c9c7 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -61,6 +61,7 @@ walkdir = "2.3.2" etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } pageserver_api = { path = "../libs/pageserver_api" } +postgres_connection = { path = "../libs/postgres_connection" } postgres_ffi = { path = "../libs/postgres_ffi" } pq_proto = { path = "../libs/pq_proto" } remote_storage = { path = "../libs/remote_storage" } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index db581efc7d..c823fa7bf1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -91,7 +91,7 @@ async fn build_timeline_info( let guard = timeline.last_received_wal.lock().unwrap(); if let Some(info) = guard.as_ref() { ( - Some(info.wal_source_connstr.clone()), + Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only. Some(info.last_received_msg_lsn), Some(info.last_received_msg_ts), ) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b6eb488020..630d515311 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -40,6 +40,7 @@ use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::tenant_config::TenantConfOpt; use pageserver_api::reltag::RelTag; +use postgres_connection::PgConnectionConfig; use postgres_ffi::to_pg_timestamp; use utils::{ id::{TenantId, TimelineId}, @@ -296,7 +297,7 @@ impl LogicalSize { } pub struct WalReceiverInfo { - pub wal_source_connstr: String, + pub wal_source_connconf: PgConnectionConfig, pub last_received_msg_lsn: Lsn, pub last_received_msg_ts: u128, } diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index d527e521e0..a2ebfb99d0 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -35,6 +35,7 @@ use crate::{ exponential_backoff, walreceiver::get_etcd_client, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; +use postgres_connection::{parse_host_port, PgConnectionConfig}; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -247,7 +248,7 @@ async fn connection_manager_loop_step( walreceiver_state .change_connection( new_candidate.safekeeper_id, - new_candidate.wal_source_connstr, + new_candidate.wal_source_connconf, ) .await } @@ -425,7 +426,11 @@ impl WalreceiverState { } /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. - async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) { + async fn change_connection( + &mut self, + new_sk_id: NodeId, + new_wal_source_connconf: PgConnectionConfig, + ) { self.drop_old_connection(true).await; let id = self.id; @@ -435,7 +440,7 @@ impl WalreceiverState { async move { super::walreceiver_connection::handle_walreceiver_connection( timeline, - new_wal_source_connstr, + new_wal_source_connconf, events_sender, cancellation, connect_timeout, @@ -575,7 +580,7 @@ impl WalreceiverState { Some(existing_wal_connection) => { let connected_sk_node = existing_wal_connection.sk_id; - let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) = + let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connconf) = self.select_connection_candidate(Some(connected_sk_node))?; let now = Utc::now().naive_utc(); @@ -586,7 +591,7 @@ impl WalreceiverState { if latest_interaciton > self.wal_connect_timeout { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, + wal_source_connconf: new_wal_source_connconf, reason: ReconnectReason::NoKeepAlives { last_keep_alive: Some( existing_wal_connection.status.latest_connection_update, @@ -611,7 +616,7 @@ impl WalreceiverState { if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, + wal_source_connconf: new_wal_source_connconf, reason: ReconnectReason::LaggingWal { current_commit_lsn, new_commit_lsn, @@ -685,7 +690,7 @@ impl WalreceiverState { { return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, + wal_source_connconf: new_wal_source_connconf, reason: ReconnectReason::NoWalTimeout { current_lsn, current_commit_lsn, @@ -704,11 +709,11 @@ impl WalreceiverState { self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal; } None => { - let (new_sk_id, _, new_wal_source_connstr) = + let (new_sk_id, _, new_wal_source_connconf) = self.select_connection_candidate(None)?; return Some(NewWalConnectionCandidate { safekeeper_id: new_sk_id, - wal_source_connstr: new_wal_source_connstr, + wal_source_connconf: new_wal_source_connconf, reason: ReconnectReason::NoExistingConnection, }); } @@ -726,7 +731,7 @@ impl WalreceiverState { fn select_connection_candidate( &self, node_to_omit: Option, - ) -> Option<(NodeId, &SkTimelineInfo, String)> { + ) -> Option<(NodeId, &SkTimelineInfo, PgConnectionConfig)> { self.applicable_connection_candidates() .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit) .max_by_key(|(_, info, _)| info.commit_lsn) @@ -736,7 +741,7 @@ impl WalreceiverState { /// Some safekeepers are filtered by the retry cooldown. fn applicable_connection_candidates( &self, - ) -> impl Iterator { + ) -> impl Iterator { let now = Utc::now().naive_utc(); self.wal_stream_candidates @@ -754,7 +759,7 @@ impl WalreceiverState { }) .filter_map(|(sk_id, etcd_info)| { let info = &etcd_info.timeline; - match wal_stream_connection_string( + match wal_stream_connection_config( self.id, info.safekeeper_connstr.as_deref()?, ) { @@ -797,10 +802,12 @@ impl WalreceiverState { } } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] struct NewWalConnectionCandidate { safekeeper_id: NodeId, - wal_source_connstr: String, + wal_source_connconf: PgConnectionConfig, + // This field is used in `derive(Debug)` only. + #[allow(dead_code)] reason: ReconnectReason, } @@ -828,34 +835,30 @@ enum ReconnectReason { }, } -fn wal_stream_connection_string( +fn wal_stream_connection_config( TenantTimelineId { tenant_id, timeline_id, }: TenantTimelineId, listen_pg_addr_str: &str, -) -> anyhow::Result { - let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); - sk_connstr - .parse() - .context("bad url") - .and_then(|url: url::Url| { - let host = url.host_str().context("host is missing")?; - let port = url.port().unwrap_or(5432); // default PG port - - Ok(format!( - "host={host} \ - port={port} \ - options='-c timeline_id={timeline_id} tenant_id={tenant_id}'" - )) - }) - .with_context(|| format!("Failed to parse pageserver connection URL '{sk_connstr}'")) +) -> anyhow::Result { + let (host, port) = + parse_host_port(&listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + let port = port.unwrap_or(5432); + Ok( + PgConnectionConfig::new_host_port(host, port).extend_options([ + "-c".to_owned(), + format!("timeline_id={}", timeline_id), + format!("tenant_id={}", tenant_id), + ]), + ) } #[cfg(test)] mod tests { use super::*; use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; + use url::Host; #[test] fn no_connection_no_candidate() -> anyhow::Result<()> { @@ -992,7 +995,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1064,7 +1067,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1080,9 +1083,10 @@ mod tests { ReconnectReason::NoExistingConnection, "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); - assert!(only_candidate - .wal_source_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert_eq!( + only_candidate.wal_source_connconf.host(), + &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned()) + ); let selected_lsn = 100_000; state.wal_stream_candidates = HashMap::from([ @@ -1116,7 +1120,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1151,9 +1155,10 @@ mod tests { ReconnectReason::NoExistingConnection, "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" ); - assert!(biggest_wal_candidate - .wal_source_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert_eq!( + biggest_wal_candidate.wal_source_connconf.host(), + &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned()) + ); Ok(()) } @@ -1181,7 +1186,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1199,7 +1204,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1270,7 +1275,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1310,9 +1315,10 @@ mod tests { }, "Should select bigger WAL safekeeper if it starts to lag enough" ); - assert!(over_threshcurrent_candidate - .wal_source_connstr - .contains("advanced_by_lsn_safekeeper")); + assert_eq!( + over_threshcurrent_candidate.wal_source_connconf.host(), + &Host::Domain("advanced_by_lsn_safekeeper".to_owned()) + ); Ok(()) } @@ -1361,7 +1367,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1384,9 +1390,10 @@ mod tests { } unexpected => panic!("Unexpected reason: {unexpected:?}"), } - assert!(over_threshcurrent_candidate - .wal_source_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert_eq!( + over_threshcurrent_candidate.wal_source_connconf.host(), + &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned()) + ); Ok(()) } @@ -1434,7 +1441,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), + safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()), }, etcd_version: 0, latest_update: now, @@ -1463,14 +1470,15 @@ mod tests { } unexpected => panic!("Unexpected reason: {unexpected:?}"), } - assert!(over_threshcurrent_candidate - .wal_source_connstr - .contains(DUMMY_SAFEKEEPER_CONNSTR)); + assert_eq!( + over_threshcurrent_candidate.wal_source_connconf.host(), + &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned()) + ); Ok(()) } - const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; + const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr"; fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState { WalreceiverState { diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 0070834288..3cd01fba0a 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -30,6 +30,7 @@ use crate::{ walingest::WalIngest, walrecord::DecodedWALRecord, }; +use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; use pq_proto::ReplicationFeedback; use utils::{id::TenantTimelineId, lsn::Lsn}; @@ -56,22 +57,23 @@ pub struct WalConnectionStatus { /// messages as we go. pub async fn handle_walreceiver_connection( timeline: Arc, - wal_source_connstr: String, + wal_source_connconf: PgConnectionConfig, events_sender: watch::Sender>, mut cancellation: watch::Receiver<()>, connect_timeout: Duration, ) -> anyhow::Result<()> { // Connect to the database in replication mode. - info!("connecting to {wal_source_connstr}"); - let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true"); + info!("connecting to {wal_source_connconf:?}"); - let (mut replication_client, connection) = time::timeout( - connect_timeout, - tokio_postgres::connect(&connect_cfg, postgres::NoTls), - ) - .await - .context("Timed out while waiting for walreceiver connection to open")? - .context("Failed to open walreceiver connection")?; + let (mut replication_client, connection) = { + let mut config = wal_source_connconf.to_tokio_postgres_config(); + config.application_name("pageserver"); + config.replication_mode(tokio_postgres::config::ReplicationMode::Physical); + time::timeout(connect_timeout, config.connect(postgres::NoTls)) + .await + .context("Timed out while waiting for walreceiver connection to open")? + .context("Failed to open walreceiver connection")? + }; info!("connected!"); let mut connection_status = WalConnectionStatus { @@ -316,7 +318,7 @@ pub async fn handle_walreceiver_connection( // Update the status about what we just received. This is shown in the mgmt API. let last_received_wal = WalReceiverInfo { - wal_source_connstr: wal_source_connstr.to_owned(), + wal_source_connconf: wal_source_connconf.clone(), last_received_msg_lsn: last_lsn, last_received_msg_ts: ts .duration_since(SystemTime::UNIX_EPOCH)