mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-18 19:02:56 +00:00
Compare commits
15 Commits
rustls
...
test_oldes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01e9011ea4 | ||
|
|
f61e92b692 | ||
|
|
3f89016bd7 | ||
|
|
3865a8d901 | ||
|
|
2b7963056c | ||
|
|
1e08fda040 | ||
|
|
47f9ff6410 | ||
|
|
b72939530c | ||
|
|
72e373cd4e | ||
|
|
e3ea9cf70f | ||
|
|
e2a9b4cc9b | ||
|
|
5b9ea6495e | ||
|
|
6ec8a90c78 | ||
|
|
e9c7665c81 | ||
|
|
feb925c546 |
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tar::{Builder, Header};
|
||||
use walkdir::WalkDir;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Timeline};
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
@@ -101,7 +102,6 @@ fn add_relmap_files(
|
||||
ar.append_path_with_name(&src_path, &dst_path)?;
|
||||
format!("base/{}/pg_filenode.map", db.dbnode)
|
||||
};
|
||||
info!("Deliver {}", path);
|
||||
assert!(img.len() == 512);
|
||||
let header = new_tar_header(&path, img.len() as u64)?;
|
||||
ar.append(&header, &img[..])?;
|
||||
@@ -128,9 +128,13 @@ fn add_twophase_files(
|
||||
blknum: *xid,
|
||||
};
|
||||
let img = timeline.get_page_at_lsn(tag, lsn)?;
|
||||
let mut buf = BytesMut::new();
|
||||
buf.extend_from_slice(&img[..]);
|
||||
let crc = crc32c::crc32c(&img[..]);
|
||||
buf.put_u32_le(crc);
|
||||
let path = format!("pg_twophase/{:>08X}", xid);
|
||||
let header = new_tar_header(&path, img.len() as u64)?;
|
||||
ar.append(&header, &img[..])?;
|
||||
let header = new_tar_header(&path, buf.len() as u64)?;
|
||||
ar.append(&header, &buf[..])?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -154,11 +158,10 @@ fn add_pgcontrol_file(
|
||||
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
|
||||
|
||||
checkpoint.redo = lsn.0;
|
||||
checkpoint.nextXid.value += 1;
|
||||
// TODO: When we restart master there are no active transaction and oldestXid is
|
||||
// equal to nextXid if there are no prepared transactions.
|
||||
// Let's ignore them for a while...
|
||||
checkpoint.oldestXid = checkpoint.nextXid.value as u32;
|
||||
//checkpoint.oldestXid = checkpoint.nextXid.value as u32;
|
||||
pg_control.checkPointCopy = checkpoint;
|
||||
let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
|
||||
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
|
||||
|
||||
@@ -9,6 +9,8 @@ use postgres_ffi::relfile_utils::forknumber_to_name;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use log::*;
|
||||
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
|
||||
|
||||
///
|
||||
/// A repository corresponds to one .zenith directory. One repository holds multiple
|
||||
@@ -61,14 +63,23 @@ pub trait Timeline {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
|
||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
|
||||
|
||||
/// Like put_wal_record, but with ready-made image of the page.
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
|
||||
|
||||
/// Truncate relation
|
||||
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
|
||||
|
||||
/// Drop relation or file segment
|
||||
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;
|
||||
|
||||
/// Put raw data
|
||||
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()>;
|
||||
|
||||
/// Get repository iterator
|
||||
fn iterator(&self) -> Box<dyn RepositoryIterator + '_>;
|
||||
|
||||
/// Create a new database from a template database
|
||||
///
|
||||
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
|
||||
@@ -81,7 +92,49 @@ pub trait Timeline {
|
||||
tablespace_id: Oid,
|
||||
src_db_id: Oid,
|
||||
src_tablespace_id: Oid,
|
||||
) -> Result<()>;
|
||||
) -> Result<()> {
|
||||
let mut n = 0;
|
||||
for forknum in &[
|
||||
pg_constants::MAIN_FORKNUM,
|
||||
pg_constants::FSM_FORKNUM,
|
||||
pg_constants::VISIBILITYMAP_FORKNUM,
|
||||
pg_constants::INIT_FORKNUM,
|
||||
pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
] {
|
||||
let key = RepositoryKey {
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: src_tablespace_id,
|
||||
dbnode: src_db_id,
|
||||
relnode: 0,
|
||||
forknum: *forknum,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut iter = self.iterator();
|
||||
iter.first(&key);
|
||||
while iter.valid() {
|
||||
let mut key = iter.key();
|
||||
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
|
||||
break;
|
||||
}
|
||||
key.tag.rel.spcnode = tablespace_id;
|
||||
key.tag.rel.dbnode = db_id;
|
||||
key.lsn = lsn;
|
||||
|
||||
self.put_raw_data(key, iter.value())?;
|
||||
n += 1;
|
||||
iter.next();
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Create database {}/{}, copy {} entries",
|
||||
tablespace_id, db_id, n
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Helper function to parse a WAL record and call the above functions for all the
|
||||
@@ -106,14 +159,18 @@ pub trait Timeline {
|
||||
blknum: blk.blkno,
|
||||
};
|
||||
|
||||
let rec = WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
if blk.will_drop {
|
||||
self.put_drop(tag, lsn)?;
|
||||
} else {
|
||||
let rec = WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
|
||||
self.put_wal_record(tag, rec);
|
||||
self.put_wal_record(tag, rec)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle a few special record types
|
||||
@@ -167,15 +224,163 @@ pub trait Timeline {
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
|
||||
|
||||
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
|
||||
/// but can be also applied to normal relations.
|
||||
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
|
||||
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
|
||||
let _lsn = self.wait_lsn(lsn)?;
|
||||
let mut key = RepositoryKey {
|
||||
// minimal key to start with
|
||||
tag: BufferTag { rel, blknum: 0 },
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut iter = self.iterator();
|
||||
iter.first(&key);
|
||||
if iter.valid() {
|
||||
let thiskey = iter.key();
|
||||
let tag = thiskey.tag;
|
||||
if tag.rel == rel {
|
||||
// still trversing this relation
|
||||
let first_blknum = tag.blknum;
|
||||
key.tag.blknum = u32::MAX; // maximal key
|
||||
iter.last(&key); // locate last entry
|
||||
if iter.valid() {
|
||||
let thiskey = iter.key();
|
||||
let last_blknum = thiskey.tag.blknum;
|
||||
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((0, 0)) // empty range
|
||||
}
|
||||
|
||||
/// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
|
||||
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
|
||||
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
|
||||
let key = RepositoryKey {
|
||||
// minimal key
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut dbs = Vec::new();
|
||||
|
||||
let mut iter = self.iterator();
|
||||
iter.first(&key);
|
||||
let mut prev_tag = key.tag.rel;
|
||||
while iter.valid() {
|
||||
let key = iter.key();
|
||||
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
|
||||
break; // we are done with this fork
|
||||
}
|
||||
if key.tag.rel != prev_tag && key.lsn <= lsn {
|
||||
prev_tag = key.tag.rel;
|
||||
dbs.push(prev_tag); // collect unique tags
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
return Ok(dbs);
|
||||
}
|
||||
|
||||
fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> Result<u8> {
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_XACT_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
|
||||
};
|
||||
let clog_page = self.get_page_at_lsn(tag, lsn)?;
|
||||
let status = transaction_id_get_status(xid, &clog_page[..]);
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
/// Get vector of prepared twophase transactions
|
||||
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
|
||||
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
|
||||
let key = RepositoryKey {
|
||||
// minimal key
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut gxacts = Vec::new();
|
||||
|
||||
let mut iter = self.iterator();
|
||||
iter.first(&key);
|
||||
while iter.valid() {
|
||||
let key = iter.key();
|
||||
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
|
||||
break; // we are done with this fork
|
||||
}
|
||||
if key.lsn <= lsn {
|
||||
let xid = key.tag.blknum;
|
||||
if self.get_tx_status(xid, lsn)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
|
||||
gxacts.push(xid);
|
||||
}
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
return Ok(gxacts);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
|
||||
pub struct RepositoryKey {
|
||||
pub tag: BufferTag,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
impl RepositoryKey {
|
||||
fn pack(&self, buf: &mut BytesMut) {
|
||||
self.tag.pack(buf);
|
||||
buf.put_u64(self.lsn.0);
|
||||
}
|
||||
fn unpack(buf: &mut Bytes) -> RepositoryKey {
|
||||
RepositoryKey {
|
||||
tag: BufferTag::unpack(buf),
|
||||
lsn: Lsn::from(buf.get_u64()),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_slice(slice: &[u8]) -> Self {
|
||||
let mut buf = Bytes::copy_from_slice(slice);
|
||||
Self::unpack(&mut buf)
|
||||
}
|
||||
|
||||
fn to_bytes(&self) -> BytesMut {
|
||||
let mut buf = BytesMut::new();
|
||||
self.pack(&mut buf);
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
pub trait RepositoryIterator {
|
||||
fn first(&mut self, key: &RepositoryKey);
|
||||
fn last(&mut self, key: &RepositoryKey);
|
||||
fn next(&mut self);
|
||||
fn prev(&mut self);
|
||||
fn valid(&self) -> bool;
|
||||
fn key(&self) -> RepositoryKey;
|
||||
fn value(&self) -> &[u8];
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -362,13 +567,6 @@ mod tests {
|
||||
}
|
||||
|
||||
/// Test get_relsize() and truncation.
|
||||
///
|
||||
/// FIXME: The RocksRepository implementation returns wrong relation size, if
|
||||
/// you make a request with an old LSN. It seems to ignore the requested LSN
|
||||
/// and always return result as of latest LSN. For such cases, the expected
|
||||
/// results below match the current RocksRepository behavior, so that the test
|
||||
/// passes, and the actually correct answers are in comments like
|
||||
/// "// CORRECT: <correct answer>"
|
||||
#[test]
|
||||
fn test_relsize() -> Result<()> {
|
||||
// get_timeline() with non-existent timeline id should fail
|
||||
@@ -380,21 +578,23 @@ mod tests {
|
||||
let tline = repo.create_empty_timeline(timelineid)?;
|
||||
|
||||
tline.init_valid_lsn(Lsn(1));
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
|
||||
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
|
||||
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
|
||||
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
|
||||
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
|
||||
|
||||
tline.advance_last_valid_lsn(Lsn(5));
|
||||
|
||||
// FIXME: The rocksdb implementation erroneously returns 'true' here, even
|
||||
// though the relation was created only at a later LSN
|
||||
// rocksdb implementation erroneosly returns 'true' here
|
||||
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
|
||||
// likewise, it returns wrong size here
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 3); // CORRECT: 0 (or error?)
|
||||
// And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error
|
||||
|
||||
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(2))?, true);
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 3); // CORRECT: 1
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 1);
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
|
||||
|
||||
// Check page contents at each LSN
|
||||
@@ -446,7 +646,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// should still see the truncated block with older LSN
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 2); // CORRECT: 3
|
||||
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
|
||||
assert_eq!(
|
||||
tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?,
|
||||
TEST_IMG("foo blk 2 at 5")
|
||||
@@ -472,7 +672,7 @@ mod tests {
|
||||
for i in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
|
||||
lsn += 1;
|
||||
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
|
||||
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
|
||||
}
|
||||
tline.advance_last_valid_lsn(Lsn(lsn));
|
||||
|
||||
|
||||
@@ -5,9 +5,8 @@
|
||||
// full page images, keyed by the RelFileNode, blocknumber, and the
|
||||
// LSN.
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
|
||||
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord, RepositoryKey, RepositoryIterator};
|
||||
use crate::restore_local_repo::restore_timeline;
|
||||
use crate::waldecoder::{Oid, TransactionId};
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
@@ -16,7 +15,6 @@ use crate::ZTimelineId;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
|
||||
use postgres_ffi::*;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
@@ -30,7 +28,7 @@ use zenith_utils::lsn::{AtomicLsn, Lsn};
|
||||
use zenith_utils::seqwait::SeqWait;
|
||||
|
||||
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
|
||||
static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
static TIMEOUT: Duration = Duration::from_secs(600);
|
||||
|
||||
pub struct RocksRepository {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -84,40 +82,12 @@ pub struct RocksTimeline {
|
||||
// stored directly in the cache entry in that you still need to run the WAL redo
|
||||
// routine to generate the page image.
|
||||
//
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
|
||||
struct CacheKey {
|
||||
pub tag: BufferTag,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
impl CacheKey {
|
||||
fn pack(&self, buf: &mut BytesMut) {
|
||||
self.tag.pack(buf);
|
||||
buf.put_u64(self.lsn.0);
|
||||
}
|
||||
fn unpack(buf: &mut Bytes) -> CacheKey {
|
||||
CacheKey {
|
||||
tag: BufferTag::unpack(buf),
|
||||
lsn: Lsn::from(buf.get_u64()),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_slice(slice: &[u8]) -> Self {
|
||||
let mut buf = Bytes::copy_from_slice(slice);
|
||||
Self::unpack(&mut buf)
|
||||
}
|
||||
|
||||
fn to_bytes(&self) -> BytesMut {
|
||||
let mut buf = BytesMut::new();
|
||||
self.pack(&mut buf);
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
enum CacheEntryContent {
|
||||
PageImage(Bytes),
|
||||
WALRecord(WALRecord),
|
||||
Truncation,
|
||||
Drop,
|
||||
}
|
||||
|
||||
// The serialized representation of a CacheEntryContent begins with
|
||||
@@ -125,11 +95,12 @@ enum CacheEntryContent {
|
||||
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
|
||||
// at all, you must peek into the first byte of the serialized representation
|
||||
// to read it.
|
||||
const CONTENT_PAGE_IMAGE: u8 = 1u8;
|
||||
const CONTENT_WAL_RECORD: u8 = 2u8;
|
||||
const CONTENT_TRUNCATION: u8 = 3u8;
|
||||
const CONTENT_PAGE_IMAGE: u8 = 0u8;
|
||||
const CONTENT_WAL_RECORD: u8 = 1u8;
|
||||
const CONTENT_TRUNCATION: u8 = 2u8;
|
||||
const CONTENT_DROP: u8 = 3u8;
|
||||
|
||||
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
|
||||
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
|
||||
|
||||
const UNUSED_VERSION_FLAG: u8 = 4u8;
|
||||
|
||||
@@ -148,6 +119,9 @@ impl CacheEntryContent {
|
||||
CacheEntryContent::Truncation => {
|
||||
buf.put_u8(CONTENT_TRUNCATION);
|
||||
}
|
||||
CacheEntryContent::Drop => {
|
||||
buf.put_u8(CONTENT_DROP);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
|
||||
@@ -162,6 +136,7 @@ impl CacheEntryContent {
|
||||
}
|
||||
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
|
||||
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
|
||||
CONTENT_DROP => CacheEntryContent::Drop,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -190,6 +165,33 @@ impl RocksRepository {
|
||||
}
|
||||
}
|
||||
}
|
||||
struct RocksIterator<'a> {
|
||||
iter: rocksdb::DBRawIterator<'a>,
|
||||
}
|
||||
|
||||
impl<'a> RepositoryIterator for RocksIterator<'a> {
|
||||
fn next(&mut self) {
|
||||
self.iter.next()
|
||||
}
|
||||
fn prev(&mut self) {
|
||||
self.iter.prev()
|
||||
}
|
||||
fn valid(&self) -> bool {
|
||||
self.iter.valid()
|
||||
}
|
||||
fn first(&mut self, key: &RepositoryKey) {
|
||||
self.iter.seek(key.to_bytes())
|
||||
}
|
||||
fn last(&mut self, key: &RepositoryKey) {
|
||||
self.iter.seek_for_prev(key.to_bytes())
|
||||
}
|
||||
fn key(&self) -> RepositoryKey {
|
||||
RepositoryKey::from_slice(self.iter.key().unwrap())
|
||||
}
|
||||
fn value(&self) -> &[u8] {
|
||||
self.iter.value().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
// Get handle to a given timeline. It is assumed to already exist.
|
||||
impl Repository for RocksRepository {
|
||||
@@ -299,21 +301,21 @@ impl RocksTimeline {
|
||||
tag: BufferTag,
|
||||
lsn: Lsn,
|
||||
) -> (Option<Bytes>, Vec<WALRecord>) {
|
||||
let key = CacheKey { tag, lsn };
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
let mut base_img: Option<Bytes> = None;
|
||||
let mut records: Vec<WALRecord> = Vec::new();
|
||||
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(key.to_bytes());
|
||||
let mut iter = self.iterator();
|
||||
iter.last(&key);
|
||||
|
||||
// Scan backwards, collecting the WAL records, until we hit an
|
||||
// old page image.
|
||||
while iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let key = iter.key();
|
||||
if key.tag != tag {
|
||||
break;
|
||||
}
|
||||
let content = CacheEntryContent::from_slice(iter.value().unwrap());
|
||||
let content = CacheEntryContent::from_slice(iter.value());
|
||||
if let CacheEntryContent::PageImage(img) = content {
|
||||
// We have a base image. No need to dig deeper into the list of
|
||||
// records
|
||||
@@ -344,20 +346,26 @@ impl RocksTimeline {
|
||||
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
|
||||
assert!(lsn <= self.last_valid_lsn.load());
|
||||
|
||||
let mut key = CacheKey {
|
||||
let mut key = RepositoryKey {
|
||||
tag: BufferTag {
|
||||
rel,
|
||||
blknum: u32::MAX,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
let mut iter = self.db.raw_iterator();
|
||||
let mut iter = self.iterator();
|
||||
loop {
|
||||
iter.seek_for_prev(key.to_bytes());
|
||||
iter.last(&key);
|
||||
if iter.valid() {
|
||||
let thiskey = CacheKey::from_slice(iter.key().unwrap());
|
||||
let thiskey = iter.key();
|
||||
if thiskey.tag.rel == rel {
|
||||
let content = CacheEntryContent::from_slice(iter.value().unwrap());
|
||||
// Ignore entries with later LSNs.
|
||||
if thiskey.lsn > lsn {
|
||||
key.tag.blknum = thiskey.tag.blknum;
|
||||
continue;
|
||||
}
|
||||
|
||||
let content = CacheEntryContent::from_slice(iter.value());
|
||||
if let CacheEntryContent::Truncation = content {
|
||||
if thiskey.tag.blknum > 0 {
|
||||
key.tag.blknum = thiskey.tag.blknum - 1;
|
||||
@@ -376,6 +384,46 @@ impl RocksTimeline {
|
||||
Ok(0)
|
||||
}
|
||||
|
||||
///
|
||||
/// Drop relations with all its forks or non-relational file
|
||||
///
|
||||
fn drop(&self, tag: BufferTag) -> Result<()> {
|
||||
let mut iter = self.iterator();
|
||||
let mut key = RepositoryKey { tag, lsn: Lsn(u64::MAX) };
|
||||
if tag.rel.forknum == pg_constants::MAIN_FORKNUM {
|
||||
// if it is relation then remove all its blocks in all forks
|
||||
key.tag.blknum = u32::MAX;
|
||||
key.tag.rel.forknum = pg_constants::INIT_FORKNUM;
|
||||
} else {
|
||||
assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM);
|
||||
}
|
||||
|
||||
iter.last(&key);
|
||||
while iter.valid() {
|
||||
let key = iter.key();
|
||||
if key.tag.rel.relnode != tag.rel.relnode ||
|
||||
key.tag.rel.spcnode != tag.rel.spcnode ||
|
||||
key.tag.rel.dbnode != tag.rel.dbnode ||
|
||||
(key.tag.rel.forknum != tag.rel.forknum &&
|
||||
tag.rel.forknum != pg_constants::MAIN_FORKNUM)
|
||||
{
|
||||
// no more entries belonging to this relation or file
|
||||
break;
|
||||
}
|
||||
let v = iter.value();
|
||||
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
|
||||
let mut v = v.to_owned();
|
||||
v[0] |= UNUSED_VERSION_FLAG;
|
||||
self.db.put(key.to_bytes(), &v[..])?;
|
||||
} else {
|
||||
// already marked for deletion
|
||||
break;
|
||||
}
|
||||
iter.prev();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
|
||||
loop {
|
||||
thread::sleep(conf.gc_period);
|
||||
@@ -383,7 +431,7 @@ impl RocksTimeline {
|
||||
|
||||
// checked_sub() returns None on overflow.
|
||||
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
|
||||
let mut maxkey = CacheKey {
|
||||
let mut maxkey = RepositoryKey {
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: u32::MAX,
|
||||
@@ -401,35 +449,50 @@ impl RocksTimeline {
|
||||
let mut inspected = 0u64;
|
||||
let mut deleted = 0u64;
|
||||
loop {
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(maxkey.to_bytes());
|
||||
let mut iter = self.iterator();
|
||||
iter.last(&maxkey);
|
||||
if iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let v = iter.value().unwrap();
|
||||
let key = iter.key();
|
||||
let v = iter.value();
|
||||
let flag = v[0];
|
||||
let last_lsn = key.lsn;
|
||||
|
||||
inspected += 1;
|
||||
|
||||
// Construct boundaries for old records cleanup
|
||||
maxkey.tag = key.tag;
|
||||
let last_lsn = key.lsn;
|
||||
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
|
||||
|
||||
let mut minkey = maxkey.clone();
|
||||
minkey.lsn = Lsn(0); // first version
|
||||
|
||||
// Special handling of delete of PREPARE WAL record
|
||||
if (flag & CONTENT_KIND_MASK) == CONTENT_DROP {
|
||||
// If drop record in over the horizon then delete all entries from repository
|
||||
if last_lsn < horizon {
|
||||
self.drop(key.tag)?;
|
||||
}
|
||||
maxkey = minkey;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Special handling of PREPARE transaction WAL record.
|
||||
if last_lsn < horizon
|
||||
&& key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
|
||||
{
|
||||
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
|
||||
let mut v = v.to_owned();
|
||||
v[0] |= UNUSED_VERSION_FLAG;
|
||||
self.db.put(key.to_bytes(), &v[..])?;
|
||||
deleted += 1;
|
||||
}
|
||||
maxkey = minkey;
|
||||
continue;
|
||||
}
|
||||
let xid = key.tag.blknum;
|
||||
// We can not remove information about uncompleted prepared transaction
|
||||
// even if it is over horizon
|
||||
if self.get_tx_status(xid, horizon)? != pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
|
||||
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
|
||||
let mut v = v.to_owned();
|
||||
v[0] |= UNUSED_VERSION_FLAG;
|
||||
self.db.put(key.to_bytes(), &v[..])?;
|
||||
deleted += 1;
|
||||
}
|
||||
}
|
||||
maxkey = minkey;
|
||||
continue;
|
||||
}
|
||||
// reconstruct most recent page version
|
||||
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
||||
// force reconstruction of most recent page version
|
||||
@@ -447,19 +510,19 @@ impl RocksTimeline {
|
||||
let new_img = self
|
||||
.walredo_mgr
|
||||
.request_redo(key.tag, key.lsn, base_img, records)?;
|
||||
self.put_page_image(key.tag, key.lsn, new_img.clone());
|
||||
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
|
||||
|
||||
reconstructed += 1;
|
||||
}
|
||||
|
||||
iter.seek_for_prev(maxkey.to_bytes());
|
||||
iter.last(&maxkey);
|
||||
if iter.valid() {
|
||||
// do not remove last version
|
||||
if last_lsn > horizon {
|
||||
// locate most recent record before horizon
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let key = iter.key();
|
||||
if key.tag == maxkey.tag {
|
||||
let v = iter.value().unwrap();
|
||||
let v = iter.value();
|
||||
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
||||
let (base_img, records) =
|
||||
self.collect_records_for_apply(key.tag, key.lsn);
|
||||
@@ -468,7 +531,7 @@ impl RocksTimeline {
|
||||
let new_img = self
|
||||
.walredo_mgr
|
||||
.request_redo(key.tag, key.lsn, base_img, records)?;
|
||||
self.put_page_image(key.tag, key.lsn, new_img.clone());
|
||||
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
|
||||
|
||||
truncated += 1;
|
||||
} else {
|
||||
@@ -495,11 +558,11 @@ impl RocksTimeline {
|
||||
if !iter.valid() {
|
||||
break;
|
||||
}
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let key = iter.key();
|
||||
if key.tag != maxkey.tag {
|
||||
break;
|
||||
}
|
||||
let v = iter.value().unwrap();
|
||||
let v = iter.value();
|
||||
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
|
||||
let mut v = v.to_owned();
|
||||
v[0] |= UNUSED_VERSION_FLAG;
|
||||
@@ -526,35 +589,6 @@ impl RocksTimeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline for RocksTimeline {
|
||||
@@ -572,15 +606,15 @@ impl Timeline for RocksTimeline {
|
||||
|
||||
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
let key = CacheKey { tag, lsn };
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(key.to_bytes());
|
||||
let mut iter = self.iterator();
|
||||
iter.last(&key);
|
||||
|
||||
if iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let key = iter.key();
|
||||
if key.tag == tag {
|
||||
let content = CacheEntryContent::from_slice(iter.value().unwrap());
|
||||
let content = CacheEntryContent::from_slice(iter.value());
|
||||
let page_img: Bytes;
|
||||
if let CacheEntryContent::PageImage(img) = content {
|
||||
page_img = img;
|
||||
@@ -589,10 +623,16 @@ impl Timeline for RocksTimeline {
|
||||
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
|
||||
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
|
||||
|
||||
self.put_page_image(tag, lsn, page_img.clone());
|
||||
self.put_page_image(tag, lsn, page_img.clone())?;
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
bail!("no page image or WAL record for requested page");
|
||||
bail!(
|
||||
"no page image or WAL record for requested page {} blk {} at lsn {}({})",
|
||||
tag.rel,
|
||||
tag.blknum,
|
||||
req_lsn,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
// FIXME: assumes little-endian. Only used for the debugging log though
|
||||
let page_lsn_hi =
|
||||
@@ -623,116 +663,6 @@ impl Timeline for RocksTimeline {
|
||||
self.relsize_get_nowait(rel, lsn)
|
||||
}
|
||||
|
||||
/// Get vector of prepared twophase transactions
|
||||
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
|
||||
let key = CacheKey {
|
||||
// minimal key
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut gxacts = Vec::new();
|
||||
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(key.to_bytes());
|
||||
while iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
|
||||
break; // we are done with this fork
|
||||
}
|
||||
if key.lsn <= lsn {
|
||||
let xid = key.tag.blknum;
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_XACT_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
|
||||
};
|
||||
let clog_page = self.get_page_at_lsn(tag, lsn)?;
|
||||
let status = transaction_id_get_status(xid, &clog_page[..]);
|
||||
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
|
||||
gxacts.push(xid);
|
||||
}
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
Ok(gxacts)
|
||||
}
|
||||
|
||||
/// Get databases. This function is used to local pg_filenode.map files
|
||||
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
|
||||
let key = CacheKey {
|
||||
// minimal key
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut dbs = Vec::new();
|
||||
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(key.to_bytes());
|
||||
let mut prev_tag = key.tag.rel;
|
||||
while iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
|
||||
break; // we are done with this fork
|
||||
}
|
||||
if key.tag.rel != prev_tag && key.lsn <= lsn {
|
||||
prev_tag = key.tag.rel;
|
||||
dbs.push(prev_tag); // collect unique tags
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
Ok(dbs)
|
||||
}
|
||||
|
||||
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
|
||||
/// but can be also applied to normal relations.
|
||||
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
|
||||
let _lsn = self.wait_lsn(lsn)?;
|
||||
let mut key = CacheKey {
|
||||
// minimal key to start with
|
||||
tag: BufferTag { rel, blknum: 0 },
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(key.to_bytes()); // locate first entry
|
||||
if iter.valid() {
|
||||
let thiskey = CacheKey::from_slice(iter.key().unwrap());
|
||||
let tag = thiskey.tag;
|
||||
if tag.rel == rel {
|
||||
// still trversing this relation
|
||||
let first_blknum = tag.blknum;
|
||||
key.tag.blknum = u32::MAX; // maximal key
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(key.to_bytes()); // localte last entry
|
||||
if iter.valid() {
|
||||
let thiskey = CacheKey::from_slice(iter.key().unwrap());
|
||||
let last_blknum = thiskey.tag.blknum;
|
||||
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((0, 0)) // empty range
|
||||
}
|
||||
|
||||
///
|
||||
/// Does relation exist at given LSN?
|
||||
///
|
||||
@@ -740,17 +670,17 @@ impl Timeline for RocksTimeline {
|
||||
fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
|
||||
let lsn = self.wait_lsn(req_lsn)?;
|
||||
|
||||
let key = CacheKey {
|
||||
let key = RepositoryKey {
|
||||
tag: BufferTag {
|
||||
rel,
|
||||
blknum: u32::MAX,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek_for_prev(key.to_bytes());
|
||||
let mut iter = self.iterator();
|
||||
iter.last(&key);
|
||||
if iter.valid() {
|
||||
let key = CacheKey::from_slice(iter.key().unwrap());
|
||||
let key = iter.key();
|
||||
if key.tag.rel == rel {
|
||||
debug!("Relation {} exists at {}", rel, lsn);
|
||||
return Ok(true);
|
||||
@@ -766,13 +696,13 @@ impl Timeline for RocksTimeline {
|
||||
///
|
||||
/// Adds a WAL record to the repository
|
||||
///
|
||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
|
||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
|
||||
let lsn = rec.lsn;
|
||||
let key = CacheKey { tag, lsn };
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
|
||||
let content = CacheEntryContent::WALRecord(rec);
|
||||
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes());
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
|
||||
trace!(
|
||||
"put_wal_record rel {} blk {} at {}",
|
||||
tag.rel,
|
||||
@@ -782,8 +712,20 @@ impl Timeline for RocksTimeline {
|
||||
|
||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Put drop record which should completely delete relation with all its forks
|
||||
/// or non-relational file from repository
|
||||
///
|
||||
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> {
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
let content = CacheEntryContent::Drop;
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Adds a relation-wide WAL record (like truncate) to the repository,
|
||||
/// associating it with all pages started with specified block number
|
||||
@@ -798,12 +740,12 @@ impl Timeline for RocksTimeline {
|
||||
trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
|
||||
|
||||
for blknum in nblocks..old_rel_size {
|
||||
let key = CacheKey {
|
||||
let key = RepositoryKey {
|
||||
tag: BufferTag { rel, blknum },
|
||||
lsn,
|
||||
};
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes());
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
|
||||
}
|
||||
let n = (old_rel_size - nblocks) as u64;
|
||||
self.num_entries.fetch_add(n, Ordering::Relaxed);
|
||||
@@ -815,7 +757,7 @@ impl Timeline for RocksTimeline {
|
||||
/// Get page image at particular LSN
|
||||
///
|
||||
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
|
||||
let key = CacheKey { tag, lsn };
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
if let Some(bytes) = self.db.get(key.to_bytes())? {
|
||||
let content = CacheEntryContent::from_slice(&bytes);
|
||||
if let CacheEntryContent::PageImage(img) = content {
|
||||
@@ -828,26 +770,12 @@ impl Timeline for RocksTimeline {
|
||||
///
|
||||
/// Memorize a full image of a page version
|
||||
///
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
|
||||
let img_len = img.len();
|
||||
let key = CacheKey { tag, lsn };
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
let key = RepositoryKey { tag, lsn };
|
||||
let content = CacheEntryContent::PageImage(img);
|
||||
|
||||
let mut val_buf = content.to_bytes();
|
||||
|
||||
// Zero size of page image indicates that page can be removed
|
||||
if img_len == 0 {
|
||||
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
|
||||
// records already marked for deletion
|
||||
return;
|
||||
} else {
|
||||
// delete truncated multixact page
|
||||
val_buf[0] |= UNUSED_VERSION_FLAG;
|
||||
}
|
||||
}
|
||||
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes());
|
||||
let _res = self.db.put(key.to_bytes(), content.to_bytes())?;
|
||||
|
||||
trace!(
|
||||
"put_page_image rel {} blk {} at {}",
|
||||
@@ -856,57 +784,17 @@ impl Timeline for RocksTimeline {
|
||||
lsn
|
||||
);
|
||||
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_create_database(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
db_id: Oid,
|
||||
tablespace_id: Oid,
|
||||
src_db_id: Oid,
|
||||
src_tablespace_id: Oid,
|
||||
) -> Result<()> {
|
||||
let mut n = 0;
|
||||
for forknum in &[
|
||||
pg_constants::MAIN_FORKNUM,
|
||||
pg_constants::FSM_FORKNUM,
|
||||
pg_constants::VISIBILITYMAP_FORKNUM,
|
||||
pg_constants::INIT_FORKNUM,
|
||||
pg_constants::PG_FILENODEMAP_FORKNUM,
|
||||
] {
|
||||
let key = CacheKey {
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: src_tablespace_id,
|
||||
dbnode: src_db_id,
|
||||
relnode: 0,
|
||||
forknum: *forknum,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
let mut iter = self.db.raw_iterator();
|
||||
iter.seek(key.to_bytes());
|
||||
while iter.valid() {
|
||||
let mut key = CacheKey::from_slice(iter.key().unwrap());
|
||||
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
|
||||
break;
|
||||
}
|
||||
key.tag.rel.spcnode = tablespace_id;
|
||||
key.tag.rel.dbnode = db_id;
|
||||
key.lsn = lsn;
|
||||
fn iterator(&self) -> Box<dyn RepositoryIterator + '_> {
|
||||
Box::new(RocksIterator {
|
||||
iter: self.db.raw_iterator(),
|
||||
})
|
||||
}
|
||||
|
||||
let v = iter.value().unwrap();
|
||||
self.db.put(key.to_bytes(), v)?;
|
||||
n += 1;
|
||||
iter.next();
|
||||
}
|
||||
}
|
||||
info!(
|
||||
"Create database {}/{}, copy {} entries",
|
||||
tablespace_id, db_id, n
|
||||
);
|
||||
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()> {
|
||||
self.db.put(key.to_bytes(), data)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -961,6 +849,36 @@ impl Timeline for RocksTimeline {
|
||||
self.last_valid_lsn.load()
|
||||
}
|
||||
|
||||
//
|
||||
// Wait until WAL has been received up to the given LSN.
|
||||
//
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn> {
|
||||
let mut lsn = lsn;
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
}
|
||||
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
self.last_valid_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Timed out while waiting for WAL record at LSN {} to arrive",
|
||||
lsn
|
||||
)
|
||||
})?;
|
||||
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
//
|
||||
// Get statistics to be displayed in the user interface.
|
||||
//
|
||||
|
||||
@@ -279,7 +279,7 @@ fn restore_relfile(
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
|
||||
/*
|
||||
if oldest_lsn == 0 || p.lsn < oldest_lsn {
|
||||
oldest_lsn = p.lsn;
|
||||
@@ -335,7 +335,7 @@ fn restore_nonrel_file(
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -369,7 +369,7 @@ fn restore_slru_file(
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
|
||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
|
||||
/*
|
||||
if oldest_lsn == 0 || p.lsn < oldest_lsn {
|
||||
oldest_lsn = p.lsn;
|
||||
@@ -478,6 +478,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
|
||||
}
|
||||
info!("reached end of WAL at {}", last_lsn);
|
||||
let checkpoint_bytes = encode_checkpoint(checkpoint);
|
||||
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
|
||||
timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -273,7 +273,8 @@ pub struct DecodedBkpBlock {
|
||||
/* Information on full-page image, if any */
|
||||
has_image: bool, /* has image, even for consistency checking */
|
||||
pub apply_image: bool, /* has image that should be restored */
|
||||
pub will_init: bool,
|
||||
pub will_init: bool, /* record intialize page content */
|
||||
pub will_drop: bool, /* record drops relation */
|
||||
//char *bkp_image;
|
||||
hole_offset: u16,
|
||||
hole_length: u16,
|
||||
@@ -298,6 +299,7 @@ impl DecodedBkpBlock {
|
||||
has_image: false,
|
||||
apply_image: false,
|
||||
will_init: false,
|
||||
will_drop: false,
|
||||
hole_offset: 0,
|
||||
hole_length: 0,
|
||||
bimg_len: 0,
|
||||
@@ -574,10 +576,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
xlogrec.xl_rmid,
|
||||
xlogrec.xl_info
|
||||
);
|
||||
if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
|
||||
if xlogrec.xl_xid >= checkpoint.nextXid.value as u32 {
|
||||
// TODO: handle XID wraparound
|
||||
checkpoint.nextXid = FullTransactionId {
|
||||
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
|
||||
value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | (xlogrec.xl_xid+1) as u64,
|
||||
};
|
||||
}
|
||||
let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
|
||||
@@ -795,25 +797,36 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = buf.get_i32_le() as u32;
|
||||
blk.will_init = true;
|
||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
if info == pg_constants::CLOG_ZEROPAGE {
|
||||
blk.will_init = true;
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
blk.will_drop = true;
|
||||
checkpoint.oldestXid = buf.get_u32_le();
|
||||
checkpoint.oldestXidDB = buf.get_u32_le();
|
||||
info!("RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
|
||||
blk.blkno, checkpoint.oldestXid, checkpoint.oldestXidDB);
|
||||
}
|
||||
trace!("RM_CLOG_ID updates block {}", blk.blkno);
|
||||
blocks.push(blk);
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
|
||||
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
}
|
||||
//parse commit record to extract subtrans entries
|
||||
// xl_xact_commit starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
@@ -847,7 +860,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::MAIN_FORKNUM;
|
||||
blk.rnode_spcnode = spcnode;
|
||||
blk.rnode_dbnode = dbnode;
|
||||
blk.rnode_relnode = relnode;
|
||||
blk.will_drop = true;
|
||||
blocks.push(blk);
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
@@ -864,23 +883,29 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
let xid = buf.get_u32_le();
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
blocks.push(blk);
|
||||
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
|
||||
//TODO handle this to be able to restore pg_twophase on node start
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
|
||||
xlogrec.xl_prev & 0xffffffff,
|
||||
xlogrec.xl_xid,
|
||||
blk.blkno,
|
||||
main_data_len
|
||||
);
|
||||
blocks.push(blk);
|
||||
}
|
||||
//parse abort record to extract subtrans entries
|
||||
// xl_xact_abort starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
@@ -914,7 +939,13 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO save these too
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::MAIN_FORKNUM;
|
||||
blk.rnode_spcnode = spcnode;
|
||||
blk.rnode_dbnode = dbnode;
|
||||
blk.rnode_relnode = relnode;
|
||||
blk.will_drop = true;
|
||||
blocks.push(blk);
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
@@ -924,7 +955,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
let xid = buf.get_u32_le();
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
blocks.push(blk);
|
||||
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
@@ -932,6 +967,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid;
|
||||
blk.will_init = true;
|
||||
blocks.push(blk);
|
||||
info!("Prepare transaction {}", xlogrec.xl_xid);
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
@@ -1067,18 +1104,18 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
blk.blkno = blkno;
|
||||
blocks.push(blk);
|
||||
}
|
||||
if xlrec.mid > checkpoint.nextMulti {
|
||||
checkpoint.nextMulti = xlrec.mid;
|
||||
if xlrec.mid >= checkpoint.nextMulti {
|
||||
checkpoint.nextMulti = xlrec.mid+1;
|
||||
}
|
||||
if xlrec.moff > checkpoint.nextMultiOffset {
|
||||
checkpoint.nextMultiOffset = xlrec.moff;
|
||||
if xlrec.moff+xlrec.nmembers > checkpoint.nextMultiOffset {
|
||||
checkpoint.nextMultiOffset = xlrec.moff+xlrec.nmembers;
|
||||
}
|
||||
let max_xid = xlrec
|
||||
.members
|
||||
.iter()
|
||||
.fold(checkpoint.nextXid.value as u32, |acc, mbr| {
|
||||
if mbr.xid > acc {
|
||||
mbr.xid
|
||||
if mbr.xid >= acc {
|
||||
mbr.xid+1
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
@@ -1088,7 +1125,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
};
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
checkpoint.oldestXid = xlrec.end_trunc_off;
|
||||
checkpoint.oldestMulti = xlrec.end_trunc_off;
|
||||
checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
|
||||
let first_off_blkno =
|
||||
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
|
||||
@@ -1098,7 +1135,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.will_init = true;
|
||||
blk.will_drop = true;
|
||||
blocks.push(blk);
|
||||
}
|
||||
let first_mbr_blkno =
|
||||
@@ -1109,7 +1146,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.will_init = true;
|
||||
blk.will_drop = true;
|
||||
blocks.push(blk);
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -201,7 +201,7 @@ fn walreceiver_main(
|
||||
|
||||
let new_checkpoint_bytes = encode_checkpoint(checkpoint);
|
||||
if new_checkpoint_bytes != old_checkpoint_bytes {
|
||||
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
|
||||
timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes)?;
|
||||
}
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
|
||||
@@ -286,9 +286,11 @@ impl PostgresRedoManagerInternal {
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let mut status = 0;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
}
|
||||
//handle subtrans
|
||||
let _xact_time = buf.get_i64_le();
|
||||
let mut xinfo = 0;
|
||||
@@ -312,9 +314,38 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||
let nmsgs = buf.get_i32_le();
|
||||
for _i in 0..nmsgs {
|
||||
let sizeof_shared_invalidation_message = 0;
|
||||
buf.advance(sizeof_shared_invalidation_message);
|
||||
}
|
||||
}
|
||||
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||
let xid = buf.get_u32_le();
|
||||
transaction_id_set_status(xid, status, &mut page);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
status = pg_constants::TRANSACTION_STATUS_ABORTED;
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
if info == pg_constants::XLOG_XACT_ABORT {
|
||||
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
|
||||
}
|
||||
//handle subtrans
|
||||
let _xact_time = buf.get_i64_le();
|
||||
let mut xinfo = 0;
|
||||
@@ -338,8 +369,39 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info != pg_constants::XLOG_XACT_PREPARE {
|
||||
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
||||
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||
let nmsgs = buf.get_i32_le();
|
||||
for _i in 0..nmsgs {
|
||||
let sizeof_shared_invalidation_message = 0;
|
||||
buf.advance(sizeof_shared_invalidation_message);
|
||||
}
|
||||
}
|
||||
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
|
||||
let xid = buf.get_u32_le();
|
||||
transaction_id_set_status(xid, status, &mut page);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_PREPARE {
|
||||
info!("Apply prepare {} record", xlogrec.xl_xid);
|
||||
page.clear();
|
||||
page.extend_from_slice(&buf[..]);
|
||||
} else {
|
||||
error!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
|
||||
status,
|
||||
record.lsn,
|
||||
record.main_data_offset, record.rec.len());
|
||||
@@ -383,9 +445,6 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
// empty page image indicates that this SLRU page is truncated and can be removed by GC
|
||||
page.clear();
|
||||
} else {
|
||||
panic!();
|
||||
}
|
||||
|
||||
@@ -60,6 +60,8 @@ pub const CLOG_TRUNCATE: u8 = 0x10;
|
||||
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||
pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30;
|
||||
pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40;
|
||||
|
||||
// From srlu.h
|
||||
pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
|
||||
|
||||
@@ -47,6 +47,7 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
|
||||
pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"),
|
||||
pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"),
|
||||
pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"),
|
||||
pg_constants::PG_CHECKPOINT_FORKNUM => Some("checkpoint"),
|
||||
|
||||
_ => Some("UNKNOWN FORKNUM"),
|
||||
}
|
||||
|
||||
62
test_runner/batch_others/test_multixact.py
Normal file
62
test_runner/batch_others/test_multixact.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import pytest
|
||||
import os
|
||||
import psycopg2
|
||||
import multiprocessing
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
def runQuery(connstr):
|
||||
con = psycopg2.connect(connstr)
|
||||
con.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = con.cursor()
|
||||
cur.execute('select * from t1 for key share;')
|
||||
|
||||
|
||||
def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
|
||||
|
||||
# Create a branch for us
|
||||
zenith_cli.run(["branch", "test_multixact", "empty"])
|
||||
pg = postgres.create_start('test_multixact')
|
||||
|
||||
print("postgres is running on 'test_multixact' branch")
|
||||
pg_conn = psycopg2.connect(pg.connstr())
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute('CREATE TABLE t1(i int primary key);'
|
||||
'INSERT INTO t1 select * from generate_series(1,100);')
|
||||
|
||||
# Lock entries in parallel connections to set multixact
|
||||
nclients = 3
|
||||
pool = multiprocessing.Pool(nclients)
|
||||
args = [pg.connstr()] * nclients
|
||||
pool.map(runQuery, args)
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
# force wal flush
|
||||
cur.execute('checkpoint')
|
||||
|
||||
cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint();')
|
||||
res = cur.fetchone()
|
||||
next_multixact_id = res[0]
|
||||
lsn = res[1]
|
||||
|
||||
# Ensure that we did lock some tuples
|
||||
assert(int(next_multixact_id) > 1)
|
||||
|
||||
# Branch at this point
|
||||
zenith_cli.run(["branch", "test_multixact_new", "test_multixact@"+lsn]);
|
||||
pg_new = postgres.create_start('test_multixact_new')
|
||||
|
||||
print("postgres is running on 'test_multixact_new' branch")
|
||||
pg_new_conn = psycopg2.connect(pg_new.connstr())
|
||||
pg_new_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur_new = pg_new_conn.cursor()
|
||||
|
||||
cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint();')
|
||||
next_multixact_id_new = cur_new.fetchone()[0]
|
||||
|
||||
# Check that we restored pg_controlfile correctly
|
||||
# TODO compare content of pg_multixact files?
|
||||
assert(next_multixact_id_new == next_multixact_id)
|
||||
61
test_runner/batch_others/test_oldestXid.py
Normal file
61
test_runner/batch_others/test_oldestXid.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import pytest
|
||||
import getpass
|
||||
import psycopg2
|
||||
import time
|
||||
import os
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
#
|
||||
# Test pg_control values after recreating a postgres instance
|
||||
#
|
||||
def test_oldestxid(zenith_cli, pageserver, postgres, pg_bin, repo_dir):
|
||||
zenith_cli.run(["branch", "test_oldestxid", "empty"]);
|
||||
|
||||
pg = postgres.create_start('test_oldestxid')
|
||||
print("postgres is running on 'test_oldestxid' branch")
|
||||
|
||||
pg_conn = psycopg2.connect(pg.connstr());
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert a row
|
||||
cur.execute('CREATE TABLE foo (t text)');
|
||||
cur.execute("INSERT INTO foo VALUES ('bar')");
|
||||
|
||||
cur.execute('checkpoint')
|
||||
cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
|
||||
res = cur.fetchone()
|
||||
oldest_xid = res[0]
|
||||
oldest_xid_dbid = res[1]
|
||||
oldest_active_xid = res[2]
|
||||
|
||||
# Stop, and destroy the Postgres instance. Then recreate and restart it.
|
||||
pg_conn.close();
|
||||
pg.stop();
|
||||
|
||||
# capture old pg_controldata output for debugging purposes
|
||||
pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
|
||||
pg_bin.run_capture(['pg_controldata', pgdatadir])
|
||||
|
||||
pg.destroy();
|
||||
pg.create_start('test_oldestxid');
|
||||
pg_conn = psycopg2.connect(pg.connstr());
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute('SELECT oldest_xid, oldest_xid_dbid, oldest_active_xid FROM pg_control_checkpoint();')
|
||||
res = cur.fetchone()
|
||||
oldest_xid_new = res[0]
|
||||
oldest_xid_dbid_new = res[1]
|
||||
oldest_active_xid_new = res[2]
|
||||
|
||||
assert(oldest_xid_new == oldest_xid)
|
||||
assert(oldest_xid_dbid_new == oldest_xid_dbid)
|
||||
|
||||
# this field should be reset at restart
|
||||
assert(int(oldest_active_xid_new) == 0)
|
||||
|
||||
# capture new pg_controldata output for debugging purposes
|
||||
pgdatadir = os.path.join(repo_dir, 'pgdatadirs/test_oldestxid')
|
||||
pg_bin.run_capture(['pg_controldata', pgdatadir])
|
||||
60
test_runner/batch_others/test_restart_compute.py
Normal file
60
test_runner/batch_others/test_restart_compute.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import pytest
|
||||
import getpass
|
||||
import psycopg2
|
||||
import time
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
#
|
||||
# Test restarting and recreating a postgres instance
|
||||
#
|
||||
def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
|
||||
zenith_cli.run(["branch", "test_restart_compute", "empty"]);
|
||||
|
||||
pg = postgres.create_start('test_restart_compute')
|
||||
print("postgres is running on 'test_restart_compute' branch")
|
||||
|
||||
pg_conn = psycopg2.connect(pg.connstr());
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert a row
|
||||
cur.execute('CREATE TABLE foo (t text)');
|
||||
cur.execute("INSERT INTO foo VALUES ('bar')");
|
||||
|
||||
# Stop and restart the Postgres instance
|
||||
pg_conn.close();
|
||||
pg.stop();
|
||||
pg.start();
|
||||
pg_conn = psycopg2.connect(pg.connstr());
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# We can still see the row
|
||||
cur.execute('SELECT count(*) FROM foo');
|
||||
assert(cur.fetchone()[0] == 1);
|
||||
|
||||
# Insert another row
|
||||
cur.execute("INSERT INTO foo VALUES ('bar2')");
|
||||
cur.execute('SELECT count(*) FROM foo');
|
||||
assert(cur.fetchone()[0] == 2);
|
||||
|
||||
# FIXME: Currently, there is no guarantee that by the time the INSERT commits, the WAL
|
||||
# has been streamed safely to the WAL safekeeper or page server. It is merely stored
|
||||
# on the Postgres instance's local disk. Sleep a little, to give it time to be
|
||||
# streamed. This should be removed, when we have the ability to run the Postgres
|
||||
# instance -> safekeeper streaming in synchronous mode.
|
||||
time.sleep(5)
|
||||
|
||||
# Stop, and destroy the Postgres instance. Then recreate and restart it.
|
||||
pg_conn.close();
|
||||
pg.stop();
|
||||
pg.destroy();
|
||||
pg.create_start('test_restart_compute');
|
||||
pg_conn = psycopg2.connect(pg.connstr());
|
||||
pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# We can still see the rows
|
||||
cur.execute('SELECT count(*) FROM foo');
|
||||
assert(cur.fetchone()[0] == 2);
|
||||
58
test_runner/batch_others/test_twophase.py
Normal file
58
test_runner/batch_others/test_twophase.py
Normal file
@@ -0,0 +1,58 @@
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
import pytest
|
||||
import getpass
|
||||
import psycopg2
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
|
||||
zenith_cli.run(["branch", "test_twophase", "empty"]);
|
||||
|
||||
pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5'])
|
||||
print("postgres is running on 'test_twophase' branch")
|
||||
|
||||
conn = psycopg2.connect(pg.connstr());
|
||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute('CREATE TABLE foo (t text)');
|
||||
|
||||
# Prepare a transaction that will insert a row
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('one')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_one'");
|
||||
|
||||
# Prepare another transaction that will insert a row
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('two')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_two'");
|
||||
|
||||
cur.execute('BEGIN');
|
||||
cur.execute("INSERT INTO foo VALUES ('three')");
|
||||
cur.execute("PREPARE TRANSACTION 'insert_three'");
|
||||
cur.execute("COMMIT PREPARED 'insert_three'");
|
||||
|
||||
cur.execute('SELECT pg_current_wal_insert_lsn()');
|
||||
lsn = cur.fetchone()[0]
|
||||
|
||||
# Create a branch with the transaction in prepared state
|
||||
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase@"+lsn]);
|
||||
|
||||
pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5'])
|
||||
conn2 = psycopg2.connect(pg2.connstr());
|
||||
conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur2 = conn2.cursor()
|
||||
|
||||
# On the new branch, commit one of the prepared transactions, abort the other one.
|
||||
cur2.execute("COMMIT PREPARED 'insert_one'");
|
||||
cur2.execute("ROLLBACK PREPARED 'insert_two'");
|
||||
|
||||
cur2.execute('SELECT * FROM foo');
|
||||
assert(cur2.fetchall() == [('one',),('three',)]);
|
||||
|
||||
# Neither insert is visible on the original branch, the transactions are still
|
||||
# in prepared state there.
|
||||
cur.execute('SELECT * FROM foo');
|
||||
assert(cur.fetchall() == [('three',)]);
|
||||
@@ -168,14 +168,18 @@ class Postgres:
|
||||
self.branch = None
|
||||
# path to conf is <repo_dir>/pgdatadirs/<branch_name>/postgresql.conf
|
||||
|
||||
def create_start(self, branch, config_lines=None):
|
||||
""" create the pg data directory, and start the server """
|
||||
def create(self, branch, config_lines=None):
|
||||
""" create the pg data directory """
|
||||
self.zenith_cli.run(['pg', 'create', branch])
|
||||
self.branch = branch
|
||||
if config_lines is None:
|
||||
config_lines = []
|
||||
self.config(config_lines)
|
||||
self.zenith_cli.run(['pg', 'start', branch])
|
||||
return
|
||||
|
||||
def start(self):
|
||||
""" start the server """
|
||||
self.zenith_cli.run(['pg', 'start', self.branch])
|
||||
self.running = True
|
||||
return
|
||||
|
||||
@@ -189,9 +193,19 @@ class Postgres:
|
||||
conf.write('\n')
|
||||
|
||||
def stop(self):
|
||||
""" stop the server """
|
||||
if self.running:
|
||||
self.zenith_cli.run(['pg', 'stop', self.branch])
|
||||
|
||||
def destroy(self):
|
||||
datadir = os.path.join(self.repo_dir, 'pgdatadirs/{}'.format(self.branch))
|
||||
shutil.rmtree(datadir)
|
||||
|
||||
def create_start(self, branch, config_lines=None):
|
||||
self.create(branch, config_lines);
|
||||
self.start();
|
||||
return
|
||||
|
||||
# Return a libpq connection string to connect to the Postgres instance
|
||||
def connstr(self, dbname='postgres'):
|
||||
conn_str = 'host={} port={} dbname={} user={}'.format(
|
||||
|
||||
Reference in New Issue
Block a user