From e7587ceb81bb8a54c6c775351024a2fe175d2a1b Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Mon, 7 Jun 2021 11:34:00 +0300
Subject: [PATCH] Make checkpoint and pg_control records in object storage
 also versioned

---
 pageserver/src/basebackup.rs         | 22 ++++++++++++++--------
 pageserver/src/object_repository.rs  |  5 ++---
 pageserver/src/repository.rs         | 13 +------------
 pageserver/src/restore_local_repo.rs | 16 +++++++---------
 pageserver/src/walreceiver.rs        |  4 ++--
 5 files changed, 26 insertions(+), 34 deletions(-)

diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index d3fbf14c92..a6d0f35cca 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -4,6 +4,10 @@
 //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
 //! It could use a better name.
 //!
+//! A stateless Postgres compute node is launched by sending it a tarball which contains non-relational data (multixacts, clog, filenode maps, twophase files)
+//! together with a generated pg_control file and a dummy WAL segment. This module is responsible for creating such a tarball from the snapshot directory
+//! and the data stored in object storage.
+//!
 use crate::ZTimelineId;
 use bytes::{BufMut, BytesMut};
 use log::*;
@@ -20,6 +24,9 @@ use postgres_ffi::xlog_utils::*;
 use postgres_ffi::*;
 use zenith_utils::lsn::Lsn;
 
+/// This is a short-lived object that exists only for the duration of tarball creation,
+/// created mostly to avoid passing a lot of parameters between the various functions
+/// used for constructing the tarball.
 pub struct Basebackup<'a> {
     ar: Builder<&'a mut dyn Write>,
     timeline: &'a Arc<dyn Timeline>,
@@ -49,7 +56,7 @@ impl<'a> Basebackup<'a> {
         }
     }
 
-    #[rustfmt::skip]
+    #[rustfmt::skip] // otherwise "cargo fmt" produces very strange formatting for the match arms of self.timeline.list_nonrels
     pub fn send_tarball(&mut self) -> anyhow::Result<()> {
         debug!("sending tarball of snapshot in {}", self.snappath);
         for entry in WalkDir::new(&self.snappath) {
@@ -76,15 +83,14 @@ impl<'a> Basebackup<'a> {
                     trace!("sending shared catalog {}", relpath.display());
                     self.ar.append_path_with_name(fullpath, relpath)?;
                 } else if !is_rel_file_path(relpath.to_str().unwrap()) {
-                    if entry.file_name() != "pg_filenode.map"
-                        && entry.file_name() != "pg_control"
+                    if entry.file_name() != "pg_filenode.map" // this file will be generated from object storage
                         && !relpath.starts_with("pg_xact/")
                         && !relpath.starts_with("pg_multixact/")
                     {
                         trace!("sending {}", relpath.display());
                         self.ar.append_path_with_name(fullpath, relpath)?;
                     }
-                } else {
+                } else { // relation pages are loaded on demand and should not be included in the tarball
                     trace!("not sending {}", relpath.display());
                 }
             } else {
@@ -92,6 +98,7 @@
             }
         }
+        // Generate non-relational files.
         for obj in self.timeline.list_nonrels(self.lsn)?
         {
             match obj {
                 ObjectTag::Clog(slru) =>
@@ -107,7 +114,7 @@
                 _ => {}
             }
         }
-        self.finish_slru_segment()?;
+        self.finish_slru_segment()?; // write the last incomplete SLRU segment (if any)
         self.add_pgcontrol_file()?;
         self.ar.finish()?;
         debug!("all tarred up!");
@@ -194,13 +201,12 @@ impl<'a> Basebackup<'a> {
     // Add generated pg_control file
     //
     fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
-        let most_recent_lsn = Lsn(0);
         let checkpoint_bytes = self
             .timeline
-            .get_page_at_lsn_nowait(ObjectTag::Checkpoint, most_recent_lsn)?;
+            .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
         let pg_control_bytes = self
             .timeline
-            .get_page_at_lsn_nowait(ObjectTag::ControlFile, most_recent_lsn)?;
+            .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
         let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
         let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
         // Here starts pg_resetwal inspired magic
diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index ec7f1d31f6..1979063e33 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -163,8 +163,7 @@ impl Repository for ObjectRepository {
                     let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
                     let val = ObjectValue::Page(img);
                     let key = ObjectKey { timeline: dst, tag };
-                    let lsn = if tag.is_versioned() { at_lsn } else { Lsn(0) };
-                    self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
+                    self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
                 }
             }
         }
@@ -935,7 +934,7 @@ enum ObjectValue {
     /// RelationSize. We store it separately not only to ansver nblocks requests faster.
     /// We also need it to support relation truncation.
     RelationSize(u32),
-    /// TODO Add a comment
+    /// Tombstone for a dropped relation.
     Unlink,
     TimelineMetadata(MetadataEntry),
 }
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 693f2ff2a9..b807740576 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -276,17 +276,6 @@ pub enum ObjectTag {
     RelationBuffer(BufferTag),
 }
 
-impl ObjectTag {
-    pub fn is_versioned(&self) -> bool {
-        match self {
-            ObjectTag::Checkpoint => false,
-            ObjectTag::ControlFile => false,
-            ObjectTag::TimelineMetadataTag => false,
-            _ => true,
-        }
-    }
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub struct WALRecord {
     pub lsn: Lsn, // LSN at the *end* of the record
@@ -537,7 +526,7 @@ mod tests {
         assert_eq!(None, snapshot.next().transpose()?);
 
         // add a page and advance the last valid LSN
-        let rel = TESTREL_A; 
+        let rel = TESTREL_A;
         let buf = TEST_BUF(1);
         tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
         tline.advance_last_valid_lsn(Lsn(1));
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index d8cf0918ca..b50e316431 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -66,12 +66,9 @@ pub fn import_timeline_from_postgres_datadir(
             None => continue,
 
             // These special files appear in the snapshot, but are not needed by the page server
-            Some("pg_control") => import_nonrel_file(
-                timeline,
-                Lsn(0), // control file is not versioned
-                ObjectTag::ControlFile,
-                &direntry.path(),
-            )?,
+            Some("pg_control") => {
+                import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?
+            }
             Some("pg_filenode.map") => import_nonrel_file(
                 timeline,
                 lsn,
@@ -290,10 +287,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
 
     let mut last_lsn = startpoint;
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?;
+    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
     let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
     if checkpoint.nextXid.value == 0 {
-        let pg_control_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, Lsn(0))?;
+        let pg_control_bytes =
+            timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
         let pg_control = decode_pg_control(pg_control_bytes)?;
         checkpoint = pg_control.checkPointCopy;
     }
@@ -361,7 +359,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
     }
     info!("reached end of WAL at {}", last_lsn);
     let checkpoint_bytes = encode_checkpoint(checkpoint);
-    timeline.put_page_image(ObjectTag::Checkpoint, Lsn(0), checkpoint_bytes)?;
+    timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?;
     Ok(())
 }
 
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 84ca9f8db7..b417982967 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -174,7 +174,7 @@ fn walreceiver_main(
 
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?;
+    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
     let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
     trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
 
@@ -204,7 +204,7 @@
                 if new_checkpoint_bytes != old_checkpoint_bytes {
                     timeline.put_page_image(
                         ObjectTag::Checkpoint,
-                        Lsn(0),
+                        lsn,
                         new_checkpoint_bytes,
                     )?;
                 }
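
For orientation, a minimal sketch (not part of the patch) of the read-modify-write cycle that becomes LSN-versioned with this change, assembled from the calls visible in the hunks above. It assumes the pageserver's Timeline trait, ObjectTag, CheckPoint and the decode_checkpoint/encode_checkpoint helpers as they are used in walreceiver.rs and restore_local_repo.rs; the function name refresh_checkpoint and the update callback are illustrative only.

    use anyhow::Result;
    use zenith_utils::lsn::Lsn;
    // Assumes `Timeline`, `ObjectTag`, `CheckPoint`, `decode_checkpoint` and
    // `encode_checkpoint` are in scope from the pageserver / postgres_ffi crates,
    // exactly as in the files touched by the patch above.

    // Sketch only: read the checkpoint image that is valid at `read_lsn` (before this
    // patch the key was always Lsn(0)), let the caller adjust it, then store the new
    // image under the same tag, versioned by `write_lsn`, so that basebackup can later
    // fetch the copy matching its backup LSN via get_page_at_lsn_nowait.
    fn refresh_checkpoint(
        timeline: &dyn Timeline,
        read_lsn: Lsn,
        write_lsn: Lsn,
        update: impl FnOnce(&mut CheckPoint),
    ) -> Result<()> {
        let old_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, read_lsn)?;
        let mut checkpoint = decode_checkpoint(old_bytes.clone())?;

        update(&mut checkpoint); // e.g. bump nextXid while replaying WAL records

        let new_bytes = encode_checkpoint(checkpoint);
        if new_bytes != old_bytes {
            // Only write when something actually changed, mirroring the walreceiver hunk.
            timeline.put_page_image(ObjectTag::Checkpoint, write_lsn, new_bytes)?;
        }
        Ok(())
    }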