diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 1975f91ac4..b6cbfdec12 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -130,16 +130,16 @@ impl ReplicationConn { Ok(()) } - /// Helper function that parses a pair of LSNs. - fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> { + /// Helper function that parses a single LSN. + fn parse_start(cmd: &[u8]) -> Result { let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); let caps = re.captures_iter(str::from_utf8(cmd)?); let mut lsns = caps.map(|cap| cap[1].parse::()); let start_pos = lsns .next() .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; - let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); - Ok((start_pos, stop_pos)) + assert!(lsns.next().is_none()); + Ok(start_pos) } /// Helper function for opening a wal file. @@ -182,7 +182,7 @@ impl ReplicationConn { }) .unwrap(); - let (mut start_pos, mut stop_pos) = Self::parse_start_stop(cmd)?; + let mut start_pos = Self::parse_start(cmd)?; let mut wal_seg_size: usize; loop { @@ -194,14 +194,8 @@ impl ReplicationConn { break; } } - let (wal_end, timeline) = swh.timeline.get().get_end_of_wal(); - if start_pos == Lsn(0) { - start_pos = wal_end; - } - if stop_pos == Lsn(0) && swh.appname == Some("wal_proposer_recovery".to_string()) { - stop_pos = wal_end; - } - info!("Start replication from {} till {}", start_pos, stop_pos); + let (_, timeline) = swh.timeline.get().get_end_of_wal(); + info!("Start replication from {}", start_pos); // switch to copy pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -211,28 +205,17 @@ impl ReplicationConn { loop { /* Wait until we have some data to stream */ - if stop_pos != Lsn(0) { - /* recovery mode: stream up to the specified LSN (VCL) */ - if start_pos >= stop_pos { - /* recovery finished */ - break; - } - end_pos = stop_pos; + if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) { + end_pos = lsn } else { - /* normal mode */ - let timeline = swh.timeline.get(); - if let Some(lsn) = timeline.wait_for_lsn(start_pos) { - end_pos = lsn - } else { - // timeout expired: request pageserver status - pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { - sent_ptr: end_pos.0, - timestamp: get_current_timestamp(), - request_reply: true, - })) - .context("Failed to send KeepAlive message")?; - continue; - } + // timeout expired: request pageserver status + pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + sent_ptr: end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .context("Failed to send KeepAlive message")?; + continue; } if end_pos == END_REPLICATION_MARKER { break;