Work with smaller segments.

Split each relish into fixed-size 10 MB segments. Separate layers are
created for each segment. This reduces the write amplification if you
have a large relation and update only parts of it; the downside is
that you have a lot more files. The 10 MB size is just a guess; we
should do some modeling and testing in the future to figure out the
optimal size.

Each segment tracks its own size separately. To figure out the total
size of a relish, you need to loop through the segments to find the
highest segment that's in use. That's a bit inefficient, but will do
for now. We might want to add a cache or something later.
This commit is contained in:
Heikki Linnakangas
2021-08-17 18:54:41 +03:00
parent cbeb67067c
commit 91f72fabc9
8 changed files with 388 additions and 211 deletions

View File

@@ -12,7 +12,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
@@ -47,7 +47,7 @@ mod storage_layer;
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use snapshot_layer::SnapshotLayer;
use storage_layer::Layer;
use storage_layer::{Layer, SegmentTag, RELISH_SEG_SIZE};
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -475,7 +475,9 @@ impl Timeline for LayeredTimeline {
}
let lsn = self.wait_lsn(lsn)?;
if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? {
let seg = SegmentTag::from_blknum(rel, blknum);
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
layer.get_page_at_lsn(&*self.walredo_mgr, blknum, lsn)
} else {
bail!("relish {} not found at {}", rel, lsn);
@@ -491,7 +493,9 @@ impl Timeline for LayeredTimeline {
);
}
if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? {
let seg = SegmentTag::from_blknum(rel, blknum);
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
layer.get_page_at_lsn(&*self.walredo_mgr, blknum, lsn)
} else {
bail!("relish {} not found at {}", rel, lsn);
@@ -508,27 +512,43 @@ impl Timeline for LayeredTimeline {
let lsn = self.wait_lsn(lsn)?;
if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? {
let result = layer.get_relish_size(lsn);
trace!(
"get_relish_size: rel {} at {}/{} -> {:?}",
rel,
self.timelineid,
lsn,
result
);
result
} else {
Ok(None)
let mut segno = 0;
loop {
let seg = SegmentTag { rel, segno };
let segsize;
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
segsize = layer.get_seg_size(lsn)?;
trace!(
"get_seg_size: {} at {}/{} -> {}",
seg,
self.timelineid,
lsn,
segsize
);
} else {
if segno == 0 {
return Ok(None);
}
segsize = 0;
}
if segsize != RELISH_SEG_SIZE {
let result = segno * RELISH_SEG_SIZE + segsize;
return Ok(Some(result));
}
segno += 1;
}
}
fn get_rel_exists(&self, rel: RelishTag, lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(lsn)?;
let seg = SegmentTag { rel, segno: 0 };
let result;
if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? {
result = layer.get_rel_exists(lsn)?;
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
result = layer.get_seg_exists(lsn)?;
} else {
result = false;
}
@@ -632,7 +652,10 @@ impl Timeline for LayeredTimeline {
rel
);
}
let layer = self.get_layer_for_write(rel, rec.lsn)?;
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.get_layer_for_write(seg, rec.lsn)?;
layer.put_wal_record(blknum, rec)
}
@@ -643,8 +666,84 @@ impl Timeline for LayeredTimeline {
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
let layer = self.get_layer_for_write(rel, lsn)?;
layer.put_truncation(lsn, relsize)
let oldsize = self
.get_relish_size(rel, self.last_valid_lsn.load())?
.ok_or_else(|| {
anyhow!(
"attempted to truncate non-existent relish {} at {}",
rel,
lsn
)
})?;
if oldsize <= relsize {
return Ok(());
}
let old_last_seg = (oldsize - 1) / RELISH_SEG_SIZE;
let last_remain_seg = if relsize == 0 {
0
} else {
(relsize - 1) / RELISH_SEG_SIZE
};
// Unlink segments beyond the last remaining segment.
for remove_segno in (last_remain_seg + 1)..=old_last_seg {
let seg = SegmentTag {
rel,
segno: remove_segno,
};
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_unlink(lsn)?;
}
// Truncate the last remaining segment to the specified size
if relsize == 0 || relsize % RELISH_SEG_SIZE != 0 {
let seg = SegmentTag {
rel,
segno: last_remain_seg,
};
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)?;
}
Ok(())
}
/// Record that relish `rel` was unlinked (dropped) at `lsn`.
///
/// A blocky relish is stored as several segments, so the unlink is
/// recorded on every segment from 0 up to the last one currently in use.
/// A non-blocky relish has a single segment, which is unlinked directly.
///
/// If a blocky relish has no known size (i.e. it does not exist), a
/// warning is logged and `Ok(())` is still returned.
///
/// Errors from the size lookup, from `get_layer_for_write`, or from the
/// layer's own `put_unlink` are propagated to the caller.
fn put_unlink(&self, rel: RelishTag, lsn: Lsn) -> Result<()> {
trace!("put_unlink: {} at {}", rel, lsn);
if rel.is_blocky() {
// Note: the current size is looked up at `last_valid_lsn`, not at the
// `lsn` of the unlink itself.
let oldsize_opt = self.get_relish_size(rel, self.last_valid_lsn.load())?;
if let Some(oldsize) = oldsize_opt {
// Number of the last segment in use. An empty relish (size 0) is
// treated as occupying segment 0; the explicit check also avoids
// the underflow in `oldsize - 1`.
let old_last_seg = if oldsize == 0 {
0
} else {
(oldsize - 1) / RELISH_SEG_SIZE
};
// Unlink all segments
for remove_segno in 0..=old_last_seg {
let seg = SegmentTag {
rel,
segno: remove_segno,
};
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_unlink(lsn)?;
}
} else {
// No size was found, so there is nothing to unlink. This is
// reported but deliberately not treated as an error.
warn!(
"put_unlink called on non-existent relish {} at {}",
rel, lsn
);
}
} else {
// Non-blocky relish: unlink the one segment that `from_blknum`
// returns for block 0.
let seg = SegmentTag::from_blknum(rel, 0);
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_unlink(lsn)?;
}
Ok(())
}
fn put_page_image(
@@ -663,17 +762,12 @@ impl Timeline for LayeredTimeline {
);
}
let layer = self.get_layer_for_write(rel, lsn)?;
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_page_image(blknum, lsn, img)
}
fn put_unlink(&self, rel: RelishTag, lsn: Lsn) -> Result<()> {
trace!("put_unlink: {} at {}", rel, lsn);
let layer = self.get_layer_for_write(rel, lsn)?;
layer.put_unlink(lsn)
}
fn put_raw_data(
&self,
_tag: crate::object_key::ObjectTag,
@@ -807,7 +901,7 @@ impl LayeredTimeline {
for layer_rc in snapfiles.iter() {
info!(
"found layer {} {}-{} {} on timeline {}",
layer_rc.get_relish_tag(),
layer_rc.get_seg_tag(),
layer_rc.get_start_lsn(),
layer_rc.get_end_lsn(),
layer_rc.is_dropped(),
@@ -822,17 +916,17 @@ impl LayeredTimeline {
///
/// Get a handle to a Layer for reading.
///
/// The returned SnapshotFile might be from an ancestor timeline, if the
/// relation hasn't been updated on this timeline yet.
/// The returned Layer might be from an ancestor timeline, if the
/// segment hasn't been updated on this timeline yet.
///
fn get_layer_for_read(
&self,
rel: RelishTag,
seg: SegmentTag,
lsn: Lsn,
) -> Result<Option<(Arc<dyn Layer>, Lsn)>> {
trace!(
"get_layer_for_read called for {} at {}/{}",
rel,
seg,
self.timelineid,
lsn
);
@@ -859,7 +953,7 @@ impl LayeredTimeline {
//
// Do we have a layer on this timeline?
if let Some(layer) = layers.get(rel, lsn) {
if let Some(layer) = layers.get(seg, lsn) {
trace!(
"found layer in cache: {} {}-{}",
timeline.timelineid,
@@ -869,6 +963,11 @@ impl LayeredTimeline {
assert!(layer.get_start_lsn() <= lsn);
if layer.is_dropped() && layer.get_end_lsn() <= lsn {
// The segment was unlinked
return Ok(None);
}
return Ok(Some((layer.clone(), lsn)));
}
@@ -886,14 +985,14 @@ impl LayeredTimeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, rel: RelishTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
if lsn < self.last_valid_lsn.load() {
bail!("cannot modify relation after advancing last_valid_lsn");
}
// Look up the correct layer.
let layers = self.layers.lock().unwrap();
if let Some(layer) = layers.get(rel, lsn) {
if let Some(layer) = layers.get(seg, lsn) {
// If it's writeable, good, return it.
if !layer.is_frozen() {
return Ok(Arc::clone(&layer));
@@ -912,7 +1011,7 @@ impl LayeredTimeline {
drop(layers);
let layer;
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(rel, lsn)? {
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? {
// Create new entry after the previous one.
let lsn;
if prev_layer.get_timeline_id() != self.timelineid {
@@ -920,7 +1019,7 @@ impl LayeredTimeline {
lsn = self.ancestor_lsn;
trace!(
"creating file for write for {} at branch point {}/{}",
rel,
seg,
self.timelineid,
lsn
);
@@ -928,7 +1027,7 @@ impl LayeredTimeline {
lsn = prev_layer.get_end_lsn();
trace!(
"creating file for write for {} after previous layer {}/{}",
rel,
seg,
self.timelineid,
lsn
);
@@ -951,12 +1050,12 @@ impl LayeredTimeline {
// New relation.
trace!(
"creating layer for write for new rel {} at {}/{}",
rel,
seg,
self.timelineid,
lsn
);
layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, lsn)?;
layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?;
}
let mut layers = self.layers.lock().unwrap();
@@ -1055,7 +1154,7 @@ impl LayeredTimeline {
for new_layer in new_layers {
trace!(
"freeze returned layer {} {}-{}",
new_layer.get_relish_tag(),
new_layer.get_seg_tag(),
new_layer.get_start_lsn(),
new_layer.get_end_lsn()
);
@@ -1129,8 +1228,8 @@ impl LayeredTimeline {
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
// Determine for each file if it needs to be retained
'outer: for ((rel, _lsn), l) in layers.inner.iter() {
if rel.is_relation() {
'outer: for ((seg, _lsn), l) in layers.inner.iter() {
if seg.rel.is_relation() {
result.snapshot_relfiles_total += 1;
} else {
result.snapshot_nonrelfiles_total += 1;
@@ -1140,12 +1239,12 @@ impl LayeredTimeline {
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
rel,
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if rel.is_relation() {
if seg.rel.is_relation() {
result.snapshot_relfiles_needed_by_cutoff += 1;
} else {
result.snapshot_nonrelfiles_needed_by_cutoff += 1;
@@ -1159,12 +1258,12 @@ impl LayeredTimeline {
if l.get_start_lsn() <= *retain_lsn && *retain_lsn <= l.get_end_lsn() {
info!(
"keeping {} {}-{} because it's needed by branch point {}",
rel,
seg,
l.get_start_lsn(),
l.get_end_lsn(),
*retain_lsn
);
if rel.is_relation() {
if seg.rel.is_relation() {
result.snapshot_relfiles_needed_by_branches += 1;
} else {
result.snapshot_nonrelfiles_needed_by_branches += 1;
@@ -1174,8 +1273,8 @@ impl LayeredTimeline {
}
// Unless the relation was dropped, is there a later snapshot file for this relation?
if !l.is_dropped() && !layers.newer_layer_exists(l.get_relish_tag(), l.get_end_lsn()) {
if rel.is_relation() {
if !l.is_dropped() && !layers.newer_layer_exists(l.get_seg_tag(), l.get_end_lsn()) {
if seg.rel.is_relation() {
result.snapshot_relfiles_not_updated += 1;
} else {
result.snapshot_nonrelfiles_not_updated += 1;
@@ -1186,7 +1285,7 @@ impl LayeredTimeline {
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
l.get_relish_tag(),
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
l.is_dropped()
@@ -1202,13 +1301,13 @@ impl LayeredTimeline {
layers.remove(&*doomed_layer);
if doomed_layer.is_dropped() {
if doomed_layer.get_relish_tag().is_relation() {
if doomed_layer.get_seg_tag().rel.is_relation() {
result.snapshot_relfiles_dropped += 1;
} else {
result.snapshot_nonrelfiles_dropped += 1;
}
} else {
if doomed_layer.get_relish_tag().is_relation() {
if doomed_layer.get_seg_tag().rel.is_relation() {
result.snapshot_relfiles_removed += 1;
} else {
result.snapshot_nonrelfiles_removed += 1;

View File

@@ -7,22 +7,22 @@ memory. Every now and then, the accumulated changes are written out to
new files.
The files are called "snapshot files". Each snapshot file corresponds
to one PostgreSQL relation fork. The snapshot files for each timeline
are stored in the timeline's subdirectory under
to one 10 MB slice of a PostgreSQL relation fork. The snapshot files
for each timeline are stored in the timeline's subdirectory under
.zenith/tenants/<tenantid>/timelines.
The files are named like this:
rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<segno>_<start LSN>_<end LSN>
For example:
rel_1663_13990_2609_0_000000000169C348_0000000001702000
rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
Some non-relation files are also stored in repository. For example,
a CLOG segment would be named like this:
pg_xact_0000_00000000198B06B0_00000000198C2550
pg_xact_0000_0_00000000198B06B0_00000000198C2550
There is no difference in how the relation and non-relation files are
managed, except that the first part of file names is different.
@@ -38,7 +38,7 @@ version of the relation in the LSN range.
If a file has been dropped, the last snapshot file for it is created
with the _DROPPED suffix, e.g.
rel_1663_13990_2609_0_000000000169C348_0000000001702000_DROPPED
rel_1663_13990_2609_0_10_000000000169C348_0000000001702000_DROPPED
In addition to the relations, with "rel_*" prefix, we use the same
format for storing various smaller files from the PostgreSQL data
@@ -51,14 +51,14 @@ relation in the storage"
The full path of a snapshot file looks like this:
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_000000000169C348_0000000001702000
.zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_10_000000000169C348_0000000001702000
For simplicity, the examples below use a simplified notation for the
paths. The tenant ID is left out, the timeline ID is replaced with
the human-readable branch name, and spcnode+dbnode+relnode+forkum with
a human-readable table name. The LSNs are also shorter. For example, a
snapshot file for 'orders' table on 'main' branch, with LSN range
100-200 would be:
the human-readable branch name, and spcnode+dbnode+relnode+forknum+segno
with a human-readable table name. The LSNs are also shorter. For
example, a snapshot file for 'orders' table on 'main' branch, with LSN
range 100-200 would be:
main/orders_100_200

View File

@@ -3,10 +3,8 @@
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the segment.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::PageVersion;
use crate::layered_repository::storage_layer::{Layer, PageVersion, SegmentTag, RELISH_SEG_SIZE};
use crate::layered_repository::SnapshotLayer;
use crate::relish::*;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
@@ -26,7 +24,7 @@ pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
rel: RelishTag,
seg: SegmentTag,
///
/// This layer contains all the changes from 'start_lsn'. The
@@ -51,9 +49,9 @@ pub struct InMemoryLayerInner {
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
///
/// `relsizes` tracks the size of the relation at different points in time.
/// `segsizes` tracks the size of the segment at different points in time.
///
relsizes: BTreeMap<Lsn, u32>,
segsizes: BTreeMap<Lsn, u32>,
}
impl Layer for InMemoryLayer {
@@ -65,8 +63,8 @@ impl Layer for InMemoryLayer {
return self.timelineid;
}
fn get_relish_tag(&self) -> RelishTag {
return self.rel;
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
}
fn get_start_lsn(&self) -> Lsn {
@@ -74,7 +72,13 @@ impl Layer for InMemoryLayer {
}
fn get_end_lsn(&self) -> Lsn {
return Lsn(u64::MAX);
let inner = self.inner.lock().unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
drop_lsn
} else {
Lsn(u64::MAX)
}
}
fn is_dropped(&self) -> bool {
@@ -94,6 +98,8 @@ impl Layer for InMemoryLayer {
let mut page_img: Option<Bytes> = None;
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
assert!(self.seg.blknum_in_seg(blknum));
{
let inner = self.inner.lock().unwrap();
let minkey = (blknum, Lsn(0));
@@ -132,12 +138,12 @@ impl Layer for InMemoryLayer {
// but never writes the page.
//
// Would be nice to detect that situation better.
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
warn!("Page {} blk {} at {} not found", self.seg.rel, blknum, lsn);
return Ok(ZERO_PAGE.clone());
}
bail!(
"No base image found for page {} blk {} at {}/{}",
self.rel,
self.seg.rel,
blknum,
self.timelineid,
lsn
@@ -150,14 +156,14 @@ impl Layer for InMemoryLayer {
trace!(
"found page image for blk {} in {} at {}/{}, no WAL redo required",
blknum,
self.rel,
self.seg.rel,
self.timelineid,
lsn
);
Ok(img)
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
warn!("Page {} blk {} at {} not found", self.seg.rel, blknum, lsn);
Ok(ZERO_PAGE.clone())
}
} else {
@@ -169,7 +175,7 @@ impl Layer for InMemoryLayer {
// FIXME: this ought to be an error?
warn!(
"Base image for page {}/{} at {} not found, but got {} WAL records",
self.rel,
self.seg.rel,
blknum,
lsn,
records.len()
@@ -177,11 +183,11 @@ impl Layer for InMemoryLayer {
Ok(ZERO_PAGE.clone())
} else {
if page_img.is_some() {
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.seg.rel, self.timelineid, lsn);
} else {
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.seg.rel, self.timelineid, lsn);
}
let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?;
let img = walredo_mgr.request_redo(self.seg.rel, blknum, lsn, page_img, records)?;
self.put_page_image(blknum, lsn, img.clone())?;
@@ -191,26 +197,26 @@ impl Layer for InMemoryLayer {
}
/// Get size of the segment at given LSN
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>> {
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.inner.lock().unwrap();
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
let mut iter = inner.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
if let Some((_entry_lsn, entry)) = iter.next_back() {
let result = *entry;
drop(inner);
trace!("get_relish_size: {} at {} -> {}", self.rel, lsn, result);
Ok(Some(result))
trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
Ok(result)
} else {
Ok(None)
bail!("No size found for {} at {} in memory", self.seg, lsn);
}
}
/// Does this relation exist at given LSN?
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool> {
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
let inner = self.inner.lock().unwrap();
// Is the requested LSN after the rel was dropped?
// Is the requested LSN after the segment was dropped?
if let Some(drop_lsn) = inner.drop_lsn {
if lsn >= drop_lsn {
return Ok(false);
@@ -226,10 +232,12 @@ impl Layer for InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> {
assert!(self.seg.blknum_in_seg(blknum));
trace!(
"put_page_version blk {} of {} at {}/{}",
blknum,
self.rel,
self.seg.rel,
self.timelineid,
lsn
);
@@ -240,14 +248,16 @@ impl Layer for InMemoryLayer {
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {:?} blk {} at {} already exists",
self.rel, blknum, lsn
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
// Also update the relation size, if this extended the relation.
if self.rel.is_blocky() {
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
if self.seg.rel.is_blocky() {
let newsize = blknum - self.seg.segno * RELISH_SEG_SIZE + 1;
let mut iter = inner.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
let oldsize;
if let Some((_entry_lsn, entry)) = iter.next_back() {
@@ -256,15 +266,15 @@ impl Layer for InMemoryLayer {
oldsize = 0;
//bail!("No old size found for {} at {}", self.tag, lsn);
}
if blknum >= oldsize {
if newsize > oldsize {
trace!(
"enlarging relation {} from {} to {} blocks at {}",
self.rel,
"enlarging segment {} from {} to {} blocks at {}",
self.seg,
oldsize,
blknum + 1,
newsize,
lsn
);
inner.relsizes.insert(lsn, blknum + 1);
inner.segsizes.insert(lsn, newsize);
}
}
@@ -272,9 +282,9 @@ impl Layer for InMemoryLayer {
}
/// Remember that the segment was truncated at given LSN
fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()> {
fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
let old = inner.relsizes.insert(lsn, relsize);
let old = inner.segsizes.insert(lsn, segsize);
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -291,7 +301,7 @@ impl Layer for InMemoryLayer {
assert!(inner.drop_lsn.is_none());
inner.drop_lsn = Some(lsn);
info!("dropped relation {} at {}", self.rel, lsn);
info!("dropped segment {} at {}", self.seg, lsn);
Ok(())
}
@@ -314,7 +324,7 @@ impl Layer for InMemoryLayer {
) -> Result<Vec<Arc<dyn Layer>>> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.rel, self.timelineid, cutoff_lsn
self.seg, self.timelineid, cutoff_lsn
);
let inner = self.inner.lock().unwrap();
@@ -334,17 +344,17 @@ impl Layer for InMemoryLayer {
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let mut before_page_versions;
let mut before_relsizes;
let mut before_segsizes;
let mut after_page_versions;
let mut after_relsizes;
let mut after_segsizes;
if !dropped {
before_relsizes = BTreeMap::new();
after_relsizes = BTreeMap::new();
for (lsn, size) in inner.relsizes.iter() {
before_segsizes = BTreeMap::new();
after_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn > end_lsn {
after_relsizes.insert(*lsn, *size);
after_segsizes.insert(*lsn, *size);
} else {
before_relsizes.insert(*lsn, *size);
before_segsizes.insert(*lsn, *size);
}
}
@@ -359,8 +369,8 @@ impl Layer for InMemoryLayer {
}
} else {
before_page_versions = inner.page_versions.clone();
before_relsizes = inner.relsizes.clone();
after_relsizes = BTreeMap::new();
before_segsizes = inner.segsizes.clone();
after_segsizes = BTreeMap::new();
after_page_versions = BTreeMap::new();
}
@@ -372,19 +382,19 @@ impl Layer for InMemoryLayer {
self.conf,
self.timelineid,
self.tenantid,
self.rel,
self.seg,
self.start_lsn,
end_lsn,
dropped,
before_page_versions,
before_relsizes,
before_segsizes,
)?;
let mut result: Vec<Arc<dyn Layer>> = Vec::new();
// If there were any page versions after the cutoff, initialize a new in-memory layer
// to hold them
if !after_relsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.rel, end_lsn);
if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_layer = Self::copy_snapshot(
self.conf,
@@ -396,7 +406,7 @@ impl Layer for InMemoryLayer {
)?;
let mut new_inner = new_layer.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.relsizes.append(&mut after_relsizes);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
result.push(Arc::new(new_layer));
@@ -425,12 +435,12 @@ impl InMemoryLayer {
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
rel: RelishTag,
seg: SegmentTag,
start_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
rel,
seg,
timelineid,
start_lsn
);
@@ -439,12 +449,12 @@ impl InMemoryLayer {
conf,
timelineid,
tenantid,
rel,
seg,
start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
relsizes: BTreeMap::new(),
segsizes: BTreeMap::new(),
}),
})
}
@@ -463,26 +473,27 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
src.get_relish_tag(),
src.get_seg_tag(),
timelineid,
lsn
);
let mut page_versions = BTreeMap::new();
let mut relsizes = BTreeMap::new();
let mut segsizes = BTreeMap::new();
let seg = src.get_seg_tag();
let startblk;
let size;
if src.get_relish_tag().is_blocky() {
if let Some(sz) = src.get_relish_size(lsn)? {
relsizes.insert(lsn, sz);
size = sz;
} else {
bail!("no size found or {} at {}", src.get_relish_tag(), lsn);
}
if seg.rel.is_blocky() {
size = src.get_seg_size(lsn)?;
segsizes.insert(lsn, size);
startblk = seg.segno * RELISH_SEG_SIZE;
} else {
size = 1;
startblk = 0;
}
for blknum in 0..size {
for blknum in startblk..(startblk + size) {
let img = src.get_page_at_lsn(walredo_mgr, blknum, lsn)?;
let pv = PageVersion {
page_image: Some(img),
@@ -495,12 +506,12 @@ impl InMemoryLayer {
conf,
timelineid,
tenantid,
rel: src.get_relish_tag(),
seg: src.get_seg_tag(),
start_lsn: lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: page_versions,
relsizes: relsizes,
segsizes: segsizes,
}),
})
}
@@ -510,12 +521,12 @@ impl InMemoryLayer {
pub fn dump(&self) -> String {
let mut result = format!(
"----- inmemory layer for {} {}-> ----\n",
self.rel, self.start_lsn
self.seg, self.start_lsn
);
let inner = self.inner.lock().unwrap();
for (k, v) in inner.relsizes.iter() {
for (k, v) in inner.segsizes.iter() {
result += &format!("{}: {}\n", k, v);
}
for (k, v) in inner.page_versions.iter() {

View File

@@ -9,7 +9,7 @@
//! new snapshot layers and corresponding files are written to disk.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::relish::*;
use anyhow::Result;
use log::*;
@@ -19,20 +19,20 @@ use std::ops::Bound::Included;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN.
/// LayerMap is a BTreeMap keyed by SegmentTag and the layer's start LSN.
/// It provides a couple of convenience functions over a plain BTreeMap
pub struct LayerMap {
pub inner: BTreeMap<(RelishTag, Lsn), Arc<dyn Layer>>,
pub inner: BTreeMap<(SegmentTag, Lsn), Arc<dyn Layer>>,
}
impl LayerMap {
///
/// Look up using the given rel tag and LSN. This differs from a plain
/// Look up using the given segment tag and LSN. This differs from a plain
/// key-value lookup in that if there is any layer that covers the
/// given LSN, or precedes the given LSN, it is returned. In other words,
/// you don't need to know the exact start LSN of the layer.
///
pub fn get(&self, tag: RelishTag, lsn: Lsn) -> Option<Arc<dyn Layer>> {
pub fn get(&self, tag: SegmentTag, lsn: Lsn) -> Option<Arc<dyn Layer>> {
let startkey = (tag, Lsn(0));
let endkey = (tag, lsn);
@@ -48,32 +48,32 @@ impl LayerMap {
}
pub fn insert(&mut self, layer: Arc<dyn Layer>) {
let rel = layer.get_relish_tag();
let seg = layer.get_seg_tag();
let start_lsn = layer.get_start_lsn();
self.inner.insert((rel, start_lsn), Arc::clone(&layer));
self.inner.insert((seg, start_lsn), Arc::clone(&layer));
}
pub fn remove(&mut self, layer: &dyn Layer) {
let rel = layer.get_relish_tag();
let seg = layer.get_seg_tag();
let start_lsn = layer.get_start_lsn();
self.inner.remove(&(rel, start_lsn));
self.inner.remove(&(seg, start_lsn));
}
pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result<HashSet<RelTag>> {
let mut rels: HashSet<RelTag> = HashSet::new();
// Scan the timeline directory to get all rels in this timeline.
for ((rel, _lsn), _l) in self.inner.iter() {
if let RelishTag::Relation(reltag) = rel {
for ((seg, _lsn), _l) in self.inner.iter() {
if let RelishTag::Relation(reltag) = seg.rel {
// FIXME: skip if it was dropped before the requested LSN. But there is no
// LSN argument
if (spcnode == 0 || reltag.spcnode == spcnode)
&& (dbnode == 0 || reltag.dbnode == dbnode)
{
rels.insert(*reltag);
rels.insert(reltag);
}
}
}
@@ -84,43 +84,40 @@ impl LayerMap {
let mut rels: HashSet<RelishTag> = HashSet::new();
// Scan the timeline directory to get all rels in this timeline.
for ((rel, _lsn), _l) in self.inner.iter() {
for ((seg, _lsn), _l) in self.inner.iter() {
// FIXME: skip if it was dropped before the requested LSN.
if let RelishTag::Relation(_) = rel {
if let RelishTag::Relation(_) = seg.rel {
} else {
rels.insert(*rel);
rels.insert(seg.rel);
}
}
Ok(rels)
}
/// Is there a newer layer for given relation?
pub fn newer_layer_exists(&self, rel: RelishTag, lsn: Lsn) -> bool {
let startkey = (rel, lsn);
let endkey = (rel, Lsn(u64::MAX));
/// Is there a newer layer for given segment?
pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool {
let startkey = (seg, lsn);
let endkey = (seg, Lsn(u64::MAX));
for ((_rel, newer_lsn), layer) in self.inner.range((Included(startkey), Included(endkey))) {
for ((_newer_seg, newer_lsn), layer) in
self.inner.range((Included(startkey), Included(endkey)))
{
if layer.get_end_lsn() > lsn {
trace!(
"found later layer for rel {}, {} {}-{}",
rel,
"found later layer for {}, {} {}-{}",
seg,
lsn,
newer_lsn,
layer.get_end_lsn()
);
return true;
} else {
trace!(
"found singleton layer for rel {}, {} {}",
rel,
lsn,
newer_lsn
);
trace!("found singleton layer for {}, {} {}", seg, lsn, newer_lsn);
continue;
}
}
trace!("no later layer found for rel {}, {}", rel, lsn);
trace!("no later layer found for {}, {}", seg, lsn);
false
}
}

View File

@@ -37,9 +37,8 @@
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//!
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::PageVersion;
use crate::layered_repository::storage_layer::ZERO_PAGE;
use crate::layered_repository::storage_layer::{Layer, PageVersion, SegmentTag};
use crate::relish::*;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
@@ -70,7 +69,7 @@ static REL_SIZES_CHAPTER: u64 = 2;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct SnapshotFileName {
rel: RelishTag,
seg: SegmentTag,
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
@@ -80,11 +79,11 @@ impl SnapshotFileName {
fn from_str(fname: &str) -> Option<Self> {
// Split the filename into parts
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>
//
// or if it was dropped:
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>_DROPPED
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>_DROPPED
//
let rel;
let mut parts;
@@ -135,6 +134,10 @@ impl SnapshotFileName {
return None;
}
let segno = parts.next()?.parse::<u32>().ok()?;
let seg = SegmentTag { rel, segno };
let start_lsn = Lsn::from_hex(parts.next()?).ok()?;
let end_lsn = Lsn::from_hex(parts.next()?).ok()?;
@@ -153,7 +156,7 @@ impl SnapshotFileName {
}
Some(SnapshotFileName {
rel,
seg,
start_lsn,
end_lsn,
dropped,
@@ -161,7 +164,7 @@ impl SnapshotFileName {
}
fn to_string(&self) -> String {
let basename = match self.rel {
let basename = match self.seg.rel {
RelishTag::Relation(reltag) => format!(
"rel_{}_{}_{}_{}",
reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
@@ -187,8 +190,9 @@ impl SnapshotFileName {
};
format!(
"{}_{:016X}_{:016X}{}",
"{}_{}_{:016X}_{:016X}{}",
basename,
self.seg.segno,
u64::from(self.start_lsn),
u64::from(self.end_lsn),
if self.dropped { "_DROPPED" } else { "" }
@@ -214,7 +218,7 @@ pub struct SnapshotLayer {
conf: &'static PageServerConf,
pub tenantid: ZTenantId,
pub timelineid: ZTimelineId,
pub rel: RelishTag,
pub seg: SegmentTag,
//
// This entry contains all the changes from 'start_lsn' to 'end_lsn'. The
@@ -249,8 +253,8 @@ impl Layer for SnapshotLayer {
return self.timelineid;
}
fn get_relish_tag(&self) -> RelishTag {
return self.rel;
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
}
fn is_dropped(&self) -> bool {
@@ -314,12 +318,12 @@ impl Layer for SnapshotLayer {
// but never writes the page.
//
// Would be nice to detect that situation better.
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
warn!("Page {} blk {} at {} not found", self.seg.rel, blknum, lsn);
return Ok(ZERO_PAGE.clone());
}
bail!(
"No base image found for page {} blk {} at {}/{}",
self.rel,
self.seg.rel,
blknum,
self.timelineid,
lsn
@@ -332,14 +336,14 @@ impl Layer for SnapshotLayer {
trace!(
"found page image for blk {} in {} at {}/{}, no WAL redo required",
blknum,
self.rel,
self.seg.rel,
self.timelineid,
lsn
);
Ok(img)
} else {
// FIXME: this ought to be an error?
warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn);
warn!("Page {} blk {} at {} not found", self.seg.rel, blknum, lsn);
Ok(ZERO_PAGE.clone())
}
} else {
@@ -351,7 +355,7 @@ impl Layer for SnapshotLayer {
// FIXME: this ought to be an error?
warn!(
"Base image for page {} blk {} at {} not found, but got {} WAL records",
self.rel,
self.seg.rel,
blknum,
lsn,
records.len()
@@ -359,11 +363,11 @@ impl Layer for SnapshotLayer {
Ok(ZERO_PAGE.clone())
} else {
if page_img.is_some() {
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.seg.rel, self.timelineid, lsn);
} else {
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn);
trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.seg.rel, self.timelineid, lsn);
}
let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?;
let img = walredo_mgr.request_redo(self.seg.rel, blknum, lsn, page_img, records)?;
// FIXME: Should we memoize the page image in memory, so that
// we wouldn't need to reconstruct it again, if it's requested again?
@@ -375,7 +379,7 @@ impl Layer for SnapshotLayer {
}
/// Get size of the segment at given LSN
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>> {
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn)));
@@ -383,15 +387,23 @@ impl Layer for SnapshotLayer {
if let Some((_entry_lsn, entry)) = iter.next_back() {
let result = *entry;
drop(inner);
trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result);
Ok(Some(result))
trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
Ok(result)
} else {
Ok(None)
error!(
"No size found for {} at {} in snapshot layer {} {}-{}",
self.seg, lsn, self.seg, self.start_lsn, self.end_lsn
);
bail!(
"No size found for {} at {} in snapshot layer",
self.seg,
lsn
);
}
}
/// Does this relation exist at given LSN?
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool> {
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool> {
// Is the requested LSN after the rel was dropped?
if self.dropped && lsn >= self.end_lsn {
return Ok(false);
@@ -404,8 +416,8 @@ impl Layer for SnapshotLayer {
// Unsupported write operations
fn put_page_version(&self, blknum: u32, lsn: Lsn, _pv: PageVersion) -> Result<()> {
panic!(
"cannot modify historical snapshot layer, rel {} blk {} at {}/{}, {}-{}",
self.rel, blknum, self.timelineid, lsn, self.start_lsn, self.end_lsn
"cannot modify historical snapshot layer, {} blk {} at {}/{}, {}-{}",
self.seg, blknum, self.timelineid, lsn, self.start_lsn, self.end_lsn
);
}
fn put_truncation(&self, _lsn: Lsn, _relsize: u32) -> anyhow::Result<()> {
@@ -450,7 +462,7 @@ impl SnapshotLayer {
self.timelineid,
self.tenantid,
&SnapshotFileName {
rel: self.rel,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
@@ -478,7 +490,7 @@ impl SnapshotLayer {
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
rel: RelishTag,
seg: SegmentTag,
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
@@ -489,7 +501,7 @@ impl SnapshotLayer {
conf: conf,
timelineid: timelineid,
tenantid: tenantid,
rel: rel,
seg: seg,
start_lsn: start_lsn,
end_lsn,
dropped,
@@ -546,7 +558,7 @@ impl SnapshotLayer {
self.timelineid,
self.tenantid,
&SnapshotFileName {
rel: self.rel,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: self.end_lsn,
dropped: self.dropped,
@@ -593,7 +605,7 @@ impl SnapshotLayer {
conf,
timelineid,
tenantid,
rel: snapfilename.rel,
seg: snapfilename.seg,
start_lsn: snapfilename.start_lsn,
end_lsn: snapfilename.end_lsn,
dropped: snapfilename.dropped,
@@ -615,7 +627,7 @@ impl SnapshotLayer {
pub fn dump(&self) -> String {
let mut result = format!(
"----- snapshot layer for {} {}-{} ----\n",
self.rel, self.start_lsn, self.end_lsn
self.seg, self.start_lsn, self.end_lsn
);
let inner = self.inner.lock().unwrap();

View File

@@ -1,3 +1,7 @@
//!
//! Common traits and structs for layers
//!
use crate::relish::RelishTag;
use crate::repository::WALRecord;
use crate::walredo::WalRedoManager;
@@ -5,12 +9,45 @@ use crate::ZTimelineId;
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
pub static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
/// Number of pages in one relish segment: 10 MB divided into 8192-byte pages.
pub const RELISH_SEG_SIZE: u32 = {
    const SEGMENT_BYTES: u32 = 10 * 1024 * 1024;
    const PAGE_BYTES: u32 = 8192;
    SEGMENT_BYTES / PAGE_BYTES
};
/// Identifies one fixed-size segment of a relish.
///
/// Every relish stored in the repository is split into segments, each
/// covering 10 MB of key-space, i.e. 1280 pages of 8k.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SegmentTag {
    /// The relish this segment is a part of.
    pub rel: RelishTag,
    /// Number of the segment within the relish.
    pub segno: u32,
}
/// Renders a segment tag as `<relish>.<segno>`.
impl fmt::Display for SegmentTag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { rel, segno } = self;
        write!(f, "{}.{}", rel, segno)
    }
}
impl SegmentTag {
    /// Builds the tag of the segment of `rel` that contains block `blknum`.
    ///
    /// `blknum` is counted from the beginning of the relish; the segment
    /// number is that block number divided by [`RELISH_SEG_SIZE`].
    pub const fn from_blknum(rel: RelishTag, blknum: u32) -> SegmentTag {
        let segno = blknum / RELISH_SEG_SIZE;
        SegmentTag { rel, segno }
    }

    /// Returns `true` if block `blknum` falls into this segment's block range.
    ///
    /// Only the segment number is compared; the relish is not checked.
    pub fn blknum_in_seg(&self, blknum: u32) -> bool {
        self.segno == blknum / RELISH_SEG_SIZE
    }
}
///
/// Represents a version of a page at a specific LSN. The LSN is the key of the
/// entry in the 'page_versions' hash, it is not duplicated here.
@@ -37,21 +74,21 @@ pub struct PageVersion {
}
///
/// A Layer holds all page versions for one relish, in a range of LSNs.
/// A Layer holds all page versions for one segment of a relish, in a range of LSNs.
/// There are two kinds of layers, in-memory and snapshot layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access
/// to the recent page versions. Snapshot layers are stored on disk, and
/// are immutable.
///
/// Each layer contains a full snapshot of the relish at the start
/// Each layer contains a full snapshot of the segment at the start
/// LSN. In addition to that, it contains WAL (or more page images)
/// needed to reconstruct any page version up to the end LSN.
///
pub trait Layer: Send + Sync {
// These functions identify the relish and the LSN range that this Layer
// holds.
// These functions identify the relish segment and the LSN range
// that this Layer holds.
fn get_timeline_id(&self) -> ZTimelineId;
fn get_relish_tag(&self) -> RelishTag;
fn get_seg_tag(&self) -> SegmentTag;
fn get_start_lsn(&self) -> Lsn;
fn get_end_lsn(&self) -> Lsn;
fn is_dropped(&self) -> bool;
@@ -63,6 +100,10 @@ pub trait Layer: Send + Sync {
fn is_frozen(&self) -> bool;
// Functions that correspond to the Timeline trait functions.
// Note that the 'blknum' is the offset of the page from the beginning
// of the *relish*, not the beginning of the segment. The requested
// 'blknum' must be covered by this segment.
fn get_page_at_lsn(
&self,
walredo_mgr: &dyn WalRedoManager,
@@ -70,9 +111,9 @@ pub trait Layer: Send + Sync {
lsn: Lsn,
) -> Result<Bytes>;
fn get_relish_size(&self, lsn: Lsn) -> Result<Option<u32>>;
fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
fn get_rel_exists(&self, lsn: Lsn) -> Result<bool>;
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()>;

View File

@@ -521,6 +521,21 @@ mod tests {
pg_constants::RELSEG_SIZE - 1
);
// Truncate to 3000, and then truncate all the way down to 0, one block at a time
// This tests the behavior at segment boundaries
let mut size: i32 = 3000;
while size >= 0 {
lsn += 1;
tline.put_truncation(TESTREL_A, Lsn(lsn), size as u32)?;
tline.advance_last_valid_lsn(Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
size as u32
);
size -= 1;
}
Ok(())
}

View File

@@ -31,8 +31,8 @@ def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
# Create a test table
cur.execute("CREATE TABLE foo(x integer)")
cur.execute("INSERT INTO foo VALUES (1)")
print("Inserting two more rows and running GC")
cur.execute("select relfilenode from pg_class where oid = 'foo'::regclass");
row = cur.fetchone();
print("relfilenode is {}", row[0]);
@@ -46,6 +46,10 @@ def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
# kicks in and confuses our numbers.
cur.execute("VACUUM")
# delete the row, to update the Visibility Map. We don't want the VM
# update to confuse our numbers either.
cur.execute("DELETE FROM foo")
print("Running GC before test")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
@@ -54,16 +58,14 @@ def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin):
snapshot_relfiles_remain = row['snapshot_relfiles_total'] - row['snapshot_relfiles_removed']
assert snapshot_relfiles_remain > 0
# Insert a row. The first insert will also create a metadata entry for the
# relation, with size == 1 block. Hence, bump up the expected relation count.
snapshot_relfiles_remain += 1;
# Insert a row.
print("Inserting one row and running GC")
cur.execute("INSERT INTO foo VALUES (1)")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row);
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain
assert row['snapshot_relfiles_removed'] == 0
assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + 1
assert row['snapshot_relfiles_removed'] == 1
assert row['snapshot_relfiles_dropped'] == 0
# Insert two more rows and run GC.