diff --git a/Cargo.lock b/Cargo.lock
index a16bd155c9..c63d0746a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1222,6 +1222,41 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "num"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
+dependencies = [
+ "num-bigint",
+ "num-complex",
+ "num-integer",
+ "num-iter",
+ "num-rational",
+ "num-traits",
+]
+
+[[package]]
+name = "num-bigint"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-complex"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95"
+dependencies = [
+ "autocfg",
+ "num-traits",
+]
+
 [[package]]
 name = "num-integer"
 version = "0.1.44"
@@ -1232,6 +1267,29 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "num-iter"
+version = "0.1.42"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2021c8337a54d21aca0d59a92577a029af9431cb59b909b03252b9c164fad59"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-rational"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
+dependencies = [
+ "autocfg",
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "num-traits"
 version = "0.2.14"
@@ -1330,6 +1388,7 @@ dependencies = [
  "hex",
  "lazy_static",
  "log",
+ "parse_duration",
  "postgres",
  "postgres-protocol",
  "postgres-types",
@@ -1384,6 +1443,17 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "parse_duration"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
+dependencies = [
+ "lazy_static",
+ "num",
+ "regex",
+]
+
 [[package]]
 name = "peeking_take_while"
 version = "0.1.2"
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 41e0a548fb..fc2c8618b6 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -41,5 +41,6 @@ walkdir = "2"
 thiserror = "1.0"
 hex = "0.4.3"
 tar = "0.4.33"
+parse_duration = "2.1"
 
 postgres_ffi = { path = "../postgres_ffi" }
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 340894d55b..0a966a81a6 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -9,6 +9,8 @@ use std::io;
 use std::path::PathBuf;
 use std::process::exit;
 use std::thread;
+use std::time::Duration;
+use parse_duration::parse;
 
 use anyhow::{Context, Result};
 use clap::{App, Arg};
@@ -22,6 +24,9 @@ use pageserver::tui;
 //use pageserver::walreceiver;
 use pageserver::PageServerConf;
 
+/// Default GC horizon: how far (in LSN units) behind the last valid LSN old page versions are kept.
+const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
+
 fn main() -> Result<()> {
     let arg_matches = App::new("Zenith page server")
         .about("Materializes WAL stream to pages and serves them to the postgres")
@@ -46,11 +51,26 @@ fn main() -> Result<()> {
                 .takes_value(false)
                 .help("Run in the background"),
         )
+        .arg(
+            Arg::with_name("gc_horizon")
+                .short("g")
+                .long("gc_horizon")
+                .takes_value(true)
+                .help("Garbage collector horizon"),
+        )
+        .arg(
+            Arg::with_name("gc_period")
+                .long("gc_period")
+                .takes_value(true)
+                .help("Interval between garbage collector iterations"),
+        )
         .get_matches();
 
     let mut conf = PageServerConf {
         daemonize: false,
         interactive: false,
+        gc_horizon: DEFAULT_GC_HORIZON,
+        gc_period: Duration::from_secs(10),
         listen_addr: "127.0.0.1:5430".parse().unwrap(),
     };
 
@@ -71,6 +91,14 @@ fn main() -> Result<()> {
         conf.listen_addr = addr.parse()?;
     }
 
+    if let Some(horizon) = arg_matches.value_of("gc_horizon") {
+        conf.gc_horizon = horizon.parse().context("invalid gc_horizon value")?;
+    }
+
+    if let Some(period) = arg_matches.value_of("gc_period") {
+        conf.gc_period = parse(period).context("invalid gc_period value")?;
+    }
+
     start_pageserver(&conf)
 }
 
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 3b9eea17f5..f6992cd8e1 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -2,6 +2,7 @@ use std::fmt;
 use std::net::SocketAddr;
 use std::str::FromStr;
 use std::path::PathBuf;
+use std::time::Duration;
 
 pub mod basebackup;
 pub mod page_cache;
@@ -20,6 +21,8 @@ pub struct PageServerConf {
     pub daemonize: bool,
     pub interactive: bool,
     pub listen_addr: SocketAddr,
+    pub gc_horizon: u64,
+    pub gc_period: Duration,
 }
 
 // Zenith Timeline ID is a 32-byte random ID.
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 79ea7f072b..04bd65a319 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -15,6 +15,7 @@ use crossbeam_channel::unbounded;
 use crossbeam_channel::{Receiver, Sender};
 use lazy_static::lazy_static;
 use log::*;
+use std::cmp::min;
 use std::collections::HashMap;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
@@ -124,7 +125,7 @@ pub fn get_or_restore_pagecache(
     match pcaches.get(&timelineid) {
         Some(pcache) => Ok(pcache.clone()),
         None => {
-            let pcache = init_page_cache(&conf, timelineid);
+            let pcache = init_page_cache(conf, timelineid);
 
             restore_timeline(conf, &pcache, timelineid)?;
 
@@ -145,11 +146,31 @@ pub fn get_or_restore_pagecache(
                 })
                 .unwrap();
 
+            // Start the garbage collection thread for this timeline
+            let conf_copy = conf.clone();
+            let _gc_thread = thread::Builder::new()
+                .name("Garbage collection thread".into())
+                .spawn(move || {
+                    gc_thread_main(&conf_copy, timelineid);
+                })
+                .unwrap();
+
             return Ok(result);
         }
     }
 }
+
+/// Entry point of the garbage collection thread for one timeline.
+///
+/// Runs `PageCache::do_gc` until it fails; the failure is logged and the thread exits.
+fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
+    info!("Garbage collection thread started {}", timelineid);
+    let pcache = get_pagecache(conf, timelineid).unwrap();
+    if let Err(e) = pcache.do_gc(conf) {
+        error!("Garbage collection failed for timeline {}: {:?}", timelineid, e);
+    }
+}
 
 fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> DB {
     let path = zenith_repo_dir().join(timelineid.to_string());
     let mut opts = Options::default();
@@ -355,6 +376,124 @@ impl WALRecord {
 // Public interface functions
 
 impl PageCache {
+    /// Garbage collection loop; runs until an error occurs.
+    ///
+    /// Every `conf.gc_period` the keys are scanned from the largest possible key
+    /// downwards. For each page the most recent version is kept (and materialized
+    /// if it is stored as a WAL record), and the older versions with an LSN below
+    /// `last valid LSN - conf.gc_horizon` are deleted.
+    fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<()> {
+        let mut minbuf = BytesMut::new();
+        let mut maxbuf = BytesMut::new();
+        let cf = self.db.cf_handle(DEFAULT_COLUMN_FAMILY_NAME).unwrap();
+        loop {
+            thread::sleep(conf.gc_period);
+            let last_lsn = self.get_last_valid_lsn();
+            if last_lsn > conf.gc_horizon {
+                let horizon = last_lsn - conf.gc_horizon;
+                let mut maxkey = CacheKey {
+                    tag: BufferTag {
+                        rel: RelTag {
+                            spcnode: u32::MAX,
+                            dbnode: u32::MAX,
+                            relnode: u32::MAX,
+                            forknum: u8::MAX,
+                        },
+                        blknum: u32::MAX,
+                    },
+                    lsn: u64::MAX,
+                };
+                loop {
+                    maxbuf.clear();
+                    maxkey.pack(&mut maxbuf);
+                    let mut iter = self.db.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
+                    if let Some((k, v)) = iter.next() {
+                        minbuf.clear();
+                        minbuf.extend_from_slice(&v);
+                        let content = CacheEntryContent::unpack(&mut minbuf);
+                        minbuf.clear();
+                        minbuf.extend_from_slice(&k);
+                        let key = CacheKey::unpack(&mut minbuf);
+
+                        // Construct boundaries for old records cleanup
+                        maxkey.tag = key.tag;
+                        let last_lsn = key.lsn;
+                        maxkey.lsn = min(horizon, last_lsn); // do not remove last version
+
+                        let mut minkey = maxkey.clone();
+                        minkey.lsn = 0;
+
+                        // reconstruct most recent page version
+                        if content.wal_record.is_some() {
+                            // force reconstruction of most recent page version
+                            self.reconstruct_page(key, content)?;
+                        }
+
+                        maxbuf.clear();
+                        maxkey.pack(&mut maxbuf);
+
+                        if last_lsn > horizon {
+                            // locate most recent record before horizon
+                            let mut iter = self.db.iterator(IteratorMode::From(&maxbuf[..], Direction::Reverse));
+                            if let Some((k, v)) = iter.next() {
+                                minbuf.clear();
+                                minbuf.extend_from_slice(&v);
+                                let content = CacheEntryContent::unpack(&mut minbuf);
+                                if content.wal_record.is_some() {
+                                    minbuf.clear();
+                                    minbuf.extend_from_slice(&k);
+                                    let key = CacheKey::unpack(&mut minbuf);
+                                    self.reconstruct_page(key, content)?;
+                                }
+                            }
+                        }
+                        // remove records prior to horizon
+                        minbuf.clear();
+                        minkey.pack(&mut minbuf);
+                        self.db.delete_range_cf(cf, &minbuf[..], &maxbuf[..])?;
+
+                        maxkey = minkey;
+                    } else {
+                        // No more keys: this pass is complete, sleep until the next one.
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /// Materializes the page version stored under `key` by replaying WAL.
+    ///
+    /// Sends the entry over `walredo_sender`, waits until `apply_pending` is
+    /// cleared, stores the resulting page image under the same tag and LSN and
+    /// returns it. Fails if no page image was produced.
+    fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
+        let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
+
+        let mut entry_content = entry_rc.content.lock().unwrap();
+        entry_content.apply_pending = true;
+
+        let s = &self.walredo_sender;
+        s.send(entry_rc.clone())?;
+
+        while entry_content.apply_pending {
+            entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
+        }
+        // We should now have a page image. If we don't, it means that WAL redo
+        // failed to reconstruct it. WAL redo should've logged that error already.
+        let page_img = match &entry_content.page_image {
+            Some(p) => p.clone(),
+            None => {
+                error!(
+                    "could not apply WAL to reconstruct page image for GetPage@LSN request"
+                );
+                bail!("could not apply WAL to reconstruct page image");
+            }
+        };
+        self.put_page_image(key.tag, key.lsn, page_img.clone());
+        Ok(page_img)
+    }
+
     fn wait_lsn(&self, lsn: u64) -> anyhow::Result<()> {
         let mut shared = self.shared.lock().unwrap();
         let mut waited = false;
@@ -437,30 +576,8 @@ impl PageCache {
             } else if content.wal_record.is_some() {
                 buf.clear();
                 buf.extend_from_slice(&k);
-                let entry_rc = Arc::new(CacheEntry::new(CacheKey::unpack(&mut buf), content));
-
-                let mut entry_content = entry_rc.content.lock().unwrap();
-                entry_content.apply_pending = true;
-
-                let s = &self.walredo_sender;
-                s.send(entry_rc.clone())?;
-
-                while entry_content.apply_pending {
-                    entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
-                }
-
-                // We should now have a page image. If we don't, it means that WAL redo
-                // failed to reconstruct it. WAL redo should've logged that error already.
-                page_img = match &entry_content.page_image {
-                    Some(p) => p.clone(),
-                    None => {
-                        error!(
-                            "could not apply WAL to reconstruct page image for GetPage@LSN request"
-                        );
-                        bail!("could not apply WAL to reconstruct page image");
-                    }
-                };
-                self.put_page_image(tag, lsn, page_img.clone());
+                let key = CacheKey::unpack(&mut buf);
+                page_img = self.reconstruct_page(key, content)?;
             } else {
                 // No base image, and no WAL record. Huh?
                 bail!("no page image or WAL record for requested page");
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 239b89e306..7ce285164c 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -24,6 +24,7 @@ use tokio::runtime;
 use tokio::runtime::Runtime;
 use tokio::sync::mpsc;
 use tokio::task;
+use std::time::Duration;
 
 use crate::basebackup;
 use crate::page_cache;
@@ -936,7 +937,7 @@ impl Connection {
         // FIXME: I'm getting an error from the tokio copyout driver without this.
         // I think it happens when the CommandComplete, CloseComplete and ReadyForQuery
         // are sent in the same TCP packet as the CopyDone. I don't understand why.
-        thread::sleep(std::time::Duration::from_secs(1));
+        thread::sleep(Duration::from_secs(1));
 
         Ok(())
     }