mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Refactor the way truncations are handled.
Currently, truncation is implemented in the RocksDB repository by storing a special sentinel entry for each page that was truncated away. Hide that implementation detail better in the abstract Repository interface, so that caller doesn't need to construct the special sentinel WAL record. While we're at it, refactor the CacheEntryContent struct to an enum.
This commit is contained in:
@@ -36,7 +36,7 @@ pub trait Timeline {
|
||||
// Functions used by WAL receiver
|
||||
|
||||
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
|
||||
fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()>;
|
||||
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()>;
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
|
||||
fn create_database(
|
||||
&self,
|
||||
@@ -109,7 +109,6 @@ impl BufferTag {
|
||||
pub struct WALRecord {
|
||||
pub lsn: Lsn, // LSN at the *end* of the record
|
||||
pub will_init: bool,
|
||||
pub truncate: bool,
|
||||
pub rec: Bytes,
|
||||
// Remember the offset of main_data in rec,
|
||||
// so that we don't have to parse the record again.
|
||||
@@ -121,7 +120,6 @@ impl WALRecord {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
buf.put_u64(self.lsn.0);
|
||||
buf.put_u8(self.will_init as u8);
|
||||
buf.put_u8(self.truncate as u8);
|
||||
buf.put_u32(self.main_data_offset);
|
||||
buf.put_u32(self.rec.len() as u32);
|
||||
buf.put_slice(&self.rec[..]);
|
||||
@@ -129,14 +127,12 @@ impl WALRecord {
|
||||
pub fn unpack(buf: &mut BytesMut) -> WALRecord {
|
||||
let lsn = Lsn::from(buf.get_u64());
|
||||
let will_init = buf.get_u8() != 0;
|
||||
let truncate = buf.get_u8() != 0;
|
||||
let main_data_offset = buf.get_u32();
|
||||
let mut dst = vec![0u8; buf.get_u32() as usize];
|
||||
buf.copy_to_slice(&mut dst);
|
||||
WALRecord {
|
||||
lsn,
|
||||
will_init,
|
||||
truncate,
|
||||
rec: Bytes::from(dst),
|
||||
main_data_offset,
|
||||
}
|
||||
|
||||
@@ -98,38 +98,55 @@ impl CacheKey {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CacheEntryContent {
|
||||
pub page_image: Option<Bytes>,
|
||||
pub wal_record: Option<WALRecord>,
|
||||
enum CacheEntryContent {
|
||||
PageImage(Bytes),
|
||||
WALRecord(WALRecord),
|
||||
Truncation,
|
||||
}
|
||||
|
||||
const PAGE_IMAGE_FLAG: u8 = 1u8;
|
||||
const UNUSED_VERSION_FLAG: u8 = 2u8;
|
||||
// The serialized representation of a CacheEntryContent begins with
|
||||
// single byte that indicates what kind of entry it is. There is also
|
||||
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
|
||||
// at all, you must peek into the first byte of the serialized representation
|
||||
// to read it.
|
||||
const CONTENT_PAGE_IMAGE: u8 = 1u8;
|
||||
const CONTENT_WAL_RECORD: u8 = 2u8;
|
||||
const CONTENT_TRUNCATION: u8 = 3u8;
|
||||
|
||||
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
|
||||
|
||||
const UNUSED_VERSION_FLAG: u8 = 4u8;
|
||||
|
||||
impl CacheEntryContent {
|
||||
pub fn pack(&self, buf: &mut BytesMut) {
|
||||
if let Some(image) = &self.page_image {
|
||||
buf.put_u8(PAGE_IMAGE_FLAG);
|
||||
buf.put_u16(image.len() as u16);
|
||||
buf.put_slice(&image[..]);
|
||||
} else if let Some(rec) = &self.wal_record {
|
||||
buf.put_u8(0);
|
||||
rec.pack(buf);
|
||||
match self {
|
||||
CacheEntryContent::PageImage(image) => {
|
||||
buf.put_u8(CONTENT_PAGE_IMAGE);
|
||||
buf.put_u16(image.len() as u16);
|
||||
buf.put_slice(&image[..]);
|
||||
}
|
||||
CacheEntryContent::WALRecord(rec) => {
|
||||
buf.put_u8(CONTENT_WAL_RECORD);
|
||||
rec.pack(buf);
|
||||
}
|
||||
CacheEntryContent::Truncation => {
|
||||
buf.put_u8(CONTENT_TRUNCATION);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent {
|
||||
if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 {
|
||||
let mut dst = vec![0u8; buf.get_u16() as usize];
|
||||
buf.copy_to_slice(&mut dst);
|
||||
CacheEntryContent {
|
||||
page_image: Some(Bytes::from(dst)),
|
||||
wal_record: None,
|
||||
}
|
||||
} else {
|
||||
CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(WALRecord::unpack(buf)),
|
||||
let kind = buf.get_u8() & CONTENT_KIND_MASK;
|
||||
|
||||
match kind {
|
||||
CONTENT_PAGE_IMAGE => {
|
||||
let len = buf.get_u16() as usize;
|
||||
let mut dst = vec![0u8; len];
|
||||
buf.copy_to_slice(&mut dst);
|
||||
CacheEntryContent::PageImage(Bytes::from(dst))
|
||||
}
|
||||
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
|
||||
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -257,12 +274,12 @@ impl RocksTimeline {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
if let Some(img) = &content.page_image {
|
||||
if let CacheEntryContent::PageImage(img) = content {
|
||||
// We have a base image. No need to dig deeper into the list of
|
||||
// records
|
||||
base_img = Some(img.clone());
|
||||
base_img = Some(img);
|
||||
break;
|
||||
} else if let Some(rec) = &content.wal_record {
|
||||
} else if let CacheEntryContent::WALRecord(rec) = content {
|
||||
records.push(rec.clone());
|
||||
// If this WAL record initializes the page, no need to dig deeper.
|
||||
if rec.will_init {
|
||||
@@ -311,14 +328,12 @@ impl RocksTimeline {
|
||||
buf.clear();
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
if let Some(rec) = &content.wal_record {
|
||||
if rec.truncate {
|
||||
if tag.blknum > 0 {
|
||||
key.tag.blknum = tag.blknum - 1;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
if let CacheEntryContent::Truncation = content {
|
||||
if tag.blknum > 0 {
|
||||
key.tag.blknum = tag.blknum - 1;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
let relsize = tag.blknum + 1;
|
||||
debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
|
||||
@@ -380,7 +395,7 @@ impl RocksTimeline {
|
||||
minkey.lsn = Lsn(0); // first version
|
||||
|
||||
// reconstruct most recent page version
|
||||
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
|
||||
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
||||
trace!("Reconstruct most recent page {:?}", key);
|
||||
// force reconstruction of most recent page version
|
||||
let (base_img, records) =
|
||||
@@ -408,7 +423,7 @@ impl RocksTimeline {
|
||||
let key = CacheKey::unpack(&mut buf);
|
||||
if key.tag == maxkey.tag {
|
||||
let v = iter.value().unwrap();
|
||||
if (v[0] & PAGE_IMAGE_FLAG) == 0 {
|
||||
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
|
||||
trace!("Reconstruct horizon page {:?}", key);
|
||||
let (base_img, records) =
|
||||
self.collect_records_for_apply(key.tag, key.lsn);
|
||||
@@ -518,9 +533,9 @@ impl Timeline for RocksTimeline {
|
||||
buf.extend_from_slice(&v);
|
||||
let content = CacheEntryContent::unpack(&mut buf);
|
||||
let page_img: Bytes;
|
||||
if let Some(img) = &content.page_image {
|
||||
page_img = img.clone();
|
||||
} else if content.wal_record.is_some() {
|
||||
if let CacheEntryContent::PageImage(img) = content {
|
||||
page_img = img;
|
||||
} else if let CacheEntryContent::WALRecord(_rec) = content {
|
||||
// Request the WAL redo manager to apply the WAL records for us.
|
||||
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
|
||||
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
|
||||
@@ -603,10 +618,7 @@ impl Timeline for RocksTimeline {
|
||||
let lsn = rec.lsn;
|
||||
let key = CacheKey { tag, lsn };
|
||||
|
||||
let content = CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(rec),
|
||||
};
|
||||
let content = CacheEntryContent::WALRecord(rec);
|
||||
|
||||
let mut key_buf = BytesMut::new();
|
||||
key.pack(&mut key_buf);
|
||||
@@ -624,32 +636,33 @@ impl Timeline for RocksTimeline {
|
||||
/// Adds a relation-wide WAL record (like truncate) to the repository,
|
||||
/// associating it with all pages started with specified block number
|
||||
///
|
||||
fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
|
||||
let mut key = CacheKey { tag, lsn: rec.lsn };
|
||||
|
||||
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()> {
|
||||
// What was the size of the relation before this record?
|
||||
let last_lsn = self.last_valid_lsn.load();
|
||||
let old_rel_size = self.relsize_get_nowait(tag.rel, last_lsn)?;
|
||||
let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?;
|
||||
|
||||
let content = CacheEntryContent {
|
||||
page_image: None,
|
||||
wal_record: Some(rec),
|
||||
};
|
||||
let content = CacheEntryContent::Truncation;
|
||||
// set new relation size
|
||||
trace!("Truncate relation {:?}", tag);
|
||||
trace!("Truncate relation {:?} to {} blocks", rel, nblocks);
|
||||
|
||||
let mut key_buf = BytesMut::new();
|
||||
let mut val_buf = BytesMut::new();
|
||||
content.pack(&mut val_buf);
|
||||
|
||||
for blknum in tag.blknum..old_rel_size {
|
||||
for blknum in nblocks..old_rel_size {
|
||||
key_buf.clear();
|
||||
key.tag.blknum = blknum;
|
||||
let key = CacheKey {
|
||||
tag: BufferTag {
|
||||
rel,
|
||||
blknum,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
key.pack(&mut key_buf);
|
||||
trace!("put_wal_record lsn: {}", key.lsn);
|
||||
let _res = self.db.put(&key_buf[..], &val_buf[..]);
|
||||
}
|
||||
let n = (old_rel_size - tag.blknum) as u64;
|
||||
let n = (old_rel_size - nblocks) as u64;
|
||||
self.num_entries.fetch_add(n, Ordering::Relaxed);
|
||||
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
|
||||
Ok(())
|
||||
@@ -660,10 +673,7 @@ impl Timeline for RocksTimeline {
|
||||
///
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
|
||||
let key = CacheKey { tag, lsn };
|
||||
let content = CacheEntryContent {
|
||||
page_image: Some(img),
|
||||
wal_record: None,
|
||||
};
|
||||
let content = CacheEntryContent::PageImage(img);
|
||||
|
||||
let mut key_buf = BytesMut::new();
|
||||
key.pack(&mut key_buf);
|
||||
|
||||
@@ -370,7 +370,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
|
||||
let rec = WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
truncate: false,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
|
||||
@@ -234,7 +234,6 @@ fn walreceiver_main(
|
||||
let rec = WALRecord {
|
||||
lsn,
|
||||
will_init: blk.will_init || blk.apply_image,
|
||||
truncate: false,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
@@ -248,23 +247,13 @@ fn walreceiver_main(
|
||||
{
|
||||
let truncate = XlSmgrTruncate::decode(&decoded);
|
||||
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: truncate.rnode.spcnode,
|
||||
dbnode: truncate.rnode.dbnode,
|
||||
relnode: truncate.rnode.relnode,
|
||||
forknum: MAIN_FORKNUM,
|
||||
},
|
||||
blknum: truncate.blkno,
|
||||
let rel = RelTag {
|
||||
spcnode: truncate.rnode.spcnode,
|
||||
dbnode: truncate.rnode.dbnode,
|
||||
relnode: truncate.rnode.relnode,
|
||||
forknum: MAIN_FORKNUM,
|
||||
};
|
||||
let rec = WALRecord {
|
||||
lsn,
|
||||
will_init: false,
|
||||
truncate: true,
|
||||
rec: recdata.clone(),
|
||||
main_data_offset: decoded.main_data_offset as u32,
|
||||
};
|
||||
timeline.put_rel_wal_record(tag, rec)?;
|
||||
timeline.put_truncation(rel, lsn, truncate.blkno)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
|
||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
|
||||
Reference in New Issue
Block a user