add AtomicLsn

AtomicLsn is a wrapper around AtomicU64 whose load() and store() methods
are cheap (on x86, anyway) and can be safely used in any context.

This commit uses AtomicLsn in the page cache, and fixes up some
downstream code that manually implemented LSN formatting.

There's also a bugfix to the logging in wait_lsn, which overwrote the
requested lsn before tracing it and therefore printed the wrong lsn value.
Author: Eric Seppanen
Date:   2021-04-24 15:00:09 -07:00
Parent: d760446053
Commit: 07d0241076

3 changed files with 76 additions and 45 deletions


@@ -39,7 +39,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
 use log::*;
 use rocksdb;
-use std::cmp::min;
+use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
@@ -47,7 +47,7 @@ use std::sync::{Arc, Mutex};
 use std::thread;
 use std::time::{Duration, Instant};
 use std::{convert::TryInto, ops::AddAssign};
-use zenith_utils::lsn::Lsn;
+use zenith_utils::lsn::{AtomicLsn, Lsn};
 use zenith_utils::seqwait::SeqWait;

 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -73,9 +73,9 @@ pub struct PageCache {
     // copies of shared.first/last_valid_lsn fields (copied here so
     // that they can be read without acquiring the mutex).
-    pub first_valid_lsn: AtomicU64,
-    pub last_valid_lsn: AtomicU64,
-    pub last_record_lsn: AtomicU64,
+    first_valid_lsn: AtomicLsn,
+    last_valid_lsn: AtomicLsn,
+    last_record_lsn: AtomicLsn,
 }

 #[derive(Clone)]
@@ -84,9 +84,9 @@ pub struct PageCacheStats {
     pub num_page_images: u64,
     pub num_wal_records: u64,
     pub num_getpage_requests: u64,
-    pub first_valid_lsn: u64,
-    pub last_valid_lsn: u64,
-    pub last_record_lsn: u64,
+    pub first_valid_lsn: Lsn,
+    pub last_valid_lsn: Lsn,
+    pub last_record_lsn: Lsn,
 }

 impl AddAssign for PageCacheStats {
@@ -96,9 +96,12 @@ impl AddAssign for PageCacheStats {
             num_page_images: self.num_page_images + other.num_page_images,
             num_wal_records: self.num_wal_records + other.num_wal_records,
             num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests,
-            first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn,
-            last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn,
-            last_record_lsn: self.last_record_lsn + other.last_record_lsn,
+            // FIXME: needs review
+            // What should be happening here? I'm not sure what is the desired result.
+            first_valid_lsn: min(self.first_valid_lsn, other.first_valid_lsn),
+            last_valid_lsn: max(self.last_valid_lsn, other.last_valid_lsn),
+            last_record_lsn: max(self.last_record_lsn, other.last_record_lsn),
         }
     }
 }
@@ -221,9 +224,9 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache
         num_wal_records: AtomicU64::new(0),
         num_getpage_requests: AtomicU64::new(0),
-        first_valid_lsn: AtomicU64::new(0),
-        last_valid_lsn: AtomicU64::new(0),
-        last_record_lsn: AtomicU64::new(0),
+        first_valid_lsn: AtomicLsn::new(0),
+        last_valid_lsn: AtomicLsn::new(0),
+        last_record_lsn: AtomicLsn::new(0),
     }
 }
@@ -577,7 +580,7 @@ impl PageCache {
         let mut key = CacheKey { tag, lsn: rec.lsn };

         // What was the size of the relation before this record?
-        let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire));
+        let last_lsn = self.last_valid_lsn.load();
         let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;

         let content = CacheEntryContent {
@@ -687,7 +690,7 @@ impl PageCache {
         let oldlsn = shared.last_valid_lsn;
         if lsn >= oldlsn {
             shared.last_valid_lsn = lsn;
-            self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
+            self.last_valid_lsn.store(lsn);
             self.seqwait_lsn.advance(lsn);
         } else {
             warn!(
@@ -711,8 +714,8 @@ impl PageCache {
         shared.last_valid_lsn = lsn;
         shared.last_record_lsn = lsn;
-        self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
-        self.last_record_lsn.store(lsn.0, Ordering::Relaxed);
+        self.last_valid_lsn.store(lsn);
+        self.last_record_lsn.store(lsn);

         self.seqwait_lsn.advance(lsn);
     }
@@ -732,7 +735,7 @@ impl PageCache {
         assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn);

         shared.first_valid_lsn = lsn;
-        self.first_valid_lsn.store(lsn.0, Ordering::Relaxed);
+        self.first_valid_lsn.store(lsn);
     }

     pub fn init_valid_lsn(&self, lsn: Lsn) {
@@ -746,9 +749,9 @@ impl PageCache {
         shared.last_valid_lsn = lsn;
         shared.last_record_lsn = lsn;

-        self.first_valid_lsn.store(lsn.0, Ordering::Relaxed);
-        self.last_valid_lsn.store(lsn.0, Ordering::Relaxed);
-        self.last_record_lsn.store(lsn.0, Ordering::Relaxed);
+        self.first_valid_lsn.store(lsn);
+        self.last_valid_lsn.store(lsn);
+        self.last_record_lsn.store(lsn);
     }

     pub fn get_last_valid_lsn(&self) -> Lsn {
@@ -766,9 +769,9 @@ impl PageCache {
             num_page_images: self.num_page_images.load(Ordering::Relaxed),
             num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
             num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
-            first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
-            last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
-            last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
+            first_valid_lsn: self.first_valid_lsn.load(),
+            last_valid_lsn: self.last_valid_lsn.load(),
+            last_record_lsn: self.last_record_lsn.load(),
         }
     }
@@ -780,7 +783,7 @@ impl PageCache {
     // The caller must ensure that WAL has been received up to 'lsn'.
     //
     fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result<u32> {
-        assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire));
+        assert!(lsn <= self.last_valid_lsn.load());

         let mut key = CacheKey {
             tag: BufferTag {
@@ -941,17 +944,17 @@ impl PageCache {
     //
     // Wait until WAL has been received up to the given LSN.
     //
-    async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result<Lsn> {
-        let mut lsn = req_lsn;
-        //When invalid LSN is requested, it means "don't wait, return latest version of the page"
-        //This is necessary for bootstrap.
+    async fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
+        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
+        // This is necessary for bootstrap.
         if lsn == Lsn(0) {
-            lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire));
+            let last_valid_lsn = self.last_valid_lsn.load();
             trace!(
                 "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                self.last_valid_lsn.load(Ordering::Acquire),
+                last_valid_lsn,
                 lsn
             );
+            lsn = last_valid_lsn;
         }

         self.seqwait_lsn
@@ -981,9 +984,9 @@ pub fn get_stats() -> PageCacheStats {
         num_page_images: 0,
         num_wal_records: 0,
         num_getpage_requests: 0,
-        first_valid_lsn: 0,
-        last_valid_lsn: 0,
-        last_record_lsn: 0,
+        first_valid_lsn: Lsn(0),
+        last_valid_lsn: Lsn(0),
+        last_record_lsn: Lsn(0),
     };

     pcaches.iter().for_each(|(_sys_id, pcache)| {


@@ -248,13 +248,6 @@ fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> {
     ])
 }

-// FIXME: We really should define a datatype for LSNs, with Display trait and
-// helper functions. There's one in tokio-postgres, but I don't think we want
-// to rely on that.
-fn format_lsn(lsn: u64) -> String {
-    return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff);
-}
-
 impl tui::widgets::Widget for MetricsWidget {
     fn render(self, area: Rect, buf: &mut Buffer) {
         let block = Block::default()
@@ -270,10 +263,9 @@ impl tui::widgets::Widget for MetricsWidget {
         let page_cache_stats = crate::page_cache::get_stats();
         let lsnrange = format!(
             "{} - {}",
-            format_lsn(page_cache_stats.first_valid_lsn),
-            format_lsn(page_cache_stats.last_valid_lsn)
+            page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn
         );
-        let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn);
+        let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string();
         lines.push(get_metric_str("Valid LSN range", &lsnrange));
         lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
         lines.push(get_metric_u64(


@@ -4,6 +4,7 @@ use std::fmt;
 use std::ops::{Add, AddAssign};
 use std::path::Path;
 use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};

 /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
 #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
@@ -108,6 +109,41 @@ impl AddAssign<u64> for Lsn {
     }
 }

+/// An [`Lsn`] that can be accessed atomically.
+pub struct AtomicLsn {
+    inner: AtomicU64,
+}
+
+impl AtomicLsn {
+    /// Creates a new atomic `Lsn`.
+    pub fn new(val: u64) -> Self {
+        AtomicLsn {
+            inner: AtomicU64::new(val),
+        }
+    }
+
+    /// Atomically retrieve the `Lsn` value from memory.
+    pub fn load(&self) -> Lsn {
+        Lsn(self.inner.load(Ordering::Acquire))
+    }
+
+    /// Atomically store a new `Lsn` value to memory.
+    pub fn store(&self, lsn: Lsn) {
+        self.inner.store(lsn.0, Ordering::Release);
+    }
+
+    /// Adds to the current value, returning the previous value.
+    ///
+    /// This operation will panic on overflow.
+    pub fn fetch_add(&self, val: u64) -> Lsn {
+        let prev = self.inner.fetch_add(val, Ordering::AcqRel);
+        if prev.checked_add(val).is_none() {
+            panic!("AtomicLsn overflow");
+        }
+        Lsn(prev)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;