LSN-aware smgrnblock/smgrexists implementations

@@ -50,7 +50,6 @@ fn test_redo_cases() {
 // Runs pg_regress on a compute node
 #[test]
 #[ignore]
 fn test_regress() {
     // Start pageserver that reads WAL directly from that postgres
     let storage_cplane = TestStorageControlPlane::one_page_server(String::new());

@@ -79,13 +79,6 @@ impl AddAssign for PageCacheStats {
 // Shared data structure, holding page cache and related auxiliary information
 //
 struct PageCacheShared {
-    // Relation n_blocks cache
-    //
-    // This hashtable should be updated together with the pagecache. Now it is
-    // accessed unreasonably often through the smgr_nblocks(). It is better to just
-    // cache it in postgres smgr and ask only on restart.
-    relsize_cache: HashMap<RelTag, u32>,
-
     // What page versions do we hold in the cache? If we get GetPage with
     // LSN < first_valid_lsn, that's an error because we (no longer) hold that
     // page version. If we get a request > last_valid_lsn, we need to wait until

@@ -148,7 +141,6 @@ fn init_page_cache(conf: &PageServerConf, sys_id: u64) -> PageCache {
     PageCache {
         db: open_rocksdb(&conf, sys_id),
         shared: Mutex::new(PageCacheShared {
-            relsize_cache: HashMap::new(),
            first_valid_lsn: 0,
             last_valid_lsn: 0,
             last_record_lsn: 0,

@@ -338,6 +330,46 @@ impl WALRecord {
 // Public interface functions
 
 impl PageCache {
+    fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
+        let mut shared = self.shared.lock().unwrap();
+        let mut waited = false;
+
+        while lsn > shared.last_valid_lsn {
+            // TODO: Wait for the WAL receiver to catch up
+            waited = true;
+            trace!(
+                "not caught up yet: {}, requested {}",
+                shared.last_valid_lsn,
+                lsn
+            );
+            let wait_result = self
+                .valid_lsn_condvar
+                .wait_timeout(shared, TIMEOUT)
+                .unwrap();
+
+            shared = wait_result.0;
+            if wait_result.1.timed_out() {
+                bail!(
+                    "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
+                    lsn >> 32,
+                    lsn & 0xffff_ffff
+                );
+            }
+        }
+        if waited {
+            trace!("caught up now, continuing");
+        }
+
+        if lsn < shared.first_valid_lsn {
+            bail!(
+                "LSN {:X}/{:X} has already been removed",
+                lsn >> 32,
+                lsn & 0xffff_ffff
+            );
+        }
+        Ok(())
+    }
+
     //
     // GetPage@LSN
     //

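The new wait_lsn() helper is the heart of the change: every LSN-aware request first blocks until the WAL receiver has advanced last_valid_lsn past the requested point, and fails if the wait times out or the LSN has already been garbage-collected. Below is a minimal, dependency-free sketch of the same condvar-with-timeout pattern; the WalIndex/Shared/advance_to names and the plain String error type are illustrative, not part of the patch.

    use std::sync::{Arc, Condvar, Mutex};
    use std::time::Duration;

    // Illustrative stand-in for PageCacheShared: only the watermark we wait on.
    struct Shared {
        last_valid_lsn: u64,
    }

    struct WalIndex {
        shared: Mutex<Shared>,
        valid_lsn_condvar: Condvar,
    }

    impl WalIndex {
        // Block until `lsn` has been received, or give up after `timeout`.
        fn wait_lsn(&self, lsn: u64, timeout: Duration) -> Result<(), String> {
            let mut shared = self.shared.lock().unwrap();
            while lsn > shared.last_valid_lsn {
                let (guard, result) = self
                    .valid_lsn_condvar
                    .wait_timeout(shared, timeout)
                    .unwrap();
                shared = guard;
                if result.timed_out() {
                    return Err(format!(
                        "timed out waiting for LSN {:X}/{:X}",
                        lsn >> 32,
                        lsn & 0xffff_ffff
                    ));
                }
            }
            Ok(())
        }

        // WAL-receiver side: bump the watermark and wake any waiters.
        fn advance_to(&self, lsn: u64) {
            let mut shared = self.shared.lock().unwrap();
            if lsn > shared.last_valid_lsn {
                shared.last_valid_lsn = lsn;
                self.valid_lsn_condvar.notify_all();
            }
        }
    }

    fn main() {
        let idx = Arc::new(WalIndex {
            shared: Mutex::new(Shared { last_valid_lsn: 0 }),
            valid_lsn_condvar: Condvar::new(),
        });
        let writer = Arc::clone(&idx);
        std::thread::spawn(move || writer.advance_to(0x1_0000_0000));
        // The "GetPage" side blocks here until the receiver thread catches up.
        idx.wait_lsn(0x1_0000_0000, Duration::from_secs(5)).unwrap();
    }
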
@@ -346,49 +378,12 @@ impl PageCache {
     pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
 
+        self.wait_lsn(lsn)?;
+
         // Look up cache entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
         let minkey = CacheKey { tag, lsn: 0 };
         let maxkey = CacheKey { tag, lsn };
 
-        {
-            let mut shared = self.shared.lock().unwrap();
-            let mut waited = false;
-
-            while lsn > shared.last_valid_lsn {
-                // TODO: Wait for the WAL receiver to catch up
-                waited = true;
-                trace!(
-                    "not caught up yet: {}, requested {}",
-                    shared.last_valid_lsn,
-                    lsn
-                );
-                let wait_result = self
-                    .valid_lsn_condvar
-                    .wait_timeout(shared, TIMEOUT)
-                    .unwrap();
-
-                shared = wait_result.0;
-                if wait_result.1.timed_out() {
-                    bail!(
-                        "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
-                        lsn >> 32,
-                        lsn & 0xffff_ffff
-                    );
-                }
-            }
-            if waited {
-                trace!("caught up now, continuing");
-            }
-
-            if lsn < shared.first_valid_lsn {
-                bail!(
-                    "LSN {:X}/{:X} has already been removed",
-                    lsn >> 32,
-                    lsn & 0xffff_ffff
-                );
-            }
-        }
         let mut buf = BytesMut::new();
         minkey.pack(&mut buf);

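The inlined wait loop that get_page_at_lsn used to carry is replaced by the single wait_lsn() call above. Both error messages print the LSN in PostgreSQL's split hex notation; a trivial helper equivalent to the inline `lsn >> 32` / `lsn & 0xffff_ffff` formatting would look like this (the helper itself is not in the patch):

    // Render a 64-bit LSN as PostgreSQL-style "high/low" hex, e.g. 1/6B374D48.
    fn format_lsn(lsn: u64) -> String {
        format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
    }

    fn main() {
        assert_eq!(format_lsn(0x0000_0001_6B37_4D48), "1/6B374D48");
    }
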
@@ -521,17 +516,6 @@ impl PageCache {
         return (base_img, records);
     }
 
-    fn update_rel_size(&self, tag: &BufferTag) {
-        let mut shared = self.shared.lock().unwrap();
-        let rel_entry = shared
-            .relsize_cache
-            .entry(tag.rel)
-            .or_insert(tag.blknum + 1);
-        if tag.blknum >= *rel_entry {
-            *rel_entry = tag.blknum + 1;
-        }
-    }
-
     //
     // Adds a WAL record to the page cache
     //

@@ -544,8 +528,6 @@ impl PageCache {
             apply_pending: false,
         };
 
-        self.update_rel_size(&tag);
-
         let mut key_buf = BytesMut::new();
         key.pack(&mut key_buf);
         let mut val_buf = BytesMut::new();

@@ -564,19 +546,14 @@ impl PageCache {
     //
     pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) {
         let mut key = CacheKey { tag, lsn: rec.lsn };
-        let old_rel_size = self.relsize_get(&tag.rel);
+        let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).unwrap();
         let content = CacheEntryContent {
             page_image: None,
             wal_record: Some(rec),
             apply_pending: false,
         };
-        // set new relation size
-        self.shared
-            .lock()
-            .unwrap()
-            .relsize_cache
-            .insert(tag.rel, tag.blknum);
-        info!("Truncate relation {:?}", tag);
+        trace!("Truncate relation {:?}", tag);
         let mut key_buf = BytesMut::new();
         let mut val_buf = BytesMut::new();
         content.pack(&mut val_buf);

@@ -692,31 +669,17 @@ impl PageCache {
         return shared.last_record_lsn;
     }
 
     // FIXME: Shouldn't relation size also be tracked with an LSN?
     // If a replica is lagging behind, it needs to get the size as it was on
     // the replica's current replay LSN.
-    pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>) {
-        let mut shared = self.shared.lock().unwrap();
-        let entry = shared.relsize_cache.entry(*rel).or_insert(0);
-
-        if let Some(to) = to {
-            if to >= *entry {
-                *entry = to + 1;
-            }
-        }
-    }
-
-    pub fn relsize_get(&self, rel: &RelTag) -> u32 {
-        let mut shared = self.shared.lock().unwrap();
-        if let Some(relsize) = shared.relsize_cache.get(rel) {
-            return *relsize;
-        }
+    pub fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
+        if lsn != u64::MAX {
+            self.wait_lsn(lsn)?;
+        }
+
         let mut key = CacheKey {
             tag: BufferTag {
                 rel: *rel,
                 blknum: u32::MAX,
             },
-            lsn: u64::MAX,
+            lsn,
         };
         let mut buf = BytesMut::new();

@@ -744,44 +707,38 @@ impl PageCache {
                 }
             }
             let relsize = tag.blknum + 1;
-            shared.relsize_cache.insert(*rel, relsize);
-            return relsize;
+            return Ok(relsize);
                 }
             }
             break;
         }
-        return 0;
+        return Ok(0);
     }
 
-    pub fn relsize_exist(&self, rel: &RelTag) -> bool {
-        let mut shared = self.shared.lock().unwrap();
-        let relsize_cache = &shared.relsize_cache;
-        if relsize_cache.contains_key(rel) {
-            return true;
-        }
+    pub fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
+        self.wait_lsn(lsn)?;
+
         let key = CacheKey {
             tag: BufferTag {
                 rel: *rel,
-                blknum: 0,
+                blknum: u32::MAX,
             },
-            lsn: 0,
+            lsn,
         };
         let mut buf = BytesMut::new();
         key.pack(&mut buf);
         let mut iter = self
             .db
-            .iterator(IteratorMode::From(&buf[..], Direction::Forward));
+            .iterator(IteratorMode::From(&buf[..], Direction::Reverse));
         if let Some((k, _v)) = iter.next() {
             buf.clear();
             buf.extend_from_slice(&k);
             let tag = BufferTag::unpack(&mut buf);
             if tag.rel == *rel {
-                shared.relsize_cache.insert(*rel, tag.blknum + 1);
-                return true;
+                return Ok(true);
             }
         }
-        return false;
+        return Ok(false);
     }
 
     pub fn get_stats(&self) -> PageCacheStats {

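With the in-memory relsize_cache gone, relsize_get and relsize_exist now seek from the key (rel, blknum = u32::MAX, lsn) and scan backwards through the LSN-indexed store (Direction::Reverse), so the answer reflects the requested LSN rather than the latest cached size. Here is a self-contained sketch of that backwards-scan idea, with a BTreeMap standing in for RocksDB and a plain u32 standing in for RelTag; the Key struct and function below are illustrative simplifications, not the patch's code.

    use std::collections::BTreeMap;

    // Illustrative key layout: ordered by (rel, blknum, lsn), like the packed
    // RocksDB keys in the pageserver. `rel: u32` stands in for the full RelTag.
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct Key {
        rel: u32,
        blknum: u32,
        lsn: u64,
    }

    // Relation size at `lsn`: the highest block that has any version at or
    // below `lsn`, plus one. Seek to (rel, u32::MAX, lsn) and walk backwards.
    fn relsize_get(store: &BTreeMap<Key, Vec<u8>>, rel: u32, lsn: u64) -> u32 {
        let upper = Key { rel, blknum: u32::MAX, lsn };
        for (key, _val) in store.range(..=upper).rev() {
            if key.rel != rel {
                break; // ran out of keys for this relation
            }
            if key.lsn <= lsn {
                return key.blknum + 1;
            }
            // Newer version of some block; keep walking backwards.
        }
        0
    }

    fn main() {
        let mut store = BTreeMap::new();
        store.insert(Key { rel: 1, blknum: 0, lsn: 10 }, vec![]);
        store.insert(Key { rel: 1, blknum: 7, lsn: 20 }, vec![]);
        store.insert(Key { rel: 1, blknum: 9, lsn: 40 }, vec![]);
        assert_eq!(relsize_get(&store, 1, 30), 8); // block 9 doesn't exist yet at LSN 30
        assert_eq!(relsize_get(&store, 1, 50), 10);
        assert_eq!(relsize_get(&store, 2, 50), 0); // relation 2 has no keys: does not exist
    }

Existence is then just "size lookup found at least one key", which is how the rewritten relsize_exist above answers without any cache.
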
@@ -36,12 +36,8 @@ enum FeMessage {
     // All that messages are actually CopyData from libpq point of view.
     //
     ZenithExistsRequest(ZenithRequest),
-    ZenithTruncRequest(ZenithRequest),
-    ZenithUnlinkRequest(ZenithRequest),
     ZenithNblocksRequest(ZenithRequest),
     ZenithReadRequest(ZenithRequest),
-    ZenithCreateRequest(ZenithRequest),
-    ZenithExtendRequest(ZenithRequest),
 }
 
 #[derive(Debug)]

@@ -193,12 +189,8 @@ impl FeMessage {
         // serialization.
         match smgr_tag {
             0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
-            1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
-            2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
-            3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
-            4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
-            5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
-            6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
+            1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
+            2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
             _ => Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),

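With the Trunc/Unlink/Create/Extend requests dropped, only three smgr tags remain on the wire: 0 = exists, 1 = nblocks, 2 = read. As a quick reference, a standalone decoder under the same numbering could look like the following; the enum and function names are invented for this sketch and are not the patch's types.

    // Illustrative mirror of the surviving smgr request tags.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum SmgrRequestKind {
        Exists = 0,
        Nblocks = 1,
        Read = 2,
    }

    fn parse_smgr_tag(tag: u8) -> Result<SmgrRequestKind, String> {
        match tag {
            0 => Ok(SmgrRequestKind::Exists),
            1 => Ok(SmgrRequestKind::Nblocks),
            2 => Ok(SmgrRequestKind::Read),
            other => Err(format!("unknown smgr message tag: {}", other)),
        }
    }

    fn main() {
        assert_eq!(parse_smgr_tag(1), Ok(SmgrRequestKind::Nblocks));
        assert!(parse_smgr_tag(5).is_err()); // Create/Extend/Trunc/Unlink tags are gone
    }
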
@@ -527,7 +519,7 @@ impl Connection {
                     forknum: req.forknum,
                 };
 
-                let exist = pcache.relsize_exist(&tag);
+                let exist = pcache.relsize_exist(&tag, req.lsn).unwrap_or(false);
 
                 self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
                     ok: exist,

@@ -535,20 +527,6 @@ impl Connection {
                 }))
                 .await?
             }
-            Some(FeMessage::ZenithTruncRequest(_)) => {
-                self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                    ok: true,
-                    n_blocks: 0,
-                }))
-                .await?
-            }
-            Some(FeMessage::ZenithUnlinkRequest(_)) => {
-                self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                    ok: true,
-                    n_blocks: 0,
-                }))
-                .await?
-            }
             Some(FeMessage::ZenithNblocksRequest(req)) => {
                 let tag = page_cache::RelTag {
                     spcnode: req.spcnode,

@@ -557,7 +535,7 @@ impl Connection {
                     forknum: req.forknum,
                 };
 
-                let n_blocks = pcache.relsize_get(&tag);
+                let n_blocks = pcache.relsize_get(&tag, req.lsn).unwrap_or(0);
 
                 self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
                     ok: true,

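Both handlers fold an error from the LSN-aware lookup (a wait timeout, or an LSN that has already been trimmed) into a default reply instead of failing the connection: unwrap_or(false) for exists and unwrap_or(0) for nblocks. A minimal illustration of that fallback semantics, with a stub standing in for the page-cache call (names and thresholds invented for the sketch):

    // Stub standing in for PageCache::relsize_get(&tag, lsn): errors out for
    // LSNs the (imaginary) pageserver has not received yet.
    fn relsize_get(lsn: u64) -> Result<u32, String> {
        if lsn > 100 {
            Err("timed out waiting for WAL".to_string())
        } else {
            Ok(8)
        }
    }

    fn main() {
        assert_eq!(relsize_get(50).unwrap_or(0), 8);  // normal case: real answer
        assert_eq!(relsize_get(500).unwrap_or(0), 0); // error case: degrade to "0 blocks"
    }

The sketch only shows the fallback; the real handlers wrap the result into ZenithStatusResponse / ZenithNblocksResponse messages as above.
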
@@ -595,38 +573,6 @@ impl Connection {
 
                 self.write_message(&msg).await?
             }
-            Some(FeMessage::ZenithCreateRequest(req)) => {
-                let tag = page_cache::RelTag {
-                    spcnode: req.spcnode,
-                    dbnode: req.dbnode,
-                    relnode: req.relnode,
-                    forknum: req.forknum,
-                };
-
-                pcache.relsize_inc(&tag, None);
-
-                self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                    ok: true,
-                    n_blocks: 0,
-                }))
-                .await?
-            }
-            Some(FeMessage::ZenithExtendRequest(req)) => {
-                let tag = page_cache::RelTag {
-                    spcnode: req.spcnode,
-                    dbnode: req.dbnode,
-                    relnode: req.relnode,
-                    forknum: req.forknum,
-                };
-
-                pcache.relsize_inc(&tag, Some(req.blkno));
-
-                self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                    ok: true,
-                    n_blocks: 0,
-                }))
-                .await?
-            }
             _ => {}
         }
     }

@@ -315,13 +315,6 @@ async fn slurp_base_file(
 
     let pcache = page_cache::get_pagecache(conf, sys_id);
 
-    let reltag = page_cache::RelTag {
-        spcnode: parsed.spcnode,
-        dbnode: parsed.dbnode,
-        relnode: parsed.relnode,
-        forknum: parsed.forknum as u8,
-    };
-
     while bytes.remaining() >= 8192 {
         let tag = page_cache::BufferTag {
             rel: page_cache::RelTag {

@@ -335,7 +328,6 @@ async fn slurp_base_file(
 
         pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
 
-        pcache.relsize_inc(&reltag, Some(blknum));
         blknum += 1;
     }
 }

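The slurp loop above carves the base file into 8192-byte page images, and with relsize_inc gone it relies on the LSN-indexed store alone to answer later size queries. A small sketch of the same chunking, using the bytes crate as the surrounding code does (split_into_pages is an invented name, not the patch's function):

    use bytes::{Buf, Bytes};

    // Split a relation segment into 8 KiB page images paired with block numbers,
    // mirroring the slurp loop above; any trailing partial page is ignored.
    fn split_into_pages(mut bytes: Bytes) -> Vec<(u32, Bytes)> {
        let mut pages = Vec::new();
        let mut blknum: u32 = 0;
        while bytes.remaining() >= 8192 {
            pages.push((blknum, bytes.copy_to_bytes(8192)));
            blknum += 1;
        }
        pages
    }

    fn main() {
        let data = Bytes::from(vec![0u8; 8192 * 3 + 100]);
        let pages = split_into_pages(data);
        assert_eq!(pages.len(), 3);
        assert_eq!(pages[2].0, 2);
    }
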
Submodule vendor/postgres updated: 167196910d...9f9aa9c300