Compare commits

...

3 Commits

Author SHA1 Message Date
Konstantin Knizhnik
7b1fea280b Fix clippy errors 2021-10-07 18:05:15 +03:00
Konstantin Knizhnik
20e7c0d76a Fix clippy errors 2021-10-07 18:03:51 +03:00
Konstantin Knizhnik
77d352400e Do not materialize pages during checkpoint, do it in GC 2021-10-07 16:52:59 +03:00
4 changed files with 209 additions and 402 deletions

View File

@@ -32,7 +32,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{fs, thread}; use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*; use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload; use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord}; use crate::repository::{GcResult, Repository, Timeline, WALRecord};
@@ -1418,58 +1417,14 @@ impl LayeredTimeline {
break; break;
} }
// Freeze the layer.
//
// This is a two-step process. First, we "freeze" the in-memory
// layer, to close it for new writes, and replace the original
// layer with the new frozen in-memory layer (and possibly a new
// open layer to hold changes newer than the cutoff.) Then we write
// the frozen layer to disk, and replace the in-memory frozen layer
// with the new on-disk layers.
let FreezeLayers {
frozen,
open: maybe_new_open,
} = oldest_layer.freeze(last_record_lsn)?;
// replace this layer with the new layers that 'freeze' returned // replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open(); layers.pop_oldest_open();
if let Some(new_open) = maybe_new_open.clone() {
layers.insert_open(new_open);
}
// We temporarily insert InMemory layer into historic list here. let new_delta_layer = oldest_layer.write_to_disk(self)?;
// TODO: check that all possible concurrent users of 'historic' treat it right created_historics = true;
layers.insert_historic(frozen.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it // Add the historic to the LayerMap
drop(layers); layers.insert_historic(new_delta_layer);
let new_historics = frozen.write_to_disk(self)?;
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
created_historics = true;
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
// If we created a successor InMemoryLayer, its predecessor is
// currently the frozen layer. We need to update the predecessor
// to be the latest on-disk layer.
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
let maybe_old_predecessor =
new_open.update_predecessor(Arc::clone(last_historic));
let old_predecessor = maybe_old_predecessor
.expect("new_open should always be a successor to frozen");
assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
}
}
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
}
} }
// Call unload() on all frozen layers, to release memory. // Call unload() on all frozen layers, to release memory.
@@ -1567,181 +1522,212 @@ impl LayeredTimeline {
self.timelineid, cutoff self.timelineid, cutoff
); );
info!("retain_lsns: {:?}", retain_lsns); info!("retain_lsns: {:?}", retain_lsns);
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
// Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
let mut layers = self.layers.lock().unwrap(); let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
if seg.rel.is_relation() { 'outer: loop {
result.ondisk_relfiles_total += 1; let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
} else { let mut layer_to_materialize: Option<Arc<dyn Layer>> = None;
result.ondisk_nonrelfiles_total += 1;
} // Scan all on-disk layers in the timeline.
//
// Garbage collect the layer if all conditions are satisfied:
// 1. it is older than cutoff LSN;
// 2. it doesn't need to be retained for 'retain_lsns';
// 3. newer on-disk layer exists (only for non-dropped segments);
// 4. this layer doesn't serve as a tombstone for some older layer;
//
'for_all_historic_layers: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();
// 1. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() { if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1; result.ondisk_relfiles_total += 1;
} else { } else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1; result.ondisk_nonrelfiles_total += 1;
} }
continue 'outer;
}
// 2. Is it needed by a child branch? // 1. Create image layer for this relation?
for retain_lsn in &retain_lsns { if !l.is_dropped()
// start_lsn is inclusive and end_lsn is exclusive && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() { {
if l.is_incremental() {
layer_to_materialize = Some(Arc::clone(&l));
break 'for_all_historic_layers;
} else {
info!(
"keeping {} {}-{} because it's newer than cutoff {}",
seg,
l.get_start_lsn(),
l.get_end_lsn(),
cutoff
);
if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_cutoff += 1;
} else {
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
}
continue 'for_all_historic_layers;
}
}
// 2. Is it newer than cutoff point?
if l.get_end_lsn() > cutoff {
info!( info!(
"keeping {} {}-{} because it's needed by branch point {}", "keeping {} {}-{} because it's newer than cutoff {}",
seg, seg,
l.get_start_lsn(), l.get_start_lsn(),
l.get_end_lsn(), l.get_end_lsn(),
*retain_lsn cutoff
); );
if seg.rel.is_relation() { if seg.rel.is_relation() {
result.ondisk_relfiles_needed_by_branches += 1; result.ondisk_relfiles_needed_by_cutoff += 1;
} else { } else {
result.ondisk_nonrelfiles_needed_by_branches += 1; result.ondisk_nonrelfiles_needed_by_cutoff += 1;
} }
continue 'outer; continue 'for_all_historic_layers;
} }
}
// 3. Is there a later on-disk layer for this relation? // 3. Is it needed by a child branch?
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn()) for retain_lsn in &retain_lsns {
{ // start_lsn is inclusive and end_lsn is exclusive
info!( if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
"keeping {} {}-{} because it is the latest layer",
seg,
l.get_start_lsn(),
l.get_end_lsn()
);
if seg.rel.is_relation() {
result.ondisk_relfiles_not_updated += 1;
} else {
result.ondisk_nonrelfiles_not_updated += 1;
}
continue 'outer;
}
// 4. Does this layer serve as a tombstome for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
info!( info!(
"check blocky relish size {} at {} in {} for layer {}-{}", "keeping {} {}-{} because it's needed by branch point {}",
seg, seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(), l.get_start_lsn(),
l.get_end_lsn() l.get_end_lsn(),
*retain_lsn
); );
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() { if seg.rel.is_relation() {
Some(size) => { result.ondisk_relfiles_needed_by_branches += 1;
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1); } else {
info!( result.ondisk_nonrelfiles_needed_by_branches += 1;
"blocky rel size is {} last_live_seg.segno {} seg.segno {}", }
size, last_live_seg.segno, seg.segno continue 'for_all_historic_layers;
); }
if last_live_seg.segno >= seg.segno { }
is_tombstone = true;
// 4. Does this layer serve as a tombstome for some older layer?
if l.is_dropped() {
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
// Check if this layer serves as a tombstone for this timeline
// We have to do this separately from timeline check below,
// because LayerMap of this timeline is already locked.
let mut is_tombstone =
layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
if is_tombstone {
info!(
"earlier layer exists at {} in {}",
prior_lsn, self.timelineid
);
}
// Now check ancestor timelines, if any
else if let Some(ancestor) = &self.ancestor_timeline {
let prior_lsn = ancestor.get_last_record_lsn();
if seg.rel.is_blocky() {
info!(
"check blocky relish size {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
Some(size) => {
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
info!(
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
size, last_live_seg.segno, seg.segno
);
if last_live_seg.segno >= seg.segno {
is_tombstone = true;
}
}
_ => {
info!("blocky rel doesn't exist");
} }
} }
_ => { } else {
info!("blocky rel doesn't exist"); info!(
} "check non-blocky relish existence {} at {} in {} for layer {}-{}",
seg,
prior_lsn,
ancestor.timelineid,
l.get_start_lsn(),
l.get_end_lsn()
);
is_tombstone =
ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
} }
} else { }
if is_tombstone {
info!( info!(
"check non-blocky relish existence {} at {} in {} for layer {}-{}", "keeping {} {}-{} because this layer servers as a tombstome for older layer",
seg, seg,
prior_lsn, l.get_start_lsn(),
ancestor.timelineid, l.get_end_lsn()
l.get_start_lsn(), );
l.get_end_lsn()
); if seg.rel.is_relation() {
is_tombstone = ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false); result.ondisk_relfiles_needed_as_tombstone += 1;
} else {
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
}
continue 'for_all_historic_layers;
} }
} }
if is_tombstone { // We didn't find any reason to keep this file, so remove it.
info!( info!(
"keeping {} {}-{} because this layer servers as a tombstome for older layer", "garbage collecting {} {}-{} {}",
seg, l.get_seg_tag(),
l.get_start_lsn(), l.get_start_lsn(),
l.get_end_lsn() l.get_end_lsn(),
); l.is_dropped()
);
layers_to_remove.push(Arc::clone(&l));
}
if seg.rel.is_relation() { // Actually delete the layers from disk and remove them from the map.
result.ondisk_relfiles_needed_as_tombstone += 1; // (couldn't do this in the loop above, because you cannot modify a collection
} else { // while iterating it. BTreeMap::retain() would be another option)
result.ondisk_nonrelfiles_needed_as_tombstone += 1; for doomed_layer in layers_to_remove {
} doomed_layer.delete()?;
continue 'outer; layers.remove_historic(doomed_layer.clone());
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
} }
} }
// We didn't find any reason to keep this file, so remove it. if let Some(delta_layer) = layer_to_materialize {
info!( drop(layers); // release lock, as far as new image layers are created only by GC thread,
"garbage collecting {} {}-{} {}", let image_layer = ImageLayer::create_from_src(
l.get_seg_tag(), self.conf,
l.get_start_lsn(), &self,
l.get_end_lsn(), &*delta_layer,
l.is_dropped() delta_layer.get_end_lsn(),
); )?;
layers_to_remove.push(Arc::clone(&l)); layers = self.layers.lock().unwrap();
} info!(
"materialize layer {} {}-{}",
// Actually delete the layers from disk and remove them from the map. delta_layer.get_seg_tag(),
// (couldn't do this in the loop above, because you cannot modify a collection delta_layer.get_start_lsn(),
// while iterating it. BTreeMap::retain() would be another option) delta_layer.get_end_lsn()
for doomed_layer in layers_to_remove { );
doomed_layer.delete()?; layers.insert_historic(Arc::new(image_layer));
layers.remove_historic(doomed_layer.clone()); continue 'outer;
match (
doomed_layer.is_dropped(),
doomed_layer.get_seg_tag().rel.is_relation(),
) {
(true, true) => result.ondisk_relfiles_dropped += 1,
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
(false, true) => result.ondisk_relfiles_removed += 1,
(false, false) => result.ondisk_nonrelfiles_removed += 1,
} }
break 'outer;
} }
result.elapsed = now.elapsed(); result.elapsed = now.elapsed();
Ok(result) Ok(result)
} }
@@ -1950,17 +1936,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(()) Ok(())
} }
/// Check for equality of Layer memory addresses
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
let l1_ptr = l1 as *const dyn Layer;
let l2_ptr = l2 as *const dyn Layer;
// comparing *const dyn Layer will not only check for data address equality,
// but also for vtable address equality.
// to avoid this, we compare *const ().
// see here for more https://github.com/rust-lang/rust/issues/46139
std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
}
/// Add a suffix to a layer file's name: .{num}.old /// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0) /// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {

View File

@@ -15,13 +15,11 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
use std::cmp::min;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ops::Bound::Included; use std::ops::Bound::Included;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
pub struct InMemoryLayer { pub struct InMemoryLayer {
@@ -36,9 +34,6 @@ pub struct InMemoryLayer {
/// ///
start_lsn: Lsn, start_lsn: Lsn,
/// Frozen in-memory layers have an inclusive end LSN.
end_lsn: Option<Lsn>,
/// LSN of the oldest page version stored in this layer /// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn, oldest_pending_lsn: Lsn,
@@ -51,6 +46,9 @@ pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened. /// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>, drop_lsn: Option<Lsn>,
/// LSN of last record
end_lsn: Option<Lsn>,
/// ///
/// All versions of all pages in the layer are are kept here. /// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN. /// Indexed by block number and LSN.
@@ -118,6 +116,11 @@ impl Layer for InMemoryLayer {
PathBuf::from(format!("inmem-{}", delta_filename)) PathBuf::from(format!("inmem-{}", delta_filename))
} }
fn get_end_lsn(&self) -> Lsn {
let inner = self.inner.read().unwrap();
inner.drop_lsn.unwrap_or(Lsn(u64::MAX))
}
fn path(&self) -> Option<PathBuf> { fn path(&self) -> Option<PathBuf> {
None None
} }
@@ -134,20 +137,6 @@ impl Layer for InMemoryLayer {
self.start_lsn self.start_lsn
} }
fn get_end_lsn(&self) -> Lsn {
if let Some(end_lsn) = self.end_lsn {
return Lsn(end_lsn.0 + 1);
}
let inner = self.inner.read().unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
drop_lsn
} else {
Lsn(u64::MAX)
}
}
fn is_dropped(&self) -> bool { fn is_dropped(&self) -> bool {
let inner = self.inner.read().unwrap(); let inner = self.inner.read().unwrap();
inner.drop_lsn.is_some() inner.drop_lsn.is_some()
@@ -252,8 +241,7 @@ impl Layer for InMemoryLayer {
} }
fn is_incremental(&self) -> bool { fn is_incremental(&self) -> bool {
let inner = self.inner.read().unwrap(); true
inner.predecessor.is_some()
} }
/// debugging function to print out the contents of the layer /// debugging function to print out the contents of the layer
@@ -296,19 +284,7 @@ pub struct NonWriteableError;
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>; pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
pub struct FreezeLayers {
/// Replacement layer for the layer which freeze was called on.
pub frozen: Arc<InMemoryLayer>,
/// New open layer containing leftover data.
pub open: Option<Arc<InMemoryLayer>>,
}
impl InMemoryLayer { impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
}
/// Return the oldest page version that's stored in this layer /// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn { pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn self.oldest_pending_lsn
@@ -338,10 +314,10 @@ impl InMemoryLayer {
tenantid, tenantid,
seg, seg,
start_lsn, start_lsn,
end_lsn: None,
oldest_pending_lsn, oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner { inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None, drop_lsn: None,
end_lsn: None,
page_versions: BTreeMap::new(), page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(), segsizes: BTreeMap::new(),
writeable: true, writeable: true,
@@ -379,7 +355,6 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree /// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> { pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
self.assert_not_frozen();
assert!(self.seg.blknum_in_seg(blknum)); assert!(self.seg.blknum_in_seg(blknum));
trace!( trace!(
@@ -392,6 +367,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().unwrap(); let mut inner = self.inner.write().unwrap();
inner.check_writeable()?; inner.check_writeable()?;
inner.end_lsn = Some(lsn);
let old = inner.page_versions.insert((blknum, lsn), pv); let old = inner.page_versions.insert((blknum, lsn), pv);
@@ -458,10 +434,9 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN /// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> { pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap(); let mut inner = self.inner.write().unwrap();
inner.check_writeable()?; inner.check_writeable()?;
inner.end_lsn = Some(lsn);
// check that this we truncate to a smaller size than segment was before the truncation // check that this we truncate to a smaller size than segment was before the truncation
let oldsize = inner.get_seg_size(lsn); let oldsize = inner.get_seg_size(lsn);
@@ -479,8 +454,6 @@ impl InMemoryLayer {
/// Remember that the segment was dropped at given LSN /// Remember that the segment was dropped at given LSN
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> { pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap(); let mut inner = self.inner.write().unwrap();
inner.check_writeable()?; inner.check_writeable()?;
@@ -531,10 +504,10 @@ impl InMemoryLayer {
tenantid, tenantid,
seg, seg,
start_lsn, start_lsn,
end_lsn: None,
oldest_pending_lsn, oldest_pending_lsn,
inner: RwLock::new(InMemoryLayerInner { inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None, drop_lsn: None,
end_lsn: None,
page_versions: BTreeMap::new(), page_versions: BTreeMap::new(),
segsizes, segsizes,
writeable: true, writeable: true,
@@ -548,125 +521,12 @@ impl InMemoryLayer {
inner.writeable inner.writeable
} }
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn`
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in memory layer {} on timeline {} at {} (oldest {})",
self.filename().display(),
self.timelineid,
cutoff_lsn,
self.oldest_pending_lsn
);
self.assert_not_frozen();
let self_ref = self.clone();
let mut inner = self_ref.inner.write().unwrap();
// Dropped layers don't need any special freeze actions,
// they are marked as non-writeable at drop and just
// written out to disk by checkpointer.
if inner.drop_lsn.is_some() {
assert!(!inner.writeable);
info!(
"freezing in memory layer for {} on timeline {} is dropped at {}",
self.seg,
self.timelineid,
inner.drop_lsn.unwrap()
);
// There should be no newer layer that refers this non-writeable layer,
// because layer that is created after dropped one represents a new rel.
return Ok(FreezeLayers {
frozen: self,
open: None,
});
}
assert!(inner.writeable);
inner.writeable = false;
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut before_segsizes = BTreeMap::new();
let mut after_segsizes = BTreeMap::new();
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
for (lsn, size) in inner.segsizes.iter() {
if *lsn > cutoff_lsn {
after_segsizes.insert(*lsn, *size);
after_oldest_lsn.accum(min, *lsn);
} else {
before_segsizes.insert(*lsn, *size);
}
}
let mut before_page_versions = BTreeMap::new();
let mut after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn > cutoff_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
after_oldest_lsn.accum(min, *lsn);
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
predecessor: inner.predecessor.clone(),
}),
});
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
cutoff_lsn + 1,
after_oldest_lsn.0.unwrap(),
)?;
let new_inner = new_open.inner.get_mut().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
Some(Arc::new(new_open))
} else {
None
};
Ok(FreezeLayers { frozen, open })
}
/// Write the this frozen in-memory layer to disk. /// Write the this frozen in-memory layer to disk.
/// ///
/// Returns new layers that replace this one. /// Returns new DeltaLayer that includes all the
/// If not dropped, returns a new image layer containing the page versions /// WAL records between start and end LSN.
/// at the `end_lsn`. Can also return a DeltaLayer that includes all the ///
/// WAL records between start and end LSN. (The delta layer is not needed pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Arc<dyn Layer>> {
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
self.filename().display(),
self.end_lsn.unwrap_or(Lsn(0)),
self.get_end_lsn()
);
// Grab the lock in read-mode. We hold it over the I/O, but because this // Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to aquire the // layer is not writeable anymore, no one should be trying to aquire the
// write lock on it, so we shouldn't block anyone. There's one exception // write lock on it, so we shouldn't block anyone. There's one exception
@@ -677,7 +537,12 @@ impl InMemoryLayer {
// would have to wait until we release it. That race condition is very // would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now. // rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap(); let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
trace!(
"write_to_disk {} end_lsn is {}",
self.filename().display(),
inner.end_lsn.unwrap_or(Lsn(0)),
);
let predecessor = inner.predecessor.clone(); let predecessor = inner.predecessor.clone();
@@ -700,24 +565,10 @@ impl InMemoryLayer {
self.start_lsn, self.start_lsn,
drop_lsn drop_lsn
); );
return Ok(vec![Arc::new(delta_layer)]); return Ok(Arc::new(delta_layer));
} }
let end_lsn = self.end_lsn.unwrap(); let end_lsn = inner.end_lsn.unwrap();
let mut before_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn <= end_lsn {
before_segsizes.insert(*lsn, *size);
}
}
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
let ((_blknum, lsn), _pv) = tup;
*lsn < end_lsn
});
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
if self.start_lsn != end_lsn { if self.start_lsn != end_lsn {
// Write the page versions before the cutoff to disk. // Write the page versions before the cutoff to disk.
@@ -730,32 +581,13 @@ impl InMemoryLayer {
end_lsn, end_lsn,
false, false,
predecessor, predecessor,
before_page_versions, inner.page_versions.iter(),
before_segsizes, inner.segsizes.clone(),
)?; )?;
frozen_layers.push(Arc::new(delta_layer)); Ok(Arc::new(delta_layer))
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
self.start_lsn,
end_lsn
);
} else { } else {
assert!(before_page_versions.next().is_none()); let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
Ok(Arc::new(image_layer))
} }
drop(inner);
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
Ok(frozen_layers)
}
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
let mut inner = self.inner.write().unwrap();
inner.predecessor.replace(predecessor)
} }
} }

View File

@@ -219,7 +219,7 @@ where
} }
} }
// No more elements at this point. Move to next point. // No more elements at this point. Move to next point.
if let Some((point_key, point)) = self.point_iter.next() { if let Some((point_key, point)) = self.point_iter.next_back() {
self.elem_iter = Some((*point_key, point.elements.iter())); self.elem_iter = Some((*point_key, point.elements.iter()));
continue; continue;
} else { } else {

View File

@@ -36,8 +36,8 @@ pub mod defaults {
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024; pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; pub const DEFAULT_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100; pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;