Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-14 11:40:38 +00:00
Compare commits
6 Commits
release-85...sk-wp-grac
| Author | SHA1 | Date |
|---|---|---|
|  | f0cbd5353a |  |
|  | 8ea21686e1 |  |
|  | a8e7eede2a |  |
|  | 2b91f507a8 |  |
|  | bb2c3253c6 |  |
|  | bdf3769a2b |  |
@@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
 	}

 	initStringInfo(&safekeeper[n_safekeepers].outbuf);
-	safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open, .segment_close = wal_segment_close), NULL);
-	if (safekeeper[n_safekeepers].xlogreader == NULL)
-		elog(FATAL, "Failed to allocate xlog reader");
+	safekeeper[n_safekeepers].xlogreader = NULL;
 	safekeeper[n_safekeepers].flushWrite = false;
 	safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
 	safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;

@@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk)
 	sk->voteResponse.termHistory.entries = NULL;

 	HackyRemoveWalProposerEvent(sk);
+
+	if (sk->xlogreader)
+	{
+		NeonWALReaderFree(sk->xlogreader);
+		sk->xlogreader = NULL;
+	}
 }

 /*

@@ -1238,8 +1242,8 @@ HandleElectedProposer(void)
 				 LSN_FORMAT_ARGS(truncateLsn),
 				 LSN_FORMAT_ARGS(propEpochStartLsn));
 		/* Perform recovery */
-		if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
-			elog(FATAL, "Failed to recover state");
+		// if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
+		// 	elog(FATAL, "Failed to recover state");
 	}
 	else if (syncSafekeepers)
 	{

@@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk)
 	term_t		lastCommonTerm;
 	int			i;

+	/* It's a good moment to create WAL reader */
+	Assert(!sk->xlogreader);
+	sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn);
+	if (!sk->xlogreader)
+		elog(FATAL, "failed to allocate xlog reader");
+
 	/*
 	 * Determine start LSN by comparing safekeeper's log term switch history
 	 * and proposer's, searching for the divergence point.

@@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk)

 		/* write the WAL itself */
 		enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
-		if (!WALRead(sk->xlogreader,
+		if (!NeonWALRead(sk->xlogreader,
 					 &sk->outbuf.data[sk->outbuf.len],
 					 req->beginLsn,
 					 req->endLsn - req->beginLsn,
 #if PG_VERSION_NUM >= 150000
 					 /* FIXME don't use hardcoded timeline_id here */
-					 1,
+					 1
 #else
-					 ThisTimeLineID,
+					 ThisTimeLineID
 #endif
-					 &errinfo))
+					 ))
 		{
-			WALReadRaiseError(&errinfo);
+			elog(WARNING, "WAL reading for node %s:%s failed: %s",
+				 sk->host, sk->port,
+				 sk->xlogreader->err_msg);
+			ShutdownConnection(sk);
 			return false;
 		}
 		sk->outbuf.len += req->endLsn - req->beginLsn;
@@ -2,6 +2,7 @@
 #define __NEON_WALPROPOSER_H__

 #include "access/xlogdefs.h"
+#include "access/xlogreader.h"
 #include "postgres.h"
 #include "port.h"
 #include "access/xlog_internal.h"

@@ -327,6 +328,24 @@ typedef struct AppendResponse
 /* Other fields are fixed part */
 #define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)

+#define NEON_WALREADER_ERR_MSG_LEN 128
+
+/*
+ * Like WALRead, but returns error instead of throwing ERROR when segment is
+ * missing + doesn't attempt to read WAL before specified horizon -- basebackup
+ * LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
+ * demand WAL fetching from safekeepers should be implemented in NeonWALReader.
+ */
+typedef struct {
+	/* LSN before */
+	XLogRecPtr	available_lsn;
+	WALSegmentContext segcxt;
+	WALOpenSegment seg;
+	int			wre_errno;
+	/* Explains failure to read, static for simplicity. */
+	char		err_msg[NEON_WALREADER_ERR_MSG_LEN];
+} NeonWALReader;
+
 /*
  * Descriptor of safekeeper
  */

@@ -358,7 +377,7 @@ typedef struct Safekeeper
 	/*
 	 * WAL reader, allocated for each safekeeper.
 	 */
-	XLogReaderState *xlogreader;
+	NeonWALReader *xlogreader;

 	/*
 	 * Streaming will start here; must be record boundary.

@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_

 extern uint64 BackpressureThrottlingTime(void);

+extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
+extern void NeonWALReaderFree(NeonWALReader *state);
+extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
+
+
 #endif							/* __NEON_WALPROPOSER_H__ */
@@ -12,6 +12,7 @@
 #include "replication/slot.h"
 #include "walproposer_utils.h"
 #include "replication/walsender_private.h"
+#include "utils/wait_event.h"

 #include "storage/ipc.h"
 #include "utils/builtins.h"

@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
 		set_ps_display(activitymsg);
 	}
 }
+
+/* palloc and initialize NeonWALReader */
+NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
+{
+	NeonWALReader *reader;
+
+	reader = (NeonWALReader *)
+		palloc_extended(sizeof(NeonWALReader),
+						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+	if (!reader)
+		return NULL;
+
+	reader->available_lsn = available_lsn;
+	reader->seg.ws_file = -1;
+	reader->seg.ws_segno = 0;
+	reader->seg.ws_tli = 0;
+	reader->segcxt.ws_segsize = wal_segment_size;
+
+	return reader;
+}
+
+static void neon_wal_segment_close(NeonWALReader *state);
+
+void
+NeonWALReaderFree(NeonWALReader *state)
+{
+	if (state->seg.ws_file != -1)
+		neon_wal_segment_close(state);
+	pfree(state);
+}
+
+/*
+ * Copy of vanilla wal_segment_open, but returns false in case of error instead
+ * of ERROR, with errno set.
+ *
+ * XLogReaderRoutine->segment_open callback for local pg_wal files
+ */
+static bool
+neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
+					  TimeLineID *tli_p)
+{
+	TimeLineID	tli = *tli_p;
+	char		path[MAXPGPATH];
+
+	XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return true;
+
+	return false;
+}
+
+/* copy of vanilla wal_segment_close with NeonWALReader */
+void
+neon_wal_segment_close(NeonWALReader *state)
+{
+	close(state->seg.ws_file);
+	/* need to check errno? */
+	state->seg.ws_file = -1;
+}
+
+/*
+ * Mostly a copy of vanilla WALRead, but 1) returns an error if data before
+ * available_lsn is requested, 2) returns an error if a segment is missing
+ * instead of throwing ERROR.
+ *
+ * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
+ * fetched from timeline 'tli'.
+ *
+ * Returns true if succeeded, false if an error occurs, in which case
+ * 'state->wre_errno' shows whether it was missing WAL (ENOENT) or something
+ * else, and 'err_msg' the description.
+ */
+bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	if (startptr < state->available_lsn)
+	{
+		state->wre_errno = 0;
+		snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
+				 LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
+		return false;
+	}
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	while (nbytes > 0)
+	{
+		uint32		startoff;
+		int			segbytes;
+		int			readbytes;
+
+		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
+
+		/*
+		 * If the data we want is not in a segment we have open, close what we
+		 * have (if anything) and open the next one, using the caller's
+		 * provided openSegment callback.
+		 */
+		if (state->seg.ws_file < 0 ||
+			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+			tli != state->seg.ws_tli)
+		{
+			XLogSegNo	nextSegNo;
+
+			if (state->seg.ws_file >= 0)
+				neon_wal_segment_close(state);
+
+			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+			if (!neon_wal_segment_open(state, nextSegNo, &tli))
+			{
+				char		fname[MAXFNAMELEN];
+
+				state->wre_errno = errno;
+
+				XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
+				snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
+						 fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
+				return false;
+			}
+
+			/* This shouldn't happen -- indicates a bug in segment_open */
+			Assert(state->seg.ws_file >= 0);
+
+			/* Update the current segment info. */
+			state->seg.ws_tli = tli;
+			state->seg.ws_segno = nextSegNo;
+		}
+
+		/* How many bytes are within this segment? */
+		if (nbytes > (state->segcxt.ws_segsize - startoff))
+			segbytes = state->segcxt.ws_segsize - startoff;
+		else
+			segbytes = nbytes;
+
+#ifndef FRONTEND
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+#endif
+
+		/* Reset errno first; eases reporting non-errno-affecting errors */
+		errno = 0;
+		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
+
+#ifndef FRONTEND
+		pgstat_report_wait_end();
+#endif
+
+		if (readbytes <= 0)
+		{
+			char		fname[MAXFNAMELEN];

+			XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
+
+			if (readbytes < 0)
+			{
+				state->wre_errno = errno;
+				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %s",
+						 fname, startoff, strerror(state->wre_errno));
+			}
+			else
+			{
+				snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: unexpected EOF",
+						 fname, startoff);
+			}
+			return false;
+		}
+
+		/* Update state for read */
+		recptr += readbytes;
+		nbytes -= readbytes;
+		p += readbytes;
+	}
+
+	return true;
+}
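Aside: the heart of NeonWALRead is a plain segment-crossing read loop. Here is a minimal, self-contained Rust sketch of the same loop shape — in-memory segments and illustrative names, not the real reader:

```rust
use std::collections::HashMap;

// Read `count` bytes starting at `start`, crossing segment boundaries, from an
// in-memory map of segment_no -> bytes. Mirrors the NeonWALRead loop above:
// pick the segment containing the current position, read what fits, advance.
fn read_across_segments(
    segments: &HashMap<u64, Vec<u8>>,
    seg_size: u64,
    start: u64,
    count: u64,
) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(count as usize);
    let mut pos = start;
    let mut left = count;
    while left > 0 {
        let segno = pos / seg_size;
        let off = (pos % seg_size) as usize;
        // how many of the remaining bytes live in this segment
        let in_seg = std::cmp::min(left, seg_size - off as u64) as usize;
        // a missing segment is an error return, not a panic -- like ENOENT above
        let seg = segments
            .get(&segno)
            .ok_or_else(|| format!("segment {segno} missing"))?;
        out.extend_from_slice(&seg[off..off + in_seg]);
        pos += in_seg as u64;
        left -= in_seg as u64;
    }
    Ok(out)
}

fn main() {
    let seg_size = 8u64;
    let mut segments = HashMap::new();
    segments.insert(0, b"AAAAAAAA".to_vec());
    segments.insert(1, b"BBBBBBBB".to_vec());
    // read 6 bytes starting at offset 5: crosses the 0 -> 1 segment boundary
    assert_eq!(
        read_across_segments(&segments, seg_size, 5, 6).unwrap(),
        b"AAABBB".to_vec()
    );
}
```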
@@ -2,7 +2,7 @@
 //
 // Main entry point for the safekeeper executable
 //
 use anyhow::{bail, Context, Result};
-use clap::Parser;
+use clap::{ArgAction, Parser};
 use futures::future::BoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};

@@ -105,6 +105,9 @@ struct Args {
     /// it during this period passed as a human readable duration.
     #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
     heartbeat_timeout: Duration,
+    /// Disable/enable peer recovery. Used for disabling it in tests.
+    #[arg(long, default_value = "true", action = ArgAction::Set)]
+    peer_recovery: bool,
     /// Remote storage configuration for WAL backup (offloading to s3) as TOML
     /// inline table, e.g.
     /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
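Because the flag is declared with `ArgAction::Set`, it takes an explicit boolean value rather than acting as a bare switch; the peer-recovery test later in this change disables it like so:

```
safekeeper --peer-recovery=false
```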
@@ -268,6 +271,7 @@ async fn main() -> anyhow::Result<()> {
         broker_endpoint: args.broker_endpoint,
         broker_keepalive_interval: args.broker_keepalive_interval,
         heartbeat_timeout: args.heartbeat_timeout,
+        peer_recovery_enabled: args.peer_recovery,
         remote_storage: args.remote_storage,
         max_offloader_lag_bytes: args.max_offloader_lag,
         wal_backup_enabled: !args.disable_wal_backup,
@@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler {
     /// from a walproposer recovery function. This connection gets a special handling:
     /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
     pub fn is_walproposer_recovery(&self) -> bool {
-        self.appname == Some("wal_proposer_recovery".to_string())
+        match &self.appname {
+            None => false,
+            Some(appname) => {
+                appname == "wal_proposer_recovery" ||
+                // set by safekeeper peer recovery
+                appname.starts_with("safekeeper")
+            }
+        }
     }
 }
@@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt;
 use utils::http::endpoint::request_span;

 use crate::receive_wal::WalReceiverState;
-use crate::safekeeper::ServerInfo;
 use crate::safekeeper::Term;
+use crate::safekeeper::{ServerInfo, TermLsn};
 use crate::send_wal::WalSenderState;
 use crate::timeline::PeerInfo;
 use crate::{debug_dump, pull_timeline};

@@ -60,16 +60,25 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
         .as_ref()
 }

-/// Same as TermSwitchEntry, but serializes LSN using display serializer
+/// Same as TermLsn, but serializes LSN using display serializer
 /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
 #[serde_as]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct TermSwitchApiEntry {
     pub term: Term,
     #[serde_as(as = "DisplayFromStr")]
     pub lsn: Lsn,
 }

+impl From<TermSwitchApiEntry> for TermLsn {
+    fn from(api_val: TermSwitchApiEntry) -> Self {
+        TermLsn {
+            term: api_val.term,
+            lsn: api_val.lsn,
+        }
+    }
+}
+
 /// Augment AcceptorState with epoch for convenience
 #[derive(Debug, Serialize, Deserialize)]
 pub struct AcceptorStateStatus {
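For reference, the `0/FFFFFFFF` display form is just the two 32-bit halves of the LSN printed in hex. A toy sketch with a stand-in `Lsn` (not the real `utils::lsn::Lsn`):

```rust
use std::fmt;

// Stand-in for utils::lsn::Lsn, only to show the display format that
// `#[serde_as(as = "DisplayFromStr")]` puts into the API response.
struct Lsn(u64);

impl fmt::Display for Lsn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // high 32 bits, slash, low 32 bits, both in hex
        write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffff_ffff)
    }
}

fn main() {
    assert_eq!(Lsn(0x16B9188).to_string(), "0/16B9188");
}
```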
@@ -62,6 +62,7 @@ pub struct SafeKeeperConf {
     pub broker_endpoint: Uri,
     pub broker_keepalive_interval: Duration,
     pub heartbeat_timeout: Duration,
+    pub peer_recovery_enabled: bool,
     pub remote_storage: Option<RemoteStorageConfig>,
     pub max_offloader_lag_bytes: u64,
     pub backup_parallel_jobs: usize,

@@ -100,6 +101,7 @@ impl SafeKeeperConf {
                 .parse()
                 .expect("failed to parse default broker endpoint"),
             broker_keepalive_interval: Duration::from_secs(5),
+            peer_recovery_enabled: true,
             wal_backup_enabled: true,
             backup_parallel_jobs: 1,
             pg_auth: None,
@@ -55,9 +55,12 @@ impl WalReceivers {

     /// Register new walreceiver. Returned guard provides access to the slot and
     /// automatically deregisters in Drop.
-    pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
+    pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
         let slots = &mut self.mutex.lock().slots;
-        let walreceiver = WalReceiverState::Voting;
+        let walreceiver = WalReceiverState {
+            conn_id,
+            status: WalReceiverStatus::Voting,
+        };
         // find empty slot or create new one
         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
             slots[pos] = Some(walreceiver);

@@ -96,6 +99,18 @@ impl WalReceivers {
         self.mutex.lock().slots.iter().flatten().cloned().collect()
     }

+    /// Get number of streaming walreceivers (normally 0 or 1) from compute.
+    pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
+        self.mutex
+            .lock()
+            .slots
+            .iter()
+            .flatten()
+            // checking conn_id skips recovery, which also registers here with conn_id None
+            .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
+            .count()
+    }
+
     /// Unregister walsender.
     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
         let mut shared = self.mutex.lock();

@@ -108,10 +123,17 @@ struct WalReceiversShared {
     slots: Vec<Option<WalReceiverState>>,
 }

+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WalReceiverState {
+    /// None means it is recovery initiated by us (this safekeeper).
+    pub conn_id: Option<ConnectionId>,
+    pub status: WalReceiverStatus,
+}
+
 /// Walreceiver status. Currently only whether it passed voting stage and
 /// started receiving the stream, but it is easy to add more if needed.
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum WalReceiverState {
+pub enum WalReceiverStatus {
     Voting,
     Streaming,
 }

@@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard {
     }
 }

-const MSG_QUEUE_SIZE: usize = 256;
-const REPLY_QUEUE_SIZE: usize = 16;
+pub const MSG_QUEUE_SIZE: usize = 256;
+pub const REPLY_QUEUE_SIZE: usize = 16;

 impl SafekeeperPostgresHandler {
     /// Wrapper around handle_start_wal_push_guts handling result. Error is

@@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
             tli.clone(),
             msg_rx,
             reply_tx,
-            self.conn_id,
+            Some(self.conn_id),
         ));

         // Forward all messages to WalAcceptor

@@ -317,31 +339,41 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
 // even when it writes a steady stream of messages.
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

-/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
-struct WalAcceptor {
+/// Encapsulates a task which takes messages from msg_rx, processes and pushes
+/// replies to reply_tx; reading from socket and writing to disk in parallel is
+/// beneficial for performance, and this struct provides the writing-to-disk part.
+pub struct WalAcceptor {
     tli: Arc<Timeline>,
     msg_rx: Receiver<ProposerAcceptorMessage>,
     reply_tx: Sender<AcceptorProposerMessage>,
+    conn_id: Option<ConnectionId>,
 }

 impl WalAcceptor {
-    /// Spawn thread with WalAcceptor running, return handle to it.
-    fn spawn(
+    /// Spawn task with WalAcceptor running, return handle to it. Task returns
+    /// Ok(()) if either of channels has closed, and Err if any error during
+    /// message processing is encountered.
+    ///
+    /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
+    pub fn spawn(
         tli: Arc<Timeline>,
         msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
-        conn_id: ConnectionId,
+        conn_id: Option<ConnectionId>,
     ) -> JoinHandle<anyhow::Result<()>> {
         task::spawn(async move {
             let mut wa = WalAcceptor {
                 tli,
                 msg_rx,
                 reply_tx,
+                conn_id,
             };

             let span_ttid = wa.tli.ttid; // satisfy borrow checker
             wa.run()
-                .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
+                .instrument(
+                    info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
+                )
                 .await
         })
     }
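The socket-reader/disk-writer split described above is a plain bounded-channel pipeline. A minimal hedged sketch of the shape, with toy payloads instead of the real ProposerAcceptorMessage types:

```rust
use tokio::sync::mpsc::{channel, Receiver, Sender};

// One task owns disk writes; the network side talks to it over bounded
// channels, exactly like msg_rx/reply_tx in WalAcceptor above.
async fn disk_writer(mut msg_rx: Receiver<Vec<u8>>, reply_tx: Sender<u64>) {
    let mut written = 0u64;
    while let Some(buf) = msg_rx.recv().await {
        written += buf.len() as u64; // stand-in for the actual WAL write
        if reply_tx.send(written).await.is_err() {
            break; // reply reader gone, stop
        }
    }
}

#[tokio::main]
async fn main() {
    let (msg_tx, msg_rx) = channel(256); // cf. MSG_QUEUE_SIZE
    let (reply_tx, mut reply_rx) = channel(16); // cf. REPLY_QUEUE_SIZE
    let writer = tokio::spawn(disk_writer(msg_rx, reply_tx));
    msg_tx.send(vec![0u8; 8192]).await.unwrap();
    assert_eq!(reply_rx.recv().await, Some(8192));
    drop(msg_tx); // closing the channel lets the writer task exit
    writer.await.unwrap();
}
```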
@@ -355,7 +387,7 @@ impl WalAcceptor {
         let _compute_conn_guard = ComputeConnectionGuard {
             timeline: Arc::clone(&self.tli),
         };
-        let walreceiver_guard = self.tli.get_walreceivers().register();
+        let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
         self.tli.update_status_notify().await?;

         // After this timestamp we will stop processing AppendRequests and send a response

@@ -372,7 +404,7 @@ impl WalAcceptor {

             // Update walreceiver state in shmem for reporting.
             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
-                *walreceiver_guard.get() = WalReceiverState::Streaming;
+                walreceiver_guard.get().status = WalReceiverStatus::Streaming;
             }

             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
@@ -1,17 +1,41 @@
 //! This module implements pulling WAL from peer safekeepers if compute can't
 //! provide it, i.e. safekeeper lags too much.

-use std::sync::Arc;
+use std::time::SystemTime;
+use std::{fmt, pin::pin, sync::Arc};

-use tokio::{select, time::sleep, time::Duration};
-use tracing::{info, instrument};
+use anyhow::{bail, Context};
+use futures::StreamExt;
+use postgres_protocol::message::backend::ReplicationMessage;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::time::timeout;
+use tokio::{
+    select,
+    time::sleep,
+    time::{self, Duration},
+};
+use tokio_postgres::replication::ReplicationStream;
+use tokio_postgres::types::PgLsn;
+use tracing::*;
+use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};

-use crate::{timeline::Timeline, SafeKeeperConf};
+use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
+use crate::safekeeper::{AppendRequest, AppendRequestHeader};
+use crate::{
+    http::routes::TimelineStatus,
+    receive_wal::MSG_QUEUE_SIZE,
+    safekeeper::{
+        AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
+        TermLsn, VoteRequest,
+    },
+    timeline::{PeerInfo, Timeline},
+    SafeKeeperConf,
+};

 /// Entrypoint for per timeline task which always runs, checking whether
 /// recovery for this safekeeper is needed and starting it if so.
 #[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
-pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
+pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
     info!("started");
     let mut cancellation_rx = match tli.get_cancellation_rx() {
         Ok(rx) => rx,

@@ -22,19 +46,387 @@ pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
     };

     select! {
-        _ = recovery_main_loop(tli) => { unreachable!() }
+        _ = recovery_main_loop(tli, conf) => { unreachable!() }
         _ = cancellation_rx.changed() => {
             info!("stopped");
         }
     }
 }

+/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
+/// fields to explain the choice.
+#[derive(Debug)]
+pub struct RecoveryNeededInfo {
+    /// my term
+    pub term: Term,
+    /// my last_log_term
+    pub last_log_term: Term,
+    /// my flush_lsn
+    pub flush_lsn: Lsn,
+    /// peers from which we can fetch WAL, for observability.
+    pub peers: Vec<PeerInfo>,
+    /// for observability
+    pub num_streaming_computes: usize,
+    pub donors: Vec<Donor>,
+}
+
+// Custom impl to omit unimportant fields from PeerInfo.
+impl fmt::Display for RecoveryNeededInfo {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{{")?;
+        write!(
+            f,
+            "term: {}, last_log_term: {}, flush_lsn: {}, peers: {{",
+            self.term, self.last_log_term, self.flush_lsn
+        )?;
+        for p in self.peers.iter() {
+            write!(
+                f,
+                "PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ",
+                p.sk_id, p.term, p.last_log_term, p.flush_lsn
+            )?;
+        }
+        write!(
+            f,
+            "}} num_streaming_computes: {}, donors: {:?}",
+            self.num_streaming_computes, self.donors
+        )
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct Donor {
+    pub sk_id: NodeId,
+    /// equals last_log_term
+    pub term: Term,
+    pub flush_lsn: Lsn,
+    pub pg_connstr: String,
+    pub http_connstr: String,
+}
+
+impl From<&PeerInfo> for Donor {
+    fn from(p: &PeerInfo) -> Self {
+        Donor {
+            sk_id: p.sk_id,
+            term: p.term,
+            flush_lsn: p.flush_lsn,
+            pg_connstr: p.pg_connstr.clone(),
+            http_connstr: p.http_connstr.clone(),
+        }
+    }
+}
+
 const CHECK_INTERVAL_MS: u64 = 2000;

 /// Check regularly whether we need to start recovery.
-async fn recovery_main_loop(_tli: Arc<Timeline>) {
+async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
     let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
     loop {
+        let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
+        match recovery_needed_info.donors.first() {
+            Some(donor) => {
+                info!(
+                    "starting recovery from donor {}: {}",
+                    donor.sk_id, recovery_needed_info
+                );
+                match recover(tli.clone(), donor, &conf).await {
+                    // Note: 'write_wal rewrites WAL written before' error is
+                    // expected here and might happen if compute and recovery
+                    // concurrently write the same data. Eventually compute
+                    // should win.
+                    Err(e) => warn!("recovery failed: {:#}", e),
+                    Ok(msg) => info!("recovery finished: {}", msg),
+                }
+            }
+            None => {
+                trace!(
+                    "recovery not needed or not possible: {}",
+                    recovery_needed_info
+                );
+            }
+        }
         sleep(check_duration).await;
     }
 }
+
+/// Recover from the specified donor. Returns message explaining normal finish
+/// reason or error.
+async fn recover(
+    tli: Arc<Timeline>,
+    donor: &Donor,
+    conf: &SafeKeeperConf,
+) -> anyhow::Result<String> {
+    // Learn donor term switch history to figure out starting point.
+    let client = reqwest::Client::new();
+    let timeline_info: TimelineStatus = client
+        .get(format!(
+            "http://{}/v1/tenant/{}/timeline/{}",
+            donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
+        ))
+        .send()
+        .await?
+        .json()
+        .await?;
+    if timeline_info.acceptor_state.term != donor.term {
+        bail!(
+            "donor term changed from {} to {}",
+            donor.term,
+            timeline_info.acceptor_state.term
+        );
+    }
+    // convert from API TermSwitchApiEntry into TermLsn.
+    let donor_th = TermHistory(
+        timeline_info
+            .acceptor_state
+            .term_history
+            .iter()
+            .map(|tl| Into::<TermLsn>::into(*tl))
+            .collect(),
+    );
+
+    // Now understand our term history.
+    let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
+    let vote_response = match tli
+        .process_msg(&vote_request)
+        .await
+        .context("VoteRequest handling")?
+    {
+        Some(AcceptorProposerMessage::VoteResponse(vr)) => vr,
+        _ => {
+            bail!("unexpected VoteRequest response"); // unreachable
+        }
+    };
+    if vote_response.term != donor.term {
+        bail!(
+            "our term changed from {} to {}",
+            donor.term,
+            vote_response.term
+        );
+    }
+
+    let last_common_point = match TermHistory::find_highest_common_point(
+        &donor_th,
+        &vote_response.term_history,
+        vote_response.flush_lsn,
+    ) {
+        None => bail!(
+            "couldn't find common point in histories, donor {:?}, sk {:?}",
+            donor_th,
+            vote_response.term_history,
+        ),
+        Some(lcp) => lcp,
+    };
+    info!("found last common point at {:?}", last_common_point);
+
+    // truncate WAL locally
+    let pe = ProposerAcceptorMessage::Elected(ProposerElected {
+        term: donor.term,
+        start_streaming_at: last_common_point.lsn,
+        term_history: donor_th,
+        timeline_start_lsn: Lsn::INVALID,
+    });
+    // Successful ProposerElected handling always returns None. If term changed,
+    // we'll find out during the streaming. Note: it is expected to get
+    // 'refusing to overwrite correct WAL' here if walproposer reconnected
+    // concurrently; restart helps here.
+    tli.process_msg(&pe)
+        .await
+        .context("ProposerElected handling")?;
+
+    recovery_stream(tli, donor, last_common_point.lsn, conf).await
+}
+
+// Pull WAL from donor, assuming handshake is already done.
+async fn recovery_stream(
+    tli: Arc<Timeline>,
+    donor: &Donor,
+    start_streaming_at: Lsn,
+    conf: &SafeKeeperConf,
+) -> anyhow::Result<String> {
+    // TODO: pass auth token
+    let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
+    let mut cfg = cfg.to_tokio_postgres_config();
+    // It will make safekeeper give out not committed WAL (up to flush_lsn).
+    cfg.application_name(&format!("safekeeper_{}", conf.my_id));
+    cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
+
+    let connect_timeout = Duration::from_millis(10000);
+    let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls))
+        .await
+    {
+        Ok(client_and_conn) => client_and_conn?,
+        Err(_elapsed) => {
+            bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open");
+        }
+    };
+    trace!("connected to {:?}", donor);
+
+    // The connection object performs the actual communication with the
+    // server, so spawn it off to run on its own.
+    let ttid = tli.ttid;
+    tokio::spawn(async move {
+        if let Err(e) = connection
+            .instrument(info_span!("recovery task connection poll", ttid = %ttid))
+            .await
+        {
+            // This logging isn't very useful as the error is anyway forwarded to the client.
+            trace!(
+                "tokio_postgres connection object finished with error: {}",
+                e
+            );
+        }
+    });
+
+    let query = format!(
+        "START_REPLICATION PHYSICAL {} (term='{}')",
+        start_streaming_at, donor.term
+    );
+
+    let copy_stream = client.copy_both_simple(&query).await?;
+    let physical_stream = ReplicationStream::new(copy_stream);
+
+    // As in normal walreceiver, do networking and writing to disk in parallel.
+    let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
+    let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
+    let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None);
+
+    let res = tokio::select! {
+        r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r,
+        r = read_replies(reply_rx, donor.term) => r.map(|()| None),
+    };
+
+    // Join the spawned WalAcceptor. At this point chans to/from it passed to
+    // network routines are dropped, so it will exit as soon as it touches them.
+    match wa.await {
+        Ok(Ok(())) => {
+            // WalAcceptor finished normally, termination reason is different
+            match res {
+                Ok(Some(success_desc)) => Ok(success_desc),
+                Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen
+                Err(e) => Err(e), // network error or term change
+            }
+        }
+        Ok(Err(e)) => Err(e), // error while processing message
+        Err(e) => bail!("WalAcceptor panicked: {}", e),
+    }
+}
+
+// Perform network part of streaming: read data and push it to msg_tx, send KA
+// to make sender hear from us. If there is nothing coming for a while, check
+// for termination.
+// Returns
+// - Ok(None) if channel to WalAcceptor closed -- its task should return error.
+// - Ok(Some(String)) if recovery successfully completed.
+// - Err if error happened while reading/writing to socket.
+async fn network_io(
+    physical_stream: ReplicationStream,
+    msg_tx: Sender<ProposerAcceptorMessage>,
+    donor: Donor,
+    tli: Arc<Timeline>,
+    conf: SafeKeeperConf,
+) -> anyhow::Result<Option<String>> {
+    let mut physical_stream = pin!(physical_stream);
+    let mut last_received_lsn = Lsn::INVALID;
+    // tear down connection if no data arrives within this period
+    let no_data_timeout = Duration::from_millis(30000);
+
+    loop {
+        let msg = match timeout(no_data_timeout, physical_stream.next()).await {
+            Ok(next) => match next {
+                None => bail!("unexpected end of replication stream"),
+                Some(msg) => msg.context("get replication message")?,
+            },
+            Err(_) => bail!("no message received within {:?}", no_data_timeout),
+        };
+
+        match msg {
+            ReplicationMessage::XLogData(xlog_data) => {
+                let ar_hdr = AppendRequestHeader {
+                    term: donor.term,
+                    epoch_start_lsn: Lsn::INVALID, // unused
+                    begin_lsn: Lsn(xlog_data.wal_start()),
+                    end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
+                    commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
+                    truncate_lsn: Lsn::INVALID, // do not attempt to advance
+                    proposer_uuid: [0; 16],
+                };
+                let ar = AppendRequest {
+                    h: ar_hdr,
+                    wal_data: xlog_data.into_data(),
+                };
+                trace!(
+                    "processing AppendRequest {}-{}, len {}",
+                    ar.h.begin_lsn,
+                    ar.h.end_lsn,
+                    ar.wal_data.len()
+                );
+                last_received_lsn = ar.h.end_lsn;
+                if msg_tx
+                    .send(ProposerAcceptorMessage::AppendRequest(ar))
+                    .await
+                    .is_err()
+                {
+                    return Ok(None); // chan closed, WalAcceptor terminated
+                }
+            }
+            ReplicationMessage::PrimaryKeepAlive(_) => {
+                // keepalive means nothing is being streamed for a while. Check whether we need to stop.
+                let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
+                // do current donors still contain the one we are connected to?
+                if !recovery_needed_info
+                    .donors
+                    .iter()
+                    .any(|d| d.sk_id == donor.sk_id)
+                {
+                    // Most likely it means we are caught up.
+                    // note: just exiting makes tokio_postgres send CopyFail to the far end.
+                    return Ok(Some(format!(
+                        "terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}",
+                        last_received_lsn, donor.sk_id, donor.term, recovery_needed_info
+                    )));
+                }
+            }
+            _ => {}
+        }
+        // Send reply to each message to keep connection alive. Ideally we
+        // should do that once in a while instead, but this again requires
+        // stream split or similar workaround, and recovery is anyway not that
+        // performance critical.
+        //
+        // We do not know here real write/flush LSNs (need to take mutex again
+        // or check replies which are read in a different future), but neither
+        // does the sender much care about them, so just send last received.
+        physical_stream
+            .as_mut()
+            .standby_status_update(
+                PgLsn::from(last_received_lsn.0),
+                PgLsn::from(last_received_lsn.0),
+                PgLsn::from(last_received_lsn.0),
+                SystemTime::now(),
+                0,
+            )
+            .await?;
+    }
+}
+
+// Read replies from WalAcceptor. We are not interested much in sending them to
+// the donor safekeeper, so don't route them anywhere. However, we should check if
+// the term changes and exit if it does.
+// Returns Ok(()) if channel closed, Err in case of term change.
+async fn read_replies(
+    mut reply_rx: Receiver<AcceptorProposerMessage>,
+    donor_term: Term,
+) -> anyhow::Result<()> {
+    loop {
+        match reply_rx.recv().await {
+            Some(msg) => {
+                if let AcceptorProposerMessage::AppendResponse(ar) = msg {
+                    if ar.term != donor_term {
+                        bail!("donor term changed from {} to {}", donor_term, ar.term);
+                    }
+                }
+            }
+            None => return Ok(()), // chan closed, WalAcceptor terminated
+        }
+    }
+}
@@ -91,6 +91,59 @@ impl TermHistory {
         }
         TermHistory(res)
     }
+
+    /// Find point of divergence between leader (walproposer) term history and
+    /// the safekeeper's. Arguments are not symmetric, as the proposer's history
+    /// ends at +infinity while the safekeeper's ends at flush_lsn.
+    /// The C version is at walproposer's SendProposerElected.
+    pub fn find_highest_common_point(
+        prop_th: &TermHistory,
+        sk_th: &TermHistory,
+        sk_wal_end: Lsn,
+    ) -> Option<TermLsn> {
+        let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
+        // find last common term, if any...
+        let mut last_common_idx = None;
+        for i in 0..min(sk_th.len(), prop_th.len()) {
+            if prop_th[i].term != sk_th[i].term {
+                break;
+            }
+            // If term is the same, LSN must be equal as well.
+            assert!(
+                prop_th[i].lsn == sk_th[i].lsn,
+                "same term {} has different start LSNs: prop {}, sk {}",
+                prop_th[i].term,
+                prop_th[i].lsn,
+                sk_th[i].lsn
+            );
+            last_common_idx = Some(i);
+        }
+        let last_common_idx = match last_common_idx {
+            None => return None, // no common point
+            Some(lci) => lci,
+        };
+        // Now find where it ends at both prop and sk and take min. End of
+        // the (common) term is the start of the next one, except for the last
+        // entry: there it is flush_lsn in case of the safekeeper or, in case of
+        // the proposer, +infinity, so we just take flush_lsn then.
+        if last_common_idx == prop_th.len() - 1 {
+            Some(TermLsn {
+                term: prop_th[last_common_idx].term,
+                lsn: sk_wal_end,
+            })
+        } else {
+            let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
+            let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
+                sk_th[last_common_idx + 1].lsn
+            } else {
+                sk_wal_end
+            };
+            Some(TermLsn {
+                term: prop_th[last_common_idx].term,
+                lsn: min(prop_common_term_end, sk_common_term_end),
+            })
+        }
+    }
 }

 /// Display only latest entries for Debug.
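To see the shape of the algorithm in isolation, here is a minimal standalone sketch over plain `(term, start_lsn)` tuples instead of `TermHistory`/`TermLsn` (illustrative only):

```rust
// Standalone sketch of the divergence-point search; mirrors the function
// above, including the +infinity handling for the proposer's last term.
fn highest_common_point(
    prop: &[(u64, u64)],
    sk: &[(u64, u64)],
    sk_wal_end: u64,
) -> Option<(u64, u64)> {
    // walk both histories in lockstep while terms agree
    let mut last = None;
    for i in 0..prop.len().min(sk.len()) {
        if prop[i].0 != sk[i].0 {
            break;
        }
        last = Some(i);
    }
    let i = last?;
    if i == prop.len() - 1 {
        // proposer's last term extends to +infinity, so it ends at sk's WAL end
        return Some((prop[i].0, sk_wal_end));
    }
    // otherwise the common term ends where the next term starts on each side
    let prop_end = prop[i + 1].1;
    let sk_end = if i + 1 < sk.len() { sk[i + 1].1 } else { sk_wal_end };
    Some((prop[i].0, prop_end.min(sk_end)))
}

fn main() {
    // mirrors test_find_highest_common_point_middle below: common terms 1 and 2,
    // the safekeeper ends term 2 at LSN 30, so the common point is (2, 30)
    let prop = [(1, 10), (2, 20), (4, 40)];
    let sk = [(1, 10), (2, 20), (3, 30)];
    assert_eq!(highest_common_point(&prop, &sk, 40), Some((2, 30)));
}
```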
@@ -305,19 +358,19 @@ pub struct AcceptorGreeting {
 /// Vote request sent from proposer to safekeepers
 #[derive(Debug, Deserialize)]
 pub struct VoteRequest {
-    term: Term,
+    pub term: Term,
 }

 /// Vote itself, sent from safekeeper to proposer
 #[derive(Debug, Serialize)]
 pub struct VoteResponse {
-    term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
+    pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     vote_given: u64, // fixme u64 due to padding
     // Safekeeper flush_lsn (end of WAL) + history of term switches allow
     // proposer to choose the most advanced one.
-    flush_lsn: Lsn,
+    pub flush_lsn: Lsn,
     truncate_lsn: Lsn,
-    term_history: TermHistory,
+    pub term_history: TermHistory,
     timeline_start_lsn: Lsn,
 }

@@ -344,7 +397,8 @@ pub struct AppendRequest {
 pub struct AppendRequestHeader {
     // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
     pub term: Term,
-    // LSN since the proposer appends WAL; determines epoch switch point.
+    // TODO: remove this field, it is unused -- LSN of term switch can be taken
+    // from ProposerElected (as well as from term history).
    pub epoch_start_lsn: Lsn,
     /// start position of message in WAL
     pub begin_lsn: Lsn,

@@ -759,7 +813,7 @@ where
             bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
                   msg.term, self.flush_lsn(), msg.start_streaming_at)
         }
-        // Otherwise this shouldn't happen.
+        // Otherwise we must never attempt to truncate committed data.
         assert!(
             msg.start_streaming_at >= self.inmem.commit_lsn,
             "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",

@@ -810,6 +864,14 @@ where

         info!("start receiving WAL since {:?}", msg.start_streaming_at);

+        // Cache LSN where term starts to immediately fsync control file with
+        // commit_lsn once we reach it -- sync-safekeepers finishes when
+        // persisted commit_lsn on majority of safekeepers aligns.
+        self.epoch_start_lsn = match msg.term_history.0.last() {
+            None => bail!("proposer elected with empty term history"),
+            Some(term_lsn_start) => term_lsn_start.lsn,
+        };
+
         Ok(None)
     }

@@ -835,10 +897,7 @@ where
             // file: walproposer in sync mode is very interested when this
             // happens. Note: this is for sync-safekeepers mode only, as
             // otherwise commit_lsn might jump over epoch_start_lsn.
-            // Also note that commit_lsn can reach epoch_start_lsn earlier
-            // than we receive the new epoch_start_lsn, and we still need to
-            // sync the control file in this case.
-            if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
+            if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
                 self.persist_control_file(self.state.clone()).await?;
             }
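The new condition matters because commit_lsn can jump straight past epoch_start_lsn rather than landing on it exactly. A hedged miniature of the check, with plain integers standing in for Lsn:

```rust
// Sync the control file once the in-memory commit_lsn has reached the epoch
// start while the persisted commit_lsn is still behind it.
fn should_sync(commit_lsn: u64, persisted_commit_lsn: u64, epoch_start_lsn: u64) -> bool {
    commit_lsn >= epoch_start_lsn && persisted_commit_lsn < epoch_start_lsn
}

fn main() {
    let epoch_start = 100;
    // commit advanced from 90 straight to 120: the old `== epoch_start` check
    // would have missed this case entirely
    assert!(should_sync(120, 90, epoch_start));
    // already persisted past epoch start: no extra fsync needed
    assert!(!should_sync(130, 120, epoch_start));
}
```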
@@ -902,7 +961,6 @@ where
         // Now we know that we are in the same term as the proposer,
         // processing the message.

-        self.epoch_start_lsn = msg.h.epoch_start_lsn;
         self.inmem.proposer_uuid = msg.h.proposer_uuid;

         // do the job

@@ -1185,4 +1243,65 @@ mod tests {
         sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
         assert_eq!(sk.get_epoch(), 1);
     }
+
+    #[test]
+    fn test_find_highest_common_point_none() {
+        let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
+        let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
+        assert_eq!(
+            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3)),
+            None
+        );
+    }
+
+    #[test]
+    fn test_find_highest_common_point_middle() {
+        let prop_th = TermHistory(vec![
+            (1, Lsn(10)).into(),
+            (2, Lsn(20)).into(),
+            (4, Lsn(40)).into(),
+        ]);
+        let sk_th = TermHistory(vec![
+            (1, Lsn(10)).into(),
+            (2, Lsn(20)).into(),
+            (3, Lsn(30)).into(), // sk ends last common term 2 at 30
+        ]);
+        assert_eq!(
+            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40)),
+            Some(TermLsn {
+                term: 2,
+                lsn: Lsn(30),
+            })
+        );
+    }
+
+    #[test]
+    fn test_find_highest_common_point_sk_end() {
+        let prop_th = TermHistory(vec![
+            (1, Lsn(10)).into(),
+            (2, Lsn(20)).into(), // last common term 2, sk will end it at sk_wal_end 32
+            (4, Lsn(40)).into(),
+        ]);
+        let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
+        assert_eq!(
+            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32)),
+            Some(TermLsn {
+                term: 2,
+                lsn: Lsn(32),
+            })
+        );
+    }
+
+    #[test]
+    fn test_find_highest_common_point_walprop() {
+        let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
+        let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
+        assert_eq!(
+            TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32)),
+            Some(TermLsn {
+                term: 2,
+                lsn: Lsn(32),
+            })
+        );
+    }
 }
@@ -418,10 +418,11 @@ impl SafekeeperPostgresHandler {
         }

         info!(
-            "starting streaming from {:?}, available WAL ends at {}, recovery={}",
+            "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
             start_pos,
             end_pos,
-            matches!(end_watch, EndWatch::Flush(_))
+            matches!(end_watch, EndWatch::Flush(_)),
+            appname
         );

         // switch to copy

@@ -680,7 +681,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     }
 }

-const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
+const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(5);

 /// Wait until we have available WAL > start_pos or timeout expires. Returns
 /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
||||
@@ -11,6 +11,7 @@ use serde_with::DisplayFromStr;
|
||||
use std::cmp::max;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{Mutex, MutexGuard};
|
||||
use tokio::{
|
||||
sync::{mpsc::Sender, watch},
|
||||
@@ -27,7 +28,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::recovery::recovery_main;
|
||||
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
|
||||
@@ -45,11 +46,12 @@ use crate::{debug_dump, wal_storage};
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PeerInfo {
|
||||
pub sk_id: NodeId,
|
||||
pub term: Term,
|
||||
/// Term of the last entry.
|
||||
_last_log_term: Term,
|
||||
pub last_log_term: Term,
|
||||
/// LSN of the last record.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
_flush_lsn: Lsn,
|
||||
pub flush_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub commit_lsn: Lsn,
|
||||
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
|
||||
@@ -61,16 +63,21 @@ pub struct PeerInfo {
|
||||
#[serde(skip)]
|
||||
#[serde(default = "Instant::now")]
|
||||
ts: Instant,
|
||||
pub pg_connstr: String,
|
||||
pub http_connstr: String,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
|
||||
PeerInfo {
|
||||
sk_id: NodeId(sk_info.safekeeper_id),
|
||||
_last_log_term: sk_info.last_log_term,
|
||||
_flush_lsn: Lsn(sk_info.flush_lsn),
|
||||
term: sk_info.term,
|
||||
last_log_term: sk_info.last_log_term,
|
||||
flush_lsn: Lsn(sk_info.flush_lsn),
|
||||
commit_lsn: Lsn(sk_info.commit_lsn),
|
||||
local_start_lsn: Lsn(sk_info.local_start_lsn),
|
||||
pg_connstr: sk_info.safekeeper_connstr.clone(),
|
||||
http_connstr: sk_info.http_connstr.clone(),
|
||||
ts,
|
||||
}
|
||||
}
|
||||
@@ -265,6 +272,20 @@ impl SharedState {
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
|
||||
let now = Instant::now();
|
||||
self.peers_info
|
||||
.0
|
||||
.iter()
|
||||
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
|
||||
.filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -446,7 +467,9 @@ impl Timeline {
|
||||
/// Bootstrap new or existing timeline starting background stasks.
|
||||
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
|
||||
// Start recovery task which always runs on the timeline.
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
if conf.peer_recovery_enabled {
|
||||
tokio::spawn(recovery_main(self.clone(), conf.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
@@ -680,20 +703,88 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get our latest view of alive peers status on the timeline.
|
||||
/// We pass our own info through the broker as well, so when we don't have connection
|
||||
/// to the broker returned vec is empty.
|
||||
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let now = Instant::now();
|
||||
shared_state
|
||||
.peers_info
|
||||
.0
|
||||
.iter()
|
||||
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
|
||||
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
|
||||
.cloned()
|
||||
.collect()
|
||||
shared_state.get_peers(conf.heartbeat_timeout)
|
||||
}
|
||||
|
||||
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
|
||||
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
|
||||
/// to fetch, and we can do that without running elections; 2) there is no
|
||||
/// actively streaming compute, as we don't want to compete with it.
|
||||
///
|
||||
/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
|
||||
/// to its last_log_term so we are sure such a leader ever had been elected.
|
||||
///
|
||||
/// All possible donors are returned so that we could keep connection to the
|
||||
/// current one if it is good even if it slightly lags behind.
|
||||
///
|
||||
/// Note that term conditions above might be not met, but safekeepers are
|
||||
/// still not aligned on last flush_lsn. Generally in this case until
|
||||
/// elections are run it is not possible to say which safekeeper should
|
||||
/// recover from which one -- history which would be committed is different
|
||||
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
|
||||
/// Thus we don't try to predict it here.
|
||||
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
|
||||
let ss = self.write_shared_state().await;
|
||||
let term = ss.sk.state.acceptor_state.term;
|
||||
let last_log_term = ss.sk.get_epoch();
|
||||
let flush_lsn = ss.sk.flush_lsn();
|
||||
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
|
||||
let mut peers = ss.get_peers(heartbeat_timeout);
|
||||
// Sort by <last log term, lsn> pairs.
|
||||
peers.sort_by(|p1, p2| {
|
||||
let tl1 = TermLsn {
|
||||
term: p1.last_log_term,
|
||||
lsn: p1.flush_lsn,
|
||||
};
|
||||
let tl2 = TermLsn {
|
||||
term: p2.last_log_term,
|
||||
lsn: p2.flush_lsn,
|
||||
};
|
||||
tl2.cmp(&tl1) // desc
|
||||
});
|
||||
let num_streaming_computes = self.walreceivers.get_num_streaming();
|
||||
let donors = if num_streaming_computes > 0 {
|
||||
vec![] // If there is a streaming compute, don't try to recover to not intervene.
|
||||
} else {
|
||||
peers
|
||||
.iter()
|
||||
.filter_map(|candidate| {
|
||||
// Are we interested in this candidate?
|
||||
let candidate_tl = TermLsn {
|
||||
term: candidate.last_log_term,
|
||||
lsn: candidate.flush_lsn,
|
||||
};
|
||||
let my_tl = TermLsn {
|
||||
term: last_log_term,
|
||||
lsn: flush_lsn,
|
||||
};
|
||||
if my_tl < candidate_tl {
|
||||
// Yes, we are interested. Can we pull from it without
|
||||
// (re)running elections? It is possible if 1) his term
|
||||
// is equal to his last_log_term so we could act on
|
||||
// behalf of leader of this term (we must be sure he was
|
||||
// ever elected) and 2) our term is not higher, or we'll refuse data.
|
||||
if candidate.term == candidate.last_log_term && candidate.term >= term {
|
||||
Some(Donor::from(candidate))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
RecoveryNeededInfo {
|
||||
term,
|
||||
last_log_term,
|
||||
flush_lsn,
|
||||
peers,
|
||||
num_streaming_computes,
|
||||
donors,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_walsenders(&self) -> &Arc<WalSenders> {
|
||||
|
||||
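A minimal standalone sketch of the donor filter above, with plain tuples instead of `PeerInfo`/`TermLsn` (illustrative only): a peer qualifies iff it is strictly ahead by `(last_log_term, flush_lsn)` and we can act on behalf of an already elected leader.

```rust
// (term, last_log_term, flush_lsn) triples for us and the candidate peer.
fn is_donor(my: (u64, u64, u64), peer: (u64, u64, u64)) -> bool {
    let (my_term, my_llt, my_flush) = my;
    let (p_term, p_llt, p_flush) = peer;
    // strictly ahead by <last_log_term, flush_lsn> (lexicographic tuple order)
    (p_llt, p_flush) > (my_llt, my_flush)
        // the leader of p_term was certainly elected...
        && p_term == p_llt
        // ...and our own term is not higher, or we would refuse its data
        && p_term >= my_term
}

fn main() {
    // peer with the same history but more WAL is a donor
    assert!(is_donor((2, 2, 100), (2, 2, 200)));
    // peer ahead on WAL but mid-election (term > last_log_term) is not
    assert!(!is_donor((2, 2, 100), (3, 2, 200)));
    // our own higher term means we would refuse the data
    assert!(!is_donor((4, 2, 100), (3, 3, 200)));
}
```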
@@ -2691,6 +2691,20 @@ class Safekeeper:
     def data_dir(self) -> str:
         return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")

+    def timeline_dir(self, tenant_id, timeline_id) -> str:
+        return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
+
+    def list_segments(self, tenant_id, timeline_id) -> List[str]:
+        """
+        Get list of segment names of the given timeline.
+        """
+        tli_dir = self.timeline_dir(tenant_id, timeline_id)
+        segments = []
+        for _, _, filenames in os.walk(tli_dir):
+            segments.extend([f for f in filenames if f != "safekeeper.control"])
+        segments.sort()
+        return segments
+

 @dataclass
 class SafekeeperTimelineStatus:
@@ -157,6 +157,8 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
     tenant_id, timeline_id = env.neon_cli.create_tenant()
     endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)

+    # insert something to force sk -> ps message
+    endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
     # Wait to make sure that we get a latest WAL receiver data.
     # We need to wait here because it's possible that we don't have access to
     # the latest WAL yet, when the `timeline_detail` API is first called.

@@ -168,7 +170,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
     )

     # Make a DB modification then expect getting a new WAL receiver's data.
-    endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
+    endpoint.safe_psql("INSERT INTO t VALUES (1, 'hey')")
     wait_until(
         number_of_iterations=5,
         interval=1,
@@ -1,3 +1,4 @@
+import filecmp
 import os
 import pathlib
 import random

@@ -980,6 +981,137 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
     endpoint.start()


+# Test that we can create a timeline with one safekeeper down and initialize it
+# later when some data has already been written.
+def test_late_init(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    sk1 = env.safekeepers[0]
+    sk1.stop()
+
+    # create and insert smth while safekeeper is down...
+    env.neon_cli.create_branch("test_late_init")
+    endpoint = env.endpoints.create_start("test_late_init")
+    endpoint.safe_psql("create table t(key int, value text)")
+    endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
+    log.info("insert with safekeeper down done")
+    endpoint.stop()  # stop compute
+
+    # stop another safekeeper, and start the one which missed timeline creation
+    sk2 = env.safekeepers[1]
+    sk2.stop()
+    sk1.start()
+
+    # insert some more
+    endpoint = env.endpoints.create_start("test_late_init")
+    endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
+
+
+# is timeline flush_lsn equal on the provided safekeepers?
+def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
+    return (
+        sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
+        == sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
+    )
+
+
+# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
+# 1) walproposer can't recover the node if it misses WAL written by previous computes, but
+#    still starts up and functions normally if the two other sks are ok.
+# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
+#    normally if the two other sks are ok.
+# 3) A lagged safekeeper can still recover by peer recovery.
+def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
+    pass
+
+
+# Smaller version of test_one_sk_down testing peer recovery in isolation: that
+# it works without compute at all.
+def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_peer_recovery")
+    endpoint = env.endpoints.create_start("test_peer_recovery")
+
+    endpoint.safe_psql("create table t(key int, value text)")
+
+    sk1 = env.safekeepers[0]
+    sk1.stop()
+
+    # roughly fills one segment
+    endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
+
+    endpoint.stop()  # stop compute
+
+    # now start the safekeeper, but with peer recovery disabled
+    sk1.start(extra_opts=["--peer-recovery=false"])
+    # it should lag for about a segment
+    sk1_http_cli = sk1.http_client()
+    sk2 = env.safekeepers[1]
+    sk2_http_cli = sk2.http_client()
+    sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
+    sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(
+        f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
+    )
+    assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
+
+    # wait a bit, lsns shouldn't change
+    # time.sleep(5)
+    sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
+    sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
+    log.info(
+        f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
+    )
+    assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
+
+    # now restart the safekeeper with peer recovery enabled and wait for recovery
+    sk1.stop().start()
+    wait(
+        partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
+        "flush_lsn to get aligned",
+        wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
+            f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}"
+        ),
+    )
+
+    # check that WALs are identical after recovery
+    segs = sk1.list_segments(tenant_id, timeline_id)
+    log.info(f"segs are {segs}")
+
+    (_, mismatch, not_regular) = filecmp.cmpfiles(
+        sk1.timeline_dir(tenant_id, timeline_id),
+        sk2.timeline_dir(tenant_id, timeline_id),
+        segs,
+        shallow=False,
+    )
+    log.info(
+        f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
+    )
+
+    for f in mismatch:
+        f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
+        f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
+        stdout_filename = "{}.filediff".format(f2)
+
+        with open(stdout_filename, "w") as stdout_f:
+            subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
+            subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
+
+            cmd = "diff {}.hex {}.hex".format(f1, f2)
+            subprocess.run([cmd], stdout=stdout_f, shell=True)
+
+    assert (mismatch, not_regular) == ([], [])
+
+    # stop one of the safekeepers which weren't recovering and insert a bit more to check we can commit
+    env.safekeepers[2].stop()
+    endpoint = env.endpoints.create_start("test_peer_recovery")
+    endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
+
+
 class SafekeeperEnv:
     def __init__(
         self,