From 07fb30747ac28e74d22ad6670997577a33b24e73 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 8 Apr 2021 19:39:30 +0300 Subject: [PATCH] Store pageserver data in RocksDB --- Cargo.lock | 122 +++++++++ pageserver/Cargo.toml | 1 + pageserver/src/page_cache.rs | 488 ++++++++++++++++++++------------- pageserver/src/page_service.rs | 10 +- pageserver/src/restore_s3.rs | 10 +- pageserver/src/walreceiver.rs | 12 +- pageserver/src/walredo.rs | 25 +- 7 files changed, 444 insertions(+), 224 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ac61eb60d..fb694ec2f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,6 +241,25 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "bindgen" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" +dependencies = [ + "bitflags", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -322,6 +341,18 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] + +[[package]] +name = "cexpr" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -348,6 +379,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "clang-sys" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "2.33.3" @@ -729,6 +771,12 @@ dependencies = [ "wasi 0.10.0+wasi-snapshot-preview1", ] +[[package]] +name = "glob" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" + [[package]] name = "gloo-timers" version = "0.2.1" @@ -920,6 +968,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.50" @@ -944,12 +1001,39 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56d855069fafbb9b344c0f962150cd2c1187975cb1c22c1522c240d8c4986714" +[[package]] +name = "libloading" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a" +dependencies = [ + "cfg-if 1.0.0", + "winapi", +] + +[[package]] +name = "librocksdb-sys" +version = "6.17.3" +source = "git+https://github.com/rust-rocksdb/rust-rocksdb.git#0b700fe70da8ee30483fde79f44df549f8fe11ec" +dependencies = [ + "bindgen", + "cc", + "glob", + "libc", +] + [[package]] name = "lock_api" version = "0.4.3" @@ -1065,6 +1149,16 @@ dependencies = [ "socket2 0.4.0", ] +[[package]] +name = "nom" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +dependencies = [ + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -1184,6 +1278,7 @@ dependencies = [ "postgres-protocol", "rand 0.8.3", "regex", + "rocksdb", "rust-s3", "slog", "slog-async", @@ -1228,6 +1323,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.1.0" @@ -1566,6 +1667,15 @@ dependencies = [ "winreg", ] +[[package]] +name = "rocksdb" +version = "0.15.0" +source = "git+https://github.com/rust-rocksdb/rust-rocksdb.git#0b700fe70da8ee30483fde79f44df549f8fe11ec" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -1619,6 +1729,12 @@ dependencies = [ "url", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.2.3" @@ -1759,6 +1875,12 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "shlex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" + [[package]] name = "signal-hook-registry" version = "1.3.0" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index ab8b78dd2d..8d629deabc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -32,5 +32,6 @@ tokio-stream = { version = "0.1.4" } tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" } anyhow = "1.0" crc32c = "0.6.0" diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 7c77ca5926..ea36de3af3 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -6,8 +6,7 @@ // per-entry mutex. 
// -use core::ops::Bound::Included; -use std::collections::{BTreeMap, HashMap}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::error::Error; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -16,10 +15,10 @@ use std::thread; use std::time::Duration; use std::{convert::TryInto, ops::AddAssign}; // use tokio::sync::RwLock; -use bytes::Bytes; use lazy_static::lazy_static; use log::*; -use rand::Rng; +use rocksdb::*; +use std::collections::HashMap; use crate::{walredo, PageServerConf}; @@ -32,6 +31,9 @@ static TIMEOUT: Duration = Duration::from_secs(60); pub struct PageCache { shared: Mutex, + // RocksDB handle + db: DB, + // Channel for communicating with the WAL redo process here. pub walredo_sender: Sender>, pub walredo_receiver: Receiver>, @@ -80,9 +82,6 @@ impl AddAssign for PageCacheStats { // Shared data structure, holding page cache and related auxiliary information // struct PageCacheShared { - // The actual page cache - pagecache: BTreeMap>, - // Relation n_blocks cache // // This hashtable should be updated together with the pagecache. Now it is @@ -117,7 +116,7 @@ pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc { let mut pcaches = PAGECACHES.lock().unwrap(); if !pcaches.contains_key(&sys_id) { - pcaches.insert(sys_id, Arc::new(init_page_cache())); + pcaches.insert(sys_id, Arc::new(init_page_cache(&conf, sys_id))); // Initialize the WAL redo thread // @@ -135,13 +134,22 @@ pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc { pcaches.get(&sys_id).unwrap().clone() } -fn init_page_cache() -> PageCache { +fn open_rocksdb(conf: &PageServerConf, sys_id: u64) -> DB { + let path = conf.data_dir.join(sys_id.to_string()); + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.set_use_fsync(true); + opts.set_compression_type(DBCompressionType::Lz4); + DB::open(&opts, &path).unwrap() +} + +fn init_page_cache(conf: &PageServerConf, sys_id: u64) -> PageCache { // Initialize the channel between the page cache and the WAL applicator let (s, r) = unbounded(); PageCache { + db: open_rocksdb(&conf, sys_id), shared: Mutex::new(PageCacheShared { - pagecache: BTreeMap::new(), relsize_cache: HashMap::new(), first_valid_lsn: 0, last_valid_lsn: 0, @@ -182,6 +190,19 @@ pub struct CacheKey { pub lsn: u64, } +impl CacheKey { + pub fn pack(&self, buf: &mut BytesMut) { + self.tag.pack(buf); + buf.put_u64(self.lsn); + } + pub fn unpack(buf: &mut BytesMut) -> CacheKey { + CacheKey { + tag: BufferTag::unpack(buf), + lsn: buf.get_u64(), + } + } +} + pub struct CacheEntry { pub key: CacheKey, @@ -201,21 +222,47 @@ pub struct CacheEntryContent { pub apply_pending: bool, } -impl CacheEntry { - fn new(key: CacheKey) -> CacheEntry { - CacheEntry { - key: key, - content: Mutex::new(CacheEntryContent { - page_image: None, +impl CacheEntryContent { + pub fn pack(&self, buf: &mut BytesMut) { + if let Some(image) = &self.page_image { + buf.put_u8(1); + buf.put_u16(image.len() as u16); + buf.put_slice(&image[..]); + } else if let Some(rec) = &self.wal_record { + buf.put_u8(0); + rec.pack(buf); + } + } + pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { + if buf.get_u8() == 1 { + let mut dst = vec![0u8; buf.get_u16() as usize]; + buf.copy_to_slice(&mut dst); + CacheEntryContent { + page_image: Some(Bytes::from(dst)), wal_record: None, apply_pending: false, - }), + } + } else { + CacheEntryContent { + page_image: None, + wal_record: Some(WALRecord::unpack(buf)), + apply_pending: false, + } + } + } +} + +impl CacheEntry { + fn new(key: CacheKey, 
content: CacheEntryContent) -> CacheEntry { + CacheEntry { + key, + content: Mutex::new(content), walredo_condvar: Condvar::new(), } } } -#[derive(Eq, PartialEq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] pub struct RelTag { pub spcnode: u32, pub dbnode: u32, @@ -223,15 +270,42 @@ pub struct RelTag { pub forknum: u8, } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +impl RelTag { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u32(self.spcnode); + buf.put_u32(self.dbnode); + buf.put_u32(self.relnode); + buf.put_u32(self.forknum as u32); + } + pub fn unpack(buf: &mut BytesMut) -> RelTag { + RelTag { + spcnode: buf.get_u32(), + dbnode: buf.get_u32(), + relnode: buf.get_u32(), + forknum: buf.get_u32() as u8, + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub struct BufferTag { - pub spcnode: u32, - pub dbnode: u32, - pub relnode: u32, - pub forknum: u8, + pub rel: RelTag, pub blknum: u32, } +impl BufferTag { + pub fn pack(&self, buf: &mut BytesMut) { + self.rel.pack(buf); + buf.put_u32(self.blknum); + } + pub fn unpack(buf: &mut BytesMut) -> BufferTag { + BufferTag { + rel: RelTag::unpack(buf), + blknum: buf.get_u32(), + } + } +} + #[derive(Clone)] pub struct WALRecord { pub lsn: u64, // LSN at the *end* of the record @@ -239,6 +313,26 @@ pub struct WALRecord { pub rec: Bytes, } +impl WALRecord { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u64(self.lsn); + buf.put_u8(self.will_init as u8); + buf.put_u16(self.rec.len() as u16); + buf.put_slice(&self.rec[..]); + } + pub fn unpack(buf: &mut BytesMut) -> WALRecord { + let lsn = buf.get_u64(); + let will_init = buf.get_u8() != 0; + let mut dst = vec![0u8; buf.get_u16() as usize]; + buf.copy_to_slice(&mut dst); + WALRecord { + lsn, + will_init, + rec: Bytes::from(dst), + } + } +} + // Public interface functions impl PageCache { @@ -252,10 +346,9 @@ impl PageCache { // Look up cache entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. - let minkey = CacheKey { tag: tag, lsn: 0 }; - let maxkey = CacheKey { tag: tag, lsn: lsn }; + let minkey = CacheKey { tag, lsn: 0 }; + let maxkey = CacheKey { tag, lsn }; - let entry_rc: Arc; { let mut shared = self.shared.lock().unwrap(); let mut waited = false; @@ -275,6 +368,10 @@ impl PageCache { shared = wait_result.0; if wait_result.1.timed_out() { + error!( + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn + ); return Err(format!( "Timed out while waiting for WAL record at LSN {} to arrive", lsn @@ -286,68 +383,66 @@ impl PageCache { } if lsn < shared.first_valid_lsn { + error!( + "LSN {} has already been removed", + lsn + ); return Err(format!("LSN {} has already been removed", lsn))?; } - - let pagecache = &shared.pagecache; - - let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); - - let entry_opt = entries.next_back(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192]; - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } - let (_key, entry) = entry_opt.unwrap(); - entry_rc = entry.clone(); - - // Now that we have a reference to the cache entry, drop the lock on the map. - // It's important to do this before waiting on the condition variable below, - // and better to do it as soon as possible to maximize concurrency. 
} + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); - // Lock the cache entry and dig the page image out of it. + let mut readopts = ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self + .db + .iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts); + let entry_opt = iter.next(); + + if entry_opt.is_none() { + static ZERO_PAGE: [u8; 8192] = [0 as u8; 8192]; + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ + } + let (k, v) = entry_opt.unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); let page_img: Bytes; - { + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + buf.clear(); + buf.extend_from_slice(&k); + let entry_rc = Arc::new(CacheEntry::new(CacheKey::unpack(&mut buf), content)); + let mut entry_content = entry_rc.content.lock().unwrap(); + entry_content.apply_pending = true; - if let Some(img) = &entry_content.page_image { - assert!(!entry_content.apply_pending); - page_img = img.clone(); - } else if entry_content.wal_record.is_some() { - // - // If this page needs to be reconstructed by applying some WAL, - // send a request to the WAL redo thread. - // - if !entry_content.apply_pending { - assert!(!entry_content.apply_pending); - entry_content.apply_pending = true; + let s = &self.walredo_sender; + s.send(entry_rc.clone())?; - let s = &self.walredo_sender; - s.send(entry_rc.clone())?; - } - - while entry_content.apply_pending { - entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - } - - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - page_img = match &entry_content.page_image { - Some(p) => p.clone(), - None => { - error!( - "could not apply WAL to reconstruct page image for GetPage@LSN request" - ); - return Err("could not apply WAL to reconstruct page image".into()); - } - }; - } else { - // No base image, and no WAL record. Huh? - return Err(format!("no page image or WAL record for requested page"))?; + while entry_content.apply_pending { + entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); } + + // We should now have a page image. If we don't, it means that WAL redo + // failed to reconstruct it. WAL redo should've logged that error already. + page_img = match &entry_content.page_image { + Some(p) => p.clone(), + None => { + error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); + return Err("could not apply WAL to reconstruct page image".into()); + } + }; + self.put_page_image(tag, lsn, page_img.clone()); + } else { + // No base image, and no WAL record. Huh? + return Err(format!("no page image or WAL record for requested page"))?; } // FIXME: assumes little-endian. Only used for the debugging log though @@ -357,10 +452,10 @@ impl PageCache { "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", page_lsn_hi, page_lsn_lo, - tag.spcnode, - tag.dbnode, - tag.relnode, - tag.forknum, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, tag.blknum ); @@ -375,38 +470,41 @@ impl PageCache { // over it. // pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) { - // Scan the BTreeMap backwards, starting from the given entry. 
- let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - let minkey = CacheKey { - tag: entry.key.tag, + tag: BufferTag { + rel: entry.key.tag.rel, + blknum: 0, + }, lsn: 0, }; - let maxkey = CacheKey { - tag: entry.key.tag, - lsn: entry.key.lsn, - }; - let entries = pagecache.range((Included(&minkey), Included(&maxkey))); - // the last entry in the range should be the CacheEntry we were given - //let _last_entry = entries.next_back(); - //assert!(last_entry == entry); + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + entry.key.pack(&mut buf); + let iter = self + .db + .iterator_opt(IteratorMode::From(&buf[..], Direction::Reverse), readopts); let mut base_img: Option = None; let mut records: Vec = Vec::new(); // Scan backwards, collecting the WAL records, until we hit an // old page image. - for (_key, e) in entries.rev() { - let e = e.content.lock().unwrap(); - - if let Some(img) = &e.page_image { + for (_k, v) in iter { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(img) = &content.page_image { // We have a base image. No need to dig deeper into the list of // records base_img = Some(img.clone()); break; - } else if let Some(rec) = &e.wal_record { + } else if let Some(rec) = &content.wal_record { records.push(rec.clone()); // If this WAL record initializes the page, no need to dig deeper. @@ -422,40 +520,40 @@ impl PageCache { return (base_img, records); } + fn update_rel_size(&self, tag: &BufferTag) { + let mut shared = self.shared.lock().unwrap(); + let rel_entry = shared + .relsize_cache + .entry(tag.rel) + .or_insert(tag.blknum + 1); + if tag.blknum >= *rel_entry { + *rel_entry = tag.blknum + 1; + } + } + // // Adds a WAL record to the page cache // pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let key = CacheKey { - tag: tag, - lsn: rec.lsn, + let key = CacheKey { tag, lsn: rec.lsn }; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, }; - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().wal_record = Some(rec); + self.update_rel_size(&tag); - let mut shared = self.shared.lock().unwrap(); - - let rel_tag = RelTag { - spcnode: tag.spcnode, - dbnode: tag.dbnode, - relnode: tag.relnode, - forknum: tag.forknum, - }; - let rel_entry = shared.relsize_cache.entry(rel_tag).or_insert(0); - if tag.blknum >= *rel_entry { - *rel_entry = tag.blknum + 1; - } + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); - let oldentry = shared.pagecache.insert(key, Arc::new(entry)); self.num_entries.fetch_add(1, Ordering::Relaxed); - - if !oldentry.is_none() { - error!("overwriting WAL record in page cache"); - } - self.num_wal_records.fetch_add(1, Ordering::Relaxed); } @@ -463,21 +561,23 @@ impl PageCache { // Memorize a full image of a page version // pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { - let key = CacheKey { tag: tag, lsn: lsn }; + let key = CacheKey { tag, lsn }; + let content = CacheEntryContent { + page_image: Some(img), + wal_record: None, + apply_pending: false, + }; - let entry = CacheEntry::new(key.clone()); - entry.content.lock().unwrap().page_image = Some(img); + let mut key_buf = 
BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); - let mut shared = self.shared.lock().unwrap(); - let pagecache = &mut shared.pagecache; - - let oldentry = pagecache.insert(key, Arc::new(entry)); - self.num_entries.fetch_add(1, Ordering::Relaxed); - assert!(oldentry.is_none()); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - self.num_page_images.fetch_add(1, Ordering::Relaxed); } @@ -486,12 +586,15 @@ impl PageCache { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. - assert!(lsn >= shared.last_valid_lsn); + //assert!(lsn >= shared.last_valid_lsn); + if lsn > shared.last_valid_lsn { + shared.last_valid_lsn = lsn; + self.valid_lsn_condvar.notify_all(); - shared.last_valid_lsn = lsn; - self.valid_lsn_condvar.notify_all(); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } else { + trace!("lsn={}, shared.last_valid_lsn={}", lsn, shared.last_valid_lsn); + } } // @@ -509,7 +612,7 @@ impl PageCache { self.valid_lsn_condvar.notify_all(); self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); } // @@ -549,54 +652,6 @@ impl PageCache { return shared.last_record_lsn; } - // - // Simple test function for the WAL redo code: - // - // 1. Pick a page from the page cache at random. - // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) - // - // - pub fn _test_get_page_at_lsn(&self) { - // for quick testing of the get_page_at_lsn() funcion. - // - // Get a random page from the page cache. Apply all its WAL, by requesting - // that page at the highest lsn. - - let mut tag: Option = None; - - { - let shared = self.shared.lock().unwrap(); - let pagecache = &shared.pagecache; - - if pagecache.is_empty() { - info!("page cache is empty"); - return; - } - - // Find nth entry in the map, where n is picked at random - let n = rand::thread_rng().gen_range(0..pagecache.len()); - let mut i = 0; - for (key, _e) in pagecache.iter() { - if i == n { - tag = Some(key.tag); - break; - } - i += 1; - } - } - - info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { - Ok(_img) => { - // This prints out the whole page image. - //println!("{:X?}", img); - } - Err(error) => { - error!("GetPage@LSN failed: {}", error); - } - } - } - // FIXME: Shouldn't relation size also be tracked with an LSN? // If a replica is lagging behind, it needs to get the size as it was on // the replica's current replay LSN. 
@@ -613,14 +668,63 @@ impl PageCache { pub fn relsize_get(&self, rel: &RelTag) -> u32 { let mut shared = self.shared.lock().unwrap(); - let entry = shared.relsize_cache.entry(*rel).or_insert(0); - *entry + if let Some(relsize) = shared.relsize_cache.get(rel) { + return *relsize; + } + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn: u64::MAX, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self + .db + .iterator(IteratorMode::From(&buf[..], Direction::Reverse)); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + let relsize = tag.blknum + 1; + shared.relsize_cache.insert(*rel, relsize); + return relsize; + } + } + return 0; } pub fn relsize_exist(&self, rel: &RelTag) -> bool { - let shared = self.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); let relsize_cache = &shared.relsize_cache; - relsize_cache.contains_key(rel) + if relsize_cache.contains_key(rel) { + return true; + } + + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: 0, + }, + lsn: 0, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self + .db + .iterator(IteratorMode::From(&buf[..], Direction::Forward)); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + shared.relsize_cache.insert(*rel, tag.blknum + 1); + return true; + } + } + return false; } pub fn get_stats(&self) -> PageCacheStats { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 06760c6f68..0130cbd2f2 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -566,10 +566,12 @@ impl Connection { } Some(FeMessage::ZenithReadRequest(req)) => { let buf_tag = page_cache::BufferTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, + rel: page_cache::RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }, blknum: req.blkno, }; diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index 08ba3e7fa3..0884f17453 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -309,10 +309,12 @@ async fn slurp_base_file( while bytes.remaining() >= 8192 { let tag = page_cache::BufferTag { - spcnode: parsed.spcnode, - dbnode: parsed.dbnode, - relnode: parsed.relnode, - forknum: parsed.forknum as u8, + rel: page_cache::RelTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + }, blknum: blknum, }; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 9f382b2efb..f41c9274b2 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -12,7 +12,7 @@ use tokio::time::{sleep, Duration}; use tokio_stream::StreamExt; use crate::page_cache; -use crate::page_cache::BufferTag; +use crate::page_cache::{BufferTag, RelTag}; use crate::waldecoder::WalStreamDecoder; use crate::PageServerConf; @@ -141,10 +141,12 @@ async fn walreceiver_main( // so having multiple copies of it doesn't cost that much) for blk in decoded.blocks.iter() { let tag = BufferTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + 
forknum: blk.forknum as u8, + }, blknum: blk.blkno, }; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a06c87d584..90d2ed470e 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -65,6 +65,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { let _guard = runtime.enter(); process = WalRedoProcess::launch(&datadir, &runtime).unwrap(); } + info!("WAL redo postgres started"); // Pretty arbitrarily, reuse the same Postgres process for 100 requests. // After that, kill it and start a new one. This is mostly to avoid @@ -76,11 +77,11 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { let result = handle_apply_request(&pcache, &process, &runtime, request); if result.is_err() { // On error, kill the process. + error!("Kill wal redo process on error"); break; } } - info!("killing WAL redo postgres process"); let _ = runtime.block_on(process.stdin.get_mut().shutdown()); let mut child = process.child; drop(process.stdin); @@ -99,6 +100,7 @@ fn handle_apply_request( let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref()); let mut entry = entry_rc.content.lock().unwrap(); + assert!(entry.apply_pending); entry.apply_pending = false; let nrecords = records.len(); @@ -122,9 +124,6 @@ fn handle_apply_request( result = Err(e); } else { entry.page_image = Some(apply_result.unwrap()); - pcache - .num_page_images - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); result = Ok(()); } @@ -296,11 +295,7 @@ fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { buf.put_u8('B' as u8); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); assert!(buf.len() == 1 + len); @@ -315,11 +310,7 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes { buf.put_u8('P' as u8); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); buf.put(base_img); assert!(buf.len() == 1 + len); @@ -347,11 +338,7 @@ fn build_get_page_msg(tag: BufferTag) -> Bytes { buf.put_u8('G' as u8); buf.put_u32(len as u32); - buf.put_u32(tag.spcnode); - buf.put_u32(tag.dbnode); - buf.put_u32(tag.relnode); - buf.put_u32(tag.forknum as u32); - buf.put_u32(tag.blknum); + tag.pack(&mut buf); assert!(buf.len() == 1 + len);
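
The property this patch leans on is that CacheKey::pack() writes every field big-endian (bytes' put_u32/put_u64), so RocksDB's lexicographic byte order coincides with the logical (rel, blknum, lsn) order; get_page_at_lsn() and relsize_get() both depend on that when they walk a reverse iterator to find the newest entry at or below a bound. Below is a minimal, std-only sketch of that ordering property, not part of the patch: the RelTag/LSN values are made up and a BTreeMap over the packed key bytes stands in for the RocksDB handle, but the packing and the bounded reverse lookup mirror the shapes used in the diff (iterator_opt with IteratorMode::From(maxkey, Direction::Reverse) and a lower bound of minkey).

    // Standalone sketch: big-endian key packing makes byte order == logical order.
    use std::collections::BTreeMap;
    use std::ops::Bound::Included;

    #[derive(Clone, Copy)]
    struct RelTag { spcnode: u32, dbnode: u32, relnode: u32, forknum: u8 }

    #[derive(Clone, Copy)]
    struct BufferTag { rel: RelTag, blknum: u32 }

    #[derive(Clone, Copy)]
    struct CacheKey { tag: BufferTag, lsn: u64 }

    impl CacheKey {
        // Mirrors CacheKey::pack() in the patch: bytes' put_u32/put_u64 are
        // big-endian, so comparing packed keys byte-wise is the same as
        // comparing (spcnode, dbnode, relnode, forknum, blknum, lsn) field-wise.
        fn pack(&self) -> Vec<u8> {
            let mut buf = Vec::with_capacity(28);
            buf.extend_from_slice(&self.tag.rel.spcnode.to_be_bytes());
            buf.extend_from_slice(&self.tag.rel.dbnode.to_be_bytes());
            buf.extend_from_slice(&self.tag.rel.relnode.to_be_bytes());
            buf.extend_from_slice(&(self.tag.rel.forknum as u32).to_be_bytes());
            buf.extend_from_slice(&self.tag.blknum.to_be_bytes());
            buf.extend_from_slice(&self.lsn.to_be_bytes());
            buf
        }
    }

    fn main() {
        // Illustrative values only.
        let rel = RelTag { spcnode: 1664, dbnode: 12345, relnode: 16384, forknum: 0 };
        let tag = BufferTag { rel, blknum: 7 };

        // A few versions of one page, keyed by packed bytes, as the patch stores
        // them in RocksDB (value here is just a label instead of an image/record).
        let mut store: BTreeMap<Vec<u8>, &str> = BTreeMap::new();
        for (lsn, what) in [(100u64, "image@100"), (150, "wal@150"), (200, "wal@200")] {
            store.insert(CacheKey { tag, lsn }.pack(), what);
        }

        // GetPage@LSN=180: newest version with lsn <= 180, i.e. scan the range
        // [{tag, 0} .. {tag, 180}] backwards -- the same lookup get_page_at_lsn()
        // performs with a reverse RocksDB iterator bounded below by minkey.
        let minkey = CacheKey { tag, lsn: 0 }.pack();
        let maxkey = CacheKey { tag, lsn: 180 }.pack();
        let hit = store
            .range((Included(minkey), Included(maxkey)))
            .next_back()
            .map(|(_, v)| *v);
        assert_eq!(hit, Some("wal@150"));
        println!("found version: {:?}", hit);
    }

relsize_get() in the patch uses the same ordering trick from the other direction: it seeks from {rel, blknum: u32::MAX, lsn: u64::MAX} in reverse, so the first key it sees with a matching RelTag prefix carries the highest block number written for that relation.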