From bc49c73fee83b39d4d161e693f1e38316d13febb Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Sun, 27 Aug 2023 14:32:52 +0300 Subject: [PATCH] Move wal_stream_connection_config to utils. It will be used by safekeeper as well. --- Cargo.lock | 1 + libs/utils/Cargo.toml | 1 + libs/utils/src/lib.rs | 2 + libs/utils/src/postgres_client.rs | 37 +++++++++++++++++++ .../walreceiver/connection_manager.rs | 30 +-------------- 5 files changed, 43 insertions(+), 28 deletions(-) create mode 100644 libs/utils/src/postgres_client.rs diff --git a/Cargo.lock b/Cargo.lock index c78b7605b8..c213cd4d0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5014,6 +5014,7 @@ dependencies = [ "nix 0.26.2", "once_cell", "pin-project-lite", + "postgres_connection", "pq_proto", "rand", "regex", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 7315cda50e..1eb9e6ab4d 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -38,6 +38,7 @@ url.workspace = true uuid.workspace = true pq_proto.workspace = true +postgres_connection.workspace = true metrics.workspace = true workspace_hack.workspace = true diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 638dba427b..6cf829a67c 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -58,6 +58,8 @@ pub mod serde_regex; pub mod pageserver_feedback; +pub mod postgres_client; + pub mod tracing_span_assert; pub mod rate_limit; diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs new file mode 100644 index 0000000000..dba74f5b0b --- /dev/null +++ b/libs/utils/src/postgres_client.rs @@ -0,0 +1,37 @@ +//! Postgres client connection code common to other crates (safekeeper and +//! pageserver) which depends on tenant/timeline ids and thus not fitting into +//! postgres_connection crate. + +use anyhow::Context; +use postgres_connection::{parse_host_port, PgConnectionConfig}; + +use crate::id::TenantTimelineId; + +/// Create client config for fetching WAL from safekeeper on particular timeline. +/// listen_pg_addr_str is in form host:\[port\]. +pub fn wal_stream_connection_config( + TenantTimelineId { + tenant_id, + timeline_id, + }: TenantTimelineId, + listen_pg_addr_str: &str, + auth_token: Option<&str>, + availability_zone: Option<&str>, +) -> anyhow::Result { + let (host, port) = + parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + let port = port.unwrap_or(5432); + let mut connstr = PgConnectionConfig::new_host_port(host, port) + .extend_options([ + "-c".to_owned(), + format!("timeline_id={}", timeline_id), + format!("tenant_id={}", tenant_id), + ]) + .set_password(auth_token.map(|s| s.to_owned())); + + if let Some(availability_zone) = availability_zone { + connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); + } + + Ok(connstr) +} diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index a4666a6a86..36960c631c 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -31,10 +31,11 @@ use storage_broker::Streaming; use tokio::select; use tracing::*; -use postgres_connection::{parse_host_port, PgConnectionConfig}; +use postgres_connection::PgConnectionConfig; use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; +use utils::postgres_client::wal_stream_connection_config; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -879,33 +880,6 @@ impl ReconnectReason { } } -fn wal_stream_connection_config( - TenantTimelineId { - tenant_id, - timeline_id, - }: TenantTimelineId, - listen_pg_addr_str: &str, - auth_token: Option<&str>, - availability_zone: Option<&str>, -) -> anyhow::Result { - let (host, port) = - parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; - let port = port.unwrap_or(5432); - let mut connstr = PgConnectionConfig::new_host_port(host, port) - .extend_options([ - "-c".to_owned(), - format!("timeline_id={}", timeline_id), - format!("tenant_id={}", tenant_id), - ]) - .set_password(auth_token.map(|s| s.to_owned())); - - if let Some(availability_zone) = availability_zone { - connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); - } - - Ok(connstr) -} - #[cfg(test)] mod tests { use super::*;