Compare commits

...

16 Commits

Author SHA1 Message Date
Konstantin Knizhnik
600588034b Store page image with the same LSN as replaced WAL record 2021-07-09 12:06:46 +03:00
Konstantin Knizhnik
9f015bdc60 Include zenith.signal file in tarball 2021-06-21 16:01:10 +03:00
Konstantin Knizhnik
c564272142 [refer #258] Handle CHECKPOINT_ONLINE WAL record 2021-06-18 23:55:59 +03:00
Konstantin Knizhnik
14a0ae5456 Start compute node without generation of new WAL segment 2021-06-17 22:55:13 +03:00
Konstantin Knizhnik
0d9805a505 Handle non-relational data in PUSH command 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
87fce3fcd5 1. Always start replication from the beginning of a WAL segment (to be able to skip missed segments)
2. Do not always materialize the last version of objects in GC (only when needed)
3. Fix history test
4. Fix CPU consumption in wal_keeper when connection is broken
5. Fix handling of --recall parameter in walkeeper
2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
04e1ee5ce3 Correctly handle Unlink message in GC for SLRU 2021-06-17 22:49:44 +03:00
anastasia
ec675bbdd6 use correct req_lsn when gathering basebackup tar 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
a3b70745a9 Support collecting nonrel object in GC 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
41c352be7f Batch nextXid updates in object storage checkpoints 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
8a5fcf52c0 Remove special handling of shared catalogs 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
abb114decd Fix review issues:
- Add comments
- Handle members multixact wraparound
- Extract WAL segment creation (pg_resetwal) to postgres_ffi
- Fix adding prepared 2PC files to basebackup
2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
e7587ceb81 Make checkpoint and pg_control records in object storage also versioned 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
6d38b9ce6a Fix unit tests after merging 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
47ef9c7ef4 Fix various bugs caused by switch to new storage model 2021-06-17 22:49:44 +03:00
Konstantin Knizhnik
d73cb49f89 Support non-rel objects 2021-06-17 22:49:44 +03:00
23 changed files with 1872 additions and 741 deletions

View File

@@ -301,6 +301,8 @@ impl PostgresNode {
),
)?;
fs::create_dir_all(self.pgdata().join("pg_wal"))?;
fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
Ok(())
}

View File

@@ -4,181 +4,260 @@
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball which contains non-relational data (multixacts, clog, filenode maps, twophase files)
//! together with a generated pg_control and a dummy WAL segment. This module is responsible for creating such a tarball from the snapshot directory and the
//! data stored in object storage.
//!
use crate::ZTimelineId;
use bytes::{BufMut, BytesMut};
use log::*;
use std::io::Write;
use std::sync::Arc;
use tar::Builder;
use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir;
use crate::repository::Timeline;
use crate::repository::{DatabaseTag, ObjectTag, Timeline};
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
///
/// Generate tarball with non-relational files from repository
///
pub fn send_tarball_at_lsn(
write: &mut dyn Write,
timelineid: ZTimelineId,
_timeline: &Arc<dyn Timeline>,
_lsn: Lsn,
snapshot_lsn: Lsn,
) -> anyhow::Result<()> {
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
for entry in WalkDir::new(&snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&snappath).unwrap();
if relpath.to_str().unwrap() == "" {
continue;
}
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else {
trace!("not sending {}", relpath.display());
}
} else {
error!("unknown file type: {}", fullpath.display());
}
}
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
ar.finish()?;
debug!("all tarred up!");
Ok(())
/// This is a short-living object that exists only for the duration of tarball creation,
/// created mostly to avoid passing a lot of parameters between the various functions
/// used for constructing the tarball.
pub struct Basebackup<'a> {
ar: Builder<&'a mut dyn Write>,
timeline: &'a Arc<dyn Timeline>,
lsn: Lsn,
snappath: String,
slru_buf: [u8; pg_constants::SLRU_SEG_SIZE],
slru_segno: u32,
slru_path: &'static str,
}
///
/// Send a tarball containing a snapshot of all non-relation files in the
/// PostgreSQL data directory, at the given LSN
///
/// There must be a snapshot at the given LSN in the snapshots directory; we cannot
/// reconstruct the state at an arbitrary LSN at the moment.
///
pub fn send_snapshot_tarball(
write: &mut dyn Write,
timelineid: ZTimelineId,
snapshotlsn: Lsn,
) -> Result<(), std::io::Error> {
let mut ar = Builder::new(write);
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
//ar.append_dir_all("", &snappath)?;
for entry in WalkDir::new(&snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&snappath).unwrap();
if relpath.to_str().unwrap() == "" {
continue;
impl<'a> Basebackup<'a> {
pub fn new(
write: &'a mut dyn Write,
timelineid: ZTimelineId,
timeline: &'a Arc<dyn Timeline>,
lsn: Lsn,
snapshot_lsn: Lsn,
) -> Basebackup<'a> {
Basebackup {
ar: Builder::new(write),
timeline,
lsn,
snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0),
slru_path: "",
slru_segno: u32::MAX,
slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE],
}
}
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else {
trace!("not sending {}", relpath.display());
#[rustfmt::skip] // otherwise "cargo fmt" produces very strange formatting for the match arms of self.timeline.list_nonrels
pub fn send_tarball(&mut self) -> anyhow::Result<()> {
debug!("sending tarball of snapshot in {}", self.snappath);
for entry in WalkDir::new(&self.snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&self.snappath).unwrap();
// FIXME: For now, also send all the relation files.
// This really shouldn't be necessary, and kind of
// defeats the point of having a page server in the
// first place. But it is useful at least when
// debugging with the DEBUG_COMPARE_LOCAL option (see
// vendor/postgres/src/backend/storage/smgr/pagestore_smgr.c)
ar.append_path_with_name(fullpath, relpath)?;
if relpath.to_str().unwrap() == "" {
continue;
}
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
self.ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map" // these files will be generated from object storage
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
trace!("sending {}", relpath.display());
self.ar.append_path_with_name(fullpath, relpath)?;
}
} else { // relation pages are loaded on demand and should not be included in tarball
trace!("not sending {}", relpath.display());
}
} else {
error!("unknown file type: {}", fullpath.display());
}
}
// Generate non-relational files.
// Iteration is in sorted order: all objects of the same type are grouped together and traversed
// in ascending key order. For example, all pg_xact records precede pg_multixact records and are sorted by block number.
// This makes it easy to construct SLRU segments (32 blocks each).
for obj in self.timeline.list_nonrels(self.lsn)? {
match obj {
ObjectTag::Clog(slru) =>
self.add_slru_segment("pg_xact", &obj, slru.blknum)?,
ObjectTag::MultiXactMembers(slru) =>
self.add_slru_segment("pg_multixact/members", &obj, slru.blknum)?,
ObjectTag::MultiXactOffsets(slru) =>
self.add_slru_segment("pg_multixact/offsets", &obj, slru.blknum)?,
ObjectTag::FileNodeMap(db) =>
self.add_relmap_file(&obj, &db)?,
ObjectTag::TwoPhase(prepare) =>
self.add_twophase_file(&obj, prepare.xid)?,
_ => {}
}
}
self.finish_slru_segment()?; // write the last, possibly incomplete SLRU segment (if any)
self.add_pgcontrol_file()?;
self.ar.finish()?;
debug!("all tarred up!");
Ok(())
}
//
// Generate SLRU segment files from the repository. The path identifies the SLRU kind (pg_xact, pg_multixact/members, ...).
// Initially the path is an empty string.
//
fn add_slru_segment(
&mut self,
path: &'static str,
tag: &ObjectTag,
page: u32,
) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
// Zero length image indicates truncated segment: just skip it
if !img.is_empty() {
assert!(img.len() == pg_constants::BLCKSZ as usize);
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) {
// Switch to new segment: save old one
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer
}
self.slru_segno = segno;
self.slru_path = path;
let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
* pg_constants::BLCKSZ as usize;
let offs_end = offs_start + pg_constants::BLCKSZ as usize;
self.slru_buf[offs_start..offs_end].copy_from_slice(&img);
}
Ok(())
}
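As an aside, here is a minimal standalone sketch (not part of the patch; it assumes the usual Postgres constants SLRU_PAGES_PER_SEGMENT = 32 and BLCKSZ = 8192) of how add_slru_segment places a block into a segment buffer:

// Hypothetical illustration of the block-number -> (segment, byte offset)
// mapping used when packing SLRU pages into 32-block segment files.
const SLRU_PAGES_PER_SEGMENT: u32 = 32; // assumed, matches pg_constants
const BLCKSZ: usize = 8192;             // assumed, matches pg_constants

fn slru_position(blknum: u32) -> (u32, usize) {
    let segno = blknum / SLRU_PAGES_PER_SEGMENT;
    let offset = (blknum % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ;
    (segno, offset)
}

fn main() {
    // Block 70 belongs to segment 0002 and starts at byte 6 * 8192 within it.
    assert_eq!(slru_position(70), (2, 6 * BLCKSZ));
    // Segment files are named with four uppercase hex digits, e.g. "0002".
    println!("{:>04X}", slru_position(70).0);
}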
//
// We flush SLRU segments to the tarball once they are completed.
// This method is used to flush the last (possibly incomplete) segment.
//
fn finish_slru_segment(&mut self) -> anyhow::Result<()> {
if self.slru_path != "" {
// there is some incomplete segment
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
}
Ok(())
}
//
// Extract pg_filenode.map files from repository
//
fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
info!("add_relmap_file {:?}", db);
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map") // filenode map for global tablespace
} else {
error!("unknown file type: {}", fullpath.display());
}
// User defined tablespaces are not supported
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
let src_path = format!("{}/base/1/PG_VERSION", self.snappath);
let dst_path = format!("base/{}/PG_VERSION", db.dbnode);
self.ar.append_path_with_name(&src_path, &dst_path)?;
format!("base/{}/pg_filenode.map", db.dbnode)
};
assert!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
self.ar.append(&header, &img[..])?;
Ok(())
}
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
//
// Extract twophase state files
//
fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> {
// Include two-phase files in the tarball only for in-progress transactions
if self.timeline.get_tx_status(xid, self.lsn)?
== pg_constants::TRANSACTION_STATUS_IN_PROGRESS
{
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
Ok(())
}
ar.finish()?;
debug!("all tarred up!");
Ok(())
//
// Add generated pg_control file
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Generate new pg_control and WAL needed for bootstrap
let checkpoint_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let checkpoint_lsn = XLogSegNoOffsetToRecPtr(
checkpoint_segno,
XLOG_SIZE_OF_XLOG_LONG_PHD as u32,
pg_constants::WAL_SEGMENT_SIZE,
);
checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32);
//reset some fields we don't want to preserve
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = checkpoint_lsn;
pg_control.checkPointCopy = checkpoint;
info!("pg_control.state = {}", pg_control.state);
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
self.ar.append(&new_tar_header("zenith.signal", 0)?, &b""[..])?;
//send pg_control
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;
//send wal segment
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
checkpoint_segno,
pg_constants::WAL_SEGMENT_SIZE,
);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(&pg_control);
self.ar.append(&header, &wal_seg[..])?;
Ok(())
}
}
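For intuition, a rough sketch (hypothetical helper; it assumes the default 16 MB WAL segment size and a 40-byte long page header) of where add_pgcontrol_file points the generated checkpoint: at the start of the WAL segment containing the basebackup LSN, just past the long page header:

// Hypothetical sketch of the checkpoint LSN computed for the generated pg_control.
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // assumed default segment size
const XLOG_SIZE_OF_XLOG_LONG_PHD: u64 = 40;     // assumed long page header size

fn checkpoint_lsn_for(lsn: u64) -> u64 {
    let segno = lsn / WAL_SEGMENT_SIZE;
    segno * WAL_SEGMENT_SIZE + XLOG_SIZE_OF_XLOG_LONG_PHD
}

fn main() {
    // An LSN somewhere inside segment 3 maps to a checkpoint at the segment start.
    let lsn = 3 * WAL_SEGMENT_SIZE + 12_345;
    assert_eq!(checkpoint_lsn_for(lsn), 3 * WAL_SEGMENT_SIZE + 40);
}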
///
@@ -228,6 +307,28 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
}
}
//
// Check if the path refers to a relation file
//
fn is_rel_file_path(path: &str) -> bool {
parse_rel_file_path(path).is_ok()
}
//
// Create new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
header.set_mode(0b110000000); // -rw-------
header.set_mtime(
// use current time as last modified time
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
header.set_cksum();
Ok(header)
}

View File

@@ -127,9 +127,6 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
// Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
// Move the data directory as an initial base backup.
// FIXME: It would be enough to only copy the non-relation files here, the relation
// data was already loaded into the repository.
@@ -345,27 +342,6 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
bail!("could not parse point-in-time {}", s);
}
// If the control file says the cluster was shut down cleanly, modify it to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile = ControlFileData::decode(&fs::read(controlfilepath.as_path())?)?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(controlfilepath.as_path(), controlfile.encode())?;
Ok(())
}
fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
// Create initial timeline
let mut tli_buf = [0u8; 16];

View File

@@ -23,7 +23,6 @@ use bytes::Bytes;
use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::sync::{Arc, Mutex, RwLock};
@@ -113,10 +112,11 @@ impl Repository for ObjectRepository {
ancestor_timeline: None,
ancestor_lsn: start_lsn,
};
let val = ObjectValue::TimelineMetadata(metadata);
self.obj_store.put(
&timeline_metadata_key(timelineid),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
&ObjectValue::ser(&val)?,
)?;
info!("Created empty timeline {}", timelineid);
@@ -138,8 +138,7 @@ impl Repository for ObjectRepository {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
// just to check the source timeline exists
let _ = self.get_timeline(src)?;
let src_timeline = self.get_timeline(src)?;
// Write a metadata key, noting the ancestor of the new timeline. There is initially
// no data in it, but all the read-calls know to look into the ancestor.
@@ -149,12 +148,25 @@ impl Repository for ObjectRepository {
ancestor_timeline: Some(src),
ancestor_lsn: at_lsn,
};
let val = ObjectValue::TimelineMetadata(metadata);
self.obj_store.put(
&timeline_metadata_key(dst),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
&ObjectValue::ser(&val)?,
)?;
// Copy non-rel objects
for tag in src_timeline.list_nonrels(at_lsn)? {
match tag {
ObjectTag::TimelineMetadataTag => {} // skip it
_ => {
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
let val = ObjectValue::Page(img);
let key = ObjectKey { timeline: dst, tag };
self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
}
}
}
Ok(())
}
}
@@ -223,19 +235,22 @@ impl ObjectTimeline {
let v = obj_store
.get(&timeline_metadata_key(timelineid), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let metadata = MetadataEntry::des(&v)?;
let timeline = ObjectTimeline {
timelineid,
obj_store,
walredo_mgr,
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
ancestor_timeline: metadata.ancestor_timeline,
ancestor_lsn: metadata.ancestor_lsn,
rel_meta: RwLock::new(BTreeMap::new()),
};
Ok(timeline)
if let ObjectValue::TimelineMetadata(metadata) = ObjectValue::des(&v)? {
let timeline = ObjectTimeline {
timelineid,
obj_store,
walredo_mgr,
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
ancestor_timeline: metadata.ancestor_timeline,
ancestor_lsn: metadata.ancestor_lsn,
rel_meta: RwLock::new(BTreeMap::new()),
};
Ok(timeline)
} else {
bail!("Invalid timeline metadata");
}
}
}
@@ -245,12 +260,56 @@ impl Timeline for ObjectTimeline {
//------------------------------------------------------------------------------
/// Look up given page in the cache.
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
let lsn = self.wait_lsn(req_lsn)?;
self.get_page_at_lsn_nowait(tag, lsn)
}
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let searchkey = ObjectKey {
timeline: self.timelineid,
tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?;
if let Some((lsn, value)) = iter.next().transpose()? {
let page_img: Bytes;
match ObjectValue::des(&value)? {
ObjectValue::Page(img) => {
page_img = img;
}
ObjectValue::WALRecord(_rec) => {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone())?;
}
x => bail!("Unexpected object value: {:?}", x),
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {:?} from {} (request {})",
page_lsn_hi,
page_lsn_lo,
tag,
lsn,
req_lsn
);
return Ok(page_img);
}
trace!("page {:?} at {} not found", tag, req_lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
/// Get size of relation
fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
let lsn = self.wait_lsn(lsn)?;
@@ -282,6 +341,11 @@ impl Timeline for ObjectTimeline {
Ok(false)
}
/// Get a list of non-relational objects
fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
self.obj_store.list_objects(self.timelineid, true, lsn)
}
/// Get a list of all distinct relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
// List all relations in this timeline.
@@ -306,10 +370,12 @@ impl Timeline for ObjectTimeline {
.obj_store
.get(&timeline_metadata_key(timeline), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let metadata = MetadataEntry::des(&v)?;
prev_timeline = metadata.ancestor_timeline;
lsn = metadata.ancestor_lsn;
if let ObjectValue::TimelineMetadata(metadata) = ObjectValue::des(&v)? {
prev_timeline = metadata.ancestor_timeline;
lsn = metadata.ancestor_lsn;
} else {
bail!("Invalid timeline metadata");
}
}
Ok(all_rels)
@@ -325,96 +391,110 @@ impl Timeline for ObjectTimeline {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn;
let key = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
tag,
};
let val = PageEntry::WALRecord(rec);
let val = ObjectValue::WALRecord(rec);
self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
debug!(
"put_wal_record rel {} blk {} at {}",
tag.rel, tag.blknum, lsn
);
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
debug!("put_wal_record {:?} at {}", tag, lsn);
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if let ObjectTag::RelationBuffer(tag) = tag {
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = RelationSizeEntry::Size(new_nblocks);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = ObjectValue::RelationSize(new_nblocks);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: new_nblocks,
last_updated: lsn,
},
);
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: new_nblocks,
last_updated: lsn,
},
);
}
}
Ok(())
}
/// Unlink object. This method is used for marking dropped relations
/// and removed segments of SLRUs.
fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
let val = ObjectValue::Unlink;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
Ok(())
}
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
self.obj_store.put(&key, lsn, data)?;
Ok(())
}
///
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
tag,
};
let val = PageEntry::Page(img);
let val = ObjectValue::Page(img);
self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
debug!(
"put_page_image rel {} blk {} at {}",
tag.rel, tag.blknum, lsn
);
debug!("put_page_image {:?} at {}", tag, lsn);
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if let ObjectTag::RelationBuffer(tag) = tag {
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = RelationSizeEntry::Size(new_nblocks);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = ObjectValue::RelationSize(new_nblocks);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: new_nblocks,
last_updated: lsn,
},
);
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
RelMetadata {
size: new_nblocks,
last_updated: lsn,
},
);
}
}
Ok(())
}
@@ -424,12 +504,9 @@ impl Timeline for ObjectTimeline {
///
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
let key = relation_size_key(self.timelineid, rel);
let val = RelationSizeEntry::Size(nblocks);
let val = ObjectValue::RelationSize(nblocks);
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
rel,
@@ -512,73 +589,26 @@ impl Timeline for ObjectTimeline {
ancestor_timeline: self.ancestor_timeline,
ancestor_lsn: self.ancestor_lsn,
};
trace!("checkpoint at {}", metadata.last_valid_lsn);
let val = ObjectValue::TimelineMetadata(metadata);
self.obj_store.put(
&timeline_metadata_key(self.timelineid),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
&ObjectValue::ser(&val)?,
)?;
trace!("checkpoint at {}", metadata.last_valid_lsn);
Ok(())
}
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
let lsn = self.last_valid_lsn.load();
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory {
lsn,
iter,
last_relation_size: None,
}))
Ok(Box::new(ObjectHistory { lsn, iter }))
}
}
impl ObjectTimeline {
fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let searchkey = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
if let Some((version_lsn, value)) = iter.next().transpose()? {
let page_img: Bytes;
match PageEntry::des(&value)? {
PageEntry::Page(img) => {
page_img = img;
}
PageEntry::WALRecord(_rec) => {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone())?;
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})",
page_lsn_hi,
page_lsn_lo,
tag.rel,
tag.blknum,
version_lsn,
lsn
);
return Ok(page_img);
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
///
/// Internal function to get relation size at given LSN.
///
@@ -597,8 +627,9 @@ impl ObjectTimeline {
let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
if let Some((version_lsn, value)) = iter.next().transpose()? {
match RelationSizeEntry::des(&value)? {
RelationSizeEntry::Size(nblocks) => {
let value = ObjectValue::des(&value)?;
match value {
ObjectValue::RelationSize(nblocks) => {
trace!(
"relation {} has size {} at {} (request {})",
rel,
@@ -608,7 +639,7 @@ impl ObjectTimeline {
);
Ok(Some(nblocks))
}
RelationSizeEntry::Unlink => {
ObjectValue::Unlink => {
trace!(
"relation {} not found; it was dropped at lsn {}",
rel,
@@ -616,9 +647,15 @@ impl ObjectTimeline {
);
Ok(None)
}
_ => bail!(
"Unexpect relation {} size value {:?} at {}",
rel,
value,
lsn
),
}
} else {
info!("relation {} not found at {}", rel, lsn);
debug!("relation {} not found at {}", rel, lsn);
Ok(None)
}
}
@@ -632,7 +669,7 @@ impl ObjectTimeline {
///
fn collect_records_for_apply(
&self,
tag: BufferTag,
tag: ObjectTag,
lsn: Lsn,
) -> Result<(Option<Bytes>, Vec<WALRecord>)> {
let mut base_img: Option<Bytes> = None;
@@ -642,24 +679,25 @@ impl ObjectTimeline {
// old page image.
let searchkey = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
while let Some((_key, value)) = iter.next().transpose()? {
match PageEntry::des(&value)? {
PageEntry::Page(img) => {
match ObjectValue::des(&value)? {
ObjectValue::Page(img) => {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img);
break;
}
PageEntry::WALRecord(rec) => {
ObjectValue::WALRecord(rec) => {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
}
x => bail!("Unexpected object value {:?}", x),
}
}
records.reverse();
@@ -686,62 +724,154 @@ impl ObjectTimeline {
// WAL is large enough to perform GC
let now = Instant::now();
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
let mut last_version = true;
let mut key = relation_size_key(self.timelineid, *rels);
let mut max_size = 0u32;
let mut relation_dropped = false;
// Process relation metadata versions
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
let rel_meta = RelationSizeEntry::des(&vers.1)?;
// If relation is dropped at the horizon,
// we can remove all its versions including last (Unlink)
match rel_meta {
RelationSizeEntry::Size(size) => max_size = max(max_size, size),
RelationSizeEntry::Unlink => {
// Iterate through all objects in timeline
for obj in self
.obj_store
.list_objects(self.timelineid, false, last_lsn)?
{
inspected += 1;
match obj {
// Prepared transactions
ObjectTag::TwoPhase(prepare) => {
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
if self.get_tx_status(prepare.xid, horizon)?
!= pg_constants::TRANSACTION_STATUS_IN_PROGRESS
{
let lsn = vers.0;
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
ObjectTag::RelationMetadata(_) => {
// Do not need to reconstruct page images,
// just delete all old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
relation_dropped = true;
info!("Relation {:?} dropped", rels);
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::Unlink => {
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
_ => (), // preserve last version
}
last_version = false;
truncated += 1;
} else {
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
if last_version {
last_version = false;
if !relation_dropped {
// preserve last version
continue;
ObjectTag::RelationBuffer(tag) => {
// Reconstruct page at horizon unless relation was dropped
// and delete all older versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
truncated += 1;
last_version = false;
if let Some(rel_size) = self.relsize_get_nowait(tag.rel, lsn)? {
if rel_size > tag.blknum {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(obj, lsn)?;
continue;
}
debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
} else {
if let Some(rel_size) =
self.relsize_get_nowait(tag.rel, last_lsn)?
{
debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.rel, tag, lsn, rel_size, last_lsn);
continue;
}
debug!("Relation {:?} was dropped at {}", tag.rel, lsn);
}
// relation was dropped or truncated so this block can be removed
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Now process all relation blocks
for blknum in 0..max_size {
key.buf_tag.blknum = blknum;
last_version = true;
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
truncated += 1;
if !relation_dropped {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(key.buf_tag, lsn)?;
continue;
// SLRU-s
ObjectTag::Clog(_)
| ObjectTag::MultiXactOffsets(_)
| ObjectTag::MultiXactMembers(_) => {
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::Unlink => {
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
ObjectValue::WALRecord(_) => {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(obj, lsn)?;
}
_ => {} // do nothing if already materialized
}
last_version = false;
truncated += 1;
} else {
// delete outdated version
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// versioned, always-materialized objects: no need to reconstruct pages
ObjectTag::Checkpoint | ObjectTag::ControlFile => {
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
// preserve last version
last_version = false;
truncated += 1;
} else {
// delete outdated version
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
_ => (), // do nothing
}
}
info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted",
now.elapsed(), truncated, deleted);
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, truncated, deleted);
}
}
}
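To make the per-object pruning rule explicit, here is a toy sketch (hypothetical types, not the actual ObjectStore API): for each object the newest version at or below the horizon is preserved (and materialized first if it is still a WAL record), while every older version is unlinked:

// Toy model of horizon-based version pruning used during GC.
fn prune_versions(versions_desc: &[u64], horizon: u64) -> (Option<u64>, Vec<u64>) {
    let mut keep = None;          // newest version at or below the horizon
    let mut delete = Vec::new();  // everything older gets unlinked
    for &lsn in versions_desc.iter().filter(|&&lsn| lsn <= horizon) {
        if keep.is_none() {
            keep = Some(lsn);
        } else {
            delete.push(lsn);
        }
    }
    (keep, delete)
}

fn main() {
    // Versions at LSNs 90, 70, 50, 30 with horizon 80: keep 70, delete 50 and 30.
    assert_eq!(prune_versions(&[90, 70, 50, 30], 80), (Some(70), vec![50, 30]));
}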
@@ -797,7 +927,7 @@ impl ObjectTimeline {
Ok(ObjectVersionIter {
obj_store,
buf_tag: key.buf_tag,
tag: key.tag,
current_iter,
ancestor_timeline: self.ancestor_timeline,
ancestor_lsn: self.ancestor_lsn,
@@ -806,16 +936,17 @@ impl ObjectTimeline {
}
struct ObjectHistory<'a> {
iter: Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>,
iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
lsn: Lsn,
last_relation_size: Option<(BufferTag, u32)>,
}
impl<'a> Iterator for ObjectHistory<'a> {
type Item = Result<RelationUpdate>;
type Item = Result<Modification>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
self.iter
.next()
.map(|result| result.map(|t| Modification::new(t)))
}
}
@@ -825,124 +956,41 @@ impl<'a> History for ObjectHistory<'a> {
}
}
impl<'a> ObjectHistory<'a> {
fn handle_relation_size(
&mut self,
buf_tag: BufferTag,
entry: RelationSizeEntry,
) -> Option<Update> {
match entry {
RelationSizeEntry::Size(size) => {
// we only want to output truncations, expansions are filtered out
let last_relation_size = self.last_relation_size.replace((buf_tag, size));
match last_relation_size {
Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => {
Some(Update::Truncate { n_blocks: size })
}
_ => None,
}
}
RelationSizeEntry::Unlink => Some(Update::Unlink),
}
}
fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update {
match entry {
PageEntry::Page(img) => Update::Page {
blknum: buf_tag.blknum,
img,
},
PageEntry::WALRecord(rec) => Update::WALRecord {
blknum: buf_tag.blknum,
rec,
},
}
}
fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? {
if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM {
continue;
}
let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM {
let entry = RelationSizeEntry::des(&value)?;
match self.handle_relation_size(buf_tag, entry) {
Some(relation_update) => relation_update,
None => continue,
}
} else {
let entry = PageEntry::des(&value)?;
self.handle_page(buf_tag, entry)
};
return Ok(Some(RelationUpdate {
rel: buf_tag.rel,
lsn,
update,
}));
}
Ok(None)
}
}
///
/// We store two kinds of page versions in the repository:
///
/// 1. Ready-made images of the block
/// 2. WAL records, to be applied on top of the "previous" entry
///
/// Some WAL records will initialize the page from scratch. For such records,
/// the 'will_init' flag is set. They don't need the previous page image before
/// applying. The 'will_init' flag is set for records containing a full-page image,
/// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
/// stored directly in the cache entry in that you still need to run the WAL redo
/// routine to generate the page image.
/// We store several kinds of objects in the repository.
/// We have per-page, per-relation (or non-rel file), and per-timeline entries.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PageEntry {
pub enum ObjectValue {
/// Ready-made images of the block
Page(Bytes),
/// WAL records, to be applied on top of the "previous" entry
///
/// Some WAL records will initialize the page from scratch. For such records,
/// the 'will_init' flag is set. They don't need the previous page image before
/// applying. The 'will_init' flag is set for records containing a full-page image,
/// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
/// stored directly in the cache entry in that you still need to run the WAL redo
/// routine to generate the page image.
WALRecord(WALRecord),
}
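To restate the reconstruction rule described above in isolation: walk the versions from newest to oldest, stop at the first full page image or at a WAL record with will_init, then replay the collected records oldest-to-newest. A toy sketch with hypothetical types (mirroring what collect_records_for_apply does, not the actual implementation):

// Toy model of version reconstruction: the newest-to-oldest scan stops at a base
// image or at a record that (re)initializes the page; records are then replayed
// oldest-to-newest on top of the base image (if any).
enum Version {
    Image(Vec<u8>),
    Record { will_init: bool, payload: Vec<u8> },
}

fn collect_for_apply(newest_first: &[Version]) -> (Option<Vec<u8>>, Vec<Vec<u8>>) {
    let mut base = None;
    let mut records = Vec::new();
    for v in newest_first {
        match v {
            Version::Image(img) => {
                base = Some(img.clone());
                break; // base image found, no need to dig deeper
            }
            Version::Record { will_init, payload } => {
                records.push(payload.clone());
                if *will_init {
                    break; // this record rebuilds the page from scratch
                }
            }
        }
    }
    records.reverse(); // replay in oldest-to-newest order
    (base, records)
}

fn main() {
    let versions = vec![
        Version::Record { will_init: false, payload: b"r2".to_vec() },
        Version::Record { will_init: false, payload: b"r1".to_vec() },
        Version::Image(b"base".to_vec()),
    ];
    let (base, records) = collect_for_apply(&versions);
    assert_eq!(base, Some(b"base".to_vec()));
    assert_eq!(records, vec![b"r1".to_vec(), b"r2".to_vec()]);
}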
///
/// In addition to page versions, we store relation size as a separate, versioned,
/// object.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RelationSizeEntry {
Size(u32),
/// RelationSize. We store it separately not only to answer nblocks requests faster.
/// We also need it to support relation truncation.
RelationSize(u32),
/// Tombstone for a dropped relation.
//
// TODO: Not used. Currently, we never drop relations. The parsing
// of relation drops in COMMIT/ABORT records has not been
// implemented. We should also have a mechanism to remove
// "orphaned" relfiles, if the compute node crashes before writing
// the COMMIT/ABORT record.
Unlink,
TimelineMetadata(MetadataEntry),
}
// No real block in PostgreSQL will have block number u32::MAX
// See vendor/postgres/src/include/storage/block.h
const RELATION_SIZE_BLKNUM: u32 = u32::MAX;
const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel,
blknum: RELATION_SIZE_BLKNUM,
},
tag: ObjectTag::RelationMetadata(rel),
}
}
///
/// In addition to those per-page and per-relation entries, we also
/// store a little metadata blob for each timeline. It is stored using
/// STORAGE_SPECIAL_FORKNUM.
/// store a little metadata blob for each timeline.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataEntry {
@@ -955,15 +1003,7 @@ pub struct MetadataEntry {
const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel: RelTag {
forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
tag: ObjectTag::TimelineMetadataTag,
}
}
@@ -976,7 +1016,7 @@ const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
struct ObjectVersionIter<'a> {
obj_store: &'a dyn ObjectStore,
buf_tag: BufferTag,
tag: ObjectTag,
/// Iterator on the current timeline.
current_iter: Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>,
@@ -1013,7 +1053,7 @@ impl<'a> ObjectVersionIter<'a> {
if let Some(ancestor_timeline) = self.ancestor_timeline {
let searchkey = ObjectKey {
timeline: ancestor_timeline,
buf_tag: self.buf_tag,
tag: self.tag,
};
let ancestor_iter = self
.obj_store
@@ -1026,11 +1066,13 @@ impl<'a> ObjectVersionIter<'a> {
.obj_store
.get(&timeline_metadata_key(ancestor_timeline), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let ancestor_metadata = MetadataEntry::des(&v)?;
self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
self.current_iter = ancestor_iter;
if let ObjectValue::TimelineMetadata(ancestor_metadata) = ObjectValue::des(&v)? {
self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
self.current_iter = ancestor_iter;
} else {
bail!("Invalid timeline metadata");
}
} else {
return Ok(None);
}

View File

@@ -1,6 +1,6 @@
//! Low-level key-value storage abstraction.
//!
use crate::repository::{BufferTag, RelTag};
use crate::repository::{ObjectTag, RelTag};
use crate::ZTimelineId;
use anyhow::Result;
use serde::{Deserialize, Serialize};
@@ -11,7 +11,7 @@ use zenith_utils::lsn::Lsn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectKey {
pub timeline: ZTimelineId,
pub buf_tag: BufferTag,
pub tag: ObjectTag,
}
///
@@ -58,7 +58,7 @@ pub trait ObjectStore: Send + Sync {
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>>;
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>>;
/// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
/// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster
@@ -72,6 +72,17 @@ pub trait ObjectStore: Send + Sync {
lsn: Lsn,
) -> Result<HashSet<RelTag>>;
/// Iterate through object tags. If nonrel_only is set, only non-relational data is iterated.
///
/// This is used to implement GC and to prepare the tarball for new node startup.
/// Returns objects in increasing key-version order.
fn list_objects<'a>(
&'a self,
timelineid: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
/// Unlink object (used by GC). This method may actually delete the object or just mark it for deletion.
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>;
}

View File

@@ -27,7 +27,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup;
use crate::branches;
use crate::page_cache;
use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
use crate::repository::{BufferTag, Modification, ObjectTag, RelTag};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
@@ -229,7 +229,7 @@ impl PageServerHandler {
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
}
PagestreamFeMessage::Read(req) => {
let buf_tag = BufferTag {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -237,9 +237,9 @@ impl PageServerHandler {
forknum: req.forknum,
},
blknum: req.blkno,
};
});
let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
let read_response = match timeline.get_page_at_lsn(tag, req.lsn) {
Ok(p) => PagestreamReadResponse {
ok: true,
n_blocks: 0,
@@ -290,14 +290,20 @@ impl PageServerHandler {
// find latest snapshot
let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or(snapshot_lsn);
basebackup::send_tarball_at_lsn(
&mut CopyDataSink { pgb },
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
)?;
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(
&mut writer,
timelineid,
&timeline,
req_lsn,
snapshot_lsn,
);
basebackup.send_tarball()?;
}
pgb.write_message(&BeMessage::CopyDone)?;
debug!("CopyDone sent!");
@@ -404,38 +410,14 @@ impl postgres_backend::Handler for PageServerHandler {
while let Some(msg) = pgb.read_message()? {
match msg {
FeMessage::CopyData(bytes) => {
let relation_update = RelationUpdate::des(&bytes)?;
let modification = Modification::des(&bytes)?;
last_lsn = relation_update.lsn;
match relation_update.update {
Update::Page { blknum, img } => {
let tag = BufferTag {
rel: relation_update.rel,
blknum,
};
timeline.put_page_image(tag, relation_update.lsn, img)?;
}
Update::WALRecord { blknum, rec } => {
let tag = BufferTag {
rel: relation_update.rel,
blknum,
};
timeline.put_wal_record(tag, rec)?;
}
Update::Truncate { n_blocks } => {
timeline.put_truncation(
relation_update.rel,
relation_update.lsn,
n_blocks,
)?;
}
Update::Unlink => {
todo!()
}
}
last_lsn = modification.lsn;
timeline.put_raw_data(
modification.tag,
last_lsn,
&modification.data[..],
)?;
}
FeMessage::CopyDone => {
timeline.advance_last_valid_lsn(last_lsn);
@@ -505,7 +487,6 @@ impl postgres_backend::Handler for PageServerHandler {
}
pgb.flush()?;
Ok(())
}
}

View File

@@ -1,10 +1,14 @@
use crate::waldecoder::TransactionId;
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::forknumber_to_name;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::iter::Iterator;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
@@ -34,7 +38,10 @@ pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------
/// Look up given page in the cache.
fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes>;
fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Look up given page in the cache.
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Get size of relation
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -42,27 +49,37 @@ pub trait Timeline: Send + Sync {
/// Does relation exist?
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
/// Get a list of all distinct relations in given tablespace and database.
/// Get a list of all relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
/// Get a list of non-relational objects
fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Put raw data
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>;
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()>;
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Unlink object. This method is used for marking dropped relations
/// and removed segments of SLRUs.
fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>;
/// Remember the all WAL before the given LSN has been processed.
///
/// The WAL receiver calls this after the put_* functions, to indicate that
@@ -94,26 +111,37 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
// Check transaction status
fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result<u8> {
let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
Ok(status)
}
}
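A small standalone sketch (assuming the standard clog layout constants: BLCKSZ = 8192 and 2 status bits per transaction) of how get_tx_status locates the clog page, and where the status bits sit inside it:

// Hypothetical illustration of the xid -> clog page / byte / bit mapping.
const BLCKSZ: u32 = 8192;
const CLOG_BITS_PER_XACT: u32 = 2;
const CLOG_XACTS_PER_BYTE: u32 = 8 / CLOG_BITS_PER_XACT;       // 4
const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE; // 32768

fn clog_position(xid: u32) -> (u32, u32, u32) {
    let blknum = xid / CLOG_XACTS_PER_PAGE;
    let byte_in_page = (xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE;
    let bit_shift = (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;
    (blknum, byte_in_page, bit_shift)
}

fn main() {
    // xid 100_000 lives on clog page 3, byte 424, bit shift 0.
    assert_eq!(clog_position(100_000), (3, 424, 0));
}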
pub trait History: Iterator<Item = Result<RelationUpdate>> {
pub trait History: Iterator<Item = Result<Modification>> {
/// The last_valid_lsn at the time of history() call.
fn lsn(&self) -> Lsn;
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelationUpdate {
pub rel: RelTag,
pub struct Modification {
pub tag: ObjectTag,
pub lsn: Lsn,
pub update: Update,
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Update {
Page { blknum: u32, img: Bytes },
WALRecord { blknum: u32, rec: WALRecord },
Truncate { n_blocks: u32 },
Unlink,
impl Modification {
pub fn new(entry: (ObjectTag, Lsn, Vec<u8>)) -> Modification {
Modification {
tag: entry.0,
lsn: entry.1,
data: entry.2,
}
}
}
#[derive(Clone)]
@@ -185,6 +213,8 @@ impl fmt::Display for RelTag {
/// In Postgres `BufferTag` structure is used for exactly the same purpose.
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
///
/// NOTE: In this context we use buffer, block, and page interchangeably when speaking about relation files.
///
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
pub struct BufferTag {
pub rel: RelTag,
@@ -198,6 +228,71 @@ impl BufferTag {
};
}
///
/// Non-relation transaction status files (clog (a.k.a. pg_xact) and pg_multixact)
/// in Postgres are handled by SLRU (Simple LRU) buffer, hence the name.
///
/// These files are global for a postgres instance.
///
/// These files are divided into segments, which are divided into pages
/// of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct SlruBufferTag {
pub blknum: u32,
}
///
/// Special type of Postgres files: pg_filenode.map is needed to map
/// catalog table OIDs to filenode numbers, which define filename.
///
/// Each database has a map file for its local mapped catalogs,
/// and there is a separate map file for shared catalogs.
///
/// These files have an atypical size of 512 bytes.
///
/// See PostgreSQL relmapper.c for details.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct DatabaseTag {
pub spcnode: u32,
pub dbnode: u32,
}
///
/// Non-relation files that keep state for prepared transactions.
/// Unlike other files these are not divided into pages.
///
/// See PostgreSQL twophase.c for details.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct PrepareTag {
pub xid: TransactionId,
}
/// ObjectTag is a part of ObjectKey that is specific
/// to the type of the stored object.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum ObjectTag {
// dummy tag preceding all other keys
FirstTag,
TimelineMetadataTag,
// Special entry that represents PostgreSQL checkpoint.
// We use it to track fields needed to restore controlfile checkpoint.
Checkpoint,
// Various types of non-relation files.
// We need them to bootstrap compute node.
ControlFile,
Clog(SlruBufferTag),
MultiXactMembers(SlruBufferTag),
MultiXactOffsets(SlruBufferTag),
FileNodeMap(DatabaseTag),
TwoPhase(PrepareTag),
// put relations at the end of the enum to allow efficient iteration through non-rel objects (see the sketch after this enum)
RelationMetadata(RelTag),
RelationBuffer(BufferTag),
}
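As a side note on the ordering comment above: a simplified, hypothetical stand-in enum showing that #[derive(PartialOrd, Ord)] orders variants by declaration order, which is what lets all non-relation tags sort before relation tags and be scanned as a contiguous prefix of the key space:

// Simplified stand-in for ObjectTag; the derived ordering follows variant order.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Tag {
    Checkpoint,
    Clog(u32),
    RelationBuffer(u32),
}

fn main() {
    assert!(Tag::Checkpoint < Tag::Clog(0));
    // Even the largest non-relation key sorts before any relation key.
    assert!(Tag::Clog(u32::MAX) < Tag::RelationBuffer(0));
}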
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
@@ -239,6 +334,7 @@ impl WALRecord {
mod tests {
use super::*;
use crate::object_repository::ObjectRepository;
use crate::object_repository::ObjectValue;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
@@ -247,6 +343,7 @@ mod tests {
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
@@ -259,11 +356,11 @@ mod tests {
/// Convenience function to create a BufferTag for testing.
/// Helps to keep the tests shorter.
#[allow(non_snake_case)]
fn TEST_BUF(blknum: u32) -> BufferTag {
BufferTag {
fn TEST_BUF(blknum: u32) -> ObjectTag {
ObjectTag::RelationBuffer(BufferTag {
rel: TESTREL_A,
blknum,
}
})
}
/// Convenience function to create a page image with given string as the only content
@@ -443,45 +540,83 @@ mod tests {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(0));
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN
let buf = TEST_BUF(1);
tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
let rel = TESTREL_A;
let tag = TEST_BUF(1);
tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
tline.advance_last_valid_lsn(Lsn(1));
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let expected_page = RelationUpdate {
rel: buf.rel,
lsn: Lsn(1),
update: Update::Page {
blknum: buf.blknum,
img: TEST_IMG("blk 1 @ lsn 1"),
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
let expected_page = Modification {
tag,
lsn: Lsn(1),
data: ObjectValue::ser(&ObjectValue::Page(TEST_IMG("blk 1 @ lsn 1")))?,
};
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// truncate to zero, but don't advance the last valid LSN
tline.put_truncation(buf.rel, Lsn(2), 0)?;
let mut snapshot = tline.history()?;
tline.put_truncation(rel, Lsn(2), 0)?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// advance the last valid LSN and the truncation should be observable
tline.advance_last_valid_lsn(Lsn(2));
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(2));
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
let expected_truncate = RelationUpdate {
rel: buf.rel,
lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 },
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationMetadata(_) => false,
_ => true,
},
_ => true,
});
let expected_truncate = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(1),
data: ObjectValue::ser(&ObjectValue::RelationSize(2))?,
};
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
assert_eq!(
Some(&expected_truncate),
snapshot.next().transpose()?.as_ref()
); // TODO ordering not guaranteed by API
let expected_truncate = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(2),
data: ObjectValue::ser(&ObjectValue::RelationSize(0))?,
};
assert_eq!(
Some(&expected_truncate),
snapshot.next().transpose()?.as_ref()
); // TODO ordering not guaranteed by API
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
Ok(())
@@ -493,15 +628,14 @@ mod tests {
impl WalRedoManager for TestRedoManager {
fn request_redo(
&self,
tag: BufferTag,
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for rel {} blk {} to get to {}, with {} and {} records",
tag.rel,
tag.blknum,
"redo for {:?} to get to {}, with {} and {} records",
tag,
lsn,
if base_img.is_some() {
"base image"

View File

@@ -14,7 +14,9 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use bytes::Bytes;
use crate::repository::{BufferTag, RelTag, Timeline, WALRecord};
use crate::repository::{
BufferTag, DatabaseTag, ObjectTag, PrepareTag, RelTag, SlruBufferTag, Timeline, WALRecord,
};
use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder};
use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate};
use crate::PageServerConf;
@@ -22,6 +24,7 @@ use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn;
///
@@ -63,8 +66,18 @@ pub fn import_timeline_from_postgres_datadir(
None => continue,
// These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => continue,
Some("pg_filenode.map") => continue,
Some("pg_control") => {
import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?
}
Some("pg_filenode.map") => import_nonrel_file(
timeline,
lsn,
ObjectTag::FileNodeMap(DatabaseTag {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
}),
&direntry.path(),
)?,
// Load any relation files into the page server
_ => import_relfile(
@@ -91,7 +104,15 @@ pub fn import_timeline_from_postgres_datadir(
// These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue,
Some("pg_filenode.map") => continue,
Some("pg_filenode.map") => import_nonrel_file(
timeline,
lsn,
ObjectTag::FileNodeMap(DatabaseTag {
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode: dboid,
}),
&direntry.path(),
)?,
// Load any relation files into the page server
_ => import_relfile(
@@ -104,6 +125,43 @@ pub fn import_timeline_from_postgres_datadir(
}
}
}
for entry in fs::read_dir(path.join("pg_xact"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::Clog(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::MultiXactMembers(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?;
import_nonrel_file(
timeline,
lsn,
ObjectTag::TwoPhase(PrepareTag { xid }),
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc
timeline.checkpoint()?;
@@ -136,7 +194,7 @@ fn import_relfile(
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
let tag = BufferTag {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
spcnode: spcoid,
dbnode: dboid,
@@ -144,13 +202,61 @@ fn import_relfile(
forknum,
},
blknum,
};
});
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
}
// TODO: UnexpectedEof is expected
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
break;
}
*/
_ => {
error!("error reading file: {:?} ({})", path, e);
break;
}
},
};
blknum += 1;
}
Ok(())
}
fn import_nonrel_file(
timeline: &dyn Timeline,
lsn: Lsn,
tag: ObjectTag,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
fn import_slru_file(
timeline: &dyn Timeline,
lsn: Lsn,
gen_tag: fn(blknum: u32) -> ObjectTag,
path: &Path,
) -> Result<()> {
// Read the SLRU segment file page by page.
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
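// Illustrative example (assuming SLRU_PAGES_PER_SEGMENT = 32, as in slru.h):
// segment file "0001" covers blocks 32..=63, so its first page is stored
// under blknum 32 and the page at file offset N * BLCKSZ under blknum 32 + N.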
loop {
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
timeline.put_page_image(gen_tag(blknum), lsn, Bytes::copy_from_slice(&buf))?;
}
// TODO: UnexpectedEof is expected
@@ -180,6 +286,16 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
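// Assumption: a Checkpoint object whose nextXid is still zero has never been
// filled in (e.g. the timeline was just imported), so fall back to the
// checkpoint copy stored in pg_control below.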
if checkpoint.nextXid.value == 0 {
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy;
}
loop {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
@@ -217,10 +333,11 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
if rec.is_err() {
// Assume that an error means we've reached the end of
// a partial WAL record. So that's ok.
trace!("WAL decoder error {:?}", rec);
break;
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(recdata.clone());
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
save_decoded_record(timeline, &decoded, recdata, lsn)?;
last_lsn = lsn;
} else {
@@ -240,6 +357,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
offset = 0;
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?;
Ok(())
}
@@ -256,24 +375,17 @@ pub fn save_decoded_record(
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec)?;
if blk.will_drop {
timeline.put_unlink(blk.tag, lsn)?;
} else {
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(blk.tag, rec)?;
}
}
// Handle a few special record types
@@ -329,18 +441,18 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
// Copy content
for blknum in 0..nblocks {
let src_key = BufferTag {
let src_key = ObjectTag::RelationBuffer(BufferTag {
rel: src_rel,
blknum,
};
let dst_key = BufferTag {
});
let dst_key = ObjectTag::RelationBuffer(BufferTag {
rel: dst_rel,
blknum,
};
});
let content = timeline.get_page_at_lsn(src_key, req_lsn)?;
let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
info!("copying block {:?} to {:?}", src_key, dst_key);
debug!("copying block {:?} to {:?}", src_key, dst_key);
timeline.put_page_image(dst_key, lsn, content)?;
num_blocks_copied += 1;
@@ -353,6 +465,23 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
num_rels_copied += 1;
}
// Copy relfilemap
for tag in timeline.list_nonrels(req_lsn)? {
match tag {
ObjectTag::FileNodeMap(db) => {
if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: tablespace_id,
dbnode: db_id,
});
timeline.put_page_image(new_tag, lsn, img)?;
break;
}
}
_ => {} // do nothing
}
}
info!(
"Created database {}/{}, copied {} blocks in {} rels at {}",
tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn

View File

@@ -2,7 +2,7 @@
//! An implementation of the ObjectStore interface, backed by RocksDB
//!
use crate::object_store::{ObjectKey, ObjectStore};
use crate::repository::{BufferTag, RelTag};
use crate::repository::{BufferTag, ObjectTag, RelTag};
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{bail, Result};
@@ -24,7 +24,7 @@ impl StorageKey {
Self {
obj_key: ObjectKey {
timeline,
buf_tag: BufferTag::ZEROED,
tag: ObjectTag::FirstTag,
},
lsn: Lsn(0),
}
@@ -123,6 +123,17 @@ impl ObjectStore for RocksObjectStore {
Ok(Box::new(iter))
}
/// Iterate through all timeline objects
fn list_objects<'a>(
&'a self,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
let iter = RocksObjectIter::new(&self.db, timeline, nonrel_only, lsn)?;
Ok(Box::new(iter))
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
@@ -143,7 +154,7 @@ impl ObjectStore for RocksObjectStore {
let mut search_key = StorageKey {
obj_key: ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
tag: ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
spcnode,
dbnode,
@@ -151,7 +162,7 @@ impl ObjectStore for RocksObjectStore {
forknum: 0u8,
},
blknum: 0,
},
}),
},
lsn: Lsn(0),
};
@@ -162,20 +173,21 @@ impl ObjectStore for RocksObjectStore {
break;
}
let key = StorageKey::des(iter.key().unwrap())?;
if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode)
|| (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode)
{
if let ObjectTag::RelationBuffer(buf_tag) = key.obj_key.tag {
if (spcnode != 0 && buf_tag.rel.spcnode != spcnode)
|| (dbnode != 0 && buf_tag.rel.dbnode != dbnode)
{
break;
}
if key.lsn < lsn {
rels.insert(buf_tag.rel);
}
let mut next_tag = buf_tag.clone();
next_tag.rel.relnode += 1; // skip to next relation
search_key.obj_key.tag = ObjectTag::RelationBuffer(next_tag);
} else {
break;
}
if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata)
&& key.lsn < lsn
// visible in this snapshot
{
rels.insert(key.obj_key.buf_tag.rel);
}
search_key = key.clone();
search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation
}
Ok(rels)
@@ -189,7 +201,7 @@ impl ObjectStore for RocksObjectStore {
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>> {
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
let start_key = StorageKey::timeline_start(timeline);
let start_key_bytes = StorageKey::ser(&start_key)?;
let iter = self.db.iterator(rocksdb::IteratorMode::From(
@@ -296,7 +308,7 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> {
return None;
}
let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
if key.obj_key.buf_tag != self.obj_key.buf_tag {
if key.obj_key.tag != self.obj_key.tag {
return None;
}
let val = self.dbiter.value().unwrap();
@@ -314,7 +326,7 @@ struct RocksObjects<'r> {
impl<'r> Iterator for RocksObjects<'r> {
// TODO consider returning Box<[u8]>
type Item = Result<(BufferTag, Lsn, Vec<u8>)>;
type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
@@ -322,7 +334,7 @@ impl<'r> Iterator for RocksObjects<'r> {
}
impl<'r> RocksObjects<'r> {
fn next_result(&mut self) -> Result<Option<(BufferTag, Lsn, Vec<u8>)>> {
fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
for (key_bytes, v) in &mut self.iter {
let key = StorageKey::des(&key_bytes)?;
@@ -335,9 +347,75 @@ impl<'r> RocksObjects<'r> {
continue;
}
return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec())));
return Ok(Some((key.obj_key.tag, key.lsn, v.to_vec())));
}
Ok(None)
}
}
///
/// Iterator for `list_objects`. Returns all objects preceding the specified LSN
///
struct RocksObjectIter<'a> {
timeline: ZTimelineId,
key: StorageKey,
nonrel_only: bool,
lsn: Lsn,
dbiter: rocksdb::DBRawIterator<'a>,
}
impl<'a> RocksObjectIter<'a> {
fn new(
db: &'a rocksdb::DB,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<RocksObjectIter<'a>> {
let key = StorageKey {
obj_key: ObjectKey {
timeline,
tag: ObjectTag::FirstTag,
},
lsn: Lsn(0),
};
let dbiter = db.raw_iterator();
Ok(RocksObjectIter {
key,
timeline,
nonrel_only,
lsn,
dbiter,
})
}
}
impl<'a> Iterator for RocksObjectIter<'a> {
type Item = ObjectTag;
fn next(&mut self) -> std::option::Option<Self::Item> {
loop {
self.dbiter.seek(StorageKey::ser(&self.key).unwrap());
if !self.dbiter.valid() {
return None;
}
let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
if key.obj_key.timeline != self.timeline {
// End of this timeline
return None;
}
self.key = key.clone();
self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions
if key.lsn <= self.lsn {
// visible in this snapshot
if self.nonrel_only {
match key.obj_key.tag {
ObjectTag::RelationMetadata(_) => return None,
ObjectTag::RelationBuffer(_) => return None,
_ => return Some(key.obj_key.tag),
}
} else {
return Some(key.obj_key.tag);
}
}
}
}
}

View File

@@ -2,14 +2,15 @@
//! WAL decoder. For each WAL record, it decodes the record to figure out which data blocks
//! the record affects, to add the records to the page cache.
//!
use crate::repository::*;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::CheckPoint;
use postgres_ffi::XLogLongPageHeaderData;
use postgres_ffi::XLogPageHeaderData;
use postgres_ffi::XLogRecord;
use std::cmp::min;
use std::str;
use thiserror::Error;
@@ -23,6 +24,9 @@ pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
const MAX_MBR_BLKNO: u32 =
pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
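// Illustrative: with BLCKSZ = 8192 each members-SLRU page holds 1636 members
// (409 member groups of 4), so MAX_MBR_BLKNO is the block containing
// MAX_MULTIXACT_ID; member block numbers wrap back to 0 past it (see the
// RM_MULTIXACT_ID handling below).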
#[allow(dead_code)]
pub struct WalStreamDecoder {
lsn: Lsn,
@@ -198,12 +202,7 @@ pub struct DecodedBkpBlock {
//in_use: bool,
/* Identify the block this refers to */
pub rnode_spcnode: u32,
pub rnode_dbnode: u32,
pub rnode_relnode: u32,
// Note that we have a few special forknum values for non-rel files.
pub forknum: u8,
pub blkno: u32,
pub tag: ObjectTag,
/* copy of the fork_flags field from the XLogRecordBlockHeader */
flags: u8,
@@ -212,6 +211,7 @@ pub struct DecodedBkpBlock {
has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool, /* record doesn't need previous page version to apply */
pub will_drop: bool, /* record drops relation */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
@@ -226,16 +226,13 @@ pub struct DecodedBkpBlock {
impl DecodedBkpBlock {
pub fn new() -> DecodedBkpBlock {
DecodedBkpBlock {
rnode_spcnode: 0,
rnode_dbnode: 0,
rnode_relnode: 0,
forknum: 0,
blkno: 0,
tag: ObjectTag::FirstTag,
flags: 0,
has_image: false,
apply_image: false,
will_init: false,
will_drop: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
@@ -490,10 +487,11 @@ impl XlMultiXactTruncate {
// block data
// ...
// main data
pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord {
let mut spcnode: u32 = 0;
let mut dbnode: u32 = 0;
let mut relnode: u32 = 0;
let mut forknum: u8;
let mut got_rnode = false;
let mut buf = record.clone();
@@ -509,6 +507,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
xlogrec.xl_info
);
checkpoint.update_next_xid(xlogrec.xl_xid);
let remaining: usize = xlogrec.xl_tot_len as usize - XLOG_SIZE_OF_XLOG_RECORD;
if buf.remaining() != remaining {
@@ -567,7 +566,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
max_block_id = block_id;
fork_flags = buf.get_u8();
blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
blk.flags = fork_flags;
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0;
@@ -673,9 +672,9 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
}
if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 {
rnode_spcnode = buf.get_u32_le();
rnode_dbnode = buf.get_u32_le();
rnode_relnode = buf.get_u32_le();
spcnode = buf.get_u32_le();
dbnode = buf.get_u32_le();
relnode = buf.get_u32_le();
got_rnode = true;
} else if !got_rnode {
// TODO
@@ -686,18 +685,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
goto err; */
}
blk.rnode_spcnode = rnode_spcnode;
blk.rnode_dbnode = rnode_dbnode;
blk.rnode_relnode = rnode_relnode;
blk.blkno = buf.get_u32_le();
trace!(
"this record affects {}/{}/{} blk {}",
rnode_spcnode,
rnode_dbnode,
rnode_relnode,
blk.blkno
);
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum,
spcnode,
dbnode,
relnode,
},
blknum: buf.get_u32_le(),
});
trace!("this record affects {:?}", blk.tag);
blocks.push(blk);
}
@@ -719,10 +716,46 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
assert_eq!(buf.remaining(), main_data_len as usize);
}
//5. Handle special XACT records
if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
//5. Handle special CLOG and XACT records
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let mut blk = DecodedBkpBlock::new();
let blknum = buf.get_i32_le() as u32;
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
blk.will_init = true;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
blk.will_drop = true;
checkpoint.oldestXid = buf.get_u32_le();
checkpoint.oldestXidDB = buf.get_u32_le();
trace!(
"RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
blknum,
checkpoint.oldestXid,
checkpoint.oldestXidDB
);
}
trace!("RM_CLOG_ID updates block {}", blknum);
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED
{
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
let blknum = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
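// Illustrative: with CLOG_XACTS_PER_PAGE = BLCKSZ * 4 = 32768,
// e.g. xid 100000 lands on clog block 3.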
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blknum,
main_data_len
);
blocks.push(blk);
}
//parse commit record to extract subtrans entries
// xl_xact_commit starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -737,8 +770,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
let mut prev_blknum = u32::MAX;
for _i in 0..nsubxacts {
let _subxact = buf.get_u32_le();
let subxact = buf.get_u32_le();
let blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
if prev_blknum != blknum {
prev_blknum = blknum;
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
blocks.push(blk);
}
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -747,7 +788,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationMetadata(RelTag {
forknum: pg_constants::MAIN_FORKNUM,
spcnode,
dbnode,
relnode,
});
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
@@ -764,11 +813,31 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
let _xid = buf.get_u32_le();
let xid = buf.get_u32_le();
let mut blk = DecodedBkpBlock::new();
let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
blocks.push(blk);
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
//TODO handle this to be able to restore pg_twophase on node start
}
} else if info == pg_constants::XLOG_XACT_ABORT {
} else if info == pg_constants::XLOG_XACT_ABORT
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
let blknum = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blknum,
main_data_len
);
blocks.push(blk);
}
//parse abort record to extract subtrans entries
// xl_xact_abort starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -783,8 +852,16 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
let mut prev_blknum = u32::MAX;
for _i in 0..nsubxacts {
let _subxact = buf.get_u32_le();
let subxact = buf.get_u32_le();
let blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
if prev_blknum != blknum {
prev_blknum = blknum;
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
blocks.push(blk);
}
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -793,7 +870,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationMetadata(RelTag {
forknum: pg_constants::MAIN_FORKNUM,
spcnode,
dbnode,
relnode,
});
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
spcnode,
@@ -803,9 +888,21 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
let _xid = buf.get_u32_le();
let xid = buf.get_u32_le();
let mut blk = DecodedBkpBlock::new();
let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
blk.tag = ObjectTag::Clog(SlruBufferTag { blknum });
blocks.push(blk);
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::TwoPhase(PrepareTag {
xid: xlogrec.xl_xid,
});
blk.will_init = true;
blocks.push(blk);
debug!("Prepare transaction {}", xlogrec.xl_xid);
}
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -838,8 +935,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
trace!("XLOG_TBLSPC_DROP is not handled yet");
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_HEAP_INSERT {
let xlrec = XlHeapInsert::decode(&mut buf);
if (xlrec.flags
@@ -847,52 +943,96 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
!= 0
{
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
spcnode: tag0.rel.spcnode,
dbnode: tag0.rel.dbnode,
relnode: tag0.rel.relnode,
},
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
} else if info == pg_constants::XLOG_HEAP_DELETE {
let xlrec = XlHeapDelete::decode(&mut buf);
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
spcnode: tag0.rel.spcnode,
dbnode: tag0.rel.dbnode,
relnode: tag0.rel.relnode,
},
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
} else if info == pg_constants::XLOG_HEAP_UPDATE
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
{
let xlrec = XlHeapUpdate::decode(&mut buf);
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
spcnode: tag0.rel.spcnode,
dbnode: tag0.rel.dbnode,
relnode: tag0.rel.relnode,
},
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
&& blocks.len() > 1
{
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
blk.rnode_spcnode = blocks[1].rnode_spcnode;
blk.rnode_dbnode = blocks[1].rnode_dbnode;
blk.rnode_relnode = blocks[1].rnode_relnode;
blocks.push(blk);
if let ObjectTag::RelationBuffer(tag1) = blocks[1].tag {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
spcnode: tag1.rel.spcnode,
dbnode: tag1.rel.dbnode,
relnode: tag1.rel.relnode,
},
blknum: tag1.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 1 is expected to be relation buffer tag but it is {:?}",
blocks[1].tag
);
}
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
let xlrec = XlHeapMultiInsert::decode(&mut buf);
if (xlrec.flags
@@ -900,14 +1040,159 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
!= 0
{
if let ObjectTag::RelationBuffer(tag0) = blocks[0].tag {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
spcnode: tag0.rel.spcnode,
dbnode: tag0.rel.dbnode,
relnode: tag0.rel.relnode,
},
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag {
blknum: buf.get_u32_le(),
});
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag {
blknum: buf.get_u32_le(),
});
blk.will_init = true;
blocks.push(blk);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
// Update offset page
let mut blk = DecodedBkpBlock::new();
let blknum = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
blocks.push(blk);
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
loop {
// Update members page
let mut blk = DecodedBkpBlock::new();
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
blocks.push(blk);
if blknum == last_mbr_blkno {
// last block inclusive
break;
}
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
if xlrec.mid >= checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid + 1;
}
if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
}
let max_mbr_xid =
xlrec.members.iter().fold(
0u32,
|acc, mbr| {
if mbr.xid > acc {
mbr.xid
} else {
acc
}
},
);
checkpoint.update_next_xid(max_mbr_xid);
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
checkpoint.oldestMulti = xlrec.end_trunc_off;
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let first_off_blkno =
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno =
xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
// Delete all the segments but the last one. The last segment can still
// contain, possibly partially, valid data.
for blknum in first_off_blkno..last_off_blkno {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
blk.will_drop = true;
blocks.push(blk);
}
let first_mbr_blkno =
xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
// Delete all the segments but the last one. The last segment can still
// contain, possibly partially, valid data.
while blknum != last_mbr_blkno {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
blk.will_drop = true;
blocks.push(blk);
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
} else {
panic!()
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: xlrec.tsid,
dbnode: xlrec.dbid,
});
blk.will_init = true;
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID {
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if next_oid > checkpoint.nextOid {
checkpoint.nextOid = next_oid;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap();
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid, checkpoint.oldestXid
);
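// Wraparound-aware comparison (illustrative note): the u32 difference
// reinterpreted as i32 is negative exactly when checkpoint.oldestXid lags
// behind xlog_checkpoint.oldestXid on the circular xid space, so oldestXid
// only ever moves forward.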
if (checkpoint.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
}
}

View File

@@ -5,6 +5,7 @@
//! We keep one WAL receiver active per timeline.
use crate::page_cache;
use crate::repository::*;
use crate::restore_local_repo;
use crate::waldecoder::*;
use crate::PageServerConf;
@@ -15,8 +16,8 @@ use log::*;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::cmp::{max, min};
@@ -168,6 +169,10 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
while let Some(replication_message) = physical_stream.next()? {
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
@@ -185,9 +190,19 @@ fn walreceiver_main(
waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?;
last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
ObjectTag::Checkpoint,
lsn,
new_checkpoint_bytes,
)?;
}
}
// Update the last_valid LSN value in the page cache one more time. We updated

View File

@@ -15,8 +15,10 @@
//! TODO: Even though the postgres code runs in a separate process,
//! it's not a secure sandbox.
//!
use bytes::{BufMut, Bytes, BytesMut};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::assert;
use std::cell::RefCell;
use std::fs;
use std::fs::OpenOptions;
@@ -35,9 +37,12 @@ use tokio::time::timeout;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use crate::repository::BufferTag;
use crate::repository::WALRecord;
use crate::repository::{BufferTag, ObjectTag, WALRecord};
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
///
/// WAL Redo Manager is responsible for replaying WAL records.
@@ -52,7 +57,7 @@ pub trait WalRedoManager: Send + Sync {
/// the records.
fn request_redo(
&self,
tag: BufferTag,
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
@@ -68,7 +73,7 @@ pub struct DummyRedoManager {}
impl crate::walredo::WalRedoManager for DummyRedoManager {
fn request_redo(
&self,
_tag: BufferTag,
_tag: ObjectTag,
_lsn: Lsn,
_base_img: Option<Bytes>,
_records: Vec<WALRecord>,
@@ -97,7 +102,7 @@ struct PostgresRedoManagerInternal {
#[derive(Debug)]
struct WalRedoRequest {
tag: BufferTag,
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
@@ -159,14 +164,13 @@ impl WalRedoManager for PostgresRedoManager {
///
fn request_redo(
&self,
tag: BufferTag,
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest {
tag,
lsn,
@@ -174,7 +178,6 @@ impl WalRedoManager for PostgresRedoManager {
records,
response_channel: tx,
};
self.request_tx
.lock()
.unwrap()
@@ -186,6 +189,24 @@ impl WalRedoManager for PostgresRedoManager {
}
}
fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
% pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
* pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
}
fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
(xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
* pg_constants::MXACT_MEMBER_BITS_PER_XACT
}
/* Location (byte offset within page) of TransactionId of given member */
fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
mx_offset_to_flags_offset(xid)
+ (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
}
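// Worked example (illustrative, using the MULTIXACT_* constants from
// pg_constants with BLCKSZ = 8192, i.e. 4 members per group and 20 bytes per
// group): member offset 5 belongs to member group 1, so its flag byte lives at
// page offset 20 with bit shift 8, and its TransactionId at page offset
// 20 + 4 + 1 * 4 = 28.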
///
/// WAL redo thread
///
@@ -249,7 +270,244 @@ impl PostgresRedoManagerInternal {
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
apply_result = process.apply_wal_records(tag, base_img, records).await;
if let ObjectTag::RelationBuffer(buf_tag) = tag {
// Relational WAL records are applied using wal-redo-postgres
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
} else {
// Non-relational WAL records we apply ourselves.
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
// If full-page image is provided, then use it...
page.extend_from_slice(&fpi[..]);
} else {
// otherwise initialize page with zeros
page.extend_from_slice(&ZERO_PAGE);
}
// Apply all collected WAL records
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let xlogrec = XLogRecord::from_bytes(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
// The only operation we need to implement is CLOG_ZEROPAGE
page.copy_from_slice(&ZERO_PAGE);
}
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
// Transaction manager stuff
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
let tag_blknum = match tag {
ObjectTag::Clog(slru) => slru.blknum,
ObjectTag::TwoPhase(_) => {
assert!(info == pg_constants::XLOG_XACT_PREPARE);
0 // not used by XLOG_XACT_PREPARE
}
_ => panic!("Not valid XACT object tag {:?}", tag),
};
if info == pg_constants::XLOG_XACT_COMMIT
|| info == pg_constants::XLOG_XACT_COMMIT_PREPARED
{
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
if info == pg_constants::XLOG_XACT_COMMIT {
// status of 2PC transaction will be set later
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
let _xact_time = buf.get_i64_le();
// decode xinfo
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();
let _tsid = buf.get_u32_le();
}
}
// handle subtrans
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag_blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED;
transaction_id_set_status(subxact, status, &mut page);
}
}
}
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
// Do not need to handle dropped relations here, just need to skip them
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
dbnode,
relnode
);
}
}
// Skip invalidations
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
let sizeof_shared_invalidation_message = 0;
buf.advance(sizeof_shared_invalidation_message);
}
}
// Set status of 2PC transaction
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
}
} else if info == pg_constants::XLOG_XACT_ABORT
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
status = pg_constants::TRANSACTION_STATUS_ABORTED;
if info == pg_constants::XLOG_XACT_ABORT {
// status of 2PC transaction will be set later
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
//handle subtrans
let _xact_time = buf.get_i64_le();
// decode xinfo
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();
let _tsid = buf.get_u32_le();
}
}
// handle subtrans
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
let subxact = buf.get_u32_le();
let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag_blknum == blkno {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
transaction_id_set_status(subxact, status, &mut page);
}
}
}
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
// Do not need to handle dropped relations here, just need to skip them
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
dbnode,
relnode
);
}
}
// Skip invalidations
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
let sizeof_shared_invalidation_message = 0;
buf.advance(sizeof_shared_invalidation_message);
}
}
// Set status of 2PC transaction
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
trace!("Apply prepare {} record", xlogrec.xl_xid);
page.clear();
page.extend_from_slice(&buf[..]);
} else {
error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn,
record.main_data_offset, record.rec.len());
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
// Multixact operations
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
// Just need to zero the page
page.copy_from_slice(&ZERO_PAGE);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if let ObjectTag::MultiXactMembers(slru) = tag {
for i in 0..xlrec.nmembers {
let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
if blkno == slru.blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &=
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
<< bshift);
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
}
} else {
// Multixact offsets SLRU
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
}
} else {
panic!();
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
// Relation map file has a size of 512 bytes
page.clear();
page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
assert!(page.len() == 512); // size of pg_filenode.map
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
}
let duration = start.elapsed();

View File

@@ -15,6 +15,7 @@ fn main() {
// All the needed PostgreSQL headers are included from 'pg_control_ffi.h'
//
.header("pg_control_ffi.h")
.header("xlog_ffi.h")
//
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.

View File

@@ -4,6 +4,7 @@
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod xlog_utils;

View File

@@ -21,13 +21,24 @@ pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// constants from clog.h
//
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
//
// Constants from visibilitymap.h
//
@@ -35,14 +46,24 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
pub const BITS_PER_HEAPBLOCK: u16 = 2;
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
pub const CLOG_ZEROPAGE: u8 = 0x00;
pub const CLOG_TRUNCATE: u8 = 0x10;
// From xact.h
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
// From slru.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;
/* mask for filtering opcodes out of xl_info */
pub const XLOG_XACT_OPMASK: u8 = 0x70;
@@ -63,8 +84,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
// From pg_control.h and rmgrlist.h
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const DB_SHUTDOWNED: u32 = 1;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
/* size in bytes of a complete group */
pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
// From heapam_xlog.h
pub const XLOG_HEAP_INSERT: u8 = 0x00;
@@ -104,11 +149,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = 24;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// from xlogrecord.h
//
@@ -139,3 +179,8 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_LONG_HEADER: u16 = 0x0002;

View File

@@ -9,14 +9,16 @@
use crate::pg_constants;
use crate::CheckPoint;
use crate::ControlFileData;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use bytes::{BufMut, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
@@ -35,12 +37,15 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = u64;
pub type XLogSegNo = u64;
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
@@ -368,6 +373,7 @@ impl CheckPoint {
// Next XID should be greater than new_xid.
// Also take in account 32-bit wrap-around.
pub fn update_next_xid(&mut self, xid: u32) {
let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
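// For example, xid = 1000 is rounded up to 1024 here, so nextXid only advances
// in XID_CHECKPOINT_INTERVAL steps and the stored checkpoint image does not
// have to be rewritten for every single transaction.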
let full_xid = self.nextXid.value;
let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
let old_xid = full_xid as u32;
@@ -383,3 +389,58 @@ impl CheckPoint {
}
}
}
pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_tli: 1, // FIXME: always use Postgres timeline 1
xlp_pageaddr: pg_control.checkPoint - XLOG_SIZE_OF_XLOG_LONG_PHD as u64,
xlp_rem_len: 0,
}
},
xlp_sysid: pg_control.system_identifier,
xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let hdr_bytes = hdr.encode();
seg_buf.extend_from_slice(&hdr_bytes);
let rec_hdr = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
+ SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
+ SIZEOF_CHECKPOINT) as u32,
xl_xid: 0, //0 is for InvalidTransactionId
xl_prev: 0,
xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
xl_rmid: pg_constants::RM_XLOG_ID,
xl_crc: 0,
};
let mut rec_short_hdr_bytes = BytesMut::new();
rec_short_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
rec_short_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
let rec_bytes = rec_hdr.encode();
let checkpoint_bytes = pg_control.checkPointCopy.encode();
// calculate the record checksum
let mut crc = 0;
crc = crc32c_append(crc, &rec_short_hdr_bytes[..]);
crc = crc32c_append(crc, &checkpoint_bytes[..]);
crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.put_u32_le(crc);
seg_buf.extend_from_slice(&rec_short_hdr_bytes);
seg_buf.extend_from_slice(&checkpoint_bytes);
// zero out the remaining part of the file
seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
seg_buf.freeze()
}

3
postgres_ffi/xlog_ffi.h Normal file
View File

@@ -0,0 +1,3 @@
#include "c.h"
#include "access/xlog_internal.h"
#include "access/xlogrecord.h"

View File

@@ -19,7 +19,7 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("INSERT INTO foo VALUES ('bar')")
# Stop and restart the Postgres instance
pg.stop().start()
pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -48,7 +48,7 @@ fn main() -> Result<()> {
.help("interval for keeping WAL as walkeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::with_name("recall-period")
Arg::with_name("recall")
.long("recall")
.takes_value(true)
.help("Period for requestion pageserver to call for replication"),

View File

@@ -241,6 +241,9 @@ impl ReceiveWalConn {
my_info.server = server_info.clone();
my_info.server.node_id = node_id;
/* Need to save the incomplete my_info in the timeline to provide wal_seg_size for find_end_of_wal */
self.timeline.get().set_info(&my_info);
/* Calculate WAL end based on local data */
let (flush_lsn, timeline) = self.timeline.find_end_of_wal(&self.conf.data_dir, true);
my_info.flush_lsn = flush_lsn;

View File

@@ -63,11 +63,15 @@ impl ReplicationConn {
let feedback = HotStandbyFeedback::des(&m)?;
timeline.add_hs_feedback(feedback)
}
msg => {
None => {
break;
}
Some(msg) => {
info!("unexpected message {:?}", msg);
}
}
}
Err(anyhow!("Connection closed"))
}
/// Helper function that parses a pair of LSNs.

View File

@@ -232,6 +232,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
/// Find last WAL record. If "precise" is false then just locate last partial segment
fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
let seg_size = self.get().get_info().server.wal_seg_size as usize;
assert!(seg_size > 0);
let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise);
(Lsn(lsn), timeline)
}