On-demand WAL download for walsender (#6872)
## Problem

There's allegedly a bug where connecting a subscriber before the WAL has been downloaded from the safekeeper causes an error.

## Summary of changes

Adds support for pausing safekeepers so they stop sending WAL to computes, then creates a compute and attaches a subscriber while they are in this paused state. This fails to reproduce the issue, but it is probably a good test to have.

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
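The pausing works through the `sk-pause-send` failpoint visible in the safekeeper diff below, toggled via the safekeeper HTTP API. As a rough self-contained sketch of the mechanism, assuming the `fail` crate built with its `failpoints` feature (everything except the failpoint name is illustrative), an active failpoint makes a non-pageserver sender behave as if it has nothing to send:

```rust
// Minimal sketch: requires `fail = { version = "0.5", features = ["failpoints"] }`.
// Only the "sk-pause-send" name comes from the diff; the rest is illustrative.
fn have_something_to_send(end_pos: u64, start_pos: u64, is_pageserver: bool) -> bool {
    // When the failpoint is enabled and the condition holds, the macro makes
    // this function return the closure's value (false), mirroring wait_wal().
    fail::fail_point!("sk-pause-send", !is_pageserver, |_| false);
    end_pos > start_pos
}

fn main() {
    // The test enables the failpoint through the safekeeper HTTP API;
    // here we configure it in-process instead.
    fail::cfg("sk-pause-send", "return").unwrap();
    assert!(!have_something_to_send(100, 50, false)); // subscribers are paused
    assert!(have_something_to_send(100, 50, true)); // pageserver is exempt
    fail::cfg("sk-pause-send", "off").unwrap();
    assert!(have_something_to_send(100, 50, false)); // sending resumes
}
```

The new Python test drives exactly this toggle with `configure_failpoints([("sk-pause-send", "return")])` and later `("sk-pause-send", "off")`.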
@@ -50,6 +50,14 @@ extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
    }
}

extern "C" fn update_donor(wp: *mut WalProposer, donor: *mut Safekeeper, donor_lsn: XLogRecPtr) {
    unsafe {
        let callback_data = (*(*wp).config).callback_data;
        let api = callback_data as *mut Box<dyn ApiImpl>;
        (*api).update_donor(&mut (*donor), donor_lsn)
    }
}

extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
    unsafe {
        let callback_data = (*(*wp).config).callback_data;
@@ -391,6 +399,7 @@ pub(crate) fn create_api() -> walproposer_api {
        get_shmem_state: Some(get_shmem_state),
        start_streaming: Some(start_streaming),
        get_flush_rec_ptr: Some(get_flush_rec_ptr),
        update_donor: Some(update_donor),
        get_current_timestamp: Some(get_current_timestamp),
        conn_error_message: Some(conn_error_message),
        conn_status: Some(conn_status),
@@ -421,6 +430,32 @@ pub(crate) fn create_api() -> walproposer_api {
    }
}

pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
    let empty_feedback = crate::bindings::PageserverFeedback {
        present: false,
        currentClusterSize: 0,
        last_received_lsn: 0,
        disk_consistent_lsn: 0,
        remote_consistent_lsn: 0,
        replytime: 0,
        shard_number: 0,
    };

    crate::bindings::WalproposerShmemState {
        propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
        donor_name: [0; 64],
        donor_conninfo: [0; 1024],
        donor_lsn: 0,
        mutex: 0,
        mineLastElectedTerm: crate::bindings::pg_atomic_uint64 { value: 0 },
        backpressureThrottlingTime: crate::bindings::pg_atomic_uint64 { value: 0 },
        currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
        shard_ps_feedback: [empty_feedback; 128],
        num_shards: 0,
        min_ps_feedback: empty_feedback,
    }
}

impl std::fmt::Display for Level {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
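An aside on the binding layer above: the C core only stores an opaque `callback_data` pointer, so the Rust side smuggles a boxed trait object through it, and each `extern "C"` shim (like `update_donor`) re-borrows it. A minimal self-contained sketch of that pattern under assumed names (the `Mock` type is hypothetical):

```rust
// Sketch of passing a trait object through a C void pointer, as the
// api_bindings shims above do. Illustrative types only.
trait ApiImpl {
    fn get_current_timestamp(&self) -> i64;
}

struct Mock;
impl ApiImpl for Mock {
    fn get_current_timestamp(&self) -> i64 {
        42
    }
}

fn main() {
    // Double-box so the wide trait-object pointer hides behind a thin one.
    let api: Box<Box<dyn ApiImpl>> = Box::new(Box::new(Mock));
    let callback_data = Box::into_raw(api) as *mut std::ffi::c_void;

    // What an extern "C" shim such as get_current_timestamp() does:
    let ts = unsafe {
        let api = callback_data as *mut Box<dyn ApiImpl>;
        (*api).get_current_timestamp()
    };
    assert_eq!(ts, 42);

    // Reclaim ownership so the box is dropped; the real code frees it
    // when the walproposer is destroyed.
    unsafe { drop(Box::from_raw(callback_data as *mut Box<dyn ApiImpl>)) };
}
```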
@@ -1,8 +1,5 @@
use std::ffi::CString;

use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::{
    api_bindings::{create_api, take_vec_u8, Level},
    bindings::{
@@ -10,6 +7,8 @@ use crate::{
        WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
    },
};
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};

/// Rust high-level wrapper for C walproposer API. Many methods are not required
/// for simple cases, hence todo!() in default implementations.
@@ -28,6 +27,10 @@ pub trait ApiImpl {
        todo!()
    }

    fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
        todo!()
    }

    fn get_current_timestamp(&self) -> i64 {
        todo!()
    }
@@ -274,6 +277,7 @@ mod tests {
        sync::{atomic::AtomicUsize, mpsc::sync_channel},
    };

    use std::cell::UnsafeCell;
    use utils::id::TenantTimelineId;

    use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
@@ -297,6 +301,8 @@ mod tests {
        replies_ptr: AtomicUsize,
        // channel to send LSN to the main thread
        sync_channel: std::sync::mpsc::SyncSender<u64>,
        // Shmem state, used for storing donor info
        shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
    }

    impl MockImpl {
@@ -327,11 +333,22 @@ mod tests {
    }

    impl ApiImpl for MockImpl {
        fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
            self.shmem.get()
        }

        fn get_current_timestamp(&self) -> i64 {
            println!("get_current_timestamp");
            0
        }

        fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
            let mut shmem = unsafe { *self.get_shmem_state() };
            shmem.propEpochStartLsn.value = donor_lsn;
            shmem.donor_conninfo = donor.conninfo;
            shmem.donor_lsn = donor_lsn;
        }

        fn conn_status(
            &self,
            _: &mut crate::bindings::Safekeeper,
@@ -507,6 +524,7 @@ mod tests {
            ],
            replies_ptr: AtomicUsize::new(0),
            sync_channel: sender,
            shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
        });
        let config = crate::walproposer::Config {
            ttid,
@@ -14,7 +14,8 @@ OBJS = \
	relsize_cache.o \
	walproposer.o \
	walproposer_pg.o \
	control_plane_connector.o
	control_plane_connector.o \
	walsender_hooks.o

PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
@@ -34,6 +34,7 @@
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
#include "walsender_hooks.h"

PG_MODULE_MAGIC;
void _PG_init(void);
@@ -265,7 +266,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
    }
}


void
_PG_init(void)
{
@@ -279,6 +279,7 @@ _PG_init(void)

    pg_init_libpagestore();
    pg_init_walproposer();
    WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;

    InitLogicalReplicationMonitor();

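The one-line hook assignment added to `_PG_init` above is the whole integration point: the patched core exposes a function-pointer hook that the extension fills in at load time. A hedged Rust analogue of this hook pattern follows; all types here are illustrative stand-ins, not the PostgreSQL API:

```rust
use std::sync::OnceLock;

// Stand-in for XLogReaderRoutine: a table of optional callbacks.
#[derive(Default)]
struct XLogReaderRoutine {
    page_read: Option<fn(u64) -> i32>,
}

// Plays the role of WalSender_Custom_XLogReaderRoutines in the patched core.
static CUSTOM_ROUTINES: OnceLock<fn(&mut XLogReaderRoutine)> = OnceLock::new();

// Stand-in for NeonOnDemandXLogReaderRoutines.
fn neon_on_demand_routines(xlr: &mut XLogReaderRoutine) {
    xlr.page_read = Some(|_lsn| 0); // would be NeonWALPageRead in the C code
}

fn main() {
    // _PG_init(): the extension installs the hook at load time.
    CUSTOM_ROUTINES.set(neon_on_demand_routines).ok();

    // Walsender startup: core consults the hook when building its reader.
    let mut xlr = XLogReaderRoutine::default();
    if let Some(install) = CUSTOM_ROUTINES.get() {
        install(&mut xlr);
    }
    assert!(xlr.page_read.is_some());
}
```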
@@ -36,10 +36,7 @@

static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
static void NeonWALReaderResetRemote(NeonWALReader *state);
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
static void neon_wal_segment_close(NeonWALReader *state);
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
                                  TimeLineID tli);

@@ -82,8 +79,9 @@ struct NeonWALReader
    XLogRecPtr req_lsn;
    Size req_len;
    Size req_progress;
    WalProposer *wp;            /* we learn donor through walproposer */
    char donor_conninfo[MAXCONNINFO];
    char donor_name[64];        /* saved donor safekeeper name for logging */
    XLogRecPtr donor_lsn;
    /* state of connection to safekeeper */
    NeonWALReaderRemoteState rem_state;
    WalProposerConn *wp_conn;
@@ -107,7 +105,7 @@ struct NeonWALReader

/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
{
    NeonWALReader *reader;

@@ -123,8 +121,6 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose
    reader->seg.ws_tli = 0;
    reader->segcxt.ws_segsize = wal_segment_size;

    reader->wp = wp;

    reader->rem_state = RS_NONE;

    if (log_prefix)
@@ -204,21 +200,16 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
{
    if (state->rem_state == RS_NONE)
    {
        XLogRecPtr donor_lsn;

        /* no connection yet; start one */
        Safekeeper *donor = GetDonor(state->wp, &donor_lsn);

        if (donor == NULL)
        if (!NeonWALReaderUpdateDonor(state))
        {
            snprintf(state->err_msg, sizeof(state->err_msg),
                     "failed to establish remote connection to fetch WAL: no donor available");
            return NEON_WALREAD_ERROR;
        }
        snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
        nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
                state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
        state->wp_conn = libpqwp_connect_start(donor->conninfo);
        /* no connection yet; start one */
        nwr_log(LOG, "establishing connection to %s, lsn=%X/%X to fetch WAL", state->donor_name, LSN_FORMAT_ARGS(state->donor_lsn));
        state->wp_conn = libpqwp_connect_start(state->donor_conninfo);
        if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
        {
            snprintf(state->err_msg, sizeof(state->err_msg),
@@ -251,10 +242,22 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
            {
                /* connection successfully established */
                char start_repl_query[128];
                term_t term = pg_atomic_read_u64(&GetWalpropShmemState()->mineLastElectedTerm);

                /*
                 * Set elected walproposer's term to pull only data from
                 * its history. Note: for logical walsender it means we
                 * might stream WAL not yet committed by safekeepers. It
                 * would be cleaner to fix this.
                 *
                 * mineLastElectedTerm shouldn't be 0 at this point
                 * because we checked above that donor exists and it
                 * appears only after successful election.
                 */
                Assert(term > 0);
                snprintf(start_repl_query, sizeof(start_repl_query),
                         "START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
                         LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
                         LSN_FORMAT_ARGS(startptr), term);
                nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
                        state->donor_name, start_repl_query);
                if (!libpqwp_send_query(state->wp_conn, start_repl_query))
@@ -404,6 +407,10 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
            state->req_lsn = InvalidXLogRecPtr;
            state->req_len = 0;
            state->req_progress = 0;

            /* Update the current segment info. */
            state->seg.ws_tli = tli;

            return NEON_WALREAD_SUCCESS;
        }
    }
@@ -526,7 +533,7 @@ err:
}

/* reset remote connection and request in progress */
static void
void
NeonWALReaderResetRemote(NeonWALReader *state)
{
    state->req_lsn = InvalidXLogRecPtr;
@@ -691,13 +698,25 @@ NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size coun
    return true;
}

XLogRecPtr
NeonWALReaderGetRemLsn(NeonWALReader *state)
{
    return state->rem_lsn;
}

const WALOpenSegment *
NeonWALReaderGetSegment(NeonWALReader *state)
{
    return &state->seg;
}

/*
 * Copy of vanilla wal_segment_open, but returns false in case of error instead
 * of ERROR, with errno set.
 *
 * XLogReaderRoutine->segment_open callback for local pg_wal files
 */
static bool
bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
                      TimeLineID *tli_p)
{
@@ -724,7 +743,7 @@ is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
}

/* copy of vanilla wal_segment_close with NeonWALReader */
static void
void
neon_wal_segment_close(NeonWALReader *state)
{
    if (state->seg.ws_file >= 0)
@@ -740,3 +759,19 @@ NeonWALReaderErrMsg(NeonWALReader *state)
{
    return state->err_msg;
}

/*
 * Returns true if there is a donor, and false otherwise
 */
bool
NeonWALReaderUpdateDonor(NeonWALReader *state)
{
    WalproposerShmemState *wps = GetWalpropShmemState();

    SpinLockAcquire(&wps->mutex);
    memcpy(state->donor_name, wps->donor_name, sizeof(state->donor_name));
    memcpy(state->donor_conninfo, wps->donor_conninfo, sizeof(state->donor_conninfo));
    state->donor_lsn = wps->donor_lsn;
    SpinLockRelease(&wps->mutex);
    return state->donor_name[0] != '\0';
}
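`NeonWALReaderUpdateDonor` is the reader-side half of the new donor protocol: the walproposer publishes donor name, conninfo, and LSN into shared memory under a spinlock, and the reader copies them out under the same lock. A hedged Rust analogue of the protocol, with illustrative types standing in for the fixed-size C arrays:

```rust
use std::sync::Mutex;

// Rust analogue of the donor fields in WalproposerShmemState.
#[derive(Default, Clone)]
struct DonorInfo {
    name: String,     // donor_name[64]
    conninfo: String, // donor_conninfo[MAXCONNINFO]
    lsn: u64,         // donor_lsn
}

#[derive(Default)]
struct WalproposerShmem {
    donor: Mutex<DonorInfo>, // guarded by `mutex` in the C struct
}

impl WalproposerShmem {
    // walprop_pg_update_donor(): publish all donor fields atomically.
    fn update_donor(&self, name: &str, conninfo: &str, lsn: u64) {
        let mut d = self.donor.lock().unwrap();
        d.name = name.to_string();
        d.conninfo = conninfo.to_string();
        d.lsn = lsn;
    }

    // NeonWALReaderUpdateDonor(): copy out under the lock; a donor
    // exists iff the name is non-empty.
    fn read_donor(&self) -> Option<DonorInfo> {
        let d = self.donor.lock().unwrap();
        (!d.name.is_empty()).then(|| d.clone())
    }
}

fn main() {
    let shmem = WalproposerShmem::default();
    assert!(shmem.read_donor().is_none()); // no donor published yet
    shmem.update_donor("sk1:5454", "host=sk1 port=5454", 0x16B_9188);
    assert_eq!(shmem.read_donor().unwrap().lsn, 0x16B_9188);
}
```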
@@ -19,12 +19,19 @@ typedef enum
    NEON_WALREAD_ERROR,
} NeonWALReadResult;

extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
extern void NeonWALReaderFree(NeonWALReader *state);
extern void NeonWALReaderResetRemote(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
extern bool NeonWALReaderIsRemConnEstablished(NeonWALReader *state);
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
extern XLogRecPtr NeonWALReaderGetRemLsn(NeonWALReader *state);
extern const WALOpenSegment *NeonWALReaderGetSegment(NeonWALReader *state);
extern bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
extern void neon_wal_segment_close(NeonWALReader *state);
extern bool NeonWALReaderUpdateDonor(NeonWALReader *state);


#endif                          /* __NEON_WALREADER_H__ */
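The walproposer hunks that follow rework `GetDonor` into `UpdateDonorShmem`, which selects a donor and publishes it through the `update_donor` callback instead of returning it. A small sketch of the selection rule it implements, under assumed types: start from the elected donor if it is at least idle, then let any active safekeeper with a higher flush LSN take over.

```rust
// Illustrative stand-ins for SS_* states and Safekeeper; the comparison
// logic mirrors the C code below.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
enum SkState {
    Offline,
    Idle,
    Active,
}

struct Sk {
    state: SkState,
    flush_lsn: u64,
}

fn pick_donor(sks: &[Sk], elected: usize, prop_epoch_start_lsn: u64) -> Option<(usize, u64)> {
    let mut donor: Option<(usize, u64)> = None;
    if sks[elected].state >= SkState::Idle {
        donor = Some((elected, prop_epoch_start_lsn));
    }
    for (i, sk) in sks.iter().enumerate() {
        // Mirrors: sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > donor_lsn
        if sk.state == SkState::Active && sk.flush_lsn > donor.map_or(0, |(_, lsn)| lsn) {
            donor = Some((i, sk.flush_lsn));
        }
    }
    donor // None means "didn't find a suitable donor, skipping"
}

fn main() {
    let sks = [
        Sk { state: SkState::Offline, flush_lsn: 0 },
        Sk { state: SkState::Idle, flush_lsn: 0 },
        Sk { state: SkState::Active, flush_lsn: 300 },
    ];
    // Elected sk 1 starts as donor at the epoch start LSN (100),
    // but the active sk 2 with flush LSN 300 wins.
    assert_eq!(pick_donor(&sks, 1, 100), Some((2, 300)));
}
```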
@@ -80,7 +80,7 @@ static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);

static void UpdateDonorShmem(WalProposer *wp);

WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -922,7 +922,8 @@ static void
DetermineEpochStartLsn(WalProposer *wp)
{
    TermHistory *dth;
    int n_ready = 0;
    int n_ready = 0;
    WalproposerShmemState *walprop_shared;

    wp->propEpochStartLsn = InvalidXLogRecPtr;
    wp->donorEpoch = 0;
@@ -964,16 +965,18 @@ DetermineEpochStartLsn(WalProposer *wp)
    if (n_ready < wp->quorum)
    {
        /*
         * This is a rare case that can be triggered if safekeeper has voted and disconnected.
         * In this case, its state will not be SS_IDLE and its vote cannot be used, because
         * we clean up `voteResponse` in `ShutdownConnection`.
         * This is a rare case that can be triggered if safekeeper has voted
         * and disconnected. In this case, its state will not be SS_IDLE and
         * its vote cannot be used, because we clean up `voteResponse` in
         * `ShutdownConnection`.
         */
        wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready);
    }

    /*
     * If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are bootstrapping
     * and nothing was committed yet. Start streaming then from the basebackup LSN.
     * If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are
     * bootstrapping and nothing was committed yet. Start streaming then from
     * the basebackup LSN.
     */
    if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
    {
@@ -984,11 +987,12 @@ DetermineEpochStartLsn(WalProposer *wp)
        }
        wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
    }
    pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);

    /*
     * Safekeepers are setting truncateLsn after timelineStartLsn is known, so it
     * should never be zero at this point, if we know timelineStartLsn.
     *
     * Safekeepers are setting truncateLsn after timelineStartLsn is known, so
     * it should never be zero at this point, if we know timelineStartLsn.
     *
     * timelineStartLsn can be zero only on the first syncSafekeepers run.
     */
    Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
@@ -1022,10 +1026,9 @@ DetermineEpochStartLsn(WalProposer *wp)
     * since which we are going to write according to the consensus. If not,
     * we must bail out, as clog and other non rel data is inconsistent.
     */
    walprop_shared = wp->api.get_shmem_state(wp);
    if (!wp->config->syncSafekeepers)
    {
        WalproposerShmemState *walprop_shared = wp->api.get_shmem_state(wp);

        /*
         * Basebackup LSN always points to the beginning of the record (not
         * the page), as StartupXLOG most probably wants it this way.
@@ -1040,7 +1043,7 @@ DetermineEpochStartLsn(WalProposer *wp)
         * compute (who could generate WAL) is ok.
         */
        if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
                                        walprop_shared->mineLastElectedTerm)))
                                        pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
        {
            /*
             * Panic to restart PG as we need to retake basebackup.
@@ -1054,8 +1057,8 @@ DetermineEpochStartLsn(WalProposer *wp)
                   LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
        }
    }
    walprop_shared->mineLastElectedTerm = wp->propTerm;
}
    pg_atomic_write_u64(&walprop_shared->mineLastElectedTerm, wp->propTerm);
}

/*
@@ -1105,9 +1108,13 @@ SendProposerElected(Safekeeper *sk)
    {
        /* safekeeper is empty or no common point, start from the beginning */
        sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
        wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u" ,
               sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
        /* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline is created manually (test_s3_wal_replay) */
        wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
               sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);

        /*
         * wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
         * is created manually (test_s3_wal_replay)
         */
        Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
    }
    else
@@ -1177,6 +1184,12 @@ StartStreaming(Safekeeper *sk)
    sk->active_state = SS_ACTIVE_SEND;
    sk->streamingAt = sk->startStreamingAt;

    /*
     * Donors can only be in SS_ACTIVE state, so we potentially update the
     * donor when we switch one to SS_ACTIVE.
     */
    UpdateDonorShmem(sk->wp);

    /* event set will be updated inside SendMessageToNode */
    SendMessageToNode(sk);
}
@@ -1568,17 +1581,17 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
 * none if it doesn't exist. donor_lsn is set to end position of the donor to
 * the best of our knowledge.
 */
Safekeeper *
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
static void
UpdateDonorShmem(WalProposer *wp)
{
    Safekeeper *donor = NULL;
    int i;
    *donor_lsn = InvalidXLogRecPtr;
    XLogRecPtr donor_lsn = InvalidXLogRecPtr;

    if (wp->n_votes < wp->quorum)
    {
        wp_log(WARNING, "GetDonor called before elections are won");
        return NULL;
        wp_log(WARNING, "UpdateDonorShmem called before elections are won");
        return;
    }

    /*
@@ -1589,7 +1602,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
    if (wp->safekeeper[wp->donor].state >= SS_IDLE)
    {
        donor = &wp->safekeeper[wp->donor];
        *donor_lsn = wp->propEpochStartLsn;
        donor_lsn = wp->propEpochStartLsn;
    }

    /*
@@ -1601,13 +1614,19 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
    {
        Safekeeper *sk = &wp->safekeeper[i];

        if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
        if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > donor_lsn)
        {
            donor = sk;
            *donor_lsn = sk->appendResponse.flushLsn;
            donor_lsn = sk->appendResponse.flushLsn;
        }
    }
    return donor;

    if (donor == NULL)
    {
        wp_log(WARNING, "UpdateDonorShmem didn't find a suitable donor, skipping");
        return;
    }
    wp->api.update_donor(wp, donor, donor_lsn);
}

/*
@@ -1617,7 +1636,7 @@ static void
HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
{
    XLogRecPtr candidateTruncateLsn;
    XLogRecPtr newCommitLsn;
    XLogRecPtr newCommitLsn;

    newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
    if (newCommitLsn > wp->commitLsn)
@@ -1627,7 +1646,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
        BroadcastAppendRequest(wp);
    }

    /*
    /*
     * Unlock syncrep waiters, update ps_feedback, CheckGracefulShutdown().
     * The last one will terminate the process if the shutdown is requested
     * and WAL is committed by the quorum. BroadcastAppendRequest() should be

@@ -284,14 +284,19 @@ typedef struct PageserverFeedback

typedef struct WalproposerShmemState
{
    pg_atomic_uint64 propEpochStartLsn;
    char donor_name[64];
    char donor_conninfo[MAXCONNINFO];
    XLogRecPtr donor_lsn;

    slock_t mutex;
    term_t mineLastElectedTerm;
    pg_atomic_uint64 mineLastElectedTerm;
    pg_atomic_uint64 backpressureThrottlingTime;
    pg_atomic_uint64 currentClusterSize;

    /* last feedback from each shard */
    PageserverFeedback shard_ps_feedback[MAX_SHARDS];
    int num_shards;
    int num_shards;

    /* aggregated feedback with min LSNs across shards */
    PageserverFeedback min_ps_feedback;
@@ -465,6 +470,9 @@ typedef struct walproposer_api
    /* Get pointer to the latest available WAL. */
    XLogRecPtr (*get_flush_rec_ptr) (WalProposer *wp);

    /* Update current donor info in WalProposer Shmem */
    void (*update_donor) (WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn);

    /* Get current time. */
    TimestampTz (*get_current_timestamp) (WalProposer *wp);

@@ -497,7 +505,7 @@ typedef struct walproposer_api
     *
     * On success, the data is placed in *buf. It is valid until the next call
     * to this function.
     *
     *
     * Returns PG_ASYNC_READ_FAIL on closed connection.
     */
    PGAsyncReadResult (*conn_async_read) (Safekeeper *sk, char **buf, int *amount);
@@ -545,13 +553,14 @@ typedef struct walproposer_api
     * Returns 0 if timeout is reached, 1 if some event happened. Updates
     * events mask to indicate events and sets sk to the safekeeper which has
     * an event.
     *
     *
     * On timeout, events is set to WL_NO_EVENTS. On socket event, events is
     * set to WL_SOCKET_READABLE and/or WL_SOCKET_WRITEABLE. When socket is
     * closed, events is set to WL_SOCKET_READABLE.
     *
     * WL_SOCKET_WRITEABLE is usually set only when we need to flush the buffer.
     * It can be returned only if caller asked for this event in the last *_event_set call.
     *
     * WL_SOCKET_WRITEABLE is usually set only when we need to flush the
     * buffer. It can be returned only if caller asked for this event in the
     * last *_event_set call.
     */
    int (*wait_event_set) (WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events);

@@ -571,9 +580,9 @@ typedef struct walproposer_api
    void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);

    /*
     * Called after every AppendResponse from the safekeeper. Used to propagate
     * backpressure feedback and to confirm WAL persistence (has been committed
     * on the quorum of safekeepers).
     * Called after every AppendResponse from the safekeeper. Used to
     * propagate backpressure feedback and to confirm WAL persistence (has
     * been committed on the quorum of safekeepers).
     */
    void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);

@@ -716,12 +725,14 @@ extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPt
extern void WalProposerPoll(WalProposer *wp);
extern void WalProposerFree(WalProposer *wp);

extern WalproposerShmemState *GetWalpropShmemState();

/*
 * WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
 * recreate set from scratch, hence the export.
 */
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
extern TimeLineID walprop_pg_get_timeline_id(void);


#define WPEVENT 1337            /* special log level for walproposer internal

@@ -85,7 +85,6 @@ static void walprop_pg_init_standalone_sync_safekeepers(void);
static void walprop_pg_init_walsender(void);
static void walprop_pg_init_bgworker(void);
static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static TimeLineID walprop_pg_get_timeline_id(void);
static void walprop_pg_load_libpqwalreceiver(void);

static process_interrupts_callback_t PrevProcessInterruptsCallback;
@@ -94,6 +93,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook_type;
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static void WalproposerShmemInit_SyncSafekeeper(void);


static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
@@ -136,6 +137,7 @@ WalProposerSync(int argc, char *argv[])
    WalProposer *wp;

    init_walprop_config(true);
    WalproposerShmemInit_SyncSafekeeper();
    walprop_pg_init_standalone_sync_safekeepers();
    walprop_pg_load_libpqwalreceiver();

@@ -281,6 +283,8 @@ WalproposerShmemInit(void)
    {
        memset(walprop_shared, 0, WalproposerShmemSize());
        SpinLockInit(&walprop_shared->mutex);
        pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
        pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
        pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
        pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
    }
@@ -289,6 +293,17 @@ WalproposerShmemInit(void)
    return found;
}

static void
WalproposerShmemInit_SyncSafekeeper(void)
{
    walprop_shared = palloc(WalproposerShmemSize());
    memset(walprop_shared, 0, WalproposerShmemSize());
    SpinLockInit(&walprop_shared->mutex);
    pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
    pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
    pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
}

#define BACK_PRESSURE_DELAY 10000L  /* 0.01 sec */

static bool
@@ -399,6 +414,13 @@ nwp_shmem_startup_hook(void)
        WalproposerShmemInit();
}

WalproposerShmemState *
GetWalpropShmemState()
{
    Assert(walprop_shared != NULL);
    return walprop_shared;
}

static WalproposerShmemState *
walprop_pg_get_shmem_state(WalProposer *wp)
{
@@ -431,14 +453,15 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback)
    for (int i = 0; i < walprop_shared->num_shards; i++)
    {
        PageserverFeedback *feedback = &walprop_shared->shard_ps_feedback[i];

        if (feedback->present)
        {
            if (min_feedback.last_received_lsn == InvalidXLogRecPtr || feedback->last_received_lsn < min_feedback.last_received_lsn)
                min_feedback.last_received_lsn = feedback->last_received_lsn;

            if (min_feedback.disk_consistent_lsn == InvalidXLogRecPtr || feedback->disk_consistent_lsn < min_feedback.disk_consistent_lsn)
                min_feedback.disk_consistent_lsn = feedback->disk_consistent_lsn;

            if (min_feedback.remote_consistent_lsn == InvalidXLogRecPtr || feedback->remote_consistent_lsn < min_feedback.remote_consistent_lsn)
                min_feedback.remote_consistent_lsn = feedback->remote_consistent_lsn;
        }
@@ -551,6 +574,7 @@ static void
walprop_sigusr2(SIGNAL_ARGS)
{
    int save_errno = errno;

    got_SIGUSR2 = true;
    SetLatch(MyLatch);
    errno = save_errno;
@@ -598,7 +622,7 @@ walprop_pg_get_current_timestamp(WalProposer *wp)
    return GetCurrentTimestamp();
}

static TimeLineID
TimeLineID
walprop_pg_get_timeline_id(void)
{
#if PG_VERSION_NUM >= 150000
@@ -617,6 +641,20 @@ walprop_pg_load_libpqwalreceiver(void)
        wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
}

static void
walprop_pg_update_donor(WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn)
{
    WalproposerShmemState *wps = wp->api.get_shmem_state(wp);
    char donor_name[64];

    pg_snprintf(donor_name, sizeof(donor_name), "%s:%s", donor->host, donor->port);
    SpinLockAcquire(&wps->mutex);
    memcpy(wps->donor_name, donor_name, sizeof(donor_name));
    memcpy(wps->donor_conninfo, donor->conninfo, sizeof(donor->conninfo));
    wps->donor_lsn = donor_lsn;
    SpinLockRelease(&wps->mutex);
}

/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
@@ -717,7 +755,6 @@ walprop_connect_start(Safekeeper *sk)
{
    Assert(sk->conn == NULL);
    sk->conn = libpqwp_connect_start(sk->conninfo);

}

static WalProposerConnectPollStatusType
@@ -1091,7 +1128,7 @@ static void
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
{
    XLogRecPtr FlushPtr;
    __attribute__((unused)) TimeLineID currTLI;
    __attribute__((unused)) TimeLineID currTLI;

#if PG_VERSION_NUM < 150000
    if (ThisTimeLineID == 0)
@@ -1295,116 +1332,13 @@ XLogBroadcastWalProposer(WalProposer *wp)
    }
}

/* Download WAL before basebackup for logical walsenders from sk, if needed */
/*
 * Used to download WAL before basebackup for logical walsenders from sk, no longer
 * needed because walsender always uses neon_walreader.
 */
static bool
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
{
    char *err;
    WalReceiverConn *wrconn;
    WalRcvStreamOptions options;
    char conninfo[MAXCONNINFO];
    TimeLineID timeline;
    XLogRecPtr startpos;
    XLogRecPtr endpos;

    startpos = GetLogRepRestartLSN(wp);
    if (startpos == InvalidXLogRecPtr)
        return true;            /* recovery not needed */
    endpos = wp->propEpochStartLsn;

    timeline = wp->greetRequest.timeline;

    if (!neon_auth_token)
    {
        memcpy(conninfo, sk->conninfo, MAXCONNINFO);
    }
    else
    {
        int written = 0;

        written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
        if (written > MAXCONNINFO || written < 0)
            wpg_log(FATAL, "could not append password to the safekeeper connection string");
    }

#if PG_MAJORVERSION_NUM < 16
    wrconn = walrcv_connect(conninfo, false, "wal_proposer_recovery", &err);
#else
    wrconn = walrcv_connect(conninfo, false, false, "wal_proposer_recovery", &err);
#endif

    if (!wrconn)
    {
        ereport(WARNING,
                (errmsg("could not connect to WAL acceptor %s:%s: %s",
                        sk->host, sk->port,
                        err)));
        return false;
    }
    wpg_log(LOG,
            "start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
            "%d",
            sk->host, sk->port, (uint32) (startpos >> 32),
            (uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);

    options.logical = false;
    options.startpoint = startpos;
    options.slotname = NULL;
    options.proto.physical.startpointTLI = timeline;

    if (walrcv_startstreaming(wrconn, &options))
    {
        XLogRecPtr rec_start_lsn;
        XLogRecPtr rec_end_lsn = 0;
        int len;
        char *buf;
        pgsocket wait_fd = PGINVALID_SOCKET;

        while ((len = walrcv_receive(wrconn, &buf, &wait_fd)) >= 0)
        {
            if (len == 0)
            {
                (void) WaitLatchOrSocket(
                                         MyLatch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE, wait_fd,
                                         -1, WAIT_EVENT_WAL_RECEIVER_MAIN);
            }
            else
            {
                Assert(buf[0] == 'w' || buf[0] == 'k');
                if (buf[0] == 'k')
                    continue;   /* keepalive */
                memcpy(&rec_start_lsn, &buf[XLOG_HDR_START_POS],
                       sizeof rec_start_lsn);
                rec_start_lsn = pg_ntoh64(rec_start_lsn);
                rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;

                /* write WAL to disk */
                XLogWalPropWrite(sk->wp, &buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);

                ereport(DEBUG1,
                        (errmsg("Recover message %X/%X length %d",
                                LSN_FORMAT_ARGS(rec_start_lsn), len)));
                if (rec_end_lsn >= endpos)
                    break;
            }
        }
        ereport(LOG,
                (errmsg("end of replication stream at %X/%X: %m",
                        LSN_FORMAT_ARGS(rec_end_lsn))));
        walrcv_disconnect(wrconn);

        /* failed to receive all WAL till endpos */
        if (rec_end_lsn < endpos)
            return false;
    }
    else
    {
        ereport(LOG,
                (errmsg("primary server contains no more WAL on requested timeline %u LSN %X/%08X",
                        timeline, (uint32) (startpos >> 32), (uint32) startpos)));
        return false;
    }

    return true;
}

@@ -1545,7 +1479,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)

    snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
    Assert(!sk->xlogreader);
    sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
    sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix);
    if (sk->xlogreader == NULL)
        wpg_log(FATAL, "failed to allocate xlog reader");
}
@@ -1960,8 +1894,8 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
{
    HotStandbyFeedback hsFeedback;
    bool needToAdvanceSlot = false;
    HotStandbyFeedback hsFeedback;
    bool needToAdvanceSlot = false;

    if (wp->config->syncSafekeepers)
        return;
@@ -2095,22 +2029,25 @@ GetLogRepRestartLSN(WalProposer *wp)
    return lrRestartLsn;
}

void SetNeonCurrentClusterSize(uint64 size)
void
SetNeonCurrentClusterSize(uint64 size)
{
    pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
}

uint64 GetNeonCurrentClusterSize(void)
uint64
GetNeonCurrentClusterSize(void)
{
    return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
}
uint64 GetNeonCurrentClusterSize(void);
uint64 GetNeonCurrentClusterSize(void);


static const walproposer_api walprop_pg = {
    .get_shmem_state = walprop_pg_get_shmem_state,
    .start_streaming = walprop_pg_start_streaming,
    .get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
    .update_donor = walprop_pg_update_donor,
    .get_current_timestamp = walprop_pg_get_current_timestamp,
    .conn_error_message = walprop_error_message,
    .conn_status = walprop_status,

pgxn/neon/walsender_hooks.c (new file, 172 lines)
@@ -0,0 +1,172 @@
/*-------------------------------------------------------------------------
 *
 * walsender_hooks.c
 *
 * Implements XLogReaderRoutine in terms of NeonWALReader. Allows for
 * fetching WAL from safekeepers, which normal xlogreader can't do.
 *
 *-------------------------------------------------------------------------
 */
#include "walsender_hooks.h"
#include "postgres.h"
#include "fmgr.h"
#include "access/xlogdefs.h"
#include "replication/walsender.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "miscadmin.h"
#include "utils/wait_event.h"
#include "utils/guc.h"
#include "postmaster/interrupt.h"

#include "neon_walreader.h"
#include "walproposer.h"

static NeonWALReader *wal_reader = NULL;
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);

static XLogRecPtr
NeonWALReadWaitForWAL(XLogRecPtr loc)
{
    while (!NeonWALReaderUpdateDonor(wal_reader))
    {
        pg_usleep(1000);
        CHECK_FOR_INTERRUPTS();
    }

    return WalSndWaitForWal(loc);
}

static int
NeonWALPageRead(
                XLogReaderState *xlogreader,
                XLogRecPtr targetPagePtr,
                int reqLen,
                XLogRecPtr targetRecPtr,
                char *readBuf)
{
    XLogRecPtr rem_lsn;

    /* Wait for flush pointer to advance past our request */
    XLogRecPtr flushptr = NeonWALReadWaitForWAL(targetPagePtr + reqLen);
    int count;

    if (flushptr < targetPagePtr + reqLen)
        return -1;

    /* Read at most XLOG_BLCKSZ bytes */
    if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
        count = XLOG_BLCKSZ;
    else
        count = flushptr - targetPagePtr;

    /*
     * Sometimes walsender requests non-monotonic sequences of WAL. If that's
     * the case, we have to reset streaming from remote at the correct
     * position. For example, walsender may try to verify the segment header
     * when trying to read in the middle of it.
     */
    rem_lsn = NeonWALReaderGetRemLsn(wal_reader);
    if (rem_lsn != InvalidXLogRecPtr && targetPagePtr != rem_lsn)
    {
        NeonWALReaderResetRemote(wal_reader);
    }

    for (;;)
    {
        NeonWALReadResult res = NeonWALRead(
                                            wal_reader,
                                            readBuf,
                                            targetPagePtr,
                                            count,
                                            walprop_pg_get_timeline_id());

        if (res == NEON_WALREAD_SUCCESS)
        {
            /*
             * Setting ws_tli is required by the XLogReaderRoutine, it is used
             * for segment name generation in error reports.
             *
             * ReadPageInternal updates ws_segno after calling cb on its own
             * and XLogReaderRoutine description doesn't require it, but
             * WALRead sets, let's follow it.
             */
            xlogreader->seg.ws_tli = NeonWALReaderGetSegment(wal_reader)->ws_tli;
            xlogreader->seg.ws_segno = NeonWALReaderGetSegment(wal_reader)->ws_segno;

            /*
             * ws_file doesn't exist in case of remote read, and isn't used by
             * xlogreader except by WALRead on which we don't rely anyway.
             */
            return count;
        }
        if (res == NEON_WALREAD_ERROR)
        {
            elog(ERROR, "[walsender] Failed to read WAL (req_lsn=%X/%X, len=%d): %s",
                 LSN_FORMAT_ARGS(targetPagePtr),
                 reqLen,
                 NeonWALReaderErrMsg(wal_reader));
            return -1;
        }

        /*
         * Res is WOULDBLOCK, so we wait on the socket, recreating event set
         * if necessary
         */
        {
            pgsocket sock = NeonWALReaderSocket(wal_reader);
            uint32_t reader_events = NeonWALReaderEvents(wal_reader);
            long timeout_ms = 1000;

            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
            if (ConfigReloadPending)
            {
                ConfigReloadPending = false;
                ProcessConfigFile(PGC_SIGHUP);
            }

            WaitLatchOrSocket(
                              MyLatch,
                              WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | reader_events,
                              sock,
                              timeout_ms,
                              WAIT_EVENT_WAL_SENDER_MAIN);
        }
    }
}

static void
NeonWALReadSegmentOpen(XLogReaderState *xlogreader, XLogSegNo nextSegNo, TimeLineID *tli_p)
{
    neon_wal_segment_open(wal_reader, nextSegNo, tli_p);
    xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
}

static void
NeonWALReadSegmentClose(XLogReaderState *xlogreader)
{
    neon_wal_segment_close(wal_reader);
    xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
}

void
NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
    if (!wal_reader)
    {
        XLogRecPtr epochStartLsn = pg_atomic_read_u64(&GetWalpropShmemState()->propEpochStartLsn);

        if (epochStartLsn == 0)
        {
            elog(ERROR, "Unable to start walsender when propEpochStartLsn is 0!");
        }
        wal_reader = NeonWALReaderAllocate(wal_segment_size, epochStartLsn, "[walsender] ");
    }
    xlr->page_read = NeonWALPageRead;
    xlr->segment_open = NeonWALReadSegmentOpen;
    xlr->segment_close = NeonWALReadSegmentClose;
}
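The core of `NeonWALPageRead` above is the clamp between the requested page and the flushed WAL horizon. A standalone sketch of that arithmetic (names are illustrative; `XLOG_BLCKSZ` is 8192 in stock PostgreSQL builds):

```rust
const XLOG_BLCKSZ: u64 = 8192;

/// How many bytes of the page at `target_page_ptr` may be read when WAL is
/// flushed up to `flushptr`: a full page if available, otherwise the flushed
/// prefix, and 0 if the request cannot be satisfied at all (the C code
/// returns -1 in that case).
fn readable_count(target_page_ptr: u64, req_len: u64, flushptr: u64) -> u64 {
    if flushptr < target_page_ptr + req_len {
        return 0; // WAL not yet available up to the requested point
    }
    if target_page_ptr + XLOG_BLCKSZ <= flushptr {
        XLOG_BLCKSZ // whole page is flushed
    } else {
        flushptr - target_page_ptr // only the flushed prefix
    }
}

fn main() {
    assert_eq!(readable_count(0, 100, 50), 0); // not enough flushed yet
    assert_eq!(readable_count(0, 100, 8192), 8192); // full page
    assert_eq!(readable_count(0, 100, 4096), 4096); // partial page
}
```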
pgxn/neon/walsender_hooks.h (new file, 7 lines)
@@ -0,0 +1,7 @@
#ifndef __WALSENDER_HOOKS_H__
#define __WALSENDER_HOOKS_H__

struct XLogReaderRoutine;
void NeonOnDemandXLogReaderRoutines(struct XLogReaderRoutine *xlr);

#endif
@@ -506,6 +506,8 @@ struct WalSender<'a, IO> {
    send_buf: [u8; MAX_SEND_SIZE],
}

const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);

impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
    /// Send WAL until
    /// - an error occurs
@@ -584,14 +586,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
    async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
        loop {
            self.end_pos = self.end_watch.get();
            if self.end_pos > self.start_pos {
                // We have something to send.
            let have_something_to_send = (|| {
                fail::fail_point!(
                    "sk-pause-send",
                    self.appname.as_deref() != Some("pageserver"),
                    |_| { false }
                );
                self.end_pos > self.start_pos
            })();

            if have_something_to_send {
                trace!("got end_pos {:?}, streaming", self.end_pos);
                return Ok(());
            }

            // Wait for WAL to appear, now self.end_pos == self.start_pos.
            if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
            if let Some(lsn) = self.wait_for_lsn().await? {
                self.end_pos = lsn;
                trace!("got end_pos {:?}, streaming", self.end_pos);
                return Ok(());
@@ -628,6 +638,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
                .await?;
        }
    }

    /// Wait until we have available WAL > start_pos or timeout expires. Returns
    /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
    /// - Ok(None) if timeout expired;
    /// - Err in case of error -- only if 1) term changed while fetching in recovery
    ///   mode 2) watch channel closed, which must never happen.
    async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
        let fp = (|| {
            fail::fail_point!(
                "sk-pause-send",
                self.appname.as_deref() != Some("pageserver"),
                |_| { true }
            );
            false
        })();
        if fp {
            tokio::time::sleep(POLL_STATE_TIMEOUT).await;
            return Ok(None);
        }

        let res = timeout(POLL_STATE_TIMEOUT, async move {
            loop {
                let end_pos = self.end_watch.get();
                if end_pos > self.start_pos {
                    return Ok(end_pos);
                }
                if let EndWatch::Flush(rx) = &self.end_watch {
                    let curr_term = rx.borrow().term;
                    if let Some(client_term) = self.term {
                        if curr_term != client_term {
                            bail!("term changed: requested {}, now {}", client_term, curr_term);
                        }
                    }
                }
                self.end_watch.changed().await?;
            }
        })
        .await;

        match res {
            // success
            Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
            // error inside closure
            Ok(Err(err)) => Err(err),
            // timeout
            Err(_) => Ok(None),
        }
    }
}

/// A half driving receiving replies.
@@ -685,47 +743,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
    }
}

const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);

/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
/// - Ok(None) if timeout expired;
/// - Err in case of error -- only if 1) term changed while fetching in recovery
///   mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(
    rx: &mut EndWatch,
    client_term: Option<Term>,
    start_pos: Lsn,
) -> anyhow::Result<Option<Lsn>> {
    let res = timeout(POLL_STATE_TIMEOUT, async move {
        loop {
            let end_pos = rx.get();
            if end_pos > start_pos {
                return Ok(end_pos);
            }
            if let EndWatch::Flush(rx) = rx {
                let curr_term = rx.borrow().term;
                if let Some(client_term) = client_term {
                    if curr_term != client_term {
                        bail!("term changed: requested {}, now {}", client_term, curr_term);
                    }
                }
            }
            rx.changed().await?;
        }
    })
    .await;

    match res {
        // success
        Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
        // error inside closure
        Ok(Err(err)) => Err(err),
        // timeout
        Err(_) => Ok(None),
    }
}

#[cfg(test)]
mod tests {
    use utils::id::{TenantId, TimelineId};
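`wait_for_lsn` moves from a free function into a `WalSender` method mainly so the failpoint can consult `self.appname`; the wait pattern itself is unchanged. A self-contained sketch of that pattern, a tokio watch channel polled under a timeout with the three outcomes mapped to `Ok(Some)`, `Ok(None)`, and `Err` (simplified: the real code also checks for term changes; assumed deps are `tokio` with full features and `anyhow`):

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::timeout;

async fn wait_past(
    rx: &mut watch::Receiver<u64>,
    start_pos: u64,
    dur: Duration,
) -> anyhow::Result<Option<u64>> {
    let res = timeout(dur, async {
        loop {
            let end_pos = *rx.borrow();
            if end_pos > start_pos {
                return Ok::<u64, anyhow::Error>(end_pos);
            }
            rx.changed().await?; // Err if the sender side is dropped
        }
    })
    .await;
    match res {
        Ok(Ok(end_pos)) => Ok(Some(end_pos)), // observed new WAL
        Ok(Err(e)) => Err(e),                 // channel closed
        Err(_) => Ok(None),                   // timeout expired
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = watch::channel(0u64);
    tx.send(10)?;
    let got = wait_past(&mut rx, 5, Duration::from_millis(100)).await?;
    assert_eq!(got, Some(10));
    Ok(())
}
```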
@@ -17,8 +17,7 @@ use utils::lsn::Lsn;
use walproposer::{
    api_bindings::Level,
    bindings::{
        pg_atomic_uint64, NeonWALReadResult, PageserverFeedback, SafekeeperStateDesiredEvents,
        WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
        NeonWALReadResult, SafekeeperStateDesiredEvents, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
    },
    walproposer::{ApiImpl, Config},
};
@@ -224,31 +223,13 @@ impl SimulationApi {
            })
            .collect::<Vec<_>>();

        let empty_feedback = PageserverFeedback {
            present: false,
            currentClusterSize: 0,
            last_received_lsn: 0,
            disk_consistent_lsn: 0,
            remote_consistent_lsn: 0,
            replytime: 0,
            shard_number: 0,
        };

        Self {
            os: args.os,
            safekeepers: RefCell::new(sk_conns),
            disk: args.disk,
            redo_start_lsn: args.redo_start_lsn,
            last_logged_commit_lsn: 0,
            shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
                mutex: 0,
                mineLastElectedTerm: 0,
                backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
                currentClusterSize: pg_atomic_uint64 { value: 0 },
                shard_ps_feedback: [empty_feedback; 128],
                num_shards: 0,
                min_ps_feedback: empty_feedback,
            }),
            shmem: UnsafeCell::new(walproposer::api_bindings::empty_shmem()),
            config: args.config,
            event_set: RefCell::new(None),
        }
@@ -274,6 +255,12 @@ impl ApiImpl for SimulationApi {
        self.os.now() as i64 * 1000
    }

    fn update_donor(&self, donor: &mut walproposer::bindings::Safekeeper, donor_lsn: u64) {
        let mut shmem = unsafe { *self.get_shmem_state() };
        shmem.propEpochStartLsn.value = donor_lsn;
        shmem.donor_conninfo = donor.conninfo;
    }

    fn conn_status(
        &self,
        _: &mut walproposer::bindings::Safekeeper,
@@ -7,6 +7,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    logical_replication_sync,
    wait_for_last_flush_lsn,
)
@@ -203,6 +204,81 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
    wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))


# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    env.neon_cli.create_branch("init")
    endpoint = env.endpoints.create_start("init")

    with endpoint.connect().cursor() as cur:
        cur.execute("create table wal_generator (id serial primary key, data text)")
        cur.execute(
            """
            INSERT INTO wal_generator (data)
            SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
            FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
            """
        )
        cur.execute("create table t(a int)")
        cur.execute("create publication pub for table t")
        cur.execute("insert into t values (1)")

    vanilla_pg.start()
    vanilla_pg.safe_psql("create table t(a int)")
    connstr = endpoint.connstr().replace("'", "''")
    vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
    logical_replication_sync(vanilla_pg, endpoint)
    vanilla_pg.stop()

    # Pause the safekeepers so that they can't send WAL (except to the pageserver)
    for sk in env.safekeepers:
        sk_http = sk.http_client()
        sk_http.configure_failpoints([("sk-pause-send", "return")])

    # Insert a 2; with sending paused, subscribers can't receive it yet
    with endpoint.connect().cursor() as cur:
        cur.execute("insert into t values (2)")

    endpoint.stop_and_destroy()

    # This new endpoint should contain [1, 2], but it can't access WAL from the safekeepers
    endpoint = env.endpoints.create_start("init")
    with endpoint.connect().cursor() as cur:
        cur.execute("select * from t")
        res = [r[0] for r in cur.fetchall()]
        assert res == [1, 2]

    # Reconnect the subscriber
    vanilla_pg.start()
    connstr = endpoint.connstr().replace("'", "''")
    vanilla_pg.safe_psql(f"alter subscription sub1 connection '{connstr}'")

    time.sleep(5)
    # Make sure the 2 isn't replicated
    assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1]

    # Re-enable WAL download
    for sk in env.safekeepers:
        sk_http = sk.http_client()
        sk_http.configure_failpoints([("sk-pause-send", "off")])

    logical_replication_sync(vanilla_pg, endpoint)
    assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2]

    # Check that local reads also work
    with endpoint.connect().cursor() as cur:
        cur.execute("insert into t values (3)")
    logical_replication_sync(vanilla_pg, endpoint)
    assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2, 3]

    log_path = vanilla_pg.pgdatadir / "pg.log"
    with open(log_path, "r") as log_file:
        logs = log_file.read()
        assert "could not receive data from WAL stream" not in logs


# Test compute start at LSN page of which starts with contrecord
# https://github.com/neondatabase/neon/issues/5749
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):