WIP Store base images in separate ImageLayers

This commit is contained in:
Heikki Linnakangas
2021-08-16 23:22:55 +03:00
parent 882f549236
commit ddb7155bbe
7 changed files with 666 additions and 122 deletions

View File

@@ -35,16 +35,19 @@ use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_metrics::{register_histogram, Histogram};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
mod filename;
mod image_layer;
mod inmemory_layer;
mod layer_map;
mod snapshot_layer;
mod storage_layer;
use image_layer::ImageLayer;
use inmemory_layer::InMemoryLayer;
use layer_map::LayerMap;
use snapshot_layer::SnapshotLayer;
@@ -74,6 +77,16 @@ lazy_static! {
.expect("failed to define a metric");
}
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
"pageserver_getpage_reconstruct_time",
"FIXME Time spent on storage operations"
)
.expect("failed to define a metric");
}
///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
@@ -486,7 +499,10 @@ impl Timeline for LayeredTimeline {
let seg = SegmentTag::from_blknum(rel, blknum);
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
self.materialize_page(seg, blknum, lsn, &*layer)
RECONSTRUCT_TIME
.observe_closure_duration(|| {
self.materialize_page(seg, blknum, lsn, &*layer)
})
} else {
bail!("relish {} not found at {}", rel, lsn);
}
@@ -885,9 +901,21 @@ impl LayeredTimeline {
self.timelineid
);
let mut layers = self.layers.lock().unwrap();
let snapfilenames =
let (snapfilenames, imgfilenames) =
filename::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?;
for filename in imgfilenames.iter() {
let layer = ImageLayer::load_image_layer(self.conf, self.timelineid, self.tenantid, filename)?;
info!(
"found layer {} {} on timeline {}",
layer.get_seg_tag(),
layer.get_start_lsn(),
self.timelineid
);
layers.insert_historic(Arc::new(layer));
}
for filename in snapfilenames.iter() {
let layer = SnapshotLayer::load_snapshot_layer(self.conf, self.timelineid, self.tenantid, filename)?;
@@ -1031,10 +1059,9 @@ impl LayeredTimeline {
prev_layer.get_start_lsn(),
prev_layer.get_end_lsn()
);
layer = InMemoryLayer::copy_snapshot(
layer = InMemoryLayer::create_successor_layer(
self.conf,
&self,
&*prev_layer,
prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
@@ -1147,14 +1174,14 @@ impl LayeredTimeline {
break;
}
let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
// replace this layer with the new layers that 'freeze' returned
layers.pop_oldest();
if let Some(n) = new_open {
layers.insert_open(n);
}
if let Some(historic) = new_historic {
for historic in new_historics {
trace!(
"freeze returned layer {} {}-{}",
historic.get_seg_tag(),
@@ -1223,7 +1250,7 @@ impl LayeredTimeline {
self.timelineid, cutoff
);
let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
// Scan all snapshot files in the directory. For each file, if a newer file
// exists, we can remove the old one.

View File

@@ -151,6 +151,132 @@ impl fmt::Display for SnapshotFileName {
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct ImageFileName {
pub seg: SegmentTag,
pub lsn: Lsn,
}
impl ImageFileName {
fn from_str(fname: &str) -> Option<Self> {
// Split the filename into parts
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>
//
// or if it was dropped:
//
// <spcnode>_<dbnode>_<relnode>_<forknum>_<seg>_<start LSN>_<end LSN>_DROPPED
//
let rel;
let mut parts;
if let Some(rest) = fname.strip_prefix("rel_") {
parts = rest.split('_');
rel = RelishTag::Relation(RelTag {
spcnode: parts.next()?.parse::<u32>().ok()?,
dbnode: parts.next()?.parse::<u32>().ok()?,
relnode: parts.next()?.parse::<u32>().ok()?,
forknum: parts.next()?.parse::<u8>().ok()?,
});
} else if let Some(rest) = fname.strip_prefix("pg_xact_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::Clog,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
parts = rest.split('_');
rel = RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno: u32::from_str_radix(parts.next()?, 16).ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
parts = rest.split('_');
rel = RelishTag::FileNodeMap {
spcnode: parts.next()?.parse::<u32>().ok()?,
dbnode: parts.next()?.parse::<u32>().ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
parts = rest.split('_');
rel = RelishTag::TwoPhase {
xid: parts.next()?.parse::<u32>().ok()?,
};
} else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
parts = rest.split('_');
rel = RelishTag::Checkpoint;
} else if let Some(rest) = fname.strip_prefix("pg_control_") {
parts = rest.split('_');
rel = RelishTag::ControlFile;
} else {
return None;
}
let segno = parts.next()?.parse::<u32>().ok()?;
let seg = SegmentTag {
rel,
segno
};
let lsn = Lsn::from_hex(parts.next()?).ok()?;
if parts.next().is_some() {
warn!("unrecognized filename in timeline dir: {}", fname);
return None;
}
Some(ImageFileName {
seg,
lsn,
})
}
fn to_string(&self) -> String {
let basename = match self.seg.rel {
RelishTag::Relation(reltag) => format!(
"rel_{}_{}_{}_{}",
reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
),
RelishTag::Slru {
slru: SlruKind::Clog,
segno,
} => format!("pg_xact_{:04X}", segno),
RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno,
} => format!("pg_multixact_members_{:04X}", segno),
RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno,
} => format!("pg_multixact_offsets_{:04X}", segno),
RelishTag::FileNodeMap { spcnode, dbnode } => {
format!("pg_filenodemap_{}_{}", spcnode, dbnode)
}
RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
RelishTag::Checkpoint => format!("pg_control_checkpoint"),
RelishTag::ControlFile => format!("pg_control"),
};
format!(
"{}_{}_{:016X}",
basename,
self.seg.segno,
u64::from(self.lsn),
)
}
}
impl fmt::Display for ImageFileName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_string())
}
}
/// Create SnapshotLayers representing all files on disk
///
@@ -159,10 +285,11 @@ pub fn list_snapshot_files(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) -> Result<Vec<SnapshotFileName>> {
) -> Result<(Vec<SnapshotFileName>, Vec<ImageFileName>)> {
let path = conf.timeline_path(&timelineid, &tenantid);
let mut snapfiles: Vec<SnapshotFileName> = Vec::new();
let mut imgfiles: Vec<ImageFileName> = Vec::new();
for direntry in fs::read_dir(path)? {
let fname = direntry?.file_name();
let fname = fname.to_str().unwrap();
@@ -170,6 +297,10 @@ pub fn list_snapshot_files(
if let Some(snapfilename) = SnapshotFileName::from_str(fname) {
snapfiles.push(snapfilename);
}
if let Some(imgfilename) = ImageFileName::from_str(fname) {
imgfiles.push(imgfilename);
}
}
return Ok(snapfiles);
return Ok((snapfiles, imgfiles));
}

View File

@@ -0,0 +1,384 @@
//! FIXME
//! A SnapshotLayer represents one snapshot file on disk. One file holds all page
//! version and size information of one relation, in a range of LSN.
//! The name "snapshot file" is a bit of a misnomer because a snapshot file doesn't
//! contain a snapshot at a specific LSN, but rather all the page versions in a range
//! of LSNs.
//!
//! Currently, a snapshot file contains full information needed to reconstruct any
//! page version in the LSN range, without consulting any other snapshot files. When
//! a new snapshot file is created for writing, the full contents of relation are
//! materialized as it is at the beginning of the LSN range. That can be very expensive,
//! we should find a way to store differential files. But this keeps the read-side
//! of things simple. You can find the correct snapshot file based on RelishTag and
//! timeline+LSN, and once you've located it, you have all the data you need to in that
//! file.
//!
//! When a snapshot file needs to be accessed, we slurp the whole file into memory, into
//! the SnapshotLayer struct. See load() and unload() functions.
//!
//! On disk, the snapshot files are stored in timelines/<timelineid> directory.
//! Currently, there are no subdirectories, and each snapshot file is named like this:
//!
//! <spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
//!
//! For example:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349
//!
//! If a relation is dropped, we add a '_DROPPED' to the end of the filename to indicate that.
//! So the above example would become:
//!
//! 1663_13990_2609_0_000000000169C348_000000000169C349_DROPPED
//!
//! The end LSN indicates when it was dropped in that case, we don't store it in the
//! file contents in any way.
//!
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
//! FIXME
//!
use crate::layered_repository::storage_layer::{Layer, PageReconstructData, SegmentTag};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::filename::{ImageFileName};
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
use zenith_metrics::{register_histogram, Histogram};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
// Magic constant to identify a Zenith segment image file
static IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1;
static BASE_IMAGES_CHAPTER: u64 = 1;
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
"pageserver_image_reconstruct_time",
"FIXME Time spent on storage operations"
)
.expect("failed to define a metric");
}
///
/// SnapshotLayer is the in-memory data structure associated with an
/// on-disk snapshot file. We keep a SnapshotLayer in memory for each
/// file, in the LayerMap. If a layer is in "loaded" state, we have a
/// copy of the file in memory, in 'inner'. Otherwise the struct is
/// just a placeholder for a file that exists on disk, and it needs to
/// be loaded before using it in queries.
///
pub struct ImageLayer {
conf: &'static PageServerConf,
pub tenantid: ZTenantId,
pub timelineid: ZTimelineId,
pub seg: SegmentTag,
// This entry contains an image of all pages as of this LSN
pub lsn: Lsn,
inner: Mutex<ImageLayerInner>,
}
pub struct ImageLayerInner {
/// If false, the 'page_versions' and 'relsizes' have not been
/// loaded into memory yet.
loaded: bool,
// indexed by block number (within segment)
base_images: Vec<Bytes>,
}
impl Layer for ImageLayer {
fn get_timeline_id(&self) -> ZTimelineId {
return self.timelineid;
}
fn get_seg_tag(&self) -> SegmentTag {
return self.seg;
}
fn is_dropped(&self) -> bool {
return false;
}
fn get_start_lsn(&self) -> Lsn {
return self.lsn;
}
fn get_end_lsn(&self) -> Lsn {
return self.lsn;
}
/// Look up given page in the cache.
fn get_page_reconstruct_data(
&self,
blknum: u32,
lsn: Lsn,
reconstruct_data: &mut PageReconstructData,
) -> Result<Option<Lsn>> {
let need_base_image_lsn: Option<Lsn>;
assert!(lsn >= self.lsn);
{
let inner = self.load()?;
let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
if let Some(img) = inner.base_images.get(base_blknum) {
reconstruct_data.page_img = Some(img.clone());
need_base_image_lsn = None;
} else {
bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
}
// release lock on 'inner'
}
Ok(need_base_image_lsn)
}
/// Get size of the relation at given LSN
fn get_seg_size(&self, _lsn: Lsn) -> Result<u32> {
let inner = self.load()?;
let result = inner.base_images.len() as u32;
Ok(result)
}
/// Does this segment exist at given LSN?
fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
Ok(true)
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.base_images = Vec::new();
inner.loaded = false;
Ok(())
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
fn is_incremental(&self) -> bool {
false
}
}
impl ImageLayer {
fn path(&self) -> PathBuf {
Self::path_for(
self.conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
)
}
fn path_for(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
fname: &ImageFileName,
) -> PathBuf {
conf.timeline_path(&timelineid, &tenantid)
.join(fname.to_string())
}
/// Create a new snapshot file, using the given btreemaps containing the page versions and
/// relsizes.
/// FIXME comment
/// This is used to write the in-memory layer to disk. The in-memory layer uses the same
/// data structure with two btreemaps as we do, so passing the btreemaps is currently
/// expedient.
pub fn create(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
seg: SegmentTag,
lsn: Lsn,
base_images: Vec<Bytes>,
) -> Result<ImageLayer> {
let layer = ImageLayer {
conf: conf,
timelineid: timelineid,
tenantid: tenantid,
seg: seg,
lsn: lsn,
inner: Mutex::new(ImageLayerInner {
loaded: true,
base_images: base_images,
}),
};
let inner = layer.inner.lock().unwrap();
// Write the images into a file
let path = layer.path();
// Note: This overwrites any existing file. There shouldn't be any.
// FIXME: throw an error instead?
let file = File::create(&path)?;
let book = BookWriter::new(file, IMAGE_FILE_MAGIC)?;
// Write out the base images
let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
let buf = Vec::ser(&inner.base_images)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
book.close()?;
trace!("saved {}", &path.display());
drop(inner);
Ok(layer)
}
pub fn create_from_src(
conf: &'static PageServerConf,
timeline: &LayeredTimeline,
src: &dyn Layer,
lsn: Lsn,
) -> Result<ImageLayer> {
let seg = src.get_seg_tag();
let timelineid = timeline.timelineid;
let startblk;
let size;
if seg.rel.is_blocky() {
size = src.get_seg_size(lsn)?;
startblk = seg.segno * RELISH_SEG_SIZE;
} else {
size = 1;
startblk = 0;
}
trace!(
"creating new ImageLayer for {} on timeline {} at {}",
seg,
timelineid,
lsn,
);
let mut base_images: Vec<Bytes> = Vec::new();
for blknum in startblk..(startblk+size) {
let img =
RECONSTRUCT_TIME
.observe_closure_duration(|| {
timeline.materialize_page(seg, blknum, lsn, &*src)
})?;
base_images.push(img);
}
Self::create(conf, timelineid, timeline.tenantid, seg, lsn,
base_images)
}
///
/// Load the contents of the file into memory
///
fn load(&self) -> Result<MutexGuard<ImageLayerInner>> {
// quick exit if already loaded
let mut inner = self.inner.lock().unwrap();
if inner.loaded {
return Ok(inner);
}
let path = Self::path_for(
self.conf,
self.timelineid,
self.tenantid,
&ImageFileName {
seg: self.seg,
lsn: self.lsn,
},
);
let file = File::open(&path)?;
let book = Book::new(file)?;
let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
let base_images = Vec::des(&chapter)?;
debug!("loaded from {}", &path.display());
*inner = ImageLayerInner {
loaded: true,
base_images,
};
Ok(inner)
}
/// Create an ImageLayer represent a file on disk
pub fn load_image_layer(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
filename: &ImageFileName,
) -> Result<ImageLayer> {
let layer = ImageLayer {
conf,
timelineid,
tenantid,
seg: filename.seg,
lsn: filename.lsn,
inner: Mutex::new(ImageLayerInner {
loaded: false,
base_images: Vec::new(),
}),
};
Ok(layer)
}
/// debugging function to print out the contents of the layer
#[allow(unused)]
pub fn dump(&self) -> String {
let mut result = format!(
"----- image layer for {} at {} ----\n",
self.seg, self.lsn,
);
//let inner = self.inner.lock().unwrap();
//for (k, v) in inner.page_versions.iter() {
// result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some());
//}
result
}
}

View File

@@ -6,8 +6,7 @@ use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
};
use crate::layered_repository::LayeredTimeline;
use crate::layered_repository::SnapshotLayer;
use crate::repository::WALRecord;
use crate::layered_repository::{ImageLayer, SnapshotLayer};
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
@@ -37,14 +36,14 @@ pub struct InMemoryLayer {
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: Mutex<InMemoryLayerInner>,
img_layer: Option<Arc<dyn Layer>>,
}
pub struct InMemoryLayerInner {
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
base_images: Vec<Bytes>,
///
/// All versions of all pages in the layer are are kept here.
/// Indexed by block number and LSN.
@@ -130,13 +129,11 @@ impl Layer for InMemoryLayer {
}
// Use the base image, if needed
if need_base_image_lsn.is_some() {
let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
if let Some(img) = inner.base_images.get(base_blknum) {
reconstruct_data.page_img = Some(img.clone());
need_base_image_lsn = None;
if let Some(need_lsn) = need_base_image_lsn {
if let Some(img_layer) = &self.img_layer {
need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
} else {
bail!("inmem: no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
}
}
@@ -157,8 +154,11 @@ impl Layer for InMemoryLayer {
let result;
if let Some((_entry_lsn, entry)) = iter.next_back() {
result = *entry;
// Use the base image if needed
} else if let Some(img_layer) = &self.img_layer {
result = img_layer.get_seg_size(lsn)?;
} else {
result = inner.base_images.len() as u32;
result = 0;
}
trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
Ok(result)
@@ -178,6 +178,23 @@ impl Layer for InMemoryLayer {
// Otherwise, it exists
Ok(true)
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
fn unload(&self) -> Result<()> {
Ok(())
}
fn delete(&self) -> Result<()> {
Ok(())
}
fn is_incremental(&self) -> bool {
true
}
}
impl InMemoryLayer {
@@ -213,11 +230,11 @@ impl InMemoryLayer {
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
base_images: Vec::new(),
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
mem_used: 0,
}),
img_layer: None,
})
}
@@ -285,8 +302,10 @@ impl InMemoryLayer {
let oldsize;
if let Some((_entry_lsn, entry)) = iter.next_back() {
oldsize = *entry;
} else if let Some(img_layer) = &self.img_layer {
oldsize = img_layer.get_seg_size(lsn)?;
} else {
oldsize = inner.base_images.len() as u32;
oldsize = 0;
//bail!("No old size found for {} at {}", self.tag, lsn);
}
if newsize > oldsize {
@@ -333,60 +352,37 @@ impl InMemoryLayer {
/// Initialize a new InMemoryLayer for, by copying the state at the given
/// point in time from given existing layer.
///
pub fn copy_snapshot(
pub fn create_successor_layer(
conf: &'static PageServerConf,
timeline: &LayeredTimeline,
src: &dyn Layer,
src: Arc<dyn Layer>,
timelineid: ZTimelineId,
tenantid: ZTenantId,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Result<InMemoryLayer> {
let mut mem_used = 0;
let seg = src.get_seg_tag();
let startblk;
let size;
if seg.rel.is_blocky() {
size = src.get_seg_size(start_lsn)?;
startblk = seg.segno * RELISH_SEG_SIZE;
} else {
size = 1;
startblk = 0;
}
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}, size {}",
src.get_seg_tag(),
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
seg,
timelineid,
start_lsn,
size,
);
let mut base_images: Vec<Bytes> = Vec::new();
for blknum in startblk..(startblk+size) {
let img = timeline.materialize_page(seg, blknum, start_lsn, src)?;
mem_used += img.len();
base_images.push(img);
}
Ok(InMemoryLayer {
conf,
timelineid,
tenantid,
seg: src.get_seg_tag(),
seg,
start_lsn,
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
base_images: base_images,
page_versions: BTreeMap::new(),
segsizes: BTreeMap::new(),
mem_used: mem_used,
mem_used: 0,
}),
img_layer: Some(src),
})
}
@@ -406,7 +402,7 @@ impl InMemoryLayer {
cutoff_lsn: Lsn,
// This is needed just to call materialize_page()
timeline: &LayeredTimeline,
) -> Result<(Option<Arc<SnapshotLayer>>, Option<Arc<InMemoryLayer>>)> {
) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
info!(
"freezing in memory layer for {} on timeline {} at {}",
self.seg, self.timelineid, cutoff_lsn
@@ -428,7 +424,6 @@ impl InMemoryLayer {
};
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
let before_base_images = inner.base_images.clone();
let mut before_page_versions;
let mut before_segsizes;
let mut after_page_versions;
@@ -463,6 +458,18 @@ impl InMemoryLayer {
// we can release the lock now.
drop(inner);
let mut historics: Vec<Arc<dyn Layer>> = Vec::new();
// write a new base image layer at the cutoff point
let imgfile = ImageLayer::create_from_src(
self.conf,
timeline,
self,
end_lsn,
)?;
let imgfile_rc: Arc<dyn Layer> = Arc::new(imgfile);
historics.push(Arc::clone(&imgfile_rc));
// Write the page versions before the cutoff to disk.
let snapfile = SnapshotLayer::create(
self.conf,
@@ -472,10 +479,12 @@ impl InMemoryLayer {
self.start_lsn,
end_lsn,
dropped,
before_base_images,
self.img_layer.clone(),
before_page_versions,
before_segsizes,
)?;
let snapfile_rc: Arc<dyn Layer> = Arc::new(snapfile);
historics.push(snapfile_rc);
// If there were any "new" page versions, initialize a new in-memory layer to hold
// them
@@ -483,10 +492,9 @@ impl InMemoryLayer {
if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_open = Self::copy_snapshot(
let new_open = Self::create_successor_layer(
self.conf,
timeline,
&snapfile,
imgfile_rc,
self.timelineid,
self.tenantid,
end_lsn,
@@ -502,9 +510,7 @@ impl InMemoryLayer {
None
};
let new_historic = Some(Arc::new(snapfile));
Ok((new_historic, new_open))
Ok((historics, new_open))
}
/// debugging function to print out the contents of the layer

View File

@@ -3,14 +3,14 @@
//!
//! When the timeline is first accessed, the server lists of all snapshot files
//! in the timelines/<timelineid> directory, and populates this map with
//! SnapshotLayers corresponding to each file. When new WAL is received,
//! SnapshotLayers corresponding to each file. When new WAL is received, FIXME
//! we create InMemoryLayers to hold the incoming records. Now and then,
//! in the checkpoint() function, the in-memory layers are frozen, forming
//! new snapshot layers and corresponding files are written to disk.
//!
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
use crate::layered_repository::{InMemoryLayer};
use crate::relish::*;
use anyhow::Result;
use lazy_static::lazy_static;
@@ -48,7 +48,7 @@ pub struct LayerMap {
struct SegEntry {
pub open: Option<Arc<InMemoryLayer>>,
pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
pub historic: BTreeMap<Lsn, Arc<dyn Layer>>,
}
struct OpenSegEntry {
@@ -160,7 +160,7 @@ impl LayerMap {
///
/// Insert an on-disk layer
///
pub fn insert_historic(&mut self, layer: Arc<SnapshotLayer>) {
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
let tag = layer.get_seg_tag();
let start_lsn = layer.get_start_lsn();
@@ -184,7 +184,7 @@ impl LayerMap {
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: &SnapshotLayer) {
pub fn remove_historic(&mut self, layer: &dyn Layer) {
let tag = layer.get_seg_tag();
let start_lsn = layer.get_start_lsn();
@@ -230,14 +230,16 @@ impl LayerMap {
/// Is there a newer layer for given segment?
pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool {
if let Some(segentry) = self.segs.get(&seg) {
if let Some(_open) = &segentry.open {
return true;
}
// open layer is always incremental so it doesn't count
for (newer_lsn, layer) in segentry
.historic
.range((Included(lsn), Included(Lsn(u64::MAX))))
{
// FIXME: incremental layers don't count
if layer.is_incremental() {
continue;
}
if layer.get_end_lsn() > lsn {
trace!(
"found later layer for {}, {} {}-{}",
@@ -284,11 +286,11 @@ impl Default for LayerMap {
pub struct HistoricLayerIter<'a> {
segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<dyn Layer>>>,
}
impl<'a> Iterator for HistoricLayerIter<'a> {
type Item = Arc<SnapshotLayer>;
type Item = Arc<dyn Layer>;
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
loop {

View File

@@ -42,11 +42,9 @@ use crate::layered_repository::storage_layer::{
Layer, PageReconstructData, PageVersion, SegmentTag,
};
use crate::layered_repository::filename::{SnapshotFileName};
use crate::layered_repository::RELISH_SEG_SIZE;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, Result};
use bytes::Bytes;
use log::*;
use std::collections::BTreeMap;
use std::fs;
@@ -54,7 +52,7 @@ use std::fs::File;
use std::io::Write;
use std::ops::Bound::Included;
use std::path::PathBuf;
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard};
use bookfile::{Book, BookWriter};
@@ -64,9 +62,8 @@ use zenith_utils::lsn::Lsn;
// Magic constant to identify a Zenith snapshot file
static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01;
static BASE_IMAGES_CHAPTER: u64 = 1;
static PAGE_VERSIONS_CHAPTER: u64 = 2;
static REL_SIZES_CHAPTER: u64 = 3;
static PAGE_VERSIONS_CHAPTER: u64 = 1;
static REL_SIZES_CHAPTER: u64 = 2;
///
/// SnapshotLayer is the in-memory data structure associated with an
@@ -91,6 +88,8 @@ pub struct SnapshotLayer {
dropped: bool,
inner: Mutex<SnapshotLayerInner>,
img_layer: Option<Arc<dyn Layer>>,
}
pub struct SnapshotLayerInner {
@@ -98,9 +97,6 @@ pub struct SnapshotLayerInner {
/// loaded into memory yet.
loaded: bool,
// indexed by block number (within segment)
base_images: Vec<Bytes>,
/// All versions of all pages in the file are are kept here.
/// Indexed by block number and LSN.
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
@@ -167,13 +163,11 @@ impl Layer for SnapshotLayer {
}
// Use the base image, if needed
if need_base_image_lsn.is_some() {
let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
if let Some(img) = inner.base_images.get(base_blknum) {
reconstruct_data.page_img = Some(img.clone());
need_base_image_lsn = None;
if let Some(need_lsn) = need_base_image_lsn {
if let Some(img_layer) = &self.img_layer {
need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
} else {
bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
}
}
@@ -195,10 +189,12 @@ impl Layer for SnapshotLayer {
let result;
if let Some((_entry_lsn, entry)) = iter.next_back() {
result = *entry;
// Use the base image if needed
} else if let Some(img_layer) = &self.img_layer {
result = img_layer.get_seg_size(lsn)?;
} else {
result = inner.base_images.len() as u32;
result = 0;
}
info!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
Ok(result)
}
@@ -212,6 +208,28 @@ impl Layer for SnapshotLayer {
// Otherwise, it exists.
Ok(true)
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.page_versions = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
fn is_incremental(&self) -> bool {
true
}
}
impl SnapshotLayer {
@@ -253,7 +271,7 @@ impl SnapshotLayer {
start_lsn: Lsn,
end_lsn: Lsn,
dropped: bool,
base_images: Vec<Bytes>,
img_layer: Option<Arc<dyn Layer>>,
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
relsizes: BTreeMap<Lsn, u32>,
) -> Result<SnapshotLayer> {
@@ -268,10 +286,10 @@ impl SnapshotLayer {
dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: true,
base_images: base_images,
page_versions: page_versions,
relsizes: relsizes,
}),
img_layer,
};
let inner = snapfile.inner.lock().unwrap();
@@ -283,13 +301,6 @@ impl SnapshotLayer {
let file = File::create(&path)?;
let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?;
// Write out the base images
let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
let buf = Vec::ser(&inner.base_images)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
// Write out the other page versions
let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER);
let buf = BTreeMap::ser(&inner.page_versions)?;
@@ -337,9 +348,6 @@ impl SnapshotLayer {
let file = File::open(&path)?;
let book = Book::new(file)?;
let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
let base_images = Vec::des(&chapter)?;
let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?;
let page_versions = BTreeMap::des(&chapter)?;
@@ -350,7 +358,6 @@ impl SnapshotLayer {
*inner = SnapshotLayerInner {
loaded: true,
base_images,
page_versions,
relsizes,
};
@@ -377,34 +384,16 @@ impl SnapshotLayer {
dropped: filename.dropped,
inner: Mutex::new(SnapshotLayerInner {
loaded: false,
base_images: Vec::new(),
page_versions: BTreeMap::new(),
relsizes: BTreeMap::new(),
}),
// FIXME: This doesn't work across restarts.
img_layer: None,
};
Ok(snapfile)
}
pub fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
Ok(())
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
pub fn unload(&self) -> Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.base_images = Vec::new();
inner.page_versions = BTreeMap::new();
inner.relsizes = BTreeMap::new();
inner.loaded = false;
Ok(())
}
/// debugging function to print out the contents of the layer
#[allow(unused)]
pub fn dump(&self) -> String {

View File

@@ -147,4 +147,9 @@ pub trait Layer: Send + Sync {
fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
fn is_incremental(&self) -> bool;
fn unload(&self) -> Result<()>;
fn delete(&self) -> Result<()>;
}