mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
layered_repo - atomic last/prev record_lsn
Make a new type that stores both Lsns. Use an RwLock for thread safety.
This commit is contained in:
committed by
Patrick Insinger
parent
561bf2c510
commit
7c7e89e2ea
@@ -23,7 +23,7 @@ use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::ops::Bound::Included;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::relish::*;
|
||||
@@ -223,7 +223,7 @@ impl LayeredRepository {
|
||||
.conf
|
||||
.timeline_path(&timelineid, &self.tenantid)
|
||||
.join("wal");
|
||||
import_timeline_wal(&wal_dir, &timeline, timeline.last_record_lsn.load())?;
|
||||
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
timelines.insert(timelineid, timeline_rc.clone());
|
||||
@@ -447,6 +447,21 @@ pub struct TimelineMetadata {
|
||||
ancestor_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct RecordLsn {
|
||||
last: Lsn,
|
||||
prev: Lsn,
|
||||
}
|
||||
|
||||
impl RecordLsn {
|
||||
fn advance(&mut self, lsn: Lsn) {
|
||||
assert!(self.last <= lsn);
|
||||
let new_prev = self.last;
|
||||
self.last = lsn;
|
||||
self.prev = new_prev;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LayeredTimeline {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -465,7 +480,7 @@ pub struct LayeredTimeline {
|
||||
// versions have already been garbage collected away, we should
|
||||
// throw an error, but we don't track that currently.
|
||||
//
|
||||
// last_record_lsn points to the end of last processed WAL record.
|
||||
// record_lsn.last points to the end of last processed WAL record.
|
||||
// It can lag behind last_valid_lsn, if the WAL receiver has
|
||||
// received some WAL after the end of last record, but not the whole
|
||||
// next record yet. For get_page_at_lsn requests, we care about
|
||||
@@ -476,14 +491,16 @@ pub struct LayeredTimeline {
|
||||
// all three values together.
|
||||
//
|
||||
// We also remember the starting point of the previous record in
|
||||
// 'prev_record_lsn'. It's used to set the xl_prev pointer of the
|
||||
// 'record_lsn.prev'. It's used to set the xl_prev pointer of the
|
||||
// first WAL record when the node is started up. But here, we just
|
||||
// keep track of it. FIXME: last_record_lsn and prev_record_lsn
|
||||
// should be updated atomically together.
|
||||
// keep track of it.
|
||||
//
|
||||
// When advancing last_valid_lsn and record_lsn simultaneously, we MUST
|
||||
// advance last_valid_lsn before record_lsn.
|
||||
// This is so if a reader wishes, it can read record_lsn and then
|
||||
// last_valid_lsn, and find that last_valid_lsn >= record_lsn.
|
||||
last_valid_lsn: SeqWait<Lsn>,
|
||||
last_record_lsn: AtomicLsn,
|
||||
prev_record_lsn: AtomicLsn,
|
||||
record_lsn: RwLock<RecordLsn>,
|
||||
|
||||
// All WAL records have been processed and stored durably on files on
|
||||
// local disk, up to this LSN. On crash and restart, we need to re-process
|
||||
@@ -768,7 +785,7 @@ impl Timeline for LayeredTimeline {
|
||||
if lsn < old {
|
||||
// Should never be called with an LSN older than the last
|
||||
// record LSN, though.
|
||||
let last_record_lsn = self.last_record_lsn.load();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
if lsn < last_record_lsn {
|
||||
warn!(
|
||||
"attempted to move last valid LSN backwards beyond last record LSN (last record {}, new {})",
|
||||
@@ -779,11 +796,18 @@ impl Timeline for LayeredTimeline {
|
||||
}
|
||||
|
||||
fn init_valid_lsn(&self, lsn: Lsn) {
|
||||
// These writes must be specified in the order mentioned on the field comments.
|
||||
let old = self.last_valid_lsn.advance(lsn);
|
||||
assert!(old == Lsn(0));
|
||||
let old = self.last_record_lsn.fetch_max(lsn);
|
||||
assert!(old == Lsn(0));
|
||||
self.prev_record_lsn.store(Lsn(0));
|
||||
|
||||
{
|
||||
let mut record_lsn = self.record_lsn.write().unwrap();
|
||||
|
||||
assert_eq!(record_lsn.last, Lsn(0));
|
||||
assert_eq!(record_lsn.prev, Lsn(0));
|
||||
|
||||
record_lsn.advance(lsn);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_last_valid_lsn(&self) -> Lsn {
|
||||
@@ -796,30 +820,30 @@ impl Timeline for LayeredTimeline {
|
||||
/// NOTE: this updates last_valid_lsn as well.
|
||||
///
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn) {
|
||||
// Can't move backwards.
|
||||
let old = self.last_record_lsn.fetch_max(lsn);
|
||||
assert!(old <= lsn);
|
||||
// These writes must be specified in the order mentioned on the field comments.
|
||||
let old_valid_lsn = self.last_valid_lsn.advance(lsn);
|
||||
|
||||
// Use old value of last_record_lsn as prev_record_lsn
|
||||
self.prev_record_lsn.fetch_max(old);
|
||||
|
||||
// Also advance last_valid_lsn
|
||||
let old = self.last_valid_lsn.advance(lsn);
|
||||
// Can't move backwards.
|
||||
if lsn < old {
|
||||
if lsn < old_valid_lsn {
|
||||
warn!(
|
||||
"attempted to move last record LSN backwards (was {}, new {})",
|
||||
old, lsn
|
||||
old_valid_lsn, lsn
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let mut record_lsn = self.record_lsn.write().unwrap();
|
||||
assert!(record_lsn.last <= lsn);
|
||||
record_lsn.advance(lsn);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load()
|
||||
self.record_lsn.read().unwrap().last
|
||||
}
|
||||
|
||||
fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.prev_record_lsn.load()
|
||||
self.record_lsn.read().unwrap().prev
|
||||
}
|
||||
}
|
||||
|
||||
@@ -846,9 +870,10 @@ impl LayeredTimeline {
|
||||
// initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from
|
||||
// 'disk_consistent_lsn'.
|
||||
last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn),
|
||||
last_record_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
|
||||
|
||||
prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.unwrap_or(Lsn(0)).0),
|
||||
record_lsn: RwLock::new(RecordLsn {
|
||||
last: metadata.disk_consistent_lsn,
|
||||
prev: metadata.prev_record_lsn.unwrap_or(Lsn(0)),
|
||||
}),
|
||||
disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
|
||||
|
||||
ancestor_timeline: ancestor,
|
||||
@@ -1104,10 +1129,16 @@ impl LayeredTimeline {
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint_internal(&self, force: bool) -> Result<()> {
|
||||
// FIXME: these should be fetched atomically.
|
||||
// To hold the invariant last_valid_lsn >= last_record_lsn,
|
||||
// we must follow the read order specified in the field comments.
|
||||
|
||||
let RecordLsn {
|
||||
last: last_record_lsn,
|
||||
prev: prev_record_lsn,
|
||||
} = self.record_lsn.read().unwrap().clone();
|
||||
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
let last_record_lsn = self.last_record_lsn.load();
|
||||
let prev_record_lsn = self.prev_record_lsn.load();
|
||||
|
||||
trace!(
|
||||
"checkpointing timeline {} at {}",
|
||||
self.timelineid,
|
||||
|
||||
Reference in New Issue
Block a user