mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-18 02:42:56 +00:00
Compare commits: release-pr...fast_check
3 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 7b1fea280b | |
| | 20e7c0d76a | |
| | 77d352400e | |
@@ -32,7 +32,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::{fs, thread};

use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
@@ -1418,58 +1417,14 @@ impl LayeredTimeline {
                break;
            }

            // Freeze the layer.
            //
            // This is a two-step process. First, we "freeze" the in-memory
            // layer, to close it for new writes, and replace the original
            // layer with the new frozen in-memory layer (and possibly a new
            // open layer to hold changes newer than the cutoff.) Then we write
            // the frozen layer to disk, and replace the in-memory frozen layer
            // with the new on-disk layers.
            let FreezeLayers {
                frozen,
                open: maybe_new_open,
            } = oldest_layer.freeze(last_record_lsn)?;

            // replace this layer with the new layers that 'freeze' returned
            layers.pop_oldest_open();
            if let Some(new_open) = maybe_new_open.clone() {
                layers.insert_open(new_open);
            }

            // We temporarily insert the InMemory layer into the historic list here.
            // TODO: check that all possible concurrent users of 'historic' treat it right
            layers.insert_historic(frozen.clone());
            let new_delta_layer = oldest_layer.write_to_disk(self)?;
            created_historics = true;

            // Write the now-frozen layer to disk. That could take a while, so release the lock while we do it
            drop(layers);
            let new_historics = frozen.write_to_disk(self)?;
            layers = self.layers.lock().unwrap();

            if !new_historics.is_empty() {
                created_historics = true;
            }

            // Finally, replace the frozen in-memory layer with the new on-disk layers
            layers.remove_historic(frozen.clone());

            // If we created a successor InMemoryLayer, its predecessor is
            // currently the frozen layer. We need to update the predecessor
            // to be the latest on-disk layer.
            if let Some(last_historic) = new_historics.last() {
                if let Some(new_open) = &maybe_new_open {
                    let maybe_old_predecessor =
                        new_open.update_predecessor(Arc::clone(last_historic));
                    let old_predecessor = maybe_old_predecessor
                        .expect("new_open should always be a successor to frozen");
                    assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
                }
            }

            // Add the historics to the LayerMap
            for n in new_historics {
                layers.insert_historic(n);
            }
            // Add the historic to the LayerMap
            layers.insert_historic(new_delta_layer);
        }

        // Call unload() on all frozen layers, to release memory.
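The two-step freeze above follows a common lock-release pattern: mutate the map under the lock, do the slow disk I/O with the lock dropped, then re-acquire the lock and swap in the results. A minimal sketch of that pattern, with hypothetical simplified stand-ins for the real `LayerMap` and layer types (the real code additionally keeps the frozen layer visible in the historic list during the I/O):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins for the real LayerMap / layer types.
struct FrozenLayer(Vec<u8>);
struct DiskLayer(#[allow(dead_code)] usize);

struct LayerMap {
    open: Vec<Arc<FrozenLayer>>,
    historic: Vec<Arc<DiskLayer>>,
}

fn checkpoint_one(layers_mutex: &Mutex<LayerMap>) -> std::io::Result<()> {
    let mut layers = layers_mutex.lock().unwrap();

    // Step 1: under the lock, close the oldest open layer for new writes.
    let frozen = match layers.open.pop() {
        Some(l) => l,
        None => return Ok(()),
    };

    // Step 2: drop the lock while doing the slow disk write, so readers
    // and writers of other segments are not blocked.
    drop(layers);
    let on_disk = Arc::new(DiskLayer(frozen.0.len())); // pretend to write

    // Step 3: re-acquire the lock and publish the on-disk layer.
    let mut layers = layers_mutex.lock().unwrap();
    layers.historic.push(on_disk);
    Ok(())
}

fn main() -> std::io::Result<()> {
    let map = Mutex::new(LayerMap {
        open: vec![Arc::new(FrozenLayer(vec![0u8; 8192]))],
        historic: Vec::new(),
    });
    checkpoint_one(&map)?;
    assert_eq!(map.lock().unwrap().historic.len(), 1);
    Ok(())
}
```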
@@ -1567,181 +1522,212 @@ impl LayeredTimeline {
            self.timelineid, cutoff
        );
        info!("retain_lsns: {:?}", retain_lsns);

        let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();

        // Scan all on-disk layers in the timeline.
        //
        // Garbage collect the layer if all conditions are satisfied:
        // 1. it is older than cutoff LSN;
        // 2. it doesn't need to be retained for 'retain_lsns';
        // 3. newer on-disk layer exists (only for non-dropped segments);
        // 4. this layer doesn't serve as a tombstone for some older layer;
        //
        let mut layers = self.layers.lock().unwrap();
        'outer: for l in layers.iter_historic_layers() {
            let seg = l.get_seg_tag();

            if seg.rel.is_relation() {
                result.ondisk_relfiles_total += 1;
            } else {
                result.ondisk_nonrelfiles_total += 1;
            }
        'outer: loop {
            let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
            let mut layer_to_materialize: Option<Arc<dyn Layer>> = None;

            // Scan all on-disk layers in the timeline.
            //
            // Garbage collect the layer if all conditions are satisfied:
            // 1. it is older than cutoff LSN;
            // 2. it doesn't need to be retained for 'retain_lsns';
            // 3. newer on-disk layer exists (only for non-dropped segments);
            // 4. this layer doesn't serve as a tombstone for some older layer;
            //
            'for_all_historic_layers: for l in layers.iter_historic_layers() {
                let seg = l.get_seg_tag();

                // 1. Is it newer than cutoff point?
                if l.get_end_lsn() > cutoff {
                    info!(
                        "keeping {} {}-{} because it's newer than cutoff {}",
                        seg,
                        l.get_start_lsn(),
                        l.get_end_lsn(),
                        cutoff
                    );
                    if seg.rel.is_relation() {
                        result.ondisk_relfiles_needed_by_cutoff += 1;
                        result.ondisk_relfiles_total += 1;
                    } else {
                        result.ondisk_nonrelfiles_needed_by_cutoff += 1;
                        result.ondisk_nonrelfiles_total += 1;
                    }
                    continue 'outer;
                }

                // 2. Is it needed by a child branch?
                for retain_lsn in &retain_lsns {
                    // start_lsn is inclusive and end_lsn is exclusive
                    if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
                // 1. Create image layer for this relation?
                if !l.is_dropped()
                    && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
                {
                    if l.is_incremental() {
                        layer_to_materialize = Some(Arc::clone(&l));
                        break 'for_all_historic_layers;
                    } else {
                        info!(
                            "keeping {} {}-{} because it's newer than cutoff {}",
                            seg,
                            l.get_start_lsn(),
                            l.get_end_lsn(),
                            cutoff
                        );
                        if seg.rel.is_relation() {
                            result.ondisk_relfiles_needed_by_cutoff += 1;
                        } else {
                            result.ondisk_nonrelfiles_needed_by_cutoff += 1;
                        }
                        continue 'for_all_historic_layers;
                    }
                }

                // 2. Is it newer than cutoff point?
                if l.get_end_lsn() > cutoff {
                    info!(
                        "keeping {} {}-{} because it's needed by branch point {}",
                        "keeping {} {}-{} because it's newer than cutoff {}",
                        seg,
                        l.get_start_lsn(),
                        l.get_end_lsn(),
                        *retain_lsn
                        cutoff
                    );
                    if seg.rel.is_relation() {
                        result.ondisk_relfiles_needed_by_branches += 1;
                        result.ondisk_relfiles_needed_by_cutoff += 1;
                    } else {
                        result.ondisk_nonrelfiles_needed_by_branches += 1;
                        result.ondisk_nonrelfiles_needed_by_cutoff += 1;
                    }
                    continue 'outer;
                    continue 'for_all_historic_layers;
                }
            }

                // 3. Is there a later on-disk layer for this relation?
                if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
                {
                    info!(
                        "keeping {} {}-{} because it is the latest layer",
                        seg,
                        l.get_start_lsn(),
                        l.get_end_lsn()
                    );
                    if seg.rel.is_relation() {
                        result.ondisk_relfiles_not_updated += 1;
                    } else {
                        result.ondisk_nonrelfiles_not_updated += 1;
                    }
                    continue 'outer;
                }

                // 4. Does this layer serve as a tombstone for some older layer?
                if l.is_dropped() {
                    let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();

                    // Check if this layer serves as a tombstone for this timeline
                    // We have to do this separately from the timeline check below,
                    // because the LayerMap of this timeline is already locked.
                    let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
                    if is_tombstone {
                        info!(
                            "earlier layer exists at {} in {}",
                            prior_lsn, self.timelineid
                        );
                    }
                    // Now check ancestor timelines, if any
                    else if let Some(ancestor) = &self.ancestor_timeline {
                        let prior_lsn = ancestor.get_last_record_lsn();
                        if seg.rel.is_blocky() {
                // 3. Is it needed by a child branch?
                for retain_lsn in &retain_lsns {
                    // start_lsn is inclusive and end_lsn is exclusive
                    if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
                        info!(
                            "check blocky relish size {} at {} in {} for layer {}-{}",
                            "keeping {} {}-{} because it's needed by branch point {}",
                            seg,
                            prior_lsn,
                            ancestor.timelineid,
                            l.get_start_lsn(),
                            l.get_end_lsn()
                            l.get_end_lsn(),
                            *retain_lsn
                        );
                        match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
                            Some(size) => {
                                let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
                                info!(
                                    "blocky rel size is {} last_live_seg.segno {} seg.segno {}",
                                    size, last_live_seg.segno, seg.segno
                                );
                                if last_live_seg.segno >= seg.segno {
                                    is_tombstone = true;
                        if seg.rel.is_relation() {
                            result.ondisk_relfiles_needed_by_branches += 1;
                        } else {
                            result.ondisk_nonrelfiles_needed_by_branches += 1;
                        }
                        continue 'for_all_historic_layers;
                    }
                }

                // 4. Does this layer serve as a tombstone for some older layer?
                if l.is_dropped() {
                    let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();

                    // Check if this layer serves as a tombstone for this timeline
                    // We have to do this separately from the timeline check below,
                    // because the LayerMap of this timeline is already locked.
                    let mut is_tombstone =
                        layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
                    if is_tombstone {
                        info!(
                            "earlier layer exists at {} in {}",
                            prior_lsn, self.timelineid
                        );
                    }
                    // Now check ancestor timelines, if any
                    else if let Some(ancestor) = &self.ancestor_timeline {
                        let prior_lsn = ancestor.get_last_record_lsn();
                        if seg.rel.is_blocky() {
                            info!(
                                "check blocky relish size {} at {} in {} for layer {}-{}",
                                seg,
                                prior_lsn,
                                ancestor.timelineid,
                                l.get_start_lsn(),
                                l.get_end_lsn()
                            );
                            match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
                                Some(size) => {
                                    let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
                                    info!(
                                        "blocky rel size is {} last_live_seg.segno {} seg.segno {}",
                                        size, last_live_seg.segno, seg.segno
                                    );
                                    if last_live_seg.segno >= seg.segno {
                                        is_tombstone = true;
                                    }
                                }
                                _ => {
                                    info!("blocky rel doesn't exist");
                                }
                            }
                            _ => {
                                info!("blocky rel doesn't exist");
                            }
                        } else {
                            info!(
                                "check non-blocky relish existence {} at {} in {} for layer {}-{}",
                                seg,
                                prior_lsn,
                                ancestor.timelineid,
                                l.get_start_lsn(),
                                l.get_end_lsn()
                            );
                            is_tombstone =
                                ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
                        }
                    } else {
                    }

                    if is_tombstone {
                        info!(
                            "check non-blocky relish existence {} at {} in {} for layer {}-{}",
                            seg,
                            prior_lsn,
                            ancestor.timelineid,
                            l.get_start_lsn(),
                            l.get_end_lsn()
                        );
                        is_tombstone = ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
                            "keeping {} {}-{} because this layer serves as a tombstone for an older layer",
                            seg,
                            l.get_start_lsn(),
                            l.get_end_lsn()
                        );

                        if seg.rel.is_relation() {
                            result.ondisk_relfiles_needed_as_tombstone += 1;
                        } else {
                            result.ondisk_nonrelfiles_needed_as_tombstone += 1;
                        }
                        continue 'for_all_historic_layers;
                    }
                }

                if is_tombstone {
                    info!(
                        "keeping {} {}-{} because this layer serves as a tombstone for an older layer",
                        seg,
                        l.get_start_lsn(),
                        l.get_end_lsn()
                    );
                // We didn't find any reason to keep this file, so remove it.
                info!(
                    "garbage collecting {} {}-{} {}",
                    l.get_seg_tag(),
                    l.get_start_lsn(),
                    l.get_end_lsn(),
                    l.is_dropped()
                );
                layers_to_remove.push(Arc::clone(&l));
            }

                    if seg.rel.is_relation() {
                        result.ondisk_relfiles_needed_as_tombstone += 1;
                    } else {
                        result.ondisk_nonrelfiles_needed_as_tombstone += 1;
                    }
                    continue 'outer;
            // Actually delete the layers from disk and remove them from the map.
            // (couldn't do this in the loop above, because you cannot modify a collection
            // while iterating it. BTreeMap::retain() would be another option)
            for doomed_layer in layers_to_remove {
                doomed_layer.delete()?;
                layers.remove_historic(doomed_layer.clone());

                match (
                    doomed_layer.is_dropped(),
                    doomed_layer.get_seg_tag().rel.is_relation(),
                ) {
                    (true, true) => result.ondisk_relfiles_dropped += 1,
                    (true, false) => result.ondisk_nonrelfiles_dropped += 1,
                    (false, true) => result.ondisk_relfiles_removed += 1,
                    (false, false) => result.ondisk_nonrelfiles_removed += 1,
                }
            }

            // We didn't find any reason to keep this file, so remove it.
            info!(
                "garbage collecting {} {}-{} {}",
                l.get_seg_tag(),
                l.get_start_lsn(),
                l.get_end_lsn(),
                l.is_dropped()
            );
            layers_to_remove.push(Arc::clone(&l));
        }

        // Actually delete the layers from disk and remove them from the map.
        // (couldn't do this in the loop above, because you cannot modify a collection
        // while iterating it. BTreeMap::retain() would be another option)
        for doomed_layer in layers_to_remove {
            doomed_layer.delete()?;
            layers.remove_historic(doomed_layer.clone());

            match (
                doomed_layer.is_dropped(),
                doomed_layer.get_seg_tag().rel.is_relation(),
            ) {
                (true, true) => result.ondisk_relfiles_dropped += 1,
                (true, false) => result.ondisk_nonrelfiles_dropped += 1,
                (false, true) => result.ondisk_relfiles_removed += 1,
                (false, false) => result.ondisk_nonrelfiles_removed += 1,
            if let Some(delta_layer) = layer_to_materialize {
                drop(layers); // release the lock; new image layers are created only by the GC thread,
                let image_layer = ImageLayer::create_from_src(
                    self.conf,
                    &self,
                    &*delta_layer,
                    delta_layer.get_end_lsn(),
                )?;
                layers = self.layers.lock().unwrap();
                info!(
                    "materialize layer {} {}-{}",
                    delta_layer.get_seg_tag(),
                    delta_layer.get_start_lsn(),
                    delta_layer.get_end_lsn()
                );
                layers.insert_historic(Arc::new(image_layer));
                continue 'outer;
            }
            break 'outer;
        }

        result.elapsed = now.elapsed();
        Ok(result)
    }
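The four retention checks above amount to a predicate over a layer's LSN range. A minimal sketch of that decision logic, with hypothetical simplified fields standing in for the real checks (which consult the LayerMap and ancestor timelines):

```rust
// Simplified stand-in for an on-disk layer's metadata.
struct LayerInfo {
    start_lsn: u64, // inclusive
    end_lsn: u64,   // exclusive
    is_dropped: bool,
    newer_image_exists: bool,
    serves_as_tombstone: bool,
}

/// Returns true if the layer can be garbage collected, mirroring the
/// four conditions in the comment above (cutoff, branch points,
/// newer layer, tombstone).
fn can_gc(l: &LayerInfo, cutoff: u64, retain_lsns: &[u64]) -> bool {
    // 1. Must be older than the cutoff LSN.
    if l.end_lsn > cutoff {
        return false;
    }
    // 2. Must not be needed by any child branch point.
    if retain_lsns
        .iter()
        .any(|&lsn| l.start_lsn <= lsn && lsn < l.end_lsn)
    {
        return false;
    }
    // 3. A newer on-disk layer must exist (unless the segment was dropped).
    if !l.is_dropped && !l.newer_image_exists {
        return false;
    }
    // 4. Must not act as a tombstone for an older layer.
    !(l.is_dropped && l.serves_as_tombstone)
}

fn main() {
    let l = LayerInfo {
        start_lsn: 100,
        end_lsn: 200,
        is_dropped: false,
        newer_image_exists: true,
        serves_as_tombstone: false,
    };
    assert!(can_gc(&l, 500, &[50, 300])); // old, unreferenced, superseded
    assert!(!can_gc(&l, 150, &[])); // still newer than cutoff
}
```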
@@ -1950,17 +1936,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
    Ok(())
}

/// Check for equality of Layer memory addresses
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
    let l1_ptr = l1 as *const dyn Layer;
    let l2_ptr = l2 as *const dyn Layer;
    // Comparing *const dyn Layer checks not only data address equality,
    // but also vtable address equality.
    // To avoid that, we compare *const ().
    // See https://github.com/rust-lang/rust/issues/46139 for more.
    std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
}

/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
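The `*const ()` cast matters because two `&dyn Trait` references to the same object can carry different vtable pointers (for instance when instantiated in different codegen units), so comparing the fat pointers directly can spuriously report inequality. A small self-contained illustration of the thin-pointer comparison, with a hypothetical `DeltaLayer` type:

```rust
trait Layer {
    fn name(&self) -> &'static str;
}

struct DeltaLayer(#[allow(dead_code)] u64); // non-zero-sized so addresses differ
impl Layer for DeltaLayer {
    fn name(&self) -> &'static str {
        "delta"
    }
}

// Compare only the data addresses of two trait objects, ignoring vtables.
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
    std::ptr::eq(l1 as *const dyn Layer as *const (), l2 as *const dyn Layer as *const ())
}

fn main() {
    let a = DeltaLayer(1);
    let b = DeltaLayer(2);
    let (r1, r2): (&dyn Layer, &dyn Layer) = (&a, &a);
    assert!(layer_ptr_eq(r1, r2)); // same object => equal
    assert!(!layer_ptr_eq(&a, &b)); // distinct objects => not equal
}
```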
@@ -15,13 +15,11 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;

pub struct InMemoryLayer {
@@ -36,9 +34,6 @@ pub struct InMemoryLayer {
    ///
    start_lsn: Lsn,

    /// Frozen in-memory layers have an inclusive end LSN.
    end_lsn: Option<Lsn>,

    /// LSN of the oldest page version stored in this layer
    oldest_pending_lsn: Lsn,
@@ -51,6 +46,9 @@ pub struct InMemoryLayerInner {
    /// If this relation was dropped, remember when that happened.
    drop_lsn: Option<Lsn>,

    /// LSN of last record
    end_lsn: Option<Lsn>,

    ///
    /// All versions of all pages in the layer are kept here.
    /// Indexed by block number and LSN.
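Moving `end_lsn` from `InMemoryLayer` into `InMemoryLayerInner` lets it be advanced on every write through the shared `RwLock`: the outer struct is only handled behind `Arc` and cannot be mutated through `&self`. A minimal sketch of that interior-mutability pattern, with hypothetical simplified types:

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Lsn(u64);

struct Inner {
    end_lsn: Option<Lsn>, // advanced on every put
}

struct InMemoryLayer {
    start_lsn: Lsn,
    inner: RwLock<Inner>,
}

impl InMemoryLayer {
    fn put(&self, lsn: Lsn) {
        // &self is enough: the RwLock provides the mutability,
        // so writers can go through an Arc<InMemoryLayer>.
        let mut inner = self.inner.write().unwrap();
        inner.end_lsn = Some(lsn);
    }

    fn get_end_lsn(&self) -> Option<Lsn> {
        self.inner.read().unwrap().end_lsn
    }
}

fn main() {
    let layer = Arc::new(InMemoryLayer {
        start_lsn: Lsn(10),
        inner: RwLock::new(Inner { end_lsn: None }),
    });
    layer.put(Lsn(42));
    assert_eq!(layer.get_end_lsn(), Some(Lsn(42)));
    assert!(layer.start_lsn < Lsn(42));
}
```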
@@ -118,6 +116,11 @@ impl Layer for InMemoryLayer {
        PathBuf::from(format!("inmem-{}", delta_filename))
    }

    fn get_end_lsn(&self) -> Lsn {
        let inner = self.inner.read().unwrap();
        inner.drop_lsn.unwrap_or(Lsn(u64::MAX))
    }

    fn path(&self) -> Option<PathBuf> {
        None
    }
@@ -134,20 +137,6 @@ impl Layer for InMemoryLayer {
        self.start_lsn
    }

    fn get_end_lsn(&self) -> Lsn {
        if let Some(end_lsn) = self.end_lsn {
            return Lsn(end_lsn.0 + 1);
        }

        let inner = self.inner.read().unwrap();

        if let Some(drop_lsn) = inner.drop_lsn {
            drop_lsn
        } else {
            Lsn(u64::MAX)
        }
    }

    fn is_dropped(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.drop_lsn.is_some()
@@ -252,8 +241,7 @@ impl Layer for InMemoryLayer {
    }

    fn is_incremental(&self) -> bool {
        let inner = self.inner.read().unwrap();
        inner.predecessor.is_some()
        true
    }

    /// debugging function to print out the contents of the layer
@@ -296,19 +284,7 @@ pub struct NonWriteableError;

pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;

/// Helper struct to clean up the `InMemoryLayer::freeze` return signature.
pub struct FreezeLayers {
    /// Replacement layer for the layer which freeze was called on.
    pub frozen: Arc<InMemoryLayer>,
    /// New open layer containing leftover data.
    pub open: Option<Arc<InMemoryLayer>>,
}

impl InMemoryLayer {
    fn assert_not_frozen(&self) {
        assert!(self.end_lsn.is_none());
    }

    /// Return the oldest page version that's stored in this layer
    pub fn get_oldest_pending_lsn(&self) -> Lsn {
        self.oldest_pending_lsn
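`FreezeLayers` exists purely to give `freeze`'s two return values names; a struct return reads better at call sites than an anonymous tuple. A tiny illustration of the pattern, with simplified stand-in types:

```rust
use std::sync::Arc;

struct InMemoryLayer; // simplified stand-in

/// Named return values beat an anonymous (Arc<_>, Option<Arc<_>>) tuple:
/// the call site destructures by field name, so the two layers cannot
/// be accidentally swapped.
struct FreezeLayers {
    frozen: Arc<InMemoryLayer>,
    open: Option<Arc<InMemoryLayer>>,
}

fn freeze(layer: Arc<InMemoryLayer>, keep_open: bool) -> FreezeLayers {
    let open = keep_open.then(|| Arc::new(InMemoryLayer));
    FreezeLayers { frozen: layer, open }
}

fn main() {
    let FreezeLayers { frozen, open } = freeze(Arc::new(InMemoryLayer), true);
    assert_eq!(Arc::strong_count(&frozen), 1);
    assert!(open.is_some());
}
```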
@@ -338,10 +314,10 @@ impl InMemoryLayer {
            tenantid,
            seg,
            start_lsn,
            end_lsn: None,
            oldest_pending_lsn,
            inner: RwLock::new(InMemoryLayerInner {
                drop_lsn: None,
                end_lsn: None,
                page_versions: BTreeMap::new(),
                segsizes: BTreeMap::new(),
                writeable: true,
@@ -379,7 +355,6 @@ impl InMemoryLayer {
    /// Common subroutine of the public put_wal_record() and put_page_image() functions.
    /// Adds the page version to the in-memory tree
    pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
        self.assert_not_frozen();
        assert!(self.seg.blknum_in_seg(blknum));

        trace!(
@@ -392,6 +367,7 @@ impl InMemoryLayer {
        let mut inner = self.inner.write().unwrap();

        inner.check_writeable()?;
        inner.end_lsn = Some(lsn);

        let old = inner.page_versions.insert((blknum, lsn), pv);
@@ -458,10 +434,9 @@ impl InMemoryLayer {

    /// Remember that the relation was truncated at given LSN
    pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
        self.assert_not_frozen();

        let mut inner = self.inner.write().unwrap();
        inner.check_writeable()?;
        inner.end_lsn = Some(lsn);

        // check that we truncate to a smaller size than the segment had before the truncation
        let oldsize = inner.get_seg_size(lsn);
@@ -479,8 +454,6 @@ impl InMemoryLayer {

    /// Remember that the segment was dropped at given LSN
    pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
        self.assert_not_frozen();

        let mut inner = self.inner.write().unwrap();

        inner.check_writeable()?;
@@ -531,10 +504,10 @@ impl InMemoryLayer {
            tenantid,
            seg,
            start_lsn,
            end_lsn: None,
            oldest_pending_lsn,
            inner: RwLock::new(InMemoryLayerInner {
                drop_lsn: None,
                end_lsn: None,
                page_versions: BTreeMap::new(),
                segsizes,
                writeable: true,
@@ -548,125 +521,12 @@ impl InMemoryLayer {
        inner.writeable
    }

    /// Splits `self` into two InMemoryLayers: `frozen` and `open`.
    /// All data up to and including `cutoff_lsn`
    /// is copied to `frozen`, while the remaining data is copied to `open`.
    /// After completion, self is non-writeable, but not frozen.
    pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
        info!(
            "freezing in memory layer {} on timeline {} at {} (oldest {})",
            self.filename().display(),
            self.timelineid,
            cutoff_lsn,
            self.oldest_pending_lsn
        );

        self.assert_not_frozen();

        let self_ref = self.clone();
        let mut inner = self_ref.inner.write().unwrap();
        // Dropped layers don't need any special freeze actions,
        // they are marked as non-writeable at drop and just
        // written out to disk by the checkpointer.
        if inner.drop_lsn.is_some() {
            assert!(!inner.writeable);
            info!(
                "freezing in memory layer for {} on timeline {} is dropped at {}",
                self.seg,
                self.timelineid,
                inner.drop_lsn.unwrap()
            );

            // There should be no newer layer that refers to this non-writeable layer,
            // because a layer created after a dropped one represents a new rel.
            return Ok(FreezeLayers {
                frozen: self,
                open: None,
            });
        }
        assert!(inner.writeable);
        inner.writeable = false;

        // Divide all the page versions into old and new
        // at the 'cutoff_lsn' point.
        let mut before_segsizes = BTreeMap::new();
        let mut after_segsizes = BTreeMap::new();
        let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
        for (lsn, size) in inner.segsizes.iter() {
            if *lsn > cutoff_lsn {
                after_segsizes.insert(*lsn, *size);
                after_oldest_lsn.accum(min, *lsn);
            } else {
                before_segsizes.insert(*lsn, *size);
            }
        }

        let mut before_page_versions = BTreeMap::new();
        let mut after_page_versions = BTreeMap::new();
        for ((blknum, lsn), pv) in inner.page_versions.iter() {
            if *lsn > cutoff_lsn {
                after_page_versions.insert((*blknum, *lsn), pv.clone());
                after_oldest_lsn.accum(min, *lsn);
            } else {
                before_page_versions.insert((*blknum, *lsn), pv.clone());
            }
        }

        let frozen = Arc::new(InMemoryLayer {
            conf: self.conf,
            tenantid: self.tenantid,
            timelineid: self.timelineid,
            seg: self.seg,
            start_lsn: self.start_lsn,
            end_lsn: Some(cutoff_lsn),
            oldest_pending_lsn: self.start_lsn,
            inner: RwLock::new(InMemoryLayerInner {
                drop_lsn: inner.drop_lsn,
                page_versions: before_page_versions,
                segsizes: before_segsizes,
                writeable: false,
                predecessor: inner.predecessor.clone(),
            }),
        });

        let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
            let mut new_open = Self::create_successor_layer(
                self.conf,
                frozen.clone(),
                self.timelineid,
                self.tenantid,
                cutoff_lsn + 1,
                after_oldest_lsn.0.unwrap(),
            )?;

            let new_inner = new_open.inner.get_mut().unwrap();
            new_inner.page_versions.append(&mut after_page_versions);
            new_inner.segsizes.append(&mut after_segsizes);

            Some(Arc::new(new_open))
        } else {
            None
        };

        Ok(FreezeLayers { frozen, open })
    }

    /// Write this frozen in-memory layer to disk.
    ///
    /// Returns new layers that replace this one.
    /// If not dropped, returns a new image layer containing the page versions
    /// at the `end_lsn`. Can also return a DeltaLayer that includes all the
    /// WAL records between start and end LSN. (The delta layer is not needed
    /// when a new relish is created with a single LSN, so that the start and
    /// end LSN are the same.)
    pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
        trace!(
            "write_to_disk {} end_lsn is {} get_end_lsn is {}",
            self.filename().display(),
            self.end_lsn.unwrap_or(Lsn(0)),
            self.get_end_lsn()
        );

    /// Returns a new DeltaLayer that includes all the
    /// WAL records between start and end LSN.
    ///
    pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Arc<dyn Layer>> {
        // Grab the lock in read-mode. We hold it over the I/O, but because this
        // layer is not writeable anymore, no one should be trying to acquire the
        // write lock on it, so we shouldn't block anyone. There's one exception
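The removed `freeze` body above partitions two BTreeMaps at `cutoff_lsn`, keeping everything at or below the cutoff for the frozen layer and moving later entries to the new open layer. A minimal sketch of that partition step, assuming a simplified `(blknum, lsn)` key and plain `u64` LSNs:

```rust
use std::collections::BTreeMap;

/// Split page versions keyed by (block number, LSN) into the entries at or
/// below `cutoff_lsn` ("before") and those above it ("after"), also tracking
/// the oldest LSN that remains pending in the "after" half.
fn split_at_cutoff(
    page_versions: &BTreeMap<(u32, u64), Vec<u8>>,
    cutoff_lsn: u64,
) -> (BTreeMap<(u32, u64), Vec<u8>>, BTreeMap<(u32, u64), Vec<u8>>, Option<u64>) {
    let mut before = BTreeMap::new();
    let mut after = BTreeMap::new();
    let mut after_oldest_lsn: Option<u64> = None;

    // A plain split_off() won't do here: the map is ordered by block number
    // first, so entries above and below the LSN cutoff are interleaved.
    for (&(blknum, lsn), pv) in page_versions {
        if lsn > cutoff_lsn {
            after.insert((blknum, lsn), pv.clone());
            after_oldest_lsn = Some(after_oldest_lsn.map_or(lsn, |old| old.min(lsn)));
        } else {
            before.insert((blknum, lsn), pv.clone());
        }
    }
    (before, after, after_oldest_lsn)
}

fn main() {
    let mut pv = BTreeMap::new();
    pv.insert((1, 100), vec![1]);
    pv.insert((1, 250), vec![2]);
    pv.insert((2, 150), vec![3]);
    pv.insert((2, 300), vec![4]);

    let (before, after, oldest) = split_at_cutoff(&pv, 200);
    assert_eq!(before.len(), 2); // LSNs 100 and 150
    assert_eq!(after.len(), 2); // LSNs 250 and 300
    assert_eq!(oldest, Some(250));
}
```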
@@ -677,7 +537,12 @@ impl InMemoryLayer {
        // would have to wait until we release it. That race condition is very
        // rare though, so we just accept the potential latency hit for now.
        let inner = self.inner.read().unwrap();
        assert!(!inner.writeable);

        trace!(
            "write_to_disk {} end_lsn is {}",
            self.filename().display(),
            inner.end_lsn.unwrap_or(Lsn(0)),
        );

        let predecessor = inner.predecessor.clone();
@@ -700,24 +565,10 @@ impl InMemoryLayer {
                self.start_lsn,
                drop_lsn
            );
            return Ok(vec![Arc::new(delta_layer)]);
            return Ok(Arc::new(delta_layer));
        }

        let end_lsn = self.end_lsn.unwrap();

        let mut before_segsizes = BTreeMap::new();
        for (lsn, size) in inner.segsizes.iter() {
            if *lsn <= end_lsn {
                before_segsizes.insert(*lsn, *size);
            }
        }
        let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
            let ((_blknum, lsn), _pv) = tup;

            *lsn < end_lsn
        });

        let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
        let end_lsn = inner.end_lsn.unwrap();

        if self.start_lsn != end_lsn {
            // Write the page versions before the cutoff to disk.
@@ -730,32 +581,13 @@ impl InMemoryLayer {
                end_lsn,
                false,
                predecessor,
                before_page_versions,
                before_segsizes,
                inner.page_versions.iter(),
                inner.segsizes.clone(),
            )?;
            frozen_layers.push(Arc::new(delta_layer));
            trace!(
                "freeze: created delta layer {} {}-{}",
                self.seg,
                self.start_lsn,
                end_lsn
            );
            Ok(Arc::new(delta_layer))
        } else {
            assert!(before_page_versions.next().is_none());
            let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
            Ok(Arc::new(image_layer))
        }

        drop(inner);

        // Write a new base image layer at the cutoff point
        let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
        frozen_layers.push(Arc::new(image_layer));
        trace!("freeze: created image layer {} at {}", self.seg, end_lsn);

        Ok(frozen_layers)
    }

    pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
        let mut inner = self.inner.write().unwrap();
        inner.predecessor.replace(predecessor)
    }
}
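In the new `write_to_disk`, the branch on `self.start_lsn != end_lsn` decides which layer kind to write: a delta layer when the in-memory layer spans a range of WAL, an image layer when it was created and frozen at a single LSN. A hedged sketch of that decision, with a hypothetical enum standing in for the real DeltaLayer/ImageLayer types:

```rust
// Simplified stand-ins for the on-disk layer kinds.
#[derive(Debug, PartialEq)]
enum OnDiskLayer {
    Delta { start_lsn: u64, end_lsn: u64 },
    Image { lsn: u64 },
}

// Choose what to materialize for a frozen in-memory layer.
fn layer_to_write(start_lsn: u64, end_lsn: u64) -> OnDiskLayer {
    if start_lsn != end_lsn {
        // The layer accumulated WAL over a range: keep the records as a delta.
        OnDiskLayer::Delta { start_lsn, end_lsn }
    } else {
        // Created and frozen at a single LSN: a full page image suffices,
        // since there are no intermediate versions to replay.
        OnDiskLayer::Image { lsn: end_lsn }
    }
}

fn main() {
    assert_eq!(
        layer_to_write(100, 200),
        OnDiskLayer::Delta { start_lsn: 100, end_lsn: 200 }
    );
    assert_eq!(layer_to_write(300, 300), OnDiskLayer::Image { lsn: 300 });
}
```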
@@ -219,7 +219,7 @@ where
                }
            }
            // No more elements at this point. Move to next point.
            if let Some((point_key, point)) = self.point_iter.next() {
            if let Some((point_key, point)) = self.point_iter.next_back() {
                self.elem_iter = Some((*point_key, point.elements.iter()));
                continue;
            } else {
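The one-line change above flips the point iterator from forward to reverse traversal: `next_back()` pulls from the back of a `DoubleEndedIterator`, so the scan now visits points in descending key order. A small standalone illustration over a `BTreeMap` iterator:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut points = BTreeMap::new();
    points.insert(10, "a");
    points.insert(20, "b");
    points.insert(30, "c");

    // Forward: ascending key order.
    let mut iter = points.iter();
    assert_eq!(iter.next(), Some((&10, &"a")));

    // Reverse: next_back() yields the largest remaining key first,
    // which is what the layer-map walk above wants.
    let mut iter = points.iter();
    assert_eq!(iter.next_back(), Some((&30, &"c")));
    assert_eq!(iter.next_back(), Some((&20, &"b")));
}
```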
@@ -36,8 +36,8 @@ pub mod defaults {
    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);

    pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
    pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
    pub const DEFAULT_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024;
    pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);

    pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
    pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
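The defaults change is substantial: the GC horizon grows from 64 MiB to 64 GiB (the `u64` suffix just makes the operand width explicit), and the GC period drops from every 100 seconds to every second. A quick check of the arithmetic:

```rust
fn main() {
    const OLD_GC_HORIZON: u64 = 64 * 1024 * 1024; // 64 MiB
    const NEW_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024; // 64 GiB

    // The new horizon is 1024x larger: GC now keeps far more history around.
    assert_eq!(NEW_GC_HORIZON / OLD_GC_HORIZON, 1024);
    assert_eq!(NEW_GC_HORIZON, 68_719_476_736);
}
```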