Refactoring: replace utils::connstring with Url-based APIs

This commit is contained in:
Dmitry Ivanov
2022-10-31 18:39:29 +03:00
parent c64a121aa8
commit 0df3467146
10 changed files with 103 additions and 94 deletions

1
Cargo.lock generated
View File

@@ -600,6 +600,7 @@ dependencies = [
"tar",
"thiserror",
"toml",
"url",
"utils",
"workspace_hack",
]

View File

@@ -4,20 +4,21 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
clap = "4.0"
comfy-table = "6.1"
git-version = "0.3.5"
tar = "0.4.38"
nix = "0.25"
once_cell = "1.13.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
toml = "0.5"
once_cell = "1.13.0"
regex = "1"
anyhow = "1.0"
tar = "0.4.38"
thiserror = "1"
nix = "0.25"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
toml = "0.5"
url = "2.2.2"
# Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
# instead, so that recompile times are better.

View File

@@ -12,7 +12,6 @@ use std::time::Duration;
use anyhow::{Context, Result};
use utils::{
connstring::connection_host_port,
id::{TenantId, TimelineId},
lsn::Lsn,
postgres_backend::AuthType,
@@ -300,7 +299,8 @@ impl PostgresNode {
// Configure the node to fetch pages from pageserver
let pageserver_connstr = {
let (host, port) = connection_host_port(&self.pageserver.pg_connection_config);
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
// Set up authentication
//

View File

@@ -0,0 +1,57 @@
use url::Url;
#[derive(Debug)]
pub struct PgConnectionConfig {
url: Url,
}
impl PgConnectionConfig {
pub fn host(&self) -> &str {
self.url.host_str().expect("BUG: no host")
}
pub fn port(&self) -> u16 {
self.url.port().expect("BUG: no port")
}
/// Return a `<host>:<port>` string.
pub fn raw_address(&self) -> String {
format!("{}:{}", self.host(), self.port())
}
/// Connect using postgres protocol with TLS disabled.
pub fn connect_no_tls(&self) -> Result<postgres::Client, postgres::Error> {
postgres::Client::connect(self.url.as_str(), postgres::NoTls)
}
}
impl std::str::FromStr for PgConnectionConfig {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut url: Url = s.parse()?;
match url.scheme() {
"postgres" | "postgresql" => {}
other => anyhow::bail!("invalid scheme: {other}"),
}
// It's not a valid connection url if host is unavailable.
if url.host().is_none() {
anyhow::bail!(url::ParseError::EmptyHost);
}
// E.g. `postgres:bar`.
if url.cannot_be_a_base() {
anyhow::bail!("URL cannot be a base");
}
// Set the default PG port if it's missing.
if url.port().is_none() {
url.set_port(Some(5432))
.expect("BUG: couldn't set the default port");
}
Ok(Self { url })
}
}

View File

@@ -12,6 +12,7 @@ use std::path::Path;
use std::process::Command;
pub mod compute;
pub mod connection;
pub mod etcd;
pub mod local_env;
pub mod postgresql_conf;

View File

@@ -9,12 +9,12 @@ use anyhow::bail;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::{connstring::connection_address, http::error::HttpErrorBody, id::NodeId};
use utils::{http::error::HttpErrorBody, id::NodeId};
use crate::connection::PgConnectionConfig;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
@@ -63,7 +63,7 @@ pub struct SafekeeperNode {
pub conf: SafekeeperConf,
pub pg_connection_config: Config,
pub pg_connection_config: PgConnectionConfig,
pub env: LocalEnv,
pub http_client: Client,
pub http_base_url: String,
@@ -87,9 +87,9 @@ impl SafekeeperNode {
}
/// Construct libpq connection string for connecting to this safekeeper.
fn safekeeper_connection_config(port: u16) -> Config {
fn safekeeper_connection_config(port: u16) -> PgConnectionConfig {
// TODO safekeeper authentication not implemented yet
format!("postgresql://no_user@127.0.0.1:{}/no_db", port)
format!("postgresql://no_user@127.0.0.1:{port}/no_db")
.parse()
.unwrap()
}
@@ -109,7 +109,7 @@ impl SafekeeperNode {
pub fn start(&self) -> anyhow::Result<()> {
print!(
"Starting safekeeper at '{}' in '{}'",
connection_address(&self.pg_connection_config),
self.pg_connection_config.raw_address(),
self.datadir_path().display()
);
io::stdout().flush().unwrap();

View File

@@ -7,6 +7,7 @@ use std::process::Command;
use std::time::Duration;
use std::{io, result, thread};
use crate::connection::PgConnectionConfig;
use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
@@ -14,12 +15,10 @@ use nix::unistd::Pid;
use pageserver_api::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::{
connstring::connection_address,
http::error::HttpErrorBody,
id::{TenantId, TimelineId},
lsn::Lsn,
@@ -75,7 +74,7 @@ impl ResponseErrorMessageExt for Response {
//
#[derive(Debug)]
pub struct PageServerNode {
pub pg_connection_config: Config,
pub pg_connection_config: PgConnectionConfig,
pub env: LocalEnv,
pub http_client: Client,
pub http_base_url: String,
@@ -101,7 +100,7 @@ impl PageServerNode {
}
/// Construct libpq connection string for connecting to the pageserver.
fn pageserver_connection_config(password: &str, listen_addr: &str) -> Config {
fn pageserver_connection_config(password: &str, listen_addr: &str) -> PgConnectionConfig {
format!("postgresql://no_user:{password}@{listen_addr}/no_db")
.parse()
.unwrap()
@@ -212,7 +211,7 @@ impl PageServerNode {
) -> anyhow::Result<()> {
println!(
"Starting pageserver at '{}' in '{}'",
connection_address(&self.pg_connection_config),
self.pg_connection_config.raw_address(),
datadir.display()
);
io::stdout().flush()?;
@@ -343,14 +342,14 @@ impl PageServerNode {
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
let mut client = self.pg_connection_config.connect_no_tls().unwrap();
println!("Pageserver query: '{sql}'");
client.simple_query(sql).unwrap()
}
pub fn page_server_psql_client(&self) -> result::Result<postgres::Client, postgres::Error> {
self.pg_connection_config.connect(NoTls)
self.pg_connection_config.connect_no_tls()
}
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
@@ -549,7 +548,7 @@ impl PageServerNode {
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
let mut client = self.pg_connection_config.connect_no_tls().unwrap();
// Init base reader
let (start_lsn, base_tarfile_path) = base;

View File

@@ -1,52 +0,0 @@
use postgres::Config;
pub fn connection_host_port(config: &Config) -> (String, u16) {
assert_eq!(
config.get_hosts().len(),
1,
"only one pair of host and port is supported in connection string"
);
assert_eq!(
config.get_ports().len(),
1,
"only one pair of host and port is supported in connection string"
);
let host = match &config.get_hosts()[0] {
postgres::config::Host::Tcp(host) => host.as_ref(),
postgres::config::Host::Unix(host) => host.to_str().unwrap(),
};
(host.to_owned(), config.get_ports()[0])
}
pub fn connection_address(config: &Config) -> String {
let (host, port) = connection_host_port(config);
format!("{}:{}", host, port)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connection_host_port() {
let config: Config = "postgresql://no_user@localhost:64000/no_db"
.parse()
.unwrap();
assert_eq!(
connection_host_port(&config),
("localhost".to_owned(), 64000)
);
}
#[test]
#[should_panic(expected = "only one pair of host and port is supported in connection string")]
fn test_connection_host_port_multiple_ports() {
let config: Config = "postgresql://no_user@localhost:64000,localhost:64001/no_db"
.parse()
.unwrap();
assert_eq!(
connection_host_port(&config),
("localhost".to_owned(), 64000)
);
}
}

View File

@@ -19,9 +19,6 @@ pub mod postgres_backend;
pub mod postgres_backend_async;
pub mod pq_proto;
// dealing with connstring parsing and handy access to it's parts
pub mod connstring;
// helper functions for creating and fsyncing
pub mod crashsafe;

View File

@@ -836,15 +836,20 @@ fn wal_stream_connection_string(
listen_pg_addr_str: &str,
) -> anyhow::Result<String> {
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
let me_conf = sk_connstr
.parse::<postgres::config::Config>()
.with_context(|| {
format!("Failed to parse pageserver connection string '{sk_connstr}' as a postgres one")
})?;
let (host, port) = utils::connstring::connection_host_port(&me_conf);
Ok(format!(
"host={host} port={port} options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
))
sk_connstr
.parse()
.context("bad url")
.and_then(|url: url::Url| {
let host = url.host_str().context("host is missing")?;
let port = url.port().unwrap_or(5432); // default PG port
Ok(format!(
"host={host} \
port={port} \
options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
))
})
.with_context(|| format!("Failed to parse pageserver connection URL '{sk_connstr}'"))
}
#[cfg(test)]
@@ -892,7 +897,7 @@ mod tests {
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
safekeeper_connstr: Some("no_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -909,7 +914,7 @@ mod tests {
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no commit_lsn".to_string()),
safekeeper_connstr: Some("no_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -1005,7 +1010,7 @@ mod tests {
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not advanced Lsn".to_string()),
safekeeper_connstr: Some("not_advanced_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -1023,7 +1028,7 @@ mod tests {
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
safekeeper_connstr: Some("not_enough_advanced_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -1093,7 +1098,7 @@ mod tests {
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller commit_lsn".to_string()),
safekeeper_connstr: Some("smaller_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -1283,7 +1288,7 @@ mod tests {
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
safekeeper_connstr: Some("advanced_by_lsn_safekeeper".to_string()),
},
etcd_version: 0,
latest_update: now,
@@ -1307,7 +1312,7 @@ mod tests {
);
assert!(over_threshcurrent_candidate
.wal_source_connstr
.contains("advanced by Lsn safekeeper"));
.contains("advanced_by_lsn_safekeeper"));
Ok(())
}