From 5603259c53b62350a71370cbb5325cf81d9e008e Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 10 Nov 2021 00:25:02 +0300 Subject: [PATCH] In wal_proposer_recovery, don't wait outcoming WAL to be committed. Otherwise we're deadlocking ourselves. Oversight of 33007cc. --- walkeeper/src/replication.rs | 47 ++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 29709668c9..ae9b8544d1 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -194,8 +194,22 @@ impl ReplicationConn { break; } } - let (_, timeline) = swh.timeline.get().get_end_of_wal(); - info!("Start replication from {}", start_pos); + let (wal_end, timeline) = swh.timeline.get().get_end_of_wal(); + // Walproposer gets special handling: safekeeper must give proposer all + // local WAL till the end, whether committed or not (walproposer will + // hang otherwise). That's because walproposer runs the consensus and + // synchronizes safekeepers on the most advanced one. + // + // There is a small risk of this WAL getting concurrently garbaged if + // another compute rises which collects majority and starts fixing log + // on this safekeeper itself. That's ok as (old) proposer will never be + // able to commit such WAL. + let stop_pos: Option = if swh.appname == Some("wal_proposer_recovery".to_string()) { + Some(wal_end) + } else { + None + }; + info!("Start replication from {:?} till {:?}", start_pos, stop_pos); // switch to copy pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -204,18 +218,25 @@ impl ReplicationConn { let mut wal_file: Option = None; loop { - /* Wait until we have some data to stream */ - if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) { - end_pos = lsn + if let Some(stop_pos) = stop_pos { + if start_pos >= stop_pos { + break; /* recovery finished */ + } + end_pos = stop_pos; } else { - // timeout expired: request pageserver status - pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { - sent_ptr: end_pos.0, - timestamp: get_current_timestamp(), - request_reply: true, - })) - .context("Failed to send KeepAlive message")?; - continue; + /* Wait until we have some data to stream */ + if let Some(lsn) = swh.timeline.get().wait_for_lsn(start_pos) { + end_pos = lsn + } else { + // timeout expired: request pageserver status + pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + sent_ptr: end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .context("Failed to send KeepAlive message")?; + continue; + } } if end_pos == END_REPLICATION_MARKER { break;