mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 17:30:38 +00:00
Compare commits
3 Commits
hack/compu
...
fast_check
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b1fea280b | ||
|
|
20e7c0d76a | ||
|
|
77d352400e |
@@ -32,7 +32,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use std::{fs, thread};
|
use std::{fs, thread};
|
||||||
|
|
||||||
use crate::layered_repository::inmemory_layer::FreezeLayers;
|
|
||||||
use crate::relish::*;
|
use crate::relish::*;
|
||||||
use crate::relish_storage::schedule_timeline_upload;
|
use crate::relish_storage::schedule_timeline_upload;
|
||||||
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
|
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
|
||||||
@@ -1418,58 +1417,14 @@ impl LayeredTimeline {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Freeze the layer.
|
|
||||||
//
|
|
||||||
// This is a two-step process. First, we "freeze" the in-memory
|
|
||||||
// layer, to close it for new writes, and replace the original
|
|
||||||
// layer with the new frozen in-memory layer (and possibly a new
|
|
||||||
// open layer to hold changes newer than the cutoff.) Then we write
|
|
||||||
// the frozen layer to disk, and replace the in-memory frozen layer
|
|
||||||
// with the new on-disk layers.
|
|
||||||
let FreezeLayers {
|
|
||||||
frozen,
|
|
||||||
open: maybe_new_open,
|
|
||||||
} = oldest_layer.freeze(last_record_lsn)?;
|
|
||||||
|
|
||||||
// replace this layer with the new layers that 'freeze' returned
|
// replace this layer with the new layers that 'freeze' returned
|
||||||
layers.pop_oldest_open();
|
layers.pop_oldest_open();
|
||||||
if let Some(new_open) = maybe_new_open.clone() {
|
|
||||||
layers.insert_open(new_open);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We temporarily insert InMemory layer into historic list here.
|
let new_delta_layer = oldest_layer.write_to_disk(self)?;
|
||||||
// TODO: check that all possible concurrent users of 'historic' treat it right
|
created_historics = true;
|
||||||
layers.insert_historic(frozen.clone());
|
|
||||||
|
|
||||||
// Write the now-frozen layer to disk. That could take a while, so release the lock while do it
|
// Add the historic to the LayerMap
|
||||||
drop(layers);
|
layers.insert_historic(new_delta_layer);
|
||||||
let new_historics = frozen.write_to_disk(self)?;
|
|
||||||
layers = self.layers.lock().unwrap();
|
|
||||||
|
|
||||||
if !new_historics.is_empty() {
|
|
||||||
created_historics = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
|
||||||
layers.remove_historic(frozen.clone());
|
|
||||||
|
|
||||||
// If we created a successor InMemoryLayer, its predecessor is
|
|
||||||
// currently the frozen layer. We need to update the predecessor
|
|
||||||
// to be the latest on-disk layer.
|
|
||||||
if let Some(last_historic) = new_historics.last() {
|
|
||||||
if let Some(new_open) = &maybe_new_open {
|
|
||||||
let maybe_old_predecessor =
|
|
||||||
new_open.update_predecessor(Arc::clone(last_historic));
|
|
||||||
let old_predecessor = maybe_old_predecessor
|
|
||||||
.expect("new_open should always be a successor to frozen");
|
|
||||||
assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the historics to the LayerMap
|
|
||||||
for n in new_historics {
|
|
||||||
layers.insert_historic(n);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call unload() on all frozen layers, to release memory.
|
// Call unload() on all frozen layers, to release memory.
|
||||||
@@ -1567,181 +1522,212 @@ impl LayeredTimeline {
|
|||||||
self.timelineid, cutoff
|
self.timelineid, cutoff
|
||||||
);
|
);
|
||||||
info!("retain_lsns: {:?}", retain_lsns);
|
info!("retain_lsns: {:?}", retain_lsns);
|
||||||
|
|
||||||
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
|
|
||||||
|
|
||||||
// Scan all on-disk layers in the timeline.
|
|
||||||
//
|
|
||||||
// Garbage collect the layer if all conditions are satisfied:
|
|
||||||
// 1. it is older than cutoff LSN;
|
|
||||||
// 2. it doesn't need to be retained for 'retain_lsns';
|
|
||||||
// 3. newer on-disk layer exists (only for non-dropped segments);
|
|
||||||
// 4. this layer doesn't serve as a tombstone for some older layer;
|
|
||||||
//
|
|
||||||
let mut layers = self.layers.lock().unwrap();
|
let mut layers = self.layers.lock().unwrap();
|
||||||
'outer: for l in layers.iter_historic_layers() {
|
|
||||||
let seg = l.get_seg_tag();
|
|
||||||
|
|
||||||
if seg.rel.is_relation() {
|
'outer: loop {
|
||||||
result.ondisk_relfiles_total += 1;
|
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
|
||||||
} else {
|
let mut layer_to_materialize: Option<Arc<dyn Layer>> = None;
|
||||||
result.ondisk_nonrelfiles_total += 1;
|
|
||||||
}
|
// Scan all on-disk layers in the timeline.
|
||||||
|
//
|
||||||
|
// Garbage collect the layer if all conditions are satisfied:
|
||||||
|
// 1. it is older than cutoff LSN;
|
||||||
|
// 2. it doesn't need to be retained for 'retain_lsns';
|
||||||
|
// 3. newer on-disk layer exists (only for non-dropped segments);
|
||||||
|
// 4. this layer doesn't serve as a tombstone for some older layer;
|
||||||
|
//
|
||||||
|
'for_all_historic_layers: for l in layers.iter_historic_layers() {
|
||||||
|
let seg = l.get_seg_tag();
|
||||||
|
|
||||||
// 1. Is it newer than cutoff point?
|
|
||||||
if l.get_end_lsn() > cutoff {
|
|
||||||
info!(
|
|
||||||
"keeping {} {}-{} because it's newer than cutoff {}",
|
|
||||||
seg,
|
|
||||||
l.get_start_lsn(),
|
|
||||||
l.get_end_lsn(),
|
|
||||||
cutoff
|
|
||||||
);
|
|
||||||
if seg.rel.is_relation() {
|
if seg.rel.is_relation() {
|
||||||
result.ondisk_relfiles_needed_by_cutoff += 1;
|
result.ondisk_relfiles_total += 1;
|
||||||
} else {
|
} else {
|
||||||
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
|
result.ondisk_nonrelfiles_total += 1;
|
||||||
}
|
}
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Is it needed by a child branch?
|
// 1. Create image layer for this relation?
|
||||||
for retain_lsn in &retain_lsns {
|
if !l.is_dropped()
|
||||||
// start_lsn is inclusive and end_lsn is exclusive
|
&& !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
|
||||||
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
|
{
|
||||||
|
if l.is_incremental() {
|
||||||
|
layer_to_materialize = Some(Arc::clone(&l));
|
||||||
|
break 'for_all_historic_layers;
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"keeping {} {}-{} because it's newer than cutoff {}",
|
||||||
|
seg,
|
||||||
|
l.get_start_lsn(),
|
||||||
|
l.get_end_lsn(),
|
||||||
|
cutoff
|
||||||
|
);
|
||||||
|
if seg.rel.is_relation() {
|
||||||
|
result.ondisk_relfiles_needed_by_cutoff += 1;
|
||||||
|
} else {
|
||||||
|
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
|
||||||
|
}
|
||||||
|
continue 'for_all_historic_layers;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Is it newer than cutoff point?
|
||||||
|
if l.get_end_lsn() > cutoff {
|
||||||
info!(
|
info!(
|
||||||
"keeping {} {}-{} because it's needed by branch point {}",
|
"keeping {} {}-{} because it's newer than cutoff {}",
|
||||||
seg,
|
seg,
|
||||||
l.get_start_lsn(),
|
l.get_start_lsn(),
|
||||||
l.get_end_lsn(),
|
l.get_end_lsn(),
|
||||||
*retain_lsn
|
cutoff
|
||||||
);
|
);
|
||||||
if seg.rel.is_relation() {
|
if seg.rel.is_relation() {
|
||||||
result.ondisk_relfiles_needed_by_branches += 1;
|
result.ondisk_relfiles_needed_by_cutoff += 1;
|
||||||
} else {
|
} else {
|
||||||
result.ondisk_nonrelfiles_needed_by_branches += 1;
|
result.ondisk_nonrelfiles_needed_by_cutoff += 1;
|
||||||
}
|
}
|
||||||
continue 'outer;
|
continue 'for_all_historic_layers;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Is there a later on-disk layer for this relation?
|
// 3. Is it needed by a child branch?
|
||||||
if !l.is_dropped() && !layers.newer_image_layer_exists(l.get_seg_tag(), l.get_end_lsn())
|
for retain_lsn in &retain_lsns {
|
||||||
{
|
// start_lsn is inclusive and end_lsn is exclusive
|
||||||
info!(
|
if l.get_start_lsn() <= *retain_lsn && *retain_lsn < l.get_end_lsn() {
|
||||||
"keeping {} {}-{} because it is the latest layer",
|
|
||||||
seg,
|
|
||||||
l.get_start_lsn(),
|
|
||||||
l.get_end_lsn()
|
|
||||||
);
|
|
||||||
if seg.rel.is_relation() {
|
|
||||||
result.ondisk_relfiles_not_updated += 1;
|
|
||||||
} else {
|
|
||||||
result.ondisk_nonrelfiles_not_updated += 1;
|
|
||||||
}
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Does this layer serve as a tombstome for some older layer?
|
|
||||||
if l.is_dropped() {
|
|
||||||
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
|
|
||||||
|
|
||||||
// Check if this layer serves as a tombstone for this timeline
|
|
||||||
// We have to do this separately from timeline check below,
|
|
||||||
// because LayerMap of this timeline is already locked.
|
|
||||||
let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
|
|
||||||
if is_tombstone {
|
|
||||||
info!(
|
|
||||||
"earlier layer exists at {} in {}",
|
|
||||||
prior_lsn, self.timelineid
|
|
||||||
);
|
|
||||||
}
|
|
||||||
// Now check ancestor timelines, if any
|
|
||||||
else if let Some(ancestor) = &self.ancestor_timeline {
|
|
||||||
let prior_lsn = ancestor.get_last_record_lsn();
|
|
||||||
if seg.rel.is_blocky() {
|
|
||||||
info!(
|
info!(
|
||||||
"check blocky relish size {} at {} in {} for layer {}-{}",
|
"keeping {} {}-{} because it's needed by branch point {}",
|
||||||
seg,
|
seg,
|
||||||
prior_lsn,
|
|
||||||
ancestor.timelineid,
|
|
||||||
l.get_start_lsn(),
|
l.get_start_lsn(),
|
||||||
l.get_end_lsn()
|
l.get_end_lsn(),
|
||||||
|
*retain_lsn
|
||||||
);
|
);
|
||||||
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
|
if seg.rel.is_relation() {
|
||||||
Some(size) => {
|
result.ondisk_relfiles_needed_by_branches += 1;
|
||||||
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
|
} else {
|
||||||
info!(
|
result.ondisk_nonrelfiles_needed_by_branches += 1;
|
||||||
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
|
}
|
||||||
size, last_live_seg.segno, seg.segno
|
continue 'for_all_historic_layers;
|
||||||
);
|
}
|
||||||
if last_live_seg.segno >= seg.segno {
|
}
|
||||||
is_tombstone = true;
|
|
||||||
|
// 4. Does this layer serve as a tombstome for some older layer?
|
||||||
|
if l.is_dropped() {
|
||||||
|
let prior_lsn = l.get_start_lsn().checked_sub(1u64).unwrap();
|
||||||
|
|
||||||
|
// Check if this layer serves as a tombstone for this timeline
|
||||||
|
// We have to do this separately from timeline check below,
|
||||||
|
// because LayerMap of this timeline is already locked.
|
||||||
|
let mut is_tombstone =
|
||||||
|
layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?;
|
||||||
|
if is_tombstone {
|
||||||
|
info!(
|
||||||
|
"earlier layer exists at {} in {}",
|
||||||
|
prior_lsn, self.timelineid
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// Now check ancestor timelines, if any
|
||||||
|
else if let Some(ancestor) = &self.ancestor_timeline {
|
||||||
|
let prior_lsn = ancestor.get_last_record_lsn();
|
||||||
|
if seg.rel.is_blocky() {
|
||||||
|
info!(
|
||||||
|
"check blocky relish size {} at {} in {} for layer {}-{}",
|
||||||
|
seg,
|
||||||
|
prior_lsn,
|
||||||
|
ancestor.timelineid,
|
||||||
|
l.get_start_lsn(),
|
||||||
|
l.get_end_lsn()
|
||||||
|
);
|
||||||
|
match ancestor.get_relish_size(seg.rel, prior_lsn).unwrap() {
|
||||||
|
Some(size) => {
|
||||||
|
let last_live_seg = SegmentTag::from_blknum(seg.rel, size - 1);
|
||||||
|
info!(
|
||||||
|
"blocky rel size is {} last_live_seg.segno {} seg.segno {}",
|
||||||
|
size, last_live_seg.segno, seg.segno
|
||||||
|
);
|
||||||
|
if last_live_seg.segno >= seg.segno {
|
||||||
|
is_tombstone = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
info!("blocky rel doesn't exist");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
} else {
|
||||||
info!("blocky rel doesn't exist");
|
info!(
|
||||||
}
|
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
|
||||||
|
seg,
|
||||||
|
prior_lsn,
|
||||||
|
ancestor.timelineid,
|
||||||
|
l.get_start_lsn(),
|
||||||
|
l.get_end_lsn()
|
||||||
|
);
|
||||||
|
is_tombstone =
|
||||||
|
ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if is_tombstone {
|
||||||
info!(
|
info!(
|
||||||
"check non-blocky relish existence {} at {} in {} for layer {}-{}",
|
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
|
||||||
seg,
|
seg,
|
||||||
prior_lsn,
|
l.get_start_lsn(),
|
||||||
ancestor.timelineid,
|
l.get_end_lsn()
|
||||||
l.get_start_lsn(),
|
);
|
||||||
l.get_end_lsn()
|
|
||||||
);
|
if seg.rel.is_relation() {
|
||||||
is_tombstone = ancestor.get_rel_exists(seg.rel, prior_lsn).unwrap_or(false);
|
result.ondisk_relfiles_needed_as_tombstone += 1;
|
||||||
|
} else {
|
||||||
|
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
|
||||||
|
}
|
||||||
|
continue 'for_all_historic_layers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if is_tombstone {
|
// We didn't find any reason to keep this file, so remove it.
|
||||||
info!(
|
info!(
|
||||||
"keeping {} {}-{} because this layer servers as a tombstome for older layer",
|
"garbage collecting {} {}-{} {}",
|
||||||
seg,
|
l.get_seg_tag(),
|
||||||
l.get_start_lsn(),
|
l.get_start_lsn(),
|
||||||
l.get_end_lsn()
|
l.get_end_lsn(),
|
||||||
);
|
l.is_dropped()
|
||||||
|
);
|
||||||
|
layers_to_remove.push(Arc::clone(&l));
|
||||||
|
}
|
||||||
|
|
||||||
if seg.rel.is_relation() {
|
// Actually delete the layers from disk and remove them from the map.
|
||||||
result.ondisk_relfiles_needed_as_tombstone += 1;
|
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||||
} else {
|
// while iterating it. BTreeMap::retain() would be another option)
|
||||||
result.ondisk_nonrelfiles_needed_as_tombstone += 1;
|
for doomed_layer in layers_to_remove {
|
||||||
}
|
doomed_layer.delete()?;
|
||||||
continue 'outer;
|
layers.remove_historic(doomed_layer.clone());
|
||||||
|
|
||||||
|
match (
|
||||||
|
doomed_layer.is_dropped(),
|
||||||
|
doomed_layer.get_seg_tag().rel.is_relation(),
|
||||||
|
) {
|
||||||
|
(true, true) => result.ondisk_relfiles_dropped += 1,
|
||||||
|
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
|
||||||
|
(false, true) => result.ondisk_relfiles_removed += 1,
|
||||||
|
(false, false) => result.ondisk_nonrelfiles_removed += 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We didn't find any reason to keep this file, so remove it.
|
if let Some(delta_layer) = layer_to_materialize {
|
||||||
info!(
|
drop(layers); // release lock, as far as new image layers are created only by GC thread,
|
||||||
"garbage collecting {} {}-{} {}",
|
let image_layer = ImageLayer::create_from_src(
|
||||||
l.get_seg_tag(),
|
self.conf,
|
||||||
l.get_start_lsn(),
|
&self,
|
||||||
l.get_end_lsn(),
|
&*delta_layer,
|
||||||
l.is_dropped()
|
delta_layer.get_end_lsn(),
|
||||||
);
|
)?;
|
||||||
layers_to_remove.push(Arc::clone(&l));
|
layers = self.layers.lock().unwrap();
|
||||||
}
|
info!(
|
||||||
|
"materialize layer {} {}-{}",
|
||||||
// Actually delete the layers from disk and remove them from the map.
|
delta_layer.get_seg_tag(),
|
||||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
delta_layer.get_start_lsn(),
|
||||||
// while iterating it. BTreeMap::retain() would be another option)
|
delta_layer.get_end_lsn()
|
||||||
for doomed_layer in layers_to_remove {
|
);
|
||||||
doomed_layer.delete()?;
|
layers.insert_historic(Arc::new(image_layer));
|
||||||
layers.remove_historic(doomed_layer.clone());
|
continue 'outer;
|
||||||
|
|
||||||
match (
|
|
||||||
doomed_layer.is_dropped(),
|
|
||||||
doomed_layer.get_seg_tag().rel.is_relation(),
|
|
||||||
) {
|
|
||||||
(true, true) => result.ondisk_relfiles_dropped += 1,
|
|
||||||
(true, false) => result.ondisk_nonrelfiles_dropped += 1,
|
|
||||||
(false, true) => result.ondisk_relfiles_removed += 1,
|
|
||||||
(false, false) => result.ondisk_nonrelfiles_removed += 1,
|
|
||||||
}
|
}
|
||||||
|
break 'outer;
|
||||||
}
|
}
|
||||||
|
|
||||||
result.elapsed = now.elapsed();
|
result.elapsed = now.elapsed();
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
@@ -1950,17 +1936,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check for equality of Layer memory addresses
|
|
||||||
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
|
|
||||||
let l1_ptr = l1 as *const dyn Layer;
|
|
||||||
let l2_ptr = l2 as *const dyn Layer;
|
|
||||||
// comparing *const dyn Layer will not only check for data address equality,
|
|
||||||
// but also for vtable address equality.
|
|
||||||
// to avoid this, we compare *const ().
|
|
||||||
// see here for more https://github.com/rust-lang/rust/issues/46139
|
|
||||||
std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add a suffix to a layer file's name: .{num}.old
|
/// Add a suffix to a layer file's name: .{num}.old
|
||||||
/// Uses the first available num (starts at 0)
|
/// Uses the first available num (starts at 0)
|
||||||
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
|
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
|
||||||
|
|||||||
@@ -15,13 +15,11 @@ use crate::{ZTenantId, ZTimelineId};
|
|||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::cmp::min;
|
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::ops::Bound::Included;
|
use std::ops::Bound::Included;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
use zenith_utils::accum::Accum;
|
|
||||||
use zenith_utils::lsn::Lsn;
|
use zenith_utils::lsn::Lsn;
|
||||||
|
|
||||||
pub struct InMemoryLayer {
|
pub struct InMemoryLayer {
|
||||||
@@ -36,9 +34,6 @@ pub struct InMemoryLayer {
|
|||||||
///
|
///
|
||||||
start_lsn: Lsn,
|
start_lsn: Lsn,
|
||||||
|
|
||||||
/// Frozen in-memory layers have an inclusive end LSN.
|
|
||||||
end_lsn: Option<Lsn>,
|
|
||||||
|
|
||||||
/// LSN of the oldest page version stored in this layer
|
/// LSN of the oldest page version stored in this layer
|
||||||
oldest_pending_lsn: Lsn,
|
oldest_pending_lsn: Lsn,
|
||||||
|
|
||||||
@@ -51,6 +46,9 @@ pub struct InMemoryLayerInner {
|
|||||||
/// If this relation was dropped, remember when that happened.
|
/// If this relation was dropped, remember when that happened.
|
||||||
drop_lsn: Option<Lsn>,
|
drop_lsn: Option<Lsn>,
|
||||||
|
|
||||||
|
/// LSN of last record
|
||||||
|
end_lsn: Option<Lsn>,
|
||||||
|
|
||||||
///
|
///
|
||||||
/// All versions of all pages in the layer are are kept here.
|
/// All versions of all pages in the layer are are kept here.
|
||||||
/// Indexed by block number and LSN.
|
/// Indexed by block number and LSN.
|
||||||
@@ -118,6 +116,11 @@ impl Layer for InMemoryLayer {
|
|||||||
PathBuf::from(format!("inmem-{}", delta_filename))
|
PathBuf::from(format!("inmem-{}", delta_filename))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_end_lsn(&self) -> Lsn {
|
||||||
|
let inner = self.inner.read().unwrap();
|
||||||
|
inner.drop_lsn.unwrap_or(Lsn(u64::MAX))
|
||||||
|
}
|
||||||
|
|
||||||
fn path(&self) -> Option<PathBuf> {
|
fn path(&self) -> Option<PathBuf> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -134,20 +137,6 @@ impl Layer for InMemoryLayer {
|
|||||||
self.start_lsn
|
self.start_lsn
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_end_lsn(&self) -> Lsn {
|
|
||||||
if let Some(end_lsn) = self.end_lsn {
|
|
||||||
return Lsn(end_lsn.0 + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let inner = self.inner.read().unwrap();
|
|
||||||
|
|
||||||
if let Some(drop_lsn) = inner.drop_lsn {
|
|
||||||
drop_lsn
|
|
||||||
} else {
|
|
||||||
Lsn(u64::MAX)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_dropped(&self) -> bool {
|
fn is_dropped(&self) -> bool {
|
||||||
let inner = self.inner.read().unwrap();
|
let inner = self.inner.read().unwrap();
|
||||||
inner.drop_lsn.is_some()
|
inner.drop_lsn.is_some()
|
||||||
@@ -252,8 +241,7 @@ impl Layer for InMemoryLayer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn is_incremental(&self) -> bool {
|
fn is_incremental(&self) -> bool {
|
||||||
let inner = self.inner.read().unwrap();
|
true
|
||||||
inner.predecessor.is_some()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// debugging function to print out the contents of the layer
|
/// debugging function to print out the contents of the layer
|
||||||
@@ -296,19 +284,7 @@ pub struct NonWriteableError;
|
|||||||
|
|
||||||
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
|
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
|
||||||
|
|
||||||
/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
|
|
||||||
pub struct FreezeLayers {
|
|
||||||
/// Replacement layer for the layer which freeze was called on.
|
|
||||||
pub frozen: Arc<InMemoryLayer>,
|
|
||||||
/// New open layer containing leftover data.
|
|
||||||
pub open: Option<Arc<InMemoryLayer>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl InMemoryLayer {
|
impl InMemoryLayer {
|
||||||
fn assert_not_frozen(&self) {
|
|
||||||
assert!(self.end_lsn.is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return the oldest page version that's stored in this layer
|
/// Return the oldest page version that's stored in this layer
|
||||||
pub fn get_oldest_pending_lsn(&self) -> Lsn {
|
pub fn get_oldest_pending_lsn(&self) -> Lsn {
|
||||||
self.oldest_pending_lsn
|
self.oldest_pending_lsn
|
||||||
@@ -338,10 +314,10 @@ impl InMemoryLayer {
|
|||||||
tenantid,
|
tenantid,
|
||||||
seg,
|
seg,
|
||||||
start_lsn,
|
start_lsn,
|
||||||
end_lsn: None,
|
|
||||||
oldest_pending_lsn,
|
oldest_pending_lsn,
|
||||||
inner: RwLock::new(InMemoryLayerInner {
|
inner: RwLock::new(InMemoryLayerInner {
|
||||||
drop_lsn: None,
|
drop_lsn: None,
|
||||||
|
end_lsn: None,
|
||||||
page_versions: BTreeMap::new(),
|
page_versions: BTreeMap::new(),
|
||||||
segsizes: BTreeMap::new(),
|
segsizes: BTreeMap::new(),
|
||||||
writeable: true,
|
writeable: true,
|
||||||
@@ -379,7 +355,6 @@ impl InMemoryLayer {
|
|||||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||||
/// Adds the page version to the in-memory tree
|
/// Adds the page version to the in-memory tree
|
||||||
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
|
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
|
||||||
self.assert_not_frozen();
|
|
||||||
assert!(self.seg.blknum_in_seg(blknum));
|
assert!(self.seg.blknum_in_seg(blknum));
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
@@ -392,6 +367,7 @@ impl InMemoryLayer {
|
|||||||
let mut inner = self.inner.write().unwrap();
|
let mut inner = self.inner.write().unwrap();
|
||||||
|
|
||||||
inner.check_writeable()?;
|
inner.check_writeable()?;
|
||||||
|
inner.end_lsn = Some(lsn);
|
||||||
|
|
||||||
let old = inner.page_versions.insert((blknum, lsn), pv);
|
let old = inner.page_versions.insert((blknum, lsn), pv);
|
||||||
|
|
||||||
@@ -458,10 +434,9 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
/// Remember that the relation was truncated at given LSN
|
/// Remember that the relation was truncated at given LSN
|
||||||
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
|
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
|
||||||
self.assert_not_frozen();
|
|
||||||
|
|
||||||
let mut inner = self.inner.write().unwrap();
|
let mut inner = self.inner.write().unwrap();
|
||||||
inner.check_writeable()?;
|
inner.check_writeable()?;
|
||||||
|
inner.end_lsn = Some(lsn);
|
||||||
|
|
||||||
// check that this we truncate to a smaller size than segment was before the truncation
|
// check that this we truncate to a smaller size than segment was before the truncation
|
||||||
let oldsize = inner.get_seg_size(lsn);
|
let oldsize = inner.get_seg_size(lsn);
|
||||||
@@ -479,8 +454,6 @@ impl InMemoryLayer {
|
|||||||
|
|
||||||
/// Remember that the segment was dropped at given LSN
|
/// Remember that the segment was dropped at given LSN
|
||||||
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
|
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
|
||||||
self.assert_not_frozen();
|
|
||||||
|
|
||||||
let mut inner = self.inner.write().unwrap();
|
let mut inner = self.inner.write().unwrap();
|
||||||
|
|
||||||
inner.check_writeable()?;
|
inner.check_writeable()?;
|
||||||
@@ -531,10 +504,10 @@ impl InMemoryLayer {
|
|||||||
tenantid,
|
tenantid,
|
||||||
seg,
|
seg,
|
||||||
start_lsn,
|
start_lsn,
|
||||||
end_lsn: None,
|
|
||||||
oldest_pending_lsn,
|
oldest_pending_lsn,
|
||||||
inner: RwLock::new(InMemoryLayerInner {
|
inner: RwLock::new(InMemoryLayerInner {
|
||||||
drop_lsn: None,
|
drop_lsn: None,
|
||||||
|
end_lsn: None,
|
||||||
page_versions: BTreeMap::new(),
|
page_versions: BTreeMap::new(),
|
||||||
segsizes,
|
segsizes,
|
||||||
writeable: true,
|
writeable: true,
|
||||||
@@ -548,125 +521,12 @@ impl InMemoryLayer {
|
|||||||
inner.writeable
|
inner.writeable
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
|
|
||||||
/// All data up to and including `cutoff_lsn`
|
|
||||||
/// is copied to `frozen`, while the remaining data is copied to `open`.
|
|
||||||
/// After completion, self is non-writeable, but not frozen.
|
|
||||||
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
|
|
||||||
info!(
|
|
||||||
"freezing in memory layer {} on timeline {} at {} (oldest {})",
|
|
||||||
self.filename().display(),
|
|
||||||
self.timelineid,
|
|
||||||
cutoff_lsn,
|
|
||||||
self.oldest_pending_lsn
|
|
||||||
);
|
|
||||||
|
|
||||||
self.assert_not_frozen();
|
|
||||||
|
|
||||||
let self_ref = self.clone();
|
|
||||||
let mut inner = self_ref.inner.write().unwrap();
|
|
||||||
// Dropped layers don't need any special freeze actions,
|
|
||||||
// they are marked as non-writeable at drop and just
|
|
||||||
// written out to disk by checkpointer.
|
|
||||||
if inner.drop_lsn.is_some() {
|
|
||||||
assert!(!inner.writeable);
|
|
||||||
info!(
|
|
||||||
"freezing in memory layer for {} on timeline {} is dropped at {}",
|
|
||||||
self.seg,
|
|
||||||
self.timelineid,
|
|
||||||
inner.drop_lsn.unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
// There should be no newer layer that refers this non-writeable layer,
|
|
||||||
// because layer that is created after dropped one represents a new rel.
|
|
||||||
return Ok(FreezeLayers {
|
|
||||||
frozen: self,
|
|
||||||
open: None,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
assert!(inner.writeable);
|
|
||||||
inner.writeable = false;
|
|
||||||
|
|
||||||
// Divide all the page versions into old and new
|
|
||||||
// at the 'cutoff_lsn' point.
|
|
||||||
let mut before_segsizes = BTreeMap::new();
|
|
||||||
let mut after_segsizes = BTreeMap::new();
|
|
||||||
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
|
|
||||||
for (lsn, size) in inner.segsizes.iter() {
|
|
||||||
if *lsn > cutoff_lsn {
|
|
||||||
after_segsizes.insert(*lsn, *size);
|
|
||||||
after_oldest_lsn.accum(min, *lsn);
|
|
||||||
} else {
|
|
||||||
before_segsizes.insert(*lsn, *size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut before_page_versions = BTreeMap::new();
|
|
||||||
let mut after_page_versions = BTreeMap::new();
|
|
||||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
|
||||||
if *lsn > cutoff_lsn {
|
|
||||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
|
||||||
after_oldest_lsn.accum(min, *lsn);
|
|
||||||
} else {
|
|
||||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let frozen = Arc::new(InMemoryLayer {
|
|
||||||
conf: self.conf,
|
|
||||||
tenantid: self.tenantid,
|
|
||||||
timelineid: self.timelineid,
|
|
||||||
seg: self.seg,
|
|
||||||
start_lsn: self.start_lsn,
|
|
||||||
end_lsn: Some(cutoff_lsn),
|
|
||||||
oldest_pending_lsn: self.start_lsn,
|
|
||||||
inner: RwLock::new(InMemoryLayerInner {
|
|
||||||
drop_lsn: inner.drop_lsn,
|
|
||||||
page_versions: before_page_versions,
|
|
||||||
segsizes: before_segsizes,
|
|
||||||
writeable: false,
|
|
||||||
predecessor: inner.predecessor.clone(),
|
|
||||||
}),
|
|
||||||
});
|
|
||||||
|
|
||||||
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
|
||||||
let mut new_open = Self::create_successor_layer(
|
|
||||||
self.conf,
|
|
||||||
frozen.clone(),
|
|
||||||
self.timelineid,
|
|
||||||
self.tenantid,
|
|
||||||
cutoff_lsn + 1,
|
|
||||||
after_oldest_lsn.0.unwrap(),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let new_inner = new_open.inner.get_mut().unwrap();
|
|
||||||
new_inner.page_versions.append(&mut after_page_versions);
|
|
||||||
new_inner.segsizes.append(&mut after_segsizes);
|
|
||||||
|
|
||||||
Some(Arc::new(new_open))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(FreezeLayers { frozen, open })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write the this frozen in-memory layer to disk.
|
/// Write the this frozen in-memory layer to disk.
|
||||||
///
|
///
|
||||||
/// Returns new layers that replace this one.
|
/// Returns new DeltaLayer that includes all the
|
||||||
/// If not dropped, returns a new image layer containing the page versions
|
/// WAL records between start and end LSN.
|
||||||
/// at the `end_lsn`. Can also return a DeltaLayer that includes all the
|
///
|
||||||
/// WAL records between start and end LSN. (The delta layer is not needed
|
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Arc<dyn Layer>> {
|
||||||
/// when a new relish is created with a single LSN, so that the start and
|
|
||||||
/// end LSN are the same.)
|
|
||||||
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
|
|
||||||
trace!(
|
|
||||||
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
|
|
||||||
self.filename().display(),
|
|
||||||
self.end_lsn.unwrap_or(Lsn(0)),
|
|
||||||
self.get_end_lsn()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
// Grab the lock in read-mode. We hold it over the I/O, but because this
|
||||||
// layer is not writeable anymore, no one should be trying to aquire the
|
// layer is not writeable anymore, no one should be trying to aquire the
|
||||||
// write lock on it, so we shouldn't block anyone. There's one exception
|
// write lock on it, so we shouldn't block anyone. There's one exception
|
||||||
@@ -677,7 +537,12 @@ impl InMemoryLayer {
|
|||||||
// would have to wait until we release it. That race condition is very
|
// would have to wait until we release it. That race condition is very
|
||||||
// rare though, so we just accept the potential latency hit for now.
|
// rare though, so we just accept the potential latency hit for now.
|
||||||
let inner = self.inner.read().unwrap();
|
let inner = self.inner.read().unwrap();
|
||||||
assert!(!inner.writeable);
|
|
||||||
|
trace!(
|
||||||
|
"write_to_disk {} end_lsn is {}",
|
||||||
|
self.filename().display(),
|
||||||
|
inner.end_lsn.unwrap_or(Lsn(0)),
|
||||||
|
);
|
||||||
|
|
||||||
let predecessor = inner.predecessor.clone();
|
let predecessor = inner.predecessor.clone();
|
||||||
|
|
||||||
@@ -700,24 +565,10 @@ impl InMemoryLayer {
|
|||||||
self.start_lsn,
|
self.start_lsn,
|
||||||
drop_lsn
|
drop_lsn
|
||||||
);
|
);
|
||||||
return Ok(vec![Arc::new(delta_layer)]);
|
return Ok(Arc::new(delta_layer));
|
||||||
}
|
}
|
||||||
|
|
||||||
let end_lsn = self.end_lsn.unwrap();
|
let end_lsn = inner.end_lsn.unwrap();
|
||||||
|
|
||||||
let mut before_segsizes = BTreeMap::new();
|
|
||||||
for (lsn, size) in inner.segsizes.iter() {
|
|
||||||
if *lsn <= end_lsn {
|
|
||||||
before_segsizes.insert(*lsn, *size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut before_page_versions = inner.page_versions.iter().filter(|tup| {
|
|
||||||
let ((_blknum, lsn), _pv) = tup;
|
|
||||||
|
|
||||||
*lsn < end_lsn
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
|
|
||||||
|
|
||||||
if self.start_lsn != end_lsn {
|
if self.start_lsn != end_lsn {
|
||||||
// Write the page versions before the cutoff to disk.
|
// Write the page versions before the cutoff to disk.
|
||||||
@@ -730,32 +581,13 @@ impl InMemoryLayer {
|
|||||||
end_lsn,
|
end_lsn,
|
||||||
false,
|
false,
|
||||||
predecessor,
|
predecessor,
|
||||||
before_page_versions,
|
inner.page_versions.iter(),
|
||||||
before_segsizes,
|
inner.segsizes.clone(),
|
||||||
)?;
|
)?;
|
||||||
frozen_layers.push(Arc::new(delta_layer));
|
Ok(Arc::new(delta_layer))
|
||||||
trace!(
|
|
||||||
"freeze: created delta layer {} {}-{}",
|
|
||||||
self.seg,
|
|
||||||
self.start_lsn,
|
|
||||||
end_lsn
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
assert!(before_page_versions.next().is_none());
|
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
|
||||||
|
Ok(Arc::new(image_layer))
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(inner);
|
|
||||||
|
|
||||||
// Write a new base image layer at the cutoff point
|
|
||||||
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
|
|
||||||
frozen_layers.push(Arc::new(image_layer));
|
|
||||||
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
|
|
||||||
|
|
||||||
Ok(frozen_layers)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
|
|
||||||
let mut inner = self.inner.write().unwrap();
|
|
||||||
inner.predecessor.replace(predecessor)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -219,7 +219,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// No more elements at this point. Move to next point.
|
// No more elements at this point. Move to next point.
|
||||||
if let Some((point_key, point)) = self.point_iter.next() {
|
if let Some((point_key, point)) = self.point_iter.next_back() {
|
||||||
self.elem_iter = Some((*point_key, point.elements.iter()));
|
self.elem_iter = Some((*point_key, point.elements.iter()));
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -36,8 +36,8 @@ pub mod defaults {
|
|||||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||||
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
|
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
pub const DEFAULT_GC_HORIZON: u64 = 64u64 * 1024 * 1024 * 1024;
|
||||||
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
|
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
|
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
|
||||||
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
|
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
|
||||||
|
|||||||
Reference in New Issue
Block a user