diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 1ee48eb2fc..396e93acc1 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -29,9 +29,9 @@ use zenith_utils::lsn::Lsn; /// This is short-living object only for the time of tarball creation, /// created mostly to avoid passing a lot of parameters between various functions /// used for constructing tarball. -pub struct Basebackup<'a> { +pub struct Basebackup<'a, T> { ar: Builder<&'a mut dyn Write>, - timeline: &'a Arc, + timeline: &'a Arc, pub lsn: Lsn, prev_record_lsn: Lsn, } @@ -43,12 +43,14 @@ pub struct Basebackup<'a> { // * When working without safekeepers. In this situation it is important to match the lsn // we are taking basebackup on with the lsn that is used in pageserver's walreceiver // to start the replication. -impl<'a> Basebackup<'a> { +impl<'a, T> Basebackup<'a, T> +where T: Timeline, +{ pub fn new( write: &'a mut dyn Write, - timeline: &'a Arc, + timeline: &'a Arc, req_lsn: Option, - ) -> Result> { + ) -> Result> { // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the // previous record (xl_prev). We include prev_record_lsn in the diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 43f27af5ea..8b27762ed5 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -25,6 +25,7 @@ use crate::CheckpointConfig; use crate::{config::PageServerConf, repository::Repository}; use crate::{import_datadir, LOG_FILE_NAME}; use crate::{repository::RepositoryTimeline, tenant_mgr}; +use crate::repository::Timeline; #[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { @@ -39,9 +40,9 @@ pub struct BranchInfo { } impl BranchInfo { - pub fn from_path>( - path: T, - repo: &Arc, + pub fn from_path>( + path: P, + repo: &R, include_non_incremental_logical_size: bool, ) -> Result { let path = path.as_ref(); @@ -129,7 +130,7 @@ pub fn create_repo( conf: &'static PageServerConf, tenantid: ZTenantId, wal_redo_manager: Arc, -) -> Result> { +) -> Result> { let repo_dir = conf.tenant_path(&tenantid); if repo_dir.exists() { bail!("repo for {} already exists", tenantid) @@ -211,11 +212,11 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { // - run initdb to init temporary instance and get bootstrap data // - after initialization complete, remove the temp dir. // -fn bootstrap_timeline( +fn bootstrap_timeline( conf: &'static PageServerConf, tenantid: ZTenantId, tli: ZTimelineId, - repo: &dyn Repository, + repo: &R, ) -> Result<()> { let _enter = info_span!("bootstrapping", timeline = %tli, tenant = %tenantid).entered(); @@ -234,7 +235,7 @@ fn bootstrap_timeline( let timeline = repo.create_empty_timeline(tli, lsn)?; import_datadir::import_timeline_from_postgres_datadir( &pgdata_path, - timeline.writer().as_ref(), + &*timeline, lsn, )?; timeline.checkpoint(CheckpointConfig::Forced)?; @@ -284,7 +285,7 @@ pub(crate) fn get_branches( })?; BranchInfo::from_path( dir_entry.path(), - &repo, + repo.as_ref(), include_non_incremental_logical_size, ) }) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 26d473efaf..f98978c7c8 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -26,7 +26,7 @@ use super::models::BranchCreateRequest; use super::models::StatusResponse; use super::models::TenantCreateRequest; use crate::branches::BranchInfo; -use crate::repository::RepositoryTimeline; +use crate::repository::{Repository, RepositoryTimeline, Timeline}; use crate::repository::TimelineSyncState; use crate::{branches, config::PageServerConf, tenant_mgr, ZTenantId}; @@ -138,7 +138,7 @@ async fn branch_detail_handler(request: Request) -> Result, let response_data = tokio::task::spawn_blocking(move || { let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered(); let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - BranchInfo::from_path(path, &repo, include_non_incremental_logical_size) + BranchInfo::from_path(path, repo.as_ref(), include_non_incremental_logical_size) }) .await .map_err(ApiError::from_err)??; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index e317118bb5..dd3d6e7029 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -27,13 +27,16 @@ use zenith_utils::lsn::Lsn; /// This is currently only used to import a cluster freshly created by initdb. /// The code that deals with the checkpoint would not work right if the /// cluster was not shut down cleanly. -pub fn import_timeline_from_postgres_datadir( +pub fn import_timeline_from_postgres_datadir( path: &Path, - writer: &dyn TimelineWriter, + timeline: &T, lsn: Lsn, ) -> Result<()> { let mut pg_control: Option = None; + let writer_box = timeline.writer(); + let writer = writer_box.as_ref(); + // Scan 'global' for direntry in fs::read_dir(path.join("global"))? { let direntry = direntry?; @@ -141,6 +144,7 @@ pub fn import_timeline_from_postgres_datadir( // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'. import_wal( &path.join("pg_wal"), + timeline, writer, Lsn(pg_control.checkPointCopy.redo), lsn, @@ -310,8 +314,9 @@ fn import_slru_file( /// Scan PostgreSQL WAL files in given directory and load all records between /// 'startpoint' and 'endpoint' into the repository. -fn import_wal( +fn import_wal( walpath: &Path, + timeline: &T, writer: &dyn TimelineWriter, startpoint: Lsn, endpoint: Lsn, @@ -322,7 +327,7 @@ fn import_wal( let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let mut walingest = WalIngest::new(writer.deref(), startpoint)?; + let mut walingest = WalIngest::new(timeline, startpoint)?; while last_lsn <= endpoint { // FIXME: assume postgresql tli 1 for now @@ -355,7 +360,7 @@ fn import_wal( let mut nrecords = 0; while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - walingest.ingest_record(writer, recdata, lsn)?; + walingest.ingest_record(timeline, writer, recdata, lsn)?; last_lsn = lsn; nrecords += 1; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 975b2f5d2b..909477b722 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -136,7 +136,9 @@ pub struct LayeredRepository { /// Public interface impl Repository for LayeredRepository { - fn get_timeline(&self, timelineid: ZTimelineId) -> Result { + type Timeline = LayeredTimeline; + + fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { let mut timelines = self.timelines.lock().unwrap(); Ok( match self.get_or_init_timeline(timelineid, &mut timelines)? { @@ -156,7 +158,7 @@ impl Repository for LayeredRepository { &self, timelineid: ZTimelineId, initdb_lsn: Lsn, - ) -> Result> { + ) -> Result> { let mut timelines = self.timelines.lock().unwrap(); // Create the timeline directory, and write initial metadata to file. @@ -1073,10 +1075,6 @@ impl Timeline for LayeredTimeline { _write_guard: self.write_lock.lock().unwrap(), }) } - - fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline { - self - } } impl LayeredTimeline { @@ -2143,20 +2141,20 @@ impl LayeredTimeline { } } -struct LayeredTimelineWriter<'a> { +pub struct LayeredTimelineWriter<'a> { tl: &'a LayeredTimeline, _write_guard: MutexGuard<'a, ()>, } impl Deref for LayeredTimelineWriter<'_> { - type Target = dyn Timeline; + type Target = LayeredTimeline; fn deref(&self) -> &Self::Target { self.tl } } -impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { +impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> { fn put_wal_record( &self, lsn: Lsn, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3a68f56187..a8a878c448 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -22,6 +22,8 @@ use lazy_static::lazy_static; use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use layered_repository::{LayeredRepository, LayeredTimeline}; + lazy_static! { static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( "pageserver_live_connections_count", @@ -43,3 +45,7 @@ pub enum CheckpointConfig { // Flush all in-memory data and reconstruct all page images Forced, } + +pub type RepositoryImpl = LayeredRepository; +pub type TimelineImpl = LayeredTimeline; + diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7dc3c8c752..bf20cfb0db 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -33,7 +33,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::basebackup; use crate::config::PageServerConf; use crate::relish::*; -use crate::repository::Timeline; +use crate::repository::{Repository, Timeline}; use crate::tenant_mgr; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; @@ -395,8 +395,8 @@ impl PageServerHandler { /// In either case, if the page server hasn't received the WAL up to the /// requested LSN yet, we will wait for it to arrive. The return value is /// the LSN that should be used to look up the page versions. - fn wait_or_get_last_lsn( - timeline: &dyn Timeline, + fn wait_or_get_last_lsn( + timeline: &T, mut lsn: Lsn, latest: bool, latest_gc_cutoff_lsn: &RwLockReadGuard, @@ -443,9 +443,9 @@ impl PageServerHandler { Ok(lsn) } - fn handle_get_rel_exists_request( + fn handle_get_rel_exists_request( &self, - timeline: &dyn Timeline, + timeline: &T, req: &PagestreamExistsRequest, ) -> Result { let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered(); @@ -461,9 +461,9 @@ impl PageServerHandler { })) } - fn handle_get_nblocks_request( + fn handle_get_nblocks_request( &self, - timeline: &dyn Timeline, + timeline: &T, req: &PagestreamNblocksRequest, ) -> Result { let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered(); @@ -482,9 +482,9 @@ impl PageServerHandler { })) } - fn handle_get_page_at_lsn_request( + fn handle_get_page_at_lsn_request( &self, - timeline: &dyn Timeline, + timeline: &T, req: &PagestreamGetPageRequest, ) -> Result { let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 6142953a58..49aa04ea7c 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; -use std::ops::{AddAssign, Deref}; +use std::ops::AddAssign; use std::sync::{Arc, RwLockReadGuard}; use std::time::Duration; use zenith_utils::lsn::{Lsn, RecordLsn}; @@ -19,6 +19,8 @@ pub type BlockNumber = u32; /// A repository corresponds to one .zenith directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. pub trait Repository: Send + Sync { + type Timeline: Timeline; + fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; /// Updates timeline based on the new sync state, received from the remote storage synchronization. @@ -34,7 +36,7 @@ pub trait Repository: Send + Sync { fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option; /// Get Timeline handle for given zenith timeline ID. - fn get_timeline(&self, timelineid: ZTimelineId) -> Result; + fn get_timeline(&self, timelineid: ZTimelineId) -> Result>; /// Create a new, empty timeline. The caller is responsible for loading data into it /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. @@ -42,7 +44,7 @@ pub trait Repository: Send + Sync { &self, timelineid: ZTimelineId, initdb_lsn: Lsn, - ) -> Result>; + ) -> Result>; /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; @@ -69,10 +71,10 @@ pub trait Repository: Send + Sync { } /// A timeline, that belongs to the current repository. -pub enum RepositoryTimeline { +pub enum RepositoryTimeline { /// Timeline, with its files present locally in pageserver's working directory. /// Loaded into pageserver's memory and ready to be used. - Local(Arc), + Local(Arc), /// Timeline, found on the pageserver's remote storage, but not yet downloaded locally. Remote { id: ZTimelineId, @@ -81,8 +83,8 @@ pub enum RepositoryTimeline { }, } -impl RepositoryTimeline { - pub fn local_timeline(&self) -> Option> { +impl RepositoryTimeline { + pub fn local_timeline(&self) -> Option> { if let Self::Local(local_timeline) = self { Some(Arc::clone(local_timeline)) } else { @@ -217,7 +219,6 @@ pub trait Timeline: Send + Sync { // // These are called by the WAL receiver to digest WAL records. //------------------------------------------------------------------------------ - /// Atomically get both last and prev. fn get_last_record_rlsn(&self) -> RecordLsn; @@ -229,6 +230,10 @@ pub trait Timeline: Send + Sync { fn get_disk_consistent_lsn(&self) -> Lsn; /// Mutate the timeline with a [`TimelineWriter`]. + /// + /// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter + /// is a generic type in this trait. But that doesn't currently work in + /// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html fn writer<'a>(&'a self) -> Box; /// @@ -255,16 +260,13 @@ pub trait Timeline: Send + Sync { /// Does the same as get_current_logical_size but counted on demand. /// Used in tests to ensure that incremental and non incremental variants match. fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result; - - /// An escape hatch to allow "casting" a generic Timeline to LayeredTimeline. - fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline; } /// Various functions to mutate the timeline. // TODO Currently, Deref is used to allow easy access to read methods from this trait. // This is probably considered a bad practice in Rust and should be fixed eventually, // but will cause large code changes. -pub trait TimelineWriter: Deref { +pub trait TimelineWriter<'a> { /// Put a new page version that can be constructed from a WAL record /// /// This will implicitly extend the relation, if the page is beyond the @@ -395,15 +397,15 @@ pub mod repo_harness { Ok(Self { conf, tenant_id }) } - pub fn load(&self) -> Box { + pub fn load(&self) -> LayeredRepository { let walredo_mgr = Arc::new(TestRedoManager); - Box::new(LayeredRepository::new( + LayeredRepository::new( self.conf, walredo_mgr, self.tenant_id, false, - )) + ) } pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf { @@ -467,7 +469,7 @@ mod tests { forknum: 0, }); - fn assert_current_logical_size(timeline: &Arc, lsn: Lsn) { + fn assert_current_logical_size(timeline: &Arc, lsn: Lsn) { let incremental = timeline.get_current_logical_size(); let non_incremental = timeline .get_current_logical_size_non_incremental(lsn) @@ -915,7 +917,7 @@ mod tests { Ok(()) } - fn make_some_layers(tline: &Arc, start_lsn: Lsn) -> Result<()> { + fn make_some_layers(tline: &Arc, start_lsn: Lsn) -> Result<()> { let mut lsn = start_lsn; { let writer = tline.writer(); diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index d60b5fefd3..c0b54278cd 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -2,9 +2,10 @@ //! page server. use crate::branches; +use crate::{RepositoryImpl, TimelineImpl}; use crate::config::PageServerConf; use crate::layered_repository::LayeredRepository; -use crate::repository::{Repository, Timeline, TimelineSyncState}; +use crate::repository::{Repository, TimelineSyncState}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::walredo::PostgresRedoManager; @@ -24,7 +25,7 @@ lazy_static! { struct Tenant { state: TenantState, - repo: Arc, + repo: Arc, } #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] @@ -78,7 +79,7 @@ pub fn set_timeline_states( let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); // Set up an object repository, for actual data storage. - let repo: Arc = Arc::new(LayeredRepository::new( + let repo: Arc = Arc::new(LayeredRepository::new( conf, Arc::new(walredo_mgr), tenant_id, @@ -248,7 +249,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re Ok(()) } -pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { +pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { let m = access_tenants(); let tenant = m .get(&tenantid) @@ -260,7 +261,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result Result> { +) -> Result> { get_repository_for_tenant(tenantid)? .get_timeline(timelineid)? .local_timeline() diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs index 062af9f1ad..673a92b80d 100644 --- a/pageserver/src/tenant_threads.rs +++ b/pageserver/src/tenant_threads.rs @@ -1,6 +1,7 @@ //! This module contains functions to serve per-tenant background processes, //! such as checkpointer and GC use crate::config::PageServerConf; +use crate::repository::Repository; use crate::tenant_mgr; use crate::tenant_mgr::TenantState; use crate::CheckpointConfig; diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 1962c9bbd3..615a9960fe 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -46,7 +46,7 @@ pub struct WalIngest { } impl WalIngest { - pub fn new(timeline: &dyn Timeline, startpoint: Lsn) -> Result { + pub fn new(timeline: &T, startpoint: Lsn) -> Result { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?; @@ -66,9 +66,10 @@ impl WalIngest { /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the /// relations/pages that the record affects. /// - pub fn ingest_record( + pub fn ingest_record( &mut self, - timeline: &dyn TimelineWriter, + timeline: &T, + writer: &dyn TimelineWriter, recdata: Bytes, lsn: Lsn, ) -> Result<()> { @@ -86,7 +87,7 @@ impl WalIngest { if decoded.xl_rmid == pg_constants::RM_HEAP_ID || decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - self.ingest_heapam_record(&mut buf, timeline, lsn, &mut decoded)?; + self.ingest_heapam_record(&mut buf, writer, lsn, &mut decoded)?; } // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID @@ -94,13 +95,13 @@ impl WalIngest { == pg_constants::XLOG_SMGR_TRUNCATE { let truncate = XlSmgrTruncate::decode(&mut buf); - self.ingest_xlog_smgr_truncate(timeline, lsn, &truncate)?; + self.ingest_xlog_smgr_truncate(writer, lsn, &truncate)?; } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(timeline, lsn, &createdb)?; + self.ingest_xlog_dbase_create(timeline, writer, lsn, &createdb)?; } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_DROP { @@ -113,7 +114,7 @@ impl WalIngest { for tablespace_id in dropdb.tablespace_ids { let rels = timeline.list_rels(tablespace_id, dropdb.db_id, req_lsn)?; for rel in rels { - timeline.drop_relish(rel, lsn)?; + writer.drop_relish(rel, lsn)?; } trace!( "Drop FileNodeMap {}, {} at lsn {}", @@ -121,7 +122,7 @@ impl WalIngest { dropdb.db_id, lsn ); - timeline.drop_relish( + writer.drop_relish( RelishTag::FileNodeMap { spcnode: tablespace_id, dbnode: dropdb.db_id, @@ -138,7 +139,7 @@ impl WalIngest { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( + writer.put_page_image( RelishTag::Slru { slru: SlruKind::Clog, segno, @@ -150,7 +151,7 @@ impl WalIngest { } else { assert!(info == pg_constants::CLOG_TRUNCATE); let xlrec = XlClogTruncate::decode(&mut buf); - self.ingest_clog_truncate_record(timeline, lsn, &xlrec)?; + self.ingest_clog_truncate_record(timeline, writer, lsn, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; @@ -158,7 +159,7 @@ impl WalIngest { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); self.ingest_xact_record( - timeline, + writer, lsn, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT, @@ -169,7 +170,7 @@ impl WalIngest { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); self.ingest_xact_record( - timeline, + writer, lsn, &parsed_xact, info == pg_constants::XLOG_XACT_COMMIT_PREPARED, @@ -181,14 +182,14 @@ impl WalIngest { parsed_xact.xid, lsn ); - timeline.drop_relish( + writer.drop_relish( RelishTag::TwoPhase { xid: parsed_xact.xid, }, lsn, )?; } else if info == pg_constants::XLOG_XACT_PREPARE { - timeline.put_page_image( + writer.put_page_image( RelishTag::TwoPhase { xid: decoded.xl_xid, }, @@ -204,7 +205,7 @@ impl WalIngest { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( + writer.put_page_image( RelishTag::Slru { slru: SlruKind::MultiXactOffsets, segno, @@ -217,7 +218,7 @@ impl WalIngest { let pageno = buf.get_u32_le(); let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( + writer.put_page_image( RelishTag::Slru { slru: SlruKind::MultiXactMembers, segno, @@ -228,14 +229,14 @@ impl WalIngest { )?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(timeline, lsn, &xlrec)?; + self.ingest_multixact_create_record(writer, lsn, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); - self.ingest_multixact_truncate_record(timeline, lsn, &xlrec)?; + self.ingest_multixact_truncate_record(writer, lsn, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { let xlrec = XlRelmapUpdate::decode(&mut buf); - self.ingest_relmap_page(timeline, lsn, &xlrec, &decoded)?; + self.ingest_relmap_page(writer, lsn, &xlrec, &decoded)?; } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { @@ -270,20 +271,20 @@ impl WalIngest { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - self.ingest_decoded_block(timeline, lsn, &decoded, blk)?; + self.ingest_decoded_block(writer, lsn, &decoded, blk)?; } // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { let new_checkpoint_bytes = self.checkpoint.encode(); - timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, new_checkpoint_bytes)?; + writer.put_page_image(RelishTag::Checkpoint, 0, lsn, new_checkpoint_bytes)?; self.checkpoint_modified = false; } // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - timeline.advance_last_record_lsn(lsn); + writer.advance_last_record_lsn(lsn); Ok(()) } @@ -465,9 +466,10 @@ impl WalIngest { } /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. - fn ingest_xlog_dbase_create( + fn ingest_xlog_dbase_create( &mut self, - timeline: &dyn TimelineWriter, + timeline: &T, + writer: &dyn TimelineWriter, lsn: Lsn, rec: &XlCreateDatabase, ) -> Result<()> { @@ -508,13 +510,13 @@ impl WalIngest { debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); - timeline.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?; + writer.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?; num_blocks_copied += 1; } if nblocks == 0 { // make sure we have some trace of the relation, even if it's empty - timeline.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?; + writer.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?; } num_rels_copied += 1; @@ -532,7 +534,7 @@ impl WalIngest { spcnode: tablespace_id, dbnode: db_id, }; - timeline.put_page_image(new_tag, 0, lsn, img)?; + writer.put_page_image(new_tag, 0, lsn, img)?; break; } } @@ -680,9 +682,10 @@ impl WalIngest { Ok(()) } - fn ingest_clog_truncate_record( + fn ingest_clog_truncate_record( &mut self, - timeline: &dyn TimelineWriter, + timeline: &T, + writer: &dyn TimelineWriter, lsn: Lsn, xlrec: &XlClogTruncate, ) -> Result<()> { @@ -732,7 +735,7 @@ impl WalIngest { if slru == SlruKind::Clog { let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; if slru_may_delete_clogsegment(segpage, xlrec.pageno) { - timeline.drop_relish(RelishTag::Slru { slru, segno }, lsn)?; + writer.drop_relish(RelishTag::Slru { slru, segno }, lsn)?; trace!("Drop CLOG segment {:>04X} at lsn {}", segno, lsn); } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 6fff1d062d..abf6bace22 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -6,6 +6,8 @@ //! We keep one WAL receiver active per timeline. use crate::config::PageServerConf; +use crate::repository::Repository; +use crate::repository::Timeline; use crate::tenant_mgr; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; @@ -250,11 +252,11 @@ fn walreceiver_main( // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are - // at risk of hittind a deadlock. + // at risk of hitting a deadlock. assert!(lsn.is_aligned()); let writer = timeline.writer(); - walingest.ingest_record(writer.as_ref(), recdata, lsn)?; + walingest.ingest_record(&*timeline, writer.as_ref(), recdata, lsn)?; fail_point!("walreceiver-after-ingest");