Make checkpoint and pg_control records in object storage also versioned

This commit is contained in:
Konstantin Knizhnik
2021-06-07 11:34:00 +03:00
parent 6d38b9ce6a
commit e7587ceb81
5 changed files with 26 additions and 34 deletions

View File

@@ -4,6 +4,10 @@
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files)
//! together with a generated pg_control file and a dummy segment of WAL. This module is responsible for creating such a tarball from the snapshot directory and
//! the data stored in object storage.
//!
use crate::ZTimelineId;
use bytes::{BufMut, BytesMut};
use log::*;
@@ -20,6 +24,9 @@ use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
/// This is a short-lived object that exists only for the duration of tarball creation,
/// created mostly to avoid passing a lot of parameters between the various functions
/// used for constructing the tarball.
pub struct Basebackup<'a> {
ar: Builder<&'a mut dyn Write>,
timeline: &'a Arc<dyn Timeline>,
@@ -49,7 +56,7 @@ impl<'a> Basebackup<'a> {
}
}
#[rustfmt::skip]
#[rustfmt::skip] // otherwise "cargo fmt" produces very strange formatting for match arms of self.timeline.list_nonrels
pub fn send_tarball(&mut self) -> anyhow::Result<()> {
debug!("sending tarball of snapshot in {}", self.snappath);
for entry in WalkDir::new(&self.snappath) {
@@ -76,15 +83,14 @@ impl<'a> Basebackup<'a> {
trace!("sending shared catalog {}", relpath.display());
self.ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map"
&& entry.file_name() != "pg_control"
if entry.file_name() != "pg_filenode.map" // these files will be generated from object storage
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
trace!("sending {}", relpath.display());
self.ar.append_path_with_name(fullpath, relpath)?;
}
} else {
} else { // relation pages are loaded on demand and should not be included in tarball
trace!("not sending {}", relpath.display());
}
} else {
@@ -92,6 +98,7 @@ impl<'a> Basebackup<'a> {
}
}
// Generate non-relational files.
for obj in self.timeline.list_nonrels(self.lsn)? {
match obj {
ObjectTag::Clog(slru) =>
@@ -107,7 +114,7 @@ impl<'a> Basebackup<'a> {
_ => {}
}
}
self.finish_slru_segment()?;
self.finish_slru_segment()?; // write last non-completed Slru segment (if any)
self.add_pgcontrol_file()?;
self.ar.finish()?;
debug!("all tarred up!");
@@ -194,13 +201,12 @@ impl<'a> Basebackup<'a> {
// Add generated pg_control file
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let most_recent_lsn = Lsn(0);
let checkpoint_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, most_recent_lsn)?;
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, most_recent_lsn)?;
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
// Here starts pg_resetwal inspired magic

View File

@@ -163,8 +163,7 @@ impl Repository for ObjectRepository {
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
let val = ObjectValue::Page(img);
let key = ObjectKey { timeline: dst, tag };
let lsn = if tag.is_versioned() { at_lsn } else { Lsn(0) };
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
}
}
}
@@ -935,7 +934,7 @@ enum ObjectValue {
/// RelationSize. We store it separately not only to answer nblocks requests faster.
/// We also need it to support relation truncation.
RelationSize(u32),
/// TODO Add a comment
/// Tombstone for a dropped relation.
Unlink,
TimelineMetadata(MetadataEntry),
}

View File

@@ -276,17 +276,6 @@ pub enum ObjectTag {
RelationBuffer(BufferTag),
}
impl ObjectTag {
    /// Reports whether this tag is treated as versioned.
    ///
    /// Returns `false` for `Checkpoint`, `ControlFile` and
    /// `TimelineMetadataTag`; every other tag yields `true`.
    pub fn is_versioned(&self) -> bool {
        // The three singleton tags are the only non-versioned ones.
        !matches!(
            self,
            ObjectTag::Checkpoint | ObjectTag::ControlFile | ObjectTag::TimelineMetadataTag
        )
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
@@ -537,7 +526,7 @@ mod tests {
assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN
let rel = TESTREL_A;
let rel = TESTREL_A;
let buf = TEST_BUF(1);
tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
tline.advance_last_valid_lsn(Lsn(1));

View File

@@ -66,12 +66,9 @@ pub fn import_timeline_from_postgres_datadir(
None => continue,
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => import_nonrel_file(
timeline,
Lsn(0), // control file is not versioned
ObjectTag::ControlFile,
&direntry.path(),
)?,
Some("pg_control") => {
import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?
}
Some("pg_filenode.map") => import_nonrel_file(
timeline,
lsn,
@@ -290,10 +287,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
if checkpoint.nextXid.value == 0 {
let pg_control_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, Lsn(0))?;
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
let pg_control = decode_pg_control(pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy;
}
@@ -361,7 +359,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = encode_checkpoint(checkpoint);
timeline.put_page_image(ObjectTag::Checkpoint, Lsn(0), checkpoint_bytes)?;
timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?;
Ok(())
}

View File

@@ -174,7 +174,7 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, Lsn(0))?;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = decode_checkpoint(checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
@@ -204,7 +204,7 @@ fn walreceiver_main(
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
ObjectTag::Checkpoint,
Lsn(0),
lsn,
new_checkpoint_bytes,
)?;
}