Re-group functions in page_cache.rs, and add comments.

Author: Heikki Linnakangas
Date:   2021-04-24 17:54:31 +03:00
parent 0fc05569e0
commit 5e0cc89de8


@@ -368,9 +368,474 @@ impl WALRecord {
}
}
// Public interface functions
impl PageCache {
// Public GET interface functions
///
/// GetPage@LSN
///
/// Returns an 8k page image
///
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let lsn = self.wait_lsn(req_lsn).await?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let entry_opt = iter.next();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
Ok(page_img)
}
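/// Illustrative sketch only, not part of the original commit: fetch a page
/// image via get_page_at_lsn() and check that it is a full 8k page. 'tag'
/// and 'req_lsn' are assumed to be supplied by the caller, e.g. from a
/// GetPage@LSN request.
#[allow(dead_code)]
async fn example_get_page(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
    let page_img = self.get_page_at_lsn(tag, req_lsn).await?;
    // Every returned image, including the zero-page fallback above, is 8192 bytes.
    anyhow::ensure!(page_img.len() == 8192, "unexpected page image size");
    Ok(page_img)
}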
///
/// Get size of relation at given LSN.
///
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
self.wait_lsn(lsn).await?;
self.relsize_get_nowait(rel, lsn)
}
///
/// Does relation exist at given LSN?
///
pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<bool> {
let lsn = self.wait_lsn(req_lsn).await?;
let key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, _v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
debug!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
}
debug!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}
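/// Illustrative sketch only, not part of the original commit: combine
/// relsize_exist() and relsize_get() to report a relation's size in blocks,
/// treating a missing relation as zero blocks. 'rel' and 'lsn' are assumed
/// to come from the caller.
#[allow(dead_code)]
async fn example_relation_size(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
    if !self.relsize_exist(rel, lsn).await? {
        return Ok(0);
    }
    self.relsize_get(rel, lsn).await
}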
// Other public functions, for updating the page cache.
// These are used by the WAL receiver and WAL redo.
///
/// Collect all the WAL records that are needed to reconstruct a page
/// image for the given cache entry.
///
/// Returns an old page image (if any), and a vector of WAL records to apply
/// over it.
///
pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option<Bytes>, Vec<WALRecord>) {
let minkey = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: 0,
},
lsn: 0,
};
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
let key = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: tag.blknum,
},
lsn,
};
buf.clear();
key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_k, v) in iter {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(img) = &content.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &content.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
}
records.reverse();
(base_img, records)
}
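/// Illustrative sketch only, not part of the original commit: how a caller
/// such as the WAL redo manager might consume the output of
/// collect_records_for_apply(). The redo step itself is out of scope here
/// and is represented by the hypothetical closure 'apply_one'.
#[allow(dead_code)]
fn example_replay_records<F>(&self, tag: BufferTag, lsn: u64, mut apply_one: F) -> Option<Bytes>
where
    F: FnMut(Option<Bytes>, &WALRecord) -> Option<Bytes>,
{
    let (base_img, records) = self.collect_records_for_apply(tag, lsn);
    // The records come back oldest-first, so they can be applied in order
    // on top of the base image (if any).
    let mut img = base_img;
    for rec in &records {
        img = apply_one(img, rec);
    }
    img
}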
///
/// Adds a WAL record to the page cache
///
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//trace!("put_wal_record lsn: {}", lsn);
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
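/// Illustrative sketch only, not part of the original commit: a simplified
/// ingest pattern for a WAL receiver, assuming it advances the valid LSN
/// after every record (in practice this could also be batched). 'tag' and
/// 'rec' are assumed to come from WAL decoding, which is out of scope here.
#[allow(dead_code)]
fn example_ingest_record(&self, tag: BufferTag, rec: WALRecord) {
    let lsn = rec.lsn;
    self.put_wal_record(tag, rec);
    // Wake up any get_page_at_lsn() callers waiting for this LSN.
    self.advance_last_valid_lsn(lsn);
}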
///
/// Adds a relation-wide WAL record (like truncate) to the page cache,
/// associating it with all pages starting at the specified block number.
///
pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
let mut key = CacheKey { tag, lsn: rec.lsn };
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load(Ordering::Acquire);
let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
// set new relation size
trace!("Truncate relation {:?}", tag);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
}
///
/// Memorize a full image of a page version
///
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
wal_record: None,
apply_pending: false,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
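/// Illustrative sketch only, not part of the original commit: seed the cache
/// with a full page image, for example when importing pages from a base
/// backup. The all-zeros payload is just a placeholder; a real caller would
/// pass the actual 8k page contents.
#[allow(dead_code)]
fn example_import_page(&self, tag: BufferTag, lsn: u64) {
    let img = Bytes::from(vec![0u8; 8192]);
    self.put_page_image(tag, lsn, img);
}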
pub fn create_database(
&self,
lsn: u64,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: 0,
};
key.pack(&mut buf);
let iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Forward,
));
let mut n = 0;
for (k, v) in iter {
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
buf.clear();
key.pack(&mut buf);
self.db.put(&buf[..], v)?;
n += 1;
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
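/// Illustrative sketch only, not part of the original commit: mirror a
/// CREATE DATABASE that copies a template database by duplicating all of the
/// template's cache entries at the given LSN. All OIDs below are arbitrary
/// placeholder values, not taken from the original code.
#[allow(dead_code)]
fn example_copy_template_db(&self, lsn: u64) -> anyhow::Result<()> {
    let template_db_id: Oid = 1; // placeholder OID of the template database
    let new_db_id: Oid = 16384; // placeholder OID of the new database
    let tablespace_id: Oid = 1663; // placeholder OID of the shared tablespace
    self.create_database(lsn, new_db_id, tablespace_id, template_db_id, tablespace_id)
}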
/// Remember that WAL has been received and added to the page cache up to the given LSN
pub fn advance_last_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} else {
warn!(
"attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})",
oldlsn >> 32,
oldlsn & 0xffffffff,
lsn >> 32,
lsn & 0xffffffff
);
}
}
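/// Illustrative sketch only, not part of the original commit: format a
/// 64-bit LSN in the same "hi/lo" hex notation used by the log message
/// above, e.g. 0x16B374D48 becomes "1/6B374D48".
#[allow(dead_code)]
fn example_format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xffffffff)
}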
///
/// Remember the (end of the) last valid WAL record stored in the page cache.
///
/// NOTE: this updates last_valid_lsn as well.
///
pub fn advance_last_record_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
assert!(lsn >= shared.last_record_lsn);
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
///
/// Remember the beginning of valid WAL.
///
/// TODO: This should be called by garbage collection, so that if an older
/// page is requested, we will return an error to the requestor.
pub fn _advance_first_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet).
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
}
pub fn init_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
assert!(shared.last_record_lsn == 0);
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
pub fn get_last_valid_lsn(&self) -> u64 {
let shared = self.shared.lock().unwrap();
shared.last_record_lsn
}
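/// Illustrative sketch only, not part of the original commit: the assumed
/// lifecycle of the LSN bookkeeping above. The valid LSN range is
/// initialized once at startup and then only moves forward; 'new_record_lsn'
/// must be >= 'startup_lsn', as enforced by the asserts in
/// advance_last_record_lsn().
#[allow(dead_code)]
fn example_lsn_bookkeeping(&self, startup_lsn: u64, new_record_lsn: u64) {
    self.init_valid_lsn(startup_lsn);
    // ... WAL records would be ingested here ...
    self.advance_last_record_lsn(new_record_lsn);
    assert!(self.get_last_valid_lsn() >= startup_lsn);
}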
//
// Get statistics to be displayed in the user interface.
//
pub fn get_stats(&self) -> PageCacheStats {
PageCacheStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
// Internal functions
//
// Internal function to get relation size at given LSN.
//
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire));
let mut key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
loop {
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(rec) = &content.wal_record {
if rec.truncate {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
continue;
}
break;
}
}
let relsize = tag.blknum + 1;
debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
debug!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
@@ -499,456 +964,6 @@ impl PageCache {
Ok(lsn)
}
//
// GetPage@LSN
//
// Returns an 8k page image
//
pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let lsn = self.wait_lsn(req_lsn).await?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let minkey = CacheKey { tag, lsn: 0 };
let maxkey = CacheKey { tag, lsn };
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
buf.clear();
maxkey.pack(&mut buf);
let mut iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let entry_opt = iter.next();
if entry_opt.is_none() {
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn);
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (_k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
let page_img: Bytes;
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}",
page_lsn_hi,
page_lsn_lo,
tag.rel.spcnode,
tag.rel.dbnode,
tag.rel.relnode,
tag.rel.forknum,
tag.blknum
);
Ok(page_img)
}
//
// Collect all the WAL records that are needed to reconstruct a page
// image for the given cache entry.
//
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option<Bytes>, Vec<WALRecord>) {
let minkey = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: 0,
},
lsn: 0,
};
let mut buf = BytesMut::new();
minkey.pack(&mut buf);
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
let key = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: tag.blknum,
},
lsn: lsn,
};
buf.clear();
key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,
);
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
for (_k, v) in iter {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(img) = &content.page_image {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img.clone());
break;
} else if let Some(rec) = &content.wal_record {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
}
records.reverse();
(base_img, records)
}
//
// Adds a WAL record to the page cache
//
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//trace!("put_wal_record lsn: {}", lsn);
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
//
// Adds a relation-wide WAL record (like truncate) to the page cache,
// associating it with all pages started with specified block number
//
pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
let mut key = CacheKey { tag, lsn: rec.lsn };
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load(Ordering::Acquire);
let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;
let content = CacheEntryContent {
page_image: None,
wal_record: Some(rec),
apply_pending: false,
};
// set new relation size
trace!("Truncate relation {:?}", tag);
let mut key_buf = BytesMut::new();
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
for blknum in tag.blknum..old_rel_size {
key_buf.clear();
key.tag.blknum = blknum;
key.pack(&mut key_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
}
let n = (old_rel_size - tag.blknum) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
}
//
// Memorize a full image of a page version
//
pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) {
let key = CacheKey { tag, lsn };
let content = CacheEntryContent {
page_image: Some(img),
wal_record: None,
apply_pending: false,
};
let mut key_buf = BytesMut::new();
key.pack(&mut key_buf);
let mut val_buf = BytesMut::new();
content.pack(&mut val_buf);
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(&key_buf[..], &val_buf[..]);
//debug!("inserted page image for {}/{}/{}_{} blk {} at {}",
// tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
//
pub fn advance_last_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
let oldlsn = shared.last_valid_lsn;
if lsn >= oldlsn {
shared.last_valid_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
} else {
warn!(
"attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})",
oldlsn >> 32,
oldlsn & 0xffffffff,
lsn >> 32,
lsn & 0xffffffff
);
}
}
//
// NOTE: this updates last_valid_lsn as well.
//
pub fn advance_last_record_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.last_valid_lsn);
assert!(lsn >= shared.last_record_lsn);
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.seqwait_lsn.advance(lsn);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
//
pub fn _advance_first_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
// Can't move backwards.
assert!(lsn >= shared.first_valid_lsn);
// Can't overtake last_valid_lsn (except when we're
// initializing the system and last_valid_lsn hasn't been set yet.
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
shared.first_valid_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
}
pub fn init_valid_lsn(&self, lsn: u64) {
let mut shared = self.shared.lock().unwrap();
assert!(shared.first_valid_lsn == 0);
assert!(shared.last_valid_lsn == 0);
assert!(shared.last_record_lsn == 0);
shared.first_valid_lsn = lsn;
shared.last_valid_lsn = lsn;
shared.last_record_lsn = lsn;
self.first_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_valid_lsn.store(lsn, Ordering::Relaxed);
self.last_record_lsn.store(lsn, Ordering::Relaxed);
}
pub fn get_last_valid_lsn(&self) -> u64 {
let shared = self.shared.lock().unwrap();
shared.last_record_lsn
}
//
// Get size of relation at given LSN.
//
pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
self.wait_lsn(lsn).await?;
self.relsize_get_nowait(rel, lsn)
}
//
// Internal function to get relation size at given LSN.
//
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire));
let mut key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
loop {
buf.clear();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
if let Some(rec) = &content.wal_record {
if rec.truncate {
if tag.blknum > 0 {
key.tag.blknum = tag.blknum - 1;
continue;
}
break;
}
}
let relsize = tag.blknum + 1;
debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
debug!("Size of relation {:?} at {} is zero", rel, lsn);
Ok(0)
}
pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<bool> {
let lsn = self.wait_lsn(req_lsn).await?;
let key = CacheKey {
tag: BufferTag {
rel: *rel,
blknum: u32::MAX,
},
lsn,
};
let mut buf = BytesMut::new();
key.pack(&mut buf);
let mut iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Reverse,
));
if let Some((k, _v)) = iter.next() {
buf.clear();
buf.extend_from_slice(&k);
let tag = BufferTag::unpack(&mut buf);
if tag.rel == *rel {
debug!("Relation {:?} exists at {}", rel, lsn);
return Ok(true);
}
}
debug!("Relation {:?} doesn't exist at {}", rel, lsn);
Ok(false)
}
//
// Get statistics to be displayed in the user interface.
//
pub fn get_stats(&self) -> PageCacheStats {
PageCacheStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed),
last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed),
last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed),
}
}
pub fn create_database(
&self,
lsn: u64,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: 0,
};
key.pack(&mut buf);
let iter = self.db.iterator(rocksdb::IteratorMode::From(
&buf[..],
rocksdb::Direction::Forward,
));
let mut n = 0;
for (k, v) in iter {
buf.clear();
buf.extend_from_slice(&k);
let mut key = CacheKey::unpack(&mut buf);
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
buf.clear();
key.pack(&mut buf);
self.db.put(&buf[..], v)?;
n += 1;
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
}
//