Remove duplicated LSN fields from the page cache.

Having multiple copies of the same values is a source of confusion and bugs;
commit da9bf5dc63, for example, fixed one race condition caused by exactly that.
See also discussion at
https://github.com/zenithdb/zenith/issues/57#issuecomment-824393470

This changes SeqWait::advance() to return the old value and to no longer panic
when asked to move the value backwards. The caller is expected to check the
returned value and act accordingly.
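
A minimal sketch of the new calling convention, using a simplified stand-in for
SeqWait (the real type in zenith_utils::seqwait also tracks and wakes waiters):

use std::sync::Mutex;

// Simplified stand-in for zenith_utils::seqwait::SeqWait, reduced to the
// advance() contract described above.
struct SeqWait<T> {
    current: Mutex<T>,
}

impl<T: Ord + Copy> SeqWait<T> {
    fn new(start: T) -> Self {
        SeqWait {
            current: Mutex::new(start),
        }
    }

    // Advance to `num` unless that would move the value backwards; in either
    // case return the previous value so the caller can detect the regression.
    fn advance(&self, num: T) -> T {
        let mut cur = self.current.lock().unwrap();
        let old = *cur;
        if num > old {
            *cur = num;
        }
        old
    }
}

fn main() {
    let seq = SeqWait::new(100u64);
    let old = seq.advance(42); // attempt to move backwards
    if 42 < old {
        eprintln!("attempted to move value backwards (was {}, new {})", old, 42);
    }
    // Moving forward works, and still reports the previous value.
    assert_eq!(seq.advance(200), 100);
}
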
Author: Heikki Linnakangas
Date:   2021-04-27 10:32:39 +03:00
Commit: cff671c1bd (parent 4acdcbe90f)

3 changed files with 77 additions and 96 deletions


@@ -29,28 +29,36 @@ use zenith_utils::seqwait::SeqWait;
static TIMEOUT: Duration = Duration::from_secs(60);
pub struct PageCache {
shared: Mutex<PageCacheShared>,
// RocksDB handle
db: rocksdb::DB,
// WAL redo manager
walredo_mgr: WalRedoManager,
// Allows waiting for the arrival of a particular LSN.
seqwait_lsn: SeqWait<Lsn>,
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request. The SeqWait provides functions
// for that.
//
// last_record_lsn points to the end of the last processed WAL record.
// It can lag behind last_valid_lsn if the WAL receiver has received some WAL
// after the end of the last record, but not the whole next record yet. In the
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
// restart streaming, it needs to restart at the end of the last record, so
// we track them separately. last_record_lsn should perhaps be in
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
first_valid_lsn: AtomicLsn,
last_valid_lsn: SeqWait<Lsn>,
last_record_lsn: AtomicLsn,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
pub num_page_images: AtomicU64,
pub num_wal_records: AtomicU64,
pub num_getpage_requests: AtomicU64,
// copies of shared.first/last_valid_lsn fields (copied here so
// that they can be read without acquiring the mutex).
first_valid_lsn: AtomicLsn,
last_valid_lsn: AtomicLsn,
last_record_lsn: AtomicLsn,
}
#[derive(Clone)]
@@ -70,29 +78,6 @@ impl AddAssign for PageCacheStats {
}
}
//
// Shared data structure, holding page cache and related auxiliary information
//
struct PageCacheShared {
// What page versions do we hold in the cache? If we get GetPage with
// LSN < first_valid_lsn, that's an error because we (no longer) hold that
// page version. If we get a request > last_valid_lsn, we need to wait until
// we receive all the WAL up to the request.
//
// last_record_lsn points to the end of last processed WAL record.
// It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
// after the end of last record, but not the whole next record yet. In the
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
// restart the streaming, it needs to restart at the end of last record, so
// we track them separately. last_record_lsn should perhaps be in
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
first_valid_lsn: Lsn,
last_valid_lsn: Lsn,
last_record_lsn: Lsn,
}
lazy_static! {
pub static ref PAGECACHES: Mutex<HashMap<ZTimelineId, Arc<PageCache>>> =
Mutex::new(HashMap::new());
@@ -166,26 +151,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
PageCache {
shared: Mutex::new(PageCacheShared {
first_valid_lsn: Lsn(0),
last_valid_lsn: Lsn(0),
last_record_lsn: Lsn(0),
}),
db: open_rocksdb(&conf, timelineid),
walredo_mgr: WalRedoManager::new(conf, timelineid),
seqwait_lsn: SeqWait::new(Lsn(0)),
first_valid_lsn: AtomicLsn::new(0),
last_valid_lsn: SeqWait::new(Lsn(0)),
last_record_lsn: AtomicLsn::new(0),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
num_wal_records: AtomicU64::new(0),
num_getpage_requests: AtomicU64::new(0),
first_valid_lsn: AtomicLsn::new(0),
last_valid_lsn: AtomicLsn::new(0),
last_record_lsn: AtomicLsn::new(0),
}
}
@@ -643,18 +621,13 @@ impl PageCache {
/// Remember that WAL has been received and added to the page cache up to the given LSN
pub fn advance_last_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.last_valid_lsn.store(lsn);
self.seqwait_lsn.advance(lsn);
} else {
if lsn < old {
warn!(
"attempted to move last valid LSN backwards (was {}, new {})",
oldlsn, lsn
old, lsn
);
}
}
@@ -665,17 +638,19 @@ impl PageCache {
/// NOTE: this updates last_valid_lsn as well.
///
pub fn advance_last_record_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
assert!(lsn >= shared.last_record_lsn);
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.last_valid_lsn.store(lsn);
self.last_record_lsn.store(lsn);
self.seqwait_lsn.advance(lsn);
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
}
}
///
@@ -684,39 +659,27 @@ impl PageCache {
/// TODO: This should be called by garbage collection, so that if an older
/// page is requested, we will return an error to the requestor.
pub fn _advance_first_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet).
assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn);
let last_valid_lsn = self.last_valid_lsn.load();
assert!(last_valid_lsn == Lsn(0) || lsn < last_valid_lsn);
shared.first_valid_lsn = lsn;
self.first_valid_lsn.store(lsn);
let old = self.first_valid_lsn.fetch_max(lsn);
// Can't move backwards.
assert!(lsn >= old);
}
pub fn init_valid_lsn(&self, lsn: Lsn) {
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == Lsn(0));
assert!(shared.last_valid_lsn == Lsn(0));
assert!(shared.last_record_lsn == Lsn(0));
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.first_valid_lsn.store(lsn);
self.last_valid_lsn.store(lsn);
self.last_record_lsn.store(lsn);
let old = self.last_valid_lsn.advance(lsn);
assert!(old == Lsn(0));
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
let old = self.first_valid_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
}
pub fn get_last_valid_lsn(&self) -> Lsn {
let shared = self.shared.lock().unwrap();
shared.last_record_lsn
self.last_valid_lsn.load()
}
//
@@ -913,7 +876,7 @@ impl PageCache {
lsn = last_valid_lsn;
}
self.seqwait_lsn
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
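
As a reading aid for the page cache changes above, here is a self-contained
sketch of the validity window described in the PageCache struct comment:
GetPage requests below first_valid_lsn are errors, requests above
last_valid_lsn have to wait for more WAL, and the window boundaries only ever
move forward. The stand-in AtomicLsn, the GetPageDecision enum, and classify()
are names invented for this illustration, not the actual pageserver code.

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for the real zenith_utils AtomicLsn, reduced to what this sketch needs.
struct AtomicLsn(AtomicU64);

impl AtomicLsn {
    fn new(v: u64) -> Self {
        AtomicLsn(AtomicU64::new(v))
    }
    fn load(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
    fn fetch_max(&self, v: u64) -> u64 {
        self.0.fetch_max(v, Ordering::AcqRel)
    }
}

// How a GetPage request at a given LSN relates to the validity window.
enum GetPageDecision {
    Error,      // older than first_valid_lsn: the page version is no longer held
    WaitForWal, // newer than last_valid_lsn: wait for more WAL to arrive
    Serve,      // inside the window: can be answered right away
}

fn classify(request_lsn: u64, first_valid_lsn: &AtomicLsn, last_valid_lsn: u64) -> GetPageDecision {
    if request_lsn < first_valid_lsn.load() {
        GetPageDecision::Error
    } else if request_lsn > last_valid_lsn {
        GetPageDecision::WaitForWal
    } else {
        GetPageDecision::Serve
    }
}

fn main() {
    let first_valid_lsn = AtomicLsn::new(100);
    // last_valid_lsn is a SeqWait in the real code; a plain number is enough here.
    let last_valid_lsn = 500;

    assert!(matches!(classify(50, &first_valid_lsn, last_valid_lsn), GetPageDecision::Error));
    assert!(matches!(classify(300, &first_valid_lsn, last_valid_lsn), GetPageDecision::Serve));
    assert!(matches!(classify(900, &first_valid_lsn, last_valid_lsn), GetPageDecision::WaitForWal));

    // first_valid_lsn only ever moves forward; a backwards attempt is a no-op.
    assert_eq!(first_valid_lsn.fetch_max(80), 100);
    assert_eq!(first_valid_lsn.load(), 100);
}
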


@@ -171,6 +171,12 @@ impl AtomicLsn {
}
Lsn(prev)
}
/// Atomically sets the Lsn to the maximum of the old and new values, returning the old value.
pub fn fetch_max(&self, lsn: Lsn) -> Lsn {
let prev = self.inner.fetch_max(lsn.0, Ordering::AcqRel);
Lsn(prev)
}
}
#[cfg(test)]
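
The point of fetch_max, as opposed to a separate load, compare and store under
a mutex, is that the read-modify-write happens as a single atomic step, so
concurrent callers cannot race each other into moving an LSN backwards. A small
self-contained demonstration, with a plain AtomicU64 standing in for AtomicLsn:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let lsn = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();
    for i in 0..8u64 {
        let lsn = Arc::clone(&lsn);
        handles.push(thread::spawn(move || {
            // Each thread tries to advance to a different value; whatever the
            // interleaving, the stored value only ever moves forward.
            let old = lsn.fetch_max(i * 10, Ordering::AcqRel);
            assert!(old <= lsn.load(Ordering::Acquire));
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    // The final value is the maximum of all attempted advances.
    assert_eq!(lsn.load(Ordering::Acquire), 70);
}
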


@@ -173,17 +173,15 @@ where
///
/// All waiters at this value or below will be woken.
///
/// `advance` will panic if you send it a lower number than
/// a previous call.
pub fn advance(&self, num: T) {
/// Returns the old number.
pub fn advance(&self, num: T) -> T {
let old_value;
let wake_these = {
let mut internal = self.internal.lock().unwrap();
if internal.current > num {
panic!(
"tried to advance backwards, from {:?} to {:?}",
internal.current, num
);
old_value = internal.current;
if old_value >= num {
return old_value;
}
internal.current = num;
@@ -204,6 +202,12 @@ where
// We don't care; discard the error.
let _ = tx.send(());
}
old_value
}
/// Read the current value, without waiting.
pub fn load(&self) -> T {
self.internal.lock().unwrap().current
}
}
@@ -222,7 +226,8 @@ mod tests {
let seq3 = Arc::clone(&seq);
spawn(move || {
seq2.wait_for(42).expect("wait_for 42");
seq2.advance(100);
let old = seq2.advance(100);
assert_eq!(old, 99);
seq2.wait_for(999).expect_err("no 999");
});
spawn(move || {
@@ -230,8 +235,14 @@ mod tests {
seq3.wait_for(0).expect("wait_for 0");
});
sleep(Duration::from_secs(1));
seq.advance(99);
let old = seq.advance(99);
assert_eq!(old, 0);
seq.wait_for(100).expect("wait_for 100");
// Calling advance with a smaller value is a no-op
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.load(), 100);
seq.shutdown();
}
@@ -247,6 +258,7 @@ mod tests {
sleep(Duration::from_secs(1));
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
seq.advance(99);
let old = seq.advance(99);
assert_eq!(old, 0)
}
}