Move gc_iteration() function to Repository trait.

The upcoming layered storage implementation handles GC as a
repository-wide operation because it needs to pay attention to the branch
points of all timelines.
This commit is contained in:
Heikki Linnakangas
2021-08-12 23:46:01 +03:00
parent 97f9021c88
commit 8517d9696d
3 changed files with 186 additions and 160 deletions

View File

@@ -167,6 +167,27 @@ impl Repository for ObjectRepository {
Ok(())
}
fn gc_iteration(
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
compact: bool,
) -> Result<GcResult> {
if let Some(timelineid) = timelineid {
let timelines = self.timelines.lock().unwrap();
// FIXME: If the timeline isn't opened yet, we don't open it just for GC.
if let Some(timeline) = timelines.get(&timelineid) {
return timeline.gc_iteration(horizon, compact);
}
} else {
// FIXME: the object repository doesn't support GC on all timelines. Should
// iterate all the timelines here
bail!("GC of all timelines not implemented");
}
return Ok(GcResult::default());
}
}
///
@@ -693,149 +714,6 @@ impl Timeline for ObjectTimeline {
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory { lsn, iter }))
}
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult> {
let last_lsn = self.get_last_valid_lsn();
let mut result: GcResult = Default::default();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(horizon) {
// WAL is large enough to perform GC
let now = Instant::now();
// Iterate through all objects in timeline
for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? {
result.inspected += 1;
match obj {
ObjectTag::RelationMetadata(_) => {
// Do not need to reconstruct page images,
// just delete all old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::RelationSize(RelationSizeEntry::Unlink) => {
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
result.dropped += 1;
}
_ => (), // preserve last version
}
last_version = false;
result.truncated += 1;
result.n_relations += 1;
} else {
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
}
}
}
ObjectTag::Buffer(rel, blknum) => {
if rel.is_blocky() {
// Reconstruct page at horizon unless relation was dropped
// and delete all older versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
result.truncated += 1;
last_version = false;
if let Some(rel_size) =
self.relsize_get_nowait(rel, last_lsn)?
{
if rel_size > blknum {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(rel, blknum, lsn)?;
continue;
}
debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size);
} else {
if let Some(rel_size) =
self.relsize_get_nowait(rel, last_lsn)?
{
debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn);
continue;
}
debug!("Relation {} was dropped at {}", rel, lsn);
}
// relation was dropped or truncated so this block can be removed
}
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
}
} else {
// versioned always materialized objects: no need to reconstruct pages
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
// Don't preserve last version for unlinked relishes
match rel {
RelishTag::TwoPhase { .. } => {
if !self.get_rel_exists(rel, last_lsn)? {
self.obj_store.unlink(&key, lsn)?;
result.prep_deleted += 1;
}
}
// TODO treat unlinked FileNodeMap too
_ => (),
}
} else {
// delete deteriorated version
self.obj_store.unlink(&key, lsn)?;
match rel {
RelishTag::TwoPhase { .. } => {
result.prep_deleted += 1;
}
RelishTag::Checkpoint => {
result.chkp_deleted += 1;
}
RelishTag::ControlFile => {
result.control_deleted += 1;
}
RelishTag::FileNodeMap { .. } => {
result.filenodemap_deleted += 1;
}
_ => {
bail!(
"unexpected non-blocky object found during GC {}",
rel
);
}
};
}
}
}
}
_ => (), // do nothing
}
}
result.elapsed = now.elapsed();
info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped",
result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped);
if compact {
self.obj_store.compact();
}
}
Ok(result)
}
}
impl ObjectTimeline {
@@ -1028,6 +906,149 @@ impl ObjectTimeline {
self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?)
}
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult> {
let last_lsn = self.get_last_valid_lsn();
let mut result: GcResult = Default::default();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(horizon) {
// WAL is large enough to perform GC
let now = Instant::now();
// Iterate through all objects in timeline
for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? {
result.inspected += 1;
match obj {
ObjectTag::RelationMetadata(_) => {
// Do not need to reconstruct page images,
// just delete all old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
let content = vers.1;
match ObjectValue::des(&content[..])? {
ObjectValue::RelationSize(RelationSizeEntry::Unlink) => {
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
result.dropped += 1;
}
_ => (), // preserve last version
}
last_version = false;
result.truncated += 1;
result.n_relations += 1;
} else {
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
}
}
}
ObjectTag::Buffer(rel, blknum) => {
if rel.is_blocky() {
// Reconstruct page at horizon unless relation was dropped
// and delete all older versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
result.truncated += 1;
last_version = false;
if let Some(rel_size) =
self.relsize_get_nowait(rel, last_lsn)?
{
if rel_size > blknum {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(rel, blknum, lsn)?;
continue;
}
debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size);
} else {
if let Some(rel_size) =
self.relsize_get_nowait(rel, last_lsn)?
{
debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn);
continue;
}
debug!("Relation {} was dropped at {}", rel, lsn);
}
// relation was dropped or truncated so this block can be removed
}
self.obj_store.unlink(&key, lsn)?;
result.deleted += 1;
}
} else {
// versioned always materialized objects: no need to reconstruct pages
// Remove old versions over horizon
let mut last_version = true;
let key = ObjectKey {
timeline: self.timelineid,
tag: obj,
};
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
// Don't preserve last version for unlinked relishes
match rel {
RelishTag::TwoPhase { .. } => {
if !self.get_rel_exists(rel, last_lsn)? {
self.obj_store.unlink(&key, lsn)?;
result.prep_deleted += 1;
}
}
// TODO treat unlinked FileNodeMap too
_ => (),
}
} else {
// delete deteriorated version
self.obj_store.unlink(&key, lsn)?;
match rel {
RelishTag::TwoPhase { .. } => {
result.prep_deleted += 1;
}
RelishTag::Checkpoint => {
result.chkp_deleted += 1;
}
RelishTag::ControlFile => {
result.control_deleted += 1;
}
RelishTag::FileNodeMap { .. } => {
result.filenodemap_deleted += 1;
}
_ => {
bail!(
"unexpected non-blocky object found during GC {}",
rel
);
}
};
}
}
}
}
_ => (), // do nothing
}
}
result.elapsed = now.elapsed();
info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped",
result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped);
if compact {
self.obj_store.compact();
}
}
Ok(result)
}
}
struct ObjectHistory<'a> {

View File

@@ -679,10 +679,9 @@ impl postgres_backend::Handler for PageServerHandler {
.map(|h| h.as_str().parse())
.unwrap_or(Ok(self.conf.gc_horizon))?;
let timeline =
page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?;
let repo = page_cache::get_repository_for_tenant(&tenantid)?;
let result = timeline.gc_iteration(gc_horizon, true)?;
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"n_relations"),

View File

@@ -27,6 +27,26 @@ pub trait Repository: Send + Sync {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
/// perform one garbage collection iteration.
/// garbage collection is periodically performed by gc thread,
/// but it can be explicitly requested through page server api.
///
/// 'timelineid' specifies the timeline to GC, or None for all.
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
/// `compact` parameter is used to force compaction of storage.
/// some storage implementation are based on lsm tree and require periodic merge (compaction).
/// usually storage implementation determines itself when compaction should be performed.
/// but for gc tests it way be useful to force compaction just after completion of gc iteration
/// to make sure that all detected garbage is removed.
/// so right now `compact` is set to true when gc explicitly requested through page srver api,
/// and is st to false in gc threads which infinitely repeats gc iterations in loop.
fn gc_iteration(
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
compact: bool,
) -> Result<GcResult>;
// TODO get timelines?
//fn get_stats(&self) -> RepositoryStats;
}
@@ -138,20 +158,6 @@ pub trait Timeline: Send + Sync {
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
/// Perform one garbage collection iteration.
/// Garbage collection is periodically performed by GC thread,
/// but it can be explicitly requested through page server API.
///
/// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval).
/// `compact` parameter is used to force compaction of storage.
/// Some storage implementation are based on LSM tree and require periodic merge (compaction).
/// Usually storage implementation determines itself when compaction should be performed.
/// But for GC tests it way be useful to force compaction just after completion of GC iteration
/// to make sure that all detected garbage is removed.
/// So right now `compact` is set to true when GC explicitly requested through page srver API,
/// and is st to false in GC threads which infinitely repeats GC iterations in loop.
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult>;
}
pub trait History: Iterator<Item = Result<Modification>> {