diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index f516f85afc..6eb40bead4 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -22,7 +22,7 @@ use crate::relish::*; use crate::repository::Timeline; use postgres_ffi::xlog_utils::*; use postgres_ffi::*; -use zenith_utils::lsn::Lsn; +use zenith_utils::lsn::{Lsn, RecordLsn}; /// This is short-living object only for the time of tarball creation, /// created mostly to avoid passing a lot of parameters between various functions @@ -34,13 +34,36 @@ pub struct Basebackup<'a> { prev_record_lsn: Lsn, } +// Create basebackup with non-rel data in it. Omit relational data. +// +// Currently we use empty lsn in two cases: +// * During the basebackup right after timeline creation +// * When working without safekeepers. In this situation it is important to match the lsn +// we are taking basebackup on with the lsn that is used in pageserver's walreceiver +// to start the replication. impl<'a> Basebackup<'a> { pub fn new( write: &'a mut dyn Write, timeline: &'a Arc, - lsn: Lsn, - prev_record_lsn: Lsn, + req_lsn: Option, ) -> Basebackup<'a> { + let RecordLsn { + last: lsn, + prev: prev_record_lsn, + } = if let Some(lsn) = req_lsn { + // FIXME: that wouldn't work since we don't know prev for old LSN's. + // Probably it is better to avoid using prev in compute node start + // at all and acept the fact that first WAL record in the timeline would + // have zero as prev. https://github.com/zenithdb/zenith/issues/506 + RecordLsn { + last: lsn, + prev: lsn, + } + } else { + // Atomically get last and prev LSN's + timeline.get_last_record_rlsn() + }; + Basebackup { ar: Builder::new(write), timeline, diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 0a670b7f6b..6006784b55 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -38,8 +38,8 @@ use crate::{ZTenantId, ZTimelineId}; use zenith_metrics::{register_histogram, Histogram}; use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::{AtomicLsn, Lsn}; -use zenith_utils::seqwait::{MonotonicCounter, SeqWait}; +use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn}; +use zenith_utils::seqwait::SeqWait; mod blob; mod delta_layer; @@ -147,8 +147,6 @@ impl Repository for LayeredRepository { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> { let src_timeline = self.get_timeline(src)?; - // This LSN comes from the user request. Make sure it is aligned. - let start_lsn = start_lsn.aligned(); // Create the metadata file, noting the ancestor of the new timeline. // There is initially no data in it, but all the read-calls know to look @@ -458,24 +456,6 @@ pub struct TimelineMetadata { ancestor_lsn: Lsn, } -#[derive(Debug, Clone, Copy)] -struct RecordLsn { - last: Lsn, - prev: Lsn, -} - -impl MonotonicCounter for RecordLsn { - fn cnt_advance(&mut self, lsn: Lsn) { - assert!(self.last <= lsn); - let new_prev = self.last; - self.last = lsn; - self.prev = new_prev; - } - fn cnt_value(&self) -> Lsn { - self.last - } -} - pub struct LayeredTimeline { conf: &'static PageServerConf, @@ -629,6 +609,8 @@ impl Timeline for LayeredTimeline { fn list_nonrels(&self, lsn: Lsn) -> Result> { info!("list_nonrels called at {}", lsn); + let lsn = self.wait_lsn(lsn)?; + // List all nonrels in this timeline, and all its ancestors. let mut all_rels = HashSet::new(); let mut timeline = self; @@ -794,6 +776,10 @@ impl Timeline for LayeredTimeline { fn get_prev_record_lsn(&self) -> Lsn { self.last_record_lsn.load().prev } + + fn get_last_record_rlsn(&self) -> RecordLsn { + self.last_record_lsn.load() + } } impl LayeredTimeline { @@ -1052,20 +1038,23 @@ impl LayeredTimeline { /// /// Wait until WAL has been received up to the given LSN. /// - fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + /// TODO: change lsn to be an Optional or even force callers to call get_last_record_lsn() + /// (since at least basebackup needs to call get_last_and_prev_record_lsns() instead) + fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result { // When invalid LSN is requested, it means "don't wait, return latest version of the page" // This is necessary for bootstrap. if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; + return Ok(self.get_last_record_lsn()); } - self.last_valid_lsn + // FIXME: we can deadlock if we call wait_lsn() from WAL receiver. And we actually + // it a lot from there. Only deadlock that I caught was while trying to add wait_lsn() + // in list_rels(). But it makes sense to make all functions in timeline non-waiting; + // assert that arg_lsn <= current_record_lsn; call wait_lsn explicetly where it is + // needed (page_service and basebackup); uncomment this check: + // assert_ne!(thread::current().name(), Some("WAL receiver thread")); + + self.last_record_lsn .wait_for_timeout(lsn, TIMEOUT) .with_context(|| { format!( diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3ba9e86b87..60e7f65078 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -347,17 +347,9 @@ impl PageServerHandler { info!("sent CopyOut"); /* Send a tarball of the latest snapshot on the timeline */ - - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); - { let mut writer = CopyDataSink { pgb }; - let mut basebackup = basebackup::Basebackup::new( - &mut writer, - &timeline, - req_lsn, - timeline.get_prev_record_lsn(), - ); + let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn); basebackup.send_tarball()?; } pgb.write_message(&BeMessage::CopyDone)?; @@ -446,9 +438,10 @@ impl postgres_backend::Handler for PageServerHandler { self.handle_pagerequests(pgb, timelineid, tenantid)?; } else if query_string.starts_with("basebackup ") { let (_, params_raw) = query_string.split_at("basebackup ".len()); - let params = params_raw.split(" ").collect::>(); + let params = params_raw.split_whitespace().collect::>(); + ensure!( - params.len() == 2, + params.len() >= 2, "invalid param number for basebackup command" ); @@ -457,12 +450,12 @@ impl postgres_backend::Handler for PageServerHandler { self.check_permission(Some(tenantid))?; - // TODO are there any tests with lsn option? let lsn = if params.len() == 3 { Some(Lsn::from_str(params[2])?) } else { None }; + info!( "got basebackup command. tenantid=\"{}\" timelineid=\"{}\" lsn=\"{:#?}\"", tenantid, timelineid, lsn diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index c56e571f5f..ff2d40182e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -6,7 +6,7 @@ use std::collections::HashSet; use std::ops::AddAssign; use std::sync::Arc; use std::time::Duration; -use zenith_utils::lsn::Lsn; +use zenith_utils::lsn::{Lsn, RecordLsn}; use zenith_utils::zid::ZTimelineId; /// @@ -142,6 +142,9 @@ pub trait Timeline: Send + Sync { /// Advance requires aligned LSN as an argument and would wake wait_lsn() callers. /// Previous last record LSN is stored alongside the latest and can be read. fn advance_last_record_lsn(&self, lsn: Lsn); + /// Atomically get both last and prev. + fn get_last_record_rlsn(&self) -> RecordLsn; + /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. fn get_last_record_lsn(&self) -> Lsn; fn get_prev_record_lsn(&self) -> Lsn; diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index dba5a91a65..0f865964a4 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -7,6 +7,8 @@ use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +use crate::seqwait::MonotonicCounter; + /// Transaction log block size in bytes pub const XLOG_BLCKSZ: u32 = 8192; @@ -197,6 +199,28 @@ impl AtomicLsn { } } +/// Pair of LSN's pointing to the end of the last valid record and previous one +#[derive(Debug, Clone, Copy)] +pub struct RecordLsn { + /// LSN at the end of the current record + pub last: Lsn, + /// LSN at the end of the previous record + pub prev: Lsn, +} + +/// Expose `self.last` as counter to be able to use RecordLsn in SeqWait +impl MonotonicCounter for RecordLsn { + fn cnt_advance(&mut self, lsn: Lsn) { + assert!(self.last <= lsn); + let new_prev = self.last; + self.last = lsn; + self.prev = new_prev; + } + fn cnt_value(&self) -> Lsn { + self.last + } +} + #[cfg(test)] mod tests { use super::*;