Revert all changes related to storing and restoring non-rel data in page server

This includes the following commits:

35a1c3d521 Specify right LSN in test_createdb.py
d95e1da742 Fix issue with propagation of CREATE DATABASE to the branch
8465738aa5 [refer #167] Fix handling of pg_filenode.map files in page server
86056abd0e Fix merge conflict: set initial WAL position to second segment because of pg_resetwal
2bf2dd1d88 Add nonrelfile_utils.rs file
20b6279beb Fix restoring non-relational data during compute node startup
06f96f9600 Do not transfer WAL to computation nodes: use pg_resetwal for node startup

It also reverts some older changes related to storing CLOG and MultiXact data as a
"pseudorelation" in the page server.

With this revert, we go back to the situation where, when you create a
new compute node, we ship *all* the WAL from the beginning of time to
the compute node. Obviously we need a better solution, like the code
that this reverts. But per discussion with Konstantin and Stas, this
stuff was still half-baked, and it's better for it to live in a branch
for now, until it's more complete and has gone through some review.
Heikki Linnakangas
2021-05-24 14:57:23 +03:00
parent 6f9a582973
commit 6a9c036ac1
16 changed files with 120 additions and 1091 deletions

View File

@@ -252,6 +252,7 @@ impl PostgresNode {
// new data directory
pub fn init_from_page_server(&self) -> Result<()> {
let pgdata = self.pgdata();
println!(
"Extracting base backup to create postgres instance: path={} port={}",
pgdata.display(),
@@ -340,9 +341,6 @@ impl PostgresNode {
),
)?;
fs::create_dir_all(self.pgdata().join("pg_wal"))?;
fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
self.pg_resetwal(&["-f"])?;
Ok(())
}
@@ -398,19 +396,6 @@ impl PostgresNode {
Ok(())
}
fn pg_resetwal(&self, args: &[&str]) -> Result<()> {
let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
let pg_ctl = Command::new(pg_resetwal_path)
.args([&["-D", self.pgdata().to_str().unwrap()], args].concat())
.status()
.with_context(|| "pg_resetwal failed")?;
if !pg_ctl.success() {
anyhow::bail!("pg_resetwal failed");
}
Ok(())
}
pub fn start(&self) -> Result<()> {
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"])

View File

@@ -2,185 +2,27 @@ use crate::ZTimelineId;
use log::*;
use std::io::Write;
use std::sync::Arc;
use std::time::SystemTime;
use tar::{Builder, Header};
use tar::Builder;
use walkdir::WalkDir;
use crate::repository::{BufferTag, RelTag, Timeline};
use crate::repository::Timeline;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
header.set_mode(0b110000000);
header.set_mtime(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
header.set_cksum();
Ok(header)
}
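For context on how these headers are consumed: a minimal sketch, assuming only the tar crate, of appending an in-memory file to an archive with a header built the same way as new_tar_header (the path and contents are made up for illustration; 0o600 is the same bit pattern as the 0b110000000 literal above):

use std::time::SystemTime;
use tar::{Builder, Header};

fn main() -> anyhow::Result<()> {
    // basebackup streams the tar to a socket; here we build it in memory.
    let mut ar = Builder::new(Vec::new());

    let data = b"hello";
    let mut header = Header::new_gnu();
    header.set_size(data.len() as u64);
    header.set_path("global/made_up_file")?; // hypothetical path
    header.set_mode(0o600);
    header.set_mtime(
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs(),
    );
    header.set_cksum();

    ar.append(&header, &data[..])?;
    let tarball = ar.into_inner()?;
    println!("built a {}-byte tarball", tarball.len());
    Ok(())
}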
//
// Generate SLRU segment files from repository
//
fn add_slru_segments(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
path: &str,
forknum: u8,
lsn: Lsn,
) -> anyhow::Result<()> {
let rel = RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum,
};
let (first, last) = timeline.get_range(rel, lsn)?;
const SEG_SIZE: usize =
pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize;
let mut seg_buf = [0u8; SEG_SIZE];
let mut curr_segno: Option<u32> = None;
for page in first..last {
let tag = BufferTag { rel, blknum: page };
let img = timeline.get_page_at_lsn(tag, lsn)?;
// Zero length image indicates truncated segment: just skip it
if !img.is_empty() {
assert!(img.len() == pg_constants::BLCKSZ as usize);
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if curr_segno.is_some() && curr_segno.unwrap() != segno {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
seg_buf = [0u8; SEG_SIZE];
}
curr_segno = Some(segno);
let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
* pg_constants::BLCKSZ as usize;
let offs_end = offs_start + pg_constants::BLCKSZ as usize;
seg_buf[offs_start..offs_end].copy_from_slice(&img);
}
}
if curr_segno.is_some() {
let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
let header = new_tar_header(&segname, SEG_SIZE as u64)?;
ar.append(&header, &seg_buf[..])?;
}
Ok(())
}
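The bookkeeping above follows PostgreSQL's SLRU layout: each segment file holds SLRU_PAGES_PER_SEGMENT (32) pages of BLCKSZ bytes and is named by its segment number in hex. A self-contained sketch of the same arithmetic, with the constants inlined and a made-up helper name:

const BLCKSZ: usize = 8192;
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

// Which segment file an SLRU page lives in, and the page's byte range
// within that segment (hypothetical helper, mirroring add_slru_segments).
fn slru_page_location(page: u32) -> (String, std::ops::Range<usize>) {
    let segno = page / SLRU_PAGES_PER_SEGMENT;
    let segname = format!("{:>04X}", segno);
    let offs_start = (page % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ;
    (segname, offs_start..offs_start + BLCKSZ)
}

fn main() {
    // Page 33 is the second page of segment file "0001".
    let (seg, range) = slru_page_location(33);
    assert_eq!(seg, "0001");
    assert_eq!(range, 8192..16384);
}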
//
// Extract pg_filenode.map files from repository
//
fn add_relmap_files(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
snappath: &str,
) -> anyhow::Result<()> {
for db in timeline.get_databases(lsn)?.iter() {
let tag = BufferTag {
rel: *db,
blknum: 0,
};
let img = timeline.get_page_at_lsn(tag, lsn)?;
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map")
} else {
// User-defined tablespaces are not supported
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
let src_path = format!("{}/base/1/PG_VERSION", snappath);
let dst_path = format!("base/{}/PG_VERSION", db.dbnode);
ar.append_path_with_name(&src_path, &dst_path)?;
format!("base/{}/pg_filenode.map", db.dbnode)
};
info!("Deliver {}", path);
assert!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
}
Ok(())
}
//
// Extract twophase state files
//
fn add_twophase_files(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
for xid in timeline.get_twophase(lsn)?.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
},
blknum: *xid,
};
let img = timeline.get_page_at_lsn(tag, lsn)?;
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
}
Ok(())
}
//
// Add generated pg_control file
//
fn add_pgcontrol_file(
ar: &mut Builder<&mut dyn Write>,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
) -> anyhow::Result<()> {
if let Some(checkpoint_bytes) =
timeline.get_page_image(BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM), Lsn(0))?
{
if let Some(pg_control_bytes) = timeline.get_page_image(
BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM),
Lsn(0),
)? {
let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
checkpoint.redo = lsn.0;
checkpoint.nextXid.value += 1;
// TODO: When we restart the master there are no active transactions, and
// oldestXid is equal to nextXid if there are no prepared transactions.
// Let's ignore them for a while...
checkpoint.oldestXid = checkpoint.nextXid.value as u32;
pg_control.checkPointCopy = checkpoint;
let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
ar.append(&header, &pg_control_bytes[..])?;
}
}
Ok(())
}
///
/// Generate tarball with non-relational files from repository
///
pub fn send_tarball_at_lsn(
write: &mut dyn Write,
timelineid: ZTimelineId,
timeline: &Arc<dyn Timeline>,
lsn: Lsn,
_timeline: &Arc<dyn Timeline>,
_lsn: Lsn,
snapshot_lsn: Lsn,
) -> anyhow::Result<()> {
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
for entry in WalkDir::new(&snappath) {
@@ -207,14 +49,8 @@ pub fn send_tarball_at_lsn(
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map"
&& entry.file_name() != "pg_control"
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
}
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else {
trace!("not sending {}", relpath.display());
}
@@ -223,31 +59,27 @@ pub fn send_tarball_at_lsn(
}
}
add_slru_segments(
&mut ar,
timeline,
"pg_xact",
pg_constants::PG_XACT_FORKNUM,
lsn,
)?;
add_slru_segments(
&mut ar,
timeline,
"pg_multixact/members",
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
lsn,
)?;
add_slru_segments(
&mut ar,
timeline,
"pg_multixact/offsets",
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
lsn,
)?;
add_relmap_files(&mut ar, timeline, lsn, &snappath)?;
add_twophase_files(&mut ar, timeline, lsn)?;
add_pgcontrol_file(&mut ar, timeline, lsn)?;
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
ar.finish()?;
debug!("all tarred up!");
Ok(())

View File

@@ -101,6 +101,9 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
// Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename(tmppath, &target)?;
@@ -309,6 +312,31 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
bail!("could not parse point-in-time {}", s);
}
// If control file says the cluster was shut down cleanly, modify it, to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter.
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile =
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(
controlfilepath.as_path(),
postgres_ffi::encode_pg_control(controlfile),
)?;
Ok(())
}
fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
// Create initial timeline
let mut tli_buf = [0u8; 16];

View File

@@ -921,7 +921,7 @@ impl Connection {
// find latest snapshot
let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
let req_lsn = lsn.unwrap_or(snapshot_lsn);
basebackup::send_tarball_at_lsn(
&mut CopyDataSink { stream },
timelineid,

View File

@@ -1,6 +1,6 @@
pub mod rocksdb;
use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId, XlCreateDatabase, XlSmgrTruncate};
use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate};
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -48,9 +48,6 @@ pub trait Timeline {
/// Does relation exist?
fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
/// Get page image at the particular LSN
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>>;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
@@ -166,16 +163,6 @@ pub trait Timeline {
/// valid LSN, so that the WAL receiver knows where to restart streaming.
fn advance_last_record_lsn(&self, lsn: Lsn);
fn get_last_record_lsn(&self) -> Lsn;
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
/// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
}
#[derive(Clone)]
@@ -236,18 +223,6 @@ pub struct BufferTag {
}
impl BufferTag {
pub fn fork(forknum: u8) -> BufferTag {
BufferTag {
rel: RelTag {
forknum,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
}
}
pub fn pack(&self, buf: &mut BytesMut) {
self.rel.pack(buf);
buf.put_u32(self.blknum);

View File

@@ -7,7 +7,7 @@
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::{Oid, TransactionId};
use crate::waldecoder::Oid;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -16,8 +16,6 @@ use crate::ZTimelineId;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::*;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryInto;
@@ -417,19 +415,6 @@ impl RocksTimeline {
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// Special handling of delete of PREPARE WAL record
if last_lsn < horizon
&& key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
{
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.to_bytes(), &v[..])?;
deleted += 1;
}
maxkey = minkey;
continue;
}
// reconstruct most recent page version
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
// force reconstruction of most recent page version
@@ -623,116 +608,6 @@ impl Timeline for RocksTimeline {
self.relsize_get_nowait(rel, lsn)
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut gxacts = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
let xid = key.tag.blknum;
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
Ok(gxacts)
}
/// Get databases. This function is used to create local pg_filenode.map files
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut dbs = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
let mut prev_tag = key.tag.rel;
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
Ok(dbs)
}
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let _lsn = self.wait_lsn(lsn)?;
let mut key = CacheKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes()); // locate first entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let tag = thiskey.tag;
if tag.rel == rel {
// still traversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes()); // locate last entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
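get_range, get_databases and get_twophase all share one RocksDB idiom: seek to the smallest key carrying the prefix of interest, walk forward while the prefix still matches, and seek_for_prev against a maximal key to find the upper end. A stripped-down sketch of that pattern against the rocksdb crate, using raw byte keys in place of CacheKey:

use rocksdb::DB;

// Find the first and last keys under `prefix` -- a simplified stand-in
// for scanning one RelTag's stored block range.
fn prefix_bounds(db: &DB, prefix: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
    // Locate the first entry at or after the prefix.
    let mut iter = db.raw_iterator();
    iter.seek(prefix);
    if !iter.valid() || !iter.key()?.starts_with(prefix) {
        return None; // empty range
    }
    let first = iter.key()?.to_vec();

    // Locate the last entry: seek backwards from a key larger than
    // anything carrying this prefix (prefix followed by 0xFF padding).
    let mut maxkey = prefix.to_vec();
    maxkey.extend_from_slice(&[0xFF; 16]);
    let mut iter = db.raw_iterator();
    iter.seek_for_prev(&maxkey);
    let last = iter.key()?.to_vec();

    Some((first, last))
}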
///
/// Does relation exist at given LSN?
///
@@ -811,20 +686,6 @@ impl Timeline for RocksTimeline {
Ok(())
}
///
/// Get page image at particular LSN
///
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
let key = CacheKey { tag, lsn };
if let Some(bytes) = self.db.get(key.to_bytes())? {
let content = CacheEntryContent::from_slice(&bytes);
if let CacheEntryContent::PageImage(img) = content {
return Ok(Some(img));
}
}
Ok(None)
}
///
/// Memorize a full image of a page version
///
@@ -866,42 +727,35 @@ impl Timeline for RocksTimeline {
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()> {
let mut n = 0;
for forknum in &[
pg_constants::MAIN_FORKNUM,
pg_constants::FSM_FORKNUM,
pg_constants::VISIBILITYMAP_FORKNUM,
pg_constants::INIT_FORKNUM,
pg_constants::PG_FILENODEMAP_FORKNUM,
] {
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: *forknum,
},
blknum: 0,
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let mut key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
let v = iter.value().unwrap();
self.db.put(key.to_bytes(), v)?;
n += 1;
iter.next();
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
let mut n = 0;
while iter.valid() {
let mut key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
let v = iter.value().unwrap();
self.db.put(key.to_bytes(), v)?;
n += 1;
iter.next();
}
info!(
"Create database {}/{}, copy {} entries",
@@ -912,7 +766,6 @@ impl Timeline for RocksTimeline {
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
@@ -930,8 +783,7 @@ impl Timeline for RocksTimeline {
/// NOTE: this updates last_valid_lsn as well.
///
fn advance_last_record_lsn(&self, lsn: Lsn) {
let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
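The (lsn + 7) & !7 expression rounds an LSN up to the next 8-byte boundary, matching the MAXALIGN padding between WAL records. A quick sanity check of the arithmetic:

fn align8(lsn: u64) -> u64 {
    (lsn + 7) & !7
}

fn main() {
    assert_eq!(align8(0), 0);   // already aligned
    assert_eq!(align8(1), 8);   // rounds up
    assert_eq!(align8(8), 8);   // aligned values stay put
    assert_eq!(align8(13), 16);
}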

View File

@@ -26,9 +26,9 @@ use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
///
@@ -103,7 +103,7 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
}
fn restore_snapshot(
conf: &PageServerConf,
_conf: &PageServerConf,
timeline: &dyn Timeline,
timelineid: ZTimelineId,
snapshot: &str,
@@ -120,28 +120,8 @@ fn restore_snapshot(
None => continue,
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => restore_nonrel_file(
conf,
timeline,
timelineid,
"0",
0,
0,
pg_constants::PG_CONTROLFILE_FORKNUM,
0,
&direntry.path(),
)?,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::GLOBALTABLESPACE_OID,
0,
pg_constants::PG_FILENODEMAP_FORKNUM,
0,
&direntry.path(),
)?,
Some("pg_control") => continue,
Some("pg_filenode.map") => continue,
// Load any relation files into the page server
_ => restore_relfile(
@@ -168,17 +148,7 @@ fn restore_snapshot(
// These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue,
Some("pg_filenode.map") => restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
pg_constants::PG_FILENODEMAP_FORKNUM,
0,
&direntry.path(),
)?,
Some("pg_filenode.map") => continue,
// Load any relation files into the page server
_ => restore_relfile(
@@ -191,54 +161,6 @@ fn restore_snapshot(
}
}
}
for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
let entry = entry?;
restore_slru_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::PG_XACT_FORKNUM,
&entry.path(),
)?;
}
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? {
let entry = entry?;
restore_slru_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::PG_MXACT_MEMBERS_FORKNUM,
&entry.path(),
)?;
}
for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? {
let entry = entry?;
restore_slru_file(
conf,
timeline,
timelineid,
snapshot,
pg_constants::PG_MXACT_OFFSETS_FORKNUM,
&entry.path(),
)?;
}
for entry in fs::read_dir(snapshotpath.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?;
restore_nonrel_file(
conf,
timeline,
timelineid,
snapshot,
0,
0,
pg_constants::PG_TWOPHASE_FORKNUM,
xid,
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc
Ok(())
@@ -306,96 +228,6 @@ fn restore_relfile(
Ok(())
}
fn restore_nonrel_file(
_conf: &PageServerConf,
timeline: &dyn Timeline,
_timelineid: ZTimelineId,
snapshot: &str,
spcoid: Oid,
dboid: Oid,
forknum: u8,
blknum: u32,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
// Does it look like a relation file?
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
let tag = BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
relnode: 0,
forknum,
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
Ok(())
}
fn restore_slru_file(
_conf: &PageServerConf,
timeline: &dyn Timeline,
_timelineid: ZTimelineId,
snapshot: &str,
forknum: u8,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
// Does it look like a relation file?
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
loop {
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = BufferTag {
rel: RelTag {
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum,
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
}
*/
}
// TODO: UnexpectedEof is expected
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
break;
}
_ => {
error!("error reading file: {:?} ({})", path, e);
break;
}
},
};
blknum += 1;
}
Ok(())
}
// Scan WAL on a timeline, starting from given LSN, and load all the records
// into the page cache.
fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
@@ -406,17 +238,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = Lsn(0);
let mut checkpoint = CheckPoint::new(startpoint.0, 1);
let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
let pg_control_tag = BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM);
if let Some(pg_control_bytes) = timeline.get_page_image(pg_control_tag, Lsn(0))? {
let pg_control = decode_pg_control(pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy.clone();
} else {
error!("No control file is found in reposistory");
}
loop {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
@@ -454,12 +275,10 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
if rec.is_err() {
// Assume that an error means we've reached the end of
// a partial WAL record. So that's ok.
trace!("WAL decoder error {:?}", rec);
waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64));
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
let decoded = decode_wal_record(recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
last_lsn = lsn;
} else {
@@ -468,16 +287,12 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
nrecords += 1;
}
info!(
"restored {} records from WAL file {} at {}",
nrecords, filename, last_lsn
);
info!("restored {} records from WAL file {}", nrecords, filename);
segno += 1;
offset = 0;
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = encode_checkpoint(checkpoint);
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
Ok(())
}

View File

@@ -1,7 +1,7 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
use postgres_ffi::*;
use std::cmp::min;
use std::str;
use thiserror::Error;
@@ -14,19 +14,17 @@ pub type OffsetNumber = u16;
pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
pub type TimeLineID = u32;
pub type PgTime = i64;
// From PostgreSQL headers
#[repr(C)]
#[derive(Debug)]
pub struct XLogPageHeaderData {
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: TimeLineID, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
xlp_magic: u16, /* magic value for correctness checks */
xlp_info: u16, /* flag bits, see below */
xlp_tli: u32, /* TimeLineID of first record on page */
xlp_pageaddr: u64, /* XLOG address of this page */
xlp_rem_len: u32, /* total len of remaining data for record */
}
// FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end
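To see the padding the FIXME is talking about: with MAXIMUM_ALIGNOF 8, xlp_pageaddr forces 8-byte alignment, so the 20 bytes of fields round up to a 24-byte struct with 4 trailing padding bytes. A #[repr(C)] mirror makes this checkable:

#[repr(C)]
struct XLogPageHeaderData {
    xlp_magic: u16,
    xlp_info: u16,
    xlp_tli: u32,
    xlp_pageaddr: u64,
    xlp_rem_len: u32,
}

fn main() {
    // 2 + 2 + 4 + 8 + 4 = 20 bytes of fields, rounded up to align(8).
    assert_eq!(std::mem::size_of::<XLogPageHeaderData>(), 24);
    assert_eq!(std::mem::align_of::<XLogPageHeaderData>(), 8);
}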
@@ -84,13 +82,6 @@ impl WalStreamDecoder {
}
}
pub fn set_position(&mut self, lsn: Lsn) {
self.lsn = lsn;
self.contlen = 0;
self.padlen = 0;
self.inputbuf.clear();
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
@@ -556,7 +547,7 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord {
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -574,12 +565,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
xlogrec.xl_rmid,
xlogrec.xl_info
);
if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
// TODO: handle XID wraparound
checkpoint.nextXid = FullTransactionId {
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
};
}
let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
if buf.remaining() != remaining as usize {
@@ -790,30 +776,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
assert_eq!(buf.remaining(), main_data_len as usize);
}
//5. Handle special CLOG and XACT records
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = buf.get_i32_le() as u32;
blk.will_init = true;
trace!("RM_CLOG_ID updates block {}", blk.blkno);
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
//5. Handle special XACT records
if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
//parse commit record to extract subtrans entries
// xl_xact_commit starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -828,17 +794,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
let mut prev_blkno = u32::MAX;
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
if prev_blkno != blkno {
prev_blkno = blkno;
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
let _subxact = buf.get_u32_le();
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -869,18 +826,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
//TODO handle this to be able to restore pg_twophase on node start
}
} else if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
//parse abort record to extract subtrans entries
// xl_xact_abort starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -895,17 +840,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
let mut prev_blkno = u32::MAX;
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
if prev_blkno != blkno {
prev_blkno = blkno;
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
let _subxact = buf.get_u32_le();
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -927,11 +863,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let _xid = buf.get_u32_le();
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
blk.blkno = xlogrec.xl_xid;
blk.will_init = true;
}
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -964,7 +895,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
trace!("XLOG_TBLSPC_DROP is not handled yet");
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
if info == pg_constants::XLOG_HEAP_INSERT {
let xlrec = XlHeapInsert::decode(&mut buf);
@@ -1018,7 +949,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
let xlrec = XlHeapMultiInsert::decode(&mut buf);
if (xlrec.flags
@@ -1036,104 +967,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blocks.push(blk);
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blk.blkno = buf.get_u32_le();
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = buf.get_u32_le();
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
// Update offset page
let mut blk = DecodedBkpBlock::new();
blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blocks.push(blk);
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blkno in first_mbr_blkno..=last_mbr_blkno {
// Update members page
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
if xlrec.mid > checkpoint.nextMulti {
// See MultiXactAdvanceNextMXact in postgres code
checkpoint.nextMulti = xlrec.mid + 1;
}
if xlrec.moff > checkpoint.nextMultiOffset {
// See MultiXactAdvanceNextMXact in postgres code
checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
}
let max_xid = xlrec
.members
.iter()
.fold(checkpoint.nextXid.value as u32, |acc, mbr| {
if mbr.xid > acc {
mbr.xid
} else {
acc
}
});
checkpoint.nextXid = FullTransactionId {
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | max_xid as u64,
};
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
checkpoint.oldestXid = xlrec.end_trunc_off;
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let first_off_blkno =
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno =
xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
for blkno in first_off_blkno..last_off_blkno {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blocks.push(blk);
}
let first_mbr_blkno =
xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blkno in first_mbr_blkno..last_mbr_blkno {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blocks.push(blk);
}
} else {
panic!()
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM;
blk.rnode_spcnode = xlrec.tsid;
blk.rnode_dbnode = xlrec.dbid;
blk.rnode_relnode = 0;
blk.will_init = true;
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if next_oid > checkpoint.nextOid {
checkpoint.nextOid = next_oid;
}
}
}
DecodedWALRecord {

View File

@@ -7,7 +7,6 @@
//!
use crate::page_cache;
use crate::repository::*;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -17,8 +16,8 @@ use log::*;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::collections::HashMap;
@@ -147,11 +146,6 @@ fn walreceiver_main(
error!("No previous WAL position");
}
startpoint = Lsn::max(
startpoint,
Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
);
// There might be some padding after the last full record, skip it.
//
// FIXME: It probably would be better to always start streaming from the beginning
@@ -171,14 +165,6 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut checkpoint = CheckPoint::new(startpoint.0, identify.timeline);
let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
if let Some(checkpoint_bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? {
checkpoint = decode_checkpoint(checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
} else {
error!("No checkpoint record was found in reposistory");
}
while let Some(replication_message) = physical_stream.next()? {
match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
@@ -195,14 +181,9 @@ fn walreceiver_main(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let old_checkpoint_bytes = encode_checkpoint(checkpoint);
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
let decoded = decode_wal_record(recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
let new_checkpoint_bytes = encode_checkpoint(checkpoint);
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);

View File

@@ -14,8 +14,7 @@
//! TODO: Even though the postgres code runs in a separate process,
//! it's not a secure sandbox.
//!
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use log::*;
use std::assert;
use std::cell::RefCell;
@@ -37,11 +36,7 @@ use zenith_utils::lsn::Lsn;
use crate::repository::BufferTag;
use crate::repository::WALRecord;
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
///
/// WAL Redo Manager is responsible for replaying WAL records.
@@ -169,24 +164,6 @@ impl WalRedoManager for PostgresRedoManager {
}
}
fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
}
fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
(xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
* pg_constants::MXACT_MEMBER_BITS_PER_XACT
}
/* Location (byte offset within page) of TransactionId of given member */
fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
mx_offset_to_flags_offset(xid)
+ (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
}
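To make the member-group layout concrete: each 20-byte group is 4 flag bytes followed by four 4-byte member XIDs, so MultiXactOffset 5 falls in the second group of its page. A worked check of the helpers above with the pg_constants values inlined (ignoring the per-page wraparound, which is the identity for small offsets):

fn main() {
    let xid: u32 = 5; // a MultiXactOffset, despite the parameter name
    let members_per_group: u32 = 4; // MULTIXACT_MEMBERS_PER_MEMBERGROUP
    let group_size: usize = 20;     // MULTIXACT_MEMBERGROUP_SIZE: 4 flag bytes + 4 * 4-byte xids
    let flagbytes_per_group: usize = 4;

    let flags_offset = (xid / members_per_group) as usize * group_size;
    let bitshift = (xid % members_per_group) as u16 * 8; // MXACT_MEMBER_BITS_PER_XACT
    let member_offset =
        flags_offset + flagbytes_per_group + (xid % members_per_group) as usize * 4;

    assert_eq!((flags_offset, bitshift, member_offset), (20, 8, 28));
}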
///
/// WAL redo thread
///
@@ -255,151 +232,7 @@ impl PostgresRedoManagerInternal {
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum > pg_constants::INIT_FORKNUM {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
page.extend_from_slice(&fpi[..]);
} else {
page.extend_from_slice(&ZERO_PAGE);
}
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let xlogrec = XLogRecord::from_bytes(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.copy_from_slice(&ZERO_PAGE);
}
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();
let _tsid = buf.get_u32_le();
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag.blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED;
transaction_id_set_status(subxact, status, &mut page);
}
}
}
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();
let _tsid = buf.get_u32_le();
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag.blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
transaction_id_set_status(subxact, status, &mut page);
}
}
}
} else if info != pg_constants::XLOG_XACT_PREPARE {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn,
record.main_data_offset, record.rec.len());
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
page.copy_from_slice(&ZERO_PAGE);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM {
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
} else {
assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM);
for i in 0..xlrec.nmembers {
let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
if blkno == tag.blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &=
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
<< bshift);
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
}
}
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
// empty page image indicates that this SLRU page is truncated and can be removed by GC
page.clear();
} else {
panic!();
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
page.clear();
page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
assert!(page.len() == 512); // size of pg_filenode.map
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
} else {
apply_result = process.apply_wal_records(tag, base_img, records).await;
}
apply_result = process.apply_wal_records(tag, base_img, records).await;
let duration = start.elapsed();

View File

@@ -18,8 +18,6 @@ fn main() {
// included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks))
.whitelist_type("ControlFileData")
.whitelist_type("CheckPoint")
.whitelist_type("FullTransactionId")
.whitelist_var("PG_CONTROL_FILE_SIZE")
.whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.whitelist_type("DBState")

View File

@@ -3,7 +3,6 @@
#![allow(non_snake_case)]
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod xlog_utils;
@@ -12,7 +11,6 @@ use bytes::{Buf, Bytes, BytesMut};
// sizeof(ControlFileData)
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize;
impl ControlFileData {
@@ -71,42 +69,3 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
buf.into()
}
pub fn encode_checkpoint(checkpoint: CheckPoint) -> Bytes {
let b: [u8; SIZEOF_CHECKPOINT];
b = unsafe { std::mem::transmute::<CheckPoint, [u8; SIZEOF_CHECKPOINT]>(checkpoint) };
Bytes::copy_from_slice(&b[..])
}
pub fn decode_checkpoint(mut buf: Bytes) -> Result<CheckPoint, anyhow::Error> {
let mut b = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut b);
let checkpoint: CheckPoint;
checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) };
Ok(checkpoint)
}
impl CheckPoint {
pub fn new(lsn: u64, timeline: u32) -> CheckPoint {
CheckPoint {
redo: lsn,
ThisTimeLineID: timeline,
PrevTimeLineID: timeline,
fullPageWrites: true, // TODO: get actual value of full_page_writes
nextXid: FullTransactionId {
value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64,
}, // TODO: handle epoch?
nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID,
nextMulti: 1,
nextMultiOffset: 0,
oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID,
oldestXidDB: 0,
oldestMulti: 1,
oldestMultiDB: 0,
time: 0,
oldestCommitTsXid: 0,
newestCommitTsXid: 0,
oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID,
}
}
}

View File

@@ -15,32 +15,10 @@ pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
// Special values for non-rel files' tags (Zenith-specific)
// Special values for non-rel files' tags
pub const PG_CONTROLFILE_FORKNUM: u8 = 42;
pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
pub const PG_XACT_FORKNUM: u8 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
pub const PG_TWOPHASE_FORKNUM: u8 = 47;
pub const PG_CHECKPOINT_FORKNUM: u8 = 48;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// constants from clog.h
//
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
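These four constants encode the CLOG layout: two status bits per transaction, four transactions per byte, BLCKSZ * 4 transactions per page. A plausible sketch of how a helper like nonrelfile_utils::transaction_id_get_status reads a status out of a CLOG page with them (the real helper lives in postgres_ffi and may differ in detail):

// Extract the 2-bit commit status of `xid` from its CLOG page (sketch).
fn transaction_id_get_status(xid: u32, page: &[u8]) -> u8 {
    const BLCKSZ: u32 = 8192;
    const CLOG_XACTS_PER_BYTE: u32 = 4;
    const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE;
    const CLOG_BITS_PER_XACT: u32 = 2;
    const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;

    let byteno = ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE) as usize;
    let bshift = ((xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT) as u8;
    (page[byteno] >> bshift) & CLOG_XACT_BITMASK
}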
//
// Constants from visibilitymap.h
//
@@ -48,22 +26,15 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
pub const BITS_PER_HEAPBLOCK: u16 = 2;
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
pub const CLOG_ZEROPAGE: u8 = 0x00;
pub const CLOG_TRUNCATE: u8 = 0x10;
// From xact.h
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
// From slru.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
/* mask for filtering opcodes out of xl_info */
pub const XLOG_XACT_OPMASK: u8 = 0x70;
/* does this record have a 'xinfo' field or not */
@@ -83,29 +54,9 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
// From pg_control.h and rmgrlist.h
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
// From multixact.h
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
/* size in bytes of a complete group */
pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
// From heapam_xlog.h
pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
@@ -144,6 +95,11 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = 24;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// from xlogrecord.h
//
@@ -166,11 +122,5 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
/* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;
pub const INVALID_TRANSACTION_ID: u32 = 0;
pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
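restore_wal splits an LSN into a segment number and byte offset using this constant; XLogFileName then renders the segment as the timeline plus two 8-digit hex halves (segno divided and modded by segments-per-XLogId, which is 0x100000000 / 16 MiB = 256). A worked example:

fn main() {
    const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
    let lsn: u64 = 0x0169_9D10; // an arbitrary example LSN
    let segno = lsn / WAL_SEGMENT_SIZE;
    let offset = lsn % WAL_SEGMENT_SIZE;
    assert_eq!((segno, offset), (1, 0x69_9D10));

    // Segment file name on PostgreSQL timeline 1, XLogFileName-style:
    let segs_per_xlogid = 0x1_0000_0000u64 / WAL_SEGMENT_SIZE; // 256
    let name = format!(
        "{:08X}{:08X}{:08X}",
        1,
        segno / segs_per_xlogid,
        segno % segs_per_xlogid
    );
    assert_eq!(name, "000000010000000000000001");
}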

View File

@@ -38,16 +38,6 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
pg_constants::FSM_FORKNUM => Some("fsm"),
pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"),
pg_constants::INIT_FORKNUM => Some("init"),
// These should not appear in WAL records, but we use them internally,
// and need to be prepared to print them out in log messages and such
pg_constants::PG_CONTROLFILE_FORKNUM => Some("controlfile"),
pg_constants::PG_FILENODEMAP_FORKNUM => Some("filenodemap"),
pg_constants::PG_XACT_FORKNUM => Some("xact"),
pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"),
pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"),
pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"),
_ => Some("UNKNOWN FORKNUM"),
}
}

View File

@@ -233,9 +233,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
let seg_size = self.get().get_info().server.wal_seg_size as usize;
let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise);
let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal
let lsn = Lsn::max(Lsn(lsn), wal_start);
(lsn, timeline)
(Lsn(lsn), timeline)
}
}