diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index a8148a3de9..31b8c40d4e 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -167,6 +167,27 @@ impl Repository for ObjectRepository { Ok(()) } + + fn gc_iteration( + &self, + timelineid: Option, + horizon: u64, + compact: bool, + ) -> Result { + if let Some(timelineid) = timelineid { + let timelines = self.timelines.lock().unwrap(); + + // FIXME: If the timeline isn't opened yet, we don't open it just for GC. + if let Some(timeline) = timelines.get(&timelineid) { + return timeline.gc_iteration(horizon, compact); + } + } else { + // FIXME: the object repository doesn't support GC on all timelines. Should + // iterate all the timelines here + bail!("GC of all timelines not implemented"); + } + return Ok(GcResult::default()); + } } /// @@ -693,149 +714,6 @@ impl Timeline for ObjectTimeline { let iter = self.obj_store.objects(self.timelineid, lsn)?; Ok(Box::new(ObjectHistory { lsn, iter })) } - - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { - let last_lsn = self.get_last_valid_lsn(); - let mut result: GcResult = Default::default(); - - // checked_sub() returns None on overflow. - if let Some(horizon) = last_lsn.checked_sub(horizon) { - // WAL is large enough to perform GC - let now = Instant::now(); - // Iterate through all objects in timeline - for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { - result.inspected += 1; - match obj { - ObjectTag::RelationMetadata(_) => { - // Do not need to reconstruct page images, - // just delete all old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - let content = vers.1; - match ObjectValue::des(&content[..])? 
{ - ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - result.dropped += 1; - } - _ => (), // preserve last version - } - last_version = false; - result.truncated += 1; - result.n_relations += 1; - } else { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } - } - ObjectTag::Buffer(rel, blknum) => { - if rel.is_blocky() { - // Reconstruct page at horizon unless relation was dropped - // and delete all older versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - result.truncated += 1; - last_version = false; - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - if rel_size > blknum { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(rel, blknum, lsn)?; - continue; - } - debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); - } else { - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); - continue; - } - debug!("Relation {} was dropped at {}", rel, lsn); - } - // relation was dropped or truncated so this block can be removed - } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } else { - // versioned always materialized objects: no need to reconstruct pages - - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - last_version = false; - // Don't preserve last version for unlinked relishes - match rel { - RelishTag::TwoPhase { .. 
} => { - if !self.get_rel_exists(rel, last_lsn)? { - self.obj_store.unlink(&key, lsn)?; - result.prep_deleted += 1; - } - } - // TODO treat unlinked FileNodeMap too - _ => (), - } - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - - match rel { - RelishTag::TwoPhase { .. } => { - result.prep_deleted += 1; - } - RelishTag::Checkpoint => { - result.chkp_deleted += 1; - } - RelishTag::ControlFile => { - result.control_deleted += 1; - } - RelishTag::FileNodeMap { .. } => { - result.filenodemap_deleted += 1; - } - _ => { - bail!( - "unexpected non-blocky object found during GC {}", - rel - ); - } - }; - } - } - } - } - _ => (), // do nothing - } - } - result.elapsed = now.elapsed(); - info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", - result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); - if compact { - self.obj_store.compact(); - } - } - Ok(result) - } } impl ObjectTimeline { @@ -1028,6 +906,149 @@ impl ObjectTimeline { self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?) } + + fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { + let last_lsn = self.get_last_valid_lsn(); + let mut result: GcResult = Default::default(); + + // checked_sub() returns None on overflow. + if let Some(horizon) = last_lsn.checked_sub(horizon) { + // WAL is large enough to perform GC + let now = Instant::now(); + // Iterate through all objects in timeline + for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { + result.inspected += 1; + match obj { + ObjectTag::RelationMetadata(_) => { + // Do not need to reconstruct page images, + // just delete all old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? 
{ + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + result.dropped += 1; + } + _ => (), // preserve last version + } + last_version = false; + result.truncated += 1; + result.n_relations += 1; + } else { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } + } + ObjectTag::Buffer(rel, blknum) => { + if rel.is_blocky() { + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + result.truncated += 1; + last_version = false; + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + if rel_size > blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(rel, blknum, lsn)?; + continue; + } + debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {} was dropped at {}", rel, lsn); + } + // relation was dropped or truncated so this block can be removed + } + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } else { + // versioned always materialized objects: no need to reconstruct pages + + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? 
{ + let lsn = vers.0; + if last_version { + last_version = false; + // Don't preserve last version for unlinked relishes + match rel { + RelishTag::TwoPhase { .. } => { + if !self.get_rel_exists(rel, last_lsn)? { + self.obj_store.unlink(&key, lsn)?; + result.prep_deleted += 1; + } + } + // TODO treat unlinked FileNodeMap too + _ => (), + } + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + + match rel { + RelishTag::TwoPhase { .. } => { + result.prep_deleted += 1; + } + RelishTag::Checkpoint => { + result.chkp_deleted += 1; + } + RelishTag::ControlFile => { + result.control_deleted += 1; + } + RelishTag::FileNodeMap { .. } => { + result.filenodemap_deleted += 1; + } + _ => { + bail!( + "unexpected non-blocky object found during GC {}", + rel + ); + } + }; + } + } + } + } + _ => (), // do nothing + } + } + result.elapsed = now.elapsed(); + info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", + result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); + if compact { + self.obj_store.compact(); + } + } + Ok(result) + } } struct ObjectHistory<'a> { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 612649f27e..2caffb1eb2 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -679,10 +679,9 @@ impl postgres_backend::Handler for PageServerHandler { .map(|h| h.as_str().parse()) .unwrap_or(Ok(self.conf.gc_horizon))?; - let timeline = - page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; + let repo = page_cache::get_repository_for_tenant(&tenantid)?; - let result = timeline.gc_iteration(gc_horizon, true)?; + let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"n_relations"), diff --git 
a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 8346d108c5..bb6388a532 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -27,6 +27,26 @@ pub trait Repository: Send + Sync { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + /// Perform one garbage collection iteration. + /// Garbage collection is periodically performed by the GC thread, + /// but it can also be explicitly requested through the page server API. + /// + /// `timelineid` specifies the timeline to GC, or `None` for all. + /// `horizon` specifies the delta from the last LSN for which all object versions are preserved (PITR interval). + /// `compact` is used to force compaction of the storage. + /// Some storage implementations are based on an LSM tree and require periodic merging (compaction). + /// Usually the storage implementation determines by itself when compaction should be performed, + /// but for GC tests it may be useful to force compaction right after a GC iteration completes + /// to make sure that all detected garbage is removed. + /// So right now `compact` is set to true when GC is explicitly requested through the page server API, + /// and is set to false in the GC thread, which repeats GC iterations in an infinite loop. + fn gc_iteration( + &self, + timelineid: Option, + horizon: u64, + compact: bool, + ) -> Result; + // TODO get timelines? //fn get_stats(&self) -> RepositoryStats; } @@ -138,20 +158,6 @@ pub trait Timeline: Send + Sync { /// Relation size is increased implicitly and decreased with Truncate updates. // TODO ordering guarantee? fn history<'a>(&'a self) -> Result>; - - /// Perform one garbage collection iteration. - /// Garbage collection is periodically performed by GC thread, - /// but it can be explicitly requested through page server API. - /// - /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). - /// `compact` parameter is used to force compaction of storage. 
- /// Some storage implementation are based on LSM tree and require periodic merge (compaction). - /// Usually storage implementation determines itself when compaction should be performed. - /// But for GC tests it way be useful to force compaction just after completion of GC iteration - /// to make sure that all detected garbage is removed. - /// So right now `compact` is set to true when GC explicitly requested through page srver API, - /// and is st to false in GC threads which infinitely repeats GC iterations in loop. - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result; } pub trait History: Iterator> {