Remove 'predecessor' reference from in-memory and delta layers.

The caller is now responsible for looking up the predecessor layer,
instead. This makes the code simpler, as you don't need to update the
predecessor reference when a layer is frozen or written to disk.

There was a bug in that, as Konstantin noted on discord:

    Assume that freeze doesn't create new inmem layer
    (maybe_new_open=None). Then we temporary place in historics frozen
    layer. Assume that now new put_wal_record request arrives. There is
    no open in-mem layer, so it has to create new one. It is looking for
    previous layer for read and set it as new in-mem layer
    predecessor. But as far as I understand, prev layer should be our
    temporary frozen layer. Which will be then removed from
    historics.

That leaves the predecessor field of the new in-memory layer pointing
at the frozen in-memory layer that has been removed from the layer map,
preventing it from being removed from memory.

This makes two subtle changes:

1. When the first new layer is created on a branch for a segment that
   existed on the ancestor branch, the start_lsn of the new layer is now
   the branch point + 1. We were previously slightly confused about what
   the branch point LSN meant. It means that all the WAL up to and
   *including* the LSN on the old branch is visible to the new branch.
   If we mark the start LSN of the new layer as equal to the branch point,
   that's wrong, because if there is a WAL record with that LSN on the
   predecessor layer, the new layer would hide it. This bug was hidden
   when the layer on the new branch contained a direct reference to the
   layer in the old branch, as get_page_reconstruct_data() followed that
   reference directly when it didn't find the page version in the new
   layer. But now that the caller performs the lookup, it will look up
   the new layer that doesn't contain the record, and you get an error.

2. InMemoryLayer now always stores the segment size at the beginning
   of the layer's LSN range. Previously, get_seg_size() might have
   recursed into the predecessor layer to get the size, but now we
   avoid that by always copying over the last size from the previous
   layer, when a new layer is created.
This commit is contained in:
Heikki Linnakangas
2021-10-08 00:54:13 +03:00
parent 60dae0b4ac
commit 960c7d69a8
4 changed files with 69 additions and 110 deletions

View File

@@ -1030,7 +1030,7 @@ impl LayeredTimeline {
self.timelineid
);
let mut layers = self.layers.lock().unwrap();
let (imgfilenames, mut deltafilenames) =
let (imgfilenames, deltafilenames) =
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid);
@@ -1058,11 +1058,7 @@ impl LayeredTimeline {
layers.insert_historic(Arc::new(layer));
}
// Then for the Delta files. The delta files are created in order starting
// from the oldest file, because each DeltaLayer needs a reference to its
// predecessor.
deltafilenames.sort();
// Then for the Delta files.
for filename in deltafilenames.iter() {
ensure!(filename.start_lsn < filename.end_lsn);
if filename.end_lsn > disk_consistent_lsn {
@@ -1075,27 +1071,12 @@ impl LayeredTimeline {
continue;
}
let predecessor = layers.get(&filename.seg, filename.start_lsn);
let predecessor_str: String = if let Some(prec) = &predecessor {
prec.filename().display().to_string()
} else {
"none".to_string()
};
let layer = DeltaLayer::new(
self.conf,
self.timelineid,
self.tenantid,
filename,
predecessor,
);
let layer = DeltaLayer::new(self.conf, self.timelineid, self.tenantid, filename);
info!(
"found layer {} on timeline {}, predecessor: {}",
"found layer {} on timeline {}",
layer.filename().display(),
self.timelineid,
predecessor_str,
);
layers.insert_historic(Arc::new(layer));
}
@@ -1270,7 +1251,7 @@ impl LayeredTimeline {
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
start_lsn = self.ancestor_lsn;
start_lsn = self.ancestor_lsn + 1;
trace!(
"creating layer for write for {} at branch point {}/{}",
seg,
@@ -1422,19 +1403,6 @@ impl LayeredTimeline {
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
// If we created a successor InMemoryLayer, its predecessor is
// currently the frozen layer. We need to update the predecessor
// to be the latest on-disk layer.
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
let maybe_old_predecessor =
new_open.update_predecessor(Arc::clone(last_historic));
let old_predecessor = maybe_old_predecessor
.expect("new_open should always be a successor to frozen");
assert!(layer_ptr_eq(frozen.as_ref(), old_predecessor.as_ref()));
}
}
// Add the historics to the LayerMap
for n in new_historics {
layers.insert_historic(n);
@@ -1741,12 +1709,20 @@ impl LayeredTimeline {
loop {
match layer_ref.get_page_reconstruct_data(blknum, curr_lsn, &mut data)? {
PageReconstructResult::Complete => break,
PageReconstructResult::Continue(cont_lsn, cont_layer) => {
PageReconstructResult::Continue(cont_lsn) => {
// Fetch base image / more WAL from the returned predecessor layer
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
if let Some((cont_layer, cont_lsn)) = self.get_layer_for_read(seg, cont_lsn)? {
layer_arc = cont_layer;
layer_ref = &*layer_arc;
curr_lsn = cont_lsn;
continue;
} else {
bail!(
"could not find predecessor layer of segment {} at {}",
seg.rel,
cont_lsn
);
}
}
PageReconstructResult::Missing(lsn) => {
// Oops, we could not reconstruct the page.
@@ -1919,17 +1895,6 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> {
Ok(())
}
/// Check for equality of Layer memory addresses
fn layer_ptr_eq(l1: &dyn Layer, l2: &dyn Layer) -> bool {
let l1_ptr = l1 as *const dyn Layer;
let l2_ptr = l2 as *const dyn Layer;
// comparing *const dyn Layer will not only check for data address equality,
// but also for vtable address equality.
// to avoid this, we compare *const ().
// see here for more https://github.com/rust-lang/rust/issues/46139
std::ptr::eq(l1_ptr as *const (), l2_ptr as *const ())
}
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {

View File

@@ -45,7 +45,7 @@ use crate::layered_repository::storage_layer::{
use crate::waldecoder;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Result};
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -57,7 +57,7 @@ use std::fs::File;
use std::io::{BufWriter, Write};
use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
@@ -131,9 +131,6 @@ pub struct DeltaLayer {
dropped: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
inner: Mutex<DeltaLayerInner>,
}
@@ -247,16 +244,9 @@ impl Layer for DeltaLayer {
}
// If an older page image is needed to reconstruct the page, let the
// caller know about the predecessor layer.
// caller know.
if need_image {
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
self.start_lsn,
Arc::clone(cont_layer),
))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
Ok(PageReconstructResult::Continue(self.start_lsn))
} else {
Ok(PageReconstructResult::Complete)
}
@@ -265,6 +255,10 @@ impl Layer for DeltaLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
// Scan the BTreeMap backwards, starting from the given entry.
let inner = self.load()?;
@@ -273,11 +267,8 @@ impl Layer for DeltaLayer {
let result;
if let Some((_entry_lsn, entry)) = iter.next_back() {
result = *entry;
// Use the base image if needed
} else if let Some(predecessor) = &self.predecessor {
result = predecessor.get_seg_size(lsn)?;
} else {
result = 0;
bail!("could not find seg size in delta layer");
}
Ok(result)
}
@@ -389,10 +380,13 @@ impl DeltaLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
predecessor: Option<Arc<dyn Layer>>,
page_versions: impl Iterator<Item = (&'a (u32, Lsn), &'a PageVersion)>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<DeltaLayer> {
if seg.rel.is_blocky() {
assert!(!relsizes.is_empty());
}
let delta_layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
timelineid,
@@ -406,7 +400,6 @@ impl DeltaLayer {
page_version_metas: BTreeMap::new(),
relsizes,
}),
predecessor,
};
let mut inner = delta_layer.inner.lock().unwrap();
@@ -551,7 +544,6 @@ impl DeltaLayer {
timelineid: ZTimelineId,
tenantid: ZTenantId,
filename: &DeltaFileName,
predecessor: Option<Arc<dyn Layer>>,
) -> DeltaLayer {
DeltaLayer {
path_or_conf: PathOrConf::Conf(conf),
@@ -566,7 +558,6 @@ impl DeltaLayer {
page_version_metas: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
predecessor,
}
}
@@ -590,7 +581,6 @@ impl DeltaLayer {
page_version_metas: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
predecessor: None,
})
}
}

View File

@@ -12,7 +12,7 @@ use crate::layered_repository::{DeltaLayer, ImageLayer};
use crate::repository::WALRecord;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
@@ -45,6 +45,9 @@ pub struct InMemoryLayer {
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: RwLock<InMemoryLayerInner>,
/// Predecessor layer might be needed?
incremental: bool,
}
pub struct InMemoryLayerInner {
@@ -60,14 +63,15 @@ pub struct InMemoryLayerInner {
///
/// `segsizes` tracks the size of the segment at different points in time.
///
/// For a blocky rel, there is always one entry, at the layer's start_lsn,
/// so that determining the size never depends on the predecessor layer. For
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: BTreeMap<Lsn, u32>,
/// Writes are only allowed when true.
/// Set to false when this layer is in the process of being replaced.
writeable: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
}
impl InMemoryLayerInner {
@@ -83,10 +87,11 @@ impl InMemoryLayerInner {
// Scan the BTreeMap backwards, starting from the given entry.
let mut iter = self.segsizes.range((Included(&Lsn(0)), Included(&lsn)));
// We make sure there is always at least one entry
if let Some((_entry_lsn, entry)) = iter.next_back() {
*entry
} else {
0
panic!("could not find seg size in in-memory layer");
}
}
}
@@ -164,8 +169,6 @@ impl Layer for InMemoryLayer {
assert!(self.seg.blknum_in_seg(blknum));
let predecessor: Option<Arc<dyn Layer>>;
{
let inner = self.inner.read().unwrap();
@@ -192,16 +195,14 @@ impl Layer for InMemoryLayer {
bail!("no page image or WAL record for requested page");
}
}
predecessor = inner.predecessor.clone();
// release lock on 'inner'
}
// If an older page image is needed to reconstruct the page, let the
// caller know about the predecessor layer.
// caller know
if need_image {
if let Some(cont_layer) = predecessor {
Ok(PageReconstructResult::Continue(self.start_lsn, cont_layer))
if self.incremental {
Ok(PageReconstructResult::Continue(Lsn(self.start_lsn.0 - 1)))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
@@ -213,6 +214,10 @@ impl Layer for InMemoryLayer {
/// Get size of the relation at given LSN
fn get_seg_size(&self, lsn: Lsn) -> Result<u32> {
assert!(lsn >= self.start_lsn);
ensure!(
self.seg.rel.is_blocky(),
"get_seg_size() called on a non-blocky rel"
);
let inner = self.inner.read().unwrap();
Ok(inner.get_seg_size(lsn))
@@ -252,8 +257,7 @@ impl Layer for InMemoryLayer {
}
fn is_incremental(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.predecessor.is_some()
self.incremental
}
/// debugging function to print out the contents of the layer
@@ -332,6 +336,12 @@ impl InMemoryLayer {
start_lsn
);
// The segment is initially empty, so initialize 'segsizes' with 0.
let mut segsizes = BTreeMap::new();
if seg.rel.is_blocky() {
segsizes.insert(start_lsn, 0);
}
Ok(InMemoryLayer {
conf,
timelineid,
@@ -340,12 +350,12 @@ impl InMemoryLayer {
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
segsizes,
writeable: true,
predecessor: None,
}),
})
}
@@ -458,6 +468,10 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
assert!(
self.seg.rel.is_blocky(),
"put_truncation() called on a non-blocky rel"
);
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
@@ -518,7 +532,7 @@ impl InMemoryLayer {
start_lsn,
);
// For convenience, copy the segment size from the predecessor layer
// Copy the segment size at the start LSN from the predecessor layer.
let mut segsizes = BTreeMap::new();
if seg.rel.is_blocky() {
let size = src.get_seg_size(start_lsn)?;
@@ -533,12 +547,12 @@ impl InMemoryLayer {
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
segsizes,
writeable: true,
predecessor: Some(src),
}),
})
}
@@ -620,12 +634,12 @@ impl InMemoryLayer {
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
incremental: self.incremental,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
predecessor: inner.predecessor.clone(),
}),
});
@@ -679,8 +693,6 @@ impl InMemoryLayer {
let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
let predecessor = inner.predecessor.clone();
if let Some(drop_lsn) = inner.drop_lsn {
let delta_layer = DeltaLayer::create(
self.conf,
@@ -690,7 +702,6 @@ impl InMemoryLayer {
self.start_lsn,
drop_lsn,
true,
predecessor,
inner.page_versions.iter(),
inner.segsizes.clone(),
)?;
@@ -729,7 +740,6 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
false,
predecessor,
before_page_versions,
before_segsizes,
)?;
@@ -753,9 +763,4 @@ impl InMemoryLayer {
Ok(frozen_layers)
}
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) -> Option<Arc<dyn Layer>> {
let mut inner = self.inner.write().unwrap();
inner.predecessor.replace(predecessor)
}
}

View File

@@ -10,7 +10,6 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
@@ -87,9 +86,9 @@ pub struct PageReconstructData {
pub enum PageReconstructResult {
/// Got all the data needed to reconstruct the requested page
Complete,
/// This layer didn't contain all the required data, the caller should collect
/// more data from the returned predecessor layer at the returned LSN.
Continue(Lsn, Arc<dyn Layer>),
/// This layer didn't contain all the required data, the caller should look up
/// the predecessor layer at the returned LSN and collect more data from there.
Continue(Lsn),
/// This layer didn't contain data needed to reconstruct the page version at
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
@@ -145,8 +144,8 @@ pub trait Layer: Send + Sync {
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call. If this returns PageReconstructResult::Continue, call
/// again on the returned predecessor layer with the same 'reconstruct_data'
/// on first call. If this returns PageReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data'
/// to collect more data.
fn get_page_reconstruct_data(
&self,