diff --git a/Cargo.lock b/Cargo.lock index f92da5ec51..8ac198c364 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6781,6 +6781,7 @@ dependencies = [ "serde_assert", "serde_json", "serde_path_to_error", + "serde_with", "signal-hook", "strum", "strum_macros", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 545317f958..a51dff7202 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -32,6 +32,7 @@ pin-project-lite.workspace = true regex.workspace = true routerify.workspace = true serde.workspace = true +serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index dba74f5b0b..3073bbde4c 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig}; use crate::id::TenantTimelineId; +/// Postgres client protocol types +#[derive( + Copy, + Clone, + PartialEq, + Eq, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, + Debug, +)] +#[strum(serialize_all = "kebab-case")] +#[repr(u8)] +pub enum PostgresClientProtocol { + /// Usual Postgres replication protocol + Vanilla, + /// Custom shard-aware protocol that replicates interpreted records. + /// Used to send wal from safekeeper to pageserver. + Interpreted, +} + +impl TryFrom for PostgresClientProtocol { + type Error = u8; + + fn try_from(value: u8) -> Result { + Ok(match value { + v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla, + v if v == (PostgresClientProtocol::Interpreted as u8) => { + PostgresClientProtocol::Interpreted + } + x => return Err(x), + }) + } +} + +pub struct ConnectionConfigArgs<'a> { + pub protocol: PostgresClientProtocol, + + pub ttid: TenantTimelineId, + pub shard_number: Option, + pub shard_count: Option, + pub shard_stripe_size: Option, + + pub listen_pg_addr_str: &'a str, + + pub auth_token: Option<&'a str>, + pub availability_zone: Option<&'a str>, +} + +impl<'a> ConnectionConfigArgs<'a> { + fn options(&'a self) -> Vec { + let mut options = vec![ + "-c".to_owned(), + format!("timeline_id={}", self.ttid.timeline_id), + format!("tenant_id={}", self.ttid.tenant_id), + format!("protocol={}", self.protocol as u8), + ]; + + if self.shard_number.is_some() { + assert!(self.shard_count.is_some()); + assert!(self.shard_stripe_size.is_some()); + + options.push(format!("shard_count={}", self.shard_count.unwrap())); + options.push(format!("shard_number={}", self.shard_number.unwrap())); + options.push(format!( + "shard_stripe_size={}", + self.shard_stripe_size.unwrap() + )); + } + + options + } +} + /// Create client config for fetching WAL from safekeeper on particular timeline. /// listen_pg_addr_str is in form host:\[port\]. pub fn wal_stream_connection_config( - TenantTimelineId { - tenant_id, - timeline_id, - }: TenantTimelineId, - listen_pg_addr_str: &str, - auth_token: Option<&str>, - availability_zone: Option<&str>, + args: ConnectionConfigArgs, ) -> anyhow::Result { let (host, port) = - parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; let port = port.unwrap_or(5432); let mut connstr = PgConnectionConfig::new_host_port(host, port) - .extend_options([ - "-c".to_owned(), - format!("timeline_id={}", timeline_id), - format!("tenant_id={}", tenant_id), - ]) - .set_password(auth_token.map(|s| s.to_owned())); + .extend_options(args.options()) + .set_password(args.auth_token.map(|s| s.to_owned())); - if let Some(availability_zone) = availability_zone { + if let Some(availability_zone) = args.availability_zone { connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index de50f217d8..6fb0eeb8bc 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig; use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; -use utils::postgres_client::wal_stream_connection_config; +use utils::postgres_client::{ + wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol, +}; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -984,15 +986,19 @@ impl ConnectionManagerState { if info.safekeeper_connstr.is_empty() { return None; // no connection string, ignore sk } - match wal_stream_connection_config( - self.id, - info.safekeeper_connstr.as_ref(), - match &self.conf.auth_token { - None => None, - Some(x) => Some(x), - }, - self.conf.availability_zone.as_deref(), - ) { + + let connection_conf_args = ConnectionConfigArgs { + protocol: PostgresClientProtocol::Vanilla, + ttid: self.id, + shard_number: None, + shard_count: None, + shard_stripe_size: None, + listen_pg_addr_str: info.safekeeper_connstr.as_ref(), + auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), + availability_zone: self.conf.availability_zone.as_deref() + }; + + match wal_stream_connection_config(connection_conf_args) { Ok(connstr) => Some((*sk_id, info, connstr)), Err(e) => { error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id); diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 9c4149d8f1..7b87166aa0 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -17,6 +17,7 @@ use tokio::{ use tokio_postgres::replication::ReplicationStream; use tokio_postgres::types::PgLsn; use tracing::*; +use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol}; use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config}; use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; @@ -325,7 +326,17 @@ async fn recovery_stream( conf: &SafeKeeperConf, ) -> anyhow::Result { // TODO: pass auth token - let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?; + let connection_conf_args = ConnectionConfigArgs { + protocol: PostgresClientProtocol::Vanilla, + ttid: tli.ttid, + shard_number: None, + shard_count: None, + shard_stripe_size: None, + listen_pg_addr_str: &donor.pg_connstr, + auth_token: None, + availability_zone: None, + }; + let cfg = wal_stream_connection_config(connection_conf_args)?; let mut cfg = cfg.to_tokio_postgres_config(); // It will make safekeeper give out not committed WAL (up to flush_lsn). cfg.application_name(&format!("safekeeper_{}", conf.my_id));