Compute and restore pg_xact, pg_multixact and pg_filenode.map files

This commit is contained in:
Konstantin Knizhnik
2021-05-14 16:32:52 +03:00
parent 2870150365
commit 9ece1e863d
8 changed files with 665 additions and 34 deletions

View File

@@ -1,12 +1,207 @@
use crate::ZTimelineId;
use log::*;
use std::io::Write;
use tar::Builder;
use std::sync::Arc;
use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir;
use crate::repository::{BufferTag, RelTag, Timeline};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use zenith_utils::lsn::Lsn;
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
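// 0b110000000 == 0o600: owner read/write only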
header.set_mode(0b110000000);
header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
header.set_cksum();
Ok(header)
}
//
// Generate SLRU segment files from repository
//
fn add_slru_segments(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
path: &str,
forknum: u8,
lsn: Lsn,
) -> anyhow::Result<()> {
let rel = RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum,
};
let (first, last) = timeline.get_range(rel, lsn)?;
const SEG_SIZE: usize =
pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize;
let mut seg_buf = [0u8; SEG_SIZE];
let mut curr_segno: Option<u32> = None;
for page in first..last {
let tag = BufferTag { rel, blknum: page };
let img = timeline.get_page_at_lsn(tag, lsn)?;
// Zero length image indicates truncated segment: just skip it
if img.len() != 0 {
assert!(img.len() == pg_constants::BLCKSZ as usize);
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if curr_segno.is_some() && curr_segno.unwrap() != segno {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
seg_buf.fill(0);
}
curr_segno = Some(segno);
let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
* pg_constants::BLCKSZ as usize;
let offs_end = offs_start + pg_constants::BLCKSZ as usize;
seg_buf[offs_start..offs_end].copy_from_slice(&img);
}
}
if curr_segno.is_some() {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
}
Ok(())
}
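For intuition, a minimal standalone sketch of the page-to-segment arithmetic the loop above relies on, assuming PostgreSQL's defaults of BLCKSZ = 8192 and 32 pages per SLRU segment (the actual values come from pg_constants):

    const BLCKSZ: usize = 8192; // assumed default
    const SLRU_PAGES_PER_SEGMENT: u32 = 32; // assumed default

    // Map an SLRU page number to (segment file number, byte offset within the segment).
    fn slru_location(page: u32) -> (u32, usize) {
        let segno = page / SLRU_PAGES_PER_SEGMENT;
        let offs = (page % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ;
        (segno, offs)
    }

    // Page 33 lands in segment file 0001 at byte offset 8192.
    assert_eq!(slru_location(33), (1, 8192));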
//
// Extract pg_filenode.map files from repository
//
fn add_relmap_files(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
for db in timeline.get_databases()?.iter() {
let tag = BufferTag {
rel: *db,
blknum: 0,
};
let img = timeline.get_page_at_lsn(tag, lsn)?;
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map")
} else {
// User defined tablespaces are not supported
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
format!("base/{}/pg_filenode.map", db.dbnode)
};
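// pg_filenode.map is always exactly 512 bytes on disk (the size of PostgreSQL's RelMapFile struct)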
let header = new_tar_header(&path, 512)?;
ar.append(&header, &img[..])?;
}
Ok(())
}
///
/// Generate tarball with non-relational files from repository
///
pub fn send_tarball_at_lsn(
write: &mut dyn Write,
timelineid: ZTimelineId,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
snapshot_lsn: Lsn,
) -> anyhow::Result<()> {
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
for entry in WalkDir::new(&snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&snappath).unwrap();
if relpath.to_str().unwrap() == "" {
continue;
}
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map"
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
}
} else {
trace!("not sending {}", relpath.display());
}
} else {
error!("unknown file type: {}", fullpath.display());
}
}
add_slru_segments(
&mut ar,
timeline,
"pg_xact",
pg_constants::PG_XACT_FORKNUM,
lsn,
)?;
add_slru_segments(
&mut ar,
timeline,
"pg_multixact/members",
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
lsn,
)?;
add_slru_segments(
&mut ar,
timeline,
"pg_multixact/offsets",
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
lsn,
)?;
add_relmap_files(&mut ar, timeline, lsn)?;
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
ar.finish()?;
debug!("all tarred up!");
Ok(())
}
///
/// Send a tarball containing a snapshot of all non-relation files in the
/// PostgreSQL data directory, at given LSN

View File

@@ -653,12 +653,18 @@ impl Connection {
} else if query_string.starts_with(b"basebackup ") {
let (_l, r) = query_string.split_at("basebackup ".len());
let r = r.to_vec();
let timelineid_str = String::from(String::from_utf8(r)?.trim_end());
let basebackup_args = String::from(String::from_utf8(r)?.trim_end());
let args: Vec<&str> = basebackup_args.rsplit(' ').collect();
let timelineid_str = args[0];
info!("got basebackup command: \"{}\"", timelineid_str);
let timelineid = ZTimelineId::from_str(&timelineid_str)?;
let lsn = if args.len() > 1 {
Some(Lsn::from_str(args[1])?)
} else {
None
};
// Check that the timeline exists
self.handle_basebackup_request(timelineid)?;
self.handle_basebackup_request(timelineid, lsn)?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"callmemaybe ") {
@@ -814,16 +820,19 @@ impl Connection {
}
}
fn handle_basebackup_request(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> {
fn handle_basebackup_request(
&mut self,
timelineid: ZTimelineId,
lsn: Option<Lsn>,
) -> anyhow::Result<()> {
// check that the timeline exists
let repository = page_cache::get_repository();
if repository.get_or_restore_timeline(timelineid).is_err() {
bail!(
let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| {
anyhow!(
"client requested basebackup on timeline {} which does not exist in page server",
timelineid
);
}
)
})?;
/* switch client to COPYOUT */
let stream = &mut self.stream;
stream.write_u8(b'H')?;
@@ -836,10 +845,16 @@ impl Connection {
/* Send a tarball of the latest snapshot on the timeline */
// find latest snapshot
let snapshotlsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
basebackup::send_snapshot_tarball(&mut CopyDataSink { stream }, timelineid, snapshotlsn)?;
let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or(snapshot_lsn);
basebackup::send_tarball_at_lsn(
&mut CopyDataSink { stream },
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
)?;
// CopyDone
self.stream.write_u8(b'c')?;
self.stream.write_u32::<BE>(4)?;

View File

@@ -106,6 +106,13 @@ pub trait Timeline {
/// valid LSN, so that the WAL receiver knows where to restart streaming.
fn advance_last_record_lsn(&self, lsn: Lsn);
fn get_last_record_lsn(&self) -> Lsn;
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can also be applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
/// Get vector of databases (represented using RelTag; only the dbnode and spcnode fields are used)
fn get_databases(&self) -> Result<Vec<RelTag>>;
}
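To make the new methods concrete, here is a hedged usage sketch, along the lines of the basebackup code above, that enumerates every database's pg_filenode.map image (assuming bytes::Bytes as the page-image type used elsewhere in this tree):

    use bytes::Bytes;

    fn collect_relmaps(timeline: &dyn Timeline, lsn: Lsn) -> anyhow::Result<Vec<(RelTag, Bytes)>> {
        let mut maps = Vec::new();
        for db in timeline.get_databases()? {
            // Only the spcnode and dbnode fields of this RelTag are meaningful.
            let tag = BufferTag { rel: db, blknum: 0 };
            maps.push((db, timeline.get_page_at_lsn(tag, lsn)?));
        }
        Ok(maps)
    }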
#[derive(Clone)]
@@ -118,25 +125,25 @@ pub struct RepositoryStats {
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
pub struct RelTag {
pub forknum: u8,
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u8,
}
impl RelTag {
pub fn pack(&self, buf: &mut BytesMut) {
buf.put_u8(self.forknum);
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
}
pub fn unpack(buf: &mut BytesMut) -> RelTag {
RelTag {
forknum: buf.get_u8(),
spcnode: buf.get_u32(),
dbnode: buf.get_u32(),
relnode: buf.get_u32(),
forknum: buf.get_u32() as u8,
}
}
}
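Note the wire-format consequence: forknum now travels last and is widened to a u32, so a packed RelTag occupies 16 bytes (four u32s) rather than the previous 13 (one u8 plus three u32s). A quick round-trip sketch with made-up OIDs:

    let rel = RelTag { spcnode: 1663, dbnode: 13008, relnode: 16384, forknum: 0 };
    let mut buf = BytesMut::new();
    rel.pack(&mut buf);
    assert_eq!(buf.len(), 16); // four big-endian u32s
    assert_eq!(RelTag::unpack(&mut buf), rel);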

View File

@@ -440,7 +440,6 @@ impl RocksTimeline {
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
reconstructed += 1;
@@ -629,10 +628,88 @@ impl Timeline for RocksTimeline {
/// Get size of relation at given LSN.
///
fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
self.wait_lsn(lsn)?;
let lsn = self.wait_lsn(lsn)?;
self.relsize_get_nowait(rel, lsn)
}
/// Get databases. This function is used to locate pg_filenode.map files
fn get_databases(&self) -> Result<Vec<RelTag>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut dbs = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]);
let mut prev_tag = key.tag.rel;
while iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = RelTag::unpack(&mut buf);
if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if tag != prev_tag {
dbs.push(tag); // collect unique tags
prev_tag = tag;
}
iter.next();
}
return Ok(dbs);
}
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can also be applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let lsn = self.wait_lsn(lsn)?;
let mut key = CacheKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek(&buf[..]); // locate first entry
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == rel {
// still traversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(&buf[..]); // locate last entry
if iter.valid() {
let k = iter.key().unwrap();
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
let last_blknum = tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
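Callers consume the half-open range directly: if, say, pg_xact pages 3 through 6 are stored, the two seeks find blocks 3 and 6 and the function returns (3, 7). A minimal usage sketch:

    fn read_fork(timeline: &dyn Timeline, rel: RelTag, lsn: Lsn) -> anyhow::Result<()> {
        let (first, last) = timeline.get_range(rel, lsn)?;
        for blknum in first..last {
            let img = timeline.get_page_at_lsn(BufferTag { rel, blknum }, lsn)?;
            if img.is_empty() {
                continue; // zero-length image marks a truncated page
            }
            // ... use the 8 KB page in `img` ...
        }
        Ok(())
    }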
///
/// Does relation exist at given LSN?
///
@@ -731,14 +808,25 @@ impl Timeline for RocksTimeline {
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let img_len = img.len();
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
// Zero size of page image indicates that SLRU page was truncated
if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM {
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
// record is already marked for deletion
return;
} else {
// delete truncated multixact page
val_buf[0] |= UNUSED_VERSION_FLAG;
}
}
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);

View File

@@ -23,7 +23,7 @@ use anyhow::Result;
use bytes::Bytes;
use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
@@ -120,8 +120,26 @@ fn restore_snapshot(
None => continue,
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => continue,
Some("pg_filenode.map") => continue,
Some("pg_control") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_CONTROLFILE_FORKNUM,
&direntry.path(),
)?,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_FILENODEMAP_FORKNUM,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => restore_relfile(
@@ -148,7 +166,16 @@ fn restore_snapshot(
// These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue,
Some("pg_filenode.map") => continue,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
pg_constants::PG_FILENODEMAP_FORKNUM,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => restore_relfile(
@@ -163,7 +190,7 @@ fn restore_snapshot(
}
for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
let entry = entry?;
restore_nonrelfile(
restore_slru_file(
conf,
timeline,
timelineid,
@@ -172,6 +199,28 @@ fn restore_snapshot(
&entry.path(),
)?;
}
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? {
let entry = entry?;
restore_slru_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
&entry.path(),
)?;
}
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? {
let entry = entry?;
restore_slru_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc
Ok(())
@@ -180,8 +229,8 @@ fn restore_snapshot(
fn restore_relfile(
timeline: &dyn Timeline,
snapshot: &str,
spcoid: u32,
dboid: u32,
spcoid: Oid,
dboid: Oid,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
@@ -239,7 +288,39 @@ fn restore_relfile(
Ok(())
}
fn restore_nonrelfile(
fn restore_nonrel_file(
_conf: &PageServerConf,
timeline: &dyn Timeline,
_timelineid: ZTimelineId,
snapshot: &str,
spcoid: Oid,
dboid: Oid,
forknum: u8,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
// Non-relation file is stored as a single page image
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
let tag = BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: 0,
forknum,
},
blknum: 0,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
Ok(())
}
fn restore_slru_file(
_conf: &PageServerConf,
timeline: &dyn Timeline,
_timelineid: ZTimelineId,
@@ -263,7 +344,7 @@ fn restore_nonrelfile(
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum,
},
@@ -296,7 +377,7 @@ fn restore_nonrelfile(
Ok(())
}
// Scan WAL on a timeline, starting from gien LSN, and load all the records
// Scan WAL on a timeline, starting from given LSN, and load all the records
// into the page cache.
fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
let walpath = format!("timelines/{}/wal", timelineid);

View File

@@ -310,6 +310,9 @@ pub type Oid = u32;
pub type TransactionId = u32;
pub type BlockNumber = u32;
pub type OffsetNumber = u16;
pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
@@ -319,6 +322,24 @@ pub struct RelFileNode {
pub relnode: Oid, /* relation */
}
#[repr(C)]
#[derive(Debug)]
pub struct XlRelmapUpdate {
pub dbid: Oid, /* database ID, or 0 for shared map */
pub tsid: Oid, /* database's tablespace, or pg_global */
pub nbytes: i32, /* size of relmap data */
}
impl XlRelmapUpdate {
pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
XlRelmapUpdate {
dbid: buf.get_u32_le(),
tsid: buf.get_u32_le(),
nbytes: buf.get_i32_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlSmgrTruncate {
@@ -441,6 +462,74 @@ impl XlHeapUpdate {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct MultiXactMember {
pub xid: TransactionId,
pub status: MultiXactStatus,
}
impl MultiXactMember {
pub fn decode(buf: &mut Bytes) -> MultiXactMember {
MultiXactMember {
xid: buf.get_u32_le(),
status: buf.get_u32_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlMultiXactCreate {
pub mid: MultiXactId, /* new MultiXact's ID */
pub moff: MultiXactOffset, /* its starting offset in members file */
pub nmembers: u32, /* number of member XIDs */
pub members: Vec<MultiXactMember>,
}
impl XlMultiXactCreate {
pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
let mid = buf.get_u32_le();
let moff = buf.get_u32_le();
let nmembers = buf.get_u32_le();
let mut members = Vec::new();
for _ in 0..nmembers {
members.push(MultiXactMember::decode(buf));
}
XlMultiXactCreate {
mid,
moff,
nmembers,
members,
}
}
}
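Since the record carries a variable-length member array, a decode round trip may help illustrate the little-endian layout (the values are made up):

    use bytes::{BufMut, BytesMut};

    let mut raw = BytesMut::new();
    raw.put_u32_le(100); // mid
    raw.put_u32_le(5000); // moff
    raw.put_u32_le(2); // nmembers
    raw.put_u32_le(333); raw.put_u32_le(0); // member 0: xid, status
    raw.put_u32_le(334); raw.put_u32_le(1); // member 1
    let mut buf = raw.freeze();
    let rec = XlMultiXactCreate::decode(&mut buf);
    assert_eq!((rec.mid, rec.moff, rec.members.len()), (100, 5000, 2));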
#[repr(C)]
#[derive(Debug)]
pub struct XlMultiXactTruncate {
oldest_multi_db: Oid,
/* to-be-truncated range of multixact offsets */
start_trunc_off: MultiXactId, /* just for completeness' sake */
end_trunc_off: MultiXactId,
/* to-be-truncated range of multixact members */
start_trunc_memb: MultiXactOffset,
end_trunc_memb: MultiXactOffset,
}
impl XlMultiXactTruncate {
pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
XlMultiXactTruncate {
oldest_multi_db: buf.get_u32_le(),
start_trunc_off: buf.get_u32_le(),
end_trunc_off: buf.get_u32_le(),
start_trunc_memb: buf.get_u32_le(),
end_trunc_memb: buf.get_u32_le(),
}
}
}
//
// Routines to decode a WAL record and figure out which blocks are modified
//
@@ -930,6 +1019,73 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
blocks.push(blk);
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blk.blkno = buf.get_u32_le();
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = buf.get_u32_le();
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
// Update offset page
let mut blk = DecodedBkpBlock::new();
blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blocks.push(blk);
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blkno in first_mbr_blkno..=last_mbr_blkno {
// Update members page
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
let first_off_blkno =
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno =
xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
for blkno in first_off_blkno..last_off_blkno {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blocks.push(blk);
}
let first_mbr_blkno =
xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blkno in first_mbr_blkno..last_mbr_blkno {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blocks.push(blk);
}
} else {
assert!(false);
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM;
blk.rnode_spcnode = xlrec.tsid;
blk.rnode_dbnode = xlrec.dbid;
blk.rnode_relnode = 0;
blk.will_init = true;
blocks.push(blk);
}
DecodedWALRecord {

View File

@@ -14,6 +14,7 @@
//! TODO: Even though the postgres code runs in a separate process,
//! it's not a secure sandbox.
//!
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::assert;
@@ -36,6 +37,7 @@ use zenith_utils::lsn::Lsn;
use crate::repository::BufferTag;
use crate::repository::WALRecord;
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
@@ -170,6 +172,25 @@ impl WalRedoManager for PostgresRedoManager {
}
}
fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
return ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize;
}
fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
return (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
* pg_constants::MXACT_MEMBER_BITS_PER_XACT;
}
/* Location (byte offset within page) of TransactionId of given member */
fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
return mx_offset_to_flags_offset(xid)
+ (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4)
as usize;
}
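A worked check of these helpers, using the constants defined in pg_constants below (4 members per group, 20-byte groups, 8 flag bits per xact): member offset 5 falls in the second group of its page, so its flag bits sit at byte 20 shifted by 8, and its TransactionId at bytes 28..32.

    assert_eq!(mx_offset_to_flags_offset(5), 20); // group 1 starts at byte 20
    assert_eq!(mx_offset_to_flags_bitshift(5), 8); // second member within the group
    assert_eq!(mx_offset_to_member_offset(5), 28); // 20 + 4 flag bytes + 1 * 4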
///
/// WAL redo thread
///
@@ -255,7 +276,7 @@ impl PostgresRedoManagerInternal {
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
if tag.rel.forknum >= pg_constants::PG_XACT_FORKNUM {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
@@ -344,6 +365,55 @@ impl PostgresRedoManagerInternal {
record.lsn,
record.main_data_offset, record.rec.len());
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
page.fill(0);
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
page.fill(0);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM {
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
} else {
assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM);
for i in 0..xlrec.nmembers {
let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
if blkno == tag.blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &=
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
<< bshift);
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
}
}
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
// empty page image indicates that this SLRU page is truncated and can be removed by GC
page.clear();
} else {
assert!(false);
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
page.clear();
page.extend_from_slice(&buf[..]);
assert!(page.len() == 512); // size of pg_filenode.map
}
}
@@ -537,7 +607,7 @@ impl PostgresRedoProcess {
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let len = 4 + 1 + 4 * 4;
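// 21 bytes: 4-byte length word plus a BufferTag serialized as one u8 and four u32s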
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'B');
@@ -552,7 +622,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
assert!(base_img.len() == 8192);
let len = 4 + 5 * 4 + base_img.len();
let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'P');
@@ -580,7 +650,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
}
fn build_get_page_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let len = 4 + 1 + 4 * 4;
let mut buf = BytesMut::with_capacity(1 + len);
buf.put_u8(b'G');

View File

@@ -77,6 +77,25 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
// From multixact.h
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
/* size in bytes of a complete group */
pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
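With PostgreSQL's default BLCKSZ of 8192 these evaluate to MULTIXACT_OFFSETS_PER_PAGE = 2048, MULTIXACT_MEMBERS_PER_MEMBERGROUP = 4, MULTIXACT_MEMBERGROUP_SIZE = 20, MULTIXACT_MEMBERGROUPS_PER_PAGE = 409 and MULTIXACT_MEMBERS_PER_PAGE = 1636, matching multixact.c. A small sketch of how the WAL decoder above maps a hypothetical create record onto members pages under those values:

    const MEMBERS_PER_PAGE: u32 = 1636; // assumes BLCKSZ = 8192
    let (moff, nmembers) = (1630u32, 10u32); // made-up xl_multixact_create fields
    let first_blkno = moff / MEMBERS_PER_PAGE; // 0
    let last_blkno = (moff + nmembers - 1) / MEMBERS_PER_PAGE; // 1639 / 1636 = 1
    assert_eq!((first_blkno, last_blkno), (0, 1)); // the record touches members pages 0 and 1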
// From heapam_xlog.h
pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;