allow both domains and ip addresses in connection options for

pageserver and wal keeper. Also updated PageServerNode definition in
control plane to account for that. resolves #303
This commit is contained in:
Dmitry Rodionov
2021-07-08 19:15:56 +03:00
committed by Stas Kelvich
parent 4987d5ee1f
commit 75e717fe86
14 changed files with 92 additions and 67 deletions

3
Cargo.lock generated
View File

@@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.4.7"
@@ -2471,6 +2473,7 @@ dependencies = [
"bytes",
"hex-literal",
"log",
"postgres",
"serde",
"thiserror",
"workspace_hack",

View File

@@ -14,6 +14,7 @@ use std::{
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use regex::Regex;
use zenith_utils::connstring::connection_host_port;
use crate::local_env::LocalEnv;
use pageserver::ZTimelineId;
@@ -289,14 +290,15 @@ impl PostgresNode {
// Connect it to the page server.
// Configure that node to take pages from pageserver
let (host, port) = connection_host_port(&self.pageserver.connection_config());
self.append_conf(
"postgresql.conf",
&format!(
"shared_preload_libraries = zenith \n\
zenith.page_server_connstring = 'host={} port={}'\n\
zenith.zenith_timeline='{}'\n",
self.pageserver.address().ip(),
self.pageserver.address().port(),
host,
port,
self.timelineid
),
)?;

View File

@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::net::{TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::thread;
@@ -8,10 +8,11 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::{Client, NoTls};
use postgres::{Config, NoTls};
use crate::local_env::LocalEnv;
use crate::read_pidfile;
use zenith_utils::connstring::connection_address;
use pageserver::branches::BranchInfo;
//
@@ -21,7 +22,7 @@ use pageserver::branches::BranchInfo;
//
pub struct PageServerNode {
pub kill_on_exit: bool,
pub listen_address: Option<SocketAddr>,
pub connection_config: Option<Config>,
pub env: LocalEnv,
}
@@ -29,15 +30,19 @@ impl PageServerNode {
pub fn from_env(env: &LocalEnv) -> PageServerNode {
PageServerNode {
kill_on_exit: false,
listen_address: None, // default
connection_config: None, // default
env: env.clone(),
}
}
pub fn address(&self) -> SocketAddr {
match self.listen_address {
Some(addr) => addr,
None => "127.0.0.1:64000".parse().unwrap(),
fn default_config() -> Config {
"postgresql://no_user@localhost:64000/no_db".parse().unwrap()
}
pub fn connection_config(&self) -> Config {
match &self.connection_config {
Some(config) => config.clone(),
None => Self::default_config(),
}
}
@@ -74,7 +79,7 @@ impl PageServerNode {
pub fn start(&self) -> Result<()> {
println!(
"Starting pageserver at '{}' in {}",
self.address(),
connection_address(&self.connection_config()),
self.repo_path().display()
);
@@ -116,42 +121,29 @@ impl PageServerNode {
}
// wait for pageserver stop
let address = connection_address(&self.connection_config());
for _ in 0..5 {
let stream = TcpStream::connect(self.address());
let stream = TcpStream::connect(&address);
thread::sleep(Duration::from_secs(1));
if let Err(_e) = stream {
println!("Pageserver stopped");
return Ok(());
}
println!("Stopping pageserver on {}", self.address());
println!("Stopping pageserver on {}", address);
}
bail!("Failed to stop pageserver with pid {}", pid);
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address().ip(),
self.address().port(),
"no_db",
"no_user",
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
let mut client = self.connection_config().connect(NoTls).unwrap();
println!("Pageserver query: '{}'", sql);
client.simple_query(sql).unwrap()
}
pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address().ip(),
self.address().port(),
"no_db",
"no_user",
);
Client::connect(connstring.as_str(), NoTls)
self.connection_config().connect(NoTls)
}
pub fn branches_list(&self) -> Result<Vec<BranchInfo>> {

View File

@@ -8,7 +8,7 @@ use std::{
env,
fs::{File, OpenOptions},
io,
net::{SocketAddr, TcpListener},
net::TcpListener,
path::{Path, PathBuf},
process::exit,
thread,
@@ -65,11 +65,10 @@ impl CfgFileParams {
/// Create a PageServerConf from these string parameters
fn try_into_config(&self) -> Result<PageServerConf> {
let listen_addr: SocketAddr = self
.listen_addr
.as_deref()
.unwrap_or(DEFAULT_LISTEN_ADDR)
.parse()?;
let listen_addr = match self.listen_addr.as_ref() {
Some(addr) => addr.clone(),
None => DEFAULT_LISTEN_ADDR.to_owned(),
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
Some(horizon_str) => horizon_str.parse()?,
@@ -272,7 +271,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Check that we can bind to address before further initialization
info!("Starting pageserver on {}", conf.listen_addr);
let pageserver_listener = TcpListener::bind(conf.listen_addr)?;
let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?;
// Initialize page cache, this will spawn walredo_thread
page_cache::init(conf);

View File

@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
@@ -27,7 +26,7 @@ pub mod walredo;
pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub listen_addr: SocketAddr,
pub listen_addr: String,
pub gc_horizon: u64,
pub gc_period: Duration,

View File

@@ -305,7 +305,7 @@ mod tests {
interactive: false,
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
listen_addr: "127.0.0.1:5430".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),
};

View File

@@ -472,11 +472,11 @@ class WalAcceptor:
cmd = [self.wa_binpath]
cmd.extend(["-D", self.data_dir])
cmd.extend(["-l", "127.0.0.1:{}".format(self.port)])
cmd.extend(["-l", "localhost:{}".format(self.port)])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
cmd.extend(["--pageserver", "127.0.0.1:{}".format(DEFAULT_PAGESERVER_PORT)])
cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PORT)])
cmd.extend(["--recall", "1 second"])
print('Running command "{}"'.format(' '.join(cmd)))
subprocess.run(cmd, check=True)
@@ -543,7 +543,7 @@ class WalAcceptorFactory:
def get_connstrs(self) -> str:
""" Get list of wal acceptor endpoints suitable for wal_acceptors GUC """
return ','.join(["127.0.0.1:{}".format(wa.port) for wa in self.instances])
return ','.join(["localhost:{}".format(wa.port) for wa in self.instances])
@zenfixture

View File

@@ -8,7 +8,6 @@ use log::*;
use parse_duration::parse;
use slog::Drain;
use std::io;
use std::net::ToSocketAddrs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
@@ -74,7 +73,7 @@ fn main() -> Result<()> {
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_addr: "127.0.0.1:5454".parse()?,
listen_addr: "localhost:5454".to_string(),
ttl: None,
recall_period: None,
};
@@ -95,17 +94,11 @@ fn main() -> Result<()> {
}
if let Some(addr) = arg_matches.value_of("listen") {
// TODO: keep addr vector in config and listen them all
// XXX: with our callmemaybe approach we need to set 'advertised address'
// as it is not always possible to listen it. Another reason to ditch callmemaybe.
let addrs: Vec<_> = addr.to_socket_addrs().unwrap().collect();
conf.listen_addr = addrs[0];
conf.listen_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
// TODO: keep addr vector in config and check them all while connecting
let addrs: Vec<_> = addr.to_socket_addrs().unwrap().collect();
conf.pageserver_addr = Some(addrs[0]);
conf.pageserver_addr = Some(addr.to_owned());
}
if let Some(ttl) = arg_matches.value_of("ttl") {

View File

@@ -1,5 +1,4 @@
//
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
@@ -15,8 +14,8 @@ pub struct WalAcceptorConf {
pub data_dir: PathBuf,
pub daemonize: bool,
pub no_sync: bool,
pub listen_addr: SocketAddr,
pub pageserver_addr: Option<SocketAddr>,
pub listen_addr: String,
pub pageserver_addr: Option<String>,
pub ttl: Option<Duration>,
pub recall_period: Option<Duration>,
}

View File

@@ -4,8 +4,9 @@
use anyhow::{bail, Result};
use log::*;
use postgres::{Client, NoTls};
use postgres::{Client, Config, NoTls};
use serde::{Deserialize, Serialize};
use zenith_utils::connstring::connection_host_port;
use std::cmp::{max, min};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, Read, Seek, SeekFrom, Write};
@@ -149,19 +150,18 @@ pub struct ReceiveWalConn {
/// If pageserver already has replication channel, it will just ignore this request
///
fn request_callback(conf: WalAcceptorConf, timelineid: ZTimelineId) {
let addr = conf.pageserver_addr.unwrap();
let ps_connstr = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
"no_db",
"no_user",
);
let ps_addr = conf.pageserver_addr.unwrap();
let ps_connstr = format!("postgresql://no_user@{}/no_db", ps_addr);
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_addr);
let me_conf: Config = me_connstr.parse().unwrap();
let (host, port) = connection_host_port(&me_conf);
let callme = format!(
"callmemaybe {} host={} port={} options='-c ztimelineid={}'",
timelineid,
conf.listen_addr.ip(),
conf.listen_addr.port(),
host,
port,
timelineid
);
loop {

View File

@@ -16,7 +16,7 @@ use zenith_utils::postgres_backend::PostgresBackend;
/// Accept incoming TCP connections and spawn them into a background thread.
pub fn thread_main(conf: WalAcceptorConf) -> Result<()> {
info!("Starting wal acceptor on {}", conf.listen_addr);
let listener = TcpListener::bind(conf.listen_addr).map_err(|e| {
let listener = TcpListener::bind(conf.listen_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_addr, e);
e
})?;

View File

@@ -12,6 +12,7 @@ log = "0.4.14"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3"
thiserror = "1.0"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
workspace_hack = { path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -0,0 +1,34 @@
use postgres::Config;
pub fn connection_host_port(config: &Config) -> (String, u16) {
assert_eq!(config.get_hosts().len(), 1, "only one pair of host and port is supported in connection string");
assert_eq!(config.get_ports().len(), 1, "only one pair of host and port is supported in connection string");
let host = match &config.get_hosts()[0] {
postgres::config::Host::Tcp(host) => host.as_ref(),
postgres::config::Host::Unix(host) => host.to_str().unwrap(),
};
(host.to_owned(), config.get_ports()[0])
}
pub fn connection_address(config: &Config) -> String {
let (host, port) = connection_host_port(config);
format!("{}:{}", host, port)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connection_host_port() {
let config: Config = "postgresql://no_user@localhost:64000/no_db".parse().unwrap();
assert_eq!(connection_host_port(&config), ("localhost".to_owned(), 64000));
}
#[test]
#[should_panic(expected = "only one pair of host and port is supported in connection string")]
fn test_connection_host_port_multiple_ports() {
let config: Config = "postgresql://no_user@localhost:64000,localhost:64001/no_db".parse().unwrap();
assert_eq!(connection_host_port(&config), ("localhost".to_owned(), 64000));
}
}

View File

@@ -13,3 +13,6 @@ pub mod bin_ser;
pub mod postgres_backend;
pub mod pq_proto;
// dealing with connstring parsing and handy access to it's parts
pub mod connstring;