diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 9a6a1d28b5..853cc59b9c 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -1175,81 +1175,71 @@ impl LayeredTimeline {
             );
         }
 
-        loop {
-            // Do we have a layer open for writing already?
-            if let Some(layer) = layers.get_open(&seg) {
-                if layer.get_start_lsn() > lsn {
-                    bail!("unexpected open layer in the future");
-                }
-                return Ok(layer);
+        // Do we have a layer open for writing already?
+        if let Some(layer) = layers.get_open(&seg) {
+            if layer.get_start_lsn() > lsn {
+                bail!("unexpected open layer in the future");
             }
-
-            // No (writeable) layer for this relation yet. Create one.
-            //
-            // Is this a completely new relation? Or the first modification after branching?
-            //
-
-            let layer;
-            if let Some((prev_layer, _prev_lsn)) =
-                self.get_layer_for_read_locked(seg, lsn, &layers)?
-            {
-                // Create new entry after the previous one.
-                let start_lsn;
-                if prev_layer.get_timeline_id() != self.timelineid {
-                    // First modification on this timeline
-                    start_lsn = self.ancestor_lsn;
-                    trace!(
-                        "creating file for write for {} at branch point {}/{}",
-                        seg,
-                        self.timelineid,
-                        start_lsn
-                    );
-                } else {
-                    start_lsn = prev_layer.get_end_lsn();
-                    trace!(
-                        "creating file for write for {} after previous layer {}/{}",
-                        seg,
-                        self.timelineid,
-                        start_lsn
-                    );
-                }
-                trace!(
-                    "prev layer is at {}/{} - {}",
-                    prev_layer.get_timeline_id(),
-                    prev_layer.get_start_lsn(),
-                    prev_layer.get_end_lsn()
-                );
-                layer = InMemoryLayer::create_successor_layer(
-                    self.conf,
-                    prev_layer,
-                    self.timelineid,
-                    self.tenantid,
-                    start_lsn,
-                    lsn,
-                )?;
-            } else {
-                // New relation.
-                trace!(
-                    "creating layer for write for new rel {} at {}/{}",
-                    seg,
-                    self.timelineid,
-                    lsn
-                );
-                layer = InMemoryLayer::create(
-                    self.conf,
-                    self.timelineid,
-                    self.tenantid,
-                    seg,
-                    lsn,
-                    lsn,
-                )?;
-            }
-
-            let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
-            layers.insert_open(Arc::clone(&layer_rc));
-
-            return Ok(layer_rc);
+            return Ok(layer);
         }
+
+        // No (writeable) layer for this relation yet. Create one.
+        //
+        // Is this a completely new relation? Or the first modification after branching?
+        //
+
+        let layer;
+        if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read_locked(seg, lsn, &layers)? {
+            // Create new entry after the previous one.
+            let start_lsn;
+            if prev_layer.get_timeline_id() != self.timelineid {
+                // First modification on this timeline
+                start_lsn = self.ancestor_lsn;
+                trace!(
+                    "creating file for write for {} at branch point {}/{}",
+                    seg,
+                    self.timelineid,
+                    start_lsn
+                );
+            } else {
+                start_lsn = prev_layer.get_end_lsn();
+                trace!(
+                    "creating file for write for {} after previous layer {}/{}",
+                    seg,
+                    self.timelineid,
+                    start_lsn
+                );
+            }
+            trace!(
+                "prev layer is at {}/{} - {}",
+                prev_layer.get_timeline_id(),
+                prev_layer.get_start_lsn(),
+                prev_layer.get_end_lsn()
+            );
+            layer = InMemoryLayer::create_successor_layer(
+                self.conf,
+                prev_layer,
+                self.timelineid,
+                self.tenantid,
+                start_lsn,
+                lsn,
+            )?;
+        } else {
+            // New relation.
+            trace!(
+                "creating layer for write for new rel {} at {}/{}",
+                seg,
+                self.timelineid,
+                lsn
+            );
+            layer =
+                InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
+        }
+
+        let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
+        layers.insert_open(Arc::clone(&layer_rc));
+
+        return Ok(layer_rc);
     }
 
     ///
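The hunk above is a pure control-flow cleanup: the removed `loop` could never run a second iteration, since every path ended in `return` or `bail!`, so the body is hoisted into straight-line code with early returns. A minimal self-contained sketch of the same shape (`Store` and `get_or_create` are hypothetical stand-ins, not the pageserver API):

```rust
use std::collections::HashMap;

struct Store {
    open: HashMap<u32, String>,
}

impl Store {
    /// Early returns replace the old `loop { ... return ...; }` wrapper:
    /// hand back an existing entry if there is one, otherwise create it.
    fn get_or_create(&mut self, key: u32) -> &String {
        if self.open.contains_key(&key) {
            return self.open.get(&key).unwrap();
        }
        self.open
            .entry(key)
            .or_insert_with(|| format!("layer-{}", key))
    }
}
```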
diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs
index 4ec5f16ba9..ddb72022e0 100644
--- a/walkeeper/src/replication.rs
+++ b/walkeeper/src/replication.rs
@@ -2,7 +2,7 @@
 //! with the "START_REPLICATION" message.
 
 use crate::send_wal::SendWalHandler;
-use crate::timeline::{Timeline, TimelineTools};
+use crate::timeline::{ReplicaState, Timeline, TimelineTools};
 use anyhow::{anyhow, Context, Result};
 use bytes::Bytes;
 use log::*;
@@ -20,7 +20,7 @@ use std::{str, thread};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::PostgresBackend;
-use zenith_utils::pq_proto::{BeMessage, FeMessage, XLogDataBody};
+use zenith_utils::pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody};
 use zenith_utils::sock_split::ReadStream;
 
 pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
@@ -32,7 +32,7 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
 type FullTransactionId = u64;
 
 /// Hot standby feedback received from replica
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
 pub struct HotStandbyFeedback {
     pub ts: TimestampTz,
     pub xmin: FullTransactionId,
@@ -49,6 +49,16 @@ impl HotStandbyFeedback {
     }
 }
 
+/// Standby status update
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StandbyReply {
+    pub write_lsn: Lsn, // disk consistent LSN
+    pub flush_lsn: Lsn, // LSN committed by quorum
+    pub apply_lsn: Lsn, // not used
+    pub reply_ts: TimestampTz,
+    pub reply_requested: bool,
+}
+
 /// A network connection that's speaking the replication protocol.
 pub struct ReplicationConn {
     /// This is an `Option` because we will spawn a background thread that will
@@ -56,16 +66,15 @@
     stream_in: Option<ReadStream>,
 }
 
-// TODO: move this to crate::timeline when there's more users
-// TODO: design a proper Timeline mock api
-trait HsFeedbackSubscriber {
-    fn add_hs_feedback(&self, _feedback: HotStandbyFeedback) {}
+/// Scope guard to unregister replication connection from timeline
+struct ReplicationConnGuard {
+    replica: usize, // replica internal ID assigned by timeline
+    timeline: Arc<Timeline>,
 }
 
-impl HsFeedbackSubscriber for Arc<Timeline> {
-    #[inline(always)]
-    fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
-        Timeline::add_hs_feedback(self, feedback);
+impl Drop for ReplicationConnGuard {
+    fn drop(&mut self) {
+        self.timeline.update_replica_state(self.replica, None);
     }
 }
 
@@ -79,10 +88,13 @@ impl ReplicationConn {
 
     /// Handle incoming messages from the network.
     /// This is spawned into the background by `handle_start_replication`.
-    fn background_thread(
-        mut stream_in: impl Read,
-        subscriber: impl HsFeedbackSubscriber,
-    ) -> Result<()> {
+    fn background_thread(mut stream_in: impl Read, timeline: Arc<Timeline>) -> Result<()> {
+        let mut state = ReplicaState::new();
+        let replica = timeline.add_replica();
+        let _guard = ReplicationConnGuard {
+            replica,
+            timeline: timeline.clone(),
+        };
         // Wait for replica's feedback.
         while let Some(msg) = FeMessage::read(&mut stream_in)? {
             match &msg {
@@ -94,11 +106,16 @@
                     match m.first().cloned() {
                         Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
                             // Note: deserializing is on m[1..] because we skip the tag byte.
-                            let feedback = HotStandbyFeedback::des(&m[1..])
+                            state.hs_feedback = HotStandbyFeedback::des(&m[1..])
                                 .context("failed to deserialize HotStandbyFeedback")?;
-                            subscriber.add_hs_feedback(feedback);
+                            timeline.update_replica_state(replica, Some(state));
+                        }
+                        Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
+                            let reply = StandbyReply::des(&m[1..])
+                                .context("failed to deserialize StandbyReply")?;
+                            state.disk_consistent_lsn = reply.write_lsn;
+                            timeline.update_replica_state(replica, Some(state));
                         }
-                        Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
                         _ => warn!("unexpected message {:?}", msg),
                     }
                 }
@@ -187,7 +204,7 @@
         // switch to copy
         pgb.write_message(&BeMessage::CopyBothResponse)?;
-        let mut end_pos: Lsn;
+        let mut end_pos = Lsn(0);
 
         let mut wal_file: Option<File> = None;
 
         loop {
@@ -202,7 +219,18 @@
             } else { /* normal mode */
                 let timeline = swh.timeline.get();
-                end_pos = timeline.wait_for_lsn(start_pos);
+                if let Some(lsn) = timeline.wait_for_lsn(start_pos) {
+                    end_pos = lsn;
+                } else {
+                    // timeout expired: request pageserver status
+                    pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
+                        sent_ptr: end_pos.0,
+                        timestamp: get_current_timestamp(),
+                        request_reply: true,
+                    }))
+                    .context("Failed to send KeepAlive message")?;
+                    continue;
+                }
             }
             if end_pos == END_REPLICATION_MARKER {
                 break;
             }
@@ -257,18 +285,3 @@
         Ok(())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    // A no-op impl for tests
-    impl HsFeedbackSubscriber for () {}
-
-    #[test]
-    fn test_replication_conn_background_thread_eof() {
-        // Test that background_thread recognizes EOF
-        let stream: &[u8] = &[];
-        ReplicationConn::background_thread(stream, ()).unwrap();
-    }
-}
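The new `STANDBY_STATUS_UPDATE_TAG_BYTE` arm consumes the standard standby-status-update payload; the pageserver reports its `disk_consistent_lsn` in the `write_lsn` slot. A sketch of a matching encoder, assuming `BeSer` serializes `Lsn` as a big-endian u64, `TimestampTz` as a big-endian i64, and `bool` as one byte (the function itself is illustrative, not part of this patch):

```rust
use bytes::{BufMut, BytesMut};

/// Build a standby status update ('r') CopyData payload in the field
/// order that `StandbyReply::des` expects after the tag byte.
fn encode_standby_reply(
    write_lsn: u64, // the pageserver reports its disk_consistent_lsn here
    flush_lsn: u64,
    apply_lsn: u64,
    reply_ts: i64,
    reply_requested: bool,
) -> BytesMut {
    let mut buf = BytesMut::with_capacity(34);
    buf.put_u8(b'r'); // STANDBY_STATUS_UPDATE_TAG_BYTE
    buf.put_u64(write_lsn);
    buf.put_u64(flush_lsn);
    buf.put_u64(apply_lsn);
    buf.put_i64(reply_ts);
    buf.put_u8(reply_requested as u8);
    buf
}
```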
diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs
index 42f8521117..5755294a71 100644
--- a/walkeeper/src/safekeeper.rs
+++ b/walkeeper/src/safekeeper.rs
@@ -188,6 +188,8 @@ pub struct AppendResponse {
     // We report back our awareness about which WAL is committed, as this is
     // a criterion for walproposer --sync mode exit
     pub commit_lsn: Lsn,
+    // Min disk_consistent_lsn of pageservers (the portion of WAL applied and written to disk by pageservers)
+    pub disk_consistent_lsn: Lsn,
     pub hs_feedback: HotStandbyFeedback,
 }
 
@@ -411,6 +413,7 @@
                 epoch: self.s.acceptor_state.epoch,
                 commit_lsn: Lsn(0),
                 flush_lsn: Lsn(0),
+                disk_consistent_lsn: Lsn(0),
                 hs_feedback: HotStandbyFeedback::empty(),
             };
             return Ok(AcceptorProposerMessage::AppendResponse(resp));
@@ -516,6 +519,7 @@
             epoch: self.s.acceptor_state.epoch,
             flush_lsn: self.flush_lsn,
             commit_lsn: self.s.commit_lsn,
+            disk_consistent_lsn: Lsn(0), // will be filled by the caller, to avoid bothering the safekeeper
             hs_feedback: HotStandbyFeedback::empty(),
         };
 
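`disk_consistent_lsn` in `AppendResponse` is meant to be the minimum across all attached pageservers, which is why the accumulator starts at the largest possible LSN (see `ReplicaState::new` in timeline.rs below). A tiny illustration of that min-fold, with plain `u64` standing in for `Lsn`:

```rust
/// Min-aggregation over replica reports; u64::MAX is the identity,
/// so a single report wins and "no replicas" leaves it untouched.
fn combined_disk_consistent_lsn(reports: &[u64]) -> u64 {
    reports.iter().copied().fold(u64::MAX, u64::min)
}

fn main() {
    assert_eq!(combined_disk_consistent_lsn(&[0x3000, 0x1000, 0x2000]), 0x1000);
    assert_eq!(combined_disk_consistent_lsn(&[]), u64::MAX);
}
```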
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index 6e6bc8f97c..d593d9f619 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -11,9 +11,9 @@ use std::collections::HashMap;
 use std::fs::{self, File, OpenOptions};
 use std::io::{Seek, SeekFrom, Write};
 use std::sync::{Arc, Condvar, Mutex};
+use std::time::Duration;
 use zenith_utils::bin_ser::LeSer;
 use zenith_utils::lsn::Lsn;
-
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 use crate::replication::{HotStandbyFeedback, END_REPLICATION_MARKER};
@@ -25,6 +25,29 @@ use crate::WalAcceptorConf;
 use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
 
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
+const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
+
+/// Replica status: hot standby feedback + disk consistent LSN
+#[derive(Debug, Clone, Copy)]
+pub struct ReplicaState {
+    /// combined disk_consistent_lsn of pageservers
+    pub disk_consistent_lsn: Lsn,
+    /// combined hot standby feedback from all replicas
+    pub hs_feedback: HotStandbyFeedback,
+}
+
+impl ReplicaState {
+    pub fn new() -> ReplicaState {
+        ReplicaState {
+            disk_consistent_lsn: Lsn(u64::MAX),
+            hs_feedback: HotStandbyFeedback {
+                ts: 0,
+                xmin: u64::MAX,
+                catalog_xmin: u64::MAX,
+            },
+        }
+    }
+}
 
 /// Shared state associated with database instance (tenant)
 struct SharedState {
@@ -33,8 +56,8 @@
     /// For receiving-sending wal cooperation
     /// quorum commit LSN we've notified walsenders about
     commit_lsn: Lsn,
-    /// combined hot standby feedback from all replicas
-    hs_feedback: HotStandbyFeedback,
+    /// State of replicas
+    replicas: Vec<Option<ReplicaState>>,
 }
 
 // A named boolean.
@@ -45,6 +68,35 @@ pub enum CreateControlFile {
 }
 
 impl SharedState {
+    /// Get combined state of all alive replicas
+    pub fn get_replicas_state(&self) -> ReplicaState {
+        let mut acc = ReplicaState::new();
+        for replica in &self.replicas {
+            if let Some(state) = replica {
+                acc.hs_feedback.ts = max(acc.hs_feedback.ts, state.hs_feedback.ts);
+                acc.hs_feedback.xmin = min(acc.hs_feedback.xmin, state.hs_feedback.xmin);
+                acc.hs_feedback.catalog_xmin =
+                    min(acc.hs_feedback.catalog_xmin, state.hs_feedback.catalog_xmin);
+                acc.disk_consistent_lsn =
+                    Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
+            }
+        }
+        acc
+    }
+
+    /// Assign a new replica ID. We choose the first empty cell in the replicas
+    /// vector, or extend the vector if there are no free cells.
+    pub fn add_replica(&mut self) -> usize {
+        let len = self.replicas.len();
+        for i in 0..len {
+            if self.replicas[i].is_none() {
+                return i;
+            }
+        }
+        self.replicas.push(None);
+        len
+    }
+
     /// Restore SharedState from control file. Locks the control file along the
     /// way to prevent running more than one instance of safekeeper on the same
     /// data dir.
@@ -74,21 +126,10 @@
         Ok(Self {
             commit_lsn: Lsn(0),
             sk: SafeKeeper::new(Lsn(flush_lsn), tli, storage, state),
-            hs_feedback: HotStandbyFeedback {
-                ts: 0,
-                xmin: u64::MAX,
-                catalog_xmin: u64::MAX,
-            },
+            replicas: Vec::new(),
         })
     }
 
-    /// Accumulate hot standby feedbacks from replicas
-    pub fn add_hs_feedback(&mut self, feedback: HotStandbyFeedback) {
-        self.hs_feedback.xmin = min(self.hs_feedback.xmin, feedback.xmin);
-        self.hs_feedback.catalog_xmin = min(self.hs_feedback.catalog_xmin, feedback.catalog_xmin);
-        self.hs_feedback.ts = max(self.hs_feedback.ts, feedback.ts);
-    }
-
     /// Fetch and lock control file (prevent running more than one instance of safekeeper)
     /// If create=false and file doesn't exist, bails out.
     fn load_control_file(
@@ -178,20 +219,27 @@
         }
     }
 
-    /// Wait for an LSN to be committed.
+    /// Timed wait for an LSN to be committed.
     ///
     /// Returns the last committed LSN, which will be at least
-    /// as high as the LSN waited for.
+    /// as high as the LSN waited for, or None if the timeout expired.
    ///
-    pub fn wait_for_lsn(&self, lsn: Lsn) -> Lsn {
+    pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
         let mut shared_state = self.mutex.lock().unwrap();
         loop {
             let commit_lsn = shared_state.commit_lsn;
             // This must be `>`, not `>=`.
             if commit_lsn > lsn {
-                return commit_lsn;
+                return Some(commit_lsn);
             }
-            shared_state = self.cond.wait(shared_state).unwrap();
+            let result = self
+                .cond
+                .wait_timeout(shared_state, POLL_STATE_TIMEOUT)
+                .unwrap();
+            if result.1.timed_out() {
+                return None;
+            }
+            shared_state = result.0;
         }
     }
 
@@ -219,9 +267,11 @@
             // commit_lsn if we are catching up safekeeper.
             commit_lsn = shared_state.sk.commit_lsn;
-            // if this is AppendResponse, fill in proper hot standby feedback
+            // if this is AppendResponse, fill in proper hot standby feedback and disk_consistent_lsn
             if let AcceptorProposerMessage::AppendResponse(ref mut resp) = rmsg {
-                resp.hs_feedback = shared_state.hs_feedback.clone();
+                let state = shared_state.get_replicas_state();
+                resp.hs_feedback = state.hs_feedback;
+                resp.disk_consistent_lsn = state.disk_consistent_lsn;
             }
         }
 
         // Ping wal sender that new data might be available.
@@ -233,15 +283,14 @@
         self.mutex.lock().unwrap().sk.s.clone()
     }
 
-    // Accumulate hot standby feedbacks from replicas
-    pub fn add_hs_feedback(&self, feedback: HotStandbyFeedback) {
+    pub fn add_replica(&self) -> usize {
         let mut shared_state = self.mutex.lock().unwrap();
-        shared_state.add_hs_feedback(feedback);
+        shared_state.add_replica()
     }
 
-    pub fn get_hs_feedback(&self) -> HotStandbyFeedback {
-        let shared_state = self.mutex.lock().unwrap();
-        shared_state.hs_feedback.clone()
+    pub fn update_replica_state(&self, id: usize, state: Option<ReplicaState>) {
+        let mut shared_state = self.mutex.lock().unwrap();
+        shared_state.replicas[id] = state;
     }
 
     pub fn get_end_of_wal(&self) -> (Lsn, u32) {
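`wait_for_lsn` now separates "the LSN was committed" from "the wait timed out, so the sender should emit a keepalive and retry". The generic shape of that condvar loop, with illustrative names rather than the walkeeper API:

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Wait until `done` holds, or return false if a single wait times out
/// (the caller can then send a keepalive and call this again).
fn wait_until<T>(
    lock: &Mutex<T>,
    cvar: &Condvar,
    timeout: Duration,
    done: impl Fn(&T) -> bool,
) -> bool {
    let mut guard = lock.lock().unwrap();
    loop {
        if done(&guard) {
            return true;
        }
        let (next, res) = cvar.wait_timeout(guard, timeout).unwrap();
        if res.timed_out() {
            return false; // like wait_for_lsn returning None
        }
        guard = next; // woken up: re-check the predicate
    }
}
```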
diff --git a/zenith_utils/src/pq_proto.rs b/zenith_utils/src/pq_proto.rs
index 43a4b217c2..52c194b740 100644
--- a/zenith_utils/src/pq_proto.rs
+++ b/zenith_utils/src/pq_proto.rs
@@ -357,6 +357,7 @@ pub enum BeMessage<'a> {
     RowDescription(&'a [RowDescriptor<'a>]),
     XLogData(XLogDataBody<'a>),
     NoticeResponse(String),
+    KeepAlive(WalSndKeepAlive),
 }
 
 // One row desciption in RowDescription packet.
@@ -408,6 +409,13 @@ pub struct XLogDataBody<'a> {
     pub data: &'a [u8],
 }
 
+#[derive(Debug)]
+pub struct WalSndKeepAlive {
+    pub sent_ptr: u64,
+    pub timestamp: i64,
+    pub request_reply: bool,
+}
+
 pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
 
 // single text column
@@ -720,6 +728,17 @@
                     })
                     .unwrap();
             }
+
+            BeMessage::KeepAlive(req) => {
+                buf.put_u8(b'k');
+                write_body(buf, |buf| {
+                    buf.put_u64(req.sent_ptr);
+                    buf.put_i64(req.timestamp);
+                    buf.put_u8(if req.request_reply { 1u8 } else { 0u8 });
+                    Ok::<_, io::Error>(())
+                })
+                .unwrap();
+            }
         }
         Ok(())
     }
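The 'k' body written above is 17 bytes after the length word: a big-endian u64, an i64, and one flag byte, matching the walsender keepalive layout. A receiving-side sketch (the `decode_keepalive` helper is hypothetical, not part of this patch):

```rust
use bytes::{Buf, Bytes};

#[derive(Debug)]
struct KeepAliveBody {
    sent_ptr: u64,
    timestamp: i64,
    request_reply: bool,
}

/// Parse the body of a 'k' message once the tag byte and length word
/// have been consumed; `Buf::get_*` reads big-endian, mirroring the
/// `put_*` calls in the `BeMessage::KeepAlive` arm above.
fn decode_keepalive(mut body: Bytes) -> KeepAliveBody {
    KeepAliveBody {
        sent_ptr: body.get_u64(),
        timestamp: body.get_i64(),
        request_reply: body.get_u8() != 0,
    }
}
```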