diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 3ec1ec9243..5837447ce8 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -23,8 +23,7 @@ use tar::{Builder, EntryType, Header};
 use tracing::*;
 
 use crate::reltag::{RelTag, SlruKind};
-use crate::repository::Timeline;
-use crate::DatadirTimelineImpl;
+use crate::DatadirTimeline;
 use postgres_ffi::xlog_utils::*;
 use postgres_ffi::*;
 use utils::lsn::Lsn;
@@ -32,12 +31,13 @@ use utils::lsn::Lsn;
 /// This is a short-lived object, existing only for the duration of tarball
 /// creation; it exists mostly to avoid passing a lot of parameters between the
 /// various functions used for constructing the tarball.
-pub struct Basebackup<'a, W>
+pub struct Basebackup<'a, W, T>
 where
     W: Write,
+    T: DatadirTimeline,
 {
     ar: Builder<AbortableWrite<W>>,
-    timeline: &'a Arc<DatadirTimelineImpl>,
+    timeline: &'a Arc<T>,
     pub lsn: Lsn,
     prev_record_lsn: Lsn,
     full_backup: bool,
@@ -52,17 +52,18 @@ where
 // * When working without safekeepers. In this situation it is important to match the lsn
 //   we are taking basebackup on with the lsn that is used in pageserver's walreceiver
 //   to start the replication.
-impl<'a, W> Basebackup<'a, W>
+impl<'a, W, T> Basebackup<'a, W, T>
 where
     W: Write,
+    T: DatadirTimeline,
 {
     pub fn new(
         write: W,
-        timeline: &'a Arc<DatadirTimelineImpl>,
+        timeline: &'a Arc<T>,
         req_lsn: Option<Lsn>,
         prev_lsn: Option<Lsn>,
         full_backup: bool,
-    ) -> Result<Basebackup<'a, W>> {
+    ) -> Result<Basebackup<'a, W, T>> {
         // Compute postgres doesn't have any previous WAL files, but the first
         // record that it's going to write needs to include the LSN of the
        // previous record (xl_prev). We include prev_record_lsn in the
@@ -79,13 +80,13 @@ where
         let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
             // Backup was requested at a particular LSN. Wait for it to arrive.
             info!("waiting for {}", req_lsn);
-            timeline.tline.wait_lsn(req_lsn)?;
+            timeline.wait_lsn(req_lsn)?;
 
             // If the requested point is the end of the timeline, we can
             // provide prev_lsn. (get_last_record_rlsn() might return it as
             // zero, though, if no WAL has been generated on this timeline
             // yet.)
-            let end_of_timeline = timeline.tline.get_last_record_rlsn();
+            let end_of_timeline = timeline.get_last_record_rlsn();
             if req_lsn == end_of_timeline.last {
                 (end_of_timeline.prev, req_lsn)
             } else {
@@ -93,7 +94,7 @@ where
             }
         } else {
             // Backup was requested at end of the timeline.
-            let end_of_timeline = timeline.tline.get_last_record_rlsn();
+            let end_of_timeline = timeline.get_last_record_rlsn();
             (end_of_timeline.prev, end_of_timeline.last)
         };
@@ -371,7 +372,7 @@ where
         // add zenith.signal file
         let mut zenith_signal = String::new();
         if self.prev_record_lsn == Lsn(0) {
-            if self.lsn == self.timeline.tline.get_ancestor_lsn() {
+            if self.lsn == self.timeline.get_ancestor_lsn() {
                 write!(zenith_signal, "PREV LSN: none")?;
             } else {
                 write!(zenith_signal, "PREV LSN: invalid")?;
@@ -402,9 +403,10 @@ where
     }
 }
 
-impl<'a, W> Drop for Basebackup<'a, W>
+impl<'a, W, T> Drop for Basebackup<'a, W, T>
 where
     W: Write,
+    T: DatadirTimeline,
 {
     /// If the basebackup was not finished, prevent the Archive::drop() from
     /// writing the end-of-archive marker.
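The change above replaces the concrete `Arc<DatadirTimelineImpl>` (and the `.tline` indirection everywhere) with a type parameter bounded by the new `DatadirTimeline` trait. A minimal, self-contained sketch of that pattern — with hypothetical names, not the actual pageserver types — looks like this:

```rust
// Sketch only: instead of wrapping a concrete timeline and delegating
// through a `.tline` field, the consumer is generic over a trait that
// provides the operations directly. `TimelineOps` and `TarSender` are
// illustrative stand-ins, not real pageserver APIs.
use std::io::Write;
use std::sync::Arc;

trait TimelineOps {
    fn wait_lsn(&self, lsn: u64) -> Result<(), String>;
}

struct TarSender<'a, W: Write, T: TimelineOps> {
    out: W,
    timeline: &'a Arc<T>,
}

impl<'a, W: Write, T: TimelineOps> TarSender<'a, W, T> {
    fn send_at(&mut self, lsn: u64) -> Result<(), String> {
        // Calls go straight to the trait method; no `self.timeline.tline.…` hop.
        self.timeline.wait_lsn(lsn)?;
        self.out.write_all(b"...").map_err(|e| e.to_string())
    }
}
```

The caller's logic is otherwise unchanged; monomorphization means the generic costs nothing at runtime compared to the old wrapper struct.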
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index 6402657e05..ccfd83400a 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -13,8 +13,6 @@ use walkdir::WalkDir;
 
 use crate::pgdatadir_mapping::*;
 use crate::reltag::{RelTag, SlruKind};
-use crate::repository::Repository;
-use crate::repository::Timeline;
 use crate::walingest::WalIngest;
 use crate::walrecord::DecodedWALRecord;
 use postgres_ffi::relfile_utils::*;
@@ -30,9 +28,9 @@ use utils::lsn::Lsn;
 /// This is currently only used to import a cluster freshly created by initdb.
 /// The code that deals with the checkpoint would not work right if the
 /// cluster was not shut down cleanly.
-pub fn import_timeline_from_postgres_datadir<R: Repository>(
+pub fn import_timeline_from_postgres_datadir<T: DatadirTimeline>(
     path: &Path,
-    tline: &mut DatadirTimeline<R>,
+    tline: &T,
     lsn: Lsn,
 ) -> Result<()> {
     let mut pg_control: Option<ControlFileData> = None;
@@ -90,8 +88,8 @@ pub fn import_timeline_from_postgres_datadir(
 }
 
 // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
-fn import_rel<R: Repository, Reader: Read>(
-    modification: &mut DatadirModification<R>,
+fn import_rel<T: DatadirTimeline, Reader: Read>(
+    modification: &mut DatadirModification<T>,
     path: &Path,
     spcoid: Oid,
     dboid: Oid,
@@ -170,8 +168,8 @@ fn import_rel(
 /// Import an SLRU segment file
 ///
-fn import_slru<R: Repository, Reader: Read>(
-    modification: &mut DatadirModification<R>,
+fn import_slru<T: DatadirTimeline, Reader: Read>(
+    modification: &mut DatadirModification<T>,
     slru: SlruKind,
     path: &Path,
     mut reader: Reader,
@@ -226,9 +224,9 @@ fn import_slru(
 
 /// Scan PostgreSQL WAL files in given directory and load all records between
 /// 'startpoint' and 'endpoint' into the repository.
-fn import_wal<R: Repository>(
+fn import_wal<T: DatadirTimeline>(
     walpath: &Path,
-    tline: &mut DatadirTimeline<R>,
+    tline: &T,
     startpoint: Lsn,
     endpoint: Lsn,
 ) -> Result<()> {
@@ -297,8 +295,8 @@ fn import_wal(
     Ok(())
 }
 
-pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
-    tline: &mut DatadirTimeline<R>,
+pub fn import_basebackup_from_tar<T: DatadirTimeline, Reader: Read>(
+    tline: &T,
     reader: Reader,
     base_lsn: Lsn,
 ) -> Result<()> {
@@ -339,8 +337,8 @@ pub fn import_basebackup_from_tar(
     Ok(())
 }
 
-pub fn import_wal_from_tar<R: Repository, Reader: Read>(
-    tline: &mut DatadirTimeline<R>,
+pub fn import_wal_from_tar<T: DatadirTimeline, Reader: Read>(
+    tline: &T,
     reader: Reader,
     start_lsn: Lsn,
     end_lsn: Lsn,
@@ -420,8 +418,8 @@ pub fn import_wal_from_tar(
     Ok(())
 }
 
-pub fn import_file<R: Repository, Reader: Read>(
-    modification: &mut DatadirModification<R>,
+pub fn import_file<T: DatadirTimeline, Reader: Read>(
+    modification: &mut DatadirModification<T>,
     file_path: &Path,
     reader: Reader,
     len: usize,
@@ -540,7 +538,7 @@ pub fn import_file(
         // zenith.signal is not necessarily the last file that we handle,
        // but it is ok to call `finish_write()`, because the final `modification.commit()`
        // will update the lsn once more, to the final one.
-        let writer = modification.tline.tline.writer();
+        let writer = modification.tline.writer();
         writer.finish_write(prev_lsn);
 
         debug!("imported zenith signal {}", prev_lsn);
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index d770e736e9..c500b05e66 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -32,7 +32,6 @@ use crate::storage_sync::index::RemoteIndex;
 use crate::tenant_config::{TenantConf, TenantConfOpt};
 
 use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline};
-use crate::tenant_mgr;
 use crate::thread_mgr;
 use crate::walredo::WalRedoManager;
 use crate::CheckpointConfig;
@@ -181,7 +180,6 @@ impl Repository for LayeredRepository {
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
             self.upload_layers,
-            None,
         );
 
         timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -246,20 +244,6 @@ impl Repository for LayeredRepository {
                 ));
             }
         }
-        // Copy logical size from source timeline if we are branching on the last position.
-        let init_logical_size =
-            if let Ok(src_pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenant_id, src) {
-                let logical_size = src_pgdir.get_current_logical_size();
-                // Check LSN after getting logical size to exclude race condition
-                // when ancestor timeline is concurrently updated
-                if src_timeline.get_last_record_lsn() == start_lsn {
-                    Some(logical_size)
-                } else {
-                    None
-                }
-            } else {
-                None
-            };
 
         // Determine prev-LSN for the new timeline. We can only determine it if
         // the timeline was branched at the current end of the source timeline.
@@ -290,14 +274,7 @@ impl Repository for LayeredRepository {
         );
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenant_id))?;
         timeline::save_metadata(self.conf, dst, self.tenant_id, &metadata, true)?;
-        timelines.insert(
-            dst,
-            LayeredTimelineEntry::Unloaded {
-                id: dst,
-                metadata,
-                init_logical_size,
-            },
-        );
+        timelines.insert(dst, LayeredTimelineEntry::Unloaded { id: dst, metadata });
 
         info!("branched timeline {} from {} at {}", dst, src, start_lsn);
 
@@ -433,7 +410,7 @@ impl Repository for LayeredRepository {
                 // we need to get metadata of a timeline, another option is to pass it along with Downloaded status
                 let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?;
                 // finally we make newly downloaded timeline visible to repository
-                entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, init_logical_size: None })
+                entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata })
             },
         };
         Ok(())
@@ -551,18 +528,13 @@ impl LayeredRepository {
         timelineid: ZTimelineId,
         timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
     ) -> anyhow::Result<Option<Arc<LayeredTimeline>>> {
-        let logical_size: Option<usize>;
         match timelines.get(&timelineid) {
             Some(entry) => match entry {
                 LayeredTimelineEntry::Loaded(local_timeline) => {
                     debug!("timeline {} found loaded into memory", &timelineid);
                     return Ok(Some(Arc::clone(local_timeline)));
                 }
-                LayeredTimelineEntry::Unloaded {
-                    init_logical_size, ..
-                } => {
-                    logical_size = *init_logical_size;
-                }
+                LayeredTimelineEntry::Unloaded { .. } => {}
             },
             None => {
                 debug!("timeline {} not found", &timelineid);
@@ -573,7 +545,7 @@ impl LayeredRepository {
             "timeline {} found on a local disk, but not loaded into the memory, loading",
             &timelineid
         );
-        let timeline = self.load_local_timeline(timelineid, timelines, logical_size)?;
+        let timeline = self.load_local_timeline(timelineid, timelines)?;
         let was_loaded = timelines.insert(
             timelineid,
             LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
@@ -590,7 +562,6 @@ impl LayeredRepository {
         &self,
         timeline_id: ZTimelineId,
         timelines: &mut HashMap<ZTimelineId, LayeredTimelineEntry>,
-        init_logical_size: Option<usize>,
     ) -> anyhow::Result<Arc<LayeredTimeline>> {
         let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
             .context("failed to load metadata")?;
@@ -617,7 +588,6 @@ impl LayeredRepository {
             self.tenant_id,
             Arc::clone(&self.walredo_mgr),
             self.upload_layers,
-            init_logical_size,
         );
         timeline
             .load_layer_map(disk_consistent_lsn)
diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs
index e862b7def7..bdc74160aa 100644
--- a/pageserver/src/layered_repository/timeline.rs
+++ b/pageserver/src/layered_repository/timeline.rs
@@ -14,7 +14,7 @@ use std::fs::{File, OpenOptions};
 use std::io::Write;
 use std::ops::{Deref, Range};
 use std::path::PathBuf;
-use std::sync::atomic::{self, AtomicBool};
+use std::sync::atomic::{self, AtomicBool, AtomicIsize, Ordering as AtomicOrdering};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, TryLockError};
 use std::time::{Duration, SystemTime};
 
@@ -39,6 +39,7 @@ use crate::config::PageServerConf;
 use crate::keyspace::{KeyPartitioning, KeySpace};
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::tenant_config::TenantConfOpt;
+use crate::DatadirTimeline;
 use postgres_ffi::xlog_utils::to_pg_timestamp;
 
 use utils::{
@@ -49,7 +50,6 @@ use utils::{
 
 use crate::repository::{GcResult, RepositoryTimeline, Timeline, TimelineWriter};
 use crate::repository::{Key, Value};
-use crate::tenant_mgr;
 use crate::thread_mgr;
 use crate::virtual_file::VirtualFile;
 use crate::walreceiver::IS_WAL_RECEIVER;
@@ -122,7 +122,6 @@ pub enum LayeredTimelineEntry {
     Unloaded {
         id: ZTimelineId,
         metadata: TimelineMetadata,
-        init_logical_size: Option<usize>,
     },
 }
 
@@ -269,11 +268,21 @@ pub struct LayeredTimeline {
     // though lets keep them both for better error visibility.
     pub initdb_lsn: Lsn,
 
-    // Initial logical size of timeline (if known).
-    // Logical size can be copied from ancestor timeline when new branch is create at last LSN
-    pub init_logical_size: Option<usize>,
+    /// When did we last calculate the partitioning?
+    partitioning: Mutex<(KeyPartitioning, Lsn)>,
+
+    /// Configuration: how often should the partitioning be recalculated.
+    repartition_threshold: u64,
+
+    /// Current logical size of the "datadir", at the last LSN.
+    current_logical_size: AtomicIsize,
 }
 
+/// Inherit all the functions from DatadirTimeline, to provide the
+/// functionality to store PostgreSQL relations, SLRUs, etc. in a
+/// LayeredTimeline.
+impl DatadirTimeline for LayeredTimeline {}
+
 ///
 /// Information about how much history needs to be retained, needed by
 /// Garbage Collection.
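The empty `impl DatadirTimeline for LayeredTimeline {}` works because every method of the trait (introduced in pgdatadir_mapping.rs below) has a default body written against the `Timeline` supertrait. A minimal sketch of the idiom, with hypothetical names rather than the real pageserver types:

```rust
// Sketch only: an extension trait whose methods all have default bodies
// implemented in terms of a supertrait, so implementors opt in with an
// empty impl block.
use std::collections::HashMap;

trait KeyValueStore {
    fn get(&self, key: u64) -> Option<Vec<u8>>;
}

trait DatadirOps: KeyValueStore {
    // Default body: written once, in terms of the supertrait.
    fn get_rel_size(&self, rel_key: u64) -> Option<u32> {
        let buf = self.get(rel_key)?;
        let bytes: [u8; 4] = buf.get(..4)?.try_into().ok()?;
        Some(u32::from_le_bytes(bytes))
    }
}

struct MemStore(HashMap<u64, Vec<u8>>);

impl KeyValueStore for MemStore {
    fn get(&self, key: u64) -> Option<Vec<u8>> {
        self.0.get(&key).cloned()
    }
}

// The whole DatadirOps API comes for free:
impl DatadirOps for MemStore {}
```

Any type implementing the supertrait picks up the entire derived API with one empty impl, which is exactly how LayeredTimeline acquires the datadir functions here.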
@@ -472,7 +481,6 @@ impl LayeredTimeline {
         tenant_id: ZTenantId,
         walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
         upload_layers: bool,
-        init_logical_size: Option<usize>,
     ) -> LayeredTimeline {
         let reconstruct_time_histo = RECONSTRUCT_TIME
             .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
@@ -508,7 +516,7 @@ impl LayeredTimeline {
             .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
             .unwrap();
 
-        LayeredTimeline {
+        let mut result = LayeredTimeline {
             conf,
             tenant_conf,
             timeline_id,
@@ -551,8 +559,13 @@ impl LayeredTimeline {
 
             latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
             initdb_lsn: metadata.initdb_lsn(),
-            init_logical_size,
-        }
+
+            current_logical_size: AtomicIsize::new(0),
+            partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
+            repartition_threshold: 0,
+        };
+        result.repartition_threshold = result.get_checkpoint_distance() / 10;
+        result
     }
 
     ///
@@ -634,6 +647,58 @@ impl LayeredTimeline {
         Ok(())
     }
 
+    /// (Re-)calculate the logical size of the database at the latest LSN.
+    ///
+    /// This can be a slow operation.
+    pub fn init_logical_size(&self) -> Result<()> {
+        // Try a fast-path first:
+        // Copy the logical size from the ancestor timeline if there have been no
+        // changes on this branch, and no changes on the ancestor branch since
+        // the branch point.
+        if self.get_ancestor_lsn() == self.get_last_record_lsn() && self.ancestor_timeline.is_some()
+        {
+            let ancestor = self.get_ancestor_timeline()?;
+            let ancestor_logical_size = ancestor.get_current_logical_size();
+            // Check the LSN after getting the logical size, to exclude a race
+            // condition when the ancestor timeline is concurrently updated.
+            //
+            // Logical size 0 means that it was not initialized, so don't believe that.
+            if ancestor_logical_size != 0 && ancestor.get_last_record_lsn() == self.ancestor_lsn {
+                self.current_logical_size
+                    .store(ancestor_logical_size as isize, AtomicOrdering::SeqCst);
+                debug!(
+                    "logical size copied from ancestor: {}",
+                    ancestor_logical_size
+                );
+                return Ok(());
+            }
+        }
+
+        // Have to calculate it the hard way
+        let last_lsn = self.get_last_record_lsn();
+        let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?;
+        self.current_logical_size
+            .store(logical_size as isize, AtomicOrdering::SeqCst);
+        debug!("calculated logical size the hard way: {}", logical_size);
+        Ok(())
+    }
+
+    /// Retrieve current logical size of the timeline.
+    ///
+    /// NOTE: counted incrementally, includes ancestors.
+    pub fn get_current_logical_size(&self) -> usize {
+        let current_logical_size = self.current_logical_size.load(AtomicOrdering::Acquire);
+        match usize::try_from(current_logical_size) {
+            Ok(sz) => sz,
+            Err(_) => {
+                error!(
+                    "current_logical_size is out of range: {}",
+                    current_logical_size
+                );
+                0
+            }
+        }
+    }
+
     ///
     /// Get a handle to a Layer for reading.
     ///
@@ -1003,18 +1068,16 @@ impl LayeredTimeline {
         // files instead. This is possible as long as *all* the data imported into the
         // repository have the same LSN.
         let lsn_range = frozen_layer.get_lsn_range();
-        let layer_paths_to_upload = if lsn_range.start == self.initdb_lsn
-            && lsn_range.end == Lsn(self.initdb_lsn.0 + 1)
-        {
-            let pgdir = tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)?;
-            let (partitioning, _lsn) =
-                pgdir.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
-            self.create_image_layers(&partitioning, self.initdb_lsn, true)?
-        } else {
-            // normal case, write out a L0 delta layer file.
-            let delta_path = self.create_delta_layer(&frozen_layer)?;
-            HashSet::from([delta_path])
-        };
+        let layer_paths_to_upload =
+            if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
+                let (partitioning, _lsn) =
+                    self.repartition(self.initdb_lsn, self.get_compaction_target_size())?;
+                self.create_image_layers(&partitioning, self.initdb_lsn, true)?
+            } else {
+                // normal case, write out a L0 delta layer file.
+                let delta_path = self.create_delta_layer(&frozen_layer)?;
+                HashSet::from([delta_path])
+            };
 
         fail_point!("flush-frozen-before-sync");
@@ -1186,38 +1249,56 @@ impl LayeredTimeline {
         let target_file_size = self.get_checkpoint_distance();
 
         // Define partitioning schema if needed
-        if let Ok(pgdir) =
-            tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
-        {
-            // 2. Create new image layers for partitions that have been modified
-            // "enough".
-            let (partitioning, lsn) = pgdir.repartition(
-                self.get_last_record_lsn(),
-                self.get_compaction_target_size(),
-            )?;
-            let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
-            if !layer_paths_to_upload.is_empty()
-                && self.upload_layers.load(atomic::Ordering::Relaxed)
-            {
-                storage_sync::schedule_layer_upload(
-                    self.tenant_id,
-                    self.timeline_id,
-                    HashSet::from_iter(layer_paths_to_upload),
-                    None,
-                );
-            }
 
-            // 3. Compact
-            let timer = self.compact_time_histo.start_timer();
-            self.compact_level0(target_file_size)?;
-            timer.stop_and_record();
-        } else {
-            debug!("Could not compact because no partitioning specified yet");
-        }
+        match self.repartition(
+            self.get_last_record_lsn(),
+            self.get_compaction_target_size(),
+        ) {
+            Ok((partitioning, lsn)) => {
+                // 2. Create new image layers for partitions that have been modified
+                // "enough".
+                let layer_paths_to_upload = self.create_image_layers(&partitioning, lsn, false)?;
+                if !layer_paths_to_upload.is_empty()
+                    && self.upload_layers.load(atomic::Ordering::Relaxed)
+                {
+                    storage_sync::schedule_layer_upload(
+                        self.tenant_id,
+                        self.timeline_id,
+                        HashSet::from_iter(layer_paths_to_upload),
+                        None,
+                    );
+                }
+
+                // 3. Compact
+                let timer = self.compact_time_histo.start_timer();
+                self.compact_level0(target_file_size)?;
+                timer.stop_and_record();
+            }
+            Err(err) => {
+                // No partitioning? This is normal if the timeline was just created
+                // as an empty timeline. It also happens in unit tests that use the
+                // timeline as a simple key-value store, ignoring the datadir layout.
+                // Log the error but continue.
+                error!("could not compact, repartitioning keyspace failed: {err:?}");
+            }
+        };
 
         Ok(())
     }
 
+    fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
+        let mut partitioning_guard = self.partitioning.lock().unwrap();
+        if partitioning_guard.1 == Lsn(0)
+            || lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
+        {
+            let keyspace = self.collect_keyspace(lsn)?;
+            let partitioning = keyspace.partition(partition_size);
+            *partitioning_guard = (partitioning, lsn);
+            return Ok((partitioning_guard.0.clone(), lsn));
+        }
+        Ok((partitioning_guard.0.clone(), partitioning_guard.1))
+    }
+
     // Is it time to create a new image layer for the given partition?
     fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> Result<bool> {
         let layers = self.layers.read().unwrap();
@@ -1626,19 +1707,21 @@ impl LayeredTimeline {
 
         // Calculate pitr cutoff point.
         // If we cannot determine a cutoff LSN, be conservative and don't GC anything.
-        let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn();
+        let mut pitr_cutoff_lsn: Lsn;
+
+        if pitr != Duration::ZERO {
+            // The conservative, safe default is to remove nothing, when we have
+            // no commit timestamp data available.
+            pitr_cutoff_lsn = *self.get_latest_gc_cutoff_lsn();
 
-        if let Ok(timeline) =
-            tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id)
-        {
-            let now = SystemTime::now();
             // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
             // If we don't have enough data to convert to LSN,
             // play safe and don't remove any layers.
+            let now = SystemTime::now();
             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
 
-                match timeline.find_lsn_for_timestamp(pitr_timestamp)? {
+                match self.find_lsn_for_timestamp(pitr_timestamp)? {
                     LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn,
                     LsnForTimestamp::Future(lsn) => {
                         debug!("future({})", lsn);
@@ -1653,9 +1736,10 @@ impl LayeredTimeline {
                 }
                 debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
             }
-        } else if cfg!(test) {
-            // We don't have local timeline in mocked cargo tests.
-            // So, just ignore pitr_interval setting in this case.
+        } else {
+            // No time-based retention. (Some unit tests depend on garbage collection
+            // working even when CLOG data is missing, in which case the
+            // find_lsn_for_timestamp() call above wouldn't work.)
             pitr_cutoff_lsn = gc_info.horizon_cutoff;
         }
         gc_info.pitr_cutoff = pitr_cutoff_lsn;
@@ -1962,6 +2046,12 @@ impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
     fn finish_write(&self, new_lsn: Lsn) {
         self.tl.finish_write(new_lsn);
     }
+
+    fn update_current_logical_size(&self, delta: isize) {
+        self.tl
+            .current_logical_size
+            .fetch_add(delta, AtomicOrdering::SeqCst);
+    }
 }
 
 /// Add a suffix to a layer file's name: .{num}.old
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index c9c00d75e2..4ecb181553 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -63,8 +63,7 @@ pub enum CheckpointConfig {
 }
 
 pub type RepositoryImpl = LayeredRepository;
-
-pub type DatadirTimelineImpl = DatadirTimeline<RepositoryImpl>;
+pub type TimelineImpl = <RepositoryImpl as Repository>::Timeline;
 
 pub fn shutdown_pageserver(exit_code: i32) {
     // Shut down the libpq endpoint thread. This prevents new connections from
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 3dba207ab9..c8aa4b35e8 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -30,7 +30,6 @@ use utils::{
 use crate::basebackup;
 use crate::config::{PageServerConf, ProfilingConfig};
 use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
-use crate::layered_repository::LayeredRepository;
 use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
 use crate::profiling::profpoint_start;
 use crate::reltag::RelTag;
@@ -555,9 +554,6 @@ impl PageServerHandler {
         info!("creating new timeline");
         let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
         let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
-        let repartition_distance = repo.get_checkpoint_distance();
-        let mut datadir_timeline =
-            DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
 
         // TODO mark timeline as not ready until it reaches end_lsn.
         // We might have some wal to import as well, and we should prevent compute
@@ -573,7 +569,7 @@ impl PageServerHandler {
         info!("importing basebackup");
         pgb.write_message(&BeMessage::CopyInResponse)?;
         let reader = CopyInReader::new(pgb);
-        import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
+        import_basebackup_from_tar(&*timeline, reader, base_lsn)?;
 
         // TODO check checksum
         // Meanwhile you can verify client-side by taking fullbackup
@@ -583,7 +579,7 @@ impl PageServerHandler {
 
         // Flush data to disk, then upload to s3
         info!("flushing layers");
-        datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
+        timeline.checkpoint(CheckpointConfig::Flush)?;
 
         info!("done");
         Ok(())
@@ -605,10 +601,6 @@ impl PageServerHandler {
         let timeline = repo.get_timeline_load(timeline_id)?;
         ensure!(timeline.get_last_record_lsn() == start_lsn);
 
-        let repartition_distance = repo.get_checkpoint_distance();
-        let mut datadir_timeline =
-            DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
-
         // TODO leave clean state on error. For now you can use detach to clean
         // up broken state from a failed import.
 
@@ -616,16 +608,16 @@ impl PageServerHandler {
         info!("importing wal");
         pgb.write_message(&BeMessage::CopyInResponse)?;
         let reader = CopyInReader::new(pgb);
-        import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
+        import_wal_from_tar(&*timeline, reader, start_lsn, end_lsn)?;
 
         // TODO Does it make sense to overshoot?
-        ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
+        ensure!(timeline.get_last_record_lsn() >= end_lsn);
 
         // Flush data to disk, then upload to s3. No need for a forced checkpoint.
         // We only want to persist the data, and it doesn't matter if it's in the
         // shape of deltas or images.
         info!("flushing layers");
-        datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
+        timeline.checkpoint(CheckpointConfig::Flush)?;
 
         info!("done");
         Ok(())
@@ -643,8 +635,8 @@ impl PageServerHandler {
     /// In either case, if the page server hasn't received the WAL up to the
     /// requested LSN yet, we will wait for it to arrive. The return value is
     /// the LSN that should be used to look up the page versions.
-    fn wait_or_get_last_lsn<R: Repository>(
-        timeline: &DatadirTimeline<R>,
+    fn wait_or_get_last_lsn<T: DatadirTimeline>(
+        timeline: &T,
         mut lsn: Lsn,
         latest: bool,
         latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
@@ -671,7 +663,7 @@ impl PageServerHandler {
             if lsn <= last_record_lsn {
                 lsn = last_record_lsn;
             } else {
-                timeline.tline.wait_lsn(lsn)?;
+                timeline.wait_lsn(lsn)?;
                 // Since we waited for 'lsn' to arrive, that is now the last
                 // record LSN. (Or close enough for our purposes; the
                 // last-record LSN can advance immediately after we return
@@ -681,7 +673,7 @@ impl PageServerHandler {
             if lsn == Lsn(0) {
                 bail!("invalid LSN(0) in request");
             }
-            timeline.tline.wait_lsn(lsn)?;
+            timeline.wait_lsn(lsn)?;
         }
         ensure!(
             lsn >= **latest_gc_cutoff_lsn,
@@ -691,14 +683,14 @@ impl PageServerHandler {
         Ok(lsn)
     }
 
-    fn handle_get_rel_exists_request<R: Repository>(
+    fn handle_get_rel_exists_request<T: DatadirTimeline>(
         &self,
-        timeline: &DatadirTimeline<R>,
+        timeline: &T,
         req: &PagestreamExistsRequest,
     ) -> Result<PagestreamBeMessage> {
         let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
 
-        let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
 
         let exists = timeline.get_rel_exists(req.rel, lsn)?;
@@ -708,13 +700,13 @@ impl PageServerHandler {
         }))
     }
 
-    fn handle_get_nblocks_request<R: Repository>(
+    fn handle_get_nblocks_request<T: DatadirTimeline>(
         &self,
-        timeline: &DatadirTimeline<R>,
+        timeline: &T,
         req: &PagestreamNblocksRequest,
     ) -> Result<PagestreamBeMessage> {
         let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
-        let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
 
         let n_blocks = timeline.get_rel_size(req.rel, lsn)?;
@@ -724,13 +716,13 @@ impl PageServerHandler {
         }))
     }
 
-    fn handle_db_size_request<R: Repository>(
+    fn handle_db_size_request<T: DatadirTimeline>(
        &self,
-        timeline: &DatadirTimeline<R>,
+        timeline: &T,
         req: &PagestreamDbSizeRequest,
     ) -> Result<PagestreamBeMessage> {
         let _enter = info_span!("get_db_size", dbnode = %req.dbnode, req_lsn = %req.lsn).entered();
-        let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
 
         let total_blocks =
@@ -743,14 +735,14 @@ impl PageServerHandler {
         }))
     }
 
-    fn handle_get_page_at_lsn_request<R: Repository>(
+    fn handle_get_page_at_lsn_request<T: DatadirTimeline>(
         &self,
-        timeline: &DatadirTimeline<R>,
+        timeline: &T,
         req: &PagestreamGetPageRequest,
     ) -> Result<PagestreamBeMessage> {
         let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)
             .entered();
-        let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
         /*
         // Add a 1s delay to some requests. The delay causes the requests to
@@ -783,7 +775,7 @@ impl PageServerHandler {
         // check that the timeline exists
         let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
             .context("Cannot load local timeline")?;
-        let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         if let Some(lsn) = lsn {
             timeline
                 .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
@@ -921,7 +913,7 @@ impl postgres_backend::Handler for PageServerHandler {
 
             let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
                 .context("Cannot load local timeline")?;
-            let end_of_timeline = timeline.tline.get_last_record_rlsn();
+            let end_of_timeline = timeline.get_last_record_rlsn();
 
             pgb.write_message_noflush(&BeMessage::RowDescription(&[
                 RowDescriptor::text_col(b"prev_lsn"),
@@ -1139,7 +1131,7 @@ impl postgres_backend::Handler for PageServerHandler {
             let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
             let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
                 .context("Couldn't load timeline")?;
-            timeline.tline.compact()?;
+            timeline.compact()?;
 
             pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
                 .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
@@ -1160,7 +1152,7 @@ impl postgres_backend::Handler for PageServerHandler {
                 .context("Cannot load local timeline")?;
 
             // Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
-            timeline.tline.checkpoint(CheckpointConfig::Forced)?;
+            timeline.checkpoint(CheckpointConfig::Forced)?;
 
             pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
                 .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index f703fa16af..61aca8d4ba 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -6,10 +6,10 @@
 //! walingest.rs handles a few things like implicit relation creation and extension.
 //! Clarify that)
 //!
-use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum};
+use crate::keyspace::{KeySpace, KeySpaceAccum};
 use crate::reltag::{RelTag, SlruKind};
+use crate::repository::Timeline;
 use crate::repository::*;
-use crate::repository::{Repository, Timeline};
 use crate::walrecord::ZenithWalRecord;
 use anyhow::{bail, ensure, Result};
 use bytes::{Buf, Bytes};
@@ -18,34 +18,12 @@ use postgres_ffi::{pg_constants, Oid, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::ops::Range;
-use std::sync::atomic::{AtomicIsize, Ordering};
-use std::sync::{Arc, Mutex, RwLockReadGuard};
-use tracing::{debug, error, trace, warn};
+use tracing::{debug, trace, warn};
 use utils::{bin_ser::BeSer, lsn::Lsn};
 
 /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
 pub type BlockNumber = u32;
 
-pub struct DatadirTimeline<R>
-where
-    R: Repository,
-{
-    /// The underlying key-value store. Callers should not read or modify the
-    /// data in the underlying store directly. However, it is exposed to have
-    /// access to information like last-LSN, ancestor, and operations like
-    /// compaction.
-    pub tline: Arc<R::Timeline>,
-
-    /// When did we last calculate the partitioning?
-    partitioning: Mutex<(KeyPartitioning, Lsn)>,
-
-    /// Configuration: how often should the partitioning be recalculated.
-    repartition_threshold: u64,
-
-    /// Current logical size of the "datadir", at the last LSN.
-    current_logical_size: AtomicIsize,
-}
-
 #[derive(Debug)]
 pub enum LsnForTimestamp {
     Present(Lsn),
@@ -54,34 +32,24 @@ pub enum LsnForTimestamp {
     NoData(Lsn),
 }
 
-impl<R: Repository> DatadirTimeline<R> {
-    pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
-        DatadirTimeline {
-            tline,
-            partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
-            current_logical_size: AtomicIsize::new(0),
-            repartition_threshold,
-        }
-    }
-
-    /// (Re-)calculate the logical size of the database at the latest LSN.
-    ///
-    /// This can be a slow operation.
-    pub fn init_logical_size(&self) -> Result<()> {
-        let last_lsn = self.tline.get_last_record_lsn();
-        self.current_logical_size.store(
-            self.get_current_logical_size_non_incremental(last_lsn)? as isize,
-            Ordering::SeqCst,
-        );
-        Ok(())
-    }
-
-    /// Set timeline logical size.
-    pub fn set_logical_size(&self, size: usize) {
-        self.current_logical_size
-            .store(size as isize, Ordering::SeqCst);
-    }
-
+///
+/// This trait provides all the functionality to store PostgreSQL relations, SLRUs,
+/// and other special kinds of files, in a versioned key-value store. The
+/// Timeline trait provides the key-value store.
+///
+/// This is a trait, so that we can easily include all these functions in a Timeline
+/// implementation. You're not expected to write different implementations of this
+/// trait; rather, it provides an interface and an implementation on top of Timeline.
+///
+/// If you wanted to store other kinds of data in the Neon repository, e.g.
+/// flat files or MySQL, you would create a new trait like this, with all the
+/// functions that make sense for the kind of data you're storing. For flat files,
+/// for example, you might have a function like "fn read(path, offset, size)".
+/// We might also have that situation in the future, to support multiple PostgreSQL
+/// versions, if there are big changes in how the data is organized in the data
+/// directory, or if new special files are introduced.
+///
+pub trait DatadirTimeline: Timeline {
     /// Start ingesting a WAL record, or other atomic modification of
     /// the timeline.
     ///
@@ -102,7 +70,10 @@ impl<R: Repository> DatadirTimeline<R> {
     /// functions of the timeline until you finish! And if you update the
     /// same page twice, the last update wins.
     ///
-    pub fn begin_modification(&self) -> DatadirModification<R> {
+    fn begin_modification(&self) -> DatadirModification<Self>
+    where
+        Self: Sized,
+    {
         DatadirModification {
             tline: self,
             pending_updates: HashMap::new(),
@@ -116,7 +87,7 @@ impl<R: Repository> DatadirTimeline<R> {
     //------------------------------------------------------------------------------
 
     /// Look up given page version.
-    pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
+    fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
         ensure!(tag.relnode != 0, "invalid relnode");
 
         let nblocks = self.get_rel_size(tag, lsn)?;
@@ -129,11 +100,11 @@ impl<R: Repository> DatadirTimeline<R> {
         }
 
         let key = rel_block_to_key(tag, blknum);
-        self.tline.get(key, lsn)
+        self.get(key, lsn)
     }
 
     // Get size of a database in blocks
-    pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
+    fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
         let mut total_blocks = 0;
 
         let rels = self.list_rels(spcnode, dbnode, lsn)?;
@@ -146,7 +117,7 @@ impl<R: Repository> DatadirTimeline<R> {
     }
 
     /// Get size of a relation file
-    pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
+    fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
         ensure!(tag.relnode != 0, "invalid relnode");
 
         if (tag.forknum == pg_constants::FSM_FORKNUM
@@ -161,17 +132,17 @@ impl<R: Repository> DatadirTimeline<R> {
         }
 
         let key = rel_size_to_key(tag);
-        let mut buf = self.tline.get(key, lsn)?;
+        let mut buf = self.get(key, lsn)?;
         Ok(buf.get_u32_le())
     }
 
     /// Does relation exist?
-    pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
+    fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
         ensure!(tag.relnode != 0, "invalid relnode");
 
         // fetch directory listing
         let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         let dir = RelDirectory::des(&buf)?;
 
         let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
@@ -180,10 +151,10 @@ impl<R: Repository> DatadirTimeline<R> {
     }
 
     /// Get a list of all existing relations in given tablespace and database.
-    pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
+    fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
         // fetch directory listing
         let key = rel_dir_to_key(spcnode, dbnode);
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         let dir = RelDirectory::des(&buf)?;
 
         let rels: HashSet<RelTag> =
@@ -198,7 +169,7 @@ impl<R: Repository> DatadirTimeline<R> {
     }
 
     /// Look up given SLRU page version.
-    pub fn get_slru_page_at_lsn(
+    fn get_slru_page_at_lsn(
         &self,
         kind: SlruKind,
         segno: u32,
@@ -206,26 +177,21 @@ impl<R: Repository> DatadirTimeline<R> {
         lsn: Lsn,
     ) -> Result<Bytes> {
         let key = slru_block_to_key(kind, segno, blknum);
-        self.tline.get(key, lsn)
+        self.get(key, lsn)
     }
 
     /// Get size of an SLRU segment
-    pub fn get_slru_segment_size(
-        &self,
-        kind: SlruKind,
-        segno: u32,
-        lsn: Lsn,
-    ) -> Result<BlockNumber> {
+    fn get_slru_segment_size(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<BlockNumber> {
         let key = slru_segment_size_to_key(kind, segno);
-        let mut buf = self.tline.get(key, lsn)?;
+        let mut buf = self.get(key, lsn)?;
         Ok(buf.get_u32_le())
     }
 
     /// Get size of an SLRU segment
-    pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
+    fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
         // fetch directory listing
         let key = slru_dir_to_key(kind);
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         let dir = SlruSegmentDirectory::des(&buf)?;
 
         let exists = dir.segments.get(&segno).is_some();
@@ -239,10 +205,10 @@ impl<R: Repository> DatadirTimeline<R> {
     /// so it's not well defined which LSN you get if there were multiple commits
     /// "in flight" at that point in time.
     ///
-    pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
-        let gc_cutoff_lsn_guard = self.tline.get_latest_gc_cutoff_lsn();
+    fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result<LsnForTimestamp> {
+        let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
         let min_lsn = *gc_cutoff_lsn_guard;
-        let max_lsn = self.tline.get_last_record_lsn();
+        let max_lsn = self.get_last_record_lsn();
 
         // LSNs are always 8-byte aligned. low/mid/high represent the
         // LSN divided by 8.
@@ -333,88 +299,51 @@ impl<R: Repository> DatadirTimeline<R> {
     }
 
     /// Get a list of SLRU segments
-    pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
+    fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
         // fetch directory entry
         let key = slru_dir_to_key(kind);
 
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         let dir = SlruSegmentDirectory::des(&buf)?;
 
         Ok(dir.segments)
     }
 
-    pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
+    fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
         let key = relmap_file_key(spcnode, dbnode);
 
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         Ok(buf)
     }
 
-    pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
+    fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
         // fetch directory entry
-        let buf = self.tline.get(DBDIR_KEY, lsn)?;
+        let buf = self.get(DBDIR_KEY, lsn)?;
         let dir = DbDirectory::des(&buf)?;
 
         Ok(dir.dbdirs)
     }
 
-    pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
+    fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
         let key = twophase_file_key(xid);
-        let buf = self.tline.get(key, lsn)?;
+        let buf = self.get(key, lsn)?;
         Ok(buf)
     }
 
-    pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
+    fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
         // fetch directory entry
-        let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
+        let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
         let dir = TwoPhaseDirectory::des(&buf)?;
 
         Ok(dir.xids)
     }
 
-    pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
-        self.tline.get(CONTROLFILE_KEY, lsn)
+    fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
+        self.get(CONTROLFILE_KEY, lsn)
     }
 
-    pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
-        self.tline.get(CHECKPOINT_KEY, lsn)
-    }
-
-    /// Get the LSN of the last ingested WAL record.
-    ///
-    /// This is just a convenience wrapper that calls through to the underlying
-    /// repository.
-    pub fn get_last_record_lsn(&self) -> Lsn {
-        self.tline.get_last_record_lsn()
-    }
-
-    /// Check that it is valid to request operations with that lsn.
-    ///
-    /// This is just a convenience wrapper that calls through to the underlying
-    /// repository.
-    pub fn check_lsn_is_in_scope(
-        &self,
-        lsn: Lsn,
-        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
-    ) -> Result<()> {
-        self.tline.check_lsn_is_in_scope(lsn, latest_gc_cutoff_lsn)
-    }
-
-    /// Retrieve current logical size of the timeline
-    ///
-    /// NOTE: counted incrementally, includes ancestors,
-    pub fn get_current_logical_size(&self) -> usize {
-        let current_logical_size = self.current_logical_size.load(Ordering::Acquire);
-        match usize::try_from(current_logical_size) {
-            Ok(sz) => sz,
-            Err(_) => {
-                error!(
-                    "current_logical_size is out of range: {}",
-                    current_logical_size
-                );
-                0
-            }
-        }
+    fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
+        self.get(CHECKPOINT_KEY, lsn)
     }
 
     /// Does the same as get_current_logical_size but counted on demand.
     /// Used to initialize the logical size tracking on startup.
     ///
     /// Only relation blocks are counted currently. That excludes metadata,
     /// SLRUs, twophase files etc.
-    pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
+    fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
         // Fetch list of database dirs and iterate them
-        let buf = self.tline.get(DBDIR_KEY, lsn)?;
+        let buf = self.get(DBDIR_KEY, lsn)?;
         let dbdir = DbDirectory::des(&buf)?;
 
         let mut total_size: usize = 0;
         for (spcnode, dbnode) in dbdir.dbdirs.keys() {
             for rel in self.list_rels(*spcnode, *dbnode, lsn)? {
                 let relsize_key = rel_size_to_key(rel);
-                let mut buf = self.tline.get(relsize_key, lsn)?;
+                let mut buf = self.get(relsize_key, lsn)?;
                 let relsize = buf.get_u32_le();
 
                 total_size += relsize as usize;
@@ -452,7 +381,7 @@ impl<R: Repository> DatadirTimeline<R> {
         result.add_key(DBDIR_KEY);
 
         // Fetch list of database dirs and iterate them
-        let buf = self.tline.get(DBDIR_KEY, lsn)?;
+        let buf = self.get(DBDIR_KEY, lsn)?;
         let dbdir = DbDirectory::des(&buf)?;
 
         let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
@@ -469,7 +398,7 @@ impl<R: Repository> DatadirTimeline<R> {
             rels.sort_unstable();
             for rel in rels {
                 let relsize_key = rel_size_to_key(rel);
-                let mut buf = self.tline.get(relsize_key, lsn)?;
+                let mut buf = self.get(relsize_key, lsn)?;
                 let relsize = buf.get_u32_le();
 
                 result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
@@ -485,13 +414,13 @@ impl<R: Repository> DatadirTimeline<R> {
         ] {
             let slrudir_key = slru_dir_to_key(kind);
             result.add_key(slrudir_key);
-            let buf = self.tline.get(slrudir_key, lsn)?;
+            let buf = self.get(slrudir_key, lsn)?;
             let dir = SlruSegmentDirectory::des(&buf)?;
             let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
             segments.sort_unstable();
             for segno in segments {
                 let segsize_key = slru_segment_size_to_key(kind, segno);
-                let mut buf = self.tline.get(segsize_key, lsn)?;
+                let mut buf = self.get(segsize_key, lsn)?;
                 let segsize = buf.get_u32_le();
 
                 result.add_range(
@@ -503,7 +432,7 @@ impl<R: Repository> DatadirTimeline<R> {
 
         // Then pg_twophase
         result.add_key(TWOPHASEDIR_KEY);
-        let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
+        let buf = self.get(TWOPHASEDIR_KEY, lsn)?;
         let twophase_dir = TwoPhaseDirectory::des(&buf)?;
         let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
         xids.sort_unstable();
@@ -516,30 +445,17 @@ impl<R: Repository> DatadirTimeline<R> {
 
         Ok(result.to_keyspace())
     }
-
-    pub fn repartition(&self, lsn: Lsn, partition_size: u64) -> Result<(KeyPartitioning, Lsn)> {
-        let mut partitioning_guard = self.partitioning.lock().unwrap();
-        if partitioning_guard.1 == Lsn(0)
-            || lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
-        {
-            let keyspace = self.collect_keyspace(lsn)?;
-            let partitioning = keyspace.partition(partition_size);
-            *partitioning_guard = (partitioning, lsn);
-            return Ok((partitioning_guard.0.clone(), lsn));
-        }
-        Ok((partitioning_guard.0.clone(), partitioning_guard.1))
-    }
 }
 
 /// DatadirModification represents an operation to ingest an atomic set of
 /// updates to the repository. It is created by the 'begin_record'
 /// function. It is called for each WAL record, so that all the modifications
 /// by one WAL record appear atomic.
-pub struct DatadirModification<'a, R: Repository> {
+pub struct DatadirModification<'a, T: DatadirTimeline> {
     /// The timeline this modification applies to. You can access this to
     /// read the state, but note that any pending updates are *not* reflected
     /// in the state in 'tline' yet.
-    pub tline: &'a DatadirTimeline<R>,
+    pub tline: &'a T,
 
     // The modifications are not applied directly to the underlying key-value store.
     // The put-functions add the modifications here, and they are flushed to the
@@ -549,7 +465,7 @@ pub struct DatadirModification<'a, R: Repository> {
     pending_nblocks: isize,
 }
 
-impl<'a, R: Repository> DatadirModification<'a, R> {
+impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
     /// Initialize a completely new repository.
     ///
     /// This inserts the directory metadata entries that are assumed to
@@ -934,7 +850,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
             return Ok(());
         }
 
-        let writer = self.tline.tline.writer();
+        let writer = self.tline.writer();
 
         // Flush relation and SLRU data blocks, keep metadata.
         let mut result: Result<()> = Ok(());
@@ -949,10 +865,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
         result?;
 
         if pending_nblocks != 0 {
-            self.tline.current_logical_size.fetch_add(
-                pending_nblocks * pg_constants::BLCKSZ as isize,
-                Ordering::SeqCst,
-            );
+            writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
             self.pending_nblocks = 0;
         }
 
@@ -965,7 +878,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
     /// All the modifications in this atomic update are stamped by the specified LSN.
     ///
     pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
-        let writer = self.tline.tline.writer();
+        let writer = self.tline.writer();
         let pending_nblocks = self.pending_nblocks;
         self.pending_nblocks = 0;
 
@@ -980,10 +893,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
         writer.finish_write(lsn);
 
         if pending_nblocks != 0 {
-            self.tline.current_logical_size.fetch_add(
-                pending_nblocks * pg_constants::BLCKSZ as isize,
-                Ordering::SeqCst,
-            );
+            writer.update_current_logical_size(pending_nblocks * pg_constants::BLCKSZ as isize);
         }
 
         Ok(())
@@ -1010,7 +920,7 @@ impl<'a, T: DatadirTimeline> DatadirModification<'a, T> {
             }
         } else {
             let last_lsn = self.tline.get_last_record_lsn();
-            self.tline.tline.get(key, last_lsn)
+            self.tline.get(key, last_lsn)
         }
     }
 
@@ -1412,13 +1322,12 @@ fn is_slru_block_key(key: Key) -> bool {
 pub fn create_test_timeline<R: Repository>(
     repo: R,
     timeline_id: utils::zid::ZTimelineId,
-) -> Result<Arc<DatadirTimeline<R>>> {
+) -> Result<Arc<R::Timeline>> {
     let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
-    let tline = DatadirTimeline::new(tline, 256 * 1024);
     let mut m = tline.begin_modification();
     m.init_empty()?;
     m.commit(Lsn(8))?;
-    Ok(Arc::new(tline))
+    Ok(tline)
 }
 
 #[allow(clippy::bool_assert_comparison)]
@@ -1491,7 +1400,7 @@ mod tests {
             .contains(&TESTREL_A));
 
         // Run checkpoint and garbage collection and check that it's still not visible
-        newtline.tline.checkpoint(CheckpointConfig::Forced)?;
+        newtline.checkpoint(CheckpointConfig::Forced)?;
         repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
 
         assert!(!newtline
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 359c704e81..61058a7806 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -185,7 +185,7 @@ impl Value {
 /// A repository corresponds to one .neon directory. One repository holds multiple
 /// timelines, forked off from the same initial call to 'initdb'.
 pub trait Repository: Send + Sync {
-    type Timeline: Timeline;
+    type Timeline: crate::DatadirTimeline;
 
     /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
     /// See [`crate::remote_storage`] for more details about the synchronization.
@@ -405,6 +405,8 @@ pub trait TimelineWriter<'a> {
     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
     /// the latest and can be read.
     fn finish_write(&self, lsn: Lsn);
+
+    fn update_current_logical_size(&self, delta: isize);
 }
 
 #[cfg(test)]
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index a485e7c2cb..640dfa623a 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -3,7 +3,6 @@
 use crate::config::PageServerConf;
 use crate::layered_repository::{load_metadata, LayeredRepository};
-use crate::pgdatadir_mapping::DatadirTimeline;
 use crate::repository::Repository;
 use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
 use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
@@ -12,7 +11,7 @@ use crate::thread_mgr::ThreadKind;
 use crate::timelines::CreateRepo;
 use crate::walredo::PostgresRedoManager;
 use crate::{thread_mgr, timelines, walreceiver};
-use crate::{DatadirTimelineImpl, RepositoryImpl};
+use crate::{RepositoryImpl, TimelineImpl};
 use anyhow::Context;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
@@ -101,7 +100,7 @@ struct Tenant {
     ///
     /// Local timelines have more metadata that's loaded into memory,
     /// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
-    local_timelines: HashMap<ZTimelineId, Arc<DatadirTimelineImpl>>,
+    local_timelines: HashMap<ZTimelineId, Arc<<RepositoryImpl as Repository>::Timeline>>,
 }
 
 #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -178,7 +177,7 @@ pub enum LocalTimelineUpdate {
     },
     Attach {
         id: ZTenantTimelineId,
-        datadir: Arc<DatadirTimelineImpl>,
+        datadir: Arc<<RepositoryImpl as Repository>::Timeline>,
     },
 }
 
@@ -382,7 +381,7 @@ pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<RepositoryImpl>>
 pub fn get_local_timeline_with_load(
     tenant_id: ZTenantId,
     timeline_id: ZTimelineId,
-) -> anyhow::Result<Arc<DatadirTimelineImpl>> {
+) -> anyhow::Result<Arc<TimelineImpl>> {
     let mut m = tenants_state::write_tenants();
     let tenant = m
         .get_mut(&tenant_id)
@@ -489,27 +488,18 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
 fn load_local_timeline(
     repo: &RepositoryImpl,
     timeline_id: ZTimelineId,
-) -> anyhow::Result<Arc<DatadirTimelineImpl>> {
+) -> anyhow::Result<Arc<TimelineImpl>> {
     let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
         format!("Inmem timeline {timeline_id} not found in tenant's repository")
     })?;
-    let repartition_distance = repo.get_checkpoint_distance() / 10;
-    let init_logical_size = inmem_timeline.init_logical_size;
-    let page_tline = Arc::new(DatadirTimelineImpl::new(
-        inmem_timeline,
-        repartition_distance,
-    ));
-    if let Some(logical_size) = init_logical_size {
-        page_tline.set_logical_size(logical_size);
-    } else {
-        page_tline.init_logical_size()?;
-    }
+    inmem_timeline.init_logical_size()?;
+
     tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
         id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
-        datadir: Arc::clone(&page_tline),
+        datadir: Arc::clone(&inmem_timeline),
     });
-    Ok(page_tline)
+    Ok(inmem_timeline)
 }
 
 #[serde_as]
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs
index a40e705cb9..984276bad2 100644
--- a/pageserver/src/timelines.rs
+++ b/pageserver/src/timelines.rs
@@ -26,7 +26,7 @@ use crate::{
     repository::{LocalTimelineState, Repository},
     storage_sync::index::RemoteIndex,
     tenant_config::TenantConfOpt,
-    DatadirTimeline, RepositoryImpl,
+    DatadirTimeline, RepositoryImpl, TimelineImpl,
 };
 use crate::{import_datadir, LOG_FILE_NAME};
 use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
@@ -54,27 +54,27 @@ pub struct LocalTimelineInfo {
 }
 
 impl LocalTimelineInfo {
-    pub fn from_loaded_timeline<R: Repository>(
-        datadir_tline: &DatadirTimeline<R>,
+    pub fn from_loaded_timeline(
+        timeline: &TimelineImpl,
         include_non_incremental_logical_size: bool,
     ) -> anyhow::Result<Self> {
-        let last_record_lsn = datadir_tline.tline.get_last_record_lsn();
+        let last_record_lsn = timeline.get_last_record_lsn();
         let info = LocalTimelineInfo {
-            ancestor_timeline_id: datadir_tline.tline.get_ancestor_timeline_id(),
+            ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
             ancestor_lsn: {
-                match datadir_tline.tline.get_ancestor_lsn() {
+                match timeline.get_ancestor_lsn() {
                     Lsn(0) => None,
                     lsn @ Lsn(_) => Some(lsn),
                 }
             },
-            disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(),
+            disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
             last_record_lsn,
-            prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()),
-            latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(),
+            prev_record_lsn: Some(timeline.get_prev_record_lsn()),
+            latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
             timeline_state: LocalTimelineState::Loaded,
-            current_logical_size: Some(datadir_tline.get_current_logical_size()),
+            current_logical_size: Some(timeline.get_current_logical_size()),
             current_logical_size_non_incremental: if include_non_incremental_logical_size {
-                Some(datadir_tline.get_current_logical_size_non_incremental(last_record_lsn)?)
+                Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
             } else {
                 None
             },
@@ -109,9 +109,8 @@ impl LocalTimelineInfo {
     ) -> anyhow::Result<Self> {
         match repo_timeline {
             RepositoryTimeline::Loaded(_) => {
-                let datadir_tline =
-                    tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
-                Self::from_loaded_timeline(&datadir_tline, include_non_incremental_logical_size)
+                let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
+                Self::from_loaded_timeline(&*timeline, include_non_incremental_logical_size)
             }
             RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
         }
@@ -298,19 +297,18 @@ fn bootstrap_timeline(
     // Initdb lsn will be equal to last_record_lsn which will be set after import.
     // Because we know it upfront, avoid having an option or dummy zero value by passing it to create_empty_timeline.
     let timeline = repo.create_empty_timeline(tli, lsn)?;
-    let mut page_tline: DatadirTimeline<RepositoryImpl> = DatadirTimeline::new(timeline, u64::MAX);
-    import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &mut page_tline, lsn)?;
+    import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
 
     fail::fail_point!("before-checkpoint-new-timeline", |_| {
         bail!("failpoint before-checkpoint-new-timeline");
     });
 
-    page_tline.tline.checkpoint(CheckpointConfig::Forced)?;
+    timeline.checkpoint(CheckpointConfig::Forced)?;
 
     info!(
         "created root timeline {} timeline.lsn {}",
         tli,
-        page_tline.tline.get_last_record_lsn()
+        timeline.get_last_record_lsn()
     );
 
     // Remove temp dir. We don't need it anymore
@@ -389,7 +387,7 @@ pub(crate) fn create_timeline(
             // load the timeline into memory
             let loaded_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&loaded_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false)
                 .context("cannot fill timeline info")?
         }
         None => {
@@ -397,7 +395,7 @@ pub(crate) fn create_timeline(
             // load the timeline into memory
             let new_timeline =
                 tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
-            LocalTimelineInfo::from_loaded_timeline(&new_timeline, false)
+            LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false)
                 .context("cannot fill timeline info")?
         }
     };
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index adc24328ae..8dd14ec177 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -34,7 +34,6 @@ use std::collections::HashMap;
 
 use crate::pgdatadir_mapping::*;
 use crate::reltag::{RelTag, SlruKind};
-use crate::repository::Repository;
 use crate::walrecord::*;
 use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment;
 use postgres_ffi::xlog_utils::*;
@@ -44,8 +43,8 @@ use utils::lsn::Lsn;
 
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 
-pub struct WalIngest<'a, R: Repository> {
-    timeline: &'a DatadirTimeline<R>,
+pub struct WalIngest<'a, T: DatadirTimeline> {
+    timeline: &'a T,
 
     checkpoint: CheckPoint,
     checkpoint_modified: bool,
@@ -53,8 +52,8 @@ pub struct WalIngest<'a, R: Repository> {
     relsize_cache: HashMap<RelTag, BlockNumber>,
 }
 
-impl<'a, R: Repository> WalIngest<'a, R> {
-    pub fn new(timeline: &DatadirTimeline<R>, startpoint: Lsn) -> Result<WalIngest<R>> {
+impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
+    pub fn new(timeline: &T, startpoint: Lsn) -> Result<WalIngest<T>> {
         // Fetch the latest checkpoint into memory, so that we can compare with it
         // quickly in `ingest_record` and update it when it changes.
         let checkpoint_bytes = timeline.get_checkpoint(startpoint)?;
@@ -80,7 +79,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
         &mut self,
         recdata: Bytes,
         lsn: Lsn,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         decoded: &mut DecodedWALRecord,
     ) -> Result<()> {
         decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
@@ -268,7 +267,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     fn ingest_decoded_block(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         lsn: Lsn,
         decoded: &DecodedWALRecord,
         blk: &DecodedBkpBlock,
@@ -328,7 +327,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     fn ingest_heapam_record(
         &mut self,
         buf: &mut Bytes,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         decoded: &mut DecodedWALRecord,
     ) -> Result<()> {
         // Handle VM bit updates that are implicitly part of heap records.
@@ -472,7 +471,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
     fn ingest_xlog_dbase_create(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rec: &XlCreateDatabase,
     ) -> Result<()> {
         let db_id = rec.db_id;
@@ -539,7 +538,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     fn ingest_xlog_smgr_create(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rec: &XlSmgrCreate,
     ) -> Result<()> {
         let rel = RelTag {
@@ -557,7 +556,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     /// This is the same logic as in PostgreSQL's smgr_redo() function.
     fn ingest_xlog_smgr_truncate(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rec: &XlSmgrTruncate,
     ) -> Result<()> {
         let spcnode = rec.rnode.spcnode;
@@ -622,7 +621,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     ///
     fn ingest_xact_record(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         parsed: &XlXactParsedRecord,
         is_commit: bool,
     ) -> Result<()> {
@@ -691,7 +690,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn ingest_clog_truncate_record(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         xlrec: &XlClogTruncate,
     ) -> Result<()> {
         info!(
@@ -749,7 +748,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn ingest_multixact_create_record(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         xlrec: &XlMultiXactCreate,
     ) -> Result<()> {
         // Create WAL record for updating the multixact-offsets page
@@ -828,7 +827,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn ingest_multixact_truncate_record(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         xlrec: &XlMultiXactTruncate,
     ) -> Result<()> {
         self.checkpoint.oldestMulti = xlrec.end_trunc_off;
@@ -862,7 +861,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn ingest_relmap_page(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         xlrec: &XlRelmapUpdate,
         decoded: &DecodedWALRecord,
     ) -> Result<()> {
@@ -878,7 +877,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_rel_creation(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
     ) -> Result<()> {
         self.relsize_cache.insert(rel, 0);
@@ -888,7 +887,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_rel_page_image(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
         blknum: BlockNumber,
         img: Bytes,
@@ -900,7 +899,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_rel_wal_record(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
         blknum: BlockNumber,
         rec: ZenithWalRecord,
@@ -912,7 +911,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_rel_truncation(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
         nblocks: BlockNumber,
     ) -> Result<()> {
@@ -923,7 +922,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_rel_drop(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
     ) -> Result<()> {
         modification.put_rel_drop(rel)?;
@@ -948,7 +947,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn handle_rel_extend(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         rel: RelTag,
         blknum: BlockNumber,
     ) -> Result<()> {
@@ -986,7 +985,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn put_slru_page_image(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         kind: SlruKind,
         segno: u32,
         blknum: BlockNumber,
@@ -999,7 +998,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {

     fn handle_slru_extend(
         &mut self,
-        modification: &mut DatadirModification<R>,
+        modification: &mut DatadirModification<T>,
         kind: SlruKind,
         segno: u32,
         blknum: BlockNumber,
@@ -1052,6 +1051,7 @@ mod tests {
     use super::*;
     use crate::pgdatadir_mapping::create_test_timeline;
     use crate::repository::repo_harness::*;
+    use crate::repository::Timeline;
     use postgres_ffi::pg_constants;

     /// Arbitrary relation tag, for testing.
@@ -1062,13 +1062,13 @@ mod tests {
         forknum: 0,
     };

-    fn assert_current_logical_size<R: Repository>(_timeline: &DatadirTimeline<R>, _lsn: Lsn) {
+    fn assert_current_logical_size<T: DatadirTimeline>(_timeline: &T, _lsn: Lsn) {
         // TODO
     }

     static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);

-    fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
+    fn init_walingest_test<T: DatadirTimeline>(tline: &T) -> Result<WalIngest<T>> {
         let mut m = tline.begin_modification();
         m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
         m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
@@ -1082,7 +1082,7 @@ mod tests {
     fn test_relsize() -> Result<()> {
         let repo = RepoHarness::create("test_relsize")?.load();
         let tline = create_test_timeline(repo, TIMELINE_ID)?;
-        let mut walingest = init_walingest_test(&tline)?;
+        let mut walingest = init_walingest_test(&*tline)?;

         let mut m = tline.begin_modification();
         walingest.put_rel_creation(&mut m, TESTREL_A)?;
@@ -1098,7 +1098,7 @@ mod tests {
         walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
         m.commit(Lsn(0x50))?;

-        assert_current_logical_size(&tline, Lsn(0x50));
+        assert_current_logical_size(&*tline, Lsn(0x50));

         // The relation was created at LSN 2, not visible at LSN 1 yet.
         assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
@@ -1145,7 +1145,7 @@ mod tests {
         let mut m = tline.begin_modification();
         walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
         m.commit(Lsn(0x60))?;
-        assert_current_logical_size(&tline, Lsn(0x60));
+        assert_current_logical_size(&*tline, Lsn(0x60));

         // Check reported size and contents after truncation
         assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 2);
@@ -1210,7 +1210,7 @@ mod tests {
     fn test_drop_extend() -> Result<()> {
         let repo = RepoHarness::create("test_drop_extend")?.load();
         let tline = create_test_timeline(repo, TIMELINE_ID)?;
-        let mut walingest = init_walingest_test(&tline)?;
+        let mut walingest = init_walingest_test(&*tline)?;

         let mut m = tline.begin_modification();
         walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
@@ -1250,7 +1250,7 @@ mod tests {
     fn test_truncate_extend() -> Result<()> {
         let repo = RepoHarness::create("test_truncate_extend")?.load();
         let tline = create_test_timeline(repo, TIMELINE_ID)?;
-        let mut walingest = init_walingest_test(&tline)?;
+        let mut walingest = init_walingest_test(&*tline)?;

         // Create a 20 MB relation (the size is arbitrary)
         let relsize = 20 * 1024 * 1024 / 8192;
@@ -1338,7 +1338,7 @@ mod tests {
     fn test_large_rel() -> Result<()> {
         let repo = RepoHarness::create("test_large_rel")?.load();
         let tline = create_test_timeline(repo, TIMELINE_ID)?;
-        let mut walingest = init_walingest_test(&tline)?;
+        let mut walingest = init_walingest_test(&*tline)?;

         let mut lsn = 0x10;
         for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
@@ -1349,7 +1349,7 @@ mod tests {
             m.commit(Lsn(lsn))?;
         }

-        assert_current_logical_size(&tline, Lsn(lsn));
+        assert_current_logical_size(&*tline, Lsn(lsn));

         assert_eq!(
             tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
@@ -1365,7 +1365,7 @@ mod tests {
             tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
             pg_constants::RELSEG_SIZE
         );
-        assert_current_logical_size(&tline, Lsn(lsn));
+        assert_current_logical_size(&*tline, Lsn(lsn));

         // Truncate another block
         lsn += 0x10;
@@ -1376,7 +1376,7 @@ mod tests {
             tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
             pg_constants::RELSEG_SIZE - 1
         );
-        assert_current_logical_size(&tline, Lsn(lsn));
+        assert_current_logical_size(&*tline, Lsn(lsn));

         // Truncate to 1500, and then truncate all the way down to 0, one block at a time
        // This tests the behavior at segment boundaries
@@ -1393,7 +1393,7 @@ mod tests {
             size -= 1;
         }

-        assert_current_logical_size(&tline, Lsn(lsn));
+        assert_current_logical_size(&*tline, Lsn(lsn));

         Ok(())
     }
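The walingest hunks replace the concrete `R: Repository` parameter with `T: DatadirTimeline` and thread that one type parameter through every `DatadirModification` the ingester fills in. A rough sketch of the threading pattern under simplified, made-up types (`Modification` stands in for the real `DatadirModification`; the real trait and structs carry far more state):

// Stand-in trait; `Sized` lets the default method build a struct over `Self`.
trait DatadirTimeline: Sized {
    fn name(&self) -> &'static str;

    /// Begin a batch of updates against this timeline.
    fn begin_modification(&self) -> Modification<'_, Self> {
        Modification { timeline: self, puts: 0 }
    }
}

struct Modification<'a, T: DatadirTimeline> {
    timeline: &'a T,
    puts: usize,
}

impl<'a, T: DatadirTimeline> Modification<'a, T> {
    fn put(&mut self) {
        self.puts += 1;
    }
    fn commit(self) {
        println!("committed {} puts to {}", self.puts, self.timeline.name());
    }
}

struct WalIngest<'a, T: DatadirTimeline> {
    timeline: &'a T,
}

impl<'a, T: DatadirTimeline> WalIngest<'a, T> {
    // Helpers take `&mut Modification<T>`: the same `T` flows from the
    // ingest struct into every modification it touches.
    fn put_rel_creation(&self, modification: &mut Modification<T>) {
        println!("creating relation on {}", self.timeline.name());
        modification.put();
    }
}

struct TestTimeline;

impl DatadirTimeline for TestTimeline {
    fn name(&self) -> &'static str {
        "test timeline"
    }
}

fn main() {
    let tline = TestTimeline;
    let ingest = WalIngest { timeline: &tline };
    let mut m = tline.begin_modification();
    ingest.put_rel_creation(&mut m);
    m.commit();
}

Because `T` appears in both `WalIngest` and `Modification`, the compiler rejects mixing modifications from one timeline type with an ingester built over another, which is the safety the generic rewrite preserves.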
diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs
index 614bca50ad..f2aa7ce2cf 100644
--- a/pageserver/src/walreceiver/connection_manager.rs
+++ b/pageserver/src/walreceiver/connection_manager.rs
@@ -25,7 +25,8 @@ use etcd_broker::{
 use tokio::select;
 use tracing::*;

-use crate::DatadirTimelineImpl;
+use crate::repository::{Repository, Timeline};
+use crate::{RepositoryImpl, TimelineImpl};
 use utils::{
     lsn::Lsn,
     pq_proto::ReplicationFeedback,
@@ -39,7 +40,7 @@ pub(super) fn spawn_connection_manager_task(
     id: ZTenantTimelineId,
     broker_loop_prefix: String,
     mut client: Client,
-    local_timeline: Arc<DatadirTimelineImpl>,
+    local_timeline: Arc<TimelineImpl>,
     wal_connect_timeout: Duration,
     lagging_wal_timeout: Duration,
     max_lsn_wal_lag: NonZeroU64,
@@ -245,7 +246,7 @@ async fn exponential_backoff(n: u32, base: f64, max_seconds: f64) {
 struct WalreceiverState {
     id: ZTenantTimelineId,
     /// Use pageserver data about the timeline to filter out some of the safekeepers.
-    local_timeline: Arc<DatadirTimelineImpl>,
+    local_timeline: Arc<TimelineImpl>,
     /// The timeout on the connection to safekeeper for WAL streaming.
     wal_connect_timeout: Duration,
     /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
@@ -283,7 +284,7 @@ struct EtcdSkTimeline {
 impl WalreceiverState {
     fn new(
         id: ZTenantTimelineId,
-        local_timeline: Arc<DatadirTimelineImpl>,
+        local_timeline: Arc<<RepositoryImpl as Repository>::Timeline>,
         wal_connect_timeout: Duration,
         lagging_wal_timeout: Duration,
         max_lsn_wal_lag: NonZeroU64,
@@ -1203,13 +1204,10 @@ mod tests {
                 tenant_id: harness.tenant_id,
                 timeline_id: TIMELINE_ID,
             },
-            local_timeline: Arc::new(DatadirTimelineImpl::new(
-                harness
-                    .load()
-                    .create_empty_timeline(TIMELINE_ID, Lsn(0))
-                    .expect("Failed to create an empty timeline for dummy wal connection manager"),
-                10_000,
-            )),
+            local_timeline: harness
+                .load()
+                .create_empty_timeline(TIMELINE_ID, Lsn(0))
+                .expect("Failed to create an empty timeline for dummy wal connection manager"),
             wal_connect_timeout: Duration::from_secs(1),
             lagging_wal_timeout: Duration::from_secs(1),
             max_lsn_wal_lag: NonZeroU64::new(1).unwrap(),
diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs
index cc1a9cc5eb..ca29c00771 100644
--- a/pageserver/src/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/walreceiver/walreceiver_connection.rs
@@ -20,6 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 use super::TaskEvent;
 use crate::{
     http::models::WalReceiverEntry,
+    pgdatadir_mapping::DatadirTimeline,
     repository::{Repository, Timeline},
     tenant_mgr,
     walingest::WalIngest,
@@ -177,7 +178,7 @@ pub async fn handle_walreceiver_connection(
             caught_up = true;
         }

-        let timeline_to_check = Arc::clone(&timeline.tline);
+        let timeline_to_check = Arc::clone(&timeline);
         tokio::task::spawn_blocking(move || timeline_to_check.check_checkpoint_distance())
             .await
             .with_context(|| {
@@ -225,7 +226,7 @@ pub async fn handle_walreceiver_connection(
         // The last LSN we processed. It is not guaranteed to survive pageserver crash.
         let write_lsn = u64::from(last_lsn);
         // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
-        let flush_lsn = u64::from(timeline.tline.get_disk_consistent_lsn());
+        let flush_lsn = u64::from(timeline.get_disk_consistent_lsn());
         // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
         // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
         let apply_lsn = u64::from(timeline_remote_consistent_lsn);
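The walreceiver hunks drop the `.tline` indirection entirely: the `Arc` around the timeline itself is what gets cloned into the blocking task. A small sketch of that ownership pattern, with `std::thread::spawn` standing in for `tokio::task::spawn_blocking` and `CheckTimeline` a made-up type:

use std::sync::Arc;

struct CheckTimeline;

impl CheckTimeline {
    fn check_checkpoint_distance(&self) -> Result<(), String> {
        // The real method may flush an in-memory layer; here it just succeeds.
        Ok(())
    }
}

fn main() {
    let timeline = Arc::new(CheckTimeline);

    // Clone the Arc (a refcount bump, not the timeline) so the worker owns a
    // handle while the caller keeps using the original; no wrapper field is
    // needed once the timeline itself exposes the required interface.
    let timeline_to_check = Arc::clone(&timeline);
    let worker = std::thread::spawn(move || timeline_to_check.check_checkpoint_distance());

    worker
        .join()
        .expect("worker panicked")
        .expect("checkpoint distance check failed");
}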