diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index a86565ac59..bb9d8af112 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -111,6 +111,9 @@ lazy_static! { .expect("failed to define a metric"); } +/// The name of the metadata file pageserver creates per timeline. +pub const METADATA_FILE_NAME: &str = "metadata"; + /// /// Repository consists of multiple timelines. Keep them in a hash table. /// @@ -252,7 +255,16 @@ impl LayeredRepository { )?; // List the layers on disk, and load them into the layer map - timeline.load_layer_map(disk_consistent_lsn)?; + let _loaded_layers = timeline.load_layer_map(disk_consistent_lsn)?; + if self.upload_relishes { + schedule_timeline_upload(()); + // schedule_timeline_upload( + // self.tenantid, + // timelineid, + // loaded_layers, + // disk_consistent_lsn, + // ); + } // needs to be after load_layer_map timeline.init_current_logical_size()?; @@ -351,9 +363,8 @@ impl LayeredRepository { tenantid: ZTenantId, data: &TimelineMetadata, first_save: bool, - ) -> Result { - let timeline_path = conf.timeline_path(&timelineid, &tenantid); - let path = timeline_path.join("metadata"); + ) -> Result<()> { + let path = metadata_path(conf, timelineid, tenantid); // use OpenOptions to ensure file presence is consistent with first_save let mut file = OpenOptions::new() .write(true) @@ -377,11 +388,15 @@ impl LayeredRepository { // fsync the parent directory to ensure the directory entry is durable if first_save { - let timeline_dir = File::open(&timeline_path)?; + let timeline_dir = File::open( + &path + .parent() + .expect("Metadata should always have a parent dir"), + )?; timeline_dir.sync_all()?; } - Ok(path) + Ok(()) } fn load_metadata( @@ -389,7 +404,7 @@ impl LayeredRepository { timelineid: ZTimelineId, tenantid: ZTenantId, ) -> Result { - let path = conf.timeline_path(&timelineid, &tenantid).join("metadata"); + let path = metadata_path(conf, timelineid, tenantid); let metadata_bytes = std::fs::read(&path)?; ensure!(metadata_bytes.len() == METADATA_MAX_SAFE_SIZE); @@ -469,7 +484,7 @@ impl LayeredRepository { let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?; if let Some(ancestor_timeline) = &timeline.ancestor_timeline { - // If target_timeline is specified, we only need to know branchpoints of its childs + // If target_timeline is specified, we only need to know branchpoints of its children if let Some(timelineid) = target_timelineid { if ancestor_timeline.timelineid == timelineid { all_branchpoints @@ -1023,9 +1038,10 @@ impl LayeredTimeline { } /// - /// Scan the timeline directory to populate the layer map + /// Scan the timeline directory to populate the layer map. + /// Returns all timeline-related files that were found and loaded. /// - fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result> { info!( "loading layer map for timeline {} into memory", self.timelineid @@ -1035,9 +1051,9 @@ impl LayeredTimeline { filename::list_files(self.conf, self.timelineid, self.tenantid)?; let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid); - + let mut local_layers = Vec::with_capacity(imgfilenames.len() + deltafilenames.len()); // First create ImageLayer structs for each image file. - for filename in imgfilenames.iter() { + for filename in &imgfilenames { if filename.lsn > disk_consistent_lsn { warn!( "found future image layer {} on timeline {}", @@ -1056,11 +1072,11 @@ impl LayeredTimeline { layer.get_start_lsn(), self.timelineid ); + local_layers.push(layer.path()); layers.insert_historic(Arc::new(layer)); } - // Then for the Delta files. - for filename in deltafilenames.iter() { + for filename in &deltafilenames { ensure!(filename.start_lsn < filename.end_lsn); if filename.end_lsn > disk_consistent_lsn { warn!( @@ -1079,10 +1095,11 @@ impl LayeredTimeline { layer.filename().display(), self.timelineid, ); + local_layers.push(layer.path()); layers.insert_historic(Arc::new(layer)); } - Ok(()) + Ok(local_layers) } /// @@ -1341,7 +1358,7 @@ impl LayeredTimeline { let mut disk_consistent_lsn = last_record_lsn; let mut created_historics = false; - + let mut layer_uploads = Vec::new(); while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() { let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); @@ -1403,8 +1420,13 @@ impl LayeredTimeline { layers.remove_historic(frozen.clone()); // Add the historics to the LayerMap - for n in new_historics { - layers.insert_historic(n); + for delta_layer in new_historics.delta_layers { + layer_uploads.push(delta_layer.path()); + layers.insert_historic(Arc::new(delta_layer)); + } + for image_layer in new_historics.image_layers { + layer_uploads.push(image_layer.path()); + layers.insert_historic(Arc::new(image_layer)); } } @@ -1449,7 +1471,7 @@ impl LayeredTimeline { ancestor_timeline: ancestor_timelineid, ancestor_lsn: self.ancestor_lsn, }; - let _metadata_path = LayeredRepository::save_metadata( + LayeredRepository::save_metadata( self.conf, self.timelineid, self.tenantid, @@ -1458,12 +1480,10 @@ impl LayeredTimeline { )?; if self.upload_relishes { schedule_timeline_upload(()) - // schedule_timeline_upload(LocalTimeline { - // tenant_id: self.tenantid, - // timeline_id: self.timelineid, - // metadata_path, - // image_layers: image_layer_uploads, - // delta_layers: delta_layer_uploads, + // schedule_timeline_upload( + // self.tenantid, + // self.timelineid, + // layer_uploads, // disk_consistent_lsn, // }); } @@ -1896,6 +1916,15 @@ pub fn dump_layerfile_from_path(path: &Path) -> Result<()> { Ok(()) } +fn metadata_path( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, +) -> PathBuf { + conf.timeline_path(&timelineid, &tenantid) + .join(METADATA_FILE_NAME) +} + /// Add a suffix to a layer file's name: .{num}.old /// Uses the first available num (starts at 0) fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> { diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index b96ea88920..e93eddb7e6 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -169,29 +169,7 @@ impl Layer for DeltaLayer { } fn filename(&self) -> PathBuf { - PathBuf::from( - DeltaFileName { - seg: self.seg, - start_lsn: self.start_lsn, - end_lsn: self.end_lsn, - dropped: self.dropped, - } - .to_string(), - ) - } - - fn path(&self) -> Option { - Some(Self::path_for( - &self.path_or_conf, - self.timelineid, - self.tenantid, - &DeltaFileName { - seg: self.seg, - start_lsn: self.start_lsn, - end_lsn: self.end_lsn, - dropped: self.dropped, - }, - )) + PathBuf::from(self.layer_name().to_string()) } /// Look up given page in the cache. @@ -300,9 +278,7 @@ impl Layer for DeltaLayer { fn delete(&self) -> Result<()> { // delete underlying file - if let Some(path) = self.path() { - fs::remove_file(path)?; - } + fs::remove_file(self.path())?; Ok(()) } @@ -406,9 +382,7 @@ impl DeltaLayer { let mut inner = delta_layer.inner.lock().unwrap(); // Write the in-memory btreemaps into a file - let path = delta_layer - .path() - .expect("DeltaLayer is supposed to have a layer path on disk"); + let path = delta_layer.path(); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? @@ -472,12 +446,7 @@ impl DeltaLayer { &self.path_or_conf, self.timelineid, self.tenantid, - &DeltaFileName { - seg: self.seg, - start_lsn: self.start_lsn, - end_lsn: self.end_lsn, - dropped: self.dropped, - }, + &self.layer_name(), ); let file = File::open(&path)?; @@ -586,4 +555,23 @@ impl DeltaLayer { }), }) } + + fn layer_name(&self) -> DeltaFileName { + DeltaFileName { + seg: self.seg, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, + } + } + + /// Path to the layer file in pageserver workdir. + pub fn path(&self) -> PathBuf { + Self::path_for( + &self.path_or_conf, + self.timelineid, + self.tenantid, + &self.layer_name(), + ) + } } diff --git a/pageserver/src/layered_repository/filename.rs b/pageserver/src/layered_repository/filename.rs index 50bfe2977e..afa106f939 100644 --- a/pageserver/src/layered_repository/filename.rs +++ b/pageserver/src/layered_repository/filename.rs @@ -13,6 +13,8 @@ use anyhow::Result; use log::*; use zenith_utils::lsn::Lsn; +use super::METADATA_FILE_NAME; + // Note: LayeredTimeline::load_layer_map() relies on this sort order #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct DeltaFileName { @@ -35,7 +37,7 @@ impl DeltaFileName { /// Parse a string as a delta file name. Returns None if the filename does not /// match the expected pattern. /// - pub fn from_str(fname: &str) -> Option { + pub fn parse_str(fname: &str) -> Option { let rel; let mut parts; if let Some(rest) = fname.strip_prefix("rel_") { @@ -168,7 +170,7 @@ impl ImageFileName { /// Parse a string as an image file name. Returns None if the filename does not /// match the expected pattern. /// - pub fn from_str(fname: &str) -> Option { + pub fn parse_str(fname: &str) -> Option { let rel; let mut parts; if let Some(rest) = fname.strip_prefix("rel_") { @@ -286,11 +288,11 @@ pub fn list_files( let fname = direntry?.file_name(); let fname = fname.to_str().unwrap(); - if let Some(deltafilename) = DeltaFileName::from_str(fname) { + if let Some(deltafilename) = DeltaFileName::parse_str(fname) { deltafiles.push(deltafilename); - } else if let Some(imgfilename) = ImageFileName::from_str(fname) { + } else if let Some(imgfilename) = ImageFileName::parse_str(fname) { imgfiles.push(imgfilename); - } else if fname == "metadata" || fname == "ancestor" || fname.ends_with(".old") { + } else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") { // ignore these } else { warn!("unrecognized filename in timeline dir: {}", fname); diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index a9487a02d4..744f793558 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -114,25 +114,7 @@ pub struct ImageLayerInner { impl Layer for ImageLayer { fn filename(&self) -> PathBuf { - PathBuf::from( - ImageFileName { - seg: self.seg, - lsn: self.lsn, - } - .to_string(), - ) - } - - fn path(&self) -> Option { - Some(Self::path_for( - &self.path_or_conf, - self.timelineid, - self.tenantid, - &ImageFileName { - seg: self.seg, - lsn: self.lsn, - }, - )) + PathBuf::from(self.layer_name().to_string()) } fn get_timeline_id(&self) -> ZTimelineId { @@ -222,9 +204,7 @@ impl Layer for ImageLayer { fn delete(&self) -> Result<()> { // delete underlying file - if let Some(path) = self.path() { - fs::remove_file(path)?; - } + fs::remove_file(self.path())?; Ok(()) } @@ -300,9 +280,7 @@ impl ImageLayer { let inner = layer.inner.lock().unwrap(); // Write the images into a file - let path = layer - .path() - .expect("ImageLayer is supposed to have a layer path on disk"); + let path = layer.path(); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? let file = File::create(&path)?; @@ -340,7 +318,7 @@ impl ImageLayer { let writer = book.close()?; writer.get_ref().sync_all()?; - trace!("saved {}", &path.display()); + trace!("saved {}", path.display()); drop(inner); @@ -445,15 +423,7 @@ impl ImageLayer { } fn open_book(&self) -> Result<(PathBuf, Book)> { - let path = Self::path_for( - &self.path_or_conf, - self.timelineid, - self.tenantid, - &ImageFileName { - seg: self.seg, - lsn: self.lsn, - }, - ); + let path = self.path(); let file = File::open(&path)?; let book = Book::new(file)?; @@ -500,4 +470,21 @@ impl ImageLayer { }), }) } + + fn layer_name(&self) -> ImageFileName { + ImageFileName { + seg: self.seg, + lsn: self.lsn, + } + } + + /// Path to the layer file in pageserver workdir. + pub fn path(&self) -> PathBuf { + Self::path_for( + &self.path_or_conf, + self.timelineid, + self.tenantid, + &self.layer_name(), + ) + } } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 5f5307fec0..214c392fb6 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -124,10 +124,6 @@ impl Layer for InMemoryLayer { PathBuf::from(format!("inmem-{}", delta_filename)) } - fn path(&self) -> Option { - None - } - fn get_timeline_id(&self) -> ZTimelineId { self.timelineid } @@ -309,6 +305,18 @@ pub struct FreezeLayers { pub open: Option>, } +/// A result of an inmemory layer data being written to disk. +pub struct LayersOnDisk { + pub delta_layers: Vec, + pub image_layers: Vec, +} + +impl LayersOnDisk { + pub fn is_empty(&self) -> bool { + self.delta_layers.is_empty() && self.image_layers.is_empty() + } +} + impl InMemoryLayer { fn assert_not_frozen(&self) { assert!(self.end_lsn.is_none()); @@ -669,7 +677,7 @@ impl InMemoryLayer { /// WAL records between start and end LSN. (The delta layer is not needed /// when a new relish is created with a single LSN, so that the start and /// end LSN are the same.) - pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result>> { + pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result { trace!( "write_to_disk {} end_lsn is {} get_end_lsn is {}", self.filename().display(), @@ -678,7 +686,7 @@ impl InMemoryLayer { ); // Grab the lock in read-mode. We hold it over the I/O, but because this - // layer is not writeable anymore, no one should be trying to aquire the + // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception // though: another thread might have grabbed a reference to this layer // in `get_layer_for_write' just before the checkpointer called @@ -707,15 +715,17 @@ impl InMemoryLayer { self.start_lsn, drop_lsn ); - return Ok(vec![Arc::new(delta_layer)]); + return Ok(LayersOnDisk { + delta_layers: vec![delta_layer], + image_layers: Vec::new(), + }); } let end_lsn = self.end_lsn.unwrap(); let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn)); - let mut frozen_layers: Vec> = Vec::new(); - + let mut delta_layers = Vec::new(); if self.start_lsn != end_lsn { let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1)); @@ -731,7 +741,7 @@ impl InMemoryLayer { before_page_versions, before_segsizes, )?; - frozen_layers.push(Arc::new(delta_layer)); + delta_layers.push(delta_layer); trace!( "freeze: created delta layer {} {}-{}", self.seg, @@ -746,9 +756,11 @@ impl InMemoryLayer { // Write a new base image layer at the cutoff point let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?; - frozen_layers.push(Arc::new(image_layer)); trace!("freeze: created image layer {} at {}", self.seg, end_lsn); - Ok(frozen_layers) + Ok(LayersOnDisk { + delta_layers, + image_layers: vec![image_layer], + }) } } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index a107d63b40..c49fbbdd99 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -123,10 +123,6 @@ pub trait Layer: Send + Sync { /// Is the segment represented by this layer dropped by PostgreSQL? fn is_dropped(&self) -> bool; - /// Gets the physical location of the layer on disk. - /// Some layers, such as in-memory, might not have the location. - fn path(&self) -> Option; - /// Filename used to store this layer on disk. (Even in-memory layers /// implement this, to print a handy unique identifier for the layer for /// log messages, even though they're never not on disk.) diff --git a/pageserver/src/relish_storage.rs b/pageserver/src/relish_storage.rs index a687abe489..70d75c34bf 100644 --- a/pageserver/src/relish_storage.rs +++ b/pageserver/src/relish_storage.rs @@ -12,14 +12,12 @@ mod rust_s3; /// local page server layer files with external storage. mod synced_storage; -use std::path::Path; -use std::thread; +use std::{path::Path, thread}; use anyhow::Context; -use self::local_fs::LocalFs; pub use self::synced_storage::schedule_timeline_upload; -use crate::relish_storage::rust_s3::RustS3; +use self::{local_fs::LocalFs, rust_s3::RustS3}; use crate::{PageServerConf, RelishStorageKind}; pub fn run_storage_sync_thread( diff --git a/pageserver/src/relish_storage/rust_s3.rs b/pageserver/src/relish_storage/rust_s3.rs index e98bf8949f..dc29752e99 100644 --- a/pageserver/src/relish_storage/rust_s3.rs +++ b/pageserver/src/relish_storage/rust_s3.rs @@ -5,9 +5,10 @@ use std::path::Path; use anyhow::Context; use s3::{bucket::Bucket, creds::Credentials, region::Region}; -use crate::{relish_storage::strip_workspace_prefix, S3Config}; - -use super::RelishStorage; +use crate::{ + relish_storage::{strip_workspace_prefix, RelishStorage}, + S3Config, +}; const S3_FILE_SEPARATOR: char = '/'; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index e4412aef6f..e8a7952d77 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -209,7 +209,7 @@ impl WALRecord { #[cfg(test)] mod tests { use super::*; - use crate::layered_repository::LayeredRepository; + use crate::layered_repository::{LayeredRepository, METADATA_FILE_NAME}; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::PageServerConf; use hex_literal::hex; @@ -728,7 +728,7 @@ mod tests { repo.create_empty_timeline(TIMELINE_ID)?; drop(repo); - let metadata_path = harness.timeline_path(&TIMELINE_ID).join("metadata"); + let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME); assert!(metadata_path.is_file());