Remove disk IO from InMemoryLayer::freeze

Move the creation of Image and Delta layers from
`InMemoryLayer::freeze()` to `InMemoryLayer::write_to_disk`.
This commit is contained in:
Patrick Insinger
2021-09-14 14:27:04 -07:00
committed by Patrick Insinger
parent fea5954b18
commit 91ff09151d
2 changed files with 153 additions and 89 deletions

View File

@@ -31,6 +31,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
use std::{fs, thread};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
use crate::restore_local_repo::import_timeline_wal;
@@ -1272,11 +1273,22 @@ impl LayeredTimeline {
}
// freeze it
let (new_historics, new_open) = oldest_layer.freeze(last_record_lsn, self)?;
let FreezeLayers {
frozen,
open: maybe_new_open,
} = oldest_layer.freeze(last_record_lsn)?;
let new_historics = frozen.write_to_disk(self)?;
if let Some(last_historic) = new_historics.last() {
if let Some(new_open) = &maybe_new_open {
new_open.update_predecessor(Arc::clone(last_historic));
}
}
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest_open();
if let Some(n) = new_open {
if let Some(n) = maybe_new_open {
layers.insert_open(n);
}
for n in new_historics {

View File

@@ -15,7 +15,6 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::path::PathBuf;
@@ -31,20 +30,19 @@ pub struct InMemoryLayer {
///
/// This layer contains all the changes from 'start_lsn'. The
/// start is inclusive. There is no end LSN; we only use in-memory
/// layer at the end of a timeline.
/// start is inclusive.
///
start_lsn: Lsn,
/// Frozen in-memory layers have an inclusive end LSN.
end_lsn: Option<Lsn>,
/// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: Mutex<InMemoryLayerInner>,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
}
pub struct InMemoryLayerInner {
@@ -62,9 +60,12 @@ pub struct InMemoryLayerInner {
///
segsizes: BTreeMap<Lsn, u32>,
/// True if freeze() has been called on the layer, indicating it no longer
/// accepts writes.
frozen: bool,
/// Writes are only allowed when true.
/// Set to false when this layer is in the process of being replaced.
writeable: bool,
/// Predecessor layer
predecessor: Option<Arc<dyn Layer>>,
}
impl InMemoryLayerInner {
@@ -74,7 +75,7 @@ impl InMemoryLayerInner {
// `get_layer_for_write` and InMemoryLayer write calls interleave with
// a checkpoint operation, however timing should make this rare.
// Assert only so we can identify when the bug triggers more easily.
assert!(!self.frozen);
assert!(self.writeable);
}
fn get_seg_size(&self, lsn: Lsn) -> u32 {
@@ -154,6 +155,8 @@ impl Layer for InMemoryLayer {
assert!(self.seg.blknum_in_seg(blknum));
let predecessor: Option<Arc<dyn Layer>>;
{
let inner = self.inner.lock().unwrap();
@@ -181,17 +184,15 @@ impl Layer for InMemoryLayer {
}
}
predecessor = inner.predecessor.clone();
// release lock on 'inner'
}
// If an older page image is needed to reconstruct the page, let the
// caller know about the predecessor layer.
if need_image {
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
self.start_lsn,
Arc::clone(cont_layer),
))
if let Some(cont_layer) = predecessor {
Ok(PageReconstructResult::Continue(self.start_lsn, cont_layer))
} else {
Ok(PageReconstructResult::Missing(self.start_lsn))
}
@@ -242,7 +243,8 @@ impl Layer for InMemoryLayer {
}
fn is_incremental(&self) -> bool {
self.predecessor.is_some()
let inner = self.inner.lock().unwrap();
inner.predecessor.is_some()
}
/// debugging function to print out the contents of the layer
@@ -278,11 +280,19 @@ impl Layer for InMemoryLayer {
}
}
// Type alias to simplify InMemoryLayer::freeze signature
//
type SuccessorLayers = (Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>);
/// Result of [`InMemoryLayer::freeze`]: the layer split into a frozen part
/// and, optionally, a still-open remainder for data past the cutoff LSN.
pub struct FreezeLayers {
/// Replacement layer for the layer which freeze was called on.
pub frozen: Arc<InMemoryLayer>,
/// New open layer containing leftover data.
pub open: Option<Arc<InMemoryLayer>>,
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
}
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
@@ -312,14 +322,15 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
frozen: false,
writeable: true,
predecessor: None,
}),
predecessor: None,
})
}
@@ -352,6 +363,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<u32> {
self.assert_not_frozen();
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -430,8 +442,9 @@ impl InMemoryLayer {
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
self.assert_not_frozen();
let mut inner = self.inner.lock().unwrap();
inner.assert_writeable();
// check that we truncate to a smaller size than the segment was before the truncation
@@ -450,6 +463,8 @@ impl InMemoryLayer {
/// Remember that the segment was dropped at given LSN
pub fn drop_segment(&self, lsn: Lsn) -> anyhow::Result<()> {
self.assert_not_frozen();
let mut inner = self.inner.lock().unwrap();
inner.assert_writeable();
@@ -496,37 +511,23 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
segsizes,
frozen: false,
writeable: true,
predecessor: Some(src),
}),
predecessor: Some(src),
})
}
///
/// Write this in-memory layer to disk.
///
/// The cutoff point for the layer that's written to disk is 'end_lsn'.
///
/// Returns new layers that replace this one. Always returns a new image
/// layer containing the page versions at the cutoff LSN, that were written
/// to disk, and usually also a DeltaLayer that includes all the WAL records
/// between start LSN and the cutoff. (The delta layer is not needed when
/// a new relish is created with a single LSN, so that the start and end LSN
/// are the same.) If there were page versions newer than 'end_lsn', also
/// returns a new in-memory layer containing those page versions. The caller
/// replaces this layer with the returned layers in the layer map.
///
pub fn freeze(
&self,
cutoff_lsn: Lsn,
// This is needed just to call materialize_page()
timeline: &LayeredTimeline,
) -> Result<SuccessorLayers> {
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn` (or the drop LSN, if dropped)
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(&self, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.seg, self.timelineid, cutoff_lsn
@@ -534,7 +535,7 @@ impl InMemoryLayer {
let mut inner = self.inner.lock().unwrap();
inner.assert_writeable();
inner.frozen = true;
inner.writeable = false;
// Normally, use the cutoff LSN as the end of the frozen layer.
// But if the relation was dropped, we know that there are no
@@ -568,17 +569,10 @@ impl InMemoryLayer {
before_page_versions = BTreeMap::new();
after_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
match lsn.cmp(&end_lsn) {
Ordering::Less => {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
Ordering::Equal => {
// Page versions at the cutoff LSN will be stored in the
// materialized image layer.
}
Ordering::Greater => {
after_page_versions.insert((*blknum, *lsn), pv.clone());
}
if *lsn > end_lsn {
after_page_versions.insert((*blknum, *lsn), pv.clone());
} else {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
} else {
@@ -588,7 +582,83 @@ impl InMemoryLayer {
after_page_versions = BTreeMap::new();
}
// we can release the lock now.
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(end_lsn),
oldest_pending_lsn: self.start_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
predecessor: inner.predecessor.clone(),
}),
});
let open = if !dropped && (!after_segsizes.is_empty() || !after_page_versions.is_empty()) {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
end_lsn,
end_lsn,
)?;
let new_inner = new_open.inner.get_mut().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
Some(Arc::new(new_open))
} else {
None
};
// TODO could we avoid creating the `frozen` if it contains no data
Ok(FreezeLayers { frozen, open })
}
/// Write this frozen in-memory layer to disk.
///
/// Returns new layers that replace this one.
/// If not dropped, returns a new image layer containing the page versions
/// at the `end_lsn`. Can also return a DeltaLayer that includes all the
/// WAL records between start and end LSN. (The delta layer is not needed
/// when a new relish is created with a single LSN, so that the start and
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
let end_lsn = self.end_lsn.expect("can only write frozen layers to disk");
let inner = self.inner.lock().unwrap();
let drop_lsn = inner.drop_lsn.clone();
let predecessor = inner.predecessor.clone();
let mut before_page_versions;
let mut before_segsizes;
if inner.drop_lsn.is_none() {
before_segsizes = BTreeMap::new();
for (lsn, size) in inner.segsizes.iter() {
if *lsn <= end_lsn {
before_segsizes.insert(*lsn, *size);
}
}
before_page_versions = BTreeMap::new();
for ((blknum, lsn), pv) in inner.page_versions.iter() {
if *lsn < end_lsn {
before_page_versions.insert((*blknum, *lsn), pv.clone());
}
}
} else {
before_page_versions = inner.page_versions.clone();
before_segsizes = inner.segsizes.clone();
}
drop(inner);
let mut frozen_layers: Vec<Arc<dyn Layer>> = Vec::new();
@@ -602,13 +672,12 @@ impl InMemoryLayer {
self.seg,
self.start_lsn,
end_lsn,
dropped,
self.predecessor.clone(),
drop_lsn.is_some(),
predecessor,
before_page_versions,
before_segsizes,
)?;
let delta_layer_rc: Arc<dyn Layer> = Arc::new(delta_layer);
frozen_layers.push(delta_layer_rc);
frozen_layers.push(Arc::new(delta_layer));
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
@@ -619,35 +688,18 @@ impl InMemoryLayer {
assert!(before_page_versions.is_empty());
}
let mut new_open_rc = None;
if !dropped {
if drop_lsn.is_none() {
// Write a new base image layer at the cutoff point
let imgfile = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
let imgfile_rc: Arc<dyn Layer> = Arc::new(imgfile);
frozen_layers.push(Arc::clone(&imgfile_rc));
let image_layer = ImageLayer::create_from_src(self.conf, &timeline, self, end_lsn)?;
frozen_layers.push(Arc::new(image_layer));
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
// If there were any page versions newer than the cutoff, initialize a new in-memory
// layer to hold them
if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let new_open = Self::create_successor_layer(
self.conf,
imgfile_rc,
self.timelineid,
self.tenantid,
end_lsn,
end_lsn,
)?;
let mut new_inner = new_open.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
trace!("freeze: created new in-mem layer {} {}-", self.seg, end_lsn);
new_open_rc = Some(Arc::new(new_open))
}
}
Ok((frozen_layers, new_open_rc))
Ok(frozen_layers)
}
/// Replace this layer's predecessor pointer with `predecessor`.
///
/// Takes the inner mutex. Called by the checkpoint path after
/// `write_to_disk`, so a still-open successor layer can be re-pointed
/// at the newest on-disk layer produced from its frozen predecessor.
pub fn update_predecessor(&self, predecessor: Arc<dyn Layer>) {
let mut inner = self.inner.lock().unwrap();
inner.predecessor = Some(predecessor);
}
}