pageserver: make wal_source_connstring: String a 'wal_source_connconf: PgConnectionConfig`

This commit is contained in:
Egor Suvorov
2022-11-18 21:07:10 +02:00
committed by Egor Suvorov
parent 46ea2a8e96
commit b6989e8928
8 changed files with 140 additions and 68 deletions

2
Cargo.lock generated
View File

@@ -2159,6 +2159,7 @@ dependencies = [
"postgres", "postgres",
"postgres-protocol", "postgres-protocol",
"postgres-types", "postgres-types",
"postgres_connection",
"postgres_ffi", "postgres_ffi",
"pprof", "pprof",
"pq_proto", "pq_proto",
@@ -2411,6 +2412,7 @@ name = "postgres_connection"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools",
"once_cell", "once_cell",
"postgres", "postgres",
"tokio-postgres", "tokio-postgres",

View File

@@ -7,6 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
itertools = "0.10.3"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
url = "2.2.2" url = "2.2.2"

View File

@@ -1,4 +1,6 @@
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt; use std::fmt;
use url::Host; use url::Host;
@@ -59,10 +61,12 @@ mod tests_parse_host_port {
} }
} }
#[derive(Clone)]
pub struct PgConnectionConfig { pub struct PgConnectionConfig {
host: Host, host: Host,
port: u16, port: u16,
password: Option<String>, password: Option<String>,
options: Vec<String>,
} }
/// A simplified PostgreSQL connection configuration. Supports only a subset of possible /// A simplified PostgreSQL connection configuration. Supports only a subset of possible
@@ -75,6 +79,7 @@ impl PgConnectionConfig {
host, host,
port, port,
password: None, password: None,
options: vec![],
} }
} }
@@ -101,6 +106,11 @@ impl PgConnectionConfig {
self self
} }
pub fn extend_options<I: IntoIterator<Item = S>, S: Into<String>>(mut self, i: I) -> Self {
self.options.extend(i.into_iter().map(|s| s.into()));
self
}
/// Return a `<host>:<port>` string. /// Return a `<host>:<port>` string.
pub fn raw_address(&self) -> String { pub fn raw_address(&self) -> String {
format!("{}:{}", self.host(), self.port()) format!("{}:{}", self.host(), self.port())
@@ -117,6 +127,36 @@ impl PgConnectionConfig {
if let Some(password) = &self.password { if let Some(password) = &self.password {
config.password(password); config.password(password);
} }
if !self.options.is_empty() {
// These options are command-line options and should be escaped before being passed
// as an 'options' connection string parameter, see
// https://www.postgresql.org/docs/15/libpq-connect.html#LIBPQ-CONNECT-OPTIONS
//
// They will be space-separated, so each space inside an option should be escaped,
// and all backslashes should be escaped before that. Although we don't expect options
// with spaces at the moment, they're supported by PostgreSQL. Hence we support them
// in this typesafe interface.
//
// We use `Cow` to avoid allocations in the best case (no escaping). A fully imperative
// solution would require 1-2 allocations in the worst case as well, but it's harder to
// implement and this function is hardly a bottleneck. The function is only called around
// establishing a new connection.
#[allow(unstable_name_collisions)]
config.options(
&self
.options
.iter()
.map(|s| {
if s.contains(['\\', ' ']) {
Cow::Owned(s.replace('\\', "\\\\").replace(' ', "\\ "))
} else {
Cow::Borrowed(s.as_str())
}
})
.intersperse(Cow::Borrowed(" ")) // TODO: use impl from std once it's stabilized
.collect::<String>(),
);
}
config config
} }
@@ -193,4 +233,21 @@ mod tests_pg_connection_config {
"PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }" "PgConnectionConfig { host: Domain(\"stub.host.example\"), port: 123, password: Some(REDACTED-STRING) }"
); );
} }
#[test]
fn test_with_options() {
let cfg = PgConnectionConfig::new_host_port(STUB_HOST.clone(), 123).extend_options([
"hello",
"world",
"with space",
"and \\ backslashes",
]);
assert_eq!(cfg.host(), &*STUB_HOST);
assert_eq!(cfg.port(), 123);
assert_eq!(cfg.raw_address(), "stub.host.example:123");
assert_eq!(
cfg.to_tokio_postgres_config().get_options(),
Some("hello world with\\ space and\\ \\\\\\ backslashes")
);
}
} }

View File

@@ -61,6 +61,7 @@ walkdir = "2.3.2"
etcd_broker = { path = "../libs/etcd_broker" } etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" } metrics = { path = "../libs/metrics" }
pageserver_api = { path = "../libs/pageserver_api" } pageserver_api = { path = "../libs/pageserver_api" }
postgres_connection = { path = "../libs/postgres_connection" }
postgres_ffi = { path = "../libs/postgres_ffi" } postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" } pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" } remote_storage = { path = "../libs/remote_storage" }

View File

@@ -91,7 +91,7 @@ async fn build_timeline_info(
let guard = timeline.last_received_wal.lock().unwrap(); let guard = timeline.last_received_wal.lock().unwrap();
if let Some(info) = guard.as_ref() { if let Some(info) = guard.as_ref() {
( (
Some(info.wal_source_connstr.clone()), Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
Some(info.last_received_msg_lsn), Some(info.last_received_msg_lsn),
Some(info.last_received_msg_ts), Some(info.last_received_msg_ts),
) )

View File

@@ -40,6 +40,7 @@ use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::tenant_config::TenantConfOpt; use crate::tenant_config::TenantConfOpt;
use pageserver_api::reltag::RelTag; use pageserver_api::reltag::RelTag;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::to_pg_timestamp; use postgres_ffi::to_pg_timestamp;
use utils::{ use utils::{
id::{TenantId, TimelineId}, id::{TenantId, TimelineId},
@@ -296,7 +297,7 @@ impl LogicalSize {
} }
pub struct WalReceiverInfo { pub struct WalReceiverInfo {
pub wal_source_connstr: String, pub wal_source_connconf: PgConnectionConfig,
pub last_received_msg_lsn: Lsn, pub last_received_msg_lsn: Lsn,
pub last_received_msg_ts: u128, pub last_received_msg_ts: u128,
} }

View File

@@ -35,6 +35,7 @@ use crate::{
exponential_backoff, walreceiver::get_etcd_client, DEFAULT_BASE_BACKOFF_SECONDS, exponential_backoff, walreceiver::get_etcd_client, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
}; };
use postgres_connection::{parse_host_port, PgConnectionConfig};
use utils::{ use utils::{
id::{NodeId, TenantTimelineId}, id::{NodeId, TenantTimelineId},
lsn::Lsn, lsn::Lsn,
@@ -247,7 +248,7 @@ async fn connection_manager_loop_step(
walreceiver_state walreceiver_state
.change_connection( .change_connection(
new_candidate.safekeeper_id, new_candidate.safekeeper_id,
new_candidate.wal_source_connstr, new_candidate.wal_source_connconf,
) )
.await .await
} }
@@ -425,7 +426,11 @@ impl WalreceiverState {
} }
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string. /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) { async fn change_connection(
&mut self,
new_sk_id: NodeId,
new_wal_source_connconf: PgConnectionConfig,
) {
self.drop_old_connection(true).await; self.drop_old_connection(true).await;
let id = self.id; let id = self.id;
@@ -435,7 +440,7 @@ impl WalreceiverState {
async move { async move {
super::walreceiver_connection::handle_walreceiver_connection( super::walreceiver_connection::handle_walreceiver_connection(
timeline, timeline,
new_wal_source_connstr, new_wal_source_connconf,
events_sender, events_sender,
cancellation, cancellation,
connect_timeout, connect_timeout,
@@ -575,7 +580,7 @@ impl WalreceiverState {
Some(existing_wal_connection) => { Some(existing_wal_connection) => {
let connected_sk_node = existing_wal_connection.sk_id; let connected_sk_node = existing_wal_connection.sk_id;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) = let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connconf) =
self.select_connection_candidate(Some(connected_sk_node))?; self.select_connection_candidate(Some(connected_sk_node))?;
let now = Utc::now().naive_utc(); let now = Utc::now().naive_utc();
@@ -586,7 +591,7 @@ impl WalreceiverState {
if latest_interaciton > self.wal_connect_timeout { if latest_interaciton > self.wal_connect_timeout {
return Some(NewWalConnectionCandidate { return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id, safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr, wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::NoKeepAlives { reason: ReconnectReason::NoKeepAlives {
last_keep_alive: Some( last_keep_alive: Some(
existing_wal_connection.status.latest_connection_update, existing_wal_connection.status.latest_connection_update,
@@ -611,7 +616,7 @@ impl WalreceiverState {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() { if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
return Some(NewWalConnectionCandidate { return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id, safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr, wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::LaggingWal { reason: ReconnectReason::LaggingWal {
current_commit_lsn, current_commit_lsn,
new_commit_lsn, new_commit_lsn,
@@ -685,7 +690,7 @@ impl WalreceiverState {
{ {
return Some(NewWalConnectionCandidate { return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id, safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr, wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::NoWalTimeout { reason: ReconnectReason::NoWalTimeout {
current_lsn, current_lsn,
current_commit_lsn, current_commit_lsn,
@@ -704,11 +709,11 @@ impl WalreceiverState {
self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal; self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
} }
None => { None => {
let (new_sk_id, _, new_wal_source_connstr) = let (new_sk_id, _, new_wal_source_connconf) =
self.select_connection_candidate(None)?; self.select_connection_candidate(None)?;
return Some(NewWalConnectionCandidate { return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id, safekeeper_id: new_sk_id,
wal_source_connstr: new_wal_source_connstr, wal_source_connconf: new_wal_source_connconf,
reason: ReconnectReason::NoExistingConnection, reason: ReconnectReason::NoExistingConnection,
}); });
} }
@@ -726,7 +731,7 @@ impl WalreceiverState {
fn select_connection_candidate( fn select_connection_candidate(
&self, &self,
node_to_omit: Option<NodeId>, node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SkTimelineInfo, String)> { ) -> Option<(NodeId, &SkTimelineInfo, PgConnectionConfig)> {
self.applicable_connection_candidates() self.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit) .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.max_by_key(|(_, info, _)| info.commit_lsn) .max_by_key(|(_, info, _)| info.commit_lsn)
@@ -736,7 +741,7 @@ impl WalreceiverState {
/// Some safekeepers are filtered by the retry cooldown. /// Some safekeepers are filtered by the retry cooldown.
fn applicable_connection_candidates( fn applicable_connection_candidates(
&self, &self,
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> { ) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, PgConnectionConfig)> {
let now = Utc::now().naive_utc(); let now = Utc::now().naive_utc();
self.wal_stream_candidates self.wal_stream_candidates
@@ -754,7 +759,7 @@ impl WalreceiverState {
}) })
.filter_map(|(sk_id, etcd_info)| { .filter_map(|(sk_id, etcd_info)| {
let info = &etcd_info.timeline; let info = &etcd_info.timeline;
match wal_stream_connection_string( match wal_stream_connection_config(
self.id, self.id,
info.safekeeper_connstr.as_deref()?, info.safekeeper_connstr.as_deref()?,
) { ) {
@@ -797,10 +802,12 @@ impl WalreceiverState {
} }
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug)]
struct NewWalConnectionCandidate { struct NewWalConnectionCandidate {
safekeeper_id: NodeId, safekeeper_id: NodeId,
wal_source_connstr: String, wal_source_connconf: PgConnectionConfig,
// This field is used in `derive(Debug)` only.
#[allow(dead_code)]
reason: ReconnectReason, reason: ReconnectReason,
} }
@@ -828,34 +835,30 @@ enum ReconnectReason {
}, },
} }
fn wal_stream_connection_string( fn wal_stream_connection_config(
TenantTimelineId { TenantTimelineId {
tenant_id, tenant_id,
timeline_id, timeline_id,
}: TenantTimelineId, }: TenantTimelineId,
listen_pg_addr_str: &str, listen_pg_addr_str: &str,
) -> anyhow::Result<String> { ) -> anyhow::Result<PgConnectionConfig> {
let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db"); let (host, port) =
sk_connstr parse_host_port(&listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
.parse() let port = port.unwrap_or(5432);
.context("bad url") Ok(
.and_then(|url: url::Url| { PgConnectionConfig::new_host_port(host, port).extend_options([
let host = url.host_str().context("host is missing")?; "-c".to_owned(),
let port = url.port().unwrap_or(5432); // default PG port format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
Ok(format!( ]),
"host={host} \ )
port={port} \
options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
))
})
.with_context(|| format!("Failed to parse pageserver connection URL '{sk_connstr}'"))
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use url::Host;
#[test] #[test]
fn no_connection_no_candidate() -> anyhow::Result<()> { fn no_connection_no_candidate() -> anyhow::Result<()> {
@@ -992,7 +995,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1064,7 +1067,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1080,9 +1083,10 @@ mod tests {
ReconnectReason::NoExistingConnection, ReconnectReason::NoExistingConnection,
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
); );
assert!(only_candidate assert_eq!(
.wal_source_connstr only_candidate.wal_source_connconf.host(),
.contains(DUMMY_SAFEKEEPER_CONNSTR)); &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
);
let selected_lsn = 100_000; let selected_lsn = 100_000;
state.wal_stream_candidates = HashMap::from([ state.wal_stream_candidates = HashMap::from([
@@ -1116,7 +1120,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1151,9 +1155,10 @@ mod tests {
ReconnectReason::NoExistingConnection, ReconnectReason::NoExistingConnection,
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold" "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
); );
assert!(biggest_wal_candidate assert_eq!(
.wal_source_connstr biggest_wal_candidate.wal_source_connconf.host(),
.contains(DUMMY_SAFEKEEPER_CONNSTR)); &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
);
Ok(()) Ok(())
} }
@@ -1181,7 +1186,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1199,7 +1204,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1270,7 +1275,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1310,9 +1315,10 @@ mod tests {
}, },
"Should select bigger WAL safekeeper if it starts to lag enough" "Should select bigger WAL safekeeper if it starts to lag enough"
); );
assert!(over_threshcurrent_candidate assert_eq!(
.wal_source_connstr over_threshcurrent_candidate.wal_source_connconf.host(),
.contains("advanced_by_lsn_safekeeper")); &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
);
Ok(()) Ok(())
} }
@@ -1361,7 +1367,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1384,9 +1390,10 @@ mod tests {
} }
unexpected => panic!("Unexpected reason: {unexpected:?}"), unexpected => panic!("Unexpected reason: {unexpected:?}"),
} }
assert!(over_threshcurrent_candidate assert_eq!(
.wal_source_connstr over_threshcurrent_candidate.wal_source_connconf.host(),
.contains(DUMMY_SAFEKEEPER_CONNSTR)); &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
);
Ok(()) Ok(())
} }
@@ -1434,7 +1441,7 @@ mod tests {
peer_horizon_lsn: None, peer_horizon_lsn: None,
local_start_lsn: None, local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()), safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
}, },
etcd_version: 0, etcd_version: 0,
latest_update: now, latest_update: now,
@@ -1463,14 +1470,15 @@ mod tests {
} }
unexpected => panic!("Unexpected reason: {unexpected:?}"), unexpected => panic!("Unexpected reason: {unexpected:?}"),
} }
assert!(over_threshcurrent_candidate assert_eq!(
.wal_source_connstr over_threshcurrent_candidate.wal_source_connconf.host(),
.contains(DUMMY_SAFEKEEPER_CONNSTR)); &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
);
Ok(()) Ok(())
} }
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState { fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
WalreceiverState { WalreceiverState {

View File

@@ -30,6 +30,7 @@ use crate::{
walingest::WalIngest, walingest::WalIngest,
walrecord::DecodedWALRecord, walrecord::DecodedWALRecord,
}; };
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::ReplicationFeedback; use pq_proto::ReplicationFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn}; use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -56,22 +57,23 @@ pub struct WalConnectionStatus {
/// messages as we go. /// messages as we go.
pub async fn handle_walreceiver_connection( pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>, timeline: Arc<Timeline>,
wal_source_connstr: String, wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>, events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
mut cancellation: watch::Receiver<()>, mut cancellation: watch::Receiver<()>,
connect_timeout: Duration, connect_timeout: Duration,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Connect to the database in replication mode. // Connect to the database in replication mode.
info!("connecting to {wal_source_connstr}"); info!("connecting to {wal_source_connconf:?}");
let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true");
let (mut replication_client, connection) = time::timeout( let (mut replication_client, connection) = {
connect_timeout, let mut config = wal_source_connconf.to_tokio_postgres_config();
tokio_postgres::connect(&connect_cfg, postgres::NoTls), config.application_name("pageserver");
) config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
.await time::timeout(connect_timeout, config.connect(postgres::NoTls))
.context("Timed out while waiting for walreceiver connection to open")? .await
.context("Failed to open walreceiver connection")?; .context("Timed out while waiting for walreceiver connection to open")?
.context("Failed to open walreceiver connection")?
};
info!("connected!"); info!("connected!");
let mut connection_status = WalConnectionStatus { let mut connection_status = WalConnectionStatus {
@@ -316,7 +318,7 @@ pub async fn handle_walreceiver_connection(
// Update the status about what we just received. This is shown in the mgmt API. // Update the status about what we just received. This is shown in the mgmt API.
let last_received_wal = WalReceiverInfo { let last_received_wal = WalReceiverInfo {
wal_source_connstr: wal_source_connstr.to_owned(), wal_source_connconf: wal_source_connconf.clone(),
last_received_msg_lsn: last_lsn, last_received_msg_lsn: last_lsn,
last_received_msg_ts: ts last_received_msg_ts: ts
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)