From 75e717fe8695d611891efb2094e589a904776115 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 8 Jul 2021 19:15:56 +0300 Subject: [PATCH] allow both domains and ip addresses in connection options for pageserver and wal keeper. Also updated PageServerNode definition in control plane to account for that. resolves #303 --- Cargo.lock | 3 ++ control_plane/src/compute.rs | 6 ++-- control_plane/src/storage.rs | 46 ++++++++++--------------- pageserver/src/bin/pageserver.rs | 13 ++++--- pageserver/src/lib.rs | 3 +- pageserver/src/repository.rs | 2 +- test_runner/fixtures/zenith_fixtures.py | 6 ++-- walkeeper/src/bin/wal_acceptor.rs | 13 ++----- walkeeper/src/lib.rs | 5 ++- walkeeper/src/receive_wal.rs | 22 ++++++------ walkeeper/src/wal_service.rs | 2 +- zenith_utils/Cargo.toml | 1 + zenith_utils/src/connstring.rs | 34 ++++++++++++++++++ zenith_utils/src/lib.rs | 3 ++ 14 files changed, 92 insertions(+), 67 deletions(-) create mode 100644 zenith_utils/src/connstring.rs diff --git a/Cargo.lock b/Cargo.lock index 6a130d30b6..8026f86c97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "ahash" version = "0.4.7" @@ -2471,6 +2473,7 @@ dependencies = [ "bytes", "hex-literal", "log", + "postgres", "serde", "thiserror", "workspace_hack", diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index b2066a8891..b43d6f1fcc 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -14,6 +14,7 @@ use std::{ use anyhow::{Context, Result}; use lazy_static::lazy_static; use regex::Regex; +use zenith_utils::connstring::connection_host_port; use crate::local_env::LocalEnv; use pageserver::ZTimelineId; @@ -289,14 +290,15 @@ impl PostgresNode { // Connect it to the page server. 
// Configure that node to take pages from pageserver + let (host, port) = connection_host_port(&self.pageserver.connection_config()); self.append_conf( "postgresql.conf", &format!( "shared_preload_libraries = zenith \n\ zenith.page_server_connstring = 'host={} port={}'\n\ zenith.zenith_timeline='{}'\n", - self.pageserver.address().ip(), - self.pageserver.address().port(), + host, + port, self.timelineid ), )?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index e6de017292..f86bf66e16 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::net::{SocketAddr, TcpStream}; +use std::net::{TcpStream}; use std::path::PathBuf; use std::process::Command; use std::thread; @@ -8,10 +8,11 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use postgres::{Client, NoTls}; +use postgres::{Config, NoTls}; use crate::local_env::LocalEnv; use crate::read_pidfile; +use zenith_utils::connstring::connection_address; use pageserver::branches::BranchInfo; // @@ -21,7 +22,7 @@ use pageserver::branches::BranchInfo; // pub struct PageServerNode { pub kill_on_exit: bool, - pub listen_address: Option, + pub connection_config: Option, pub env: LocalEnv, } @@ -29,15 +30,19 @@ impl PageServerNode { pub fn from_env(env: &LocalEnv) -> PageServerNode { PageServerNode { kill_on_exit: false, - listen_address: None, // default + connection_config: None, // default env: env.clone(), } } - pub fn address(&self) -> SocketAddr { - match self.listen_address { - Some(addr) => addr, - None => "127.0.0.1:64000".parse().unwrap(), + fn default_config() -> Config { + "postgresql://no_user@localhost:64000/no_db".parse().unwrap() + } + + pub fn connection_config(&self) -> Config { + match &self.connection_config { + Some(config) => config.clone(), + None => Self::default_config(), } } @@ -74,7 +79,7 @@ impl PageServerNode { pub fn 
start(&self) -> Result<()> { println!( "Starting pageserver at '{}' in {}", - self.address(), + connection_address(&self.connection_config()), self.repo_path().display() ); @@ -116,42 +121,29 @@ impl PageServerNode { } // wait for pageserver stop + let address = connection_address(&self.connection_config()); for _ in 0..5 { - let stream = TcpStream::connect(self.address()); + let stream = TcpStream::connect(&address); thread::sleep(Duration::from_secs(1)); if let Err(_e) = stream { println!("Pageserver stopped"); return Ok(()); } - println!("Stopping pageserver on {}", self.address()); + println!("Stopping pageserver on {}", address); } bail!("Failed to stop pageserver with pid {}", pid); } pub fn page_server_psql(&self, sql: &str) -> Vec { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address().ip(), - self.address().port(), - "no_db", - "no_user", - ); - let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); + let mut client = self.connection_config().connect(NoTls).unwrap(); println!("Pageserver query: '{}'", sql); client.simple_query(sql).unwrap() } pub fn page_server_psql_client(&self) -> Result { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address().ip(), - self.address().port(), - "no_db", - "no_user", - ); - Client::connect(connstring.as_str(), NoTls) + self.connection_config().connect(NoTls) } pub fn branches_list(&self) -> Result> { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4650362395..52a0f0a7d0 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -8,7 +8,7 @@ use std::{ env, fs::{File, OpenOptions}, io, - net::{SocketAddr, TcpListener}, + net::TcpListener, path::{Path, PathBuf}, process::exit, thread, @@ -65,11 +65,10 @@ impl CfgFileParams { /// Create a PageServerConf from these string parameters fn try_into_config(&self) -> Result { - let listen_addr: SocketAddr = self - .listen_addr - .as_deref() - 
.unwrap_or(DEFAULT_LISTEN_ADDR) - .parse()?; + let listen_addr = match self.listen_addr.as_ref() { + Some(addr) => addr.clone(), + None => DEFAULT_LISTEN_ADDR.to_owned(), + }; let gc_horizon: u64 = match self.gc_horizon.as_ref() { Some(horizon_str) => horizon_str.parse()?, @@ -272,7 +271,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Check that we can bind to address before further initialization info!("Starting pageserver on {}", conf.listen_addr); - let pageserver_listener = TcpListener::bind(conf.listen_addr)?; + let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; // Initialize page cache, this will spawn walredo_thread page_cache::init(conf); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 1666a65984..d4cce49f9c 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; use std::fmt; -use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -27,7 +26,7 @@ pub mod walredo; pub struct PageServerConf { pub daemonize: bool, pub interactive: bool, - pub listen_addr: SocketAddr, + pub listen_addr: String, pub gc_horizon: u64, pub gc_period: Duration, diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 595b6c593c..9d62402ce1 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -305,7 +305,7 @@ mod tests { interactive: false, gc_horizon: 64 * 1024 * 1024, gc_period: Duration::from_secs(10), - listen_addr: "127.0.0.1:5430".parse().unwrap(), + listen_addr: "127.0.0.1:5430".to_string(), workdir: repo_dir, pg_distrib_dir: "".into(), }; diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index aa1557752c..07009d6c33 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -472,11 +472,11 @@ class WalAcceptor: cmd = [self.wa_binpath] cmd.extend(["-D", self.data_dir]) - 
cmd.extend(["-l", "127.0.0.1:{}".format(self.port)]) + cmd.extend(["-l", "localhost:{}".format(self.port)]) cmd.append("--daemonize") cmd.append("--no-sync") # Tell page server it can receive WAL from this WAL safekeeper - cmd.extend(["--pageserver", "127.0.0.1:{}".format(DEFAULT_PAGESERVER_PORT)]) + cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PORT)]) cmd.extend(["--recall", "1 second"]) print('Running command "{}"'.format(' '.join(cmd))) subprocess.run(cmd, check=True) @@ -543,7 +543,7 @@ class WalAcceptorFactory: def get_connstrs(self) -> str: """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ - return ','.join(["127.0.0.1:{}".format(wa.port) for wa in self.instances]) + return ','.join(["localhost:{}".format(wa.port) for wa in self.instances]) @zenfixture diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 6d9e60bddf..1b39253415 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -8,7 +8,6 @@ use log::*; use parse_duration::parse; use slog::Drain; use std::io; -use std::net::ToSocketAddrs; use std::path::{Path, PathBuf}; use std::thread; use std::time::Duration; @@ -74,7 +73,7 @@ fn main() -> Result<()> { daemonize: false, no_sync: false, pageserver_addr: None, - listen_addr: "127.0.0.1:5454".parse()?, + listen_addr: "localhost:5454".to_string(), ttl: None, recall_period: None, }; @@ -95,17 +94,11 @@ fn main() -> Result<()> { } if let Some(addr) = arg_matches.value_of("listen") { - // TODO: keep addr vector in config and listen them all - // XXX: with our callmemaybe approach we need to set 'advertised address' - // as it is not always possible to listen it. Another reason to ditch callmemaybe. 
- let addrs: Vec<_> = addr.to_socket_addrs().unwrap().collect(); - conf.listen_addr = addrs[0]; + conf.listen_addr = addr.to_owned(); } if let Some(addr) = arg_matches.value_of("pageserver") { - // TODO: keep addr vector in config and check them all while connecting - let addrs: Vec<_> = addr.to_socket_addrs().unwrap().collect(); - conf.pageserver_addr = Some(addrs[0]); + conf.pageserver_addr = Some(addr.to_owned()); } if let Some(ttl) = arg_matches.value_of("ttl") { diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index d269776064..47a5ecaa0c 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -1,5 +1,4 @@ // -use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; @@ -15,8 +14,8 @@ pub struct WalAcceptorConf { pub data_dir: PathBuf, pub daemonize: bool, pub no_sync: bool, - pub listen_addr: SocketAddr, - pub pageserver_addr: Option, + pub listen_addr: String, + pub pageserver_addr: Option, pub ttl: Option, pub recall_period: Option, } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 55aa2e6c29..ae8d3a6349 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -4,8 +4,9 @@ use anyhow::{bail, Result}; use log::*; -use postgres::{Client, NoTls}; +use postgres::{Client, Config, NoTls}; use serde::{Deserialize, Serialize}; +use zenith_utils::connstring::connection_host_port; use std::cmp::{max, min}; use std::fs::{self, File, OpenOptions}; use std::io::{BufReader, Read, Seek, SeekFrom, Write}; @@ -149,19 +150,18 @@ pub struct ReceiveWalConn { /// If pageserver already has replication channel, it will just ignore this request /// fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) { - let addr = conf.pageserver_addr.unwrap(); - let ps_connstr = format!( - "host={} port={} dbname={} user={}", - addr.ip(), - addr.port(), - "no_db", - "no_user", - ); + let ps_addr = conf.pageserver_addr.unwrap(); + let ps_connstr = format!("postgresql://no_user@{}/no_db", 
ps_addr); + + // use Config parsing because SocketAddr parsing doesn't allow using host names instead of ip addresses + let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr); + let me_conf: Config = me_connstr.parse().unwrap(); + let (host, port) = connection_host_port(&me_conf); let callme = format!( "callmemaybe {} host={} port={} options='-c ztimelineid={}'", timelineid, - conf.listen_addr.ip(), - conf.listen_addr.port(), + host, + port, timelineid ); loop { diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index fab07a24c4..82796e1298 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -16,7 +16,7 @@ use zenith_utils::postgres_backend::PostgresBackend; /// Accept incoming TCP connections and spawn them into a background thread. pub fn thread_main(conf: WalAcceptorConf) -> Result<()> { info!("Starting wal acceptor on {}", conf.listen_addr); - let listener = TcpListener::bind(conf.listen_addr).map_err(|e| { + let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| { error!("failed to bind to address {}: {}", conf.listen_addr, e); e })?; diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 3c644cb6d9..bd42109bf2 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -12,6 +12,7 @@ log = "0.4.14" serde = { version = "1.0", features = ["derive"] } bincode = "1.3" thiserror = "1.0" +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } workspace_hack = { path = "../workspace_hack" } [dev-dependencies] diff --git a/zenith_utils/src/connstring.rs b/zenith_utils/src/connstring.rs new file mode 100644 index 0000000000..1340f0b899 --- /dev/null +++ b/zenith_utils/src/connstring.rs @@ -0,0 +1,34 @@ +use postgres::Config; + +pub fn connection_host_port(config: &Config) -> (String, u16) { + assert_eq!(config.get_hosts().len(), 1, "only one pair of host and port is supported in connection string");
+ assert_eq!(config.get_ports().len(), 1, "only one pair of host and port is supported in connection string"); + let host = match &config.get_hosts()[0] { + postgres::config::Host::Tcp(host) => host.as_ref(), + postgres::config::Host::Unix(host) => host.to_str().unwrap(), + }; + (host.to_owned(), config.get_ports()[0]) +} + +pub fn connection_address(config: &Config) -> String { + let (host, port) = connection_host_port(config); + format!("{}:{}", host, port) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_connection_host_port() { + let config: Config = "postgresql://no_user@localhost:64000/no_db".parse().unwrap(); + assert_eq!(connection_host_port(&config), ("localhost".to_owned(), 64000)); + } + + #[test] + #[should_panic(expected = "only one pair of host and port is supported in connection string")] + fn test_connection_host_port_multiple_ports() { + let config: Config = "postgresql://no_user@localhost:64000,localhost:64001/no_db".parse().unwrap(); + assert_eq!(connection_host_port(&config), ("localhost".to_owned(), 64000)); + } +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index fc24a0ad1e..5d867b7dce 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -13,3 +13,6 @@ pub mod bin_ser; pub mod postgres_backend; pub mod pq_proto; + +// dealing with connstring parsing and handy access to its parts +pub mod connstring;