mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Update relation size cache only when latest LSN is requested (#2310)
* Update relation size cache only when latest LSN is requested
* Fix tests
* Add a test case for timetravel query after pageserver restart.
This test is currently failing, the queries return incorrect results.
I don't know why, needs to be investigated.
FAILED test_runner/batch_others/test_readonly_node.py::test_timetravel - assert 85 == 100000
If you remove the pageserver restart from the test, it passes.
* yapf3 test_readonly_node.py
* Add comment about cache correction in case of setting incorrect latest flag
* Fix formatting for test_readonly_node.py
* Remove unused imports
* Fix mypy warning for test_readonly_node.py
* Fix formatting of test_readonly_node.py
* Bump postgres version
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
committed by
GitHub
parent
aeb1cf9c36
commit
ad057124be
@@ -186,7 +186,7 @@ where
|
||||
}
|
||||
|
||||
fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
|
||||
let nblocks = self.timeline.get_rel_size(tag, self.lsn, false)?;
|
||||
|
||||
// Function that adds relation segment data to archive
|
||||
let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
|
||||
@@ -207,7 +207,9 @@ where
|
||||
for (seg, blocks) in chunks.into_iter().enumerate() {
|
||||
let mut segment_data: Vec<u8> = vec![];
|
||||
for blknum in blocks {
|
||||
let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(tag, blknum, self.lsn, false)?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
|
||||
|
||||
@@ -696,7 +696,7 @@ impl PageServerHandler {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let exists = timeline.get_rel_exists(req.rel, lsn)?;
|
||||
let exists = timeline.get_rel_exists(req.rel, lsn, req.latest)?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
exists,
|
||||
@@ -712,7 +712,7 @@ impl PageServerHandler {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let n_blocks = timeline.get_rel_size(req.rel, lsn)?;
|
||||
let n_blocks = timeline.get_rel_size(req.rel, lsn, req.latest)?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
n_blocks,
|
||||
@@ -728,7 +728,8 @@ impl PageServerHandler {
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let total_blocks = timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn)?;
|
||||
let total_blocks =
|
||||
timeline.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, lsn, req.latest)?;
|
||||
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
@@ -754,7 +755,7 @@ impl PageServerHandler {
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
}
|
||||
*/
|
||||
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn)?;
|
||||
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
|
||||
@@ -83,10 +83,16 @@ impl Timeline {
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/// Look up given page version.
|
||||
pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
|
||||
pub fn get_rel_page_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
lsn: Lsn,
|
||||
latest: bool,
|
||||
) -> Result<Bytes> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
let nblocks = self.get_rel_size(tag, lsn)?;
|
||||
let nblocks = self.get_rel_size(tag, lsn, latest)?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
@@ -100,20 +106,20 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<usize> {
|
||||
pub fn get_db_size(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn, latest: bool) -> Result<usize> {
|
||||
let mut total_blocks = 0;
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, lsn)?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, lsn)?;
|
||||
let n_blocks = self.get_rel_size(rel, lsn, latest)?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
||||
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn, latest: bool) -> Result<BlockNumber> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) {
|
||||
@@ -122,7 +128,7 @@ impl Timeline {
|
||||
|
||||
if (tag.forknum == pg_constants::FSM_FORKNUM
|
||||
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, lsn)?
|
||||
&& !self.get_rel_exists(tag, lsn, latest)?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -135,13 +141,21 @@ impl Timeline {
|
||||
let mut buf = self.get(key, lsn)?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
// Update relation size cache
|
||||
self.update_cached_rel_size(tag, lsn, nblocks);
|
||||
if latest {
|
||||
// Update relation size cache only if "latest" flag is set.
|
||||
// This flag is set by compute when it is working with most recent version of relation.
|
||||
// Typically master compute node always set latest=true.
|
||||
// Please notice, that even if compute node "by mistake" specifies old LSN but set
|
||||
// latest=true, then it can not cause cache corruption, because with latest=true
|
||||
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
|
||||
// associated with most recent value of LSN.
|
||||
self.update_cached_rel_size(tag, lsn, nblocks);
|
||||
}
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
|
||||
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn, _latest: bool) -> Result<bool> {
|
||||
ensure!(tag.relnode != 0, "invalid relnode");
|
||||
|
||||
// first try to lookup relation in cache
|
||||
@@ -660,7 +674,7 @@ impl<'a> DatadirModification<'a> {
|
||||
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
|
||||
let req_lsn = self.tline.get_last_record_lsn();
|
||||
|
||||
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn)?;
|
||||
let total_blocks = self.tline.get_db_size(spcnode, dbnode, req_lsn, true)?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
let buf = self.get(DBDIR_KEY)?;
|
||||
@@ -733,7 +747,7 @@ impl<'a> DatadirModification<'a> {
|
||||
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
||||
ensure!(rel.relnode != 0, "invalid relnode");
|
||||
let last_lsn = self.tline.get_last_record_lsn();
|
||||
if self.tline.get_rel_exists(rel, last_lsn)? {
|
||||
if self.tline.get_rel_exists(rel, last_lsn, true)? {
|
||||
let size_key = rel_size_to_key(rel);
|
||||
// Fetch the old size first
|
||||
let old_size = self.get(size_key)?.get_u32_le();
|
||||
@@ -1499,19 +1513,19 @@ mod tests {
|
||||
writer.finish()?;
|
||||
|
||||
// Test read before rel creation. Should error out.
|
||||
assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10)).is_err());
|
||||
assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10), false).is_err());
|
||||
|
||||
// Read block beyond end of relation at different points in time.
|
||||
// These reads should fall into different delta, image, and in-memory layers.
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55), false)?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?, ZERO_PAGE);
|
||||
|
||||
// Test on an in-memory layer with no preceding layer
|
||||
let mut writer = tline.begin_record(Lsn(0x70));
|
||||
@@ -1523,7 +1537,7 @@ mod tests {
|
||||
)?;
|
||||
writer.finish()?;
|
||||
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70))?, ZERO_PAGE);
|
||||
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70), false)?6, ZERO_PAGE);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -504,7 +504,7 @@ impl<'a> WalIngest<'a> {
|
||||
assert_eq!(src_rel.spcnode, src_tablespace_id);
|
||||
assert_eq!(src_rel.dbnode, src_db_id);
|
||||
|
||||
let nblocks = modification.tline.get_rel_size(src_rel, req_lsn)?;
|
||||
let nblocks = modification.tline.get_rel_size(src_rel, req_lsn, true)?;
|
||||
let dst_rel = RelTag {
|
||||
spcnode: tablespace_id,
|
||||
dbnode: db_id,
|
||||
@@ -521,7 +521,7 @@ impl<'a> WalIngest<'a> {
|
||||
|
||||
let content = modification
|
||||
.tline
|
||||
.get_rel_page_at_lsn(src_rel, blknum, req_lsn)?;
|
||||
.get_rel_page_at_lsn(src_rel, blknum, req_lsn, true)?;
|
||||
modification.put_rel_page_image(dst_rel, blknum, content)?;
|
||||
num_blocks_copied += 1;
|
||||
}
|
||||
@@ -680,7 +680,7 @@ impl<'a> WalIngest<'a> {
|
||||
relnode: xnode.relnode,
|
||||
};
|
||||
let last_lsn = self.timeline.get_last_record_lsn();
|
||||
if modification.tline.get_rel_exists(rel, last_lsn)? {
|
||||
if modification.tline.get_rel_exists(rel, last_lsn, true)? {
|
||||
self.put_rel_drop(modification, rel)?;
|
||||
}
|
||||
}
|
||||
@@ -924,10 +924,10 @@ impl<'a> WalIngest<'a> {
|
||||
}
|
||||
|
||||
fn get_relsize(&mut self, rel: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, lsn)? {
|
||||
let nblocks = if !self.timeline.get_rel_exists(rel, lsn, true)? {
|
||||
0
|
||||
} else {
|
||||
self.timeline.get_rel_size(rel, lsn)?
|
||||
self.timeline.get_rel_size(rel, lsn, true)?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
@@ -943,12 +943,12 @@ impl<'a> WalIngest<'a> {
|
||||
// record.
|
||||
// TODO: would be nice if to be more explicit about it
|
||||
let last_lsn = modification.lsn;
|
||||
let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn)? {
|
||||
let old_nblocks = if !self.timeline.get_rel_exists(rel, last_lsn, true)? {
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
modification.put_rel_creation(rel, 0)?;
|
||||
0
|
||||
} else {
|
||||
self.timeline.get_rel_size(rel, last_lsn)?
|
||||
self.timeline.get_rel_size(rel, last_lsn, true)?
|
||||
};
|
||||
|
||||
if new_nblocks > old_nblocks {
|
||||
@@ -1082,43 +1082,43 @@ mod tests {
|
||||
assert_current_logical_size(&*tline, Lsn(0x50));
|
||||
|
||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
|
||||
assert!(tline.get_rel_size(TESTREL_A, Lsn(0x10)).is_err());
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10), false)?, false);
|
||||
assert!(tline.get_rel_size(TESTREL_A, Lsn(0x10), false).is_err());
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50))?, 3);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20), false)?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false)?, 1);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false)?, 3);
|
||||
|
||||
// Check page contents at each LSN
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x20), false)?,
|
||||
TEST_IMG("foo blk 0 at 2")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x30), false)?,
|
||||
TEST_IMG("foo blk 0 at 3")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x40), false)?,
|
||||
TEST_IMG("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40), false)?,
|
||||
TEST_IMG("foo blk 1 at 4")
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x50), false)?,
|
||||
TEST_IMG("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50), false)?,
|
||||
TEST_IMG("foo blk 1 at 4")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false)?,
|
||||
TEST_IMG("foo blk 2 at 5")
|
||||
);
|
||||
|
||||
@@ -1129,20 +1129,20 @@ mod tests {
|
||||
assert_current_logical_size(&*tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 2);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false)?, 2);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x60), false)?,
|
||||
TEST_IMG("foo blk 0 at 3")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60), false)?,
|
||||
TEST_IMG("foo blk 1 at 4")
|
||||
);
|
||||
|
||||
// should still see the truncated block with older LSN
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50))?, 3);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false)?, 3);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 2, Lsn(0x50), false)?,
|
||||
TEST_IMG("foo blk 2 at 5")
|
||||
);
|
||||
|
||||
@@ -1150,19 +1150,19 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x68));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68), false)?, 0);
|
||||
|
||||
// Extend from 0 to 2 blocks, leaving a gap
|
||||
let mut m = tline.begin_modification(Lsn(0x70));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70), false)?, 2);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70), false)?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x70), false)?,
|
||||
TEST_IMG("foo blk 1")
|
||||
);
|
||||
|
||||
@@ -1170,15 +1170,15 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(0x80));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80), false)?, 1501);
|
||||
for blk in 2..1500 {
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blk, Lsn(0x80), false)?,
|
||||
ZERO_PAGE
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 1500, Lsn(0x80), false)?,
|
||||
TEST_IMG("foo blk 1500")
|
||||
);
|
||||
|
||||
@@ -1198,8 +1198,8 @@ mod tests {
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20), false)?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false)?, 1);
|
||||
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
@@ -1207,10 +1207,10 @@ mod tests {
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30), false)?, false);
|
||||
|
||||
// FIXME: should fail
|
||||
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
|
||||
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30), false)?.is_none());
|
||||
|
||||
// Re-create it
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
@@ -1218,8 +1218,8 @@ mod tests {
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x40))?, 1);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40), false)?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x40), false)?, 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1243,18 +1243,18 @@ mod tests {
|
||||
m.commit()?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
|
||||
assert!(tline.get_rel_size(TESTREL_A, Lsn(0x10)).is_err());
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10), false)?, false);
|
||||
assert!(tline.get_rel_size(TESTREL_A, Lsn(0x10), false).is_err());
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, relsize);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20), false)?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20), false)?, relsize);
|
||||
|
||||
// Check relation content
|
||||
for blkno in 0..relsize {
|
||||
let lsn = Lsn(0x20);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, lsn)?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, lsn, false)?,
|
||||
TEST_IMG(&data)
|
||||
);
|
||||
}
|
||||
@@ -1266,24 +1266,24 @@ mod tests {
|
||||
m.commit()?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60), false)?, 1);
|
||||
|
||||
for blkno in 0..1 {
|
||||
let lsn = Lsn(0x20);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x60), false)?,
|
||||
TEST_IMG(&data)
|
||||
);
|
||||
}
|
||||
|
||||
// should still see all blocks with older LSN
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50))?, relsize);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x50), false)?, relsize);
|
||||
for blkno in 0..relsize {
|
||||
let lsn = Lsn(0x20);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x50), false)?,
|
||||
TEST_IMG(&data)
|
||||
);
|
||||
}
|
||||
@@ -1298,14 +1298,14 @@ mod tests {
|
||||
}
|
||||
m.commit()?;
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80), false)?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80), false)?, relsize);
|
||||
// Check relation content
|
||||
for blkno in 0..relsize {
|
||||
let lsn = Lsn(0x80);
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80))?,
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, blkno, Lsn(0x80), false)?,
|
||||
TEST_IMG(&data)
|
||||
);
|
||||
}
|
||||
@@ -1332,14 +1332,17 @@ mod tests {
|
||||
|
||||
assert_current_logical_size(&*tline, Lsn(lsn));
|
||||
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE + 1);
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false)?,
|
||||
RELSEG_SIZE + 1
|
||||
);
|
||||
|
||||
// Truncate one block
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE)?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn), false)?, RELSEG_SIZE);
|
||||
assert_current_logical_size(&*tline, Lsn(lsn));
|
||||
|
||||
// Truncate another block
|
||||
@@ -1347,7 +1350,10 @@ mod tests {
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1)?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(lsn))?, RELSEG_SIZE - 1);
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false)?,
|
||||
RELSEG_SIZE - 1
|
||||
);
|
||||
assert_current_logical_size(&*tline, Lsn(lsn));
|
||||
|
||||
// Truncate to 1500, and then truncate all the way down to 0, one block at a time
|
||||
@@ -1359,7 +1365,7 @@ mod tests {
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn), false)?,
|
||||
size as BlockNumber
|
||||
);
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_for_last_record_lsn
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
@@ -101,3 +101,52 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
node_name="test_readonly_node_preinitdb",
|
||||
lsn=Lsn("0/42"),
|
||||
)
|
||||
|
||||
|
||||
# Similar test, but with more data, and we force checkpoints
|
||||
def test_timetravel(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_timetravel", "empty")
|
||||
pg = env.postgres.create_start("test_timetravel")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
|
||||
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
|
||||
|
||||
lsns = []
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE testtab(id serial primary key, iteration int, data text);
|
||||
INSERT INTO testtab (iteration, data) SELECT 0, 'data' FROM generate_series(1, 100000);
|
||||
"""
|
||||
)
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
lsns.append((0, current_lsn))
|
||||
|
||||
for i in range(1, 5):
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(f"UPDATE testtab SET iteration = {i}")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
lsns.append((i, current_lsn))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to force a new layer file
|
||||
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
|
||||
|
||||
##### Restart pageserver
|
||||
env.postgres.stop_all()
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
for (i, lsn) in lsns:
|
||||
pg_old = env.postgres.create_start(
|
||||
branch_name="test_timetravel", node_name=f"test_old_lsn_{i}", lsn=lsn
|
||||
)
|
||||
with pg_old.cursor() as cur:
|
||||
assert query_scalar(cur, f"select count(*) from testtab where iteration={i}") == 100000
|
||||
assert query_scalar(cur, f"select count(*) from testtab where iteration<>{i}") == 0
|
||||
|
||||
Reference in New Issue
Block a user