diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index e884f8438a..1f7bf952dc 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -326,14 +326,6 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLog } } -extern "C" fn confirm_wal_streamed(wp: *mut WalProposer, lsn: XLogRecPtr) { - unsafe { - let callback_data = (*(*wp).config).callback_data; - let api = callback_data as *mut Box; - (*api).confirm_wal_streamed(&mut (*wp), lsn) - } -} - extern "C" fn log_internal( wp: *mut WalProposer, level: ::std::os::raw::c_int, @@ -419,7 +411,6 @@ pub(crate) fn create_api() -> walproposer_api { get_redo_start_lsn: Some(get_redo_start_lsn), finish_sync_safekeepers: Some(finish_sync_safekeepers), process_safekeeper_feedback: Some(process_safekeeper_feedback), - confirm_wal_streamed: Some(confirm_wal_streamed), log_internal: Some(log_internal), } } diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 87001c9c66..35c8f6904d 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -142,10 +142,6 @@ pub trait ApiImpl { todo!() } - fn confirm_wal_streamed(&self, _wp: &mut WalProposer, _lsn: u64) { - todo!() - } - fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) { todo!() } diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 5874d199f9..7fb0cab9a0 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1643,35 +1643,26 @@ static void HandleSafekeeperResponse(WalProposer *wp) { XLogRecPtr minQuorumLsn; - XLogRecPtr minFlushLsn; + XLogRecPtr candidateTruncateLsn; minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp); wp->api.process_safekeeper_feedback(wp, minQuorumLsn); /* - * Try to advance truncateLsn to minFlushLsn, which is the last record - * flushed to all safekeepers. 
We must always start streaming from the - * beginning of the record, which simplifies decoding on the far end. + * Try to advance truncateLsn -- the last record flushed to all + * safekeepers. * - * Advanced truncateLsn should be not further than nearest commitLsn. This - * prevents surprising violation of truncateLsn <= commitLsn invariant - * which might occur because 1) truncateLsn can be advanced immediately - * once chunk is broadcast to all safekeepers, and commitLsn generally - * can't be advanced based on feedback from safekeeper who is still in the - * previous epoch (similar to 'leader can't commit entries from previous - * term' in Raft); 2) chunks we read from WAL and send are plain sheets of - * bytes, but safekeepers ack only on record boundaries. + * Advanced truncateLsn should not be higher than commitLsn. This prevents + * surprising violation of truncateLsn <= commitLsn invariant which might + * occur because commitLsn generally can't be advanced based on feedback + * from safekeeper who is still in the previous epoch (similar to 'leader + * can't commit entries from previous term' in Raft). */ - minFlushLsn = CalculateMinFlushLsn(wp); - if (minFlushLsn > wp->truncateLsn) + candidateTruncateLsn = CalculateMinFlushLsn(wp); + candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn); + if (candidateTruncateLsn > wp->truncateLsn) { - wp->truncateLsn = minFlushLsn; - - /* - * Advance the replication slot to free up old WAL files. Note that - * slot doesn't exist if we are in syncSafekeepers mode. - */ - wp->api.confirm_wal_streamed(wp, wp->truncateLsn); + wp->truncateLsn = candidateTruncateLsn; } /* diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 4c2b53a1ef..6d478076fe 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -557,12 +557,6 @@ typedef struct walproposer_api */ void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn); - /* - * Called on peer_horizon_lsn updates. 
Used to advance replication slot - * and to free up disk space by deleting unnecessary WAL. - */ - void (*confirm_wal_streamed) (WalProposer *wp, XLogRecPtr lsn); - /* * Write a log message to the internal log processor. This is used only * when walproposer is compiled as a library. Otherwise, all logging is diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 57be2d8d96..10c740840f 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1812,7 +1812,7 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn) } /* - * Get PageserverFeedback fields from the most advanced safekeeper + * Choose most advanced PageserverFeedback and set it to *rf. */ static void GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp) @@ -1842,8 +1842,6 @@ GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp) LSN_FORMAT_ARGS(rf->disk_consistent_lsn), LSN_FORMAT_ARGS(rf->remote_consistent_lsn), rf->replytime); - - replication_feedback_set(rf); } /* @@ -1883,63 +1881,69 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp) hs->catalog_xmin = InvalidFullTransactionId; } +/* + * Based on commitLsn and safekeeper responses including pageserver feedback, + * 1) Propagate cluster size received from ps to ensure the limit. + * 2) Propagate pageserver LSN positions to ensure backpressure limits. + * 3) Advance walproposer slot to commitLsn (releasing WAL & waking up waiters). + * 4) Propagate hot standby feedback. + * + * None of that is functional in sync-safekeepers. 
+ */ static void walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn) { HotStandbyFeedback hsFeedback; - XLogRecPtr diskConsistentLsn; + XLogRecPtr oldDiskConsistentLsn; - diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn; + if (wp->config->syncSafekeepers) + return; - if (!wp->config->syncSafekeepers) + oldDiskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn; + + /* Get PageserverFeedback fields from the most advanced safekeeper */ + GetLatestNeonFeedback(&quorumFeedback.rf, wp); + replication_feedback_set(&quorumFeedback.rf); + SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize); + + if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn) { - /* Get PageserverFeedback fields from the most advanced safekeeper */ - GetLatestNeonFeedback(&quorumFeedback.rf, wp); - SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize); - } - - if (commitLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn) - { - if (commitLsn > quorumFeedback.flushLsn) quorumFeedback.flushLsn = commitLsn; - /* advance the replication slot */ - if (!wp->config->syncSafekeepers) - ProcessStandbyReply( - /* write_lsn - This is what durably stored in WAL service. */ - quorumFeedback.flushLsn, - /* flush_lsn - This is what durably stored in WAL service. */ - quorumFeedback.flushLsn, + /* + * Advance the replication slot to commitLsn. WAL before it is + * hardened and will be fetched from one of safekeepers by + * neon_walreader if needed. + * + * Also wakes up syncrep waiters. + */ + ProcessStandbyReply( + /* write_lsn - This is what durably stored in WAL service. */ + quorumFeedback.flushLsn, + /* flush_lsn - This is what durably stored in WAL service. */ + quorumFeedback.flushLsn, - /* - * apply_lsn - This is what processed and durably saved at* - * pageserver. 
- */ - quorumFeedback.rf.disk_consistent_lsn, - walprop_pg_get_current_timestamp(wp), false); + /* + * apply_lsn - This is what is processed and durably saved at + * pageserver. + */ + quorumFeedback.rf.disk_consistent_lsn, + walprop_pg_get_current_timestamp(wp), false); } CombineHotStanbyFeedbacks(&hsFeedback, wp); if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0) { quorumFeedback.hs = hsFeedback; - if (!wp->config->syncSafekeepers) - ProcessStandbyHSFeedback(hsFeedback.ts, - XidFromFullTransactionId(hsFeedback.xmin), - EpochFromFullTransactionId(hsFeedback.xmin), - XidFromFullTransactionId(hsFeedback.catalog_xmin), - EpochFromFullTransactionId(hsFeedback.catalog_xmin)); + ProcessStandbyHSFeedback(hsFeedback.ts, + XidFromFullTransactionId(hsFeedback.xmin), + EpochFromFullTransactionId(hsFeedback.xmin), + XidFromFullTransactionId(hsFeedback.catalog_xmin), + EpochFromFullTransactionId(hsFeedback.catalog_xmin)); } } -static void -walprop_pg_confirm_wal_streamed(WalProposer *wp, XLogRecPtr lsn) -{ - if (MyReplicationSlot) - PhysicalConfirmReceivedLocation(lsn); -} - static XLogRecPtr walprop_pg_get_redo_start_lsn(WalProposer *wp) { @@ -2040,6 +2044,5 @@ static const walproposer_api walprop_pg = { .get_redo_start_lsn = walprop_pg_get_redo_start_lsn, .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers, .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback, - .confirm_wal_streamed = walprop_pg_confirm_wal_streamed, .log_internal = walprop_pg_log_internal, };