Safekeeper's START_REPLICATION handler: remove stop_point, do not handle start_point == 0 (#777)

This commit is contained in:
Egor Suvorov
2021-11-04 14:50:33 +03:00
committed by GitHub
parent 987833e0b9
commit 33007cc0bb

View File

@@ -130,16 +130,16 @@ impl ReplicationConn {
Ok(()) Ok(())
} }
/// Helper function that parses a pair of LSNs. /// Helper function that parses a single LSN.
fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> { fn parse_start(cmd: &[u8]) -> Result<Lsn> {
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let caps = re.captures_iter(str::from_utf8(cmd)?); let caps = re.captures_iter(str::from_utf8(cmd)?);
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>()); let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
let start_pos = lsns let start_pos = lsns
.next() .next()
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0)); assert!(lsns.next().is_none());
Ok((start_pos, stop_pos)) Ok(start_pos)
} }
/// Helper function for opening a wal file. /// Helper function for opening a wal file.
@@ -182,7 +182,7 @@ impl ReplicationConn {
}) })
.unwrap(); .unwrap();
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(cmd)?; let mut start_pos = Self::parse_start(cmd)?;
let mut wal_seg_size: usize; let mut wal_seg_size: usize;
loop { loop {
@@ -194,14 +194,8 @@ impl ReplicationConn {
break; break;
} }
} }
let (wal_end, timeline) = swh.timeline.get().get_end_of_wal(); let (_, timeline) = swh.timeline.get().get_end_of_wal();
if start_pos == Lsn(0) { info!("Start replication from {}", start_pos);
start_pos = wal_end;
}
if stop_pos == Lsn(0) && swh.appname == Some("wal_proposer_recovery".to_string()) {
stop_pos = wal_end;
}
info!("Start replication from {} till {}", start_pos, stop_pos);
// switch to copy // switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?; pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -211,28 +205,17 @@ impl ReplicationConn {
loop { loop {
/* Wait until we have some data to stream */ /* Wait until we have some data to stream */
if stop_pos != Lsn(0) { if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) {
/* recovery mode: stream up to the specified LSN (VCL) */ end_pos = lsn
if start_pos >= stop_pos {
/* recovery finished */
break;
}
end_pos = stop_pos;
} else { } else {
/* normal mode */ // timeout expired: request pageserver status
let timeline = swh.timeline.get(); pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
if let Some(lsn) = timeline.wait_for_lsn(start_pos) { sent_ptr: end_pos.0,
end_pos = lsn timestamp: get_current_timestamp(),
} else { request_reply: true,
// timeout expired: request pageserver status }))
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { .context("Failed to send KeepAlive message")?;
sent_ptr: end_pos.0, continue;
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
} }
if end_pos == END_REPLICATION_MARKER { if end_pos == END_REPLICATION_MARKER {
break; break;