diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 2f633243be..e884f8438a 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -14,7 +14,6 @@ use crate::bindings::PGAsyncWriteResult; use crate::bindings::Safekeeper; use crate::bindings::Size; use crate::bindings::StringInfoData; -use crate::bindings::TimeLineID; use crate::bindings::TimestampTz; use crate::bindings::WalProposer; use crate::bindings::WalProposerConnStatusType; @@ -179,16 +178,11 @@ extern "C" fn conn_blocking_write( } } -extern "C" fn recovery_download( - sk: *mut Safekeeper, - _timeline: TimeLineID, - startpos: XLogRecPtr, - endpos: XLogRecPtr, -) -> bool { +extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool { unsafe { let callback_data = (*(*(*sk).wp).config).callback_data; let api = callback_data as *mut Box; - (*api).recovery_download(&mut (*sk), startpos, endpos) + (*api).recovery_download(&mut (*wp), &mut (*sk)) } } @@ -354,14 +348,6 @@ extern "C" fn log_internal( } } -extern "C" fn after_election(wp: *mut WalProposer) { - unsafe { - let callback_data = (*(*wp).config).callback_data; - let api = callback_data as *mut Box; - (*api).after_election(&mut (*wp)) - } -} - #[derive(Debug)] pub enum Level { Debug5, @@ -435,7 +421,6 @@ pub(crate) fn create_api() -> walproposer_api { process_safekeeper_feedback: Some(process_safekeeper_feedback), confirm_wal_streamed: Some(confirm_wal_streamed), log_internal: Some(log_internal), - after_election: Some(after_election), } } diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 013400325d..87001c9c66 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -86,7 +86,7 @@ pub trait ApiImpl { todo!() } - fn recovery_download(&self, _sk: &mut Safekeeper, _startpos: u64, _endpos: u64) -> bool { + fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool { todo!() } @@ -364,6 +364,14 @@ mod tests { true } + fn recovery_download( + &self, + _wp: &mut crate::bindings::WalProposer, + _sk: &mut crate::bindings::Safekeeper, + ) -> bool { + true + } + fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult { println!("wal_reader_allocate"); crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 4fb9a46d15..5874d199f9 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -809,7 +809,7 @@ RecvVoteResponse(Safekeeper *sk) } else if (wp->n_votes > wp->quorum) { - /* recovery already performed, just start streaming */ + /* already elected, start streaming */ SendProposerElected(sk); } else @@ -835,21 +835,16 @@ HandleElectedProposer(WalProposer *wp) DetermineEpochStartLsn(wp); /* - * Check if not all safekeepers are up-to-date, we need to download WAL - * needed to synchronize them + * Synchronously download WAL from the most advanced safekeeper. We do + * that only for logical replication (and switching logical walsenders to + * neon_walreader is a todo.) */ - if (wp->truncateLsn < wp->propEpochStartLsn) + if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor])) { - walprop_log(LOG, - "start recovery because truncateLsn=%X/%X is not " - "equal to epochStartLsn=%X/%X", - LSN_FORMAT_ARGS(wp->truncateLsn), - LSN_FORMAT_ARGS(wp->propEpochStartLsn)); - /* Perform recovery */ - if (!wp->api.recovery_download(&wp->safekeeper[wp->donor], wp->greetRequest.timeline, wp->truncateLsn, wp->propEpochStartLsn)) - walprop_log(FATAL, "Failed to recover state"); + walprop_log(FATAL, "failed to download WAL for logical replicaiton"); } - else if (wp->config->syncSafekeepers) + + if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers) { /* Sync is not needed: just exit */ wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); @@ -1047,13 +1042,6 @@ DetermineEpochStartLsn(WalProposer *wp) } walprop_shared->mineLastElectedTerm = wp->propTerm; } - - /* - * WalProposer has just elected itself and initialized history, so we can - * call election callback. Usually it updates truncateLsn to fetch WAL for - * logical replication. - */ - wp->api.after_election(wp); } /* diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index a90e87b54f..2b2c252a18 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -485,8 +485,11 @@ typedef struct walproposer_api /* Blocking CopyData write, aka PQputCopyData + PQflush. */ bool (*conn_blocking_write) (Safekeeper *sk, void const *buf, size_t size); - /* Download WAL from startpos to endpos and make it available locally. */ - bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); + /* + * Download WAL before basebackup for logical walsenders from sk, if + * needed + */ + bool (*recovery_download) (WalProposer *wp, Safekeeper *sk); /* Allocate WAL reader. */ void (*wal_reader_allocate) (Safekeeper *sk); @@ -556,14 +559,6 @@ typedef struct walproposer_api * handled by elog(). */ void (*log_internal) (WalProposer *wp, int level, const char *line); - - /* - * Called right after the proposer was elected, but before it started - * recovery and sent ProposerElected message to the safekeepers. - * - * Used by logical replication to update truncateLsn. - */ - void (*after_election) (WalProposer *wp); } walproposer_api; /* diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 6199def43f..734e627b4d 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -101,6 +101,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events); static void update_nwr_event_set(Safekeeper *sk, uint32 events); static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk); +static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp); + static void init_walprop_config(bool syncSafekeepers) { @@ -1211,16 +1213,38 @@ XLogBroadcastWalProposer(WalProposer *wp) } } -/* - * Receive WAL from most advanced safekeeper - */ +/* Download WAL before basebackup for logical walsenders from sk, if needed */ static bool -WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos) +WalProposerRecovery(WalProposer *wp, Safekeeper *sk) { char *err; WalReceiverConn *wrconn; WalRcvStreamOptions options; char conninfo[MAXCONNINFO]; + TimeLineID timeline; + XLogRecPtr startpos; + XLogRecPtr endpos; + uint64 download_range_mb; + + startpos = GetLogRepRestartLSN(wp); + if (startpos == InvalidXLogRecPtr) + return true; /* recovery not needed */ + endpos = wp->propEpochStartLsn; + + /* + * If we need to download more than a max_slot_wal_keep_size, cap to it to + * avoid risk of exploding pg_wal. Logical replication won't work until + * recreated, but at least compute would start; this also follows + * max_slot_wal_keep_size semantics. + */ + download_range_mb = (endpos - startpos) / 1024 / 1024; + if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb) + { + startpos = endpos - max_slot_wal_keep_size_mb * 1024 * 1024; + walprop_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB", + LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb); + } + timeline = wp->greetRequest.timeline; if (!neon_auth_token) { @@ -1250,7 +1274,7 @@ WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XL return false; } elog(LOG, - "start recovery from %s:%s starting from %X/%08X till %X/%08X timeline " + "start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline " "%d", sk->host, sk->port, (uint32) (startpos >> 32), (uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline); @@ -1928,15 +1952,15 @@ walprop_pg_log_internal(WalProposer *wp, int level, const char *line) elog(FATAL, "unexpected log_internal message at level %d: %s", level, line); } -static void -walprop_pg_after_election(WalProposer *wp) +static XLogRecPtr +GetLogRepRestartLSN(WalProposer *wp) { FILE *f; - XLogRecPtr lrRestartLsn; + XLogRecPtr lrRestartLsn = InvalidXLogRecPtr; /* We don't need to do anything in syncSafekeepers mode. */ if (wp->config->syncSafekeepers) - return; + return InvalidXLogRecPtr; /* * If there are active logical replication subscription we need to provide @@ -1944,25 +1968,40 @@ walprop_pg_after_election(WalProposer *wp) * replication slots. */ f = fopen("restart.lsn", "rb"); - if (f != NULL && !wp->config->syncSafekeepers) + if (f != NULL) { - size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); + size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); + fclose(f); if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr) { - elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); + uint64 download_range_mb; - if (max_slot_wal_keep_size_mb <= 0 || lrRestartLsn + max_slot_wal_keep_size_mb*MB > wp->truncateLsn) + elog(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); + + /* + * If we need to download more than a max_slot_wal_keep_size, + * don't do it to avoid risk of exploding pg_wal. Logical + * replication won't work until recreated, but at least compute + * would start; this also follows max_slot_wal_keep_size + * semantics. + */ + download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB; + if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb) { - /* - * start from the beginning of the segment to fetch page headers - * verifed by XLogReader - */ - lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size); - wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn); + walprop_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB", + LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb); + return InvalidXLogRecPtr; } + + /* + * start from the beginning of the segment to fetch page headers + * verifed by XLogReader + */ + lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size); } } + return lrRestartLsn; } static const walproposer_api walprop_pg = { @@ -1997,5 +2036,4 @@ static const walproposer_api walprop_pg = { .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback, .confirm_wal_streamed = walprop_pg_confirm_wal_streamed, .log_internal = walprop_pg_log_internal, - .after_election = walprop_pg_after_election, };