diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 33f072553f..864c5b8ac8 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -22,8 +22,8 @@ use std::time::SystemTime; use tar::{Builder, EntryType, Header}; use tracing::*; +use crate::layered_repository::Timeline; use crate::reltag::{RelTag, SlruKind}; -use crate::DatadirTimeline; use postgres_ffi::v14::pg_constants; use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName}; @@ -36,13 +36,12 @@ use utils::lsn::Lsn; /// This is short-living object only for the time of tarball creation, /// created mostly to avoid passing a lot of parameters between various functions /// used for constructing tarball. -pub struct Basebackup<'a, W, T> +pub struct Basebackup<'a, W> where W: Write, - T: DatadirTimeline, { ar: Builder>, - timeline: &'a Arc, + timeline: &'a Arc, pub lsn: Lsn, prev_record_lsn: Lsn, full_backup: bool, @@ -57,18 +56,17 @@ where // * When working without safekeepers. In this situation it is important to match the lsn // we are taking basebackup on with the lsn that is used in pageserver's walreceiver // to start the replication. -impl<'a, W, T> Basebackup<'a, W, T> +impl<'a, W> Basebackup<'a, W> where W: Write, - T: DatadirTimeline, { pub fn new( write: W, - timeline: &'a Arc, + timeline: &'a Arc, req_lsn: Option, prev_lsn: Option, full_backup: bool, - ) -> Result> { + ) -> Result> { // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the // previous record (xl_prev). We include prev_record_lsn in the @@ -404,10 +402,9 @@ where } } -impl<'a, W, T> Drop for Basebackup<'a, W, T> +impl<'a, W> Drop for Basebackup<'a, W> where W: Write, - T: DatadirTimeline, { /// If the basebackup was not finished, prevent the Archive::drop() from /// writing the end-of-archive marker. 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1d0adec63d..8d300e554a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -11,10 +11,9 @@ use super::models::{ StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, }; -use crate::layered_repository::{metadata::TimelineMetadata, LayeredTimeline}; -use crate::pgdatadir_mapping::DatadirTimeline; +use crate::layered_repository::{metadata::TimelineMetadata, Timeline}; +use crate::repository::Repository; use crate::repository::{LocalTimelineState, RepositoryTimeline}; -use crate::repository::{Repository, Timeline}; use crate::storage_sync; use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; use crate::tenant_config::TenantConfOpt; @@ -85,7 +84,7 @@ fn get_config(request: &Request) -> &'static PageServerConf { // Helper functions to construct a LocalTimelineInfo struct for a timeline fn local_timeline_info_from_loaded_timeline( - timeline: &LayeredTimeline, + timeline: &Timeline, include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, ) -> anyhow::Result { @@ -160,7 +159,7 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo } fn local_timeline_info_from_repo_timeline( - repo_timeline: &RepositoryTimeline, + repo_timeline: &RepositoryTimeline, include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, ) -> anyhow::Result { diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 729829c5e8..54e791e5b5 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use tracing::*; use walkdir::WalkDir; +use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walingest::WalIngest; @@ -39,9 +40,9 @@ pub fn get_lsn_from_controlfile(path: &Path) -> 
Result { /// This is currently only used to import a cluster freshly created by initdb. /// The code that deals with the checkpoint would not work right if the /// cluster was not shut down cleanly. -pub fn import_timeline_from_postgres_datadir( +pub fn import_timeline_from_postgres_datadir( path: &Path, - tline: &T, + tline: &Timeline, lsn: Lsn, ) -> Result<()> { let mut pg_control: Option = None; @@ -99,8 +100,8 @@ pub fn import_timeline_from_postgres_datadir( } // subroutine of import_timeline_from_postgres_datadir(), to load one relation file. -fn import_rel( - modification: &mut DatadirModification, +fn import_rel( + modification: &mut DatadirModification, path: &Path, spcoid: Oid, dboid: Oid, @@ -178,8 +179,8 @@ fn import_rel( /// Import an SLRU segment file /// -fn import_slru( - modification: &mut DatadirModification, +fn import_slru( + modification: &mut DatadirModification, slru: SlruKind, path: &Path, mut reader: Reader, @@ -234,12 +235,7 @@ fn import_slru( /// Scan PostgreSQL WAL files in given directory and load all records between /// 'startpoint' and 'endpoint' into the repository. 
-fn import_wal( - walpath: &Path, - tline: &T, - startpoint: Lsn, - endpoint: Lsn, -) -> Result<()> { +fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> { let mut waldecoder = WalStreamDecoder::new(startpoint); let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); @@ -305,12 +301,12 @@ fn import_wal( Ok(()) } -pub fn import_basebackup_from_tar( - tline: &T, +pub fn import_basebackup_from_tar( + tline: &Timeline, reader: Reader, base_lsn: Lsn, ) -> Result<()> { - info!("importing base at {}", base_lsn); + info!("importing base at {base_lsn}"); let mut modification = tline.begin_modification(base_lsn); modification.init_empty()?; @@ -347,8 +343,8 @@ pub fn import_basebackup_from_tar( Ok(()) } -pub fn import_wal_from_tar( - tline: &T, +pub fn import_wal_from_tar( + tline: &Timeline, reader: Reader, start_lsn: Lsn, end_lsn: Lsn, @@ -428,8 +424,8 @@ pub fn import_wal_from_tar( Ok(()) } -pub fn import_file( - modification: &mut DatadirModification, +pub fn import_file( + modification: &mut DatadirModification, file_path: &Path, reader: Reader, len: usize, diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 6bf2e71852..c0f4aece54 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; -use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline}; +use crate::repository::{GcResult, Repository, RepositoryTimeline}; use crate::thread_mgr; use crate::walredo::WalRedoManager; use crate::CheckpointConfig; @@ -61,7 +61,7 @@ mod timeline; use storage_layer::Layer; use timeline::LayeredTimelineEntry; -pub use timeline::LayeredTimeline; +pub use timeline::Timeline; // re-export this function so that page_cache.rs can use it. 
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file; @@ -121,15 +121,13 @@ pub struct LayeredRepository { /// Public interface impl Repository for LayeredRepository { - type Timeline = LayeredTimeline; - - fn get_timeline(&self, timelineid: ZTimelineId) -> Option> { + fn get_timeline(&self, timelineid: ZTimelineId) -> Option> { let timelines = self.timelines.lock().unwrap(); self.get_timeline_internal(timelineid, &timelines) .map(RepositoryTimeline::from) } - fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { + fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { let mut timelines = self.timelines.lock().unwrap(); match self.get_timeline_load_internal(timelineid, &mut timelines)? { Some(local_loaded_timeline) => Ok(local_loaded_timeline), @@ -140,7 +138,7 @@ impl Repository for LayeredRepository { } } - fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { + fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { self.timelines .lock() .unwrap() @@ -158,7 +156,7 @@ impl Repository for LayeredRepository { &self, timeline_id: ZTimelineId, initdb_lsn: Lsn, - ) -> Result> { + ) -> Result> { let mut timelines = self.timelines.lock().unwrap(); let vacant_timeline_entry = match timelines.entry(timeline_id) { Entry::Occupied(_) => bail!("Timeline already exists"), @@ -176,7 +174,7 @@ impl Repository for LayeredRepository { let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn); timeline::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?; - let timeline = LayeredTimeline::new( + let timeline = Timeline::new( self.conf, Arc::clone(&self.tenant_conf), metadata, @@ -539,7 +537,7 @@ impl LayeredRepository { &self, timelineid: ZTimelineId, timelines: &mut HashMap, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { match timelines.get(&timelineid) { Some(entry) => match entry { LayeredTimelineEntry::Loaded(local_timeline) => { @@ 
-574,7 +572,7 @@ impl LayeredRepository { &self, timeline_id: ZTimelineId, timelines: &mut HashMap, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) .context("failed to load metadata")?; let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ -591,7 +589,7 @@ impl LayeredRepository { .map(LayeredTimelineEntry::Loaded); let _enter = info_span!("loading local timeline").entered(); - let timeline = LayeredTimeline::new( + let timeline = Timeline::new( self.conf, Arc::clone(&self.tenant_conf), metadata, diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 910fc9e9fc..da3a6981da 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -9,7 +9,7 @@ use once_cell::sync::Lazy; use tracing::*; use std::cmp::{max, min, Ordering}; -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::fs; use std::fs::{File, OpenOptions}; use std::io::Write; @@ -43,7 +43,6 @@ use crate::pgdatadir_mapping::BlockNumber; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; -use crate::DatadirTimeline; use postgres_ffi::v14::xlog_utils::to_pg_timestamp; use utils::{ @@ -52,7 +51,7 @@ use utils::{ zid::{ZTenantId, ZTimelineId}, }; -use crate::repository::{GcResult, RepositoryTimeline, Timeline, TimelineWriter}; +use crate::repository::{GcResult, RepositoryTimeline, TimelineWriter}; use crate::repository::{Key, Value}; use crate::thread_mgr; use crate::virtual_file::VirtualFile; @@ -160,7 +159,7 @@ static PERSISTENT_BYTES_WRITTEN: Lazy = Lazy::new(|| { #[derive(Clone)] pub enum LayeredTimelineEntry { - Loaded(Arc), + Loaded(Arc), Unloaded { id: ZTimelineId, metadata: TimelineMetadata, @@ -191,7 +190,7 @@ impl LayeredTimelineEntry { } } - fn ensure_loaded(&self) -> anyhow::Result<&Arc> { + fn 
ensure_loaded(&self) -> anyhow::Result<&Arc> { match self { LayeredTimelineEntry::Loaded(timeline) => Ok(timeline), LayeredTimelineEntry::Unloaded { .. } => { @@ -213,7 +212,7 @@ impl LayeredTimelineEntry { } } -impl From for RepositoryTimeline { +impl From for RepositoryTimeline { fn from(entry: LayeredTimelineEntry) -> Self { match entry { LayeredTimelineEntry::Loaded(timeline) => RepositoryTimeline::Loaded(timeline as _), @@ -288,7 +287,7 @@ impl TimelineMetrics { } } -pub struct LayeredTimeline { +pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -385,7 +384,7 @@ pub struct LayeredTimeline { pub last_received_wal: Mutex>, /// Relation size cache - rel_size_cache: RwLock>, + pub rel_size_cache: RwLock>, } pub struct WalReceiverInfo { @@ -394,46 +393,6 @@ pub struct WalReceiverInfo { pub last_received_msg_ts: u128, } -/// Inherit all the functions from DatadirTimeline, to provide the -/// functionality to store PostgreSQL relations, SLRUs, etc. in a -/// LayeredTimeline. 
-impl DatadirTimeline for LayeredTimeline { - fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { - let rel_size_cache = self.rel_size_cache.read().unwrap(); - if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { - if lsn >= *cached_lsn { - return Some(*nblocks); - } - } - None - } - - fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - match rel_size_cache.entry(tag) { - Entry::Occupied(mut entry) => { - let cached_lsn = entry.get_mut(); - if lsn >= cached_lsn.0 { - *cached_lsn = (lsn, nblocks); - } - } - Entry::Vacant(entry) => { - entry.insert((lsn, nblocks)); - } - } - } - - fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - rel_size_cache.insert(tag, (lsn, nblocks)); - } - - fn remove_cached_rel_size(&self, tag: &RelTag) { - let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - rel_size_cache.remove(tag); - } -} - /// /// Information about how much history needs to be retained, needed by /// Garbage Collection. @@ -464,45 +423,37 @@ pub struct GcInfo { } /// Public interface functions -impl Timeline for LayeredTimeline { - fn get_ancestor_lsn(&self) -> Lsn { +impl Timeline { + //------------------------------------------------------------------------------ + // Public GET functions + //------------------------------------------------------------------------------ + + /// Get the LSN where this branch was created + pub fn get_ancestor_lsn(&self) -> Lsn { self.ancestor_lsn } - fn get_ancestor_timeline_id(&self) -> Option { + /// Get the ancestor's timeline id + pub fn get_ancestor_timeline_id(&self) -> Option { self.ancestor_timeline .as_ref() .map(LayeredTimelineEntry::timeline_id) } - /// Wait until WAL has been received up to the given LSN. 
- fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { - // This should never be called from the WAL receiver thread, because that could lead - // to a deadlock. - ensure!( - !IS_WAL_RECEIVER.with(|c| c.get()), - "wait_lsn called by WAL receiver thread" - ); - - self.metrics.wait_lsn_time_histo.observe_closure_duration( - || self.last_record_lsn - .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}", - lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn() - ) - }))?; - - Ok(()) - } - - fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard { + /// Lock and get timeline's GC cutoff + pub fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard { self.latest_gc_cutoff_lsn.read().unwrap() } - /// Look up the value with the given a key - fn get(&self, key: Key, lsn: Lsn) -> Result { + /// Look up given page version. + /// + /// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction + /// above this needs to store suitable metadata to track what data exists with + /// what keys, in separate metadata entries. If a non-existent key is requested, + /// the Repository implementation may incorrectly return a value from an ancestor + /// branch, for example, or waste a lot of cycles chasing the non-existing key. + /// + pub fn get(&self, key: Key, lsn: Lsn) -> Result { // Check the page cache. We will get back the most recent page with lsn <= `lsn`. // The cached image can be returned directly if there is no WAL between the cached image // and requested LSN. The cached image can also be used to reduce the amount of WAL needed @@ -531,68 +482,31 @@ impl Timeline for LayeredTimeline { .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } - /// Public entry point for checkpoint(). 
All the logic is in the private - /// checkpoint_internal function, this public facade just wraps it for - /// metrics collection. - fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { - match cconf { - CheckpointConfig::Flush => { - self.freeze_inmem_layer(false); - self.flush_frozen_layers(true) - } - CheckpointConfig::Forced => { - self.freeze_inmem_layer(false); - self.flush_frozen_layers(true)?; - self.compact() - } - } - } - - /// - /// Validate lsn against initdb_lsn and latest_gc_cutoff_lsn. - /// - fn check_lsn_is_in_scope( - &self, - lsn: Lsn, - latest_gc_cutoff_lsn: &RwLockReadGuard, - ) -> Result<()> { - ensure!( - lsn >= **latest_gc_cutoff_lsn, - "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)", - lsn, - **latest_gc_cutoff_lsn, - ); - Ok(()) - } - - fn get_last_record_lsn(&self) -> Lsn { + /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. + pub fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last } - fn get_prev_record_lsn(&self) -> Lsn { + pub fn get_prev_record_lsn(&self) -> Lsn { self.last_record_lsn.load().prev } - fn get_last_record_rlsn(&self) -> RecordLsn { + /// Atomically get both last and prev. 
+ pub fn get_last_record_rlsn(&self) -> RecordLsn { self.last_record_lsn.load() } - fn get_disk_consistent_lsn(&self) -> Lsn { + pub fn get_disk_consistent_lsn(&self) -> Lsn { self.disk_consistent_lsn.load() } - fn writer<'a>(&'a self) -> Box { - Box::new(LayeredTimelineWriter { - tl: self, - _write_guard: self.write_lock.lock().unwrap(), - }) - } - - fn get_physical_size(&self) -> u64 { + /// Get the physical size of the timeline at the latest LSN + pub fn get_physical_size(&self) -> u64 { self.metrics.current_physical_size_gauge.get() } - fn get_physical_size_non_incremental(&self) -> anyhow::Result { + /// Get the physical size of the timeline at the latest LSN non incrementally + pub fn get_physical_size_non_incremental(&self) -> anyhow::Result { let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); // total size of layer files in the current timeline directory let mut total_physical_size = 0; @@ -611,9 +525,89 @@ impl Timeline for LayeredTimeline { Ok(total_physical_size) } + + /// + /// Wait until WAL has been received and processed up to this LSN. + /// + /// You should call this before any of the other get_* or list_* functions. Calling + /// those functions with an LSN that has not been processed yet is an error. + /// + pub fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<()> { + // This should never be called from the WAL receiver thread, because that could lead + // to a deadlock. + ensure!( + !IS_WAL_RECEIVER.with(|c| c.get()), + "wait_lsn called by WAL receiver thread" + ); + + self.metrics.wait_lsn_time_histo.observe_closure_duration( + || self.last_record_lsn + .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}", + lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn() + ) + }))?; + + Ok(()) + } + + /// Check that it is valid to request operations with that lsn. 
+ pub fn check_lsn_is_in_scope( + &self, + lsn: Lsn, + latest_gc_cutoff_lsn: &RwLockReadGuard, + ) -> Result<()> { + ensure!( + lsn >= **latest_gc_cutoff_lsn, + "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)", + lsn, + **latest_gc_cutoff_lsn, + ); + Ok(()) + } + + //------------------------------------------------------------------------------ + // Public PUT functions, to update the repository with new page versions. + // + // These are called by the WAL receiver to digest WAL records. + //------------------------------------------------------------------------------ + + /// Flush to disk all data that was written with the put_* functions + /// + /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't + /// know anything about them here in the repository. + pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { + match cconf { + CheckpointConfig::Flush => { + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true) + } + CheckpointConfig::Forced => { + self.freeze_inmem_layer(false); + self.flush_frozen_layers(true)?; + self.compact() + } + } + } + + /// Mutate the timeline with a [`TimelineWriter`]. + /// + /// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter + /// is a generic type in this trait. 
But that doesn't currently work in + /// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html + /// TODO kb replace with the concrete type + pub fn writer<'a>(&'a self) -> Box { + Box::new(LayeredTimelineWriter { + tl: self, + _write_guard: self.write_lock.lock().unwrap(), + }) + } } -impl LayeredTimeline { +// Private functions +impl Timeline { fn get_checkpoint_distance(&self) -> u64 { let tenant_conf = self.tenant_conf.read().unwrap(); tenant_conf @@ -662,8 +656,8 @@ impl LayeredTimeline { tenant_id: ZTenantId, walredo_mgr: Arc, upload_layers: bool, - ) -> LayeredTimeline { - let mut result = LayeredTimeline { + ) -> Timeline { + let mut result = Timeline { conf, tenant_conf, timeline_id, @@ -1014,7 +1008,7 @@ impl LayeredTimeline { Some((lsn, img)) } - fn get_ancestor_timeline(&self) -> Result> { + fn get_ancestor_timeline(&self) -> Result> { let ancestor = self .ancestor_timeline .as_ref() @@ -1135,7 +1129,7 @@ impl LayeredTimeline { /// Also flush after a period of time without new data -- it helps /// safekeepers to regard pageserver as caught up and suspend activity. 
/// - pub fn check_checkpoint_distance(self: &Arc) -> Result<()> { + pub fn check_checkpoint_distance(self: &Arc) -> Result<()> { let last_lsn = self.get_last_record_lsn(); let layers = self.layers.read().unwrap(); if let Some(open_layer) = &layers.open_layer { @@ -2211,12 +2205,12 @@ fn layer_traversal_error( } struct LayeredTimelineWriter<'a> { - tl: &'a LayeredTimeline, + tl: &'a Timeline, _write_guard: MutexGuard<'a, ()>, } impl Deref for LayeredTimelineWriter<'_> { - type Target = dyn Timeline; + type Target = Timeline; fn deref(&self) -> &Self::Target { self.tl diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 47fd8a84cf..06c5f552a4 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -28,8 +28,6 @@ use tracing::info; use crate::thread_mgr::ThreadKind; use metrics::{register_int_gauge_vec, IntGaugeVec}; -use pgdatadir_mapping::DatadirTimeline; - /// Current storage format version /// /// This is embedded in the metadata file, and also in the header of all the diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b63bb90be1..f5f1e4d7bd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -30,11 +30,11 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar}; -use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; +use crate::layered_repository::Timeline; +use crate::pgdatadir_mapping::LsnForTimestamp; use crate::profiling::profpoint_start; use crate::reltag::RelTag; use crate::repository::Repository; -use crate::repository::Timeline; use crate::tenant_mgr; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; @@ -636,8 +636,8 @@ impl PageServerHandler { /// In either case, if the page server hasn't received the WAL up to the /// requested LSN yet, we will wait for it to arrive. 
The return value is /// the LSN that should be used to look up the page versions. - fn wait_or_get_last_lsn( - timeline: &T, + fn wait_or_get_last_lsn( + timeline: &Timeline, mut lsn: Lsn, latest: bool, latest_gc_cutoff_lsn: &RwLockReadGuard, @@ -684,9 +684,9 @@ impl PageServerHandler { Ok(lsn) } - fn handle_get_rel_exists_request( + fn handle_get_rel_exists_request( &self, - timeline: &T, + timeline: &Timeline, req: &PagestreamExistsRequest, ) -> Result { let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered(); @@ -701,9 +701,9 @@ impl PageServerHandler { })) } - fn handle_get_nblocks_request( + fn handle_get_nblocks_request( &self, - timeline: &T, + timeline: &Timeline, req: &PagestreamNblocksRequest, ) -> Result { let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered(); @@ -717,9 +717,9 @@ impl PageServerHandler { })) } - fn handle_db_size_request( + fn handle_db_size_request( &self, - timeline: &T, + timeline: &Timeline, req: &PagestreamDbSizeRequest, ) -> Result { let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered(); @@ -735,9 +735,9 @@ impl PageServerHandler { })) } - fn handle_get_page_at_lsn_request( + fn handle_get_page_at_lsn_request( &self, - timeline: &T, + timeline: &Timeline, req: &PagestreamGetPageRequest, ) -> Result { let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 88fac0ad5a..d10e48393c 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -7,8 +7,8 @@ //! Clarify that) //! 
use crate::keyspace::{KeySpace, KeySpaceAccum}; +use crate::layered_repository::Timeline; use crate::reltag::{RelTag, SlruKind}; -use crate::repository::Timeline; use crate::repository::*; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; @@ -18,7 +18,7 @@ use postgres_ffi::v14::xlog_utils::TimestampTz; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, TransactionId}; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map, HashMap, HashSet}; use std::ops::Range; use tracing::{debug, trace, warn}; use utils::{bin_ser::BeSer, lsn::Lsn}; @@ -35,23 +35,13 @@ pub enum LsnForTimestamp { } /// -/// This trait provides all the functionality to store PostgreSQL relations, SLRUs, +/// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. The -/// Timeline trait provides the key-value store. +/// Timeline struct provides the key-value store. /// -/// This is a trait, so that we can easily include all these functions in a Timeline -/// implementation. You're not expected to have different implementations of this trait, -/// rather, this provides an interface and implementation, over Timeline. -/// -/// If you wanted to store other kinds of data in the Neon repository, e.g. -/// flat files or MySQL, you would create a new trait like this, with all the -/// functions that make sense for the kind of data you're storing. For flat files, -/// for example, you might have a function like "fn read(path, offset, size)". -/// We might also have that situation in the future, to support multiple PostgreSQL -/// versions, if there are big changes in how the data is organized in the data -/// directory, or if new special files are introduced. 
-/// -pub trait DatadirTimeline: Timeline { +/// This is a separate impl, so that we can easily include all these functions in a Timeline +/// implementation, and might be moved into a separate struct later. +impl Timeline { /// Start ingesting a WAL record, or other atomic modification of /// the timeline. /// @@ -75,7 +65,7 @@ pub trait DatadirTimeline: Timeline { /// functions of the timeline until you finish! And if you update the /// same page twice, the last update wins. /// - fn begin_modification(&self, lsn: Lsn) -> DatadirModification + pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification where Self: Sized, { @@ -93,7 +83,7 @@ pub trait DatadirTimeline: Timeline { //------------------------------------------------------------------------------ /// Look up given page version. - fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result { + pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); let nblocks = self.get_rel_size(tag, lsn)?; @@ -110,7 +100,7 @@ pub trait DatadirTimeline: Timeline { } // Get size of a database in blocks - fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { + pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { let mut total_blocks = 0; let rels = self.list_rels(spcnode, dbnode, lsn)?; @@ -123,7 +113,7 @@ pub trait DatadirTimeline: Timeline { } /// Get size of a relation file - fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result { + pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -151,7 +141,7 @@ pub trait DatadirTimeline: Timeline { } /// Does relation exist? 
- fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result { + pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result { ensure!(tag.relnode != 0, "invalid relnode"); // first try to lookup relation in cache @@ -169,7 +159,7 @@ pub trait DatadirTimeline: Timeline { } /// Get a list of all existing relations in given tablespace and database. - fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result> { + pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result> { // fetch directory listing let key = rel_dir_to_key(spcnode, dbnode); let buf = self.get(key, lsn)?; @@ -187,7 +177,7 @@ pub trait DatadirTimeline: Timeline { } /// Look up given SLRU page version. - fn get_slru_page_at_lsn( + pub fn get_slru_page_at_lsn( &self, kind: SlruKind, segno: u32, @@ -199,14 +189,19 @@ pub trait DatadirTimeline: Timeline { } /// Get size of an SLRU segment - fn get_slru_segment_size(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result { + pub fn get_slru_segment_size( + &self, + kind: SlruKind, + segno: u32, + lsn: Lsn, + ) -> Result { let key = slru_segment_size_to_key(kind, segno); let mut buf = self.get(key, lsn)?; Ok(buf.get_u32_le()) } /// Get size of an SLRU segment - fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result { + pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result { // fetch directory listing let key = slru_dir_to_key(kind); let buf = self.get(key, lsn)?; @@ -223,7 +218,7 @@ pub trait DatadirTimeline: Timeline { /// so it's not well defined which LSN you get if there were multiple commits /// "in flight" at that point in time. 
/// - fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result { + pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result { let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn(); let min_lsn = *gc_cutoff_lsn_guard; let max_lsn = self.get_last_record_lsn(); @@ -286,7 +281,7 @@ pub trait DatadirTimeline: Timeline { /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits /// with a smaller/larger timestamp. /// - fn is_latest_commit_timestamp_ge_than( + pub fn is_latest_commit_timestamp_ge_than( &self, search_timestamp: TimestampTz, probe_lsn: Lsn, @@ -317,7 +312,7 @@ pub trait DatadirTimeline: Timeline { } /// Get a list of SLRU segments - fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result> { + pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result> { // fetch directory entry let key = slru_dir_to_key(kind); @@ -327,14 +322,14 @@ pub trait DatadirTimeline: Timeline { Ok(dir.segments) } - fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { + pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result { let key = relmap_file_key(spcnode, dbnode); let buf = self.get(key, lsn)?; Ok(buf) } - fn list_dbdirs(&self, lsn: Lsn) -> Result> { + pub fn list_dbdirs(&self, lsn: Lsn) -> Result> { // fetch directory entry let buf = self.get(DBDIR_KEY, lsn)?; let dir = DbDirectory::des(&buf)?; @@ -342,13 +337,13 @@ pub trait DatadirTimeline: Timeline { Ok(dir.dbdirs) } - fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result { + pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result { let key = twophase_file_key(xid); let buf = self.get(key, lsn)?; Ok(buf) } - fn list_twophase_files(&self, lsn: Lsn) -> Result> { + pub fn list_twophase_files(&self, lsn: Lsn) -> Result> { // fetch directory entry let buf = self.get(TWOPHASEDIR_KEY, lsn)?; let dir = TwoPhaseDirectory::des(&buf)?; @@ -356,11 +351,11 @@ pub trait 
DatadirTimeline: Timeline { Ok(dir.xids) } - fn get_control_file(&self, lsn: Lsn) -> Result { + pub fn get_control_file(&self, lsn: Lsn) -> Result { self.get(CONTROLFILE_KEY, lsn) } - fn get_checkpoint(&self, lsn: Lsn) -> Result { + pub fn get_checkpoint(&self, lsn: Lsn) -> Result { self.get(CHECKPOINT_KEY, lsn) } @@ -369,7 +364,7 @@ pub trait DatadirTimeline: Timeline { /// /// Only relation blocks are counted currently. That excludes metadata, /// SLRUs, twophase files etc. - fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { + pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result { // Fetch list of database dirs and iterate them let buf = self.get(DBDIR_KEY, lsn)?; let dbdir = DbDirectory::des(&buf)?; @@ -391,7 +386,7 @@ pub trait DatadirTimeline: Timeline { /// Get a KeySpace that covers all the Keys that are in use at the given LSN. /// Anything that's not listed maybe removed from the underlying storage (from /// that LSN forwards). - fn collect_keyspace(&self, lsn: Lsn) -> Result { + pub fn collect_keyspace(&self, lsn: Lsn) -> Result { // Iterate through key ranges, greedily packing them into partitions let mut result = KeySpaceAccum::new(); @@ -465,27 +460,54 @@ pub trait DatadirTimeline: Timeline { } /// Get cached size of relation if it not updated after specified LSN - fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option; + pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option { + let rel_size_cache = self.rel_size_cache.read().unwrap(); + if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) { + if lsn >= *cached_lsn { + return Some(*nblocks); + } + } + None + } /// Update cached relation size if there is no more recent update - fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber); + pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + match 
rel_size_cache.entry(tag) { + hash_map::Entry::Occupied(mut entry) => { + let cached_lsn = entry.get_mut(); + if lsn >= cached_lsn.0 { + *cached_lsn = (lsn, nblocks); + } + } + hash_map::Entry::Vacant(entry) => { + entry.insert((lsn, nblocks)); + } + } + } /// Store cached relation size - fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber); + pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + rel_size_cache.insert(tag, (lsn, nblocks)); + } /// Remove cached relation size - fn remove_cached_rel_size(&self, tag: &RelTag); + pub fn remove_cached_rel_size(&self, tag: &RelTag) { + let mut rel_size_cache = self.rel_size_cache.write().unwrap(); + rel_size_cache.remove(tag); + } } /// DatadirModification represents an operation to ingest an atomic set of /// updates to the repository. It is created by the 'begin_record' /// function. It is called for each WAL record, so that all the modifications /// by a one WAL record appear atomic. -pub struct DatadirModification<'a, T: DatadirTimeline> { +pub struct DatadirModification<'a> { /// The timeline this modification applies to. You can access this to /// read the state, but note that any pending updates are *not* reflected /// in the state in 'tline' yet. - pub tline: &'a T, + pub tline: &'a Timeline, /// Lsn assigned by begin_modification pub lsn: Lsn, @@ -498,7 +520,7 @@ pub struct DatadirModification<'a, T: DatadirTimeline> { pending_nblocks: isize, } -impl<'a, T: DatadirTimeline> DatadirModification<'a, T> { +impl<'a> DatadirModification<'a> { /// Initialize a completely new repository. 
/// /// This inserts the directory metadata entries that are assumed to @@ -1371,7 +1393,7 @@ fn is_slru_block_key(key: Key) -> bool { pub fn create_test_timeline( repo: R, timeline_id: utils::zid::ZTimelineId, -) -> Result> { +) -> Result> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index d09b01437c..5cdc27a846 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,19 +1,16 @@ use crate::layered_repository::metadata::TimelineMetadata; +use crate::layered_repository::Timeline; use crate::storage_sync::index::RemoteIndex; use crate::walrecord::ZenithWalRecord; -use crate::CheckpointConfig; use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use std::fmt; use std::ops::{AddAssign, Range}; -use std::sync::{Arc, RwLockReadGuard}; +use std::sync::Arc; use std::time::Duration; -use utils::{ - lsn::{Lsn, RecordLsn}, - zid::ZTimelineId, -}; +use utils::{lsn::Lsn, zid::ZTimelineId}; #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] /// Key used in the Repository kv-store. @@ -185,22 +182,20 @@ impl Value { /// A repository corresponds to one .neon directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. pub trait Repository: Send + Sync { - type Timeline: crate::DatadirTimeline; - /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. /// See [`crate::remote_storage`] for more details about the synchronization. fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; /// Get Timeline handle for given zenith timeline ID. /// This function is idempotent. It doesn't change internal state in any way. 
- fn get_timeline(&self, timelineid: ZTimelineId) -> Option>; + fn get_timeline(&self, timelineid: ZTimelineId) -> Option>; /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. - fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result>; + fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result>; /// Lists timelines the repository contains. /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. - fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)>; + fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)>; /// Create a new, empty timeline. The caller is responsible for loading data into it /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. @@ -208,7 +203,7 @@ pub trait Repository: Send + Sync { &self, timeline_id: ZTimelineId, initdb_lsn: Lsn, - ) -> Result>; + ) -> Result>; /// Branch a timeline fn branch_timeline( @@ -305,81 +300,6 @@ impl AddAssign for GcResult { } } -pub trait Timeline: Send + Sync { - //------------------------------------------------------------------------------ - // Public GET functions - //------------------------------------------------------------------------------ - - /// - /// Wait until WAL has been received and processed up to this LSN. - /// - /// You should call this before any of the other get_* or list_* functions. Calling - /// those functions with an LSN that has been processed yet is an error. - /// - fn wait_lsn(&self, lsn: Lsn) -> Result<()>; - - /// Lock and get timeline's GC cuttof - fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard; - - /// Look up given page version. - /// - /// NOTE: It is considered an error to 'get' a key that doesn't exist. The abstraction - /// above this needs to store suitable metadata to track what data exists with - /// what keys, in separate metadata entries. 
If a non-existent key is requested, - /// the Repository implementation may incorrectly return a value from an ancestor - /// branch, for example, or waste a lot of cycles chasing the non-existing key. - /// - fn get(&self, key: Key, lsn: Lsn) -> Result; - - /// Get the ancestor's timeline id - fn get_ancestor_timeline_id(&self) -> Option; - - /// Get the LSN where this branch was created - fn get_ancestor_lsn(&self) -> Lsn; - - //------------------------------------------------------------------------------ - // Public PUT functions, to update the repository with new page versions. - // - // These are called by the WAL receiver to digest WAL records. - //------------------------------------------------------------------------------ - /// Atomically get both last and prev. - fn get_last_record_rlsn(&self) -> RecordLsn; - - /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. - fn get_last_record_lsn(&self) -> Lsn; - - fn get_prev_record_lsn(&self) -> Lsn; - - fn get_disk_consistent_lsn(&self) -> Lsn; - - /// Mutate the timeline with a [`TimelineWriter`]. - /// - /// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter - /// is a generic type in this trait. But that doesn't currently work in - /// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html - fn writer<'a>(&'a self) -> Box; - - /// - /// Flush to disk all data that was written with the put_* functions - /// - /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't - /// know anything about them here in the repository. - fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>; - - /// - /// Check that it is valid to request operations with that lsn. 
- fn check_lsn_is_in_scope( - &self, - lsn: Lsn, - latest_gc_cutoff_lsn: &RwLockReadGuard, - ) -> Result<()>; - - /// Get the physical size of the timeline at the latest LSN - fn get_physical_size(&self) -> u64; - /// Get the physical size of the timeline at the latest LSN non incrementally - fn get_physical_size_non_incremental(&self) -> Result; -} - /// Various functions to mutate the timeline. // TODO Currently, Deref is used to allow easy access to read methods from this trait. // This is probably considered a bad practice in Rust and should be fixed eventually, @@ -581,6 +501,9 @@ pub mod repo_harness { #[allow(clippy::bool_assert_comparison)] #[cfg(test)] mod tests { + use crate::layered_repository::Timeline; + use crate::CheckpointConfig; + use super::repo_harness::*; use super::*; //use postgres_ffi::{pg_constants, xlog_utils::SIZEOF_CHECKPOINT}; @@ -689,7 +612,7 @@ mod tests { Ok(()) } - fn make_some_layers(tline: &T, start_lsn: Lsn) -> Result<()> { + fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> Result<()> { let mut lsn = start_lsn; #[allow(non_snake_case)] { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 64f1caa542..36c3e569a6 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,7 +3,7 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; -use crate::layered_repository::{load_metadata, LayeredRepository, LayeredTimeline}; +use crate::layered_repository::{load_metadata, LayeredRepository, Timeline}; use crate::repository::Repository; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; @@ -100,7 +100,7 @@ struct Tenant { /// /// Local timelines have more metadata that's loaded into memory, /// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`]. 
- local_timelines: HashMap>, + local_timelines: HashMap>, } #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] @@ -177,7 +177,7 @@ pub enum LocalTimelineUpdate { }, Attach { id: ZTenantTimelineId, - datadir: Arc, + datadir: Arc, }, } @@ -379,7 +379,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result anyhow::Result> { +) -> anyhow::Result> { let mut m = tenants_state::write_tenants(); let tenant = m .get_mut(&tenant_id) @@ -486,7 +486,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any fn load_local_timeline( repo: &LayeredRepository, timeline_id: ZTimelineId, -) -> anyhow::Result> { +) -> anyhow::Result> { let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| { format!("Inmem timeline {timeline_id} not found in tenant's repository") })?; diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 0d35195691..6a55dd286e 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -20,15 +20,15 @@ use utils::{ use crate::import_datadir; use crate::tenant_mgr; +use crate::CheckpointConfig; use crate::{ config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex, tenant_config::TenantConfOpt, }; use crate::{ - layered_repository::{LayeredRepository, LayeredTimeline}, + layered_repository::{LayeredRepository, Timeline}, walredo::WalRedoManager, }; -use crate::{repository::Timeline, CheckpointConfig}; #[derive(Debug, Clone, Copy)] pub struct PointInTime { @@ -160,7 +160,7 @@ pub(crate) fn create_timeline( new_timeline_id: Option, ancestor_timeline_id: Option, mut ancestor_start_lsn: Option, -) -> Result)>> { +) -> Result)>> { let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate); let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 05afe4ba3e..c24ffc49de 100644 --- a/pageserver/src/walingest.rs +++ 
b/pageserver/src/walingest.rs @@ -30,6 +30,7 @@ use anyhow::Result; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; +use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::walrecord::*; @@ -43,15 +44,15 @@ use utils::lsn::Lsn; static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); -pub struct WalIngest<'a, T: DatadirTimeline> { - timeline: &'a T, +pub struct WalIngest<'a> { + timeline: &'a Timeline, checkpoint: CheckPoint, checkpoint_modified: bool, } -impl<'a, T: DatadirTimeline> WalIngest<'a, T> { - pub fn new(timeline: &T, startpoint: Lsn) -> Result> { +impl<'a> WalIngest<'a> { + pub fn new(timeline: &Timeline, startpoint: Lsn) -> Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. let checkpoint_bytes = timeline.get_checkpoint(startpoint)?; @@ -77,7 +78,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { &mut self, recdata: Bytes, lsn: Lsn, - modification: &mut DatadirModification, + modification: &mut DatadirModification, decoded: &mut DecodedWALRecord, ) -> Result<()> { modification.lsn = lsn; @@ -266,7 +267,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_decoded_block( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, lsn: Lsn, decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, @@ -326,7 +327,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_heapam_record( &mut self, buf: &mut Bytes, - modification: &mut DatadirModification, + modification: &mut DatadirModification, decoded: &mut DecodedWALRecord, ) -> Result<()> { // Handle VM bit updates that are implicitly part of heap records. @@ -470,7 +471,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. 
fn ingest_xlog_dbase_create( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rec: &XlCreateDatabase, ) -> Result<()> { let db_id = rec.db_id; @@ -537,7 +538,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_xlog_smgr_create( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rec: &XlSmgrCreate, ) -> Result<()> { let rel = RelTag { @@ -555,7 +556,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { /// This is the same logic as in PostgreSQL's smgr_redo() function. fn ingest_xlog_smgr_truncate( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rec: &XlSmgrTruncate, ) -> Result<()> { let spcnode = rec.rnode.spcnode; @@ -620,7 +621,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { /// fn ingest_xact_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, parsed: &XlXactParsedRecord, is_commit: bool, ) -> Result<()> { @@ -689,7 +690,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_clog_truncate_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, xlrec: &XlClogTruncate, ) -> Result<()> { info!( @@ -747,7 +748,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_multixact_create_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, xlrec: &XlMultiXactCreate, ) -> Result<()> { // Create WAL record for updating the multixact-offsets page @@ -826,7 +827,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_multixact_truncate_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, xlrec: &XlMultiXactTruncate, ) -> Result<()> { self.checkpoint.oldestMulti = xlrec.end_trunc_off; @@ -860,7 +861,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn ingest_relmap_page( &mut self, - modification: &mut 
DatadirModification, + modification: &mut DatadirModification, xlrec: &XlRelmapUpdate, decoded: &DecodedWALRecord, ) -> Result<()> { @@ -876,7 +877,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn put_rel_creation( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rel: RelTag, ) -> Result<()> { modification.put_rel_creation(rel, 0)?; @@ -885,7 +886,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn put_rel_page_image( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rel: RelTag, blknum: BlockNumber, img: Bytes, @@ -897,7 +898,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn put_rel_wal_record( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rel: RelTag, blknum: BlockNumber, rec: ZenithWalRecord, @@ -909,7 +910,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn put_rel_truncation( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rel: RelTag, nblocks: BlockNumber, ) -> Result<()> { @@ -917,11 +918,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { Ok(()) } - fn put_rel_drop( - &mut self, - modification: &mut DatadirModification, - rel: RelTag, - ) -> Result<()> { + fn put_rel_drop(&mut self, modification: &mut DatadirModification, rel: RelTag) -> Result<()> { modification.put_rel_drop(rel)?; Ok(()) } @@ -937,7 +934,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn handle_rel_extend( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, rel: RelTag, blknum: BlockNumber, ) -> Result<()> { @@ -968,7 +965,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn put_slru_page_image( &mut self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, kind: SlruKind, segno: u32, blknum: BlockNumber, @@ -981,7 +978,7 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { fn handle_slru_extend( &mut 
self, - modification: &mut DatadirModification, + modification: &mut DatadirModification, kind: SlruKind, segno: u32, blknum: BlockNumber, @@ -1032,9 +1029,9 @@ impl<'a, T: DatadirTimeline> WalIngest<'a, T> { #[cfg(test)] mod tests { use super::*; + use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::create_test_timeline; use crate::repository::repo_harness::*; - use crate::repository::Timeline; use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT; use postgres_ffi::RELSEG_SIZE; @@ -1046,13 +1043,13 @@ mod tests { forknum: 0, }; - fn assert_current_logical_size(_timeline: &T, _lsn: Lsn) { + fn assert_current_logical_size(_timeline: &Timeline, _lsn: Lsn) { // TODO } static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]); - fn init_walingest_test(tline: &T) -> Result> { + fn init_walingest_test(tline: &Timeline) -> Result { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index e8e0a7c52b..2fc44cb26a 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -16,7 +16,7 @@ use std::{ time::Duration, }; -use crate::{layered_repository::LayeredTimeline, repository::Timeline}; +use crate::layered_repository::Timeline; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; use etcd_broker::{ @@ -39,7 +39,7 @@ pub(super) fn spawn_connection_manager_task( id: ZTenantTimelineId, broker_loop_prefix: String, mut client: Client, - local_timeline: Arc, + local_timeline: Arc, wal_connect_timeout: Duration, lagging_wal_timeout: Duration, max_lsn_wal_lag: NonZeroU64, @@ -242,7 +242,7 @@ const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5; struct WalreceiverState { id: ZTenantTimelineId, /// Use pageserver data about the timeline to filter out some of the 
safekeepers. - local_timeline: Arc, + local_timeline: Arc, /// The timeout on the connection to safekeeper for WAL streaming. wal_connect_timeout: Duration, /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one. @@ -300,7 +300,7 @@ struct EtcdSkTimeline { impl WalreceiverState { fn new( id: ZTenantTimelineId, - local_timeline: Arc, + local_timeline: Arc, wal_connect_timeout: Duration, lagging_wal_timeout: Duration, max_lsn_wal_lag: NonZeroU64, diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 025bfeb506..283cc76e66 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -20,11 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use super::TaskEvent; use crate::{ - layered_repository::WalReceiverInfo, - pgdatadir_mapping::DatadirTimeline, - repository::{Repository, Timeline}, - tenant_mgr, - walingest::WalIngest, + layered_repository::WalReceiverInfo, repository::Repository, tenant_mgr, walingest::WalIngest, walrecord::DecodedWALRecord, }; use postgres_ffi::v14::waldecoder::WalStreamDecoder;