Compare commits

...

6 Commits

Author SHA1 Message Date
Arseny Sher
f0cbd5353a Use in wp custom WAL reader gracefully handling missing WAL.
and disable recovery on start.
2023-10-04 12:51:26 +03:00
Arseny Sher
8ea21686e1 Add safekeeper test_late_init. 2023-10-04 12:50:47 +03:00
Arseny Sher
a8e7eede2a Add check that WAL segments are identical after recovery. 2023-09-20 13:34:44 +03:00
Arseny Sher
2b91f507a8 Make test_pageserver_http_get_wal_receiver_success not wait for keepalive. 2023-09-18 17:44:39 +03:00
Arseny Sher
bb2c3253c6 Introduce safekeeper peer recovery.
Implements fetching of WAL by safekeeper from another safekeeper by imitating
behaviour of last elected leader. This allows to avoid WAL accumulation on
compute and facilitates faster compute startup as it doesn't need to download
any WAL. Actually removing WAL download in walproposer is a matter of another
patch though.

There is a per timeline task which always runs, checking regularly if it should
start recovery frome someone, meaning there is something to fetch and there is
no streaming compute. It then proceeds with fetching, finishing when there is
nothing more to receive.

Implements https://github.com/neondatabase/neon/pull/4875
2023-09-18 17:44:38 +03:00
Arseny Sher
bdf3769a2b Don't use AppenRequestHeader.epoch_start_lsn.
It is simpler to get it once from ProposerEelected.
2023-09-18 17:44:38 +03:00
15 changed files with 1097 additions and 70 deletions

View File

@@ -566,9 +566,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
initStringInfo(&safekeeper[n_safekeepers].outbuf);
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
if (safekeeper[n_safekeepers].xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
safekeeper[n_safekeepers].xlogreader = NULL;
safekeeper[n_safekeepers].flushWrite = false;
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;
@@ -716,6 +714,12 @@ ShutdownConnection(Safekeeper *sk)
sk->voteResponse.termHistory.entries = NULL;
HackyRemoveWalProposerEvent(sk);
if (sk->xlogreader)
{
NeonWALReaderFree(sk->xlogreader);
sk->xlogreader = NULL;
}
}
/*
@@ -1238,8 +1242,8 @@ HandleElectedProposer(void)
LSN_FORMAT_ARGS(truncateLsn),
LSN_FORMAT_ARGS(propEpochStartLsn));
/* Perform recovery */
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
elog(FATAL, "Failed to recover state");
// if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
// elog(FATAL, "Failed to recover state");
}
else if (syncSafekeepers)
{
@@ -1555,6 +1559,12 @@ SendProposerElected(Safekeeper *sk)
term_t lastCommonTerm;
int i;
/* It's a good moment to create WAL reader */
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, propEpochStartLsn);
if (!sk->xlogreader)
elog(FATAL, "failed to allocate xlog reader");
/*
* Determine start LSN by comparing safekeeper's log term switch history
* and proposer's, searching for the divergence point.
@@ -1834,19 +1844,24 @@ SendAppendRequests(Safekeeper *sk)
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
if (!WALRead(sk->xlogreader,
if (!NeonWALRead(sk->xlogreader,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn,
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline_id here */
1,
1
#else
ThisTimeLineID,
ThisTimeLineID
#endif
&errinfo))
))
{
WALReadRaiseError(&errinfo);
elog(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
sk->xlogreader->err_msg);
ShutdownConnection(sk);
return false;
}
sk->outbuf.len += req->endLsn - req->beginLsn;

View File

@@ -2,6 +2,7 @@
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "access/xlogreader.h"
#include "postgres.h"
#include "port.h"
#include "access/xlog_internal.h"
@@ -327,6 +328,24 @@ typedef struct AppendResponse
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
#define NEON_WALREADER_ERR_MSG_LEN 128
/*
* Like WALRead, but returns error instead of throwing ERROR when segment is
* missing + doesn't attempt to read WAL before specified horizon -- basebackup
* LSN. Missing WAL should be fetched by peer recovery, or, alternatively, on
* demand WAL fetching from safekeepers should be implemented in NeonWALReader.
*/
typedef struct {
/* LSN before */
XLogRecPtr available_lsn;
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
} NeonWALReader;
/*
* Descriptor of safekeeper
*/
@@ -358,7 +377,7 @@ typedef struct Safekeeper
/*
* WAL reader, allocated for each safekeeper.
*/
XLogReaderState *xlogreader;
NeonWALReader *xlogreader;
/*
* Streaming will start here; must be record boundary.
@@ -508,4 +527,9 @@ extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_
extern uint64 BackpressureThrottlingTime(void);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn);
extern void NeonWALReaderFree(NeonWALReader *state);
extern bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -12,6 +12,7 @@
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
@@ -657,3 +658,185 @@ XLogBroadcastWalProposer(void)
set_ps_display(activitymsg);
}
}
/* palloc and initialize NeonWALReader */
NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn)
{
NeonWALReader *reader;
reader = (NeonWALReader *)
palloc_extended(sizeof(NeonWALReader),
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
if (!reader)
return NULL;
reader->available_lsn = available_lsn;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
reader->segcxt.ws_segsize = wal_segment_size;
return reader;
}
static void neon_wal_segment_close(NeonWALReader *state);
void
NeonWALReaderFree(NeonWALReader *state)
{
if (state->seg.ws_file != -1)
neon_wal_segment_close(state);
pfree(state);
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (state->seg.ws_file >= 0)
return true;
return false;
}
/* copy of vanilla wal_segment_close with NeonWALReader */
void
neon_wal_segment_close(NeonWALReader *state)
{
close(state->seg.ws_file);
/* need to check errno? */
state->seg.ws_file = -1;
}
/*
* Mostly copy of vanilla WALRead, but 1) returns error if requested data before
* available_lsn 2) returns error is segment is missing instead of throwing
* ERROR.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'state->errno' shows whether it was missing WAL (ENOENT) or something else,
* and 'err' the desciption.
*/
bool NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli)
{
char *p;
XLogRecPtr recptr;
Size nbytes;
if (startptr < state->available_lsn)
{
state->wre_errno = 0;
snprintf(state->err_msg, sizeof(state->err_msg), "failed to read WAL at %X/%X which is earlier than available %X/%X",
LSN_FORMAT_ARGS(startptr), LSN_FORMAT_ARGS(state->available_lsn));
return false;
}
p = buf;
recptr = startptr;
nbytes = count;
while (nbytes > 0)
{
uint32 startoff;
int segbytes;
int readbytes;
startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
/*
* If the data we want is not in a segment we have open, close what we
* have (if anything) and open the next one, using the caller's
* provided openSegment callback.
*/
if (state->seg.ws_file < 0 ||
!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
tli != state->seg.ws_tli)
{
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
neon_wal_segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
if (!neon_wal_segment_open(state, nextSegNo, &tli))
{
char fname[MAXFNAMELEN];
state->wre_errno = errno;
XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
snprintf(state->err_msg, sizeof(state->err_msg), "failed to open WAL segment %s while reading at %X/%X: %s",
fname, LSN_FORMAT_ARGS(recptr), strerror(state->wre_errno));
return false;
}
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);
/* Update the current segment info. */
state->seg.ws_tli = tli;
state->seg.ws_segno = nextSegNo;
}
/* How many bytes are within this segment? */
if (nbytes > (state->segcxt.ws_segsize - startoff))
segbytes = state->segcxt.ws_segsize - startoff;
else
segbytes = nbytes;
#ifndef FRONTEND
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif
/* Reset errno first; eases reporting non-errno-affecting errors */
errno = 0;
readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
#ifndef FRONTEND
pgstat_report_wait_end();
#endif
if (readbytes <= 0)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, state->seg.ws_tli, state->seg.ws_segno, state->segcxt.ws_segsize);
if (readbytes < 0)
{
state->wre_errno = errno;
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: %s",
fname, startoff, strerror(state->wre_errno));
}
else
{
snprintf(state->err_msg, sizeof(state->err_msg), "could not read from log segment %s, offset %d: %m: unexpected EOF",
fname, startoff);
}
return false;
}
/* Update state for read */
recptr += readbytes;
nbytes -= readbytes;
p += readbytes;
}
return true;
}

View File

@@ -2,7 +2,7 @@
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use clap::Parser;
use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
@@ -105,6 +105,9 @@ struct Args {
/// it during this period passed as a human readable duration.
#[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
heartbeat_timeout: Duration,
/// Disable/enable peer recovery. Used for disabling it in tests.
#[arg(long, default_value = "true", action=ArgAction::Set)]
peer_recovery: bool,
/// Remote storage configuration for WAL backup (offloading to s3) as TOML
/// inline table, e.g.
/// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
@@ -268,6 +271,7 @@ async fn main() -> anyhow::Result<()> {
broker_endpoint: args.broker_endpoint,
broker_keepalive_interval: args.broker_keepalive_interval,
heartbeat_timeout: args.heartbeat_timeout,
peer_recovery_enabled: args.peer_recovery,
remote_storage: args.remote_storage,
max_offloader_lag_bytes: args.max_offloader_lag,
wal_backup_enabled: !args.disable_wal_backup,

View File

@@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler {
/// from a walproposer recovery function. This connection gets a special handling:
/// safekeeper must stream all local WAL till the flush_lsn, whether committed or not.
pub fn is_walproposer_recovery(&self) -> bool {
self.appname == Some("wal_proposer_recovery".to_string())
match &self.appname {
None => false,
Some(appname) => {
appname == "wal_proposer_recovery" ||
// set by safekeeper peer recovery
appname.starts_with("safekeeper")
}
}
}
}

View File

@@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt;
use utils::http::endpoint::request_span;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};
@@ -60,16 +60,25 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
#[serde_as(as = "DisplayFromStr")]
pub lsn: Lsn,
}
impl From<TermSwitchApiEntry> for TermLsn {
fn from(api_val: TermSwitchApiEntry) -> Self {
TermLsn {
term: api_val.term,
lsn: api_val.lsn,
}
}
}
/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {

View File

@@ -62,6 +62,7 @@ pub struct SafeKeeperConf {
pub broker_endpoint: Uri,
pub broker_keepalive_interval: Duration,
pub heartbeat_timeout: Duration,
pub peer_recovery_enabled: bool,
pub remote_storage: Option<RemoteStorageConfig>,
pub max_offloader_lag_bytes: u64,
pub backup_parallel_jobs: usize,
@@ -100,6 +101,7 @@ impl SafeKeeperConf {
.parse()
.expect("failed to parse default broker endpoint"),
broker_keepalive_interval: Duration::from_secs(5),
peer_recovery_enabled: true,
wal_backup_enabled: true,
backup_parallel_jobs: 1,
pg_auth: None,

View File

@@ -55,9 +55,12 @@ impl WalReceivers {
/// Register new walreceiver. Returned guard provides access to the slot and
/// automatically deregisters in Drop.
pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
let slots = &mut self.mutex.lock().slots;
let walreceiver = WalReceiverState::Voting;
let walreceiver = WalReceiverState {
conn_id,
status: WalReceiverStatus::Voting,
};
// find empty slot or create new one
let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
slots[pos] = Some(walreceiver);
@@ -96,6 +99,18 @@ impl WalReceivers {
self.mutex.lock().slots.iter().flatten().cloned().collect()
}
/// Get number of streaming walreceivers (normally 0 or 1) from compute.
pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
self.mutex
.lock()
.slots
.iter()
.flatten()
// conn_id.is_none skips recovery which also registers here
.filter(|s| s.conn_id.is_none() && matches!(s.status, WalReceiverStatus::Streaming))
.count()
}
/// Unregister walsender.
fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
let mut shared = self.mutex.lock();
@@ -108,10 +123,17 @@ struct WalReceiversShared {
slots: Vec<Option<WalReceiverState>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}
/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverState {
pub enum WalReceiverStatus {
Voting,
Streaming,
}
@@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard {
}
}
const MSG_QUEUE_SIZE: usize = 256;
const REPLY_QUEUE_SIZE: usize = 16;
pub const MSG_QUEUE_SIZE: usize = 256;
pub const REPLY_QUEUE_SIZE: usize = 16;
impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_wal_push_guts handling result. Error is
@@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
tli.clone(),
msg_rx,
reply_tx,
self.conn_id,
Some(self.conn_id),
));
// Forward all messages to WalAcceptor
@@ -317,31 +339,41 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
pub struct WalAcceptor {
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: Option<ConnectionId>,
}
impl WalAcceptor {
/// Spawn thread with WalAcceptor running, return handle to it.
fn spawn(
/// Spawn task with WalAcceptor running, return handle to it. Task returns
/// Ok(()) if either of channels has closed, and Err if any error during
/// message processing is encountered.
///
/// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
pub fn spawn(
tli: Arc<Timeline>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: ConnectionId,
conn_id: Option<ConnectionId>,
) -> JoinHandle<anyhow::Result<()>> {
task::spawn(async move {
let mut wa = WalAcceptor {
tli,
msg_rx,
reply_tx,
conn_id,
};
let span_ttid = wa.tli.ttid; // satisfy borrow checker
wa.run()
.instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
.instrument(
info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
)
.await
})
}
@@ -355,7 +387,7 @@ impl WalAcceptor {
let _compute_conn_guard = ComputeConnectionGuard {
timeline: Arc::clone(&self.tli),
};
let walreceiver_guard = self.tli.get_walreceivers().register();
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
self.tli.update_status_notify().await?;
// After this timestamp we will stop processing AppendRequests and send a response
@@ -372,7 +404,7 @@ impl WalAcceptor {
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
*walreceiver_guard.get() = WalReceiverState::Streaming;
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {

View File

@@ -1,17 +1,41 @@
//! This module implements pulling WAL from peer safekeepers if compute can't
//! provide it, i.e. safekeeper lags too much.
use std::sync::Arc;
use std::time::SystemTime;
use std::{fmt, pin::pin, sync::Arc};
use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout;
use tokio::{
select,
time::sleep,
time::{self, Duration},
};
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::types::PgLsn;
use tracing::*;
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
use crate::{timeline::Timeline, SafeKeeperConf};
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
use crate::safekeeper::{AppendRequest, AppendRequestHeader};
use crate::{
http::routes::TimelineStatus,
receive_wal::MSG_QUEUE_SIZE,
safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
TermLsn, VoteRequest,
},
timeline::{PeerInfo, Timeline},
SafeKeeperConf,
};
/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
@@ -22,19 +46,387 @@ pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
};
select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = recovery_main_loop(tli, conf) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}
/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
/// fields to explain the choice.
#[derive(Debug)]
pub struct RecoveryNeededInfo {
/// my term
pub term: Term,
/// my last_log_term
pub last_log_term: Term,
/// my flush_lsn
pub flush_lsn: Lsn,
/// peers from which we can fetch WAL, for observability.
pub peers: Vec<PeerInfo>,
/// for observability
pub num_streaming_computes: usize,
pub donors: Vec<Donor>,
}
// Custom to omit not important fields from PeerInfo.
impl fmt::Display for RecoveryNeededInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{{")?;
write!(
f,
"term: {}, last_log_term: {}, flush_lsn: {}, peers: {{",
self.term, self.last_log_term, self.flush_lsn
)?;
for p in self.peers.iter() {
write!(
f,
"PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ",
p.sk_id, p.term, p.last_log_term, p.flush_lsn
)?;
}
write!(
f,
"}} num_streaming_computes: {}, donors: {:?}",
self.num_streaming_computes, self.donors
)
}
}
#[derive(Clone, Debug)]
pub struct Donor {
pub sk_id: NodeId,
/// equals to last_log_term
pub term: Term,
pub flush_lsn: Lsn,
pub pg_connstr: String,
pub http_connstr: String,
}
impl From<&PeerInfo> for Donor {
fn from(p: &PeerInfo) -> Self {
Donor {
sk_id: p.sk_id,
term: p.term,
flush_lsn: p.flush_lsn,
pg_connstr: p.pg_connstr.clone(),
http_connstr: p.http_connstr.clone(),
}
}
}
const CHECK_INTERVAL_MS: u64 = 2000;
/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
match recovery_needed_info.donors.first() {
Some(donor) => {
info!(
"starting recovery from donor {}: {}",
donor.sk_id, recovery_needed_info
);
match recover(tli.clone(), donor, &conf).await {
// Note: 'write_wal rewrites WAL written before' error is
// expected here and might happen if compute and recovery
// concurrently write the same data. Eventually compute
// should win.
Err(e) => warn!("recovery failed: {:#}", e),
Ok(msg) => info!("recovery finished: {}", msg),
}
}
None => {
trace!(
"recovery not needed or not possible: {}",
recovery_needed_info
);
}
}
sleep(check_duration).await;
}
}
/// Recover from the specified donor. Returns message explaining normal finish
/// reason or error.
async fn recover(
tli: Arc<Timeline>,
donor: &Donor,
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// Learn donor term switch history to figure out starting point.
let client = reqwest::Client::new();
let timeline_info: TimelineStatus = client
.get(format!(
"http://{}/v1/tenant/{}/timeline/{}",
donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id
))
.send()
.await?
.json()
.await?;
if timeline_info.acceptor_state.term != donor.term {
bail!(
"donor term changed from {} to {}",
donor.term,
timeline_info.acceptor_state.term
);
}
// convert from API TermSwitchApiEntry into TermLsn.
let donor_th = TermHistory(
timeline_info
.acceptor_state
.term_history
.iter()
.map(|tl| Into::<TermLsn>::into(*tl))
.collect(),
);
// Now understand our term history.
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
let vote_response = match tli
.process_msg(&vote_request)
.await
.context("VoteRequest handling")?
{
Some(AcceptorProposerMessage::VoteResponse(vr)) => vr,
_ => {
bail!("unexpected VoteRequest response"); // unreachable
}
};
if vote_response.term != donor.term {
bail!(
"our term changed from {} to {}",
donor.term,
vote_response.term
);
}
let last_common_point = match TermHistory::find_highest_common_point(
&donor_th,
&vote_response.term_history,
vote_response.flush_lsn,
) {
None => bail!(
"couldn't find common point in histories, donor {:?}, sk {:?}",
donor_th,
vote_response.term_history,
),
Some(lcp) => lcp,
};
info!("found last common point at {:?}", last_common_point);
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,
timeline_start_lsn: Lsn::INVALID,
});
// Successful ProposerElected handling always returns None. If term changed,
// we'll find out that during the streaming. Note: it is expected to get
// 'refusing to overwrite correct WAL' here if walproposer reconnected
// concurrently, restart helps here.
tli.process_msg(&pe)
.await
.context("ProposerElected handling")?;
recovery_stream(tli, donor, last_common_point.lsn, conf).await
}
// Pull WAL from donor, assuming handshake is already done.
async fn recovery_stream(
tli: Arc<Timeline>,
donor: &Donor,
start_streaming_at: Lsn,
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// TODO: pass auth token
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
let connect_timeout = Duration::from_millis(10000);
let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls))
.await
{
Ok(client_and_conn) => client_and_conn?,
Err(_elapsed) => {
bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open");
}
};
trace!("connected to {:?}", donor);
// The connection object performs the actual communication with the
// server, spawn it off to run on its own.
let ttid = tli.ttid;
tokio::spawn(async move {
if let Err(e) = connection
.instrument(info_span!("recovery task connection poll", ttid = %ttid))
.await
{
// This logging isn't very useful as error is anyway forwarded to client.
trace!(
"tokio_postgres connection object finished with error: {}",
e
);
}
});
let query = format!(
"START_REPLICATION PHYSICAL {} (term='{}')",
start_streaming_at, donor.term
);
let copy_stream = client.copy_both_simple(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
// As in normal walreceiver, do networking and writing to disk in parallel.
let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None);
let res = tokio::select! {
r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r,
r = read_replies(reply_rx, donor.term) => r.map(|()| None),
};
// Join the spawned WalAcceptor. At this point chans to/from it passed to
// network routines are dropped, so it will exit as soon as it touches them.
match wa.await {
Ok(Ok(())) => {
// WalAcceptor finished normally, termination reason is different
match res {
Ok(Some(success_desc)) => Ok(success_desc),
Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen
Err(e) => Err(e), // network error or term change
}
}
Ok(Err(e)) => Err(e), // error while processing message
Err(e) => bail!("WalAcceptor panicked: {}", e),
}
}
// Perform network part of streaming: read data and push it to msg_tx, send KA
// to make sender hear from us. If there is nothing coming for a while, check
// for termination.
// Returns
// - Ok(None) if channel to WalAcceptor closed -- its task should return error.
// - Ok(Some(String)) if recovery successfully completed.
// - Err if error happened while reading/writing to socket.
async fn network_io(
physical_stream: ReplicationStream,
msg_tx: Sender<ProposerAcceptorMessage>,
donor: Donor,
tli: Arc<Timeline>,
conf: SafeKeeperConf,
) -> anyhow::Result<Option<String>> {
let mut physical_stream = pin!(physical_stream);
let mut last_received_lsn = Lsn::INVALID;
// tear down connection if no data arrives withing this period
let no_data_timeout = Duration::from_millis(30000);
loop {
let msg = match timeout(no_data_timeout, physical_stream.next()).await {
Ok(next) => match next {
None => bail!("unexpected end of replication stream"),
Some(msg) => msg.context("get replication message")?,
},
Err(_) => bail!("no message received within {:?}", no_data_timeout),
};
match msg {
ReplicationMessage::XLogData(xlog_data) => {
let ar_hdr = AppendRequestHeader {
term: donor.term,
epoch_start_lsn: Lsn::INVALID, // unused
begin_lsn: Lsn(xlog_data.wal_start()),
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
truncate_lsn: Lsn::INVALID, // do not attempt to advance
proposer_uuid: [0; 16],
};
let ar = AppendRequest {
h: ar_hdr,
wal_data: xlog_data.into_data(),
};
trace!(
"processing AppendRequest {}-{}, len {}",
ar.h.begin_lsn,
ar.h.end_lsn,
ar.wal_data.len()
);
last_received_lsn = ar.h.end_lsn;
if msg_tx
.send(ProposerAcceptorMessage::AppendRequest(ar))
.await
.is_err()
{
return Ok(None); // chan closed, WalAcceptor terminated
}
}
ReplicationMessage::PrimaryKeepAlive(_) => {
// keepalive means nothing is being streamed for a while. Check whether we need to stop.
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
// do current donors still contain one we currently connected to?
if !recovery_needed_info
.donors
.iter()
.any(|d| d.sk_id == donor.sk_id)
{
// Most likely it means we are caughtup.
// note: just exiting makes tokio_postgres send CopyFail to the far end.
return Ok(Some(format!(
"terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}",
last_received_lsn, donor.sk_id, donor.term, recovery_needed_info
)));
}
}
_ => {}
}
// Send reply to each message to keep connection alive. Ideally we
// should do that once in a while instead, but this again requires
// stream split or similar workaround, and recovery is anyway not that
// performance critical.
//
// We do not know here real write/flush LSNs (need to take mutex again
// or check replies which are read in different future), but neither
// sender much cares about them, so just send last received.
physical_stream
.as_mut()
.standby_status_update(
PgLsn::from(last_received_lsn.0),
PgLsn::from(last_received_lsn.0),
PgLsn::from(last_received_lsn.0),
SystemTime::now(),
0,
)
.await?;
}
}
// Read replies from WalAcceptor. We are not interested much in sending them to
// donor safekeeper, so don't route them anywhere. However, we should check if
// term changes and exit if it does.
// Returns Ok(()) if channel closed, Err in case of term change.
async fn read_replies(
mut reply_rx: Receiver<AcceptorProposerMessage>,
donor_term: Term,
) -> anyhow::Result<()> {
loop {
match reply_rx.recv().await {
Some(msg) => {
if let AcceptorProposerMessage::AppendResponse(ar) = msg {
if ar.term != donor_term {
bail!("donor term changed from {} to {}", donor_term, ar.term);
}
}
}
None => return Ok(()), // chan closed, WalAcceptor terminated
}
}
}

View File

@@ -91,6 +91,59 @@ impl TermHistory {
}
TermHistory(res)
}
/// Find point of divergence between leader (walproposer) term history and
/// safekeeper. Arguments are not symmetrics as proposer history ends at
/// +infinity while safekeeper at flush_lsn.
/// C version is at walproposer SendProposerElected.
pub fn find_highest_common_point(
prop_th: &TermHistory,
sk_th: &TermHistory,
sk_wal_end: Lsn,
) -> Option<TermLsn> {
let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below
// find last common term, if any...
let mut last_common_idx = None;
for i in 0..min(sk_th.len(), prop_th.len()) {
if prop_th[i].term != sk_th[i].term {
break;
}
// If term is the same, LSN must be equal as well.
assert!(
prop_th[i].lsn == sk_th[i].lsn,
"same term {} has different start LSNs: prop {}, sk {}",
prop_th[i].term,
prop_th[i].lsn,
sk_th[i].lsn
);
last_common_idx = Some(i);
}
let last_common_idx = match last_common_idx {
None => return None, // no common point
Some(lci) => lci,
};
// Now find where it ends at both prop and sk and take min. End of
// (common) term is the start of the next except it is the last one;
// there it is flush_lsn in case of safekeeper or, in case of proposer
// +infinity, so we just take flush_lsn then.
if last_common_idx == prop_th.len() - 1 {
Some(TermLsn {
term: prop_th[last_common_idx].term,
lsn: sk_wal_end,
})
} else {
let prop_common_term_end = prop_th[last_common_idx + 1].lsn;
let sk_common_term_end = if last_common_idx + 1 < sk_th.len() {
sk_th[last_common_idx + 1].lsn
} else {
sk_wal_end
};
Some(TermLsn {
term: prop_th[last_common_idx].term,
lsn: min(prop_common_term_end, sk_common_term_end),
})
}
}
}
/// Display only latest entries for Debug.
@@ -305,19 +358,19 @@ pub struct AcceptorGreeting {
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
pub struct VoteRequest {
term: Term,
pub term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
flush_lsn: Lsn,
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
term_history: TermHistory,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
@@ -344,7 +397,8 @@ pub struct AppendRequest {
pub struct AppendRequestHeader {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// LSN since the proposer appends WAL; determines epoch switch point.
// TODO: remove this field, it in unused -- LSN of term switch can be taken
// from ProposerElected (as well as from term history).
pub epoch_start_lsn: Lsn,
/// start position of message in WAL
pub begin_lsn: Lsn,
@@ -759,7 +813,7 @@ where
bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help",
msg.term, self.flush_lsn(), msg.start_streaming_at)
}
// Otherwise this shouldn't happen.
// Otherwise we must never attempt to truncate committed data.
assert!(
msg.start_streaming_at >= self.inmem.commit_lsn,
"attempt to truncate committed data: start_streaming_at={}, commit_lsn={}",
@@ -810,6 +864,14 @@ where
info!("start receiving WAL since {:?}", msg.start_streaming_at);
// Cache LSN where term starts to immediately fsync control file with
// commit_lsn once we reach it -- sync-safekeepers finishes when
// persisted commit_lsn on majority of safekeepers aligns.
self.epoch_start_lsn = match msg.term_history.0.last() {
None => bail!("proposer elected with empty term history"),
Some(term_lsn_start) => term_lsn_start.lsn,
};
Ok(None)
}
@@ -835,10 +897,7 @@ where
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
// otherwise commit_lsn might jump over epoch_start_lsn.
// Also note that commit_lsn can reach epoch_start_lsn earlier
// that we receive new epoch_start_lsn, and we still need to sync
// control file in this case.
if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn {
if commit_lsn >= self.epoch_start_lsn && self.state.commit_lsn < self.epoch_start_lsn {
self.persist_control_file(self.state.clone()).await?;
}
@@ -902,7 +961,6 @@ where
// Now we know that we are in the same term as the proposer,
// processing the message.
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// do the job
@@ -1185,4 +1243,65 @@ mod tests {
sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
assert_eq!(sk.get_epoch(), 1);
}
#[test]
fn test_find_highest_common_point_none() {
let prop_th = TermHistory(vec![(0, Lsn(1)).into()]);
let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),),
None
);
}
#[test]
fn test_find_highest_common_point_middle() {
let prop_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(),
(4, Lsn(40)).into(),
]);
let sk_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(),
(3, Lsn(30)).into(), // sk ends last common term 2 at 30
]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),),
Some(TermLsn {
term: 2,
lsn: Lsn(30),
})
);
}
#[test]
fn test_find_highest_common_point_sk_end() {
let prop_th = TermHistory(vec![
(1, Lsn(10)).into(),
(2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn
(4, Lsn(40)).into(),
]);
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
Some(TermLsn {
term: 2,
lsn: Lsn(32),
})
);
}
#[test]
fn test_find_highest_common_point_walprop() {
let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]);
assert_eq!(
TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),),
Some(TermLsn {
term: 2,
lsn: Lsn(32),
})
);
}
}

View File

@@ -418,10 +418,11 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_))
matches!(end_watch, EndWatch::Flush(_)),
appname
);
// switch to copy
@@ -680,7 +681,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
}
}
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(5);
/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;

View File

@@ -11,6 +11,7 @@ use serde_with::DisplayFromStr;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
sync::{mpsc::Sender, watch},
@@ -27,7 +28,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::receive_wal::WalReceivers;
use crate::recovery::recovery_main;
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
@@ -45,11 +46,12 @@ use crate::{debug_dump, wal_storage};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
pub term: Term,
/// Term of the last entry.
_last_log_term: Term,
pub last_log_term: Term,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
_flush_lsn: Lsn,
pub flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
@@ -61,16 +63,21 @@ pub struct PeerInfo {
#[serde(skip)]
#[serde(default = "Instant::now")]
ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
}
impl PeerInfo {
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id: NodeId(sk_info.safekeeper_id),
_last_log_term: sk_info.last_log_term,
_flush_lsn: Lsn(sk_info.flush_lsn),
term: sk_info.term,
last_log_term: sk_info.last_log_term,
flush_lsn: Lsn(sk_info.flush_lsn),
commit_lsn: Lsn(sk_info.commit_lsn),
local_start_lsn: Lsn(sk_info.local_start_lsn),
pg_connstr: sk_info.safekeeper_connstr.clone(),
http_connstr: sk_info.http_connstr.clone(),
ts,
}
}
@@ -265,6 +272,20 @@ impl SharedState {
availability_zone: conf.availability_zone.clone(),
}
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
let now = Instant::now();
self.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
.cloned()
.collect()
}
}
#[derive(Debug, thiserror::Error)]
@@ -446,7 +467,9 @@ impl Timeline {
/// Bootstrap new or existing timeline starting background stasks.
pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
// Start recovery task which always runs on the timeline.
tokio::spawn(recovery_main(self.clone(), conf.clone()));
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
}
/// Delete timeline from disk completely, by removing timeline directory. Background
@@ -680,20 +703,88 @@ impl Timeline {
Ok(())
}
/// Get our latest view of alive peers status on the timeline.
/// We pass our own info through the broker as well, so when we don't have connection
/// to the broker returned vec is empty.
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
let now = Instant::now();
shared_state
.peers_info
.0
.iter()
// Regard peer as absent if we haven't heard from it within heartbeat_timeout.
.filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
.cloned()
.collect()
shared_state.get_peers(conf.heartbeat_timeout)
}
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
/// to fetch, and we can do that without running elections; 2) there is no
/// actively streaming compute, as we don't want to compete with it.
///
/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
/// to its last_log_term so we are sure such a leader ever had been elected.
///
/// All possible donors are returned so that we could keep connection to the
/// current one if it is good even if it slightly lags behind.
///
/// Note that term conditions above might be not met, but safekeepers are
/// still not aligned on last flush_lsn. Generally in this case until
/// elections are run it is not possible to say which safekeeper should
/// recover from which one -- history which would be committed is different
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.write_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_epoch();
let flush_lsn = ss.sk.flush_lsn();
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
let mut peers = ss.get_peers(heartbeat_timeout);
// Sort by <last log term, lsn> pairs.
peers.sort_by(|p1, p2| {
let tl1 = TermLsn {
term: p1.last_log_term,
lsn: p1.flush_lsn,
};
let tl2 = TermLsn {
term: p2.last_log_term,
lsn: p2.flush_lsn,
};
tl2.cmp(&tl1) // desc
});
let num_streaming_computes = self.walreceivers.get_num_streaming();
let donors = if num_streaming_computes > 0 {
vec![] // If there is a streaming compute, don't try to recover to not intervene.
} else {
peers
.iter()
.filter_map(|candidate| {
// Are we interested in this candidate?
let candidate_tl = TermLsn {
term: candidate.last_log_term,
lsn: candidate.flush_lsn,
};
let my_tl = TermLsn {
term: last_log_term,
lsn: flush_lsn,
};
if my_tl < candidate_tl {
// Yes, we are interested. Can we pull from it without
// (re)running elections? It is possible if 1) his term
// is equal to his last_log_term so we could act on
// behalf of leader of this term (we must be sure he was
// ever elected) and 2) our term is not higher, or we'll refuse data.
if candidate.term == candidate.last_log_term && candidate.term >= term {
Some(Donor::from(candidate))
} else {
None
}
} else {
None
}
})
.collect()
};
RecoveryNeededInfo {
term,
last_log_term,
flush_lsn,
peers,
num_streaming_computes,
donors,
}
}
pub fn get_walsenders(&self) -> &Arc<WalSenders> {

View File

@@ -2691,6 +2691,20 @@ class Safekeeper:
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
def timeline_dir(self, tenant_id, timeline_id) -> str:
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
Get list of segment names of the given timeline.
"""
tli_dir = self.timeline_dir(tenant_id, timeline_id)
segments = []
for _, _, filenames in os.walk(tli_dir):
segments.extend([f for f in filenames if f != "safekeeper.control"])
segments.sort()
return segments
@dataclass
class SafekeeperTimelineStatus:

View File

@@ -157,6 +157,8 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
# insert something to force sk -> ps message
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL yet, when the `timeline_detail` API is first called.
@@ -168,7 +170,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
)
# Make a DB modification then expect getting a new WAL receiver's data.
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
endpoint.safe_psql("INSERT INTO t VALUES (1, 'hey')")
wait_until(
number_of_iterations=5,
interval=1,

View File

@@ -1,3 +1,4 @@
import filecmp
import os
import pathlib
import random
@@ -980,6 +981,137 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
endpoint.start()
# Test that we can create timeline with one safekeeper down and initialize it
# later when some data already had been written.
def test_late_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
sk1 = env.safekeepers[0]
sk1.stop()
# create and insert smth while safekeeper is down...
env.neon_cli.create_branch("test_late_init")
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
log.info("insert with safekeeper down done")
endpoint.stop() # stop compute
# stop another safekeeper, and start one which missed timeline creation
sk2 = env.safekeepers[1]
sk2.stop()
sk1.start()
# insert some more
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
# is timeline flush_lsn equal on provided safekeepers?
def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
return (
sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
== sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
)
# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
# 1) walproposer can't recover node if it misses WAL written by previous computes, but
# still starts up and functions normally if two other sks are ok.
# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
# normally if two other sks are ok.
# 3) Lagged safekeeper can still recover by peer recovery.
def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
pass
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
# it works without compute at all.
def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_peer_recovery")
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("create table t(key int, value text)")
sk1 = env.safekeepers[0]
sk1.stop()
# roughly fills one segment
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
endpoint.stop() # stop compute
# now start safekeeper, but with peer recovery disabled
sk1.start(extra_opts=["--peer-recovery=false"])
# it should lag for about a segment
sk1_http_cli = sk1.http_client()
sk2 = env.safekeepers[1]
sk2_http_cli = sk2.http_client()
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# wait a bit, lsns shouldn't change
# time.sleep(5)
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}"
)
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# now restart safekeeper with peer recovery enabled and wait for recovery
sk1.stop().start()
wait(
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
"flush_lsn to get aligned",
wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}"
),
)
# check that WALs are identic after recovery
segs = sk1.list_segments(tenant_id, timeline_id)
log.info(f"segs are {segs}")
(_, mismatch, not_regular) = filecmp.cmpfiles(
sk1.timeline_dir(tenant_id, timeline_id),
sk2.timeline_dir(tenant_id, timeline_id),
segs,
shallow=False,
)
log.info(
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
)
for f in mismatch:
f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == ([], [])
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
env.safekeepers[2].stop()
endpoint = env.endpoints.create_start("test_peer_recovery")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
class SafekeeperEnv:
def __init__(
self,