From 936cad17e49a515f3196d8acaa64ba1a1ef98437 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Tue, 20 Apr 2021 18:28:35 +0300
Subject: [PATCH] LSN-aware smgrnblock/smgrexists implementations

---
 integration_tests/tests/test_pageserver.rs |   1 -
 pageserver/src/page_cache.rs               | 159 ++++++++-------------
 pageserver/src/page_service.rs             |  62 +--------
 pageserver/src/restore_datadir.rs          |   8 --
 vendor/postgres                            |   2 +-
 5 files changed, 63 insertions(+), 169 deletions(-)

diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs
index bfb9b71d0f..8af066ae90 100644
--- a/integration_tests/tests/test_pageserver.rs
+++ b/integration_tests/tests/test_pageserver.rs
@@ -50,7 +50,6 @@ fn test_redo_cases() {

 // Runs pg_regress on a compute node
 #[test]
-#[ignore]
 fn test_regress() {
     // Start pageserver that reads WAL directly from that postgres
     let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 46474921a7..336eba1f55 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -79,13 +79,6 @@ impl AddAssign for PageCacheStats {
 // Shared data structure, holding page cache and related auxiliary information
 //
 struct PageCacheShared {
-    // Relation n_blocks cache
-    //
-    // This hashtable should be updated together with the pagecache. Now it is
-    // accessed unreasonably often through the smgr_nblocks(). It is better to just
-    // cache it in postgres smgr and ask only on restart.
-    relsize_cache: HashMap<RelTag, u32>,
-
     // What page versions do we hold in the cache? If we get GetPage with
     // LSN < first_valid_lsn, that's an error because we (no longer) hold that
     // page version. If we get a request > last_valid_lsn, we need to wait until
@@ -148,7 +141,6 @@ fn init_page_cache(conf: &PageServerConf, sys_id: u64) -> PageCache {
     PageCache {
         db: open_rocksdb(&conf, sys_id),
         shared: Mutex::new(PageCacheShared {
-            relsize_cache: HashMap::new(),
             first_valid_lsn: 0,
             last_valid_lsn: 0,
             last_record_lsn: 0,
@@ -338,6 +330,46 @@ impl WALRecord {

 // Public interface functions
 impl PageCache {
+    fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
+        let mut shared = self.shared.lock().unwrap();
+        let mut waited = false;
+
+        while lsn > shared.last_valid_lsn {
+            // TODO: Wait for the WAL receiver to catch up
+            waited = true;
+            trace!(
+                "not caught up yet: {}, requested {}",
+                shared.last_valid_lsn,
+                lsn
+            );
+            let wait_result = self
+                .valid_lsn_condvar
+                .wait_timeout(shared, TIMEOUT)
+                .unwrap();
+
+            shared = wait_result.0;
+            if wait_result.1.timed_out() {
+                bail!(
+                    "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
+                    lsn >> 32,
+                    lsn & 0xffff_ffff
+                );
+            }
+        }
+        if waited {
+            trace!("caught up now, continuing");
+        }
+
+        if lsn < shared.first_valid_lsn {
+            bail!(
+                "LSN {:X}/{:X} has already been removed",
+                lsn >> 32,
+                lsn & 0xffff_ffff
+            );
+        }
+        Ok(())
+    }
+
     //
     // GetPage@LSN
     //
@@ -346,49 +378,12 @@ impl PageCache {
     pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);

+        self.wait_lsn(lsn)?;
+
         // Look up cache entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
         let minkey = CacheKey { tag, lsn: 0 };
         let maxkey = CacheKey { tag, lsn };
-
-        {
-            let mut shared = self.shared.lock().unwrap();
-            let mut waited = false;
-
-            while lsn > shared.last_valid_lsn {
-                // TODO: Wait for the WAL receiver to catch up
-                waited = true;
-                trace!(
-                    "not caught up yet: {}, requested {}",
-                    shared.last_valid_lsn,
-                    lsn
-                );
-                let wait_result = self
-                    .valid_lsn_condvar
-                    .wait_timeout(shared, TIMEOUT)
-                    .unwrap();
-
-                shared = wait_result.0;
-                if wait_result.1.timed_out() {
-                    bail!(
-                        "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
-                        lsn >> 32,
-                        lsn & 0xffff_ffff
-                    );
-                }
-            }
-            if waited {
-                trace!("caught up now, continuing");
-            }
-
-            if lsn < shared.first_valid_lsn {
-                bail!(
-                    "LSN {:X}/{:X} has already been removed",
-                    lsn >> 32,
-                    lsn & 0xffff_ffff
-                );
-            }
-        }

         let mut buf = BytesMut::new();
         minkey.pack(&mut buf);
@@ -521,17 +516,6 @@ impl PageCache {
         return (base_img, records);
     }

-    fn update_rel_size(&self, tag: &BufferTag) {
-        let mut shared = self.shared.lock().unwrap();
-        let rel_entry = shared
-            .relsize_cache
-            .entry(tag.rel)
-            .or_insert(tag.blknum + 1);
-        if tag.blknum >= *rel_entry {
-            *rel_entry = tag.blknum + 1;
-        }
-    }
-
     //
     // Adds a WAL record to the page cache
     //
@@ -544,8 +528,6 @@ impl PageCache {
             apply_pending: false,
         };

-        self.update_rel_size(&tag);
-
         let mut key_buf = BytesMut::new();
         key.pack(&mut key_buf);
         let mut val_buf = BytesMut::new();
@@ -564,19 +546,14 @@ impl PageCache {
     //
     pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) {
         let mut key = CacheKey { tag, lsn: rec.lsn };
-        let old_rel_size = self.relsize_get(&tag.rel);
+        let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).unwrap();
         let content = CacheEntryContent {
             page_image: None,
             wal_record: Some(rec),
             apply_pending: false,
         };
         // set new relation size
-        self.shared
-            .lock()
-            .unwrap()
-            .relsize_cache
-            .insert(tag.rel, tag.blknum);
-        info!("Truncate relation {:?}", tag);
+        trace!("Truncate relation {:?}", tag);
         let mut key_buf = BytesMut::new();
         let mut val_buf = BytesMut::new();
         content.pack(&mut val_buf);
@@ -692,31 +669,17 @@ impl PageCache {
         return shared.last_record_lsn;
     }

-    // FIXME: Shouldn't relation size also be tracked with an LSN?
-    // If a replica is lagging behind, it needs to get the size as it was on
-    // the replica's current replay LSN.
-    pub fn relsize_inc(&self, rel: &RelTag, to: Option<u32>) {
-        let mut shared = self.shared.lock().unwrap();
-        let entry = shared.relsize_cache.entry(*rel).or_insert(0);
+    pub fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
+        if lsn != u64::MAX {
+            self.wait_lsn(lsn)?;
+        }

-        if let Some(to) = to {
-            if to >= *entry {
-                *entry = to + 1;
-            }
-        }
-    }
-
-    pub fn relsize_get(&self, rel: &RelTag) -> u32 {
-        let mut shared = self.shared.lock().unwrap();
-        if let Some(relsize) = shared.relsize_cache.get(rel) {
-            return *relsize;
-        }
         let mut key = CacheKey {
             tag: BufferTag {
                 rel: *rel,
                 blknum: u32::MAX,
             },
-            lsn: u64::MAX,
+            lsn,
         };

         let mut buf = BytesMut::new();
@@ -744,44 +707,38 @@ impl PageCache {
                     }
                 }
                 let relsize = tag.blknum + 1;
-                shared.relsize_cache.insert(*rel, relsize);
-                return relsize;
+                return Ok(relsize);
                 }
             }
             break;
         }
-        return 0;
+        return Ok(0);
     }

-    pub fn relsize_exist(&self, rel: &RelTag) -> bool {
-        let mut shared = self.shared.lock().unwrap();
-        let relsize_cache = &shared.relsize_cache;
-        if relsize_cache.contains_key(rel) {
-            return true;
-        }
+    pub fn relsize_exist(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<bool> {
+        self.wait_lsn(lsn)?;

         let key = CacheKey {
             tag: BufferTag {
                 rel: *rel,
-                blknum: 0,
+                blknum: u32::MAX,
             },
-            lsn: 0,
+            lsn,
         };
         let mut buf = BytesMut::new();
         key.pack(&mut buf);
         let mut iter = self
             .db
-            .iterator(IteratorMode::From(&buf[..], Direction::Forward));
+            .iterator(IteratorMode::From(&buf[..], Direction::Reverse));
         if let Some((k, _v)) = iter.next() {
             buf.clear();
             buf.extend_from_slice(&k);
             let tag = BufferTag::unpack(&mut buf);
             if tag.rel == *rel {
-                shared.relsize_cache.insert(*rel, tag.blknum + 1);
-                return true;
+                return Ok(true);
             }
         }
-        return false;
+        return Ok(false);
     }

     pub fn get_stats(&self) -> PageCacheStats {
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 84e155c940..2335cc3bce 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -36,12 +36,8 @@ enum FeMessage {
     // All that messages are actually CopyData from libpq point of view.
     //
     ZenithExistsRequest(ZenithRequest),
-    ZenithTruncRequest(ZenithRequest),
-    ZenithUnlinkRequest(ZenithRequest),
     ZenithNblocksRequest(ZenithRequest),
     ZenithReadRequest(ZenithRequest),
-    ZenithCreateRequest(ZenithRequest),
-    ZenithExtendRequest(ZenithRequest),
 }

 #[derive(Debug)]
@@ -193,12 +189,8 @@ impl FeMessage {
         // serialization.
         match smgr_tag {
             0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
-            1 => Ok(Some(FeMessage::ZenithTruncRequest(zreq))),
-            2 => Ok(Some(FeMessage::ZenithUnlinkRequest(zreq))),
-            3 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
-            4 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
-            5 => Ok(Some(FeMessage::ZenithCreateRequest(zreq))),
-            6 => Ok(Some(FeMessage::ZenithExtendRequest(zreq))),
+            1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
+            2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
             _ => Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 format!("unknown smgr message tag: {},'{:?}'", smgr_tag, buf),
@@ -527,7 +519,7 @@ impl Connection {
                         forknum: req.forknum,
                     };

-                    let exist = pcache.relsize_exist(&tag);
+                    let exist = pcache.relsize_exist(&tag, req.lsn).unwrap_or(false);

                    self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
                        ok: exist,
                        n_blocks: 0,
                    }))
                    .await?
                }
-                Some(FeMessage::ZenithTruncRequest(_)) => {
-                    self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                        ok: true,
-                        n_blocks: 0,
-                    }))
-                    .await?
-                }
-                Some(FeMessage::ZenithUnlinkRequest(_)) => {
-                    self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                        ok: true,
-                        n_blocks: 0,
-                    }))
-                    .await?
-                }
                Some(FeMessage::ZenithNblocksRequest(req)) => {
                    let tag = page_cache::RelTag {
                        spcnode: req.spcnode,
                        dbnode: req.dbnode,
                        relnode: req.relnode,
                        forknum: req.forknum,
                    };

-                    let n_blocks = pcache.relsize_get(&tag);
+                    let n_blocks = pcache.relsize_get(&tag, req.lsn).unwrap_or(0);

                    self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
                        ok: true,
@@ -595,38 +573,6 @@ impl Connection {
                    self.write_message(&msg).await?
                }
-                Some(FeMessage::ZenithCreateRequest(req)) => {
-                    let tag = page_cache::RelTag {
-                        spcnode: req.spcnode,
-                        dbnode: req.dbnode,
-                        relnode: req.relnode,
-                        forknum: req.forknum,
-                    };
-
-                    pcache.relsize_inc(&tag, None);
-
-                    self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                        ok: true,
-                        n_blocks: 0,
-                    }))
-                    .await?
-                }
-                Some(FeMessage::ZenithExtendRequest(req)) => {
-                    let tag = page_cache::RelTag {
-                        spcnode: req.spcnode,
-                        dbnode: req.dbnode,
-                        relnode: req.relnode,
-                        forknum: req.forknum,
-                    };
-
-                    pcache.relsize_inc(&tag, Some(req.blkno));
-
-                    self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
-                        ok: true,
-                        n_blocks: 0,
-                    }))
-                    .await?
-                }
                _ => {}
            }
        }
diff --git a/pageserver/src/restore_datadir.rs b/pageserver/src/restore_datadir.rs
index 3b28d64585..b16276e7ab 100644
--- a/pageserver/src/restore_datadir.rs
+++ b/pageserver/src/restore_datadir.rs
@@ -315,13 +315,6 @@ async fn slurp_base_file(
     let pcache = page_cache::get_pagecache(conf, sys_id);

-    let reltag = page_cache::RelTag {
-        spcnode: parsed.spcnode,
-        dbnode: parsed.dbnode,
-        relnode: parsed.relnode,
-        forknum: parsed.forknum as u8,
-    };
-
     while bytes.remaining() >= 8192 {
         let tag = page_cache::BufferTag {
             rel: page_cache::RelTag {
@@ -335,7 +328,6 @@ async fn slurp_base_file(

         pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));

-        pcache.relsize_inc(&reltag, Some(blknum));
         blknum += 1;
     }
 }
diff --git a/vendor/postgres b/vendor/postgres
index 167196910d..9f9aa9c300 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit 167196910d6f41466c82793bcf14bfe442468776
+Subproject commit 9f9aa9c300c9bbac296e2c126b3f96701d4e683d
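
Reviewer note, not part of the patch: a minimal sketch of how a caller is expected to use the LSN-aware API added above. The `relsize_get`/`relsize_exist` signatures and the `u64::MAX` "latest" convention come from this patch; the wrapper function name, the exact type paths (`page_cache::PageCache`, `page_cache::RelTag`), and the printing are illustrative assumptions only.

// Illustrative sketch -- mirrors how page_service.rs calls the new API.
// `pcache` is the page cache for one system id, `rel` identifies a relation,
// and `lsn` is the LSN the compute node is replaying at (u64::MAX = latest).
fn print_rel_size_at_lsn(
    pcache: &page_cache::PageCache,
    rel: &page_cache::RelTag,
    lsn: u64,
) -> anyhow::Result<()> {
    // For a real request LSN both calls first block in wait_lsn() until the
    // WAL receiver has caught up, and fail if that times out or the LSN has
    // already been trimmed from the cache.
    if pcache.relsize_exist(rel, lsn)? {
        let n_blocks = pcache.relsize_get(rel, lsn)?;
        println!(
            "rel {:?} has {} blocks at LSN {:X}/{:X}",
            rel,
            n_blocks,
            lsn >> 32,
            lsn & 0xffff_ffff
        );
    } else {
        println!("rel {:?} does not exist at LSN {:X}/{:X}", rel, lsn >> 32, lsn & 0xffff_ffff);
    }
    Ok(())
}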