Remove last_valid_lsn tracking in wal_receiver.

There are two main reasons for that:

a) Latest unfinished record may disapper after compute node restart, so let's
    try not leak volatile part of the WAL into the repository. Always use
    last_valid_record instead.

    That change requires different getPage@LSN logic in postgres -- we need
    to ask LSN's that point to some complete record instead of GetFlushRecPtr()
    that can point in the middle of the record. That was already done by @knizhnik
    to deal with the same problem during the work on `postgres --sync-safekeepers`.

    Postgres will use LSN's aligned on 0x8 boundary in get_page requests, so we
    also need to be sure that last_valid_record is aligned.

b) Switch to get_last_record_lsn() in basebackup@no_lsn. When compute node
    is running without safekeepers and streams WAL directly
    to pageserver it is important to match basebackup LSN and LSN of replication
    start. Before this commit basebackup@no_lsn was waiting for last_valid_lsn
    and walreceiver started replication with last_record_lsn, which can be less.
    So replication was failing since compute node doesn't have requested WAL.
This commit is contained in:
Stas Kelvich
2021-09-02 00:52:03 +03:00
parent ddd2c83c64
commit 8c07a36fda
10 changed files with 176 additions and 212 deletions

View File

@@ -162,7 +162,7 @@ fn bootstrap_timeline(
run_initdb(conf, &initdb_path)?;
let pgdata_path = initdb_path;
let lsn = get_lsn_from_controlfile(&pgdata_path)?;
let lsn = get_lsn_from_controlfile(&pgdata_path)?.align();
info!("bootstrap_timeline {:?} at lsn {}", pgdata_path, lsn);
@@ -215,7 +215,7 @@ pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Resul
let latest_valid_lsn = repo
.get_timeline(timeline_id)
.map(|timeline| timeline.get_last_valid_lsn())
.map(|timeline| timeline.get_last_record_lsn())
.ok();
let ancestor_path = conf.ancestor_path(&timeline_id, tenantid);

View File

@@ -25,7 +25,7 @@ use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::relish::*;
@@ -39,7 +39,7 @@ use zenith_metrics::{register_histogram, Histogram};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
use zenith_utils::seqwait::{MonotonicCounter, SeqWait};
mod blob;
mod delta_layer;
@@ -116,6 +116,8 @@ impl Repository for LayeredRepository {
) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
assert!(start_lsn.is_aligned());
// Create the timeline directory, and write initial metadata to file.
std::fs::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
@@ -145,6 +147,8 @@ impl Repository for LayeredRepository {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> {
let src_timeline = self.get_timeline(src)?;
// This LSN comes from the user request. Make sure it is aligned.
let start_lsn = start_lsn.aligned();
// Create the metadata file, noting the ancestor of the new timeline.
// There is initially no data in it, but all the read-calls know to look
@@ -315,7 +319,10 @@ impl LayeredRepository {
let path = conf.timeline_path(&timelineid, &tenantid).join("metadata");
let data = std::fs::read(&path)?;
Ok(TimelineMetadata::des(&data)?)
let data = TimelineMetadata::des(&data)?;
assert!(data.disk_consistent_lsn.is_aligned());
Ok(data)
}
//
@@ -407,7 +414,7 @@ impl LayeredRepository {
.collect();
let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?;
let last_lsn = timeline.get_last_valid_lsn();
let last_lsn = timeline.get_last_record_lsn();
if let Some(cutoff) = last_lsn.checked_sub(horizon) {
// If GC was explicitly requested by the admin, force flush all in-memory
@@ -451,19 +458,22 @@ pub struct TimelineMetadata {
ancestor_lsn: Lsn,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
struct RecordLsn {
last: Lsn,
prev: Lsn,
}
impl RecordLsn {
fn advance(&mut self, lsn: Lsn) {
impl MonotonicCounter<Lsn> for RecordLsn {
fn cnt_advance(&mut self, lsn: Lsn) {
assert!(self.last <= lsn);
let new_prev = self.last;
self.last = lsn;
self.prev = new_prev;
}
fn cnt_value(&self) -> Lsn {
self.last
}
}
pub struct LayeredTimeline {
@@ -478,33 +488,19 @@ pub struct LayeredTimeline {
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
// What page versions do we hold in the repository? If we get a
// request > last_valid_lsn, we need to wait until we receive all
// request > last_record_lsn, we need to wait until we receive all
// the WAL up to the request. The SeqWait provides functions for
// that. TODO: If we get a request for an old LSN, such that the
// versions have already been garbage collected away, we should
// throw an error, but we don't track that currently.
//
// record_lsn.last points to the end of last processed WAL record.
// It can lag behind last_valid_lsn, if the WAL receiver has
// received some WAL after the end of last record, but not the whole
// next record yet. For get_page_at_lsn requests, we care about
// last_valid_lsn, but if the WAL receiver needs to restart the
// streaming, it needs to restart at the end of last record, so we
// track them separately. last_record_lsn should perhaps be in
// walreceiver.rs instead of here, but it seems convenient to keep
// all three values together.
// last_record_lsn.load().last points to the end of last processed WAL record.
//
// We also remember the starting point of the previous record in
// 'record_lsn.prev'. It's used to set the xl_prev pointer of the
// 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
// first WAL record when the node is started up. But here, we just
// keep track of it.
//
// When advancing last_valid_lsn and record_lsn simultaneously, we MUST
// advance last_valid_lsn before record_lsn.
// This is so if a reader wishes, it can read record_lsn and then
// last_valid_lsn, and find that last_valid_lsn >= record_lsn.
last_valid_lsn: SeqWait<Lsn>,
record_lsn: RwLock<RecordLsn>,
last_record_lsn: SeqWait<RecordLsn, Lsn>,
// All WAL records have been processed and stored durably on files on
// local disk, up to this LSN. On crash and restart, we need to re-process
@@ -675,7 +671,7 @@ impl Timeline for LayeredTimeline {
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
let oldsize = self
.get_relish_size(rel, self.last_valid_lsn.load())?
.get_relish_size(rel, self.get_last_record_lsn())?
.ok_or_else(|| {
anyhow!(
"attempted to truncate non-existent relish {} at {}",
@@ -722,7 +718,7 @@ impl Timeline for LayeredTimeline {
trace!("put_unlink: {} at {}", rel, lsn);
if rel.is_blocky() {
let oldsize_opt = self.get_relish_size(rel, self.last_valid_lsn.load())?;
let oldsize_opt = self.get_relish_size(rel, self.get_last_record_lsn())?;
if let Some(oldsize) = oldsize_opt {
let old_last_seg = if oldsize == 0 {
0
@@ -778,76 +774,25 @@ impl Timeline for LayeredTimeline {
.observe_closure_duration(|| self.checkpoint_internal(true))
}
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
// The last valid LSN cannot move backwards, but when WAL
// receiver is restarted after having only partially processed
// a record, it can call this with an lsn older than previous
// last valid LSN, when it restarts processing that record.
if lsn < old {
// Should never be called with an LSN older than the last
// record LSN, though.
let last_record_lsn = self.get_last_record_lsn();
if lsn < last_record_lsn {
warn!(
"attempted to move last valid LSN backwards beyond last record LSN (last record {}, new {})",
last_record_lsn, lsn
);
}
}
}
fn init_valid_lsn(&self, lsn: Lsn) {
// These writes must be specified in the order mentioned on the field comments.
let old = self.last_valid_lsn.advance(lsn);
assert!(old == Lsn(0));
{
let mut record_lsn = self.record_lsn.write().unwrap();
assert_eq!(record_lsn.last, Lsn(0));
assert_eq!(record_lsn.prev, Lsn(0));
record_lsn.advance(lsn);
}
}
fn get_last_valid_lsn(&self) -> Lsn {
self.last_valid_lsn.load()
}
///
/// Remember the (end of) last valid WAL record remembered in the timeline.
///
/// NOTE: this updates last_valid_lsn as well.
///
fn advance_last_record_lsn(&self, lsn: Lsn) {
// These writes must be specified in the order mentioned on the field comments.
let old_valid_lsn = self.last_valid_lsn.advance(lsn);
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
// Can't move backwards.
if lsn < old_valid_lsn {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old_valid_lsn, lsn
);
}
let old_lsn = self.last_record_lsn.advance(new_lsn);
{
let mut record_lsn = self.record_lsn.write().unwrap();
assert!(record_lsn.last <= lsn);
record_lsn.advance(lsn);
}
// since we are align incoming LSN's we can't have delta less
// then 0x8
assert!(old_lsn == new_lsn || (new_lsn.0 - old_lsn.0 >= 0x8));
}
fn get_last_record_lsn(&self) -> Lsn {
self.record_lsn.read().unwrap().last
self.last_record_lsn.load().last
}
fn get_prev_record_lsn(&self) -> Lsn {
self.record_lsn.read().unwrap().prev
self.last_record_lsn.load().prev
}
}
@@ -871,10 +816,8 @@ impl LayeredTimeline {
walredo_mgr,
// initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from
// 'disk_consistent_lsn'.
last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn),
record_lsn: RwLock::new(RecordLsn {
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
last: metadata.disk_consistent_lsn,
prev: metadata.prev_record_lsn.unwrap_or(Lsn(0)),
}),
@@ -1020,8 +963,15 @@ impl LayeredTimeline {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
let layers = self.layers.lock().unwrap();
if lsn < self.last_valid_lsn.load() {
bail!("cannot modify relation after advancing last_valid_lsn");
assert!(lsn.is_aligned());
let last_record_lsn = self.get_last_record_lsn();
if lsn < last_record_lsn {
panic!(
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn
);
}
// Do we have a layer open for writing already?
@@ -1133,20 +1083,15 @@ impl LayeredTimeline {
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint_internal(&self, force: bool) -> Result<()> {
// To hold the invariant last_valid_lsn >= last_record_lsn,
// we must follow the read order specified in the field comments.
let RecordLsn {
last: last_record_lsn,
prev: prev_record_lsn,
} = self.record_lsn.read().unwrap().clone();
let last_valid_lsn = self.last_valid_lsn.load();
} = self.last_record_lsn.load();
trace!(
"checkpointing timeline {} at {}",
self.timelineid,
last_valid_lsn
last_record_lsn
);
// Grab lock on the layer map.
@@ -1169,10 +1114,10 @@ impl LayeredTimeline {
while let Some(oldest_layer) = layers.peek_oldest_open() {
// Does this layer need freezing?
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = last_valid_lsn.0 - oldest_pending_lsn.0;
let distance = last_record_lsn.0 - oldest_pending_lsn.0;
if !force && distance < OLDEST_INMEM_DISTANCE {
info!(
"the oldest layer is now {} which is {} bytes behind last_valid_lsn",
"the oldest layer is now {} which is {} bytes behind last_record_lsn",
oldest_layer.get_seg_tag(),
distance
);
@@ -1181,7 +1126,7 @@ impl LayeredTimeline {
}
// freeze it
let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, &self)?;
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open();

View File

@@ -348,7 +348,7 @@ impl PageServerHandler {
/* Send a tarball of the latest snapshot on the timeline */
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
{
let mut writer = CopyDataSink { pgb };

View File

@@ -137,24 +137,12 @@ pub trait Timeline: Send + Sync {
/// This method is used for marking dropped relations and truncated SLRU segments
fn put_unlink(&self, tag: RelishTag, lsn: Lsn) -> Result<()>;
/// Remember the all WAL before the given LSN has been processed.
/// Track end of the latest digested WAL record.
///
/// The WAL receiver calls this after the put_* functions, to indicate that
/// all WAL before this point has been digested. Before that, if you call
/// GET on an earlier LSN, it will block.
fn advance_last_valid_lsn(&self, lsn: Lsn);
fn get_last_valid_lsn(&self) -> Lsn;
fn init_valid_lsn(&self, lsn: Lsn);
/// Like `advance_last_valid_lsn`, but this always points to the end of
/// a WAL record, not in the middle of one.
///
/// This must be <= last valid LSN. This is tracked separately from last
/// valid LSN, so that the WAL receiver knows where to restart streaming.
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
/// Previous last record LSN is stored alongside the latest and can be read.
fn advance_last_record_lsn(&self, lsn: Lsn);
fn get_last_record_lsn(&self) -> Lsn;
// Like `advance_last_record_lsn`, but points to the start position of last record
fn get_prev_record_lsn(&self) -> Lsn;
///
@@ -293,77 +281,76 @@ mod tests {
// Create timeline to work on
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TESTREL_A, 1, Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
tline.put_page_image(TESTREL_A, 2, Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TESTREL_A, 1, Lsn(0x40), TEST_IMG("foo blk 1 at 4"))?;
tline.put_page_image(TESTREL_A, 2, Lsn(0x50), TEST_IMG("foo blk 2 at 5"))?;
tline.advance_last_valid_lsn(Lsn(5));
tline.advance_last_record_lsn(Lsn(0x50));
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, false);
assert!(tline.get_relish_size(TESTREL_A, Lsn(1))?.is_none());
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
assert!(tline.get_relish_size(TESTREL_A, Lsn(0x10))?.is_none());
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(2))?.unwrap(), 1);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(5))?.unwrap(), 3);
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x20))?.unwrap(), 1);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x50))?.unwrap(), 3);
// Check page contents at each LSN
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(2))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x20))?,
TEST_IMG("foo blk 0 at 2")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(3))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x30))?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(4))?,
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x40))?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(5))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x50))?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(5))?,
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x50))?,
TEST_IMG("foo blk 1 at 4")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?,
tline.get_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?,
TEST_IMG("foo blk 2 at 5")
);
// Truncate last block
tline.put_truncation(TESTREL_A, Lsn(6), 2)?;
tline.advance_last_valid_lsn(Lsn(6));
tline.put_truncation(TESTREL_A, Lsn(0x60), 2)?;
tline.advance_last_record_lsn(Lsn(0x60));
// Check reported size and contents after truncation
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(6))?.unwrap(), 2);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 2);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(6))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x60))?,
TEST_IMG("foo blk 0 at 3")
);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(6))?,
tline.get_page_at_lsn(TESTREL_A, 1, Lsn(0x60))?,
TEST_IMG("foo blk 1 at 4")
);
// should still see the truncated block with older LSN
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(5))?.unwrap(), 3);
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x50))?.unwrap(), 3);
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?,
tline.get_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?,
TEST_IMG("foo blk 2 at 5")
);
@@ -376,17 +363,15 @@ mod tests {
fn test_large_rel() -> Result<()> {
let repo = get_test_repo("test_large_rel")?;
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
tline.init_valid_lsn(Lsn(1));
let mut lsn = 1;
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
lsn += 1;
lsn += 0x10;
tline.put_page_image(TESTREL_A, blknum as u32, Lsn(lsn), img)?;
}
tline.advance_last_valid_lsn(Lsn(lsn));
tline.advance_last_record_lsn(Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
@@ -394,18 +379,18 @@ mod tests {
);
// Truncate one block
lsn += 1;
lsn += 0x10;
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?;
tline.advance_last_valid_lsn(Lsn(lsn));
tline.advance_last_record_lsn(Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
pg_constants::RELSEG_SIZE
);
// Truncate another block
lsn += 1;
lsn += 0x10;
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?;
tline.advance_last_valid_lsn(Lsn(lsn));
tline.advance_last_record_lsn(Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
pg_constants::RELSEG_SIZE - 1
@@ -415,9 +400,9 @@ mod tests {
// This tests the behavior at segment boundaries
let mut size: i32 = 3000;
while size >= 0 {
lsn += 1;
lsn += 0x10;
tline.put_truncation(TESTREL_A, Lsn(lsn), size as u32)?;
tline.advance_last_valid_lsn(Lsn(lsn));
tline.advance_last_record_lsn(Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
size as u32
@@ -436,48 +421,47 @@ mod tests {
fn test_branch() -> Result<()> {
let repo = get_test_repo("test_branch")?;
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let tline = repo.create_empty_timeline(timelineid, Lsn(0x00))?;
// Import initial dummy checkpoint record, otherwise the get_timeline() call
// after branching fails below
tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(1), ZERO_PAGE.clone())?;
tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_PAGE.clone())?;
// Create a relation on the timeline
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("foo blk 0 at 4"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
// Create another relation
tline.put_page_image(TESTREL_B, 0, Lsn(2), TEST_IMG("foobar blk 0 at 2"))?;
tline.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?;
tline.advance_last_valid_lsn(Lsn(4));
tline.advance_last_record_lsn(Lsn(0x40));
// Branch the history, modify relation differently on the new timeline
let newtimelineid = ZTimelineId::from_str("AA223344556677881122334455667788").unwrap();
repo.branch_timeline(timelineid, newtimelineid, Lsn(3))?;
repo.branch_timeline(timelineid, newtimelineid, Lsn(0x30))?;
let newtline = repo.get_timeline(newtimelineid)?;
newtline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("bar blk 0 at 4"))?;
newtline.advance_last_valid_lsn(Lsn(4));
newtline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?;
newtline.advance_last_record_lsn(Lsn(0x40));
// Check page contents on both branches
assert_eq!(
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?,
tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?,
TEST_IMG("foo blk 0 at 4")
);
assert_eq!(
newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?,
newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?,
TEST_IMG("bar blk 0 at 4")
);
assert_eq!(
newtline.get_page_at_lsn(TESTREL_B, 0, Lsn(4))?,
newtline.get_page_at_lsn(TESTREL_B, 0, Lsn(0x40))?,
TEST_IMG("foobar blk 0 at 2")
);
assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(4))?.unwrap(), 1);
assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(0x40))?.unwrap(), 1);
Ok(())
}

View File

@@ -135,7 +135,7 @@ pub fn import_timeline_from_postgres_datadir(
}
// TODO: Scan pg_tblspc
timeline.advance_last_valid_lsn(lsn);
timeline.advance_last_record_lsn(lsn.align());
timeline.checkpoint()?;
Ok(())
@@ -338,7 +338,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes)?;
timeline.advance_last_valid_lsn(last_lsn);
timeline.advance_last_record_lsn(last_lsn.align());
timeline.checkpoint()?;
Ok(())
}
@@ -536,7 +536,7 @@ pub fn save_decoded_record(
}
// Now that this record has been handled, let the repository know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
timeline.advance_last_record_lsn(lsn.align());
Ok(())
}

View File

@@ -179,7 +179,10 @@ impl WalStreamDecoder {
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
let result = (self.lsn, recordbuf);
// Always align resulting LSN on 0x8 boundary -- that is important for getPage()
// and WalReceiver integration. Since this code is used both for WalReceiver and
// initial WAL import let's force alignment right here.
let result = (self.lsn.align(), recordbuf);
return Ok(Some(result));
}
continue;

View File

@@ -153,8 +153,7 @@ fn walreceiver_main(
// Start streaming the WAL, from where we left off previously.
//
// If we had previously received WAL up to some point in the middle of a WAL record, we
// better start from the end of last full WAL record, not in the middle of one. Hence,
// use 'last_record_lsn' rather than 'last_valid_lsn' here.
// better start from the end of last full WAL record, not in the middle of one.
let mut last_rec_lsn = timeline.get_last_record_lsn();
let mut startpoint = last_rec_lsn;
@@ -163,10 +162,6 @@ fn walreceiver_main(
}
// There might be some padding after the last full record, skip it.
//
// FIXME: It probably would be better to always start streaming from the beginning
// of the page, or the segment, so that we could check the page/segment headers
// too. Just for the sake of paranoia.
startpoint += startpoint.calc_padding(8u32);
debug!(
@@ -212,6 +207,12 @@ fn walreceiver_main(
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hittind a deadlock.
assert!(lsn.is_aligned());
restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
@@ -233,14 +234,6 @@ fn walreceiver_main(
}
}
// Update the last_valid LSN value in the timeline one more time. We updated
// it in the loop above, between each WAL record, but we might have received
// a partial record after the last completed record. Our timeline's value
// better reflect that, because GetPage@LSN requests might also point in the
// middle of a record, if the request LSN was taken from the server's current
// flush ptr.
timeline.advance_last_valid_lsn(endlsn);
// Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),
// "checkpoint" the repository to flush all the changes from WAL we've processed
// so far to disk. After this, we don't need the original WAL anymore, and it
@@ -290,7 +283,7 @@ fn walreceiver_main(
);
if reply_requested {
Some(timeline.get_last_valid_lsn())
Some(timeline.get_last_record_lsn())
} else {
None
}

View File

@@ -85,6 +85,16 @@ impl Lsn {
// (Regular subtraction will panic on overflow in debug builds.)
(sz.wrapping_sub(self.0)) % sz
}
/// Align LSN on 8-byte boundary (alignment of WAL records).
pub fn align(&self) -> Lsn {
Lsn((self.0 + 7) & !7)
}
/// Align LSN on 8-byte boundary (alignment of WAL records).
pub fn is_aligned(&self) -> bool {
*self == self.align()
}
}
impl From<u64> for Lsn {

View File

@@ -18,13 +18,28 @@ pub enum SeqWaitError {
Shutdown,
}
/// Monotonically increasing value
///
/// It is handy to store some other fields under the same mutex in SeqWait<S>
/// (e.g. store prev_record_lsn). So we allow SeqWait to be parametrized with
/// any type that can expose counter. <V> is the type of exposed counter.
pub trait MonotonicCounter<V> {
/// Bump counter value and check that it goes forward
/// N.B.: new_val is an actual new value, not a difference.
fn cnt_advance(&mut self, new_val: V);
/// Get counter value
fn cnt_value(&self) -> V;
}
/// Internal components of a `SeqWait`
struct SeqWaitInt<T>
struct SeqWaitInt<S, V>
where
T: Ord,
S: MonotonicCounter<V>,
V: Ord,
{
waiters: BinaryHeap<Waiter<T>>,
current: T,
waiters: BinaryHeap<Waiter<V>>,
current: S,
shutdown: bool,
}
@@ -72,19 +87,23 @@ impl<T: Ord> Eq for Waiter<T> {}
/// [`wait_for`]: SeqWait::wait_for
/// [`advance`]: SeqWait::advance
///
pub struct SeqWait<T>
/// <S> means Storage, <V> is type of counter that this storage exposes.
///
pub struct SeqWait<S, V>
where
T: Ord,
S: MonotonicCounter<V>,
V: Ord,
{
internal: Mutex<SeqWaitInt<T>>,
internal: Mutex<SeqWaitInt<S, V>>,
}
impl<T> SeqWait<T>
impl<S, V> SeqWait<S, V>
where
T: Ord + Debug + Copy,
S: MonotonicCounter<V> + Copy,
V: Ord + Copy,
{
/// Create a new `SeqWait`, initialized to a particular number
pub fn new(starting_num: T) -> Self {
pub fn new(starting_num: S) -> Self {
let internal = SeqWaitInt {
waiters: BinaryHeap::new(),
current: starting_num,
@@ -122,7 +141,7 @@ where
///
/// This call won't complete until someone has called `advance`
/// with a number greater than or equal to the one we're waiting for.
pub fn wait_for(&self, num: T) -> Result<(), SeqWaitError> {
pub fn wait_for(&self, num: V) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(rx)) => rx.recv().map_err(|_| SeqWaitError::Shutdown),
@@ -137,7 +156,7 @@ where
///
/// If that hasn't happened after the specified timeout duration,
/// [`SeqWaitError::Timeout`] will be returned.
pub fn wait_for_timeout(&self, num: T, timeout_duration: Duration) -> Result<(), SeqWaitError> {
pub fn wait_for_timeout(&self, num: V, timeout_duration: Duration) -> Result<(), SeqWaitError> {
match self.queue_for_wait(num) {
Ok(None) => Ok(()),
Ok(Some(rx)) => rx.recv_timeout(timeout_duration).map_err(|e| match e {
@@ -150,9 +169,9 @@ where
/// Register and return a channel that will be notified when a number arrives,
/// or None, if it has already arrived.
fn queue_for_wait(&self, num: T) -> Result<Option<Receiver<()>>, SeqWaitError> {
fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
let mut internal = self.internal.lock().unwrap();
if internal.current >= num {
if internal.current.cnt_value() >= num {
return Ok(None);
}
if internal.shutdown {
@@ -174,16 +193,16 @@ where
/// All waiters at this value or below will be woken.
///
/// Returns the old number.
pub fn advance(&self, num: T) -> T {
pub fn advance(&self, num: V) -> V {
let old_value;
let wake_these = {
let mut internal = self.internal.lock().unwrap();
old_value = internal.current;
old_value = internal.current.cnt_value();
if old_value >= num {
return old_value;
}
internal.current = num;
internal.current.cnt_advance(num);
// Pop all waiters <= num from the heap. Collect them in a vector, and
// wake them up after releasing the lock.
@@ -206,7 +225,7 @@ where
}
/// Read the current value, without waiting.
pub fn load(&self) -> T {
pub fn load(&self) -> S {
self.internal.lock().unwrap().current
}
}
@@ -219,6 +238,16 @@ mod tests {
use std::thread::spawn;
use std::time::Duration;
impl MonotonicCounter<i32> for i32 {
fn cnt_advance(&mut self, val: i32) {
assert!(*self <= val);
*self = val;
}
fn cnt_value(&self) -> i32 {
*self
}
}
#[test]
fn seqwait() {
let seq = Arc::new(SeqWait::new(0));