[refer #190] Fix bugs in handling relation drop and truncation

This commit is contained in:
Konstantin Knizhnik
2021-05-26 20:27:43 +03:00
parent 1ccf82f932
commit a29fa4b8be
5 changed files with 162 additions and 52 deletions

View File

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use log::*;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -55,14 +56,17 @@ pub trait Timeline {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Drop relation or file segment
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;
/// Create a new database from a template database
///
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
@@ -100,14 +104,18 @@ pub trait Timeline {
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
if blk.will_drop {
self.put_drop(tag, lsn)?;
} else {
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
self.put_wal_record(tag, rec);
self.put_wal_record(tag, rec)?;
}
}
// Handle a few special record types
@@ -116,15 +124,37 @@ pub trait Timeline {
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&decoded);
let mut rel = RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: pg_constants::MAIN_FORKNUM,
};
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: pg_constants::MAIN_FORKNUM,
};
self.put_truncation(rel, lsn, truncate.blkno)?;
}
if (truncate.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
if truncate.blkno == 0 {
rel.forknum = pg_constants::FSM_FORKNUM;
self.put_truncation(rel, lsn, truncate.blkno)?;
} else {
// TODO: handle partial truncation of FSM:
// need to map heap block number to FSM block number
// and clear bits in the tail block
info!("Partial truncation of FSM is not supported");
}
}
if (truncate.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
if truncate.blkno == 0 {
rel.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
self.put_truncation(rel, lsn, truncate.blkno)?;
} else {
// TODO: handle partial truncation of VM:
// need to map heap block number to VM block number
// and clear bits in the tail block
info!("Partial truncation of VM is not supported");
}
}
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
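The two "Partial truncation ... is not supported" TODOs above allude to the fork-internal block mapping that a partial FSM/VM truncation would need. As a rough illustration for the visibility map (approximate constants, not taken from this commit; the real visibilitymap.c additionally subtracts the page header from the usable page size):

// Rough sketch of the heap-block -> VM-block mapping a partial VM truncation would need.
// BLCKSZ and the two-bits-per-heap-block layout are assumptions, as noted above.
const BLCKSZ: u32 = 8192;
const BITS_PER_HEAPBLOCK: u32 = 2; // all-visible + all-frozen bits per heap block
const HEAPBLOCKS_PER_VM_PAGE: u32 = BLCKSZ * 8 / BITS_PER_HEAPBLOCK;

fn heapblk_to_vm_block(heap_blkno: u32) -> u32 {
    heap_blkno / HEAPBLOCKS_PER_VM_PAGE
}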
@@ -348,11 +378,11 @@ mod tests {
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
tline.advance_last_valid_lsn(Lsn(5));
@@ -442,7 +472,7 @@ mod tests {
for i in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
lsn += 1;
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
}
tline.advance_last_valid_lsn(Lsn(lsn));
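For reference, the write side of the Timeline trait after this change, collecting just the signatures touched or added above (a consolidated sketch; other trait items omitted):

// Timeline write methods after this commit; all of them now propagate
// storage errors via Result.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;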

View File

@@ -127,10 +127,12 @@ impl CacheKey {
static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
#[derive(Debug)]
enum CacheEntryContent {
PageImage(Bytes),
WALRecord(WALRecord),
Truncation,
Drop,
}
// The serialized representation of a CacheEntryContent begins with
@@ -138,9 +140,10 @@ enum CacheEntryContent {
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
// at all, you must peek into the first byte of the serialized representation
// to read it.
const CONTENT_PAGE_IMAGE: u8 = 1u8;
const CONTENT_WAL_RECORD: u8 = 2u8;
const CONTENT_TRUNCATION: u8 = 3u8;
const CONTENT_PAGE_IMAGE: u8 = 0u8;
const CONTENT_WAL_RECORD: u8 = 1u8;
const CONTENT_TRUNCATION: u8 = 2u8;
const CONTENT_DROP: u8 = 3u8;
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
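For orientation, the first byte of a serialized value carries both the content kind (low two bits, selected by CONTENT_KIND_MASK) and flag bits such as UNUSED_VERSION_FLAG. A minimal reader sketch, assuming the constants above are in scope (UNUSED_VERSION_FLAG itself is defined elsewhere in this file and its exact bit is not shown here):

// Sketch only: decode the content kind from a serialized value's first byte.
fn first_byte_kind(b: u8) -> &'static str {
    match b & CONTENT_KIND_MASK {
        CONTENT_PAGE_IMAGE => "page image",
        CONTENT_WAL_RECORD => "WAL record",
        CONTENT_TRUNCATION => "truncation",
        CONTENT_DROP => "drop",
        _ => unreachable!(), // the mask limits the value to 0..=3
    }
}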
@@ -161,6 +164,9 @@ impl CacheEntryContent {
CacheEntryContent::Truncation => {
buf.put_u8(CONTENT_TRUNCATION);
}
CacheEntryContent::Drop => {
buf.put_u8(CONTENT_DROP);
}
}
}
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
@@ -175,6 +181,7 @@ impl CacheEntryContent {
}
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
CONTENT_DROP => CacheEntryContent::Drop,
_ => unreachable!(),
}
}
@@ -296,8 +303,9 @@ impl RocksTimeline {
let mut opts = rocksdb::Options::default();
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
trace!("Delete unused version {:?}", CacheKey::des(key));
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
@@ -480,9 +488,53 @@ impl RocksTimeline {
Ok(0)
}
///
/// Drop a relation with all its forks, or a non-relational file
///
fn drop(&self, tag: BufferTag) -> Result<()> {
let mut iter = self.db.raw_iterator();
let mut key = CacheKey {
tag,
lsn: Lsn(u64::MAX),
};
if tag.rel.forknum == pg_constants::MAIN_FORKNUM {
// if it is a relation, remove all its blocks in all forks
key.tag.blknum = u32::MAX;
key.tag.rel.forknum = pg_constants::INIT_FORKNUM;
} else {
assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM);
}
debug!("Drop relation {:?}", tag);
iter.seek_for_prev(key.ser()?);
while iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag.rel.relnode != tag.rel.relnode
|| key.tag.rel.spcnode != tag.rel.spcnode
|| key.tag.rel.dbnode != tag.rel.dbnode
|| (key.tag.rel.forknum != tag.rel.forknum
&& tag.rel.forknum != pg_constants::MAIN_FORKNUM)
{
// no more entries belonging to this relation or file
break;
}
let v = iter.value().unwrap();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.ser()?, &v[..])?;
} else {
// already marked for deletion
break;
}
iter.prev();
}
Ok(())
}
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
loop {
thread::sleep(conf.gc_period);
trace!("Start GC iteration");
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
@@ -510,17 +562,27 @@ impl RocksTimeline {
if iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
let v = iter.value().unwrap();
let flag = v[0];
let last_lsn = key.lsn;
inspected += 1;
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
if (flag & CONTENT_KIND_MASK) == CONTENT_DROP {
// If the drop record is beyond the horizon, delete all entries from the repository
if last_lsn < horizon {
self.drop(key.tag)?;
}
maxkey = minkey;
continue;
}
// reconstruct most recent page version
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
// force reconstruction of most recent page version
@@ -538,7 +600,7 @@ impl RocksTimeline {
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
reconstructed += 1;
}
@@ -559,7 +621,7 @@ impl RocksTimeline {
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
truncated += 1;
} else {
@@ -663,6 +725,7 @@ impl Timeline for RocksTimeline {
///
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let lsn = self.wait_lsn(req_lsn)?;
@@ -685,7 +748,10 @@ impl Timeline for RocksTimeline {
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone());
self.put_page_image(tag, lsn, page_img.clone())?;
} else if let CacheEntryContent::Truncation = content {
// Because FSM pages may not be up to date, we can be asked for a truncated page
return Ok(Bytes::from_static(&ZERO_PAGE));
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
@@ -702,7 +768,6 @@ impl Timeline for RocksTimeline {
return Ok(page_img);
}
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!(
"Page {} blk {} at {}({}) not found",
tag.rel, tag.blknum, req_lsn, lsn
@@ -752,14 +817,14 @@ impl Timeline for RocksTimeline {
///
/// Adds a WAL record to the repository
///
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::WALRecord(rec);
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
let _res = self.db.put(serialized_key, content.to_bytes());
let _res = self.db.put(serialized_key, content.to_bytes())?;
trace!(
"put_wal_record rel {} blk {} at {}",
tag.rel,
@@ -769,6 +834,18 @@ impl Timeline for RocksTimeline {
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
Ok(())
}
///
/// Put a drop record, which completely deletes the relation with all its forks
/// or a non-relational file from the repository
///
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::Drop;
let _res = self.db.put(key.ser()?, content.to_bytes())?;
Ok(())
}
///
@@ -801,27 +878,13 @@ impl Timeline for RocksTimeline {
///
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let img_len = img.len();
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut val_buf = content.to_bytes();
// Zero size of page image indicates that page can be removed
if img_len == 0 {
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
// records already marked for deletion
return;
} else {
// delete truncated multixact page
val_buf[0] |= UNUSED_VERSION_FLAG;
}
}
trace!("put_wal_record lsn: {}", key.lsn);
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
let _res = self.db.put(serialized_key, content.to_bytes());
let _res = self.db.put(serialized_key, content.to_bytes())?;
trace!(
"put_page_image rel {} blk {} at {}",
@@ -830,6 +893,7 @@ impl Timeline for RocksTimeline {
lsn
);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn put_create_database(
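Taken together, the pieces above give a dropped relation the following lifecycle: WAL ingest stores a CacheEntryContent::Drop entry at the drop LSN via put_drop(); once the GC horizon passes that LSN, do_gc() calls drop(tag), which walks every key of the relation and sets UNUSED_VERSION_FLAG in the stored value; the "ttl" compaction filter then returns Decision::Remove for flagged values, reclaiming the space. The marking step, reduced to its essence (a sketch, not verbatim from the diff; UNUSED_VERSION_FLAG is assumed to be in scope):

// Sketch: mark one stored version dead so the compaction filter can discard it.
fn mark_version_unused(db: &rocksdb::DB, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
    if (val[0] & UNUSED_VERSION_FLAG) == 0 {
        let mut v = val.to_owned();
        v[0] |= UNUSED_VERSION_FLAG;
        db.put(key, &v)?; // physically removed on the next compaction
    }
    Ok(())
}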

View File

@@ -144,7 +144,7 @@ fn import_relfile(
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;

View File

@@ -264,7 +264,8 @@ pub struct DecodedBkpBlock {
/* Information on full-page image, if any */
has_image: bool, /* has image, even for consistency checking */
pub apply_image: bool, /* has image that should be restored */
pub will_init: bool,
pub will_init: bool, /* record initializes page content */
pub will_drop: bool, /* record drops relation */
//char *bkp_image;
hole_offset: u16,
hole_length: u16,
@@ -289,6 +290,7 @@ impl DecodedBkpBlock {
has_image: false,
apply_image: false,
will_init: false,
will_drop: false,
hole_offset: 0,
hole_length: 0,
bimg_len: 0,
@@ -804,7 +806,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO handle this too?
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::MAIN_FORKNUM;
blk.rnode_spcnode = spcnode;
blk.rnode_dbnode = dbnode;
blk.rnode_relnode = relnode;
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
spcnode,
@@ -850,7 +858,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let spcnode = buf.get_u32_le();
let dbnode = buf.get_u32_le();
let relnode = buf.get_u32_le();
//TODO save these too
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::MAIN_FORKNUM;
blk.rnode_spcnode = spcnode;
blk.rnode_dbnode = dbnode;
blk.rnode_relnode = relnode;
blk.will_drop = true;
blocks.push(blk);
trace!(
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
spcnode,
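Both the commit and abort paths above construct the same kind of synthetic block: one DecodedBkpBlock per dropped relfilenode, flagged with will_drop so the ingest loop in the repository turns it into a put_drop() call. Expressed as a hypothetical helper (not part of this commit):

// Hypothetical helper, equivalent to the code added in both branches above:
// build a synthetic block marking a dropped relfilenode.
fn drop_block(spcnode: u32, dbnode: u32, relnode: u32) -> DecodedBkpBlock {
    let mut blk = DecodedBkpBlock::new();
    blk.forknum = pg_constants::MAIN_FORKNUM;
    blk.rnode_spcnode = spcnode;
    blk.rnode_dbnode = dbnode;
    blk.rnode_relnode = relnode;
    blk.will_drop = true;
    blk
}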

View File

@@ -20,6 +20,8 @@ pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
//
// Constants from visibilitymap.h
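These flags form a bitmask, which is why the truncation handling earlier in this commit tests them one by one; PostgreSQL's storage_xlog.h also defines their union (shown here for reference, not added by this commit):

// For reference only: a full truncation sets all three flags.
pub const SMGR_TRUNCATE_ALL: u32 = SMGR_TRUNCATE_HEAP | SMGR_TRUNCATE_VM | SMGR_TRUNCATE_FSM;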