diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 848b1d7644..ac0c8d9dc2 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -2,7 +2,7 @@ // Main entry point for the safekeeper executable // use anyhow::{bail, Context, Result}; -use clap::Parser; +use clap::{ArgAction, Parser}; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; @@ -105,6 +105,9 @@ struct Args { /// it during this period passed as a human readable duration. #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)] heartbeat_timeout: Duration, + /// Disable/enable peer recovery. Used for disabling it in tests. + #[arg(long, default_value = "true", action=ArgAction::Set)] + peer_recovery: bool, /// Remote storage configuration for WAL backup (offloading to s3) as TOML /// inline table, e.g. /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "", "bucket_region":"", "concurrency_limit": 119} @@ -268,6 +271,7 @@ async fn main() -> anyhow::Result<()> { broker_endpoint: args.broker_endpoint, broker_keepalive_interval: args.broker_keepalive_interval, heartbeat_timeout: args.heartbeat_timeout, + peer_recovery_enabled: args.peer_recovery, remote_storage: args.remote_storage, max_offloader_lag_bytes: args.max_offloader_lag, wal_backup_enabled: !args.disable_wal_backup, diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 134331c673..71b99ab1d8 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -372,6 +372,13 @@ impl SafekeeperPostgresHandler { /// from a walproposer recovery function. This connection gets a special handling: /// safekeeper must stream all local WAL till the flush_lsn, whether committed or not. pub fn is_walproposer_recovery(&self) -> bool { - self.appname == Some("wal_proposer_recovery".to_string()) + match &self.appname { + None => false, + Some(appname) => { + appname == "wal_proposer_recovery" || + // set by safekeeper peer recovery + appname.starts_with("safekeeper") + } + } } } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 6104b54f44..509a7e6efe 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -16,8 +16,8 @@ use tokio::io::AsyncReadExt; use utils::http::endpoint::request_span; use crate::receive_wal::WalReceiverState; -use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; +use crate::safekeeper::{ServerInfo, TermLsn}; use crate::send_wal::WalSenderState; use crate::timeline::PeerInfo; use crate::{debug_dump, pull_timeline}; @@ -60,16 +60,25 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } -/// Same as TermSwitchEntry, but serializes LSN using display serializer +/// Same as TermLsn, but serializes LSN using display serializer /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. #[serde_as] -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct TermSwitchApiEntry { pub term: Term, #[serde_as(as = "DisplayFromStr")] pub lsn: Lsn, } +impl From for TermLsn { + fn from(api_val: TermSwitchApiEntry) -> Self { + TermLsn { + term: api_val.term, + lsn: api_val.lsn, + } + } +} + /// Augment AcceptorState with epoch for convenience #[derive(Debug, Serialize, Deserialize)] pub struct AcceptorStateStatus { diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index d785d0e53a..8ced99f81e 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -62,6 +62,7 @@ pub struct SafeKeeperConf { pub broker_endpoint: Uri, pub broker_keepalive_interval: Duration, pub heartbeat_timeout: Duration, + pub peer_recovery_enabled: bool, pub remote_storage: Option, pub max_offloader_lag_bytes: u64, pub backup_parallel_jobs: usize, @@ -100,6 +101,7 @@ impl SafeKeeperConf { .parse() .expect("failed to parse default broker endpoint"), broker_keepalive_interval: Duration::from_secs(5), + peer_recovery_enabled: true, wal_backup_enabled: true, backup_parallel_jobs: 1, pg_auth: None, diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index cf63eace21..8fa47cb4aa 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -55,9 +55,12 @@ impl WalReceivers { /// Register new walreceiver. Returned guard provides access to the slot and /// automatically deregisters in Drop. - pub fn register(self: &Arc) -> WalReceiverGuard { + pub fn register(self: &Arc, conn_id: Option) -> WalReceiverGuard { let slots = &mut self.mutex.lock().slots; - let walreceiver = WalReceiverState::Voting; + let walreceiver = WalReceiverState { + conn_id, + status: WalReceiverStatus::Voting, + }; // find empty slot or create new one let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) { slots[pos] = Some(walreceiver); @@ -96,6 +99,18 @@ impl WalReceivers { self.mutex.lock().slots.iter().flatten().cloned().collect() } + /// Get number of streaming walreceivers (normally 0 or 1) from compute. + pub fn get_num_streaming(self: &Arc) -> usize { + self.mutex + .lock() + .slots + .iter() + .flatten() + // conn_id.is_none skips recovery which also registers here + .filter(|s| s.conn_id.is_none() && matches!(s.status, WalReceiverStatus::Streaming)) + .count() + } + /// Unregister walsender. fn unregister(self: &Arc, id: WalReceiverId) { let mut shared = self.mutex.lock(); @@ -108,10 +123,17 @@ struct WalReceiversShared { slots: Vec>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalReceiverState { + /// None means it is recovery initiated by us (this safekeeper). + pub conn_id: Option, + pub status: WalReceiverStatus, +} + /// Walreceiver status. Currently only whether it passed voting stage and /// started receiving the stream, but it is easy to add more if needed. #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum WalReceiverState { +pub enum WalReceiverStatus { Voting, Streaming, } @@ -136,8 +158,8 @@ impl Drop for WalReceiverGuard { } } -const MSG_QUEUE_SIZE: usize = 256; -const REPLY_QUEUE_SIZE: usize = 16; +pub const MSG_QUEUE_SIZE: usize = 256; +pub const REPLY_QUEUE_SIZE: usize = 16; impl SafekeeperPostgresHandler { /// Wrapper around handle_start_wal_push_guts handling result. Error is @@ -261,7 +283,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { tli.clone(), msg_rx, reply_tx, - self.conn_id, + Some(self.conn_id), )); // Forward all messages to WalAcceptor @@ -317,31 +339,41 @@ async fn network_write( // even when it writes a steady stream of messages. const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); -/// Takes messages from msg_rx, processes and pushes replies to reply_tx. -struct WalAcceptor { +/// Encapsulates a task which takes messages from msg_rx, processes and pushes +/// replies to reply_tx; reading from socket and writing to disk in parallel is +/// beneficial for performance, this struct provides writing to disk part. +pub struct WalAcceptor { tli: Arc, msg_rx: Receiver, reply_tx: Sender, + conn_id: Option, } impl WalAcceptor { - /// Spawn thread with WalAcceptor running, return handle to it. - fn spawn( + /// Spawn task with WalAcceptor running, return handle to it. Task returns + /// Ok(()) if either of channels has closed, and Err if any error during + /// message processing is encountered. + /// + /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper. + pub fn spawn( tli: Arc, msg_rx: Receiver, reply_tx: Sender, - conn_id: ConnectionId, + conn_id: Option, ) -> JoinHandle> { task::spawn(async move { let mut wa = WalAcceptor { tli, msg_rx, reply_tx, + conn_id, }; let span_ttid = wa.tli.ttid; // satisfy borrow checker wa.run() - .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)) + .instrument( + info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid), + ) .await }) } @@ -355,7 +387,7 @@ impl WalAcceptor { let _compute_conn_guard = ComputeConnectionGuard { timeline: Arc::clone(&self.tli), }; - let walreceiver_guard = self.tli.get_walreceivers().register(); + let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id); self.tli.update_status_notify().await?; // After this timestamp we will stop processing AppendRequests and send a response @@ -372,7 +404,7 @@ impl WalAcceptor { // Update walreceiver state in shmem for reporting. if let ProposerAcceptorMessage::Elected(_) = &next_msg { - *walreceiver_guard.get() = WalReceiverState::Streaming; + walreceiver_guard.get().status = WalReceiverStatus::Streaming; } let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) { diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 90ba3d2e16..e8fa6c55f4 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -1,17 +1,41 @@ //! This module implements pulling WAL from peer safekeepers if compute can't //! provide it, i.e. safekeeper lags too much. -use std::sync::Arc; +use std::time::SystemTime; +use std::{fmt, pin::pin, sync::Arc}; -use tokio::{select, time::sleep, time::Duration}; -use tracing::{info, instrument}; +use anyhow::{bail, Context}; +use futures::StreamExt; +use postgres_protocol::message::backend::ReplicationMessage; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::time::timeout; +use tokio::{ + select, + time::sleep, + time::{self, Duration}, +}; +use tokio_postgres::replication::ReplicationStream; +use tokio_postgres::types::PgLsn; +use tracing::*; +use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config}; -use crate::{timeline::Timeline, SafeKeeperConf}; +use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; +use crate::safekeeper::{AppendRequest, AppendRequestHeader}; +use crate::{ + http::routes::TimelineStatus, + receive_wal::MSG_QUEUE_SIZE, + safekeeper::{ + AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory, + TermLsn, VoteRequest, + }, + timeline::{PeerInfo, Timeline}, + SafeKeeperConf, +}; /// Entrypoint for per timeline task which always runs, checking whether /// recovery for this safekeeper is needed and starting it if so. #[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))] -pub async fn recovery_main(tli: Arc, _conf: SafeKeeperConf) { +pub async fn recovery_main(tli: Arc, conf: SafeKeeperConf) { info!("started"); let mut cancellation_rx = match tli.get_cancellation_rx() { Ok(rx) => rx, @@ -22,19 +46,387 @@ pub async fn recovery_main(tli: Arc, _conf: SafeKeeperConf) { }; select! { - _ = recovery_main_loop(tli) => { unreachable!() } + _ = recovery_main_loop(tli, conf) => { unreachable!() } _ = cancellation_rx.changed() => { info!("stopped"); } } } +/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and +/// fields to explain the choice. +#[derive(Debug)] +pub struct RecoveryNeededInfo { + /// my term + pub term: Term, + /// my last_log_term + pub last_log_term: Term, + /// my flush_lsn + pub flush_lsn: Lsn, + /// peers from which we can fetch WAL, for observability. + pub peers: Vec, + /// for observability + pub num_streaming_computes: usize, + pub donors: Vec, +} + +// Custom to omit not important fields from PeerInfo. +impl fmt::Display for RecoveryNeededInfo { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{{")?; + write!( + f, + "term: {}, last_log_term: {}, flush_lsn: {}, peers: {{", + self.term, self.last_log_term, self.flush_lsn + )?; + for p in self.peers.iter() { + write!( + f, + "PeerInfo {{ sk_id: {}, term: {}, last_log_term: {}, flush_lsn: {} }}, ", + p.sk_id, p.term, p.last_log_term, p.flush_lsn + )?; + } + write!( + f, + "}} num_streaming_computes: {}, donors: {:?}", + self.num_streaming_computes, self.donors + ) + } +} + +#[derive(Clone, Debug)] +pub struct Donor { + pub sk_id: NodeId, + /// equals to last_log_term + pub term: Term, + pub flush_lsn: Lsn, + pub pg_connstr: String, + pub http_connstr: String, +} + +impl From<&PeerInfo> for Donor { + fn from(p: &PeerInfo) -> Self { + Donor { + sk_id: p.sk_id, + term: p.term, + flush_lsn: p.flush_lsn, + pg_connstr: p.pg_connstr.clone(), + http_connstr: p.http_connstr.clone(), + } + } +} + const CHECK_INTERVAL_MS: u64 = 2000; /// Check regularly whether we need to start recovery. -async fn recovery_main_loop(_tli: Arc) { +async fn recovery_main_loop(tli: Arc, conf: SafeKeeperConf) { let check_duration = Duration::from_millis(CHECK_INTERVAL_MS); loop { + let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await; + match recovery_needed_info.donors.first() { + Some(donor) => { + info!( + "starting recovery from donor {}: {}", + donor.sk_id, recovery_needed_info + ); + match recover(tli.clone(), donor, &conf).await { + // Note: 'write_wal rewrites WAL written before' error is + // expected here and might happen if compute and recovery + // concurrently write the same data. Eventually compute + // should win. + Err(e) => warn!("recovery failed: {:#}", e), + Ok(msg) => info!("recovery finished: {}", msg), + } + } + None => { + trace!( + "recovery not needed or not possible: {}", + recovery_needed_info + ); + } + } sleep(check_duration).await; } } + +/// Recover from the specified donor. Returns message explaining normal finish +/// reason or error. +async fn recover( + tli: Arc, + donor: &Donor, + conf: &SafeKeeperConf, +) -> anyhow::Result { + // Learn donor term switch history to figure out starting point. + let client = reqwest::Client::new(); + let timeline_info: TimelineStatus = client + .get(format!( + "http://{}/v1/tenant/{}/timeline/{}", + donor.http_connstr, tli.ttid.tenant_id, tli.ttid.timeline_id + )) + .send() + .await? + .json() + .await?; + if timeline_info.acceptor_state.term != donor.term { + bail!( + "donor term changed from {} to {}", + donor.term, + timeline_info.acceptor_state.term + ); + } + // convert from API TermSwitchApiEntry into TermLsn. + let donor_th = TermHistory( + timeline_info + .acceptor_state + .term_history + .iter() + .map(|tl| Into::::into(*tl)) + .collect(), + ); + + // Now understand our term history. + let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term }); + let vote_response = match tli + .process_msg(&vote_request) + .await + .context("VoteRequest handling")? + { + Some(AcceptorProposerMessage::VoteResponse(vr)) => vr, + _ => { + bail!("unexpected VoteRequest response"); // unreachable + } + }; + if vote_response.term != donor.term { + bail!( + "our term changed from {} to {}", + donor.term, + vote_response.term + ); + } + + let last_common_point = match TermHistory::find_highest_common_point( + &donor_th, + &vote_response.term_history, + vote_response.flush_lsn, + ) { + None => bail!( + "couldn't find common point in histories, donor {:?}, sk {:?}", + donor_th, + vote_response.term_history, + ), + Some(lcp) => lcp, + }; + info!("found last common point at {:?}", last_common_point); + + // truncate WAL locally + let pe = ProposerAcceptorMessage::Elected(ProposerElected { + term: donor.term, + start_streaming_at: last_common_point.lsn, + term_history: donor_th, + timeline_start_lsn: Lsn::INVALID, + }); + // Successful ProposerElected handling always returns None. If term changed, + // we'll find out that during the streaming. Note: it is expected to get + // 'refusing to overwrite correct WAL' here if walproposer reconnected + // concurrently, restart helps here. + tli.process_msg(&pe) + .await + .context("ProposerElected handling")?; + + recovery_stream(tli, donor, last_common_point.lsn, conf).await +} + +// Pull WAL from donor, assuming handshake is already done. +async fn recovery_stream( + tli: Arc, + donor: &Donor, + start_streaming_at: Lsn, + conf: &SafeKeeperConf, +) -> anyhow::Result { + // TODO: pass auth token + let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?; + let mut cfg = cfg.to_tokio_postgres_config(); + // It will make safekeeper give out not committed WAL (up to flush_lsn). + cfg.application_name(&format!("safekeeper_{}", conf.my_id)); + cfg.replication_mode(tokio_postgres::config::ReplicationMode::Physical); + + let connect_timeout = Duration::from_millis(10000); + let (client, connection) = match time::timeout(connect_timeout, cfg.connect(postgres::NoTls)) + .await + { + Ok(client_and_conn) => client_and_conn?, + Err(_elapsed) => { + bail!("timed out while waiting {connect_timeout:?} for connection to peer safekeeper to open"); + } + }; + trace!("connected to {:?}", donor); + + // The connection object performs the actual communication with the + // server, spawn it off to run on its own. + let ttid = tli.ttid; + tokio::spawn(async move { + if let Err(e) = connection + .instrument(info_span!("recovery task connection poll", ttid = %ttid)) + .await + { + // This logging isn't very useful as error is anyway forwarded to client. + trace!( + "tokio_postgres connection object finished with error: {}", + e + ); + } + }); + + let query = format!( + "START_REPLICATION PHYSICAL {} (term='{}')", + start_streaming_at, donor.term + ); + + let copy_stream = client.copy_both_simple(&query).await?; + let physical_stream = ReplicationStream::new(copy_stream); + + // As in normal walreceiver, do networking and writing to disk in parallel. + let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE); + let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE); + let wa = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, None); + + let res = tokio::select! { + r = network_io(physical_stream, msg_tx, donor.clone(), tli.clone(), conf.clone()) => r, + r = read_replies(reply_rx, donor.term) => r.map(|()| None), + }; + + // Join the spawned WalAcceptor. At this point chans to/from it passed to + // network routines are dropped, so it will exit as soon as it touches them. + match wa.await { + Ok(Ok(())) => { + // WalAcceptor finished normally, termination reason is different + match res { + Ok(Some(success_desc)) => Ok(success_desc), + Ok(None) => bail!("unexpected recovery end without error/success"), // can't happen + Err(e) => Err(e), // network error or term change + } + } + Ok(Err(e)) => Err(e), // error while processing message + Err(e) => bail!("WalAcceptor panicked: {}", e), + } +} + +// Perform network part of streaming: read data and push it to msg_tx, send KA +// to make sender hear from us. If there is nothing coming for a while, check +// for termination. +// Returns +// - Ok(None) if channel to WalAcceptor closed -- its task should return error. +// - Ok(Some(String)) if recovery successfully completed. +// - Err if error happened while reading/writing to socket. +async fn network_io( + physical_stream: ReplicationStream, + msg_tx: Sender, + donor: Donor, + tli: Arc, + conf: SafeKeeperConf, +) -> anyhow::Result> { + let mut physical_stream = pin!(physical_stream); + let mut last_received_lsn = Lsn::INVALID; + // tear down connection if no data arrives withing this period + let no_data_timeout = Duration::from_millis(30000); + + loop { + let msg = match timeout(no_data_timeout, physical_stream.next()).await { + Ok(next) => match next { + None => bail!("unexpected end of replication stream"), + Some(msg) => msg.context("get replication message")?, + }, + Err(_) => bail!("no message received within {:?}", no_data_timeout), + }; + + match msg { + ReplicationMessage::XLogData(xlog_data) => { + let ar_hdr = AppendRequestHeader { + term: donor.term, + epoch_start_lsn: Lsn::INVALID, // unused + begin_lsn: Lsn(xlog_data.wal_start()), + end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64, + commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it + truncate_lsn: Lsn::INVALID, // do not attempt to advance + proposer_uuid: [0; 16], + }; + let ar = AppendRequest { + h: ar_hdr, + wal_data: xlog_data.into_data(), + }; + trace!( + "processing AppendRequest {}-{}, len {}", + ar.h.begin_lsn, + ar.h.end_lsn, + ar.wal_data.len() + ); + last_received_lsn = ar.h.end_lsn; + if msg_tx + .send(ProposerAcceptorMessage::AppendRequest(ar)) + .await + .is_err() + { + return Ok(None); // chan closed, WalAcceptor terminated + } + } + ReplicationMessage::PrimaryKeepAlive(_) => { + // keepalive means nothing is being streamed for a while. Check whether we need to stop. + let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await; + // do current donors still contain one we currently connected to? + if !recovery_needed_info + .donors + .iter() + .any(|d| d.sk_id == donor.sk_id) + { + // Most likely it means we are caughtup. + // note: just exiting makes tokio_postgres send CopyFail to the far end. + return Ok(Some(format!( + "terminating at {} as connected safekeeper {} with term {} is not a donor anymore: {}", + last_received_lsn, donor.sk_id, donor.term, recovery_needed_info + ))); + } + } + _ => {} + } + // Send reply to each message to keep connection alive. Ideally we + // should do that once in a while instead, but this again requires + // stream split or similar workaround, and recovery is anyway not that + // performance critical. + // + // We do not know here real write/flush LSNs (need to take mutex again + // or check replies which are read in different future), but neither + // sender much cares about them, so just send last received. + physical_stream + .as_mut() + .standby_status_update( + PgLsn::from(last_received_lsn.0), + PgLsn::from(last_received_lsn.0), + PgLsn::from(last_received_lsn.0), + SystemTime::now(), + 0, + ) + .await?; + } +} + +// Read replies from WalAcceptor. We are not interested much in sending them to +// donor safekeeper, so don't route them anywhere. However, we should check if +// term changes and exit if it does. +// Returns Ok(()) if channel closed, Err in case of term change. +async fn read_replies( + mut reply_rx: Receiver, + donor_term: Term, +) -> anyhow::Result<()> { + loop { + match reply_rx.recv().await { + Some(msg) => { + if let AcceptorProposerMessage::AppendResponse(ar) = msg { + if ar.term != donor_term { + bail!("donor term changed from {} to {}", donor_term, ar.term); + } + } + } + None => return Ok(()), // chan closed, WalAcceptor terminated + } + } +} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index ee5c093acb..f115c2d6ad 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -91,6 +91,59 @@ impl TermHistory { } TermHistory(res) } + + /// Find point of divergence between leader (walproposer) term history and + /// safekeeper. Arguments are not symmetrics as proposer history ends at + /// +infinity while safekeeper at flush_lsn. + /// C version is at walproposer SendProposerElected. + pub fn find_highest_common_point( + prop_th: &TermHistory, + sk_th: &TermHistory, + sk_wal_end: Lsn, + ) -> Option { + let (prop_th, sk_th) = (&prop_th.0, &sk_th.0); // avoid .0 below + // find last common term, if any... + let mut last_common_idx = None; + for i in 0..min(sk_th.len(), prop_th.len()) { + if prop_th[i].term != sk_th[i].term { + break; + } + // If term is the same, LSN must be equal as well. + assert!( + prop_th[i].lsn == sk_th[i].lsn, + "same term {} has different start LSNs: prop {}, sk {}", + prop_th[i].term, + prop_th[i].lsn, + sk_th[i].lsn + ); + last_common_idx = Some(i); + } + let last_common_idx = match last_common_idx { + None => return None, // no common point + Some(lci) => lci, + }; + // Now find where it ends at both prop and sk and take min. End of + // (common) term is the start of the next except it is the last one; + // there it is flush_lsn in case of safekeeper or, in case of proposer + // +infinity, so we just take flush_lsn then. + if last_common_idx == prop_th.len() - 1 { + Some(TermLsn { + term: prop_th[last_common_idx].term, + lsn: sk_wal_end, + }) + } else { + let prop_common_term_end = prop_th[last_common_idx + 1].lsn; + let sk_common_term_end = if last_common_idx + 1 < sk_th.len() { + sk_th[last_common_idx + 1].lsn + } else { + sk_wal_end + }; + Some(TermLsn { + term: prop_th[last_common_idx].term, + lsn: min(prop_common_term_end, sk_common_term_end), + }) + } + } } /// Display only latest entries for Debug. @@ -305,19 +358,19 @@ pub struct AcceptorGreeting { /// Vote request sent from proposer to safekeepers #[derive(Debug, Deserialize)] pub struct VoteRequest { - term: Term, + pub term: Term, } /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { - term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. + pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: u64, // fixme u64 due to padding // Safekeeper flush_lsn (end of WAL) + history of term switches allow // proposer to choose the most advanced one. - flush_lsn: Lsn, + pub flush_lsn: Lsn, truncate_lsn: Lsn, - term_history: TermHistory, + pub term_history: TermHistory, timeline_start_lsn: Lsn, } @@ -760,7 +813,7 @@ where bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help", msg.term, self.flush_lsn(), msg.start_streaming_at) } - // Otherwise this shouldn't happen. + // Otherwise we must never attempt to truncate committed data. assert!( msg.start_streaming_at >= self.inmem.commit_lsn, "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}", @@ -1190,4 +1243,65 @@ mod tests { sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %) assert_eq!(sk.get_epoch(), 1); } + + #[test] + fn test_find_highest_common_point_none() { + let prop_th = TermHistory(vec![(0, Lsn(1)).into()]); + let sk_th = TermHistory(vec![(1, Lsn(1)).into(), (2, Lsn(2)).into()]); + assert_eq!( + TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(3),), + None + ); + } + + #[test] + fn test_find_highest_common_point_middle() { + let prop_th = TermHistory(vec![ + (1, Lsn(10)).into(), + (2, Lsn(20)).into(), + (4, Lsn(40)).into(), + ]); + let sk_th = TermHistory(vec![ + (1, Lsn(10)).into(), + (2, Lsn(20)).into(), + (3, Lsn(30)).into(), // sk ends last common term 2 at 30 + ]); + assert_eq!( + TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(40),), + Some(TermLsn { + term: 2, + lsn: Lsn(30), + }) + ); + } + + #[test] + fn test_find_highest_common_point_sk_end() { + let prop_th = TermHistory(vec![ + (1, Lsn(10)).into(), + (2, Lsn(20)).into(), // last common term 2, sk will end it at 32 sk_end_lsn + (4, Lsn(40)).into(), + ]); + let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]); + assert_eq!( + TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),), + Some(TermLsn { + term: 2, + lsn: Lsn(32), + }) + ); + } + + #[test] + fn test_find_highest_common_point_walprop() { + let prop_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]); + let sk_th = TermHistory(vec![(1, Lsn(10)).into(), (2, Lsn(20)).into()]); + assert_eq!( + TermHistory::find_highest_common_point(&prop_th, &sk_th, Lsn(32),), + Some(TermLsn { + term: 2, + lsn: Lsn(32), + }) + ); + } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index b684083446..8e9400d0a5 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -418,10 +418,11 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?}, available WAL ends at {}, recovery={}", + "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}", start_pos, end_pos, - matches!(end_watch, EndWatch::Flush(_)) + matches!(end_watch, EndWatch::Flush(_)), + appname ); // switch to copy @@ -680,7 +681,7 @@ impl ReplyReader { } } -const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); +const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(5); /// Wait until we have available WAL > start_pos or timeout expires. Returns /// - Ok(Some(end_pos)) if needed lsn is successfully observed; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 3e066de34f..ed3e51f823 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -11,6 +11,7 @@ use serde_with::DisplayFromStr; use std::cmp::max; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use tokio::sync::{Mutex, MutexGuard}; use tokio::{ sync::{mpsc::Sender, watch}, @@ -27,7 +28,7 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::receive_wal::WalReceivers; -use crate::recovery::recovery_main; +use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo}; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM, @@ -45,11 +46,12 @@ use crate::{debug_dump, wal_storage}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerInfo { pub sk_id: NodeId, + pub term: Term, /// Term of the last entry. - _last_log_term: Term, + pub last_log_term: Term, /// LSN of the last record. #[serde_as(as = "DisplayFromStr")] - _flush_lsn: Lsn, + pub flush_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub commit_lsn: Lsn, /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new @@ -61,16 +63,21 @@ pub struct PeerInfo { #[serde(skip)] #[serde(default = "Instant::now")] ts: Instant, + pub pg_connstr: String, + pub http_connstr: String, } impl PeerInfo { fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo { PeerInfo { sk_id: NodeId(sk_info.safekeeper_id), - _last_log_term: sk_info.last_log_term, - _flush_lsn: Lsn(sk_info.flush_lsn), + term: sk_info.term, + last_log_term: sk_info.last_log_term, + flush_lsn: Lsn(sk_info.flush_lsn), commit_lsn: Lsn(sk_info.commit_lsn), local_start_lsn: Lsn(sk_info.local_start_lsn), + pg_connstr: sk_info.safekeeper_connstr.clone(), + http_connstr: sk_info.http_connstr.clone(), ts, } } @@ -265,6 +272,20 @@ impl SharedState { availability_zone: conf.availability_zone.clone(), } } + + /// Get our latest view of alive peers status on the timeline. + /// We pass our own info through the broker as well, so when we don't have connection + /// to the broker returned vec is empty. + fn get_peers(&self, heartbeat_timeout: Duration) -> Vec { + let now = Instant::now(); + self.peers_info + .0 + .iter() + // Regard peer as absent if we haven't heard from it within heartbeat_timeout. + .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout) + .cloned() + .collect() + } } #[derive(Debug, thiserror::Error)] @@ -446,7 +467,9 @@ impl Timeline { /// Bootstrap new or existing timeline starting background stasks. pub fn bootstrap(self: &Arc, conf: &SafeKeeperConf) { // Start recovery task which always runs on the timeline. - tokio::spawn(recovery_main(self.clone(), conf.clone())); + if conf.peer_recovery_enabled { + tokio::spawn(recovery_main(self.clone(), conf.clone())); + } } /// Delete timeline from disk completely, by removing timeline directory. Background @@ -680,20 +703,88 @@ impl Timeline { Ok(()) } - /// Get our latest view of alive peers status on the timeline. - /// We pass our own info through the broker as well, so when we don't have connection - /// to the broker returned vec is empty. pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { let shared_state = self.write_shared_state().await; - let now = Instant::now(); - shared_state - .peers_info - .0 - .iter() - // Regard peer as absent if we haven't heard from it within heartbeat_timeout. - .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout) - .cloned() - .collect() + shared_state.get_peers(conf.heartbeat_timeout) + } + + /// Should we start fetching WAL from a peer safekeeper, and if yes, from + /// which? Answer is yes, i.e. .donors is not empty if 1) there is something + /// to fetch, and we can do that without running elections; 2) there is no + /// actively streaming compute, as we don't want to compete with it. + /// + /// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal + /// to its last_log_term so we are sure such a leader ever had been elected. + /// + /// All possible donors are returned so that we could keep connection to the + /// current one if it is good even if it slightly lags behind. + /// + /// Note that term conditions above might be not met, but safekeepers are + /// still not aligned on last flush_lsn. Generally in this case until + /// elections are run it is not possible to say which safekeeper should + /// recover from which one -- history which would be committed is different + /// depending on assembled quorum (e.g. classic picture 8 from Raft paper). + /// Thus we don't try to predict it here. + pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo { + let ss = self.write_shared_state().await; + let term = ss.sk.state.acceptor_state.term; + let last_log_term = ss.sk.get_epoch(); + let flush_lsn = ss.sk.flush_lsn(); + // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us. + let mut peers = ss.get_peers(heartbeat_timeout); + // Sort by pairs. + peers.sort_by(|p1, p2| { + let tl1 = TermLsn { + term: p1.last_log_term, + lsn: p1.flush_lsn, + }; + let tl2 = TermLsn { + term: p2.last_log_term, + lsn: p2.flush_lsn, + }; + tl2.cmp(&tl1) // desc + }); + let num_streaming_computes = self.walreceivers.get_num_streaming(); + let donors = if num_streaming_computes > 0 { + vec![] // If there is a streaming compute, don't try to recover to not intervene. + } else { + peers + .iter() + .filter_map(|candidate| { + // Are we interested in this candidate? + let candidate_tl = TermLsn { + term: candidate.last_log_term, + lsn: candidate.flush_lsn, + }; + let my_tl = TermLsn { + term: last_log_term, + lsn: flush_lsn, + }; + if my_tl < candidate_tl { + // Yes, we are interested. Can we pull from it without + // (re)running elections? It is possible if 1) his term + // is equal to his last_log_term so we could act on + // behalf of leader of this term (we must be sure he was + // ever elected) and 2) our term is not higher, or we'll refuse data. + if candidate.term == candidate.last_log_term && candidate.term >= term { + Some(Donor::from(candidate)) + } else { + None + } + } else { + None + } + }) + .collect() + }; + RecoveryNeededInfo { + term, + last_log_term, + flush_lsn, + peers, + num_streaming_computes, + donors, + } } pub fn get_walsenders(&self) -> &Arc { diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 8199f5777b..edfc82c8c2 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -980,6 +980,81 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder): endpoint.start() +# is timeline flush_lsn equal on provided safekeepers? +def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id): + return ( + sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn + == sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn + ) + + +# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that +# 1) walproposer can't recover node if it misses WAL written by previous computes, but +# still starts up and functions normally if two other sks are ok. +# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions +# normally if two other sks are ok. +# 3) Lagged safekeeper can still recover by peer recovery. +def test_one_sk_down(neon_env_builder: NeonEnvBuilder): + pass + + +# Smaller version of test_one_sk_down testing peer recovery in isolation: that +# it works without compute at all. +def test_peer_recovery(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + timeline_id = env.neon_cli.create_branch("test_peer_recovery") + endpoint = env.endpoints.create_start("test_peer_recovery") + + endpoint.safe_psql("create table t(key int, value text)") + + sk1 = env.safekeepers[0] + sk1.stop() + + # roughly fills one segment + endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'") + + endpoint.stop() # stop compute + + # now start safekeeper, but with peer recovery disabled + sk1.start(extra_opts=["--peer-recovery=false"]) + # it should lag for about a segment + sk1_http_cli = sk1.http_client() + sk2 = env.safekeepers[1] + sk2_http_cli = sk2.http_client() + sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id) + sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id) + log.info( + f"flush_lsns after insertion: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}" + ) + assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024 + + # wait a bit, lsns shouldn't change + # time.sleep(5) + sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id) + sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id) + log.info( + f"flush_lsns after waiting: sk1={sk1_tli_status.flush_lsn}, sk2={sk2_tli_status.flush_lsn}" + ) + assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024 + + # now restart safekeeper with peer recovery enabled and wait for recovery + sk1.stop().start() + wait( + partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id), + "flush_lsn to get aligned", + wait_f=lambda sk1_http_cli=sk1_http_cli, sk2_http_cli=sk2_http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info( + f"waiting for flush_lsn alignment, sk1.flush_lsn={sk1_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}, sk2.flush_lsn={sk2_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn}" + ), + ) + # stop one of safekeepers which weren't recovering and insert a bit more + env.safekeepers[2].stop() + endpoint = env.endpoints.create_start("test_peer_recovery") + endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'") + + class SafekeeperEnv: def __init__( self,