From 262319387619c18f700b2b03c7e3ad4fc25daa9e Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Thu, 2 Jun 2022 23:26:28 +0300
Subject: [PATCH] Remove pageserver_connstr from WAL stream logic

---
 libs/etcd_broker/src/lib.rs   |  2 --
 pageserver/src/walreceiver.rs | 50 +++--------------------------------
 safekeeper/src/handler.rs     | 18 ++++---------
 safekeeper/src/receive_wal.rs | 14 ++--------
 safekeeper/src/send_wal.rs    |  3 +--
 safekeeper/src/timeline.rs    | 42 ++---------------------------
 6 files changed, 13 insertions(+), 116 deletions(-)

diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs
index 81353450e0..6b3293ec40 100644
--- a/libs/etcd_broker/src/lib.rs
+++ b/libs/etcd_broker/src/lib.rs
@@ -57,8 +57,6 @@ pub struct SkTimelineInfo {
     pub peer_horizon_lsn: Option<Lsn>,
     #[serde(default)]
     pub safekeeper_connstr: Option<String>,
-    #[serde(default)]
-    pub pageserver_connstr: Option<String>,
 }
 
 #[derive(Debug, thiserror::Error)]
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 11c8617a57..202a13545d 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -659,7 +659,6 @@ impl WalConnectionManager {
         match wal_stream_connection_string(
             self.id,
             info.safekeeper_connstr.as_deref()?,
-            info.pageserver_connstr.as_deref()?,
         ) {
             Ok(connstr) => Some((sk_id, info, connstr)),
             Err(e) => {
@@ -749,7 +748,6 @@ fn wal_stream_connection_string(
         timeline_id,
     }: ZTenantTimelineId,
     listen_pg_addr_str: &str,
-    pageserver_connstr: &str,
 ) -> anyhow::Result<String> {
     let sk_connstr = format!("postgresql://no_user@{listen_pg_addr_str}/no_db");
     let me_conf = sk_connstr
@@ -759,7 +757,7 @@ fn wal_stream_connection_string(
         .parse::<postgres::Config>()
         .with_context(|| { })?;
     let (host, port) = utils::connstring::connection_host_port(&me_conf);
     Ok(format!(
-        "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'",
+        "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'"
     ))
 }
@@ -792,20 +790,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: None,
-                    pageserver_connstr: Some("no safekeeper_connstr".to_string()),
                 },
             ),
-            (
-                NodeId(1),
-                SkTimelineInfo {
-                    last_log_term: None,
-                    flush_lsn: None,
-                    commit_lsn: Some(Lsn(1)),
-                    backup_lsn: None,
-                    remote_consistent_lsn: None,
-                    peer_horizon_lsn: None,
-                    safekeeper_connstr: Some("no pageserver_connstr".to_string()),
-                    pageserver_connstr: None,
-                },
-            ),
             (
@@ -818,7 +802,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("no commit_lsn".to_string()),
-                    pageserver_connstr: Some("no commit_lsn (p)".to_string()),
                 },
             ),
             (
@@ -831,7 +814,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("no commit_lsn".to_string()),
-                    pageserver_connstr: Some("no commit_lsn (p)".to_string()),
                 },
             ),
         ]));
@@ -887,7 +869,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
                 },
             ),
             (
@@ -900,7 +881,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("not advanced Lsn".to_string()),
-                    pageserver_connstr: Some("not advanced Lsn (p)".to_string()),
                 },
             ),
             (
@@ -915,7 +895,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("not enough advanced Lsn".to_string()),
-                    pageserver_connstr: Some("not enough advanced Lsn (p)".to_string()),
                 },
             ),
         ]));
@@ -947,7 +926,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
                 },
             )]))
             .expect("Expected one candidate selected out of the only data option, but got none");
@@ -960,9 +938,6 @@ mod tests {
         assert!(only_candidate
             .wal_producer_connstr
             .contains(DUMMY_SAFEKEEPER_CONNSTR));
-        assert!(only_candidate
-            .wal_producer_connstr
-            .contains(DUMMY_PAGESERVER_CONNSTR));
 
         let selected_lsn = 100_000;
         let biggest_wal_candidate = data_manager_with_no_connection
@@ -977,7 +952,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("smaller commit_lsn".to_string()),
-                    pageserver_connstr: Some("smaller commit_lsn (p)".to_string()),
                 },
             ),
             (
@@ -990,7 +964,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
                 },
             ),
             (
@@ -1003,9 +976,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: None,
-                    pageserver_connstr: Some(
-                        "no safekeeper_connstr despite bigger commit_lsn".to_string(),
-                    ),
                 },
             ),
         ]))
@@ -1022,9 +992,6 @@ mod tests {
         assert!(biggest_wal_candidate
             .wal_producer_connstr
             .contains(DUMMY_SAFEKEEPER_CONNSTR));
-        assert!(biggest_wal_candidate
-            .wal_producer_connstr
-            .contains(DUMMY_PAGESERVER_CONNSTR));
 
         Ok(())
     }
@@ -1071,7 +1038,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
                 },
             ),
             (
@@ -1084,7 +1050,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some("advanced by Lsn safekeeper".to_string()),
-                    pageserver_connstr: Some("advanced by Lsn safekeeper (p)".to_string()),
                 },
             ),
         ]);
@@ -1108,9 +1073,6 @@ mod tests {
         assert!(over_threshcurrent_candidate
            .wal_producer_connstr
            .contains("advanced by Lsn safekeeper"));
-        assert!(over_threshcurrent_candidate
-            .wal_producer_connstr
-            .contains("advanced by Lsn safekeeper (p)"));
 
         Ok(())
     }
@@ -1146,7 +1108,6 @@ mod tests {
                     remote_consistent_lsn: None,
                     peer_horizon_lsn: None,
                     safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
-                    pageserver_connstr: Some(DUMMY_PAGESERVER_CONNSTR.to_string()),
                 },
             )]))
             .expect(
@@ -1168,9 +1129,6 @@ mod tests {
         assert!(over_threshcurrent_candidate
             .wal_producer_connstr
             .contains(DUMMY_SAFEKEEPER_CONNSTR));
-        assert!(over_threshcurrent_candidate
-            .wal_producer_connstr
-            .contains(DUMMY_PAGESERVER_CONNSTR));
 
         Ok(())
     }
@@ -1197,7 +1155,6 @@ mod tests {
 }
 
 const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
-const DUMMY_PAGESERVER_CONNSTR: &str = "pageserver_connstr";
 
 // the function itself does not need async, but it spawns a tokio::task underneath hence neeed
 // a runtime to not to panic
@@ -1205,9 +1162,8 @@ mod tests {
         id: ZTenantTimelineId,
         safekeeper_id: NodeId,
     ) -> WalConnectionData {
-        let dummy_connstr =
-            wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR, DUMMY_PAGESERVER_CONNSTR)
-                .expect("Failed to construct dummy wal producer connstr");
+        let dummy_connstr = wal_stream_connection_string(id, DUMMY_SAFEKEEPER_CONNSTR)
+            .expect("Failed to construct dummy wal producer connstr");
         WalConnectionData {
             safekeeper_id,
             connection: WalReceiverConnection::open(
diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs
index 9af78661f9..a8121e829e 100644
--- a/safekeeper/src/handler.rs
+++ b/safekeeper/src/handler.rs
@@ -29,12 +29,11 @@ pub struct SafekeeperPostgresHandler {
     pub ztenantid: Option<ZTenantId>,
     pub ztimelineid: Option<ZTimelineId>,
     pub timeline: Option<Arc<Timeline>>,
-    pageserver_connstr: Option<String>,
 }
 
 /// Parsed Postgres command.
 enum SafekeeperPostgresCommand {
-    StartWalPush { pageserver_connstr: Option<String> },
+    StartWalPush,
     StartReplication { start_lsn: Lsn },
     IdentifySystem,
     JSONCtrl { cmd: AppendLogicalMessage },
@@ -42,11 +41,7 @@
 
 fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
     if cmd.starts_with("START_WAL_PUSH") {
-        let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
-
-        let caps = re.captures(cmd).unwrap();
-        let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
-        Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
+        Ok(SafekeeperPostgresCommand::StartWalPush)
     } else if cmd.starts_with("START_REPLICATION") {
         let re =
             Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
@@ -86,8 +81,6 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
                 self.appname = Some(app_name.clone());
             }
 
-            self.pageserver_connstr = params.get("pageserver_connstr").cloned();
-
             Ok(())
         } else {
             bail!("Safekeeper received unexpected initial message: {:?}", sm);
@@ -113,14 +106,14 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
         }
 
         match cmd {
-            SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
-                ReceiveWalConn::new(pgb, pageserver_connstr)
+            SafekeeperPostgresCommand::StartWalPush => {
+                ReceiveWalConn::new(pgb)
                     .run(self)
                     .context("failed to run ReceiveWalConn")?;
             }
             SafekeeperPostgresCommand::StartReplication { start_lsn } => {
                 ReplicationConn::new(pgb)
-                    .run(self, pgb, start_lsn, self.pageserver_connstr.clone())
+                    .run(self, pgb, start_lsn)
                     .context("failed to run ReplicationConn")?;
             }
             SafekeeperPostgresCommand::IdentifySystem => {
@@ -142,7 +135,6 @@ impl SafekeeperPostgresHandler {
             ztenantid: None,
             ztimelineid: None,
             timeline: None,
-            pageserver_connstr: None,
         }
     }
 
diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs
index 88b7816912..af4cfb6ba4 100644
--- a/safekeeper/src/receive_wal.rs
+++ b/safekeeper/src/receive_wal.rs
@@ -32,22 +32,14 @@ pub struct ReceiveWalConn<'pg> {
     pg_backend: &'pg mut PostgresBackend,
     /// The cached result of `pg_backend.socket().peer_addr()` (roughly)
     peer_addr: SocketAddr,
-    /// Pageserver connection string forwarded from compute
-    /// NOTE that it is allowed to operate without a pageserver.
-    /// So if compute has no pageserver configured do not use it.
-    pageserver_connstr: Option<String>,
 }
 
 impl<'pg> ReceiveWalConn<'pg> {
-    pub fn new(
-        pg: &'pg mut PostgresBackend,
-        pageserver_connstr: Option<String>,
-    ) -> ReceiveWalConn<'pg> {
+    pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
         let peer_addr = *pg.get_peer_addr();
         ReceiveWalConn {
             pg_backend: pg,
             peer_addr,
-            pageserver_connstr,
         }
     }
 
@@ -120,9 +112,7 @@ impl<'pg> ReceiveWalConn<'pg> {
                 // Register the connection and defer unregister. Do that only
                 // after processing first message, as it sets wal_seg_size,
                 // wanted by many.
-                spg.timeline
-                    .get()
-                    .on_compute_connect(self.pageserver_connstr.as_ref())?;
+                spg.timeline.get().on_compute_connect()?;
                 _guard = Some(ComputeConnectionGuard {
                     timeline: Arc::clone(spg.timeline.get()),
                 });
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 7a6a8ca9b9..fd82a55efa 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -162,9 +162,8 @@ impl ReplicationConn {
         spg: &mut SafekeeperPostgresHandler,
         pgb: &mut PostgresBackend,
         mut start_pos: Lsn,
-        pageserver_connstr: Option<String>,
     ) -> Result<()> {
-        let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap(), pageserver_connstr = %pageserver_connstr.as_deref().unwrap_or_default()).entered();
+        let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
 
         // spawn the background thread which receives HotStandbyFeedback messages.
         let bg_timeline = Arc::clone(spg.timeline.get());
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index b7a549fef8..30c94f2543 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -95,7 +95,6 @@ struct SharedState {
     /// when tli is inactive instead of having this flag.
     active: bool,
     num_computes: u32,
-    pageserver_connstr: Option<String>,
     last_removed_segno: XLogSegNo,
 }
 
@@ -119,7 +118,6 @@ impl SharedState {
            wal_backup_active: false,
            active: false,
            num_computes: 0,
-            pageserver_connstr: None,
            last_removed_segno: 0,
        })
    }
@@ -139,7 +137,6 @@ impl SharedState {
            wal_backup_active: false,
            active: false,
            num_computes: 0,
-            pageserver_connstr: None,
            last_removed_segno: 0,
        })
    }
@@ -190,35 +187,6 @@ impl SharedState {
         self.wal_backup_active
     }
 
-    /// Activate timeline's walsender: start/change timeline information propagated into etcd for further pageserver connections.
-    fn activate_walsender(
-        &mut self,
-        zttid: &ZTenantTimelineId,
-        new_pageserver_connstr: Option<String>,
-    ) {
-        if self.pageserver_connstr != new_pageserver_connstr {
-            self.deactivate_walsender(zttid);
-
-            if new_pageserver_connstr.is_some() {
-                info!(
-                    "timeline {} has activated its walsender with connstr {new_pageserver_connstr:?}",
-                    zttid.timeline_id,
-                );
-            }
-            self.pageserver_connstr = new_pageserver_connstr;
-        }
-    }
-
-    /// Deactivate the timeline: stop sending the timeline data into etcd, so no pageserver can connect for WAL streaming.
-    fn deactivate_walsender(&mut self, zttid: &ZTenantTimelineId) {
-        if let Some(pageserver_connstr) = self.pageserver_connstr.take() {
-            info!(
-                "timeline {} had deactivated its wallsender with connstr {pageserver_connstr:?}",
-                zttid.timeline_id,
-            )
-        }
-    }
-
     fn get_wal_seg_size(&self) -> usize {
         self.sk.state.server.wal_seg_size as usize
     }
@@ -318,17 +286,12 @@ impl Timeline {
     /// Register compute connection, starting timeline-related activity if it is
     /// not running yet.
     /// Can fail only if channel to a static thread got closed, which is not normal at all.
-    pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
+    pub fn on_compute_connect(&self) -> Result<()> {
         let is_wal_backup_action_pending: bool;
         {
             let mut shared_state = self.mutex.lock().unwrap();
             shared_state.num_computes += 1;
             is_wal_backup_action_pending = shared_state.update_status();
-            // FIXME: currently we always adopt latest pageserver connstr, but we
-            // should have kind of generations assigned by compute to distinguish
-            // the latest one or even pass it through consensus to reliably deliver
-            // to all safekeepers.
-            shared_state.activate_walsender(&self.zttid, pageserver_connstr.cloned());
         }
         // Wake up wal backup launcher, if offloading not started yet.
         if is_wal_backup_action_pending {
@@ -364,7 +327,7 @@ impl Timeline {
             (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
             replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
             if stop {
-                shared_state.deactivate_walsender(&self.zttid);
+                shared_state.update_status();
                 return Ok(true);
             }
         }
@@ -525,7 +488,6 @@ impl Timeline {
             )),
             peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
             safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
-            pageserver_connstr: shared_state.pageserver_connstr.clone(),
             backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
         })
     }