Mirror of https://github.com/neondatabase/neon.git

Compute and restore pg_xact, pg_multixact and pg_filenode.map files
@@ -1,12 +1,207 @@
 use crate::ZTimelineId;
 use log::*;
 use std::io::Write;
-use tar::Builder;
+use std::sync::Arc;
+use std::time::SystemTime;
+use tar::{Builder, Header};
 use walkdir::WalkDir;

+use crate::repository::{BufferTag, RelTag, Timeline};
+use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::*;
 use zenith_utils::lsn::Lsn;

+fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
+    let mut header = Header::new_gnu();
+    header.set_size(size);
+    header.set_path(path)?;
+    header.set_mode(0b110000000);
+    header.set_mtime(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
+    header.set_cksum();
+    Ok(header)
+}
+
+//
+// Generate SLRU segment files from repository
+//
+fn add_slru_segments(
+    ar: &mut Builder<&mut dyn Write>,
+    timeline: &Arc<dyn Timeline>,
+    path: &str,
+    forknum: u8,
+    lsn: Lsn,
+) -> anyhow::Result<()> {
+    let rel = RelTag {
+        spcnode: 0,
+        dbnode: 0,
+        relnode: 0,
+        forknum,
+    };
+    let (first, last) = timeline.get_range(rel, lsn)?;
+    const SEG_SIZE: usize =
+        pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize;
+    let mut seg_buf = [0u8; SEG_SIZE];
+    let mut curr_segno: Option<u32> = None;
+    for page in first..last {
+        let tag = BufferTag { rel, blknum: page };
+        let img = timeline.get_page_at_lsn(tag, lsn)?;
+        // Zero length image indicates truncated segment: just skip it
+        if img.len() != 0 {
+            assert!(img.len() == pg_constants::BLCKSZ as usize);
+
+            let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
+            if curr_segno.is_some() && curr_segno.unwrap() != segno {
+                let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
+                let header = new_tar_header(&segname, SEG_SIZE as u64)?;
+                ar.append(&header, &seg_buf[..])?;
+                seg_buf.fill(0);
+            }
+            curr_segno = Some(segno);
+            let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
+                * pg_constants::BLCKSZ as usize;
+            let offs_end = offs_start + pg_constants::BLCKSZ as usize;
+            seg_buf[offs_start..offs_end].copy_from_slice(&img);
+        }
+    }
+    if curr_segno.is_some() {
+        let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
+        let header = new_tar_header(&segname, SEG_SIZE as u64)?;
+        ar.append(&header, &seg_buf[..])?;
+    }
+    Ok(())
+}
+
+//
+// Extract pg_filenode.map files from repository
+//
+fn add_relmap_files(
+    ar: &mut Builder<&mut dyn Write>,
+    timeline: &Arc<dyn Timeline>,
+    lsn: Lsn,
+) -> anyhow::Result<()> {
+    for db in timeline.get_databases()?.iter() {
+        let tag = BufferTag {
+            rel: *db,
+            blknum: 0,
+        };
+        let img = timeline.get_page_at_lsn(tag, lsn)?;
+        let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
+            String::from("global/pg_filenode.map")
+        } else {
+            // User defined tablespaces are not supported
+            assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
+            format!("base/{}/pg_filenode.map", db.dbnode)
+        };
+        let header = new_tar_header(&path, 512)?;
+        ar.append(&header, &img[..])?;
+    }
+    Ok(())
+}
+
+///
+/// Generate tarball with non-relational files from repository
+///
+pub fn send_tarball_at_lsn(
+    write: &mut dyn Write,
+    timelineid: ZTimelineId,
+    timeline: &Arc<dyn Timeline>,
+    lsn: Lsn,
+    snapshot_lsn: Lsn,
+) -> anyhow::Result<()> {
+    let mut ar = Builder::new(write);
+
+    let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
+    let walpath = format!("timelines/{}/wal", timelineid);
+
+    debug!("sending tarball of snapshot in {}", snappath);
+    for entry in WalkDir::new(&snappath) {
+        let entry = entry?;
+        let fullpath = entry.path();
+        let relpath = entry.path().strip_prefix(&snappath).unwrap();
+
+        if relpath.to_str().unwrap() == "" {
+            continue;
+        }
+
+        if entry.file_type().is_dir() {
+            trace!(
+                "sending dir {} as {}",
+                fullpath.display(),
+                relpath.display()
+            );
+            ar.append_dir(relpath, fullpath)?;
+        } else if entry.file_type().is_symlink() {
+            error!("ignoring symlink in snapshot dir");
+        } else if entry.file_type().is_file() {
+            // Shared catalogs are exempt
+            if relpath.starts_with("global/") {
+                trace!("sending shared catalog {}", relpath.display());
+                ar.append_path_with_name(fullpath, relpath)?;
+            } else if !is_rel_file_path(relpath.to_str().unwrap()) {
+                if entry.file_name() != "pg_filenode.map"
+                    && !relpath.starts_with("pg_xact/")
+                    && !relpath.starts_with("pg_multixact/")
+                {
+                    trace!("sending {}", relpath.display());
+                    ar.append_path_with_name(fullpath, relpath)?;
+                }
+            } else {
+                trace!("not sending {}", relpath.display());
+            }
+        } else {
+            error!("unknown file type: {}", fullpath.display());
+        }
+    }
+
+    add_slru_segments(
+        &mut ar,
+        timeline,
+        "pg_xact",
+        pg_constants::PG_XACT_FORKNUM,
+        lsn,
+    )?;
+    add_slru_segments(
+        &mut ar,
+        timeline,
+        "pg_multixact/members",
+        pg_constants::PG_MXACT_MEMBERS_FORKNUM,
+        lsn,
+    )?;
+    add_slru_segments(
+        &mut ar,
+        timeline,
+        "pg_multixact/offsets",
+        pg_constants::PG_MXACT_OFFSETS_FORKNUM,
+        lsn,
+    )?;
+    add_relmap_files(&mut ar, timeline, lsn)?;
+
+    // FIXME: Also send all the WAL. The compute node would only need
+    // the WAL that applies to non-relation files, because the page
+    // server handles all the relation files. But we don't have a
+    // mechanism for separating relation and non-relation WAL at the
+    // moment.
+    for entry in std::fs::read_dir(&walpath)? {
+        let entry = entry?;
+        let fullpath = &entry.path();
+        let relpath = fullpath.strip_prefix(&walpath).unwrap();
+
+        if !entry.path().is_file() {
+            continue;
+        }
+
+        let archive_fname = relpath.to_str().unwrap();
+        let archive_fname = archive_fname
+            .strip_suffix(".partial")
+            .unwrap_or(&archive_fname);
+        let archive_path = "pg_wal/".to_owned() + archive_fname;
+        ar.append_path_with_name(fullpath, archive_path)?;
+    }
+    ar.finish()?;
+    debug!("all tarred up!");
+    Ok(())
+}
+
 ///
 /// Send a tarball containing a snapshot of all non-relation files in the
 /// PostgreSQL data directory, at given LSN
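For orientation, the segment arithmetic used by add_slru_segments() above can be sketched standalone. This is an illustrative sketch, not part of the patch, and it assumes PostgreSQL's usual values BLCKSZ = 8192 and SLRU_PAGES_PER_SEGMENT = 32:

fn slru_page_location(page: u32) -> (String, usize) {
    const BLCKSZ: usize = 8192;                 // assumed page size
    const SLRU_PAGES_PER_SEGMENT: u32 = 32;     // assumed pages per SLRU segment file
    // segment file that holds this page, named like pg_xact/0000, 0001, ...
    let segname = format!("{:>04X}", page / SLRU_PAGES_PER_SEGMENT);
    // byte offset of the page inside that segment file
    let offset = (page % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ;
    (segname, offset)
}

For example, pg_xact page 40 lands in segment "0001" at byte offset 8 * 8192 = 65536, which is where add_slru_segments() copies the page image before appending the full 32-page segment buffer to the tarball.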
@@ -653,12 +653,18 @@ impl Connection {
         } else if query_string.starts_with(b"basebackup ") {
             let (_l, r) = query_string.split_at("basebackup ".len());
             let r = r.to_vec();
-            let timelineid_str = String::from(String::from_utf8(r)?.trim_end());
+            let basebackup_args = String::from(String::from_utf8(r)?.trim_end());
+            let args: Vec<&str> = basebackup_args.rsplit(' ').collect();
+            let timelineid_str = args[0];
             info!("got basebackup command: \"{}\"", timelineid_str);
             let timelineid = ZTimelineId::from_str(&timelineid_str)?;
+            let lsn = if args.len() > 1 {
+                Some(Lsn::from_str(args[1])?)
+            } else {
+                None
+            };
             // Check that the timeline exists
-            self.handle_basebackup_request(timelineid)?;
+            self.handle_basebackup_request(timelineid, lsn)?;
             self.write_message_noflush(&BeMessage::CommandComplete)?;
             self.write_message(&BeMessage::ReadyForQuery)?;
         } else if query_string.starts_with(b"callmemaybe ") {
@@ -814,16 +820,19 @@ impl Connection {
         }
     }

-    fn handle_basebackup_request(&mut self, timelineid: ZTimelineId) -> anyhow::Result<()> {
+    fn handle_basebackup_request(
+        &mut self,
+        timelineid: ZTimelineId,
+        lsn: Option<Lsn>,
+    ) -> anyhow::Result<()> {
         // check that the timeline exists
         let repository = page_cache::get_repository();
-        if repository.get_or_restore_timeline(timelineid).is_err() {
-            bail!(
+        let timeline = repository.get_or_restore_timeline(timelineid).map_err(|_| {
+            anyhow!(
                 "client requested basebackup on timeline {} which does not exist in page server",
                 timelineid
-            );
-        }
+            )
+        })?;

         /* switch client to COPYOUT */
         let stream = &mut self.stream;
         stream.write_u8(b'H')?;
@@ -836,10 +845,16 @@ impl Connection {
         /* Send a tarball of the latest snapshot on the timeline */

         // find latest snapshot
-        let snapshotlsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
-        basebackup::send_snapshot_tarball(&mut CopyDataSink { stream }, timelineid, snapshotlsn)?;
+        let snapshot_lsn =
+            restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
+        let req_lsn = lsn.unwrap_or(snapshot_lsn);
+        basebackup::send_tarball_at_lsn(
+            &mut CopyDataSink { stream },
+            timelineid,
+            &timeline,
+            req_lsn,
+            snapshot_lsn,
+        )?;
         // CopyDone
         self.stream.write_u8(b'c')?;
         self.stream.write_u32::<BE>(4)?;
@@ -106,6 +106,13 @@ pub trait Timeline {
     /// valid LSN, so that the WAL receiver knows where to restart streaming.
     fn advance_last_record_lsn(&self, lsn: Lsn);
     fn get_last_record_lsn(&self) -> Lsn;
+
+    /// Get range [begin, end) of stored blocks. Used mostly for SMGR pseudorelations,
+    /// but can also be applied to normal relations.
+    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
+
+    /// Get vector of databases (represented using RelTag; only the dbnode and spcnode fields are used)
+    fn get_databases(&self) -> Result<Vec<RelTag>>;
 }

 #[derive(Clone)]
@@ -118,25 +125,25 @@ pub struct RepositoryStats {

 #[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
 pub struct RelTag {
+    pub forknum: u8,
     pub spcnode: u32,
     pub dbnode: u32,
     pub relnode: u32,
-    pub forknum: u8,
 }

 impl RelTag {
     pub fn pack(&self, buf: &mut BytesMut) {
+        buf.put_u8(self.forknum);
         buf.put_u32(self.spcnode);
         buf.put_u32(self.dbnode);
         buf.put_u32(self.relnode);
-        buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres
     }
     pub fn unpack(buf: &mut BytesMut) -> RelTag {
         RelTag {
+            forknum: buf.get_u8(),
             spcnode: buf.get_u32(),
             dbnode: buf.get_u32(),
             relnode: buf.get_u32(),
-            forknum: buf.get_u32() as u8,
         }
     }
 }
@@ -440,7 +440,6 @@ impl RocksTimeline {
                 let new_img = self
                     .walredo_mgr
                     .request_redo(key.tag, key.lsn, base_img, records)?;
-
                 self.put_page_image(key.tag, key.lsn, new_img.clone());

                 reconstructed += 1;
@@ -629,10 +628,88 @@ impl Timeline for RocksTimeline {
     /// Get size of relation at given LSN.
     ///
     fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
-        self.wait_lsn(lsn)?;
+        let lsn = self.wait_lsn(lsn)?;
         self.relsize_get_nowait(rel, lsn)
     }
+
+    /// Get databases. This function is used to locate pg_filenode.map files
+    fn get_databases(&self) -> Result<Vec<RelTag>> {
+        let key = CacheKey {
+            // minimal key
+            tag: BufferTag {
+                rel: RelTag {
+                    forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
+                    spcnode: 0,
+                    dbnode: 0,
+                    relnode: 0,
+                },
+                blknum: 0,
+            },
+            lsn: Lsn(0),
+        };
+        let mut buf = BytesMut::new();
+        key.pack(&mut buf);
+        let mut dbs = Vec::new();
+
+        let mut iter = self.db.raw_iterator();
+        iter.seek(&buf[..]);
+        let mut prev_tag = key.tag.rel;
+        while iter.valid() {
+            let k = iter.key().unwrap();
+            buf.clear();
+            buf.extend_from_slice(&k);
+            let tag = RelTag::unpack(&mut buf);
+            if tag.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
+                break; // we are done with this fork
+            }
+            if tag != prev_tag {
+                dbs.push(tag); // collect unique tags
+                prev_tag = tag;
+            }
+            iter.next();
+        }
+        return Ok(dbs);
+    }
+
+    /// Get range [begin, end) of stored blocks. Used mostly for SMGR pseudorelations,
+    /// but can also be applied to normal relations.
+    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
+        let lsn = self.wait_lsn(lsn)?;
+        let mut key = CacheKey {
+            // minimal key to start with
+            tag: BufferTag { rel, blknum: 0 },
+            lsn,
+        };
+        let mut buf = BytesMut::new();
+        key.pack(&mut buf);
+        let mut iter = self.db.raw_iterator();
+        iter.seek(&buf[..]); // locate first entry
+        if iter.valid() {
+            let k = iter.key().unwrap();
+            buf.clear();
+            buf.extend_from_slice(&k);
+            let tag = BufferTag::unpack(&mut buf);
+            if tag.rel == rel {
+                // still traversing this relation
+                let first_blknum = tag.blknum;
+                key.tag.blknum = u32::MAX; // maximal key
+                buf.clear();
+                key.pack(&mut buf);
+                let mut iter = self.db.raw_iterator();
+                iter.seek_for_prev(&buf[..]); // locate last entry
+                if iter.valid() {
+                    let k = iter.key().unwrap();
+                    buf.clear();
+                    buf.extend_from_slice(&k);
+                    let tag = BufferTag::unpack(&mut buf);
+                    let last_blknum = tag.blknum;
+                    return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
+                }
+            }
+        }
+        Ok((0, 0)) // empty range
+    }

     ///
     /// Does relation exist at given LSN?
     ///
@@ -731,14 +808,25 @@ impl Timeline for RocksTimeline {
     /// Memorize a full image of a page version
     ///
     fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
+        let img_len = img.len();
         let key = CacheKey { tag, lsn };
         let content = CacheEntryContent::PageImage(img);

         let mut key_buf = BytesMut::new();
         key.pack(&mut key_buf);
         let mut val_buf = BytesMut::new();
         content.pack(&mut val_buf);
+
+        // Zero size of page image indicates that SLRU page was truncated
+        if img_len == 0 && key.tag.rel.forknum > pg_constants::PG_XACT_FORKNUM {
+            if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
+                // records already marked for deletion
+                return;
+            } else {
+                // delete truncated multixact page
+                val_buf[0] |= UNUSED_VERSION_FLAG;
+            }
+        }

         trace!("put_wal_record lsn: {}", key.lsn);
         let _res = self.db.put(&key_buf[..], &val_buf[..]);

@@ -23,7 +23,7 @@ use anyhow::Result;
 use bytes::Bytes;

 use crate::repository::{BufferTag, RelTag, Timeline};
-use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
+use crate::waldecoder::{Oid, decode_wal_record, WalStreamDecoder};
 use crate::PageServerConf;
 use crate::ZTimelineId;
 use postgres_ffi::pg_constants;
@@ -120,8 +120,26 @@ fn restore_snapshot(
             None => continue,

             // These special files appear in the snapshot, but are not needed by the page server
-            Some("pg_control") => continue,
-            Some("pg_filenode.map") => continue,
+            Some("pg_control") => restore_nonrel_file(
+                conf,
+                timeline,
+                timelineid,
+                snapshot,
+                pg_constants::GLOBALTABLESPACE_OID,
+                0,
+                pg_constants::PG_CONTROLFILE_FORKNUM,
+                &direntry.path(),
+            )?,
+            Some("pg_filenode.map") => restore_nonrel_file(
+                conf,
+                timeline,
+                timelineid,
+                snapshot,
+                pg_constants::GLOBALTABLESPACE_OID,
+                0,
+                pg_constants::PG_FILENODEMAP_FORKNUM,
+                &direntry.path(),
+            )?,

             // Load any relation files into the page server
             _ => restore_relfile(
@@ -148,7 +166,16 @@ fn restore_snapshot(

             // These special files appear in the snapshot, but are not needed by the page server
             Some("PG_VERSION") => continue,
-            Some("pg_filenode.map") => continue,
+            Some("pg_filenode.map") => restore_nonrel_file(
+                conf,
+                timeline,
+                timelineid,
+                snapshot,
+                pg_constants::DEFAULTTABLESPACE_OID,
+                dboid,
+                pg_constants::PG_FILENODEMAP_FORKNUM,
+                &direntry.path(),
+            )?,

             // Load any relation files into the page server
             _ => restore_relfile(
@@ -163,7 +190,7 @@ fn restore_snapshot(
     }
     for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
         let entry = entry?;
-        restore_nonrelfile(
+        restore_slru_file(
             conf,
             timeline,
             timelineid,
@@ -172,6 +199,28 @@ fn restore_snapshot(
             &entry.path(),
         )?;
     }
+    for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? {
+        let entry = entry?;
+        restore_slru_file(
+            conf,
+            timeline,
+            timelineid,
+            snapshot,
+            pg_constants::PG_MXACT_MEMBERS_FORKNUM,
+            &entry.path(),
+        )?;
+    }
+    for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? {
+        let entry = entry?;
+        restore_slru_file(
+            conf,
+            timeline,
+            timelineid,
+            snapshot,
+            pg_constants::PG_MXACT_OFFSETS_FORKNUM,
+            &entry.path(),
+        )?;
+    }
     // TODO: Scan pg_tblspc

     Ok(())
@@ -180,8 +229,8 @@ fn restore_snapshot(
 fn restore_relfile(
     timeline: &dyn Timeline,
     snapshot: &str,
-    spcoid: u32,
-    dboid: u32,
+    spcoid: Oid,
+    dboid: Oid,
     path: &Path,
 ) -> Result<()> {
     let lsn = Lsn::from_hex(snapshot)?;
@@ -239,7 +288,39 @@ fn restore_relfile(
     Ok(())
 }

-fn restore_nonrelfile(
+fn restore_nonrel_file(
+    _conf: &PageServerConf,
+    timeline: &dyn Timeline,
+    _timelineid: ZTimelineId,
+    snapshot: &str,
+    spcoid: Oid,
+    dboid: Oid,
+    forknum: u8,
+    path: &Path,
+) -> Result<()> {
+    let lsn = Lsn::from_hex(snapshot)?;
+
+    // Does it look like a relation file?
+
+    let mut file = File::open(path)?;
+    let mut buffer = Vec::new();
+    // read the whole file
+    file.read_to_end(&mut buffer)?;
+
+    let tag = BufferTag {
+        rel: RelTag {
+            spcnode: spcoid,
+            dbnode: dboid,
+            relnode: 0,
+            forknum,
+        },
+        blknum: 0,
+    };
+    timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
+    Ok(())
+}
+
+fn restore_slru_file(
     _conf: &PageServerConf,
     timeline: &dyn Timeline,
     _timelineid: ZTimelineId,
@@ -263,7 +344,7 @@ fn restore_nonrelfile(
     let tag = BufferTag {
         rel: RelTag {
             spcnode: 0,
             dbnode: 0,
             relnode: 0,
             forknum,
         },
@@ -296,7 +377,7 @@ fn restore_nonrelfile(
     Ok(())
 }

-// Scan WAL on a timeline, starting from gien LSN, and load all the records
+// Scan WAL on a timeline, starting from given LSN, and load all the records
 // into the page cache.
 fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
     let walpath = format!("timelines/{}/wal", timelineid);
@@ -310,6 +310,9 @@ pub type Oid = u32;
 pub type TransactionId = u32;
 pub type BlockNumber = u32;
 pub type OffsetNumber = u16;
+pub type MultiXactId = TransactionId;
+pub type MultiXactOffset = u32;
+pub type MultiXactStatus = u32;

 #[repr(C)]
 #[derive(Debug, Clone, Copy)]
@@ -319,6 +322,24 @@ pub struct RelFileNode {
     pub relnode: Oid, /* relation */
 }

+#[repr(C)]
+#[derive(Debug)]
+pub struct XlRelmapUpdate {
+    pub dbid: Oid,   /* database ID, or 0 for shared map */
+    pub tsid: Oid,   /* database's tablespace, or pg_global */
+    pub nbytes: i32, /* size of relmap data */
+}
+
+impl XlRelmapUpdate {
+    pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
+        XlRelmapUpdate {
+            dbid: buf.get_u32_le(),
+            tsid: buf.get_u32_le(),
+            nbytes: buf.get_i32_le(),
+        }
+    }
+}
+
 #[repr(C)]
 #[derive(Debug)]
 pub struct XlSmgrTruncate {
@@ -441,6 +462,74 @@ impl XlHeapUpdate {
     }
 }

+#[repr(C)]
+#[derive(Debug)]
+pub struct MultiXactMember {
+    pub xid: TransactionId,
+    pub status: MultiXactStatus,
+}
+
+impl MultiXactMember {
+    pub fn decode(buf: &mut Bytes) -> MultiXactMember {
+        MultiXactMember {
+            xid: buf.get_u32_le(),
+            status: buf.get_u32_le(),
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Debug)]
+pub struct XlMultiXactCreate {
+    pub mid: MultiXactId,      /* new MultiXact's ID */
+    pub moff: MultiXactOffset, /* its starting offset in members file */
+    pub nmembers: u32,         /* number of member XIDs */
+    pub members: Vec<MultiXactMember>,
+}
+
+impl XlMultiXactCreate {
+    pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
+        let mid = buf.get_u32_le();
+        let moff = buf.get_u32_le();
+        let nmembers = buf.get_u32_le();
+        let mut members = Vec::new();
+        for _ in 0..nmembers {
+            members.push(MultiXactMember::decode(buf));
+        }
+        XlMultiXactCreate {
+            mid,
+            moff,
+            nmembers,
+            members,
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Debug)]
+pub struct XlMultiXactTruncate {
+    oldest_multi_db: Oid,
+    /* to-be-truncated range of multixact offsets */
+    start_trunc_off: MultiXactId, /* just for completeness' sake */
+    end_trunc_off: MultiXactId,
+
+    /* to-be-truncated range of multixact members */
+    start_trunc_memb: MultiXactOffset,
+    end_trunc_memb: MultiXactOffset,
+}
+
+impl XlMultiXactTruncate {
+    pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
+        XlMultiXactTruncate {
+            oldest_multi_db: buf.get_u32_le(),
+            start_trunc_off: buf.get_u32_le(),
+            end_trunc_off: buf.get_u32_le(),
+            start_trunc_memb: buf.get_u32_le(),
+            end_trunc_memb: buf.get_u32_le(),
+        }
+    }
+}
+
 //
 // Routines to decode a WAL record and figure out which blocks are modified
 //
@@ -930,6 +1019,73 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
                 blocks.push(blk);
             }
         }
+    } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
+        let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
+        if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
+            let mut blk = DecodedBkpBlock::new();
+            blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
+            blk.blkno = buf.get_u32_le();
+            blk.will_init = true;
+            blocks.push(blk);
+        } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
+            let mut blk = DecodedBkpBlock::new();
+            blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
+            blk.blkno = buf.get_u32_le();
+            blk.will_init = true;
+            blocks.push(blk);
+        } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
+            let xlrec = XlMultiXactCreate::decode(&mut buf);
+            // Update offset page
+            let mut blk = DecodedBkpBlock::new();
+            blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+            blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
+            blocks.push(blk);
+            let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+            let last_mbr_blkno =
+                (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+            for blkno in first_mbr_blkno..=last_mbr_blkno {
+                // Update members page
+                let mut blk = DecodedBkpBlock::new();
+                blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
+                blk.blkno = blkno;
+                blocks.push(blk);
+            }
+        } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
+            let xlrec = XlMultiXactTruncate::decode(&mut buf);
+            let first_off_blkno =
+                xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+            let last_off_blkno =
+                xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+            for blkno in first_off_blkno..last_off_blkno {
+                let mut blk = DecodedBkpBlock::new();
+                blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
+                blk.blkno = blkno;
+                blk.will_init = true;
+                blocks.push(blk);
+            }
+            let first_mbr_blkno =
+                xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+            let last_mbr_blkno =
+                xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+            for blkno in first_mbr_blkno..last_mbr_blkno {
+                let mut blk = DecodedBkpBlock::new();
+                blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
+                blk.blkno = blkno;
+                blk.will_init = true;
+                blocks.push(blk);
+            }
+        } else {
+            assert!(false);
+        }
+    } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
+        let xlrec = XlRelmapUpdate::decode(&mut buf);
+        let mut blk = DecodedBkpBlock::new();
+        blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM;
+        blk.rnode_spcnode = xlrec.tsid;
+        blk.rnode_dbnode = xlrec.dbid;
+        blk.rnode_relnode = 0;
+        blk.will_init = true;
+        blocks.push(blk);
     }

     DecodedWALRecord {
@@ -14,6 +14,7 @@
 //! TODO: Even though the postgres code runs in a separate process,
 //! it's not a secure sandbox.
 //!
+use byteorder::{ByteOrder, LittleEndian};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
 use std::assert;
@@ -36,6 +37,7 @@ use zenith_utils::lsn::Lsn;

 use crate::repository::BufferTag;
 use crate::repository::WALRecord;
+use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
 use crate::PageServerConf;
 use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils::XLogRecord;
@@ -170,6 +172,25 @@ impl WalRedoManager for PostgresRedoManager {
     }
 }

+fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
+    return ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
+        % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
+        * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize;
+}
+
+fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
+    return (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
+        * pg_constants::MXACT_MEMBER_BITS_PER_XACT;
+}
+
+/* Location (byte offset within page) of TransactionId of given member */
+fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
+    return mx_offset_to_flags_offset(xid)
+        + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
+            + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4)
+            as usize;
+}
+
 ///
 /// WAL redo thread
 ///
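A worked example (not part of the patch) of how these helpers address a member inside a multixact members page, using the constants this commit adds to pg_constants.rs: members are stored in groups of 4, each group being four flag bytes followed by four member XIDs, i.e. 20 bytes per group.

// For member offset 5 (the second member of the second group on its page):
//   mx_offset_to_flags_offset(5)   = (5 / 4) % 409 * 20     = 20  // group 1 starts at byte 20
//   mx_offset_to_flags_bitshift(5) = (5 % 4) * 8            = 8   // its flag bits sit at bits 8..15 of that word
//   mx_offset_to_member_offset(5)  = 20 + (4 + (5 % 4) * 4) = 28  // its XID occupies bytes 28..31

The redo code below uses these offsets with LittleEndian::read_u32/write_u32 to patch only the member slots that fall on the page being reconstructed.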
@@ -255,7 +276,7 @@ impl PostgresRedoManagerInternal {
         let start = Instant::now();

         let apply_result: Result<Bytes, Error>;
-        if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
+        if tag.rel.forknum >= pg_constants::PG_XACT_FORKNUM {
             const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
             let mut page = BytesMut::new();
             if let Some(fpi) = base_img {
@@ -344,6 +365,55 @@ impl PostgresRedoManagerInternal {
                         record.lsn,
                         record.main_data_offset, record.rec.len());
                     }
+                } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
+                    let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
+                    if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
+                        page.fill(0);
+                    } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
+                        page.fill(0);
+                    } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
+                        let xlrec = XlMultiXactCreate::decode(&mut buf);
+                        if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM {
+                            let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
+                                * 4) as usize;
+                            LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
+                        } else {
+                            assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM);
+                            for i in 0..xlrec.nmembers {
+                                let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+                                if blkno == tag.blknum {
+                                    // update only target block
+                                    let offset = xlrec.moff + i;
+                                    let memberoff = mx_offset_to_member_offset(offset);
+                                    let flagsoff = mx_offset_to_flags_offset(offset);
+                                    let bshift = mx_offset_to_flags_bitshift(offset);
+                                    let mut flagsval =
+                                        LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
+                                    flagsval &=
+                                        !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
+                                            << bshift);
+                                    flagsval |= xlrec.members[i as usize].status << bshift;
+                                    LittleEndian::write_u32(
+                                        &mut page[flagsoff..flagsoff + 4],
+                                        flagsval,
+                                    );
+                                    LittleEndian::write_u32(
+                                        &mut page[memberoff..memberoff + 4],
+                                        xlrec.members[i as usize].xid,
+                                    );
+                                }
+                            }
+                        }
+                    } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
+                        // empty page image indicates that this SLRU page is truncated and can be removed by GC
+                        page.clear();
+                    } else {
+                        assert!(false);
+                    }
+                } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
+                    page.clear();
+                    page.extend_from_slice(&buf[..]);
+                    assert!(page.len() == 512); // size of pg_filenode.map
                 }
             }

@@ -537,7 +607,7 @@ impl PostgresRedoProcess {
 // explanation of the protocol.

 fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
-    let len = 4 + 5 * 4;
+    let len = 4 + 1 + 4 * 4;
     let mut buf = BytesMut::with_capacity(1 + len);

     buf.put_u8(b'B');
@@ -552,7 +622,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
 fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
     assert!(base_img.len() == 8192);

-    let len = 4 + 5 * 4 + base_img.len();
+    let len = 4 + 1 + 4 * 4 + base_img.len();
     let mut buf = BytesMut::with_capacity(1 + len);

     buf.put_u8(b'P');
@@ -580,7 +650,7 @@ fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
 }

 fn build_get_page_msg(tag: BufferTag) -> Bytes {
-    let len = 4 + 5 * 4;
+    let len = 4 + 1 + 4 * 4;
     let mut buf = BytesMut::with_capacity(1 + len);

     buf.put_u8(b'G');
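The new length arithmetic in these three message builders follows from the RelTag change earlier in this commit: forknum is now packed as a single u8 ahead of spcnode, dbnode and relnode, so a serialized BufferTag shrinks from 5 * 4 = 20 bytes to 1 + 4 * 4 = 17 bytes, and each message body becomes 4 (length word) + 17 = 21 bytes after the one-byte message type.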
@@ -77,6 +77,25 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
 pub const XLOG_SWITCH: u8 = 0x40;
 pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;

+// From multixact.h
+pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
+pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
+pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
+pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
+
+pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
+pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
+pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
+pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
+pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
+    MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
+/* size in bytes of a complete group */
+pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
+    4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
+pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
+pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
+    MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
+
 // From heapam_xlog.h
 pub const XLOG_HEAP_INSERT: u8 = 0x00;
 pub const XLOG_HEAP_DELETE: u8 = 0x10;
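Worked out with BLCKSZ = 8192 (the page size assumed elsewhere in this patch), these constants give the SLRU page geometry used by the WAL decoder and redo code above:

  MULTIXACT_MEMBERS_PER_MEMBERGROUP = 4 * 1      = 4
  MULTIXACT_MEMBERGROUP_SIZE        = 4 * 4 + 4  = 20 bytes
  MULTIXACT_MEMBERGROUPS_PER_PAGE   = 8192 / 20  = 409
  MULTIXACT_MEMBERS_PER_PAGE        = 409 * 4    = 1636
  MULTIXACT_OFFSETS_PER_PAGE        = 8192 / 4   = 2048

So, as a purely illustrative example, an XLOG_MULTIXACT_CREATE_ID record with mid = 5000, moff = 3000 and nmembers = 10 touches offsets block 5000 / 2048 = 2 and members blocks 3000 / 1636 = 1 through (3000 + 10 - 1) / 1636 = 1, which is how decode_wal_record() derives the affected block numbers.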