From d73cb49f89da5d3b0d906d8a2d7c9de26a8680ec Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Wed, 2 Jun 2021 12:48:12 +0300
Subject: [PATCH] Support non-rel objects

---
 control_plane/src/compute.rs         |   2 +
 pageserver/src/basebackup.rs         | 437 +++++++++++++++++----------
 pageserver/src/object_repository.rs  | 415 +++++++++++++-------------
 pageserver/src/object_store.rs       |  16 +-
 pageserver/src/page_service.rs       | 159 +++++++++-
 pageserver/src/repository.rs         |  96 +++++-
 pageserver/src/restore_local_repo.rs | 161 ++++++++--
 pageserver/src/rocksdb_storage.rs    | 120 ++++++--
 pageserver/src/waldecoder.rs         | 378 ++++++++++++++++++-----
 pageserver/src/walreceiver.rs        |  24 +-
 pageserver/src/walredo.rs            | 252 ++++++++++++++-
 postgres_ffi/build.rs                |   1 +
 postgres_ffi/src/lib.rs              |   1 +
 postgres_ffi/src/pg_constants.rs     |  54 +++-
 postgres_ffi/src/xlog_utils.rs       |   2 +-
 postgres_ffi/xlog_ffi.h              |   2 +
 16 files changed, 1587 insertions(+), 533 deletions(-)
 create mode 100644 postgres_ffi/xlog_ffi.h

diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index b2066a8891..be74cd1564 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -301,6 +301,8 @@ impl PostgresNode {
             ),
         )?;
 
+        fs::create_dir_all(self.pgdata().join("pg_wal"))?;
+        fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
         Ok(())
     }
 
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index e467609ff9..ad947c264f 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -5,180 +5,292 @@
 //! It could use a better name.
 //!
 use crate::ZTimelineId;
+use bytes::{BufMut, BytesMut};
 use log::*;
 use std::io::Write;
 use std::sync::Arc;
-use tar::Builder;
+use std::time::SystemTime;
+use tar::{Builder, Header};
 use walkdir::WalkDir;
 
-use crate::repository::Timeline;
+use crate::repository::{DatabaseTag, ObjectTag, Timeline};
+use crc32c::*;
 use postgres_ffi::relfile_utils::*;
+use postgres_ffi::xlog_utils::*;
+use postgres_ffi::*;
 use zenith_utils::lsn::Lsn;
 
-///
-/// Generate tarball with non-relational files from repository
-///
-pub fn send_tarball_at_lsn(
-    write: &mut dyn Write,
-    timelineid: ZTimelineId,
-    _timeline: &Arc<dyn Timeline>,
-    _lsn: Lsn,
-    snapshot_lsn: Lsn,
-) -> anyhow::Result<()> {
-    let mut ar = Builder::new(write);
-
-    let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
-    let walpath = format!("timelines/{}/wal", timelineid);
-
-    debug!("sending tarball of snapshot in {}", snappath);
-    for entry in WalkDir::new(&snappath) {
-        let entry = entry?;
-        let fullpath = entry.path();
-        let relpath = entry.path().strip_prefix(&snappath).unwrap();
-
-        if relpath.to_str().unwrap() == "" {
-            continue;
-        }
-
-        if entry.file_type().is_dir() {
-            trace!(
-                "sending dir {} as {}",
-                fullpath.display(),
-                relpath.display()
-            );
-            ar.append_dir(relpath, fullpath)?;
-        } else if entry.file_type().is_symlink() {
-            error!("ignoring symlink in snapshot dir");
-        } else if entry.file_type().is_file() {
-            // Shared catalogs are exempt
-            if relpath.starts_with("global/") {
-                trace!("sending shared catalog {}", relpath.display());
-                ar.append_path_with_name(fullpath, relpath)?;
-            } else if !is_rel_file_path(relpath.to_str().unwrap()) {
-                trace!("sending {}", relpath.display());
-                ar.append_path_with_name(fullpath, relpath)?;
-            } else {
-                trace!("not sending {}", relpath.display());
-            }
-        } else {
-            error!("unknown file type: {}", fullpath.display());
-        }
-    }
-
-    // FIXME: Also send all the WAL. The compute node would only need
-    // the WAL that applies to non-relation files, because the page
-    // server handles all the relation files. But we don't have a
-    // mechanism for separating relation and non-relation WAL at the
-    // moment.
-    for entry in std::fs::read_dir(&walpath)? {
-        let entry = entry?;
-        let fullpath = &entry.path();
-        let relpath = fullpath.strip_prefix(&walpath).unwrap();
-
-        if !entry.path().is_file() {
-            continue;
-        }
-
-        let archive_fname = relpath.to_str().unwrap();
-        let archive_fname = archive_fname
-            .strip_suffix(".partial")
-            .unwrap_or(&archive_fname);
-        let archive_path = "pg_wal/".to_owned() + archive_fname;
-        ar.append_path_with_name(fullpath, archive_path)?;
-    }
-    ar.finish()?;
-    debug!("all tarred up!");
-    Ok(())
+pub struct Basebackup<'a> {
+    ar: Builder<&'a mut dyn Write>,
+    timeline: &'a Arc<dyn Timeline>,
+    lsn: Lsn,
+    snappath: String,
+    slru_buf: [u8; pg_constants::SLRU_SEG_SIZE],
+    slru_segno: u32,
+    slru_path: &'static str,
 }
 
-///
-/// Send a tarball containing a snapshot of all non-relation files in the
-/// PostgreSQL data directory, at given LSN
-///
-/// There must be a snapshot at the given LSN in the snapshots directory, we cannot
-/// reconstruct the state at an arbitrary LSN at the moment.
-///
-pub fn send_snapshot_tarball(
-    write: &mut dyn Write,
-    timelineid: ZTimelineId,
-    snapshotlsn: Lsn,
-) -> Result<(), std::io::Error> {
-    let mut ar = Builder::new(write);
-
-    let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn.0);
-    let walpath = format!("timelines/{}/wal", timelineid);
-
-    debug!("sending tarball of snapshot in {}", snappath);
-    //ar.append_dir_all("", &snappath)?;
-
-    for entry in WalkDir::new(&snappath) {
-        let entry = entry?;
-        let fullpath = entry.path();
-        let relpath = entry.path().strip_prefix(&snappath).unwrap();
-
-        if relpath.to_str().unwrap() == "" {
-            continue;
+impl<'a> Basebackup<'a> {
+    pub fn new(
+        write: &'a mut dyn Write,
+        timelineid: ZTimelineId,
+        timeline: &'a Arc<dyn Timeline>,
+        lsn: Lsn,
+        snapshot_lsn: Lsn,
+    ) -> Basebackup<'a> {
+        Basebackup {
+            ar: Builder::new(write),
+            timeline,
+            lsn,
+            snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0),
+            slru_path: "",
+            slru_segno: u32::MAX,
+            slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE],
         }
+    }
 
-        if entry.file_type().is_dir() {
-            trace!(
-                "sending dir {} as {}",
-                fullpath.display(),
-                relpath.display()
-            );
-            ar.append_dir(relpath, fullpath)?;
-        } else if entry.file_type().is_symlink() {
-            error!("ignoring symlink in snapshot dir");
-        } else if entry.file_type().is_file() {
-            // Shared catalogs are exempt
-            if relpath.starts_with("global/") {
-                trace!("sending shared catalog {}", relpath.display());
-                ar.append_path_with_name(fullpath, relpath)?;
-            } else if !is_rel_file_path(relpath.to_str().unwrap()) {
-                trace!("sending {}", relpath.display());
-                ar.append_path_with_name(fullpath, relpath)?;
-            } else {
-                trace!("not sending {}", relpath.display());
+    #[rustfmt::skip]
+    pub fn send_tarball(&mut self) -> anyhow::Result<()> {
+        debug!("sending tarball of snapshot in {}", self.snappath);
+        for entry in WalkDir::new(&self.snappath) {
+            let entry = entry?;
+            let fullpath = entry.path();
+            let relpath = entry.path().strip_prefix(&self.snappath).unwrap();
 
-            // FIXME: For now, also send all the relation files.
-            // This really shouldn't be necessary, and kind of
-            // defeats the point of having a page server in the
-            // first place.
But it is useful at least when - // debugging with the DEBUG_COMPARE_LOCAL option (see - // vendor/postgres/src/backend/storage/smgr/pagestore_smgr.c) - - ar.append_path_with_name(fullpath, relpath)?; + if relpath.to_str().unwrap() == "" { + continue; } + + if entry.file_type().is_dir() { + trace!( + "sending dir {} as {}", + fullpath.display(), + relpath.display() + ); + self.ar.append_dir(relpath, fullpath)?; + } else if entry.file_type().is_symlink() { + error!("ignoring symlink in snapshot dir"); + } else if entry.file_type().is_file() { + // Shared catalogs are exempt + if relpath.starts_with("global/") { + trace!("sending shared catalog {}", relpath.display()); + self.ar.append_path_with_name(fullpath, relpath)?; + } else if !is_rel_file_path(relpath.to_str().unwrap()) { + if entry.file_name() != "pg_filenode.map" + && entry.file_name() != "pg_control" + && !relpath.starts_with("pg_xact/") + && !relpath.starts_with("pg_multixact/") + { + trace!("sending {}", relpath.display()); + self.ar.append_path_with_name(fullpath, relpath)?; + } + } else { + trace!("not sending {}", relpath.display()); + } + } else { + error!("unknown file type: {}", fullpath.display()); + } + } + + for obj in self.timeline.list_nonrels(self.lsn)? { + match obj { + ObjectTag::Clog(slru) => + self.add_slru_segment("pg_xact", &obj, slru.blknum)?, + ObjectTag::MultiXactMembers(slru) => + self.add_slru_segment("pg_multixact/members", &obj, slru.blknum)?, + ObjectTag::MultiXactOffsets(slru) => + self.add_slru_segment("pg_multixact/offsets", &obj, slru.blknum)?, + ObjectTag::FileNodeMap(db) => + self.add_relmap_file(&obj, &db)?, + ObjectTag::TwoPhase(prepare) => + self.add_twophase_file(&obj, prepare.xid)?, + _ => {} + } + } + self.finish_slru_segment()?; + self.add_pgcontrol_file()?; + self.ar.finish()?; + debug!("all tarred up!"); + Ok(()) + } + + // + // Generate SRLU segment files from repository + // + fn add_slru_segment( + &mut self, + path: &'static str, + tag: &ObjectTag, + page: u32, + ) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + // Zero length image indicates truncated segment: just skip it + if !img.is_empty() { + assert!(img.len() == pg_constants::BLCKSZ as usize); + + let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; + if self.slru_segno != segno || self.slru_path != path { + let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); + let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; + self.ar.append(&header, &self.slru_buf[..])?; + self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; + } + self.slru_segno = segno; + self.slru_path = path; + let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize + * pg_constants::BLCKSZ as usize; + let offs_end = offs_start + pg_constants::BLCKSZ as usize; + self.slru_buf[offs_start..offs_end].copy_from_slice(&img); + } + Ok(()) + } + + fn finish_slru_segment(&mut self) -> anyhow::Result<()> { + if self.slru_path != "" { + let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); + let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; + self.ar.append(&header, &self.slru_buf[..])?; + } + Ok(()) + } + + // + // Extract pg_filenode.map files from repository + // + fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { + String::from("global/pg_filenode.map") } else { - 
error!("unknown file type: {}", fullpath.display()); - } + // User defined tablespaces are not supported + assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); + let src_path = format!("{}/base/1/PG_VERSION", self.snappath); + let dst_path = format!("base/{}/PG_VERSION", db.dbnode); + self.ar.append_path_with_name(&src_path, &dst_path)?; + format!("base/{}/pg_filenode.map", db.dbnode) + }; + assert!(img.len() == 512); + let header = new_tar_header(&path, img.len() as u64)?; + self.ar.append(&header, &img[..])?; + Ok(()) } - // FIXME: Also send all the WAL. The compute node would only need - // the WAL that applies to non-relation files, because the page - // server handles all the relation files. But we don't have a - // mechanism for separating relation and non-relation WAL at the - // moment. - for entry in std::fs::read_dir(&walpath)? { - let entry = entry?; - let fullpath = &entry.path(); - let relpath = fullpath.strip_prefix(&walpath).unwrap(); - - if !entry.path().is_file() { - continue; - } - - let archive_fname = relpath.to_str().unwrap(); - let archive_fname = archive_fname - .strip_suffix(".partial") - .unwrap_or(&archive_fname); - let archive_path = "pg_wal/".to_owned() + archive_fname; - ar.append_path_with_name(fullpath, archive_path)?; + // + // Extract twophase state files + // + fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn(*tag, self.lsn)?; + let mut buf = BytesMut::new(); + buf.extend_from_slice(&img[..]); + let crc = crc32c::crc32c(&img[..]); + buf.put_u32_le(crc); + let path = format!("pg_twophase/{:>08X}", xid); + let header = new_tar_header(&path, buf.len() as u64)?; + self.ar.append(&header, &buf[..])?; + Ok(()) } - ar.finish()?; - debug!("all tarred up!"); - Ok(()) + // + // Add generated pg_control file + // + fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { + let most_recent_lsn = Lsn(0); + let checkpoint_bytes = self + .timeline + .get_page_at_lsn(ObjectTag::Checkpoint, most_recent_lsn)?; + let pg_control_bytes = self + .timeline + .get_page_at_lsn(ObjectTag::ControlFile, most_recent_lsn)?; + let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?; + let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; + + // Here starts pg_resetwal inspired magic + // Generate new pg_control and WAL needed for bootstrap + let new_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE) + 1; + + let new_lsn = XLogSegNoOffsetToRecPtr( + new_segno, + SizeOfXLogLongPHD as u32, + pg_constants::WAL_SEGMENT_SIZE, + ); + checkpoint.redo = new_lsn; + + //reset some fields we don't want to preserve + checkpoint.oldestActiveXid = 0; + + //save new values in pg_control + pg_control.checkPoint = new_lsn; + pg_control.checkPointCopy = checkpoint; + + //send pg_control + let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control); + let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; + self.ar.append(&header, &pg_control_bytes[..])?; + + //send wal segment + let wal_file_name = XLogFileName( + 1, // FIXME: always use Postgres timeline 1 + new_segno, + pg_constants::WAL_SEGMENT_SIZE, + ); + let wal_file_path = format!("pg_wal/{}", wal_file_name); + let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?; + + let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize); + + let hdr = XLogLongPageHeaderData { + std: { + XLogPageHeaderData { + xlp_magic: 
+        let hdr = XLogLongPageHeaderData {
+            std: {
+                XLogPageHeaderData {
+                    xlp_magic: XLOG_PAGE_MAGIC,
+                    xlp_info: pg_constants::XLP_LONG_HEADER,
+                    xlp_tli: 1, // FIXME: always use Postgres timeline 1
+                    xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
+                    xlp_rem_len: 0,
+                }
+            },
+            xlp_sysid: pg_control.system_identifier,
+            xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
+            xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
+        };
+
+        let hdr_bytes = encode_xlog_long_phd(hdr);
+        seg_buf.extend_from_slice(&hdr_bytes);
+
+        let rec_hdr = XLogRecord {
+            xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
+                + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
+                + SIZEOF_CHECKPOINT) as u32,
+            xl_xid: 0, //0 is for InvalidTransactionId
+            xl_prev: 0,
+            xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
+            xl_rmid: pg_constants::RM_XLOG_ID,
+            xl_crc: 0,
+        };
+
+        let mut rec_short_hdr_bytes = BytesMut::new();
+        rec_short_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
+        rec_short_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
+
+        let rec_bytes = encode_xlog_record(rec_hdr);
+        let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
+
+        //calculate record checksum
+        let mut crc = 0;
+        crc = crc32c_append(crc, &rec_short_hdr_bytes[..]);
+        crc = crc32c_append(crc, &checkpoint_bytes[..]);
+        crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
+
+        seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
+        seg_buf.put_u32_le(crc);
+        seg_buf.extend_from_slice(&rec_short_hdr_bytes);
+        seg_buf.extend_from_slice(&checkpoint_bytes);
+
+        //zero out remaining file
+        seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
+
+        self.ar.append(&header, &seg_buf[..])?;
+        Ok(())
+    }
 }
 
 ///
@@ -231,3 +343,18 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
 fn is_rel_file_path(path: &str) -> bool {
     parse_rel_file_path(path).is_ok()
 }
+
+fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
+    let mut header = Header::new_gnu();
+    header.set_size(size);
+    header.set_path(path)?;
+    header.set_mode(0b110000000);
+    header.set_mtime(
+        SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_secs(),
+    );
+    header.set_cksum();
+    Ok(header)
+}
diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index 4b7ec67e4e..b3f4a5e3d0 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -21,7 +21,6 @@ use crate::{PageServerConf, ZTimelineId};
 use anyhow::{bail, Context, Result};
 use bytes::Bytes;
 use log::*;
-use postgres_ffi::pg_constants;
 use serde::{Deserialize, Serialize};
 use std::cmp::max;
 use std::collections::{BTreeMap, HashMap, HashSet};
@@ -113,10 +112,11 @@ impl Repository for ObjectRepository {
             ancestor_timeline: None,
             ancestor_lsn: start_lsn,
         };
+        let val = ObjectValue::TimelineMetadata(metadata);
         self.obj_store.put(
             &timeline_metadata_key(timelineid),
             Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
+            &ObjectValue::ser(&val)?,
         )?;
 
         info!("Created empty timeline {}", timelineid);
@@ -149,10 +149,11 @@ impl Repository for ObjectRepository {
             ancestor_timeline: Some(src),
             ancestor_lsn: at_lsn,
         };
+        let val = ObjectValue::TimelineMetadata(metadata);
         self.obj_store.put(
             &timeline_metadata_key(dst),
             Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
+            &ObjectValue::ser(&val)?,
         )?;
 
         Ok(())
@@ -223,19 +224,22 @@ impl ObjectTimeline {
         let v = obj_store
             .get(&timeline_metadata_key(timelineid), Lsn(0))
             .with_context(|| "timeline not found in repository")?;
-        let metadata = MetadataEntry::des(&v)?;
-        let timeline = ObjectTimeline {
-            timelineid,
-            obj_store,
-            walredo_mgr,
-            last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
-            last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
-            ancestor_timeline: metadata.ancestor_timeline,
-            ancestor_lsn: metadata.ancestor_lsn,
-            rel_meta: RwLock::new(BTreeMap::new()),
-        };
-        Ok(timeline)
+        if let ObjectValue::TimelineMetadata(metadata) = ObjectValue::des(&v)? {
+            let timeline = ObjectTimeline {
+                timelineid,
+                obj_store,
+                walredo_mgr,
+                last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
+                last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
+                ancestor_timeline: metadata.ancestor_timeline,
+                ancestor_lsn: metadata.ancestor_lsn,
+                rel_meta: RwLock::new(BTreeMap::new()),
+            };
+            Ok(timeline)
+        } else {
+            bail!("Invalid timeline metadata");
+        }
     }
 }
 
@@ -245,7 +249,7 @@ impl Timeline for ObjectTimeline {
     //------------------------------------------------------------------------------
 
     /// Look up given page in the cache.
-    fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
+    fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
         let lsn = self.wait_lsn(req_lsn)?;
 
         self.get_page_at_lsn_nowait(tag, lsn)
@@ -282,6 +286,11 @@ impl Timeline for ObjectTimeline {
         Ok(false)
     }
 
+    /// Get a list of non-relational objects
+    fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
+        self.obj_store.list_objects(self.timelineid, true, lsn)
+    }
+
     /// Get a list of all distinct relations in given tablespace and database.
     fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
         // List all relations in this timeline.
@@ -306,10 +315,12 @@ impl Timeline for ObjectTimeline { .obj_store .get(&timeline_metadata_key(timeline), Lsn(0)) .with_context(|| "timeline not found in repository")?; - let metadata = MetadataEntry::des(&v)?; - - prev_timeline = metadata.ancestor_timeline; - lsn = metadata.ancestor_lsn; + if let ObjectValue::TimelineMetadata(metadata) = ObjectValue::des(&v)? { + prev_timeline = metadata.ancestor_timeline; + lsn = metadata.ancestor_lsn; + } else { + bail!("Invalid timeline metadata"); + } } Ok(all_rels) @@ -325,96 +336,102 @@ impl Timeline for ObjectTimeline { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> { + fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> { let lsn = rec.lsn; let key = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag, }; - let val = PageEntry::WALRecord(rec); + let val = ObjectValue::WALRecord(rec); - self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?; - debug!( - "put_wal_record rel {} blk {} at {}", - tag.rel, tag.blknum, lsn - ); + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; + debug!("put_wal_record {:?} at {}", tag, lsn); - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + if let ObjectTag::RelationBuffer(tag) = tag { + // Also check if this created or extended the file + let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); - if tag.blknum >= old_nblocks { - let new_nblocks = tag.blknum + 1; - let key = relation_size_key(self.timelineid, tag.rel); - let val = RelationSizeEntry::Size(new_nblocks); + if tag.blknum >= old_nblocks { + let new_nblocks = tag.blknum + 1; + let key = relation_size_key(self.timelineid, tag.rel); + let val = ObjectValue::RelationSize(new_nblocks); - trace!( - "Extended relation {} from {} to {} blocks at {}", - tag.rel, - old_nblocks, - new_nblocks, - lsn - ); + trace!( + "Extended relation {} from {} to {} blocks at {}", + tag.rel, + old_nblocks, + new_nblocks, + lsn + ); - self.obj_store - .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - tag.rel, - RelMetadata { - size: new_nblocks, - last_updated: lsn, - }, - ); + self.obj_store + .put(&key, lsn, &ObjectValue::ser(&val)?)?; + let mut rel_meta = self.rel_meta.write().unwrap(); + rel_meta.insert( + tag.rel, + RelMetadata { + size: new_nblocks, + last_updated: lsn, + }, + ); + } } + Ok(()) + } + fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag, + }; + let val = ObjectValue::Unlink; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; Ok(()) } /// /// Memorize a full image of a page version /// - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { + fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()> { let key = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag, }; - let val = PageEntry::Page(img); + let val = ObjectValue::Page(img); - self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; - debug!( - "put_page_image rel {} blk {} at {}", - tag.rel, tag.blknum, lsn - ); + debug!("put_page_image {:?} at {}", tag, lsn); - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + if let 
ObjectTag::RelationBuffer(tag) = tag {
+            // Also check if this created or extended the file
+            let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
 
-        if tag.blknum >= old_nblocks {
-            let new_nblocks = tag.blknum + 1;
-            let key = relation_size_key(self.timelineid, tag.rel);
-            let val = RelationSizeEntry::Size(new_nblocks);
+            if tag.blknum >= old_nblocks {
+                let new_nblocks = tag.blknum + 1;
+                let key = relation_size_key(self.timelineid, tag.rel);
+                let val = ObjectValue::RelationSize(new_nblocks);
 
-            trace!(
-                "Extended relation {} from {} to {} blocks at {}",
-                tag.rel,
-                old_nblocks,
-                new_nblocks,
-                lsn
-            );
+                trace!(
+                    "Extended relation {} from {} to {} blocks at {}",
+                    tag.rel,
+                    old_nblocks,
+                    new_nblocks,
+                    lsn
+                );
 
-            self.obj_store
-                .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
-            let mut rel_meta = self.rel_meta.write().unwrap();
-            rel_meta.insert(
-                tag.rel,
-                RelMetadata {
-                    size: new_nblocks,
-                    last_updated: lsn,
-                },
-            );
+                self.obj_store
+                    .put(&key, lsn, &ObjectValue::ser(&val)?)?;
+                let mut rel_meta = self.rel_meta.write().unwrap();
+                rel_meta.insert(
+                    tag.rel,
+                    RelMetadata {
+                        size: new_nblocks,
+                        last_updated: lsn,
+                    },
+                );
+            }
         }
         Ok(())
     }
 
@@ -424,12 +441,12 @@ impl Timeline for ObjectTimeline {
     ///
     fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
         let key = relation_size_key(self.timelineid, rel);
-        let val = RelationSizeEntry::Size(nblocks);
+        let val = ObjectValue::RelationSize(nblocks);
 
         info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
 
         self.obj_store
-            .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
+            .put(&key, lsn, &ObjectValue::ser(&val)?)?;
         let mut rel_meta = self.rel_meta.write().unwrap();
         rel_meta.insert(
             rel,
@@ -512,14 +529,15 @@ impl Timeline for ObjectTimeline {
             ancestor_timeline: self.ancestor_timeline,
             ancestor_lsn: self.ancestor_lsn,
         };
+        trace!("checkpoint at {}", metadata.last_valid_lsn);
+
+        let val = ObjectValue::TimelineMetadata(metadata);
         self.obj_store.put(
             &timeline_metadata_key(self.timelineid),
             Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
+            &ObjectValue::ser(&val)?,
         )?;
 
-        trace!("checkpoint at {}", metadata.last_valid_lsn);
-
         Ok(())
     }
 
@@ -535,46 +553,46 @@ impl Timeline for ObjectTimeline {
 }
 
 impl ObjectTimeline {
-    fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
+    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes> {
         // Look up the page entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
         let searchkey = ObjectKey {
             timeline: self.timelineid,
-            buf_tag: tag,
+            tag,
         };
         let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
 
         if let Some((version_lsn, value)) = iter.next().transpose()? {
            let page_img: Bytes;
 
-            match PageEntry::des(&value)? {
-                PageEntry::Page(img) => {
+            match ObjectValue::des(&value)? {
+                ObjectValue::Page(img) => {
                     page_img = img;
                 }
-                PageEntry::WALRecord(_rec) => {
+                ObjectValue::WALRecord(_rec) => {
                     // Request the WAL redo manager to apply the WAL records for us.
                     let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
                     page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
 
                     self.put_page_image(tag, lsn, page_img.clone())?;
                 }
+                x => bail!("Unexpected object value: {:?}", x),
             }
             // FIXME: assumes little-endian. Only used for the debugging log though
             let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
             let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
             trace!(
-                "Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})",
+                "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})",
                 page_lsn_hi,
                 page_lsn_lo,
-                tag.rel,
-                tag.blknum,
+                tag,
                 version_lsn,
                 lsn
             );
             return Ok(page_img);
         }
         static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
-        trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, lsn);
+        trace!("page {:?} at {} not found", tag, lsn);
         Ok(Bytes::from_static(&ZERO_PAGE))
         /* return Err("could not find page image")?; */
     }
@@ -597,8 +615,9 @@ impl ObjectTimeline {
         let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
 
         if let Some((version_lsn, value)) = iter.next().transpose()? {
-            match RelationSizeEntry::des(&value)? {
-                RelationSizeEntry::Size(nblocks) => {
+            let value = ObjectValue::des(&value)?;
+            match value {
+                ObjectValue::RelationSize(nblocks) => {
                     trace!(
                         "relation {} has size {} at {} (request {})",
                         rel,
@@ -608,7 +627,7 @@ impl ObjectTimeline {
                     );
                     Ok(Some(nblocks))
                 }
-                RelationSizeEntry::Unlink => {
+                ObjectValue::Unlink => {
                     trace!(
                         "relation {} not found; it was dropped at lsn {}",
                         rel,
@@ -616,9 +635,15 @@ impl ObjectTimeline {
                     );
                     Ok(None)
                 }
+                _ => bail!(
+                    "Unexpected relation {} size value {:?} at {}",
+                    rel,
+                    value,
+                    lsn
+                ),
             }
         } else {
-            info!("relation {} not found at {}", rel, lsn);
+            debug!("relation {} not found at {}", rel, lsn);
             Ok(None)
         }
     }
@@ -632,7 +657,7 @@ impl ObjectTimeline {
     ///
     fn collect_records_for_apply(
         &self,
-        tag: BufferTag,
+        tag: ObjectTag,
         lsn: Lsn,
     ) -> Result<(Option<Bytes>, Vec<WALRecord>)> {
         let mut base_img: Option<Bytes> = None;
         let mut records: Vec<WALRecord> = Vec::new();
 
         // old page image.
         let searchkey = ObjectKey {
             timeline: self.timelineid,
-            buf_tag: tag,
+            tag,
         };
         let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
         while let Some((_key, value)) = iter.next().transpose()? {
-            match PageEntry::des(&value)? {
-                PageEntry::Page(img) => {
+            match ObjectValue::des(&value)? {
+                ObjectValue::Page(img) => {
                     // We have a base image. No need to dig deeper into the list of
                     // records
                     base_img = Some(img);
                     break;
                 }
-                PageEntry::WALRecord(rec) => {
+                ObjectValue::WALRecord(rec) => {
                     records.push(rec.clone());
                     // If this WAL record initializes the page, no need to dig deeper.
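+                    // (Such a record carries a full-page image or has the
+                    // BKPBLOCK_WILL_INIT flag set, so the redo routine can
+                    // rebuild the page without any older version.)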
                     if rec.will_init {
                         break;
                     }
                 }
+                x => bail!("Unexpected object value {:?}", x),
             }
         }
         records.reverse();
@@ -797,7 +823,7 @@ impl ObjectTimeline {
 
         Ok(ObjectVersionIter {
             obj_store,
-            buf_tag: key.buf_tag,
+            tag: key.tag,
             current_iter,
             ancestor_timeline: self.ancestor_timeline,
             ancestor_lsn: self.ancestor_lsn,
@@ -806,9 +832,9 @@ impl ObjectTimeline {
 }
 
 struct ObjectHistory<'a> {
-    iter: Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>,
+    iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
     lsn: Lsn,
-    last_relation_size: Option<(BufferTag, u32)>,
+    last_relation_size: Option<(RelTag, u32)>,
 }
 
 impl<'a> Iterator for ObjectHistory<'a> {
@@ -826,123 +852,92 @@ impl<'a> History for ObjectHistory<'a> {
 }
 
 impl<'a> ObjectHistory<'a> {
-    fn handle_relation_size(
-        &mut self,
-        buf_tag: BufferTag,
-        entry: RelationSizeEntry,
-    ) -> Option<Update> {
-        match entry {
-            RelationSizeEntry::Size(size) => {
-                // we only want to output truncations, expansions are filtered out
-                let last_relation_size = self.last_relation_size.replace((buf_tag, size));
-
-                match last_relation_size {
-                    Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => {
-                        Some(Update::Truncate { n_blocks: size })
-                    }
-                    _ => None,
-                }
-            }
-            RelationSizeEntry::Unlink => Some(Update::Unlink),
-        }
-    }
-
-    fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update {
-        match entry {
-            PageEntry::Page(img) => Update::Page {
-                blknum: buf_tag.blknum,
-                img,
-            },
-            PageEntry::WALRecord(rec) => Update::WALRecord {
-                blknum: buf_tag.blknum,
-                rec,
-            },
-        }
-    }
-
     fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
-        while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? {
-            if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM {
-                continue;
-            }
+        while let Some((tag, lsn, value)) = self.iter.next().transpose()? {
+            let entry = ObjectValue::des(&value)?;
+            let rel_tag: RelTag;
+            let update = match tag {
+                ObjectTag::RelationMetadata(rel) => {
+                    rel_tag = rel;
+                    match entry {
+                        ObjectValue::RelationSize(size) => {
+                            // we only want to output truncations, expansions are filtered out
+                            let last_relation_size = self.last_relation_size.replace((rel, size));
 
-            let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM {
-                let entry = RelationSizeEntry::des(&value)?;
-                match self.handle_relation_size(buf_tag, entry) {
-                    Some(relation_update) => relation_update,
-                    None => continue,
+                            match last_relation_size {
+                                Some((last_rel, last_size))
+                                    if last_rel != rel || size < last_size =>
+                                {
+                                    Update::Truncate { n_blocks: size }
+                                }
+                                _ => continue,
+                            }
+                        }
+                        ObjectValue::Unlink => Update::Unlink,
+                        _ => continue,
+                    }
                 }
-            } else {
-                let entry = PageEntry::des(&value)?;
-                self.handle_page(buf_tag, entry)
+                ObjectTag::RelationBuffer(buf_tag) => {
+                    rel_tag = buf_tag.rel;
+                    match entry {
+                        ObjectValue::Page(img) => Update::Page {
+                            blknum: buf_tag.blknum,
+                            img,
+                        },
+                        ObjectValue::WALRecord(rec) => Update::WALRecord {
+                            blknum: buf_tag.blknum,
+                            rec,
+                        },
+                        _ => continue,
+                    }
+                }
+                _ => continue,
             };
-
             return Ok(Some(RelationUpdate {
-                rel: buf_tag.rel,
+                rel: rel_tag,
                 lsn,
                 update,
             }));
         }
-
         Ok(None)
     }
 }
 
 ///
-/// We store two kinds of page versions in the repository:
-///
-/// 1. Ready-made images of the block
-/// 2. WAL records, to be applied on top of the "previous" entry
-///
-/// Some WAL records will initialize the page from scratch. For such records,
-/// the 'will_init' flag is set. They don't need the previous page image before
-/// applying. The 'will_init' flag is set for records containing a full-page image,
-/// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
-/// stored directly in the cache entry in that you still need to run the WAL redo
-/// routine to generate the page image.
+/// We store several kinds of objects in the repository.
+/// We have per-page, per-relation (or non-rel file) and per-timeline entries.
 ///
 #[derive(Debug, Clone, Serialize, Deserialize)]
-enum PageEntry {
+enum ObjectValue {
+    /// Ready-made images of the block
     Page(Bytes),
+    /// WAL records, to be applied on top of the "previous" entry
+    ///
+    /// Some WAL records will initialize the page from scratch. For such records,
+    /// the 'will_init' flag is set. They don't need the previous page image before
+    /// applying. The 'will_init' flag is set for records containing a full-page image,
+    /// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
+    /// stored directly in the cache entry in that you still need to run the WAL redo
+    /// routine to generate the page image.
     WALRecord(WALRecord),
-}
-
-///
-/// In addition to page versions, we store relation size as a separate, versioned,
-/// object.
-///
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum RelationSizeEntry {
-    Size(u32),
-
-    /// Tombstone for a dropped relation.
-    //
-    // TODO: Not used. Currently, we never drop relations. The parsing
-    // of relation drops in COMMIT/ABORT records has not been
-    // implemented. We should also have a mechanism to remove
-    // "orphaned" relfiles, if the compute node crashes before writing
-    // the COMMIT/ABORT record.
+    /// RelationSize. We store it separately not only to answer nblocks requests
+    /// faster; we also need it to support relation truncation.
+    RelationSize(u32),
+    /// Tombstone for a dropped relation or non-relational file.
     Unlink,
+    TimelineMetadata(MetadataEntry),
 }
 
-// No real block in PostgreSQL will have block number u32::MAX
-// See vendor/postgres/src/include/storage/block.h
-const RELATION_SIZE_BLKNUM: u32 = u32::MAX;
-
 const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
     ObjectKey {
         timeline: timelineid,
-        buf_tag: BufferTag {
-            rel,
-            blknum: RELATION_SIZE_BLKNUM,
-        },
+        tag: ObjectTag::RelationMetadata(rel),
     }
 }
 
 ///
 /// In addition to those per-page and per-relation entries, we also
-/// store a little metadata blob for each timeline. It is stored using
-/// STORAGE_SPECIAL_FORKNUM.
+/// store a little metadata blob for each timeline.
 ///
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct MetadataEntry {
@@ -955,15 +950,7 @@ pub struct MetadataEntry {
 const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
     ObjectKey {
         timeline: timelineid,
-        buf_tag: BufferTag {
-            rel: RelTag {
-                forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
-                spcnode: 0,
-                dbnode: 0,
-                relnode: 0,
-            },
-            blknum: 0,
-        },
+        tag: ObjectTag::TimelineMetadataTag,
     }
 }
 
@@ -976,7 +963,7 @@ const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
 
 struct ObjectVersionIter<'a> {
     obj_store: &'a dyn ObjectStore,
 
-    buf_tag: BufferTag,
+    tag: ObjectTag,
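+
+    // Note on traversal: when this iterator is exhausted on the current
+    // timeline, iteration continues on the ancestor timeline at ancestor_lsn,
+    // so a page version may be served from any ancestor branch.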
 
     /// Iterator on the current timeline.
     current_iter: Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>,
 
@@ -1013,7 +1000,7 @@ impl<'a> ObjectVersionIter<'a> {
         if let Some(ancestor_timeline) = self.ancestor_timeline {
             let searchkey = ObjectKey {
                 timeline: ancestor_timeline,
-                buf_tag: self.buf_tag,
+                tag: self.tag,
             };
             let ancestor_iter = self
                 .obj_store
@@ -1026,11 +1013,13 @@ impl<'a> ObjectVersionIter<'a> {
                 .obj_store
                 .get(&timeline_metadata_key(ancestor_timeline), Lsn(0))
                 .with_context(|| "timeline not found in repository")?;
-            let ancestor_metadata = MetadataEntry::des(&v)?;
-
-            self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
-            self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
-            self.current_iter = ancestor_iter;
+            if let ObjectValue::TimelineMetadata(ancestor_metadata) = ObjectValue::des(&v)? {
+                self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
+                self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
+                self.current_iter = ancestor_iter;
+            } else {
+                bail!("Invalid timeline metadata");
+            }
         } else {
             return Ok(None);
         }
diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs
index 6bc6c86a24..ebd0cf7467 100644
--- a/pageserver/src/object_store.rs
+++ b/pageserver/src/object_store.rs
@@ -1,6 +1,6 @@
 //! Low-level key-value storage abstraction.
 //!
-use crate::repository::{BufferTag, RelTag};
+use crate::repository::{ObjectTag, RelTag};
 use crate::ZTimelineId;
 use anyhow::Result;
 use serde::{Deserialize, Serialize};
@@ -11,7 +11,7 @@ use zenith_utils::lsn::Lsn;
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ObjectKey {
     pub timeline: ZTimelineId,
-    pub buf_tag: BufferTag,
+    pub tag: ObjectTag,
 }
 
 ///
@@ -58,7 +58,7 @@ pub trait ObjectStore: Send + Sync {
         &'a self,
         timeline: ZTimelineId,
         lsn: Lsn,
-    ) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>>;
+    ) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>>;
 
     /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
     /// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster
@@ -72,6 +72,16 @@ pub trait ObjectStore: Send + Sync {
         lsn: Lsn,
     ) -> Result<HashSet<RelTag>>;
 
+    /// Iterate through all objects.
+    ///
+    /// This is used to implement GC and to prepare the tarball sent to a new
+    /// compute node on startup.
+    fn list_objects<'a>(
+        &'a self,
+        timelineid: ZTimelineId,
+        nonrel_only: bool,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
+
     /// Unlink object (used by GC). This method may actually delete the object or just mark it for deletion.
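+    ///
+    /// For illustration (a sketch): a GC pass might call
+    /// `store.unlink(&key, horizon_lsn)?` once no reader can request the
+    /// object at or below `horizon_lsn`.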
     fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>;
 }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index bd5a1841e2..0fb5878761 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -27,7 +27,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
 use crate::basebackup;
 use crate::branches;
 use crate::page_cache;
-use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
+use crate::repository::{BufferTag, ObjectTag, RelTag, RelationUpdate, Update};
 use crate::restore_local_repo;
 use crate::walreceiver;
 use crate::PageServerConf;
@@ -410,18 +410,18 @@ impl postgres_backend::Handler for PageServerHandler {
 
             match relation_update.update {
                 Update::Page { blknum, img } => {
-                    let tag = BufferTag {
+                    let tag = ObjectTag::RelationBuffer(BufferTag {
                         rel: relation_update.rel,
                         blknum,
-                    };
+                    });
 
                     timeline.put_page_image(tag, relation_update.lsn, img)?;
                 }
                 Update::WALRecord { blknum, rec } => {
-                    let tag = BufferTag {
+                    let tag = ObjectTag::RelationBuffer(BufferTag {
                         rel: relation_update.rel,
                         blknum,
-                    };
+                    });
 
                     timeline.put_wal_record(tag, rec)?;
                 }
@@ -505,6 +505,155 @@ impl postgres_backend::Handler for PageServerHandler {
         }
         pgb.flush()?;
 
+        Ok(())
+    }
+
+    fn handle_controlfile(&mut self) -> io::Result<()> {
+        self.write_message_noflush(&BeMessage::RowDescription)?;
+        self.write_message_noflush(&BeMessage::ControlFile)?;
+        self.write_message(&BeMessage::CommandComplete)?;
+
+        Ok(())
+    }
+
+    fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> {
+        // Check that the timeline exists
+        let repository = page_cache::get_repository();
+        let timeline = repository.get_timeline(timelineid).map_err(|_| {
+            anyhow!(
+                "client requested pagestream on timeline {} which does not exist in page server",
+                timelineid
+            )
+        })?;
+
+        /* switch client to COPYBOTH */
+        self.stream.write_u8(b'W')?;
+        self.stream.write_i32::<BigEndian>(4 + 1 + 2)?;
+        self.stream.write_u8(0)?; /* copy_is_binary */
+        self.stream.write_i16::<BigEndian>(0)?; /* numAttributes */
+        self.stream.flush()?;
+
+        while let Some(message) = self.read_message()? {
+            trace!("query({:?}): {:?}", timelineid, message);
+
+            let copy_data_bytes = match message {
+                FeMessage::CopyData(bytes) => bytes,
+                _ => continue,
+            };
+
+            let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
+
+            let response = match zenith_fe_msg {
+                PagestreamFeMessage::Exists(req) => {
+                    let tag = RelTag {
+                        spcnode: req.spcnode,
+                        dbnode: req.dbnode,
+                        relnode: req.relnode,
+                        forknum: req.forknum,
+                    };
+
+                    let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false);
+
+                    PagestreamBeMessage::Status(PagestreamStatusResponse {
+                        ok: exist,
+                        n_blocks: 0,
+                    })
+                }
+                PagestreamFeMessage::Nblocks(req) => {
+                    let tag = RelTag {
+                        spcnode: req.spcnode,
+                        dbnode: req.dbnode,
+                        relnode: req.relnode,
+                        forknum: req.forknum,
+                    };
+
+                    let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0);
+
+                    PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
+                }
+                PagestreamFeMessage::Read(req) => {
+                    let buf_tag = ObjectTag::RelationBuffer(BufferTag {
+                        rel: RelTag {
+                            spcnode: req.spcnode,
+                            dbnode: req.dbnode,
+                            relnode: req.relnode,
+                            forknum: req.forknum,
+                        },
+                        blknum: req.blkno,
+                    });
+
+                    let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
+                        Ok(p) => PagestreamReadResponse {
+                            ok: true,
+                            n_blocks: 0,
+                            page: p,
+                        },
+                        Err(e) => {
+                            const ZERO_PAGE: [u8; 8192] = [0; 8192];
+                            error!("get_page_at_lsn: {}", e);
+                            PagestreamReadResponse {
+                                ok: false,
+                                n_blocks: 0,
+                                page: Bytes::from_static(&ZERO_PAGE),
+                            }
+                        }
+                    };
+
+                    PagestreamBeMessage::Read(read_response)
+                }
+            };
+
+            self.write_message(&BeMessage::CopyData(response.serialize()))?;
+        }
+
+        Ok(())
+    }
+
+    fn handle_basebackup_request(
+        &mut self,
+        timelineid: ZTimelineId,
+        lsn: Option<Lsn>,
+    ) -> anyhow::Result<()> {
+        // check that the timeline exists
+        let repository = page_cache::get_repository();
+        let timeline = repository.get_timeline(timelineid).map_err(|e| {
+            error!("error fetching timeline: {:?}", e);
+            anyhow!(
+                "client requested basebackup on timeline {} which does not exist in page server",
+                timelineid
+            )
+        })?;
+        /* switch client to COPYOUT */
+        let stream = &mut self.stream;
+        stream.write_u8(b'H')?;
+        stream.write_i32::<BigEndian>(4 + 1 + 2)?;
+        stream.write_u8(0)?; /* copy_is_binary */
+        stream.write_i16::<BigEndian>(0)?; /* numAttributes */
+        stream.flush()?;
+        info!("sent CopyOut");
+
+        /* Send a tarball of the latest snapshot on the timeline */
+
+        // find latest snapshot
+        let snapshot_lsn =
+            restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
+        let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
+        {
+            let mut writer = CopyDataSink { stream };
+            let mut basebackup = basebackup::Basebackup::new(
+                &mut writer,
+                timelineid,
+                &timeline,
+                req_lsn,
+                snapshot_lsn,
+            );
+            basebackup.send_tarball()?;
+        }
+        // CopyDone
+        self.stream.write_u8(b'c')?;
+        self.stream.write_u32::<BigEndian>(4)?;
+        self.stream.flush()?;
+        debug!("CopyDone sent!");
 
         Ok(())
     }
 
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 6254dbf607..1caa35840c 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,3 +1,4 @@
+use crate::waldecoder::TransactionId;
 use crate::ZTimelineId;
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -5,6 +6,7 @@ use postgres_ffi::relfile_utils::forknumber_to_name;
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 use std::fmt;
+use std::iter::Iterator;
 use std::sync::Arc;
 use zenith_utils::lsn::Lsn;
 
@@ -34,7 +36,7 @@ pub trait Timeline: Send + Sync {
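+    // Typical read path against this trait, as an illustrative sketch
+    // (assumes `timeline`, `tag`, `rel` and `lsn` are in scope):
+    //
+    //     let img = timeline.get_page_at_lsn(ObjectTag::RelationBuffer(tag), lsn)?;
+    //     let nblocks = timeline.get_rel_size(rel, lsn)?;
+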
     //------------------------------------------------------------------------------
 
     /// Look up given page in the cache.
-    fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes>;
+    fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
 
     /// Get size of relation
     fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -42,9 +44,12 @@ pub trait Timeline: Send + Sync {
     /// Does relation exist?
     fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
 
-    /// Get a list of all distinct relations in given tablespace and database.
+    /// Get a list of all relations in given tablespace and database.
     fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
 
+    /// Get a list of non-relational objects
+    fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
+
     //------------------------------------------------------------------------------
     // Public PUT functions, to update the repository with new page versions.
     //
@@ -55,14 +60,17 @@ pub trait Timeline: Send + Sync {
     ///
     /// This will implicitly extend the relation, if the page is beyond the
     /// current end-of-file.
-    fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
+    fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()>;
 
     /// Like put_wal_record, but with ready-made image of the page.
-    fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
+    fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()>;
 
     /// Truncate relation
     fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
 
+    /// Unlink object
+    fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>;
+
     /// Remember that all WAL before the given LSN has been processed.
     ///
     /// The WAL receiver calls this after the put_* functions, to indicate that
@@ -185,6 +193,8 @@ impl fmt::Display for RelTag {
 /// In Postgres `BufferTag` structure is used for exactly the same purpose.
 /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
 ///
+/// NOTE: In this context we use buffer, block and page interchangeably when speaking about relation files.
+///
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
 pub struct BufferTag {
     pub rel: RelTag,
@@ -198,6 +208,71 @@ impl BufferTag {
     };
 }
 
+///
+/// Non-relation transaction status files (clog (a.k.a. pg_xact) and pg_multixact)
+/// in Postgres are handled by the SLRU (Simple LRU) buffer, hence the name.
+///
+/// These files are global for a postgres instance.
+///
+/// These files are divided into segments, which are divided into pages
+/// of the same BLCKSZ as used for relation files.
+///
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SlruBufferTag {
+    pub blknum: u32,
+}
+
+///
+/// Special type of Postgres files: pg_filenode.map is needed to map
+/// catalog table OIDs to filenode numbers, which define the filename.
+///
+/// Each database has a map file for its local mapped catalogs,
+/// and there is a separate map file for shared catalogs.
+///
+/// These files have an atypical size of 512 bytes.
+///
+/// See PostgreSQL relmapper.c for details.
+///
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub struct DatabaseTag {
+    pub spcnode: u32,
+    pub dbnode: u32,
+}
+
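+// For illustration (a sketch): the shared relmapper file is addressed as
+//     ObjectTag::FileNodeMap(DatabaseTag { spcnode: GLOBALTABLESPACE_OID, dbnode: 0 })
+// while a per-database map file uses
+//     ObjectTag::FileNodeMap(DatabaseTag { spcnode: DEFAULTTABLESPACE_OID, dbnode: db_oid })
+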
+/// +/// See PostgreSQL twophase.c for details. +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct PrepareTag { + pub xid: TransactionId, +} + +/// ObjectTag is a part of ObjectKey that is specific +/// to the type of the stored object. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum ObjectTag { + // dummy tag preceeding all other keys + FirstTag, + TimelineMetadataTag, + // Special entry that represents PostgreSQL checkpoint. + // We use it to track fields needed to restore controlfile checkpoint. + Checkpoint, + // Various types of non-relation files. + // We need them to bootstrap compute node. + ControlFile, + Clog(SlruBufferTag), + MultiXactMembers(SlruBufferTag), + MultiXactOffsets(SlruBufferTag), + FileNodeMap(DatabaseTag), + TwoPhase(PrepareTag), + // put relations at the end of enum to allow efficient iterations through non-rel objects + RelationMetadata(RelTag), + RelationBuffer(BufferTag), +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record @@ -259,11 +334,11 @@ mod tests { /// Convenience function to create a BufferTag for testing. /// Helps to keeps the tests shorter. #[allow(non_snake_case)] - fn TEST_BUF(blknum: u32) -> BufferTag { - BufferTag { + fn TEST_BUF(blknum: u32) -> ObjectTag { + ObjectTag::RelationBuffer(BufferTag { rel: TESTREL_A, blknum, - } + }) } /// Convenience function to create a page image with given string as the only content @@ -493,15 +568,14 @@ mod tests { impl WalRedoManager for TestRedoManager { fn request_redo( &self, - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, base_img: Option, records: Vec, ) -> Result { let s = format!( - "redo for rel {} blk {} to get to {}, with {} and {} records", - tag.rel, - tag.blknum, + "redo for {:?} to get to {}, with {} and {} records", + tag, lsn, if base_img.is_some() { "base image" diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 7fae6a121b..5ffe704fa2 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -14,7 +14,9 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use bytes::Bytes; -use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; +use crate::repository::{ + BufferTag, DatabaseTag, ObjectTag, PrepareTag, RelTag, SlruBufferTag, Timeline, WALRecord, +}; use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate}; use crate::PageServerConf; @@ -22,6 +24,7 @@ use crate::ZTimelineId; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use zenith_utils::lsn::Lsn; /// @@ -63,8 +66,21 @@ pub fn import_timeline_from_postgres_datadir( None => continue, // These special files appear in the snapshot, but are not needed by the page server - Some("pg_control") => continue, - Some("pg_filenode.map") => continue, + Some("pg_control") => import_nonrel_file( + timeline, + Lsn(0), // control file is not versioned + ObjectTag::ControlFile, + &direntry.path(), + )?, + Some("pg_filenode.map") => import_nonrel_file( + timeline, + lsn, + ObjectTag::FileNodeMap(DatabaseTag { + spcnode: pg_constants::GLOBALTABLESPACE_OID, + dbnode: 0, + }), + &direntry.path(), + )?, // Load any relation files into the page server _ => import_relfile( @@ -91,7 +107,15 @@ pub fn 
             // These special files appear in the snapshot, but are not needed by the page server
             Some("PG_VERSION") => continue,
-            Some("pg_filenode.map") => continue,
+            Some("pg_filenode.map") => import_nonrel_file(
+                timeline,
+                lsn,
+                ObjectTag::FileNodeMap(DatabaseTag {
+                    spcnode: pg_constants::DEFAULTTABLESPACE_OID,
+                    dbnode: dboid,
+                }),
+                &direntry.path(),
+            )?,
 
             // Load any relation files into the page server
             _ => import_relfile(
@@ -104,6 +128,43 @@ pub fn import_timeline_from_postgres_datadir(
             }
         }
     }
+    for entry in fs::read_dir(path.join("pg_xact"))? {
+        let entry = entry?;
+        import_slru_file(
+            timeline,
+            lsn,
+            |blknum| ObjectTag::Clog(SlruBufferTag { blknum }),
+            &entry.path(),
+        )?;
+    }
+    for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
+        let entry = entry?;
+        import_slru_file(
+            timeline,
+            lsn,
+            |blknum| ObjectTag::MultiXactMembers(SlruBufferTag { blknum }),
+            &entry.path(),
+        )?;
+    }
+    for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
+        let entry = entry?;
+        import_slru_file(
+            timeline,
+            lsn,
+            |blknum| ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }),
+            &entry.path(),
+        )?;
+    }
+    for entry in fs::read_dir(path.join("pg_twophase"))? {
+        let entry = entry?;
+        // The file name (not the whole path) is the hex transaction id.
+        let xid = u32::from_str_radix(entry.path().file_name().unwrap().to_str().unwrap(), 16)?;
+        import_nonrel_file(
+            timeline,
+            lsn,
+            ObjectTag::TwoPhase(PrepareTag { xid }),
+            &entry.path(),
+        )?;
+    }
 
     // TODO: Scan pg_tblspc
 
     timeline.checkpoint()?;
@@ -136,7 +197,7 @@ fn import_relfile(
         let r = file.read_exact(&mut buf);
         match r {
             Ok(_) => {
-                let tag = BufferTag {
+                let tag = ObjectTag::RelationBuffer(BufferTag {
                     rel: RelTag {
                         spcnode: spcoid,
                         dbnode: dboid,
@@ -144,13 +205,62 @@ fn import_relfile(
                         forknum,
                     },
                     blknum,
-                };
+                });
                 timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
-                /*
-                if oldest_lsn == 0 || p.lsn < oldest_lsn {
-                    oldest_lsn = p.lsn;
+            }
+
+            // TODO: UnexpectedEof is expected
+            Err(e) => match e.kind() {
+                std::io::ErrorKind::UnexpectedEof => {
+                    // reached EOF. That's expected.
+                    // FIXME: maybe check that we read the full length of the file?
+                    break;
                 }
-                */
+                _ => {
+                    error!("error reading file: {:?} ({})", path, e);
+                    break;
+                }
+            },
+        };
+        blknum += 1;
+    }
+
+    Ok(())
+}
+
+fn import_nonrel_file(
+    timeline: &dyn Timeline,
+    lsn: Lsn,
+    tag: ObjectTag,
+    path: &Path,
+) -> Result<()> {
+    let mut file = File::open(path)?;
+    let mut buffer = Vec::new();
+    // read the whole file
+    file.read_to_end(&mut buffer)?;
+
+    timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?;
+    Ok(())
+}
+
+fn import_slru_file(
+    timeline: &dyn Timeline,
+    lsn: Lsn,
+    gen_tag: fn(blknum: u32) -> ObjectTag,
+    path: &Path,
+) -> Result<()> {
+    // Does it look like an SLRU segment file?
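+    // Segment numbering sketch: each segment file packs SLRU_PAGES_PER_SEGMENT
+    // pages, so segment file "0001" covers blknum values from
+    // SLRU_PAGES_PER_SEGMENT up to 2*SLRU_PAGES_PER_SEGMENT - 1.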
+ + let mut file = File::open(path)?; + let mut buf: [u8; 8192] = [0u8; 8192]; + let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; + + let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; + loop { + let r = file.read_exact(&mut buf); + match r { + Ok(_) => { + timeline.put_page_image(gen_tag(blknum), lsn, Bytes::copy_from_slice(&buf))?; } // TODO: UnexpectedEof is expected @@ -180,6 +290,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; + + let pg_control_bytes = timeline.get_page_at_lsn(ObjectTag::ControlFile, Lsn(0))?; + let pg_control = decode_pg_control(pg_control_bytes)?; + let mut checkpoint = pg_control.checkPointCopy.clone(); + loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); @@ -217,10 +332,12 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: if rec.is_err() { // Assume that an error means we've reached the end of // a partial WAL record. So that's ok. + trace!("WAL decoder error {:?}", rec); + waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64)); break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = decode_wal_record(recdata.clone()); + let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); save_decoded_record(timeline, &decoded, recdata, lsn)?; last_lsn = lsn; } else { @@ -240,6 +357,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: offset = 0; } info!("reached end of WAL at {}", last_lsn); + let checkpoint_bytes = encode_checkpoint(checkpoint); + timeline.put_page_image(ObjectTag::Checkpoint, Lsn(0), checkpoint_bytes)?; Ok(()) } @@ -256,16 +375,6 @@ pub fn save_decoded_record( // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - let tag = BufferTag { - rel: RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }, - blknum: blk.blkno, - }; - let rec = WALRecord { lsn, will_init: blk.will_init || blk.apply_image, @@ -273,7 +382,7 @@ pub fn save_decoded_record( main_data_offset: decoded.main_data_offset as u32, }; - timeline.put_wal_record(tag, rec)?; + timeline.put_wal_record(blk.tag, rec)?; } // Handle a few special record types @@ -329,14 +438,14 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab // Copy content for blknum in 0..nblocks { - let src_key = BufferTag { + let src_key = ObjectTag::RelationBuffer(BufferTag { rel: src_rel, blknum, - }; - let dst_key = BufferTag { + }); + let dst_key = ObjectTag::RelationBuffer(BufferTag { rel: dst_rel, blknum, - }; + }); let content = timeline.get_page_at_lsn(src_key, req_lsn)?; diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index b442053f6c..39055ac4b1 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -2,7 +2,7 @@ //! An implementation of the ObjectStore interface, backed by RocksDB //! 
use crate::object_store::{ObjectKey, ObjectStore};
-use crate::repository::{BufferTag, RelTag};
+use crate::repository::{BufferTag, ObjectTag, RelTag};
 use crate::PageServerConf;
 use crate::ZTimelineId;
 use anyhow::{bail, Result};
@@ -24,7 +24,7 @@ impl StorageKey {
         Self {
             obj_key: ObjectKey {
                 timeline,
-                buf_tag: BufferTag::ZEROED,
+                tag: ObjectTag::FirstTag,
             },
             lsn: Lsn(0),
         }
@@ -123,6 +123,17 @@ impl ObjectStore for RocksObjectStore {
         Ok(Box::new(iter))
     }
 
+    /// Iterate through all timeline objects
+    fn list_objects<'a>(
+        &'a self,
+        timeline: ZTimelineId,
+        nonrel_only: bool,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
+        let iter = RocksObjectIter::new(&self.db, timeline, nonrel_only, lsn)?;
+        Ok(Box::new(iter))
+    }
+
     /// Get a list of all distinct relations in given tablespace and database.
     ///
     /// TODO: This implementation is very inefficient, it scans
@@ -143,7 +154,7 @@ impl ObjectStore for RocksObjectStore {
         let mut search_key = StorageKey {
             obj_key: ObjectKey {
                 timeline: timelineid,
-                buf_tag: BufferTag {
+                tag: ObjectTag::RelationBuffer(BufferTag {
                     rel: RelTag {
                         spcnode,
                         dbnode,
@@ -151,7 +162,7 @@ impl ObjectStore for RocksObjectStore {
                         forknum: 0u8,
                     },
                     blknum: 0,
-                },
+                }),
             },
             lsn: Lsn(0),
         };
@@ -162,20 +173,21 @@ impl ObjectStore for RocksObjectStore {
                 break;
             }
             let key = StorageKey::des(iter.key().unwrap())?;
-            if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode)
-                || (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode)
-            {
+            if let ObjectTag::RelationBuffer(buf_tag) = key.obj_key.tag {
+                if (spcnode != 0 && buf_tag.rel.spcnode != spcnode)
+                    || (dbnode != 0 && buf_tag.rel.dbnode != dbnode)
+                {
+                    break;
+                }
+                if key.lsn < lsn {
+                    rels.insert(buf_tag.rel);
+                }
+                let mut next_tag = buf_tag.clone();
+                next_tag.rel.relnode += 1; // skip to next relation
+                search_key.obj_key.tag = ObjectTag::RelationBuffer(next_tag);
+            } else {
                 break;
             }
-
-            if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata)
-                && key.lsn < lsn
-            // visible in this snapshot
-            {
-                rels.insert(key.obj_key.buf_tag.rel);
-            }
-            search_key = key.clone();
-            search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation
         }
 
         Ok(rels)
@@ -189,7 +201,7 @@ impl ObjectStore for RocksObjectStore {
         &'a self,
         timeline: ZTimelineId,
         lsn: Lsn,
-    ) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>> {
+    ) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
         let start_key = StorageKey::timeline_start(timeline);
         let start_key_bytes = StorageKey::ser(&start_key)?;
         let iter = self.db.iterator(rocksdb::IteratorMode::From(
@@ -296,7 +308,7 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> {
             return None;
         }
         let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
-        if key.obj_key.buf_tag != self.obj_key.buf_tag {
+        if key.obj_key.tag != self.obj_key.tag {
             return None;
         }
         let val = self.dbiter.value().unwrap();
@@ -314,7 +326,7 @@ struct RocksObjects<'r> {
 
 impl<'r> Iterator for RocksObjects<'r> {
     // TODO consider returning Box<[u8]>
-    type Item = Result<(BufferTag, Lsn, Vec<u8>)>;
+    type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.next_result().transpose()
@@ -322,7 +334,7 @@ impl<'r> Iterator for RocksObjects<'r> {
 }
 
 impl<'r> RocksObjects<'r> {
-    fn next_result(&mut self) -> Result<Option<(BufferTag, Lsn, Vec<u8>)>> {
+    fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
         for (key_bytes, v) in &mut self.iter {
             let key = StorageKey::des(&key_bytes)?;
 
@@ -335,9 +347,75 @@ impl<'r> RocksObjects<'r> {
                 continue;
             }
 
-            return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec())));
+            return Ok(Some((key.obj_key.tag, key.lsn, v.to_vec())));
         }
 
         Ok(None)
     }
 }
+
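+// For illustration (a sketch): basebackup drives this iterator through the
+// ObjectStore trait to enumerate non-relational objects at a given LSN:
+//
+//     for tag in obj_store.list_objects(timelineid, true, lsn)? {
+//         // tag is the ObjectTag of a non-relational object visible at lsn
+//     }
+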
+///
+struct RocksObjectIter<'a> {
+    timeline: ZTimelineId,
+    key: StorageKey,
+    nonrel_only: bool,
+    lsn: Lsn,
+    dbiter: rocksdb::DBRawIterator<'a>,
+}
+impl<'a> RocksObjectIter<'a> {
+    fn new(
+        db: &'a rocksdb::DB,
+        timeline: ZTimelineId,
+        nonrel_only: bool,
+        lsn: Lsn,
+    ) -> Result<RocksObjectIter<'a>> {
+        let key = StorageKey {
+            obj_key: ObjectKey {
+                timeline,
+                tag: ObjectTag::FirstTag,
+            },
+            lsn: Lsn(0),
+        };
+        let dbiter = db.raw_iterator();
+        Ok(RocksObjectIter {
+            key,
+            timeline,
+            nonrel_only,
+            lsn,
+            dbiter,
+        })
+    }
+}
+impl<'a> Iterator for RocksObjectIter<'a> {
+    type Item = ObjectTag;
+
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        loop {
+            self.dbiter.seek(StorageKey::ser(&self.key).unwrap());
+            if !self.dbiter.valid() {
+                return None;
+            }
+            let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
+            if key.obj_key.timeline != self.timeline {
+                // End of this timeline
+                return None;
+            }
+            self.key = key.clone();
+            self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions
+            if key.lsn <= self.lsn {
+                // visible in this snapshot
+                if self.nonrel_only {
+                    match key.obj_key.tag {
+                        ObjectTag::RelationMetadata(_) => return None,
+                        ObjectTag::RelationBuffer(_) => return None,
+                        _ => return Some(key.obj_key.tag),
+                    }
+                } else {
+                    return Some(key.obj_key.tag);
+                }
+            }
+        }
+    }
+}
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 2fbb0af95f..7072cdbbd8 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -2,6 +2,7 @@
 //! WAL decoder. For each WAL record, it decodes the record to figure out which data blocks
 //! the record affects, to add the records to the page cache.
 //!
+use crate::repository::*;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
 use postgres_ffi::pg_constants;
@@ -9,7 +10,6 @@ use postgres_ffi::xlog_utils::*;
 use postgres_ffi::XLogLongPageHeaderData;
 use postgres_ffi::XLogPageHeaderData;
 use postgres_ffi::XLogRecord;
-
 use std::cmp::min;
 use std::str;
 use thiserror::Error;
@@ -61,6 +61,13 @@ impl WalStreamDecoder {
         }
     }

+    pub fn set_position(&mut self, lsn: Lsn) {
+        self.lsn = lsn;
+        self.contlen = 0;
+        self.padlen = 0;
+        self.inputbuf.clear();
+    }
+
     pub fn feed_bytes(&mut self, buf: &[u8]) {
         self.inputbuf.extend_from_slice(buf);
     }
@@ -198,12 +205,7 @@ pub struct DecodedBkpBlock {
     //in_use: bool,

     /* Identify the block this refers to */
-    pub rnode_spcnode: u32,
-    pub rnode_dbnode: u32,
-    pub rnode_relnode: u32,
-    // Note that we have a few special forknum values for non-rel files.
-    pub forknum: u8,
-    pub blkno: u32,
+    pub tag: ObjectTag,

     /* copy of the fork_flags field from the XLogRecordBlockHeader */
     flags: u8,
@@ -212,6 +214,7 @@ pub struct DecodedBkpBlock {
     has_image: bool,       /* has image, even for consistency checking */
     pub apply_image: bool, /* has image that should be restored */
     pub will_init: bool,   /* record doesn't need previous page version to apply */
+    pub will_drop: bool,   /* record drops relation */
     //char *bkp_image;
     hole_offset: u16,
     hole_length: u16,
@@ -226,16 +229,13 @@ impl DecodedBkpBlock {
     pub fn new() -> DecodedBkpBlock {
         DecodedBkpBlock {
-            rnode_spcnode: 0,
-            rnode_dbnode: 0,
-            rnode_relnode: 0,
-            forknum: 0,
-            blkno: 0,
+            tag: ObjectTag::FirstTag,

             flags: 0,
             has_image: false,
             apply_image: false,
             will_init: false,
+            will_drop: false,
             hole_offset: 0,
             hole_length: 0,
             bimg_len: 0,
@@ -490,10 +490,11 @@ impl XlMultiXactTruncate {
 // block data
 // ...
// main data -pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { - let mut rnode_spcnode: u32 = 0; - let mut rnode_dbnode: u32 = 0; - let mut rnode_relnode: u32 = 0; +pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord { + let mut spcnode: u32 = 0; + let mut dbnode: u32 = 0; + let mut relnode: u32 = 0; + let mut forknum: u8; let mut got_rnode = false; let mut buf = record.clone(); @@ -509,6 +510,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { xlogrec.xl_info ); + checkpoint.update_next_xid(xlogrec.xl_xid); let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD; if buf.remaining() != remaining { @@ -567,7 +569,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { max_block_id = block_id; fork_flags = buf.get_u8(); - blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; + forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; blk.flags = fork_flags; blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; @@ -673,9 +675,9 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } } if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { - rnode_spcnode = buf.get_u32_le(); - rnode_dbnode = buf.get_u32_le(); - rnode_relnode = buf.get_u32_le(); + spcnode = buf.get_u32_le(); + dbnode = buf.get_u32_le(); + relnode = buf.get_u32_le(); got_rnode = true; } else if !got_rnode { // TODO @@ -686,18 +688,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { goto err; */ } - blk.rnode_spcnode = rnode_spcnode; - blk.rnode_dbnode = rnode_dbnode; - blk.rnode_relnode = rnode_relnode; - - blk.blkno = buf.get_u32_le(); - trace!( - "this record affects {}/{}/{} blk {}", - rnode_spcnode, - rnode_dbnode, - rnode_relnode, - blk.blkno - ); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum, + spcnode, + dbnode, + relnode, + }, + blknum: buf.get_u32_le(), + }); + trace!("this record affects {:?}", blk.tag); blocks.push(blk); } @@ -719,10 +719,44 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { assert_eq!(buf.remaining(), main_data_len as usize); } - //5. Handle special XACT records - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + //5. 
Handle special CLOG and XACT records + if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { + let mut blk = DecodedBkpBlock::new(); + let blknum = buf.get_i32_le() as u32; + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + blk.will_init = true; + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + blk.will_drop = true; + checkpoint.oldestXid = buf.get_u32_le(); + checkpoint.oldestXidDB = buf.get_u32_le(); + info!( + "RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}", + blknum, checkpoint.oldestXid, checkpoint.oldestXidDB + ); + } + trace!("RM_CLOG_ID updates block {}", blknum); + blocks.push(blk); + } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - if info == pg_constants::XLOG_XACT_COMMIT { + if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED + { + if info == pg_constants::XLOG_XACT_COMMIT { + let mut blk = DecodedBkpBlock::new(); + let blknum = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + trace!( + "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, + blknum, + main_data_len + ); + blocks.push(blk); + } //parse commit record to extract subtrans entries // xl_xact_commit starts with time of commit let _xact_time = buf.get_i64_le(); @@ -737,8 +771,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); + let mut prev_blknum = u32::MAX; for _i in 0..nsubxacts { - let _subxact = buf.get_u32_le(); + let subxact = buf.get_u32_le(); + let blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if prev_blknum != blknum { + prev_blknum = blknum; + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + blocks.push(blk); + } } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { @@ -747,7 +789,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); - //TODO handle this too? 
+ let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationMetadata(RelTag { + forknum: pg_constants::MAIN_FORKNUM, + spcnode, + dbnode, + relnode, + }); + blk.will_drop = true; + blocks.push(blk); trace!( "XLOG_XACT_COMMIT relfilenode {}/{}/{}", spcnode, @@ -764,11 +814,31 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); + let xid = buf.get_u32_le(); + let mut blk = DecodedBkpBlock::new(); + let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + blocks.push(blk); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); //TODO handle this to be able to restore pg_twophase on node start } - } else if info == pg_constants::XLOG_XACT_ABORT { + } else if info == pg_constants::XLOG_XACT_ABORT + || info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + if info == pg_constants::XLOG_XACT_ABORT { + let mut blk = DecodedBkpBlock::new(); + let blknum = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + trace!( + "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, + blknum, + main_data_len + ); + blocks.push(blk); + } //parse abort record to extract subtrans entries // xl_xact_abort starts with time of commit let _xact_time = buf.get_i64_le(); @@ -783,8 +853,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); + let mut prev_blknum = u32::MAX; for _i in 0..nsubxacts { - let _subxact = buf.get_u32_le(); + let subxact = buf.get_u32_le(); + let blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if prev_blknum != blknum { + prev_blknum = blknum; + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + blocks.push(blk); + } } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { @@ -793,7 +871,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); - //TODO handle this too? 
+ let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationMetadata(RelTag { + forknum: pg_constants::MAIN_FORKNUM, + spcnode, + dbnode, + relnode, + }); + blk.will_drop = true; + blocks.push(blk); trace!( "XLOG_XACT_ABORT relfilenode {}/{}/{}", spcnode, @@ -803,9 +889,21 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); + let xid = buf.get_u32_le(); + let mut blk = DecodedBkpBlock::new(); + let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.tag = ObjectTag::Clog(SlruBufferTag { blknum }); + blocks.push(blk); trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } + } else if info == pg_constants::XLOG_XACT_PREPARE { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::TwoPhase(PrepareTag { + xid: xlogrec.xl_xid, + }); + blk.will_init = true; + blocks.push(blk); + info!("Prepare transaction {}", xlogrec.xl_xid); } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; @@ -838,8 +936,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { trace!("XLOG_TBLSPC_DROP is not handled yet"); } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_HEAP_INSERT { let xlrec = XlHeapInsert::decode(&mut buf); if (xlrec.flags @@ -847,52 +944,76 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); + if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: tag0.rel.spcnode, + dbnode: tag0.rel.dbnode, + relnode: tag0.rel.relnode, + }, + blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + }); + blocks.push(blk); + } } } else if info == pg_constants::XLOG_HEAP_DELETE { let xlrec = XlHeapDelete::decode(&mut buf); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); + if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: tag0.rel.spcnode, + dbnode: tag0.rel.dbnode, + relnode: tag0.rel.relnode, + }, + blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + }); + blocks.push(blk); + } } } else if info == pg_constants::XLOG_HEAP_UPDATE || info == pg_constants::XLOG_HEAP_HOT_UPDATE { let xlrec = XlHeapUpdate::decode(&mut buf); if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - 
blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); + if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: tag0.rel.spcnode, + dbnode: tag0.rel.dbnode, + relnode: tag0.rel.relnode, + }, + blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + }); + blocks.push(blk); + } } if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 && blocks.len() > 1 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - blk.rnode_spcnode = blocks[1].rnode_spcnode; - blk.rnode_dbnode = blocks[1].rnode_dbnode; - blk.rnode_relnode = blocks[1].rnode_relnode; - blocks.push(blk); + if let ObjectTag::RelationBuffer(tag1) = blocks[1].tag { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: tag1.rel.spcnode, + dbnode: tag1.rel.dbnode, + relnode: tag1.rel.relnode, + }, + blknum: tag1.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + }); + blocks.push(blk); + } } } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { let xlrec = XlHeapMultiInsert::decode(&mut buf); if (xlrec.flags @@ -900,15 +1021,114 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) != 0 { + if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::RelationBuffer(BufferTag { + rel: RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: tag0.rel.spcnode, + dbnode: tag0.rel.dbnode, + relnode: tag0.rel.relnode, + }, + blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + }); + blocks.push(blk); + } + } + } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { + blknum: buf.get_u32_le(), + }); + blk.will_init = true; + blocks.push(blk); + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { + blknum: buf.get_u32_le(), + }); + blk.will_init = true; + blocks.push(blk); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + // Update offset page + let mut blk = DecodedBkpBlock::new(); + let blknum = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); + blocks.push(blk); + let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = + (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + for blknum in first_mbr_blkno..=last_mbr_blkno { + // Update members page let mut blk = DecodedBkpBlock::new(); - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - 
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; + blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); blocks.push(blk); } + if xlrec.mid >= checkpoint.nextMulti { + checkpoint.nextMulti = xlrec.mid + 1; + } + if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset { + checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; + } + let max_mbr_xid = + xlrec.members.iter().fold( + 0u32, + |acc, mbr| { + if mbr.xid > acc { + mbr.xid + } else { + acc + } + }, + ); + checkpoint.update_next_xid(max_mbr_xid); + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(&mut buf); + checkpoint.oldestMulti = xlrec.end_trunc_off; + checkpoint.oldestMultiDB = xlrec.oldest_multi_db; + let first_off_blkno = + xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let last_off_blkno = + xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + for blknum in first_off_blkno..last_off_blkno { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); + blk.will_drop = true; + blocks.push(blk); + } + let first_mbr_blkno = + xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = + xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + for blknum in first_mbr_blkno..last_mbr_blkno { + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); + blk.will_drop = true; + blocks.push(blk); + } + } else { + panic!() + } + } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { + let xlrec = XlRelmapUpdate::decode(&mut buf); + let mut blk = DecodedBkpBlock::new(); + blk.tag = ObjectTag::FileNodeMap(DatabaseTag { + spcnode: xlrec.tsid, + dbnode: xlrec.dbid, + }); + blk.will_init = true; + blocks.push(blk); + } else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID { + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if next_oid > checkpoint.nextOid { + checkpoint.nextOid = next_oid; + } } } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e1c80932a1..22c7bfe6ff 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -5,6 +5,7 @@ //! We keep one WAL receiver active per timeline. use crate::page_cache; +use crate::repository::*; use crate::restore_local_repo; use crate::waldecoder::*; use crate::PageServerConf; @@ -15,8 +16,8 @@ use log::*; use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; -use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::cmp::{max, min}; @@ -149,6 +150,11 @@ fn walreceiver_main( error!("No previous WAL position"); } + startpoint = Lsn::max( + startpoint, + Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)), + ); + // There might be some padding after the last full record, skip it. 
        //
        // FIXME: It probably would be better to always start streaming from the beginning
@@ -168,6 +174,10 @@ fn walreceiver_main(

     let mut waldecoder = WalStreamDecoder::new(startpoint);

+    let checkpoint_bytes = timeline.get_page_at_lsn(ObjectTag::Checkpoint, Lsn(0))?;
+    let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
+    trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
+
     while let Some(replication_message) = physical_stream.next()? {
         let status_update = match replication_message {
             ReplicationMessage::XLogData(xlog_data) => {
@@ -185,9 +195,19 @@
                     waldecoder.feed_bytes(data);

                     while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                        let decoded = decode_wal_record(recdata.clone());
+                        let old_checkpoint_bytes = encode_checkpoint(checkpoint);
+                        let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
                         restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?;
                         last_rec_lsn = lsn;
+
+                        let new_checkpoint_bytes = encode_checkpoint(checkpoint);
+                        if new_checkpoint_bytes != old_checkpoint_bytes {
+                            timeline.put_page_image(
+                                ObjectTag::Checkpoint,
+                                Lsn(0),
+                                new_checkpoint_bytes,
+                            )?;
+                        }
                     }

                     // Update the last_valid LSN value in the page cache one more time. We updated
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index f410693178..c9140c50e1 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -15,8 +15,10 @@
 //! TODO: Even though the postgres code runs in a separate process,
 //! it's not a secure sandbox.
 //!
-use bytes::{BufMut, Bytes, BytesMut};
+use byteorder::{ByteOrder, LittleEndian};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
+use std::assert;
 use std::cell::RefCell;
 use std::fs;
 use std::fs::OpenOptions;
@@ -35,9 +37,12 @@ use tokio::time::timeout;
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;

-use crate::repository::BufferTag;
-use crate::repository::WALRecord;
+use crate::repository::{BufferTag, ObjectTag, WALRecord};
+use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
 use crate::PageServerConf;
+use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
+use postgres_ffi::pg_constants;
+use postgres_ffi::xlog_utils::XLogRecord;

 ///
 /// WAL Redo Manager is responsible for replaying WAL records.
@@ -52,7 +57,7 @@ pub trait WalRedoManager: Send + Sync {
     /// the records.
    fn request_redo(
         &self,
-        tag: BufferTag,
+        tag: ObjectTag,
         lsn: Lsn,
         base_img: Option<Bytes>,
         records: Vec<WALRecord>,
@@ -68,7 +73,7 @@ pub struct DummyRedoManager {}
 impl crate::walredo::WalRedoManager for DummyRedoManager {
     fn request_redo(
         &self,
-        _tag: BufferTag,
+        _tag: ObjectTag,
         _lsn: Lsn,
         _base_img: Option<Bytes>,
         _records: Vec<WALRecord>,
@@ -97,7 +102,7 @@ struct PostgresRedoManagerInternal {
 #[derive(Debug)]
 struct WalRedoRequest {
-    tag: BufferTag,
+    tag: ObjectTag,
     lsn: Lsn,

     base_img: Option<Bytes>,
@@ -159,14 +164,13 @@ impl WalRedoManager for PostgresRedoManager {
     ///
     fn request_redo(
         &self,
-        tag: BufferTag,
+        tag: ObjectTag,
         lsn: Lsn,
         base_img: Option<Bytes>,
         records: Vec<WALRecord>,
     ) -> Result<Bytes, Error> {
         // Create a channel where to receive the response
         let (tx, rx) = mpsc::channel::<Result<Bytes, Error>>();
-
         let request = WalRedoRequest {
             tag,
             lsn,
@@ -174,7 +178,6 @@
             records,
             response_channel: tx,
         };
-
         self.request_tx
             .lock()
             .unwrap()
@@ -186,6 +189,24 @@
     }
 }

+fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
+    ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
+        % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
+        * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
+}
+
+fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
+    (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
+        * pg_constants::MXACT_MEMBER_BITS_PER_XACT
+}
+
+/* Location (byte offset within page) of TransactionId of given member */
+fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
+    mx_offset_to_flags_offset(xid)
+        + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
+            + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
+}
+
 ///
 /// WAL redo thread
 ///
@@ -249,7 +270,218 @@ impl PostgresRedoManagerInternal {

         let start = Instant::now();

         let apply_result: Result<Bytes, Error>;
-        apply_result = process.apply_wal_records(tag, base_img, records).await;
+        if let ObjectTag::RelationBuffer(buf_tag) = tag {
+            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
+        } else {
+            const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
+            let mut page = BytesMut::new();
+            if let Some(fpi) = base_img {
+                page.extend_from_slice(&fpi[..]);
+            } else {
+                page.extend_from_slice(&ZERO_PAGE);
+            }
+            for record in records {
+                let mut buf = record.rec.clone();
+
+                // 1. Parse XLogRecord struct
+                // FIXME: refactor to avoid code duplication.
+ let xlogrec = XLogRecord::from_bytes(&mut buf); + + //move to main data + // TODO probably, we should store some records in our special format + // to avoid this weird parsing on replay + let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + if buf.remaining() > skip { + buf.advance(skip); + } + + if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + page.copy_from_slice(&ZERO_PAGE); + } + } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + let tag_blknum = match tag { + ObjectTag::Clog(slru) => slru.blknum, + _ => panic!("Not CLOG object tag"), + }; + if info == pg_constants::XLOG_XACT_COMMIT + || info == pg_constants::XLOG_XACT_COMMIT_PREPARED + { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + if info == pg_constants::XLOG_XACT_COMMIT { + transaction_id_set_status(xlogrec.xl_xid, status, &mut page); + } + //handle subtrans + let _xact_time = buf.get_i64_le(); + let mut xinfo = 0; + if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + let _dbid = buf.get_u32_le(); + let _tsid = buf.get_u32_le(); + } + } + + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // only update xids on the requested page + + if tag_blknum == blkno { + status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED; + transaction_id_set_status(subxact, status, &mut page); + } + } + } + if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + //TODO handle this too? 
+                                trace!(
+                                    "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
+                                    spcnode,
+                                    dbnode,
+                                    relnode
+                                );
+                            }
+                        }
+                        if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
+                            let nmsgs = buf.get_i32_le();
+                            for _i in 0..nmsgs {
+                                let sizeof_shared_invalidation_message = 0;
+                                buf.advance(sizeof_shared_invalidation_message);
+                            }
+                        }
+                        assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
+                        let xid = buf.get_u32_le();
+                        transaction_id_set_status(xid, status, &mut page);
+                    }
+                } else if info == pg_constants::XLOG_XACT_ABORT
+                    || info == pg_constants::XLOG_XACT_ABORT_PREPARED
+                {
+                    status = pg_constants::TRANSACTION_STATUS_ABORTED;
+                    if info == pg_constants::XLOG_XACT_ABORT {
+                        transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
+                    }
+                    //handle subtrans
+                    let _xact_time = buf.get_i64_le();
+                    let mut xinfo = 0;
+                    if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
+                        xinfo = buf.get_u32_le();
+                        if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
+                            let _dbid = buf.get_u32_le();
+                            let _tsid = buf.get_u32_le();
+                        }
+                    }
+
+                    if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
+                        let nsubxacts = buf.get_i32_le();
+                        for _i in 0..nsubxacts {
+                            let subxact = buf.get_u32_le();
+                            let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
+                            // only update xids on the requested page
+                            if tag_blknum == blkno {
+                                status = pg_constants::TRANSACTION_STATUS_ABORTED;
+                                transaction_id_set_status(subxact, status, &mut page);
+                            }
+                        }
+                    }
+                    if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
+                        if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
+                            let nrels = buf.get_i32_le();
+                            for _i in 0..nrels {
+                                let spcnode = buf.get_u32_le();
+                                let dbnode = buf.get_u32_le();
+                                let relnode = buf.get_u32_le();
+                                //TODO handle this too?
+                                trace!(
+                                    "XLOG_XACT_ABORT relfilenode {}/{}/{}",
+                                    spcnode,
+                                    dbnode,
+                                    relnode
+                                );
+                            }
+                        }
+                        if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
+                            let nmsgs = buf.get_i32_le();
+                            for _i in 0..nmsgs {
+                                let sizeof_shared_invalidation_message = 0;
+                                buf.advance(sizeof_shared_invalidation_message);
+                            }
+                        }
+                        assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
+                        let xid = buf.get_u32_le();
+                        transaction_id_set_status(xid, status, &mut page);
+                    }
+                } else if info == pg_constants::XLOG_XACT_PREPARE {
+                    info!("Apply prepare {} record", xlogrec.xl_xid);
+                    page.clear();
+                    page.extend_from_slice(&buf[..]);
+                } else {
+                    error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
+                        status,
+                        record.lsn,
+                        record.main_data_offset, record.rec.len());
+                }
+            } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
+                let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
+                if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
+                    || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
+                {
+                    page.copy_from_slice(&ZERO_PAGE);
+                } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
+                    let xlrec = XlMultiXactCreate::decode(&mut buf);
+                    if let ObjectTag::MultiXactMembers(slru) = tag {
+                        for i in 0..xlrec.nmembers {
+                            let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+                            if blkno == slru.blknum {
+                                // update only target block
+                                let offset = xlrec.moff + i;
+                                let memberoff = mx_offset_to_member_offset(offset);
+                                let flagsoff = mx_offset_to_flags_offset(offset);
+                                let bshift = mx_offset_to_flags_bitshift(offset);
+                                let mut flagsval =
+                                    LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
+                                flagsval &=
+                                    !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
+                                        << bshift);
+                                flagsval |= xlrec.members[i as usize].status << bshift;
+                                LittleEndian::write_u32(
+                                    &mut page[flagsoff..flagsoff + 4],
+                                    flagsval,
+                                );
+                                LittleEndian::write_u32(
+                                    &mut page[memberoff..memberoff + 4],
+                                    xlrec.members[i as usize].xid,
+                                );
+                            }
+                        }
+                    } else {
+                        let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
+                            * 4) as usize;
+                        LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
+                    }
+                } else {
+                    panic!()
+                }
+            } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
+                page.clear();
+                page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
+                assert!(page.len() == 512); // size of pg_filenode.map
+            }
+            }
+
+            apply_result = Ok::<Bytes, Error>(page.freeze());
+        }

         let duration = start.elapsed();
diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs
index a29f2bbb6d..65c3ef6bce 100644
--- a/postgres_ffi/build.rs
+++ b/postgres_ffi/build.rs
@@ -15,6 +15,7 @@ fn main() {
         // All the needed PostgreSQL headers are included from 'pg_control_ffi.h'
         //
         .header("pg_control_ffi.h")
+        .header("xlog_ffi.h")
         //
         // Tell cargo to invalidate the built crate whenever any of the
         // included header files changed.
diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs
index 56dd1f1003..970f4078e9 100644
--- a/postgres_ffi/src/lib.rs
+++ b/postgres_ffi/src/lib.rs
@@ -5,5 +5,6 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

 pub mod controlfile_utils;
 pub mod pg_constants;
+pub mod nonrelfile_utils;
 pub mod relfile_utils;
 pub mod xlog_utils;
diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs
index d8223d352f..61c54b05d2 100644
--- a/postgres_ffi/src/pg_constants.rs
+++ b/postgres_ffi/src/pg_constants.rs
@@ -21,13 +21,24 @@ pub const FSM_FORKNUM: u8 = 1;
 pub const VISIBILITYMAP_FORKNUM: u8 = 2;
 pub const INIT_FORKNUM: u8 = 3;

-pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
-
 // From storage_xlog.h
 pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
 pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
 pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;

+// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
+// --with-segsize=SEGSIZE, but assume the defaults for now.
+pub const BLCKSZ: u16 = 8192;
+pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
+
+//
+// constants from clog.h
+//
+pub const CLOG_XACTS_PER_BYTE: u32 = 4;
+pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
+pub const CLOG_BITS_PER_XACT: u8 = 2;
+pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
+
 //
 // Constants from visbilitymap.h
 //
@@ -35,14 +46,24 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
 pub const BITS_PER_HEAPBLOCK: u16 = 2;
 pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;

+pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
 pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
 pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
 pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;

+pub const CLOG_ZEROPAGE: u8 = 0x00;
+pub const CLOG_TRUNCATE: u8 = 0x10;
+
 // From xact.h
 pub const XLOG_XACT_COMMIT: u8 = 0x00;
 pub const XLOG_XACT_PREPARE: u8 = 0x10;
 pub const XLOG_XACT_ABORT: u8 = 0x20;
+pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
+pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
+
+// From slru.h
+pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
+pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;

 /* mask for filtering opcodes out of xl_info */
 pub const XLOG_XACT_OPMASK: u8 = 0x70;
@@ -63,9 +84,29 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
 // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;

 // From pg_control.h and rmgrlist.h
+pub const XLOG_NEXTOID: u8 = 0x30;
 pub const XLOG_SWITCH: u8 = 0x40;
 pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;

+// From multixact.h
+pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
+pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
+pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
+pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
+
+pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
+pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
+pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
+pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
+pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
+    MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
+/* size in bytes of a complete group */
+pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
+    4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
+pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
+pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
+    MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
+
 // From heapam_xlog.h
 pub const XLOG_HEAP_INSERT: u8 = 0x00;
 pub const XLOG_HEAP_DELETE: u8 = 0x10;
@@ -104,11 +145,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;

 pub const SIZEOF_XLOGRECORD: u32 = 24;

-// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
-// --with-segsize=SEGSIZE, but assume the defaults for now.
-pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - // // from xlogrecord.h // @@ -139,3 +175,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; + +pub const XLOG_BLCKSZ: usize = 8192; +pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; +pub const XLP_LONG_HEADER: u16 = 0x0002; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index aabbf8628b..8648b799d8 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -13,8 +13,8 @@ use crate::FullTransactionId; use crate::XLogLongPageHeaderData; use crate::XLogPageHeaderData; use crate::XLogRecord; - use crate::XLOG_PAGE_MAGIC; + use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, Bytes}; use crc32c::*; diff --git a/postgres_ffi/xlog_ffi.h b/postgres_ffi/xlog_ffi.h new file mode 100644 index 0000000000..68fe9b1bdb --- /dev/null +++ b/postgres_ffi/xlog_ffi.h @@ -0,0 +1,2 @@ +#include "c.h" +#include "access/xlog_internal.h"
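
The SLRU import in restore_local_repo.rs addresses pages by segment: a segment file's name is its segment number in hex, and block numbers continue across segments in steps of SLRU_PAGES_PER_SEGMENT. A minimal standalone sketch of that addressing (the helper name is hypothetical, not part of the patch):

    // Hypothetical helper mirroring the import loop's addressing.
    const SLRU_PAGES_PER_SEGMENT: u32 = 32;

    /// Block number of the first page stored in the given SLRU segment file.
    fn first_blknum(file_name: &str) -> Result<u32, std::num::ParseIntError> {
        // SLRU segment file names are the segment number printed in hex.
        let segno = u32::from_str_radix(file_name, 16)?;
        Ok(segno * SLRU_PAGES_PER_SEGMENT)
    }

    fn main() {
        assert_eq!(first_blknum("0000"), Ok(0));
        assert_eq!(first_blknum("0002"), Ok(64)); // third segment starts at page 64
    }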
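RocksObjectIter advances by re-seeking with the LSN forced to u64::MAX, which is only correct because StorageKey orders all versions of one object contiguously by (tag, lsn). A small std-only demonstration of that ordering assumption, using a BTreeMap with simplified integer tags in place of the serialized RocksDB keys:

    use std::collections::BTreeMap;

    // Versions of one object sort contiguously under (tag, lsn) ordering, so
    // seeking to (tag, u64::MAX) lands on the next object's first version.
    fn main() {
        let mut store: BTreeMap<(u32, u64), &str> = BTreeMap::new();
        store.insert((1, 100), "obj1@100");
        store.insert((1, 200), "obj1@200");
        store.insert((2, 150), "obj2@150");
        // "Seek" past every version of object 1.
        let next = store.range((1u32, u64::MAX)..).next().map(|(k, _)| *k);
        assert_eq!(next, Some((2, 150)));
    }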
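Several branches above map an XID to its CLOG page with xid / CLOG_XACTS_PER_PAGE and then flip the XID's two status bits through transaction_id_set_status from the new postgres_ffi::nonrelfile_utils module, whose body the diff does not show. A standalone sketch of that bit math, assuming it mirrors PostgreSQL's clog.c layout:

    const BLCKSZ: u32 = 8192;
    const CLOG_XACTS_PER_BYTE: u32 = 4;
    const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE;
    const CLOG_BITS_PER_XACT: u32 = 2;
    const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;

    /// Set the 2-bit commit status of `xid` within its 8 KB CLOG page.
    fn transaction_id_set_status(xid: u32, status: u8, page: &mut [u8]) {
        // Byte within the page, then the 2-bit slot within that byte.
        let byteno = ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE) as usize;
        let bshift = ((xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT) as u8;
        page[byteno] = (page[byteno] & !(CLOG_XACT_BITMASK << bshift)) | (status << bshift);
    }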
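The RM_HEAP_ID and RM_HEAP2_ID branches queue a visibility-map block alongside each affected heap block by dividing the heap block number by HEAPBLOCKS_PER_PAGE. A quick check of that mapping under the visibilitymap.h constants above (the heap block number is a hypothetical value):

    fn main() {
        const BLCKSZ: u32 = 8192;
        const SIZE_OF_PAGE_HEADER: u32 = 24;
        const BITS_PER_HEAPBLOCK: u32 = 2;
        // Each VM page tracks 2 bits per heap block in its usable space.
        let heapblocks_per_page = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
        assert_eq!(heapblocks_per_page, 32672);
        let heap_blknum: u32 = 100_000;
        assert_eq!(heap_blknum / heapblocks_per_page, 3); // its VM block
    }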
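The walreceiver change clamps the start point to the beginning of the current WAL segment by masking off the low bits of the LSN, which is valid only because WAL_SEGMENT_SIZE is a power of two. A minimal sketch of the rounding:

    const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;

    /// Round an LSN down to the start of its 16 MB WAL segment.
    fn segment_start(lsn: u64) -> u64 {
        lsn & !(WAL_SEGMENT_SIZE - 1)
    }

    fn main() {
        assert_eq!(segment_start(0x0300_1234), 0x0300_0000);
        assert_eq!(segment_start(0x0100_0000), 0x0100_0000); // already aligned
    }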
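The mx_offset_to_* helpers added to walredo.rs encode the members-SLRU layout implied by the multixact.h constants: 20-byte groups of four flag bytes followed by four 4-byte member XIDs, 409 groups per 8 KB page. A worked example with a hypothetical member offset:

    fn main() {
        let moff: u32 = 5; // second group, second member
        let group = moff / 4; // MULTIXACT_MEMBERS_PER_MEMBERGROUP = 4
        let flags_off = (group % 409) * 20; // 409 groups of 20 bytes per page
        let member_off = flags_off + 4 + (moff % 4) * 4; // skip 4 flag bytes
        let flags_shift = (moff % 4) * 8; // MXACT_MEMBER_BITS_PER_XACT = 8
        assert_eq!((flags_off, member_off, flags_shift), (20, 28, 8));
    }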