mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
Safekeeper's START_REPLICATION handler: remove stop_point, do not handle start_point == 0 (#777)
This commit is contained in:
@@ -130,16 +130,16 @@ impl ReplicationConn {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function that parses a pair of LSNs.
|
/// Helper function that parses a single LSN.
|
||||||
fn parse_start_stop(cmd: &[u8]) -> Result<(Lsn, Lsn)> {
|
fn parse_start(cmd: &[u8]) -> Result<Lsn> {
|
||||||
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
||||||
let caps = re.captures_iter(str::from_utf8(cmd)?);
|
let caps = re.captures_iter(str::from_utf8(cmd)?);
|
||||||
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
|
||||||
let start_pos = lsns
|
let start_pos = lsns
|
||||||
.next()
|
.next()
|
||||||
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
|
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
|
||||||
let stop_pos = lsns.next().transpose()?.unwrap_or(Lsn(0));
|
assert!(lsns.next().is_none());
|
||||||
Ok((start_pos, stop_pos))
|
Ok(start_pos)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function for opening a wal file.
|
/// Helper function for opening a wal file.
|
||||||
@@ -182,7 +182,7 @@ impl ReplicationConn {
|
|||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (mut start_pos, mut stop_pos) = Self::parse_start_stop(cmd)?;
|
let mut start_pos = Self::parse_start(cmd)?;
|
||||||
|
|
||||||
let mut wal_seg_size: usize;
|
let mut wal_seg_size: usize;
|
||||||
loop {
|
loop {
|
||||||
@@ -194,14 +194,8 @@ impl ReplicationConn {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let (wal_end, timeline) = swh.timeline.get().get_end_of_wal();
|
let (_, timeline) = swh.timeline.get().get_end_of_wal();
|
||||||
if start_pos == Lsn(0) {
|
info!("Start replication from {}", start_pos);
|
||||||
start_pos = wal_end;
|
|
||||||
}
|
|
||||||
if stop_pos == Lsn(0) && swh.appname == Some("wal_proposer_recovery".to_string()) {
|
|
||||||
stop_pos = wal_end;
|
|
||||||
}
|
|
||||||
info!("Start replication from {} till {}", start_pos, stop_pos);
|
|
||||||
|
|
||||||
// switch to copy
|
// switch to copy
|
||||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||||
@@ -211,28 +205,17 @@ impl ReplicationConn {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
/* Wait until we have some data to stream */
|
/* Wait until we have some data to stream */
|
||||||
if stop_pos != Lsn(0) {
|
if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) {
|
||||||
/* recovery mode: stream up to the specified LSN (VCL) */
|
end_pos = lsn
|
||||||
if start_pos >= stop_pos {
|
|
||||||
/* recovery finished */
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
end_pos = stop_pos;
|
|
||||||
} else {
|
} else {
|
||||||
/* normal mode */
|
// timeout expired: request pageserver status
|
||||||
let timeline = swh.timeline.get();
|
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||||
if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
|
sent_ptr: end_pos.0,
|
||||||
end_pos = lsn
|
timestamp: get_current_timestamp(),
|
||||||
} else {
|
request_reply: true,
|
||||||
// timeout expired: request pageserver status
|
}))
|
||||||
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
.context("Failed to send KeepAlive message")?;
|
||||||
sent_ptr: end_pos.0,
|
continue;
|
||||||
timestamp: get_current_timestamp(),
|
|
||||||
request_reply: true,
|
|
||||||
}))
|
|
||||||
.context("Failed to send KeepAlive message")?;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if end_pos == END_REPLICATION_MARKER {
|
if end_pos == END_REPLICATION_MARKER {
|
||||||
break;
|
break;
|
||||||
|
|||||||
Reference in New Issue
Block a user