From 70cdd5629445e5be883c40517eea66c56326ae90 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 5 Nov 2024 17:08:06 +0100 Subject: [PATCH] pageserver: include shard id when subscribing to safekeeper --- libs/utils/src/postgres_client.rs | 66 ++++++++++++++----- .../walreceiver/connection_manager.rs | 26 +++++--- safekeeper/src/recovery.rs | 13 +++- 3 files changed, 79 insertions(+), 26 deletions(-) diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index dba74f5b0b..ec7a13229f 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -7,29 +7,65 @@ use postgres_connection::{parse_host_port, PgConnectionConfig}; use crate::id::TenantTimelineId; +/// Protocol used for safekeeper recovery. This sends raw Postgres WAL. +pub const POSTGRES_PROTO_VERSION: u8 = 0; +/// Protocol used for safekeeper to pageserver communication. +/// This sends interpreted WAL records for the pageserver to ingest +/// and is shard-aware. +pub const PAGESERVER_SAFEKEEPER_PROTO_VERSION: u8 = 1; + +pub struct ConnectionConfigArgs<'a> { + pub protocol_version: u8, + + pub ttid: TenantTimelineId, + pub shard_number: Option, + pub shard_count: Option, + pub shard_stripe_size: Option, + + pub listen_pg_addr_str: &'a str, + + pub auth_token: Option<&'a str>, + pub availability_zone: Option<&'a str>, +} + +impl<'a> ConnectionConfigArgs<'a> { + fn options(&'a self) -> Vec { + let mut options = vec![ + "-c".to_owned(), + format!("timeline_id={}", self.ttid.timeline_id), + format!("tenant_id={}", self.ttid.tenant_id), + format!("protocol_version={}", self.protocol_version), + ]; + + if self.shard_number.is_some() { + assert!(self.shard_count.is_some()); + assert!(self.shard_stripe_size.is_some()); + + options.push(format!("shard_count={}", self.shard_count.unwrap())); + options.push(format!("shard_number={}", self.shard_number.unwrap())); + options.push(format!( + "shard_stripe_size={}", + self.shard_stripe_size.unwrap() + )); + } + + options + } +} + /// Create client config for fetching WAL from safekeeper on particular timeline. /// listen_pg_addr_str is in form host:\[port\]. pub fn wal_stream_connection_config( - TenantTimelineId { - tenant_id, - timeline_id, - }: TenantTimelineId, - listen_pg_addr_str: &str, - auth_token: Option<&str>, - availability_zone: Option<&str>, + args: ConnectionConfigArgs, ) -> anyhow::Result { let (host, port) = - parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; let port = port.unwrap_or(5432); let mut connstr = PgConnectionConfig::new_host_port(host, port) - .extend_options([ - "-c".to_owned(), - format!("timeline_id={}", timeline_id), - format!("tenant_id={}", tenant_id), - ]) - .set_password(auth_token.map(|s| s.to_owned())); + .extend_options(args.options()) + .set_password(args.auth_token.map(|s| s.to_owned())); - if let Some(availability_zone) = availability_zone { + if let Some(availability_zone) = args.availability_zone { connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index de50f217d8..4d6a97ffb8 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig; use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; -use utils::postgres_client::wal_stream_connection_config; +use utils::postgres_client::{ + wal_stream_connection_config, ConnectionConfigArgs, PAGESERVER_SAFEKEEPER_PROTO_VERSION, +}; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -984,15 +986,19 @@ impl ConnectionManagerState { if info.safekeeper_connstr.is_empty() { return None; // no connection string, ignore sk } - match wal_stream_connection_config( - self.id, - info.safekeeper_connstr.as_ref(), - match &self.conf.auth_token { - None => None, - Some(x) => Some(x), - }, - self.conf.availability_zone.as_deref(), - ) { + + let shard_identity = self.timeline.get_shard_identity(); + let connection_conf_args = ConnectionConfigArgs { + protocol_version: PAGESERVER_SAFEKEEPER_PROTO_VERSION, + ttid: self.id, + shard_number: Some(shard_identity.number.0), + shard_count: Some(shard_identity.count.0), + shard_stripe_size: Some(shard_identity.stripe_size.0), + listen_pg_addr_str: info.safekeeper_connstr.as_ref(), + auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), + availability_zone: self.conf.availability_zone.as_deref() + }; + match wal_stream_connection_config(connection_conf_args) { Ok(connstr) => Some((*sk_id, info, connstr)), Err(e) => { error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id); diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 9c4149d8f1..ba9e989a32 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -17,6 +17,7 @@ use tokio::{ use tokio_postgres::replication::ReplicationStream; use tokio_postgres::types::PgLsn; use tracing::*; +use utils::postgres_client::{ConnectionConfigArgs, POSTGRES_PROTO_VERSION}; use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config}; use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; @@ -325,7 +326,17 @@ async fn recovery_stream( conf: &SafeKeeperConf, ) -> anyhow::Result { // TODO: pass auth token - let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?; + let connection_conf_args = ConnectionConfigArgs { + protocol_version: POSTGRES_PROTO_VERSION, + ttid: tli.ttid, + shard_number: None, + shard_count: None, + shard_stripe_size: None, + listen_pg_addr_str: &donor.pg_connstr, + auth_token: None, + availability_zone: None, + }; + let cfg = wal_stream_connection_config(connection_conf_args)?; let mut cfg = cfg.to_tokio_postgres_config(); // It will make safekeeper give out not committed WAL (up to flush_lsn). cfg.application_name(&format!("safekeeper_{}", conf.my_id));