Compare commits

...

3 Commits

Author               SHA1        Message                                                   Date
Konstantin Knizhnik  7b1fea280b  Fix clippy errors                                         2021-10-07 18:05:15 +03:00
Konstantin Knizhnik  20e7c0d76a  Fix clippy errors                                         2021-10-07 18:03:51 +03:00
Konstantin Knizhnik  77d352400e  Do not materialize pages during checkpoint, do it in GC  2021-10-07 16:52:59 +03:00
4 changed files with 209 additions and 402 deletions
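The headline commit changes when pages are materialized: the checkpoint path now writes only delta layers (raw WAL records), and GC later builds image layers (fully reconstructed pages) where one is still missing. For orientation, here is a toy illustration of the delta/image distinction and of what "materializing" means; the types and the string-based pages are hypothetical stand-ins, not the pageserver's real structures.

use std::collections::BTreeMap;

type Lsn = u64;

// Toy model: a "page" is a String, and a WAL record appends to it.
struct DeltaLayer {
    // (page number, lsn) -> WAL record
    records: BTreeMap<(u32, Lsn), String>,
}

struct ImageLayer {
    lsn: Lsn,
    // page number -> fully reconstructed page contents at `lsn`
    pages: BTreeMap<u32, String>,
}

// Materialization: start from the previous image and replay every record up to `lsn`.
// This is the work that used to happen at checkpoint time and now happens in GC.
fn materialize(base: &ImageLayer, delta: &DeltaLayer, lsn: Lsn) -> ImageLayer {
    let mut pages = base.pages.clone();
    for ((page, rec_lsn), rec) in &delta.records {
        if *rec_lsn > base.lsn && *rec_lsn <= lsn {
            pages.entry(*page).or_default().push_str(rec);
        }
    }
    ImageLayer { lsn, pages }
}

fn main() {
    let base = ImageLayer { lsn: 10, pages: BTreeMap::from([(1, "A".to_string())]) };
    let delta = DeltaLayer {
        records: BTreeMap::from([((1, 20), "B".to_string()), ((1, 30), "C".to_string())]),
    };
    let img = materialize(&base, &delta, 30);
    for (page, contents) in &img.pages {
        println!("page {} at lsn 30 = {}", page, contents); // page 1 = "ABC"
    }
}

Writing a delta layer is just dumping records, which keeps checkpoints cheap; the expensive reconstruction is deferred to whoever performs the materialize step, which in this change is the GC pass.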

View File

@@ -32,7 +32,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
@@ -1418,58 +1417,14 @@ impl LayeredTimeline {
break;
}
// Freeze the layer.
//
// This is a two-step process. First, we "freeze" the in-memory
// layer, to close it for new writes, and replace the original
// layer with the new frozen in-memory layer (and possibly a new
// open layer to hold changes newer than the cutoff.) Then we write
// the frozen layer to disk, and replace the in-memory frozen layer
// with the new on-disk layers.
let FreezeLayers {
frozen,
open: maybe_new_open,
} = oldest_layer.freeze(last_record_lsn)?;
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open();
if let Some(new_open) = maybe_new_open.clone() {
layers.insert_open(new_open);
}
// We temporarily insert InMemory layer into historic list here.
// TODO: check that all possible concurrent users of 'historic' treat it right
layers.insert_historic(frozen.clone());
let new_delta_layer = oldest_layer.write_to_disk(self)?;
created_historics = true;
// Write the now-frozen layer to disk. That could take a while, so release the lock while we do it
drop(layers);
let new_historics = frozen.write_to_disk(self)?;
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
// If we created a successor InMemoryLayer, its predecessor is
// currently the frozen layer. We need to update the predecessor
// to be the latest on-disk layer.
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
let maybe_old_predecessor =
new_open.update_predecessor(Arc::clone(last_historic));
let old_predecessor = maybe_old_predecessor
.expect("new_open should always be a successor to frozen");
assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
}
}
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
}
// Add the historic to the LayerMap
layers.insert_historic(new_delta_layer);
}
// Call unload() on all frozen layers, to release memory.
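With FreezeLayers gone, the checkpoint body above reduces to: pop the oldest open in-memory layer, write it out as one delta layer, and insert that delta into the layer map as a historic layer. A self-contained sketch of that shape, using hypothetical stand-in types and eliding the LayerMap locking and error handling of the real code:

use std::sync::Arc;

// Stand-ins for the pageserver's layer types; the real structs carry far more state.
struct InMemLayer { start_lsn: u64, end_lsn: u64 }
struct DeltaLayer { start_lsn: u64, end_lsn: u64 }

#[derive(Default)]
struct LayerMap {
    open: Vec<Arc<InMemLayer>>,      // oldest first
    historic: Vec<Arc<DeltaLayer>>,  // on-disk layers
}

impl InMemLayer {
    // In the new scheme this only serializes WAL records; no page images are built here.
    fn write_to_disk(&self) -> DeltaLayer {
        DeltaLayer { start_lsn: self.start_lsn, end_lsn: self.end_lsn }
    }
}

// Checkpoint: flush the open layers, oldest first.
fn checkpoint(layers: &mut LayerMap) {
    while let Some(oldest) = layers.open.first().cloned() {
        layers.open.remove(0);              // pop_oldest_open() in the diff
        let delta = oldest.write_to_disk(); // one delta layer per flushed layer
        layers.historic.push(Arc::new(delta));
    }
}

fn main() {
    let mut layers = LayerMap::default();
    layers.open.push(Arc::new(InMemLayer { start_lsn: 0, end_lsn: 100 }));
    layers.open.push(Arc::new(InMemLayer { start_lsn: 100, end_lsn: 250 }));
    checkpoint(&mut layers);
    for d in &layers.historic {
        println!("delta layer {}-{}", d.start_lsn, d.end_lsn);
    }
}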
@@ -1567,181 +1522,212 @@ impl LayeredTimeline {
self.timelineid, cutoff
);
info!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
// Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
if seg.rel.is_relation() {
result.ondisk_relfiles_total += 1;
} else {
result.ondisk_nonrelfiles_total += 1;
}
'outer: loop {
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
let mut layer_to_materialize: Option<Arc<dyn Layer>> = None;
// Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
'for_all_historic_layers: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
// 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1;
result.ondisk_relfiles_total += 1;
} else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
result.ondisk_nonrelfiles_total += 1;
}
continue 'outer;
}
// 2. Is it needed by a child branch?
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
// 1. Create image layer for this relation?
if !l.is_dropped()
&& !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
if l.is_incremental() {
layer_to_materialize = Some(Arc::clone(&l));
break 'for_all_historic_layers;
} else {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1;
} else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
}
continue 'for_all_historic_layers;
}
}
// 2. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's needed by branch point {}",
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
*retain_lsn
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_branches += 1;
result.ondisk_relfiles_needed_by_cutoff += 1;
} else {
result.ondisk_nonrelfiles_needed_by_branches += 1;
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
}
continue 'outer;
continue 'for_all_historic_layers;
}
}
// 3. Is there a later on-disk layer for this relation?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
{
info!(
"keeping {} {}-{} because it is the latest layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
if seg.rel.is_relation() {
result.ondisk_relfiles_not_updated += 1;
} else {
result.ondisk_nonrelfiles_not_updated += 1;
}
continue 'outer;
}
// 4. Does this layer serve as a tombstone for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
// 3. Is it needed by a child branch?
for retain_lsn in &retain_lsns {
// start_lsn is inclusive and end_lsn is exclusive
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
info!(
"check blocky relish size {} at {} in {} for layer {}-{}",
"keeping {} {}-{} because it's needed by branch point {}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
l.get_end_lsn(),
*retain_lsn
);
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
Some(size) => {
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
info!(
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
size, last_live_seg.segno, seg.segno
);
if last_live_seg.segno >= seg.segno {
is_tombstone = true;
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_branches += 1;
} else {
result.ondisk_nonrelfiles_needed_by_branches += 1;
}
continue 'for_all_historic_layers;
}
}
// 4. Does this layer serve as a tombstone for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone =
layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
info!(
"check blocky relish size {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
Some(size) => {
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
info!(
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
size, last_live_seg.segno, seg.segno
);
if last_live_seg.segno >= seg.segno {
is_tombstone = true;
}
}
_ => {
info!("blocky rel doesn't exist");
}
}
_ => {
info!("blocky rel doesn't exist");
}
} else {
info!(
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
is_tombstone =
ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
}
} else {
}
if is_tombstone {
info!(
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
is_tombstone = ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_as_tombstone += 1;
} else {
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
}
continue 'for_all_historic_layers;
}
}
if is_tombstone {
info!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
}
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_as_tombstone += 1;
} else {
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
}
continue 'outer;
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove {
doomed_layer.delete()?;
layers.remove_historic(doomed_layer.clone());
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
}
}
// We didn't find any reason to keep this file, so remove it.
info!(
"garbage collecting {} {}-{} {}",
l.get_seg_tag(),
l.get_start_lsn(),
l.get_end_lsn(),
l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
}
// Actually delete the layers from disk and remove them from the map.
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
for doomed_layer in layers_to_remove {
doomed_layer.delete()?;
layers.remove_historic(doomed_layer.clone());
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
if let Some(delta_layer) = layer_to_materialize {
drop(layers); // release lock, as far as new image layers are created only by GC thread,
let image_layer = ImageLayer::create_from_src(
self.conf,
&self,
&*delta_layer,
delta_layer.get_end_lsn(),
)?;
layers = self.layers.lock().unwrap();
info!(
"materialize layer {} {}-{}",
delta_layer.get_seg_tag(),
delta_layer.get_start_lsn(),
delta_layer.get_end_lsn()
);
layers.insert_historic(Arc::new(image_layer));
continue 'outer;
}
break 'outer;
}
result.elapsed = now.elapsed();
Ok(result)
}
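The restructured GC above wraps the old single scan in an 'outer loop so that it materializes at most one image layer per pass: the scan stops at the first incremental (delta) layer that must be kept but has no newer image layer, the LayerMap lock is dropped while ImageLayer::create_from_src writes the image, and then the scan restarts. Stripped of the per-segment, branch-point and tombstone bookkeeping, the control flow looks roughly like the following hypothetical sketch:

// Stripped-down model of the GC pass; the real code works on Arc<dyn Layer>
// and also checks retain_lsns, dropped segments and tombstones.
#[derive(Debug)]
enum Layer {
    Delta { end_lsn: u64 }, // incremental layer: needs a backing image before older data can go
    Image { lsn: u64 },     // materialized page images at one LSN
}

fn gc(layers: &mut Vec<Layer>, cutoff: u64) {
    'outer: loop {
        // Highest LSN for which an image layer already exists.
        let newest_image = layers
            .iter()
            .filter_map(|l| match l {
                Layer::Image { lsn } => Some(*lsn),
                _ => None,
            })
            .max()
            .unwrap_or(0);

        // The first retained delta that has no newer image must be materialized.
        let to_materialize = layers.iter().find_map(|l| match l {
            Layer::Delta { end_lsn } if *end_lsn > cutoff && *end_lsn > newest_image => {
                Some(*end_lsn)
            }
            _ => None,
        });

        if let Some(lsn) = to_materialize {
            // The real code releases the LayerMap lock here, writes the image layer
            // to disk, re-acquires the lock, and then restarts the scan.
            layers.push(Layer::Image { lsn });
            continue 'outer;
        }

        // Nothing left to materialize: collect layers wholly behind the cutoff.
        layers.retain(|l| match l {
            Layer::Delta { end_lsn } => *end_lsn > cutoff,
            Layer::Image { lsn } => *lsn >= cutoff,
        });
        break 'outer;
    }
}

fn main() {
    let mut layers = vec![Layer::Delta { end_lsn: 100 }, Layer::Delta { end_lsn: 300 }];
    gc(&mut layers, 200);
    println!("{layers:?}"); // the old delta is gone; the newest delta and its image remain
}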
@@ -1950,17 +1936,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}
/// Check for equality of Layer memory addresses
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
let l1_ptr = l1 as *const dyn Layer;
let l2_ptr = l2 as *const dyn Layer;
// comparing *const dyn Layer will not only check for data address equality,
// but also for vtable address equality.
// to avoid this, we compare *const ().
// see here for more https://github.com/rust-lang/rust/issues/46139
std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
}
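The removed layer_ptr_eq helper captures a genuine Rust subtlety: a *const dyn Trait is a fat pointer carrying a data address plus a vtable address, and the vtable address can differ even for the same object (see rust-lang/rust#46139), so only the data half should be compared. A standalone demonstration of the cast the helper performed:

// Minimal reproduction of the identity comparison the removed helper did.
trait Layer {}
struct DeltaLayer;
impl Layer for DeltaLayer {}

fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
    // Cast the fat pointers down to thin data pointers before comparing,
    // so that differing vtable addresses cannot make the comparison fail.
    std::ptr::eq(l1 as *const dyn Layer as *const (), l2 as *const dyn Layer as *const ())
}

fn main() {
    let a = DeltaLayer;
    let r1: &dyn Layer = &a;
    let r2: &dyn Layer = &a;
    assert!(layer_ptr_eq(r1, r2)); // same underlying object, regardless of vtables
    println!("same object: {}", layer_ptr_eq(r1, r2));
}

It is deleted here because the predecessor assertion that used it in the checkpoint path is removed above.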
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {

View File

@@ -15,13 +15,11 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;
pub struct InMemoryLayer {
@@ -36,9 +34,6 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// Frozen in-memory layers have an inclusive end LSN.
end_lsn: Option<Lsn>,
/// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn,
@@ -51,6 +46,9 @@ pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
/// LSN of last record
end_lsn: Option<Lsn>,
///
/// All versions of all pages in the layer are kept here.
/// Indexed by block number and LSN.
@@ -118,6 +116,11 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename))
}
fn get_end_lsn(&self) -> Lsn {
let inner = self.inner.read().unwrap();
inner.drop_lsn.unwrap_or(Lsn(u64::MAX))
}
fn path(&self) -> Option<PathBuf> {
None
}
@@ -134,20 +137,6 @@ impl Layer for InMemoryLayer {
self.start_lsn
}
fn get_end_lsn(&self) -> Lsn {
if let Some(end_lsn) = self.end_lsn {
return Lsn(end_lsn.0 + 1);
}
let inner = self.inner.read().unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
drop_lsn
} else {
Lsn(u64::MAX)
}
}
fn is_dropped(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.drop_lsn.is_some()
@@ -252,8 +241,7 @@ impl Layer for InMemoryLayer {
}
fn is_incremental(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.predecessor.is_some()
true
}
/// debugging function to print out the contents of the layer
@@ -296,19 +284,7 @@ pub struct NonWriteableError;
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
pub struct FreezeLayers {
/// Replacement layer for the layer which freeze was called on.
pub frozen: Arc<InMemoryLayer>,
/// New open layer containing leftover data.
pub open: Option<Arc<InMemoryLayer>>,
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
}
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
@@ -338,10 +314,10 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
end_lsn: None,
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
writeable: true,
@@ -379,7 +355,6 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
self.assert_not_frozen();
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -392,6 +367,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.end_lsn = Some(lsn);
let old = inner.page_versions.insert((blknum, lsn), pv);
@@ -458,10 +434,9 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.end_lsn = Some(lsn);
// check that we truncate to a smaller size than the segment was before the truncation
let oldsize = inner.get_seg_size(lsn);
@@ -479,8 +454,6 @@ impl InMemoryLayer {
/// Remember that the segment was dropped at given LSN
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
@@ -531,10 +504,10 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
end_lsn: None,
page_versions: BTreeMap::new(),
segsizes,
writeable: true,
@@ -548,125 +521,12 @@ impl InMemoryLayer {
inner.writeable
}
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn`
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in memory layer {} on timeline {} at {} (oldest {})",
self.filename().display(),
self.timelineid,
cutoff_lsn,
self.oldest_pending_lsn
);
self.assert_not_frozen();
let self_ref = self.clone();
let mut inner = self_ref.inner.write().unwrap();
// Dropped layers don't need any special freeze actions,
// they are marked as non-writeable at drop and just
// written out to disk by checkpointer.
if inner.drop_lsn.is_some() {
assert!(!inner.writeable);
info!(
"freezing in memory layer for {} on timeline {} is dropped at {}",
self.seg,
self.timelineid,
inner.drop_lsn.unwrap()
);
// There should be no newer layer that refers to this non-writeable layer,
// because a layer created after a dropped one represents a new rel.
return Ok(FreezeLayers {
frozen: self,
open: None,
});
}
assert!(inner.writeable);
inner.writeable = false;
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut before_segsizes = BTreeMap::new();
let mut after_segsizes = BTreeMap::new();
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
for (lsn, size) in inner.segsizes.iter() {
if *lsn > cutoff_lsn {
after_segsizes.insert(*lsn, *size);
after_oldest_lsn.accum(min, *lsn);
} else {
before_segsizes.insert(*lsn, *size);
}
}
let mut before_page_versions = BTreeMap::new();
let mut after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > cutoff_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
after_oldest_lsn.accum(min, *lsn);
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
predecessor: inner.predecessor.clone(),
}),
});
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
cutoff_lsn + 1,
after_oldest_lsn.0.unwrap(),
)?;
let new_inner = new_open.inner.get_mut().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
Some(Arc::new(new_open))
} else {
None
};
Ok(FreezeLayers { frozen, open })
}
/// Write this frozen in-memory layer to disk.
///
/// Returns new layers that replace this one.
/// If not dropped, returns a new image layer containing the page versions
/// at the `end_lsn`. Can also return a DeltaLayer that includes all the
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
self.end_lsn.unwrap_or(Lsn(0)),
self.get_end_lsn()
);
/// Returns new DeltaLayer that includes all the
/// WAL records between start and end LSN.
///
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Arc<dyn Layer>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -677,7 +537,12 @@ impl InMemoryLayer {
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
trace!(
"write_to_disk {} end_lsn is {}",
self.filename().display(),
inner.end_lsn.unwrap_or(Lsn(0)),
);
let predecessor = inner.predecessor.clone();
@@ -700,24 +565,10 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn
);
return Ok(vec![Arc::new(delta_layer)]);
return Ok(Arc::new(delta_layer));
}
let end_lsn = self.end_lsn.unwrap();
let mut before_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn <= end_lsn {
before_segsizes.insert(*lsn, *size);
}
}
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
let ((_blknum, lsn), _pv) = tup;
*lsn < end_lsn
});
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
let end_lsn = inner.end_lsn.unwrap();
if self.start_lsn != end_lsn {
// Write the page versions before the cutoff to disk.
@@ -730,32 +581,13 @@ impl InMemoryLayer {
end_lsn,
false,
predecessor,
before_page_versions,
before_segsizes,
inner.page_versions.iter(),
inner.segsizes.clone(),
)?;
frozen_layers.push(Arc::new(delta_layer));
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
self.start_lsn,
end_lsn
);
Ok(Arc::new(delta_layer))
} else {
assert!(before_page_versions.next().is_none());
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
Ok(Arc::new(image_layer))
}
drop(inner);
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
Ok(frozen_layers)
}
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
let mut inner = self.inner.write().unwrap();
inner.predecessor.replace(predecessor)
}
}
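A theme running through this file: end_lsn moves from the InMemoryLayer struct, where freeze() used to fix it once, into InMemoryLayerInner, where every put_page_version and put_truncation call now advances it; write_to_disk later reads it to know the flushed range. A minimal, hypothetical sketch of that bookkeeping pattern, with the same RwLock protection but much simpler fields than the real type:

use std::sync::RwLock;

type Lsn = u64;

#[derive(Default)]
struct Inner {
    end_lsn: Option<Lsn>,           // LSN of the last record written into this layer
    page_versions: Vec<(u32, Lsn)>, // stand-in for the (blknum, lsn) -> PageVersion map
}

struct InMemoryLayer {
    start_lsn: Lsn,
    inner: RwLock<Inner>,
}

impl InMemoryLayer {
    // Every write advances end_lsn, instead of end_lsn being fixed by freeze().
    fn put_page_version(&self, blknum: u32, lsn: Lsn) {
        let mut inner = self.inner.write().unwrap();
        inner.end_lsn = Some(lsn);
        inner.page_versions.push((blknum, lsn));
    }

    // A write_to_disk-style consumer: the flushed range is [start_lsn, last written LSN].
    fn flush_range(&self) -> (Lsn, Lsn) {
        let inner = self.inner.read().unwrap();
        (self.start_lsn, inner.end_lsn.expect("nothing was written to this layer"))
    }
}

fn main() {
    let layer = InMemoryLayer { start_lsn: 100, inner: RwLock::new(Inner::default()) };
    layer.put_page_version(1, 150);
    layer.put_page_version(2, 180);
    let (start, end) = layer.flush_range();
    println!("layer covers {}-{}", start, end); // 100-180
}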

View File

@@ -219,7 +219,7 @@ where
}
}
// No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next() {
if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter()));
continue;
} else {
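The single functional change in this file swaps next() for next_back() on point_iter. Assuming point_iter is a double-ended iterator over sorted points (for example a BTreeMap range iterator; the hunk does not show its type), the effect is to consume points from the highest key downward instead of the lowest upward:

use std::collections::BTreeMap;

fn main() {
    // BTreeMap iterators are DoubleEndedIterators: next() walks keys in
    // ascending order, next_back() walks them in descending order.
    let points: BTreeMap<u64, &str> =
        BTreeMap::from([(10, "oldest"), (20, "middle"), (30, "newest")]);

    let mut fwd = points.iter();
    let mut rev = points.iter();

    assert_eq!(fwd.next(), Some((&10, &"oldest")));       // ascending
    assert_eq!(rev.next_back(), Some((&30, &"newest")));  // descending

    println!("front: {:?}, back: {:?}", fwd.next(), rev.next_back());
}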

View File

@@ -36,8 +36,8 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
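Reading the first constant of each pair as the removed value and the second as the added one, the defaults move from a 64 MiB GC horizon checked every 100 seconds to a 64 GiB horizon checked every second. A plausible reason, though the commits do not say so, is that GC now also materializes image layers and therefore needs to run frequently. For reference, the arithmetic:

use std::time::Duration;

fn main() {
    // Old horizon: 64 MiB; new horizon: 64 GiB.
    let old_gc_horizon: u64 = 64 * 1024 * 1024;
    let new_gc_horizon: u64 = 64u64 * 1024 * 1024 * 1024;
    assert_eq!(old_gc_horizon, 67_108_864);
    assert_eq!(new_gc_horizon, 68_719_476_736);

    // Old period: every 100 seconds; new period: every second.
    let old_gc_period = Duration::from_secs(100);
    let new_gc_period = Duration::from_secs(1);
    println!(
        "horizon {} -> {} bytes, period {:?} -> {:?}",
        old_gc_horizon, new_gc_horizon, old_gc_period, new_gc_period
    );
}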