From 07d0241076aa8ae1da3fd32e8ed11649b7fcb4f2 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sat, 24 Apr 2021 15:00:09 -0700 Subject: [PATCH] add AtomicLsn AtomicLsn is a wrapper around AtomicU64 that has load() and store() members that are cheap (on x86, anyway) and can be safely used in any context. This commit uses AtomicLsn in the page cache, and fixes up some downstream code that manually implemented LSN formatting. There's also a bugfix to the logging in wait_lsn, which prints the wrong lsn value. --- pageserver/src/page_cache.rs | 73 +++++++++++++++++++----------------- pageserver/src/tui.rs | 12 +----- zenith_utils/src/lsn.rs | 36 ++++++++++++++++++ 3 files changed, 76 insertions(+), 45 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 82e03b8e51..55e4491c4c 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -39,7 +39,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; use rocksdb; -use std::cmp::min; +use std::cmp::{max, min}; use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -47,7 +47,7 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; -use zenith_utils::lsn::Lsn; +use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -73,9 +73,9 @@ pub struct PageCache { // copies of shared.first/last_valid_lsn fields (copied here so // that they can be read without acquiring the mutex). - pub first_valid_lsn: AtomicU64, - pub last_valid_lsn: AtomicU64, - pub last_record_lsn: AtomicU64, + first_valid_lsn: AtomicLsn, + last_valid_lsn: AtomicLsn, + last_record_lsn: AtomicLsn, } #[derive(Clone)] @@ -84,9 +84,9 @@ pub struct PageCacheStats { pub num_page_images: u64, pub num_wal_records: u64, pub num_getpage_requests: u64, - pub first_valid_lsn: u64, - pub last_valid_lsn: u64, - pub last_record_lsn: u64, + pub first_valid_lsn: Lsn, + pub last_valid_lsn: Lsn, + pub last_record_lsn: Lsn, } impl AddAssign for PageCacheStats { @@ -96,9 +96,12 @@ impl AddAssign for PageCacheStats { num_page_images: self.num_page_images + other.num_page_images, num_wal_records: self.num_wal_records + other.num_wal_records, num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests, - first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn, - last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn, - last_record_lsn: self.last_record_lsn + other.last_record_lsn, + + // FIXME: needs review + // What should be happening here? I'm not sure what is the desired result. + first_valid_lsn: min(self.first_valid_lsn, other.first_valid_lsn), + last_valid_lsn: max(self.last_valid_lsn, other.last_valid_lsn), + last_record_lsn: max(self.last_record_lsn, other.last_record_lsn), } } } @@ -221,9 +224,9 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache num_wal_records: AtomicU64::new(0), num_getpage_requests: AtomicU64::new(0), - first_valid_lsn: AtomicU64::new(0), - last_valid_lsn: AtomicU64::new(0), - last_record_lsn: AtomicU64::new(0), + first_valid_lsn: AtomicLsn::new(0), + last_valid_lsn: AtomicLsn::new(0), + last_record_lsn: AtomicLsn::new(0), } } @@ -577,7 +580,7 @@ impl PageCache { let mut key = CacheKey { tag, lsn: rec.lsn }; // What was the size of the relation before this record? - let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); + let last_lsn = self.last_valid_lsn.load(); let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; let content = CacheEntryContent { @@ -687,7 +690,7 @@ impl PageCache { let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { shared.last_valid_lsn = lsn; - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } else { warn!( @@ -711,8 +714,8 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_record_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } @@ -732,7 +735,7 @@ impl PageCache { assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); } pub fn init_valid_lsn(&self, lsn: Lsn) { @@ -746,9 +749,9 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_record_lsn.store(lsn.0, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); } pub fn get_last_valid_lsn(&self) -> Lsn { @@ -766,9 +769,9 @@ impl PageCache { num_page_images: self.num_page_images.load(Ordering::Relaxed), num_wal_records: self.num_wal_records.load(Ordering::Relaxed), num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(), + last_valid_lsn: self.last_valid_lsn.load(), + last_record_lsn: self.last_record_lsn.load(), } } @@ -780,7 +783,7 @@ impl PageCache { // The caller must ensure that WAL has been received up to 'lsn'. // fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { - assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire)); + assert!(lsn <= self.last_valid_lsn.load()); let mut key = CacheKey { tag: BufferTag { @@ -941,17 +944,17 @@ impl PageCache { // // Wait until WAL has been received up to the given LSN. // - async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result { - let mut lsn = req_lsn; - //When invalid LSN is requested, it means "don't wait, return latest version of the page" - //This is necessary for bootstrap. + async fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. if lsn == Lsn(0) { - lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); + let last_valid_lsn = self.last_valid_lsn.load(); trace!( "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), + last_valid_lsn, lsn ); + lsn = last_valid_lsn; } self.seqwait_lsn @@ -981,9 +984,9 @@ pub fn get_stats() -> PageCacheStats { num_page_images: 0, num_wal_records: 0, num_getpage_requests: 0, - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, + first_valid_lsn: Lsn(0), + last_valid_lsn: Lsn(0), + last_record_lsn: Lsn(0), }; pcaches.iter().for_each(|(_sys_id, pcache)| { diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs index 46cd76b9ce..188ee54e11 100644 --- a/pageserver/src/tui.rs +++ b/pageserver/src/tui.rs @@ -248,13 +248,6 @@ fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { ]) } -// FIXME: We really should define a datatype for LSNs, with Display trait and -// helper functions. There's one in tokio-postgres, but I don't think we want -// to rely on that. -fn format_lsn(lsn: u64) -> String { - return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff); -} - impl tui::widgets::Widget for MetricsWidget { fn render(self, area: Rect, buf: &mut Buffer) { let block = Block::default() @@ -270,10 +263,9 @@ impl tui::widgets::Widget for MetricsWidget { let page_cache_stats = crate::page_cache::get_stats(); let lsnrange = format!( "{} - {}", - format_lsn(page_cache_stats.first_valid_lsn), - format_lsn(page_cache_stats.last_valid_lsn) + page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn ); - let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn); + let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string(); lines.push(get_metric_str("Valid LSN range", &lsnrange)); lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str)); lines.push(get_metric_u64( diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index a1f48ddb84..e16fb678a3 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -4,6 +4,7 @@ use std::fmt; use std::ops::{Add, AddAssign}; use std::path::Path; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] @@ -108,6 +109,41 @@ impl AddAssign for Lsn { } } +/// An [`Lsn`] that can be accessed atomically. +pub struct AtomicLsn { + inner: AtomicU64, +} + +impl AtomicLsn { + /// Creates a new atomic `Lsn`. + pub fn new(val: u64) -> Self { + AtomicLsn { + inner: AtomicU64::new(val), + } + } + + /// Atomically retrieve the `Lsn` value from memory. + pub fn load(&self) -> Lsn { + Lsn(self.inner.load(Ordering::Acquire)) + } + + /// Atomically store a new `Lsn` value to memory. + pub fn store(&self, lsn: Lsn) { + self.inner.store(lsn.0, Ordering::Release); + } + + /// Adds to the current value, returning the previous value. + /// + /// This operation will panic on overflow. + pub fn fetch_add(&self, val: u64) -> Lsn { + let prev = self.inner.fetch_add(val, Ordering::AcqRel); + if prev.checked_add(val).is_none() { + panic!("AtomicLsn overflow"); + } + Lsn(prev) + } +} + #[cfg(test)] mod tests { use super::*;