Refactor code to do one iteration of GC to separate function.

This commit is contained in:
Heikki Linnakangas
2021-06-28 15:25:38 +03:00
parent eb7388e3e8
commit cb2ddf06d0

View File

@@ -535,6 +535,79 @@ impl Timeline for ObjectTimeline {
}
impl ObjectTimeline {
fn gc_iteration(&self, horizon: u64) -> Result<()> {
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(horizon) {
// WAL is large enough to perform GC
let now = Instant::now();
let mut truncated = 0u64;
let mut deleted = 0u64;
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
let rel = *rels;
let mut last_version = true;
let key = relation_size_key(self.timelineid, rel);
let mut max_size = 0u32;
let mut relation_dropped = false;
// Process relation metadata versions
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
let rel_meta = RelationSizeEntry::des(&vers.1)?;
// If relation is dropped at the horizon,
// we can remove all its versions including last (Unlink)
match rel_meta {
RelationSizeEntry::Size(size) => max_size = max(max_size, size),
RelationSizeEntry::Unlink => {
if last_version {
relation_dropped = true;
info!("Relation {:?} dropped", rels);
}
}
}
if last_version {
last_version = false;
if !relation_dropped {
// preserve last version
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Now process all relation blocks
for blknum in 0..max_size {
let buf_tag = BufferTag { rel, blknum };
let key = ObjectKey {
timeline: self.timelineid,
tag: ObjectTag::RelationBuffer(buf_tag),
};
last_version = true;
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
truncated += 1;
if !relation_dropped {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(buf_tag, lsn)?;
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted",
now.elapsed(), &truncated, &deleted);
}
Ok(())
}
fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -671,86 +744,15 @@ impl ObjectTimeline {
.name("Garbage collection thread".into())
.spawn(move || {
// FIXME
timeline_rc.do_gc(conf).expect("GC thread died");
timeline_rc.gc_loop(conf).expect("GC thread died");
})
.unwrap();
}
fn do_gc(&self, conf: &'static PageServerConf) -> Result<()> {
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
// WAL is large enough to perform GC
let now = Instant::now();
let mut truncated = 0u64;
let mut deleted = 0u64;
// Iterate through all relations
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
let rel = *rels;
let mut last_version = true;
let key = relation_size_key(self.timelineid, rel);
let mut max_size = 0u32;
let mut relation_dropped = false;
// Process relation metadata versions
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
let rel_meta = RelationSizeEntry::des(&vers.1)?;
// If relation is dropped at the horizon,
// we can remove all its versions including last (Unlink)
match rel_meta {
RelationSizeEntry::Size(size) => max_size = max(max_size, size),
RelationSizeEntry::Unlink => {
if last_version {
relation_dropped = true;
info!("Relation {:?} dropped", rels);
}
}
}
if last_version {
last_version = false;
if !relation_dropped {
// preserve last version
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
// Now process all relation blocks
for blknum in 0..max_size {
let buf_tag = BufferTag {
rel,
blknum,
};
let key = ObjectKey {
timeline: self.timelineid,
tag: ObjectTag::RelationBuffer(buf_tag),
};
last_version = true;
for vers in self.obj_store.object_versions(&key, horizon)? {
let lsn = vers.0;
if last_version {
last_version = false;
truncated += 1;
if !relation_dropped {
// preserve and materialize last version before deleting all preceeding
self.get_page_at_lsn_nowait(buf_tag, lsn)?;
continue;
}
}
self.obj_store.unlink(&key, lsn)?;
deleted += 1;
}
}
}
info!("Garbage collection completed in {:?}: {} version histories truncated, {} versions deleted",
now.elapsed(), truncated, deleted);
}
self.gc_iteration(conf.gc_horizon)?;
}
}