mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
On-demand WAL download for walsender (#6872)
## Problem There is allegedly a bug where connecting a subscriber before WAL has been downloaded from the safekeeper produces an error. ## Summary of changes Adds support for pausing safekeepers from sending WAL to computes, then creates a compute and attaches a subscriber while the safekeepers are in this paused state. This fails to reproduce the issue, but it is probably a good test to have. --------- Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
This commit is contained in:
@@ -50,6 +50,14 @@ extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C" fn update_donor(wp: *mut WalProposer, donor: *mut Safekeeper, donor_lsn: XLogRecPtr) {
|
||||||
|
unsafe {
|
||||||
|
let callback_data = (*(*wp).config).callback_data;
|
||||||
|
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||||
|
(*api).update_donor(&mut (*donor), donor_lsn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
|
extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
|
||||||
unsafe {
|
unsafe {
|
||||||
let callback_data = (*(*wp).config).callback_data;
|
let callback_data = (*(*wp).config).callback_data;
|
||||||
@@ -391,6 +399,7 @@ pub(crate) fn create_api() -> walproposer_api {
|
|||||||
get_shmem_state: Some(get_shmem_state),
|
get_shmem_state: Some(get_shmem_state),
|
||||||
start_streaming: Some(start_streaming),
|
start_streaming: Some(start_streaming),
|
||||||
get_flush_rec_ptr: Some(get_flush_rec_ptr),
|
get_flush_rec_ptr: Some(get_flush_rec_ptr),
|
||||||
|
update_donor: Some(update_donor),
|
||||||
get_current_timestamp: Some(get_current_timestamp),
|
get_current_timestamp: Some(get_current_timestamp),
|
||||||
conn_error_message: Some(conn_error_message),
|
conn_error_message: Some(conn_error_message),
|
||||||
conn_status: Some(conn_status),
|
conn_status: Some(conn_status),
|
||||||
@@ -421,6 +430,32 @@ pub(crate) fn create_api() -> walproposer_api {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||||
|
let empty_feedback = crate::bindings::PageserverFeedback {
|
||||||
|
present: false,
|
||||||
|
currentClusterSize: 0,
|
||||||
|
last_received_lsn: 0,
|
||||||
|
disk_consistent_lsn: 0,
|
||||||
|
remote_consistent_lsn: 0,
|
||||||
|
replytime: 0,
|
||||||
|
shard_number: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
crate::bindings::WalproposerShmemState {
|
||||||
|
propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||||
|
donor_name: [0; 64],
|
||||||
|
donor_conninfo: [0; 1024],
|
||||||
|
donor_lsn: 0,
|
||||||
|
mutex: 0,
|
||||||
|
mineLastElectedTerm: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||||
|
backpressureThrottlingTime: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||||
|
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||||
|
shard_ps_feedback: [empty_feedback; 128],
|
||||||
|
num_shards: 0,
|
||||||
|
min_ps_feedback: empty_feedback,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Level {
|
impl std::fmt::Display for Level {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
write!(f, "{:?}", self)
|
write!(f, "{:?}", self)
|
||||||
|
|||||||
@@ -1,8 +1,5 @@
|
|||||||
use std::ffi::CString;
|
use std::ffi::CString;
|
||||||
|
|
||||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
|
||||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
api_bindings::{create_api, take_vec_u8, Level},
|
api_bindings::{create_api, take_vec_u8, Level},
|
||||||
bindings::{
|
bindings::{
|
||||||
@@ -10,6 +7,8 @@ use crate::{
|
|||||||
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
|
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||||
|
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||||
|
|
||||||
/// Rust high-level wrapper for C walproposer API. Many methods are not required
|
/// Rust high-level wrapper for C walproposer API. Many methods are not required
|
||||||
/// for simple cases, hence todo!() in default implementations.
|
/// for simple cases, hence todo!() in default implementations.
|
||||||
@@ -28,6 +27,10 @@ pub trait ApiImpl {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Update the current donor info (donor safekeeper and its LSN).
///
/// The default implementation is an unimplemented stub: it panics via
/// `todo!()` if called, so implementors that need it must override it.
fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
fn get_current_timestamp(&self) -> i64 {
|
fn get_current_timestamp(&self) -> i64 {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
@@ -274,6 +277,7 @@ mod tests {
|
|||||||
sync::{atomic::AtomicUsize, mpsc::sync_channel},
|
sync::{atomic::AtomicUsize, mpsc::sync_channel},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use std::cell::UnsafeCell;
|
||||||
use utils::id::TenantTimelineId;
|
use utils::id::TenantTimelineId;
|
||||||
|
|
||||||
use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
|
use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
|
||||||
@@ -297,6 +301,8 @@ mod tests {
|
|||||||
replies_ptr: AtomicUsize,
|
replies_ptr: AtomicUsize,
|
||||||
// channel to send LSN to the main thread
|
// channel to send LSN to the main thread
|
||||||
sync_channel: std::sync::mpsc::SyncSender<u64>,
|
sync_channel: std::sync::mpsc::SyncSender<u64>,
|
||||||
|
// Shmem state, used for storing donor info
|
||||||
|
shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MockImpl {
|
impl MockImpl {
|
||||||
@@ -327,11 +333,22 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ApiImpl for MockImpl {
|
impl ApiImpl for MockImpl {
|
||||||
|
// Returns a raw pointer to the mock's own `shmem` value, obtained from the
// `UnsafeCell` that wraps it.
fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
|
||||||
|
self.shmem.get()
|
||||||
|
}
|
||||||
|
|
||||||
fn get_current_timestamp(&self) -> i64 {
|
fn get_current_timestamp(&self) -> i64 {
|
||||||
println!("get_current_timestamp");
|
println!("get_current_timestamp");
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Records the donor's connection info and LSN in the mock shmem state.
///
/// The writes go through the raw pointer returned by `get_shmem_state()`.
/// Dereferencing that pointer into a local binding would copy the whole
/// struct and the assignments would only change the temporary, leaving the
/// shared state untouched.
fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
    let shmem = self.get_shmem_state();
    // SAFETY: `shmem` comes from the `UnsafeCell` owned by `self`, so it is
    // non-null and valid for as long as `self` is alive. No Rust reference
    // to the pointee is created here; fields are written in place.
    unsafe {
        (*shmem).propEpochStartLsn.value = donor_lsn;
        (*shmem).donor_conninfo = donor.conninfo;
        (*shmem).donor_lsn = donor_lsn;
    }
}
|
||||||
|
|
||||||
fn conn_status(
|
fn conn_status(
|
||||||
&self,
|
&self,
|
||||||
_: &mut crate::bindings::Safekeeper,
|
_: &mut crate::bindings::Safekeeper,
|
||||||
@@ -507,6 +524,7 @@ mod tests {
|
|||||||
],
|
],
|
||||||
replies_ptr: AtomicUsize::new(0),
|
replies_ptr: AtomicUsize::new(0),
|
||||||
sync_channel: sender,
|
sync_channel: sender,
|
||||||
|
shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
|
||||||
});
|
});
|
||||||
let config = crate::walproposer::Config {
|
let config = crate::walproposer::Config {
|
||||||
ttid,
|
ttid,
|
||||||
|
|||||||
@@ -14,7 +14,8 @@ OBJS = \
|
|||||||
relsize_cache.o \
|
relsize_cache.o \
|
||||||
walproposer.o \
|
walproposer.o \
|
||||||
walproposer_pg.o \
|
walproposer_pg.o \
|
||||||
control_plane_connector.o
|
control_plane_connector.o \
|
||||||
|
walsender_hooks.o
|
||||||
|
|
||||||
PG_CPPFLAGS = -I$(libpq_srcdir)
|
PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||||
SHLIB_LINK_INTERNAL = $(libpq)
|
SHLIB_LINK_INTERNAL = $(libpq)
|
||||||
|
|||||||
@@ -34,6 +34,7 @@
|
|||||||
#include "walproposer.h"
|
#include "walproposer.h"
|
||||||
#include "pagestore_client.h"
|
#include "pagestore_client.h"
|
||||||
#include "control_plane_connector.h"
|
#include "control_plane_connector.h"
|
||||||
|
#include "walsender_hooks.h"
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
void _PG_init(void);
|
void _PG_init(void);
|
||||||
@@ -265,7 +266,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
_PG_init(void)
|
_PG_init(void)
|
||||||
{
|
{
|
||||||
@@ -279,6 +279,7 @@ _PG_init(void)
|
|||||||
|
|
||||||
pg_init_libpagestore();
|
pg_init_libpagestore();
|
||||||
pg_init_walproposer();
|
pg_init_walproposer();
|
||||||
|
WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
|
||||||
|
|
||||||
InitLogicalReplicationMonitor();
|
InitLogicalReplicationMonitor();
|
||||||
|
|
||||||
|
|||||||
@@ -36,10 +36,7 @@
|
|||||||
|
|
||||||
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||||
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
|
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
|
||||||
static void NeonWALReaderResetRemote(NeonWALReader *state);
|
|
||||||
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||||
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
|
|
||||||
static void neon_wal_segment_close(NeonWALReader *state);
|
|
||||||
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
|
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
|
||||||
TimeLineID tli);
|
TimeLineID tli);
|
||||||
|
|
||||||
@@ -82,8 +79,9 @@ struct NeonWALReader
|
|||||||
XLogRecPtr req_lsn;
|
XLogRecPtr req_lsn;
|
||||||
Size req_len;
|
Size req_len;
|
||||||
Size req_progress;
|
Size req_progress;
|
||||||
WalProposer *wp; /* we learn donor through walproposer */
|
char donor_conninfo[MAXCONNINFO];
|
||||||
char donor_name[64]; /* saved donor safekeeper name for logging */
|
char donor_name[64]; /* saved donor safekeeper name for logging */
|
||||||
|
XLogRecPtr donor_lsn;
|
||||||
/* state of connection to safekeeper */
|
/* state of connection to safekeeper */
|
||||||
NeonWALReaderRemoteState rem_state;
|
NeonWALReaderRemoteState rem_state;
|
||||||
WalProposerConn *wp_conn;
|
WalProposerConn *wp_conn;
|
||||||
@@ -107,7 +105,7 @@ struct NeonWALReader
|
|||||||
|
|
||||||
/* palloc and initialize NeonWALReader */
|
/* palloc and initialize NeonWALReader */
|
||||||
NeonWALReader *
|
NeonWALReader *
|
||||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
|
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
|
||||||
{
|
{
|
||||||
NeonWALReader *reader;
|
NeonWALReader *reader;
|
||||||
|
|
||||||
@@ -123,8 +121,6 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose
|
|||||||
reader->seg.ws_tli = 0;
|
reader->seg.ws_tli = 0;
|
||||||
reader->segcxt.ws_segsize = wal_segment_size;
|
reader->segcxt.ws_segsize = wal_segment_size;
|
||||||
|
|
||||||
reader->wp = wp;
|
|
||||||
|
|
||||||
reader->rem_state = RS_NONE;
|
reader->rem_state = RS_NONE;
|
||||||
|
|
||||||
if (log_prefix)
|
if (log_prefix)
|
||||||
@@ -204,21 +200,16 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
|
|||||||
{
|
{
|
||||||
if (state->rem_state == RS_NONE)
|
if (state->rem_state == RS_NONE)
|
||||||
{
|
{
|
||||||
XLogRecPtr donor_lsn;
|
if (!NeonWALReaderUpdateDonor(state))
|
||||||
|
|
||||||
/* no connection yet; start one */
|
|
||||||
Safekeeper *donor = GetDonor(state->wp, &donor_lsn);
|
|
||||||
|
|
||||||
if (donor == NULL)
|
|
||||||
{
|
{
|
||||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||||
"failed to establish remote connection to fetch WAL: no donor available");
|
"failed to establish remote connection to fetch WAL: no donor available");
|
||||||
return NEON_WALREAD_ERROR;
|
return NEON_WALREAD_ERROR;
|
||||||
|
|
||||||
}
|
}
|
||||||
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
|
/* no connection yet; start one */
|
||||||
nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
|
nwr_log(LOG, "establishing connection to %s, lsn=%X/%X to fetch WAL", state->donor_name, LSN_FORMAT_ARGS(state->donor_lsn));
|
||||||
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
|
state->wp_conn = libpqwp_connect_start(state->donor_conninfo);
|
||||||
state->wp_conn = libpqwp_connect_start(donor->conninfo);
|
|
||||||
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
|
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
|
||||||
{
|
{
|
||||||
snprintf(state->err_msg, sizeof(state->err_msg),
|
snprintf(state->err_msg, sizeof(state->err_msg),
|
||||||
@@ -251,10 +242,22 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
|
|||||||
{
|
{
|
||||||
/* connection successfully established */
|
/* connection successfully established */
|
||||||
char start_repl_query[128];
|
char start_repl_query[128];
|
||||||
|
term_t term = pg_atomic_read_u64(&GetWalpropShmemState()->mineLastElectedTerm);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Set elected walproposer's term to pull only data from
|
||||||
|
* its history. Note: for logical walsender it means we
|
||||||
|
* might stream WAL not yet committed by safekeepers. It
|
||||||
|
* would be cleaner to fix this.
|
||||||
|
*
|
||||||
|
* mineLastElectedTerm shouldn't be 0 at this point
|
||||||
|
* because we checked above that donor exists and it
|
||||||
|
* appears only after successful election.
|
||||||
|
*/
|
||||||
|
Assert(term > 0);
|
||||||
snprintf(start_repl_query, sizeof(start_repl_query),
|
snprintf(start_repl_query, sizeof(start_repl_query),
|
||||||
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
|
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
|
||||||
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
|
LSN_FORMAT_ARGS(startptr), term);
|
||||||
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
|
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
|
||||||
state->donor_name, start_repl_query);
|
state->donor_name, start_repl_query);
|
||||||
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
|
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
|
||||||
@@ -404,6 +407,10 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
|
|||||||
state->req_lsn = InvalidXLogRecPtr;
|
state->req_lsn = InvalidXLogRecPtr;
|
||||||
state->req_len = 0;
|
state->req_len = 0;
|
||||||
state->req_progress = 0;
|
state->req_progress = 0;
|
||||||
|
|
||||||
|
/* Update the current segment info. */
|
||||||
|
state->seg.ws_tli = tli;
|
||||||
|
|
||||||
return NEON_WALREAD_SUCCESS;
|
return NEON_WALREAD_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -526,7 +533,7 @@ err:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* reset remote connection and request in progress */
|
/* reset remote connection and request in progress */
|
||||||
static void
|
void
|
||||||
NeonWALReaderResetRemote(NeonWALReader *state)
|
NeonWALReaderResetRemote(NeonWALReader *state)
|
||||||
{
|
{
|
||||||
state->req_lsn = InvalidXLogRecPtr;
|
state->req_lsn = InvalidXLogRecPtr;
|
||||||
@@ -691,13 +698,25 @@ NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size coun
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
XLogRecPtr
|
||||||
|
NeonWALReaderGetRemLsn(NeonWALReader *state)
|
||||||
|
{
|
||||||
|
return state->rem_lsn;
|
||||||
|
}
|
||||||
|
|
||||||
|
const WALOpenSegment *
|
||||||
|
NeonWALReaderGetSegment(NeonWALReader *state)
|
||||||
|
{
|
||||||
|
return &state->seg;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copy of vanilla wal_segment_open, but returns false in case of error instead
|
* Copy of vanilla wal_segment_open, but returns false in case of error instead
|
||||||
* of ERROR, with errno set.
|
* of ERROR, with errno set.
|
||||||
*
|
*
|
||||||
* XLogReaderRoutine->segment_open callback for local pg_wal files
|
* XLogReaderRoutine->segment_open callback for local pg_wal files
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
|
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
|
||||||
TimeLineID *tli_p)
|
TimeLineID *tli_p)
|
||||||
{
|
{
|
||||||
@@ -724,7 +743,7 @@ is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* copy of vanilla wal_segment_close with NeonWALReader */
|
/* copy of vanilla wal_segment_close with NeonWALReader */
|
||||||
static void
|
void
|
||||||
neon_wal_segment_close(NeonWALReader *state)
|
neon_wal_segment_close(NeonWALReader *state)
|
||||||
{
|
{
|
||||||
if (state->seg.ws_file >= 0)
|
if (state->seg.ws_file >= 0)
|
||||||
@@ -740,3 +759,19 @@ NeonWALReaderErrMsg(NeonWALReader *state)
|
|||||||
{
|
{
|
||||||
return state->err_msg;
|
return state->err_msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns true if there is a donor, and false otherwise
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
NeonWALReaderUpdateDonor(NeonWALReader *state)
|
||||||
|
{
|
||||||
|
WalproposerShmemState *wps = GetWalpropShmemState();
|
||||||
|
|
||||||
|
SpinLockAcquire(&wps->mutex);
|
||||||
|
memcpy(state->donor_name, wps->donor_name, sizeof(state->donor_name));
|
||||||
|
memcpy(state->donor_conninfo, wps->donor_conninfo, sizeof(state->donor_conninfo));
|
||||||
|
state->donor_lsn = wps->donor_lsn;
|
||||||
|
SpinLockRelease(&wps->mutex);
|
||||||
|
return state->donor_name[0] != '\0';
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,12 +19,19 @@ typedef enum
|
|||||||
NEON_WALREAD_ERROR,
|
NEON_WALREAD_ERROR,
|
||||||
} NeonWALReadResult;
|
} NeonWALReadResult;
|
||||||
|
|
||||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
|
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
|
||||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||||
|
extern void NeonWALReaderResetRemote(NeonWALReader *state);
|
||||||
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||||
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
||||||
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
||||||
extern bool NeonWALReaderIsRemConnEstablished(NeonWALReader *state);
|
extern bool NeonWALReaderIsRemConnEstablished(NeonWALReader *state);
|
||||||
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
|
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
|
||||||
|
extern XLogRecPtr NeonWALReaderGetRemLsn(NeonWALReader *state);
|
||||||
|
extern const WALOpenSegment *NeonWALReaderGetSegment(NeonWALReader *state);
|
||||||
|
extern bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
|
||||||
|
extern void neon_wal_segment_close(NeonWALReader *state);
|
||||||
|
extern bool NeonWALReaderUpdateDonor(NeonWALReader *state);
|
||||||
|
|
||||||
|
|
||||||
#endif /* __NEON_WALREADER_H__ */
|
#endif /* __NEON_WALREADER_H__ */
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ static int CompareLsn(const void *a, const void *b);
|
|||||||
static char *FormatSafekeeperState(Safekeeper *sk);
|
static char *FormatSafekeeperState(Safekeeper *sk);
|
||||||
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
||||||
static char *FormatEvents(WalProposer *wp, uint32 events);
|
static char *FormatEvents(WalProposer *wp, uint32 events);
|
||||||
|
static void UpdateDonorShmem(WalProposer *wp);
|
||||||
|
|
||||||
WalProposer *
|
WalProposer *
|
||||||
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||||
@@ -922,7 +922,8 @@ static void
|
|||||||
DetermineEpochStartLsn(WalProposer *wp)
|
DetermineEpochStartLsn(WalProposer *wp)
|
||||||
{
|
{
|
||||||
TermHistory *dth;
|
TermHistory *dth;
|
||||||
int n_ready = 0;
|
int n_ready = 0;
|
||||||
|
WalproposerShmemState *walprop_shared;
|
||||||
|
|
||||||
wp->propEpochStartLsn = InvalidXLogRecPtr;
|
wp->propEpochStartLsn = InvalidXLogRecPtr;
|
||||||
wp->donorEpoch = 0;
|
wp->donorEpoch = 0;
|
||||||
@@ -964,16 +965,18 @@ DetermineEpochStartLsn(WalProposer *wp)
|
|||||||
if (n_ready < wp->quorum)
|
if (n_ready < wp->quorum)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* This is a rare case that can be triggered if safekeeper has voted and disconnected.
|
* This is a rare case that can be triggered if safekeeper has voted
|
||||||
* In this case, its state will not be SS_IDLE and its vote cannot be used, because
|
* and disconnected. In this case, its state will not be SS_IDLE and
|
||||||
* we clean up `voteResponse` in `ShutdownConnection`.
|
* its vote cannot be used, because we clean up `voteResponse` in
|
||||||
|
* `ShutdownConnection`.
|
||||||
*/
|
*/
|
||||||
wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready);
|
wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are bootstrapping
|
* If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are
|
||||||
* and nothing was committed yet. Start streaming then from the basebackup LSN.
|
* bootstrapping and nothing was committed yet. Start streaming then from
|
||||||
|
* the basebackup LSN.
|
||||||
*/
|
*/
|
||||||
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
|
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
|
||||||
{
|
{
|
||||||
@@ -984,11 +987,12 @@ DetermineEpochStartLsn(WalProposer *wp)
|
|||||||
}
|
}
|
||||||
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
|
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
|
||||||
}
|
}
|
||||||
|
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so it
|
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
|
||||||
* should never be zero at this point, if we know timelineStartLsn.
|
* it should never be zero at this point, if we know timelineStartLsn.
|
||||||
*
|
*
|
||||||
* timelineStartLsn can be zero only on the first syncSafekeepers run.
|
* timelineStartLsn can be zero only on the first syncSafekeepers run.
|
||||||
*/
|
*/
|
||||||
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
|
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
|
||||||
@@ -1022,10 +1026,9 @@ DetermineEpochStartLsn(WalProposer *wp)
|
|||||||
* since which we are going to write according to the consensus. If not,
|
* since which we are going to write according to the consensus. If not,
|
||||||
* we must bail out, as clog and other non rel data is inconsistent.
|
* we must bail out, as clog and other non rel data is inconsistent.
|
||||||
*/
|
*/
|
||||||
|
walprop_shared = wp->api.get_shmem_state(wp);
|
||||||
if (!wp->config->syncSafekeepers)
|
if (!wp->config->syncSafekeepers)
|
||||||
{
|
{
|
||||||
WalproposerShmemState *walprop_shared = wp->api.get_shmem_state(wp);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Basebackup LSN always points to the beginning of the record (not
|
* Basebackup LSN always points to the beginning of the record (not
|
||||||
* the page), as StartupXLOG most probably wants it this way.
|
* the page), as StartupXLOG most probably wants it this way.
|
||||||
@@ -1040,7 +1043,7 @@ DetermineEpochStartLsn(WalProposer *wp)
|
|||||||
* compute (who could generate WAL) is ok.
|
* compute (who could generate WAL) is ok.
|
||||||
*/
|
*/
|
||||||
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
|
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
|
||||||
walprop_shared->mineLastElectedTerm)))
|
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Panic to restart PG as we need to retake basebackup.
|
* Panic to restart PG as we need to retake basebackup.
|
||||||
@@ -1054,8 +1057,8 @@ DetermineEpochStartLsn(WalProposer *wp)
|
|||||||
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
|
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
walprop_shared->mineLastElectedTerm = wp->propTerm;
|
|
||||||
}
|
}
|
||||||
|
pg_atomic_write_u64(&walprop_shared->mineLastElectedTerm, wp->propTerm);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1105,9 +1108,13 @@ SendProposerElected(Safekeeper *sk)
|
|||||||
{
|
{
|
||||||
/* safekeeper is empty or no common point, start from the beginning */
|
/* safekeeper is empty or no common point, start from the beginning */
|
||||||
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
|
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
|
||||||
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u" ,
|
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
|
||||||
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
|
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
|
||||||
/* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline is created manually (test_s3_wal_replay) */
|
|
||||||
|
/*
|
||||||
|
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
|
||||||
|
* is created manually (test_s3_wal_replay)
|
||||||
|
*/
|
||||||
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
|
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -1177,6 +1184,12 @@ StartStreaming(Safekeeper *sk)
|
|||||||
sk->active_state = SS_ACTIVE_SEND;
|
sk->active_state = SS_ACTIVE_SEND;
|
||||||
sk->streamingAt = sk->startStreamingAt;
|
sk->streamingAt = sk->startStreamingAt;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Donors can only be in SS_ACTIVE state, so we potentially update the
|
||||||
|
* donor when we switch one to SS_ACTIVE.
|
||||||
|
*/
|
||||||
|
UpdateDonorShmem(sk->wp);
|
||||||
|
|
||||||
/* event set will be updated inside SendMessageToNode */
|
/* event set will be updated inside SendMessageToNode */
|
||||||
SendMessageToNode(sk);
|
SendMessageToNode(sk);
|
||||||
}
|
}
|
||||||
@@ -1568,17 +1581,17 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
|
|||||||
* none if it doesn't exist. donor_lsn is set to end position of the donor to
|
* none if it doesn't exist. donor_lsn is set to end position of the donor to
|
||||||
* the best of our knowledge.
|
* the best of our knowledge.
|
||||||
*/
|
*/
|
||||||
Safekeeper *
|
static void
|
||||||
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
|
UpdateDonorShmem(WalProposer *wp)
|
||||||
{
|
{
|
||||||
Safekeeper *donor = NULL;
|
Safekeeper *donor = NULL;
|
||||||
int i;
|
int i;
|
||||||
*donor_lsn = InvalidXLogRecPtr;
|
XLogRecPtr donor_lsn = InvalidXLogRecPtr;
|
||||||
|
|
||||||
if (wp->n_votes < wp->quorum)
|
if (wp->n_votes < wp->quorum)
|
||||||
{
|
{
|
||||||
wp_log(WARNING, "GetDonor called before elections are won");
|
wp_log(WARNING, "UpdateDonorShmem called before elections are won");
|
||||||
return NULL;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1589,7 +1602,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
|
|||||||
if (wp->safekeeper[wp->donor].state >= SS_IDLE)
|
if (wp->safekeeper[wp->donor].state >= SS_IDLE)
|
||||||
{
|
{
|
||||||
donor = &wp->safekeeper[wp->donor];
|
donor = &wp->safekeeper[wp->donor];
|
||||||
*donor_lsn = wp->propEpochStartLsn;
|
donor_lsn = wp->propEpochStartLsn;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1601,13 +1614,19 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
|
|||||||
{
|
{
|
||||||
Safekeeper *sk = &wp->safekeeper[i];
|
Safekeeper *sk = &wp->safekeeper[i];
|
||||||
|
|
||||||
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
|
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > donor_lsn)
|
||||||
{
|
{
|
||||||
donor = sk;
|
donor = sk;
|
||||||
*donor_lsn = sk->appendResponse.flushLsn;
|
donor_lsn = sk->appendResponse.flushLsn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return donor;
|
|
||||||
|
if (donor == NULL)
|
||||||
|
{
|
||||||
|
wp_log(WARNING, "UpdateDonorShmem didn't find a suitable donor, skipping");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
wp->api.update_donor(wp, donor, donor_lsn);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1617,7 +1636,7 @@ static void
|
|||||||
HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
|
HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
|
||||||
{
|
{
|
||||||
XLogRecPtr candidateTruncateLsn;
|
XLogRecPtr candidateTruncateLsn;
|
||||||
XLogRecPtr newCommitLsn;
|
XLogRecPtr newCommitLsn;
|
||||||
|
|
||||||
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
|
||||||
if (newCommitLsn > wp->commitLsn)
|
if (newCommitLsn > wp->commitLsn)
|
||||||
@@ -1627,7 +1646,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
|
|||||||
BroadcastAppendRequest(wp);
|
BroadcastAppendRequest(wp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Unlock syncrep waiters, update ps_feedback, CheckGracefulShutdown().
|
* Unlock syncrep waiters, update ps_feedback, CheckGracefulShutdown().
|
||||||
* The last one will terminate the process if the shutdown is requested
|
* The last one will terminate the process if the shutdown is requested
|
||||||
* and WAL is committed by the quorum. BroadcastAppendRequest() should be
|
* and WAL is committed by the quorum. BroadcastAppendRequest() should be
|
||||||
|
|||||||
@@ -284,14 +284,19 @@ typedef struct PageserverFeedback
|
|||||||
|
|
||||||
typedef struct WalproposerShmemState
|
typedef struct WalproposerShmemState
|
||||||
{
|
{
|
||||||
|
pg_atomic_uint64 propEpochStartLsn;
|
||||||
|
char donor_name[64];
|
||||||
|
char donor_conninfo[MAXCONNINFO];
|
||||||
|
XLogRecPtr donor_lsn;
|
||||||
|
|
||||||
slock_t mutex;
|
slock_t mutex;
|
||||||
term_t mineLastElectedTerm;
|
pg_atomic_uint64 mineLastElectedTerm;
|
||||||
pg_atomic_uint64 backpressureThrottlingTime;
|
pg_atomic_uint64 backpressureThrottlingTime;
|
||||||
pg_atomic_uint64 currentClusterSize;
|
pg_atomic_uint64 currentClusterSize;
|
||||||
|
|
||||||
/* last feedback from each shard */
|
/* last feedback from each shard */
|
||||||
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
|
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
|
||||||
int num_shards;
|
int num_shards;
|
||||||
|
|
||||||
/* aggregated feedback with min LSNs across shards */
|
/* aggregated feedback with min LSNs across shards */
|
||||||
PageserverFeedback min_ps_feedback;
|
PageserverFeedback min_ps_feedback;
|
||||||
@@ -465,6 +470,9 @@ typedef struct walproposer_api
|
|||||||
/* Get pointer to the latest available WAL. */
|
/* Get pointer to the latest available WAL. */
|
||||||
XLogRecPtr (*get_flush_rec_ptr) (WalProposer *wp);
|
XLogRecPtr (*get_flush_rec_ptr) (WalProposer *wp);
|
||||||
|
|
||||||
|
/* Update current donor info in WalProposer Shmem */
|
||||||
|
void (*update_donor) (WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn);
|
||||||
|
|
||||||
/* Get current time. */
|
/* Get current time. */
|
||||||
TimestampTz (*get_current_timestamp) (WalProposer *wp);
|
TimestampTz (*get_current_timestamp) (WalProposer *wp);
|
||||||
|
|
||||||
@@ -497,7 +505,7 @@ typedef struct walproposer_api
|
|||||||
*
|
*
|
||||||
* On success, the data is placed in *buf. It is valid until the next call
|
* On success, the data is placed in *buf. It is valid until the next call
|
||||||
* to this function.
|
* to this function.
|
||||||
*
|
*
|
||||||
* Returns PG_ASYNC_READ_FAIL on closed connection.
|
* Returns PG_ASYNC_READ_FAIL on closed connection.
|
||||||
*/
|
*/
|
||||||
PGAsyncReadResult (*conn_async_read) (Safekeeper *sk, char **buf, int *amount);
|
PGAsyncReadResult (*conn_async_read) (Safekeeper *sk, char **buf, int *amount);
|
||||||
@@ -545,13 +553,14 @@ typedef struct walproposer_api
|
|||||||
* Returns 0 if timeout is reached, 1 if some event happened. Updates
|
* Returns 0 if timeout is reached, 1 if some event happened. Updates
|
||||||
* events mask to indicate events and sets sk to the safekeeper which has
|
* events mask to indicate events and sets sk to the safekeeper which has
|
||||||
* an event.
|
* an event.
|
||||||
*
|
*
|
||||||
* On timeout, events is set to WL_NO_EVENTS. On socket event, events is
|
* On timeout, events is set to WL_NO_EVENTS. On socket event, events is
|
||||||
* set to WL_SOCKET_READABLE and/or WL_SOCKET_WRITEABLE. When socket is
|
* set to WL_SOCKET_READABLE and/or WL_SOCKET_WRITEABLE. When socket is
|
||||||
* closed, events is set to WL_SOCKET_READABLE.
|
* closed, events is set to WL_SOCKET_READABLE.
|
||||||
*
|
*
|
||||||
* WL_SOCKET_WRITEABLE is usually set only when we need to flush the buffer.
|
* WL_SOCKET_WRITEABLE is usually set only when we need to flush the
|
||||||
* It can be returned only if caller asked for this event in the last *_event_set call.
|
* buffer. It can be returned only if caller asked for this event in the
|
||||||
|
* last *_event_set call.
|
||||||
*/
|
*/
|
||||||
int (*wait_event_set) (WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events);
|
int (*wait_event_set) (WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events);
|
||||||
|
|
||||||
@@ -571,9 +580,9 @@ typedef struct walproposer_api
|
|||||||
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);
|
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Called after every AppendResponse from the safekeeper. Used to propagate
|
* Called after every AppendResponse from the safekeeper. Used to
|
||||||
* backpressure feedback and to confirm WAL persistence (has been committed
|
* propagate backpressure feedback and to confirm WAL persistence (has
|
||||||
* on the quorum of safekeepers).
|
* been committed on the quorum of safekeepers).
|
||||||
*/
|
*/
|
||||||
void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
|
void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
|
||||||
|
|
||||||
@@ -716,12 +725,14 @@ extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPt
|
|||||||
extern void WalProposerPoll(WalProposer *wp);
|
extern void WalProposerPoll(WalProposer *wp);
|
||||||
extern void WalProposerFree(WalProposer *wp);
|
extern void WalProposerFree(WalProposer *wp);
|
||||||
|
|
||||||
|
extern WalproposerShmemState *GetWalpropShmemState();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
|
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
|
||||||
* recreate set from scratch, hence the export.
|
* recreate set from scratch, hence the export.
|
||||||
*/
|
*/
|
||||||
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
|
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
|
||||||
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
|
extern TimeLineID walprop_pg_get_timeline_id(void);
|
||||||
|
|
||||||
|
|
||||||
#define WPEVENT 1337 /* special log level for walproposer internal
|
#define WPEVENT 1337 /* special log level for walproposer internal
|
||||||
|
|||||||
@@ -85,7 +85,6 @@ static void walprop_pg_init_standalone_sync_safekeepers(void);
|
|||||||
static void walprop_pg_init_walsender(void);
|
static void walprop_pg_init_walsender(void);
|
||||||
static void walprop_pg_init_bgworker(void);
|
static void walprop_pg_init_bgworker(void);
|
||||||
static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
|
static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
|
||||||
static TimeLineID walprop_pg_get_timeline_id(void);
|
|
||||||
static void walprop_pg_load_libpqwalreceiver(void);
|
static void walprop_pg_load_libpqwalreceiver(void);
|
||||||
|
|
||||||
static process_interrupts_callback_t PrevProcessInterruptsCallback;
|
static process_interrupts_callback_t PrevProcessInterruptsCallback;
|
||||||
@@ -94,6 +93,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook_type;
|
|||||||
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
static shmem_request_hook_type prev_shmem_request_hook = NULL;
|
||||||
static void walproposer_shmem_request(void);
|
static void walproposer_shmem_request(void);
|
||||||
#endif
|
#endif
|
||||||
|
static void WalproposerShmemInit_SyncSafekeeper(void);
|
||||||
|
|
||||||
|
|
||||||
static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
|
static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
|
||||||
static void WalSndLoop(WalProposer *wp);
|
static void WalSndLoop(WalProposer *wp);
|
||||||
@@ -136,6 +137,7 @@ WalProposerSync(int argc, char *argv[])
|
|||||||
WalProposer *wp;
|
WalProposer *wp;
|
||||||
|
|
||||||
init_walprop_config(true);
|
init_walprop_config(true);
|
||||||
|
WalproposerShmemInit_SyncSafekeeper();
|
||||||
walprop_pg_init_standalone_sync_safekeepers();
|
walprop_pg_init_standalone_sync_safekeepers();
|
||||||
walprop_pg_load_libpqwalreceiver();
|
walprop_pg_load_libpqwalreceiver();
|
||||||
|
|
||||||
@@ -281,6 +283,8 @@ WalproposerShmemInit(void)
|
|||||||
{
|
{
|
||||||
memset(walprop_shared, 0, WalproposerShmemSize());
|
memset(walprop_shared, 0, WalproposerShmemSize());
|
||||||
SpinLockInit(&walprop_shared->mutex);
|
SpinLockInit(&walprop_shared->mutex);
|
||||||
|
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
|
||||||
|
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
|
||||||
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
|
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
|
||||||
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
|
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
|
||||||
}
|
}
|
||||||
@@ -289,6 +293,17 @@ WalproposerShmemInit(void)
|
|||||||
return found;
|
return found;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
WalproposerShmemInit_SyncSafekeeper(void)
|
||||||
|
{
|
||||||
|
walprop_shared = palloc(WalproposerShmemSize());
|
||||||
|
memset(walprop_shared, 0, WalproposerShmemSize());
|
||||||
|
SpinLockInit(&walprop_shared->mutex);
|
||||||
|
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
|
||||||
|
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
|
||||||
|
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
|
||||||
|
}
|
||||||
|
|
||||||
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec
|
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
@@ -399,6 +414,13 @@ nwp_shmem_startup_hook(void)
|
|||||||
WalproposerShmemInit();
|
WalproposerShmemInit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WalproposerShmemState *
|
||||||
|
GetWalpropShmemState()
|
||||||
|
{
|
||||||
|
Assert(walprop_shared != NULL);
|
||||||
|
return walprop_shared;
|
||||||
|
}
|
||||||
|
|
||||||
static WalproposerShmemState *
|
static WalproposerShmemState *
|
||||||
walprop_pg_get_shmem_state(WalProposer *wp)
|
walprop_pg_get_shmem_state(WalProposer *wp)
|
||||||
{
|
{
|
||||||
@@ -431,14 +453,15 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback)
|
|||||||
for (int i = 0; i < walprop_shared->num_shards; i++)
|
for (int i = 0; i < walprop_shared->num_shards; i++)
|
||||||
{
|
{
|
||||||
PageserverFeedback *feedback = &walprop_shared->shard_ps_feedback[i];
|
PageserverFeedback *feedback = &walprop_shared->shard_ps_feedback[i];
|
||||||
|
|
||||||
if (feedback->present)
|
if (feedback->present)
|
||||||
{
|
{
|
||||||
if (min_feedback.last_received_lsn == InvalidXLogRecPtr || feedback->last_received_lsn < min_feedback.last_received_lsn)
|
if (min_feedback.last_received_lsn == InvalidXLogRecPtr || feedback->last_received_lsn < min_feedback.last_received_lsn)
|
||||||
min_feedback.last_received_lsn = feedback->last_received_lsn;
|
min_feedback.last_received_lsn = feedback->last_received_lsn;
|
||||||
|
|
||||||
if (min_feedback.disk_consistent_lsn == InvalidXLogRecPtr || feedback->disk_consistent_lsn < min_feedback.disk_consistent_lsn)
|
if (min_feedback.disk_consistent_lsn == InvalidXLogRecPtr || feedback->disk_consistent_lsn < min_feedback.disk_consistent_lsn)
|
||||||
min_feedback.disk_consistent_lsn = feedback->disk_consistent_lsn;
|
min_feedback.disk_consistent_lsn = feedback->disk_consistent_lsn;
|
||||||
|
|
||||||
if (min_feedback.remote_consistent_lsn == InvalidXLogRecPtr || feedback->remote_consistent_lsn < min_feedback.remote_consistent_lsn)
|
if (min_feedback.remote_consistent_lsn == InvalidXLogRecPtr || feedback->remote_consistent_lsn < min_feedback.remote_consistent_lsn)
|
||||||
min_feedback.remote_consistent_lsn = feedback->remote_consistent_lsn;
|
min_feedback.remote_consistent_lsn = feedback->remote_consistent_lsn;
|
||||||
}
|
}
|
||||||
@@ -551,6 +574,7 @@ static void
|
|||||||
walprop_sigusr2(SIGNAL_ARGS)
|
walprop_sigusr2(SIGNAL_ARGS)
|
||||||
{
|
{
|
||||||
int save_errno = errno;
|
int save_errno = errno;
|
||||||
|
|
||||||
got_SIGUSR2 = true;
|
got_SIGUSR2 = true;
|
||||||
SetLatch(MyLatch);
|
SetLatch(MyLatch);
|
||||||
errno = save_errno;
|
errno = save_errno;
|
||||||
@@ -598,7 +622,7 @@ walprop_pg_get_current_timestamp(WalProposer *wp)
|
|||||||
return GetCurrentTimestamp();
|
return GetCurrentTimestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
static TimeLineID
|
TimeLineID
|
||||||
walprop_pg_get_timeline_id(void)
|
walprop_pg_get_timeline_id(void)
|
||||||
{
|
{
|
||||||
#if PG_VERSION_NUM >= 150000
|
#if PG_VERSION_NUM >= 150000
|
||||||
@@ -617,6 +641,20 @@ walprop_pg_load_libpqwalreceiver(void)
|
|||||||
wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
|
wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
walprop_pg_update_donor(WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn)
|
||||||
|
{
|
||||||
|
WalproposerShmemState *wps = wp->api.get_shmem_state(wp);
|
||||||
|
char donor_name[64];
|
||||||
|
|
||||||
|
pg_snprintf(donor_name, sizeof(donor_name), "%s:%s", donor->host, donor->port);
|
||||||
|
SpinLockAcquire(&wps->mutex);
|
||||||
|
memcpy(wps->donor_name, donor_name, sizeof(donor_name));
|
||||||
|
memcpy(wps->donor_conninfo, donor->conninfo, sizeof(donor->conninfo));
|
||||||
|
wps->donor_lsn = donor_lsn;
|
||||||
|
SpinLockRelease(&wps->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
/* Helper function */
|
/* Helper function */
|
||||||
static bool
|
static bool
|
||||||
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
|
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
|
||||||
@@ -717,7 +755,6 @@ walprop_connect_start(Safekeeper *sk)
|
|||||||
{
|
{
|
||||||
Assert(sk->conn == NULL);
|
Assert(sk->conn == NULL);
|
||||||
sk->conn = libpqwp_connect_start(sk->conninfo);
|
sk->conn = libpqwp_connect_start(sk->conninfo);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static WalProposerConnectPollStatusType
|
static WalProposerConnectPollStatusType
|
||||||
@@ -1091,7 +1128,7 @@ static void
|
|||||||
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
|
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
|
||||||
{
|
{
|
||||||
XLogRecPtr FlushPtr;
|
XLogRecPtr FlushPtr;
|
||||||
__attribute__((unused)) TimeLineID currTLI;
|
__attribute__((unused)) TimeLineID currTLI;
|
||||||
|
|
||||||
#if PG_VERSION_NUM < 150000
|
#if PG_VERSION_NUM < 150000
|
||||||
if (ThisTimeLineID == 0)
|
if (ThisTimeLineID == 0)
|
||||||
@@ -1295,116 +1332,13 @@ XLogBroadcastWalProposer(WalProposer *wp)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Download WAL before basebackup for logical walsenders from sk, if needed */
|
/*
|
||||||
|
Used to download WAL before basebackup for logical walsenders from sk, no longer
|
||||||
|
needed because walsender always uses neon_walreader.
|
||||||
|
*/
|
||||||
static bool
|
static bool
|
||||||
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
|
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
|
||||||
{
|
{
|
||||||
char *err;
|
|
||||||
WalReceiverConn *wrconn;
|
|
||||||
WalRcvStreamOptions options;
|
|
||||||
char conninfo[MAXCONNINFO];
|
|
||||||
TimeLineID timeline;
|
|
||||||
XLogRecPtr startpos;
|
|
||||||
XLogRecPtr endpos;
|
|
||||||
|
|
||||||
startpos = GetLogRepRestartLSN(wp);
|
|
||||||
if (startpos == InvalidXLogRecPtr)
|
|
||||||
return true; /* recovery not needed */
|
|
||||||
endpos = wp->propEpochStartLsn;
|
|
||||||
|
|
||||||
timeline = wp->greetRequest.timeline;
|
|
||||||
|
|
||||||
if (!neon_auth_token)
|
|
||||||
{
|
|
||||||
memcpy(conninfo, sk->conninfo, MAXCONNINFO);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
int written = 0;
|
|
||||||
|
|
||||||
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
|
|
||||||
if (written > MAXCONNINFO || written < 0)
|
|
||||||
wpg_log(FATAL, "could not append password to the safekeeper connection string");
|
|
||||||
}
|
|
||||||
|
|
||||||
#if PG_MAJORVERSION_NUM < 16
|
|
||||||
wrconn = walrcv_connect(conninfo, false, "wal_proposer_recovery", &err);
|
|
||||||
#else
|
|
||||||
wrconn = walrcv_connect(conninfo, false, false, "wal_proposer_recovery", &err);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (!wrconn)
|
|
||||||
{
|
|
||||||
ereport(WARNING,
|
|
||||||
(errmsg("could not connect to WAL acceptor %s:%s: %s",
|
|
||||||
sk->host, sk->port,
|
|
||||||
err)));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
wpg_log(LOG,
|
|
||||||
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
|
|
||||||
"%d",
|
|
||||||
sk->host, sk->port, (uint32) (startpos >> 32),
|
|
||||||
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
|
|
||||||
|
|
||||||
options.logical = false;
|
|
||||||
options.startpoint = startpos;
|
|
||||||
options.slotname = NULL;
|
|
||||||
options.proto.physical.startpointTLI = timeline;
|
|
||||||
|
|
||||||
if (walrcv_startstreaming(wrconn, &options))
|
|
||||||
{
|
|
||||||
XLogRecPtr rec_start_lsn;
|
|
||||||
XLogRecPtr rec_end_lsn = 0;
|
|
||||||
int len;
|
|
||||||
char *buf;
|
|
||||||
pgsocket wait_fd = PGINVALID_SOCKET;
|
|
||||||
|
|
||||||
while ((len = walrcv_receive(wrconn, &buf, &wait_fd)) >= 0)
|
|
||||||
{
|
|
||||||
if (len == 0)
|
|
||||||
{
|
|
||||||
(void) WaitLatchOrSocket(
|
|
||||||
MyLatch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE, wait_fd,
|
|
||||||
-1, WAIT_EVENT_WAL_RECEIVER_MAIN);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
Assert(buf[0] == 'w' || buf[0] == 'k');
|
|
||||||
if (buf[0] == 'k')
|
|
||||||
continue; /* keepalive */
|
|
||||||
memcpy(&rec_start_lsn, &buf[XLOG_HDR_START_POS],
|
|
||||||
sizeof rec_start_lsn);
|
|
||||||
rec_start_lsn = pg_ntoh64(rec_start_lsn);
|
|
||||||
rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
|
|
||||||
|
|
||||||
/* write WAL to disk */
|
|
||||||
XLogWalPropWrite(sk->wp, &buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
|
|
||||||
|
|
||||||
ereport(DEBUG1,
|
|
||||||
(errmsg("Recover message %X/%X length %d",
|
|
||||||
LSN_FORMAT_ARGS(rec_start_lsn), len)));
|
|
||||||
if (rec_end_lsn >= endpos)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ereport(LOG,
|
|
||||||
(errmsg("end of replication stream at %X/%X: %m",
|
|
||||||
LSN_FORMAT_ARGS(rec_end_lsn))));
|
|
||||||
walrcv_disconnect(wrconn);
|
|
||||||
|
|
||||||
/* failed to receive all WAL till endpos */
|
|
||||||
if (rec_end_lsn < endpos)
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(LOG,
|
|
||||||
(errmsg("primary server contains no more WAL on requested timeline %u LSN %X/%08X",
|
|
||||||
timeline, (uint32) (startpos >> 32), (uint32) startpos)));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1545,7 +1479,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
|
|||||||
|
|
||||||
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
|
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
|
||||||
Assert(!sk->xlogreader);
|
Assert(!sk->xlogreader);
|
||||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
|
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix);
|
||||||
if (sk->xlogreader == NULL)
|
if (sk->xlogreader == NULL)
|
||||||
wpg_log(FATAL, "failed to allocate xlog reader");
|
wpg_log(FATAL, "failed to allocate xlog reader");
|
||||||
}
|
}
|
||||||
@@ -1960,8 +1894,8 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
|
|||||||
static void
|
static void
|
||||||
walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||||
{
|
{
|
||||||
HotStandbyFeedback hsFeedback;
|
HotStandbyFeedback hsFeedback;
|
||||||
bool needToAdvanceSlot = false;
|
bool needToAdvanceSlot = false;
|
||||||
|
|
||||||
if (wp->config->syncSafekeepers)
|
if (wp->config->syncSafekeepers)
|
||||||
return;
|
return;
|
||||||
@@ -2095,22 +2029,25 @@ GetLogRepRestartLSN(WalProposer *wp)
|
|||||||
return lrRestartLsn;
|
return lrRestartLsn;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SetNeonCurrentClusterSize(uint64 size)
|
void
|
||||||
|
SetNeonCurrentClusterSize(uint64 size)
|
||||||
{
|
{
|
||||||
pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
|
pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64 GetNeonCurrentClusterSize(void)
|
uint64
|
||||||
|
GetNeonCurrentClusterSize(void)
|
||||||
{
|
{
|
||||||
return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
|
return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
|
||||||
}
|
}
|
||||||
uint64 GetNeonCurrentClusterSize(void);
|
uint64 GetNeonCurrentClusterSize(void);
|
||||||
|
|
||||||
|
|
||||||
static const walproposer_api walprop_pg = {
|
static const walproposer_api walprop_pg = {
|
||||||
.get_shmem_state = walprop_pg_get_shmem_state,
|
.get_shmem_state = walprop_pg_get_shmem_state,
|
||||||
.start_streaming = walprop_pg_start_streaming,
|
.start_streaming = walprop_pg_start_streaming,
|
||||||
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
|
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
|
||||||
|
.update_donor = walprop_pg_update_donor,
|
||||||
.get_current_timestamp = walprop_pg_get_current_timestamp,
|
.get_current_timestamp = walprop_pg_get_current_timestamp,
|
||||||
.conn_error_message = walprop_error_message,
|
.conn_error_message = walprop_error_message,
|
||||||
.conn_status = walprop_status,
|
.conn_status = walprop_status,
|
||||||
|
|||||||
172
pgxn/neon/walsender_hooks.c
Normal file
172
pgxn/neon/walsender_hooks.c
Normal file
@@ -0,0 +1,172 @@
|
|||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* walsender_hooks.c
|
||||||
|
*
|
||||||
|
* Implements XLogReaderRoutine in terms of NeonWALReader. Allows for
|
||||||
|
* fetching WAL from safekeepers, which normal xlogreader can't do.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
#include "walsender_hooks.h"
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "fmgr.h"
|
||||||
|
#include "access/xlogdefs.h"
|
||||||
|
#include "replication/walsender.h"
|
||||||
|
#include "access/xlog.h"
|
||||||
|
#include "access/xlog_internal.h"
|
||||||
|
#include "access/xlogreader.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
#include "utils/wait_event.h"
|
||||||
|
#include "utils/guc.h"
|
||||||
|
#include "postmaster/interrupt.h"
|
||||||
|
|
||||||
|
#include "neon_walreader.h"
|
||||||
|
#include "walproposer.h"
|
||||||
|
|
||||||
|
static NeonWALReader *wal_reader = NULL;
|
||||||
|
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
|
||||||
|
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);
|
||||||
|
|
||||||
|
static XLogRecPtr
|
||||||
|
NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||||
|
{
|
||||||
|
while (!NeonWALReaderUpdateDonor(wal_reader))
|
||||||
|
{
|
||||||
|
pg_usleep(1000);
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
return WalSndWaitForWal(loc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
 * Page-read callback built on NeonWALReader (installed as xlr->page_read by
 * NeonOnDemandXLogReaderRoutines).
 *
 * Waits until WAL up to targetPagePtr + reqLen is available, then reads up to
 * XLOG_BLCKSZ bytes starting at targetPagePtr into readBuf, waiting on the
 * reader's socket whenever the read cannot complete immediately.
 *
 * Returns the number of bytes read, or -1 if the position returned by the
 * wait is still short of targetPagePtr + reqLen. targetRecPtr is not used.
 */
static int
NeonWALPageRead(
				XLogReaderState *xlogreader,
				XLogRecPtr targetPagePtr,
				int reqLen,
				XLogRecPtr targetRecPtr,
				char *readBuf)
{
	XLogRecPtr	rem_lsn;

	/* Wait for flush pointer to advance past our request */
	XLogRecPtr	flushptr = NeonWALReadWaitForWAL(targetPagePtr + reqLen);
	int			count;

	/* The wait returned without reaching the requested position: give up. */
	if (flushptr < targetPagePtr + reqLen)
		return -1;

	/* Read at most XLOG_BLCKSZ bytes */
	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
		count = XLOG_BLCKSZ;
	else
		count = flushptr - targetPagePtr;

	/*
	 * Sometimes walsender requests non-monotonic sequences of WAL. If that's
	 * the case, we have to reset streaming from remote at the correct
	 * position. For example, walsender may try to verify the segment header
	 * when trying to read in the middle of it.
	 */
	rem_lsn = NeonWALReaderGetRemLsn(wal_reader);
	if (rem_lsn != InvalidXLogRecPtr && targetPagePtr != rem_lsn)
	{
		NeonWALReaderResetRemote(wal_reader);
	}

	/* Retry the read until it either succeeds or fails with an error. */
	for (;;)
	{
		NeonWALReadResult res = NeonWALRead(
											wal_reader,
											readBuf,
											targetPagePtr,
											count,
											walprop_pg_get_timeline_id());

		if (res == NEON_WALREAD_SUCCESS)
		{
			/*
			 * Setting ws_tli is required by the XLogReaderRoutine, it is used
			 * for segment name generation in error reports.
			 *
			 * ReadPageInternal updates ws_segno after calling cb on its own
			 * and XLogReaderRoutine description doesn't require it, but
			 * WALRead sets, let's follow it.
			 */
			xlogreader->seg.ws_tli = NeonWALReaderGetSegment(wal_reader)->ws_tli;
			xlogreader->seg.ws_segno = NeonWALReaderGetSegment(wal_reader)->ws_segno;

			/*
			 * ws_file doesn't exist in case of remote read, and isn't used by
			 * xlogreader except by WALRead on which we don't rely anyway.
			 */
			return count;
		}
		if (res == NEON_WALREAD_ERROR)
		{
			/*
			 * Note that the message reports reqLen, while the read itself
			 * was issued for 'count' bytes.
			 */
			elog(ERROR, "[walsender] Failed to read WAL (req_lsn=%X/%X, len=%d): %s",
				 LSN_FORMAT_ARGS(targetPagePtr),
				 reqLen,
				 NeonWALReaderErrMsg(wal_reader));
			/* NOTE(review): elog(ERROR) presumably does not return, making this unreachable -- confirm. */
			return -1;
		}

		/*
		 * Res is WOULDBLOCK, so we wait on the socket, recreating event set
		 * if necessary
		 */
		{

			pgsocket	sock = NeonWALReaderSocket(wal_reader);
			uint32_t	reader_events = NeonWALReaderEvents(wal_reader);
			long		timeout_ms = 1000;

			/*
			 * Reset the latch before checking for pending work (interrupts,
			 * config reload), then wait for the latch, the reader's socket
			 * events, or the timeout, and loop back to retry the read.
			 */
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
			if (ConfigReloadPending)
			{
				ConfigReloadPending = false;
				ProcessConfigFile(PGC_SIGHUP);
			}

			WaitLatchOrSocket(
							  MyLatch,
							  WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | reader_events,
							  sock,
							  timeout_ms,
							  WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
NeonWALReadSegmentOpen(XLogReaderState *xlogreader, XLogSegNo nextSegNo, TimeLineID *tli_p)
|
||||||
|
{
|
||||||
|
neon_wal_segment_open(wal_reader, nextSegNo, tli_p);
|
||||||
|
xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
NeonWALReadSegmentClose(XLogReaderState *xlogreader)
|
||||||
|
{
|
||||||
|
neon_wal_segment_close(wal_reader);
|
||||||
|
xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
|
||||||
|
{
|
||||||
|
if (!wal_reader)
|
||||||
|
{
|
||||||
|
XLogRecPtr epochStartLsn = pg_atomic_read_u64(&GetWalpropShmemState()->propEpochStartLsn);
|
||||||
|
|
||||||
|
if (epochStartLsn == 0)
|
||||||
|
{
|
||||||
|
elog(ERROR, "Unable to start walsender when propEpochStartLsn is 0!");
|
||||||
|
}
|
||||||
|
wal_reader = NeonWALReaderAllocate(wal_segment_size, epochStartLsn, "[walsender] ");
|
||||||
|
}
|
||||||
|
xlr->page_read = NeonWALPageRead;
|
||||||
|
xlr->segment_open = NeonWALReadSegmentOpen;
|
||||||
|
xlr->segment_close = NeonWALReadSegmentClose;
|
||||||
|
}
|
||||||
7
pgxn/neon/walsender_hooks.h
Normal file
7
pgxn/neon/walsender_hooks.h
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
#ifndef __WALSENDER_HOOKS_H__
#define __WALSENDER_HOOKS_H__

/* Forward declaration only, so this header does not need to include the xlogreader headers. */
struct XLogReaderRoutine;

/*
 * Install the NeonWALReader-backed page_read, segment_open and segment_close
 * callbacks into *xlr.
 */
void		NeonOnDemandXLogReaderRoutines(struct XLogReaderRoutine *xlr);

#endif
|
||||||
@@ -506,6 +506,8 @@ struct WalSender<'a, IO> {
|
|||||||
send_buf: [u8; MAX_SEND_SIZE],
|
send_buf: [u8; MAX_SEND_SIZE],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||||
/// Send WAL until
|
/// Send WAL until
|
||||||
/// - an error occurs
|
/// - an error occurs
|
||||||
@@ -584,14 +586,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
|||||||
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||||
loop {
|
loop {
|
||||||
self.end_pos = self.end_watch.get();
|
self.end_pos = self.end_watch.get();
|
||||||
if self.end_pos > self.start_pos {
|
let have_something_to_send = (|| {
|
||||||
// We have something to send.
|
fail::fail_point!(
|
||||||
|
"sk-pause-send",
|
||||||
|
self.appname.as_deref() != Some("pageserver"),
|
||||||
|
|_| { false }
|
||||||
|
);
|
||||||
|
self.end_pos > self.start_pos
|
||||||
|
})();
|
||||||
|
|
||||||
|
if have_something_to_send {
|
||||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for WAL to appear, now self.end_pos == self.start_pos.
|
// Wait for WAL to appear, now self.end_pos == self.start_pos.
|
||||||
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
|
if let Some(lsn) = self.wait_for_lsn().await? {
|
||||||
self.end_pos = lsn;
|
self.end_pos = lsn;
|
||||||
trace!("got end_pos {:?}, streaming", self.end_pos);
|
trace!("got end_pos {:?}, streaming", self.end_pos);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -628,6 +638,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
|||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait until we have available WAL > start_pos or timeout expires. Returns
|
||||||
|
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
|
||||||
|
/// - Ok(None) if timeout expired;
|
||||||
|
/// - Err in case of error -- only if 1) term changed while fetching in recovery
|
||||||
|
/// mode 2) watch channel closed, which must never happen.
|
||||||
|
async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
|
||||||
|
let fp = (|| {
|
||||||
|
fail::fail_point!(
|
||||||
|
"sk-pause-send",
|
||||||
|
self.appname.as_deref() != Some("pageserver"),
|
||||||
|
|_| { true }
|
||||||
|
);
|
||||||
|
false
|
||||||
|
})();
|
||||||
|
if fp {
|
||||||
|
tokio::time::sleep(POLL_STATE_TIMEOUT).await;
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = timeout(POLL_STATE_TIMEOUT, async move {
|
||||||
|
loop {
|
||||||
|
let end_pos = self.end_watch.get();
|
||||||
|
if end_pos > self.start_pos {
|
||||||
|
return Ok(end_pos);
|
||||||
|
}
|
||||||
|
if let EndWatch::Flush(rx) = &self.end_watch {
|
||||||
|
let curr_term = rx.borrow().term;
|
||||||
|
if let Some(client_term) = self.term {
|
||||||
|
if curr_term != client_term {
|
||||||
|
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.end_watch.changed().await?;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
// success
|
||||||
|
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
|
||||||
|
// error inside closure
|
||||||
|
Ok(Err(err)) => Err(err),
|
||||||
|
// timeout
|
||||||
|
Err(_) => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A half driving receiving replies.
|
/// A half driving receiving replies.
|
||||||
@@ -685,47 +743,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
|
||||||
|
|
||||||
/// Wait until we have available WAL > start_pos or timeout expires. Returns
|
|
||||||
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
|
|
||||||
/// - Ok(None) if timeout expired;
|
|
||||||
/// - Err in case of error -- only if 1) term changed while fetching in recovery
|
|
||||||
/// mode 2) watch channel closed, which must never happen.
|
|
||||||
async fn wait_for_lsn(
|
|
||||||
rx: &mut EndWatch,
|
|
||||||
client_term: Option<Term>,
|
|
||||||
start_pos: Lsn,
|
|
||||||
) -> anyhow::Result<Option<Lsn>> {
|
|
||||||
let res = timeout(POLL_STATE_TIMEOUT, async move {
|
|
||||||
loop {
|
|
||||||
let end_pos = rx.get();
|
|
||||||
if end_pos > start_pos {
|
|
||||||
return Ok(end_pos);
|
|
||||||
}
|
|
||||||
if let EndWatch::Flush(rx) = rx {
|
|
||||||
let curr_term = rx.borrow().term;
|
|
||||||
if let Some(client_term) = client_term {
|
|
||||||
if curr_term != client_term {
|
|
||||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rx.changed().await?;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match res {
|
|
||||||
// success
|
|
||||||
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
|
|
||||||
// error inside closure
|
|
||||||
Ok(Err(err)) => Err(err),
|
|
||||||
// timeout
|
|
||||||
Err(_) => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use utils::id::{TenantId, TimelineId};
|
use utils::id::{TenantId, TimelineId};
|
||||||
|
|||||||
@@ -17,8 +17,7 @@ use utils::lsn::Lsn;
|
|||||||
use walproposer::{
|
use walproposer::{
|
||||||
api_bindings::Level,
|
api_bindings::Level,
|
||||||
bindings::{
|
bindings::{
|
||||||
pg_atomic_uint64, NeonWALReadResult, PageserverFeedback, SafekeeperStateDesiredEvents,
|
NeonWALReadResult, SafekeeperStateDesiredEvents, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
|
||||||
WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
|
|
||||||
},
|
},
|
||||||
walproposer::{ApiImpl, Config},
|
walproposer::{ApiImpl, Config},
|
||||||
};
|
};
|
||||||
@@ -224,31 +223,13 @@ impl SimulationApi {
|
|||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let empty_feedback = PageserverFeedback {
|
|
||||||
present: false,
|
|
||||||
currentClusterSize: 0,
|
|
||||||
last_received_lsn: 0,
|
|
||||||
disk_consistent_lsn: 0,
|
|
||||||
remote_consistent_lsn: 0,
|
|
||||||
replytime: 0,
|
|
||||||
shard_number: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
os: args.os,
|
os: args.os,
|
||||||
safekeepers: RefCell::new(sk_conns),
|
safekeepers: RefCell::new(sk_conns),
|
||||||
disk: args.disk,
|
disk: args.disk,
|
||||||
redo_start_lsn: args.redo_start_lsn,
|
redo_start_lsn: args.redo_start_lsn,
|
||||||
last_logged_commit_lsn: 0,
|
last_logged_commit_lsn: 0,
|
||||||
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
|
shmem: UnsafeCell::new(walproposer::api_bindings::empty_shmem()),
|
||||||
mutex: 0,
|
|
||||||
mineLastElectedTerm: 0,
|
|
||||||
backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
|
|
||||||
currentClusterSize: pg_atomic_uint64 { value: 0 },
|
|
||||||
shard_ps_feedback: [empty_feedback; 128],
|
|
||||||
num_shards: 0,
|
|
||||||
min_ps_feedback: empty_feedback,
|
|
||||||
}),
|
|
||||||
config: args.config,
|
config: args.config,
|
||||||
event_set: RefCell::new(None),
|
event_set: RefCell::new(None),
|
||||||
}
|
}
|
||||||
@@ -274,6 +255,12 @@ impl ApiImpl for SimulationApi {
|
|||||||
self.os.now() as i64 * 1000
|
self.os.now() as i64 * 1000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Records the selected donor in the simulated walproposer shmem state.
///
/// Writes `donor_lsn` into `propEpochStartLsn` and copies `donor.conninfo`
/// into `donor_conninfo` of the state returned by `get_shmem_state()`.
fn update_donor(&self, donor: &mut walproposer::bindings::Safekeeper, donor_lsn: u64) {
    // Mutate the state in place. Dereferencing the pointer into a local
    // (`let mut shmem = *ptr`) would copy the struct, and the writes below
    // would land in a temporary that is dropped at the end of the function.
    //
    // SAFETY: assumes `get_shmem_state()` returns a valid pointer to this
    // API's shmem state with no other live reference to it for the duration
    // of this call (it appears to be backed by the `shmem` UnsafeCell) —
    // TODO confirm against `get_shmem_state`.
    let shmem = unsafe { &mut *self.get_shmem_state() };
    shmem.propEpochStartLsn.value = donor_lsn;
    shmem.donor_conninfo = donor.conninfo;
}
|
||||||
|
|
||||||
fn conn_status(
|
fn conn_status(
|
||||||
&self,
|
&self,
|
||||||
_: &mut walproposer::bindings::Safekeeper,
|
_: &mut walproposer::bindings::Safekeeper,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import pytest
|
|||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
from fixtures.neon_fixtures import (
|
from fixtures.neon_fixtures import (
|
||||||
NeonEnv,
|
NeonEnv,
|
||||||
|
NeonEnvBuilder,
|
||||||
logical_replication_sync,
|
logical_replication_sync,
|
||||||
wait_for_last_flush_lsn,
|
wait_for_last_flush_lsn,
|
||||||
)
|
)
|
||||||
@@ -203,6 +204,81 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
|||||||
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
||||||
|
|
||||||
|
|
||||||
|
# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
    """
    Logical replication must survive a compute restart while the safekeepers
    refuse to send WAL.

    Scenario:
      1. Publish table `t` from a Neon endpoint and sync a vanilla Postgres
         subscriber (row 1).
      2. Enable the `sk-pause-send` failpoint on every safekeeper, insert row 2,
         then destroy and recreate the endpoint.
      3. Re-point the subscriber at the new endpoint and check row 2 has not
         arrived while the failpoint is active.
      4. Disable the failpoint and check rows 2 and then 3 are replicated, and
         that the subscriber log has no "could not receive data from WAL stream".
    """
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    def set_send_failpoint(action):
        # Apply the same failpoint action on every safekeeper.
        for safekeeper in env.safekeepers:
            http_client = safekeeper.http_client()
            http_client.configure_failpoints([("sk-pause-send", action)])

    def subscriber_rows():
        # Current contents of `t` on the subscriber side.
        return [row[0] for row in vanilla_pg.safe_psql("select * from t")]

    env.neon_cli.create_branch("init")
    endpoint = env.endpoints.create_start("init")

    with endpoint.connect().cursor() as cur:
        cur.execute("create table wal_generator (id serial primary key, data text)")
        cur.execute(
            """
INSERT INTO wal_generator (data)
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
"""
        )
        cur.execute("create table t(a int)")
        cur.execute("create publication pub for table t")
        cur.execute("insert into t values (1)")

    # Set up the subscriber, let it catch up, then take it offline.
    vanilla_pg.start()
    vanilla_pg.safe_psql("create table t(a int)")
    publisher_connstr = endpoint.connstr().replace("'", "''")
    vanilla_pg.safe_psql(
        f"create subscription sub1 connection '{publisher_connstr}' publication pub"
    )
    logical_replication_sync(vanilla_pg, endpoint)
    vanilla_pg.stop()

    # Pause the safekeepers so that they can't send WAL (except to pageserver)
    set_send_failpoint("return")

    # Insert a 2
    with endpoint.connect().cursor() as cur:
        cur.execute("insert into t values (2)")

    endpoint.stop_and_destroy()

    # This new endpoint should contain [1, 2], but it can't access WAL from safekeeper
    endpoint = env.endpoints.create_start("init")
    with endpoint.connect().cursor() as cur:
        cur.execute("select * from t")
        publisher_rows = [row[0] for row in cur.fetchall()]
        assert publisher_rows == [1, 2]

    # Reconnect subscriber
    vanilla_pg.start()
    publisher_connstr = endpoint.connstr().replace("'", "''")
    vanilla_pg.safe_psql(f"alter subscription sub1 connection '{publisher_connstr}'")

    time.sleep(5)
    # Make sure the 2 isn't replicated
    assert subscriber_rows() == [1]

    # Re-enable WAL download
    set_send_failpoint("off")

    logical_replication_sync(vanilla_pg, endpoint)
    assert subscriber_rows() == [1, 2]

    # Check that local reads also work
    with endpoint.connect().cursor() as cur:
        cur.execute("insert into t values (3)")
    logical_replication_sync(vanilla_pg, endpoint)
    assert subscriber_rows() == [1, 2, 3]

    # The subscriber must never have logged a broken WAL stream.
    log_path = vanilla_pg.pgdatadir / "pg.log"
    with open(log_path, "r") as log_file:
        subscriber_log = log_file.read()
    assert "could not receive data from WAL stream" not in subscriber_log
|
||||||
|
|
||||||
|
|
||||||
# Test compute start at LSN page of which starts with contrecord
|
# Test compute start at LSN page of which starts with contrecord
|
||||||
# https://github.com/neondatabase/neon/issues/5749
|
# https://github.com/neondatabase/neon/issues/5749
|
||||||
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
|
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
|
||||||
|
|||||||
Reference in New Issue
Block a user