diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index c81625088d..cc78086468 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -35,19 +35,15 @@ use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use zenith_metrics::{register_histogram_vec, HistogramVec}; -use zenith_metrics::{register_histogram, Histogram}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; -mod filename; -mod image_layer; mod inmemory_layer; mod layer_map; mod snapshot_layer; mod storage_layer; -use image_layer::ImageLayer; use inmemory_layer::InMemoryLayer; use layer_map::LayerMap; use snapshot_layer::SnapshotLayer; @@ -58,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); -// Flush out an inmemory layer, if it's holding WAL older than -// this. This puts a backstop on how much WAL needs to be re-digested -// if the page server is restarted. +// Perform a checkpoint in the GC thread, when the LSN has advanced this much since +// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if +// the page server is restarted. // // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. -static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024; +static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024; // Metrics collected on operations on the storage repository. lazy_static! { @@ -77,16 +73,6 @@ lazy_static! { .expect("failed to define a metric"); } - -// Metrics collected on operations on the storage repository. -lazy_static! { - static ref RECONSTRUCT_TIME: Histogram = register_histogram!( - "pageserver_getpage_reconstruct_time", - "FIXME Time spent on storage operations" - ) - .expect("failed to define a metric"); -} - /// /// Repository consists of multiple timelines. Keep them in a hash table. /// @@ -275,11 +261,11 @@ impl LayeredRepository { { let timelines = self.timelines.lock().unwrap(); for (_timelineid, timeline) in timelines.iter() { - STORAGE_TIME - .with_label_values(&["checkpoint_timed"]) - .observe_closure_duration( - || timeline.checkpoint_internal(false) - )? + let distance = u64::from(timeline.last_valid_lsn.load()) + - u64::from(timeline.last_checkpoint_lsn.load()); + if distance > CHECKPOINT_INTERVAL { + timeline.checkpoint()?; + } } // release lock on 'timelines' } @@ -470,7 +456,7 @@ pub struct LayeredTimeline { last_record_lsn: AtomicLsn, prev_record_lsn: AtomicLsn, - oldest_pending_lsn: AtomicLsn, + last_checkpoint_lsn: AtomicLsn, // Parent timeline that this timeline was branched from, and the LSN // of the branch point. @@ -499,10 +485,7 @@ impl Timeline for LayeredTimeline { let seg = SegmentTag::from_blknum(rel, blknum); if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? { - RECONSTRUCT_TIME - .observe_closure_duration(|| { - self.materialize_page(seg, blknum, lsn, &*layer) - }) + self.materialize_page(seg, blknum, lsn, &*layer) } else { bail!("relish {} not found at {}", rel, lsn); } @@ -791,8 +774,8 @@ impl Timeline for LayeredTimeline { /// metrics collection. fn checkpoint(&self) -> Result<()> { STORAGE_TIME - .with_label_values(&["checkpoint_force"]) - .observe_closure_duration(|| self.checkpoint_internal(true)) + .with_label_values(&["checkpoint"]) + .observe_closure_duration(|| self.checkpoint_internal()) } /// Remember that WAL has been received and added to the page cache up to the given LSN @@ -884,7 +867,7 @@ impl LayeredTimeline { last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0), - oldest_pending_lsn: AtomicLsn::new(metadata.last_valid_lsn.0), + last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0), ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn, @@ -901,33 +884,19 @@ impl LayeredTimeline { self.timelineid ); let mut layers = self.layers.lock().unwrap(); - let (snapfilenames, imgfilenames) = - filename::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?; - - for filename in imgfilenames.iter() { - let layer = ImageLayer::load_image_layer(self.conf, self.timelineid, self.tenantid, filename)?; - - info!( - "found layer {} {} on timeline {}", - layer.get_seg_tag(), - layer.get_start_lsn(), - self.timelineid - ); - layers.insert_historic(Arc::new(layer)); - } - - for filename in snapfilenames.iter() { - let layer = SnapshotLayer::load_snapshot_layer(self.conf, self.timelineid, self.tenantid, filename)?; + let snapfiles = + SnapshotLayer::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?; + for layer_rc in snapfiles.iter() { info!( "found layer {} {}-{} {} on timeline {}", - layer.get_seg_tag(), - layer.get_start_lsn(), - layer.get_end_lsn(), - layer.is_dropped(), + layer_rc.get_seg_tag(), + layer_rc.get_start_lsn(), + layer_rc.get_end_lsn(), + layer_rc.is_dropped(), self.timelineid ); - layers.insert_historic(Arc::new(layer)); + layers.insert_historic(Arc::clone(layer_rc)); } Ok(()) @@ -1034,23 +1003,23 @@ impl LayeredTimeline { let layer; if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? { // Create new entry after the previous one. - let start_lsn; + let lsn; if prev_layer.get_timeline_id() != self.timelineid { // First modification on this timeline - start_lsn = self.ancestor_lsn; + lsn = self.ancestor_lsn; trace!( "creating file for write for {} at branch point {}/{}", seg, self.timelineid, - start_lsn + lsn ); } else { - start_lsn = prev_layer.get_end_lsn(); + lsn = prev_layer.get_end_lsn(); trace!( "creating file for write for {} after previous layer {}/{}", seg, self.timelineid, - start_lsn + lsn ); } trace!( @@ -1059,12 +1028,12 @@ impl LayeredTimeline { prev_layer.get_start_lsn(), prev_layer.get_end_lsn() ); - layer = InMemoryLayer::create_successor_layer( + layer = InMemoryLayer::copy_snapshot( self.conf, - prev_layer, + &self, + &*prev_layer, self.timelineid, self.tenantid, - start_lsn, lsn, )?; } else { @@ -1076,7 +1045,7 @@ impl LayeredTimeline { lsn ); - layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?; + layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?; } let mut layers = self.layers.lock().unwrap(); @@ -1119,7 +1088,7 @@ impl LayeredTimeline { /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. - fn checkpoint_internal(&self, force: bool) -> Result<()> { + fn checkpoint_internal(&self) -> Result<()> { let last_valid_lsn = self.last_valid_lsn.load(); let last_record_lsn = self.last_record_lsn.load(); let prev_record_lsn = self.prev_record_lsn.load(); @@ -1161,34 +1130,22 @@ impl LayeredTimeline { // Call freeze() on any unfrozen layers (that is, layers that haven't // been written to disk yet). // Call unload() on all frozen layers, to release memory. - - let mut oldest_pending_lsn = last_valid_lsn; - - while let Some(oldest_layer) = layers.get_oldest_open_layer() { - - oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); - let distance = last_valid_lsn.0 - oldest_pending_lsn.0; - if !force && distance < OLDEST_INMEM_DISTANCE { - info!("the oldest layer is now {} which is {} bytes behind last_valid_lsn", - oldest_layer.get_seg_tag(), distance); - break; - } - - let (new_historics, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?; + let mut iter = layers.iter_open_layers(); + while let Some(layer) = iter.next() { + let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?; // replace this layer with the new layers that 'freeze' returned - layers.pop_oldest(); - if let Some(n) = new_open { - layers.insert_open(n); - } - for historic in new_historics { + // (removes it if new_open is None) + iter.replace(new_open); + + if let Some(historic) = new_historic { trace!( "freeze returned layer {} {}-{}", historic.get_seg_tag(), historic.get_start_lsn(), historic.get_end_lsn() ); - layers.insert_historic(historic); + iter.insert_historic(historic); } } @@ -1214,7 +1171,7 @@ impl LayeredTimeline { }; LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?; - self.oldest_pending_lsn.store(oldest_pending_lsn); + self.last_checkpoint_lsn.store(last_valid_lsn); Ok(()) } @@ -1245,20 +1202,22 @@ impl LayeredTimeline { let now = Instant::now(); let mut result: GcResult = Default::default(); + // Scan all snapshot files in the directory. For each file, if a newer file + // exists, we can remove the old one. + self.checkpoint()?; + + let mut layers = self.layers.lock().unwrap(); + info!( "running GC on timeline {}, cutoff {}", self.timelineid, cutoff ); - let mut layers_to_remove: Vec> = Vec::new(); + let mut layers_to_remove: Vec> = Vec::new(); - // Scan all snapshot files in the directory. For each file, if a newer file - // exists, we can remove the old one. - // // Determine for each file if it needs to be retained // FIXME: also scan open in-memory layers. Normally we cannot remove the // latest layer of any seg, but if it was unlinked it's possible - let mut layers = self.layers.lock().unwrap(); 'outer: for l in layers.iter_historic_layers() { let seg = l.get_seg_tag(); diff --git a/pageserver/src/layered_repository/filename.rs b/pageserver/src/layered_repository/filename.rs deleted file mode 100644 index f807aa599a..0000000000 --- a/pageserver/src/layered_repository/filename.rs +++ /dev/null @@ -1,306 +0,0 @@ -use crate::layered_repository::storage_layer::{SegmentTag}; -use crate::relish::*; -use crate::PageServerConf; -use crate::{ZTenantId, ZTimelineId}; -use std::fmt; -use std::fs; - -use anyhow::{Result}; -use log::*; -use zenith_utils::lsn::Lsn; - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct SnapshotFileName { - pub seg: SegmentTag, - pub start_lsn: Lsn, - pub end_lsn: Lsn, - pub dropped: bool, -} - -impl SnapshotFileName { - fn from_str(fname: &str) -> Option { - // Split the filename into parts - // - // ______ - // - // or if it was dropped: - // - // _______DROPPED - // - let rel; - let mut parts; - if let Some(rest) = fname.strip_prefix("rel_") { - parts = rest.split('_'); - rel = RelishTag::Relation(RelTag { - spcnode: parts.next()?.parse::().ok()?, - dbnode: parts.next()?.parse::().ok()?, - relnode: parts.next()?.parse::().ok()?, - forknum: parts.next()?.parse::().ok()?, - }); - } else if let Some(rest) = fname.strip_prefix("pg_xact_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::Clog, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") { - parts = rest.split('_'); - rel = RelishTag::FileNodeMap { - spcnode: parts.next()?.parse::().ok()?, - dbnode: parts.next()?.parse::().ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_twophase_") { - parts = rest.split('_'); - rel = RelishTag::TwoPhase { - xid: parts.next()?.parse::().ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") { - parts = rest.split('_'); - rel = RelishTag::Checkpoint; - } else if let Some(rest) = fname.strip_prefix("pg_control_") { - parts = rest.split('_'); - rel = RelishTag::ControlFile; - } else { - return None; - } - - let segno = parts.next()?.parse::().ok()?; - - let seg = SegmentTag { - rel, - segno - }; - - let start_lsn = Lsn::from_hex(parts.next()?).ok()?; - let end_lsn = Lsn::from_hex(parts.next()?).ok()?; - - let mut dropped = false; - if let Some(suffix) = parts.next() { - if suffix == "DROPPED" { - dropped = true; - } else { - warn!("unrecognized filename in timeline dir: {}", fname); - return None; - } - } - if parts.next().is_some() { - warn!("unrecognized filename in timeline dir: {}", fname); - return None; - } - - Some(SnapshotFileName { - seg, - start_lsn, - end_lsn, - dropped, - }) - } - - fn to_string(&self) -> String { - let basename = match self.seg.rel { - RelishTag::Relation(reltag) => format!( - "rel_{}_{}_{}_{}", - reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum - ), - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - } => format!("pg_xact_{:04X}", segno), - RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno, - } => format!("pg_multixact_members_{:04X}", segno), - RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno, - } => format!("pg_multixact_offsets_{:04X}", segno), - RelishTag::FileNodeMap { spcnode, dbnode } => { - format!("pg_filenodemap_{}_{}", spcnode, dbnode) - } - RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid), - RelishTag::Checkpoint => format!("pg_control_checkpoint"), - RelishTag::ControlFile => format!("pg_control"), - }; - - format!( - "{}_{}_{:016X}_{:016X}{}", - basename, - self.seg.segno, - u64::from(self.start_lsn), - u64::from(self.end_lsn), - if self.dropped { "_DROPPED" } else { "" } - ) - } -} - -impl fmt::Display for SnapshotFileName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.to_string()) - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct ImageFileName { - pub seg: SegmentTag, - pub lsn: Lsn, -} - -impl ImageFileName { - fn from_str(fname: &str) -> Option { - // Split the filename into parts - // - // ______ - // - // or if it was dropped: - // - // _______DROPPED - // - let rel; - let mut parts; - if let Some(rest) = fname.strip_prefix("rel_") { - parts = rest.split('_'); - rel = RelishTag::Relation(RelTag { - spcnode: parts.next()?.parse::().ok()?, - dbnode: parts.next()?.parse::().ok()?, - relnode: parts.next()?.parse::().ok()?, - forknum: parts.next()?.parse::().ok()?, - }); - } else if let Some(rest) = fname.strip_prefix("pg_xact_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::Clog, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") { - parts = rest.split('_'); - rel = RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno: u32::from_str_radix(parts.next()?, 16).ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") { - parts = rest.split('_'); - rel = RelishTag::FileNodeMap { - spcnode: parts.next()?.parse::().ok()?, - dbnode: parts.next()?.parse::().ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_twophase_") { - parts = rest.split('_'); - rel = RelishTag::TwoPhase { - xid: parts.next()?.parse::().ok()?, - }; - } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") { - parts = rest.split('_'); - rel = RelishTag::Checkpoint; - } else if let Some(rest) = fname.strip_prefix("pg_control_") { - parts = rest.split('_'); - rel = RelishTag::ControlFile; - } else { - return None; - } - - let segno = parts.next()?.parse::().ok()?; - - let seg = SegmentTag { - rel, - segno - }; - - let lsn = Lsn::from_hex(parts.next()?).ok()?; - - if parts.next().is_some() { - warn!("unrecognized filename in timeline dir: {}", fname); - return None; - } - - Some(ImageFileName { - seg, - lsn, - }) - } - - fn to_string(&self) -> String { - let basename = match self.seg.rel { - RelishTag::Relation(reltag) => format!( - "rel_{}_{}_{}_{}", - reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum - ), - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - } => format!("pg_xact_{:04X}", segno), - RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno, - } => format!("pg_multixact_members_{:04X}", segno), - RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno, - } => format!("pg_multixact_offsets_{:04X}", segno), - RelishTag::FileNodeMap { spcnode, dbnode } => { - format!("pg_filenodemap_{}_{}", spcnode, dbnode) - } - RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid), - RelishTag::Checkpoint => format!("pg_control_checkpoint"), - RelishTag::ControlFile => format!("pg_control"), - }; - - format!( - "{}_{}_{:016X}", - basename, - self.seg.segno, - u64::from(self.lsn), - ) - } -} - -impl fmt::Display for ImageFileName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.to_string()) - } -} - - -/// Create SnapshotLayers representing all files on disk -/// -// TODO: returning an Iterator would be more idiomatic -pub fn list_snapshot_files( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, -) -> Result<(Vec, Vec)> { - let path = conf.timeline_path(&timelineid, &tenantid); - - let mut snapfiles: Vec = Vec::new(); - let mut imgfiles: Vec = Vec::new(); - for direntry in fs::read_dir(path)? { - let fname = direntry?.file_name(); - let fname = fname.to_str().unwrap(); - - if let Some(snapfilename) = SnapshotFileName::from_str(fname) { - snapfiles.push(snapfilename); - } - - if let Some(imgfilename) = ImageFileName::from_str(fname) { - imgfiles.push(imgfilename); - } - } - return Ok((snapfiles, imgfiles)); -} diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs deleted file mode 100644 index debbeb6ae1..0000000000 --- a/pageserver/src/layered_repository/image_layer.rs +++ /dev/null @@ -1,384 +0,0 @@ -//! FIXME -//! A SnapshotLayer represents one snapshot file on disk. One file holds all page -//! version and size information of one relation, in a range of LSN. -//! The name "snapshot file" is a bit of a misnomer because a snapshot file doesn't -//! contain a snapshot at a specific LSN, but rather all the page versions in a range -//! of LSNs. -//! -//! Currently, a snapshot file contains full information needed to reconstruct any -//! page version in the LSN range, without consulting any other snapshot files. When -//! a new snapshot file is created for writing, the full contents of relation are -//! materialized as it is at the beginning of the LSN range. That can be very expensive, -//! we should find a way to store differential files. But this keeps the read-side -//! of things simple. You can find the correct snapshot file based on RelishTag and -//! timeline+LSN, and once you've located it, you have all the data you need to in that -//! file. -//! -//! When a snapshot file needs to be accessed, we slurp the whole file into memory, into -//! the SnapshotLayer struct. See load() and unload() functions. -//! -//! On disk, the snapshot files are stored in timelines/ directory. -//! Currently, there are no subdirectories, and each snapshot file is named like this: -//! -//! _____ -//! -//! For example: -//! -//! 1663_13990_2609_0_000000000169C348_000000000169C349 -//! -//! If a relation is dropped, we add a '_DROPPED' to the end of the filename to indicate that. -//! So the above example would become: -//! -//! 1663_13990_2609_0_000000000169C348_000000000169C349_DROPPED -//! -//! The end LSN indicates when it was dropped in that case, we don't store it in the -//! file contents in any way. -//! -//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two -//! parts: the page versions and the relation sizes. They are stored as separate chapters. -//! FIXME -//! -use crate::layered_repository::storage_layer::{Layer, PageReconstructData, SegmentTag}; -use crate::layered_repository::LayeredTimeline; -use crate::layered_repository::filename::{ImageFileName}; -use crate::layered_repository::RELISH_SEG_SIZE; -use crate::PageServerConf; -use crate::{ZTenantId, ZTimelineId}; -use anyhow::{bail, Result}; -use bytes::Bytes; -use lazy_static::lazy_static; -use log::*; -use std::fs; -use std::fs::File; -use std::io::Write; -use std::path::PathBuf; -use std::sync::{Mutex, MutexGuard}; - -use bookfile::{Book, BookWriter}; - -use zenith_metrics::{register_histogram, Histogram}; -use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::Lsn; - -// Magic constant to identify a Zenith segment image file -static IMAGE_FILE_MAGIC: u32 = 0x5A616E01 + 1; - -static BASE_IMAGES_CHAPTER: u64 = 1; - - -// Metrics collected on operations on the storage repository. -lazy_static! { - static ref RECONSTRUCT_TIME: Histogram = register_histogram!( - "pageserver_image_reconstruct_time", - "FIXME Time spent on storage operations" - ) - .expect("failed to define a metric"); -} - -/// -/// SnapshotLayer is the in-memory data structure associated with an -/// on-disk snapshot file. We keep a SnapshotLayer in memory for each -/// file, in the LayerMap. If a layer is in "loaded" state, we have a -/// copy of the file in memory, in 'inner'. Otherwise the struct is -/// just a placeholder for a file that exists on disk, and it needs to -/// be loaded before using it in queries. -/// -pub struct ImageLayer { - conf: &'static PageServerConf, - pub tenantid: ZTenantId, - pub timelineid: ZTimelineId, - pub seg: SegmentTag, - - // This entry contains an image of all pages as of this LSN - pub lsn: Lsn, - - inner: Mutex, -} - -pub struct ImageLayerInner { - /// If false, the 'page_versions' and 'relsizes' have not been - /// loaded into memory yet. - loaded: bool, - - // indexed by block number (within segment) - base_images: Vec, -} - -impl Layer for ImageLayer { - fn get_timeline_id(&self) -> ZTimelineId { - return self.timelineid; - } - - fn get_seg_tag(&self) -> SegmentTag { - return self.seg; - } - - fn is_dropped(&self) -> bool { - return false; - } - - fn get_start_lsn(&self) -> Lsn { - return self.lsn; - } - - fn get_end_lsn(&self) -> Lsn { - return self.lsn; - } - - /// Look up given page in the cache. - fn get_page_reconstruct_data( - &self, - blknum: u32, - lsn: Lsn, - reconstruct_data: &mut PageReconstructData, - ) -> Result> { - let need_base_image_lsn: Option; - - assert!(lsn >= self.lsn); - - { - let inner = self.load()?; - - let base_blknum: usize = (blknum % RELISH_SEG_SIZE) as usize; - if let Some(img) = inner.base_images.get(base_blknum) { - reconstruct_data.page_img = Some(img.clone()); - need_base_image_lsn = None; - } else { - bail!("no base img found for {} at blk {} at LSN {}", self.seg, base_blknum, lsn); - } - // release lock on 'inner' - } - - Ok(need_base_image_lsn) - } - - /// Get size of the relation at given LSN - fn get_seg_size(&self, _lsn: Lsn) -> Result { - - let inner = self.load()?; - let result = inner.base_images.len() as u32; - - Ok(result) - } - - /// Does this segment exist at given LSN? - fn get_seg_exists(&self, _lsn: Lsn) -> Result { - Ok(true) - } - - - /// - /// Release most of the memory used by this layer. If it's accessed again later, - /// it will need to be loaded back. - /// - fn unload(&self) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); - inner.base_images = Vec::new(); - inner.loaded = false; - Ok(()) - } - - fn delete(&self) -> Result<()> { - // delete underlying file - fs::remove_file(self.path())?; - Ok(()) - } - - fn is_incremental(&self) -> bool { - false - } -} - -impl ImageLayer { - fn path(&self) -> PathBuf { - Self::path_for( - self.conf, - self.timelineid, - self.tenantid, - &ImageFileName { - seg: self.seg, - lsn: self.lsn, - }, - ) - } - - fn path_for( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - fname: &ImageFileName, - ) -> PathBuf { - conf.timeline_path(&timelineid, &tenantid) - .join(fname.to_string()) - } - - /// Create a new snapshot file, using the given btreemaps containing the page versions and - /// relsizes. - /// FIXME comment - /// This is used to write the in-memory layer to disk. The in-memory layer uses the same - /// data structure with two btreemaps as we do, so passing the btreemaps is currently - /// expedient. - pub fn create( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - seg: SegmentTag, - lsn: Lsn, - base_images: Vec, - ) -> Result { - - let layer = ImageLayer { - conf: conf, - timelineid: timelineid, - tenantid: tenantid, - seg: seg, - lsn: lsn, - inner: Mutex::new(ImageLayerInner { - loaded: true, - base_images: base_images, - }), - }; - let inner = layer.inner.lock().unwrap(); - - // Write the images into a file - let path = layer.path(); - - // Note: This overwrites any existing file. There shouldn't be any. - // FIXME: throw an error instead? - let file = File::create(&path)?; - let book = BookWriter::new(file, IMAGE_FILE_MAGIC)?; - - // Write out the base images - let mut chapter = book.new_chapter(BASE_IMAGES_CHAPTER); - let buf = Vec::ser(&inner.base_images)?; - - chapter.write_all(&buf)?; - let book = chapter.close()?; - - book.close()?; - - trace!("saved {}", &path.display()); - - drop(inner); - - Ok(layer) - } - - pub fn create_from_src( - conf: &'static PageServerConf, - timeline: &LayeredTimeline, - src: &dyn Layer, - lsn: Lsn, - ) -> Result { - let seg = src.get_seg_tag(); - let timelineid = timeline.timelineid; - - let startblk; - let size; - if seg.rel.is_blocky() { - size = src.get_seg_size(lsn)?; - startblk = seg.segno * RELISH_SEG_SIZE; - } else { - size = 1; - startblk = 0; - } - - trace!( - "creating new ImageLayer for {} on timeline {} at {}", - seg, - timelineid, - lsn, - ); - - let mut base_images: Vec = Vec::new(); - for blknum in startblk..(startblk+size) { - let img = - RECONSTRUCT_TIME - .observe_closure_duration(|| { - timeline.materialize_page(seg, blknum, lsn, &*src) - })?; - - base_images.push(img); - } - - Self::create(conf, timelineid, timeline.tenantid, seg, lsn, - base_images) - } - - - /// - /// Load the contents of the file into memory - /// - fn load(&self) -> Result> { - // quick exit if already loaded - let mut inner = self.inner.lock().unwrap(); - - if inner.loaded { - return Ok(inner); - } - - let path = Self::path_for( - self.conf, - self.timelineid, - self.tenantid, - &ImageFileName { - seg: self.seg, - lsn: self.lsn, - }, - ); - - let file = File::open(&path)?; - let book = Book::new(file)?; - - let chapter = book.read_chapter(BASE_IMAGES_CHAPTER)?; - let base_images = Vec::des(&chapter)?; - - debug!("loaded from {}", &path.display()); - - *inner = ImageLayerInner { - loaded: true, - base_images, - }; - - Ok(inner) - } - - /// Create an ImageLayer represent a file on disk - pub fn load_image_layer( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - filename: &ImageFileName, - ) -> Result { - let layer = ImageLayer { - conf, - timelineid, - tenantid, - seg: filename.seg, - lsn: filename.lsn, - inner: Mutex::new(ImageLayerInner { - loaded: false, - base_images: Vec::new(), - }), - }; - - Ok(layer) - } - - /// debugging function to print out the contents of the layer - #[allow(unused)] - pub fn dump(&self) -> String { - let mut result = format!( - "----- image layer for {} at {} ----\n", - self.seg, self.lsn, - ); - - //let inner = self.inner.lock().unwrap(); - - //for (k, v) in inner.page_versions.iter() { - // result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some()); - //} - - result - } -} diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 8eba41af5e..70a7b7216e 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -6,7 +6,8 @@ use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE, }; use crate::layered_repository::LayeredTimeline; -use crate::layered_repository::{ImageLayer, SnapshotLayer}; +use crate::layered_repository::SnapshotLayer; +use crate::repository::WALRecord; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, Result}; @@ -31,13 +32,9 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, - oldest_pending_lsn: Lsn, - /// The above fields never change. The parts that do change are in 'inner', /// and protected by mutex. inner: Mutex, - - img_layer: Option>, } pub struct InMemoryLayerInner { @@ -54,11 +51,6 @@ pub struct InMemoryLayerInner { /// `segsizes` tracks the size of the segment at different points in time. /// segsizes: BTreeMap, - - /// - /// Memory usage - /// - mem_used: usize, } impl Layer for InMemoryLayer { @@ -128,16 +120,7 @@ impl Layer for InMemoryLayer { } } - // Use the base image, if needed - if let Some(need_lsn) = need_base_image_lsn { - if let Some(img_layer) = &self.img_layer { - need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?; - } else { - bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn); - } - } - - // release lock on 'inner' + // release lock on 'page_versions' } Ok(need_base_image_lsn) @@ -145,23 +128,18 @@ impl Layer for InMemoryLayer { /// Get size of the relation at given LSN fn get_seg_size(&self, lsn: Lsn) -> Result { - assert!(lsn >= self.start_lsn); - // Scan the BTreeMap backwards, starting from the given entry. let inner = self.inner.lock().unwrap(); let mut iter = inner.segsizes.range((Included(&Lsn(0)), Included(&lsn))); - let result; if let Some((_entry_lsn, entry)) = iter.next_back() { - result = *entry; - // Use the base image if needed - } else if let Some(img_layer) = &self.img_layer { - result = img_layer.get_seg_size(lsn)?; + let result = *entry; + drop(inner); + trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result); + Ok(result) } else { - result = 0; + bail!("No size found for {} at {} in memory", self.seg, lsn); } - trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result); - Ok(result) } /// Does this segment exist at given LSN? @@ -178,31 +156,9 @@ impl Layer for InMemoryLayer { // Otherwise, it exists Ok(true) } - - - /// - /// Release most of the memory used by this layer. If it's accessed again later, - /// it will need to be loaded back. - /// - fn unload(&self) -> Result<()> { - Ok(()) - } - - fn delete(&self) -> Result<()> { - Ok(()) - } - - fn is_incremental(&self) -> bool { - true - } } impl InMemoryLayer { - - pub fn get_oldest_pending_lsn(&self) -> Lsn { - self.oldest_pending_lsn - } - /// /// Create a new, empty, in-memory layer /// @@ -212,7 +168,6 @@ impl InMemoryLayer { tenantid: ZTenantId, seg: SegmentTag, start_lsn: Lsn, - oldest_pending_lsn: Lsn, ) -> Result { trace!( "initializing new empty InMemoryLayer for writing {} on timeline {} at {}", @@ -227,14 +182,11 @@ impl InMemoryLayer { tenantid, seg, start_lsn, - oldest_pending_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, page_versions: BTreeMap::new(), segsizes: BTreeMap::new(), - mem_used: 0, }), - img_layer: None, }) } @@ -276,9 +228,6 @@ impl InMemoryLayer { self.timelineid, lsn ); - - let mem_size = pv.get_mem_size(); - let mut inner = self.inner.lock().unwrap(); let old = inner.page_versions.insert((blknum, lsn), pv); @@ -289,8 +238,6 @@ impl InMemoryLayer { "Page version of rel {} blk {} at {} already exists", self.seg.rel, blknum, lsn ); - } else { - inner.mem_used += mem_size; } // Also update the relation size, if this extended the relation. @@ -302,8 +249,6 @@ impl InMemoryLayer { let oldsize; if let Some((_entry_lsn, entry)) = iter.next_back() { oldsize = *entry; - } else if let Some(img_layer) = &self.img_layer { - oldsize = img_layer.get_seg_size(lsn)?; } else { oldsize = 0; //bail!("No old size found for {} at {}", self.tag, lsn); @@ -352,37 +297,56 @@ impl InMemoryLayer { /// Initialize a new InMemoryLayer for, by copying the state at the given /// point in time from given existing layer. /// - pub fn create_successor_layer( + pub fn copy_snapshot( conf: &'static PageServerConf, - src: Arc, + timeline: &LayeredTimeline, + src: &dyn Layer, timelineid: ZTimelineId, tenantid: ZTenantId, - start_lsn: Lsn, - oldest_pending_lsn: Lsn, + lsn: Lsn, ) -> Result { - let seg = src.get_seg_tag(); - trace!( "initializing new InMemoryLayer for writing {} on timeline {} at {}", - seg, + src.get_seg_tag(), timelineid, - start_lsn, + lsn ); + let mut page_versions = BTreeMap::new(); + let mut segsizes = BTreeMap::new(); + + let seg = src.get_seg_tag(); + + let startblk; + let size; + if seg.rel.is_blocky() { + size = src.get_seg_size(lsn)?; + segsizes.insert(lsn, size); + startblk = seg.segno * RELISH_SEG_SIZE; + } else { + size = 1; + startblk = 0; + } + + for blknum in startblk..(startblk + size) { + let img = timeline.materialize_page(seg, blknum, lsn, src)?; + let pv = PageVersion { + page_image: Some(img), + record: None, + }; + page_versions.insert((blknum, lsn), pv); + } Ok(InMemoryLayer { conf, timelineid, tenantid, - seg, - start_lsn, - oldest_pending_lsn, + seg: src.get_seg_tag(), + start_lsn: lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, - page_versions: BTreeMap::new(), - segsizes: BTreeMap::new(), - mem_used: 0, + page_versions: page_versions, + segsizes: segsizes, }), - img_layer: Some(src), }) } @@ -402,7 +366,7 @@ impl InMemoryLayer { cutoff_lsn: Lsn, // This is needed just to call materialize_page() timeline: &LayeredTimeline, - ) -> Result<(Vec>, Option>)> { + ) -> Result<(Option>, Option>)> { info!( "freezing in memory layer for {} on timeline {} at {}", self.seg, self.timelineid, cutoff_lsn @@ -458,18 +422,6 @@ impl InMemoryLayer { // we can release the lock now. drop(inner); - let mut historics: Vec> = Vec::new(); - - // write a new base image layer at the cutoff point - let imgfile = ImageLayer::create_from_src( - self.conf, - timeline, - self, - end_lsn, - )?; - let imgfile_rc: Arc = Arc::new(imgfile); - historics.push(Arc::clone(&imgfile_rc)); - // Write the page versions before the cutoff to disk. let snapfile = SnapshotLayer::create( self.conf, @@ -479,38 +431,36 @@ impl InMemoryLayer { self.start_lsn, end_lsn, dropped, - self.img_layer.clone(), before_page_versions, before_segsizes, )?; - let snapfile_rc: Arc = Arc::new(snapfile); - historics.push(snapfile_rc); // If there were any "new" page versions, initialize a new in-memory layer to hold // them - let new_open = - if !after_segsizes.is_empty() || !after_page_versions.is_empty() { - info!("created new in-mem layer for {} {}-", self.seg, end_lsn); + let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() { + info!("created new in-mem layer for {} {}-", self.seg, end_lsn); - let new_open = Self::create_successor_layer( - self.conf, - imgfile_rc, - self.timelineid, - self.tenantid, - end_lsn, - end_lsn, - )?; - let mut new_inner = new_open.inner.lock().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); - new_inner.segsizes.append(&mut after_segsizes); - drop(new_inner); + let new_open = Self::copy_snapshot( + self.conf, + timeline, + &snapfile, + self.timelineid, + self.tenantid, + end_lsn, + )?; + let mut new_inner = new_open.inner.lock().unwrap(); + new_inner.page_versions.append(&mut after_page_versions); + new_inner.segsizes.append(&mut after_segsizes); + drop(new_inner); - Some(Arc::new(new_open)) - } else { - None - }; + Some(Arc::new(new_open)) + } else { + None + }; - Ok((historics, new_open)) + let new_historic = Some(Arc::new(snapfile)); + + Ok((new_historic, new_open)) } /// debugging function to print out the contents of the layer diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 78d6da3312..0cf9f93419 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -3,79 +3,38 @@ //! //! When the timeline is first accessed, the server lists of all snapshot files //! in the timelines/ directory, and populates this map with -//! SnapshotLayers corresponding to each file. When new WAL is received, FIXME +//! SnapshotLayers corresponding to each file. When new WAL is received, //! we create InMemoryLayers to hold the incoming records. Now and then, //! in the checkpoint() function, the in-memory layers are frozen, forming //! new snapshot layers and corresponding files are written to disk. //! use crate::layered_repository::storage_layer::{Layer, SegmentTag}; -use crate::layered_repository::{InMemoryLayer}; +use crate::layered_repository::{InMemoryLayer, SnapshotLayer}; use crate::relish::*; use anyhow::Result; -use lazy_static::lazy_static; use log::*; use std::collections::HashSet; -use std::collections::{BinaryHeap, BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::ops::Bound::Included; -use std::cmp::Ordering; use std::sync::Arc; -use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_utils::lsn::Lsn; -lazy_static! { - static ref NUM_INMEMORY_LAYERS: IntGauge = - register_int_gauge!("pageserver_inmemory_layers", "Number of layers in memory") - .expect("failed to define a metric"); - - static ref NUM_ONDISK_LAYERS: IntGauge = - register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk") - .expect("failed to define a metric"); -} - /// -/// LayerMap tracks what layers exist on a timeline. The last layer that is +/// LayerMap tracks what layers exist or a timeline. The last layer that is /// open for writes is always an InMemoryLayer, and is tracked separately /// because there can be only one for each segment. The older layers, /// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN. /// pub struct LayerMap { segs: HashMap, - - // FIXME: explain this - open_segs: BinaryHeap, } struct SegEntry { pub open: Option>, - pub historic: BTreeMap>, + pub historic: BTreeMap>, } -struct OpenSegEntry { - pub oldest_pending_lsn: Lsn, - pub layer: Arc, -} -impl Ord for OpenSegEntry { - fn cmp(&self, other: &Self) -> Ordering { - // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here - // to get that. - other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn) - } -} -impl PartialOrd for OpenSegEntry { - fn partial_cmp(&self, other: &Self) -> Option { - // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here - // to get that. - other.oldest_pending_lsn.partial_cmp(&self.oldest_pending_lsn) - } -} -impl PartialEq for OpenSegEntry { - fn eq(&self, other: &Self) -> bool { - self.oldest_pending_lsn.eq(&other.oldest_pending_lsn) - } -} -impl Eq for OpenSegEntry {} - impl LayerMap { /// /// Look up using the given segment tag and LSN. This differs from a plain @@ -129,38 +88,20 @@ impl LayerMap { if let Some(_old) = &segentry.open { // FIXME: shouldn't exist, but check } - segentry.open = Some(Arc::clone(&layer)); + segentry.open = Some(layer); } else { let segentry = SegEntry { - open: Some(Arc::clone(&layer)), + open: Some(layer), historic: BTreeMap::new(), }; self.segs.insert(tag, segentry); } - - let opensegentry = OpenSegEntry { - oldest_pending_lsn: layer.get_oldest_pending_lsn(), - layer: layer, - }; - self.open_segs.push(opensegentry); - - NUM_INMEMORY_LAYERS.inc(); - } - - // replace given open layer with other layers. - pub fn pop_oldest(&mut self) { - let opensegentry = self.open_segs.pop().unwrap(); - let segtag = opensegentry.layer.get_seg_tag(); - - let mut segentry = self.segs.get_mut(&segtag).unwrap(); - segentry.open = None; - NUM_INMEMORY_LAYERS.dec(); } /// /// Insert an on-disk layer /// - pub fn insert_historic(&mut self, layer: Arc) { + pub fn insert_historic(&mut self, layer: Arc) { let tag = layer.get_seg_tag(); let start_lsn = layer.get_start_lsn(); @@ -176,7 +117,6 @@ impl LayerMap { }; self.segs.insert(tag, segentry); } - NUM_ONDISK_LAYERS.inc(); } /// @@ -184,14 +124,13 @@ impl LayerMap { /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer: &dyn Layer) { + pub fn remove_historic(&mut self, layer: &SnapshotLayer) { let tag = layer.get_seg_tag(); let start_lsn = layer.get_start_lsn(); if let Some(segentry) = self.segs.get_mut(&tag) { segentry.historic.remove(&start_lsn); } - NUM_ONDISK_LAYERS.dec(); } pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result> { @@ -230,16 +169,14 @@ impl LayerMap { /// Is there a newer layer for given segment? pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool { if let Some(segentry) = self.segs.get(&seg) { - // open layer is always incremental so it doesn't count + if let Some(_open) = &segentry.open { + return true; + } for (newer_lsn, layer) in segentry .historic .range((Included(lsn), Included(Lsn(u64::MAX)))) { - // FIXME: incremental layers don't count - if layer.is_incremental() { - continue; - } if layer.get_end_lsn() > lsn { trace!( "found later layer for {}, {} {}-{}", @@ -259,11 +196,10 @@ impl LayerMap { false } - pub fn get_oldest_open_layer(&mut self) -> Option> { - if let Some(opensegentry) = self.open_segs.peek() { - Some(Arc::clone(&opensegentry.layer)) - } else { - None + pub fn iter_open_layers(&mut self) -> OpenLayerIter { + OpenLayerIter { + last: None, + segiter: self.segs.iter_mut(), } } @@ -279,18 +215,53 @@ impl Default for LayerMap { fn default() -> Self { LayerMap { segs: HashMap::new(), - open_segs: BinaryHeap::new(), } } } +pub struct OpenLayerIter<'a> { + last: Option<&'a mut SegEntry>, + + segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>, +} + +impl<'a> OpenLayerIter<'a> { + pub fn replace(&mut self, replacement: Option>) { + let segentry = self.last.as_mut().unwrap(); + segentry.open = replacement; + } + + pub fn insert_historic(&mut self, new_layer: Arc) { + let start_lsn = new_layer.get_start_lsn(); + + let segentry = self.last.as_mut().unwrap(); + segentry.historic.insert(start_lsn, new_layer); + } +} + +impl<'a> Iterator for OpenLayerIter<'a> { + type Item = Arc; + + fn next(&mut self) -> std::option::Option<::Item> { + while let Some((_seg, entry)) = self.segiter.next() { + if let Some(open) = &entry.open { + let op = Arc::clone(&open); + self.last = Some(entry); + return Some(op); + } + } + self.last = None; + None + } +} + pub struct HistoricLayerIter<'a> { segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>, - iter: Option>>, + iter: Option>>, } impl<'a> Iterator for HistoricLayerIter<'a> { - type Item = Arc; + type Item = Arc; fn next(&mut self) -> std::option::Option<::Item> { loop { diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs index bc97be9835..e0f4e77995 100644 --- a/pageserver/src/layered_repository/snapshot_layer.rs +++ b/pageserver/src/layered_repository/snapshot_layer.rs @@ -36,17 +36,17 @@ //! //! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two //! parts: the page versions and the relation sizes. They are stored as separate chapters. -//! FIXME //! use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageVersion, SegmentTag, }; -use crate::layered_repository::filename::{SnapshotFileName}; +use crate::relish::*; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, Result}; use log::*; use std::collections::BTreeMap; +use std::fmt; use std::fs; use std::fs::File; use std::io::Write; @@ -65,6 +65,145 @@ static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01; static PAGE_VERSIONS_CHAPTER: u64 = 1; static REL_SIZES_CHAPTER: u64 = 2; +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +struct SnapshotFileName { + seg: SegmentTag, + start_lsn: Lsn, + end_lsn: Lsn, + dropped: bool, +} + +impl SnapshotFileName { + fn from_str(fname: &str) -> Option { + // Split the filename into parts + // + // ______ + // + // or if it was dropped: + // + // _______DROPPED + // + let rel; + let mut parts; + if let Some(rest) = fname.strip_prefix("rel_") { + parts = rest.split('_'); + rel = RelishTag::Relation(RelTag { + spcnode: parts.next()?.parse::().ok()?, + dbnode: parts.next()?.parse::().ok()?, + relnode: parts.next()?.parse::().ok()?, + forknum: parts.next()?.parse::().ok()?, + }); + } else if let Some(rest) = fname.strip_prefix("pg_xact_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::Clog, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") { + parts = rest.split('_'); + rel = RelishTag::FileNodeMap { + spcnode: parts.next()?.parse::().ok()?, + dbnode: parts.next()?.parse::().ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_twophase_") { + parts = rest.split('_'); + rel = RelishTag::TwoPhase { + xid: parts.next()?.parse::().ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") { + parts = rest.split('_'); + rel = RelishTag::Checkpoint; + } else if let Some(rest) = fname.strip_prefix("pg_control_") { + parts = rest.split('_'); + rel = RelishTag::ControlFile; + } else { + return None; + } + + let segno = parts.next()?.parse::().ok()?; + + let seg = SegmentTag { rel, segno }; + + let start_lsn = Lsn::from_hex(parts.next()?).ok()?; + let end_lsn = Lsn::from_hex(parts.next()?).ok()?; + + let mut dropped = false; + if let Some(suffix) = parts.next() { + if suffix == "DROPPED" { + dropped = true; + } else { + warn!("unrecognized filename in timeline dir: {}", fname); + return None; + } + } + if parts.next().is_some() { + warn!("unrecognized filename in timeline dir: {}", fname); + return None; + } + + Some(SnapshotFileName { + seg, + start_lsn, + end_lsn, + dropped, + }) + } + + fn to_string(&self) -> String { + let basename = match self.seg.rel { + RelishTag::Relation(reltag) => format!( + "rel_{}_{}_{}_{}", + reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum + ), + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + } => format!("pg_xact_{:04X}", segno), + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + } => format!("pg_multixact_members_{:04X}", segno), + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + } => format!("pg_multixact_offsets_{:04X}", segno), + RelishTag::FileNodeMap { spcnode, dbnode } => { + format!("pg_filenodemap_{}_{}", spcnode, dbnode) + } + RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid), + RelishTag::Checkpoint => format!("pg_control_checkpoint"), + RelishTag::ControlFile => format!("pg_control"), + }; + + format!( + "{}_{}_{:016X}_{:016X}{}", + basename, + self.seg.segno, + u64::from(self.start_lsn), + u64::from(self.end_lsn), + if self.dropped { "_DROPPED" } else { "" } + ) + } +} + +impl fmt::Display for SnapshotFileName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.to_string()) + } +} + /// /// SnapshotLayer is the in-memory data structure associated with an /// on-disk snapshot file. We keep a SnapshotLayer in memory for each @@ -88,8 +227,6 @@ pub struct SnapshotLayer { dropped: bool, inner: Mutex, - - img_layer: Option>, } pub struct SnapshotLayerInner { @@ -162,15 +299,6 @@ impl Layer for SnapshotLayer { } } - // Use the base image, if needed - if let Some(need_lsn) = need_base_image_lsn { - if let Some(img_layer) = &self.img_layer { - need_base_image_lsn = img_layer.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?; - } else { - bail!("no base img found for {} at blk {} at LSN {}", self.seg, blknum, lsn); - } - } - // release lock on 'inner' } @@ -179,23 +307,26 @@ impl Layer for SnapshotLayer { /// Get size of the relation at given LSN fn get_seg_size(&self, lsn: Lsn) -> Result { - - assert!(lsn >= self.start_lsn); - // Scan the BTreeMap backwards, starting from the given entry. let inner = self.load()?; let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); - let result; if let Some((_entry_lsn, entry)) = iter.next_back() { - result = *entry; - // Use the base image if needed - } else if let Some(img_layer) = &self.img_layer { - result = img_layer.get_seg_size(lsn)?; + let result = *entry; + drop(inner); + trace!("get_seg_size: {} at {} -> {}", self.seg, lsn, result); + Ok(result) } else { - result = 0; + error!( + "No size found for {} at {} in snapshot layer {} {}-{}", + self.seg, lsn, self.seg, self.start_lsn, self.end_lsn + ); + bail!( + "No size found for {} at {} in snapshot layer", + self.seg, + lsn + ); } - Ok(result) } /// Does this segment exist at given LSN? @@ -208,28 +339,6 @@ impl Layer for SnapshotLayer { // Otherwise, it exists. Ok(true) } - - /// - /// Release most of the memory used by this layer. If it's accessed again later, - /// it will need to be loaded back. - /// - fn unload(&self) -> Result<()> { - let mut inner = self.inner.lock().unwrap(); - inner.page_versions = BTreeMap::new(); - inner.relsizes = BTreeMap::new(); - inner.loaded = false; - Ok(()) - } - - fn delete(&self) -> Result<()> { - // delete underlying file - fs::remove_file(self.path())?; - Ok(()) - } - - fn is_incremental(&self) -> bool { - true - } } impl SnapshotLayer { @@ -271,11 +380,9 @@ impl SnapshotLayer { start_lsn: Lsn, end_lsn: Lsn, dropped: bool, - img_layer: Option>, page_versions: BTreeMap<(u32, Lsn), PageVersion>, relsizes: BTreeMap, ) -> Result { - let snapfile = SnapshotLayer { conf: conf, timelineid: timelineid, @@ -289,7 +396,6 @@ impl SnapshotLayer { page_versions: page_versions, relsizes: relsizes, }), - img_layer, }; let inner = snapfile.inner.lock().unwrap(); @@ -301,7 +407,7 @@ impl SnapshotLayer { let file = File::create(&path)?; let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?; - // Write out the other page versions + // Write out page versions let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER); let buf = BTreeMap::ser(&inner.page_versions)?; chapter.write_all(&buf)?; @@ -368,30 +474,56 @@ impl SnapshotLayer { /// Create SnapshotLayers representing all files on disk /// // TODO: returning an Iterator would be more idiomatic - pub fn load_snapshot_layer( + pub fn list_snapshot_files( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, - filename: &SnapshotFileName, - ) -> Result { - let snapfile = SnapshotLayer { - conf, - timelineid, - tenantid, - seg: filename.seg, - start_lsn: filename.start_lsn, - end_lsn: filename.end_lsn, - dropped: filename.dropped, - inner: Mutex::new(SnapshotLayerInner { - loaded: false, - page_versions: BTreeMap::new(), - relsizes: BTreeMap::new(), - }), - // FIXME: This doesn't work across restarts. - img_layer: None, - }; + ) -> Result>> { + let path = conf.timeline_path(&timelineid, &tenantid); - Ok(snapfile) + let mut snapfiles: Vec> = Vec::new(); + for direntry in fs::read_dir(path)? { + let fname = direntry?.file_name(); + let fname = fname.to_str().unwrap(); + + if let Some(snapfilename) = SnapshotFileName::from_str(fname) { + let snapfile = SnapshotLayer { + conf, + timelineid, + tenantid, + seg: snapfilename.seg, + start_lsn: snapfilename.start_lsn, + end_lsn: snapfilename.end_lsn, + dropped: snapfilename.dropped, + inner: Mutex::new(SnapshotLayerInner { + loaded: false, + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), + }; + + snapfiles.push(Arc::new(snapfile)); + } + } + return Ok(snapfiles); + } + + pub fn delete(&self) -> Result<()> { + // delete underlying file + fs::remove_file(self.path())?; + Ok(()) + } + + /// + /// Release most of the memory used by this layer. If it's accessed again later, + /// it will need to be loaded back. + /// + pub fn unload(&self) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + inner.page_versions = BTreeMap::new(); + inner.relsizes = BTreeMap::new(); + inner.loaded = false; + Ok(()) } /// debugging function to print out the contents of the layer diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 167aaaecde..0a181a1aac 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -69,28 +69,6 @@ pub struct PageVersion { pub record: Option, } -impl PageVersion { - pub fn get_mem_size(&self) -> usize { - let mut sz = 0; - - // every page version has some fixed overhead. - sz += 16; - - if let Some(img) = &self.page_image { - sz += img.len(); - } - - if let Some(rec) = &self.record { - sz += rec.rec.len(); - - // Some per-record overhead. Not very accurate, but close enough - sz += 32; - } - - sz - } -} - /// /// Data needed to reconstruct a page version /// @@ -147,9 +125,4 @@ pub trait Layer: Send + Sync { fn get_seg_size(&self, lsn: Lsn) -> Result; fn get_seg_exists(&self, lsn: Lsn) -> Result; - - fn is_incremental(&self) -> bool; - - fn unload(&self) -> Result<()>; - fn delete(&self) -> Result<()>; }