diff --git a/Cargo.lock b/Cargo.lock index 0075a7a433..af15d5776b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2571,9 +2571,7 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "yakv" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17eda78e0aabbe323628af9b09766827e5b321b5e742cb38a6dd8f09c60a0c1" +version = "0.1.8" dependencies = [ "anyhow", "crc32c", diff --git a/Cargo.toml b/Cargo.toml index 1ac8a9c0b7..66d3fb1f8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,7 @@ members = [ # This is useful for profiling and, to some extent, debug. # Besides, debug info should not affect the performance. debug = true +panic = 'abort' + +[profile.dev] +panic = 'abort' diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 37239c4a47..ad092976f1 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -38,7 +38,7 @@ const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } #yakv = { path = "../../yakv" } -yakv = "0.1.6" +yakv = "0.1.8" lz4_flex = "0.9.0" postgres_ffi = { path = "../postgres_ffi" } diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs index 7b4c2318d6..68db3f5869 100644 --- a/pageserver/src/buffered_repository.rs +++ b/pageserver/src/buffered_repository.rs @@ -59,6 +59,13 @@ const METADATA_MAX_SAFE_SIZE: usize = 512; const METADATA_CHECKSUM_SIZE: usize = std::mem::size_of::(); const METADATA_MAX_DATA_SIZE: usize = METADATA_MAX_SAFE_SIZE - METADATA_CHECKSUM_SIZE; +const ZERO_TAG: RelTag = RelTag { + spcnode: 0, + dbnode: 0, + relnode: 0, + forknum: 0, +}; + // Metrics collected on operations on the storage repository. lazy_static! 
{ static ref STORAGE_TIME: HistogramVec = register_histogram_vec!( @@ -279,6 +286,12 @@ impl Repository for BufferedRepository { /// Private functions impl BufferedRepository { + fn get_buffered_timeline(&self, timelineid: ZTimelineId) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + + Ok(self.get_timeline_locked(timelineid, &mut timelines)?) + } + // Implementation of the public `get_timeline` function. This differs from the public // interface in that the caller must already hold the mutex on the 'timelines' hashmap. fn get_timeline_locked( @@ -374,12 +387,19 @@ impl BufferedRepository { fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { while !tenant_mgr::shutdown_requested() { std::thread::sleep(conf.checkpoint_period); - trace!("checkpointer thread for tenant {} waking up", self.tenantid); + info!("checkpointer thread for tenant {} waking up", self.tenantid); // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE // bytes of WAL since last checkpoint. { - let timelines = self.timelines.lock().unwrap(); + let timelines: Vec<(ZTimelineId, Arc)> = self + .timelines + .lock() + .unwrap() + .iter() + .map(|pair| (pair.0.clone(), pair.1.clone())) + .collect(); + //let timelines = self.timelines.lock().unwrap(); for (timelineid, timeline) in timelines.iter() { let _entered = info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) @@ -421,7 +441,8 @@ impl BufferedRepository { while !tenant_mgr::shutdown_requested() { // Garbage collect old files that are not needed for PITR anymore if conf.gc_horizon > 0 { - self.gc_iteration(None, conf.gc_horizon, false).unwrap(); + let result = self.gc_iteration(None, conf.gc_horizon, false)?; + info!("GC result: {:?}", result); } // TODO Write it in more adequate way using @@ -536,7 +557,7 @@ impl BufferedRepository { // grab mutex to prevent new timelines from being created here. 
// TODO: We will hold it for a long time - let mut timelines = self.timelines.lock().unwrap(); + //let mut timelines = self.timelines.lock().unwrap(); // Scan all timelines. For each timeline, remember the timeline ID and // the branch point where it was created. @@ -560,7 +581,7 @@ impl BufferedRepository { //Now collect info about branchpoints let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); for timelineid in &timelineids { - let timeline = self.get_timeline_locked(*timelineid, &mut *timelines)?; + let timeline = self.get_buffered_timeline(*timelineid)?; if let Some(ancestor_timeline) = &timeline.ancestor_timeline { // If target_timeline is specified, we only need to know branchpoints of its childs @@ -582,7 +603,7 @@ impl BufferedRepository { for timelineid in timelineids { // We have already loaded all timelines above // so this operation is just a quick map lookup. - let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; + let timeline = self.get_buffered_timeline(timelineid)?; // If target_timeline is specified, only GC it if let Some(target_timelineid) = target_timelineid { @@ -1163,12 +1184,7 @@ impl BufferedTimeline { fn checkpoint_internal(&self, checkpoint_distance: u64, _forced: bool) -> Result<()> { // From boundary is constant and till boundary is changed at each iteration. let from = StoreKey::Data(DataKey { - rel: RelishTag::Relation(RelTag { - spcnode: 0, - dbnode: 0, - relnode: 0, - forknum: 0, - }), + rel: RelishTag::Relation(ZERO_TAG), blknum: 0, lsn: Lsn(0), }) @@ -1194,6 +1210,7 @@ impl BufferedTimeline { if let Some(entry) = iter.next_back() { let pair = entry?; let key = pair.0; + debug_assert!(key < till); if let StoreKey::Data(dk) = StoreKey::des(&key)? 
{ let ver = PageVersion::des(&pair.1)?; if let PageVersion::Delta(rec) = ver { @@ -1277,7 +1294,7 @@ impl BufferedTimeline { &metadata, false, )?; - trace!("Checkpoint {} records", n_checkpointed_records); + info!("Checkpoint {} records", n_checkpointed_records); if self.upload_relishes { schedule_timeline_upload(()) @@ -1322,108 +1339,160 @@ impl BufferedTimeline { info!("GC starting"); - let mut from_rel = RelishTag::Relation(RelTag { - spcnode: 0, - dbnode: 0, - relnode: 0, - forknum: 0, - }); + let mut from_rel = RelishTag::Relation(ZERO_TAG); let mut from = StoreKey::Metadata(MetadataKey { rel: from_rel, lsn: Lsn(0), }); - - // We can not remove deteriorated version immediately, we need to check first that successor exists - let mut last_key: Option = None; + // Keep track of dropped relishes + let mut dropped: HashSet = HashSet::new(); 'meta: loop { let store = self.store.read().unwrap(); let mut iter = store.data.range(&from.ser()?..); + // We can not remove deteriorated version immediately, we need to check first that successor exists + let mut last_key: Option = None; + while let Some(entry) = iter.next() { - let raw_key = entry?.0; + let pair = entry?; + let raw_key = pair.0; let key = StoreKey::des(&raw_key)?; if let StoreKey::Metadata(dk) = key { + // processing metadata + let same_rel = from_rel == dk.rel; + if !same_rel { + // we jumped to the next relation + result.meta_total += 1; + from_rel = dk.rel; + from = StoreKey::Metadata(MetadataKey { + rel: from_rel, + lsn: Lsn(0), + }); + } if dk.lsn < cutoff { // we have something deteriorated + // Has previous version if let Some(prev_key) = last_key { - // We are still on the same relish... - if from_rel == dk.rel { + // If we are still on the same relish... 
+ if same_rel { + // then drop previous version as it is not needed any more drop(store); let mut store = self.store.write().unwrap(); store.data.remove(&prev_key)?; - last_key = None; + result.meta_removed += 1; // We should reset iterator and start from the current point continue 'meta; } } - from_rel = dk.rel; - from = key; - // Remember key as candidate for deletion - last_key = Some(raw_key); + let meta = MetadataValue::des(&pair.1)?; + if meta.size.is_none() { + // object was dropped, so we can immediately remove deteriorated version + drop(store); + let mut store = self.store.write().unwrap(); + store.data.remove(&raw_key)?; + dropped.insert(dk.rel); + result.meta_dropped += 1; + // We should reset iterator and start from the current point + continue 'meta; + } else { + // Remember key as candidate for deletion and continue iteration + last_key = Some(raw_key); + } } else { - from_rel = dk.rel; + // We reached version which should be preserved to enable PITR, so jump to the next object from = StoreKey::Metadata(MetadataKey { rel: from_rel, lsn: Lsn::MAX, }); - last_key = None; + continue 'meta; } } else { + // End of metadata break 'meta; } } break; } - // Array to accumulate keys we can remove + from_rel = RelishTag::Relation(ZERO_TAG); + from = StoreKey::Data(DataKey { + rel: from_rel, + blknum: 0, + lsn: Lsn(0), + }); + + // Array to accumulate keys we can remove. 
+ // Place it outside main loop to reduce number of dynamic memory allocations let mut deteriorated: Vec = Vec::new(); // currently proceed block number let mut from_blknum = 0; - - 'data: loop { + 'pages: loop { let store = self.store.read().unwrap(); let mut iter = store.data.range(&from.ser()?..); + deteriorated.clear(); while let Some(entry) = iter.next() { let pair = entry?; let raw_key = pair.0; let key = StoreKey::des(&raw_key)?; if let StoreKey::Data(dk) = key { - if dk.lsn < cutoff { - // we have something deteriorated - let ver = PageVersion::des(&pair.1)?; - // We are still on the same page... - if from_rel == dk.rel && from_blknum == dk.blknum { - if let PageVersion::Image(_) = ver { - // We have full page image: remove all preceding deteriorated records - drop(store); - let mut store = self.store.write().unwrap(); - for key in deteriorated.iter() { - store.data.remove(key)?; - } - deteriorated.clear(); - // We should reset iterator and start from the current point - continue 'data; - } - // No full page image, so we can't remove deteriorated stuff - deteriorated.clear(); - } - // Remember key as candidate for deletion - deteriorated.push(raw_key); + let same_page = from_rel == dk.rel && from_blknum == dk.blknum; + if !same_page { + result.pages_total += 1; from_rel = dk.rel; from_blknum = dk.blknum; - from = key; + from = StoreKey::Data(DataKey { + rel: from_rel, + blknum: from_blknum, + lsn: Lsn(0), + }); + } + if dk.lsn < cutoff { + // we have something deteriorated + // If we still on the same page... + if same_page { + if !deteriorated.is_empty() { + // .. and have something to remove + // ... and have page image + let ver = PageVersion::des(&pair.1)?; + if let PageVersion::Image(_) = ver { + // ... 
then remove all previously accumulated deltas and images, as they are not needed any more + drop(store); + let mut store = self.store.write().unwrap(); + for key in deteriorated.iter() { + store.data.remove(key)?; + } + result.pages_removed += deteriorated.len() as u64; + // We should reset iterator and start from the current point + continue 'pages; + } + } + } else { + // we jumped to the next page + deteriorated.clear(); + } + if dropped.contains(&dk.rel) { + // This relation was dropped beyond PITR interval: + // we can remove all its pages + assert!(deteriorated.is_empty()); // we should not append anything to `deteriorated` for dropped relation + drop(store); + let mut store = self.store.write().unwrap(); + // We should reset iterator and start from the current point + store.data.remove(&raw_key)?; + result.pages_dropped += 1; + continue 'pages; + } + // Remember key as candidate for deletion and continue iteration + deteriorated.push(raw_key); } else { - // Jump to next page - from_rel = dk.rel; + // We reached version which should be preserved to enable PITR, so jump to the next object from = StoreKey::Data(DataKey { rel: from_rel, blknum: from_blknum + 1, lsn: Lsn(0), }); - deteriorated.clear(); } } else { - break 'data; + break 'pages; } } break; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 60bc3c7f5e..f2bfd6c48b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -35,7 +35,7 @@ pub mod defaults { pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 0; pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1); - pub const DEFAULT_GC_HORIZON: u64 = 1 * 1024 * 1024; + pub const DEFAULT_GC_HORIZON: u64 = 1600_000_000u64; pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index be849ce35f..9dfed70e5f 100644 --- a/pageserver/src/page_service.rs +++ 
b/pageserver/src/page_service.rs @@ -693,67 +693,21 @@ impl postgres_backend::Handler for PageServerHandler { let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ - RowDescriptor::int8_col(b"layer_relfiles_total"), - RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"), - RowDescriptor::int8_col(b"layer_relfiles_needed_by_branches"), - RowDescriptor::int8_col(b"layer_relfiles_not_updated"), - RowDescriptor::int8_col(b"layer_relfiles_needed_as_tombstone"), - RowDescriptor::int8_col(b"layer_relfiles_removed"), - RowDescriptor::int8_col(b"layer_relfiles_dropped"), - RowDescriptor::int8_col(b"layer_nonrelfiles_total"), - RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_cutoff"), - RowDescriptor::int8_col(b"layer_nonrelfiles_needed_by_branches"), - RowDescriptor::int8_col(b"layer_nonrelfiles_not_updated"), - RowDescriptor::int8_col(b"layer_nonrelfiles_needed_as_tombstone"), - RowDescriptor::int8_col(b"layer_nonrelfiles_removed"), - RowDescriptor::int8_col(b"layer_nonrelfiles_dropped"), + RowDescriptor::int8_col(b"meta_total"), + RowDescriptor::int8_col(b"meta_removed"), + RowDescriptor::int8_col(b"meta_dropped"), + RowDescriptor::int8_col(b"pages_total"), + RowDescriptor::int8_col(b"pages_removed"), + RowDescriptor::int8_col(b"pages_dropped"), RowDescriptor::int8_col(b"elapsed"), ]))? 
.write_message_noflush(&BeMessage::DataRow(&[ - Some(result.ondisk_relfiles_total.to_string().as_bytes()), - Some( - result - .ondisk_relfiles_needed_by_cutoff - .to_string() - .as_bytes(), - ), - Some( - result - .ondisk_relfiles_needed_by_branches - .to_string() - .as_bytes(), - ), - Some(result.ondisk_relfiles_not_updated.to_string().as_bytes()), - Some( - result - .ondisk_relfiles_needed_as_tombstone - .to_string() - .as_bytes(), - ), - Some(result.ondisk_relfiles_removed.to_string().as_bytes()), - Some(result.ondisk_relfiles_dropped.to_string().as_bytes()), - Some(result.ondisk_nonrelfiles_total.to_string().as_bytes()), - Some( - result - .ondisk_nonrelfiles_needed_by_cutoff - .to_string() - .as_bytes(), - ), - Some( - result - .ondisk_nonrelfiles_needed_by_branches - .to_string() - .as_bytes(), - ), - Some(result.ondisk_nonrelfiles_not_updated.to_string().as_bytes()), - Some( - result - .ondisk_nonrelfiles_needed_as_tombstone - .to_string() - .as_bytes(), - ), - Some(result.ondisk_nonrelfiles_removed.to_string().as_bytes()), - Some(result.ondisk_nonrelfiles_dropped.to_string().as_bytes()), + Some(result.meta_total.to_string().as_bytes()), + Some(result.meta_removed.to_string().as_bytes()), + Some(result.meta_dropped.to_string().as_bytes()), + Some(result.pages_total.to_string().as_bytes()), + Some(result.pages_removed.to_string().as_bytes()), + Some(result.pages_dropped.to_string().as_bytes()), Some(result.elapsed.as_millis().to_string().as_bytes()), ]))? 
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index ee4608156d..4016c900c8 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -44,45 +44,27 @@ pub trait Repository: Send + Sync { /// /// Result of performing GC /// -#[derive(Default)] +#[derive(Default, Debug)] pub struct GcResult { - pub ondisk_relfiles_total: u64, - pub ondisk_relfiles_needed_by_cutoff: u64, - pub ondisk_relfiles_needed_by_branches: u64, - pub ondisk_relfiles_not_updated: u64, - pub ondisk_relfiles_needed_as_tombstone: u64, - pub ondisk_relfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. - pub ondisk_relfiles_dropped: u64, // # of layer files removed because the relation was dropped + pub meta_removed: u64, // removed versions beyond PITR interval for which new page image exists + pub meta_dropped: u64, // removed versions beyond PITR interval of dropped relations + pub meta_total: u64, // total number of metaobject version histories - pub ondisk_nonrelfiles_total: u64, - pub ondisk_nonrelfiles_needed_by_cutoff: u64, - pub ondisk_nonrelfiles_needed_by_branches: u64, - pub ondisk_nonrelfiles_not_updated: u64, - pub ondisk_nonrelfiles_needed_as_tombstone: u64, - pub ondisk_nonrelfiles_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files. 
- pub ondisk_nonrelfiles_dropped: u64, // # of layer files removed because the relation was dropped + pub pages_removed: u64, // removed versions beyond PITR interval for which new page image exists + pub pages_dropped: u64, // removed versions beyond PITR interval of dropped relations + pub pages_total: u64, // total number of page version histories pub elapsed: Duration, } impl AddAssign for GcResult { fn add_assign(&mut self, other: Self) { - self.ondisk_relfiles_total += other.ondisk_relfiles_total; - self.ondisk_relfiles_needed_by_cutoff += other.ondisk_relfiles_needed_by_cutoff; - self.ondisk_relfiles_needed_by_branches += other.ondisk_relfiles_needed_by_branches; - self.ondisk_relfiles_not_updated += other.ondisk_relfiles_not_updated; - self.ondisk_relfiles_needed_as_tombstone += other.ondisk_relfiles_needed_as_tombstone; - self.ondisk_relfiles_removed += other.ondisk_relfiles_removed; - self.ondisk_relfiles_dropped += other.ondisk_relfiles_dropped; - - self.ondisk_nonrelfiles_total += other.ondisk_nonrelfiles_total; - self.ondisk_nonrelfiles_needed_by_cutoff += other.ondisk_nonrelfiles_needed_by_cutoff; - self.ondisk_nonrelfiles_needed_by_branches += other.ondisk_nonrelfiles_needed_by_branches; - self.ondisk_nonrelfiles_not_updated += other.ondisk_nonrelfiles_not_updated; - self.ondisk_nonrelfiles_needed_as_tombstone += other.ondisk_nonrelfiles_needed_as_tombstone; - self.ondisk_nonrelfiles_removed += other.ondisk_nonrelfiles_removed; - self.ondisk_nonrelfiles_dropped += other.ondisk_nonrelfiles_dropped; - + self.meta_total += other.meta_total; + self.meta_removed += other.meta_removed; + self.meta_dropped += other.meta_dropped; + self.pages_total += other.pages_total; + self.pages_removed += other.pages_removed; + self.pages_dropped += other.pages_dropped; self.elapsed += other.elapsed; } } diff --git a/pageserver/src/toast_store.rs b/pageserver/src/toast_store.rs index 07f95e8c87..2dda5e7750 100644 --- a/pageserver/src/toast_store.rs +++ 
b/pageserver/src/toast_store.rs @@ -3,7 +3,9 @@ use lz4_flex; use std::convert::TryInto; use std::ops::{Bound, RangeBounds}; use std::path::Path; +use tracing::*; use yakv::storage::{Key, Storage, StorageIterator, Value}; + const TOAST_SEGMENT_SIZE: usize = 2 * 1024; const CHECKPOINT_INTERVAL: u64 = 1u64 * 1024 * 1024 * 1024; const CACHE_SIZE: usize = 32 * 1024; // 256Mb @@ -70,6 +72,7 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> { let mut next_segno = 0u16; while let Some(elem) = self.iter.next_back() { if let Ok((key, value)) = elem { + assert!(value.len() != 0); let key_len = key.len(); let n_segments = u16::from_be_bytes(key[key_len - 4..key_len - 2].try_into().unwrap()); @@ -90,7 +93,12 @@ impl<'a> DoubleEndedIterator for ToastIterator<'a> { } next_segno = segno; if next_segno == 0 { - let res = lz4_flex::decompress_size_prepended(&toast.unwrap()); + let toast = toast.unwrap(); + if toast.len() == 0 { + warn!("n_segments={}", n_segments); + } + assert!(toast.len() != 0); + let res = lz4_flex::decompress_size_prepended(&toast); return Some(if let Ok(decompressed_data) = res { Ok((key, decompressed_data)) } else {