Perform synchronous WAL download in wp only for logical replication.

wp -> sk communication now uses neon_walreader which will fetch missing WAL on
demand from safekeepers, so doesn't need this anymore. Also, cap WAL download by
max_slot_wal_keep_size to be able to start compute if lag is too high.
This commit is contained in:
Arseny Sher
2023-12-14 17:08:36 +03:00
committed by Arseny Sher
parent df760e6de5
commit 9c493869c7
5 changed files with 82 additions and 68 deletions

View File

@@ -14,7 +14,6 @@ use crate::bindings::PGAsyncWriteResult;
use crate::bindings::Safekeeper;
use crate::bindings::Size;
use crate::bindings::StringInfoData;
use crate::bindings::TimeLineID;
use crate::bindings::TimestampTz;
use crate::bindings::WalProposer;
use crate::bindings::WalProposerConnStatusType;
@@ -179,16 +178,11 @@ extern "C" fn conn_blocking_write(
}
}
extern "C" fn recovery_download(
sk: *mut Safekeeper,
_timeline: TimeLineID,
startpos: XLogRecPtr,
endpos: XLogRecPtr,
) -> bool {
extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
unsafe {
let callback_data = (*(*(*sk).wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).recovery_download(&mut (*sk), startpos, endpos)
(*api).recovery_download(&mut (*wp), &mut (*sk))
}
}
@@ -354,14 +348,6 @@ extern "C" fn log_internal(
}
}
extern "C" fn after_election(wp: *mut WalProposer) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).after_election(&mut (*wp))
}
}
#[derive(Debug)]
pub enum Level {
Debug5,
@@ -435,7 +421,6 @@ pub(crate) fn create_api() -> walproposer_api {
process_safekeeper_feedback: Some(process_safekeeper_feedback),
confirm_wal_streamed: Some(confirm_wal_streamed),
log_internal: Some(log_internal),
after_election: Some(after_election),
}
}

View File

@@ -86,7 +86,7 @@ pub trait ApiImpl {
todo!()
}
fn recovery_download(&self, _sk: &mut Safekeeper, _startpos: u64, _endpos: u64) -> bool {
fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
todo!()
}
@@ -364,6 +364,14 @@ mod tests {
true
}
fn recovery_download(
&self,
_wp: &mut crate::bindings::WalProposer,
_sk: &mut crate::bindings::Safekeeper,
) -> bool {
true
}
fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
println!("wal_reader_allocate");
crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS

View File

@@ -809,7 +809,7 @@ RecvVoteResponse(Safekeeper *sk)
}
else if (wp->n_votes > wp->quorum)
{
/* recovery already performed, just start streaming */
/* already elected, start streaming */
SendProposerElected(sk);
}
else
@@ -835,21 +835,16 @@ HandleElectedProposer(WalProposer *wp)
DetermineEpochStartLsn(wp);
/*
* Check if not all safekeepers are up-to-date, we need to download WAL
* needed to synchronize them
* Synchronously download WAL from the most advanced safekeeper. We do
* that only for logical replication (and switching logical walsenders to
* neon_walreader is a todo.)
*/
if (wp->truncateLsn < wp->propEpochStartLsn)
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
{
walprop_log(LOG,
"start recovery because truncateLsn=%X/%X is not "
"equal to epochStartLsn=%X/%X",
LSN_FORMAT_ARGS(wp->truncateLsn),
LSN_FORMAT_ARGS(wp->propEpochStartLsn));
/* Perform recovery */
if (!wp->api.recovery_download(&wp->safekeeper[wp->donor], wp->greetRequest.timeline, wp->truncateLsn, wp->propEpochStartLsn))
walprop_log(FATAL, "Failed to recover state");
walprop_log(FATAL, "failed to download WAL for logical replicaiton");
}
else if (wp->config->syncSafekeepers)
if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers)
{
/* Sync is not needed: just exit */
wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn);
@@ -1047,13 +1042,6 @@ DetermineEpochStartLsn(WalProposer *wp)
}
walprop_shared->mineLastElectedTerm = wp->propTerm;
}
/*
* WalProposer has just elected itself and initialized history, so we can
* call election callback. Usually it updates truncateLsn to fetch WAL for
* logical replication.
*/
wp->api.after_election(wp);
}
/*

View File

@@ -485,8 +485,11 @@ typedef struct walproposer_api
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (Safekeeper *sk, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/*
* Download WAL before basebackup for logical walsenders from sk, if
* needed
*/
bool (*recovery_download) (WalProposer *wp, Safekeeper *sk);
/* Allocate WAL reader. */
void (*wal_reader_allocate) (Safekeeper *sk);
@@ -556,14 +559,6 @@ typedef struct walproposer_api
* handled by elog().
*/
void (*log_internal) (WalProposer *wp, int level, const char *line);
/*
* Called right after the proposer was elected, but before it started
* recovery and sent ProposerElected message to the safekeepers.
*
* Used by logical replication to update truncateLsn.
*/
void (*after_election) (WalProposer *wp);
} walproposer_api;
/*

View File

@@ -101,6 +101,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);
static void
init_walprop_config(bool syncSafekeepers)
{
@@ -1211,16 +1213,38 @@ XLogBroadcastWalProposer(WalProposer *wp)
}
}
/*
* Receive WAL from most advanced safekeeper
*/
/* Download WAL before basebackup for logical walsenders from sk, if needed */
static bool
WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos)
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
{
char *err;
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
char conninfo[MAXCONNINFO];
TimeLineID timeline;
XLogRecPtr startpos;
XLogRecPtr endpos;
uint64 download_range_mb;
startpos = GetLogRepRestartLSN(wp);
if (startpos == InvalidXLogRecPtr)
return true; /* recovery not needed */
endpos = wp->propEpochStartLsn;
/*
* If we need to download more than a max_slot_wal_keep_size, cap to it to
* avoid risk of exploding pg_wal. Logical replication won't work until
* recreated, but at least compute would start; this also follows
* max_slot_wal_keep_size semantics.
*/
download_range_mb = (endpos - startpos) / 1024 / 1024;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
startpos = endpos - max_slot_wal_keep_size_mb * 1024 * 1024;
walprop_log(WARNING, "capped WAL download for logical replication to %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(startpos), max_slot_wal_keep_size_mb);
}
timeline = wp->greetRequest.timeline;
if (!neon_auth_token)
{
@@ -1250,7 +1274,7 @@ WalProposerRecovery(Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XL
return false;
}
elog(LOG,
"start recovery from %s:%s starting from %X/%08X till %X/%08X timeline "
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
@@ -1928,15 +1952,15 @@ walprop_pg_log_internal(WalProposer *wp, int level, const char *line)
elog(FATAL, "unexpected log_internal message at level %d: %s", level, line);
}
static void
walprop_pg_after_election(WalProposer *wp)
static XLogRecPtr
GetLogRepRestartLSN(WalProposer *wp)
{
FILE *f;
XLogRecPtr lrRestartLsn;
XLogRecPtr lrRestartLsn = InvalidXLogRecPtr;
/* We don't need to do anything in syncSafekeepers mode. */
if (wp->config->syncSafekeepers)
return;
return InvalidXLogRecPtr;
/*
* If there are active logical replication subscription we need to provide
@@ -1944,25 +1968,40 @@ walprop_pg_after_election(WalProposer *wp)
* replication slots.
*/
f = fopen("restart.lsn", "rb");
if (f != NULL && !wp->config->syncSafekeepers)
if (f != NULL)
{
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
size_t rc = fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f);
fclose(f);
if (rc == 1 && lrRestartLsn != InvalidXLogRecPtr)
{
elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
uint64 download_range_mb;
if (max_slot_wal_keep_size_mb <= 0 || lrRestartLsn + max_slot_wal_keep_size_mb*MB > wp->truncateLsn)
elog(LOG, "logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn));
/*
* If we need to download more than a max_slot_wal_keep_size,
* don't do it to avoid risk of exploding pg_wal. Logical
* replication won't work until recreated, but at least compute
* would start; this also follows max_slot_wal_keep_size
* semantics.
*/
download_range_mb = (wp->propEpochStartLsn - lrRestartLsn) / MB;
if (max_slot_wal_keep_size_mb > 0 && download_range_mb >= max_slot_wal_keep_size_mb)
{
/*
* start from the beginning of the segment to fetch page headers
* verifed by XLogReader
*/
lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn);
walprop_log(WARNING, "not downloading WAL for logical replication since %X/%X as max_slot_wal_keep_size=%dMB",
LSN_FORMAT_ARGS(lrRestartLsn), max_slot_wal_keep_size_mb);
return InvalidXLogRecPtr;
}
/*
* start from the beginning of the segment to fetch page headers
* verifed by XLogReader
*/
lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size);
}
}
return lrRestartLsn;
}
static const walproposer_api walprop_pg = {
@@ -1997,5 +2036,4 @@ static const walproposer_api walprop_pg = {
.process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
.confirm_wal_streamed = walprop_pg_confirm_wal_streamed,
.log_internal = walprop_pg_log_internal,
.after_election = walprop_pg_after_election,
};