From 0df3467146f9368d67a145c59949e7f0ee6c01c9 Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 31 Oct 2022 18:39:29 +0300 Subject: [PATCH] Refactoring: replace `utils::connstring` with `Url`-based APIs --- Cargo.lock | 1 + control_plane/Cargo.toml | 15 ++--- control_plane/src/compute.rs | 4 +- control_plane/src/connection.rs | 57 +++++++++++++++++++ control_plane/src/lib.rs | 1 + control_plane/src/safekeeper.rs | 12 ++-- control_plane/src/storage.rs | 15 +++-- libs/utils/src/connstring.rs | 52 ----------------- libs/utils/src/lib.rs | 3 - .../src/walreceiver/connection_manager.rs | 37 ++++++------ 10 files changed, 103 insertions(+), 94 deletions(-) create mode 100644 control_plane/src/connection.rs delete mode 100644 libs/utils/src/connstring.rs diff --git a/Cargo.lock b/Cargo.lock index 3e67126add..326cccaecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -600,6 +600,7 @@ dependencies = [ "tar", "thiserror", "toml", + "url", "utils", "workspace_hack", ] diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 287385c709..a9d30b4a86 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -4,20 +4,21 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0" clap = "4.0" comfy-table = "6.1" git-version = "0.3.5" -tar = "0.4.38" +nix = "0.25" +once_cell = "1.13.0" postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +regex = "1" +reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } serde_with = "2.0" -toml = "0.5" -once_cell = "1.13.0" -regex = "1" -anyhow = "1.0" +tar = "0.4.38" thiserror = "1" -nix = "0.25" -reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] } +toml = "0.5" +url = "2.2.2" # Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api # instead, so that recompile times are better. diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index b3f90b5922..89e4e85eb0 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -12,7 +12,6 @@ use std::time::Duration; use anyhow::{Context, Result}; use utils::{ - connstring::connection_host_port, id::{TenantId, TimelineId}, lsn::Lsn, postgres_backend::AuthType, @@ -300,7 +299,8 @@ impl PostgresNode { // Configure the node to fetch pages from pageserver let pageserver_connstr = { - let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); + let config = &self.pageserver.pg_connection_config; + let (host, port) = (config.host(), config.port()); // Set up authentication // diff --git a/control_plane/src/connection.rs b/control_plane/src/connection.rs new file mode 100644 index 0000000000..cca837de6e --- /dev/null +++ b/control_plane/src/connection.rs @@ -0,0 +1,57 @@ +use url::Url; + +#[derive(Debug)] +pub struct PgConnectionConfig { + url: Url, +} + +impl PgConnectionConfig { + pub fn host(&self) -> &str { + self.url.host_str().expect("BUG: no host") + } + + pub fn port(&self) -> u16 { + self.url.port().expect("BUG: no port") + } + + /// Return a `:` string. + pub fn raw_address(&self) -> String { + format!("{}:{}", self.host(), self.port()) + } + + /// Connect using postgres protocol with TLS disabled. + pub fn connect_no_tls(&self) -> Result { + postgres::Client::connect(self.url.as_str(), postgres::NoTls) + } +} + +impl std::str::FromStr for PgConnectionConfig { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut url: Url = s.parse()?; + + match url.scheme() { + "postgres" | "postgresql" => {} + other => anyhow::bail!("invalid scheme: {other}"), + } + + // It's not a valid connection url if host is unavailable. + if url.host().is_none() { + anyhow::bail!(url::ParseError::EmptyHost); + } + + // E.g. `postgres:bar`. + if url.cannot_be_a_base() { + anyhow::bail!("URL cannot be a base"); + } + + // Set the default PG port if it's missing. + if url.port().is_none() { + url.set_port(Some(5432)) + .expect("BUG: couldn't set the default port"); + } + + Ok(Self { url }) + } +} diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 17232ccf45..f22dce1810 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -12,6 +12,7 @@ use std::path::Path; use std::process::Command; pub mod compute; +pub mod connection; pub mod etcd; pub mod local_env; pub mod postgresql_conf; diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 17f5d0c109..91cedeca23 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -9,12 +9,12 @@ use anyhow::bail; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use postgres::Config; use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; -use utils::{connstring::connection_address, http::error::HttpErrorBody, id::NodeId}; +use utils::{http::error::HttpErrorBody, id::NodeId}; +use crate::connection::PgConnectionConfig; use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::storage::PageServerNode; use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile}; @@ -63,7 +63,7 @@ pub struct SafekeeperNode { pub conf: SafekeeperConf, - pub pg_connection_config: Config, + pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, pub http_client: Client, pub http_base_url: String, @@ -87,9 +87,9 @@ impl SafekeeperNode { } /// Construct libpq connection string for connecting to this safekeeper. - fn safekeeper_connection_config(port: u16) -> Config { + fn safekeeper_connection_config(port: u16) -> PgConnectionConfig { // TODO safekeeper authentication not implemented yet - format!("postgresql://no_user@127.0.0.1:{}/no_db", port) + format!("postgresql://no_user@127.0.0.1:{port}/no_db") .parse() .unwrap() } @@ -109,7 +109,7 @@ impl SafekeeperNode { pub fn start(&self) -> anyhow::Result<()> { print!( "Starting safekeeper at '{}' in '{}'", - connection_address(&self.pg_connection_config), + self.pg_connection_config.raw_address(), self.datadir_path().display() ); io::stdout().flush().unwrap(); diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 59cb3d7efb..4b705690f0 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -7,6 +7,7 @@ use std::process::Command; use std::time::Duration; use std::{io, result, thread}; +use crate::connection::PgConnectionConfig; use anyhow::{bail, Context}; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; @@ -14,12 +15,10 @@ use nix::unistd::Pid; use pageserver_api::models::{ TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo, }; -use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use utils::{ - connstring::connection_address, http::error::HttpErrorBody, id::{TenantId, TimelineId}, lsn::Lsn, @@ -75,7 +74,7 @@ impl ResponseErrorMessageExt for Response { // #[derive(Debug)] pub struct PageServerNode { - pub pg_connection_config: Config, + pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, pub http_client: Client, pub http_base_url: String, @@ -101,7 +100,7 @@ impl PageServerNode { } /// Construct libpq connection string for connecting to the pageserver. - fn pageserver_connection_config(password: &str, listen_addr: &str) -> Config { + fn pageserver_connection_config(password: &str, listen_addr: &str) -> PgConnectionConfig { format!("postgresql://no_user:{password}@{listen_addr}/no_db") .parse() .unwrap() @@ -212,7 +211,7 @@ impl PageServerNode { ) -> anyhow::Result<()> { println!( "Starting pageserver at '{}' in '{}'", - connection_address(&self.pg_connection_config), + self.pg_connection_config.raw_address(), datadir.display() ); io::stdout().flush()?; @@ -343,14 +342,14 @@ impl PageServerNode { } pub fn page_server_psql(&self, sql: &str) -> Vec { - let mut client = self.pg_connection_config.connect(NoTls).unwrap(); + let mut client = self.pg_connection_config.connect_no_tls().unwrap(); println!("Pageserver query: '{sql}'"); client.simple_query(sql).unwrap() } pub fn page_server_psql_client(&self) -> result::Result { - self.pg_connection_config.connect(NoTls) + self.pg_connection_config.connect_no_tls() } fn http_request(&self, method: Method, url: U) -> RequestBuilder { @@ -549,7 +548,7 @@ impl PageServerNode { pg_wal: Option<(Lsn, PathBuf)>, pg_version: u32, ) -> anyhow::Result<()> { - let mut client = self.pg_connection_config.connect(NoTls).unwrap(); + let mut client = self.pg_connection_config.connect_no_tls().unwrap(); // Init base reader let (start_lsn, base_tarfile_path) = base; diff --git a/libs/utils/src/connstring.rs b/libs/utils/src/connstring.rs deleted file mode 100644 index cda8eeac86..0000000000 --- a/libs/utils/src/connstring.rs +++ /dev/null @@ -1,52 +0,0 @@ -use postgres::Config; - -pub fn connection_host_port(config: &Config) -> (String, u16) { - assert_eq!( - config.get_hosts().len(), - 1, - "only one pair of host and port is supported in connection string" - ); - assert_eq!( - config.get_ports().len(), - 1, - "only one pair of host and port is supported in connection string" - ); - let host = match &config.get_hosts()[0] { - postgres::config::Host::Tcp(host) => host.as_ref(), - postgres::config::Host::Unix(host) => host.to_str().unwrap(), - }; - (host.to_owned(), config.get_ports()[0]) -} - -pub fn connection_address(config: &Config) -> String { - let (host, port) = connection_host_port(config); - format!("{}:{}", host, port) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_connection_host_port() { - let config: Config = "postgresql://no_user@localhost:64000/no_db" - .parse() - .unwrap(); - assert_eq!( - connection_host_port(&config), - ("localhost".to_owned(), 64000) - ); - } - - #[test] - #[should_panic(expected = "only one pair of host and port is supported in connection string")] - fn test_connection_host_port_multiple_ports() { - let config: Config = "postgresql://no_user@localhost:64000,localhost:64001/no_db" - .parse() - .unwrap(); - assert_eq!( - connection_host_port(&config), - ("localhost".to_owned(), 64000) - ); - } -} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index f1f48f5a90..aff86c8076 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -19,9 +19,6 @@ pub mod postgres_backend; pub mod postgres_backend_async; pub mod pq_proto; -// dealing with connstring parsing and handy access to it's parts -pub mod connstring; - // helper functions for creating and fsyncing pub mod crashsafe; diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 1d53df788d..d527e521e0 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -836,15 +836,20 @@ fn wal_stream_connection_string( listen_pg_addr_str: &str, ) -> anyhow::Result { let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); - let me_conf = sk_connstr - .parse::() - .with_context(|| { - format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one") - })?; - let (host, port) = utils::connstring::connection_host_port(&me_conf); - Ok(format!( - "host={host} port={port} options='-c timeline_id={timeline_id} tenant_id={tenant_id}'" - )) + sk_connstr + .parse() + .context("bad url") + .and_then(|url: url::Url| { + let host = url.host_str().context("host is missing")?; + let port = url.port().unwrap_or(5432); // default PG port + + Ok(format!( + "host={host} \ + port={port} \ + options='-c timeline_id={timeline_id} tenant_id={tenant_id}'" + )) + }) + .with_context(|| format!("Failed to parse pageserver connection URL '{sk_connstr}'")) } #[cfg(test)] @@ -892,7 +897,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), + safekeeper_connstr: Some("no_commit_lsn".to_string()), }, etcd_version: 0, latest_update: now, @@ -909,7 +914,7 @@ mod tests { remote_consistent_lsn: None, peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("no commit_lsn".to_string()), + safekeeper_connstr: Some("no_commit_lsn".to_string()), }, etcd_version: 0, latest_update: now, @@ -1005,7 +1010,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("not advanced Lsn".to_string()), + safekeeper_connstr: Some("not_advanced_lsn".to_string()), }, etcd_version: 0, latest_update: now, @@ -1023,7 +1028,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("not enough advanced Lsn".to_string()), + safekeeper_connstr: Some("not_enough_advanced_lsn".to_string()), }, etcd_version: 0, latest_update: now, @@ -1093,7 +1098,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("smaller commit_lsn".to_string()), + safekeeper_connstr: Some("smaller_commit_lsn".to_string()), }, etcd_version: 0, latest_update: now, @@ -1283,7 +1288,7 @@ mod tests { peer_horizon_lsn: None, local_start_lsn: None, - safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()), + safekeeper_connstr: Some("advanced_by_lsn_safekeeper".to_string()), }, etcd_version: 0, latest_update: now, @@ -1307,7 +1312,7 @@ mod tests { ); assert!(over_threshcurrent_candidate .wal_source_connstr - .contains("advanced by Lsn safekeeper")); + .contains("advanced_by_lsn_safekeeper")); Ok(()) }