From 7c7e89e2ea6e2f75704b7c222704f76f115d8176 Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Sat, 28 Aug 2021 16:02:10 -0700 Subject: [PATCH] layered_repo - atomic last/prev record_lsn Make a new type that stores both Lsns. Use an RwLock for thread safety. --- pageserver/src/layered_repository.rs | 91 +++++++++++++++++++--------- 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 95de2a525c..a97bd9b904 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -23,7 +23,7 @@ use std::fs; use std::fs::File; use std::io::Write; use std::ops::Bound::Included; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant}; use crate::relish::*; @@ -223,7 +223,7 @@ impl LayeredRepository { .conf .timeline_path(&timelineid, &self.tenantid) .join("wal"); - import_timeline_wal(&wal_dir, &timeline, timeline.last_record_lsn.load())?; + import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; let timeline_rc = Arc::new(timeline); timelines.insert(timelineid, timeline_rc.clone()); @@ -447,6 +447,21 @@ pub struct TimelineMetadata { ancestor_lsn: Lsn, } +#[derive(Debug, Clone)] +struct RecordLsn { + last: Lsn, + prev: Lsn, +} + +impl RecordLsn { + fn advance(&mut self, lsn: Lsn) { + assert!(self.last <= lsn); + let new_prev = self.last; + self.last = lsn; + self.prev = new_prev; + } +} + pub struct LayeredTimeline { conf: &'static PageServerConf, @@ -465,7 +480,7 @@ pub struct LayeredTimeline { // versions have already been garbage collected away, we should // throw an error, but we don't track that currently. // - // last_record_lsn points to the end of last processed WAL record. + // record_lsn.last points to the end of last processed WAL record. 
// It can lag behind last_valid_lsn, if the WAL receiver has // received some WAL after the end of last record, but not the whole // next record yet. For get_page_at_lsn requests, we care about @@ -476,14 +491,16 @@ pub struct LayeredTimeline { // all three values together. // // We also remember the starting point of the previous record in - // 'prev_record_lsn'. It's used to set the xl_prev pointer of the + // 'record_lsn.prev'. It's used to set the xl_prev pointer of the // first WAL record when the node is started up. But here, we just - // keep track of it. FIXME: last_record_lsn and prev_record_lsn - // should be updated atomically together. + // keep track of it. // + // When advancing last_valid_lsn and record_lsn simultaneously, we MUST + // advance last_valid_lsn before record_lsn. + // This is so if a reader wishes, it can read record_lsn and then + // last_valid_lsn, and find that last_valid_lsn >= record_lsn. last_valid_lsn: SeqWait, - last_record_lsn: AtomicLsn, - prev_record_lsn: AtomicLsn, + record_lsn: RwLock<RecordLsn>, // All WAL records have been processed and stored durably on files on // local disk, up to this LSN. On crash and restart, we need to re-process @@ -768,7 +785,7 @@ impl Timeline for LayeredTimeline { if lsn < old { // Should never be called with an LSN older than the last // record LSN, though. - let last_record_lsn = self.last_record_lsn.load(); + let last_record_lsn = self.get_last_record_lsn(); if lsn < last_record_lsn { warn!( "attempted to move last valid LSN backwards beyond last record LSN (last record {}, new {})", @@ -779,11 +796,18 @@ impl Timeline for LayeredTimeline { } fn init_valid_lsn(&self, lsn: Lsn) { + // These writes must be specified in the order mentioned on the field comments.
let old = self.last_valid_lsn.advance(lsn); assert!(old == Lsn(0)); - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old == Lsn(0)); - self.prev_record_lsn.store(Lsn(0)); + + { + let mut record_lsn = self.record_lsn.write().unwrap(); + + assert_eq!(record_lsn.last, Lsn(0)); + assert_eq!(record_lsn.prev, Lsn(0)); + + record_lsn.advance(lsn); + } } fn get_last_valid_lsn(&self) -> Lsn { @@ -796,30 +820,30 @@ impl Timeline for LayeredTimeline { /// NOTE: this updates last_valid_lsn as well. /// fn advance_last_record_lsn(&self, lsn: Lsn) { - // Can't move backwards. - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old <= lsn); + // These writes must be specified in the order mentioned on the field comments. + let old_valid_lsn = self.last_valid_lsn.advance(lsn); - // Use old value of last_record_lsn as prev_record_lsn - self.prev_record_lsn.fetch_max(old); - - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); // Can't move backwards. - if lsn < old { + if lsn < old_valid_lsn { warn!( "attempted to move last record LSN backwards (was {}, new {})", - old, lsn + old_valid_lsn, lsn ); } + + { + let mut record_lsn = self.record_lsn.write().unwrap(); + assert!(record_lsn.last <= lsn); + record_lsn.advance(lsn); + } } fn get_last_record_lsn(&self) -> Lsn { - self.last_record_lsn.load() + self.record_lsn.read().unwrap().last } fn get_prev_record_lsn(&self) -> Lsn { - self.prev_record_lsn.load() + self.record_lsn.read().unwrap().prev } } @@ -846,9 +870,10 @@ impl LayeredTimeline { // initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from // 'disk_consistent_lsn'. 
last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn), - last_record_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0), - - prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.unwrap_or(Lsn(0)).0), + record_lsn: RwLock::new(RecordLsn { + last: metadata.disk_consistent_lsn, + prev: metadata.prev_record_lsn.unwrap_or(Lsn(0)), + }), disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0), ancestor_timeline: ancestor, @@ -1104,10 +1129,16 @@ impl LayeredTimeline { /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. fn checkpoint_internal(&self, force: bool) -> Result<()> { - // FIXME: these should be fetched atomically. + // To hold the invariant last_valid_lsn >= last_record_lsn, + // we must follow the read order specified in the field comments. + + let RecordLsn { + last: last_record_lsn, + prev: prev_record_lsn, + } = self.record_lsn.read().unwrap().clone(); + let last_valid_lsn = self.last_valid_lsn.load(); - let last_record_lsn = self.last_record_lsn.load(); - let prev_record_lsn = self.prev_record_lsn.load(); + trace!( "checkpointing timeline {} at {}", self.timelineid,