diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 5170ab61a2..c81625088d 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -35,16 +35,19 @@ use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};

 use zenith_metrics::{register_histogram_vec, HistogramVec};
+use zenith_metrics::{register_histogram, Histogram};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::{AtomicLsn, Lsn};
 use zenith_utils::seqwait::SeqWait;

 mod filename;
+mod image_layer;
 mod inmemory_layer;
 mod layer_map;
 mod snapshot_layer;
 mod storage_layer;

+use image_layer::ImageLayer;
 use inmemory_layer::InMemoryLayer;
 use layer_map::LayerMap;
 use snapshot_layer::SnapshotLayer;
@@ -74,6 +77,16 @@ lazy_static! {
         .expect("failed to define a metric");
 }

+// Metrics collected on operations on the storage repository.
+lazy_static! {
+    static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
+        "pageserver_getpage_reconstruct_time",
+        "Time spent reconstructing a page image on a GetPage request"
+    )
+    .expect("failed to define a metric");
+}
+
 ///
 /// Repository consists of multiple timelines. Keep them in a hash table.
 ///
@@ -486,7 +499,10 @@ impl Timeline for LayeredTimeline {
         let seg = SegmentTag::from_blknum(rel, blknum);

         if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
-            self.materialize_page(seg, blknum, lsn, &*layer)
+            RECONSTRUCT_TIME
+                .observe_closure_duration(|| {
+                    self.materialize_page(seg, blknum, lsn, &*layer)
+                })
         } else {
             bail!("relish {} not found at {}", rel, lsn);
         }
@@ -885,9 +901,21 @@ impl LayeredTimeline {
             self.timelineid
         );
         let mut layers = self.layers.lock().unwrap();
-        let snapfilenames =
+        let (snapfilenames, imgfilenames) =
             filename::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?;

+        for filename in imgfilenames.iter() {
+            let layer = ImageLayer::load_image_layer(self.conf, self.timelineid, self.tenantid, filename)?;
+
+            info!(
+                "found layer {} {} on timeline {}",
+                layer.get_seg_tag(),
+                layer.get_start_lsn(),
+                self.timelineid
+            );
+            layers.insert_historic(Arc::new(layer));
+        }
+
         for filename in snapfilenames.iter() {
             let layer =
                 SnapshotLayer::load_snapshot_layer(self.conf, self.timelineid, self.tenantid, filename)?;
@@ -1031,10 +1059,9 @@ impl LayeredTimeline {
                 prev_layer.get_start_lsn(),
                 prev_layer.get_end_lsn()
             );
-            layer = InMemoryLayer::copy_snapshot(
+            layer = InMemoryLayer::create_successor_layer(
                 self.conf,
-                &self,
-                &*prev_layer,
+                prev_layer,
                 self.timelineid,
                 self.tenantid,
                 start_lsn,
@@ -1147,14 +1174,14 @@ impl LayeredTimeline {
                 break;
             }

-            let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
+            let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;

             // replace this layer with the new layers that 'freeze' returned
             layers.pop_oldest();
             if let Some(n) = new_open {
                 layers.insert_open(n);
             }
-            if let Some(historic) = new_historic {
+            for historic in new_historics {
                 trace!(
                     "freeze returned layer {} {}-{}",
                     historic.get_seg_tag(),
@@ -1223,7 +1250,7 @@ impl LayeredTimeline {
             self.timelineid,
             cutoff
         );
-        let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
+        let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();

         // Scan all snapshot files in the directory. For each file, if a newer file
         // exists, we can remove the old one.
diff --git a/pageserver/src/layered_repository/filename.rs b/pageserver/src/layered_repository/filename.rs
index 948affe547..f807aa599a 100644
--- a/pageserver/src/layered_repository/filename.rs
+++ b/pageserver/src/layered_repository/filename.rs
@@ -151,6 +151,132 @@ impl fmt::Display for SnapshotFileName {
     }
 }

+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
+pub struct ImageFileName {
+    pub seg: SegmentTag,
+    pub lsn: Lsn,
+}
+
+impl ImageFileName {
+    fn from_str(fname: &str) -> Option<Self> {
+        // Split the filename into parts
+        //
+        //    <spcnode>_<dbnode>_<relnode>_<forknum>_<segno>_<lsn>
+        //
+        // or if it was dropped:
+        //
+        //    <spcnode>_<dbnode>_<relnode>_<forknum>_<segno>_<lsn>_DROPPED
+        //
+        let rel;
+        let mut parts;
+        if let Some(rest) = fname.strip_prefix("rel_") {
+            parts = rest.split('_');
+            rel = RelishTag::Relation(RelTag {
+                spcnode: parts.next()?.parse::<u32>().ok()?,
+                dbnode: parts.next()?.parse::<u32>().ok()?,
+                relnode: parts.next()?.parse::<u32>().ok()?,
+                forknum: parts.next()?.parse::<u8>().ok()?,
+            });
+        } else if let Some(rest) = fname.strip_prefix("pg_xact_") {
+            parts = rest.split('_');
+            rel = RelishTag::Slru {
+                slru: SlruKind::Clog,
+                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
+            };
+        } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") {
+            parts = rest.split('_');
+            rel = RelishTag::Slru {
+                slru: SlruKind::MultiXactMembers,
+                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
+            };
+        } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") {
+            parts = rest.split('_');
+            rel = RelishTag::Slru {
+                slru: SlruKind::MultiXactOffsets,
+                segno: u32::from_str_radix(parts.next()?, 16).ok()?,
+            };
+        } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") {
+            parts = rest.split('_');
+            rel = RelishTag::FileNodeMap {
+                spcnode: parts.next()?.parse::<u32>().ok()?,
+                dbnode: parts.next()?.parse::<u32>().ok()?,
+            };
+        } else if let Some(rest) = fname.strip_prefix("pg_twophase_") {
+            parts = rest.split('_');
+            rel = RelishTag::TwoPhase {
+                xid: parts.next()?.parse::<u32>().ok()?,
+            };
+        } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") {
+            parts = rest.split('_');
+            rel = RelishTag::Checkpoint;
+        } else if let Some(rest) = fname.strip_prefix("pg_control_") {
+            parts = rest.split('_');
+            rel = RelishTag::ControlFile;
+        } else {
+            return None;
+        }
+
+        let segno = parts.next()?.parse::<u32>().ok()?;
+
+        let seg = SegmentTag { rel, segno };
+
+        let lsn = Lsn::from_hex(parts.next()?).ok()?;
+
+        if parts.next().is_some() {
+            warn!("unrecognized filename in timeline dir: {}", fname);
+            return None;
+        }
+
+        Some(ImageFileName { seg, lsn })
+    }
+
+    fn to_string(&self) -> String {
+        let basename = match self.seg.rel {
+            RelishTag::Relation(reltag) => format!(
+                "rel_{}_{}_{}_{}",
+                reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum
+            ),
+            RelishTag::Slru {
+                slru: SlruKind::Clog,
+                segno,
+            } => format!("pg_xact_{:04X}", segno),
+            RelishTag::Slru {
+                slru: SlruKind::MultiXactMembers,
+                segno,
+            } => format!("pg_multixact_members_{:04X}", segno),
+            RelishTag::Slru {
+                slru: SlruKind::MultiXactOffsets,
+                segno,
+            } => format!("pg_multixact_offsets_{:04X}", segno),
+            RelishTag::FileNodeMap { spcnode, dbnode } => {
+                format!("pg_filenodemap_{}_{}", spcnode, dbnode)
+            }
+            RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid),
+            RelishTag::Checkpoint => format!("pg_control_checkpoint"),
+            RelishTag::ControlFile => format!("pg_control"),
+        };
+
+        format!(
+            "{}_{}_{:016X}",
+            basename,
+            self.seg.segno,
+            u64::from(self.lsn),
+        )
+    }
+}
+
+impl fmt::Display for ImageFileName {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.to_string())
+    }
+}
+
 /// Create SnapshotLayers representing all files on disk
 ///
@@ -159,10 +285,11 @@ pub fn list_snapshot_files(
     conf: &'static PageServerConf,
     timelineid: ZTimelineId,
     tenantid: ZTenantId,
-) -> Result<Vec<SnapshotFileName>> {
+) -> Result<(Vec<SnapshotFileName>, Vec<ImageFileName>)> {
     let path = conf.timeline_path(&timelineid, &tenantid);

     let mut snapfiles: Vec<SnapshotFileName> = Vec::new();
+    let mut imgfiles: Vec<ImageFileName> = Vec::new();
     for direntry in fs::read_dir(path)? {
         let fname = direntry?.file_name();
         let fname = fname.to_str().unwrap();
@@ -170,6 +297,10 @@ pub fn list_snapshot_files(
         if let Some(snapfilename) = SnapshotFileName::from_str(fname) {
             snapfiles.push(snapfilename);
         }
+
+        if let Some(imgfilename) = ImageFileName::from_str(fname) {
+            imgfiles.push(imgfilename);
+        }
     }
-    return Ok(snapfiles);
+    return Ok((snapfiles, imgfiles));
 }
diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs
new file mode 100644
index 0000000000..debbeb6ae1
--- /dev/null
+++ b/pageserver/src/layered_repository/image_layer.rs
@@ -0,0 +1,384 @@
+//! An ImageLayer represents an image file on disk. One image file holds the full
+//! contents of one relish segment, materialized as they were at one particular LSN.
+//! Unlike a snapshot layer, an image layer stores no WAL records or page versions
+//! for a range of LSNs: reconstructing a page from it never requires consulting
+//! any other layer, so it serves as the base that incremental layers build on.
+//!
+//! When an image file needs to be accessed, we slurp the whole file into memory,
+//! into the ImageLayer struct. See load() and unload() functions.
+//!
+//! On disk, the image files are stored in the timelines/ directory. Currently,
+//! there are no subdirectories, and each image file for a relation is named like this:
+//!
+//!    rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<segno>_<LSN>
+//!
+//! Non-relation relishes use a prefix like pg_xact_ or pg_control_ instead of
+//! rel_<...>; see filename.rs for the details.
+//!
+//! An image file is constructed using the 'bookfile' crate. The page images of
+//! the segment are stored in a single chapter, indexed by block number within
+//! the segment.
+//!
+
+use crate::layered_repository::storage_layer::{Layer, PageReconstructData, SegmentTag};
+use crate::layered_repository::LayeredTimeline;
+use crate::layered_repository::filename::{ImageFileName};
+use crate::layered_repository::RELISH_SEG_SIZE;
+use crate::PageServerConf;
+use crate::{ZTenantId, ZTimelineId};
+use anyhow::{bail, Result};
+use bytes::Bytes;
+use lazy_static::lazy_static;
+use log::*;
+use std::fs;
+use std::fs::File;
+use std::io::Write;
+use std::path::PathBuf;
+use std::sync::{Mutex, MutexGuard};
+
+use bookfile::{Book, BookWriter};
+
+use zenith_metrics::{register_histogram, Histogram};
+use zenith_utils::bin_ser::BeSer;
+use zenith_utils::lsn::Lsn;
+
+// Magic constant to identify a Zenith segment image file
+static IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1;
+
+static BASE_IMAGES_CHAPTER: u64 = 1;
+
+// Metrics collected on operations on the storage repository.
+lazy_static! {
+    static ref RECONSTRUCT_TIME: Histogram = register_histogram!(
+        "pageserver_image_reconstruct_time",
+        "Time spent materializing pages to create an image layer"
+    )
+    .expect("failed to define a metric");
+}
+
+///
+/// ImageLayer is the in-memory data structure associated with an
+/// on-disk image file. We keep an ImageLayer in memory for each
+/// file, in the LayerMap. If a layer is in "loaded" state, we have a
+/// copy of the file in memory, in 'inner'. Otherwise the struct is
+/// just a placeholder for a file that exists on disk, and it needs to
+/// be loaded before using it in queries.
+///
+pub struct ImageLayer {
+    conf: &'static PageServerConf,
+    pub tenantid: ZTenantId,
+    pub timelineid: ZTimelineId,
+    pub seg: SegmentTag,
+
+    // This entry contains an image of all pages as of this LSN
+    pub lsn: Lsn,
+
+    inner: Mutex<ImageLayerInner>,
+}
+
+pub struct ImageLayerInner {
+    /// If false, the 'base_images' have not been
+    /// loaded into memory yet.
+    loaded: bool,
+
+    // indexed by block number (within segment)
+    base_images: Vec<Bytes>,
+}
+
+impl Layer for ImageLayer {
+    fn get_timeline_id(&self) -> ZTimelineId {
+        return self.timelineid;
+    }
+
+    fn get_seg_tag(&self) -> SegmentTag {
+        return self.seg;
+    }
+
+    fn is_dropped(&self) -> bool {
+        return false;
+    }
+
+    fn get_start_lsn(&self) -> Lsn {
+        return self.lsn;
+    }
+
+    fn get_end_lsn(&self) -> Lsn {
+        return self.lsn;
+    }
+
+    /// Look up given page in the cache.
+    fn get_page_reconstruct_data(
+        &self,
+        blknum: u32,
+        lsn: Lsn,
+        reconstruct_data: &mut PageReconstructData,
+    ) -> Result<Option<Lsn>> {
+        let need_base_image_lsn: Option<Lsn>;
+
+        assert!(lsn >= self.lsn);
+
+        {
+            let inner = self.load()?;
+
+            let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
+            if let Some(img) = inner.base_images.get(base_blknum) {
+                reconstruct_data.page_img = Some(img.clone());
+                need_base_image_lsn = None;
+            } else {
+                bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
+            }
+            // release lock on 'inner'
+        }
+
+        Ok(need_base_image_lsn)
+    }
+
+    /// Get size of the relation at given LSN
+    fn get_seg_size(&self, _lsn: Lsn) -> Result<u32> {
+        let inner = self.load()?;
+        let result = inner.base_images.len() as u32;
+
+        Ok(result)
+    }
+
+    /// Does this segment exist at given LSN?
+    fn get_seg_exists(&self, _lsn: Lsn) -> Result<bool> {
+        Ok(true)
+    }
+
+    ///
+    /// Release most of the memory used by this layer. If it's accessed again later,
+    /// it will need to be loaded back.
+    ///
+    fn unload(&self) -> Result<()> {
+        let mut inner = self.inner.lock().unwrap();
+        inner.base_images = Vec::new();
+        inner.loaded = false;
+        Ok(())
+    }
+
+    fn delete(&self) -> Result<()> {
+        // delete underlying file
+        fs::remove_file(self.path())?;
+        Ok(())
+    }
+
+    fn is_incremental(&self) -> bool {
+        false
+    }
+}
+
+impl ImageLayer {
+    fn path(&self) -> PathBuf {
+        Self::path_for(
+            self.conf,
+            self.timelineid,
+            self.tenantid,
+            &ImageFileName {
+                seg: self.seg,
+                lsn: self.lsn,
+            },
+        )
+    }
+
+    fn path_for(
+        conf: &'static PageServerConf,
+        timelineid: ZTimelineId,
+        tenantid: ZTenantId,
+        fname: &ImageFileName,
+    ) -> PathBuf {
+        conf.timeline_path(&timelineid, &tenantid)
+            .join(fname.to_string())
+    }
+
+    /// Create a new image file from the given vector of page images,
+    /// one per block in the segment.
+    ///
+    /// This is used when a frozen in-memory layer is written to disk: the caller
+    /// materializes all pages of the segment at one LSN and passes them in here.
+    pub fn create(
+        conf: &'static PageServerConf,
+        timelineid: ZTimelineId,
+        tenantid: ZTenantId,
+        seg: SegmentTag,
+        lsn: Lsn,
+        base_images: Vec<Bytes>,
+    ) -> Result<ImageLayer> {
+        let layer = ImageLayer {
+            conf: conf,
+            timelineid: timelineid,
+            tenantid: tenantid,
+            seg: seg,
+            lsn: lsn,
+            inner: Mutex::new(ImageLayerInner {
+                loaded: true,
+                base_images: base_images,
+            }),
+        };
+        let inner = layer.inner.lock().unwrap();
+
+        // Write the images into a file
+        let path = layer.path();
+
+        // Note: This overwrites any existing file. There shouldn't be any.
+        // FIXME: throw an error instead?
+        let file = File::create(&path)?;
+        let book = BookWriter::new(file, IMAGE_FILE_MAGIC)?;
+
+        // Write out the base images
+        let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
+        let buf = Vec::ser(&inner.base_images)?;
+
+        chapter.write_all(&buf)?;
+        let book = chapter.close()?;
+
+        book.close()?;
+
+        trace!("saved {}", &path.display());
+
+        drop(inner);
+
+        Ok(layer)
+    }
+
+    pub fn create_from_src(
+        conf: &'static PageServerConf,
+        timeline: &LayeredTimeline,
+        src: &dyn Layer,
+        lsn: Lsn,
+    ) -> Result<ImageLayer> {
+        let seg = src.get_seg_tag();
+        let timelineid = timeline.timelineid;
+
+        let startblk;
+        let size;
+        if seg.rel.is_blocky() {
+            size = src.get_seg_size(lsn)?;
+            startblk = seg.segno * RELISH_SEG_SIZE;
+        } else {
+            size = 1;
+            startblk = 0;
+        }
+
+        trace!(
+            "creating new ImageLayer for {} on timeline {} at {}",
+            seg,
+            timelineid,
+            lsn,
+        );
+
+        let mut base_images: Vec<Bytes> = Vec::new();
+        for blknum in startblk..(startblk + size) {
+            let img = RECONSTRUCT_TIME
+                .observe_closure_duration(|| {
+                    timeline.materialize_page(seg, blknum, lsn, &*src)
+                })?;
+
+            base_images.push(img);
+        }
+
+        Self::create(conf, timelineid, timeline.tenantid, seg, lsn,
+            base_images)
+    }
+
+    ///
+    /// Load the contents of the file into memory
+    ///
+    fn load(&self) -> Result<MutexGuard<ImageLayerInner>> {
+        // quick exit if already loaded
+        let mut inner = self.inner.lock().unwrap();
+
+        if inner.loaded {
+            return Ok(inner);
+        }
+
+        let path = Self::path_for(
+            self.conf,
+            self.timelineid,
+            self.tenantid,
+            &ImageFileName {
+                seg: self.seg,
+                lsn: self.lsn,
+            },
+        );
+
+        let file = File::open(&path)?;
+        let book = Book::new(file)?;
+
+        let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
+        let base_images = Vec::des(&chapter)?;
+
+        debug!("loaded from {}", &path.display());
+
+        *inner = ImageLayerInner {
+            loaded: true,
+            base_images,
+        };
+
+        Ok(inner)
+    }
+
+    /// Create an ImageLayer representing a file on disk
+    pub fn load_image_layer(
+        conf: &'static PageServerConf,
+        timelineid: ZTimelineId,
+        tenantid: ZTenantId,
+        filename: &ImageFileName,
+    ) -> Result<ImageLayer> {
+        let layer = ImageLayer {
+            conf,
+            timelineid,
+            tenantid,
+            seg: filename.seg,
+            lsn: filename.lsn,
+            inner: Mutex::new(ImageLayerInner {
+                loaded: false,
+                base_images: Vec::new(),
+            }),
+        };
+
+        Ok(layer)
+    }
+
+    /// debugging function to print out the contents of the layer
+    #[allow(unused)]
+    pub fn dump(&self) -> String {
+        let mut result = format!(
+            "----- image layer for {} at {} ----\n",
+            self.seg, self.lsn,
+        );
+
+        //let inner = self.inner.lock().unwrap();
+
+        //for (k, v) in inner.page_versions.iter() {
+        //    result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some());
+        //}
+
+        result
+    }
+}
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 7266879a43..8eba41af5e 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -6,8 +6,7 @@ use crate::layered_repository::storage_layer::{
     Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
 };
 use crate::layered_repository::LayeredTimeline;
-use crate::layered_repository::SnapshotLayer;
-use crate::repository::WALRecord;
+use crate::layered_repository::{ImageLayer, SnapshotLayer};
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
 use anyhow::{bail, Result};
@@ -37,14 +36,14 @@ pub struct InMemoryLayer {
     /// The above fields never change. The parts that do change are in 'inner',
     /// and protected by mutex.
     inner: Mutex<InMemoryLayerInner>,
+
+    img_layer: Option<Arc<dyn Layer>>,
 }

 pub struct InMemoryLayerInner {
     /// If this relation was dropped, remember when that happened.
     drop_lsn: Option<Lsn>,

-    base_images: Vec<Bytes>,
-
     ///
     /// All versions of all pages in the layer are are kept here.
     /// Indexed by block number and LSN.
@@ -130,13 +129,11 @@ impl Layer for InMemoryLayer {
         }

         // Use the base image, if needed
-        if need_base_image_lsn.is_some() {
-            let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
-            if let Some(img) = inner.base_images.get(base_blknum) {
-                reconstruct_data.page_img = Some(img.clone());
-                need_base_image_lsn = None;
+        if let Some(need_lsn) = need_base_image_lsn {
+            if let Some(img_layer) = &self.img_layer {
+                need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
             } else {
-                bail!("inmem: no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
+                bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
             }
         }
@@ -157,8 +154,11 @@ impl Layer for InMemoryLayer {
         let result;
         if let Some((_entry_lsn, entry)) = iter.next_back() {
             result = *entry;
+        // Use the base image if needed
+        } else if let Some(img_layer) = &self.img_layer {
+            result = img_layer.get_seg_size(lsn)?;
         } else {
-            result = inner.base_images.len() as u32;
+            result = 0;
         }
         trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);
         Ok(result)
@@ -178,6 +178,23 @@ impl Layer for InMemoryLayer {
         // Otherwise, it exists
         Ok(true)
     }
+
+
+    ///
+    /// Release most of the memory used by this layer. If it's accessed again later,
+    /// it will need to be loaded back.
+    ///
+    fn unload(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn delete(&self) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_incremental(&self) -> bool {
+        true
+    }
 }

 impl InMemoryLayer {
@@ -213,11 +230,11 @@ impl InMemoryLayer {
             oldest_pending_lsn,
             inner: Mutex::new(InMemoryLayerInner {
                 drop_lsn: None,
-                base_images: Vec::new(),
                 page_versions: BTreeMap::new(),
                 segsizes: BTreeMap::new(),
                 mem_used: 0,
             }),
+            img_layer: None,
         })
     }

@@ -285,8 +302,10 @@ impl InMemoryLayer {
         let oldsize;
         if let Some((_entry_lsn, entry)) = iter.next_back() {
             oldsize = *entry;
+        } else if let Some(img_layer) = &self.img_layer {
+            oldsize = img_layer.get_seg_size(lsn)?;
         } else {
-            oldsize = inner.base_images.len() as u32;
+            oldsize = 0;
             //bail!("No old size found for {} at {}", self.tag, lsn);
         }
         if newsize > oldsize {
@@ -333,60 +352,37 @@ impl InMemoryLayer {
     /// Initialize a new InMemoryLayer for, by copying the state at the given
     /// point in time from given existing layer.
     ///
-    pub fn copy_snapshot(
+    pub fn create_successor_layer(
         conf: &'static PageServerConf,
-        timeline: &LayeredTimeline,
-        src: &dyn Layer,
+        src: Arc<dyn Layer>,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,
         start_lsn: Lsn,
         oldest_pending_lsn: Lsn,
     ) -> Result<InMemoryLayer> {
-        let mut mem_used = 0;
-
         let seg = src.get_seg_tag();

-        let startblk;
-        let size;
-        if seg.rel.is_blocky() {
-            size = src.get_seg_size(start_lsn)?;
-            startblk = seg.segno * RELISH_SEG_SIZE;
-        } else {
-            size = 1;
-            startblk = 0;
-        }
-
         trace!(
-            "initializing new InMemoryLayer for writing {} on timeline {} at {}, size {}",
-            src.get_seg_tag(),
+            "initializing new InMemoryLayer for writing {} on timeline {} at {}",
+            seg,
             timelineid,
             start_lsn,
-            size,
         );

-        let mut base_images: Vec<Bytes> = Vec::new();
-        for blknum in startblk..(startblk+size) {
-            let img = timeline.materialize_page(seg, blknum, start_lsn, src)?;
-
-            mem_used += img.len();
-
-            base_images.push(img);
-        }
-
         Ok(InMemoryLayer {
             conf,
             timelineid,
             tenantid,
-            seg: src.get_seg_tag(),
+            seg,
             start_lsn,
             oldest_pending_lsn,
             inner: Mutex::new(InMemoryLayerInner {
                 drop_lsn: None,
-                base_images: base_images,
                 page_versions: BTreeMap::new(),
                 segsizes: BTreeMap::new(),
-                mem_used: mem_used,
+                mem_used: 0,
             }),
+            img_layer: Some(src),
         })
     }

@@ -406,7 +402,7 @@ impl InMemoryLayer {
         cutoff_lsn: Lsn,
         // This is needed just to call materialize_page()
         timeline: &LayeredTimeline,
-    ) -> Result<(Option<Arc<SnapshotLayer>>, Option<Arc<InMemoryLayer>>)> {
+    ) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
         info!(
             "freezing in memory layer for {} on timeline {} at {}",
             self.seg, self.timelineid, cutoff_lsn
@@ -428,7 +424,6 @@ impl InMemoryLayer {
         };

         // Divide all the page versions into old and new at the 'end_lsn' cutoff point.
-        let before_base_images = inner.base_images.clone();
         let mut before_page_versions;
         let mut before_segsizes;
         let mut after_page_versions;
@@ -463,6 +458,18 @@ impl InMemoryLayer {
         // we can release the lock now.
         drop(inner);

+        let mut historics: Vec<Arc<dyn Layer>> = Vec::new();
+
+        // write a new base image layer at the cutoff point
+        let imgfile = ImageLayer::create_from_src(
+            self.conf,
+            timeline,
+            self,
+            end_lsn,
+        )?;
+        let imgfile_rc: Arc<dyn Layer> = Arc::new(imgfile);
+        historics.push(Arc::clone(&imgfile_rc));
+
         // Write the page versions before the cutoff to disk.
         let snapfile = SnapshotLayer::create(
             self.conf,
@@ -472,10 +479,12 @@ impl InMemoryLayer {
             self.start_lsn,
             end_lsn,
             dropped,
-            before_base_images,
+            self.img_layer.clone(),
             before_page_versions,
             before_segsizes,
         )?;
+        let snapfile_rc: Arc<dyn Layer> = Arc::new(snapfile);
+        historics.push(snapfile_rc);

         // If there were any "new" page versions, initialize a new in-memory layer to hold
         // them
@@ -483,10 +492,9 @@ impl InMemoryLayer {
         if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
             info!("created new in-mem layer for {} {}-", self.seg, end_lsn);

-            let new_open = Self::copy_snapshot(
+            let new_open = Self::create_successor_layer(
                 self.conf,
-                timeline,
-                &snapfile,
+                imgfile_rc,
                 self.timelineid,
                 self.tenantid,
                 end_lsn,
@@ -502,9 +510,7 @@ impl InMemoryLayer {
             None
         };

-        let new_historic = Some(Arc::new(snapfile));
-
-        Ok((new_historic, new_open))
+        Ok((historics, new_open))
     }

     /// debugging function to print out the contents of the layer
diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs
index c052816e9b..78d6da3312 100644
--- a/pageserver/src/layered_repository/layer_map.rs
+++ b/pageserver/src/layered_repository/layer_map.rs
@@ -3,14 +3,14 @@
 //!
 //! When the timeline is first accessed, the server lists of all snapshot files
 //! in the timelines/<timelineid> directory, and populates this map with
-//! SnapshotLayers corresponding to each file. When new WAL is received,
+//! SnapshotLayers and ImageLayers corresponding to each file. When new WAL is received,
 //! we create InMemoryLayers to hold the incoming records. Now and then,
 //! in the checkpoint() function, the in-memory layers are frozen, forming
 //! new snapshot layers and corresponding files are written to disk.
 //!
 use crate::layered_repository::storage_layer::{Layer, SegmentTag};
-use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
+use crate::layered_repository::{InMemoryLayer};
 use crate::relish::*;
 use anyhow::Result;
 use lazy_static::lazy_static;
@@ -48,7 +48,7 @@ pub struct LayerMap {

 struct SegEntry {
     pub open: Option<Arc<InMemoryLayer>>,
-    pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
+    pub historic: BTreeMap<Lsn, Arc<dyn Layer>>,
 }

 struct OpenSegEntry {
@@ -160,7 +160,7 @@ impl LayerMap {
     ///
     /// Insert an on-disk layer
     ///
-    pub fn insert_historic(&mut self, layer: Arc<SnapshotLayer>) {
+    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
         let tag = layer.get_seg_tag();
         let start_lsn = layer.get_start_lsn();

@@ -184,7 +184,7 @@ impl LayerMap {
     ///
     /// This should be called when the corresponding file on disk has been deleted.
     ///
-    pub fn remove_historic(&mut self, layer: &SnapshotLayer) {
+    pub fn remove_historic(&mut self, layer: &dyn Layer) {
         let tag = layer.get_seg_tag();
         let start_lsn = layer.get_start_lsn();

@@ -230,14 +230,16 @@ impl LayerMap {
     /// Is there a newer layer for given segment?
     pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool {
         if let Some(segentry) = self.segs.get(&seg) {
-            if let Some(_open) = &segentry.open {
-                return true;
-            }
+            // open layer is always incremental so it doesn't count

             for (newer_lsn, layer) in segentry
                 .historic
                 .range((Included(lsn), Included(Lsn(u64::MAX))))
             {
+                // FIXME: incremental layers don't count
+                if layer.is_incremental() {
+                    continue;
+                }
                 if layer.get_end_lsn() > lsn {
                     trace!(
                         "found later layer for {}, {} {}-{}",
@@ -284,11 +286,11 @@ impl Default for LayerMap {

 pub struct HistoricLayerIter<'a> {
     segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
-    iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
+    iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<dyn Layer>>>,
 }

 impl<'a> Iterator for HistoricLayerIter<'a> {
-    type Item = Arc<SnapshotLayer>;
+    type Item = Arc<dyn Layer>;

     fn next(&mut self) -> std::option::Option<<Self as Iterator>::Item> {
         loop {
diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs
index 34f69fb93c..bc97be9835 100644
--- a/pageserver/src/layered_repository/snapshot_layer.rs
+++ b/pageserver/src/layered_repository/snapshot_layer.rs
@@ -42,11 +42,9 @@ use crate::layered_repository::storage_layer::{
     Layer, PageReconstructData, PageVersion, SegmentTag,
 };
 use crate::layered_repository::filename::{SnapshotFileName};
-use crate::layered_repository::RELISH_SEG_SIZE;
 use crate::PageServerConf;
 use crate::{ZTenantId, ZTimelineId};
 use anyhow::{bail, Result};
-use bytes::Bytes;
 use log::*;
 use std::collections::BTreeMap;
 use std::fs;
@@ -54,7 +52,7 @@ use std::fs::File;
 use std::io::Write;
 use std::ops::Bound::Included;
 use std::path::PathBuf;
-use std::sync::{Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, MutexGuard};

 use bookfile::{Book, BookWriter};

@@ -64,9 +62,8 @@ use zenith_utils::lsn::Lsn;
 // Magic constant to identify a Zenith snapshot file
 static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01;

-static BASE_IMAGES_CHAPTER: u64 = 1;
-static PAGE_VERSIONS_CHAPTER: u64 = 2;
-static REL_SIZES_CHAPTER: u64 = 3;
+static PAGE_VERSIONS_CHAPTER: u64 = 1;
+static REL_SIZES_CHAPTER: u64 = 2;

 ///
 /// SnapshotLayer is the in-memory data structure associated with an
@@ -91,6 +88,8 @@ pub struct SnapshotLayer {
     dropped: bool,

     inner: Mutex<SnapshotLayerInner>,
+
+    img_layer: Option<Arc<dyn Layer>>,
 }

 pub struct SnapshotLayerInner {
@@ -98,9 +97,6 @@ pub struct SnapshotLayerInner {
     /// loaded into memory yet.
     loaded: bool,

-    // indexed by block number (within segment)
-    base_images: Vec<Bytes>,
-
     /// All versions of all pages in the file are are kept here.
     /// Indexed by block number and LSN.
     page_versions: BTreeMap<(u32, Lsn), PageVersion>,
@@ -167,13 +163,11 @@ impl Layer for SnapshotLayer {
         }

         // Use the base image, if needed
-        if need_base_image_lsn.is_some() {
-            let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize;
-            if let Some(img) = inner.base_images.get(base_blknum) {
-                reconstruct_data.page_img = Some(img.clone());
-                need_base_image_lsn = None;
+        if let Some(need_lsn) = need_base_image_lsn {
+            if let Some(img_layer) = &self.img_layer {
+                need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
             } else {
-                bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn);
+                bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn);
             }
         }

@@ -195,10 +189,12 @@ impl Layer for SnapshotLayer {
         let result;
         if let Some((_entry_lsn, entry)) = iter.next_back() {
             result = *entry;
+        // Use the base image if needed
+        } else if let Some(img_layer) = &self.img_layer {
+            result = img_layer.get_seg_size(lsn)?;
         } else {
-            result = inner.base_images.len() as u32;
+            result = 0;
         }
-        info!("get_seg_size: {} at {} -> {}", self.seg, lsn, result);

         Ok(result)
     }
@@ -212,6 +208,28 @@ impl Layer for SnapshotLayer {
         // Otherwise, it exists.
         Ok(true)
     }
+
+    ///
+    /// Release most of the memory used by this layer. If it's accessed again later,
+    /// it will need to be loaded back.
+    ///
+    fn unload(&self) -> Result<()> {
+        let mut inner = self.inner.lock().unwrap();
+        inner.page_versions = BTreeMap::new();
+        inner.relsizes = BTreeMap::new();
+        inner.loaded = false;
+        Ok(())
+    }
+
+    fn delete(&self) -> Result<()> {
+        // delete underlying file
+        fs::remove_file(self.path())?;
+        Ok(())
+    }
+
+    fn is_incremental(&self) -> bool {
+        true
+    }
 }

 impl SnapshotLayer {
@@ -253,7 +271,7 @@ impl SnapshotLayer {
         start_lsn: Lsn,
         end_lsn: Lsn,
         dropped: bool,
-        base_images: Vec<Bytes>,
+        img_layer: Option<Arc<dyn Layer>>,
         page_versions: BTreeMap<(u32, Lsn), PageVersion>,
         relsizes: BTreeMap<Lsn, u32>,
     ) -> Result<SnapshotLayer> {
@@ -268,10 +286,10 @@ impl SnapshotLayer {
             dropped,
             inner: Mutex::new(SnapshotLayerInner {
                 loaded: true,
-                base_images: base_images,
                 page_versions: page_versions,
                 relsizes: relsizes,
             }),
+            img_layer,
         };
         let inner = snapfile.inner.lock().unwrap();

@@ -283,13 +301,6 @@ impl SnapshotLayer {
         let file = File::create(&path)?;
         let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?;

-        // Write out the base images
-        let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER);
-        let buf = Vec::ser(&inner.base_images)?;
-
-        chapter.write_all(&buf)?;
-        let book = chapter.close()?;
-
         // Write out the other page versions
         let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER);
         let buf = BTreeMap::ser(&inner.page_versions)?;
@@ -337,9 +348,6 @@ impl SnapshotLayer {
         let file = File::open(&path)?;
         let book = Book::new(file)?;

-        let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?;
-        let base_images = Vec::des(&chapter)?;
-
         let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?;
         let page_versions = BTreeMap::des(&chapter)?;

@@ -350,7 +358,6 @@ impl SnapshotLayer {

         *inner = SnapshotLayerInner {
             loaded: true,
-            base_images,
             page_versions,
             relsizes,
         };
@@ -377,34 +384,16 @@ impl SnapshotLayer {
             dropped: filename.dropped,
             inner: Mutex::new(SnapshotLayerInner {
                 loaded: false,
-                base_images: Vec::new(),
                 page_versions: BTreeMap::new(),
                 relsizes: BTreeMap::new(),
             }),
+            // FIXME: This doesn't work across restarts.
+            img_layer: None,
         };

         Ok(snapfile)
     }

-    pub fn delete(&self) -> Result<()> {
-        // delete underlying file
-        fs::remove_file(self.path())?;
-        Ok(())
-    }
-
-    ///
-    /// Release most of the memory used by this layer. If it's accessed again later,
-    /// it will need to be loaded back.
-    ///
-    pub fn unload(&self) -> Result<()> {
-        let mut inner = self.inner.lock().unwrap();
-        inner.base_images = Vec::new();
-        inner.page_versions = BTreeMap::new();
-        inner.relsizes = BTreeMap::new();
-        inner.loaded = false;
-        Ok(())
-    }
-
     /// debugging function to print out the contents of the layer
     #[allow(unused)]
     pub fn dump(&self) -> String {
diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs
index 142bf76b96..167aaaecde 100644
--- a/pageserver/src/layered_repository/storage_layer.rs
+++ b/pageserver/src/layered_repository/storage_layer.rs
@@ -147,4 +147,9 @@ pub trait Layer: Send + Sync {
     fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;

     fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
+
+    fn is_incremental(&self) -> bool;
+
+    fn unload(&self) -> Result<()>;
+    fn delete(&self) -> Result<()>;
 }
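
As a quick illustration of the naming scheme above, here is a minimal, self-contained sketch of the round trip that ImageFileName::to_string() and ImageFileName::from_str() perform for the "rel_" case. The helper names and the plain-integer stand-ins for RelTag, SegmentTag and Lsn are assumptions made for this sketch only; they are not code from the patch.

// Image file names look like:
//   rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<segno>_<LSN as 16-digit hex>
// Snapshot file names carry a second (end) LSN, so they fail this parse.

fn image_file_name(spc: u32, db: u32, rel: u32, fork: u8, segno: u32, lsn: u64) -> String {
    format!("rel_{}_{}_{}_{}_{}_{:016X}", spc, db, rel, fork, segno, lsn)
}

fn parse_image_file_name(fname: &str) -> Option<(u32, u32, u32, u8, u32, u64)> {
    let mut parts = fname.strip_prefix("rel_")?.split('_');
    let spc = parts.next()?.parse().ok()?;
    let db = parts.next()?.parse().ok()?;
    let rel = parts.next()?.parse().ok()?;
    let fork = parts.next()?.parse().ok()?;
    let segno = parts.next()?.parse().ok()?;
    let lsn = u64::from_str_radix(parts.next()?, 16).ok()?;
    // Anything left over (e.g. a snapshot file's end LSN) means this is not an image file name.
    if parts.next().is_some() {
        return None;
    }
    Some((spc, db, rel, fork, segno, lsn))
}

fn main() {
    let name = image_file_name(1663, 13990, 2609, 0, 0, 0x0169_C348);
    assert_eq!(name, "rel_1663_13990_2609_0_0_000000000169C348");
    assert_eq!(
        parse_image_file_name(&name),
        Some((1663, 13990, 2609, 0, 0, 0x0169_C348))
    );
}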