mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-12 12:50:37 +00:00
Compare commits
5 Commits
split-prox
...
drop_trunc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e95c2066b0 | ||
|
|
f2e57e1348 | ||
|
|
736d387f7b | ||
|
|
e6926d1794 | ||
|
|
a29fa4b8be |
@@ -4,6 +4,7 @@ use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate}
|
|||||||
use crate::ZTimelineId;
|
use crate::ZTimelineId;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||||
|
use log::*;
|
||||||
use postgres_ffi::pg_constants;
|
use postgres_ffi::pg_constants;
|
||||||
use postgres_ffi::relfile_utils::forknumber_to_name;
|
use postgres_ffi::relfile_utils::forknumber_to_name;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -55,14 +56,17 @@ pub trait Timeline {
|
|||||||
///
|
///
|
||||||
/// This will implicitly extend the relation, if the page is beyond the
|
/// This will implicitly extend the relation, if the page is beyond the
|
||||||
/// current end-of-file.
|
/// current end-of-file.
|
||||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
|
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
|
||||||
|
|
||||||
/// Like put_wal_record, but with ready-made image of the page.
|
/// Like put_wal_record, but with ready-made image of the page.
|
||||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
|
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
|
||||||
|
|
||||||
/// Truncate relation
|
/// Truncate relation
|
||||||
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
|
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
|
||||||
|
|
||||||
|
/// Drop relation or file segment
|
||||||
|
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()>;
|
||||||
|
|
||||||
/// Create a new database from a template database
|
/// Create a new database from a template database
|
||||||
///
|
///
|
||||||
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
|
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
|
||||||
@@ -100,14 +104,18 @@ pub trait Timeline {
|
|||||||
blknum: blk.blkno,
|
blknum: blk.blkno,
|
||||||
};
|
};
|
||||||
|
|
||||||
let rec = WALRecord {
|
if blk.will_drop {
|
||||||
lsn,
|
self.put_drop(tag, lsn)?;
|
||||||
will_init: blk.will_init || blk.apply_image,
|
} else {
|
||||||
rec: recdata.clone(),
|
let rec = WALRecord {
|
||||||
main_data_offset: decoded.main_data_offset as u32,
|
lsn,
|
||||||
};
|
will_init: blk.will_init || blk.apply_image,
|
||||||
|
rec: recdata.clone(),
|
||||||
|
main_data_offset: decoded.main_data_offset as u32,
|
||||||
|
};
|
||||||
|
|
||||||
self.put_wal_record(tag, rec);
|
self.put_wal_record(tag, rec)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a few special record types
|
// Handle a few special record types
|
||||||
@@ -116,15 +124,37 @@ pub trait Timeline {
|
|||||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||||
{
|
{
|
||||||
let truncate = XlSmgrTruncate::decode(&decoded);
|
let truncate = XlSmgrTruncate::decode(&decoded);
|
||||||
|
let mut rel = RelTag {
|
||||||
|
spcnode: truncate.rnode.spcnode,
|
||||||
|
dbnode: truncate.rnode.dbnode,
|
||||||
|
relnode: truncate.rnode.relnode,
|
||||||
|
forknum: pg_constants::MAIN_FORKNUM,
|
||||||
|
};
|
||||||
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
|
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
|
||||||
let rel = RelTag {
|
|
||||||
spcnode: truncate.rnode.spcnode,
|
|
||||||
dbnode: truncate.rnode.dbnode,
|
|
||||||
relnode: truncate.rnode.relnode,
|
|
||||||
forknum: pg_constants::MAIN_FORKNUM,
|
|
||||||
};
|
|
||||||
self.put_truncation(rel, lsn, truncate.blkno)?;
|
self.put_truncation(rel, lsn, truncate.blkno)?;
|
||||||
}
|
}
|
||||||
|
if (truncate.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 {
|
||||||
|
if truncate.blkno == 0 {
|
||||||
|
rel.forknum = pg_constants::FSM_FORKNUM;
|
||||||
|
self.put_truncation(rel, lsn, truncate.blkno)?;
|
||||||
|
} else {
|
||||||
|
// TODO: handle partial truncation of FSM:
|
||||||
|
// need to map heap block number to FSM block number
|
||||||
|
// and clear bits in the tail block
|
||||||
|
info!("Partial truncation of FSM is not supported");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (truncate.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 {
|
||||||
|
if truncate.blkno == 0 {
|
||||||
|
rel.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||||
|
self.put_truncation(rel, lsn, truncate.blkno)?;
|
||||||
|
} else {
|
||||||
|
// TODO: handle partial truncation of VM:
|
||||||
|
// need to map heap block number to VM block number
|
||||||
|
// and clear bits in the tail block
|
||||||
|
info!("Partial truncation of VM is not supported");
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
|
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
|
||||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||||
== pg_constants::XLOG_DBASE_CREATE
|
== pg_constants::XLOG_DBASE_CREATE
|
||||||
@@ -348,11 +378,11 @@ mod tests {
|
|||||||
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
|
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
|
||||||
|
|
||||||
tline.init_valid_lsn(Lsn(1));
|
tline.init_valid_lsn(Lsn(1));
|
||||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
|
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
|
||||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
|
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
|
||||||
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
|
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
|
||||||
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
|
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
|
||||||
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
|
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
|
||||||
|
|
||||||
tline.advance_last_valid_lsn(Lsn(5));
|
tline.advance_last_valid_lsn(Lsn(5));
|
||||||
|
|
||||||
@@ -442,7 +472,7 @@ mod tests {
|
|||||||
for i in 0..pg_constants::RELSEG_SIZE + 1 {
|
for i in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||||
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
|
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
|
||||||
lsn += 1;
|
lsn += 1;
|
||||||
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
|
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
|
||||||
}
|
}
|
||||||
tline.advance_last_valid_lsn(Lsn(lsn));
|
tline.advance_last_valid_lsn(Lsn(lsn));
|
||||||
|
|
||||||
|
|||||||
@@ -127,10 +127,12 @@ impl CacheKey {
|
|||||||
static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
|
static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
|
||||||
static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
|
static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
enum CacheEntryContent {
|
enum CacheEntryContent {
|
||||||
PageImage(Bytes),
|
PageImage(Bytes),
|
||||||
WALRecord(WALRecord),
|
WALRecord(WALRecord),
|
||||||
Truncation,
|
Truncation,
|
||||||
|
Drop,
|
||||||
}
|
}
|
||||||
|
|
||||||
// The serialized representation of a CacheEntryContent begins with
|
// The serialized representation of a CacheEntryContent begins with
|
||||||
@@ -138,9 +140,10 @@ enum CacheEntryContent {
|
|||||||
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
|
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
|
||||||
// at all, you must peek into the first byte of the serialized representation
|
// at all, you must peek into the first byte of the serialized representation
|
||||||
// to read it.
|
// to read it.
|
||||||
const CONTENT_PAGE_IMAGE: u8 = 1u8;
|
const CONTENT_PAGE_IMAGE: u8 = 0u8;
|
||||||
const CONTENT_WAL_RECORD: u8 = 2u8;
|
const CONTENT_WAL_RECORD: u8 = 1u8;
|
||||||
const CONTENT_TRUNCATION: u8 = 3u8;
|
const CONTENT_TRUNCATION: u8 = 2u8;
|
||||||
|
const CONTENT_DROP: u8 = 3u8;
|
||||||
|
|
||||||
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
|
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
|
||||||
|
|
||||||
@@ -161,6 +164,9 @@ impl CacheEntryContent {
|
|||||||
CacheEntryContent::Truncation => {
|
CacheEntryContent::Truncation => {
|
||||||
buf.put_u8(CONTENT_TRUNCATION);
|
buf.put_u8(CONTENT_TRUNCATION);
|
||||||
}
|
}
|
||||||
|
CacheEntryContent::Drop => {
|
||||||
|
buf.put_u8(CONTENT_DROP);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
|
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
|
||||||
@@ -175,6 +181,7 @@ impl CacheEntryContent {
|
|||||||
}
|
}
|
||||||
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
|
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
|
||||||
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
|
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
|
||||||
|
CONTENT_DROP => CacheEntryContent::Drop,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -296,8 +303,9 @@ impl RocksTimeline {
|
|||||||
let mut opts = rocksdb::Options::default();
|
let mut opts = rocksdb::Options::default();
|
||||||
opts.set_use_fsync(true);
|
opts.set_use_fsync(true);
|
||||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||||
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
|
opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], val: &[u8]| {
|
||||||
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
|
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
|
||||||
|
trace!("Delete unused version {:?}", CacheKey::des(key));
|
||||||
rocksdb::compaction_filter::Decision::Remove
|
rocksdb::compaction_filter::Decision::Remove
|
||||||
} else {
|
} else {
|
||||||
rocksdb::compaction_filter::Decision::Keep
|
rocksdb::compaction_filter::Decision::Keep
|
||||||
@@ -480,9 +488,48 @@ impl RocksTimeline {
|
|||||||
Ok(0)
|
Ok(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
/// Drop a relation together with all of its forks
|
||||||
|
///
|
||||||
|
fn delete_entries(&self, tag: BufferTag) -> Result<()> {
|
||||||
|
let mut iter = self.db.raw_iterator();
|
||||||
|
let mut key = CacheKey {
|
||||||
|
tag,
|
||||||
|
lsn: Lsn(u64::MAX),
|
||||||
|
};
|
||||||
|
if tag.rel.forknum == pg_constants::MAIN_FORKNUM {
|
||||||
|
// if it is relation then remove all its blocks in all forks
|
||||||
|
key.tag.blknum = u32::MAX;
|
||||||
|
key.tag.rel.forknum = pg_constants::INIT_FORKNUM;
|
||||||
|
} else {
|
||||||
|
assert!(tag.rel.forknum > pg_constants::INIT_FORKNUM);
|
||||||
|
}
|
||||||
|
debug!("Drop relation {:?}", tag);
|
||||||
|
iter.seek_for_prev(key.ser()?);
|
||||||
|
while iter.valid() {
|
||||||
|
let key = CacheKey::des(iter.key().unwrap())?;
|
||||||
|
if key.tag.rel != tag.rel {
|
||||||
|
// no more entries belonging to this relation
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let v = iter.value().unwrap();
|
||||||
|
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
|
||||||
|
let mut v = v.to_owned();
|
||||||
|
v[0] |= UNUSED_VERSION_FLAG;
|
||||||
|
self.db.put(key.ser()?, &v[..])?;
|
||||||
|
} else {
|
||||||
|
// already marked for deletion
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
iter.prev();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
|
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
|
||||||
loop {
|
loop {
|
||||||
thread::sleep(conf.gc_period);
|
thread::sleep(conf.gc_period);
|
||||||
|
trace!("Start GC iteration");
|
||||||
let last_lsn = self.get_last_valid_lsn();
|
let last_lsn = self.get_last_valid_lsn();
|
||||||
|
|
||||||
// checked_sub() returns None on overflow.
|
// checked_sub() returns None on overflow.
|
||||||
@@ -510,17 +557,27 @@ impl RocksTimeline {
|
|||||||
if iter.valid() {
|
if iter.valid() {
|
||||||
let key = CacheKey::des(iter.key().unwrap())?;
|
let key = CacheKey::des(iter.key().unwrap())?;
|
||||||
let v = iter.value().unwrap();
|
let v = iter.value().unwrap();
|
||||||
|
let flag = v[0];
|
||||||
|
let last_lsn = key.lsn;
|
||||||
|
|
||||||
inspected += 1;
|
inspected += 1;
|
||||||
|
|
||||||
// Construct boundaries for old records cleanup
|
// Construct boundaries for old records cleanup
|
||||||
maxkey.tag = key.tag;
|
maxkey.tag = key.tag;
|
||||||
let last_lsn = key.lsn;
|
|
||||||
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
|
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
|
||||||
|
|
||||||
let mut minkey = maxkey.clone();
|
let mut minkey = maxkey.clone();
|
||||||
minkey.lsn = Lsn(0); // first version
|
minkey.lsn = Lsn(0); // first version
|
||||||
|
|
||||||
|
if (flag & CONTENT_KIND_MASK) == CONTENT_DROP {
|
||||||
|
// If the drop record is older than the horizon, then delete all entries from the repository
|
||||||
|
if last_lsn < horizon {
|
||||||
|
self.delete_entries(key.tag)?;
|
||||||
|
}
|
||||||
|
maxkey = minkey;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// reconstruct most recent page version
|
// reconstruct most recent page version
|
||||||
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
||||||
// force reconstruction of most recent page version
|
// force reconstruction of most recent page version
|
||||||
@@ -538,7 +595,7 @@ impl RocksTimeline {
|
|||||||
let new_img = self
|
let new_img = self
|
||||||
.walredo_mgr
|
.walredo_mgr
|
||||||
.request_redo(key.tag, key.lsn, base_img, records)?;
|
.request_redo(key.tag, key.lsn, base_img, records)?;
|
||||||
self.put_page_image(key.tag, key.lsn, new_img.clone());
|
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
|
||||||
|
|
||||||
reconstructed += 1;
|
reconstructed += 1;
|
||||||
}
|
}
|
||||||
@@ -559,7 +616,7 @@ impl RocksTimeline {
|
|||||||
let new_img = self
|
let new_img = self
|
||||||
.walredo_mgr
|
.walredo_mgr
|
||||||
.request_redo(key.tag, key.lsn, base_img, records)?;
|
.request_redo(key.tag, key.lsn, base_img, records)?;
|
||||||
self.put_page_image(key.tag, key.lsn, new_img.clone());
|
self.put_page_image(key.tag, key.lsn, new_img.clone())?;
|
||||||
|
|
||||||
truncated += 1;
|
truncated += 1;
|
||||||
} else {
|
} else {
|
||||||
@@ -663,6 +720,7 @@ impl Timeline for RocksTimeline {
|
|||||||
///
|
///
|
||||||
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
|
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
|
||||||
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
|
||||||
|
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||||
|
|
||||||
let lsn = self.wait_lsn(req_lsn)?;
|
let lsn = self.wait_lsn(req_lsn)?;
|
||||||
|
|
||||||
@@ -685,7 +743,10 @@ impl Timeline for RocksTimeline {
|
|||||||
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
|
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
|
||||||
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
|
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
|
||||||
|
|
||||||
self.put_page_image(tag, lsn, page_img.clone());
|
self.put_page_image(tag, lsn, page_img.clone())?;
|
||||||
|
} else if let CacheEntryContent::Truncation = content {
|
||||||
|
// Because FSM pages may not be up to date, a truncated page can still be requested
|
||||||
|
return Ok(Bytes::from_static(&ZERO_PAGE));
|
||||||
} else {
|
} else {
|
||||||
// No base image, and no WAL record. Huh?
|
// No base image, and no WAL record. Huh?
|
||||||
bail!("no page image or WAL record for requested page");
|
bail!("no page image or WAL record for requested page");
|
||||||
@@ -702,7 +763,6 @@ impl Timeline for RocksTimeline {
|
|||||||
return Ok(page_img);
|
return Ok(page_img);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
|
||||||
debug!(
|
debug!(
|
||||||
"Page {} blk {} at {}({}) not found",
|
"Page {} blk {} at {}({}) not found",
|
||||||
tag.rel, tag.blknum, req_lsn, lsn
|
tag.rel, tag.blknum, req_lsn, lsn
|
||||||
@@ -752,14 +812,14 @@ impl Timeline for RocksTimeline {
|
|||||||
///
|
///
|
||||||
/// Adds a WAL record to the repository
|
/// Adds a WAL record to the repository
|
||||||
///
|
///
|
||||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
|
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
|
||||||
let lsn = rec.lsn;
|
let lsn = rec.lsn;
|
||||||
let key = CacheKey { tag, lsn };
|
let key = CacheKey { tag, lsn };
|
||||||
|
|
||||||
let content = CacheEntryContent::WALRecord(rec);
|
let content = CacheEntryContent::WALRecord(rec);
|
||||||
|
|
||||||
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
|
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
|
||||||
let _res = self.db.put(serialized_key, content.to_bytes());
|
self.db.put(serialized_key, content.to_bytes())?;
|
||||||
trace!(
|
trace!(
|
||||||
"put_wal_record rel {} blk {} at {}",
|
"put_wal_record rel {} blk {} at {}",
|
||||||
tag.rel,
|
tag.rel,
|
||||||
@@ -769,6 +829,18 @@ impl Timeline for RocksTimeline {
|
|||||||
|
|
||||||
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
self.num_entries.fetch_add(1, Ordering::Relaxed);
|
||||||
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
|
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
/// Put drop record which should completely delete relation with all its forks
|
||||||
|
/// or non-relational file from repository
|
||||||
|
///
|
||||||
|
fn put_drop(&self, tag: BufferTag, lsn: Lsn) -> Result<()> {
|
||||||
|
let key = CacheKey { tag, lsn };
|
||||||
|
let content = CacheEntryContent::Drop;
|
||||||
|
self.db.put(key.ser()?, content.to_bytes())?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
@@ -790,7 +862,7 @@ impl Timeline for RocksTimeline {
|
|||||||
lsn,
|
lsn,
|
||||||
};
|
};
|
||||||
trace!("put_wal_record lsn: {}", key.lsn);
|
trace!("put_wal_record lsn: {}", key.lsn);
|
||||||
let _res = self.db.put(key.ser()?, content.to_bytes());
|
self.db.put(key.ser()?, content.to_bytes())?;
|
||||||
}
|
}
|
||||||
let n = (old_rel_size - nblocks) as u64;
|
let n = (old_rel_size - nblocks) as u64;
|
||||||
self.num_entries.fetch_add(n, Ordering::Relaxed);
|
self.num_entries.fetch_add(n, Ordering::Relaxed);
|
||||||
@@ -801,27 +873,13 @@ impl Timeline for RocksTimeline {
|
|||||||
///
|
///
|
||||||
/// Memorize a full image of a page version
|
/// Memorize a full image of a page version
|
||||||
///
|
///
|
||||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
|
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||||
let img_len = img.len();
|
|
||||||
let key = CacheKey { tag, lsn };
|
let key = CacheKey { tag, lsn };
|
||||||
let content = CacheEntryContent::PageImage(img);
|
let content = CacheEntryContent::PageImage(img);
|
||||||
|
|
||||||
let mut val_buf = content.to_bytes();
|
|
||||||
|
|
||||||
// Zero size of page image indicates that page can be removed
|
|
||||||
if img_len == 0 {
|
|
||||||
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
|
|
||||||
// records already marked for deletion
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
// delete truncated multixact page
|
|
||||||
val_buf[0] |= UNUSED_VERSION_FLAG;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("put_wal_record lsn: {}", key.lsn);
|
trace!("put_wal_record lsn: {}", key.lsn);
|
||||||
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
|
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
|
||||||
let _res = self.db.put(serialized_key, content.to_bytes());
|
self.db.put(serialized_key, content.to_bytes())?;
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"put_page_image rel {} blk {} at {}",
|
"put_page_image rel {} blk {} at {}",
|
||||||
@@ -830,6 +888,7 @@ impl Timeline for RocksTimeline {
|
|||||||
lsn
|
lsn
|
||||||
);
|
);
|
||||||
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
self.num_page_images.fetch_add(1, Ordering::Relaxed);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_create_database(
|
fn put_create_database(
|
||||||
|
|||||||
@@ -144,7 +144,7 @@ fn import_relfile(
|
|||||||
},
|
},
|
||||||
blknum,
|
blknum,
|
||||||
};
|
};
|
||||||
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
|
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
|
||||||
/*
|
/*
|
||||||
if oldest_lsn == 0 || p.lsn < oldest_lsn {
|
if oldest_lsn == 0 || p.lsn < oldest_lsn {
|
||||||
oldest_lsn = p.lsn;
|
oldest_lsn = p.lsn;
|
||||||
|
|||||||
@@ -264,7 +264,8 @@ pub struct DecodedBkpBlock {
|
|||||||
/* Information on full-page image, if any */
|
/* Information on full-page image, if any */
|
||||||
has_image: bool, /* has image, even for consistency checking */
|
has_image: bool, /* has image, even for consistency checking */
|
||||||
pub apply_image: bool, /* has image that should be restored */
|
pub apply_image: bool, /* has image that should be restored */
|
||||||
pub will_init: bool,
|
pub will_init: bool, /* record initializes page content */
|
||||||
|
pub will_drop: bool, /* record drops relation */
|
||||||
//char *bkp_image;
|
//char *bkp_image;
|
||||||
hole_offset: u16,
|
hole_offset: u16,
|
||||||
hole_length: u16,
|
hole_length: u16,
|
||||||
@@ -289,6 +290,7 @@ impl DecodedBkpBlock {
|
|||||||
has_image: false,
|
has_image: false,
|
||||||
apply_image: false,
|
apply_image: false,
|
||||||
will_init: false,
|
will_init: false,
|
||||||
|
will_drop: false,
|
||||||
hole_offset: 0,
|
hole_offset: 0,
|
||||||
hole_length: 0,
|
hole_length: 0,
|
||||||
bimg_len: 0,
|
bimg_len: 0,
|
||||||
@@ -804,7 +806,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
let spcnode = buf.get_u32_le();
|
let spcnode = buf.get_u32_le();
|
||||||
let dbnode = buf.get_u32_le();
|
let dbnode = buf.get_u32_le();
|
||||||
let relnode = buf.get_u32_le();
|
let relnode = buf.get_u32_le();
|
||||||
//TODO handle this too?
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::MAIN_FORKNUM;
|
||||||
|
blk.rnode_spcnode = spcnode;
|
||||||
|
blk.rnode_dbnode = dbnode;
|
||||||
|
blk.rnode_relnode = relnode;
|
||||||
|
blk.will_drop = true;
|
||||||
|
blocks.push(blk);
|
||||||
trace!(
|
trace!(
|
||||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||||
spcnode,
|
spcnode,
|
||||||
@@ -850,7 +858,13 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
|||||||
let spcnode = buf.get_u32_le();
|
let spcnode = buf.get_u32_le();
|
||||||
let dbnode = buf.get_u32_le();
|
let dbnode = buf.get_u32_le();
|
||||||
let relnode = buf.get_u32_le();
|
let relnode = buf.get_u32_le();
|
||||||
//TODO save these too
|
let mut blk = DecodedBkpBlock::new();
|
||||||
|
blk.forknum = pg_constants::MAIN_FORKNUM;
|
||||||
|
blk.rnode_spcnode = spcnode;
|
||||||
|
blk.rnode_dbnode = dbnode;
|
||||||
|
blk.rnode_relnode = relnode;
|
||||||
|
blk.will_drop = true;
|
||||||
|
blocks.push(blk);
|
||||||
trace!(
|
trace!(
|
||||||
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
|
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
|
||||||
spcnode,
|
spcnode,
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
|
|||||||
|
|
||||||
// From storage_xlog.h
|
// From storage_xlog.h
|
||||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||||
|
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
|
||||||
|
pub const SMGR_TRUNCATE_FSM: u32 = 0x0004;
|
||||||
|
|
||||||
//
|
//
|
||||||
// Constants from visibilitymap.h
|
// Constants from visibilitymap.h
|
||||||
|
|||||||
9
test_runner/zenith_regress/expected/zenith-truncate.out
Normal file
9
test_runner/zenith_regress/expected/zenith-truncate.out
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
create table tt(x integer);
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
delete from tt;
|
||||||
|
vacuum tt;
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
delete from tt;
|
||||||
|
vacuum tt;
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
drop table tt;
|
||||||
@@ -9,3 +9,4 @@ test: zenith-cid
|
|||||||
test: zenith-rel-truncate
|
test: zenith-rel-truncate
|
||||||
test: zenith-clog
|
test: zenith-clog
|
||||||
test: zenith-vacuum-full
|
test: zenith-vacuum-full
|
||||||
|
test: zenith-truncate
|
||||||
|
|||||||
@@ -4,3 +4,4 @@ test: zenith-cid
|
|||||||
test: zenith-rel-truncate
|
test: zenith-rel-truncate
|
||||||
test: zenith-clog
|
test: zenith-clog
|
||||||
test: zenith-vacuum-full
|
test: zenith-vacuum-full
|
||||||
|
test: zenith-truncate
|
||||||
|
|||||||
9
test_runner/zenith_regress/sql/zenith-truncate.sql
Normal file
9
test_runner/zenith_regress/sql/zenith-truncate.sql
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
create table tt(x integer);
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
delete from tt;
|
||||||
|
vacuum tt;
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
delete from tt;
|
||||||
|
vacuum tt;
|
||||||
|
insert into tt values (generate_series(1,10000));
|
||||||
|
drop table tt;
|
||||||
Reference in New Issue
Block a user