Implement garbage collection of unused versions

This commit is contained in:
Konstantin Knizhnik
2021-04-21 19:04:30 +03:00
parent d8fa2ec367
commit c981f4ad66
6 changed files with 219 additions and 26 deletions

70
Cargo.lock generated
View File

@@ -1222,6 +1222,41 @@ dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
"num-rational",
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@@ -1232,6 +1267,29 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-iter"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
@@ -1330,6 +1388,7 @@ dependencies = [
"hex",
"lazy_static",
"log",
"parse_duration",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1384,6 +1443,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
dependencies = [
"lazy_static",
"num",
"regex",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"

View File

@@ -41,5 +41,6 @@ walkdir = "2"
thiserror = "1.0"
hex = "0.4.3"
tar = "0.4.33"
parse_duration = "*"
postgres_ffi = { path = "../postgres_ffi" }

View File

@@ -9,6 +9,8 @@ use std::io;
use std::path::PathBuf;
use std::process::exit;
use std::thread;
use std::time::Duration;
use parse_duration::parse;
use anyhow::{Context, Result};
use clap::{App, Arg};
@@ -22,6 +24,8 @@ use pageserver::tui;
//use pageserver::walreceiver;
use pageserver::PageServerConf;
const DEFAULT_GC_HORIZON : u64 = 64*1024*1024;
fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
@@ -46,11 +50,20 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Run in the background"),
)
.arg(
Arg::with_name("gc_horizon")
.short("g")
.long("gc_horizon")
.takes_value(true)
.help("Garbage colletor horizon"),
)
.get_matches();
let mut conf = PageServerConf {
daemonize: false,
interactive: false,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
};
@@ -71,6 +84,14 @@ fn main() -> Result<()> {
conf.listen_addr = addr.parse()?;
}
if let Some(horizon) = arg_matches.value_of("gc_horizon") {
conf.gc_horizon = horizon.parse()?;
}
if let Some(period) = arg_matches.value_of("gc_period") {
conf.gc_period = parse(period)?;
}
start_pageserver(&conf)
}

View File

@@ -2,6 +2,7 @@ use std::fmt;
use std::net::SocketAddr;
use std::str::FromStr;
use std::path::PathBuf;
use std::time::Duration;
pub mod basebackup;
pub mod page_cache;
@@ -20,6 +21,8 @@ pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub listen_addr: SocketAddr,
pub gc_horizon: u64,
pub gc_period: Duration,
}
// Zenith Timeline ID is a 32-byte random ID.

View File

@@ -15,6 +15,7 @@ use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
@@ -124,7 +125,7 @@ pub fn get_or_restore_pagecache(
match pcaches.get(&timelineid) {
Some(pcache) => Ok(pcache.clone()),
None => {
let pcache = init_page_cache(&conf, timelineid);
let pcache = init_page_cache(conf, timelineid);
restore_timeline(conf, &pcache, timelineid)?;
@@ -145,11 +146,25 @@ pub fn get_or_restore_pagecache(
})
.unwrap();
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
gc_thread_main(&conf_copy, timelineid);
})
.unwrap();
return Ok(result);
}
}
}
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();
}
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB {
let path = zenith_repo_dir().join(timelineid.to_string());
let mut opts = Options::default();
@@ -355,6 +370,110 @@ impl WALRecord {
// Public interface functions
impl PageCache {
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self.db.cf_handle(DEFAULT_COLUMN_FAMILY_NAME).unwrap();
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
if last_lsn > conf.gc_horizon {
let horizon = last_lsn - conf.gc_horizon;
let mut maxkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: u64::MAX
};
loop {
maxbuf.clear();
maxkey.pack(&mut maxbuf);
let mut iter = self.db.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
if let Some((k,v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = 0;
// reconstruct most recent page version
if content.wal_record.is_some() {
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
}
maxbuf.clear();
maxkey.pack(&mut maxbuf);
if last_lsn > horizon {
// locate most recent record before horizon
let mut iter = self.db.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
if let Some((k,v)) = iter.next() {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
self.reconstruct_page(key, content)?;
}
}
}
// remove records prior to horizon
minbuf.clear();
minkey.pack(&mut minbuf);
self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
maxkey = minkey;
}
}
}
}
}
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(key.tag, key.lsn, page_img.clone());
Ok(page_img)
}
fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
let mut shared = self.shared.lock().unwrap();
let mut waited = false;
@@ -437,30 +556,8 @@ impl PageCache {
} else if content.wal_record.is_some() {
buf.clear();
buf.extend_from_slice(&k);
let entry_rc = Arc::new(CacheEntry::new(CacheKey::unpack(&mut buf), content));
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(tag, lsn, page_img.clone());
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");

View File

@@ -24,6 +24,7 @@ use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task;
use std::time::Duration;
use crate::basebackup;
use crate::page_cache;
@@ -936,7 +937,7 @@ impl Connection {
// FIXME: I'm getting an error from the tokio copyout driver without this.
// I think it happens when the CommandComplete, CloseComplete and ReadyForQuery
// are sent in the same TCP packet as the CopyDone. I don't understand why.
thread::sleep(std::time::Duration::from_secs(1));
thread::sleep(Duration::from_secs(1));
Ok(())
}