From 3e08ad485afc4a16bad9e9a774bb58f80eab3b6b Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 30 Nov 2021 16:33:50 +0300 Subject: [PATCH] Fix bug in using brin index in GC --- Cargo.lock | 2 +- pageserver/Cargo.toml | 2 +- pageserver/src/buffered_repository.rs | 24 +++++++----- pageserver/src/lib.rs | 2 +- pageserver/src/toast_store.rs | 55 ++++++++++++++++++--------- 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f842c55a12..3e76bccddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2571,7 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" -version = "0.2.1" +version = "0.2.4" dependencies = [ "anyhow", "fs2", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0e469c85ee..42c4a0a2d1 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -38,7 +38,7 @@ const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } #yakv = { path = "../../yakv" } -yakv = "0.2.2" +yakv = "0.2.4" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 54b049b148..9c155a5260 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -187,6 +187,7 @@ struct RelishStore { brin: BTreeMap, last_checkpoint: Lsn, last_gc: Lsn, + last_commit: Lsn, } /// Public interface @@ -1308,6 +1309,7 @@ impl BufferedTimeline { brin: BTreeMap::new(), last_checkpoint: Lsn(0), last_gc: Lsn(0), + last_commit: Lsn(0), }), walredo_mgr, @@ -1591,7 +1593,7 @@ impl BufferedTimeline { self.reconstruct_page(dk.rel, dk.blknum, dk.lsn, data) }); - let mut store = self.store.write().unwrap(); + let store = self.store.write().unwrap(); store.data.put(key, PageVersion::Page(img?).ser()?)?; n_checkpointed_records += 1; } @@ -1717,7 +1719,7 @@ impl BufferedTimeline { if same_rel { // then drop previus version as it is not needed any more drop(store); - let mut store = self.store.write().unwrap(); + let store = self.store.write().unwrap(); store.data.remove(prev_key)?; result.meta_removed += 1; // We should reset iterator and start from the current point @@ -1728,7 +1730,7 @@ impl BufferedTimeline { if meta.size.is_none() { // object was dropped, so we can immediately remove deteriorated version drop(store); - let mut store = self.store.write().unwrap(); + let store = self.store.write().unwrap(); store.data.remove(raw_key)?; dropped.insert(dk.rel); result.meta_dropped += 1; @@ -1783,8 +1785,8 @@ impl BufferedTimeline { && store.brin.get(&seg_tag).map_or(true, |lsn| *lsn <= last_gc) { // This segment was not update since last GC: jump to next one - let mut iter = store.brin.range((Excluded(seg_tag), Unbounded)); - while let Some((next_seg, lsn)) = iter.next_back() { + let iter = store.brin.range((Excluded(seg_tag), Unbounded)); + for (next_seg, lsn) in iter { if *lsn > last_gc { from = StoreKey::Data(DataKey { rel: next_seg.rel, @@ -1818,7 +1820,7 @@ impl BufferedTimeline { if let PageVersion::Page(_) = ver { // ... then remove all previously accumulated deltas and images, as them are not needed any more drop(store); - let mut store = self.store.write().unwrap(); + let store = self.store.write().unwrap(); result.pages_removed += deteriorated.len() as u64; for key in deteriorated { store.data.remove(key)?; @@ -1836,7 +1838,7 @@ impl BufferedTimeline { // we can remove all its pages assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation drop(store); - let mut store = self.store.write().unwrap(); + let store = self.store.write().unwrap(); // We should reset iterator and start from the current point store.data.remove(raw_key)?; result.pages_dropped += 1; @@ -1997,8 +1999,9 @@ impl<'a> BufferedTimelineWriter<'a> { } */ } - if store.data.commit_lsn + self.tl.conf.checkpoint_distance < lsn { - store.data.commit(lsn)?; + if store.last_commit + self.tl.conf.checkpoint_distance < lsn { + store.data.commit()?; + store.last_commit = lsn; self.tl.disk_consistent_lsn.store(lsn); } Ok(()) @@ -2102,7 +2105,8 @@ impl<'a> TimelineWriter for BufferedTimelineWriter<'a> { fn checkpoint(&self) -> Result<()> { let mut store = self.tl.store.write().unwrap(); let lsn = self.tl.get_last_record_lsn(); - store.data.commit(lsn)?; + store.last_commit = lsn; + store.data.commit()?; self.tl.disk_consistent_lsn.store(lsn); Ok(()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4a89bc24b4..f7ced71dba 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -37,7 +37,7 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024; - pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250); + pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(2500); pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0; diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index e874ba5090..2d68f7cadd 100644 --- a/pageserver/src/toast_store.rs +++ b/pageserver/src/toast_store.rs @@ -3,9 +3,8 @@ use lz4_flex; use std::convert::TryInto; use std::ops::{Bound, RangeBounds}; use std::path::Path; -use zenith_utils::lsn::Lsn; -use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Value}; +use yakv::storage::{Key, Storage, StorageConfig, StorageIterator, Transaction, Value}; const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CACHE_SIZE: usize = 32 * 1024; // 256Mb @@ -19,13 +18,29 @@ const CACHE_SIZE: usize = 32 * 1024; // 256Mb /// pub struct ToastStore { db: Storage, // key-value database - pub commit_lsn: Lsn, // LSN of last committed transaction } pub struct ToastIterator<'a> { iter: StorageIterator<'a>, } +#[derive(Clone, Copy)] +pub struct PageData { + data: [u8; 8192], +} + +impl PageData { + pub fn find_first_zero_bit(&self, offs: usize) -> usize { + let bytes = self.data; + for i in offs..8192 { + if bytes[i] != 0xFFu8 { + return i * 8 + bytes[i].trailing_ones() as usize; + } + } + usize::MAX + } +} + impl<'a> Iterator for ToastIterator<'a> { type Item = Result<(Key, Value)>; fn next(&mut self) -> Option { @@ -125,14 +140,15 @@ impl ToastStore { StorageConfig { cache_size: CACHE_SIZE, nosync: false, + mursiw: true, }, )?, - commit_lsn: Lsn(0), }) } - pub fn put(&mut self, key: Key, value: Value) -> Result<()> { + pub fn put(&self, key: Key, value: Value) -> Result<()> { let mut tx = self.db.start_transaction(); + self.tx_remove(&mut tx, &key)?; let value_len = value.len(); let mut key = key; if value_len >= TOAST_SEGMENT_SIZE { @@ -146,7 +162,7 @@ impl ToastStore { key.extend_from_slice(&n_segments.to_be_bytes()); key.extend_from_slice(&[0u8; 2]); let key_len = key.len(); - while offs + TOAST_SEGMENT_SIZE <= compressed_data_len { + while offs + TOAST_SEGMENT_SIZE < compressed_data_len { key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); tx.put( &key, @@ -155,10 +171,8 @@ impl ToastStore { offs += TOAST_SEGMENT_SIZE; segno += 1; } - if offs < compressed_data_len { - key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); - tx.put(&key, &compressed_data[offs..].to_vec())?; - } + key[key_len - 2..].copy_from_slice(&segno.to_be_bytes()); + tx.put(&key, &compressed_data[offs..].to_vec())?; } else { key.extend_from_slice(&[0u8; 4]); tx.put(&key, &value)?; @@ -167,10 +181,9 @@ impl ToastStore { Ok(()) } - pub fn commit(&mut self, commit_lsn: Lsn) -> Result<()> { - let mut tx = self.db.start_transaction(); + pub fn commit(&mut self) -> Result<()> { + let tx = self.db.start_transaction(); tx.commit()?; - self.commit_lsn = commit_lsn; Ok(()) } @@ -210,10 +223,15 @@ impl ToastStore { } } - pub fn remove(&mut self, key: Key) -> Result<()> { + pub fn remove(&self, key: Key) -> Result<()> { let mut tx = self.db.start_transaction(); - let mut min_key = key.clone(); - let mut max_key = key; + self.tx_remove(&mut tx, &key)?; + tx.delay() + } + + pub fn tx_remove(&self, tx: &mut Transaction, key: &[u8]) -> Result<()> { + let mut min_key = key.to_vec(); + let mut max_key = key.to_vec(); min_key.extend_from_slice(&[0u8; 4]); max_key.extend_from_slice(&[0xFFu8; 4]); let mut iter = tx.range(&min_key..&max_key); @@ -231,7 +249,10 @@ impl ToastStore { tx.remove(&key)?; } } - tx.delay()?; Ok(()) } + + pub fn size(&self) -> u64 { + self.db.get_database_info().db_used + } }