diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 77b9572c15..ddb175bfa2 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -29,28 +29,36 @@ use zenith_utils::seqwait::SeqWait;
 static TIMEOUT: Duration = Duration::from_secs(60);

 pub struct PageCache {
-    shared: Mutex<PageCacheShared>,
-
     // RocksDB handle
     db: rocksdb::DB,

     // WAL redo manager
     walredo_mgr: WalRedoManager,

-    // Allows waiting for the arrival of a particular LSN.
-    seqwait_lsn: SeqWait<Lsn>,
+    // What page versions do we hold in the cache? If we get GetPage with
+    // LSN < first_valid_lsn, that's an error because we (no longer) hold that
+    // page version. If we get a request > last_valid_lsn, we need to wait until
+    // we receive all the WAL up to the request. The SeqWait provides functions
+    // for that.
+    //
+    // last_record_lsn points to the end of last processed WAL record.
+    // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
+    // after the end of last record, but not the whole next record yet. In the
+    // page cache, we care about last_valid_lsn, but if the WAL receiver needs to
+    // restart the streaming, it needs to restart at the end of last record, so
+    // we track them separately. last_record_lsn should perhaps be in
+    // walreceiver.rs instead of here, but it seems convenient to keep all three
+    // values together.
+    //
+    first_valid_lsn: AtomicLsn,
+    last_valid_lsn: SeqWait<Lsn>,
+    last_record_lsn: AtomicLsn,

     // Counters, for metrics collection.
     pub num_entries: AtomicU64,
     pub num_page_images: AtomicU64,
     pub num_wal_records: AtomicU64,
     pub num_getpage_requests: AtomicU64,
-
-    // copies of shared.first/last_valid_lsn fields (copied here so
-    // that they can be read without acquiring the mutex).
-    first_valid_lsn: AtomicLsn,
-    last_valid_lsn: AtomicLsn,
-    last_record_lsn: AtomicLsn,
 }

 #[derive(Clone)]
@@ -70,29 +78,6 @@ impl AddAssign for PageCacheStats {
     }
 }

-//
-// Shared data structure, holding page cache and related auxiliary information
-//
-struct PageCacheShared {
-    // What page versions do we hold in the cache? If we get GetPage with
-    // LSN < first_valid_lsn, that's an error because we (no longer) hold that
-    // page version. If we get a request > last_valid_lsn, we need to wait until
-    // we receive all the WAL up to the request.
-    //
-    // last_record_lsn points to the end of last processed WAL record.
-    // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
-    // after the end of last record, but not the whole next record yet. In the
-    // page cache, we care about last_valid_lsn, but if the WAL receiver needs to
-    // restart the streaming, it needs to restart at the end of last record, so
-    // we track them separately. last_record_lsn should perhaps be in
-    // walreceiver.rs instead of here, but it seems convenient to keep all three
-    // values together.
-    //
-    first_valid_lsn: Lsn,
-    last_valid_lsn: Lsn,
-    last_record_lsn: Lsn,
-}
-
 lazy_static! {
     pub static ref PAGECACHES: Mutex<HashMap<ZTimelineId, Arc<PageCache>>> =
         Mutex::new(HashMap::new());
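
The struct comment above describes the protocol this patch builds on: a GetPage request at an LSN we have not fully received yet must block on `last_valid_lsn` until the WAL receiver advances it. Below is a minimal standalone sketch of that pattern, using only `SeqWait` methods that appear elsewhere in this diff (`new`, `wait_for`, `advance`, `shutdown`) and plain integers instead of `Lsn`s; the thread setup is illustrative, not code from the patch:

```rust
use std::sync::Arc;
use std::thread::{sleep, spawn};
use std::time::Duration;

use zenith_utils::seqwait::SeqWait;

fn main() {
    // Stand-in for PageCache::last_valid_lsn.
    let last_valid_lsn = Arc::new(SeqWait::new(0u64));

    // A GetPage handler that needs WAL up to "LSN" 42.
    let waiter = Arc::clone(&last_valid_lsn);
    let handle = spawn(move || {
        // Blocks until advance() is called with a value >= 42.
        waiter.wait_for(42).expect("SeqWait shut down");
        println!("WAL up to 42 has arrived; safe to serve the page");
    });

    // The WAL receiver catches up; this wakes every waiter at or below 100.
    sleep(Duration::from_millis(10));
    last_valid_lsn.advance(100);

    handle.join().unwrap();
    last_valid_lsn.shutdown();
}
```

The page server's real call site uses the bounded variant `wait_for_timeout(lsn, TIMEOUT)`, as the last page_cache.rs hunk below shows.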
@@ -166,26 +151,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB

 fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
     PageCache {
-        shared: Mutex::new(PageCacheShared {
-            first_valid_lsn: Lsn(0),
-            last_valid_lsn: Lsn(0),
-            last_record_lsn: Lsn(0),
-        }),
-
         db: open_rocksdb(&conf, timelineid),

         walredo_mgr: WalRedoManager::new(conf, timelineid),

-        seqwait_lsn: SeqWait::new(Lsn(0)),
+        first_valid_lsn: AtomicLsn::new(0),
+        last_valid_lsn: SeqWait::new(Lsn(0)),
+        last_record_lsn: AtomicLsn::new(0),

         num_entries: AtomicU64::new(0),
         num_page_images: AtomicU64::new(0),
         num_wal_records: AtomicU64::new(0),
         num_getpage_requests: AtomicU64::new(0),
-
-        first_valid_lsn: AtomicLsn::new(0),
-        last_valid_lsn: AtomicLsn::new(0),
-        last_record_lsn: AtomicLsn::new(0),
     }
 }
@@ -643,18 +621,13 @@ impl PageCache {
     /// Remember that WAL has been received and added to the page cache up to the given LSN
     pub fn advance_last_valid_lsn(&self, lsn: Lsn) {
-        let mut shared = self.shared.lock().unwrap();
+        let old = self.last_valid_lsn.advance(lsn);

         // Can't move backwards.
-        let oldlsn = shared.last_valid_lsn;
-        if lsn >= oldlsn {
-            shared.last_valid_lsn = lsn;
-            self.last_valid_lsn.store(lsn);
-            self.seqwait_lsn.advance(lsn);
-        } else {
+        if lsn < old {
             warn!(
                 "attempted to move last valid LSN backwards (was {}, new {})",
-                oldlsn, lsn
+                old, lsn
             );
         }
     }
@@ -665,17 +638,19 @@ impl PageCache {
     /// NOTE: this updates last_valid_lsn as well.
     ///
     pub fn advance_last_record_lsn(&self, lsn: Lsn) {
-        let mut shared = self.shared.lock().unwrap();
-        // Can't move backwards.
-        assert!(lsn >= shared.last_valid_lsn);
-        assert!(lsn >= shared.last_record_lsn);
+        let old = self.last_record_lsn.fetch_max(lsn);
+        assert!(old <= lsn);

-        shared.last_valid_lsn = lsn;
-        shared.last_record_lsn = lsn;
-        self.last_valid_lsn.store(lsn);
-        self.last_record_lsn.store(lsn);
-        self.seqwait_lsn.advance(lsn);
+        // Also advance last_valid_lsn
+        let old = self.last_valid_lsn.advance(lsn);
+        // Can't move backwards.
+        if lsn < old {
+            warn!(
+                "attempted to move last record LSN backwards (was {}, new {})",
+                old, lsn
+            );
+        }
     }

     ///
@@ -684,39 +659,27 @@ impl PageCache {
     /// TODO: This should be called by garbage collection, so that if an older
     /// page is requested, we will return an error to the requestor.
     pub fn _advance_first_valid_lsn(&self, lsn: Lsn) {
-        let mut shared = self.shared.lock().unwrap();
-
-        // Can't move backwards.
-        assert!(lsn >= shared.first_valid_lsn);
-
         // Can't overtake last_valid_lsn (except when we're
         // initializing the system and last_valid_lsn hasn't been set yet.
-        assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn);
+        let last_valid_lsn = self.last_valid_lsn.load();
+        assert!(last_valid_lsn == Lsn(0) || lsn < last_valid_lsn);

-        shared.first_valid_lsn = lsn;
-        self.first_valid_lsn.store(lsn);
+        let old = self.first_valid_lsn.fetch_max(lsn);
+        // Can't move backwards.
+        assert!(lsn >= old);
     }

     pub fn init_valid_lsn(&self, lsn: Lsn) {
-        let mut shared = self.shared.lock().unwrap();
-
-        assert!(shared.first_valid_lsn == Lsn(0));
-        assert!(shared.last_valid_lsn == Lsn(0));
-        assert!(shared.last_record_lsn == Lsn(0));
-
-        shared.first_valid_lsn = lsn;
-        shared.last_valid_lsn = lsn;
-        shared.last_record_lsn = lsn;
-
-        self.first_valid_lsn.store(lsn);
-        self.last_valid_lsn.store(lsn);
-        self.last_record_lsn.store(lsn);
+        let old = self.last_valid_lsn.advance(lsn);
+        assert!(old == Lsn(0));
+        let old = self.last_record_lsn.fetch_max(lsn);
+        assert!(old == Lsn(0));
+        let old = self.first_valid_lsn.fetch_max(lsn);
+        assert!(old == Lsn(0));
     }

     pub fn get_last_valid_lsn(&self) -> Lsn {
-        let shared = self.shared.lock().unwrap();
-
-        shared.last_record_lsn
+        self.last_valid_lsn.load()
     }

     //
@@ -913,7 +876,7 @@ impl PageCache {
             lsn = last_valid_lsn;
         }

-        self.seqwait_lsn
+        self.last_valid_lsn
             .wait_for_timeout(lsn, TIMEOUT)
             .with_context(|| {
                 format!(
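
The hunks above replace the mutex-guarded LSN fields with lock-free updates: each watermark only ever moves forward, so an atomic max is sufficient, and the previous value tells the caller whether its update took effect. The new `AtomicLsn::fetch_max` (added in the lsn.rs hunk below) is a thin wrapper around `std`'s `AtomicU64::fetch_max`. Here is a standalone illustration of the idiom, not code from this patch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Advance a monotonic watermark without a lock. The stored value only
/// ever increases; the returned previous value lets the caller detect
/// an attempted backwards move (prev > new) and warn or assert on it.
fn advance_watermark(watermark: &AtomicU64, new: u64) -> u64 {
    watermark.fetch_max(new, Ordering::AcqRel)
}

fn main() {
    let watermark = AtomicU64::new(0);
    assert_eq!(advance_watermark(&watermark, 10), 0); // moved 0 -> 10
    assert_eq!(advance_watermark(&watermark, 5), 10); // no-op: 5 < 10
    assert_eq!(watermark.load(Ordering::Acquire), 10);
}
```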
diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs
index cf31d9861d..35ba53cd15 100644
--- a/zenith_utils/src/lsn.rs
+++ b/zenith_utils/src/lsn.rs
@@ -171,6 +171,12 @@ impl AtomicLsn {
         }
         Lsn(prev)
     }
+
+    /// Atomically sets the Lsn to the max of old and new value, returning the old value.
+    pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
+        let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
+        Lsn(prev)
+    }
 }

 #[cfg(test)]
diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs
index 409090256e..d5ddc92b7a 100644
--- a/zenith_utils/src/seqwait.rs
+++ b/zenith_utils/src/seqwait.rs
@@ -173,17 +173,15 @@ where
     ///
     /// All waiters at this value or below will be woken.
     ///
-    /// `advance` will panic if you send it a lower number than
-    /// a previous call.
-    pub fn advance(&self, num: T) {
+    /// Returns the old number.
+    pub fn advance(&self, num: T) -> T {
+        let old_value;
         let wake_these = {
             let mut internal = self.internal.lock().unwrap();

-            if internal.current > num {
-                panic!(
-                    "tried to advance backwards, from {:?} to {:?}",
-                    internal.current, num
-                );
+            old_value = internal.current;
+            if old_value >= num {
+                return old_value;
             }

             internal.current = num;
@@ -204,6 +202,12 @@
             // We don't care; discard the error.
             let _ = tx.send(());
         }
+        old_value
     }
+
+    /// Read the current value, without waiting.
+    pub fn load(&self) -> T {
+        self.internal.lock().unwrap().current
+    }
 }

@@ -222,7 +226,8 @@ mod tests {
         let seq3 = Arc::clone(&seq);
         spawn(move || {
             seq2.wait_for(42).expect("wait_for 42");
-            seq2.advance(100);
+            let old = seq2.advance(100);
+            assert_eq!(old, 99);
             seq2.wait_for(999).expect_err("no 999");
         });
         spawn(move || {
@@ -230,8 +235,14 @@
             seq3.wait_for(0).expect("wait_for 0");
         });
         sleep(Duration::from_secs(1));
-        seq.advance(99);
+        let old = seq.advance(99);
+        assert_eq!(old, 0);
         seq.wait_for(100).expect("wait_for 100");
+
+        // Calling advance with a smaller value is a no-op
+        assert_eq!(seq.advance(98), 100);
+        assert_eq!(seq.load(), 100);
+
         seq.shutdown();
     }

@@ -247,6 +258,7 @@ mod tests {
         sleep(Duration::from_secs(1));
         // This will attempt to wake, but nothing will happen
         // because the waiter already dropped its Receiver.
-        seq.advance(99);
+        let old = seq.advance(99);
+        assert_eq!(old, 0)
     }
 }
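
With `advance` returning the previous value instead of panicking, a backwards call becomes a silent no-op that each caller handles in its own way: `advance_last_valid_lsn` warns, `init_valid_lsn` asserts. A short usage sketch of the new contract, analogous to the test above and likewise illustrative rather than part of the patch:

```rust
use zenith_utils::seqwait::SeqWait;

fn main() {
    let seq = SeqWait::new(10u64);

    // Forward progress: the old value comes back.
    assert_eq!(seq.advance(20), 10);

    // A backwards "advance" no longer panics; it leaves the current
    // value untouched and returns it, so the caller can tell that the
    // call had no effect (the returned old value exceeds the request).
    let old = seq.advance(15);
    assert!(old > 15);
    assert_eq!(seq.load(), 20);

    seq.shutdown();
}
```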