Use more generics, less dyn

This commit is contained in:
Heikki Linnakangas
2022-03-01 13:52:49 +02:00
parent 934bbcba0f
commit c7c1e19667
12 changed files with 113 additions and 92 deletions

View File

@@ -29,9 +29,9 @@ use zenith_utils::lsn::Lsn;
/// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball.
pub struct Basebackup<'a> {
pub struct Basebackup<'a, T> {
ar: Builder<&'a mut dyn Write>,
timeline: &'a Arc<dyn Timeline>,
timeline: &'a Arc<T>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
}
@@ -43,12 +43,14 @@ pub struct Basebackup<'a> {
// * When working without safekeepers. In this situation it is important to match the lsn
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
// to start the replication.
impl<'a> Basebackup<'a> {
impl<'a, T> Basebackup<'a, T>
where T: Timeline,
{
pub fn new(
write: &'a mut dyn Write,
timeline: &'a Arc<dyn Timeline>,
timeline: &'a Arc<T>,
req_lsn: Option<Lsn>,
) -> Result<Basebackup<'a>> {
) -> Result<Basebackup<'a, T>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the

View File

@@ -25,6 +25,7 @@ use crate::CheckpointConfig;
use crate::{config::PageServerConf, repository::Repository};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::repository::Timeline;
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
@@ -39,9 +40,9 @@ pub struct BranchInfo {
}
impl BranchInfo {
pub fn from_path<T: AsRef<Path>>(
path: T,
repo: &Arc<dyn Repository>,
pub fn from_path<R: Repository, P: AsRef<Path>>(
path: P,
repo: &R,
include_non_incremental_logical_size: bool,
) -> Result<Self> {
let path = path.as_ref();
@@ -129,7 +130,7 @@ pub fn create_repo(
conf: &'static PageServerConf,
tenantid: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
) -> Result<Arc<dyn Repository>> {
) -> Result<Arc<crate::layered_repository::LayeredRepository>> {
let repo_dir = conf.tenant_path(&tenantid);
if repo_dir.exists() {
bail!("repo for {} already exists", tenantid)
@@ -211,11 +212,11 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
// - run initdb to init temporary instance and get bootstrap data
// - after initialization complete, remove the temp dir.
//
fn bootstrap_timeline(
fn bootstrap_timeline<R: Repository>(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tli: ZTimelineId,
repo: &dyn Repository,
repo: &R,
) -> Result<()> {
let _enter = info_span!("bootstrapping", timeline = %tli, tenant = %tenantid).entered();
@@ -234,7 +235,7 @@ fn bootstrap_timeline(
let timeline = repo.create_empty_timeline(tli, lsn)?;
import_datadir::import_timeline_from_postgres_datadir(
&pgdata_path,
timeline.writer().as_ref(),
&*timeline,
lsn,
)?;
timeline.checkpoint(CheckpointConfig::Forced)?;
@@ -284,7 +285,7 @@ pub(crate) fn get_branches(
})?;
BranchInfo::from_path(
dir_entry.path(),
&repo,
repo.as_ref(),
include_non_incremental_logical_size,
)
})

View File

@@ -26,7 +26,7 @@ use super::models::BranchCreateRequest;
use super::models::StatusResponse;
use super::models::TenantCreateRequest;
use crate::branches::BranchInfo;
use crate::repository::RepositoryTimeline;
use crate::repository::{Repository, RepositoryTimeline, Timeline};
use crate::repository::TimelineSyncState;
use crate::{branches, config::PageServerConf, tenant_mgr, ZTenantId};
@@ -138,7 +138,7 @@ async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>,
let response_data = tokio::task::spawn_blocking(move || {
let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered();
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
BranchInfo::from_path(path, &repo, include_non_incremental_logical_size)
BranchInfo::from_path(path, repo.as_ref(), include_non_incremental_logical_size)
})
.await
.map_err(ApiError::from_err)??;

View File

@@ -27,13 +27,16 @@ use zenith_utils::lsn::Lsn;
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub fn import_timeline_from_postgres_datadir(
pub fn import_timeline_from_postgres_datadir<T: Timeline>(
path: &Path,
writer: &dyn TimelineWriter,
timeline: &T,
lsn: Lsn,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
let writer_box = timeline.writer();
let writer = writer_box.as_ref();
// Scan 'global'
for direntry in fs::read_dir(path.join("global"))? {
let direntry = direntry?;
@@ -141,6 +144,7 @@ pub fn import_timeline_from_postgres_datadir(
// *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
import_wal(
&path.join("pg_wal"),
timeline,
writer,
Lsn(pg_control.checkPointCopy.redo),
lsn,
@@ -310,8 +314,9 @@ fn import_slru_file(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(
fn import_wal<T: Timeline>(
walpath: &Path,
timeline: &T,
writer: &dyn TimelineWriter,
startpoint: Lsn,
endpoint: Lsn,
@@ -322,7 +327,7 @@ fn import_wal(
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let mut walingest = WalIngest::new(writer.deref(), startpoint)?;
let mut walingest = WalIngest::new(timeline, startpoint)?;
while last_lsn <= endpoint {
// FIXME: assume postgresql tli 1 for now
@@ -355,7 +360,7 @@ fn import_wal(
let mut nrecords = 0;
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(writer, recdata, lsn)?;
walingest.ingest_record(timeline, writer, recdata, lsn)?;
last_lsn = lsn;
nrecords += 1;

View File

@@ -136,7 +136,9 @@ pub struct LayeredRepository {
/// Public interface
impl Repository for LayeredRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline> {
type Timeline = LayeredTimeline;
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
Ok(
match self.get_or_init_timeline(timelineid, &mut timelines)? {
@@ -156,7 +158,7 @@ impl Repository for LayeredRepository {
&self,
timelineid: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<dyn Timeline>> {
) -> Result<Arc<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
// Create the timeline directory, and write initial metadata to file.
@@ -1073,10 +1075,6 @@ impl Timeline for LayeredTimeline {
_write_guard: self.write_lock.lock().unwrap(),
})
}
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline {
self
}
}
impl LayeredTimeline {
@@ -2143,20 +2141,20 @@ impl LayeredTimeline {
}
}
struct LayeredTimelineWriter<'a> {
pub struct LayeredTimelineWriter<'a> {
tl: &'a LayeredTimeline,
_write_guard: MutexGuard<'a, ()>,
}
impl Deref for LayeredTimelineWriter<'_> {
type Target = dyn Timeline;
type Target = LayeredTimeline;
fn deref(&self) -> &Self::Target {
self.tl
}
}
impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
fn put_wal_record(
&self,
lsn: Lsn,

View File

@@ -22,6 +22,8 @@ use lazy_static::lazy_static;
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use layered_repository::{LayeredRepository, LayeredTimeline};
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
"pageserver_live_connections_count",
@@ -43,3 +45,7 @@ pub enum CheckpointConfig {
// Flush all in-memory data and reconstruct all page images
Forced,
}
pub type RepositoryImpl = LayeredRepository;
pub type TimelineImpl = LayeredTimeline;

View File

@@ -33,7 +33,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::basebackup;
use crate::config::PageServerConf;
use crate::relish::*;
use crate::repository::Timeline;
use crate::repository::{Repository, Timeline};
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
@@ -395,8 +395,8 @@ impl PageServerHandler {
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
fn wait_or_get_last_lsn(
timeline: &dyn Timeline,
fn wait_or_get_last_lsn<T: Timeline>(
timeline: &T,
mut lsn: Lsn,
latest: bool,
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
@@ -443,9 +443,9 @@ impl PageServerHandler {
Ok(lsn)
}
fn handle_get_rel_exists_request(
fn handle_get_rel_exists_request<T: Timeline>(
&self,
timeline: &dyn Timeline,
timeline: &T,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
@@ -461,9 +461,9 @@ impl PageServerHandler {
}))
}
fn handle_get_nblocks_request(
fn handle_get_nblocks_request<T: Timeline>(
&self,
timeline: &dyn Timeline,
timeline: &T,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
@@ -482,9 +482,9 @@ impl PageServerHandler {
}))
}
fn handle_get_page_at_lsn_request(
fn handle_get_page_at_lsn_request<T: Timeline>(
&self,
timeline: &dyn Timeline,
timeline: &T,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)

View File

@@ -6,7 +6,7 @@ use bytes::Bytes;
use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::ops::{AddAssign, Deref};
use std::ops::AddAssign;
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
use zenith_utils::lsn::{Lsn, RecordLsn};
@@ -19,6 +19,8 @@ pub type BlockNumber = u32;
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
type Timeline: Timeline;
fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Updates timeline based on the new sync state, received from the remote storage synchronization.
@@ -34,7 +36,7 @@ pub trait Repository: Send + Sync {
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState>;
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline>;
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<RepositoryTimeline<Self::Timeline>>;
/// Create a new, empty timeline. The caller is responsible for loading data into it
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
@@ -42,7 +44,7 @@ pub trait Repository: Send + Sync {
&self,
timelineid: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<dyn Timeline>>;
) -> Result<Arc<Self::Timeline>>;
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
@@ -69,10 +71,10 @@ pub trait Repository: Send + Sync {
}
/// A timeline, that belongs to the current repository.
pub enum RepositoryTimeline {
pub enum RepositoryTimeline<T> {
/// Timeline, with its files present locally in pageserver's working directory.
/// Loaded into pageserver's memory and ready to be used.
Local(Arc<dyn Timeline>),
Local(Arc<T>),
/// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
Remote {
id: ZTimelineId,
@@ -81,8 +83,8 @@ pub enum RepositoryTimeline {
},
}
impl RepositoryTimeline {
pub fn local_timeline(&self) -> Option<Arc<dyn Timeline>> {
impl<T> RepositoryTimeline<T> {
pub fn local_timeline(&self) -> Option<Arc<T>> {
if let Self::Local(local_timeline) = self {
Some(Arc::clone(local_timeline))
} else {
@@ -217,7 +219,6 @@ pub trait Timeline: Send + Sync {
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Atomically get both last and prev.
fn get_last_record_rlsn(&self) -> RecordLsn;
@@ -229,6 +230,10 @@ pub trait Timeline: Send + Sync {
fn get_disk_consistent_lsn(&self) -> Lsn;
/// Mutate the timeline with a [`TimelineWriter`].
///
/// FIXME: This ought to return &'a TimelineWriter, where TimelineWriter
/// is a generic type in this trait. But that doesn't currently work in
/// Rust: https://rust-lang.github.io/rfcs/1598-generic_associated_types.html
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a>;
///
@@ -255,16 +260,13 @@ pub trait Timeline: Send + Sync {
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure that incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
/// An escape hatch to allow "casting" a generic Timeline to LayeredTimeline.
fn upgrade_to_layered_timeline(&self) -> &crate::layered_repository::LayeredTimeline;
}
/// Various functions to mutate the timeline.
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
// but will cause large code changes.
pub trait TimelineWriter: Deref<Target = dyn Timeline> {
pub trait TimelineWriter<'a> {
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
@@ -395,15 +397,15 @@ pub mod repo_harness {
Ok(Self { conf, tenant_id })
}
pub fn load(&self) -> Box<dyn Repository> {
pub fn load(&self) -> LayeredRepository {
let walredo_mgr = Arc::new(TestRedoManager);
Box::new(LayeredRepository::new(
LayeredRepository::new(
self.conf,
walredo_mgr,
self.tenant_id,
false,
))
)
}
pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf {
@@ -467,7 +469,7 @@ mod tests {
forknum: 0,
});
fn assert_current_logical_size(timeline: &Arc<dyn Timeline>, lsn: Lsn) {
fn assert_current_logical_size<T: Timeline>(timeline: &Arc<T>, lsn: Lsn) {
let incremental = timeline.get_current_logical_size();
let non_incremental = timeline
.get_current_logical_size_non_incremental(lsn)
@@ -915,7 +917,7 @@ mod tests {
Ok(())
}
fn make_some_layers(tline: &Arc<dyn Timeline>, start_lsn: Lsn) -> Result<()> {
fn make_some_layers<T: Timeline>(tline: &Arc<T>, start_lsn: Lsn) -> Result<()> {
let mut lsn = start_lsn;
{
let writer = tline.writer();

View File

@@ -2,9 +2,10 @@
//! page server.
use crate::branches;
use crate::{RepositoryImpl, TimelineImpl};
use crate::config::PageServerConf;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline, TimelineSyncState};
use crate::repository::{Repository, TimelineSyncState};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::walredo::PostgresRedoManager;
@@ -24,7 +25,7 @@ lazy_static! {
struct Tenant {
state: TenantState,
repo: Arc<dyn Repository>,
repo: Arc<RepositoryImpl>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -78,7 +79,7 @@ pub fn set_timeline_states(
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<dyn Repository> = Arc::new(LayeredRepository::new(
let repo: Arc<RepositoryImpl> = Arc::new(LayeredRepository::new(
conf,
Arc::new(walredo_mgr),
tenant_id,
@@ -248,7 +249,7 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re
Ok(())
}
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<RepositoryImpl>> {
let m = access_tenants();
let tenant = m
.get(&tenantid)
@@ -260,7 +261,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Reposito
pub fn get_timeline_for_tenant(
tenantid: ZTenantId,
timelineid: ZTimelineId,
) -> Result<Arc<dyn Timeline>> {
) -> Result<Arc<TimelineImpl>> {
get_repository_for_tenant(tenantid)?
.get_timeline(timelineid)?
.local_timeline()

View File

@@ -1,6 +1,7 @@
//! This module contains functions to serve per-tenant background processes,
//! such as checkpointer and GC
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::CheckpointConfig;

View File

@@ -46,7 +46,7 @@ pub struct WalIngest {
}
impl WalIngest {
pub fn new(timeline: &dyn Timeline, startpoint: Lsn) -> Result<WalIngest> {
pub fn new<T: Timeline>(timeline: &T, startpoint: Lsn) -> Result<WalIngest> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?;
@@ -66,9 +66,10 @@ impl WalIngest {
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
/// relations/pages that the record affects.
///
pub fn ingest_record(
pub fn ingest_record<T: Timeline>(
&mut self,
timeline: &dyn TimelineWriter,
timeline: &T,
writer: &dyn TimelineWriter,
recdata: Bytes,
lsn: Lsn,
) -> Result<()> {
@@ -86,7 +87,7 @@ impl WalIngest {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
{
self.ingest_heapam_record(&mut buf, timeline, lsn, &mut decoded)?;
self.ingest_heapam_record(&mut buf, writer, lsn, &mut decoded)?;
}
// Handle other special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -94,13 +95,13 @@ impl WalIngest {
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(timeline, lsn, &truncate)?;
self.ingest_xlog_smgr_truncate(writer, lsn, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(timeline, lsn, &createdb)?;
self.ingest_xlog_dbase_create(timeline, writer, lsn, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
@@ -113,7 +114,7 @@ impl WalIngest {
for tablespace_id in dropdb.tablespace_ids {
let rels = timeline.list_rels(tablespace_id, dropdb.db_id, req_lsn)?;
for rel in rels {
timeline.drop_relish(rel, lsn)?;
writer.drop_relish(rel, lsn)?;
}
trace!(
"Drop FileNodeMap {}, {} at lsn {}",
@@ -121,7 +122,7 @@ impl WalIngest {
dropdb.db_id,
lsn
);
timeline.drop_relish(
writer.drop_relish(
RelishTag::FileNodeMap {
spcnode: tablespace_id,
dbnode: dropdb.db_id,
@@ -138,7 +139,7 @@ impl WalIngest {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_page_image(
writer.put_page_image(
RelishTag::Slru {
slru: SlruKind::Clog,
segno,
@@ -150,7 +151,7 @@ impl WalIngest {
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(&mut buf);
self.ingest_clog_truncate_record(timeline, lsn, &xlrec)?;
self.ingest_clog_truncate_record(timeline, writer, lsn, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
@@ -158,7 +159,7 @@ impl WalIngest {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
timeline,
writer,
lsn,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT,
@@ -169,7 +170,7 @@ impl WalIngest {
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
self.ingest_xact_record(
timeline,
writer,
lsn,
&parsed_xact,
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
@@ -181,14 +182,14 @@ impl WalIngest {
parsed_xact.xid,
lsn
);
timeline.drop_relish(
writer.drop_relish(
RelishTag::TwoPhase {
xid: parsed_xact.xid,
},
lsn,
)?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
timeline.put_page_image(
writer.put_page_image(
RelishTag::TwoPhase {
xid: decoded.xl_xid,
},
@@ -204,7 +205,7 @@ impl WalIngest {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_page_image(
writer.put_page_image(
RelishTag::Slru {
slru: SlruKind::MultiXactOffsets,
segno,
@@ -217,7 +218,7 @@ impl WalIngest {
let pageno = buf.get_u32_le();
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
timeline.put_page_image(
writer.put_page_image(
RelishTag::Slru {
slru: SlruKind::MultiXactMembers,
segno,
@@ -228,14 +229,14 @@ impl WalIngest {
)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
self.ingest_multixact_create_record(timeline, lsn, &xlrec)?;
self.ingest_multixact_create_record(writer, lsn, &xlrec)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
self.ingest_multixact_truncate_record(timeline, lsn, &xlrec)?;
self.ingest_multixact_truncate_record(writer, lsn, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
self.ingest_relmap_page(timeline, lsn, &xlrec, &decoded)?;
self.ingest_relmap_page(writer, lsn, &xlrec, &decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
@@ -270,20 +271,20 @@ impl WalIngest {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
self.ingest_decoded_block(timeline, lsn, &decoded, blk)?;
self.ingest_decoded_block(writer, lsn, &decoded, blk)?;
}
// If checkpoint data was updated, store the new version in the repository
if self.checkpoint_modified {
let new_checkpoint_bytes = self.checkpoint.encode();
timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, new_checkpoint_bytes)?;
writer.put_page_image(RelishTag::Checkpoint, 0, lsn, new_checkpoint_bytes)?;
self.checkpoint_modified = false;
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
writer.advance_last_record_lsn(lsn);
Ok(())
}
@@ -465,9 +466,10 @@ impl WalIngest {
}
/// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record.
fn ingest_xlog_dbase_create(
fn ingest_xlog_dbase_create<T: Timeline>(
&mut self,
timeline: &dyn TimelineWriter,
timeline: &T,
writer: &dyn TimelineWriter,
lsn: Lsn,
rec: &XlCreateDatabase,
) -> Result<()> {
@@ -508,13 +510,13 @@ impl WalIngest {
debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
timeline.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?;
writer.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?;
num_blocks_copied += 1;
}
if nblocks == 0 {
// make sure we have some trace of the relation, even if it's empty
timeline.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?;
writer.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?;
}
num_rels_copied += 1;
@@ -532,7 +534,7 @@ impl WalIngest {
spcnode: tablespace_id,
dbnode: db_id,
};
timeline.put_page_image(new_tag, 0, lsn, img)?;
writer.put_page_image(new_tag, 0, lsn, img)?;
break;
}
}
@@ -680,9 +682,10 @@ impl WalIngest {
Ok(())
}
fn ingest_clog_truncate_record(
fn ingest_clog_truncate_record<T: Timeline>(
&mut self,
timeline: &dyn TimelineWriter,
timeline: &T,
writer: &dyn TimelineWriter,
lsn: Lsn,
xlrec: &XlClogTruncate,
) -> Result<()> {
@@ -732,7 +735,7 @@ impl WalIngest {
if slru == SlruKind::Clog {
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
timeline.drop_relish(RelishTag::Slru { slru, segno }, lsn)?;
writer.drop_relish(RelishTag::Slru { slru, segno }, lsn)?;
trace!("Drop CLOG segment {:>04X} at lsn {}", segno, lsn);
}
}

View File

@@ -6,6 +6,8 @@
//! We keep one WAL receiver active per timeline.
use crate::config::PageServerConf;
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
@@ -250,11 +252,11 @@ fn walreceiver_main(
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hittind a deadlock.
// at risk of hitting a deadlock.
assert!(lsn.is_aligned());
let writer = timeline.writer();
walingest.ingest_record(writer.as_ref(), recdata, lsn)?;
walingest.ingest_record(&*timeline, writer.as_ref(), recdata, lsn)?;
fail_point!("walreceiver-after-ingest");