mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-27 16:12:56 +00:00
Compute and restore pg_xact, pg_multixact and pg_filenode.map files
This commit is contained in:
@@ -1,12 +1,207 @@
|
||||
use crate::ZTimelineId;
|
||||
use log::*;
|
||||
use std::io::Write;
|
||||
use tar::Builder;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tar::{Builder, Header};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Timeline};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
|
||||
let mut header = Header::new_gnu();
|
||||
header.set_size(size);
|
||||
header.set_path(path)?;
|
||||
header.set_mode(0b110000000);
|
||||
header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
|
||||
header.set_cksum();
|
||||
Ok(header)
|
||||
}
|
||||
|
||||
//
|
||||
// Generate SRLU segment files from repoistory
|
||||
//
|
||||
fn add_slru_segments(
|
||||
ar: &mut Builder<&mut dyn Write>,
|
||||
timeline: &Arc<dyn Timeline>,
|
||||
path: &str,
|
||||
forknum: u8,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let rel = RelTag {
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum,
|
||||
};
|
||||
let (first, last) = timeline.get_range(rel, lsn)?;
|
||||
const SEG_SIZE: usize =
|
||||
pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize;
|
||||
let mut seg_buf = [0u8; SEG_SIZE];
|
||||
let mut curr_segno: Option<u32> = None;
|
||||
for page in first..last {
|
||||
let tag = BufferTag { rel, blknum: page };
|
||||
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
||||
// Zero length image indicates truncated segment: just skip it
|
||||
if img.len() != 0 {
|
||||
assert!(img.len() == pg_constants::BLCKSZ as usize);
|
||||
|
||||
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
if curr_segno.is_some() && curr_segno.unwrap() != segno {
|
||||
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
|
||||
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
|
||||
ar.append(&header, &seg_buf[..])?;
|
||||
seg_buf.fill(0);
|
||||
}
|
||||
curr_segno = Some(segno);
|
||||
let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
|
||||
* pg_constants::BLCKSZ as usize;
|
||||
let offs_end = offs_start + pg_constants::BLCKSZ as usize;
|
||||
seg_buf[offs_start..offs_end].copy_from_slice(&img);
|
||||
}
|
||||
}
|
||||
if curr_segno.is_some() {
|
||||
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
|
||||
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
|
||||
ar.append(&header, &seg_buf[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Extract pg_filenode.map files from repoistory
|
||||
//
|
||||
fn add_relmap_files(
|
||||
ar: &mut Builder<&mut dyn Write>,
|
||||
timeline: &Arc<dyn Timeline>,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
for db in timeline.get_databases()?.iter() {
|
||||
let tag = BufferTag {
|
||||
rel: *db,
|
||||
blknum: 0,
|
||||
};
|
||||
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
||||
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
|
||||
String::from("global/pg_filenode.map")
|
||||
} else {
|
||||
// User defined tablespaces are not supported
|
||||
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
|
||||
format!("base/{}/pg_filenode.map", db.dbnode)
|
||||
};
|
||||
let header = new_tar_header(&path, 512)?;
|
||||
ar.append(&header, &img[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Generate tarball with non-relational files from repository
|
||||
///
|
||||
pub fn send_tarball_at_lsn(
|
||||
write: &mut dyn Write,
|
||||
timelineid: ZTimelineId,
|
||||
timeline: &Arc<dyn Timeline>,
|
||||
lsn: Lsn,
|
||||
snapshot_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut ar = Builder::new(write);
|
||||
|
||||
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
|
||||
let walpath = format!("timelines/{}/wal", timelineid);
|
||||
|
||||
debug!("sending tarball of snapshot in {}", snappath);
|
||||
for entry in WalkDir::new(&snappath) {
|
||||
let entry = entry?;
|
||||
let fullpath = entry.path();
|
||||
let relpath = entry.path().strip_prefix(&snappath).unwrap();
|
||||
|
||||
if relpath.to_str().unwrap() == "" {
|
||||
continue;
|
||||
}
|
||||
|
||||
if entry.file_type().is_dir() {
|
||||
trace!(
|
||||
"sending dir {} as {}",
|
||||
fullpath.display(),
|
||||
relpath.display()
|
||||
);
|
||||
ar.append_dir(relpath, fullpath)?;
|
||||
} else if entry.file_type().is_symlink() {
|
||||
error!("ignoring symlink in snapshot dir");
|
||||
} else if entry.file_type().is_file() {
|
||||
// Shared catalogs are exempt
|
||||
if relpath.starts_with("global/") {
|
||||
trace!("sending shared catalog {}", relpath.display());
|
||||
ar.append_path_with_name(fullpath, relpath)?;
|
||||
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
|
||||
if entry.file_name() != "pg_filenode.map"
|
||||
&& !relpath.starts_with("pg_xact/")
|
||||
&& !relpath.starts_with("pg_multixact/")
|
||||
{
|
||||
trace!("sending {}", relpath.display());
|
||||
ar.append_path_with_name(fullpath, relpath)?;
|
||||
}
|
||||
} else {
|
||||
trace!("not sending {}", relpath.display());
|
||||
}
|
||||
} else {
|
||||
error!("unknown file type: {}", fullpath.display());
|
||||
}
|
||||
}
|
||||
|
||||
add_slru_segments(
|
||||
&mut ar,
|
||||
timeline,
|
||||
"pg_xact",
|
||||
pg_constants::PG_XACT_FORKNUM,
|
||||
lsn,
|
||||
)?;
|
||||
add_slru_segments(
|
||||
&mut ar,
|
||||
timeline,
|
||||
"pg_multixact/members",
|
||||
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
|
||||
lsn,
|
||||
)?;
|
||||
add_slru_segments(
|
||||
&mut ar,
|
||||
timeline,
|
||||
"pg_multixact/offsets",
|
||||
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
|
||||
lsn,
|
||||
)?;
|
||||
add_relmap_files(&mut ar, timeline, lsn)?;
|
||||
|
||||
// FIXME: Also send all the WAL. The compute node would only need
|
||||
// the WAL that applies to non-relation files, because the page
|
||||
// server handles all the relation files. But we don't have a
|
||||
// mechanism for separating relation and non-relation WAL at the
|
||||
// moment.
|
||||
for entry in std::fs::read_dir(&walpath)? {
|
||||
let entry = entry?;
|
||||
let fullpath = &entry.path();
|
||||
let relpath = fullpath.strip_prefix(&walpath).unwrap();
|
||||
|
||||
if !entry.path().is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let archive_fname = relpath.to_str().unwrap();
|
||||
let archive_fname = archive_fname
|
||||
.strip_suffix(".partial")
|
||||
.unwrap_or(&archive_fname);
|
||||
let archive_path = "pg_wal/".to_owned() + archive_fname;
|
||||
ar.append_path_with_name(fullpath, archive_path)?;
|
||||
}
|
||||
ar.finish()?;
|
||||
debug!("all tarred up!");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Send a tarball containing a snapshot of all non-relation files in the
|
||||
/// PostgreSQL data directory, at given LSN
|
||||
|
||||
@@ -653,12 +653,18 @@ impl Connection {
|
||||
} else if query_string.starts_with(b"basebackup ") {
|
||||
let (_l, r) = query_string.split_at("basebackup ".len());
|
||||
let r = r.to_vec();
|
||||
let timelineid_str = String::from(String::from_utf8(r)?.trim_end());
|
||||
let basebackup_args = String::from(String::from_utf8(r)?.trim_end());
|
||||
let args: Vec<&str> = basebackup_args.rsplit(' ').collect();
|
||||
let timelineid_str = args[0];
|
||||
info!("got basebackup command: \"{}\"", timelineid_str);
|
||||
let timelineid = ZTimelineId::from_str(&timelineid_str)?;
|
||||
|
||||
let lsn = if args.len() > 1 {
|
||||
Some(Lsn::from_str(args[1])?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(timelineid)?;
|
||||
self.handle_basebackup_request(timelineid, lsn)?;
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)?;
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
} else if query_string.starts_with(b"callmemaybe ") {
|
||||
@@ -814,16 +820,19 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_basebackup_request(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> {
|
||||
fn handle_basebackup_request(
|
||||
&mut self,
|
||||
timelineid: ZTimelineId,
|
||||
lsn: Option<Lsn>,
|
||||
) -> anyhow::Result<()> {
|
||||
// check that the timeline exists
|
||||
let repository = page_cache::get_repository();
|
||||
if repository.get_or_restore_timeline(timelineid).is_err() {
|
||||
bail!(
|
||||
let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| {
|
||||
anyhow!(
|
||||
"client requested basebackup on timeline {} which does not exist in page server",
|
||||
timelineid
|
||||
);
|
||||
}
|
||||
|
||||
)
|
||||
})?;
|
||||
/* switch client to COPYOUT */
|
||||
let stream = &mut self.stream;
|
||||
stream.write_u8(b'H')?;
|
||||
@@ -836,10 +845,16 @@ impl Connection {
|
||||
/* Send a tarball of the latest snapshot on the timeline */
|
||||
|
||||
// find latest snapshot
|
||||
let snapshotlsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
|
||||
|
||||
basebackup::send_snapshot_tarball(&mut CopyDataSink { stream }, timelineid, snapshotlsn)?;
|
||||
|
||||
let snapshot_lsn =
|
||||
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
|
||||
let req_lsn = lsn.unwrap_or(snapshot_lsn);
|
||||
basebackup::send_tarball_at_lsn(
|
||||
&mut CopyDataSink { stream },
|
||||
timelineid,
|
||||
&timeline,
|
||||
req_lsn,
|
||||
snapshot_lsn,
|
||||
)?;
|
||||
// CopyDone
|
||||
self.stream.write_u8(b'c')?;
|
||||
self.stream.write_u32::<BE>(4)?;
|
||||
|
||||
@@ -106,6 +106,13 @@ pub trait Timeline {
|
||||
/// valid LSN, so that the WAL receiver knows where to restart streaming.
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
|
||||
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
|
||||
/// but can be also applied to normal relations.
|
||||
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
|
||||
|
||||
/// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
|
||||
fn get_databases(&self) -> Result<Vec<RelTag>>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -118,25 +125,25 @@ pub struct RepositoryStats {
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
pub forknum: u8,
|
||||
}
|
||||
|
||||
impl RelTag {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
buf.put_u8(self.forknum);
|
||||
buf.put_u32(self.spcnode);
|
||||
buf.put_u32(self.dbnode);
|
||||
buf.put_u32(self.relnode);
|
||||
buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> RelTag {
|
||||
RelTag {
|
||||
forknum: buf.get_u8(),
|
||||
spcnode: buf.get_u32(),
|
||||
dbnode: buf.get_u32(),
|
||||
relnode: buf.get_u32(),
|
||||
forknum: buf.get_u32() as u8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -440,7 +440,6 @@ impl RocksTimeline {
|
||||
let new_img = self
|
||||
.walredo_mgr
|
||||
.request_redo(key.tag, key.lsn, base_img, records)?;
|
||||
|
||||
self.put_page_image(key.tag, key.lsn, new_img.clone());
|
||||
|
||||
reconstructed += 1;
|
||||
@@ -629,10 +628,88 @@ impl Timeline for RocksTimeline {
|
||||
/// Get size of relation at given LSN.
|
||||
///
|
||||
fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
|
||||
self.wait_lsn(lsn)?;
|
||||
let lsn = self.wait_lsn(lsn)?;
|
||||
self.relsize_get_nowait(rel, lsn)
|
||||
}
|
||||
|
||||
/// Get databases. This function is used to local pg_filenode.map files
|
||||
fn get_databases(&self) -> Result<Vec<RelTag>> {
|
||||
let key = CacheKey {
|
||||
// minimal key
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
key.pack(&mut buf);
|
||||
let mut dbs = Vec::new();
|
||||
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(&buf[..]);
|
||||
let mut prev_tag = key.tag.rel;
|
||||
while iter.valid() {
|
||||
let k = iter.key().unwrap();
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = RelTag::unpack(&mut buf);
|
||||
if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
|
||||
break; // we are done with this fork
|
||||
}
|
||||
if tag != prev_tag {
|
||||
dbs.push(tag); // collect unique tags
|
||||
prev_tag = tag;
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
return Ok(dbs);
|
||||
}
|
||||
|
||||
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
|
||||
/// but can be also applied to normal relations.
|
||||
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
|
||||
let lsn = self.wait_lsn(lsn)?;
|
||||
let mut key = CacheKey {
|
||||
// minimal key to start with
|
||||
tag: BufferTag { rel, blknum: 0 },
|
||||
lsn,
|
||||
};
|
||||
let mut buf = BytesMut::new();
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(&buf[..]); // locate first entry
|
||||
if iter.valid() {
|
||||
let k = iter.key().unwrap();
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = BufferTag::unpack(&mut buf);
|
||||
if tag.rel == rel {
|
||||
// still trversing this relation
|
||||
let first_blknum = tag.blknum;
|
||||
key.tag.blknum = u32::MAX; // maximal key
|
||||
buf.clear();
|
||||
key.pack(&mut buf);
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(&buf[..]); // localte last entry
|
||||
if iter.valid() {
|
||||
let k = iter.key().unwrap();
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&k);
|
||||
let tag = BufferTag::unpack(&mut buf);
|
||||
let last_blknum = tag.blknum;
|
||||
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((0, 0)) // empty range
|
||||
}
|
||||
|
||||
///
|
||||
/// Does relation exist at given LSN?
|
||||
///
|
||||
@@ -731,14 +808,25 @@ impl Timeline for RocksTimeline {
|
||||
/// Memorize a full image of a page version
|
||||
///
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
|
||||
let img_len = img.len();
|
||||
let key = CacheKey { tag, lsn };
|
||||
let content = CacheEntryContent::PageImage(img);
|
||||
|
||||
let mut key_buf = BytesMut::new();
|
||||
key.pack(&mut key_buf);
|
||||
let mut val_buf = BytesMut::new();
|
||||
content.pack(&mut val_buf);
|
||||
|
||||
// Zero size of page image indicates that SLRU page was truncated
|
||||
if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM {
|
||||
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
|
||||
// records already marked for deletion
|
||||
return;
|
||||
} else {
|
||||
// delete truncated multixact page
|
||||
val_buf[0] |= UNUSED_VERSION_FLAG;
|
||||
}
|
||||
}
|
||||
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(&key_buf[..], &val_buf[..]);
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Timeline};
|
||||
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
|
||||
use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder};
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -120,8 +120,26 @@ fn restore_snapshot(
|
||||
None => continue,
|
||||
|
||||
// These special files appear in the snapshot, but are not needed by the page server
|
||||
Some("pg_control") => continue,
|
||||
Some("pg_filenode.map") => continue,
|
||||
Some("pg_control") => restore_nonrel_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
snapshot,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
pg_constants::PG_CONTROLFILE_FORKNUM,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
Some("pg_filenode.map") => restore_nonrel_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
snapshot,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
|
||||
// Load any relation files into the page server
|
||||
_ => restore_relfile(
|
||||
@@ -148,7 +166,16 @@ fn restore_snapshot(
|
||||
|
||||
// These special files appear in the snapshot, but are not needed by the page server
|
||||
Some("PG_VERSION") => continue,
|
||||
Some("pg_filenode.map") => continue,
|
||||
Some("pg_filenode.map") => restore_nonrel_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
snapshot,
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
|
||||
// Load any relation files into the page server
|
||||
_ => restore_relfile(
|
||||
@@ -163,7 +190,7 @@ fn restore_snapshot(
|
||||
}
|
||||
for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
|
||||
let entry = entry?;
|
||||
restore_nonrelfile(
|
||||
restore_slru_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
@@ -172,6 +199,28 @@ fn restore_snapshot(
|
||||
&entry.path(),
|
||||
)?;
|
||||
}
|
||||
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? {
|
||||
let entry = entry?;
|
||||
restore_slru_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
snapshot,
|
||||
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
|
||||
&entry.path(),
|
||||
)?;
|
||||
}
|
||||
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? {
|
||||
let entry = entry?;
|
||||
restore_slru_file(
|
||||
conf,
|
||||
timeline,
|
||||
timelineid,
|
||||
snapshot,
|
||||
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
|
||||
&entry.path(),
|
||||
)?;
|
||||
}
|
||||
// TODO: Scan pg_tblspc
|
||||
|
||||
Ok(())
|
||||
@@ -180,8 +229,8 @@ fn restore_snapshot(
|
||||
fn restore_relfile(
|
||||
timeline: &dyn Timeline,
|
||||
snapshot: &str,
|
||||
spcoid: u32,
|
||||
dboid: u32,
|
||||
spcoid: Oid,
|
||||
dboid: Oid,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let lsn = Lsn::from_hex(snapshot)?;
|
||||
@@ -239,7 +288,39 @@ fn restore_relfile(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_nonrelfile(
|
||||
fn restore_nonrel_file(
|
||||
_conf: &PageServerConf,
|
||||
timeline: &dyn Timeline,
|
||||
_timelineid: ZTimelineId,
|
||||
snapshot: &str,
|
||||
spcoid: Oid,
|
||||
dboid: Oid,
|
||||
forknum: u8,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let lsn = Lsn::from_hex(snapshot)?;
|
||||
|
||||
// Does it look like a relation file?
|
||||
|
||||
let mut file = File::open(path)?;
|
||||
let mut buffer = Vec::new();
|
||||
// read the whole file
|
||||
file.read_to_end(&mut buffer)?;
|
||||
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode: 0,
|
||||
forknum,
|
||||
},
|
||||
blknum: 0,
|
||||
};
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_slru_file(
|
||||
_conf: &PageServerConf,
|
||||
timeline: &dyn Timeline,
|
||||
_timelineid: ZTimelineId,
|
||||
@@ -263,7 +344,7 @@ fn restore_nonrelfile(
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum,
|
||||
},
|
||||
@@ -296,7 +377,7 @@ fn restore_nonrelfile(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Scan WAL on a timeline, starting from gien LSN, and load all the records
|
||||
// Scan WAL on a timeline, starting from given LSN, and load all the records
|
||||
// into the page cache.
|
||||
fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
|
||||
let walpath = format!("timelines/{}/wal", timelineid);
|
||||
|
||||
@@ -310,6 +310,9 @@ pub type Oid = u32;
|
||||
pub type TransactionId = u32;
|
||||
pub type BlockNumber = u32;
|
||||
pub type OffsetNumber = u16;
|
||||
pub type MultiXactId = TransactionId;
|
||||
pub type MultiXactOffset = u32;
|
||||
pub type MultiXactStatus = u32;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -319,6 +322,24 @@ pub struct RelFileNode {
|
||||
pub relnode: Oid, /* relation */
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlRelmapUpdate {
|
||||
pub dbid: Oid, /* database ID, or 0 for shared map */
|
||||
pub tsid: Oid, /* database's tablespace, or pg_global */
|
||||
pub nbytes: i32, /* size of relmap data */
|
||||
}
|
||||
|
||||
impl XlRelmapUpdate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
|
||||
XlRelmapUpdate {
|
||||
dbid: buf.get_u32_le(),
|
||||
tsid: buf.get_u32_le(),
|
||||
nbytes: buf.get_i32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlSmgrTruncate {
|
||||
@@ -441,6 +462,74 @@ impl XlHeapUpdate {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct MultiXactMember {
|
||||
pub xid: TransactionId,
|
||||
pub status: MultiXactStatus,
|
||||
}
|
||||
|
||||
impl MultiXactMember {
|
||||
pub fn decode(buf: &mut Bytes) -> MultiXactMember {
|
||||
MultiXactMember {
|
||||
xid: buf.get_u32_le(),
|
||||
status: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactCreate {
|
||||
pub mid: MultiXactId, /* new MultiXact's ID */
|
||||
pub moff: MultiXactOffset, /* its starting offset in members file */
|
||||
pub nmembers: u32, /* number of member XIDs */
|
||||
pub members: Vec<MultiXactMember>,
|
||||
}
|
||||
|
||||
impl XlMultiXactCreate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
|
||||
let mid = buf.get_u32_le();
|
||||
let moff = buf.get_u32_le();
|
||||
let nmembers = buf.get_u32_le();
|
||||
let mut members = Vec::new();
|
||||
for _ in 0..nmembers {
|
||||
members.push(MultiXactMember::decode(buf));
|
||||
}
|
||||
XlMultiXactCreate {
|
||||
mid,
|
||||
moff,
|
||||
nmembers,
|
||||
members,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactTruncate {
|
||||
oldest_multi_db: Oid,
|
||||
/* to-be-truncated range of multixact offsets */
|
||||
start_trunc_off: MultiXactId, /* just for completeness' sake */
|
||||
end_trunc_off: MultiXactId,
|
||||
|
||||
/* to-be-truncated range of multixact members */
|
||||
start_trunc_memb: MultiXactOffset,
|
||||
end_trunc_memb: MultiXactOffset,
|
||||
}
|
||||
|
||||
impl XlMultiXactTruncate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
|
||||
XlMultiXactTruncate {
|
||||
oldest_multi_db: buf.get_u32_le(),
|
||||
start_trunc_off: buf.get_u32_le(),
|
||||
end_trunc_off: buf.get_u32_le(),
|
||||
start_trunc_memb: buf.get_u32_le(),
|
||||
end_trunc_memb: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Routines to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
@@ -930,6 +1019,73 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
blocks.push(blk);
|
||||
}
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
|
||||
blk.blkno = buf.get_u32_le();
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
|
||||
blk.blkno = buf.get_u32_le();
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
// Update offset page
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
|
||||
blocks.push(blk);
|
||||
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
|
||||
let last_mbr_blkno =
|
||||
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
|
||||
for blkno in first_mbr_blkno..=last_mbr_blkno {
|
||||
// Update members page
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blocks.push(blk);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
let first_off_blkno =
|
||||
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
let last_off_blkno =
|
||||
xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
for blkno in first_off_blkno..last_off_blkno {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
}
|
||||
let first_mbr_blkno =
|
||||
xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
|
||||
let last_mbr_blkno =
|
||||
xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
|
||||
for blkno in first_mbr_blkno..last_mbr_blkno {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
}
|
||||
} else {
|
||||
assert!(false);
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM;
|
||||
blk.rnode_spcnode = xlrec.tsid;
|
||||
blk.rnode_dbnode = xlrec.dbid;
|
||||
blk.rnode_relnode = 0;
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
}
|
||||
|
||||
DecodedWALRecord {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
//! TODO: Even though the postgres code runs in a separate process,
|
||||
//! it's not a secure sandbox.
|
||||
//!
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use std::assert;
|
||||
@@ -36,6 +37,7 @@ use zenith_utils::lsn::Lsn;
|
||||
|
||||
use crate::repository::BufferTag;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
|
||||
use crate::PageServerConf;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils::XLogRecord;
|
||||
@@ -170,6 +172,25 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
|
||||
return ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
|
||||
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
|
||||
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize;
|
||||
}
|
||||
|
||||
fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
|
||||
return (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
|
||||
* pg_constants::MXACT_MEMBER_BITS_PER_XACT;
|
||||
}
|
||||
|
||||
/* Location (byte offset within page) of TransactionId of given member */
|
||||
fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
|
||||
return mx_offset_to_flags_offset(xid)
|
||||
+ (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
|
||||
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4)
|
||||
as usize;
|
||||
}
|
||||
|
||||
///
|
||||
/// WAL redo thread
|
||||
///
|
||||
@@ -255,7 +276,7 @@ impl PostgresRedoManagerInternal {
|
||||
let start = Instant::now();
|
||||
|
||||
let apply_result: Result<Bytes, Error>;
|
||||
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
|
||||
if tag.rel.forknum >= pg_constants::PG_XACT_FORKNUM {
|
||||
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
let mut page = BytesMut::new();
|
||||
if let Some(fpi) = base_img {
|
||||
@@ -344,6 +365,55 @@ impl PostgresRedoManagerInternal {
|
||||
record.lsn,
|
||||
record.main_data_offset, record.rec.len());
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
|
||||
page.fill(0);
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
|
||||
page.fill(0);
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM {
|
||||
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
|
||||
* 4) as usize;
|
||||
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
|
||||
} else {
|
||||
assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM);
|
||||
for i in 0..xlrec.nmembers {
|
||||
let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
|
||||
if blkno == tag.blknum {
|
||||
// update only target block
|
||||
let offset = xlrec.moff + i;
|
||||
let memberoff = mx_offset_to_member_offset(offset);
|
||||
let flagsoff = mx_offset_to_flags_offset(offset);
|
||||
let bshift = mx_offset_to_flags_bitshift(offset);
|
||||
let mut flagsval =
|
||||
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
|
||||
flagsval &=
|
||||
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
|
||||
<< bshift);
|
||||
flagsval |= xlrec.members[i as usize].status << bshift;
|
||||
LittleEndian::write_u32(
|
||||
&mut page[flagsoff..flagsoff + 4],
|
||||
flagsval,
|
||||
);
|
||||
LittleEndian::write_u32(
|
||||
&mut page[memberoff..memberoff + 4],
|
||||
xlrec.members[i as usize].xid,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
// empty page image indicates that this SLRU page is truncated and can be removed by GC
|
||||
page.clear();
|
||||
} else {
|
||||
assert!(false);
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
|
||||
page.clear();
|
||||
page.extend_from_slice(&buf[..]);
|
||||
assert!(page.len() == 512); // size of pg_filenode.map
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,7 +607,7 @@ impl PostgresRedoProcess {
|
||||
// explanation of the protocol.
|
||||
|
||||
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
let len = 4 + 5 * 4;
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'B');
|
||||
@@ -552,7 +622,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
|
||||
fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
|
||||
assert!(base_img.len() == 8192);
|
||||
|
||||
let len = 4 + 5 * 4 + base_img.len();
|
||||
let len = 4 + 1 + 4 * 4 + base_img.len();
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'P');
|
||||
@@ -580,7 +650,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
|
||||
}
|
||||
|
||||
fn build_get_page_msg(tag: BufferTag) -> Bytes {
|
||||
let len = 4 + 5 * 4;
|
||||
let len = 4 + 1 + 4 * 4;
|
||||
let mut buf = BytesMut::with_capacity(1 + len);
|
||||
|
||||
buf.put_u8(b'G');
|
||||
|
||||
@@ -77,6 +77,25 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
||||
|
||||
// From multixact.h
|
||||
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
|
||||
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
|
||||
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
|
||||
pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
|
||||
|
||||
pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
|
||||
pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
|
||||
pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
|
||||
pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
|
||||
pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
|
||||
MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
|
||||
/* size in bytes of a complete group */
|
||||
pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
|
||||
4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
|
||||
pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
|
||||
pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
|
||||
MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
|
||||
|
||||
// From heapam_xlog.h
|
||||
pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
||||
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||
|
||||
Reference in New Issue
Block a user