From 8c07a36fdadc08475b45dcc5c09f91d0777b57a1 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Thu, 2 Sep 2021 00:52:03 +0300 Subject: [PATCH] Remove last_valid_lsn tracking in wal_receiver. There are two main reasons for that: a) The latest unfinished record may disappear after a compute node restart, so let's try not to leak the volatile part of the WAL into the repository. Always use last_valid_record instead. That change requires different getPage@LSN logic in postgres -- we need to ask for LSNs that point to some complete record instead of GetFlushRecPtr() that can point in the middle of the record. That was already done by @knizhnik to deal with the same problem during the work on `postgres --sync-safekeepers`. Postgres will use LSNs aligned on a 0x8 boundary in get_page requests, so we also need to be sure that last_valid_record is aligned. b) Switch to get_last_record_lsn() in basebackup@no_lsn. When a compute node is running without safekeepers and streams WAL directly to the pageserver it is important to match the basebackup LSN and the LSN of replication start. Before this commit basebackup@no_lsn was waiting for last_valid_lsn and walreceiver started replication with last_record_lsn, which can be less. So replication was failing since the compute node doesn't have the requested WAL. 
--- pageserver/src/branches.rs | 4 +- pageserver/src/layered_repository.rs | 149 +++++++++------------------ pageserver/src/page_service.rs | 2 +- pageserver/src/repository.rs | 122 ++++++++++------------ pageserver/src/restore_local_repo.rs | 6 +- pageserver/src/waldecoder.rs | 5 +- pageserver/src/walreceiver.rs | 23 ++--- vendor/postgres | 2 +- zenith_utils/src/lsn.rs | 10 ++ zenith_utils/src/seqwait.rs | 65 ++++++++---- 10 files changed, 176 insertions(+), 212 deletions(-) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index da5b04bcc3..ead27740fc 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -162,7 +162,7 @@ fn bootstrap_timeline( run_initdb(conf, &initdb_path)?; let pgdata_path = initdb_path; - let lsn = get_lsn_from_controlfile(&pgdata_path)?; + let lsn = get_lsn_from_controlfile(&pgdata_path)?.align(); info!("bootstrap_timeline {:?} at lsn {}", pgdata_path, lsn); @@ -215,7 +215,7 @@ pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Resul let latest_valid_lsn = repo .get_timeline(timeline_id) - .map(|timeline| timeline.get_last_valid_lsn()) + .map(|timeline| timeline.get_last_record_lsn()) .ok(); let ancestor_path = conf.ancestor_path(&timeline_id, tenantid); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 66bdcc50d5..0a670b7f6b 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -25,7 +25,7 @@ use std::io::Write; use std::ops::Bound::Included; use std::path::Path; use std::str::FromStr; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use crate::relish::*; @@ -39,7 +39,7 @@ use zenith_metrics::{register_histogram, Histogram}; use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; -use zenith_utils::seqwait::SeqWait; +use 
zenith_utils::seqwait::{MonotonicCounter, SeqWait}; mod blob; mod delta_layer; @@ -116,6 +116,8 @@ impl Repository for LayeredRepository { ) -> Result> { let mut timelines = self.timelines.lock().unwrap(); + assert!(start_lsn.is_aligned()); + // Create the timeline directory, and write initial metadata to file. std::fs::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?; @@ -145,6 +147,8 @@ impl Repository for LayeredRepository { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> { let src_timeline = self.get_timeline(src)?; + // This LSN comes from the user request. Make sure it is aligned. + let start_lsn = start_lsn.align(); // Create the metadata file, noting the ancestor of the new timeline. // There is initially no data in it, but all the read-calls know to look @@ -315,7 +319,10 @@ impl LayeredRepository { let path = conf.timeline_path(&timelineid, &tenantid).join("metadata"); let data = std::fs::read(&path)?; - Ok(TimelineMetadata::des(&data)?) 
+ let data = TimelineMetadata::des(&data)?; + assert!(data.disk_consistent_lsn.is_aligned()); + + Ok(data) } // @@ -407,7 +414,7 @@ impl LayeredRepository { .collect(); let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; - let last_lsn = timeline.get_last_valid_lsn(); + let last_lsn = timeline.get_last_record_lsn(); if let Some(cutoff) = last_lsn.checked_sub(horizon) { // If GC was explicitly requested by the admin, force flush all in-memory @@ -451,19 +458,22 @@ pub struct TimelineMetadata { ancestor_lsn: Lsn, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] struct RecordLsn { last: Lsn, prev: Lsn, } -impl RecordLsn { - fn advance(&mut self, lsn: Lsn) { +impl MonotonicCounter for RecordLsn { + fn cnt_advance(&mut self, lsn: Lsn) { assert!(self.last <= lsn); let new_prev = self.last; self.last = lsn; self.prev = new_prev; } + fn cnt_value(&self) -> Lsn { + self.last + } } pub struct LayeredTimeline { @@ -478,33 +488,19 @@ pub struct LayeredTimeline { walredo_mgr: Arc, // What page versions do we hold in the repository? If we get a - // request > last_valid_lsn, we need to wait until we receive all + // request > last_record_lsn, we need to wait until we receive all // the WAL up to the request. The SeqWait provides functions for // that. TODO: If we get a request for an old LSN, such that the // versions have already been garbage collected away, we should // throw an error, but we don't track that currently. // - // record_lsn.last points to the end of last processed WAL record. - // It can lag behind last_valid_lsn, if the WAL receiver has - // received some WAL after the end of last record, but not the whole - // next record yet. For get_page_at_lsn requests, we care about - // last_valid_lsn, but if the WAL receiver needs to restart the - // streaming, it needs to restart at the end of last record, so we - // track them separately. 
last_record_lsn should perhaps be in - // walreceiver.rs instead of here, but it seems convenient to keep - // all three values together. + // last_record_lsn.load().last points to the end of last processed WAL record. // // We also remember the starting point of the previous record in - // 'record_lsn.prev'. It's used to set the xl_prev pointer of the + // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the // first WAL record when the node is started up. But here, we just // keep track of it. - // - // When advancing last_valid_lsn and record_lsn simultaneously, we MUST - // advance last_valid_lsn before record_lsn. - // This is so if a reader wishes, it can read record_lsn and then - // last_valid_lsn, and find that last_valid_lsn >= record_lsn. - last_valid_lsn: SeqWait, - record_lsn: RwLock, + last_record_lsn: SeqWait, // All WAL records have been processed and stored durably on files on // local disk, up to this LSN. On crash and restart, we need to re-process @@ -675,7 +671,7 @@ impl Timeline for LayeredTimeline { debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn); let oldsize = self - .get_relish_size(rel, self.last_valid_lsn.load())? + .get_relish_size(rel, self.get_last_record_lsn())? 
.ok_or_else(|| { anyhow!( "attempted to truncate non-existent relish {} at {}", @@ -722,7 +718,7 @@ impl Timeline for LayeredTimeline { trace!("put_unlink: {} at {}", rel, lsn); if rel.is_blocky() { - let oldsize_opt = self.get_relish_size(rel, self.last_valid_lsn.load())?; + let oldsize_opt = self.get_relish_size(rel, self.get_last_record_lsn())?; if let Some(oldsize) = oldsize_opt { let old_last_seg = if oldsize == 0 { 0 @@ -778,76 +774,25 @@ impl Timeline for LayeredTimeline { .observe_closure_duration(|| self.checkpoint_internal(true)) } - /// Remember that WAL has been received and added to the timeline up to the given LSN - fn advance_last_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - - // The last valid LSN cannot move backwards, but when WAL - // receiver is restarted after having only partially processed - // a record, it can call this with an lsn older than previous - // last valid LSN, when it restarts processing that record. - if lsn < old { - // Should never be called with an LSN older than the last - // record LSN, though. - let last_record_lsn = self.get_last_record_lsn(); - if lsn < last_record_lsn { - warn!( - "attempted to move last valid LSN backwards beyond last record LSN (last record {}, new {})", - last_record_lsn, lsn - ); - } - } - } - - fn init_valid_lsn(&self, lsn: Lsn) { - // These writes must be specified in the order mentioned on the field comments. - let old = self.last_valid_lsn.advance(lsn); - assert!(old == Lsn(0)); - - { - let mut record_lsn = self.record_lsn.write().unwrap(); - - assert_eq!(record_lsn.last, Lsn(0)); - assert_eq!(record_lsn.prev, Lsn(0)); - - record_lsn.advance(lsn); - } - } - - fn get_last_valid_lsn(&self) -> Lsn { - self.last_valid_lsn.load() - } - /// /// Remember the (end of) last valid WAL record remembered in the timeline. /// - /// NOTE: this updates last_valid_lsn as well. 
- /// - fn advance_last_record_lsn(&self, lsn: Lsn) { - // These writes must be specified in the order mentioned on the field comments. - let old_valid_lsn = self.last_valid_lsn.advance(lsn); + fn advance_last_record_lsn(&self, new_lsn: Lsn) { + assert!(new_lsn.is_aligned()); - // Can't move backwards. - if lsn < old_valid_lsn { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old_valid_lsn, lsn - ); - } + let old_lsn = self.last_record_lsn.advance(new_lsn); - { - let mut record_lsn = self.record_lsn.write().unwrap(); - assert!(record_lsn.last <= lsn); - record_lsn.advance(lsn); - } + // since we align incoming LSNs we can't have a delta less + // than 0x8 + assert!(old_lsn == new_lsn || (new_lsn.0 - old_lsn.0 >= 0x8)); } fn get_last_record_lsn(&self) -> Lsn { - self.record_lsn.read().unwrap().last + self.last_record_lsn.load().last } fn get_prev_record_lsn(&self) -> Lsn { - self.record_lsn.read().unwrap().prev + self.last_record_lsn.load().prev } } @@ -871,10 +816,8 @@ impl LayeredTimeline { walredo_mgr, - // initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from - // 'disk_consistent_lsn'. - last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn), - record_lsn: RwLock::new(RecordLsn { + // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. 
+ last_record_lsn: SeqWait::new(RecordLsn { last: metadata.disk_consistent_lsn, prev: metadata.prev_record_lsn.unwrap_or(Lsn(0)), }), @@ -1020,8 +963,15 @@ impl LayeredTimeline { fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result> { let layers = self.layers.lock().unwrap(); - if lsn < self.last_valid_lsn.load() { - bail!("cannot modify relation after advancing last_valid_lsn"); + assert!(lsn.is_aligned()); + + let last_record_lsn = self.get_last_record_lsn(); + if lsn < last_record_lsn { + panic!( + "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})", + lsn, + last_record_lsn + ); } // Do we have a layer open for writing already? @@ -1133,20 +1083,15 @@ impl LayeredTimeline { /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. fn checkpoint_internal(&self, force: bool) -> Result<()> { - // To hold the invariant last_valid_lsn >= last_record_lsn, - // we must follow the read order specified in the field comments. - let RecordLsn { last: last_record_lsn, prev: prev_record_lsn, - } = self.record_lsn.read().unwrap().clone(); - - let last_valid_lsn = self.last_valid_lsn.load(); + } = self.last_record_lsn.load(); trace!( "checkpointing timeline {} at {}", self.timelineid, - last_valid_lsn + last_record_lsn ); // Grab lock on the layer map. @@ -1169,10 +1114,10 @@ impl LayeredTimeline { while let Some(oldest_layer) = layers.peek_oldest_open() { // Does this layer need freezing? 
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); - let distance = last_valid_lsn.0 - oldest_pending_lsn.0; + let distance = last_record_lsn.0 - oldest_pending_lsn.0; if !force && distance < OLDEST_INMEM_DISTANCE { info!( - "the oldest layer is now {} which is {} bytes behind last_valid_lsn", + "the oldest layer is now {} which is {} bytes behind last_record_lsn", oldest_layer.get_seg_tag(), distance ); @@ -1181,7 +1126,7 @@ impl LayeredTimeline { } // freeze it - let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?; + let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, &self)?; // replace this layer with the new layers that 'freeze' returned layers.pop_oldest_open(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b64fe261ef..3ba9e86b87 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -348,7 +348,7 @@ impl PageServerHandler { /* Send a tarball of the latest snapshot on the timeline */ - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); + let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); { let mut writer = CopyDataSink { pgb }; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 153091868c..c56e571f5f 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -137,24 +137,12 @@ pub trait Timeline: Send + Sync { /// This method is used for marking dropped relations and truncated SLRU segments fn put_unlink(&self, tag: RelishTag, lsn: Lsn) -> Result<()>; - /// Remember the all WAL before the given LSN has been processed. + /// Track end of the latest digested WAL record. /// - /// The WAL receiver calls this after the put_* functions, to indicate that - /// all WAL before this point has been digested. Before that, if you call - /// GET on an earlier LSN, it will block. 
- fn advance_last_valid_lsn(&self, lsn: Lsn); - fn get_last_valid_lsn(&self) -> Lsn; - fn init_valid_lsn(&self, lsn: Lsn); - - /// Like `advance_last_valid_lsn`, but this always points to the end of - /// a WAL record, not in the middle of one. - /// - /// This must be <= last valid LSN. This is tracked separately from last - /// valid LSN, so that the WAL receiver knows where to restart streaming. + /// Advance requires aligned LSN as an argument and would wake wait_lsn() callers. + /// Previous last record LSN is stored alongside the latest and can be read. fn advance_last_record_lsn(&self, lsn: Lsn); fn get_last_record_lsn(&self) -> Lsn; - - // Like `advance_last_record_lsn`, but points to the start position of last record fn get_prev_record_lsn(&self) -> Lsn; /// @@ -293,77 +281,76 @@ mod tests { // Create timeline to work on let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); - let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; + let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?; - tline.init_valid_lsn(Lsn(1)); - tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?; - tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?; - tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"))?; - tline.put_page_image(TESTREL_A, 1, Lsn(4), TEST_IMG("foo blk 1 at 4"))?; - tline.put_page_image(TESTREL_A, 2, Lsn(5), TEST_IMG("foo blk 2 at 5"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?; + tline.put_page_image(TESTREL_A, 1, Lsn(0x40), TEST_IMG("foo blk 1 at 4"))?; + tline.put_page_image(TESTREL_A, 2, Lsn(0x50), TEST_IMG("foo blk 2 at 5"))?; - tline.advance_last_valid_lsn(Lsn(5)); + tline.advance_last_record_lsn(Lsn(0x50)); // The relation was created at LSN 2, not visible at LSN 1 
yet. - assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, false); - assert!(tline.get_relish_size(TESTREL_A, Lsn(1))?.is_none()); + assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false); + assert!(tline.get_relish_size(TESTREL_A, Lsn(0x10))?.is_none()); - assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true); - assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(2))?.unwrap(), 1); - assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(5))?.unwrap(), 3); + assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true); + assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x20))?.unwrap(), 1); + assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x50))?.unwrap(), 3); // Check page contents at each LSN assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(2))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x20))?, TEST_IMG("foo blk 0 at 2") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(3))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x30))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 1, Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x40))?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x50))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 1, Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x50))?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?, TEST_IMG("foo blk 2 at 5") ); // Truncate last block - tline.put_truncation(TESTREL_A, Lsn(6), 2)?; - tline.advance_last_valid_lsn(Lsn(6)); + tline.put_truncation(TESTREL_A, Lsn(0x60), 2)?; + tline.advance_last_record_lsn(Lsn(0x60)); // Check reported size and contents after truncation - assert_eq!(tline.get_relish_size(TESTREL_A, 
Lsn(6))?.unwrap(), 2); + assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 2); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(6))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x60))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 1, Lsn(6))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x60))?, TEST_IMG("foo blk 1 at 4") ); // should still see the truncated block with older LSN - assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(5))?.unwrap(), 3); + assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x50))?.unwrap(), 3); assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?, TEST_IMG("foo blk 2 at 5") ); @@ -376,17 +363,15 @@ mod tests { fn test_large_rel() -> Result<()> { let repo = get_test_repo("test_large_rel")?; let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); - let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; + let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?; - tline.init_valid_lsn(Lsn(1)); - - let mut lsn = 1; + let mut lsn = 0x10; for blknum in 0..pg_constants::RELSEG_SIZE + 1 { let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); - lsn += 1; + lsn += 0x10; tline.put_page_image(TESTREL_A, blknum as u32, Lsn(lsn), img)?; } - tline.advance_last_valid_lsn(Lsn(lsn)); + tline.advance_last_record_lsn(Lsn(lsn)); assert_eq!( tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), @@ -394,18 +379,18 @@ mod tests { ); // Truncate one block - lsn += 1; + lsn += 0x10; tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?; - tline.advance_last_valid_lsn(Lsn(lsn)); + tline.advance_last_record_lsn(Lsn(lsn)); assert_eq!( tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), pg_constants::RELSEG_SIZE ); // Truncate another block - lsn += 1; + lsn += 0x10; tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?; - tline.advance_last_valid_lsn(Lsn(lsn)); + 
tline.advance_last_record_lsn(Lsn(lsn)); assert_eq!( tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), pg_constants::RELSEG_SIZE - 1 @@ -415,9 +400,9 @@ mod tests { // This tests the behavior at segment boundaries let mut size: i32 = 3000; while size >= 0 { - lsn += 1; + lsn += 0x10; tline.put_truncation(TESTREL_A, Lsn(lsn), size as u32)?; - tline.advance_last_valid_lsn(Lsn(lsn)); + tline.advance_last_record_lsn(Lsn(lsn)); assert_eq!( tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(), size as u32 @@ -436,48 +421,47 @@ mod tests { fn test_branch() -> Result<()> { let repo = get_test_repo("test_branch")?; let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); - let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; + let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?; // Import initial dummy checkpoint record, otherwise the get_timeline() call // after branching fails below - tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(1), ZERO_PAGE.clone())?; + tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_PAGE.clone())?; // Create a relation on the timeline - tline.init_valid_lsn(Lsn(1)); - tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?; - tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"))?; - tline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("foo blk 0 at 4"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?; + tline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?; // Create another relation - tline.put_page_image(TESTREL_B, 0, Lsn(2), TEST_IMG("foobar blk 0 at 2"))?; + tline.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?; - tline.advance_last_valid_lsn(Lsn(4)); + tline.advance_last_record_lsn(Lsn(0x40)); // Branch the history, modify relation differently on the new timeline let newtimelineid = 
ZTimelineId::from_str("AA223344556677881122334455667788").unwrap(); - repo.branch_timeline(timelineid, newtimelineid, Lsn(3))?; + repo.branch_timeline(timelineid, newtimelineid, Lsn(0x30))?; let newtline = repo.get_timeline(newtimelineid)?; - newtline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("bar blk 0 at 4"))?; - newtline.advance_last_valid_lsn(Lsn(4)); + newtline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?; + newtline.advance_last_record_lsn(Lsn(0x40)); // Check page contents on both branches assert_eq!( - tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?, TEST_IMG("foo blk 0 at 4") ); assert_eq!( - newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, + newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?, TEST_IMG("bar blk 0 at 4") ); assert_eq!( - newtline.get_page_at_lsn(TESTREL_B, 0, Lsn(4))?, + newtline.get_page_at_lsn(TESTREL_B, 0, Lsn(0x40))?, TEST_IMG("foobar blk 0 at 2") ); - assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(4))?.unwrap(), 1); + assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(0x40))?.unwrap(), 1); Ok(()) } diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 768f557e6f..9cb56a12ca 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -135,7 +135,7 @@ pub fn import_timeline_from_postgres_datadir( } // TODO: Scan pg_tblspc - timeline.advance_last_valid_lsn(lsn); + timeline.advance_last_record_lsn(lsn.align()); timeline.checkpoint()?; Ok(()) @@ -338,7 +338,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let checkpoint_bytes = checkpoint.encode(); timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?; - timeline.advance_last_valid_lsn(last_lsn); + timeline.advance_last_record_lsn(last_lsn.align()); timeline.checkpoint()?; Ok(()) } @@ -536,7 +536,7 @@ pub fn save_decoded_record( } // Now that this record has been handled, let 
the repository know that // it is up-to-date to this LSN - timeline.advance_last_record_lsn(lsn); + timeline.advance_last_record_lsn(lsn.align()); Ok(()) } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 9ebc5c4023..a9f6a98dd2 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -179,7 +179,10 @@ impl WalStreamDecoder { self.padlen = self.lsn.calc_padding(8u32) as u32; } - let result = (self.lsn, recordbuf); + // Always align resulting LSN on 0x8 boundary -- that is important for getPage() + // and WalReceiver integration. Since this code is used both for WalReceiver and + // initial WAL import let's force alignment right here. + let result = (self.lsn.align(), recordbuf); return Ok(Some(result)); } continue; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 9936e2569d..424d961b57 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -153,8 +153,7 @@ fn walreceiver_main( // Start streaming the WAL, from where we left off previously. // // If we had previously received WAL up to some point in the middle of a WAL record, we - // better start from the end of last full WAL record, not in the middle of one. Hence, - // use 'last_record_lsn' rather than 'last_valid_lsn' here. + // better start from the end of last full WAL record, not in the middle of one. let mut last_rec_lsn = timeline.get_last_record_lsn(); let mut startpoint = last_rec_lsn; @@ -163,10 +162,6 @@ fn walreceiver_main( } // There might be some padding after the last full record, skip it. - // - // FIXME: It probably would be better to always start streaming from the beginning - // of the page, or the segment, so that we could check the page/segment headers - // too. Just for the sake of paranoia. 
startpoint += startpoint.calc_padding(8u32); debug!( @@ -212,6 +207,12 @@ fn walreceiver_main( // Save old checkpoint value to compare with it after decoding WAL record let old_checkpoint_bytes = checkpoint.encode(); let decoded = decode_wal_record(recdata.clone()); + + // It is important to deal with the aligned records as lsn in getPage@LSN is + // aligned and can be several bytes bigger. Without this alignment we are + // at risk of hitting a deadlock. + assert!(lsn.is_aligned()); + restore_local_repo::save_decoded_record( &mut checkpoint, &*timeline, @@ -233,14 +234,6 @@ fn walreceiver_main( } } - // Update the last_valid LSN value in the timeline one more time. We updated - // it in the loop above, between each WAL record, but we might have received - // a partial record after the last completed record. Our timeline's value - // better reflect that, because GetPage@LSN requests might also point in the - // middle of a record, if the request LSN was taken from the server's current - // flush ptr. - timeline.advance_last_valid_lsn(endlsn); - // Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each), // "checkpoint" the repository to flush all the changes from WAL we've processed // so far to disk. After this, we don't need the original WAL anymore, and it @@ -290,7 +283,7 @@ fn walreceiver_main( ); if reply_requested { - Some(timeline.get_last_valid_lsn()) + Some(timeline.get_last_record_lsn()) } else { None } diff --git a/vendor/postgres b/vendor/postgres index 9b1c9cb2e4..909c606355 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 9b1c9cb2e49e2214feb67f769c28f830eada95c6 +Subproject commit 909c606355dcf5b53a8d97e81e28997675de5936 diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index 859d6db09d..dba5a91a65 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -85,6 +85,16 @@ impl Lsn { // (Regular subtraction will panic on overflow in debug builds.) 
(sz.wrapping_sub(self.0)) % sz } + + /// Align LSN on 8-byte boundary (alignment of WAL records). + pub fn align(&self) -> Lsn { + Lsn((self.0 + 7) & !7) + } + + /// Check whether LSN is aligned on 8-byte boundary (alignment of WAL records). + pub fn is_aligned(&self) -> bool { + *self == self.align() + } } impl From for Lsn { diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs index d5ddc92b7a..bc32f51b13 100644 --- a/zenith_utils/src/seqwait.rs +++ b/zenith_utils/src/seqwait.rs @@ -18,13 +18,28 @@ pub enum SeqWaitError { Shutdown, } +/// Monotonically increasing value +/// +/// It is handy to store some other fields under the same mutex in SeqWait +/// (e.g. store prev_record_lsn). So we allow SeqWait to be parametrized with +/// any type that can expose counter. `V` is the type of exposed counter. +pub trait MonotonicCounter { + /// Bump counter value and check that it goes forward + /// N.B.: new_val is an actual new value, not a difference. + fn cnt_advance(&mut self, new_val: V); + + /// Get counter value + fn cnt_value(&self) -> V; +} + /// Internal components of a `SeqWait` -struct SeqWaitInt +struct SeqWaitInt where - T: Ord, + S: MonotonicCounter, + V: Ord, { - waiters: BinaryHeap>, - current: T, + waiters: BinaryHeap>, + current: S, shutdown: bool, } @@ -72,19 +87,23 @@ impl Eq for Waiter {} /// [`wait_for`]: SeqWait::wait_for /// [`advance`]: SeqWait::advance /// -pub struct SeqWait +/// `S` means Storage, `V` is the type of counter that this storage exposes. 
+/// +pub struct SeqWait where - T: Ord, + S: MonotonicCounter, + V: Ord, { - internal: Mutex>, + internal: Mutex>, } -impl SeqWait +impl SeqWait where - T: Ord + Debug + Copy, + S: MonotonicCounter + Copy, + V: Ord + Copy, { /// Create a new `SeqWait`, initialized to a particular number - pub fn new(starting_num: T) -> Self { + pub fn new(starting_num: S) -> Self { let internal = SeqWaitInt { waiters: BinaryHeap::new(), current: starting_num, @@ -122,7 +141,7 @@ where /// /// This call won't complete until someone has called `advance` /// with a number greater than or equal to the one we're waiting for. - pub fn wait_for(&self, num: T) -> Result<(), SeqWaitError> { + pub fn wait_for(&self, num: V) -> Result<(), SeqWaitError> { match self.queue_for_wait(num) { Ok(None) => Ok(()), Ok(Some(rx)) => rx.recv().map_err(|_| SeqWaitError::Shutdown), @@ -137,7 +156,7 @@ where /// /// If that hasn't happened after the specified timeout duration, /// [`SeqWaitError::Timeout`] will be returned. - pub fn wait_for_timeout(&self, num: T, timeout_duration: Duration) -> Result<(), SeqWaitError> { + pub fn wait_for_timeout(&self, num: V, timeout_duration: Duration) -> Result<(), SeqWaitError> { match self.queue_for_wait(num) { Ok(None) => Ok(()), Ok(Some(rx)) => rx.recv_timeout(timeout_duration).map_err(|e| match e { @@ -150,9 +169,9 @@ where /// Register and return a channel that will be notified when a number arrives, /// or None, if it has already arrived. - fn queue_for_wait(&self, num: T) -> Result>, SeqWaitError> { + fn queue_for_wait(&self, num: V) -> Result>, SeqWaitError> { let mut internal = self.internal.lock().unwrap(); - if internal.current >= num { + if internal.current.cnt_value() >= num { return Ok(None); } if internal.shutdown { @@ -174,16 +193,16 @@ where /// All waiters at this value or below will be woken. /// /// Returns the old number. 
- pub fn advance(&self, num: T) -> T { + pub fn advance(&self, num: V) -> V { let old_value; let wake_these = { let mut internal = self.internal.lock().unwrap(); - old_value = internal.current; + old_value = internal.current.cnt_value(); if old_value >= num { return old_value; } - internal.current = num; + internal.current.cnt_advance(num); // Pop all waiters <= num from the heap. Collect them in a vector, and // wake them up after releasing the lock. @@ -206,7 +225,7 @@ where } /// Read the current value, without waiting. - pub fn load(&self) -> T { + pub fn load(&self) -> S { self.internal.lock().unwrap().current } } @@ -219,6 +238,16 @@ mod tests { use std::thread::spawn; use std::time::Duration; + impl MonotonicCounter for i32 { + fn cnt_advance(&mut self, val: i32) { + assert!(*self <= val); + *self = val; + } + fn cnt_value(&self) -> i32 { + *self + } + } + #[test] fn seqwait() { let seq = Arc::new(SeqWait::new(0));