Compare commits

...

6 Commits

Author SHA1 Message Date
Konstantin Knizhnik
b05abaa1dd Add in-memory storage 2021-07-12 16:16:21 +03:00
Konstantin Knizhnik
819c2d03d0 Merge with main branch 2021-07-10 16:35:20 +03:00
Konstantin Knizhnik
7898aff499 Fix usage of put_page_image in save_xlog_dbase_create 2021-07-10 16:33:21 +03:00
Konstantin Knizhnik
4cc41ea971 Do not update relation metadata in get_page_at_lsn 2021-07-10 16:33:21 +03:00
Konstantin Knizhnik
11b6d29210 Refactring after Heikki review 2021-07-10 16:33:21 +03:00
Konstantin Knizhnik
29f885b37d Replay non-relational WAL records on page server 2021-07-10 16:33:21 +03:00
23 changed files with 2014 additions and 456 deletions

View File

@@ -2,6 +2,7 @@
UNAME_S := $(shell uname -s) UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Linux) ifeq ($(UNAME_S),Linux)
SECCOMP = --with-libseccomp SECCOMP = --with-libseccomp
SECCOMP =
else else
SECCOMP = SECCOMP =
endif endif

View File

@@ -303,6 +303,8 @@ impl PostgresNode {
), ),
)?; )?;
fs::create_dir_all(self.pgdata().join("pg_wal"))?;
fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
Ok(()) Ok(())
} }

View File

@@ -4,181 +4,261 @@
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup. //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name. //! It could use a better name.
//! //!
//! Stateless Postgres compute node is launched by sending tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files)
//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and
//! data stored in object storage.
//!
use crate::ZTimelineId; use crate::ZTimelineId;
use bytes::{BufMut, BytesMut};
use log::*; use log::*;
use std::io::Write; use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use tar::Builder; use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::object_key::*;
use crate::repository::Timeline; use crate::repository::Timeline;
use postgres_ffi::relfile_utils::*; use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
/// /// This is short-living object only for the time of tarball creation,
/// Generate tarball with non-relational files from repository /// created mostly to avoid passing a lot of parameters between various functions
/// /// used for constructing tarball.
pub fn send_tarball_at_lsn( pub struct Basebackup<'a> {
write: &mut dyn Write, ar: Builder<&'a mut dyn Write>,
timelineid: ZTimelineId, timeline: &'a Arc<dyn Timeline>,
_timeline: &Arc<dyn Timeline>, lsn: Lsn,
_lsn: Lsn, snappath: String,
snapshot_lsn: Lsn, slru_buf: [u8; pg_constants::SLRU_SEG_SIZE],
) -> anyhow::Result<()> { slru_segno: u32,
let mut ar = Builder::new(write); slru_path: &'static str,
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
let walpath = format!("timelines/{}/wal", timelineid);
debug!("sending tarball of snapshot in {}", snappath);
for entry in WalkDir::new(&snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&snappath).unwrap();
if relpath.to_str().unwrap() == "" {
continue;
}
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else {
trace!("not sending {}", relpath.display());
}
} else {
error!("unknown file type: {}", fullpath.display());
}
}
// FIXME: Also send all the WAL. The compute node would only need
// the WAL that applies to non-relation files, because the page
// server handles all the relation files. But we don't have a
// mechanism for separating relation and non-relation WAL at the
// moment.
for entry in std::fs::read_dir(&walpath)? {
let entry = entry?;
let fullpath = &entry.path();
let relpath = fullpath.strip_prefix(&walpath).unwrap();
if !entry.path().is_file() {
continue;
}
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
}
ar.finish()?;
debug!("all tarred up!");
Ok(())
} }
/// impl<'a> Basebackup<'a> {
/// Send a tarball containing a snapshot of all non-relation files in the pub fn new(
/// PostgreSQL data directory, at given LSN write: &'a mut dyn Write,
/// timelineid: ZTimelineId,
/// There must be a snapshot at the given LSN in the snapshots directory, we cannot timeline: &'a Arc<dyn Timeline>,
/// reconstruct the state at an arbitrary LSN at the moment. lsn: Lsn,
/// snapshot_lsn: Lsn,
pub fn send_snapshot_tarball( ) -> Basebackup<'a> {
write: &mut dyn Write, Basebackup {
timelineid: ZTimelineId, ar: Builder::new(write),
snapshotlsn: Lsn, timeline,
) -> Result<(), std::io::Error> { lsn,
let mut ar = Builder::new(write); snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0),
slru_path: "",
let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn.0); slru_segno: u32::MAX,
let walpath = format!("timelines/{}/wal", timelineid); slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE],
debug!("sending tarball of snapshot in {}", snappath);
//ar.append_dir_all("", &snappath)?;
for entry in WalkDir::new(&snappath) {
let entry = entry?;
let fullpath = entry.path();
let relpath = entry.path().strip_prefix(&snappath).unwrap();
if relpath.to_str().unwrap() == "" {
continue;
} }
}
if entry.file_type().is_dir() { pub fn send_tarball(&mut self) -> anyhow::Result<()> {
trace!( debug!("sending tarball of snapshot in {}", self.snappath);
"sending dir {} as {}", for entry in WalkDir::new(&self.snappath) {
fullpath.display(), let entry = entry?;
relpath.display() let fullpath = entry.path();
); let relpath = entry.path().strip_prefix(&self.snappath).unwrap();
ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
// Shared catalogs are exempt
if relpath.starts_with("global/") {
trace!("sending shared catalog {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else if !is_rel_file_path(relpath.to_str().unwrap()) {
trace!("sending {}", relpath.display());
ar.append_path_with_name(fullpath, relpath)?;
} else {
trace!("not sending {}", relpath.display());
// FIXME: For now, also send all the relation files. if relpath.to_str().unwrap() == "" {
// This really shouldn't be necessary, and kind of continue;
// defeats the point of having a page server in the
// first place. But it is useful at least when
// debugging with the DEBUG_COMPARE_LOCAL option (see
// vendor/postgres/src/backend/storage/smgr/pagestore_smgr.c)
ar.append_path_with_name(fullpath, relpath)?;
} }
if entry.file_type().is_dir() {
trace!(
"sending dir {} as {}",
fullpath.display(),
relpath.display()
);
self.ar.append_dir(relpath, fullpath)?;
} else if entry.file_type().is_symlink() {
error!("ignoring symlink in snapshot dir");
} else if entry.file_type().is_file() {
if !is_rel_file_path(relpath.to_str().unwrap()) {
if entry.file_name() != "pg_filenode.map" // this files will be generated from object storage
&& !relpath.starts_with("pg_xact/")
&& !relpath.starts_with("pg_multixact/")
{
trace!("sending {}", relpath.display());
self.ar.append_path_with_name(fullpath, relpath)?;
}
} else {
// relation pages are loaded on demand and should not be included in tarball
trace!("not sending {}", relpath.display());
}
} else {
error!("unknown file type: {}", fullpath.display());
}
}
// Generate non-relational files.
// Iteration is sorted order: all objects of the same time are grouped and traversed
// in key ascending order. For example all pg_xact records precede pg_multixact records and are sorted by block number.
// It allows to easily construct SLRU segments (32 blocks).
for obj in self.timeline.list_nonrels(self.lsn)? {
match obj {
ObjectTag::Clog(slru) => self.add_slru_segment("pg_xact", &obj, slru.blknum)?,
ObjectTag::MultiXactMembers(slru) => {
self.add_slru_segment("pg_multixact/members", &obj, slru.blknum)?
}
ObjectTag::MultiXactOffsets(slru) => {
self.add_slru_segment("pg_multixact/offsets", &obj, slru.blknum)?
}
ObjectTag::FileNodeMap(db) => self.add_relmap_file(&obj, &db)?,
ObjectTag::TwoPhase(prepare) => self.add_twophase_file(&obj, prepare.xid)?,
_ => {}
}
}
self.finish_slru_segment()?; // write last non-completed SLRU segment (if any)
self.add_pgcontrol_file()?;
self.ar.finish()?;
debug!("all tarred up!");
Ok(())
}
//
// Generate SRLU segment files from repository. Path identifiers SLRU kind (pg_xact, pg_multixact/members, ...).
// Intiallly pass is empty string.
//
fn add_slru_segment(
&mut self,
path: &'static str,
tag: &ObjectTag,
page: u32,
) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
// Zero length image indicates truncated segment: just skip it
if !img.is_empty() {
assert!(img.len() == pg_constants::BLCKSZ as usize);
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) {
// Switch to new segment: save old one
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer
}
self.slru_segno = segno;
self.slru_path = path;
let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
* pg_constants::BLCKSZ as usize;
let offs_end = offs_start + pg_constants::BLCKSZ as usize;
self.slru_buf[offs_start..offs_end].copy_from_slice(&img);
}
Ok(())
}
//
// We flush SLRU segments to the tarball once them are completed.
// This method is used to flush last (may be incompleted) segment.
//
fn finish_slru_segment(&mut self) -> anyhow::Result<()> {
if self.slru_path != "" {
// is there is some incompleted segment
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
}
Ok(())
}
//
// Extract pg_filenode.map files from repository
//
fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
info!("add_relmap_file {:?}", db);
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map") // filenode map for global tablespace
} else { } else {
error!("unknown file type: {}", fullpath.display()); // User defined tablespaces are not supported
} assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
let src_path = format!("{}/base/1/PG_VERSION", self.snappath);
let dst_path = format!("base/{}/PG_VERSION", db.dbnode);
self.ar.append_path_with_name(&src_path, &dst_path)?;
format!("base/{}/pg_filenode.map", db.dbnode)
};
assert!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
self.ar.append(&header, &img[..])?;
Ok(())
} }
// FIXME: Also send all the WAL. The compute node would only need //
// the WAL that applies to non-relation files, because the page // Extract twophase state files
// server handles all the relation files. But we don't have a //
// mechanism for separating relation and non-relation WAL at the fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> {
// moment. // Include in tarball two-phase files only of in-progress transactions
for entry in std::fs::read_dir(&walpath)? { if self.timeline.get_tx_status(xid, self.lsn)?
let entry = entry?; == pg_constants::TRANSACTION_STATUS_IN_PROGRESS
let fullpath = &entry.path(); {
let relpath = fullpath.strip_prefix(&walpath).unwrap(); let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let mut buf = BytesMut::new();
if !entry.path().is_file() { buf.extend_from_slice(&img[..]);
continue; let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
} }
Ok(())
let archive_fname = relpath.to_str().unwrap();
let archive_fname = archive_fname
.strip_suffix(".partial")
.unwrap_or(&archive_fname);
let archive_path = "pg_wal/".to_owned() + archive_fname;
ar.append_path_with_name(fullpath, archive_path)?;
} }
ar.finish()?; //
debug!("all tarred up!"); // Add generated pg_control file
Ok(()) //
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Generate new pg_control and WAL needed for bootstrap
let checkpoint_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let checkpoint_lsn = XLogSegNoOffsetToRecPtr(
checkpoint_segno,
XLOG_SIZE_OF_XLOG_LONG_PHD as u32,
pg_constants::WAL_SEGMENT_SIZE,
);
checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32);
//reset some fields we don't want to preserve
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = checkpoint_lsn;
pg_control.checkPointCopy = checkpoint;
info!("pg_control.state = {}", pg_control.state);
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
self.ar
.append(&new_tar_header("zenith.signal", 0)?, &b""[..])?;
//send pg_control
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;
//send wal segment
let wal_file_name = XLogFileName(
1, // FIXME: always use Postgres timeline 1
checkpoint_segno,
pg_constants::WAL_SEGMENT_SIZE,
);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(&pg_control);
self.ar.append(&header, &wal_seg[..])?;
Ok(())
}
} }
/// ///
@@ -228,6 +308,28 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
} }
} }
//
// Check if it is relational file
//
fn is_rel_file_path(path: &str) -> bool { fn is_rel_file_path(path: &str) -> bool {
parse_rel_file_path(path).is_ok() parse_rel_file_path(path).is_ok()
} }
//
// Create new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
header.set_mode(0b110000000); // -rw-------
header.set_mtime(
// use currenttime as last modified time
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
header.set_cksum();
Ok(header)
}

View File

@@ -100,7 +100,8 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
// and we failed to run initdb again in the same directory. This has been solved for the // and we failed to run initdb again in the same directory. This has been solved for the
// rapid init+start case now, but the general race condition remains if you restart the // rapid init+start case now, but the general race condition remains if you restart the
// server quickly. // server quickly.
let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?; //let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
let storage = crate::inmem_storage::InmemObjectStore::create(conf)?;
let repo = crate::object_repository::ObjectRepository::new( let repo = crate::object_repository::ObjectRepository::new(
conf, conf,
@@ -127,9 +128,6 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
// Remove pg_wal // Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?; fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
// Move the data directory as an initial base backup. // Move the data directory as an initial base backup.
// FIXME: It would be enough to only copy the non-relation files here, the relation // FIXME: It would be enough to only copy the non-relation files here, the relation
// data was already loaded into the repository. // data was already loaded into the repository.
@@ -345,27 +343,6 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
bail!("could not parse point-in-time {}", s); bail!("could not parse point-in-time {}", s);
} }
// If control file says the cluster was shut down cleanly, modify it, to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile = ControlFileData::decode(&fs::read(controlfilepath.as_path())?)?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(controlfilepath.as_path(), controlfile.encode())?;
Ok(())
}
fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> { fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
// Create initial timeline // Create initial timeline
let mut tli_buf = [0u8; 16]; let mut tli_buf = [0u8; 16];

View File

@@ -0,0 +1,349 @@
//!
//! An implementation of the ObjectStore interface, backed by BTreeMap
//!
use crate::object_key::*;
use crate::object_store::ObjectStore;
use crate::repository::RelTag;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{bail, Result};
use std::collections::{BTreeMap,HashSet};
use std::sync::RwLock;
use zenith_utils::lsn::Lsn;
use std::ops::Bound::*;
use serde::{Deserialize, Serialize};
use zenith_utils::bin_ser::BeSer;
use std::io::prelude::*;
use std::fs::File;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct StorageKey {
obj_key: ObjectKey,
lsn: Lsn,
}
impl StorageKey {
/// The first key for a given timeline
fn timeline_start(timeline: ZTimelineId) -> Self {
Self {
obj_key: ObjectKey {
timeline,
tag: ObjectTag::FirstTag,
},
lsn: Lsn(0),
}
}
}
pub struct InmemObjectStore {
conf: &'static PageServerConf,
db: RwLock<BTreeMap<StorageKey, Vec<u8>>>,
}
impl ObjectStore for InmemObjectStore {
fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
let db = self.db.read().unwrap();
let val = db.get(&StorageKey {
obj_key: key.clone(),
lsn,
});
if let Some(val) = val {
Ok(val.clone())
} else {
bail!("could not find page {:?}", key);
}
}
fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
let search_key = StorageKey {
obj_key: key.clone(),
lsn: Lsn(0),
};
let db = self.db.read().unwrap();
for pair in db.range(&search_key..) {
let key = pair.0;
return Ok(Some(key.obj_key.clone()));
}
Ok(None)
}
fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
let mut db = self.db.write().unwrap();
db.insert(
StorageKey {
obj_key: key.clone(),
lsn,
},
value.to_vec(),
);
Ok(())
}
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
let mut db = self.db.write().unwrap();
db.remove(&StorageKey {
obj_key: key.clone(),
lsn,
});
Ok(())
}
/// Iterate through page versions of given page, starting from the given LSN.
/// The versions are walked in descending LSN order.
fn object_versions<'a>(
&'a self,
key: &ObjectKey,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
let from = StorageKey {
obj_key: key.clone(),
lsn: Lsn(0),
};
let till = StorageKey {
obj_key: key.clone(),
lsn,
};
let db = self.db.read().unwrap();
let versions: Vec<(Lsn, Vec<u8>)> = db.range(from..=till).map(|pair|(pair.0.lsn, pair.1.clone())).collect();
Ok(Box::new(InmemObjectVersionIter::new(versions)))
}
/// Iterate through all timeline objects
fn list_objects<'a>(
&'a self,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
let curr_key = StorageKey::timeline_start(timeline);
Ok(Box::new(InmemObjectIter {
store: &self,
curr_key,
timeline,
nonrel_only,
lsn,
}))
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
/// through all entries in the given database. In practice, this
/// is used for CREATE DATABASE, and usually the template database is small.
/// But if it's not, this will be slow.
fn list_rels(
&self,
timelineid: ZTimelineId,
spcnode: u32,
dbnode: u32,
lsn: Lsn,
) -> Result<HashSet<RelTag>> {
// FIXME: This scans everything. Very slow
let mut rels: HashSet<RelTag> = HashSet::new();
let mut search_rel_tag = RelTag {
spcnode,
dbnode,
relnode: 0,
forknum: 0u8,
};
let db = self.db.read().unwrap();
'outer: loop {
let search_key = StorageKey {
obj_key: ObjectKey {
timeline: timelineid,
tag: ObjectTag::RelationMetadata(search_rel_tag),
},
lsn: Lsn(0),
};
for pair in db.range(&search_key..) {
let key = pair.0;
if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
if spcnode != 0 && rel_tag.spcnode != spcnode
|| dbnode != 0 && rel_tag.dbnode != dbnode
{
break 'outer;
}
if key.lsn <= lsn {
// visible in this snapshot
rels.insert(rel_tag);
}
search_rel_tag = rel_tag;
// skip to next relation
// FIXME: What if relnode is u32::MAX ?
search_rel_tag.relnode += 1;
continue 'outer;
} else {
// no more relation metadata entries
break 'outer;
}
}
}
Ok(rels)
}
/// Iterate through versions of all objects in a timeline.
///
/// Returns objects in increasing key-version order.
/// Returns all versions up to and including the specified LSN.
fn objects<'a>(
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
let curr_key = StorageKey::timeline_start(timeline);
Ok(Box::new(InmemObjects {
store: &self,
curr_key,
timeline,
lsn,
}))
}
fn compact(&self) {
}
}
impl Drop for InmemObjectStore {
fn drop(&mut self) {
let path = self.conf.workdir.join("objstore.dmp");
let mut f = File::create(path).unwrap();
f.write(&self.db.ser().unwrap()).unwrap();
}
}
impl InmemObjectStore {
pub fn open(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
let path = conf.workdir.join("objstore.dmp");
let mut f = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer)?;
let db = RwLock::new(BTreeMap::des(&buffer)?);
Ok(InmemObjectStore {
conf: conf,
db
})
}
pub fn create(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
Ok(InmemObjectStore {
conf: conf,
db: RwLock::new(BTreeMap::new()),
})
}
}
///
/// Iterator for `object_versions`. Returns all page versions of a given block, in
/// reverse LSN order.
///
struct InmemObjectVersionIter {
versions: Vec<(Lsn, Vec<u8>)>,
curr: usize,
}
impl InmemObjectVersionIter {
fn new(versions: Vec<(Lsn, Vec<u8>)>) -> InmemObjectVersionIter {
let curr = versions.len();
InmemObjectVersionIter {
versions,
curr
}
}
}
impl Iterator for InmemObjectVersionIter {
type Item = (Lsn, Vec<u8>);
fn next(&mut self) -> std::option::Option<Self::Item> {
if self.curr == 0 {
None
} else {
self.curr -= 1;
Some(self.versions[self.curr].clone())
}
}
}
struct InmemObjects<'r> {
store: &'r InmemObjectStore,
curr_key: StorageKey,
timeline: ZTimelineId,
lsn: Lsn,
}
impl<'r> Iterator for InmemObjects<'r> {
// TODO consider returning Box<[u8]>
type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
}
}
impl<'r> InmemObjects<'r> {
fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
let db = self.store.db.read().unwrap();
for pair in db.range((Excluded(&self.curr_key),Unbounded)) {
let key = pair.0;
if key.obj_key.timeline != self.timeline {
return Ok(None);
}
if key.lsn > self.lsn {
// TODO can speed up by seeking iterator
continue;
}
self.curr_key = key.clone();
let value = pair.1.clone();
return Ok(Some((key.obj_key.tag, key.lsn, value)));
}
Ok(None)
}
}
///
/// Iterator for `list_objects`. Returns all objects preceeding specified LSN
///
struct InmemObjectIter<'a> {
store: &'a InmemObjectStore,
curr_key: StorageKey,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
}
impl<'a> Iterator for InmemObjectIter<'a> {
type Item = ObjectTag;
fn next(&mut self) -> std::option::Option<Self::Item> {
let db = self.store.db.read().unwrap();
'outer: loop {
for pair in db.range((Excluded(&self.curr_key),Unbounded)) {
let key = pair.0;
if key.obj_key.timeline != self.timeline {
return None;
}
self.curr_key = key.clone();
self.curr_key.lsn = Lsn(u64::MAX); // next seek should skip all versions
if key.lsn <= self.lsn {
// visible in this snapshot
if self.nonrel_only {
match key.obj_key.tag {
ObjectTag::RelationMetadata(_) => return None,
ObjectTag::RelationBuffer(_) => return None,
_ => return Some(key.obj_key.tag),
}
} else {
return Some(key.obj_key.tag);
}
}
continue 'outer;
}
return None;
}
}
}

View File

@@ -15,6 +15,7 @@ pub mod page_service;
pub mod repository; pub mod repository;
pub mod restore_local_repo; pub mod restore_local_repo;
pub mod rocksdb_storage; pub mod rocksdb_storage;
pub mod inmem_storage;
pub mod tui; pub mod tui;
pub mod tui_event; pub mod tui_event;
mod tui_logger; mod tui_logger;
@@ -103,7 +104,7 @@ impl PageServerConf {
/// is separate from PostgreSQL timelines, and doesn't have those /// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which /// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string. /// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ZTimelineId([u8; 16]); pub struct ZTimelineId([u8; 16]);
impl FromStr for ZTimelineId { impl FromStr for ZTimelineId {

View File

@@ -1,4 +1,5 @@
use crate::repository::{BufferTag, RelTag}; use crate::repository::{BufferTag, RelTag};
use crate::waldecoder::TransactionId;
use crate::ZTimelineId; use crate::ZTimelineId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -7,12 +8,54 @@ use serde::{Deserialize, Serialize};
/// repository. It is shared between object_repository.rs and object_store.rs. /// repository. It is shared between object_repository.rs and object_store.rs.
/// It is mostly opaque to ObjectStore, it just stores and retrieves objects /// It is mostly opaque to ObjectStore, it just stores and retrieves objects
/// using the key given by the caller. /// using the key given by the caller.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct ObjectKey { pub struct ObjectKey {
pub timeline: ZTimelineId, pub timeline: ZTimelineId,
pub tag: ObjectTag, pub tag: ObjectTag,
} }
///
/// Non-relation transaction status files (clog (a.k.a. pg_xact) and pg_multixact)
/// in Postgres are handled by SLRU (Simple LRU) buffer, hence the name.
///
/// These files are global for a postgres instance.
///
/// These files are divided into segments, which are divided into pages
/// of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct SlruBufferTag {
pub blknum: u32,
}
///
/// Special type of Postgres files: pg_filenode.map is needed to map
/// catalog table OIDs to filenode numbers, which define filename.
///
/// Each database has a map file for its local mapped catalogs,
/// and there is a separate map file for shared catalogs.
///
/// These files have untypical size of 512 bytes.
///
/// See PostgreSQL relmapper.c for details.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct DatabaseTag {
pub spcnode: u32,
pub dbnode: u32,
}
///
/// Non-relation files that keep state for prepared transactions.
/// Unlike other files these are not divided into pages.
///
/// See PostgreSQL twophase.c for details.
///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct PrepareTag {
pub xid: TransactionId,
}
/// ObjectTag is a part of ObjectKey that is specific to the type of /// ObjectTag is a part of ObjectKey that is specific to the type of
/// the stored object. /// the stored object.
/// ///
@@ -21,7 +64,21 @@ pub struct ObjectKey {
/// ///
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum ObjectTag { pub enum ObjectTag {
// dummy tag preceeding all other keys
FirstTag,
TimelineMetadataTag, TimelineMetadataTag,
// Special entry that represents PostgreSQL checkpoint.
// We use it to track fields needed to restore controlfile checkpoint.
Checkpoint,
// Various types of non-relation files.
// We need them to bootstrap compute node.
ControlFile,
Clog(SlruBufferTag),
MultiXactMembers(SlruBufferTag),
MultiXactOffsets(SlruBufferTag),
FileNodeMap(DatabaseTag),
TwoPhase(PrepareTag),
// put relations at the end of enum to allow efficient iterations through non-rel objects
RelationMetadata(RelTag), RelationMetadata(RelTag),
RelationBuffer(BufferTag), RelationBuffer(BufferTag),
} }

View File

@@ -22,8 +22,8 @@ use crate::{PageServerConf, ZTimelineId};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
@@ -139,8 +139,7 @@ impl Repository for ObjectRepository {
/// Branch a timeline /// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
// just to check the source timeline exists let src_timeline = self.get_timeline(src)?;
let _ = self.get_timeline(src)?;
// Write a metadata key, noting the ancestor of th new timeline. There is initially // Write a metadata key, noting the ancestor of th new timeline. There is initially
// no data in it, but all the read-calls know to look into the ancestor. // no data in it, but all the read-calls know to look into the ancestor.
@@ -157,6 +156,18 @@ impl Repository for ObjectRepository {
&ObjectValue::ser(&val)?, &ObjectValue::ser(&val)?,
)?; )?;
// Copy non-rel objects
for tag in src_timeline.list_nonrels(at_lsn)? {
match tag {
ObjectTag::TimelineMetadataTag => {} // skip it
_ => {
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
let val = ObjectValue::Page(PageEntry::Page(img));
let key = ObjectKey { timeline: dst, tag };
self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
}
}
}
Ok(()) Ok(())
} }
} }
@@ -247,12 +258,61 @@ impl Timeline for ObjectTimeline {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/// Look up given page in the cache. /// Look up given page in the cache.
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> { fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
let lsn = self.wait_lsn(req_lsn)?; let lsn = self.wait_lsn(req_lsn)?;
self.get_page_at_lsn_nowait(tag, lsn) self.get_page_at_lsn_nowait(tag, lsn)
} }
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let searchkey = ObjectKey {
timeline: self.timelineid,
tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?;
if let Some((lsn, value)) = iter.next().transpose()? {
let page_img: Bytes;
match ObjectValue::des(&value)? {
ObjectValue::Page(PageEntry::Page(img)) => {
page_img = img;
}
ObjectValue::Page(PageEntry::WALRecord(_rec)) => {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone(), false)?;
}
ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
_ => bail!("Invalid object kind, expected a page entry or SRLU truncate"),
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {:?} from {} (request {})",
page_lsn_hi,
page_lsn_lo,
tag,
lsn,
req_lsn
);
return Ok(page_img);
}
trace!("page {:?} at {} not found", tag, req_lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
/// Get size of relation /// Get size of relation
fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> { fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
let lsn = self.wait_lsn(lsn)?; let lsn = self.wait_lsn(lsn)?;
@@ -284,6 +344,11 @@ impl Timeline for ObjectTimeline {
Ok(false) Ok(false)
} }
/// Get a list of non-relational objects
fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
self.obj_store.list_objects(self.timelineid, true, lsn)
}
/// Get a list of all distinct relations in given tablespace and database. /// Get a list of all distinct relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> { fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
// List all relations in this timeline. // List all relations in this timeline.
@@ -327,86 +392,117 @@ impl Timeline for ObjectTimeline {
/// ///
/// This will implicitly extend the relation, if the page is beyond the /// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file. /// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> { fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn; let lsn = rec.lsn;
self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?; self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?;
debug!( debug!("put_wal_record {:?} at {}", tag, lsn);
"put_wal_record rel {} blk {} at {}",
tag.rel, tag.blknum, lsn
);
// Also check if this created or extended the file if let ObjectTag::RelationBuffer(tag) = tag {
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); // Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks { if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1; let new_nblocks = tag.blknum + 1;
trace!( trace!(
"Extended relation {} from {} to {} blocks at {}", "Extended relation {} from {} to {} blocks at {}",
tag.rel, tag.rel,
old_nblocks, old_nblocks,
new_nblocks, new_nblocks,
lsn lsn
); );
self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?;
let mut rel_meta = self.rel_meta.write().unwrap(); let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert( rel_meta.insert(
tag.rel, tag.rel,
RelMetadata { RelMetadata {
size: new_nblocks, size: new_nblocks,
last_updated: lsn, last_updated: lsn,
}, },
); );
}
} }
Ok(()) Ok(())
} }
/// Unlink object. This method is used for marking dropped relations. /// Unlink relation. This method is used for marking dropped relations.
fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> { fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> {
self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?; self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?;
Ok(()) Ok(())
} }
/// Truncate SRLU segment
fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
let val = ObjectValue::SLRUTruncate;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
Ok(())
}
fn get_next_tag(&self, tag: ObjectTag) -> Result<Option<ObjectTag>> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
if let Some(key) = self.obj_store.get_next_key(&key)? {
Ok(Some(key.tag))
} else {
Ok(None)
}
}
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
self.obj_store.put(&key, lsn, data)?;
Ok(())
}
/// ///
/// Memorize a full image of a page version /// Memorize a full image of a page version
/// ///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes, update_meta: bool) -> Result<()> {
self.put_page_entry(&tag, lsn, PageEntry::Page(img))?; self.put_page_entry(&tag, lsn, PageEntry::Page(img))?;
debug!( if !update_meta {
"put_page_image rel {} blk {} at {}", return Ok(());
tag.rel, tag.blknum, lsn }
); debug!("put_page_image rel {:?} at {}", tag, lsn);
// Also check if this created or extended the file if let ObjectTag::RelationBuffer(tag) = tag {
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); // Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks { if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1; let new_nblocks = tag.blknum + 1;
trace!( trace!(
"Extended relation {} from {} to {} blocks at {}", "Extended relation {} from {} to {} blocks at {}",
tag.rel, tag.rel,
old_nblocks, old_nblocks,
new_nblocks, new_nblocks,
lsn lsn
); );
self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?;
let mut rel_meta = self.rel_meta.write().unwrap(); let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert( rel_meta.insert(
tag.rel, tag.rel,
RelMetadata { RelMetadata {
size: new_nblocks, size: new_nblocks,
last_updated: lsn, last_updated: lsn,
}, },
); );
}
} }
Ok(()) Ok(())
} }
@@ -526,72 +622,152 @@ impl Timeline for ObjectTimeline {
// WAL is large enough to perform GC // WAL is large enough to perform GC
let now = Instant::now(); let now = Instant::now();
// Iterate through all relations // Iterate through all objects in timeline
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? { for obj in self
let rel = *rels; .obj_store
let key = relation_size_key(self.timelineid, rel); .list_objects(self.timelineid, false, last_lsn)?
let mut max_size = 0u32; {
let mut relation_dropped = false; result.inspected += 1;
match obj {
result.n_relations += 1; // Prepared transactions
ObjectTag::TwoPhase(prepare) => {
// Process relation metadata versions let key = ObjectKey {
let mut latest_version = true; timeline: self.timelineid,
for (lsn, val) in self.obj_store.object_versions(&key, horizon)? { tag: obj,
let rel_meta = ObjectValue::des_relsize(&val)?; };
// If relation is dropped at the horizon, for vers in self.obj_store.object_versions(&key, horizon)? {
// we can remove all its versions including latest (Unlink) if self.get_tx_status(prepare.xid, horizon)?
match rel_meta { != pg_constants::TRANSACTION_STATUS_IN_PROGRESS
RelationSizeEntry::Size(size) => max_size = max(max_size, size), {
RelationSizeEntry::Unlink => { let lsn = vers.0;
if latest_version { self.obj_store.unlink(&key, lsn)?;
relation_dropped = true; result.prep_deleted += 1;
info!("Relation {:?} dropped", rels);
result.dropped += 1;
} }
} }
} }
// preserve latest version, unless the relation was dropped completely. ObjectTag::RelationMetadata(_) => {
if latest_version { // Do not need to reconstruct page images,
latest_version = false; // just delete all old versions over horizon
if !relation_dropped { let mut last_version = true;
continue; let key = ObjectKey {
} timeline: self.timelineid,
} tag: obj,
self.obj_store.unlink(&key, lsn)?; };
result.deleted += 1; for vers in self.obj_store.object_versions(&key, horizon)? {
} let lsn = vers.0;
// Now process all relation blocks if last_version {
for blknum in 0..max_size { let content = vers.1;
let buf_tag = BufferTag { rel, blknum }; match ObjectValue::des(&content[..])? {
let key = ObjectKey { ObjectValue::RelationSize(RelationSizeEntry::Unlink) => {
timeline: self.timelineid, self.obj_store.unlink(&key, lsn)?;
tag: ObjectTag::RelationBuffer(buf_tag), result.deleted += 1;
}; result.dropped += 1;
let mut latest_version = true; }
let mut deleted_page_versions = 0; _ => (), // preserve last version
for (lsn, _val) in self.obj_store.object_versions(&key, horizon)? { }
// Preserve and materialize latest version before deleting all preceding versions. last_version = false;
// We let get_page_at_lsn_nowait() do the materialization. result.truncated += 1;
if latest_version { result.n_relations += 1;
latest_version = false; } else {
if !relation_dropped { self.obj_store.unlink(&key, lsn)?;
self.get_page_at_lsn_nowait(buf_tag, lsn)?; result.deleted += 1;
continue;
} }
} }
self.obj_store.unlink(&key, lsn)?;
deleted_page_versions += 1;
} }
if deleted_page_versions > 0 && !relation_dropped { ObjectTag::RelationBuffer(tag) => {
result.truncated += 1; // Reconstruct page at horizon unless relation was dropped
// and delete all older versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
result.truncated += 1;
last_version = false;
if let Some(rel_size) = self.relsize_get_nowait(tag.rel, last_lsn)? {
if rel_size > tag.blknum {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(obj, lsn)?;
continue;
}
debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
} else {
if let Some(rel_size) =
self.relsize_get_nowait(tag.rel, last_lsn)?
{
debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.rel, tag, lsn, rel_size, last_lsn);
continue;
}
debug!("Relation {:?} was dropped at {}", tag.rel, lsn);
}
// relation was dropped or truncated so this block can be removed
}
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
}
} }
result.deleted += deleted_page_versions; // SLRU-s
ObjectTag::Clog(_)
| ObjectTag::MultiXactOffsets(_)
| ObjectTag::MultiXactMembers(_) => {
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::SLRUTruncate => {
self.obj_store.unlink(&key, lsn)?;
result.slru_deleted += 1;
}
ObjectValue::Page(PageEntry::WALRecord(_)) => {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(obj, lsn)?;
}
_ => {} // do nothing if already materialized
}
last_version = false;
} else {
// delete deteriorated version
self.obj_store.unlink(&key, lsn)?;
result.slru_deleted += 1;
}
}
}
// versioned always materialized objects: no need to reconstruct pages
ObjectTag::Checkpoint | ObjectTag::ControlFile => {
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
// preserve last version
last_version = false;
} else {
// delete deteriorated version
self.obj_store.unlink(&key, lsn)?;
result.chkp_deleted += 1;
}
}
}
_ => (), // do nothing
} }
} }
result.elapsed = now.elapsed(); result.elapsed = now.elapsed();
info!("Garbage collection completed in {:?}: {} relations inspected, {} version histories truncated, {} versions deleted, {} relations dropped", info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped",
result.elapsed, &result.n_relations, &result.truncated, &result.deleted, &result.dropped); result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped);
self.obj_store.compact(); self.obj_store.compact();
} }
Ok(result) Ok(result)
@@ -599,54 +775,6 @@ impl Timeline for ObjectTimeline {
} }
impl ObjectTimeline { impl ObjectTimeline {
fn get_page_at_lsn_nowait(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let searchkey = ObjectKey {
timeline: self.timelineid,
tag: ObjectTag::RelationBuffer(tag),
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?;
if let Some((lsn, value)) = iter.next().transpose()? {
let page_img: Bytes;
match ObjectValue::des_page(&value)? {
PageEntry::Page(img) => {
page_img = img;
}
PageEntry::WALRecord(_rec) => {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone())?;
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})",
page_lsn_hi,
page_lsn_lo,
tag.rel,
tag.blknum,
lsn,
req_lsn
);
return Ok(page_img);
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, req_lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
/// ///
/// Internal function to get relation size at given LSN. /// Internal function to get relation size at given LSN.
/// ///
@@ -700,7 +828,7 @@ impl ObjectTimeline {
/// ///
fn collect_records_for_apply( fn collect_records_for_apply(
&self, &self,
tag: BufferTag, tag: ObjectTag,
lsn: Lsn, lsn: Lsn,
) -> Result<(Option<Bytes>, Vec<WALRecord>)> { ) -> Result<(Option<Bytes>, Vec<WALRecord>)> {
let mut base_img: Option<Bytes> = None; let mut base_img: Option<Bytes> = None;
@@ -710,7 +838,7 @@ impl ObjectTimeline {
// old page image. // old page image.
let searchkey = ObjectKey { let searchkey = ObjectKey {
timeline: self.timelineid, timeline: self.timelineid,
tag: ObjectTag::RelationBuffer(tag), tag,
}; };
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
while let Some((_key, value)) = iter.next().transpose()? { while let Some((_key, value)) = iter.next().transpose()? {
@@ -812,10 +940,10 @@ impl ObjectTimeline {
// //
// Helper functions to store different kinds of objects to the underlying ObjectStore // Helper functions to store different kinds of objects to the underlying ObjectStore
// //
fn put_page_entry(&self, tag: &BufferTag, lsn: Lsn, val: PageEntry) -> Result<()> { fn put_page_entry(&self, tag: &ObjectTag, lsn: Lsn, val: PageEntry) -> Result<()> {
let key = ObjectKey { let key = ObjectKey {
timeline: self.timelineid, timeline: self.timelineid,
tag: ObjectTag::RelationBuffer(*tag), tag: *tag,
}; };
let val = ObjectValue::Page(val); let val = ObjectValue::Page(val);
@@ -895,7 +1023,6 @@ impl<'a> ObjectHistory<'a> {
fn next_result(&mut self) -> Result<Option<RelationUpdate>> { fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? { while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? {
let (rel_tag, update) = match object_tag { let (rel_tag, update) = match object_tag {
ObjectTag::TimelineMetadataTag => continue,
ObjectTag::RelationMetadata(rel_tag) => { ObjectTag::RelationMetadata(rel_tag) => {
let entry = ObjectValue::des_relsize(&value)?; let entry = ObjectValue::des_relsize(&value)?;
match self.handle_relation_size(rel_tag, entry) { match self.handle_relation_size(rel_tag, entry) {
@@ -909,6 +1036,7 @@ impl<'a> ObjectHistory<'a> {
(buf_tag.rel, update) (buf_tag.rel, update)
} }
_ => continue,
}; };
return Ok(Some(RelationUpdate { return Ok(Some(RelationUpdate {
@@ -931,6 +1059,7 @@ enum ObjectValue {
Page(PageEntry), Page(PageEntry),
RelationSize(RelationSizeEntry), RelationSize(RelationSizeEntry),
TimelineMetadata(MetadataEntry), TimelineMetadata(MetadataEntry),
SLRUTruncate,
} }
/// ///

View File

@@ -34,6 +34,9 @@ pub trait ObjectStore: Send + Sync {
/// correspond to any real relation. /// correspond to any real relation.
fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>>; fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>>;
/// Read key greater or equal than specified
fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>>;
/// Iterate through all page versions of one object. /// Iterate through all page versions of one object.
/// ///
/// Returns all page versions in descending LSN order, along with the LSN /// Returns all page versions in descending LSN order, along with the LSN
@@ -66,6 +69,17 @@ pub trait ObjectStore: Send + Sync {
lsn: Lsn, lsn: Lsn,
) -> Result<HashSet<RelTag>>; ) -> Result<HashSet<RelTag>>;
/// Iterate through objects tags. If nonrel_only, then only non-relationa data is iterated.
///
/// This is used to implement GC and preparing tarball for new node startup
/// Returns objects in increasing key-version order.
fn list_objects<'a>(
&'a self,
timelineid: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
/// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion. /// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion.
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>; fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>;

View File

@@ -5,7 +5,8 @@
use crate::object_repository::ObjectRepository; use crate::object_repository::ObjectRepository;
use crate::repository::Repository; use crate::repository::Repository;
use crate::rocksdb_storage::RocksObjectStore; //use crate::rocksdb_storage::RocksObjectStore;
use crate::inmem_storage::InmemObjectStore;
use crate::walredo::PostgresRedoManager; use crate::walredo::PostgresRedoManager;
use crate::PageServerConf; use crate::PageServerConf;
use lazy_static::lazy_static; use lazy_static::lazy_static;
@@ -18,7 +19,8 @@ lazy_static! {
pub fn init(conf: &'static PageServerConf) { pub fn init(conf: &'static PageServerConf) {
let mut m = REPOSITORY.lock().unwrap(); let mut m = REPOSITORY.lock().unwrap();
let obj_store = RocksObjectStore::open(conf).unwrap(); //let obj_store = RocksObjectStore::open(conf).unwrap();
let obj_store = InmemObjectStore::open(conf).unwrap();
// Set up a WAL redo manager, for applying WAL records. // Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf); let walredo_mgr = PostgresRedoManager::new(conf);

View File

@@ -28,6 +28,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup; use crate::basebackup;
use crate::branches; use crate::branches;
use crate::object_key::ObjectTag;
use crate::page_cache; use crate::page_cache;
use crate::repository::{BufferTag, RelTag, RelationUpdate, Update}; use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
use crate::restore_local_repo; use crate::restore_local_repo;
@@ -231,7 +232,7 @@ impl PageServerHandler {
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
} }
PagestreamFeMessage::Read(req) => { PagestreamFeMessage::Read(req) => {
let buf_tag = BufferTag { let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag { rel: RelTag {
spcnode: req.spcnode, spcnode: req.spcnode,
dbnode: req.dbnode, dbnode: req.dbnode,
@@ -239,9 +240,9 @@ impl PageServerHandler {
forknum: req.forknum, forknum: req.forknum,
}, },
blknum: req.blkno, blknum: req.blkno,
}; });
let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) { let read_response = match timeline.get_page_at_lsn(tag, req.lsn) {
Ok(p) => PagestreamReadResponse { Ok(p) => PagestreamReadResponse {
ok: true, ok: true,
n_blocks: 0, n_blocks: 0,
@@ -292,14 +293,20 @@ impl PageServerHandler {
// find latest snapshot // find latest snapshot
let snapshot_lsn = let snapshot_lsn =
restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
let req_lsn = lsn.unwrap_or(snapshot_lsn);
basebackup::send_tarball_at_lsn( let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
&mut CopyDataSink { pgb },
timelineid, {
&timeline, let mut writer = CopyDataSink { pgb };
req_lsn, let mut basebackup = basebackup::Basebackup::new(
snapshot_lsn, &mut writer,
)?; timelineid,
&timeline,
req_lsn,
snapshot_lsn,
);
basebackup.send_tarball()?;
}
pgb.write_message(&BeMessage::CopyDone)?; pgb.write_message(&BeMessage::CopyDone)?;
debug!("CopyDone sent!"); debug!("CopyDone sent!");
@@ -412,18 +419,18 @@ impl postgres_backend::Handler for PageServerHandler {
match relation_update.update { match relation_update.update {
Update::Page { blknum, img } => { Update::Page { blknum, img } => {
let tag = BufferTag { let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel, rel: relation_update.rel,
blknum, blknum,
}; });
timeline.put_page_image(tag, relation_update.lsn, img)?; timeline.put_page_image(tag, relation_update.lsn, img, true)?;
} }
Update::WALRecord { blknum, rec } => { Update::WALRecord { blknum, rec } => {
let tag = BufferTag { let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel, rel: relation_update.rel,
blknum, blknum,
}; });
timeline.put_wal_record(tag, rec)?; timeline.put_wal_record(tag, rec)?;
} }
@@ -545,6 +552,24 @@ impl postgres_backend::Handler for PageServerHandler {
typlen: 8, typlen: 8,
..Default::default() ..Default::default()
}, },
RowDescriptor {
name: b"prep_deleted",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"slru_deleted",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor {
name: b"chkp_deleted",
typoid: 20,
typlen: 8,
..Default::default()
},
RowDescriptor { RowDescriptor {
name: b"dropped", name: b"dropped",
typoid: 20, typoid: 20,
@@ -562,6 +587,9 @@ impl postgres_backend::Handler for PageServerHandler {
Some(&result.n_relations.to_string().as_bytes()), Some(&result.n_relations.to_string().as_bytes()),
Some(&result.truncated.to_string().as_bytes()), Some(&result.truncated.to_string().as_bytes()),
Some(&result.deleted.to_string().as_bytes()), Some(&result.deleted.to_string().as_bytes()),
Some(&result.prep_deleted.to_string().as_bytes()),
Some(&result.slru_deleted.to_string().as_bytes()),
Some(&result.chkp_deleted.to_string().as_bytes()),
Some(&result.dropped.to_string().as_bytes()), Some(&result.dropped.to_string().as_bytes()),
Some(&result.elapsed.as_millis().to_string().as_bytes()), Some(&result.elapsed.as_millis().to_string().as_bytes()),
]))? ]))?

View File

@@ -1,10 +1,15 @@
use crate::object_key::*;
use crate::waldecoder::TransactionId;
use crate::ZTimelineId; use crate::ZTimelineId;
use anyhow::Result; use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::forknumber_to_name; use postgres_ffi::relfile_utils::forknumber_to_name;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
use std::iter::Iterator;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
@@ -35,8 +40,12 @@ pub trait Repository: Send + Sync {
#[derive(Default)] #[derive(Default)]
pub struct GcResult { pub struct GcResult {
pub n_relations: u64, pub n_relations: u64,
pub inspected: u64,
pub truncated: u64, pub truncated: u64,
pub deleted: u64, pub deleted: u64,
pub prep_deleted: u64, // 2PC prepare
pub slru_deleted: u64, // SLRU (clog, multixact)
pub chkp_deleted: u64, // Checkpoints
pub dropped: u64, pub dropped: u64,
pub elapsed: Duration, pub elapsed: Duration,
} }
@@ -47,7 +56,10 @@ pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/// Look up given page in the cache. /// Look up given page in the cache.
fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes>; fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Look up given page in the cache.
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Get size of relation /// Get size of relation
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>; fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -58,6 +70,9 @@ pub trait Timeline: Send + Sync {
/// Get a list of all distinct relations in given tablespace and database. /// Get a list of all distinct relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>; fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
/// Get a list of non-relational objects
fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions. // Public PUT functions, to update the repository with new page versions.
// //
@@ -68,17 +83,26 @@ pub trait Timeline: Send + Sync {
/// ///
/// This will implicitly extend the relation, if the page is beyond the /// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file. /// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>; fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()>;
/// Put raw data
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page. /// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>; fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes, update_meta: bool) -> Result<()>;
/// Truncate relation /// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Unlink object. This method is used for marking dropped relations. /// Unlink relation. This method is used for marking dropped relations.
fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>; fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>;
/// Truncate SRLU segment
fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>;
// Get object tag greater or equal than specified
fn get_next_tag(&self, tag: ObjectTag) -> Result<Option<ObjectTag>>;
/// Remember the all WAL before the given LSN has been processed. /// Remember the all WAL before the given LSN has been processed.
/// ///
/// The WAL receiver calls this after the put_* functions, to indicate that /// The WAL receiver calls this after the put_* functions, to indicate that
@@ -117,6 +141,15 @@ pub trait Timeline: Send + Sync {
/// ///
/// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval).
fn gc_iteration(&self, horizon: u64) -> Result<GcResult>; fn gc_iteration(&self, horizon: u64) -> Result<GcResult>;
// Check transaction status
fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result<u8> {
let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
Ok(status)
}
} }
pub trait History: Iterator<Item = Result<RelationUpdate>> { pub trait History: Iterator<Item = Result<RelationUpdate>> {
@@ -278,11 +311,11 @@ mod tests {
/// Convenience function to create a BufferTag for testing. /// Convenience function to create a BufferTag for testing.
/// Helps to keeps the tests shorter. /// Helps to keeps the tests shorter.
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn TEST_BUF(blknum: u32) -> BufferTag { fn TEST_BUF(blknum: u32) -> ObjectTag {
BufferTag { ObjectTag::RelationBuffer(BufferTag {
rel: TESTREL_A, rel: TESTREL_A,
blknum, blknum,
} })
} }
/// Convenience function to create a page image with given string as the only content /// Convenience function to create a page image with given string as the only content
@@ -334,11 +367,11 @@ mod tests {
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
tline.init_valid_lsn(Lsn(1)); tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?;
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?;
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?; tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"), true)?;
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?; tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"), true)?;
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?; tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"), true)?;
tline.advance_last_valid_lsn(Lsn(5)); tline.advance_last_valid_lsn(Lsn(5));
@@ -425,7 +458,7 @@ mod tests {
for i in 0..pg_constants::RELSEG_SIZE + 1 { for i in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn))); let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
lsn += 1; lsn += 1;
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?; tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img, true)?;
} }
tline.advance_last_valid_lsn(Lsn(lsn)); tline.advance_last_valid_lsn(Lsn(lsn));
@@ -467,16 +500,17 @@ mod tests {
assert_eq!(None, snapshot.next().transpose()?); assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN // add a page and advance the last valid LSN
let buf = TEST_BUF(1); let rel = TESTREL_A;
tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; let tag = TEST_BUF(1);
tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?;
tline.advance_last_valid_lsn(Lsn(1)); tline.advance_last_valid_lsn(Lsn(1));
let mut snapshot = tline.history()?; let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1)); assert_eq!(snapshot.lsn(), Lsn(1));
let expected_page = RelationUpdate { let expected_page = RelationUpdate {
rel: buf.rel, rel: rel,
lsn: Lsn(1), lsn: Lsn(1),
update: Update::Page { update: Update::Page {
blknum: buf.blknum, blknum: 1,
img: TEST_IMG("blk 1 @ lsn 1"), img: TEST_IMG("blk 1 @ lsn 1"),
}, },
}; };
@@ -484,7 +518,7 @@ mod tests {
assert_eq!(None, snapshot.next().transpose()?); assert_eq!(None, snapshot.next().transpose()?);
// truncate to zero, but don't advance the last valid LSN // truncate to zero, but don't advance the last valid LSN
tline.put_truncation(buf.rel, Lsn(2), 0)?; tline.put_truncation(rel, Lsn(2), 0)?;
let mut snapshot = tline.history()?; let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1)); assert_eq!(snapshot.lsn(), Lsn(1));
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
@@ -498,7 +532,7 @@ mod tests {
// TODO ordering not guaranteed by API. But currently it returns the // TODO ordering not guaranteed by API. But currently it returns the
// truncation entry before the block data. // truncation entry before the block data.
let expected_truncate = RelationUpdate { let expected_truncate = RelationUpdate {
rel: buf.rel, rel: rel,
lsn: Lsn(2), lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 }, update: Update::Truncate { n_blocks: 0 },
}; };
@@ -515,15 +549,14 @@ mod tests {
impl WalRedoManager for TestRedoManager { impl WalRedoManager for TestRedoManager {
fn request_redo( fn request_redo(
&self, &self,
tag: BufferTag, tag: ObjectTag,
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<WALRecord>, records: Vec<WALRecord>,
) -> Result<Bytes, WalRedoError> { ) -> Result<Bytes, WalRedoError> {
let s = format!( let s = format!(
"redo for rel {} blk {} to get to {}, with {} and {} records", "redo for {:?} to get to {}, with {} and {} records",
tag.rel, tag,
tag.blknum,
lsn, lsn,
if base_img.is_some() { if base_img.is_some() {
"base image" "base image"

View File

@@ -12,18 +12,21 @@ use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use anyhow::Result; use anyhow::Result;
use bytes::Bytes; use bytes::{Buf, Bytes};
use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; use crate::object_key::*;
use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; use crate::repository::*;
use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate, XlXactParsedRecord}; use crate::waldecoder::*;
use crate::PageServerConf; use crate::PageServerConf;
use crate::ZTimelineId; use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*; use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*; use postgres_ffi::xlog_utils::*;
use postgres_ffi::{pg_constants, CheckPoint, ControlFileData};
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
const MAX_MBR_BLKNO: u32 =
pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
/// ///
/// Find latest snapshot in a timeline's 'snapshots' directory /// Find latest snapshot in a timeline's 'snapshots' directory
/// ///
@@ -63,8 +66,18 @@ pub fn import_timeline_from_postgres_datadir(
None => continue, None => continue,
// These special files appear in the snapshot, but are not needed by the page server // These special files appear in the snapshot, but are not needed by the page server
Some("pg_control") => continue, Some("pg_control") => {
Some("pg_filenode.map") => continue, import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?
}
Some("pg_filenode.map") => import_nonrel_file(
timeline,
lsn,
ObjectTag::FileNodeMap(DatabaseTag {
spcnode: pg_constants::GLOBALTABLESPACE_OID,
dbnode: 0,
}),
&direntry.path(),
)?,
// Load any relation files into the page server // Load any relation files into the page server
_ => import_relfile( _ => import_relfile(
@@ -91,7 +104,15 @@ pub fn import_timeline_from_postgres_datadir(
// These special files appear in the snapshot, but are not needed by the page server // These special files appear in the snapshot, but are not needed by the page server
Some("PG_VERSION") => continue, Some("PG_VERSION") => continue,
Some("pg_filenode.map") => continue, Some("pg_filenode.map") => import_nonrel_file(
timeline,
lsn,
ObjectTag::FileNodeMap(DatabaseTag {
spcnode: pg_constants::DEFAULTTABLESPACE_OID,
dbnode: dboid,
}),
&direntry.path(),
)?,
// Load any relation files into the page server // Load any relation files into the page server
_ => import_relfile( _ => import_relfile(
@@ -104,6 +125,43 @@ pub fn import_timeline_from_postgres_datadir(
} }
} }
} }
for entry in fs::read_dir(path.join("pg_xact"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::Clog(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::MultiXactMembers(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
let entry = entry?;
import_slru_file(
timeline,
lsn,
|blknum| ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }),
&entry.path(),
)?;
}
for entry in fs::read_dir(path.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?;
import_nonrel_file(
timeline,
lsn,
ObjectTag::TwoPhase(PrepareTag { xid }),
&entry.path(),
)?;
}
// TODO: Scan pg_tblspc // TODO: Scan pg_tblspc
timeline.checkpoint()?; timeline.checkpoint()?;
@@ -136,7 +194,7 @@ fn import_relfile(
let r = file.read_exact(&mut buf); let r = file.read_exact(&mut buf);
match r { match r {
Ok(_) => { Ok(_) => {
let tag = BufferTag { let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag { rel: RelTag {
spcnode: spcoid, spcnode: spcoid,
dbnode: dboid, dbnode: dboid,
@@ -144,13 +202,61 @@ fn import_relfile(
forknum, forknum,
}, },
blknum, blknum,
}; });
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?; timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf), true)?;
/* }
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn; // TODO: UnexpectedEof is expected
Err(e) => match e.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
// FIXME: maybe check that we read the full length of the file?
break;
} }
*/ _ => {
error!("error reading file: {:?} ({})", path, e);
break;
}
},
};
blknum += 1;
}
Ok(())
}
fn import_nonrel_file(
timeline: &dyn Timeline,
lsn: Lsn,
tag: ObjectTag,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]), false)?;
Ok(())
}
fn import_slru_file(
timeline: &dyn Timeline,
lsn: Lsn,
gen_tag: fn(blknum: u32) -> ObjectTag,
path: &Path,
) -> Result<()> {
// Does it look like a relation file?
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
loop {
let r = file.read_exact(&mut buf);
match r {
Ok(_) => {
timeline.put_page_image(gen_tag(blknum), lsn, Bytes::copy_from_slice(&buf), false)?;
} }
// TODO: UnexpectedEof is expected // TODO: UnexpectedEof is expected
@@ -180,6 +286,18 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint; let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// get_page_at_lsn_nowait returns pages with zeros when object is not found in the storage.
// nextXid can not be zero, so this check is used to detect situation when checkpoint record needs to be initialized.
if checkpoint.nextXid.value == 0 {
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
checkpoint = pg_control.checkPointCopy;
}
loop { loop {
// FIXME: assume postgresql tli 1 for now // FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
@@ -217,11 +335,12 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
if rec.is_err() { if rec.is_err() {
// Assume that an error means we've reached the end of // Assume that an error means we've reached the end of
// a partial WAL record. So that's ok. // a partial WAL record. So that's ok.
trace!("WAL decoder error {:?}", rec);
break; break;
} }
if let Some((lsn, recdata)) = rec.unwrap() { if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(recdata.clone()); let decoded = decode_wal_record(recdata.clone());
save_decoded_record(timeline, &decoded, recdata, lsn)?; save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?;
last_lsn = lsn; last_lsn = lsn;
} else { } else {
break; break;
@@ -240,6 +359,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
offset = 0; offset = 0;
} }
info!("reached end of WAL at {}", last_lsn); info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = checkpoint.encode();
timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes, false)?;
Ok(()) Ok(())
} }
@@ -248,15 +369,18 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
/// relations/pages that the record affects. /// relations/pages that the record affects.
/// ///
pub fn save_decoded_record( pub fn save_decoded_record(
checkpoint: &mut CheckPoint,
timeline: &dyn Timeline, timeline: &dyn Timeline,
decoded: &DecodedWALRecord, decoded: &DecodedWALRecord,
recdata: Bytes, recdata: Bytes,
lsn: Lsn, lsn: Lsn,
) -> Result<()> { ) -> Result<()> {
checkpoint.update_next_xid(decoded.xl_xid);
// Iterate through all the blocks that the record modifies, and // Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block. // "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() { for blk in decoded.blocks.iter() {
let tag = BufferTag { let tag = ObjectTag::RelationBuffer(BufferTag {
rel: RelTag { rel: RelTag {
spcnode: blk.rnode_spcnode, spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode, dbnode: blk.rnode_dbnode,
@@ -264,7 +388,7 @@ pub fn save_decoded_record(
forknum: blk.forknum as u8, forknum: blk.forknum as u8,
}, },
blknum: blk.blkno, blknum: blk.blkno,
}; });
let rec = WALRecord { let rec = WALRecord {
lsn, lsn,
@@ -276,15 +400,18 @@ pub fn save_decoded_record(
timeline.put_wal_record(tag, rec)?; timeline.put_wal_record(tag, rec)?;
} }
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
// Handle a few special record types // Handle a few special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE
{ {
let truncate = XlSmgrTruncate::decode(&decoded); let truncate = XlSmgrTruncate::decode(&mut buf);
save_xlog_smgr_truncate(timeline, lsn, &truncate)?; save_xlog_smgr_truncate(timeline, lsn, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(&decoded); let createdb = XlCreateDatabase::decode(&mut buf);
save_xlog_dbase_create(timeline, lsn, &createdb)?; save_xlog_dbase_create(timeline, lsn, &createdb)?;
} else { } else {
// TODO // TODO
@@ -292,6 +419,39 @@ pub fn save_decoded_record(
} }
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
} else if decoded.xl_rmid == pg_constants::RM_CLOG_ID {
let blknum = buf.get_u32_le();
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
if info == pg_constants::CLOG_ZEROPAGE {
let rec = WALRecord {
lsn,
will_init: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec)?;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
checkpoint.oldestXid = buf.get_u32_le();
checkpoint.oldestXidDB = buf.get_u32_le();
trace!(
"RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
blknum,
checkpoint.oldestXid,
checkpoint.oldestXidDB
);
if let Some(ObjectTag::Clog(first_slru_tag)) =
timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))?
{
for trunc_blknum in first_slru_tag.blknum..=blknum {
let tag = ObjectTag::Clog(SlruBufferTag {
blknum: trunc_blknum,
});
timeline.put_slru_truncate(tag, lsn)?;
}
}
}
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID { } else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT if info == pg_constants::XLOG_XACT_COMMIT
@@ -299,11 +459,73 @@ pub fn save_decoded_record(
|| info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT
|| info == pg_constants::XLOG_XACT_ABORT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED
{ {
let parsed_xact = XlXactParsedRecord::decode(&decoded); let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
save_xact_record(timeline, lsn, &parsed_xact)?; save_xact_record(timeline, lsn, &parsed_xact, decoded)?;
} else if info == pg_constants::XLOG_XACT_PREPARE {
let rec = WALRecord {
lsn,
will_init: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(
ObjectTag::TwoPhase(PrepareTag {
xid: decoded.xl_xid,
}),
rec,
)?;
}
} else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
let blknum = buf.get_u32_le();
let rec = WALRecord {
lsn,
will_init: true,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let tag = if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
ObjectTag::MultiXactOffsets(SlruBufferTag { blknum })
} else {
ObjectTag::MultiXactMembers(SlruBufferTag { blknum })
};
timeline.put_wal_record(tag, rec)?;
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
save_multixact_create_record(checkpoint, timeline, lsn, &xlrec, decoded)?;
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
save_multixact_truncate_record(checkpoint, timeline, lsn, &xlrec)?;
}
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
let xlrec = XlRelmapUpdate::decode(&mut buf);
save_relmap_record(timeline, lsn, &xlrec, decoded)?;
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
checkpoint.nextOid = next_oid;
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap();
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
checkpoint.oldestXid
);
if (checkpoint.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
} }
} }
// Now that this record has been handled, let the repository know that // Now that this record has been handled, let the repository know that
// it is up-to-date to this LSN // it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn); timeline.advance_last_record_lsn(lsn);
@@ -344,20 +566,20 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
// Copy content // Copy content
for blknum in 0..nblocks { for blknum in 0..nblocks {
let src_key = BufferTag { let src_key = ObjectTag::RelationBuffer(BufferTag {
rel: src_rel, rel: src_rel,
blknum, blknum,
}; });
let dst_key = BufferTag { let dst_key = ObjectTag::RelationBuffer(BufferTag {
rel: dst_rel, rel: dst_rel,
blknum, blknum,
}; });
let content = timeline.get_page_at_lsn(src_key, req_lsn)?; let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
info!("copying block {:?} to {:?}", src_key, dst_key); debug!("copying block {:?} to {:?}", src_key, dst_key);
timeline.put_page_image(dst_key, lsn, content)?; timeline.put_page_image(dst_key, lsn, content, true)?;
num_blocks_copied += 1; num_blocks_copied += 1;
} }
@@ -368,6 +590,23 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
num_rels_copied += 1; num_rels_copied += 1;
} }
// Copy relfilemap
for tag in timeline.list_nonrels(req_lsn)? {
match tag {
ObjectTag::FileNodeMap(db) => {
if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: tablespace_id,
dbnode: db_id,
});
timeline.put_page_image(new_tag, lsn, img, false)?;
break;
}
}
_ => {} // do nothing
}
}
info!( info!(
"Created database {}/{}, copied {} blocks in {} rels at {}", "Created database {}/{}, copied {} blocks in {} rels at {}",
tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn
@@ -440,8 +679,32 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca
/// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records. /// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records.
/// ///
/// We are currently only interested in the dropped relations. /// We are currently only interested in the dropped relations.
fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord) -> Result<()> { fn save_xact_record(
for xnode in &rec.xnodes { timeline: &dyn Timeline,
lsn: Lsn,
parsed: &XlXactParsedRecord,
decoded: &DecodedWALRecord,
) -> Result<()> {
// Record update of CLOG page
let mut blknum = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
let rec = WALRecord {
lsn,
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec.clone())?;
for subxact in &parsed.subxacts {
let subxact_blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
if subxact_blknum != blknum {
blknum = subxact_blknum;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
timeline.put_wal_record(tag, rec.clone())?;
}
}
for xnode in &parsed.xnodes {
for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM {
let rel_tag = RelTag { let rel_tag = RelTag {
forknum, forknum,
@@ -454,3 +717,116 @@ fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord)
} }
Ok(()) Ok(())
} }
fn save_multixact_create_record(
checkpoint: &mut CheckPoint,
timeline: &dyn Timeline,
lsn: Lsn,
xlrec: &XlMultiXactCreate,
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
lsn,
will_init: false,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let blknum = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
timeline.put_wal_record(tag, rec.clone())?;
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
loop {
// Update members page
let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
timeline.put_wal_record(tag, rec.clone())?;
if blknum == last_mbr_blkno {
// last block inclusive
break;
}
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
if xlrec.mid >= checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid + 1;
}
if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
}
let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| {
if mbr.xid.wrapping_sub(acc) as i32 > 0 {
mbr.xid
} else {
acc
}
});
checkpoint.update_next_xid(max_mbr_xid);
Ok(())
}
fn save_multixact_truncate_record(
checkpoint: &mut CheckPoint,
timeline: &dyn Timeline,
lsn: Lsn,
xlrec: &XlMultiXactTruncate,
) -> Result<()> {
checkpoint.oldestMulti = xlrec.end_trunc_off;
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let first_off_blkno = xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno = xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
// Delete all the segments except the last one. The last segment can still
// contain, possibly partially, valid data.
for blknum in first_off_blkno..last_off_blkno {
let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
timeline.put_slru_truncate(tag, lsn)?;
}
let first_mbr_blkno = xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno = xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
// Delete all the segments but the last one. The last segment can still
// contain, possibly partially, valid data.
while blknum != last_mbr_blkno {
let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
timeline.put_slru_truncate(tag, lsn)?;
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
Ok(())
}
fn save_relmap_record(
timeline: &dyn Timeline,
lsn: Lsn,
xlrec: &XlRelmapUpdate,
decoded: &DecodedWALRecord,
) -> Result<()> {
let rec = WALRecord {
lsn,
will_init: true,
rec: decoded.record.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
let tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: xlrec.tsid,
dbnode: xlrec.dbid,
});
timeline.put_wal_record(tag, rec)?;
Ok(())
}

View File

@@ -94,6 +94,21 @@ impl ObjectStore for RocksObjectStore {
} }
} }
fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
let mut iter = self.db.raw_iterator();
let search_key = StorageKey {
obj_key: key.clone(),
lsn: Lsn(0),
};
iter.seek(search_key.ser()?);
if !iter.valid() {
Ok(None)
} else {
let key = StorageKey::des(iter.key().unwrap())?;
Ok(Some(key.obj_key.clone()))
}
}
fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> { fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
self.db.put( self.db.put(
StorageKey::ser(&StorageKey { StorageKey::ser(&StorageKey {
@@ -124,6 +139,17 @@ impl ObjectStore for RocksObjectStore {
Ok(Box::new(iter)) Ok(Box::new(iter))
} }
/// Iterate through all timeline objects
fn list_objects<'a>(
&'a self,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
let iter = RocksObjectIter::new(&self.db, timeline, nonrel_only, lsn)?;
Ok(Box::new(iter))
}
/// Get a list of all distinct relations in given tablespace and database. /// Get a list of all distinct relations in given tablespace and database.
/// ///
/// TODO: This implementation is very inefficient, it scans /// TODO: This implementation is very inefficient, it scans
@@ -349,3 +375,69 @@ impl<'r> RocksObjects<'r> {
Ok(None) Ok(None)
} }
} }
///
/// Iterator for `list_objects`. Returns all objects preceeding specified LSN
///
struct RocksObjectIter<'a> {
timeline: ZTimelineId,
key: StorageKey,
nonrel_only: bool,
lsn: Lsn,
dbiter: rocksdb::DBRawIterator<'a>,
}
impl<'a> RocksObjectIter<'a> {
fn new(
db: &'a rocksdb::DB,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<RocksObjectIter<'a>> {
let key = StorageKey {
obj_key: ObjectKey {
timeline,
tag: ObjectTag::FirstTag,
},
lsn: Lsn(0),
};
let dbiter = db.raw_iterator();
Ok(RocksObjectIter {
key,
timeline,
nonrel_only,
lsn,
dbiter,
})
}
}
impl<'a> Iterator for RocksObjectIter<'a> {
type Item = ObjectTag;
fn next(&mut self) -> std::option::Option<Self::Item> {
loop {
self.dbiter.seek(StorageKey::ser(&self.key).unwrap());
if !self.dbiter.valid() {
return None;
}
let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
if key.obj_key.timeline != self.timeline {
// End of this timeline
return None;
}
self.key = key.clone();
self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions
if key.lsn <= self.lsn {
// visible in this snapshot
if self.nonrel_only {
match key.obj_key.tag {
ObjectTag::RelationMetadata(_) => return None,
ObjectTag::RelationBuffer(_) => return None,
_ => return Some(key.obj_key.tag),
}
} else {
return Some(key.obj_key.tag);
}
}
}
}
}

View File

@@ -9,7 +9,6 @@ use postgres_ffi::xlog_utils::*;
use postgres_ffi::XLogLongPageHeaderData; use postgres_ffi::XLogLongPageHeaderData;
use postgres_ffi::XLogPageHeaderData; use postgres_ffi::XLogPageHeaderData;
use postgres_ffi::XLogRecord; use postgres_ffi::XLogRecord;
use std::cmp::min; use std::cmp::min;
use thiserror::Error; use thiserror::Error;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
@@ -18,7 +17,9 @@ pub type Oid = u32;
pub type TransactionId = u32; pub type TransactionId = u32;
pub type BlockNumber = u32; pub type BlockNumber = u32;
pub type OffsetNumber = u16; pub type OffsetNumber = u16;
pub type TimestampTz = i64; pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
#[allow(dead_code)] #[allow(dead_code)]
pub struct WalStreamDecoder { pub struct WalStreamDecoder {
@@ -245,6 +246,7 @@ impl DecodedBkpBlock {
} }
pub struct DecodedWALRecord { pub struct DecodedWALRecord {
pub xl_xid: TransactionId,
pub xl_info: u8, pub xl_info: u8,
pub xl_rmid: u8, pub xl_rmid: u8,
pub record: Bytes, // raw XLogRecord pub record: Bytes, // raw XLogRecord
@@ -261,6 +263,24 @@ pub struct RelFileNode {
pub relnode: Oid, /* relation */ pub relnode: Oid, /* relation */
} }
#[repr(C)]
#[derive(Debug)]
pub struct XlRelmapUpdate {
pub dbid: Oid, /* database ID, or 0 for shared map */
pub tsid: Oid, /* database's tablespace, or pg_global */
pub nbytes: i32, /* size of relmap data */
}
impl XlRelmapUpdate {
pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
XlRelmapUpdate {
dbid: buf.get_u32_le(),
tsid: buf.get_u32_le(),
nbytes: buf.get_i32_le(),
}
}
}
#[repr(C)] #[repr(C)]
#[derive(Debug)] #[derive(Debug)]
pub struct XlSmgrTruncate { pub struct XlSmgrTruncate {
@@ -270,9 +290,7 @@ pub struct XlSmgrTruncate {
} }
impl XlSmgrTruncate { impl XlSmgrTruncate {
pub fn decode(decoded: &DecodedWALRecord) -> XlSmgrTruncate { pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate {
let mut buf = decoded.record.clone();
buf.advance((XLOG_SIZE_OF_XLOG_RECORD + 2) as usize);
XlSmgrTruncate { XlSmgrTruncate {
blkno: buf.get_u32_le(), blkno: buf.get_u32_le(),
rnode: RelFileNode { rnode: RelFileNode {
@@ -295,9 +313,7 @@ pub struct XlCreateDatabase {
} }
impl XlCreateDatabase { impl XlCreateDatabase {
pub fn decode(decoded: &DecodedWALRecord) -> XlCreateDatabase { pub fn decode(buf: &mut Bytes) -> XlCreateDatabase {
let mut buf = decoded.record.clone();
buf.advance((XLOG_SIZE_OF_XLOG_RECORD + 2) as usize);
XlCreateDatabase { XlCreateDatabase {
db_id: buf.get_u32_le(), db_id: buf.get_u32_le(),
tablespace_id: buf.get_u32_le(), tablespace_id: buf.get_u32_le(),
@@ -392,6 +408,7 @@ impl XlHeapUpdate {
/// ///
#[derive(Debug)] #[derive(Debug)]
pub struct XlXactParsedRecord { pub struct XlXactParsedRecord {
pub xid: TransactionId,
pub info: u8, pub info: u8,
pub xact_time: TimestampTz, pub xact_time: TimestampTz,
pub xinfo: u32, pub xinfo: u32,
@@ -408,15 +425,12 @@ impl XlXactParsedRecord {
/// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED
/// record. This should agree with the ParseCommitRecord and ParseAbortRecord /// record. This should agree with the ParseCommitRecord and ParseAbortRecord
/// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c)
pub fn decode(decoded: &DecodedWALRecord) -> XlXactParsedRecord { pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord {
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
// The record starts with time of commit/abort // The record starts with time of commit/abort
let xact_time = buf.get_i64_le(); let xact_time = buf.get_u64_le();
let xinfo; let xinfo;
if decoded.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le(); xinfo = buf.get_u32_le();
} else { } else {
xinfo = 0; xinfo = 0;
@@ -466,10 +480,11 @@ impl XlXactParsedRecord {
} }
} }
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
let _xid = buf.get_u32_le(); xid = buf.get_u32_le();
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
} }
XlXactParsedRecord { XlXactParsedRecord {
xid,
info, info,
xact_time, xact_time,
xinfo, xinfo,
@@ -481,6 +496,74 @@ impl XlXactParsedRecord {
} }
} }
#[repr(C)]
#[derive(Debug)]
pub struct MultiXactMember {
    pub xid: TransactionId,
    pub status: MultiXactStatus,
}

impl MultiXactMember {
    /// Read one multixact member — an (xid, status) pair of little-endian
    /// u32 values — from the record buffer, advancing it past both fields.
    pub fn decode(buf: &mut Bytes) -> MultiXactMember {
        let xid = buf.get_u32_le();
        let status = buf.get_u32_le();
        MultiXactMember { xid, status }
    }
}
#[repr(C)]
#[derive(Debug)]
pub struct XlMultiXactCreate {
    pub mid: MultiXactId,     /* new MultiXact's ID */
    pub moff: MultiXactOffset, /* its starting offset in members file */
    pub nmembers: u32,        /* number of member XIDs */
    pub members: Vec<MultiXactMember>,
}

impl XlMultiXactCreate {
    /// Decode the body of an XLOG_MULTIXACT_CREATE_ID record: the new
    /// multixact id, its starting offset in the members file, the member
    /// count, and then `nmembers` (xid, status) pairs, all little-endian.
    ///
    /// The buffer is advanced past everything that is read.
    pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
        let mid = buf.get_u32_le();
        let moff = buf.get_u32_le();
        let nmembers = buf.get_u32_le();
        // Member count is known up front, so reserve once instead of
        // growing the vector push by push.
        let mut members = Vec::with_capacity(nmembers as usize);
        for _ in 0..nmembers {
            members.push(MultiXactMember::decode(buf));
        }
        XlMultiXactCreate {
            mid,
            moff,
            nmembers,
            members,
        }
    }
}
#[repr(C)]
#[derive(Debug)]
pub struct XlMultiXactTruncate {
    pub oldest_multi_db: Oid,
    /* to-be-truncated range of multixact offsets */
    pub start_trunc_off: MultiXactId, /* just for completeness' sake */
    pub end_trunc_off: MultiXactId,
    /* to-be-truncated range of multixact members */
    pub start_trunc_memb: MultiXactOffset,
    pub end_trunc_memb: MultiXactOffset,
}

impl XlMultiXactTruncate {
    /// Decode the body of an XLOG_MULTIXACT_TRUNCATE_ID record: five
    /// little-endian u32 fields read in on-disk order.
    pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
        let oldest_multi_db = buf.get_u32_le();
        let start_trunc_off = buf.get_u32_le();
        let end_trunc_off = buf.get_u32_le();
        let start_trunc_memb = buf.get_u32_le();
        let end_trunc_memb = buf.get_u32_le();
        XlMultiXactTruncate {
            oldest_multi_db,
            start_trunc_off,
            end_trunc_off,
            start_trunc_memb,
            end_trunc_memb,
        }
    }
}
/// Main routine to decode a WAL record and figure out which blocks are modified /// Main routine to decode a WAL record and figure out which blocks are modified
// //
// See xlogrecord.h for details // See xlogrecord.h for details
@@ -806,6 +889,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
} }
DecodedWALRecord { DecodedWALRecord {
xl_xid: xlogrec.xl_xid,
xl_info: xlogrec.xl_info, xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid, xl_rmid: xlogrec.xl_rmid,
record, record,

View File

@@ -4,6 +4,7 @@
//! //!
//! We keep one WAL receiver active per timeline. //! We keep one WAL receiver active per timeline.
use crate::object_key::*;
use crate::page_cache; use crate::page_cache;
use crate::restore_local_repo; use crate::restore_local_repo;
use crate::waldecoder::*; use crate::waldecoder::*;
@@ -15,8 +16,8 @@ use log::*;
use postgres::fallible_iterator::FallibleIterator; use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter; use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::*; use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
use postgres_protocol::message::backend::ReplicationMessage; use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn; use postgres_types::PgLsn;
use std::cmp::{max, min}; use std::cmp::{max, min};
@@ -168,6 +169,10 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint); let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
while let Some(replication_message) = physical_stream.next()? { while let Some(replication_message) = physical_stream.next()? {
let status_update = match replication_message { let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => { ReplicationMessage::XLogData(xlog_data) => {
@@ -185,9 +190,28 @@ fn walreceiver_main(
waldecoder.feed_bytes(data); waldecoder.feed_bytes(data);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? { while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// Save old checkpoint value to compare with it after decoding WAL record
let old_checkpoint_bytes = checkpoint.encode();
let decoded = decode_wal_record(recdata.clone()); let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?; restore_local_repo::save_decoded_record(
&mut checkpoint,
&*timeline,
&decoded,
recdata,
lsn,
)?;
last_rec_lsn = lsn; last_rec_lsn = lsn;
let new_checkpoint_bytes = checkpoint.encode();
// Check if checkpoint data was updated by save_decoded_record
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(
ObjectTag::Checkpoint,
lsn,
new_checkpoint_bytes,
false,
)?;
}
} }
// Update the last_valid LSN value in the page cache one more time. We updated // Update the last_valid LSN value in the page cache one more time. We updated

View File

@@ -15,7 +15,8 @@
//! TODO: Even though the postgres code runs in a separate process, //! TODO: Even though the postgres code runs in a separate process,
//! it's not a secure sandbox. //! it's not a secure sandbox.
//! //!
use bytes::{BufMut, Bytes, BytesMut}; use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*; use log::*;
use std::cell::RefCell; use std::cell::RefCell;
use std::fs; use std::fs;
@@ -35,9 +36,15 @@ use tokio::time::timeout;
use zenith_utils::bin_ser::BeSer; use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn; use zenith_utils::lsn::Lsn;
use crate::object_key::*;
use crate::repository::BufferTag; use crate::repository::BufferTag;
use crate::repository::WALRecord; use crate::repository::WALRecord;
use crate::waldecoder::XlXactParsedRecord;
use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
use crate::PageServerConf; use crate::PageServerConf;
use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
/// ///
/// WAL Redo Manager is responsible for replaying WAL records. /// WAL Redo Manager is responsible for replaying WAL records.
@@ -52,7 +59,7 @@ pub trait WalRedoManager: Send + Sync {
/// the reords. /// the reords.
fn request_redo( fn request_redo(
&self, &self,
tag: BufferTag, tag: ObjectTag,
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<WALRecord>, records: Vec<WALRecord>,
@@ -68,7 +75,7 @@ pub struct DummyRedoManager {}
impl crate::walredo::WalRedoManager for DummyRedoManager { impl crate::walredo::WalRedoManager for DummyRedoManager {
fn request_redo( fn request_redo(
&self, &self,
_tag: BufferTag, _tag: ObjectTag,
_lsn: Lsn, _lsn: Lsn,
_base_img: Option<Bytes>, _base_img: Option<Bytes>,
_records: Vec<WALRecord>, _records: Vec<WALRecord>,
@@ -97,7 +104,7 @@ struct PostgresRedoManagerInternal {
#[derive(Debug)] #[derive(Debug)]
struct WalRedoRequest { struct WalRedoRequest {
tag: BufferTag, tag: ObjectTag,
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
@@ -159,7 +166,7 @@ impl WalRedoManager for PostgresRedoManager {
/// ///
fn request_redo( fn request_redo(
&self, &self,
tag: BufferTag, tag: ObjectTag,
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<WALRecord>, records: Vec<WALRecord>,
@@ -186,6 +193,24 @@ impl WalRedoManager for PostgresRedoManager {
} }
} }
/// Byte offset within a members page of the flags word for the member
/// group that contains `xid` (mirrors PostgreSQL's MXOffsetToFlagsOffset).
fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
    // Which member group this offset falls into, across the whole SLRU...
    let group = (xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16;
    // ...then that group's index within its page.
    let group_in_page = group % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE;
    (group_in_page * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
}
/// Bit shift of the per-member flag bits for `xid` inside its group's
/// flags word (mirrors PostgreSQL's MXOffsetToFlagsBitShift).
fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
    let member_in_group = (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP;
    member_in_group * pg_constants::MXACT_MEMBER_BITS_PER_XACT
}
/* Location (byte offset within page) of TransactionId of given member */
fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
    // Skip the group's flag bytes, then 4 bytes per preceding member xid.
    let member_in_group = xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP;
    let within_group = pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP + member_in_group * 4;
    mx_offset_to_flags_offset(xid) + within_group as usize
}
/// ///
/// WAL redo thread /// WAL redo thread
/// ///
@@ -249,7 +274,151 @@ impl PostgresRedoManagerInternal {
let start = Instant::now(); let start = Instant::now();
let apply_result: Result<Bytes, Error>; let apply_result: Result<Bytes, Error>;
apply_result = process.apply_wal_records(tag, base_img, records).await; if let ObjectTag::RelationBuffer(buf_tag) = tag {
// Relational WAL records are applied using wal-redo-postgres
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
} else {
// Non-relational WAL records we will aply ourselves.
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
// If full-page image is provided, then use it...
page.extend_from_slice(&fpi[..]);
} else {
// otherwise initialize page with zeros
page.extend_from_slice(&ZERO_PAGE);
}
// Apply all callected WAL records
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let xlogrec = XLogRecord::from_bytes(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
// to avoid this weird parsing on replay
let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
if buf.remaining() > skip {
buf.advance(skip);
}
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
// The only operation we need to implement is CLOG_ZEROPAGE
page.copy_from_slice(&ZERO_PAGE);
}
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
// Transaction manager stuff
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let tag_blknum = match tag {
ObjectTag::Clog(slru) => slru.blknum,
ObjectTag::TwoPhase(_) => {
assert!(info == pg_constants::XLOG_XACT_PREPARE);
trace!("Apply prepare {} record", xlogrec.xl_xid);
page.clear();
page.extend_from_slice(&buf[..]);
continue;
}
_ => panic!("Not valid XACT object tag {:?}", tag),
};
let parsed_xact =
XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info);
if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT
|| parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_COMMITTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag_blknum == blkno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_SUB_COMMITTED,
&mut page,
);
}
}
} else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT
|| parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED
{
transaction_id_set_status(
parsed_xact.xid,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
for subxact in &parsed_xact.subxacts {
let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// only update xids on the requested page
if tag_blknum == blkno {
transaction_id_set_status(
*subxact,
pg_constants::TRANSACTION_STATUS_ABORTED,
&mut page,
);
}
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
// Multiexact operations
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
// Just need to zero page
page.copy_from_slice(&ZERO_PAGE);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
if let ObjectTag::MultiXactMembers(slru) = tag {
for i in 0..xlrec.nmembers {
let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
if blkno == slru.blknum {
// update only target block
let offset = xlrec.moff + i;
let memberoff = mx_offset_to_member_offset(offset);
let flagsoff = mx_offset_to_flags_offset(offset);
let bshift = mx_offset_to_flags_bitshift(offset);
let mut flagsval =
LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
flagsval &=
!(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
<< bshift);
flagsval |= xlrec.members[i as usize].status << bshift;
LittleEndian::write_u32(
&mut page[flagsoff..flagsoff + 4],
flagsval,
);
LittleEndian::write_u32(
&mut page[memberoff..memberoff + 4],
xlrec.members[i as usize].xid,
);
}
}
} else {
// Multixact offsets SLRU
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
}
} else {
panic!();
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
// Ralation map file has size 512 bytes
page.clear();
page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
assert!(page.len() == 512); // size of pg_filenode.map
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
}
let duration = start.elapsed(); let duration = start.elapsed();

View File

@@ -4,6 +4,7 @@
include!(concat!(env!("OUT_DIR"), "/bindings.rs")); include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
pub mod controlfile_utils; pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants; pub mod pg_constants;
pub mod relfile_utils; pub mod relfile_utils;
pub mod xlog_utils; pub mod xlog_utils;

View File

@@ -26,6 +26,19 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
pub const SMGR_TRUNCATE_VM: u32 = 0x0002; pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
//
// constants from clog.h
//
pub const CLOG_XACTS_PER_BYTE: u32 = 4;
pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
// //
// Constants from visbilitymap.h // Constants from visbilitymap.h
// //
@@ -33,6 +46,14 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const BITS_PER_HEAPBLOCK: u16 = 2;
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
pub const CLOG_ZEROPAGE: u8 = 0x00;
pub const CLOG_TRUNCATE: u8 = 0x10;
// From xact.h // From xact.h
pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_PREPARE: u8 = 0x10; pub const XLOG_XACT_PREPARE: u8 = 0x10;
@@ -40,6 +61,10 @@ pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
// From srlu.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize;
/* mask for filtering opcodes out of xl_info */ /* mask for filtering opcodes out of xl_info */
pub const XLOG_XACT_OPMASK: u8 = 0x70; pub const XLOG_XACT_OPMASK: u8 = 0x70;
pub const XLOG_HEAP_OPMASK: u8 = 0x70; pub const XLOG_HEAP_OPMASK: u8 = 0x70;
@@ -60,8 +85,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
// From pg_control.h and rmgrlist.h // From pg_control.h and rmgrlist.h
pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const DB_SHUTDOWNED: u32 = 1;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
/* size in bytes of a complete group */
pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
// From heapam_xlog.h // From heapam_xlog.h
pub const XLOG_HEAP_INSERT: u8 = 0x00; pub const XLOG_HEAP_INSERT: u8 = 0x00;
@@ -101,11 +150,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;
pub const SIZEOF_XLOGRECORD: u32 = 24; pub const SIZEOF_XLOGRECORD: u32 = 24;
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
// //
// from xlogrecord.h // from xlogrecord.h
// //
@@ -136,3 +180,8 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
/* FIXME: pageserver should request wal_seg_size from compute node */ /* FIXME: pageserver should request wal_seg_size from compute node */
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_LONG_HEADER: u16 = 0x0002;

View File

@@ -9,14 +9,16 @@
use crate::pg_constants; use crate::pg_constants;
use crate::CheckPoint; use crate::CheckPoint;
use crate::ControlFileData;
use crate::FullTransactionId; use crate::FullTransactionId;
use crate::XLogLongPageHeaderData; use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData; use crate::XLogPageHeaderData;
use crate::XLogRecord; use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC; use crate::XLOG_PAGE_MAGIC;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use bytes::{BufMut, BytesMut};
use crc32c::*; use crc32c::*;
use log::*; use log::*;
use std::cmp::min; use std::cmp::min;
@@ -35,12 +37,15 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>(); pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>(); pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>(); pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
pub type XLogRecPtr = u64; pub type XLogRecPtr = u64;
pub type TimeLineID = u32; pub type TimeLineID = u32;
pub type TimestampTz = u64; pub type TimestampTz = u64;
pub type XLogSegNo = u64; pub type XLogSegNo = u64;
const XID_CHECKPOINT_INTERVAL: u32 = 1024;
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
(0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
@@ -368,6 +373,7 @@ impl CheckPoint {
// Next XID should be greater than new_xid. // Next XID should be greater than new_xid.
// Also take in account 32-bit wrap-around. // Also take in account 32-bit wrap-around.
pub fn update_next_xid(&mut self, xid: u32) { pub fn update_next_xid(&mut self, xid: u32) {
let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
let full_xid = self.nextXid.value; let full_xid = self.nextXid.value;
let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID); let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
let old_xid = full_xid as u32; let old_xid = full_xid as u32;
@@ -383,3 +389,65 @@ impl CheckPoint {
} }
} }
} }
//
// Generate a new WAL segment containing a single XLOG_CHECKPOINT_SHUTDOWN record.
// We need this segment to start a compute node.
// In order to minimize changes in the Postgres core, we prefer to
// provide a WAL segment from which it can extract the checkpoint record in the
// standard way, rather than implement some alternative mechanism.
//
/// Build a complete WAL segment (WAL_SEGMENT_SIZE bytes) whose only content
/// is a long page header followed by one XLOG_CHECKPOINT_SHUTDOWN record
/// carrying `pg_control.checkPointCopy`, zero-padded to the end of the
/// segment. The compute node reads its checkpoint from this segment at
/// startup through the standard WAL machinery.
pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
    let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
    // Long page header for the first page of the segment. Its page address
    // is derived from the checkpoint LSN: the record is placed immediately
    // after this header, so the header starts XLOG_SIZE_OF_XLOG_LONG_PHD
    // bytes before pg_control.checkPoint.
    let hdr = XLogLongPageHeaderData {
        std: {
            XLogPageHeaderData {
                xlp_magic: XLOG_PAGE_MAGIC as u16,
                xlp_info: pg_constants::XLP_LONG_HEADER,
                xlp_tli: 1, // FIXME: always use Postgres timeline 1
                xlp_pageaddr: pg_control.checkPoint - XLOG_SIZE_OF_XLOG_LONG_PHD as u64,
                xlp_rem_len: 0, // no record continues from a previous page
            }
        },
        xlp_sysid: pg_control.system_identifier,
        xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
        xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
    };
    let hdr_bytes = hdr.encode();
    seg_buf.extend_from_slice(&hdr_bytes);
    // Record header: total length covers the fixed record header, the short
    // data header (block-id byte + length byte), and the CheckPoint payload.
    let rec_hdr = XLogRecord {
        xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
            + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
            + SIZEOF_CHECKPOINT) as u32,
        xl_xid: 0, //0 is for InvalidTransactionId
        xl_prev: 0,
        xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
        xl_rmid: pg_constants::RM_XLOG_ID,
        xl_crc: 0, // placeholder; real CRC is computed below
    };
    // "Short" data header: marks main data and its one-byte length.
    let mut rec_shord_hdr_bytes = BytesMut::new();
    rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
    rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
    let rec_bytes = rec_hdr.encode();
    let checkpoint_bytes = pg_control.checkPointCopy.encode();
    // Calculate the record checksum: CRC32C over the record data (short data
    // header + checkpoint payload) first, then over the record header up to
    // (but excluding) the xl_crc field — the same order PostgreSQL uses.
    let mut crc = 0;
    crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]);
    crc = crc32c_append(crc, &checkpoint_bytes[..]);
    crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
    // Emit: record header (with the computed CRC spliced in at the CRC
    // offset), then the short data header, then the checkpoint payload.
    seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
    seg_buf.put_u32_le(crc);
    seg_buf.extend_from_slice(&rec_shord_hdr_bytes);
    seg_buf.extend_from_slice(&checkpoint_bytes);
    // Zero out the remaining part of the segment file.
    seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
    seg_buf.freeze()
}

View File

@@ -47,8 +47,8 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations assert row['n_relations'] == n_relations
assert row['dropped'] == 0 assert row['dropped'] == 0
assert row['truncated'] == 1 assert row['truncated'] == 30
assert row['deleted'] == 1 assert row['deleted'] == 3
# Insert two more rows and run GC. # Insert two more rows and run GC.
print("Inserting two more rows and running GC") print("Inserting two more rows and running GC")
@@ -60,7 +60,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations assert row['n_relations'] == n_relations
assert row['dropped'] == 0 assert row['dropped'] == 0
assert row['truncated'] == 1 assert row['truncated'] == 30
assert row['deleted'] == 2 assert row['deleted'] == 2
# Insert one more row. It creates one more page version, but doesn't affect the # Insert one more row. It creates one more page version, but doesn't affect the
@@ -73,7 +73,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations assert row['n_relations'] == n_relations
assert row['dropped'] == 0 assert row['dropped'] == 0
assert row['truncated'] == 1 assert row['truncated'] == 30
assert row['deleted'] == 1 assert row['deleted'] == 1
# Run GC again, with no changes in the database. Should not remove anything. # Run GC again, with no changes in the database. Should not remove anything.
@@ -82,7 +82,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations assert row['n_relations'] == n_relations
assert row['dropped'] == 0 assert row['dropped'] == 0
assert row['truncated'] == 0 assert row['truncated'] == 30
assert row['deleted'] == 0 assert row['deleted'] == 0
# #

View File

@@ -19,7 +19,7 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("INSERT INTO foo VALUES ('bar')") cur.execute("INSERT INTO foo VALUES ('bar')")
# Stop and restart the Postgres instance # Stop and restart the Postgres instance
pg.stop().start() pg.stop_and_destroy().create_start('test_restart_compute')
with closing(pg.connect()) as conn: with closing(pg.connect()) as conn:
with conn.cursor() as cur: with conn.cursor() as cur: