In wal_proposer_recovery, don't wait outcoming WAL to be committed.

Otherwise we're deadlocking ourselves. Oversight of 33007cc.
This commit is contained in:
Arseny Sher
2021-11-10 00:25:02 +03:00
parent ce15c62f35
commit 5603259c53

View File

@@ -194,8 +194,22 @@ impl ReplicationConn {
break;
}
}
let (_, timeline) = swh.timeline.get().get_end_of_wal();
info!("Start replication from {}", start_pos);
let (wal_end, timeline) = swh.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if swh.appname == Some("wal_proposer_recovery".to_string()) {
Some(wal_end)
} else {
None
};
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -204,18 +218,25 @@ impl ReplicationConn {
let mut wal_file: Option<File> = None;
loop {
/* Wait until we have some data to stream */
if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) {
end_pos = lsn
if let Some(stop_pos) = stop_pos {
if start_pos >= stop_pos {
break; /* recovery finished */
}
end_pos = stop_pos;
} else {
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
/* Wait until we have some data to stream */
if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) {
end_pos = lsn
} else {
// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}
if end_pos == END_REPLICATION_MARKER {
break;