From 9ece1e863dda3783ba18e364737d9f41c1c1b0be Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 14 May 2021 16:32:52 +0300 Subject: [PATCH] Compute and restore pg_xact, pg_multixact and pg_filenode.map files --- pageserver/src/basebackup.rs | 197 ++++++++++++++++++++++++++- pageserver/src/page_service.rs | 41 ++++-- pageserver/src/repository.rs | 13 +- pageserver/src/repository/rocksdb.rs | 94 ++++++++++++- pageserver/src/restore_local_repo.rs | 101 ++++++++++++-- pageserver/src/waldecoder.rs | 156 +++++++++++++++++++++ pageserver/src/walredo.rs | 78 ++++++++++- postgres_ffi/src/pg_constants.rs | 19 +++ 8 files changed, 665 insertions(+), 34 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 739b63c0c9..1e02ef589a 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -1,12 +1,207 @@ use crate::ZTimelineId; use log::*; use std::io::Write; -use tar::Builder; +use std::sync::Arc; +use std::time::SystemTime; +use tar::{Builder, Header}; use walkdir::WalkDir; +use crate::repository::{BufferTag, RelTag, Timeline}; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use zenith_utils::lsn::Lsn; +fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{ + let mut header = Header::new_gnu(); + header.set_size(size); + header.set_path(path)?; + header.set_mode(0b110000000); + header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()); + header.set_cksum(); + Ok(header) +} + +// +// Generate SRLU segment files from repoistory +// +fn add_slru_segments( + ar: &mut Builder<&mut dyn Write>, + timeline: &Arc, + path: &str, + forknum: u8, + lsn: Lsn, +) -> anyhow::Result<()> { + let rel = RelTag { + spcnode: 0, + dbnode: 0, + relnode: 0, + forknum, + }; + let (first, last) = timeline.get_range(rel, lsn)?; + const SEG_SIZE: usize = + pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize; + let mut seg_buf = [0u8; SEG_SIZE]; + let mut curr_segno: Option = None; + for page in first..last { + let tag = BufferTag { rel, blknum: page }; + let img = timeline.get_page_at_lsn(tag, lsn)?; + // Zero length image indicates truncated segment: just skip it + if img.len() != 0 { + assert!(img.len() == pg_constants::BLCKSZ as usize); + + let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; + if curr_segno.is_some() && curr_segno.unwrap() != segno { + let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); + let header = new_tar_header(&segname, SEG_SIZE as u64)?; + ar.append(&header, &seg_buf[..])?; + seg_buf.fill(0); + } + curr_segno = Some(segno); + let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize + * pg_constants::BLCKSZ as usize; + let offs_end = offs_start + pg_constants::BLCKSZ as usize; + seg_buf[offs_start..offs_end].copy_from_slice(&img); + } + } + if curr_segno.is_some() { + let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); + let header = new_tar_header(&segname, SEG_SIZE as u64)?; + ar.append(&header, &seg_buf[..])?; + } + Ok(()) +} + +// +// Extract pg_filenode.map files from repoistory +// +fn add_relmap_files( + ar: &mut Builder<&mut dyn Write>, + timeline: &Arc, + lsn: Lsn, +) -> anyhow::Result<()> { + for db in timeline.get_databases()?.iter() { + let tag = BufferTag { + rel: *db, + blknum: 0, + }; + let img = timeline.get_page_at_lsn(tag, lsn)?; + let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { + String::from("global/pg_filenode.map") + } else { + // User defined tablespaces are not supported + assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); + format!("base/{}/pg_filenode.map", db.dbnode) + }; + let header = new_tar_header(&path, 512)?; + ar.append(&header, &img[..])?; + } + Ok(()) +} + +/// +/// Generate tarball with non-relational files from repository +/// +pub fn send_tarball_at_lsn( + write: &mut dyn Write, + timelineid: ZTimelineId, + timeline: &Arc, + lsn: Lsn, + snapshot_lsn: Lsn, +) -> anyhow::Result<()> { + let mut ar = Builder::new(write); + + let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0); + let walpath = format!("timelines/{}/wal", timelineid); + + debug!("sending tarball of snapshot in {}", snappath); + for entry in WalkDir::new(&snappath) { + let entry = entry?; + let fullpath = entry.path(); + let relpath = entry.path().strip_prefix(&snappath).unwrap(); + + if relpath.to_str().unwrap() == "" { + continue; + } + + if entry.file_type().is_dir() { + trace!( + "sending dir {} as {}", + fullpath.display(), + relpath.display() + ); + ar.append_dir(relpath, fullpath)?; + } else if entry.file_type().is_symlink() { + error!("ignoring symlink in snapshot dir"); + } else if entry.file_type().is_file() { + // Shared catalogs are exempt + if relpath.starts_with("global/") { + trace!("sending shared catalog {}", relpath.display()); + ar.append_path_with_name(fullpath, relpath)?; + } else if !is_rel_file_path(relpath.to_str().unwrap()) { + if entry.file_name() != "pg_filenode.map" + && !relpath.starts_with("pg_xact/") + && !relpath.starts_with("pg_multixact/") + { + trace!("sending {}", relpath.display()); + ar.append_path_with_name(fullpath, relpath)?; + } + } else { + trace!("not sending {}", relpath.display()); + } + } else { + error!("unknown file type: {}", fullpath.display()); + } + } + + add_slru_segments( + &mut ar, + timeline, + "pg_xact", + pg_constants::PG_XACT_FORKNUM, + lsn, + )?; + add_slru_segments( + &mut ar, + timeline, + "pg_multixact/members", + pg_constants::PG_MXACT_MEMBERS_FORKNUM, + lsn, + )?; + add_slru_segments( + &mut ar, + timeline, + "pg_multixact/offsets", + pg_constants::PG_MXACT_OFFSETS_FORKNUM, + lsn, + )?; + add_relmap_files(&mut ar, timeline, lsn)?; + + // FIXME: Also send all the WAL. The compute node would only need + // the WAL that applies to non-relation files, because the page + // server handles all the relation files. But we don't have a + // mechanism for separating relation and non-relation WAL at the + // moment. + for entry in std::fs::read_dir(&walpath)? { + let entry = entry?; + let fullpath = &entry.path(); + let relpath = fullpath.strip_prefix(&walpath).unwrap(); + + if !entry.path().is_file() { + continue; + } + + let archive_fname = relpath.to_str().unwrap(); + let archive_fname = archive_fname + .strip_suffix(".partial") + .unwrap_or(&archive_fname); + let archive_path = "pg_wal/".to_owned() + archive_fname; + ar.append_path_with_name(fullpath, archive_path)?; + } + ar.finish()?; + debug!("all tarred up!"); + Ok(()) +} + /// /// Send a tarball containing a snapshot of all non-relation files in the /// PostgreSQL data directory, at given LSN diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 06bba6932b..79998c37ba 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -653,12 +653,18 @@ impl Connection { } else if query_string.starts_with(b"basebackup ") { let (_l, r) = query_string.split_at("basebackup ".len()); let r = r.to_vec(); - let timelineid_str = String::from(String::from_utf8(r)?.trim_end()); + let basebackup_args = String::from(String::from_utf8(r)?.trim_end()); + let args: Vec<&str> = basebackup_args.rsplit(' ').collect(); + let timelineid_str = args[0]; info!("got basebackup command: \"{}\"", timelineid_str); let timelineid = ZTimelineId::from_str(&timelineid_str)?; - + let lsn = if args.len() > 1 { + Some(Lsn::from_str(args[1])?) + } else { + None + }; // Check that the timeline exists - self.handle_basebackup_request(timelineid)?; + self.handle_basebackup_request(timelineid, lsn)?; self.write_message_noflush(&BeMessage::CommandComplete)?; self.write_message(&BeMessage::ReadyForQuery)?; } else if query_string.starts_with(b"callmemaybe ") { @@ -814,16 +820,19 @@ impl Connection { } } - fn handle_basebackup_request(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> { + fn handle_basebackup_request( + &mut self, + timelineid: ZTimelineId, + lsn: Option, + ) -> anyhow::Result<()> { // check that the timeline exists let repository = page_cache::get_repository(); - if repository.get_or_restore_timeline(timelineid).is_err() { - bail!( + let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| { + anyhow!( "client requested basebackup on timeline {} which does not exist in page server", timelineid - ); - } - + ) + })?; /* switch client to COPYOUT */ let stream = &mut self.stream; stream.write_u8(b'H')?; @@ -836,10 +845,16 @@ impl Connection { /* Send a tarball of the latest snapshot on the timeline */ // find latest snapshot - let snapshotlsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); - - basebackup::send_snapshot_tarball(&mut CopyDataSink { stream }, timelineid, snapshotlsn)?; - + let snapshot_lsn = + restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); + let req_lsn = lsn.unwrap_or(snapshot_lsn); + basebackup::send_tarball_at_lsn( + &mut CopyDataSink { stream }, + timelineid, + &timeline, + req_lsn, + snapshot_lsn, + )?; // CopyDone self.stream.write_u8(b'c')?; self.stream.write_u32::(4)?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3d83c155fd..cb3eb5f67e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -106,6 +106,13 @@ pub trait Timeline { /// valid LSN, so that the WAL receiver knows where to restart streaming. fn advance_last_record_lsn(&self, lsn: Lsn); fn get_last_record_lsn(&self) -> Lsn; + + /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations + /// but can be also applied to normal relations. + fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>; + + /// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used) + fn get_databases(&self) -> Result>; } #[derive(Clone)] @@ -118,25 +125,25 @@ pub struct RepositoryStats { #[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] pub struct RelTag { + pub forknum: u8, pub spcnode: u32, pub dbnode: u32, pub relnode: u32, - pub forknum: u8, } impl RelTag { pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u8(self.forknum); buf.put_u32(self.spcnode); buf.put_u32(self.dbnode); buf.put_u32(self.relnode); - buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres } pub fn unpack(buf: &mut BytesMut) -> RelTag { RelTag { + forknum: buf.get_u8(), spcnode: buf.get_u32(), dbnode: buf.get_u32(), relnode: buf.get_u32(), - forknum: buf.get_u32() as u8, } } } diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 4bfb161482..0ee33e11e3 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -440,7 +440,6 @@ impl RocksTimeline { let new_img = self .walredo_mgr .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); reconstructed += 1; @@ -629,10 +628,88 @@ impl Timeline for RocksTimeline { /// Get size of relation at given LSN. /// fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result { - self.wait_lsn(lsn)?; + let lsn = self.wait_lsn(lsn)?; self.relsize_get_nowait(rel, lsn) } + /// Get databases. This function is used to local pg_filenode.map files + fn get_databases(&self) -> Result> { + let key = CacheKey { + // minimal key + tag: BufferTag { + rel: RelTag { + forknum: pg_constants::PG_FILENODEMAP_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut dbs = Vec::new(); + + let mut iter = self.db.raw_iterator(); + iter.seek(&buf[..]); + let mut prev_tag = key.tag.rel; + while iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let tag = RelTag::unpack(&mut buf); + if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { + break; // we are done with this fork + } + if tag != prev_tag { + dbs.push(tag); // collect unique tags + prev_tag = tag; + } + iter.next(); + } + return Ok(dbs); + } + + /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations + /// but can be also applied to normal relations. + fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> { + let lsn = self.wait_lsn(lsn)?; + let mut key = CacheKey { + // minimal key to start with + tag: BufferTag { rel, blknum: 0 }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek(&buf[..]); // locate first entry + if iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == rel { + // still trversing this relation + let first_blknum = tag.blknum; + key.tag.blknum = u32::MAX; // maximal key + buf.clear(); + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); // localte last entry + if iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + let last_blknum = tag.blknum; + return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive + } + } + } + Ok((0, 0)) // empty range + } + /// /// Does relation exist at given LSN? /// @@ -731,14 +808,25 @@ impl Timeline for RocksTimeline { /// Memorize a full image of a page version /// fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { + let img_len = img.len(); let key = CacheKey { tag, lsn }; let content = CacheEntryContent::PageImage(img); - let mut key_buf = BytesMut::new(); key.pack(&mut key_buf); let mut val_buf = BytesMut::new(); content.pack(&mut val_buf); + // Zero size of page image indicates that SLRU page was truncated + if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM { + if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 { + // records already marked for deletion + return; + } else { + // delete truncated multixact page + val_buf[0] |= UNUSED_VERSION_FLAG; + } + } + trace!("put_wal_record lsn: {}", key.lsn); let _res = self.db.put(&key_buf[..], &val_buf[..]); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 57cefb7c01..1c24964fc1 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -23,7 +23,7 @@ use anyhow::Result; use bytes::Bytes; use crate::repository::{BufferTag, RelTag, Timeline}; -use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; +use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::pg_constants; @@ -120,8 +120,26 @@ fn restore_snapshot( None => continue, // These special files appear in the snapshot, but are not needed by the page server - Some("pg_control") => continue, - Some("pg_filenode.map") => continue, + Some("pg_control") => restore_nonrel_file( + conf, + timeline, + timelineid, + snapshot, + pg_constants::GLOBALTABLESPACE_OID, + 0, + pg_constants::PG_CONTROLFILE_FORKNUM, + &direntry.path(), + )?, + Some("pg_filenode.map") => restore_nonrel_file( + conf, + timeline, + timelineid, + snapshot, + pg_constants::GLOBALTABLESPACE_OID, + 0, + pg_constants::PG_FILENODEMAP_FORKNUM, + &direntry.path(), + )?, // Load any relation files into the page server _ => restore_relfile( @@ -148,7 +166,16 @@ fn restore_snapshot( // These special files appear in the snapshot, but are not needed by the page server Some("PG_VERSION") => continue, - Some("pg_filenode.map") => continue, + Some("pg_filenode.map") => restore_nonrel_file( + conf, + timeline, + timelineid, + snapshot, + pg_constants::DEFAULTTABLESPACE_OID, + dboid, + pg_constants::PG_FILENODEMAP_FORKNUM, + &direntry.path(), + )?, // Load any relation files into the page server _ => restore_relfile( @@ -163,7 +190,7 @@ fn restore_snapshot( } for entry in fs::read_dir(snapshotpath.join("pg_xact"))? { let entry = entry?; - restore_nonrelfile( + restore_slru_file( conf, timeline, timelineid, @@ -172,6 +199,28 @@ fn restore_snapshot( &entry.path(), )?; } + for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? { + let entry = entry?; + restore_slru_file( + conf, + timeline, + timelineid, + snapshot, + pg_constants::PG_MXACT_MEMBERS_FORKNUM, + &entry.path(), + )?; + } + for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? { + let entry = entry?; + restore_slru_file( + conf, + timeline, + timelineid, + snapshot, + pg_constants::PG_MXACT_OFFSETS_FORKNUM, + &entry.path(), + )?; + } // TODO: Scan pg_tblspc Ok(()) @@ -180,8 +229,8 @@ fn restore_snapshot( fn restore_relfile( timeline: &dyn Timeline, snapshot: &str, - spcoid: u32, - dboid: u32, + spcoid: Oid, + dboid: Oid, path: &Path, ) -> Result<()> { let lsn = Lsn::from_hex(snapshot)?; @@ -239,7 +288,39 @@ fn restore_relfile( Ok(()) } -fn restore_nonrelfile( +fn restore_nonrel_file( + _conf: &PageServerConf, + timeline: &dyn Timeline, + _timelineid: ZTimelineId, + snapshot: &str, + spcoid: Oid, + dboid: Oid, + forknum: u8, + path: &Path, +) -> Result<()> { + let lsn = Lsn::from_hex(snapshot)?; + + // Does it look like a relation file? + + let mut file = File::open(path)?; + let mut buffer = Vec::new(); + // read the whole file + file.read_to_end(&mut buffer)?; + + let tag = BufferTag { + rel: RelTag { + spcnode: spcoid, + dbnode: dboid, + relnode: 0, + forknum, + }, + blknum: 0, + }; + timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..])); + Ok(()) +} + +fn restore_slru_file( _conf: &PageServerConf, timeline: &dyn Timeline, _timelineid: ZTimelineId, @@ -263,7 +344,7 @@ fn restore_nonrelfile( let tag = BufferTag { rel: RelTag { spcnode: 0, - dbnode: 0, + dbnode: 0, relnode: 0, forknum, }, @@ -296,7 +377,7 @@ fn restore_nonrelfile( Ok(()) } -// Scan WAL on a timeline, starting from gien LSN, and load all the records +// Scan WAL on a timeline, starting from given LSN, and load all the records // into the page cache. fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> { let walpath = format!("timelines/{}/wal", timelineid); diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index cb0c3ce7de..ba8f9af6de 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -310,6 +310,9 @@ pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; +pub type MultiXactId = TransactionId; +pub type MultiXactOffset = u32; +pub type MultiXactStatus = u32; #[repr(C)] #[derive(Debug, Clone, Copy)] @@ -319,6 +322,24 @@ pub struct RelFileNode { pub relnode: Oid, /* relation */ } +#[repr(C)] +#[derive(Debug)] +pub struct XlRelmapUpdate { + pub dbid: Oid, /* database ID, or 0 for shared map */ + pub tsid: Oid, /* database's tablespace, or pg_global */ + pub nbytes: i32, /* size of relmap data */ +} + +impl XlRelmapUpdate { + pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { + XlRelmapUpdate { + dbid: buf.get_u32_le(), + tsid: buf.get_u32_le(), + nbytes: buf.get_i32_le(), + } + } +} + #[repr(C)] #[derive(Debug)] pub struct XlSmgrTruncate { @@ -441,6 +462,74 @@ impl XlHeapUpdate { } } +#[repr(C)] +#[derive(Debug)] +pub struct MultiXactMember { + pub xid: TransactionId, + pub status: MultiXactStatus, +} + +impl MultiXactMember { + pub fn decode(buf: &mut Bytes) -> MultiXactMember { + MultiXactMember { + xid: buf.get_u32_le(), + status: buf.get_u32_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactCreate { + pub mid: MultiXactId, /* new MultiXact's ID */ + pub moff: MultiXactOffset, /* its starting offset in members file */ + pub nmembers: u32, /* number of member XIDs */ + pub members: Vec, +} + +impl XlMultiXactCreate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { + let mid = buf.get_u32_le(); + let moff = buf.get_u32_le(); + let nmembers = buf.get_u32_le(); + let mut members = Vec::new(); + for _ in 0..nmembers { + members.push(MultiXactMember::decode(buf)); + } + XlMultiXactCreate { + mid, + moff, + nmembers, + members, + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactTruncate { + oldest_multi_db: Oid, + /* to-be-truncated range of multixact offsets */ + start_trunc_off: MultiXactId, /* just for completeness' sake */ + end_trunc_off: MultiXactId, + + /* to-be-truncated range of multixact members */ + start_trunc_memb: MultiXactOffset, + end_trunc_memb: MultiXactOffset, +} + +impl XlMultiXactTruncate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { + XlMultiXactTruncate { + oldest_multi_db: buf.get_u32_le(), + start_trunc_off: buf.get_u32_le(), + end_trunc_off: buf.get_u32_le(), + start_trunc_memb: buf.get_u32_le(), + end_trunc_memb: buf.get_u32_le(), + } + } +} + // // Routines to decode a WAL record and figure out which blocks are modified // @@ -930,6 +1019,73 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blocks.push(blk); } } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; + blk.blkno = buf.get_u32_le(); + blk.will_init = true; + blocks.push(blk); + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; + blk.blkno = buf.get_u32_le(); + blk.will_init = true; + blocks.push(blk); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + // Update offset page + let mut blk = DecodedBkpBlock::new(); + blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; + blocks.push(blk); + let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = + (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + for blkno in first_mbr_blkno..=last_mbr_blkno { + // Update members page + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; + blk.blkno = blkno; + blocks.push(blk); + } + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(&mut buf); + let first_off_blkno = + xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let last_off_blkno = + xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + for blkno in first_off_blkno..last_off_blkno { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; + blk.blkno = blkno; + blk.will_init = true; + blocks.push(blk); + } + let first_mbr_blkno = + xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = + xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + for blkno in first_mbr_blkno..last_mbr_blkno { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; + blk.blkno = blkno; + blk.will_init = true; + blocks.push(blk); + } + } else { + assert!(false); + } + } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { + let xlrec = XlRelmapUpdate::decode(&mut buf); + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM; + blk.rnode_spcnode = xlrec.tsid; + blk.rnode_dbnode = xlrec.dbid; + blk.rnode_relnode = 0; + blk.will_init = true; + blocks.push(blk); } DecodedWALRecord { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 6239f553e4..1775748d97 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -14,6 +14,7 @@ //! TODO: Even though the postgres code runs in a separate process, //! it's not a secure sandbox. //! +use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use std::assert; @@ -36,6 +37,7 @@ use zenith_utils::lsn::Lsn; use crate::repository::BufferTag; use crate::repository::WALRecord; +use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::XLogRecord; @@ -170,6 +172,25 @@ impl WalRedoManager for PostgresRedoManager { } } +fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize { + return ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16 + % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE + * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize; +} + +fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 { + return (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP + * pg_constants::MXACT_MEMBER_BITS_PER_XACT; +} + +/* Location (byte offset within page) of TransactionId of given member */ +fn mx_offset_to_member_offset(xid: MultiXactId) -> usize { + return mx_offset_to_flags_offset(xid) + + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP + + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) + as usize; +} + /// /// WAL redo thread /// @@ -255,7 +276,7 @@ impl PostgresRedoManagerInternal { let start = Instant::now(); let apply_result: Result; - if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM { + if tag.rel.forknum >= pg_constants::PG_XACT_FORKNUM { const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); if let Some(fpi) = base_img { @@ -344,6 +365,55 @@ impl PostgresRedoManagerInternal { record.lsn, record.main_data_offset, record.rec.len()); } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + page.fill(0); + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + page.fill(0); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM { + let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 + * 4) as usize; + LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); + } else { + assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM); + for i in 0..xlrec.nmembers { + let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + if blkno == tag.blknum { + // update only target block + let offset = xlrec.moff + i; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + let mut flagsval = + LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= + !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) + << bshift); + flagsval |= xlrec.members[i as usize].status << bshift; + LittleEndian::write_u32( + &mut page[flagsoff..flagsoff + 4], + flagsval, + ); + LittleEndian::write_u32( + &mut page[memberoff..memberoff + 4], + xlrec.members[i as usize].xid, + ); + } + } + } + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + // empty page image indicates that this SLRU page is truncated and can be removed by GC + page.clear(); + } else { + assert!(false); + } + } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { + page.clear(); + page.extend_from_slice(&buf[..]); + assert!(page.len() == 512); // size of pg_filenode.map } } @@ -537,7 +607,7 @@ impl PostgresRedoProcess { // explanation of the protocol. fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { - let len = 4 + 5 * 4; + let len = 4 + 1 + 4 * 4; let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8(b'B'); @@ -552,7 +622,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { assert!(base_img.len() == 8192); - let len = 4 + 5 * 4 + base_img.len(); + let len = 4 + 1 + 4 * 4 + base_img.len(); let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8(b'P'); @@ -580,7 +650,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes { } fn build_get_page_msg(tag: BufferTag) -> Bytes { - let len = 4 + 5 * 4; + let len = 4 + 1 + 4 * 4; let mut buf = BytesMut::with_capacity(1 + len); buf.put_u8(b'G'); diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index b336371373..ebbe687502 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -77,6 +77,25 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; +// From multixact.h +pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00; +pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10; +pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20; +pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30; + +pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4; +pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8; +pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1; +pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4; +pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 = + MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE; +/* size in bytes of a complete group */ +pub const MULTIXACT_MEMBERGROUP_SIZE: u16 = + 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP; +pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE; +pub const MULTIXACT_MEMBERS_PER_PAGE: u16 = + MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP; + // From heapam_xlog.h pub const XLOG_HEAP_INSERT: u8 = 0x00; pub const XLOG_HEAP_DELETE: u8 = 0x10;