diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index c0107a431e..fb98eeca03 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -300,7 +300,7 @@ impl PostgresNode { conf.append("shared_buffers", "1MB"); conf.append("fsync", "off"); conf.append("max_connections", "100"); - conf.append("wal_sender_timeout", "0"); + conf.append("wal_sender_timeout", "10s"); conf.append("wal_level", "replica"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index b17d08a33a..4e47009326 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -695,8 +695,8 @@ impl Timeline for LayeredTimeline { .wait_for_timeout(lsn, TIMEOUT) .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {} to arrive", - lsn + "Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}", + lsn, self.get_disk_consistent_lsn() ) })?; @@ -910,6 +910,10 @@ impl Timeline for LayeredTimeline { Ok(total_blocks * BLCKSZ as usize) } + fn get_disk_consistent_lsn(&self) -> Lsn { + self.disk_consistent_lsn.load() + } + fn writer<'a>(&'a self) -> Box { Box::new(LayeredTimelineWriter { tl: self, diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3009d51352..73c6f370d6 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -134,6 +134,7 @@ pub trait Timeline: Send + Sync { fn get_last_record_lsn(&self) -> Lsn; fn get_prev_record_lsn(&self) -> Lsn; fn get_start_lsn(&self) -> Lsn; + fn get_disk_consistent_lsn(&self) -> Lsn; /// Mutate the timeline with a [`TimelineWriter`]. 
fn writer<'a>(&'a self) -> Box; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index de5ee5340c..65b3fa5cf6 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -284,12 +284,14 @@ fn walreceiver_main( if let Some(last_lsn) = status_update { // TODO: More thought should go into what values are sent here. let last_lsn = PgLsn::from(u64::from(last_lsn)); - let write_lsn = last_lsn; + // We are using disk consistent LSN as `write_lsn`, i.e. LSN at which page server + // may guarantee persistence of all received data. Safekeeper is now free to remove + // WAL preceding `write_lsn`: it should not be requested by this page server. + let write_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); let flush_lsn = last_lsn; let apply_lsn = PgLsn::from(0); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; - physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; } diff --git a/test_runner/pytest.ini b/test_runner/pytest.ini index e6c7013559..7ea2ae5dfb 100644 --- a/test_runner/pytest.ini +++ b/test_runner/pytest.ini @@ -2,3 +2,4 @@ minversion = 6.0 log_format = %(asctime)s.%(msecs)-3d %(levelname)s [%(filename)s:%(lineno)d] %(message)s log_date_format = %Y-%m-%d %H:%M:%S +log_cli = true diff --git a/vendor/postgres b/vendor/postgres index 9160deb05a..2a31a0bb8e 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 9160deb05a08986354721173ba36e3ebc50a9e21 +Subproject commit 2a31a0bb8ea06dae2d86b0851c547d2f552de770 diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 4ec5f16ba9..29532f9942 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -2,7 +2,7 @@ //! with the "START_REPLICATION" message. 
use crate::send_wal::SendWalHandler; -use crate::timeline::{Timeline, TimelineTools}; +use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use anyhow::{anyhow, Context, Result}; use bytes::Bytes; use log::*; @@ -20,7 +20,7 @@ use std::{str, thread}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::PostgresBackend; -use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody}; +use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody}; use zenith_utils::sock_split::ReadStream; pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; @@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; type FullTransactionId = u64; /// Hot standby feedback received from replica -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub struct HotStandbyFeedback { pub ts: TimestampTz, pub xmin: FullTransactionId, @@ -49,6 +49,16 @@ impl HotStandbyFeedback { } } +/// Standby status update +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StandbyReply { + pub write_lsn: Lsn, // disk consistent LSN + pub flush_lsn: Lsn, // LSN committed by quorum + pub apply_lsn: Lsn, // not used + pub reply_ts: TimestampTz, + pub reply_requested: bool, +} + /// A network connection that's speaking the replication protocol. 
pub struct ReplicationConn { /// This is an `Option` because we will spawn a background thread that will @@ -56,16 +66,15 @@ pub struct ReplicationConn { stream_in: Option, } -// TODO: move this to crate::timeline when there's more users -// TODO: design a proper Timeline mock api -trait HsFeedbackSubscriber { - fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {} +/// Scope guard to unregister replication connection from timeline +struct ReplicationConnGuard { + replica: usize, // replica internal ID assigned by timeline + timeline: Arc, } -impl HsFeedbackSubscriber for Arc { - #[inline(always)] - fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { - Timeline::add_hs_feedback(self, feedback); +impl Drop for ReplicationConnGuard { + fn drop(&mut self) { + self.timeline.update_replica_state(self.replica, None); } } @@ -79,26 +88,33 @@ impl ReplicationConn { /// Handle incoming messages from the network. /// This is spawned into the background by `handle_start_replication`. - fn background_thread( - mut stream_in: impl Read, - subscriber: impl HsFeedbackSubscriber, - ) -> Result<()> { + fn background_thread(mut stream_in: impl Read, timeline: Arc) -> Result<()> { + let mut state = ReplicaState::new(); + let replica = timeline.add_replica(state); + let _guard = ReplicationConnGuard { + replica, + timeline: timeline.clone(), + }; // Wait for replica's feedback. while let Some(msg) = FeMessage::read(&mut stream_in)? { match &msg { FeMessage::CopyData(m) => { // There's two possible data messages that the client is supposed to send here: - // `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby - // feedback. + // `HotStandbyFeedback` and `StandbyStatusUpdate`. match m.first().cloned() { Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { // Note: deserializing is on m[1..] because we skip the tag byte. 
- let feedback = HotStandbyFeedback::des(&m[1..]) + state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - subscriber.add_hs_feedback(feedback); + timeline.update_replica_state(replica, Some(state)); + } + Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { + let reply = StandbyReply::des(&m[1..]) + .context("failed to deserialize StandbyReply")?; + state.disk_consistent_lsn = reply.write_lsn; + timeline.update_replica_state(replica, Some(state)); } - Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (), _ => warn!("unexpected message {:?}", msg), } } @@ -187,7 +203,7 @@ impl ReplicationConn { // switch to copy pgb.write_message(&BeMessage::CopyBothResponse)?; - let mut end_pos: Lsn; + let mut end_pos = Lsn(0); let mut wal_file: Option = None; loop { @@ -202,7 +218,18 @@ impl ReplicationConn { } else { /* normal mode */ let timeline = swh.timeline.get(); - end_pos = timeline.wait_for_lsn(start_pos); + if let Some(lsn) = timeline.wait_for_lsn(start_pos) { + end_pos = lsn + } else { + // timeout expired: request pageserver status + pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + sent_ptr: end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .context("Failed to send KeepAlive message")?; + continue; + } } if end_pos == END_REPLICATION_MARKER { break; @@ -257,18 +284,3 @@ impl ReplicationConn { Ok(()) } } - -#[cfg(test)] -mod tests { - use super::*; - - // A no-op impl for tests - impl HsFeedbackSubscriber for () {} - - #[test] - fn test_replication_conn_background_thread_eof() { - // Test that background_thread recognizes EOF - let stream: &[u8] = &[]; - ReplicationConn::background_thread(stream, ()).unwrap(); - } -} diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index d7f5623002..49e5945c95 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -191,6 +191,8 @@ pub struct AppendResponse { // We report back our awareness about which WAL is 
committed, as this is // a criterion for walproposer --sync mode exit pub commit_lsn: Lsn, + // Min disk consistent lsn of pageservers (portion of WAL applied and written to the disk by pageservers) + pub disk_consistent_lsn: Lsn, pub hs_feedback: HotStandbyFeedback, } @@ -458,6 +460,7 @@ where epoch: self.s.acceptor_state.epoch, commit_lsn: Lsn(0), flush_lsn: Lsn(0), + disk_consistent_lsn: Lsn(0), hs_feedback: HotStandbyFeedback::empty(), }; return Ok(AcceptorProposerMessage::AppendResponse(resp)); @@ -567,6 +570,7 @@ where epoch: self.s.acceptor_state.epoch, flush_lsn: self.flush_lsn, commit_lsn: self.s.commit_lsn, + disk_consistent_lsn: Lsn(0), // will be filled by caller code to avoid bothering safekeeper hs_feedback: HotStandbyFeedback::empty(), }; diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 82aa6d6d36..b30c061c9c 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -11,9 +11,9 @@ use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; use std::io::{Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; - use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER}; @@ -25,6 +25,35 @@ use crate::WalAcceptorConf; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; const CONTROL_FILE_NAME: &str = "safekeeper.control"; +const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); + +/// Replica status: hot standby feedback + disk consistent lsn +#[derive(Debug, Clone, Copy)] +pub struct ReplicaState { + /// combined disk_consistent_lsn of pageservers + pub disk_consistent_lsn: Lsn, + /// combined hot standby feedback from all replicas + pub hs_feedback: HotStandbyFeedback, +} + +impl Default for ReplicaState { + fn default() -> Self { + Self::new() + } +} + +impl ReplicaState { + pub fn new() -> ReplicaState { + ReplicaState { + 
disk_consistent_lsn: Lsn(u64::MAX), + hs_feedback: HotStandbyFeedback { + ts: 0, + xmin: u64::MAX, + catalog_xmin: u64::MAX, + }, + } + } +} /// Shared state associated with database instance (tenant) struct SharedState { @@ -33,8 +62,8 @@ struct SharedState { /// For receiving-sending wal cooperation /// quorum commit LSN we've notified walsenders about notified_commit_lsn: Lsn, - /// combined hot standby feedback from all replicas - hs_feedback: HotStandbyFeedback, + /// State of replicas + replicas: Vec>, } // A named boolean. @@ -45,6 +74,31 @@ pub enum CreateControlFile { } impl SharedState { + /// Get combined state of all alive replicas + pub fn get_replicas_state(&self) -> ReplicaState { + let mut acc = ReplicaState::new(); + for state in self.replicas.iter().flatten() { + acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts); + acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin); + acc.hs_feedback.catalog_xmin = + min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin); + acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); + } + acc + } + + /// Assign new replica ID. We choose first empty cell in the replicas vector + /// or extend the vector if there are no free items. + pub fn add_replica(&mut self, state: ReplicaState) -> usize { + if let Some(pos) = self.replicas.iter().position(|r| r.is_none()) { + self.replicas[pos] = Some(state); + return pos; + } + let pos = self.replicas.len(); + self.replicas.push(Some(state)); + pos + } + /// Restore SharedState from control file. Locks the control file along the /// way to prevent running more than one instance of safekeeper on the same /// data dir. 
@@ -74,21 +128,10 @@ impl SharedState { Ok(Self { notified_commit_lsn: Lsn(0), sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state), - hs_feedback: HotStandbyFeedback { - ts: 0, - xmin: u64::MAX, - catalog_xmin: u64::MAX, - }, + replicas: Vec::new(), }) } - /// Accumulate hot standby feedbacks from replicas - pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) { - self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin); - self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin); - self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts); - } - /// Fetch and lock control file (prevent running more than one instance of safekeeper) /// If create=false and file doesn't exist, bails out. fn load_control_file( @@ -178,20 +221,27 @@ impl Timeline { } } - /// Wait for an LSN to be committed. + /// Timed wait for an LSN to be committed. /// /// Returns the last committed LSN, which will be at least - /// as high as the LSN waited for. + /// as high as the LSN waited for, or None if timeout expired. /// - pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn { + pub fn wait_for_lsn(&self, lsn: Lsn) -> Option { let mut shared_state = self.mutex.lock().unwrap(); loop { let commit_lsn = shared_state.notified_commit_lsn; // This must be `>`, not `>=`. if commit_lsn > lsn { - return commit_lsn; + return Some(commit_lsn); } - shared_state = self.cond.wait(shared_state).unwrap(); + let result = self + .cond + .wait_timeout(shared_state, POLL_STATE_TIMEOUT) + .unwrap(); + if result.1.timed_out() { + return None; + } + shared_state = result.0 } } @@ -219,9 +269,11 @@ impl Timeline { // commit_lsn if we are catching up safekeeper. 
commit_lsn = shared_state.sk.commit_lsn; - // if this is AppendResponse, fill in proper hot standby feedback + // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg { - resp.hs_feedback = shared_state.hs_feedback.clone(); + let state = shared_state.get_replicas_state(); + resp.hs_feedback = state.hs_feedback; + resp.disk_consistent_lsn = state.disk_consistent_lsn; } } // Ping wal sender that new data might be available. @@ -233,15 +285,14 @@ impl Timeline { self.mutex.lock().unwrap().sk.s.clone() } - // Accumulate hot standby feedbacks from replicas - pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) { + pub fn add_replica(&self, state: ReplicaState) -> usize { let mut shared_state = self.mutex.lock().unwrap(); - shared_state.add_hs_feedback(feedback); + shared_state.add_replica(state) } - pub fn get_hs_feedback(&self) -> HotStandbyFeedback { - let shared_state = self.mutex.lock().unwrap(); - shared_state.hs_feedback.clone() + pub fn update_replica_state(&self, id: usize, state: Option) { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.replicas[id] = state; } pub fn get_end_of_wal(&self) -> (Lsn, u32) { diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs index fe66f9d5a2..1941784332 100644 --- a/zenith_utils/src/pq_proto.rs +++ b/zenith_utils/src/pq_proto.rs @@ -358,6 +358,7 @@ pub enum BeMessage<'a> { RowDescription(&'a [RowDescriptor<'a>]), XLogData(XLogDataBody<'a>), NoticeResponse(String), + KeepAlive(WalSndKeepAlive), } // One row desciption in RowDescription packet. 
@@ -409,6 +410,13 @@ pub struct XLogDataBody<'a> { pub data: &'a [u8], } +#[derive(Debug)] +pub struct WalSndKeepAlive { + pub sent_ptr: u64, + pub timestamp: i64, + pub request_reply: bool, +} + pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); // single text column @@ -721,6 +729,18 @@ impl<'a> BeMessage<'a> { }) .unwrap(); } + + BeMessage::KeepAlive(req) => { + buf.put_u8(b'd'); + write_body(buf, |buf| { + buf.put_u8(b'k'); + buf.put_u64(req.sent_ptr); + buf.put_i64(req.timestamp); + buf.put_u8(if req.request_reply { 1u8 } else { 0u8 }); + Ok::<_, io::Error>(()) + }) + .unwrap(); + } } Ok(()) }