diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 8888e078aa..6f980f96f8 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -7,24 +7,26 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, bail, ensure, Result}; use bytes::Bytes; use tracing::*; use crate::relish::*; use crate::repository::*; -use crate::walrecord::*; -use crate::walingest; +use crate::walingest::WalIngest; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; use postgres_ffi::Oid; -use postgres_ffi::{pg_constants, CheckPoint, ControlFileData}; +use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; use zenith_utils::lsn::Lsn; /// /// Import all relation data pages from local disk into the repository. /// +/// This is currently only used to import a cluster freshly created by initdb. +/// The code that deals with the checkpoint would not work right if the +/// cluster was not shut down cleanly. pub fn import_timeline_from_postgres_datadir( path: &Path, writer: &dyn TimelineWriter, @@ -120,18 +122,28 @@ pub fn import_timeline_from_postgres_datadir( } // TODO: Scan pg_tblspc + // We're done importing all the data files. writer.advance_last_record_lsn(lsn); + // We expect the Postgres server to be shut down cleanly. + let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?; + ensure!( + pg_control.state == DBState_DB_SHUTDOWNED, + "Postgres cluster was not shut down cleanly" + ); + ensure!( + pg_control.checkPointCopy.redo == lsn.0, + "unexpected checkpoint REDO pointer" + ); + // Import WAL. This is needed even when starting from a shutdown checkpoint, because // this reads the checkpoint record itself, advancing the tip of the timeline to - // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn' - let pg_control = pg_control.ok_or_else(|| anyhow!("pg_control file not found"))?; + // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'. import_wal( &path.join("pg_wal"), writer, Lsn(pg_control.checkPointCopy.redo), lsn, - &mut pg_control.checkPointCopy.clone(), )?; Ok(()) @@ -300,10 +312,9 @@ fn import_slru_file( /// 'startpoint' and 'endpoint' into the repository. fn import_wal( walpath: &Path, - timeline: &dyn TimelineWriter, + writer: &dyn TimelineWriter, startpoint: Lsn, endpoint: Lsn, - checkpoint: &mut CheckPoint, ) -> Result<()> { let mut waldecoder = WalStreamDecoder::new(startpoint); @@ -311,6 +322,8 @@ fn import_wal( let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; + let mut walingest = WalIngest::new(writer, startpoint)?; + while last_lsn <= endpoint { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); @@ -342,32 +355,9 @@ fn import_wal( let mut nrecords = 0; while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let mut checkpoint_modified = false; - - let decoded = decode_wal_record(recdata.clone()); - walingest::save_decoded_record( - checkpoint, - &mut checkpoint_modified, - timeline, - &decoded, - recdata, - lsn, - )?; + walingest.ingest_record(writer, recdata, lsn)?; last_lsn = lsn; - if checkpoint_modified { - let checkpoint_bytes = checkpoint.encode(); - timeline.put_page_image( - RelishTag::Checkpoint, - 0, - last_lsn, - checkpoint_bytes, - )?; - } - - // Now that this record has been fully handled, including updating the - // checkpoint data, let the repository know that it is up-to-date to this LSN - timeline.advance_last_record_lsn(last_lsn); nrecords += 1; trace!("imported record at {} (end {})", lsn, endpoint); @@ -381,12 +371,7 @@ fn import_wal( } if last_lsn != startpoint { - debug!( - "reached end of WAL at {}, updating checkpoint info", - last_lsn - ); - - timeline.advance_last_record_lsn(last_lsn); + debug!("reached end of WAL at {}", last_lsn); } else { info!("no WAL to import at {}", last_lsn); } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index cada876ee9..0d2b04ef80 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -1,6 +1,26 @@ //! //! Parse PostgreSQL WAL records and store them in a zenith Timeline. //! +//! The pipeline for ingesting WAL looks like this: +//! +//! WAL receiver -> WalIngest -> Repository +//! +//! The WAL receiver receives a stream of WAL from the WAL safekeepers, +//! and decodes it to individual WAL records. It feeds the WAL records +//! to WalIngest, which parses them and stores them in the Repository. +//! +//! The zenith Repository can store page versions in two formats: as +//! page images, or a WAL records. WalIngest::ingest_record() extracts +//! page images out of some WAL records, but most it stores as WAL +//! records. If a WAL record modifies multple pages, WalIngest +//! will call Repository::put_wal_record or put_page_image functions +//! separately for each modified page. +//! +//! To reconstruct a page using a WAL record, the Repository calls the +//! code in walredo.rs. walredo.rs passes most WAL records to the WAL +//! redo Postgres process, but some records it can handle directly with +//! bespoken Rust code. + use postgres_ffi::nonrelfile_utils::clogpage_precedes; use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; use std::cmp::min; @@ -22,651 +42,694 @@ const MAX_MBR_BLKNO: u32 = static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); -/// -/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the -/// relations/pages that the record affects. -/// -pub fn save_decoded_record( - checkpoint: &mut CheckPoint, - checkpoint_modified: &mut bool, - timeline: &dyn TimelineWriter, - decoded: &DecodedWALRecord, - recdata: Bytes, - lsn: Lsn, -) -> Result<()> { - if checkpoint.update_next_xid(decoded.xl_xid) { - *checkpoint_modified = true; +pub struct WalIngest { + checkpoint: CheckPoint, + checkpoint_modified: bool, +} + +impl WalIngest { + pub fn new(timeline: &dyn Timeline, startpoint: Lsn) -> Result { + // Fetch the latest checkpoint into memory, so that we can compare with it + // quickly in `ingest_record` and update it when it changes. + let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?; + let checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); + + Ok(WalIngest { + checkpoint, + checkpoint_modified: false, + }) } - // Iterate through all the blocks that the record modifies, and - // "put" a separate copy of the record for each block. - for blk in decoded.blocks.iter() { - let tag = RelishTag::Relation(RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }); - // - // Instead of storing full-page-image WAL record, - // it is better to store extracted image: we can skip wal-redo - // in this case. Also some FPI records may contain multiple (up to 32) pages, - // so them have to be copied multiple times. - // - if blk.apply_image - && blk.has_image - && decoded.xl_rmid == pg_constants::RM_XLOG_ID - && (decoded.xl_info == pg_constants::XLOG_FPI - || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 - { - // Extract page image from FPI record - let img_len = blk.bimg_len as usize; - let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); - image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); + /// + /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline. + /// + /// + /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the + /// relations/pages that the record affects. + /// + pub fn ingest_record( + &mut self, + timeline: &dyn TimelineWriter, + recdata: Bytes, + lsn: Lsn, + ) -> Result<()> { + let decoded = decode_wal_record(recdata.clone()); - if blk.hole_length != 0 { - let tail = image.split_off(blk.hole_offset as usize); - image.resize(image.len() + blk.hole_length as usize, 0u8); - image.unsplit(tail); - } - image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); - image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); - assert_eq!(image.len(), pg_constants::BLCKSZ as usize); - timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; - } else { - let rec = WALRecord { - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + assert!(!self.checkpoint_modified); + if self.checkpoint.update_next_xid(decoded.xl_xid) { + self.checkpoint_modified = true; } - } - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); + // Iterate through all the blocks that the record modifies, and + // "put" a separate copy of the record for each block. + for blk in decoded.blocks.iter() { + let tag = RelishTag::Relation(RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }); - // Handle a few special record types - if decoded.xl_rmid == pg_constants::RM_SMGR_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE - { - let truncate = XlSmgrTruncate::decode(&mut buf); - save_xlog_smgr_truncate(timeline, lsn, &truncate)?; - } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { - if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(&mut buf); - save_xlog_dbase_create(timeline, lsn, &createdb)?; - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_DBASE_DROP - { - let dropdb = XlDropDatabase::decode(&mut buf); + // + // Instead of storing full-page-image WAL record, + // it is better to store extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so them have to be copied multiple times. + // + if blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 + { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); - // To drop the database, we need to drop all the relations in it. Like in - // save_xlog_dbase_create(), use the previous record's LSN in the list_rels() call - let req_lsn = min(timeline.get_last_record_lsn(), lsn); - - for tablespace_id in dropdb.tablespace_ids { - let rels = timeline.list_rels(tablespace_id, dropdb.db_id, req_lsn)?; - for rel in rels { - timeline.drop_relish(rel, lsn)?; + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); } + image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); + image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); + assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; + } else { + let rec = WALRecord { + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + } + } + + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + + // Handle a few special record types + if decoded.xl_rmid == pg_constants::RM_SMGR_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_SMGR_TRUNCATE + { + let truncate = XlSmgrTruncate::decode(&mut buf); + self.ingest_xlog_smgr_truncate(timeline, lsn, &truncate)?; + } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { + if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_DBASE_CREATE + { + let createdb = XlCreateDatabase::decode(&mut buf); + self.ingest_xlog_dbase_create(timeline, lsn, &createdb)?; + } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_DBASE_DROP + { + let dropdb = XlDropDatabase::decode(&mut buf); + + // To drop the database, we need to drop all the relations in it. Like in + // ingest_xlog_dbase_create(), use the previous record's LSN in the list_rels() call + let req_lsn = min(timeline.get_last_record_lsn(), lsn); + + for tablespace_id in dropdb.tablespace_ids { + let rels = timeline.list_rels(tablespace_id, dropdb.db_id, req_lsn)?; + for rel in rels { + timeline.drop_relish(rel, lsn)?; + } + trace!( + "Drop FileNodeMap {}, {} at lsn {}", + tablespace_id, + dropdb.db_id, + lsn + ); + timeline.drop_relish( + RelishTag::FileNodeMap { + spcnode: tablespace_id, + dbnode: dropdb.db_id, + }, + lsn, + )?; + } + } + } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { + trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID { + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + lsn, + ZERO_PAGE.clone(), + )?; + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + let xlrec = XlClogTruncate::decode(&mut buf); + self.ingest_clog_truncate_record(timeline, lsn, &xlrec)?; + } + } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); + self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED + || info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); + self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + // Remove twophase file. see RemoveTwoPhaseFile() in postgres code trace!( - "Drop FileNodeMap {}, {} at lsn {}", - tablespace_id, - dropdb.db_id, + "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", + decoded.xl_xid, + parsed_xact.xid, lsn ); timeline.drop_relish( - RelishTag::FileNodeMap { - spcnode: tablespace_id, - dbnode: dropdb.db_id, + RelishTag::TwoPhase { + xid: parsed_xact.xid, }, lsn, )?; + } else if info == pg_constants::XLOG_XACT_PREPARE { + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + + timeline.put_page_image( + RelishTag::TwoPhase { + xid: decoded.xl_xid, + }, + 0, + lsn, + Bytes::copy_from_slice(&buf[..]), + )?; } - } - } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { - trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); - } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID { - let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::CLOG_ZEROPAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - }, - rpageno, - lsn, - ZERO_PAGE.clone(), - )?; - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); - let xlrec = XlClogTruncate::decode(&mut buf); - save_clog_truncate_record(checkpoint, checkpoint_modified, timeline, lsn, &xlrec)?; - } - } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { - let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - save_xact_record(timeline, lsn, &parsed_xact, decoded)?; - } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED - || info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - save_xact_record(timeline, lsn, &parsed_xact, decoded)?; - // Remove twophase file. see RemoveTwoPhaseFile() in postgres code - trace!( - "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", - decoded.xl_xid, - parsed_xact.xid, - lsn - ); - timeline.drop_relish( - RelishTag::TwoPhase { - xid: parsed_xact.xid, - }, - lsn, - )?; - } else if info == pg_constants::XLOG_XACT_PREPARE { - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); + } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - timeline.put_page_image( - RelishTag::TwoPhase { - xid: decoded.xl_xid, - }, - 0, - lsn, - Bytes::copy_from_slice(&buf[..]), - )?; - } - } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( - RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno, - }, - rpageno, - lsn, - ZERO_PAGE.clone(), - )?; - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let pageno = buf.get_u32_le(); - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_page_image( - RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno, - }, - rpageno, - lsn, - ZERO_PAGE.clone(), - )?; - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - save_multixact_create_record( - checkpoint, - checkpoint_modified, - timeline, - lsn, - &xlrec, - decoded, - )?; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - save_multixact_truncate_record(checkpoint, checkpoint_modified, timeline, lsn, &xlrec)?; - } - } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { - let xlrec = XlRelmapUpdate::decode(&mut buf); - save_relmap_page(timeline, lsn, &xlrec, decoded)?; - } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if checkpoint.nextOid != next_oid { - checkpoint.nextOid = next_oid; - *checkpoint_modified = true; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + }, + rpageno, + lsn, + ZERO_PAGE.clone(), + )?; + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + }, + rpageno, + lsn, + ZERO_PAGE.clone(), + )?; + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + self.ingest_multixact_create_record(timeline, lsn, &xlrec, &decoded)?; + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(&mut buf); + self.ingest_multixact_truncate_record(timeline, lsn, &xlrec)?; } - } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE - || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap(); - trace!( - "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", - xlog_checkpoint.oldestXid, - checkpoint.oldestXid - ); - if (checkpoint.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { - checkpoint.oldestXid = xlog_checkpoint.oldestXid; - *checkpoint_modified = true; - } - } - } - Ok(()) -} - -/// Subroutine of save_decoded_record(), to handle an XLOG_DBASE_CREATE record. -fn save_xlog_dbase_create( - timeline: &dyn TimelineWriter, - lsn: Lsn, - rec: &XlCreateDatabase, -) -> Result<()> { - let db_id = rec.db_id; - let tablespace_id = rec.tablespace_id; - let src_db_id = rec.src_db_id; - let src_tablespace_id = rec.src_tablespace_id; - - // Creating a database is implemented by copying the template (aka. source) database. - // To copy all the relations, we need to ask for the state as of the same LSN, but we - // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for - // the last valid LSN to advance up to it. So we use the previous record's LSN in the - // get calls instead. - let req_lsn = min(timeline.get_last_record_lsn(), lsn); - - let rels = timeline.list_rels(src_tablespace_id, src_db_id, req_lsn)?; - - trace!("save_create_database: {} rels", rels.len()); - - let mut num_rels_copied = 0; - let mut num_blocks_copied = 0; - for rel in rels { - if let RelishTag::Relation(src_rel) = rel { - assert_eq!(src_rel.spcnode, src_tablespace_id); - assert_eq!(src_rel.dbnode, src_db_id); - - let nblocks = timeline.get_relish_size(rel, req_lsn)?.unwrap_or(0); - let dst_rel = RelTag { - spcnode: tablespace_id, - dbnode: db_id, - relnode: src_rel.relnode, - forknum: src_rel.forknum, - }; - - // Copy content - for blknum in 0..nblocks { - let content = timeline.get_page_at_lsn(rel, blknum, req_lsn)?; - - debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); - - timeline.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?; - num_blocks_copied += 1; - } - - if nblocks == 0 { - // make sure we have some trace of the relation, even if it's empty - timeline.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?; - } - - num_rels_copied += 1; - } - } - - // Copy relfilemap - // TODO This implementation is very inefficient - - // it scans all non-rels only to find FileNodeMaps - for tag in timeline.list_nonrels(req_lsn)? { - if let RelishTag::FileNodeMap { spcnode, dbnode } = tag { - if spcnode == src_tablespace_id && dbnode == src_db_id { - let img = timeline.get_page_at_lsn(tag, 0, req_lsn)?; - let new_tag = RelishTag::FileNodeMap { - spcnode: tablespace_id, - dbnode: db_id, - }; - timeline.put_page_image(new_tag, 0, lsn, img)?; - break; - } - } - } - info!( - "Created database {}/{}, copied {} blocks in {} rels at {}", - tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn - ); - Ok(()) -} - -/// Subroutine of save_decoded_record(), to handle an XLOG_SMGR_TRUNCATE record. -/// -/// This is the same logic as in PostgreSQL's smgr_redo() function. -fn save_xlog_smgr_truncate( - timeline: &dyn TimelineWriter, - lsn: Lsn, - rec: &XlSmgrTruncate, -) -> Result<()> { - let spcnode = rec.rnode.spcnode; - let dbnode = rec.rnode.dbnode; - let relnode = rec.rnode.relnode; - - if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: pg_constants::MAIN_FORKNUM, - }; - timeline.put_truncation(RelishTag::Relation(rel), lsn, rec.blkno)?; - } - if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: pg_constants::FSM_FORKNUM, - }; - - // FIXME: 'blkno' stored in the WAL record is the new size of the - // heap. The formula for calculating the new size of the FSM is - // pretty complicated (see FreeSpaceMapPrepareTruncateRel() in - // PostgreSQL), and we should also clear bits in the tail FSM block, - // and update the upper level FSM pages. None of that has been - // implemented. What we do instead, is always just truncate the FSM - // to zero blocks. That's bad for performance, but safe. (The FSM - // isn't needed for correctness, so we could also leave garbage in - // it. Seems more tidy to zap it away.) - if rec.blkno != 0 { - info!("Partial truncation of FSM is not supported"); - } - let num_fsm_blocks = 0; - timeline.put_truncation(RelishTag::Relation(rel), lsn, num_fsm_blocks)?; - } - if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { - let rel = RelTag { - spcnode, - dbnode, - relnode, - forknum: pg_constants::VISIBILITYMAP_FORKNUM, - }; - - // FIXME: Like with the FSM above, the logic to truncate the VM - // correctly has not been implemented. Just zap it away completely, - // always. Unlike the FSM, the VM must never have bits incorrectly - // set. From a correctness point of view, it's always OK to clear - // bits or remove it altogether, though. - if rec.blkno != 0 { - info!("Partial truncation of VM is not supported"); - } - let num_vm_blocks = 0; - timeline.put_truncation(RelishTag::Relation(rel), lsn, num_vm_blocks)?; - } - Ok(()) -} - -/// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records. -/// -fn save_xact_record( - timeline: &dyn TimelineWriter, - lsn: Lsn, - parsed: &XlXactParsedRecord, - decoded: &DecodedWALRecord, -) -> Result<()> { - // Record update of CLOG page - let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; - - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_wal_record( - lsn, - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - }, - rpageno, - rec.clone(), - )?; - - for subxact in &parsed.subxacts { - let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; - if subxact_pageno != pageno { - pageno = subxact_pageno; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_wal_record( - lsn, - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - }, - rpageno, - rec.clone(), - )?; - } - } - for xnode in &parsed.xnodes { - for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { - let rel = RelTag { - forknum, - spcnode: xnode.spcnode, - dbnode: xnode.dbnode, - relnode: xnode.relnode, - }; - timeline.drop_relish(RelishTag::Relation(rel), lsn)?; - } - } - Ok(()) -} - -fn save_clog_truncate_record( - checkpoint: &mut CheckPoint, - checkpoint_modified: &mut bool, - timeline: &dyn TimelineWriter, - lsn: Lsn, - xlrec: &XlClogTruncate, -) -> Result<()> { - info!( - "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {} lsn {}", - xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db, lsn - ); - - // Here we treat oldestXid and oldestXidDB - // differently from postgres redo routines. - // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid - // until checkpoint happens and updates the value. - // Here we can use the most recent value. - // It's just an optimization, though and can be deleted. - // TODO Figure out if there will be any issues with replica. - checkpoint.oldestXid = xlrec.oldest_xid; - checkpoint.oldestXidDB = xlrec.oldest_xid_db; - *checkpoint_modified = true; - - // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it - - let latest_page_number = checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - - // Now delete all segments containing pages between xlrec.pageno - // and latest_page_number. - - // First, make an important safety check: - // the current endpoint page must not be eligible for removal. - // See SimpleLruTruncate() in slru.c - if clogpage_precedes(latest_page_number, xlrec.pageno) { - info!("could not truncate directory pg_xact apparent wraparound"); - return Ok(()); - } - - // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate - // TODO This implementation is very inefficient - - // it scans all non-rels only to find Clog - // - // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it - // will block waiting for the last valid LSN to advance up to - // it. So we use the previous record's LSN in the get calls - // instead. - let req_lsn = min(timeline.get_last_record_lsn(), lsn); - for obj in timeline.list_nonrels(req_lsn)? { - if let RelishTag::Slru { slru, segno } = obj { - if slru == SlruKind::Clog { - let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; - if slru_may_delete_clogsegment(segpage, xlrec.pageno) { - timeline.drop_relish(RelishTag::Slru { slru, segno }, lsn)?; - trace!("Drop CLOG segment {:>04X} at lsn {}", segno, lsn); + } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { + let xlrec = XlRelmapUpdate::decode(&mut buf); + self.ingest_relmap_page(timeline, lsn, &xlrec, &decoded)?; + } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if self.checkpoint.nextOid != next_oid { + self.checkpoint.nextOid = next_oid; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap(); + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + self.checkpoint.oldestXid + ); + if (self + .checkpoint + .oldestXid + .wrapping_sub(xlog_checkpoint.oldestXid) as i32) + < 0 + { + self.checkpoint.oldestXid = xlog_checkpoint.oldestXid; + self.checkpoint_modified = true; } } } + + // If checkpoint data was updated, store the new version in the repository + if self.checkpoint_modified { + let new_checkpoint_bytes = self.checkpoint.encode(); + + timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, new_checkpoint_bytes)?; + self.checkpoint_modified = false; + } + + // Now that this record has been fully handled, including updating the + // checkpoint data, let the repository know that it is up-to-date to this LSN + timeline.advance_last_record_lsn(lsn); + + Ok(()) } - Ok(()) -} + /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. + fn ingest_xlog_dbase_create( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + rec: &XlCreateDatabase, + ) -> Result<()> { + let db_id = rec.db_id; + let tablespace_id = rec.tablespace_id; + let src_db_id = rec.src_db_id; + let src_tablespace_id = rec.src_tablespace_id; -fn save_multixact_create_record( - checkpoint: &mut CheckPoint, - checkpoint_modified: &mut bool, - timeline: &dyn TimelineWriter, - lsn: Lsn, - xlrec: &XlMultiXactCreate, - decoded: &DecodedWALRecord, -) -> Result<()> { - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - timeline.put_wal_record( - lsn, - RelishTag::Slru { - slru: SlruKind::MultiXactOffsets, - segno, - }, - rpageno, - rec.clone(), - )?; + // Creating a database is implemented by copying the template (aka. source) database. + // To copy all the relations, we need to ask for the state as of the same LSN, but we + // cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for + // the last valid LSN to advance up to it. So we use the previous record's LSN in the + // get calls instead. + let req_lsn = min(timeline.get_last_record_lsn(), lsn); + + let rels = timeline.list_rels(src_tablespace_id, src_db_id, req_lsn)?; + + trace!("ingest_xlog_dbase_create: {} rels", rels.len()); + + let mut num_rels_copied = 0; + let mut num_blocks_copied = 0; + for rel in rels { + if let RelishTag::Relation(src_rel) = rel { + assert_eq!(src_rel.spcnode, src_tablespace_id); + assert_eq!(src_rel.dbnode, src_db_id); + + let nblocks = timeline.get_relish_size(rel, req_lsn)?.unwrap_or(0); + let dst_rel = RelTag { + spcnode: tablespace_id, + dbnode: db_id, + relnode: src_rel.relnode, + forknum: src_rel.forknum, + }; + + // Copy content + for blknum in 0..nblocks { + let content = timeline.get_page_at_lsn(rel, blknum, req_lsn)?; + + debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); + + timeline.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content)?; + num_blocks_copied += 1; + } + + if nblocks == 0 { + // make sure we have some trace of the relation, even if it's empty + timeline.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?; + } + + num_rels_copied += 1; + } + } + + // Copy relfilemap + // TODO This implementation is very inefficient - + // it scans all non-rels only to find FileNodeMaps + for tag in timeline.list_nonrels(req_lsn)? { + if let RelishTag::FileNodeMap { spcnode, dbnode } = tag { + if spcnode == src_tablespace_id && dbnode == src_db_id { + let img = timeline.get_page_at_lsn(tag, 0, req_lsn)?; + let new_tag = RelishTag::FileNodeMap { + spcnode: tablespace_id, + dbnode: db_id, + }; + timeline.put_page_image(new_tag, 0, lsn, img)?; + break; + } + } + } + info!( + "Created database {}/{}, copied {} blocks in {} rels at {}", + tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn + ); + Ok(()) + } + + /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. + /// + /// This is the same logic as in PostgreSQL's smgr_redo() function. + fn ingest_xlog_smgr_truncate( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + rec: &XlSmgrTruncate, + ) -> Result<()> { + let spcnode = rec.rnode.spcnode; + let dbnode = rec.rnode.dbnode; + let relnode = rec.rnode.relnode; + + if (rec.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: pg_constants::MAIN_FORKNUM, + }; + timeline.put_truncation(RelishTag::Relation(rel), lsn, rec.blkno)?; + } + if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: pg_constants::FSM_FORKNUM, + }; + + // FIXME: 'blkno' stored in the WAL record is the new size of the + // heap. The formula for calculating the new size of the FSM is + // pretty complicated (see FreeSpaceMapPrepareTruncateRel() in + // PostgreSQL), and we should also clear bits in the tail FSM block, + // and update the upper level FSM pages. None of that has been + // implemented. What we do instead, is always just truncate the FSM + // to zero blocks. That's bad for performance, but safe. (The FSM + // isn't needed for correctness, so we could also leave garbage in + // it. Seems more tidy to zap it away.) + if rec.blkno != 0 { + info!("Partial truncation of FSM is not supported"); + } + let num_fsm_blocks = 0; + timeline.put_truncation(RelishTag::Relation(rel), lsn, num_fsm_blocks)?; + } + if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { + let rel = RelTag { + spcnode, + dbnode, + relnode, + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + }; + + // FIXME: Like with the FSM above, the logic to truncate the VM + // correctly has not been implemented. Just zap it away completely, + // always. Unlike the FSM, the VM must never have bits incorrectly + // set. From a correctness point of view, it's always OK to clear + // bits or remove it altogether, though. + if rec.blkno != 0 { + info!("Partial truncation of VM is not supported"); + } + let num_vm_blocks = 0; + timeline.put_truncation(RelishTag::Relation(rel), lsn, num_vm_blocks)?; + } + Ok(()) + } + + /// Subroutine of ingest_record(), to handle an XLOG_XACT_* records. + /// + fn ingest_xact_record( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + parsed: &XlXactParsedRecord, + decoded: &DecodedWALRecord, + ) -> Result<()> { + // Record update of CLOG page + let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; - let first_mbr_pageno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_pageno = - (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - // The members SLRU can, in contrast to the offsets one, be filled to almost - // the full range at once. So we need to handle wraparound. - let mut pageno = first_mbr_pageno; - loop { - // Update members page let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + let rec = WALRecord { + will_init: false, + rec: decoded.record.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; timeline.put_wal_record( lsn, RelishTag::Slru { - slru: SlruKind::MultiXactMembers, + slru: SlruKind::Clog, segno, }, rpageno, rec.clone(), )?; - if pageno == last_mbr_pageno { - // last block inclusive - break; + for subxact in &parsed.subxacts { + let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if subxact_pageno != pageno { + pageno = subxact_pageno; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( + lsn, + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + rec.clone(), + )?; + } + } + for xnode in &parsed.xnodes { + for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { + let rel = RelTag { + forknum, + spcnode: xnode.spcnode, + dbnode: xnode.dbnode, + relnode: xnode.relnode, + }; + timeline.drop_relish(RelishTag::Relation(rel), lsn)?; + } + } + Ok(()) + } + + fn ingest_clog_truncate_record( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + xlrec: &XlClogTruncate, + ) -> Result<()> { + info!( + "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {} lsn {}", + xlrec.pageno, xlrec.oldest_xid, xlrec.oldest_xid_db, lsn + ); + + // Here we treat oldestXid and oldestXidDB + // differently from postgres redo routines. + // In postgres checkpoint.oldestXid lags behind xlrec.oldest_xid + // until checkpoint happens and updates the value. + // Here we can use the most recent value. + // It's just an optimization, though and can be deleted. + // TODO Figure out if there will be any issues with replica. + self.checkpoint.oldestXid = xlrec.oldest_xid; + self.checkpoint.oldestXidDB = xlrec.oldest_xid_db; + self.checkpoint_modified = true; + + // TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it + + let latest_page_number = + self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + + // Now delete all segments containing pages between xlrec.pageno + // and latest_page_number. + + // First, make an important safety check: + // the current endpoint page must not be eligible for removal. + // See SimpleLruTruncate() in slru.c + if clogpage_precedes(latest_page_number, xlrec.pageno) { + info!("could not truncate directory pg_xact apparent wraparound"); + return Ok(()); } - // handle wraparound - if pageno == MAX_MBR_BLKNO { - pageno = 0; - } else { - pageno += 1; + // Iterate via SLRU CLOG segments and drop segments that we're ready to truncate + // TODO This implementation is very inefficient - + // it scans all non-rels only to find Clog + // + // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it + // will block waiting for the last valid LSN to advance up to + // it. So we use the previous record's LSN in the get calls + // instead. + let req_lsn = min(timeline.get_last_record_lsn(), lsn); + for obj in timeline.list_nonrels(req_lsn)? { + if let RelishTag::Slru { slru, segno } = obj { + if slru == SlruKind::Clog { + let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; + if slru_may_delete_clogsegment(segpage, xlrec.pageno) { + timeline.drop_relish(RelishTag::Slru { slru, segno }, lsn)?; + trace!("Drop CLOG segment {:>04X} at lsn {}", segno, lsn); + } + } + } } - } - if xlrec.mid >= checkpoint.nextMulti { - checkpoint.nextMulti = xlrec.mid + 1; - *checkpoint_modified = true; - } - if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset { - checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; - *checkpoint_modified = true; - } - let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| { - if mbr.xid.wrapping_sub(acc) as i32 > 0 { - mbr.xid - } else { - acc - } - }); - if checkpoint.update_next_xid(max_mbr_xid) { - *checkpoint_modified = true; + Ok(()) } - Ok(()) -} -fn save_multixact_truncate_record( - checkpoint: &mut CheckPoint, - checkpoint_modified: &mut bool, - timeline: &dyn TimelineWriter, - lsn: Lsn, - xlrec: &XlMultiXactTruncate, -) -> Result<()> { - checkpoint.oldestMulti = xlrec.end_trunc_off; - checkpoint.oldestMultiDB = xlrec.oldest_multi_db; - *checkpoint_modified = true; - - // PerformMembersTruncation - let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET); - let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb); - let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb); - let mut segment: i32 = startsegment; - - // Delete all the segments except the last one. The last segment can still - // contain, possibly partially, valid data. - while segment != endsegment { - timeline.drop_relish( - RelishTag::Slru { - slru: SlruKind::MultiXactMembers, - segno: segment as u32, - }, + fn ingest_multixact_create_record( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + xlrec: &XlMultiXactCreate, + decoded: &DecodedWALRecord, + ) -> Result<()> { + let rec = WALRecord { + will_init: false, + rec: decoded.record.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( lsn, + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + }, + rpageno, + rec.clone(), )?; - /* move to next segment, handling wraparound correctly */ - if segment == maxsegment { - segment = 0; - } else { - segment += 1; + let first_mbr_pageno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_pageno = + (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + // The members SLRU can, in contrast to the offsets one, be filled to almost + // the full range at once. So we need to handle wraparound. + let mut pageno = first_mbr_pageno; + loop { + // Update members page + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( + lsn, + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + }, + rpageno, + rec.clone(), + )?; + + if pageno == last_mbr_pageno { + // last block inclusive + break; + } + + // handle wraparound + if pageno == MAX_MBR_BLKNO { + pageno = 0; + } else { + pageno += 1; + } } + if xlrec.mid >= self.checkpoint.nextMulti { + self.checkpoint.nextMulti = xlrec.mid + 1; + self.checkpoint_modified = true; + } + if xlrec.moff + xlrec.nmembers > self.checkpoint.nextMultiOffset { + self.checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; + self.checkpoint_modified = true; + } + let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| { + if mbr.xid.wrapping_sub(acc) as i32 > 0 { + mbr.xid + } else { + acc + } + }); + + if self.checkpoint.update_next_xid(max_mbr_xid) { + self.checkpoint_modified = true; + } + Ok(()) } - // Truncate offsets - // FIXME: this did not handle wraparound correctly + fn ingest_multixact_truncate_record( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + xlrec: &XlMultiXactTruncate, + ) -> Result<()> { + self.checkpoint.oldestMulti = xlrec.end_trunc_off; + self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db; + self.checkpoint_modified = true; - Ok(()) -} - -fn save_relmap_page( - timeline: &dyn TimelineWriter, - lsn: Lsn, - xlrec: &XlRelmapUpdate, - decoded: &DecodedWALRecord, -) -> Result<()> { - let tag = RelishTag::FileNodeMap { - spcnode: xlrec.tsid, - dbnode: xlrec.dbid, - }; - - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - // skip xl_relmap_update - buf.advance(12); - - timeline.put_page_image(tag, 0, lsn, Bytes::copy_from_slice(&buf[..]))?; - - Ok(()) + // PerformMembersTruncation + let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET); + let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb); + let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb); + let mut segment: i32 = startsegment; + + // Delete all the segments except the last one. The last segment can still + // contain, possibly partially, valid data. + while segment != endsegment { + timeline.drop_relish( + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno: segment as u32, + }, + lsn, + )?; + + /* move to next segment, handling wraparound correctly */ + if segment == maxsegment { + segment = 0; + } else { + segment += 1; + } + } + + // Truncate offsets + // FIXME: this did not handle wraparound correctly + + Ok(()) + } + + fn ingest_relmap_page( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + xlrec: &XlRelmapUpdate, + decoded: &DecodedWALRecord, + ) -> Result<()> { + let tag = RelishTag::FileNodeMap { + spcnode: xlrec.tsid, + dbnode: xlrec.dbid, + }; + + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + // skip xl_relmap_update + buf.advance(12); + + timeline.put_page_image(tag, 0, lsn, Bytes::copy_from_slice(&buf[..]))?; + + Ok(()) + } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 66c2080086..12de49286f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -5,12 +5,10 @@ //! //! We keep one WAL receiver active per timeline. -use crate::relish::*; use crate::tenant_mgr; use crate::tenant_mgr::TenantState; use crate::tenant_threads; -use crate::walingest; -use crate::walrecord::*; +use crate::walingest::WalIngest; use crate::PageServerConf; use anyhow::{bail, Context, Error, Result}; use lazy_static::lazy_static; @@ -18,7 +16,6 @@ use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use postgres_ffi::waldecoder::*; -use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::cell::Cell; @@ -240,9 +237,7 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); - let checkpoint_bytes = timeline.get_page_at_lsn(RelishTag::Checkpoint, 0, startpoint)?; - let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); + let mut walingest = WalIngest::new(&*timeline, startpoint)?; while let Some(replication_message) = physical_stream.next()? { let status_update = match replication_message { @@ -266,34 +261,8 @@ fn walreceiver_main( assert!(lsn.is_aligned()); let writer = timeline.writer(); + walingest.ingest_record(writer.as_ref(), recdata, lsn)?; - let mut checkpoint_modified = false; - - let decoded = decode_wal_record(recdata.clone()); - walingest::save_decoded_record( - &mut checkpoint, - &mut checkpoint_modified, - writer.as_ref(), - &decoded, - recdata, - lsn, - )?; - - // Check if checkpoint data was updated by save_decoded_record - if checkpoint_modified { - let new_checkpoint_bytes = checkpoint.encode(); - - writer.put_page_image( - RelishTag::Checkpoint, - 0, - lsn, - new_checkpoint_bytes, - )?; - } - - // Now that this record has been fully handled, including updating the - // checkpoint data, let the repository know that it is up-to-date to this LSN - writer.advance_last_record_lsn(lsn); last_rec_lsn = lsn; }