diff --git a/Cargo.lock b/Cargo.lock
index 8e83d4bcae..e6ca7717c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2571,9 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
 
 [[package]]
 name = "yakv"
-version = "0.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "54d2b862dd7bc9ac752c4642a0fb126dc17cb3895e20d25135f3f5052fa3000b"
+version = "0.1.6"
 dependencies = [
  "anyhow",
  "crc32c",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index e1f95cdddb..37239c4a47 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -38,7 +38,7 @@ const_format = "0.2.21"
 tracing = "0.1.27"
 signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
 #yakv = { path = "../../yakv" }
-yakv = "0.1.5"
+yakv = "0.1.6"
 lz4_flex = "0.9.0"
 postgres_ffi = { path = "../postgres_ffi" }
diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs
index 5c97beb1f1..a6ae3ac130 100644
--- a/pageserver/src/buffered_repository.rs
+++ b/pageserver/src/buffered_repository.rs
@@ -1417,21 +1417,24 @@ impl<'a> BufferedTimelineWriter<'a> {
                 size: Some(blknum + 1),
             };
             store.data.put(&mk.ser()?, &mv.ser()?)?;
-/*
-            // Fill gap with zero pages
-            for blk in rel_size..blknum {
-                let key = StoreKey::Data(DataKey {
-                    rel,
-                    blknum: blk,
-                    lsn,
-                });
-                store
-                    .data
-                    .put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
-            }
-*/
+            /* It looks like we do not need to explicitly fill the gap, because in any case
+             * we have to handle the situation when a page is accessed before it has been WAL-logged.
+            // Fill gap with zero pages
+            for blk in rel_size..blknum {
+                let key = StoreKey::Data(DataKey {
+                    rel,
+                    blknum: blk,
+                    lsn,
+                });
+                store
+                    .data
+                    .put(&key.ser()?, &PageVersion::Image(ZERO_PAGE.clone()).ser()?)?;
+            }
+            */
+        }
+        if store.data.committed {
+            self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
         }
-        self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
         Ok(())
     }
 }
@@ -1461,9 +1464,9 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
             size: Some(relsize),
         };
         store.data.put(&mk.ser()?, &mv.ser()?)?;
-
-        self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
-
+        if store.data.committed {
+            self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
+        }
         Ok(())
     }
 
@@ -1477,17 +1480,26 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> {
         let mv = MetadataValue { size: None }; // None indicates dropped relation
         store.data.put(&mk.ser()?, &mv.ser()?)?;
 
-        self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
-
+        if store.data.committed {
+            self.tl.disk_consistent_lsn.store(lsn); // each update is flushed to the disk
+        }
         Ok(())
     }
 
+    ///
+    /// Complete all delayed commits and advance disk_consistent_lsn
+    ///
+    fn checkpoint(&self) -> Result<()> {
+        let store = self.tl.store.write().unwrap();
+        store.data.checkpoint()?;
+        self.tl.disk_consistent_lsn.store(self.tl.get_last_record_lsn());
+        Ok(())
+    }
+
     ///
     /// Remember the (end of) last valid WAL record remembered in the timeline.
     ///
     fn advance_last_record_lsn(&self, new_lsn: Lsn) {
-        assert!(new_lsn.is_aligned());
-
         self.tl.last_record_lsn.advance(new_lsn);
     }
 }
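
Note on the buffered_repository.rs hunks above (commentary, not part of the patch): every write path now advances disk_consistent_lsn only when store.data.committed reports that the last yakv transaction was actually committed rather than delayed, since a delayed transaction is still volatile and would be lost on a crash. A minimal self-contained sketch of that invariant, with illustrative stand-in names (Store, Timeline, and record_update are not the pageserver's real types):

    use std::sync::atomic::{AtomicU64, Ordering};

    struct Store {
        committed: bool, // did the last put() reach disk, or was it delayed?
    }

    struct Timeline {
        disk_consistent_lsn: AtomicU64,
    }

    impl Timeline {
        fn record_update(&self, store: &Store, lsn: u64) {
            // Only a committed transaction makes `lsn` durable; a delayed one
            // must wait for the next commit or an explicit checkpoint().
            if store.committed {
                self.disk_consistent_lsn.store(lsn, Ordering::SeqCst);
            }
        }
    }
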
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index a1bde5f347..a2c0c47460 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -33,7 +33,7 @@ pub mod defaults {
 
     // Minimal size of WAL records chain to trigger materialization of the page
    pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0;
-    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
+    pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_millis(1000);
 
     pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
     pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index dd26a6d870..65122d9075 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -179,6 +179,11 @@ pub trait TimelineWriter: Deref {
     /// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
     /// Previous last record LSN is stored alongside the latest and can be read.
     fn advance_last_record_lsn(&self, lsn: Lsn);
+
+    ///
+    /// Complete all delayed commits and advance disk_consistent_lsn
+    ///
+    fn checkpoint(&self) -> Result<()>;
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index 8afa2676e2..7481bbf08f 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -126,7 +126,6 @@ pub fn import_timeline_from_postgres_datadir(
             import_nonrel_file(writer, lsn, RelishTag::TwoPhase { xid }, &entry.path())?;
         }
         // TODO: Scan pg_tblspc
-        writer.advance_last_record_lsn(lsn);
 
     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
@@ -140,6 +139,7 @@
         lsn,
         &mut pg_control.checkPointCopy.clone(),
     )?;
+    writer.checkpoint()?;
 
     Ok(())
 }
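
Note on the restore_local_repo.rs change above (commentary, not part of the patch): the import no longer advances the last-record LSN halfway through; instead it finishes with writer.checkpoint(), which completes all delayed commits and advances disk_consistent_lsn in one step. A hedged sketch of a caller driving the new trait method, where CheckpointWriter is a local stand-in for TimelineWriter and checkpoint_after_import is hypothetical:

    use anyhow::Result;

    // Stand-in for the TimelineWriter trait extended by this patch;
    // only checkpoint() is modeled here.
    trait CheckpointWriter {
        fn checkpoint(&self) -> Result<()>;
    }

    fn checkpoint_after_import<W: CheckpointWriter>(writer: &W) -> Result<()> {
        // Complete all delayed commits and advance disk_consistent_lsn,
        // making everything imported so far crash-safe in one step.
        writer.checkpoint()
    }
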
diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs
index ae211fcd5e..eb4aa8da0f 100644
--- a/pageserver/src/toast_store.rs
+++ b/pageserver/src/toast_store.rs
@@ -5,10 +5,14 @@ use std::ops::RangeBounds;
 use std::path::Path;
 use yakv::storage::{Key, Storage, StorageIterator, Value};
 use zenith_utils::bin_ser::BeSer;
+use tracing::*;
 
 const TOAST_SEGMENT_SIZE: usize = 2 * 1024;
 const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024;
-const CACHE_SIZE: usize = 1024; // 8Mb
+const MAIN_CACHE_SIZE: usize = 8 * 1024; // 64MB
+const TOAST_CACHE_SIZE: usize = 1024; // 8MB
+const MAIN_COMMIT_THRESHOLD: usize = MAIN_CACHE_SIZE / 2;
+const TOAST_COMMIT_THRESHOLD: usize = TOAST_CACHE_SIZE / 2;
 
 const TOAST_VALUE_TAG: u8 = 0;
 const PLAIN_VALUE_TAG: u8 = 1;
@@ -22,10 +26,10 @@ type ToastId = u32;
 /// data locality and reduce key size for TOAST segments.
 ///
 pub struct ToastStore {
-    pub update_count: u64,
-    pub index: Storage, // primary storage
-    blobs: Storage,     // storage for TOAST segments
-    next_id: ToastId,   // counter used to identify new TOAST segments
+    main: Storage,       // primary storage
+    toast: Storage,      // storage for TOAST segments
+    next_id: ToastId,    // counter used to identify new TOAST segments
+    pub committed: bool, // last transaction was committed (not delayed)
 }
 
 ///
@@ -75,31 +79,31 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> {
 
 impl ToastStore {
     pub fn new(path: &Path) -> Result<ToastStore> {
         Ok(ToastStore {
-            update_count: 0,
-            index: Storage::open(
-                &path.join("index.db"),
-                None,
-                CACHE_SIZE,
+            main: Storage::open(
+                &path.join("main.db"),
+                Some(&path.join("main.log")),
+                MAIN_CACHE_SIZE,
                 CHECKPOINT_INTERVAL,
             )?,
-            blobs: Storage::open(
-                &path.join("blobs.db"),
-                None,
-                CACHE_SIZE,
+            toast: Storage::open(
+                &path.join("toast.db"),
+                Some(&path.join("toast.log")),
+                TOAST_CACHE_SIZE,
                 CHECKPOINT_INTERVAL,
             )?,
             next_id: 0,
+            committed: false,
         })
     }
 
     pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
-        let mut index_tx = self.index.start_transaction();
+        let mut main_tx = self.main.start_transaction();
         let value_len = value.len();
-        self.update_count += 1;
+        let main_pinned;
         if value_len >= TOAST_SEGMENT_SIZE {
-            let mut blobs_tx = self.blobs.start_transaction();
+            let mut toast_tx = self.toast.start_transaction();
             if self.next_id == 0 {
-                self.next_id = blobs_tx
+                self.next_id = toast_tx
                     .iter()
                     .next_back()
                     .transpose()?
@@ -114,7 +118,7 @@
             let mut offs: usize = 0;
             let mut segno = 0u32;
             while offs + TOAST_SEGMENT_SIZE <= compressed_data_len {
-                blobs_tx.put(
+                toast_tx.put(
                     &ToastSegId { toast_id, segno }.ser()?,
                     &compressed_data[offs..offs + TOAST_SEGMENT_SIZE].to_vec(),
                 )?;
@@ -122,7 +126,7 @@
                 offs += TOAST_SEGMENT_SIZE;
                 segno += 1;
             }
             if offs < compressed_data_len {
-                blobs_tx.put(
+                toast_tx.put(
                     &ToastSegId { toast_id, segno }.ser()?,
                     &compressed_data[offs..].to_vec(),
                 )?;
@@ -134,20 +138,43 @@
             }
             .ser()?;
             value.insert(0, TOAST_VALUE_TAG);
-            index_tx.put(key, &value)?;
-            blobs_tx.commit()?;
+            main_tx.put(key, &value)?;
+            main_pinned = main_tx.get_cache_info().pinned;
+            // If we are going to commit the main storage, then we have to commit the TOAST storage first to avoid dangling references
+            if main_pinned > MAIN_COMMIT_THRESHOLD
+                || toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
+            {
+                toast_tx.commit()?;
+            } else {
+                toast_tx.delay()?;
+            }
         } else {
             let mut vec = Vec::with_capacity(value.len() + 1);
             vec.push(PLAIN_VALUE_TAG);
             vec.extend_from_slice(&value);
-            index_tx.put(key, &vec)?;
+            main_tx.put(key, &vec)?;
+            main_pinned = main_tx.get_cache_info().pinned;
+        }
+        if main_pinned > MAIN_COMMIT_THRESHOLD {
+            main_tx.commit()?;
+            self.committed = true;
+        } else {
+            main_tx.delay()?;
+            self.committed = false;
         }
-        index_tx.commit()?;
         Ok(())
     }
 
+    pub fn checkpoint(&self) -> Result<()> {
+        let mut main_tx = self.main.start_transaction();
+        let mut toast_tx = self.toast.start_transaction();
+        toast_tx.commit()?;
+        main_tx.commit()?;
+        Ok(())
+    }
+
     pub fn get(&self, key: &[u8]) -> Result<Option<Value>> {
-        self.index
+        self.main
             .get(&key.to_vec())
             .transpose()
             .and_then(|res| Some(res.and_then(|value| Ok(self.detoast(value)?))))
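
Note on the put()/checkpoint() hunk above (commentary, not part of the patch): the commit decision is driven by the number of pinned cache pages, and it is ordered: if the main store is going to commit, the TOAST store must commit first, so that the main store never durably references TOAST segments that existed only in a delayed transaction. The same rule reappears in remove() below. A self-contained sketch of that decision under an assumed transaction interface (Tx, finish, and the threshold values here are illustrative, not yakv's actual API):

    use anyhow::Result;

    const MAIN_COMMIT_THRESHOLD: usize = 4 * 1024;
    const TOAST_COMMIT_THRESHOLD: usize = 512;

    // Assumed stand-in for a yakv transaction: either durably commit or
    // delay (keep the changes pinned in the cache, volatile until later).
    trait Tx {
        fn commit(self) -> Result<()>;
        fn delay(self) -> Result<()>;
    }

    // Returns true when the main transaction was really committed.
    fn finish<M: Tx, T: Tx>(
        main: M,
        toast: T,
        main_pinned: usize,
        toast_pinned: usize,
    ) -> Result<bool> {
        // TOAST first: main must never reference segments that are not durable.
        if main_pinned > MAIN_COMMIT_THRESHOLD || toast_pinned > TOAST_COMMIT_THRESHOLD {
            toast.commit()?;
        } else {
            toast.delay()?;
        }
        if main_pinned > MAIN_COMMIT_THRESHOLD {
            main.commit()?;
            Ok(true)
        } else {
            main.delay()?;
            Ok(false)
        }
    }
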
@@ -161,20 +188,22 @@ impl ToastStore {
     pub fn range<R: RangeBounds<Key>>(&self, range: R) -> ToastIterator<'_> {
         ToastIterator {
             store: self,
-            iter: self.index.range(range),
+            iter: self.main.range(range),
         }
     }
 
-    pub fn remove(&self, key: &Key) -> Result<()> {
-        let mut index_tx = self.index.start_transaction();
-        if let Some(value) = index_tx.get(key)? {
+    pub fn remove(&mut self, key: &Key) -> Result<()> {
+        let mut main_tx = self.main.start_transaction();
+        if let Some(value) = main_tx.get(key)? {
+            main_tx.remove(key)?;
+            let main_pinned = main_tx.get_cache_info().pinned;
             if value[0] == TOAST_VALUE_TAG {
-                let mut blobs_tx = self.blobs.start_transaction();
+                let mut toast_tx = self.toast.start_transaction();
                 let toast_ref = ToastRef::des(&value[1..])?;
                 let n_segments = ((toast_ref.compressed_size as usize + TOAST_SEGMENT_SIZE - 1)
                     / TOAST_SEGMENT_SIZE) as u32;
                 for segno in 0..n_segments {
-                    blobs_tx.remove(
+                    toast_tx.remove(
                         &ToastSegId {
                             toast_id: toast_ref.toast_id,
                             segno,
@@ -182,11 +211,31 @@ impl ToastStore {
                         .ser()?,
                     )?;
                 }
-                blobs_tx.commit()?;
+                // If we are going to commit the main storage, then we have to commit the TOAST storage first to avoid dangling references
+                if main_pinned > MAIN_COMMIT_THRESHOLD
+                    || toast_tx.get_cache_info().pinned > TOAST_COMMIT_THRESHOLD
+                {
+                    toast_tx.commit()?;
+                } else {
+                    toast_tx.delay()?;
+                }
             }
-            index_tx.remove(key)?;
+            if main_pinned > MAIN_COMMIT_THRESHOLD {
+                main_tx.commit()?;
+                self.committed = true;
+            } else {
+                main_tx.delay()?;
+                self.committed = false;
+            }
+        } else {
+            self.committed = false;
         }
-        index_tx.commit()?;
+        Ok(())
+    }
+
+    pub fn close(&self) -> Result<()> {
+        self.toast.close()?; // commit and close the TOAST store first to avoid dangling references
+        self.main.close()?;
         Ok(())
     }
 
@@ -207,7 +256,7 @@ impl ToastStore {
             segno: n_segments,
         }
         .ser()?;
-        for seg in self.blobs.range(from..till) {
+        for seg in self.toast.range(from..till) {
             toast.extend_from_slice(&seg?.1);
         }
         Ok(lz4_flex::decompress(&toast, toast_ref.orig_size as usize)?)
@@ -217,3 +266,11 @@ impl ToastStore {
         }
     }
 }
+
+impl Drop for ToastStore {
+    fn drop(&mut self) {
+        info!("Storage closed");
+        // FIXME-KK: better to call close() explicitly
+        self.close().unwrap();
+    }
+}
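
Note on detoast() and the new Drop impl above (commentary, not part of the patch): large values are lz4-compressed, split into 2 KB segments keyed by (toast_id, segno), and reassembled by an ordered key-range scan before decompression; the Drop impl can only unwrap() errors from close(), which is why the FIXME recommends calling close() explicitly. A self-contained sketch of the split/reassemble round trip (the helper names split and reassemble are illustrative):

    const TOAST_SEGMENT_SIZE: usize = 2 * 1024;

    // Split a compressed blob into fixed-size segments, as put() does;
    // the final segment may be shorter than TOAST_SEGMENT_SIZE.
    fn split(compressed: &[u8]) -> Vec<Vec<u8>> {
        compressed
            .chunks(TOAST_SEGMENT_SIZE)
            .map(|chunk| chunk.to_vec())
            .collect()
    }

    // Concatenate segments back, as detoast() does with its key-range scan;
    // segments must be visited in ascending segno order.
    fn reassemble(segments: &[Vec<u8>]) -> Vec<u8> {
        let mut blob = Vec::with_capacity(segments.len() * TOAST_SEGMENT_SIZE);
        for seg in segments {
            blob.extend_from_slice(seg);
        }
        blob
    }
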