Refactring after Heikki review

This commit is contained in:
Konstantin Knizhnik
2021-07-09 14:40:55 +03:00
parent eb0a56eb22
commit 3cded20662
6 changed files with 26 additions and 14 deletions

View File

@@ -4,8 +4,8 @@
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! Stateless Postgres compute node is lauched by sending taball which contains on-relational data (multixacts, clog, filenodemaps, twophase files)
//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directry and
//! Stateless Postgres compute node is launched by sending tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files)
//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and
//! data stored in object storage.
//!
use crate::ZTimelineId;

View File

@@ -289,6 +289,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// get_page_at_lsn_nowait returns pages with zeros when object is not found in the storage.
// nextXid can not be zero, so this check is used to detect situation when checkpoint record needs to be initialized.
if checkpoint.nextXid.value == 0 {
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
@@ -505,9 +507,7 @@ pub fn save_decoded_record(
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if next_oid > checkpoint.nextOid {
checkpoint.nextOid = next_oid;
}
checkpoint.nextOid = next_oid;
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
@@ -764,10 +764,13 @@ fn save_multixact_create_record(
if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
}
let max_mbr_xid = xlrec
.members
.iter()
.fold(0u32, |acc, mbr| if mbr.xid > acc { mbr.xid } else { acc });
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
mbr.xid
} else {
acc
}
});
checkpoint.update_next_xid(max_mbr_xid);
Ok(())
}

View File

@@ -190,6 +190,7 @@ fn walreceiver_main(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(
@@ -202,6 +203,7 @@ fn walreceiver_main(
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
ObjectTag::Checkpoint,

View File

@@ -317,9 +317,9 @@ impl PostgresRedoManagerInternal {
ObjectTag::Clog(slru) => slru.blknum,
ObjectTag::TwoPhase(_) => {
assert!(info == pg_constants::XLOG_XACT_PREPARE);
trace!("Apply prepare {} record", xlogrec.xl_xid);
page.clear();
page.extend_from_slice(&buf[..]);
trace!("Apply prepare {} record", xlogrec.xl_xid);
page.clear();
page.extend_from_slice(&buf[..]);
continue;
}
_ => panic!("Not valid XACT object tag {:?}", tag),
@@ -371,7 +371,7 @@ impl PostgresRedoManagerInternal {
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
// Just need to ero page
// Just need to zero page
page.copy_from_slice(&ZERO_PAGE);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);

View File

@@ -390,6 +390,13 @@ impl CheckPoint {
}
}
//
// Generate new WAL segment with single XLOG_CHECKPOINT_SHUTDOWN record.
// We need this segment to start compute node.
// In order to minimize changes in Postgres core, we prefer to
// provide WAL segment from which is can extract checkpoint record in standard way,
// rather then implement some alternative mechanism.
//
pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);