Refactor handling responses in walproposer.

Remove confirm_wal_streamed; we already apply both write and flush positions of
the slot to commit_lsn which is fine because 1) we need to wake up waiters 2)
committed WAL can be fetched from safekeepers by neon_walreader now.
This commit is contained in:
Arseny Sher
2023-12-16 00:00:49 +03:00
committed by Arseny Sher
parent d5fbfe2399
commit bfc98f36e3
5 changed files with 56 additions and 81 deletions

View File

@@ -326,14 +326,6 @@ extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLog
}
}
extern "C" fn confirm_wal_streamed(wp: *mut WalProposer, lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).confirm_wal_streamed(&mut (*wp), lsn)
}
}
extern "C" fn log_internal(
wp: *mut WalProposer,
level: ::std::os::raw::c_int,
@@ -419,7 +411,6 @@ pub(crate) fn create_api() -> walproposer_api {
get_redo_start_lsn: Some(get_redo_start_lsn),
finish_sync_safekeepers: Some(finish_sync_safekeepers),
process_safekeeper_feedback: Some(process_safekeeper_feedback),
confirm_wal_streamed: Some(confirm_wal_streamed),
log_internal: Some(log_internal),
}
}

View File

@@ -142,10 +142,6 @@ pub trait ApiImpl {
todo!()
}
fn confirm_wal_streamed(&self, _wp: &mut WalProposer, _lsn: u64) {
todo!()
}
fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
todo!()
}

View File

@@ -1643,35 +1643,26 @@ static void
HandleSafekeeperResponse(WalProposer *wp)
{
XLogRecPtr minQuorumLsn;
XLogRecPtr minFlushLsn;
XLogRecPtr candidateTruncateLsn;
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
wp->api.process_safekeeper_feedback(wp, minQuorumLsn);
/*
* Try to advance truncateLsn to minFlushLsn, which is the last record
* flushed to all safekeepers. We must always start streaming from the
* beginning of the record, which simplifies decoding on the far end.
* Try to advance truncateLsn -- the last record flushed to all
* safekeepers.
*
* Advanced truncateLsn should be not further than nearest commitLsn. This
* prevents surprising violation of truncateLsn <= commitLsn invariant
* which might occur because 1) truncateLsn can be advanced immediately
* once chunk is broadcast to all safekeepers, and commitLsn generally
* can't be advanced based on feedback from safekeeper who is still in the
* previous epoch (similar to 'leader can't commit entries from previous
* term' in Raft); 2) chunks we read from WAL and send are plain sheets of
* bytes, but safekeepers ack only on record boundaries.
* Advanced truncateLsn should be not higher than commitLsn. This prevents
* surprising violation of truncateLsn <= commitLsn invariant which might
* occur because commitLsn generally can't be advanced based on feedback
* from safekeeper who is still in the previous epoch (similar to 'leader
* can't commit entries from previous term' in Raft); 2)
*/
minFlushLsn = CalculateMinFlushLsn(wp);
if (minFlushLsn > wp->truncateLsn)
candidateTruncateLsn = CalculateMinFlushLsn(wp);
candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn);
if (candidateTruncateLsn > wp->truncateLsn)
{
wp->truncateLsn = minFlushLsn;
/*
* Advance the replication slot to free up old WAL files. Note that
* slot doesn't exist if we are in syncSafekeepers mode.
*/
wp->api.confirm_wal_streamed(wp, wp->truncateLsn);
wp->truncateLsn = candidateTruncateLsn;
}
/*

View File

@@ -557,12 +557,6 @@ typedef struct walproposer_api
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (WalProposer *wp, XLogRecPtr lsn);
/*
* Write a log message to the internal log processor. This is used only
* when walproposer is compiled as a library. Otherwise, all logging is

View File

@@ -1812,7 +1812,7 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
}
/*
* Get PageserverFeedback fields from the most advanced safekeeper
* Choose most advanced PageserverFeedback and set it to *rf.
*/
static void
GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
@@ -1842,8 +1842,6 @@ GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
replication_feedback_set(rf);
}
/*
@@ -1883,63 +1881,69 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
hs->catalog_xmin = InvalidFullTransactionId;
}
/*
* Based on commitLsn and safekeeper responses including pageserver feedback,
* 1) Propagate cluster size received from ps to ensure the limit.
* 2) Propagate pageserver LSN positions to ensure backpressure limits.
* 3) Advance walproposer slot to commitLsn (releasing WAL & waking up waiters).
* 4) Propagate hot standby feedback.
*
* None of that is functional in sync-safekeepers.
*/
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr diskConsistentLsn;
XLogRecPtr oldDiskConsistentLsn;
diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
if (wp->config->syncSafekeepers)
return;
if (!wp->config->syncSafekeepers)
oldDiskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
/* Get PageserverFeedback fields from the most advanced safekeeper */
GetLatestNeonFeedback(&quorumFeedback.rf, wp);
replication_feedback_set(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
/* Get PageserverFeedback fields from the most advanced safekeeper */
GetLatestNeonFeedback(&quorumFeedback.rf, wp);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
}
if (commitLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
if (commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = commitLsn;
/* advance the replication slot */
if (!wp->config->syncSafekeepers)
ProcessStandbyReply(
/* write_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/* flush_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/*
* Advance the replication slot to commitLsn. WAL before it is
* hardened and will be fetched from one of safekeepers by
* neon_walreader if needed.
*
* Also wakes up syncrep waiters.
*/
ProcessStandbyReply(
/* write_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/* flush_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/*
* apply_lsn - This is what processed and durably saved at*
* pageserver.
*/
quorumFeedback.rf.disk_consistent_lsn,
walprop_pg_get_current_timestamp(wp), false);
/*
* apply_lsn - This is what processed and durably saved at*
* pageserver.
*/
quorumFeedback.rf.disk_consistent_lsn,
walprop_pg_get_current_timestamp(wp), false);
}
CombineHotStanbyFeedbacks(&hsFeedback, wp);
if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
{
quorumFeedback.hs = hsFeedback;
if (!wp->config->syncSafekeepers)
ProcessStandbyHSFeedback(hsFeedback.ts,
XidFromFullTransactionId(hsFeedback.xmin),
EpochFromFullTransactionId(hsFeedback.xmin),
XidFromFullTransactionId(hsFeedback.catalog_xmin),
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
ProcessStandbyHSFeedback(hsFeedback.ts,
XidFromFullTransactionId(hsFeedback.xmin),
EpochFromFullTransactionId(hsFeedback.xmin),
XidFromFullTransactionId(hsFeedback.catalog_xmin),
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
}
}
static void
walprop_pg_confirm_wal_streamed(WalProposer *wp, XLogRecPtr lsn)
{
if (MyReplicationSlot)
PhysicalConfirmReceivedLocation(lsn);
}
static XLogRecPtr
walprop_pg_get_redo_start_lsn(WalProposer *wp)
{
@@ -2040,6 +2044,5 @@ static const walproposer_api walprop_pg = {
.get_redo_start_lsn = walprop_pg_get_redo_start_lsn,
.finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
.process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
.confirm_wal_streamed = walprop_pg_confirm_wal_streamed,
.log_internal = walprop_pg_log_internal,
};