Compare commits

...

15 Commits

Author SHA1 Message Date
anastasia
01e9011ea4 add more checks to test_oldestXid: check also oldest_xid_dbid and oldest_active_xid 2021-05-25 17:34:18 +03:00
anastasia
f61e92b692 WIP: fix oldestXid and oldestMulti handling in pageserver 2021-05-25 17:16:14 +03:00
anastasia
3f89016bd7 add test_oldestXid to check value after node restart 2021-05-25 17:14:55 +03:00
Konstantin Knizhnik
3865a8d901 Fix dropping of relational/non-relational data from storage 2021-05-25 15:13:14 +03:00
anastasia
2b7963056c add more context for error message 2021-05-25 12:13:42 +03:00
anastasia
1e08fda040 minor impovement: handle PG_CHECKPOINT_FORKNUM in debug messages 2021-05-25 12:13:20 +03:00
Heikki Linnakangas
47f9ff6410 Add a test for restarting and recreating compute node.
This is working. Let's keep it that way.
2021-05-24 15:49:20 +03:00
anastasia
b72939530c increase wait_lsn timeout to make tests more stable 2021-05-24 15:40:49 +03:00
Konstantin Knizhnik
72e373cd4e Fix maintaining of next multixact 2021-05-24 12:29:14 +03:00
Heikki Linnakangas
e3ea9cf70f Fix rocksdb get_relsize() implementation to work with historic LSNs. 2021-05-24 12:01:30 +03:00
anastasia
e2a9b4cc9b Add test_multixact to check that we replay multixact and advance next_multixact_id correctly 2021-05-24 11:57:23 +03:00
Konstantin Knizhnik
5b9ea6495e Merge with main branch 2021-05-24 11:48:39 +03:00
Konstantin Knizhnik
6ec8a90c78 Merge branch 'abstract_repository_iterator' into page_server_nonrel_wal_redo 2021-05-24 11:36:44 +03:00
Konstantin Knizhnik
e9c7665c81 Fix 2PC support 2021-05-21 20:01:44 +03:00
Konstantin Knizhnik
feb925c546 Refector repository API by introducing abstact iterator and removing specialized methods on top level 2021-05-21 17:36:36 +03:00
14 changed files with 862 additions and 387 deletions

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir;
use bytes::{BufMut, BytesMut};
use crate::repository::{BufferTag, RelTag, Timeline};
use postgres_ffi::relfile_utils::*;
@@ -101,7 +102,6 @@ fn add_relmap_files(
ar.append_path_with_name(&src_path, &dst_path)?;
format!("base/{}/pg_filenode.map", db.dbnode)
};
info!("Deliver {}", path);
assert!(img.len() == 512);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
@@ -128,9 +128,13 @@ fn add_twophase_files(
blknum: *xid,
};
let img = timeline.get_page_at_lsn(tag, lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, img.len() as u64)?;
ar.append(&header, &img[..])?;
let header = new_tar_header(&path, buf.len() as u64)?;
ar.append(&header, &buf[..])?;
}
Ok(())
}
@@ -154,11 +158,10 @@ fn add_pgcontrol_file(
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
checkpoint.redo = lsn.0;
checkpoint.nextXid.value += 1;
// TODO: When we restart master there are no active transaction and oldestXid is
// equal to nextXid if there are no prepared transactions.
// Let's ignore them for a while...
checkpoint.oldestXid = checkpoint.nextXid.value as u32;
//checkpoint.oldestXid = checkpoint.nextXid.value as u32;
pg_control.checkPointCopy = checkpoint;
let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;

View File

@@ -9,6 +9,8 @@ use postgres_ffi::relfile_utils::forknumber_to_name;
use std::fmt;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use log::*;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -61,14 +63,23 @@ pub trait Timeline {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Drop relation or file segment
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;
/// Put raw data
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()>;
/// Get repository iterator
fn iterator(&self) -> Box<dyn RepositoryIterator + '_>;
/// Create a new database from a template database
///
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
@@ -81,7 +92,49 @@ pub trait Timeline {
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()>;
) -> Result<()> {
let mut n = 0;
for forknum in &[
pg_constants::MAIN_FORKNUM,
pg_constants::FSM_FORKNUM,
pg_constants::VISIBILITYMAP_FORKNUM,
pg_constants::INIT_FORKNUM,
pg_constants::PG_FILENODEMAP_FORKNUM,
] {
let key = RepositoryKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: *forknum,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.iterator();
iter.first(&key);
while iter.valid() {
let mut key = iter.key();
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
self.put_raw_data(key, iter.value())?;
n += 1;
iter.next();
}
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
///
/// Helper function to parse a WAL record and call the above functions for all the
@@ -106,14 +159,18 @@ pub trait Timeline {
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
if blk.will_drop {
self.put_drop(tag, lsn)?;
} else {
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
self.put_wal_record(tag, rec);
self.put_wal_record(tag, rec)?;
}
}
// Handle a few special record types
@@ -167,15 +224,163 @@ pub trait Timeline {
fn advance_last_record_lsn(&self, lsn: Lsn);
fn get_last_record_lsn(&self) -> Lsn;
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let _lsn = self.wait_lsn(lsn)?;
let mut key = RepositoryKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn: Lsn(0),
};
let mut iter = self.iterator();
iter.first(&key);
if iter.valid() {
let thiskey = iter.key();
let tag = thiskey.tag;
if tag.rel == rel {
// still trversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
iter.last(&key); // locate last entry
if iter.valid() {
let thiskey = iter.key();
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
/// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = RepositoryKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut dbs = Vec::new();
let mut iter = self.iterator();
iter.first(&key);
let mut prev_tag = key.tag.rel;
while iter.valid() {
let key = iter.key();
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
return Ok(dbs);
}
fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> Result<u8> {
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
Ok(status)
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = RepositoryKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut gxacts = Vec::new();
let mut iter = self.iterator();
iter.first(&key);
while iter.valid() {
let key = iter.key();
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
let xid = key.tag.blknum;
if self.get_tx_status(xid, lsn)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
return Ok(gxacts);
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct RepositoryKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl RepositoryKey {
fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
fn unpack(buf: &mut Bytes) -> RepositoryKey {
RepositoryKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
pub trait RepositoryIterator {
fn first(&mut self, key: &RepositoryKey);
fn last(&mut self, key: &RepositoryKey);
fn next(&mut self);
fn prev(&mut self);
fn valid(&self) -> bool;
fn key(&self) -> RepositoryKey;
fn value(&self) -> &[u8];
}
#[derive(Clone)]
@@ -362,13 +567,6 @@ mod tests {
}
/// Test get_relsize() and truncation.
///
/// FIXME: The RocksRepository implementation returns wrong relation size, if
/// you make a request with an old LSN. It seems to ignore the requested LSN
/// and always return result as of latest LSN. For such cases, the expected
/// results below match the current RocksRepository behavior, so that the test
/// passes, and the actually correct answers are in comments like
/// "// CORRECT: <correct answer>"
#[test]
fn test_relsize() -> Result<()> {
// get_timeline() with non-existent timeline id should fail
@@ -380,21 +578,23 @@ mod tests {
let tline = repo.create_empty_timeline(timelineid)?;
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
tline.advance_last_valid_lsn(Lsn(5));
// FIXME: The rocksdb implementation erroneously returns 'true' here, even
// though the relation was created only at a later LSN
// rocksdb implementation erroneosly returns 'true' here
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
// likewise, it returns wrong size here
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 3); // CORRECT: 0 (or error?)
// And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(2))?, true);
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 3); // CORRECT: 1
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 1);
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
// Check page contents at each LSN
@@ -446,7 +646,7 @@ mod tests {
);
// should still see the truncated block with older LSN
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 2); // CORRECT: 3
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
assert_eq!(
tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?,
TEST_IMG("foo blk 2 at 5")
@@ -472,7 +672,7 @@ mod tests {
for i in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
lsn += 1;
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
}
tline.advance_last_valid_lsn(Lsn(lsn));

View File

@@ -5,9 +5,8 @@
// full page images, keyed by the RelFileNode, blocknumber, and the
// LSN.
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord, RepositoryKey, RepositoryIterator};
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::{Oid, TransactionId};
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -16,7 +15,6 @@ use crate::ZTimelineId;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::*;
use std::cmp::min;
use std::collections::HashMap;
@@ -30,7 +28,7 @@ use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
static TIMEOUT: Duration = Duration::from_secs(600);
pub struct RocksRepository {
conf: &'static PageServerConf,
@@ -84,40 +82,12 @@ pub struct RocksTimeline {
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct CacheKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl CacheKey {
fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
fn unpack(buf: &mut Bytes) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
enum CacheEntryContent {
PageImage(Bytes),
WALRecord(WALRecord),
Truncation,
Drop,
}
// The serialized representation of a CacheEntryContent begins with
@@ -125,11 +95,12 @@ enum CacheEntryContent {
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
// at all, you must peek into the first byte of the serialized representation
// to read it.
const CONTENT_PAGE_IMAGE: u8 = 1u8;
const CONTENT_WAL_RECORD: u8 = 2u8;
const CONTENT_TRUNCATION: u8 = 3u8;
const CONTENT_PAGE_IMAGE: u8 = 0u8;
const CONTENT_WAL_RECORD: u8 = 1u8;
const CONTENT_TRUNCATION: u8 = 2u8;
const CONTENT_DROP: u8 = 3u8;
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
const UNUSED_VERSION_FLAG: u8 = 4u8;
@@ -148,6 +119,9 @@ impl CacheEntryContent {
CacheEntryContent::Truncation => {
buf.put_u8(CONTENT_TRUNCATION);
}
CacheEntryContent::Drop => {
buf.put_u8(CONTENT_DROP);
}
}
}
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
@@ -162,6 +136,7 @@ impl CacheEntryContent {
}
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
CONTENT_DROP => CacheEntryContent::Drop,
_ => unreachable!(),
}
}
@@ -190,6 +165,33 @@ impl RocksRepository {
}
}
}
struct RocksIterator<'a> {
iter: rocksdb::DBRawIterator<'a>,
}
impl<'a> RepositoryIterator for RocksIterator<'a> {
fn next(&mut self) {
self.iter.next()
}
fn prev(&mut self) {
self.iter.prev()
}
fn valid(&self) -> bool {
self.iter.valid()
}
fn first(&mut self, key: &RepositoryKey) {
self.iter.seek(key.to_bytes())
}
fn last(&mut self, key: &RepositoryKey) {
self.iter.seek_for_prev(key.to_bytes())
}
fn key(&self) -> RepositoryKey {
RepositoryKey::from_slice(self.iter.key().unwrap())
}
fn value(&self) -> &[u8] {
self.iter.value().unwrap()
}
}
// Get handle to a given timeline. It is assumed to already exist.
impl Repository for RocksRepository {
@@ -299,21 +301,21 @@ impl RocksTimeline {
tag: BufferTag,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes());
let mut iter = self.iterator();
iter.last(&key);
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = iter.key();
if key.tag != tag {
break;
}
let content = CacheEntryContent::from_slice(iter.value().unwrap());
let content = CacheEntryContent::from_slice(iter.value());
if let CacheEntryContent::PageImage(img) = content {
// We have a base image. No need to dig deeper into the list of
// records
@@ -344,20 +346,26 @@ impl RocksTimeline {
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
assert!(lsn <= self.last_valid_lsn.load());
let mut key = CacheKey {
let mut key = RepositoryKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
},
lsn,
};
let mut iter = self.db.raw_iterator();
let mut iter = self.iterator();
loop {
iter.seek_for_prev(key.to_bytes());
iter.last(&key);
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let thiskey = iter.key();
if thiskey.tag.rel == rel {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
// Ignore entries with later LSNs.
if thiskey.lsn > lsn {
key.tag.blknum = thiskey.tag.blknum;
continue;
}
let content = CacheEntryContent::from_slice(iter.value());
if let CacheEntryContent::Truncation = content {
if thiskey.tag.blknum > 0 {
key.tag.blknum = thiskey.tag.blknum - 1;
@@ -376,6 +384,46 @@ impl RocksTimeline {
Ok(0)
}
///
/// Drop relations with all its forks or non-relational file
///
fn drop(&self, tag: BufferTag) -> Result<()> {
let mut iter = self.iterator();
let mut key = RepositoryKey { tag, lsn: Lsn(u64::MAX) };
if tag.rel.forknum == pg_constants::MAIN_FORKNUM {
// if it is relation then remove all its blocks in all forks
key.tag.blknum = u32::MAX;
key.tag.rel.forknum = pg_constants::INIT_FORKNUM;
} else {
assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM);
}
iter.last(&key);
while iter.valid() {
let key = iter.key();
if key.tag.rel.relnode != tag.rel.relnode ||
key.tag.rel.spcnode != tag.rel.spcnode ||
key.tag.rel.dbnode != tag.rel.dbnode ||
(key.tag.rel.forknum != tag.rel.forknum &&
tag.rel.forknum != pg_constants::MAIN_FORKNUM)
{
// no more entries belonging to this relation or file
break;
}
let v = iter.value();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.to_bytes(), &v[..])?;
} else {
// already marked for deletion
break;
}
iter.prev();
}
Ok(())
}
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
loop {
thread::sleep(conf.gc_period);
@@ -383,7 +431,7 @@ impl RocksTimeline {
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = CacheKey {
let mut maxkey = RepositoryKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
@@ -401,35 +449,50 @@ impl RocksTimeline {
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(maxkey.to_bytes());
let mut iter = self.iterator();
iter.last(&maxkey);
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let v = iter.value().unwrap();
let key = iter.key();
let v = iter.value();
let flag = v[0];
let last_lsn = key.lsn;
inspected += 1;
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// Special handling of delete of PREPARE WAL record
if (flag & CONTENT_KIND_MASK) == CONTENT_DROP {
// If drop record in over the horizon then delete all entries from repository
if last_lsn < horizon {
self.drop(key.tag)?;
}
maxkey = minkey;
continue;
}
// Special handling of PREPARE transaction WAL record.
if last_lsn < horizon
&& key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
{
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.to_bytes(), &v[..])?;
deleted += 1;
}
maxkey = minkey;
continue;
}
let xid = key.tag.blknum;
// We can not remove information about uncompleted prepared transaction
// even if it is over horizon
if self.get_tx_status(xid, horizon)? != pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.to_bytes(), &v[..])?;
deleted += 1;
}
}
maxkey = minkey;
continue;
}
// reconstruct most recent page version
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
// force reconstruction of most recent page version
@@ -447,19 +510,19 @@ impl RocksTimeline {
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
reconstructed += 1;
}
iter.seek_for_prev(maxkey.to_bytes());
iter.last(&maxkey);
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let key = CacheKey::from_slice(iter.key().unwrap());
let key = iter.key();
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
let v = iter.value();
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
@@ -468,7 +531,7 @@ impl RocksTimeline {
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
truncated += 1;
} else {
@@ -495,11 +558,11 @@ impl RocksTimeline {
if !iter.valid() {
break;
}
let key = CacheKey::from_slice(iter.key().unwrap());
let key = iter.key();
if key.tag != maxkey.tag {
break;
}
let v = iter.value().unwrap();
let v = iter.value();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
@@ -526,35 +589,6 @@ impl RocksTimeline {
}
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl Timeline for RocksTimeline {
@@ -572,15 +606,15 @@ impl Timeline for RocksTimeline {
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes());
let mut iter = self.iterator();
iter.last(&key);
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = iter.key();
if key.tag == tag {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
let content = CacheEntryContent::from_slice(iter.value());
let page_img: Bytes;
if let CacheEntryContent::PageImage(img) = content {
page_img = img;
@@ -589,10 +623,16 @@ impl Timeline for RocksTimeline {
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone());
self.put_page_image(tag, lsn, page_img.clone())?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
bail!(
"no page image or WAL record for requested page {} blk {} at lsn {}({})",
tag.rel,
tag.blknum,
req_lsn,
lsn
);
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi =
@@ -623,116 +663,6 @@ impl Timeline for RocksTimeline {
self.relsize_get_nowait(rel, lsn)
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut gxacts = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
let xid = key.tag.blknum;
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
Ok(gxacts)
}
/// Get databases. This function is used to local pg_filenode.map files
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut dbs = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
let mut prev_tag = key.tag.rel;
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
Ok(dbs)
}
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let _lsn = self.wait_lsn(lsn)?;
let mut key = CacheKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes()); // locate first entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let tag = thiskey.tag;
if tag.rel == rel {
// still trversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes()); // localte last entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
///
/// Does relation exist at given LSN?
///
@@ -740,17 +670,17 @@ impl Timeline for RocksTimeline {
fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = CacheKey {
let key = RepositoryKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
},
lsn,
};
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes());
let mut iter = self.iterator();
iter.last(&key);
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = iter.key();
if key.tag.rel == rel {
debug!("Relation {} exists at {}", rel, lsn);
return Ok(true);
@@ -766,13 +696,13 @@ impl Timeline for RocksTimeline {
///
/// Adds a WAL record to the repository
///
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let content = CacheEntryContent::WALRecord(rec);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
trace!(
"put_wal_record rel {} blk {} at {}",
tag.rel,
@@ -782,8 +712,20 @@ impl Timeline for RocksTimeline {
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
Ok(())
}
///
/// Put drop record which should completely delete relation with all its forks
/// or non-relational file from repository
///
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> {
let key = RepositoryKey { tag, lsn };
let content = CacheEntryContent::Drop;
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
Ok(())
}
///
/// Adds a relation-wide WAL record (like truncate) to the repository,
/// associating it with all pages started with specified block number
@@ -798,12 +740,12 @@ impl Timeline for RocksTimeline {
trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
for blknum in nblocks..old_rel_size {
let key = CacheKey {
let key = RepositoryKey {
tag: BufferTag { rel, blknum },
lsn,
};
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
}
let n = (old_rel_size - nblocks) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
@@ -815,7 +757,7 @@ impl Timeline for RocksTimeline {
/// Get page image at particular LSN
///
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
if let Some(bytes) = self.db.get(key.to_bytes())? {
let content = CacheEntryContent::from_slice(&bytes);
if let CacheEntryContent::PageImage(img) = content {
@@ -828,26 +770,12 @@ impl Timeline for RocksTimeline {
///
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let img_len = img.len();
let key = CacheKey { tag, lsn };
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
let key = RepositoryKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut val_buf = content.to_bytes();
// Zero size of page image indicates that page can be removed
if img_len == 0 {
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
// records already marked for deletion
return;
} else {
// delete truncated multixact page
val_buf[0] |= UNUSED_VERSION_FLAG;
}
}
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(key.to_bytes(), content.to_bytes());
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
trace!(
"put_page_image rel {} blk {} at {}",
@@ -856,57 +784,17 @@ impl Timeline for RocksTimeline {
lsn
);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn put_create_database(
&self,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()> {
let mut n = 0;
for forknum in &[
pg_constants::MAIN_FORKNUM,
pg_constants::FSM_FORKNUM,
pg_constants::VISIBILITYMAP_FORKNUM,
pg_constants::INIT_FORKNUM,
pg_constants::PG_FILENODEMAP_FORKNUM,
] {
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: *forknum,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let mut key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
fn iterator(&self) -> Box<dyn RepositoryIterator + '_> {
Box::new(RocksIterator {
iter: self.db.raw_iterator(),
})
}
let v = iter.value().unwrap();
self.db.put(key.to_bytes(), v)?;
n += 1;
iter.next();
}
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()> {
self.db.put(key.to_bytes(), data)?;
Ok(())
}
@@ -961,6 +849,36 @@ impl Timeline for RocksTimeline {
self.last_valid_lsn.load()
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn> {
let mut lsn = lsn;
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
//
// Get statistics to be displayed in the user interface.
//

View File

@@ -279,7 +279,7 @@ fn restore_relfile(
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
@@ -335,7 +335,7 @@ fn restore_nonrel_file(
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
@@ -369,7 +369,7 @@ fn restore_slru_file(
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
@@ -478,6 +478,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
}
info!("reached end of WAL at {}", last_lsn);
let checkpoint_bytes = encode_checkpoint(checkpoint);
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes)?;
Ok(())
}

View File

@@ -273,7 +273,8 @@ pub struct DecodedBkpBlock {
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool,
pub will_init: bool, /* record intialize page content */
pub will_drop: bool, /* record drops relation */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
@@ -298,6 +299,7 @@ impl DecodedBkpBlock {
has_image: false,
apply_image: false,
will_init: false,
will_drop: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
@@ -574,10 +576,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
xlogrec.xl_rmid,
xlogrec.xl_info
);
if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
if xlogrec.xl_xid >= checkpoint.nextXid.value as u32 {
// TODO: handle XID wraparound
checkpoint.nextXid = FullTransactionId {
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | (xlogrec.xl_xid+1) as u64,
};
}
let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
@@ -795,25 +797,36 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = buf.get_i32_le() as u32;
blk.will_init = true;
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
blk.will_init = true;
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
blk.will_drop = true;
checkpoint.oldestXid = buf.get_u32_le();
checkpoint.oldestXidDB = buf.get_u32_le();
info!("RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
blk.blkno, checkpoint.oldestXid, checkpoint.oldestXidDB);
}
trace!("RM_CLOG_ID updates block {}", blk.blkno);
blocks.push(blk);
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
}
//parse commit record to extract subtrans entries
// xl_xact_commit starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -847,7 +860,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::MAIN_FORKNUM;
blk.rnode_spcnode = spcnode;
blk.rnode_dbnode = dbnode;
blk.rnode_relnode = relnode;
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
@@ -864,23 +883,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
let _xid = buf.get_u32_le();
let xid = buf.get_u32_le();
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
blocks.push(blk);
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
//TODO handle this to be able to restore pg_twophase on node start
}
} else if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
blocks.push(blk);
}
//parse abort record to extract subtrans entries
// xl_xact_abort starts with time of commit
let _xact_time = buf.get_i64_le();
@@ -914,7 +939,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO save these too
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::MAIN_FORKNUM;
blk.rnode_spcnode = spcnode;
blk.rnode_dbnode = dbnode;
blk.rnode_relnode = relnode;
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
spcnode,
@@ -924,7 +955,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
let _xid = buf.get_u32_le();
let xid = buf.get_u32_le();
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
blocks.push(blk);
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
@@ -932,6 +967,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
blk.blkno = xlogrec.xl_xid;
blk.will_init = true;
blocks.push(blk);
info!("Prepare transaction {}", xlogrec.xl_xid);
}
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -1067,18 +1104,18 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blk.blkno = blkno;
blocks.push(blk);
}
if xlrec.mid > checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid;
if xlrec.mid >= checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid+1;
}
if xlrec.moff > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff;
if xlrec.moff+xlrec.nmembers > checkpoint.nextMultiOffset {
checkpoint.nextMultiOffset = xlrec.moff+xlrec.nmembers;
}
let max_xid = xlrec
.members
.iter()
.fold(checkpoint.nextXid.value as u32, |acc, mbr| {
if mbr.xid > acc {
mbr.xid
if mbr.xid >= acc {
mbr.xid+1
} else {
acc
}
@@ -1088,7 +1125,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
};
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
let xlrec = XlMultiXactTruncate::decode(&mut buf);
checkpoint.oldestXid = xlrec.end_trunc_off;
checkpoint.oldestMulti = xlrec.end_trunc_off;
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
let first_off_blkno =
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
@@ -1098,7 +1135,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blk.will_drop = true;
blocks.push(blk);
}
let first_mbr_blkno =
@@ -1109,7 +1146,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
blk.blkno = blkno;
blk.will_init = true;
blk.will_drop = true;
blocks.push(blk);
}
} else {

View File

@@ -201,7 +201,7 @@ fn walreceiver_main(
let new_checkpoint_bytes = encode_checkpoint(checkpoint);
if new_checkpoint_bytes != old_checkpoint_bytes {
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes)?;
}
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN

View File

@@ -286,9 +286,11 @@ impl PostgresRedoManagerInternal {
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
if info == pg_constants::XLOG_XACT_COMMIT {
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
@@ -312,9 +314,38 @@ impl PostgresRedoManagerInternal {
}
}
}
} else if info == pg_constants::XLOG_XACT_ABORT {
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
dbnode,
relnode
);
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
let sizeof_shared_invalidation_message = 0;
buf.advance(sizeof_shared_invalidation_message);
}
}
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
}
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
if info == pg_constants::XLOG_XACT_ABORT {
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
@@ -338,8 +369,39 @@ impl PostgresRedoManagerInternal {
}
}
}
} else if info != pg_constants::XLOG_XACT_PREPARE {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
dbnode,
relnode
);
}
}
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
let sizeof_shared_invalidation_message = 0;
buf.advance(sizeof_shared_invalidation_message);
}
}
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
}
} else if info == pg_constants::XLOG_XACT_PREPARE {
info!("Apply prepare {} record", xlogrec.xl_xid);
page.clear();
page.extend_from_slice(&buf[..]);
} else {
error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
status,
record.lsn,
record.main_data_offset, record.rec.len());
@@ -383,9 +445,6 @@ impl PostgresRedoManagerInternal {
}
}
}
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
// empty page image indicates that this SLRU page is truncated and can be removed by GC
page.clear();
} else {
panic!();
}

View File

@@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
pub const XLOG_XACT_COMMIT: u8 = 0x00;
pub const XLOG_XACT_PREPARE: u8 = 0x10;
pub const XLOG_XACT_ABORT: u8 = 0x20;
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
// From srlu.h
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;

View File

@@ -47,6 +47,7 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"),
pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"),
pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"),
pg_constants::PG_CHECKPOINT_FORKNUM => Some("checkpoint"),
_ => Some("UNKNOWN FORKNUM"),
}

View File

@@ -0,0 +1,62 @@
import pytest
import os
import psycopg2
import multiprocessing
pytest_plugins = ("fixtures.zenith_fixtures")
def runQuery(connstr):
con = psycopg2.connect(connstr)
con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = con.cursor()
cur.execute('select * from t1 for key share;')
def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
# Create a branch for us
zenith_cli.run(["branch", "test_multixact", "empty"])
pg = postgres.create_start('test_multixact')
print("postgres is running on 'test_multixact' branch")
pg_conn = psycopg2.connect(pg.connstr())
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
cur.execute('CREATE TABLE t1(i int primary key);'
'INSERT INTO t1 select * from generate_series(1,100);')
# Lock entries in parallel connections to set multixact
nclients = 3
pool = multiprocessing.Pool(nclients)
args = [pg.connstr()] * nclients
pool.map(runQuery, args)
pool.close()
pool.join()
# force wal flush
cur.execute('checkpoint')
cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint();')
res = cur.fetchone()
next_multixact_id = res[0]
lsn = res[1]
# Ensure that we did lock some tuples
assert(int(next_multixact_id) > 1)
# Branch at this point
zenith_cli.run(["branch", "test_multixact_new", "test_multixact@"+lsn]);
pg_new = postgres.create_start('test_multixact_new')
print("postgres is running on 'test_multixact_new' branch")
pg_new_conn = psycopg2.connect(pg_new.connstr())
pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur_new = pg_new_conn.cursor()
cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
next_multixact_id_new = cur_new.fetchone()[0]
# Check that we restored pg_controlfile correctly
# TODO compare content of pg_multixact files?
assert(next_multixact_id_new == next_multixact_id)

View File

@@ -0,0 +1,61 @@
import pytest
import getpass
import psycopg2
import time
import os
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test pg_control values after recreating a postgres instance
#
def test_oldestxid(zenith_cli, pageserver, postgres, pg_bin, repo_dir):
zenith_cli.run(["branch", "test_oldestxid", "empty"]);
pg = postgres.create_start('test_oldestxid')
print("postgres is running on 'test_oldestxid' branch")
pg_conn = psycopg2.connect(pg.connstr());
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)');
cur.execute("INSERT INTO foo VALUES ('bar')");
cur.execute('checkpoint')
cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
res = cur.fetchone()
oldest_xid = res[0]
oldest_xid_dbid = res[1]
oldest_active_xid = res[2]
# Stop, and destroy the Postgres instance. Then recreate and restart it.
pg_conn.close();
pg.stop();
# capture old pg_controldata output for debugging purposes
pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
pg_bin.run_capture(['pg_controldata', pgdatadir])
pg.destroy();
pg.create_start('test_oldestxid');
pg_conn = psycopg2.connect(pg.connstr());
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
res = cur.fetchone()
oldest_xid_new = res[0]
oldest_xid_dbid_new = res[1]
oldest_active_xid_new = res[2]
assert(oldest_xid_new == oldest_xid)
assert(oldest_xid_dbid_new == oldest_xid_dbid)
# this field should be reset at restart
assert(int(oldest_active_xid_new) == 0)
# capture new pg_controldata output for debugging purposes
pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
pg_bin.run_capture(['pg_controldata', pgdatadir])

View File

@@ -0,0 +1,60 @@
import pytest
import getpass
import psycopg2
import time
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test restarting and recreating a postgres instance
#
def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_restart_compute", "empty"]);
pg = postgres.create_start('test_restart_compute')
print("postgres is running on 'test_restart_compute' branch")
pg_conn = psycopg2.connect(pg.connstr());
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
# Create table, and insert a row
cur.execute('CREATE TABLE foo (t text)');
cur.execute("INSERT INTO foo VALUES ('bar')");
# Stop and restart the Postgres instance
pg_conn.close();
pg.stop();
pg.start();
pg_conn = psycopg2.connect(pg.connstr());
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
# We can still see the row
cur.execute('SELECT count(*) FROM foo');
assert(cur.fetchone()[0] == 1);
# Insert another row
cur.execute("INSERT INTO foo VALUES ('bar2')");
cur.execute('SELECT count(*) FROM foo');
assert(cur.fetchone()[0] == 2);
# FIXME: Currently, there is no guarantee that by the time the INSERT commits, the WAL
# has been streamed safely to the WAL safekeeper or page server. It is merely stored
# on the Postgres instance's local disk. Sleep a little, to give it time to be
# streamed. This should be removed, when we have the ability to run the Postgres
# instance -> safekeeper streaming in synchronous mode.
time.sleep(5)
# Stop, and destroy the Postgres instance. Then recreate and restart it.
pg_conn.close();
pg.stop();
pg.destroy();
pg.create_start('test_restart_compute');
pg_conn = psycopg2.connect(pg.connstr());
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = pg_conn.cursor()
# We can still see the rows
cur.execute('SELECT count(*) FROM foo');
assert(cur.fetchone()[0] == 2);

View File

@@ -0,0 +1,58 @@
#
# Test branching, when a transaction is in prepared state
#
import pytest
import getpass
import psycopg2
pytest_plugins = ("fixtures.zenith_fixtures")
def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_twophase", "empty"]);
pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5'])
print("postgres is running on 'test_twophase' branch")
conn = psycopg2.connect(pg.connstr());
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('CREATE TABLE foo (t text)');
# Prepare a transaction that will insert a row
cur.execute('BEGIN');
cur.execute("INSERT INTO foo VALUES ('one')");
cur.execute("PREPARE TRANSACTION 'insert_one'");
# Prepare another transaction that will insert a row
cur.execute('BEGIN');
cur.execute("INSERT INTO foo VALUES ('two')");
cur.execute("PREPARE TRANSACTION 'insert_two'");
cur.execute('BEGIN');
cur.execute("INSERT INTO foo VALUES ('three')");
cur.execute("PREPARE TRANSACTION 'insert_three'");
cur.execute("COMMIT PREPARED 'insert_three'");
cur.execute('SELECT pg_current_wal_insert_lsn()');
lsn = cur.fetchone()[0]
# Create a branch with the transaction in prepared state
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@"+lsn]);
pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5'])
conn2 = psycopg2.connect(pg2.connstr());
conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur2 = conn2.cursor()
# On the new branch, commit one of the prepared transactions, abort the other one.
cur2.execute("COMMIT PREPARED 'insert_one'");
cur2.execute("ROLLBACK PREPARED 'insert_two'");
cur2.execute('SELECT * FROM foo');
assert(cur2.fetchall() == [('one',),('three',)]);
# Neither insert is visible on the original branch, the transactions are still
# in prepared state there.
cur.execute('SELECT * FROM foo');
assert(cur.fetchall() == [('three',)]);

View File

@@ -168,14 +168,18 @@ class Postgres:
self.branch = None
# path to conf is <repo_dir>/pgdatadirs/<branch_name>/postgresql.conf
def create_start(self, branch, config_lines=None):
""" create the pg data directory, and start the server """
def create(self, branch, config_lines=None):
""" create the pg data directory """
self.zenith_cli.run(['pg', 'create', branch])
self.branch = branch
if config_lines is None:
config_lines = []
self.config(config_lines)
self.zenith_cli.run(['pg', 'start', branch])
return
def start(self):
""" start the server """
self.zenith_cli.run(['pg', 'start', self.branch])
self.running = True
return
@@ -189,9 +193,19 @@ class Postgres:
conf.write('\n')
def stop(self):
""" stop the server """
if self.running:
self.zenith_cli.run(['pg', 'stop', self.branch])
def destroy(self):
datadir = os.path.join(self.repo_dir, 'pgdatadirs/{}'.format(self.branch))
shutil.rmtree(datadir)
def create_start(self, branch, config_lines=None):
self.create(branch, config_lines);
self.start();
return
# Return a libpq connection string to connect to the Postgres instance
def connstr(self, dbname='postgres'):
conn_str = 'host={} port={} dbname={} user={}'.format(