mirror of https://github.com/neondatabase/neon.git
synced 2026-02-16 00:50:36 +00:00

Compare commits: split-prox ... test_oldes (15 commits)

01e9011ea4
f61e92b692
3f89016bd7
3865a8d901
2b7963056c
1e08fda040
47f9ff6410
b72939530c
72e373cd4e
e3ea9cf70f
e2a9b4cc9b
5b9ea6495e
6ec8a90c78
e9c7665c81
feb925c546
@@ -5,6 +5,7 @@ use std::sync::Arc;
 use std::time::SystemTime;
 use tar::{Builder, Header};
 use walkdir::WalkDir;
+use bytes::{BufMut, BytesMut};

 use crate::repository::{BufferTag, RelTag, Timeline};
 use postgres_ffi::relfile_utils::*;
@@ -101,7 +102,6 @@ fn add_relmap_files(
             ar.append_path_with_name(&src_path, &dst_path)?;
             format!("base/{}/pg_filenode.map", db.dbnode)
         };
-        info!("Deliver {}", path);
         assert!(img.len() == 512);
         let header = new_tar_header(&path, img.len() as u64)?;
         ar.append(&header, &img[..])?;
@@ -128,9 +128,13 @@ fn add_twophase_files(
             blknum: *xid,
         };
         let img = timeline.get_page_at_lsn(tag, lsn)?;
+        let mut buf = BytesMut::new();
+        buf.extend_from_slice(&img[..]);
+        let crc = crc32c::crc32c(&img[..]);
+        buf.put_u32_le(crc);
         let path = format!("pg_twophase/{:>08X}", xid);
-        let header = new_tar_header(&path, img.len() as u64)?;
-        ar.append(&header, &img[..])?;
+        let header = new_tar_header(&path, buf.len() as u64)?;
+        ar.append(&header, &buf[..])?;
     }
     Ok(())
 }
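PostgreSQL validates a CRC-32C trailer at the end of each pg_twophase state file, which is why the hunk above stops shipping the raw page image and instead appends a checksum before writing the tar entry. A minimal sketch of that trailer step, assuming the same bytes and crc32c crates the diff uses:

use bytes::{BufMut, BytesMut};

// Append a little-endian CRC-32C over the payload, as the hunk above does.
fn with_crc_trailer(payload: &[u8]) -> BytesMut {
    let mut buf = BytesMut::with_capacity(payload.len() + 4);
    buf.extend_from_slice(payload);
    buf.put_u32_le(crc32c::crc32c(payload));
    buf
}

Note that the header size must also come from the buffer with the trailer (buf.len()), not from the bare image, which is exactly the second half of the change above.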
@@ -154,11 +158,10 @@ fn add_pgcontrol_file(
     let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;

     checkpoint.redo = lsn.0;
-    checkpoint.nextXid.value += 1;
     // TODO: When we restart master there are no active transaction and oldestXid is
     // equal to nextXid if there are no prepared transactions.
     // Let's ignore them for a while...
-    checkpoint.oldestXid = checkpoint.nextXid.value as u32;
+    //checkpoint.oldestXid = checkpoint.nextXid.value as u32;
     pg_control.checkPointCopy = checkpoint;
     let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
     let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
@@ -9,6 +9,8 @@ use postgres_ffi::relfile_utils::forknumber_to_name;
 use std::fmt;
 use std::sync::Arc;
 use zenith_utils::lsn::Lsn;
+use log::*;
+use postgres_ffi::nonrelfile_utils::transaction_id_get_status;

 ///
 /// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -61,14 +63,23 @@ pub trait Timeline {
     ///
     /// This will implicitly extend the relation, if the page is beyond the
     /// current end-of-file.
-    fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
+    fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;

     /// Like put_wal_record, but with ready-made image of the page.
-    fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
+    fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;

     /// Truncate relation
     fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;

+    /// Drop relation or file segment
+    fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;
+
+    /// Put raw data
+    fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()>;
+
+    /// Get repository iterator
+    fn iterator(&self) -> Box<dyn RepositoryIterator + '_>;
+
     /// Create a new database from a template database
     ///
     /// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
@@ -81,7 +92,49 @@ pub trait Timeline {
         tablespace_id: Oid,
         src_db_id: Oid,
         src_tablespace_id: Oid,
-    ) -> Result<()>;
+    ) -> Result<()> {
+        let mut n = 0;
+        for forknum in &[
+            pg_constants::MAIN_FORKNUM,
+            pg_constants::FSM_FORKNUM,
+            pg_constants::VISIBILITYMAP_FORKNUM,
+            pg_constants::INIT_FORKNUM,
+            pg_constants::PG_FILENODEMAP_FORKNUM,
+        ] {
+            let key = RepositoryKey {
+                tag: BufferTag {
+                    rel: RelTag {
+                        spcnode: src_tablespace_id,
+                        dbnode: src_db_id,
+                        relnode: 0,
+                        forknum: *forknum,
+                    },
+                    blknum: 0,
+                },
+                lsn: Lsn(0),
+            };
+            let mut iter = self.iterator();
+            iter.first(&key);
+            while iter.valid() {
+                let mut key = iter.key();
+                if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
+                    break;
+                }
+                key.tag.rel.spcnode = tablespace_id;
+                key.tag.rel.dbnode = db_id;
+                key.lsn = lsn;
+
+                self.put_raw_data(key, iter.value())?;
+                n += 1;
+                iter.next();
+            }
+        }
+        info!(
+            "Create database {}/{}, copy {} entries",
+            tablespace_id, db_id, n
+        );
+        Ok(())
+    }

     ///
     /// Helper function to parse a WAL record and call the above functions for all the
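Note the design change in the hunk above: put_create_database stops being an abstract method and becomes a default trait method that every Timeline implementation inherits, written purely in terms of the two new primitives, iterator() and put_raw_data(). A minimal sketch of the pattern on a hypothetical Store trait (not the project's API):

trait Store {
    // The two primitives an implementation must provide.
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&mut self, key: &str, val: &[u8]);

    // A default method every implementation gets for free, built only on the
    // primitives above; the same trick the diff uses for put_create_database,
    // get_range, get_databases and get_twophase.
    fn copy(&mut self, src: &str, dst: &str) {
        if let Some(v) = self.get(src) {
            self.put(dst, &v);
        }
    }
}

The payoff shows up later in the diff, where hundreds of lines of per-backend implementations are deleted from the RocksDB timeline.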
@@ -106,14 +159,18 @@ pub trait Timeline {
                     blknum: blk.blkno,
                 };

-                let rec = WALRecord {
-                    lsn,
-                    will_init: blk.will_init || blk.apply_image,
-                    rec: recdata.clone(),
-                    main_data_offset: decoded.main_data_offset as u32,
-                };
+                if blk.will_drop {
+                    self.put_drop(tag, lsn)?;
+                } else {
+                    let rec = WALRecord {
+                        lsn,
+                        will_init: blk.will_init || blk.apply_image,
+                        rec: recdata.clone(),
+                        main_data_offset: decoded.main_data_offset as u32,
+                    };

-                self.put_wal_record(tag, rec);
+                    self.put_wal_record(tag, rec)?;
+                }
             }

             // Handle a few special record types
@@ -167,15 +224,163 @@ pub trait Timeline {
     fn advance_last_record_lsn(&self, lsn: Lsn);
     fn get_last_record_lsn(&self) -> Lsn;

+    //
+    // Wait until WAL has been received up to the given LSN.
+    //
+    fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
+
     /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
     /// but can be also applied to normal relations.
-    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
+    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
+        let _lsn = self.wait_lsn(lsn)?;
+        let mut key = RepositoryKey {
+            // minimal key to start with
+            tag: BufferTag { rel, blknum: 0 },
+            lsn: Lsn(0),
+        };
+        let mut iter = self.iterator();
+        iter.first(&key);
+        if iter.valid() {
+            let thiskey = iter.key();
+            let tag = thiskey.tag;
+            if tag.rel == rel {
+                // still traversing this relation
+                let first_blknum = tag.blknum;
+                key.tag.blknum = u32::MAX; // maximal key
+                iter.last(&key); // locate last entry
+                if iter.valid() {
+                    let thiskey = iter.key();
+                    let last_blknum = thiskey.tag.blknum;
+                    return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
+                }
+            }
+        }
+        Ok((0, 0)) // empty range
+    }

     /// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
-    fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
+    fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
+        let key = RepositoryKey {
+            // minimal key
+            tag: BufferTag {
+                rel: RelTag {
+                    forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
+                    spcnode: 0,
+                    dbnode: 0,
+                    relnode: 0,
+                },
+                blknum: 0,
+            },
+            lsn: Lsn(0),
+        };
+        let mut dbs = Vec::new();
+
+        let mut iter = self.iterator();
+        iter.first(&key);
+        let mut prev_tag = key.tag.rel;
+        while iter.valid() {
+            let key = iter.key();
+            if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
+                break; // we are done with this fork
+            }
+            if key.tag.rel != prev_tag && key.lsn <= lsn {
+                prev_tag = key.tag.rel;
+                dbs.push(prev_tag); // collect unique tags
+            }
+            iter.next();
+        }
+        return Ok(dbs);
+    }
+
+    fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> Result<u8> {
+        let tag = BufferTag {
+            rel: RelTag {
+                forknum: pg_constants::PG_XACT_FORKNUM,
+                spcnode: 0,
+                dbnode: 0,
+                relnode: 0,
+            },
+            blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
+        };
+        let clog_page = self.get_page_at_lsn(tag, lsn)?;
+        let status = transaction_id_get_status(xid, &clog_page[..]);
+        Ok(status)
+    }

     /// Get vector of prepared twophase transactions
-    fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
+    fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
+        let key = RepositoryKey {
+            // minimal key
+            tag: BufferTag {
+                rel: RelTag {
+                    forknum: pg_constants::PG_TWOPHASE_FORKNUM,
+                    spcnode: 0,
+                    dbnode: 0,
+                    relnode: 0,
+                },
+                blknum: 0,
+            },
+            lsn: Lsn(0),
+        };
+        let mut gxacts = Vec::new();
+
+        let mut iter = self.iterator();
+        iter.first(&key);
+        while iter.valid() {
+            let key = iter.key();
+            if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
+                break; // we are done with this fork
+            }
+            if key.lsn <= lsn {
+                let xid = key.tag.blknum;
+                if self.get_tx_status(xid, lsn)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
+                    gxacts.push(xid);
+                }
+            }
+            iter.next();
+        }
+        return Ok(gxacts);
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
+pub struct RepositoryKey {
+    pub tag: BufferTag,
+    pub lsn: Lsn,
+}
+
+impl RepositoryKey {
+    fn pack(&self, buf: &mut BytesMut) {
+        self.tag.pack(buf);
+        buf.put_u64(self.lsn.0);
+    }
+    fn unpack(buf: &mut Bytes) -> RepositoryKey {
+        RepositoryKey {
+            tag: BufferTag::unpack(buf),
+            lsn: Lsn::from(buf.get_u64()),
+        }
+    }
+
+    fn from_slice(slice: &[u8]) -> Self {
+        let mut buf = Bytes::copy_from_slice(slice);
+        Self::unpack(&mut buf)
+    }
+
+    fn to_bytes(&self) -> BytesMut {
+        let mut buf = BytesMut::new();
+        self.pack(&mut buf);
+        buf
+    }
+}
+
+pub trait RepositoryIterator {
+    fn first(&mut self, key: &RepositoryKey);
+    fn last(&mut self, key: &RepositoryKey);
+    fn next(&mut self);
+    fn prev(&mut self);
+    fn valid(&self) -> bool;
+    fn key(&self) -> RepositoryKey;
+    fn value(&self) -> &[u8];
 }

 #[derive(Clone)]
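RepositoryKey::pack writes the LSN with put_u64, which in the bytes crate is big-endian. That choice is what makes first()/last() correct: the backing store compares keys bytewise, and big-endian integers sort the same way bytewise as they do numerically, so seeking to a packed key lands at the right (tag, lsn) position. A short sketch of the property, assuming only the bytes crate:

use bytes::{BufMut, BytesMut};

// Big-endian encoding preserves numeric order under bytewise comparison,
// which a bytewise key comparator (like RocksDB's default) relies on.
fn encode_lsn(lsn: u64) -> BytesMut {
    let mut buf = BytesMut::new();
    buf.put_u64(lsn); // put_u64 writes big-endian
    buf
}

fn main() {
    let (smaller, larger) = (encode_lsn(0x00FF), encode_lsn(0x0100));
    assert!(smaller[..] < larger[..]); // byte order matches numeric order
}

A little-endian encoding would break this: 0x0100 would serialize as 00 01 ... and sort before 0xFF's FF 00 ..., scrambling every range scan in the diff.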
@@ -362,13 +567,6 @@ mod tests {
     }

     /// Test get_relsize() and truncation.
-    ///
-    /// FIXME: The RocksRepository implementation returns wrong relation size, if
-    /// you make a request with an old LSN. It seems to ignore the requested LSN
-    /// and always return result as of latest LSN. For such cases, the expected
-    /// results below match the current RocksRepository behavior, so that the test
-    /// passes, and the actually correct answers are in comments like
-    /// "// CORRECT: <correct answer>"
     #[test]
     fn test_relsize() -> Result<()> {
         // get_timeline() with non-existent timeline id should fail
@@ -380,21 +578,23 @@ mod tests {
         let tline = repo.create_empty_timeline(timelineid)?;

         tline.init_valid_lsn(Lsn(1));
-        tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
-        tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
-        tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
-        tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
-        tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
+        tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
+        tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
+        tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
+        tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
+        tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;

         tline.advance_last_valid_lsn(Lsn(5));

+        // FIXME: The rocksdb implementation erroneously returns 'true' here, even
+        // though the relation was created only at a later LSN
         // rocksdb implementation erroneosly returns 'true' here
         assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
-        // likewise, it returns wrong size here
-        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 3); // CORRECT: 0 (or error?)
+        // And this probably should throw an error, because the relation doesn't exist at Lsn(1) yet
+        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error

         assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(2))?, true);
-        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 3); // CORRECT: 1
+        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 1);
         assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);

         // Check page contents at each LSN
@@ -446,7 +646,7 @@ mod tests {
         );

         // should still see the truncated block with older LSN
-        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 2); // CORRECT: 3
+        assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
         assert_eq!(
             tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?,
             TEST_IMG("foo blk 2 at 5")
@@ -472,7 +672,7 @@ mod tests {
         for i in 0..pg_constants::RELSEG_SIZE + 1 {
             let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
             lsn += 1;
-            tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
+            tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
         }
         tline.advance_last_valid_lsn(Lsn(lsn));
@@ -5,9 +5,8 @@
 // full page images, keyed by the RelFileNode, blocknumber, and the
 // LSN.

-use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
+use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord, RepositoryKey, RepositoryIterator};
 use crate::restore_local_repo::restore_timeline;
-use crate::waldecoder::{Oid, TransactionId};
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::ZTimelineId;
@@ -16,7 +15,6 @@ use crate::ZTimelineId;
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
-use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
 use postgres_ffi::*;
 use std::cmp::min;
 use std::collections::HashMap;
@@ -30,7 +28,7 @@ use zenith_utils::lsn::{AtomicLsn, Lsn};
 use zenith_utils::seqwait::SeqWait;

 // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
-static TIMEOUT: Duration = Duration::from_secs(60);
+static TIMEOUT: Duration = Duration::from_secs(600);

 pub struct RocksRepository {
     conf: &'static PageServerConf,
@@ -84,40 +82,12 @@ pub struct RocksTimeline {
 // stored directly in the cache entry in that you still need to run the WAL redo
 // routine to generate the page image.
 //
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
-struct CacheKey {
-    pub tag: BufferTag,
-    pub lsn: Lsn,
-}
-
-impl CacheKey {
-    fn pack(&self, buf: &mut BytesMut) {
-        self.tag.pack(buf);
-        buf.put_u64(self.lsn.0);
-    }
-    fn unpack(buf: &mut Bytes) -> CacheKey {
-        CacheKey {
-            tag: BufferTag::unpack(buf),
-            lsn: Lsn::from(buf.get_u64()),
-        }
-    }
-
-    fn from_slice(slice: &[u8]) -> Self {
-        let mut buf = Bytes::copy_from_slice(slice);
-        Self::unpack(&mut buf)
-    }
-
-    fn to_bytes(&self) -> BytesMut {
-        let mut buf = BytesMut::new();
-        self.pack(&mut buf);
-        buf
-    }
-}
-
 enum CacheEntryContent {
     PageImage(Bytes),
     WALRecord(WALRecord),
     Truncation,
+    Drop,
 }

 // The serialized representation of a CacheEntryContent begins with
@@ -125,11 +95,12 @@ enum CacheEntryContent {
 // an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
 // at all, you must peek into the first byte of the serialized representation
 // to read it.
-const CONTENT_PAGE_IMAGE: u8 = 1u8;
-const CONTENT_WAL_RECORD: u8 = 2u8;
-const CONTENT_TRUNCATION: u8 = 3u8;
+const CONTENT_PAGE_IMAGE: u8 = 0u8;
+const CONTENT_WAL_RECORD: u8 = 1u8;
+const CONTENT_TRUNCATION: u8 = 2u8;
+const CONTENT_DROP: u8 = 3u8;

 const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above

 const UNUSED_VERSION_FLAG: u8 = 4u8;

@@ -148,6 +119,9 @@ impl CacheEntryContent {
             CacheEntryContent::Truncation => {
                 buf.put_u8(CONTENT_TRUNCATION);
             }
+            CacheEntryContent::Drop => {
+                buf.put_u8(CONTENT_DROP);
+            }
         }
     }
     pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
@@ -162,6 +136,7 @@ impl CacheEntryContent {
             }
             CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
             CONTENT_TRUNCATION => CacheEntryContent::Truncation,
+            CONTENT_DROP => CacheEntryContent::Drop,
             _ => unreachable!(),
         }
     }
@@ -190,6 +165,33 @@ impl RocksRepository {
         }
     }
 }
+
+struct RocksIterator<'a> {
+    iter: rocksdb::DBRawIterator<'a>,
+}
+
+impl<'a> RepositoryIterator for RocksIterator<'a> {
+    fn next(&mut self) {
+        self.iter.next()
+    }
+    fn prev(&mut self) {
+        self.iter.prev()
+    }
+    fn valid(&self) -> bool {
+        self.iter.valid()
+    }
+    fn first(&mut self, key: &RepositoryKey) {
+        self.iter.seek(key.to_bytes())
+    }
+    fn last(&mut self, key: &RepositoryKey) {
+        self.iter.seek_for_prev(key.to_bytes())
+    }
+    fn key(&self) -> RepositoryKey {
+        RepositoryKey::from_slice(self.iter.key().unwrap())
+    }
+    fn value(&self) -> &[u8] {
+        self.iter.value().unwrap()
+    }
+}

 // Get handle to a given timeline. It is assumed to already exist.
 impl Repository for RocksRepository {
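With RocksIterator in place, every scan below goes through the storage-agnostic RepositoryIterator interface instead of touching rocksdb::DBRawIterator directly. A hypothetical helper showing the recurring first()/valid()/key()/next() scan idiom, written against the types this diff introduces (not code from the repo):

// Collect the LSNs of all versions stored for one relation tag.
fn collect_versions(timeline: &dyn Timeline, rel: RelTag) -> Vec<Lsn> {
    let start = RepositoryKey {
        tag: BufferTag { rel, blknum: 0 },
        lsn: Lsn(0),
    };
    let mut lsns = Vec::new();
    let mut iter = timeline.iterator();
    iter.first(&start);
    while iter.valid() {
        let key = iter.key();
        if key.tag.rel != rel {
            break; // keys are ordered, so we have left this relation's range
        }
        lsns.push(key.lsn);
        iter.next();
    }
    lsns
}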
@@ -299,21 +301,21 @@ impl RocksTimeline {
         tag: BufferTag,
         lsn: Lsn,
     ) -> (Option<Bytes>, Vec<WALRecord>) {
-        let key = CacheKey { tag, lsn };
+        let key = RepositoryKey { tag, lsn };
         let mut base_img: Option<Bytes> = None;
         let mut records: Vec<WALRecord> = Vec::new();

-        let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        let mut iter = self.iterator();
+        iter.last(&key);

         // Scan backwards, collecting the WAL records, until we hit an
         // old page image.
         while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = iter.key();
             if key.tag != tag {
                 break;
             }
-            let content = CacheEntryContent::from_slice(iter.value().unwrap());
+            let content = CacheEntryContent::from_slice(iter.value());
             if let CacheEntryContent::PageImage(img) = content {
                 // We have a base image. No need to dig deeper into the list of
                 // records
@@ -344,20 +346,26 @@ impl RocksTimeline {
     fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
         assert!(lsn <= self.last_valid_lsn.load());

-        let mut key = CacheKey {
+        let mut key = RepositoryKey {
             tag: BufferTag {
                 rel,
                 blknum: u32::MAX,
             },
             lsn,
         };
-        let mut iter = self.db.raw_iterator();
+        let mut iter = self.iterator();
         loop {
-            iter.seek_for_prev(key.to_bytes());
+            iter.last(&key);
             if iter.valid() {
-                let thiskey = CacheKey::from_slice(iter.key().unwrap());
+                let thiskey = iter.key();
                 if thiskey.tag.rel == rel {
-                    let content = CacheEntryContent::from_slice(iter.value().unwrap());
+                    // Ignore entries with later LSNs.
+                    if thiskey.lsn > lsn {
+                        key.tag.blknum = thiskey.tag.blknum;
+                        continue;
+                    }
+
+                    let content = CacheEntryContent::from_slice(iter.value());
                     if let CacheEntryContent::Truncation = content {
                         if thiskey.tag.blknum > 0 {
                             key.tag.blknum = thiskey.tag.blknum - 1;
@@ -376,6 +384,46 @@ impl RocksTimeline {
         Ok(0)
     }

+    ///
+    /// Drop relation with all its forks or non-relational file
+    ///
+    fn drop(&self, tag: BufferTag) -> Result<()> {
+        let mut iter = self.iterator();
+        let mut key = RepositoryKey { tag, lsn: Lsn(u64::MAX) };
+        if tag.rel.forknum == pg_constants::MAIN_FORKNUM {
+            // if it is relation then remove all its blocks in all forks
+            key.tag.blknum = u32::MAX;
+            key.tag.rel.forknum = pg_constants::INIT_FORKNUM;
+        } else {
+            assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM);
+        }
+
+        iter.last(&key);
+        while iter.valid() {
+            let key = iter.key();
+            if key.tag.rel.relnode != tag.rel.relnode ||
+                key.tag.rel.spcnode != tag.rel.spcnode ||
+                key.tag.rel.dbnode != tag.rel.dbnode ||
+                (key.tag.rel.forknum != tag.rel.forknum &&
+                 tag.rel.forknum != pg_constants::MAIN_FORKNUM)
+            {
+                // no more entries belonging to this relation or file
+                break;
+            }
+            let v = iter.value();
+            if (v[0] & UNUSED_VERSION_FLAG) == 0 {
+                let mut v = v.to_owned();
+                v[0] |= UNUSED_VERSION_FLAG;
+                self.db.put(key.to_bytes(), &v[..])?;
+            } else {
+                // already marked for deletion
+                break;
+            }
+            iter.prev();
+        }
+        Ok(())
+    }
+
     fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
         loop {
             thread::sleep(conf.gc_period);
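drop() shows the deletion discipline used throughout this file: versions are never removed inline. Instead the first byte of each stored value carries UNUSED_VERSION_FLAG, and marked entries are reclaimed later by the GC pass. A minimal sketch of the tombstone step:

const UNUSED_VERSION_FLAG: u8 = 4u8;

// Return a tombstoned copy of the value, or None if it is already marked
// (in which case the caller can stop scanning, as drop() above does).
fn mark_unused(value: &[u8]) -> Option<Vec<u8>> {
    if value[0] & UNUSED_VERSION_FLAG != 0 {
        return None;
    }
    let mut v = value.to_owned();
    v[0] |= UNUSED_VERSION_FLAG;
    Some(v)
}

Hitting an already-marked entry is the loop's stopping condition: everything older than the first tombstone must already have been marked by an earlier pass.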
@@ -383,7 +431,7 @@ impl RocksTimeline {

         // checked_sub() returns None on overflow.
         if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
-            let mut maxkey = CacheKey {
+            let mut maxkey = RepositoryKey {
                 tag: BufferTag {
                     rel: RelTag {
                         spcnode: u32::MAX,
@@ -401,35 +449,50 @@ impl RocksTimeline {
             let mut inspected = 0u64;
             let mut deleted = 0u64;
             loop {
-                let mut iter = self.db.raw_iterator();
-                iter.seek_for_prev(maxkey.to_bytes());
+                let mut iter = self.iterator();
+                iter.last(&maxkey);
                 if iter.valid() {
-                    let key = CacheKey::from_slice(iter.key().unwrap());
-                    let v = iter.value().unwrap();
+                    let key = iter.key();
+                    let v = iter.value();
+                    let flag = v[0];
+                    let last_lsn = key.lsn;
+
                     inspected += 1;

                     // Construct boundaries for old records cleanup
                     maxkey.tag = key.tag;
-                    let last_lsn = key.lsn;
                     maxkey.lsn = min(horizon, last_lsn); // do not remove last version

                     let mut minkey = maxkey.clone();
                     minkey.lsn = Lsn(0); // first version

-                    // Special handling of delete of PREPARE WAL record
+                    if (flag & CONTENT_KIND_MASK) == CONTENT_DROP {
+                        // If drop record is over the horizon then delete all entries from repository
+                        if last_lsn < horizon {
+                            self.drop(key.tag)?;
+                        }
+                        maxkey = minkey;
+                        continue;
+                    }
+
+                    // Special handling of PREPARE transaction WAL record.
                     if last_lsn < horizon
                         && key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
                     {
-                        if (v[0] & UNUSED_VERSION_FLAG) == 0 {
-                            let mut v = v.to_owned();
-                            v[0] |= UNUSED_VERSION_FLAG;
-                            self.db.put(key.to_bytes(), &v[..])?;
-                            deleted += 1;
-                        }
-                        maxkey = minkey;
-                        continue;
-                    }
+                        let xid = key.tag.blknum;
+                        // We can not remove information about uncompleted prepared transaction
+                        // even if it is over horizon
+                        if self.get_tx_status(xid, horizon)? != pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
+                            if (v[0] & UNUSED_VERSION_FLAG) == 0 {
+                                let mut v = v.to_owned();
+                                v[0] |= UNUSED_VERSION_FLAG;
+                                self.db.put(key.to_bytes(), &v[..])?;
+                                deleted += 1;
+                            }
+                        }
+                        maxkey = minkey;
+                        continue;
+                    }
                     // reconstruct most recent page version
                     if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
                         // force reconstruction of most recent page version
@@ -447,19 +510,19 @@ impl RocksTimeline {
                         let new_img = self
                             .walredo_mgr
                             .request_redo(key.tag, key.lsn, base_img, records)?;
-                        self.put_page_image(key.tag, key.lsn, new_img.clone());
+                        self.put_page_image(key.tag, key.lsn, new_img.clone())?;

                         reconstructed += 1;
                     }

-                    iter.seek_for_prev(maxkey.to_bytes());
+                    iter.last(&maxkey);
                     if iter.valid() {
                         // do not remove last version
                         if last_lsn > horizon {
                             // locate most recent record before horizon
-                            let key = CacheKey::from_slice(iter.key().unwrap());
+                            let key = iter.key();
                             if key.tag == maxkey.tag {
-                                let v = iter.value().unwrap();
+                                let v = iter.value();
                                 if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
                                     let (base_img, records) =
                                         self.collect_records_for_apply(key.tag, key.lsn);
@@ -468,7 +531,7 @@ impl RocksTimeline {
                                     let new_img = self
                                         .walredo_mgr
                                         .request_redo(key.tag, key.lsn, base_img, records)?;
-                                    self.put_page_image(key.tag, key.lsn, new_img.clone());
+                                    self.put_page_image(key.tag, key.lsn, new_img.clone())?;

                                     truncated += 1;
                                 } else {
@@ -495,11 +558,11 @@ impl RocksTimeline {
                         if !iter.valid() {
                             break;
                         }
-                        let key = CacheKey::from_slice(iter.key().unwrap());
+                        let key = iter.key();
                         if key.tag != maxkey.tag {
                             break;
                         }
-                        let v = iter.value().unwrap();
+                        let v = iter.value();
                         if (v[0] & UNUSED_VERSION_FLAG) == 0 {
                             let mut v = v.to_owned();
                             v[0] |= UNUSED_VERSION_FLAG;
@@ -526,35 +589,6 @@ impl RocksTimeline {
             }
         }
     }

-    //
-    // Wait until WAL has been received up to the given LSN.
-    //
-    fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
-        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
-        // This is necessary for bootstrap.
-        if lsn == Lsn(0) {
-            let last_valid_lsn = self.last_valid_lsn.load();
-            trace!(
-                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
-                last_valid_lsn,
-                lsn
-            );
-            lsn = last_valid_lsn;
-        }
-        //trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
-        self.last_valid_lsn
-            .wait_for_timeout(lsn, TIMEOUT)
-            .with_context(|| {
-                format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive",
-                    lsn
-                )
-            })?;
-        //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
-
-        Ok(lsn)
-    }
 }

 impl Timeline for RocksTimeline {
@@ -572,15 +606,15 @@ impl Timeline for RocksTimeline {

     // Look up cache entry. If it's a page image, return that. If it's a WAL record,
     // ask the WAL redo service to reconstruct the page image from the WAL records.
-        let key = CacheKey { tag, lsn };
+        let key = RepositoryKey { tag, lsn };

-        let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        let mut iter = self.iterator();
+        iter.last(&key);

         if iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = iter.key();
             if key.tag == tag {
-                let content = CacheEntryContent::from_slice(iter.value().unwrap());
+                let content = CacheEntryContent::from_slice(iter.value());
                 let page_img: Bytes;
                 if let CacheEntryContent::PageImage(img) = content {
                     page_img = img;
@@ -589,10 +623,16 @@ impl Timeline for RocksTimeline {
                     let (base_img, records) = self.collect_records_for_apply(tag, lsn);
                     page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;

-                    self.put_page_image(tag, lsn, page_img.clone());
+                    self.put_page_image(tag, lsn, page_img.clone())?;
                 } else {
                     // No base image, and no WAL record. Huh?
-                    bail!("no page image or WAL record for requested page");
+                    bail!(
+                        "no page image or WAL record for requested page {} blk {} at lsn {}({})",
+                        tag.rel,
+                        tag.blknum,
+                        req_lsn,
+                        lsn
+                    );
                 }
                 // FIXME: assumes little-endian. Only used for the debugging log though
                 let page_lsn_hi =
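get_page_at_lsn above is the core reconstruction idea: walk the page's versions backwards from the requested LSN, gathering WAL records until a full page image is found, then replay those records on top of it (and memoize the result with put_page_image). A self-contained sketch of that algorithm on simplified types, not the project's code:

enum Version {
    Image(Vec<u8>),
    Delta(Vec<u8>), // a WAL record to replay
}

// `versions` is sorted by LSN; `redo` stands in for the WAL-redo service.
fn reconstruct(
    versions: &[(u64, Version)],
    lsn: u64,
    redo: impl Fn(Vec<u8>, &[&Vec<u8>]) -> Vec<u8>,
) -> Option<Vec<u8>> {
    let mut deltas = Vec::new();
    // Scan backwards from the newest version at or below `lsn`.
    for (v_lsn, v) in versions.iter().rev() {
        if *v_lsn > lsn {
            continue;
        }
        match v {
            Version::Image(img) => {
                // Found the base image: replay the collected deltas, oldest first.
                deltas.reverse();
                return Some(redo(img.clone(), &deltas));
            }
            Version::Delta(rec) => deltas.push(rec),
        }
    }
    None // no base image, nothing to reconstruct from
}

The None case corresponds to the bail!() branch above, which this diff also extends to report which page and LSN could not be served.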
@@ -623,116 +663,6 @@ impl Timeline for RocksTimeline {
         self.relsize_get_nowait(rel, lsn)
     }

-    /// Get vector of prepared twophase transactions
-    fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
-        let key = CacheKey {
-            // minimal key
-            tag: BufferTag {
-                rel: RelTag {
-                    forknum: pg_constants::PG_TWOPHASE_FORKNUM,
-                    spcnode: 0,
-                    dbnode: 0,
-                    relnode: 0,
-                },
-                blknum: 0,
-            },
-            lsn: Lsn(0),
-        };
-        let mut gxacts = Vec::new();
-
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes());
-        while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
-            if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
-                break; // we are done with this fork
-            }
-            if key.lsn <= lsn {
-                let xid = key.tag.blknum;
-                let tag = BufferTag {
-                    rel: RelTag {
-                        forknum: pg_constants::PG_XACT_FORKNUM,
-                        spcnode: 0,
-                        dbnode: 0,
-                        relnode: 0,
-                    },
-                    blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
-                };
-                let clog_page = self.get_page_at_lsn(tag, lsn)?;
-                let status = transaction_id_get_status(xid, &clog_page[..]);
-                if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
-                    gxacts.push(xid);
-                }
-            }
-            iter.next();
-        }
-        Ok(gxacts)
-    }
-
-    /// Get databases. This function is used to local pg_filenode.map files
-    fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
-        let key = CacheKey {
-            // minimal key
-            tag: BufferTag {
-                rel: RelTag {
-                    forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
-                    spcnode: 0,
-                    dbnode: 0,
-                    relnode: 0,
-                },
-                blknum: 0,
-            },
-            lsn: Lsn(0),
-        };
-        let mut dbs = Vec::new();
-
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes());
-        let mut prev_tag = key.tag.rel;
-        while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
-            if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
-                break; // we are done with this fork
-            }
-            if key.tag.rel != prev_tag && key.lsn <= lsn {
-                prev_tag = key.tag.rel;
-                dbs.push(prev_tag); // collect unique tags
-            }
-            iter.next();
-        }
-        Ok(dbs)
-    }
-
-    /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
-    /// but can be also applied to normal relations.
-    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
-        let _lsn = self.wait_lsn(lsn)?;
-        let mut key = CacheKey {
-            // minimal key to start with
-            tag: BufferTag { rel, blknum: 0 },
-            lsn: Lsn(0),
-        };
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes()); // locate first entry
-        if iter.valid() {
-            let thiskey = CacheKey::from_slice(iter.key().unwrap());
-            let tag = thiskey.tag;
-            if tag.rel == rel {
-                // still trversing this relation
-                let first_blknum = tag.blknum;
-                key.tag.blknum = u32::MAX; // maximal key
-                let mut iter = self.db.raw_iterator();
-                iter.seek_for_prev(key.to_bytes()); // localte last entry
-                if iter.valid() {
-                    let thiskey = CacheKey::from_slice(iter.key().unwrap());
-                    let last_blknum = thiskey.tag.blknum;
-                    return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
-                }
-            }
-        }
-        Ok((0, 0)) // empty range
-    }
-
     ///
     /// Does relation exist at given LSN?
     ///
@@ -740,17 +670,17 @@ impl Timeline for RocksTimeline {
     fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
         let lsn = self.wait_lsn(req_lsn)?;

-        let key = CacheKey {
+        let key = RepositoryKey {
             tag: BufferTag {
                 rel,
                 blknum: u32::MAX,
             },
             lsn,
         };
-        let mut iter = self.db.raw_iterator();
-        iter.seek_for_prev(key.to_bytes());
+        let mut iter = self.iterator();
+        iter.last(&key);
         if iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
+            let key = iter.key();
             if key.tag.rel == rel {
                 debug!("Relation {} exists at {}", rel, lsn);
                 return Ok(true);
@@ -766,13 +696,13 @@ impl Timeline for RocksTimeline {
     ///
     /// Adds a WAL record to the repository
     ///
-    fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
+    fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
         let lsn = rec.lsn;
-        let key = CacheKey { tag, lsn };
+        let key = RepositoryKey { tag, lsn };

         let content = CacheEntryContent::WALRecord(rec);

-        let _res = self.db.put(key.to_bytes(), content.to_bytes());
+        let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
         trace!(
             "put_wal_record rel {} blk {} at {}",
             tag.rel,
@@ -782,8 +712,20 @@ impl Timeline for RocksTimeline {

         self.num_entries.fetch_add(1, Ordering::Relaxed);
         self.num_wal_records.fetch_add(1, Ordering::Relaxed);
+        Ok(())
     }

+    ///
+    /// Put drop record which should completely delete relation with all its forks
+    /// or non-relational file from repository
+    ///
+    fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> {
+        let key = RepositoryKey { tag, lsn };
+        let content = CacheEntryContent::Drop;
+        let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
+        Ok(())
+    }
+
     ///
     /// Adds a relation-wide WAL record (like truncate) to the repository,
     /// associating it with all pages started with specified block number
@@ -798,12 +740,12 @@ impl Timeline for RocksTimeline {
         trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);

         for blknum in nblocks..old_rel_size {
-            let key = CacheKey {
+            let key = RepositoryKey {
                 tag: BufferTag { rel, blknum },
                 lsn,
             };
             trace!("put_wal_record lsn: {}", key.lsn);
-            let _res = self.db.put(key.to_bytes(), content.to_bytes());
+            let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
         }
         let n = (old_rel_size - nblocks) as u64;
         self.num_entries.fetch_add(n, Ordering::Relaxed);
@@ -815,7 +757,7 @@ impl Timeline for RocksTimeline {
     /// Get page image at particular LSN
     ///
     fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
-        let key = CacheKey { tag, lsn };
+        let key = RepositoryKey { tag, lsn };
         if let Some(bytes) = self.db.get(key.to_bytes())? {
             let content = CacheEntryContent::from_slice(&bytes);
             if let CacheEntryContent::PageImage(img) = content {
@@ -828,26 +770,12 @@ impl Timeline for RocksTimeline {
     ///
     /// Memorize a full image of a page version
     ///
-    fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
-        let img_len = img.len();
-        let key = CacheKey { tag, lsn };
+    fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
+        let key = RepositoryKey { tag, lsn };
         let content = CacheEntryContent::PageImage(img);

-        let mut val_buf = content.to_bytes();
-
-        // Zero size of page image indicates that page can be removed
-        if img_len == 0 {
-            if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
-                // records already marked for deletion
-                return;
-            } else {
-                // delete truncated multixact page
-                val_buf[0] |= UNUSED_VERSION_FLAG;
-            }
-        }
-
         trace!("put_wal_record lsn: {}", key.lsn);
-        let _res = self.db.put(key.to_bytes(), content.to_bytes());
+        let _res = self.db.put(key.to_bytes(), content.to_bytes())?;

         trace!(
             "put_page_image rel {} blk {} at {}",
@@ -856,57 +784,17 @@ impl Timeline for RocksTimeline {
             lsn
         );
         self.num_page_images.fetch_add(1, Ordering::Relaxed);
+        Ok(())
     }

-    fn put_create_database(
-        &self,
-        lsn: Lsn,
-        db_id: Oid,
-        tablespace_id: Oid,
-        src_db_id: Oid,
-        src_tablespace_id: Oid,
-    ) -> Result<()> {
-        let mut n = 0;
-        for forknum in &[
-            pg_constants::MAIN_FORKNUM,
-            pg_constants::FSM_FORKNUM,
-            pg_constants::VISIBILITYMAP_FORKNUM,
-            pg_constants::INIT_FORKNUM,
-            pg_constants::PG_FILENODEMAP_FORKNUM,
-        ] {
-            let key = CacheKey {
-                tag: BufferTag {
-                    rel: RelTag {
-                        spcnode: src_tablespace_id,
-                        dbnode: src_db_id,
-                        relnode: 0,
-                        forknum: *forknum,
-                    },
-                    blknum: 0,
-                },
-                lsn: Lsn(0),
-            };
-            let mut iter = self.db.raw_iterator();
-            iter.seek(key.to_bytes());
-            while iter.valid() {
-                let mut key = CacheKey::from_slice(iter.key().unwrap());
-                if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
-                    break;
-                }
-                key.tag.rel.spcnode = tablespace_id;
-                key.tag.rel.dbnode = db_id;
-                key.lsn = lsn;
-
-                let v = iter.value().unwrap();
-                self.db.put(key.to_bytes(), v)?;
-                n += 1;
-                iter.next();
-            }
-        }
-        info!(
-            "Create database {}/{}, copy {} entries",
-            tablespace_id, db_id, n
-        );
+    fn iterator(&self) -> Box<dyn RepositoryIterator + '_> {
+        Box::new(RocksIterator {
+            iter: self.db.raw_iterator(),
+        })
+    }
+
+    fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()> {
+        self.db.put(key.to_bytes(), data)?;
         Ok(())
     }

@@ -961,6 +849,36 @@ impl Timeline for RocksTimeline {
         self.last_valid_lsn.load()
     }

+    //
+    // Wait until WAL has been received up to the given LSN.
+    //
+    fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn> {
+        let mut lsn = lsn;
+        // When invalid LSN is requested, it means "don't wait, return latest version of the page"
+        // This is necessary for bootstrap.
+        if lsn == Lsn(0) {
+            let last_valid_lsn = self.last_valid_lsn.load();
+            trace!(
+                "walreceiver doesn't work yet last_valid_lsn {}, requested {}",
+                last_valid_lsn,
+                lsn
+            );
+            lsn = last_valid_lsn;
+        }
+        //trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
+        self.last_valid_lsn
+            .wait_for_timeout(lsn, TIMEOUT)
+            .with_context(|| {
+                format!(
+                    "Timed out while waiting for WAL record at LSN {} to arrive",
+                    lsn
+                )
+            })?;
+        //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
+
+        Ok(lsn)
+    }
+
     //
     // Get statistics to be displayed in the user interface.
     //
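The wait_lsn body above relies on zenith_utils::seqwait::SeqWait to block GetPage@LSN callers until the WAL receiver has advanced last_valid_lsn far enough. A rough sketch of the same wait-until-counter-reaches idea built only on std primitives (illustrative, not the project's SeqWait):

use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct LsnWaiter {
    current: Mutex<u64>,
    advanced: Condvar,
}

impl LsnWaiter {
    // Block until the counter reaches `lsn`, or fail after `timeout`.
    fn wait_for(&self, lsn: u64, timeout: Duration) -> Result<u64, &'static str> {
        let mut cur = self.current.lock().unwrap();
        while *cur < lsn {
            let (guard, res) = self.advanced.wait_timeout(cur, timeout).unwrap();
            cur = guard;
            if res.timed_out() && *cur < lsn {
                return Err("timed out waiting for WAL");
            }
        }
        Ok(*cur)
    }

    // Called by the WAL receiver as new records arrive.
    fn advance(&self, lsn: u64) {
        let mut cur = self.current.lock().unwrap();
        if lsn > *cur {
            *cur = lsn;
            self.advanced.notify_all();
        }
    }
}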
@@ -279,7 +279,7 @@ fn restore_relfile(
             },
             blknum,
         };
-        timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
+        timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
         /*
         if oldest_lsn == 0 || p.lsn < oldest_lsn {
             oldest_lsn = p.lsn;
@@ -335,7 +335,7 @@ fn restore_nonrel_file(
         },
         blknum,
     };
-    timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
+    timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?;
     Ok(())
 }

@@ -369,7 +369,7 @@ fn restore_slru_file(
             },
             blknum,
         };
-        timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
+        timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
         /*
         if oldest_lsn == 0 || p.lsn < oldest_lsn {
             oldest_lsn = p.lsn;
@@ -478,6 +478,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
     }
     info!("reached end of WAL at {}", last_lsn);
     let checkpoint_bytes = encode_checkpoint(checkpoint);
-    timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
+    timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes)?;
     Ok(())
 }
@@ -273,7 +273,8 @@ pub struct DecodedBkpBlock {
     /* Information on full-page image, if any */
     has_image: bool,       /* has image, even for consistency checking */
     pub apply_image: bool, /* has image that should be restored */
-    pub will_init: bool,
+    pub will_init: bool,   /* record initializes page content */
+    pub will_drop: bool,   /* record drops relation */
     //char *bkp_image;
     hole_offset: u16,
     hole_length: u16,
@@ -298,6 +299,7 @@ impl DecodedBkpBlock {
             has_image: false,
             apply_image: false,
             will_init: false,
+            will_drop: false,
             hole_offset: 0,
             hole_length: 0,
             bimg_len: 0,
@@ -574,10 +576,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
         xlogrec.xl_rmid,
         xlogrec.xl_info
     );
-    if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
+    if xlogrec.xl_xid >= checkpoint.nextXid.value as u32 {
         // TODO: handle XID wraparound
         checkpoint.nextXid = FullTransactionId {
-            value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
+            value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | (xlogrec.xl_xid+1) as u64,
        };
     }
     let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
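The change from `>` to `>=` (together with the added `+1`) fixes an off-by-one: nextXid must point one past the highest XID that has appeared in the WAL, otherwise an already-used XID could be handed out again after restart. A worked sketch of the corrected update:

// With next_xid = 100 and a WAL record from xid 100, the old code
// (`>` comparison, no `+1`) left next_xid at 100; the fix advances it to 101.
fn advance_next_xid(next_xid: u64, xl_xid: u32) -> u64 {
    if xl_xid >= next_xid as u32 {
        // keep the epoch in the high 32 bits, move the low half past this XID
        (next_xid & 0xFFFF_FFFF_0000_0000) | (xl_xid as u64 + 1)
    } else {
        next_xid
    }
}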
@@ -795,25 +797,36 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
         let mut blk = DecodedBkpBlock::new();
         blk.forknum = pg_constants::PG_XACT_FORKNUM;
         blk.blkno = buf.get_i32_le() as u32;
-        blk.will_init = true;
+        let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
+        if info == pg_constants::CLOG_ZEROPAGE {
+            blk.will_init = true;
+        } else {
+            assert!(info == pg_constants::CLOG_TRUNCATE);
+            blk.will_drop = true;
+            checkpoint.oldestXid = buf.get_u32_le();
+            checkpoint.oldestXidDB = buf.get_u32_le();
+            info!("RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
+                blk.blkno, checkpoint.oldestXid, checkpoint.oldestXidDB);
+        }
         trace!("RM_CLOG_ID updates block {}", blk.blkno);
         blocks.push(blk);
     } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
         let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
-        if info == pg_constants::XLOG_XACT_COMMIT {
+        if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
+            if info == pg_constants::XLOG_XACT_COMMIT {
             let mut blk = DecodedBkpBlock::new();
             blk.forknum = pg_constants::PG_XACT_FORKNUM;
             blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
             trace!(
                 "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
                 xlogrec.xl_info, (xlogrec.xl_prev >> 32),
                 xlogrec.xl_prev & 0xffffffff,
                 xlogrec.xl_xid,
                 blk.blkno,
                 main_data_len
             );
             blocks.push(blk);
+            }
             //parse commit record to extract subtrans entries
             // xl_xact_commit starts with time of commit
             let _xact_time = buf.get_i64_le();
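The `xid / CLOG_XACTS_PER_PAGE` expression above maps a transaction ID to the CLOG page holding its status bits. A worked example of that mapping, assuming the stock PostgreSQL layout (two status bits per transaction, 8 kB pages):

// 2 bits per xact -> 4 xacts per byte -> 32768 xacts per 8192-byte page
const BLCKSZ: u32 = 8192;
const CLOG_BITS_PER_XACT: u32 = 2;
const CLOG_XACTS_PER_BYTE: u32 = 8 / CLOG_BITS_PER_XACT; // 4
const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE; // 32768

fn main() {
    let xid: u32 = 100_000;
    let blkno = xid / CLOG_XACTS_PER_PAGE;
    assert_eq!(blkno, 3); // XID 100000 lives on the fourth CLOG page
}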
@@ -847,7 +860,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             let spcnode = buf.get_u32_le();
             let dbnode = buf.get_u32_le();
             let relnode = buf.get_u32_le();
-            //TODO handle this too?
+            let mut blk = DecodedBkpBlock::new();
+            blk.forknum = pg_constants::MAIN_FORKNUM;
+            blk.rnode_spcnode = spcnode;
+            blk.rnode_dbnode = dbnode;
+            blk.rnode_relnode = relnode;
+            blk.will_drop = true;
+            blocks.push(blk);
             trace!(
                 "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
                 spcnode,
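A commit record with relfilenodes carries the list of relations the transaction deleted or truncated; instead of the old TODO, the decoder now emits one `will_drop` block per relation so the repository can garbage-collect them. A hedged sketch of the payload layout being parsed, with illustrative names: each entry is a RelFileNode triple of little-endian u32 values.

use bytes::{Buf, Bytes};

struct DroppedRel { spc: u32, db: u32, rel: u32 }

fn parse_dropped_rels(buf: &mut Bytes, nrels: usize) -> Vec<DroppedRel> {
    (0..nrels)
        .map(|_| DroppedRel {
            spc: buf.get_u32_le(), // tablespace OID
            db: buf.get_u32_le(),  // database OID
            rel: buf.get_u32_le(), // relfilenode
        })
        .collect()
}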
@@ -864,23 +883,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 }
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
-                let _xid = buf.get_u32_le();
+                let xid = buf.get_u32_le();
+                let mut blk = DecodedBkpBlock::new();
+                blk.forknum = pg_constants::PG_XACT_FORKNUM;
+                blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
+                blocks.push(blk);
                 trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
                 //TODO handle this to be able to restore pg_twophase on node start
             }
-        } else if info == pg_constants::XLOG_XACT_ABORT {
+        } else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
+            if info == pg_constants::XLOG_XACT_ABORT {
             let mut blk = DecodedBkpBlock::new();
             blk.forknum = pg_constants::PG_XACT_FORKNUM;
             blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
             trace!(
                 "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
                 xlogrec.xl_info, (xlogrec.xl_prev >> 32),
                 xlogrec.xl_prev & 0xffffffff,
                 xlogrec.xl_xid,
                 blk.blkno,
                 main_data_len
             );
             blocks.push(blk);
+            }
             //parse abort record to extract subtrans entries
             // xl_xact_abort starts with time of commit
             let _xact_time = buf.get_i64_le();
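One subtlety worth noting: for COMMIT PREPARED and ABORT PREPARED, the header's `xl_xid` identifies the session that finishes the two-phase transaction, not the prepared transaction itself. That is why the decoder skips the per-record CLOG block for the `_PREPARED` variants and instead, in the `XACT_XINFO_HAS_TWOPHASE` branch, reads the prepared XID out of the record body and targets its CLOG block. A minimal sketch of the distinction, with illustrative names:

// `header_xid` is deliberately unused; the XID to resolve travels in the body.
fn clog_block_for_commit_prepared(header_xid: u32, body_xid: u32,
                                  clog_xacts_per_page: u32) -> u32 {
    let _ = header_xid; // session running COMMIT PREPARED, not the 2PC xact
    body_xid / clog_xacts_per_page
}

fn main() {
    // prepared transaction 40000 resolved by a session with XID 40123
    assert_eq!(clog_block_for_commit_prepared(40123, 40000, 32768), 1);
}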
@@ -914,7 +939,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             let spcnode = buf.get_u32_le();
             let dbnode = buf.get_u32_le();
             let relnode = buf.get_u32_le();
-            //TODO save these too
+            let mut blk = DecodedBkpBlock::new();
+            blk.forknum = pg_constants::MAIN_FORKNUM;
+            blk.rnode_spcnode = spcnode;
+            blk.rnode_dbnode = dbnode;
+            blk.rnode_relnode = relnode;
+            blk.will_drop = true;
+            blocks.push(blk);
             trace!(
                 "XLOG_XACT_ABORT relfilenode {}/{}/{}",
                 spcnode,
@@ -924,7 +955,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 }
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
-                let _xid = buf.get_u32_le();
+                let xid = buf.get_u32_le();
+                let mut blk = DecodedBkpBlock::new();
+                blk.forknum = pg_constants::PG_XACT_FORKNUM;
+                blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
+                blocks.push(blk);
                 trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
             }
         } else if info == pg_constants::XLOG_XACT_PREPARE {
@@ -932,6 +967,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
             blk.blkno = xlogrec.xl_xid;
             blk.will_init = true;
+            blocks.push(blk);
+            info!("Prepare transaction {}", xlogrec.xl_xid);
         }
     } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
         let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
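Note how a PREPARE record is kept: it is filed under the dedicated two-phase fork with the XID reused as the block number, so the record body can later be served back when a node needs to restore pg_twophase. A hedged sketch of that keying idea; the struct and the fork constant are made up for illustration:

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PageKey {
    forknum: u8, // a dedicated pseudo-fork for two-phase state
    blkno: u32,  // for this fork: the prepared transaction's XID
}

const TWOPHASE_FORKNUM_SKETCH: u8 = 42; // hypothetical value

fn twophase_key(xid: u32) -> PageKey {
    PageKey { forknum: TWOPHASE_FORKNUM_SKETCH, blkno: xid }
}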
@@ -1067,18 +1104,18 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             blk.blkno = blkno;
             blocks.push(blk);
         }
-        if xlrec.mid > checkpoint.nextMulti {
-            checkpoint.nextMulti = xlrec.mid;
+        if xlrec.mid >= checkpoint.nextMulti {
+            checkpoint.nextMulti = xlrec.mid + 1;
         }
-        if xlrec.moff > checkpoint.nextMultiOffset {
-            checkpoint.nextMultiOffset = xlrec.moff;
+        if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset {
+            checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
         }
         let max_xid = xlrec
             .members
             .iter()
             .fold(checkpoint.nextXid.value as u32, |acc, mbr| {
-                if mbr.xid > acc {
-                    mbr.xid
+                if mbr.xid >= acc {
+                    mbr.xid + 1
                 } else {
                     acc
                 }
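A worked example of the bookkeeping fixed above: a multixact-create record for multixact `mid` with `nmembers` members starting at offset `moff` consumes IDs and offsets up to, but not including, `mid + 1` and `moff + nmembers`, so both "next" counters must land one past the consumed range:

fn advance_multi(next_multi: &mut u32, next_offset: &mut u32,
                 mid: u32, moff: u32, nmembers: u32) {
    if mid >= *next_multi {
        *next_multi = mid + 1; // next ID to hand out, one past the created one
    }
    if moff + nmembers > *next_offset {
        *next_offset = moff + nmembers; // first free member slot
    }
}

fn main() {
    let (mut next_multi, mut next_offset) = (5, 10);
    // multixact 5 is created with 3 members occupying offsets 10..13
    advance_multi(&mut next_multi, &mut next_offset, 5, 10, 3);
    assert_eq!((next_multi, next_offset), (6, 13));
}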
@@ -1088,7 +1125,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
         };
     } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
         let xlrec = XlMultiXactTruncate::decode(&mut buf);
-        checkpoint.oldestXid = xlrec.end_trunc_off;
+        checkpoint.oldestMulti = xlrec.end_trunc_off;
         checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
         let first_off_blkno =
             xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
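The one-line change above is a genuine bug fix: the multixact truncation horizon was being written into `checkpoint.oldestXid`, clobbering the XID horizon while leaving `oldestMulti` stale. It now lands in `oldestMulti`, consistent with the `oldestMultiDB` assignment on the next line. For orientation, a hedged sketch of the payload this branch decodes, mirroring PostgreSQL's xl_multixact_truncate (field names approximate; the real decoder here is `XlMultiXactTruncate::decode`):

struct XlMultiXactTruncateSketch {
    oldest_multi_db: u32,  // -> checkpoint.oldestMultiDB
    start_trunc_off: u32,  // first offsets-SLRU entry being removed
    end_trunc_off: u32,    // one past the last removed -> checkpoint.oldestMulti
    start_trunc_memb: u32, // the same pair for the members SLRU
    end_trunc_memb: u32,
}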
@@ -1098,7 +1135,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             let mut blk = DecodedBkpBlock::new();
             blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
             blk.blkno = blkno;
-            blk.will_init = true;
+            blk.will_drop = true;
             blocks.push(blk);
         }
         let first_mbr_blkno =
@@ -1109,7 +1146,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             let mut blk = DecodedBkpBlock::new();
             blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
             blk.blkno = blkno;
-            blk.will_init = true;
+            blk.will_drop = true;
             blocks.push(blk);
         }
     } else {
@@ -201,7 +201,7 @@ fn walreceiver_main(
 
     let new_checkpoint_bytes = encode_checkpoint(checkpoint);
     if new_checkpoint_bytes != old_checkpoint_bytes {
-        timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
+        timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes)?;
     }
     // Now that this record has been handled, let the page cache know that
     // it is up-to-date to this LSN
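The walreceiver keeps the in-memory checkpoint under a fixed tag at `Lsn(0)` and rewrites it only when the re-encoded bytes differ from the previously stored ones, a cheap dirty-check that avoids one page-cache write per WAL record. A sketch of the idiom under assumed types (serde/bincode stand in for the real encoder):

fn maybe_store(old_bytes: &mut Vec<u8>, state: &impl serde::Serialize,
               store: &mut dyn FnMut(&[u8])) {
    // serializing the same state twice yields identical bytes,
    // so byte equality is a sound change detector
    let new_bytes = bincode::serialize(state).expect("encodable state");
    if new_bytes != *old_bytes {
        store(&new_bytes);      // persist only on a real change
        *old_bytes = new_bytes; // remember what was last written
    }
}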
@@ -286,9 +286,11 @@ impl PostgresRedoManagerInternal {
         } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
             let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
             let mut status = 0;
-            if info == pg_constants::XLOG_XACT_COMMIT {
+            if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
                 status = pg_constants::TRANSACTION_STATUS_COMMITTED;
-                transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
+                if info == pg_constants::XLOG_XACT_COMMIT {
+                    transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
+                }
                 //handle subtrans
                 let _xact_time = buf.get_i64_le();
                 let mut xinfo = 0;
@@ -312,9 +314,38 @@ impl PostgresRedoManagerInternal {
                     }
                 }
             }
-        } else if info == pg_constants::XLOG_XACT_ABORT {
+            if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
+                if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
+                    let nrels = buf.get_i32_le();
+                    for _i in 0..nrels {
+                        let spcnode = buf.get_u32_le();
+                        let dbnode = buf.get_u32_le();
+                        let relnode = buf.get_u32_le();
+                        //TODO handle this too?
+                        trace!(
+                            "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
+                            spcnode,
+                            dbnode,
+                            relnode
+                        );
+                    }
+                }
+                if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
+                    let nmsgs = buf.get_i32_le();
+                    for _i in 0..nmsgs {
+                        let sizeof_shared_invalidation_message = 0;
+                        buf.advance(sizeof_shared_invalidation_message);
+                    }
+                }
+                assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
+                let xid = buf.get_u32_le();
+                transaction_id_set_status(xid, status, &mut page);
+            }
+        } else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
             status = pg_constants::TRANSACTION_STATUS_ABORTED;
-            transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
+            if info == pg_constants::XLOG_XACT_ABORT {
+                transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
+            }
             //handle subtrans
             let _xact_time = buf.get_i64_le();
             let mut xinfo = 0;
@@ -338,8 +369,39 @@ impl PostgresRedoManagerInternal {
                     }
                 }
             }
-        } else if info != pg_constants::XLOG_XACT_PREPARE {
-            trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
+            if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
+                if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
+                    let nrels = buf.get_i32_le();
+                    for _i in 0..nrels {
+                        let spcnode = buf.get_u32_le();
+                        let dbnode = buf.get_u32_le();
+                        let relnode = buf.get_u32_le();
+                        //TODO handle this too?
+                        trace!(
+                            "XLOG_XACT_COMMIT relfilenode {}/{}/{}",
+                            spcnode,
+                            dbnode,
+                            relnode
+                        );
+                    }
+                }
+                if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
+                    let nmsgs = buf.get_i32_le();
+                    for _i in 0..nmsgs {
+                        let sizeof_shared_invalidation_message = 0;
+                        buf.advance(sizeof_shared_invalidation_message);
+                    }
+                }
+                assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
+                let xid = buf.get_u32_le();
+                transaction_id_set_status(xid, status, &mut page);
+            }
+        } else if info == pg_constants::XLOG_XACT_PREPARE {
+            info!("Apply prepare {} record", xlogrec.xl_xid);
+            page.clear();
+            page.extend_from_slice(&buf[..]);
+        } else {
+            error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
             status,
             record.lsn,
             record.main_data_offset, record.rec.len());
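The COMMIT_PREPARED and ABORT_PREPARED paths above flip CLOG bits for the XID read from the record body via `transaction_id_set_status`; note that the invalidation-message skip is stubbed (`sizeof_shared_invalidation_message = 0`) in this commit. A worked sketch of what the status call does to a CLOG page, following the stock PostgreSQL layout (two status bits per XID, four XIDs per byte); the function body here is illustrative:

const CLOG_BITS_PER_XACT: u32 = 2;
const CLOG_XACTS_PER_BYTE: u32 = 4;
const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;

fn transaction_id_set_status(xid: u32, status: u8, page: &mut [u8]) {
    let byte_in_page = ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE) as usize;
    let bshift = ((xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT) as u32;
    page[byte_in_page] &= !(0b11u8 << bshift);        // clear this XID's two bits
    page[byte_in_page] |= (status & 0b11) << bshift;  // set the new status
}

fn main() {
    let mut page = vec![0u8; 8192];
    transaction_id_set_status(7, 0b01 /* e.g. COMMITTED */, &mut page);
    assert_eq!(page[1], 0b0100_0000); // XID 7 -> byte 1, bit shift 6
}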
@@ -383,9 +445,6 @@ impl PostgresRedoManagerInternal {
             }
         }
     }
-    } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
-        // empty page image indicates that this SLRU page is truncated and can be removed by GC
-        page.clear();
     } else {
         panic!();
     }
@@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
 pub const XLOG_XACT_COMMIT: u8 = 0x00;
 pub const XLOG_XACT_PREPARE: u8 = 0x10;
 pub const XLOG_XACT_ABORT: u8 = 0x20;
+pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
+pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
 
 // From slru.h
 pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
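The two new opcodes match PostgreSQL's xact.h, where the operation lives in the high nibble of `xl_info` and is extracted with `XLOG_XACT_OPMASK` (0x70), exactly as the decoder above does. A small sketch of that decoding:

const XLOG_XACT_OPMASK: u8 = 0x70;

fn xact_op_name(xl_info: u8) -> &'static str {
    match xl_info & XLOG_XACT_OPMASK {
        0x00 => "COMMIT",
        0x10 => "PREPARE",
        0x20 => "ABORT",
        0x30 => "COMMIT_PREPARED",
        0x40 => "ABORT_PREPARED",
        _ => "OTHER",
    }
}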
@@ -47,6 +47,7 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
         pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"),
         pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"),
         pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"),
+        pg_constants::PG_CHECKPOINT_FORKNUM => Some("checkpoint"),
 
         _ => Some("UNKNOWN FORKNUM"),
     }
test_runner/batch_others/test_multixact.py (new file, 62 lines)
@@ -0,0 +1,62 @@
+import pytest
+import os
+import psycopg2
+import multiprocessing
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+
+def runQuery(connstr):
+    con = psycopg2.connect(connstr)
+    con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = con.cursor()
+    cur.execute('select * from t1 for key share;')
+
+
+def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
+
+    # Create a branch for us
+    zenith_cli.run(["branch", "test_multixact", "empty"])
+    pg = postgres.create_start('test_multixact')
+
+    print("postgres is running on 'test_multixact' branch")
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    cur.execute('CREATE TABLE t1(i int primary key);'
+                'INSERT INTO t1 select * from generate_series(1,100);')
+
+    # Lock entries in parallel connections to set multixact
+    nclients = 3
+    pool = multiprocessing.Pool(nclients)
+    args = [pg.connstr()] * nclients
+    pool.map(runQuery, args)
+    pool.close()
+    pool.join()
+
+    # force wal flush
+    cur.execute('checkpoint')
+
+    cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint();')
+    res = cur.fetchone()
+    next_multixact_id = res[0]
+    lsn = res[1]
+
+    # Ensure that we did lock some tuples
+    assert(int(next_multixact_id) > 1)
+
+    # Branch at this point
+    zenith_cli.run(["branch", "test_multixact_new", "test_multixact@" + lsn])
+    pg_new = postgres.create_start('test_multixact_new')
+
+    print("postgres is running on 'test_multixact_new' branch")
+    pg_new_conn = psycopg2.connect(pg_new.connstr())
+    pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur_new = pg_new_conn.cursor()
+
+    cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
+    next_multixact_id_new = cur_new.fetchone()[0]
+
+    # Check that we restored pg_controlfile correctly
+    # TODO compare content of pg_multixact files?
+    assert(next_multixact_id_new == next_multixact_id)
test_runner/batch_others/test_oldestXid.py (new file, 61 lines)
@@ -0,0 +1,61 @@
+import pytest
+import getpass
+import psycopg2
+import time
+import os
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+
+#
+# Test pg_control values after recreating a postgres instance
+#
+def test_oldestxid(zenith_cli, pageserver, postgres, pg_bin, repo_dir):
+    zenith_cli.run(["branch", "test_oldestxid", "empty"])
+
+    pg = postgres.create_start('test_oldestxid')
+    print("postgres is running on 'test_oldestxid' branch")
+
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    # Create table, and insert a row
+    cur.execute('CREATE TABLE foo (t text)')
+    cur.execute("INSERT INTO foo VALUES ('bar')")
+
+    cur.execute('checkpoint')
+    cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
+    res = cur.fetchone()
+    oldest_xid = res[0]
+    oldest_xid_dbid = res[1]
+    oldest_active_xid = res[2]
+
+    # Stop, and destroy the Postgres instance. Then recreate and restart it.
+    pg_conn.close()
+    pg.stop()
+
+    # capture old pg_controldata output for debugging purposes
+    pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
+    pg_bin.run_capture(['pg_controldata', pgdatadir])
+
+    pg.destroy()
+    pg.create_start('test_oldestxid')
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
+    res = cur.fetchone()
+    oldest_xid_new = res[0]
+    oldest_xid_dbid_new = res[1]
+    oldest_active_xid_new = res[2]
+
+    assert(oldest_xid_new == oldest_xid)
+    assert(oldest_xid_dbid_new == oldest_xid_dbid)
+
+    # this field should be reset at restart
+    assert(int(oldest_active_xid_new) == 0)
+
+    # capture new pg_controldata output for debugging purposes
+    pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
+    pg_bin.run_capture(['pg_controldata', pgdatadir])
test_runner/batch_others/test_restart_compute.py (new file, 60 lines)
@@ -0,0 +1,60 @@
+import pytest
+import getpass
+import psycopg2
+import time
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+
+#
+# Test restarting and recreating a postgres instance
+#
+def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_restart_compute", "empty"])
+
+    pg = postgres.create_start('test_restart_compute')
+    print("postgres is running on 'test_restart_compute' branch")
+
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    # Create table, and insert a row
+    cur.execute('CREATE TABLE foo (t text)')
+    cur.execute("INSERT INTO foo VALUES ('bar')")
+
+    # Stop and restart the Postgres instance
+    pg_conn.close()
+    pg.stop()
+    pg.start()
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    # We can still see the row
+    cur.execute('SELECT count(*) FROM foo')
+    assert(cur.fetchone()[0] == 1)
+
+    # Insert another row
+    cur.execute("INSERT INTO foo VALUES ('bar2')")
+    cur.execute('SELECT count(*) FROM foo')
+    assert(cur.fetchone()[0] == 2)
+
+    # FIXME: Currently, there is no guarantee that by the time the INSERT commits, the WAL
+    # has been streamed safely to the WAL safekeeper or page server. It is merely stored
+    # on the Postgres instance's local disk. Sleep a little, to give it time to be
+    # streamed. This should be removed, when we have the ability to run the Postgres
+    # instance -> safekeeper streaming in synchronous mode.
+    time.sleep(5)
+
+    # Stop, and destroy the Postgres instance. Then recreate and restart it.
+    pg_conn.close()
+    pg.stop()
+    pg.destroy()
+    pg.create_start('test_restart_compute')
+    pg_conn = psycopg2.connect(pg.connstr())
+    pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = pg_conn.cursor()
+
+    # We can still see the rows
+    cur.execute('SELECT count(*) FROM foo')
+    assert(cur.fetchone()[0] == 2)
test_runner/batch_others/test_twophase.py (new file, 58 lines)
@@ -0,0 +1,58 @@
+#
+# Test branching, when a transaction is in prepared state
+#
+import pytest
+import getpass
+import psycopg2
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+
+def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_twophase", "empty"])
+
+    pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5'])
+    print("postgres is running on 'test_twophase' branch")
+
+    conn = psycopg2.connect(pg.connstr())
+    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur = conn.cursor()
+
+    cur.execute('CREATE TABLE foo (t text)')
+
+    # Prepare a transaction that will insert a row
+    cur.execute('BEGIN')
+    cur.execute("INSERT INTO foo VALUES ('one')")
+    cur.execute("PREPARE TRANSACTION 'insert_one'")
+
+    # Prepare another transaction that will insert a row
+    cur.execute('BEGIN')
+    cur.execute("INSERT INTO foo VALUES ('two')")
+    cur.execute("PREPARE TRANSACTION 'insert_two'")
+
+    cur.execute('BEGIN')
+    cur.execute("INSERT INTO foo VALUES ('three')")
+    cur.execute("PREPARE TRANSACTION 'insert_three'")
+    cur.execute("COMMIT PREPARED 'insert_three'")
+
+    cur.execute('SELECT pg_current_wal_insert_lsn()')
+    lsn = cur.fetchone()[0]
+
+    # Create a branch with the transaction in prepared state
+    zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@" + lsn])
+
+    pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5'])
+    conn2 = psycopg2.connect(pg2.connstr())
+    conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    cur2 = conn2.cursor()
+
+    # On the new branch, commit one of the prepared transactions, abort the other one.
+    cur2.execute("COMMIT PREPARED 'insert_one'")
+    cur2.execute("ROLLBACK PREPARED 'insert_two'")
+
+    cur2.execute('SELECT * FROM foo')
+    assert(cur2.fetchall() == [('one',), ('three',)])
+
+    # Neither insert is visible on the original branch, the transactions are still
+    # in prepared state there.
+    cur.execute('SELECT * FROM foo')
+    assert(cur.fetchall() == [('three',)])
@@ -168,14 +168,18 @@ class Postgres:
         self.branch = None
         # path to conf is <repo_dir>/pgdatadirs/<branch_name>/postgresql.conf
 
-    def create_start(self, branch, config_lines=None):
-        """ create the pg data directory, and start the server """
+    def create(self, branch, config_lines=None):
+        """ create the pg data directory """
         self.zenith_cli.run(['pg', 'create', branch])
         self.branch = branch
         if config_lines is None:
             config_lines = []
         self.config(config_lines)
-        self.zenith_cli.run(['pg', 'start', branch])
+        return
+
+    def start(self):
+        """ start the server """
+        self.zenith_cli.run(['pg', 'start', self.branch])
         self.running = True
         return
 
@@ -189,9 +193,19 @@ class Postgres:
             conf.write('\n')
 
     def stop(self):
+        """ stop the server """
         if self.running:
             self.zenith_cli.run(['pg', 'stop', self.branch])
 
+    def destroy(self):
+        datadir = os.path.join(self.repo_dir, 'pgdatadirs/{}'.format(self.branch))
+        shutil.rmtree(datadir)
+
+    def create_start(self, branch, config_lines=None):
+        self.create(branch, config_lines)
+        self.start()
+        return
+
     # Return a libpq connection string to connect to the Postgres instance
     def connstr(self, dbname='postgres'):
         conn_str = 'host={} port={} dbname={} user={}'.format(