mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 12:32:54 +00:00
pageserver - create TimelineWriter
This commit is contained in:
committed by
Patrick Insinger
parent
f3445949d1
commit
6e5ca5dc5c
@@ -224,7 +224,7 @@ fn bootstrap_timeline(
|
||||
let timeline = repo.create_empty_timeline(tli)?;
|
||||
restore_local_repo::import_timeline_from_postgres_datadir(
|
||||
&pgdata_path,
|
||||
timeline.as_ref(),
|
||||
timeline.writer().as_ref(),
|
||||
lsn,
|
||||
)?;
|
||||
timeline.checkpoint()?;
|
||||
|
||||
@@ -26,7 +26,7 @@ use std::convert::TryInto;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::ops::Bound::Included;
|
||||
use std::ops::{Bound::Included, Deref};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
@@ -36,7 +36,7 @@ use std::time::{Duration, Instant};
|
||||
use crate::layered_repository::inmemory_layer::FreezeLayers;
|
||||
use crate::relish::*;
|
||||
use crate::relish_storage::schedule_timeline_upload;
|
||||
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
|
||||
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
|
||||
use crate::tenant_mgr;
|
||||
use crate::walreceiver;
|
||||
use crate::walreceiver::IS_WAL_RECEIVER;
|
||||
@@ -841,131 +841,6 @@ impl Timeline for LayeredTimeline {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
|
||||
if !rel.is_blocky() && blknum != 0 {
|
||||
bail!(
|
||||
"invalid request for block {} for non-blocky relish {}",
|
||||
blknum,
|
||||
rel
|
||||
);
|
||||
}
|
||||
ensure!(rec.lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
let delta_size = self.perform_write_op(seg, rec.lsn, |layer| {
|
||||
layer.put_wal_record(blknum, rec.clone())
|
||||
})?;
|
||||
self.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> anyhow::Result<()> {
|
||||
if !rel.is_blocky() {
|
||||
bail!("invalid truncation for non-blocky relish {}", rel);
|
||||
}
|
||||
ensure!(lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
|
||||
|
||||
let oldsize = self
|
||||
.get_relish_size(rel, self.get_last_record_lsn())?
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"attempted to truncate non-existent relish {} at {}",
|
||||
rel,
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
if oldsize <= relsize {
|
||||
return Ok(());
|
||||
}
|
||||
let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
|
||||
|
||||
let last_remain_seg = if relsize == 0 {
|
||||
0
|
||||
} else {
|
||||
(relsize - 1) / RELISH_SEG_SIZE
|
||||
};
|
||||
|
||||
// Drop segments beyond the last remaining segment.
|
||||
for remove_segno in (last_remain_seg + 1)..=old_last_seg {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: remove_segno,
|
||||
};
|
||||
self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
|
||||
// Truncate the last remaining segment to the specified size
|
||||
if relsize == 0 || relsize % RELISH_SEG_SIZE != 0 {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: last_remain_seg,
|
||||
};
|
||||
self.perform_write_op(seg, lsn, |layer| {
|
||||
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
|
||||
})?;
|
||||
}
|
||||
self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drop_relish(&self, rel: RelishTag, lsn: Lsn) -> Result<()> {
|
||||
trace!("drop_segment: {} at {}", rel, lsn);
|
||||
|
||||
if rel.is_blocky() {
|
||||
if let Some(oldsize) = self.get_relish_size(rel, self.get_last_record_lsn())? {
|
||||
let old_last_seg = if oldsize == 0 {
|
||||
0
|
||||
} else {
|
||||
(oldsize - 1) / RELISH_SEG_SIZE
|
||||
};
|
||||
|
||||
// Drop all segments of the relish
|
||||
for remove_segno in 0..=old_last_seg {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: remove_segno,
|
||||
};
|
||||
self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
self.decrease_current_logical_size(oldsize * BLCKSZ as u32);
|
||||
} else {
|
||||
warn!(
|
||||
"drop_segment called on non-existent relish {} at {}",
|
||||
rel, lsn
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// TODO handle TwoPhase relishes
|
||||
let seg = SegmentTag::from_blknum(rel, 0);
|
||||
self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
if !rel.is_blocky() && blknum != 0 {
|
||||
bail!(
|
||||
"invalid request for block {} for non-blocky relish {}",
|
||||
blknum,
|
||||
rel
|
||||
);
|
||||
}
|
||||
ensure!(lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
|
||||
let delta_size = self.perform_write_op(seg, lsn, |layer| {
|
||||
layer.put_page_image(blknum, lsn, img.clone())
|
||||
})?;
|
||||
|
||||
self.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Public entry point for checkpoint(). All the logic is in the private
|
||||
/// checkpoint_internal function, this public facade just wraps it for
|
||||
/// metrics collection.
|
||||
@@ -976,15 +851,6 @@ impl Timeline for LayeredTimeline {
|
||||
.observe_closure_duration(|| self.checkpoint_internal(0, true))
|
||||
}
|
||||
|
||||
///
|
||||
/// Remember the (end of) last valid WAL record remembered in the timeline.
|
||||
///
|
||||
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
}
|
||||
@@ -1035,6 +901,10 @@ impl Timeline for LayeredTimeline {
|
||||
|
||||
Ok(total_blocks * BLCKSZ as usize)
|
||||
}
|
||||
|
||||
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
|
||||
Box::new(LayeredTimelineWriter(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl LayeredTimeline {
|
||||
@@ -1923,6 +1793,160 @@ impl LayeredTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
struct LayeredTimelineWriter<'a>(&'a LayeredTimeline);
|
||||
|
||||
impl Deref for LayeredTimelineWriter<'_> {
|
||||
type Target = dyn Timeline;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
|
||||
fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> {
|
||||
if !rel.is_blocky() && blknum != 0 {
|
||||
bail!(
|
||||
"invalid request for block {} for non-blocky relish {}",
|
||||
blknum,
|
||||
rel
|
||||
);
|
||||
}
|
||||
ensure!(rec.lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
let delta_size = self.0.perform_write_op(seg, rec.lsn, |layer| {
|
||||
layer.put_wal_record(blknum, rec.clone())
|
||||
})?;
|
||||
self.0
|
||||
.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
if !rel.is_blocky() && blknum != 0 {
|
||||
bail!(
|
||||
"invalid request for block {} for non-blocky relish {}",
|
||||
blknum,
|
||||
rel
|
||||
);
|
||||
}
|
||||
ensure!(lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
|
||||
let delta_size = self.0.perform_write_op(seg, lsn, |layer| {
|
||||
layer.put_page_image(blknum, lsn, img.clone())
|
||||
})?;
|
||||
|
||||
self.0
|
||||
.increase_current_logical_size(delta_size * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> Result<()> {
|
||||
if !rel.is_blocky() {
|
||||
bail!("invalid truncation for non-blocky relish {}", rel);
|
||||
}
|
||||
ensure!(lsn.is_aligned(), "unaligned record LSN");
|
||||
|
||||
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
|
||||
|
||||
let oldsize = self
|
||||
.0
|
||||
.get_relish_size(rel, self.0.get_last_record_lsn())?
|
||||
.ok_or_else(|| {
|
||||
anyhow!(
|
||||
"attempted to truncate non-existent relish {} at {}",
|
||||
rel,
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
|
||||
if oldsize <= relsize {
|
||||
return Ok(());
|
||||
}
|
||||
let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
|
||||
|
||||
let last_remain_seg = if relsize == 0 {
|
||||
0
|
||||
} else {
|
||||
(relsize - 1) / RELISH_SEG_SIZE
|
||||
};
|
||||
|
||||
// Drop segments beyond the last remaining segment.
|
||||
for remove_segno in (last_remain_seg + 1)..=old_last_seg {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: remove_segno,
|
||||
};
|
||||
self.0
|
||||
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
|
||||
// Truncate the last remaining segment to the specified size
|
||||
if relsize == 0 || relsize % RELISH_SEG_SIZE != 0 {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: last_remain_seg,
|
||||
};
|
||||
self.0.perform_write_op(seg, lsn, |layer| {
|
||||
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
|
||||
})?;
|
||||
}
|
||||
self.0
|
||||
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn drop_relish(&self, rel: RelishTag, lsn: Lsn) -> Result<()> {
|
||||
trace!("drop_segment: {} at {}", rel, lsn);
|
||||
|
||||
if rel.is_blocky() {
|
||||
if let Some(oldsize) = self.0.get_relish_size(rel, self.0.get_last_record_lsn())? {
|
||||
let old_last_seg = if oldsize == 0 {
|
||||
0
|
||||
} else {
|
||||
(oldsize - 1) / RELISH_SEG_SIZE
|
||||
};
|
||||
|
||||
// Drop all segments of the relish
|
||||
for remove_segno in 0..=old_last_seg {
|
||||
let seg = SegmentTag {
|
||||
rel,
|
||||
segno: remove_segno,
|
||||
};
|
||||
self.0
|
||||
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
self.0
|
||||
.decrease_current_logical_size(oldsize * BLCKSZ as u32);
|
||||
} else {
|
||||
warn!(
|
||||
"drop_segment called on non-existent relish {} at {}",
|
||||
rel, lsn
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// TODO handle TwoPhase relishes
|
||||
let seg = SegmentTag::from_blknum(rel, 0);
|
||||
self.0
|
||||
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Remember the (end of) last valid WAL record remembered in the timeline.
|
||||
///
|
||||
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
|
||||
assert!(new_lsn.is_aligned());
|
||||
|
||||
self.0.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
/// Dump contents of a layer file to stdout.
|
||||
pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
|
||||
let file = File::open(path)?;
|
||||
|
||||
@@ -3,7 +3,7 @@ use anyhow::Result;
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::ops::AddAssign;
|
||||
use std::ops::{AddAssign, Deref};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use zenith_utils::lsn::{Lsn, RecordLsn};
|
||||
@@ -125,6 +125,39 @@ pub trait Timeline: Send + Sync {
|
||||
// These are called by the WAL receiver to digest WAL records.
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Atomically get both last and prev.
|
||||
fn get_last_record_rlsn(&self) -> RecordLsn;
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
fn get_prev_record_lsn(&self) -> Lsn;
|
||||
fn get_start_lsn(&self) -> Lsn;
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;
|
||||
|
||||
///
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self) -> Result<()>;
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors,
|
||||
/// doesnt support TwoPhase relishes yet
|
||||
fn get_current_logical_size(&self) -> usize;
|
||||
|
||||
/// Does the same as get_current_logical_size but counted on demand.
|
||||
/// Used in tests to ensure thet incremental and non incremental variants match.
|
||||
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
|
||||
}
|
||||
|
||||
/// Various functions to mutate the timeline.
|
||||
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
|
||||
// This is probably considered a bad practice in Rust and should be fixed eventually,
|
||||
// but will cause large code changes.
|
||||
pub trait TimelineWriter: Deref<Target = dyn Timeline> {
|
||||
/// Put a new page version that can be constructed from a WAL record
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
@@ -145,29 +178,6 @@ pub trait Timeline: Send + Sync {
|
||||
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
|
||||
/// Previous last record LSN is stored alongside the latest and can be read.
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
/// Atomically get both last and prev.
|
||||
fn get_last_record_rlsn(&self) -> RecordLsn;
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
fn get_prev_record_lsn(&self) -> Lsn;
|
||||
fn get_start_lsn(&self) -> Lsn;
|
||||
|
||||
///
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self) -> Result<()>;
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors,
|
||||
/// doesnt support TwoPhase relishes yet
|
||||
fn get_current_logical_size(&self) -> usize;
|
||||
|
||||
/// Does the same as get_current_logical_size but counted on demand.
|
||||
/// Used in tests to ensure thet incremental and non incremental variants match.
|
||||
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
@@ -308,14 +318,15 @@ mod tests {
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
|
||||
tline.put_page_image(TESTREL_A, 1, Lsn(0x40), TEST_IMG("foo blk 1 at 4"))?;
|
||||
tline.put_page_image(TESTREL_A, 2, Lsn(0x50), TEST_IMG("foo blk 2 at 5"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
|
||||
writer.put_page_image(TESTREL_A, 1, Lsn(0x40), TEST_IMG("foo blk 1 at 4"))?;
|
||||
writer.put_page_image(TESTREL_A, 2, Lsn(0x50), TEST_IMG("foo blk 2 at 5"))?;
|
||||
|
||||
tline.advance_last_record_lsn(Lsn(0x50));
|
||||
writer.advance_last_record_lsn(Lsn(0x50));
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -361,8 +372,8 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate last block
|
||||
tline.put_truncation(TESTREL_A, Lsn(0x60), 2)?;
|
||||
tline.advance_last_record_lsn(Lsn(0x60));
|
||||
writer.put_truncation(TESTREL_A, Lsn(0x60), 2)?;
|
||||
writer.advance_last_record_lsn(Lsn(0x60));
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -384,13 +395,13 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate to zero length
|
||||
tline.put_truncation(TESTREL_A, Lsn(0x68), 0)?;
|
||||
tline.advance_last_record_lsn(Lsn(0x68));
|
||||
writer.put_truncation(TESTREL_A, Lsn(0x68), 0)?;
|
||||
writer.advance_last_record_lsn(Lsn(0x68));
|
||||
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x68))?.unwrap(), 0);
|
||||
|
||||
// Extend from 0 to 2 blocks, leaving a gap
|
||||
tline.put_page_image(TESTREL_A, 1, Lsn(0x70), TEST_IMG("foo blk 1"))?;
|
||||
tline.advance_last_record_lsn(Lsn(0x70));
|
||||
writer.put_page_image(TESTREL_A, 1, Lsn(0x70), TEST_IMG("foo blk 1"))?;
|
||||
writer.advance_last_record_lsn(Lsn(0x70));
|
||||
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x70))?.unwrap(), 2);
|
||||
assert_eq!(tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?, ZERO_PAGE);
|
||||
assert_eq!(
|
||||
@@ -425,25 +436,26 @@ mod tests {
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.advance_last_record_lsn(Lsn(0x20));
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
writer.advance_last_record_lsn(Lsn(0x20));
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x20))?.unwrap(), 1);
|
||||
|
||||
// Drop relish
|
||||
tline.drop_relish(TESTREL_A, Lsn(0x30))?;
|
||||
tline.advance_last_record_lsn(Lsn(0x30));
|
||||
writer.drop_relish(TESTREL_A, Lsn(0x30))?;
|
||||
writer.advance_last_record_lsn(Lsn(0x30));
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
|
||||
assert!(tline.get_relish_size(TESTREL_A, Lsn(0x30))?.is_none());
|
||||
|
||||
// Extend it again
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
|
||||
tline.advance_last_record_lsn(Lsn(0x40));
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
|
||||
writer.advance_last_record_lsn(Lsn(0x40));
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
|
||||
@@ -461,6 +473,7 @@ mod tests {
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
//from storage_layer.rs
|
||||
const RELISH_SEG_SIZE: u32 = 10 * 1024 * 1024 / 8192;
|
||||
@@ -470,10 +483,10 @@ mod tests {
|
||||
for blkno in 0..relsize {
|
||||
let lsn = Lsn(0x20);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
tline.put_page_image(TESTREL_A, blkno, lsn, TEST_IMG(&data))?;
|
||||
writer.put_page_image(TESTREL_A, blkno, lsn, TEST_IMG(&data))?;
|
||||
}
|
||||
|
||||
tline.advance_last_record_lsn(Lsn(0x20));
|
||||
writer.advance_last_record_lsn(Lsn(0x20));
|
||||
|
||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
|
||||
@@ -497,8 +510,8 @@ mod tests {
|
||||
|
||||
// Truncate relation so that second segment was dropped
|
||||
// - only leave one page
|
||||
tline.put_truncation(TESTREL_A, Lsn(0x60), 1)?;
|
||||
tline.advance_last_record_lsn(Lsn(0x60));
|
||||
writer.put_truncation(TESTREL_A, Lsn(0x60), 1)?;
|
||||
writer.advance_last_record_lsn(Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 1);
|
||||
@@ -531,9 +544,9 @@ mod tests {
|
||||
for blkno in 0..relsize {
|
||||
let lsn = Lsn(0x80);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
tline.put_page_image(TESTREL_A, blkno, lsn, TEST_IMG(&data))?;
|
||||
writer.put_page_image(TESTREL_A, blkno, lsn, TEST_IMG(&data))?;
|
||||
}
|
||||
tline.advance_last_record_lsn(Lsn(0x80));
|
||||
writer.advance_last_record_lsn(Lsn(0x80));
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
|
||||
assert_eq!(
|
||||
@@ -559,14 +572,15 @@ mod tests {
|
||||
fn test_large_rel() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_large_rel")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
let mut lsn = 0x10;
|
||||
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
|
||||
lsn += 0x10;
|
||||
tline.put_page_image(TESTREL_A, blknum as u32, Lsn(lsn), img)?;
|
||||
writer.put_page_image(TESTREL_A, blknum as u32, Lsn(lsn), img)?;
|
||||
}
|
||||
tline.advance_last_record_lsn(Lsn(lsn));
|
||||
writer.advance_last_record_lsn(Lsn(lsn));
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
|
||||
@@ -577,8 +591,8 @@ mod tests {
|
||||
|
||||
// Truncate one block
|
||||
lsn += 0x10;
|
||||
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?;
|
||||
tline.advance_last_record_lsn(Lsn(lsn));
|
||||
writer.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?;
|
||||
writer.advance_last_record_lsn(Lsn(lsn));
|
||||
assert_eq!(
|
||||
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
|
||||
pg_constants::RELSEG_SIZE
|
||||
@@ -587,8 +601,8 @@ mod tests {
|
||||
|
||||
// Truncate another block
|
||||
lsn += 0x10;
|
||||
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?;
|
||||
tline.advance_last_record_lsn(Lsn(lsn));
|
||||
writer.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?;
|
||||
writer.advance_last_record_lsn(Lsn(lsn));
|
||||
assert_eq!(
|
||||
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
|
||||
pg_constants::RELSEG_SIZE - 1
|
||||
@@ -600,8 +614,8 @@ mod tests {
|
||||
let mut size: i32 = 3000;
|
||||
while size >= 0 {
|
||||
lsn += 0x10;
|
||||
tline.put_truncation(TESTREL_A, Lsn(lsn), size as u32)?;
|
||||
tline.advance_last_record_lsn(Lsn(lsn));
|
||||
writer.put_truncation(TESTREL_A, Lsn(lsn), size as u32)?;
|
||||
writer.advance_last_record_lsn(Lsn(lsn));
|
||||
assert_eq!(
|
||||
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
|
||||
size as u32
|
||||
@@ -621,16 +635,17 @@ mod tests {
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
// after branching fails below
|
||||
tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_CHECKPOINT.clone())?;
|
||||
writer.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_CHECKPOINT.clone())?;
|
||||
|
||||
// Create a relation on the timeline
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
|
||||
tline.advance_last_record_lsn(Lsn(0x30));
|
||||
writer.advance_last_record_lsn(Lsn(0x30));
|
||||
|
||||
// Check that list_rels() lists it after LSN 2, but no before it
|
||||
assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
|
||||
@@ -640,14 +655,17 @@ mod tests {
|
||||
// Create a branch, check that the relation is visible there
|
||||
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
|
||||
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
|
||||
let new_writer = newtline.writer();
|
||||
|
||||
assert!(newtline
|
||||
.list_rels(0, TESTDB, Lsn(0x30))?
|
||||
.contains(&TESTREL_A));
|
||||
|
||||
// Drop it on the branch
|
||||
newtline.drop_relish(TESTREL_A, Lsn(0x40))?;
|
||||
newtline.advance_last_record_lsn(Lsn(0x40));
|
||||
new_writer.drop_relish(TESTREL_A, Lsn(0x40))?;
|
||||
new_writer.advance_last_record_lsn(Lsn(0x40));
|
||||
|
||||
drop(new_writer);
|
||||
|
||||
// Check that it's no longer listed on the branch after the point where it was dropped
|
||||
assert!(newtline
|
||||
@@ -675,28 +693,30 @@ mod tests {
|
||||
fn test_branch() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_branch")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
// after branching fails below
|
||||
tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_CHECKPOINT.clone())?;
|
||||
writer.put_page_image(RelishTag::Checkpoint, 0, Lsn(0x10), ZERO_CHECKPOINT.clone())?;
|
||||
|
||||
// Create a relation on the timeline
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
|
||||
tline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x30), TEST_IMG("foo blk 0 at 3"))?;
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("foo blk 0 at 4"))?;
|
||||
|
||||
// Create another relation
|
||||
tline.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?;
|
||||
writer.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?;
|
||||
|
||||
tline.advance_last_record_lsn(Lsn(0x40));
|
||||
writer.advance_last_record_lsn(Lsn(0x40));
|
||||
assert_current_logical_size(&tline, Lsn(0x40));
|
||||
|
||||
// Branch the history, modify relation differently on the new timeline
|
||||
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
|
||||
let newtline = repo.get_timeline(NEW_TIMELINE_ID)?;
|
||||
let new_writer = newtline.writer();
|
||||
|
||||
newtline.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?;
|
||||
newtline.advance_last_record_lsn(Lsn(0x40));
|
||||
new_writer.put_page_image(TESTREL_A, 0, Lsn(0x40), TEST_IMG("bar blk 0 at 4"))?;
|
||||
new_writer.advance_last_record_lsn(Lsn(0x40));
|
||||
|
||||
// Check page contents on both branches
|
||||
assert_eq!(
|
||||
|
||||
@@ -34,7 +34,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
|
||||
///
|
||||
pub fn import_timeline_from_postgres_datadir(
|
||||
path: &Path,
|
||||
timeline: &dyn Timeline,
|
||||
writer: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
// Scan 'global'
|
||||
@@ -44,10 +44,10 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
None => continue,
|
||||
|
||||
Some("pg_control") => {
|
||||
import_control_file(timeline, lsn, &direntry.path())?;
|
||||
import_control_file(writer, lsn, &direntry.path())?;
|
||||
}
|
||||
Some("pg_filenode.map") => import_nonrel_file(
|
||||
timeline,
|
||||
writer,
|
||||
lsn,
|
||||
RelishTag::FileNodeMap {
|
||||
spcnode: pg_constants::GLOBALTABLESPACE_OID,
|
||||
@@ -59,7 +59,7 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
// Load any relation files into the page server
|
||||
_ => import_relfile(
|
||||
&direntry.path(),
|
||||
timeline,
|
||||
writer,
|
||||
lsn,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
@@ -86,7 +86,7 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
|
||||
Some("PG_VERSION") => continue,
|
||||
Some("pg_filenode.map") => import_nonrel_file(
|
||||
timeline,
|
||||
writer,
|
||||
lsn,
|
||||
RelishTag::FileNodeMap {
|
||||
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
|
||||
@@ -98,7 +98,7 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
// Load any relation files into the page server
|
||||
_ => import_relfile(
|
||||
&direntry.path(),
|
||||
timeline,
|
||||
writer,
|
||||
lsn,
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
@@ -108,24 +108,24 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_xact"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(timeline, lsn, SlruKind::Clog, &entry.path())?;
|
||||
import_slru_file(writer, lsn, SlruKind::Clog, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(timeline, lsn, SlruKind::MultiXactMembers, &entry.path())?;
|
||||
import_slru_file(writer, lsn, SlruKind::MultiXactMembers, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
|
||||
let entry = entry?;
|
||||
import_slru_file(timeline, lsn, SlruKind::MultiXactOffsets, &entry.path())?;
|
||||
import_slru_file(writer, lsn, SlruKind::MultiXactOffsets, &entry.path())?;
|
||||
}
|
||||
for entry in fs::read_dir(path.join("pg_twophase"))? {
|
||||
let entry = entry?;
|
||||
let xid = u32::from_str_radix(entry.path().to_str().unwrap(), 16)?;
|
||||
import_nonrel_file(timeline, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
|
||||
import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
|
||||
}
|
||||
// TODO: Scan pg_tblspc
|
||||
|
||||
timeline.advance_last_record_lsn(lsn);
|
||||
writer.advance_last_record_lsn(lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -133,7 +133,7 @@ pub fn import_timeline_from_postgres_datadir(
|
||||
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
|
||||
fn import_relfile(
|
||||
path: &Path,
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
spcoid: Oid,
|
||||
dboid: Oid,
|
||||
@@ -191,7 +191,7 @@ fn import_relfile(
|
||||
/// are just slurped into the repository as one blob.
|
||||
///
|
||||
fn import_nonrel_file(
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
tag: RelishTag,
|
||||
path: &Path,
|
||||
@@ -212,7 +212,7 @@ fn import_nonrel_file(
|
||||
///
|
||||
/// The control file is imported as is, but we also extract the checkpoint record
|
||||
/// from it and store it separated.
|
||||
fn import_control_file(timeline: &dyn Timeline, lsn: Lsn, path: &Path) -> Result<()> {
|
||||
fn import_control_file(timeline: &dyn TimelineWriter, lsn: Lsn, path: &Path) -> Result<()> {
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole file
|
||||
@@ -239,7 +239,12 @@ fn import_control_file(timeline: &dyn Timeline, lsn: Lsn, path: &Path) -> Result
|
||||
///
|
||||
/// Import an SLRU segment file
|
||||
///
|
||||
fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Path) -> Result<()> {
|
||||
fn import_slru_file(
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
slru: SlruKind,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
// Does it look like an SLRU file?
|
||||
let mut file = File::open(path)?;
|
||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||
@@ -287,7 +292,7 @@ fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Pa
|
||||
pub fn save_decoded_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
decoded: &DecodedWALRecord,
|
||||
recdata: Bytes,
|
||||
lsn: Lsn,
|
||||
@@ -493,7 +498,11 @@ pub fn save_decoded_record(
|
||||
}
|
||||
|
||||
/// Subroutine of save_decoded_record(), to handle an XLOG_DBASE_CREATE record.
|
||||
fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatabase) -> Result<()> {
|
||||
fn save_xlog_dbase_create(
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
rec: &XlCreateDatabase,
|
||||
) -> Result<()> {
|
||||
let db_id = rec.db_id;
|
||||
let tablespace_id = rec.tablespace_id;
|
||||
let src_db_id = rec.src_db_id;
|
||||
@@ -570,7 +579,11 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
|
||||
/// Subroutine of save_decoded_record(), to handle an XLOG_SMGR_TRUNCATE record.
|
||||
///
|
||||
/// This is the same logic as in PostgreSQL's smgr_redo() function.
|
||||
fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTruncate) -> Result<()> {
|
||||
fn save_xlog_smgr_truncate(
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
rec: &XlSmgrTruncate,
|
||||
) -> Result<()> {
|
||||
let spcnode = rec.rnode.spcnode;
|
||||
let dbnode = rec.rnode.dbnode;
|
||||
let relnode = rec.rnode.relnode;
|
||||
@@ -632,7 +645,7 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca
|
||||
/// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records.
|
||||
///
|
||||
fn save_xact_record(
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
parsed: &XlXactParsedRecord,
|
||||
decoded: &DecodedWALRecord,
|
||||
@@ -690,7 +703,7 @@ fn save_xact_record(
|
||||
fn save_clog_truncate_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlClogTruncate,
|
||||
) -> Result<()> {
|
||||
@@ -752,7 +765,7 @@ fn save_clog_truncate_record(
|
||||
fn save_multixact_create_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlMultiXactCreate,
|
||||
decoded: &DecodedWALRecord,
|
||||
@@ -831,7 +844,7 @@ fn save_multixact_create_record(
|
||||
fn save_multixact_truncate_record(
|
||||
checkpoint: &mut CheckPoint,
|
||||
checkpoint_modified: &mut bool,
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlMultiXactTruncate,
|
||||
) -> Result<()> {
|
||||
@@ -871,7 +884,7 @@ fn save_multixact_truncate_record(
|
||||
}
|
||||
|
||||
fn save_relmap_page(
|
||||
timeline: &dyn Timeline,
|
||||
timeline: &dyn TimelineWriter,
|
||||
lsn: Lsn,
|
||||
xlrec: &XlRelmapUpdate,
|
||||
decoded: &DecodedWALRecord,
|
||||
|
||||
@@ -219,13 +219,15 @@ fn walreceiver_main(
|
||||
// at risk of hittind a deadlock.
|
||||
assert!(lsn.is_aligned());
|
||||
|
||||
let writer = timeline.writer();
|
||||
|
||||
let mut checkpoint_modified = false;
|
||||
|
||||
let decoded = decode_wal_record(recdata.clone());
|
||||
restore_local_repo::save_decoded_record(
|
||||
&mut checkpoint,
|
||||
&mut checkpoint_modified,
|
||||
&*timeline,
|
||||
writer.as_ref(),
|
||||
&decoded,
|
||||
recdata,
|
||||
lsn,
|
||||
@@ -235,7 +237,7 @@ fn walreceiver_main(
|
||||
if checkpoint_modified {
|
||||
let new_checkpoint_bytes = checkpoint.encode();
|
||||
|
||||
timeline.put_page_image(
|
||||
writer.put_page_image(
|
||||
RelishTag::Checkpoint,
|
||||
0,
|
||||
lsn,
|
||||
@@ -245,7 +247,7 @@ fn walreceiver_main(
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
timeline.advance_last_record_lsn(lsn);
|
||||
writer.advance_last_record_lsn(lsn);
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user