diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs
index e5fc490038..fa4c24dca3 100644
--- a/pageserver/src/branches.rs
+++ b/pageserver/src/branches.rs
@@ -21,6 +21,8 @@ use std::{
 };
 use zenith_utils::lsn::Lsn;

+use crate::page_cache;
+use crate::restore_local_repo;
 use crate::{repository::Repository, PageServerConf, ZTimelineId};

 #[derive(Serialize, Deserialize, Clone)]
@@ -38,7 +40,7 @@ pub struct PointInTime {
     pub lsn: Lsn,
 }

-pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
+pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
     // top-level dir may exist if we are creating it through CLI
     fs::create_dir_all(repo_dir)
         .with_context(|| format!("could not create directory {}", repo_dir.display()))?;
@@ -49,15 +51,9 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
     fs::create_dir(std::path::Path::new("refs"))?;
     fs::create_dir(std::path::Path::new("refs").join("branches"))?;
     fs::create_dir(std::path::Path::new("refs").join("tags"))?;
-    fs::create_dir(std::path::Path::new("wal-redo"))?;

     println!("created directory structure in {}", repo_dir.display());

-    // Create initial timeline
-    let tli = create_timeline(conf, None)?;
-    let timelinedir = conf.timeline_path(tli);
-    println!("created initial timeline {}", tli);
-
     // Run initdb
     //
     // We create the cluster temporarily in a "tmp" directory inside the repository,
@@ -89,6 +85,27 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
     let lsn = controlfile.checkPoint;
     let lsnstr = format!("{:016X}", lsn);

+    // Bootstrap the repository by loading the newly-initdb'd cluster into the 'main' branch.
+    let tli = create_timeline(conf, None)?;
+    let timelinedir = conf.timeline_path(tli);
+
+    // We don't use page_cache here, because we don't want to spawn the WAL redo thread during
+    // repository initialization.
+    //
+    // FIXME: That caused trouble, because the WAL redo thread launched initdb in the background,
+    // and it kept running even after "zenith init" had exited. In tests, we started the
+    // page server immediately after that, so that initdb was still running in the background,
+    // and we failed to run initdb again in the same directory. This has been solved for the
+    // rapid init+start case now, but the general race condition remains if you restart the
+    // server quickly.
+    let repo = crate::repository::rocksdb::RocksRepository::new(
+        conf,
+        std::sync::Arc::new(crate::walredo::DummyRedoManager {}),
+    );
+    let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?;
+
+    restore_local_repo::import_timeline_from_postgres_datadir(&tmppath, &*timeline, Lsn(lsn))?;
+
     // Move the initial WAL file
     fs::rename(
         tmppath.join("pg_wal").join("000000010000000000000001"),
@@ -96,7 +113,11 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
             .join("wal")
             .join("000000010000000000000001.partial"),
     )?;
-    println!("moved initial WAL file");
+    println!("created initial timeline {}", tli);
+
+    let data = tli.to_string();
+    fs::write(conf.branch_path("main"), data)?;
+    println!("created main branch");

     // Remove pg_wal
     fs::remove_dir_all(tmppath.join("pg_wal"))?;
@@ -104,14 +125,12 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
     force_crash_recovery(&tmppath)?;
     println!("updated pg_control");

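A quick aside on the snapshot naming used here: the directory the data directory is about to be moved into is named after the initdb checkpoint LSN, rendered as 16 zero-padded hex digits by the `format!("{:016X}", lsn)` call earlier in this hunk. A minimal standalone sketch of that naming (the LSN value is made up):

    fn main() {
        let lsn: u64 = 0x16D69C8; // stand-in for pg_control's checkPoint
        let lsnstr = format!("{:016X}", lsn);
        assert_eq!(lsnstr, "00000000016D69C8");
        // ends up as timelines/<timelineid>/snapshots/00000000016D69C8
    }
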
+    // Move the data directory into place as the initial base backup.
+    // FIXME: It would be enough to copy only the non-relation files here; the relation
+    // data was already loaded into the repository.
     let target = timelinedir.join("snapshots").join(&lsnstr);
     fs::rename(tmppath, &target)?;

-    // Create 'main' branch to refer to the initial timeline
-    let data = tli.to_string();
-    fs::write(conf.branch_path("main"), data)?;
-    println!("created main branch");
-
     println!(
         "new zenith repository was created in {}",
         repo_dir.display()
@@ -213,12 +232,13 @@ pub(crate) fn create_branch(
         startpoint.lsn = end_of_wal;
     }

-    // create a new timeline for it
+    // create a new timeline directory for it
     let newtli = create_timeline(conf, Some(startpoint))?;
     let newtimelinedir = conf.timeline_path(newtli);

-    let data = newtli.to_string();
-    fs::write(conf.branch_path(&branchname), data)?;
+    // Let the Repository backend do its initialization
+    let repo = page_cache::get_repository();
+    repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?;

     // Copy the latest snapshot (TODO: before the startpoint) and all WAL
     // TODO: be smarter and avoid the copying...
@@ -234,6 +254,12 @@ pub(crate) fn create_branch(
         pg_constants::WAL_SEGMENT_SIZE,
     )?;

+    // Remember the human-readable branch name for the new timeline.
+    // FIXME: there's a race condition if you create two branches with the
+    // same name concurrently.
+    let data = newtli.to_string();
+    fs::write(conf.branch_path(&branchname), data)?;
+
     Ok(BranchInfo {
         name: branchname.to_string(),
         timeline_id: newtli,
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b699df3023..9c24504f77 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -732,7 +732,7 @@ impl Connection {

         // Check that the timeline exists
         let repository = page_cache::get_repository();
-        if repository.get_or_restore_timeline(timelineid).is_err() {
+        if repository.get_timeline(timelineid).is_err() {
             bail!("client requested callmemaybe on timeline {} which does not exist in page server", timelineid);
         }

@@ -899,14 +899,13 @@ impl Connection {
     ) -> anyhow::Result<()> {
         // check that the timeline exists
         let repository = page_cache::get_repository();
-        let timeline = repository
-            .get_or_restore_timeline(timelineid)
-            .map_err(|_| {
-                anyhow!(
+        let timeline = repository.get_timeline(timelineid).map_err(|e| {
+            error!("error fetching timeline: {:?}", e);
+            anyhow!(
                 "client requested basebackup on timeline {} which does not exist in page server",
                 timelineid
             )
-            })?;
+        })?;

         /* switch client to COPYOUT */
         let stream = &mut self.stream;
         stream.write_u8(b'H')?;
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 9fd114293e..dae60d4f69 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -15,21 +15,17 @@ use zenith_utils::lsn::Lsn;
 /// timelines, forked off from the same initial call to 'initdb'.
 pub trait Repository {
     /// Get Timeline handle for given zenith timeline ID.
-    ///
-    /// The Timeline is expected to be already "open", i.e. `get_or_restore_timeline`
-    /// should've been called on it earlier already.
     fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;

-    /// Get Timeline handle for given zenith timeline ID.
-    ///
-    /// Creates a new Timeline object if it's not "open" already.
-    fn get_or_restore_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
+    /// Create a new, empty timeline. The caller is responsible for loading data into it.
+    fn create_empty_timeline(
+        &self,
+        timelineid: ZTimelineId,
+        start_lsn: Lsn,
+    ) -> Result<Arc<dyn Timeline>>;

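Stepping back to the branch bookkeeping in branches.rs above: a branch ref is just a file holding the timeline ID as text. A self-contained sketch of resolving one, assuming conf.branch_path() points at the refs/branches/<name> layout that init_repo() creates (the directory and ID here are invented for the example):

    use std::fs;
    use std::path::Path;

    fn resolve_branch(repo_dir: &Path, name: &str) -> std::io::Result<String> {
        // A branch ref is a plain file whose contents are the timeline ID.
        let data = fs::read_to_string(repo_dir.join("refs").join("branches").join(name))?;
        Ok(data.trim().to_string())
    }

    fn main() -> std::io::Result<()> {
        let dir = std::env::temp_dir().join("zenith-branch-demo");
        fs::create_dir_all(dir.join("refs").join("branches"))?;
        fs::write(
            dir.join("refs").join("branches").join("main"),
            "11223344556677881122334455667788",
        )?;
        assert_eq!(resolve_branch(&dir, "main")?, "11223344556677881122334455667788");
        Ok(())
    }

This is also where the FIXME about concurrent branch creation bites: two writers racing on the same file would silently overwrite each other.
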
-    /// Create an empty timeline, without loading any data into it from possible on-disk snapshot.
-    ///
-    /// For unit tests.
-    #[cfg(test)]
-    fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
+    /// Branch a timeline
+    fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;

     //fn get_stats(&self) -> RepositoryStats;
 }
@@ -163,6 +159,13 @@ pub trait Timeline {
     /// valid LSN, so that the WAL receiver knows where to restart streaming.
     fn advance_last_record_lsn(&self, lsn: Lsn);
     fn get_last_record_lsn(&self) -> Lsn;
+
+    ///
+    /// Flush to disk all data that was written with the put_* functions
+    ///
+    /// NOTE: This has nothing to do with checkpoints in PostgreSQL. We don't
+    /// know anything about them here in the repository.
+    fn checkpoint(&self) -> Result<()>;
 }

 #[derive(Clone)]
@@ -352,7 +355,7 @@ mod tests {
         // Create timeline to work on
         let repo = get_test_repo("test_relsize")?;
         let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
-        let tline = repo.create_empty_timeline(timelineid)?;
+        let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;

         tline.init_valid_lsn(Lsn(1));
         tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
@@ -439,7 +442,7 @@ mod tests {
     fn test_large_rel() -> Result<()> {
         let repo = get_test_repo("test_large_rel")?;
         let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
-        let tline = repo.create_empty_timeline(timelineid)?;
+        let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;

         tline.init_valid_lsn(Lsn(1));

diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs
index 09fe1ad63b..d9b2f509d6 100644
--- a/pageserver/src/repository/rocksdb.rs
+++ b/pageserver/src/repository/rocksdb.rs
@@ -6,19 +6,19 @@
 // LSN.

 use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
-use crate::restore_local_repo::restore_timeline;
+use crate::restore_local_repo::import_timeline_wal;
 use crate::waldecoder::Oid;
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::ZTimelineId;
-// use crate::PageServerConf;
-// use crate::branches;
-use anyhow::{bail, Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
+use postgres_ffi::pg_constants;
 use std::cmp::min;
 use std::collections::HashMap;
 use std::convert::TryInto;
+use std::str::FromStr;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Mutex};
@@ -88,6 +88,43 @@ struct CacheKey {
     pub lsn: Lsn,
 }

+//
+// In addition to those per-page entries, the 'last_valid_lsn' and 'last_record_lsn'
+// values are also persisted in the rocksdb repository. They are stored as CacheKeys
+// with ROCKSDB_SPECIAL_FORKNUM, and 'blknum' indicates which value it is. The
+// rest of the key fields are zero. We use a CacheKey as the key for these too,
+// so that whenever we iterate through keys in the repository, we can safely parse
+// the key blob as a CacheKey without checking for these special values first.
+//
+// FIXME: This is quite a similar concept to the special entries created by the
+// `BufferTag::fork` function. Merge them somehow? These special keys are specific
+// to the rocksdb implementation, not exposed to the rest of the system, but the
+// other special forks created by `BufferTag::fork` are also used elsewhere.
+//
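To make the scheme in the comment above concrete, here is a self-contained toy version (heavily simplified fields, not the actual zenith structs): metadata entries reuse the page-key shape, so any key read back from the database parses as a CacheKey.

    const SPECIAL_FORKNUM: u8 = 50;

    struct CacheKey {
        forknum: u8,
        blknum: u32, // for special keys: which metadata value this is
        lsn: u64,
    }

    impl CacheKey {
        const fn special(id: u32) -> CacheKey {
            CacheKey { forknum: SPECIAL_FORKNUM, blknum: id, lsn: 0 }
        }
        fn is_special(&self) -> bool {
            self.forknum == SPECIAL_FORKNUM
        }
    }

    fn main() {
        let last_valid = CacheKey::special(0);
        let page = CacheKey { forknum: 0, blknum: 42, lsn: 0x1234 };
        assert!(last_valid.is_special());
        assert!(!page.is_special()); // ordinary page entries are unaffected
    }
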
+impl CacheKey {
+    const fn special(id: u32) -> CacheKey {
+        CacheKey {
+            tag: BufferTag {
+                rel: RelTag {
+                    forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
+                    spcnode: 0,
+                    dbnode: 0,
+                    relnode: 0,
+                },
+                blknum: id,
+            },
+            lsn: Lsn(0),
+        }
+    }
+
+    fn is_special(&self) -> bool {
+        self.tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM
+    }
+}
+
+static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
+static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
+
 impl CacheKey {
     fn pack(&self, buf: &mut BytesMut) {
         self.tag.pack(buf);
@@ -189,52 +226,54 @@ impl RocksRepository {
     }
 }

-// Get handle to a given timeline. It is assumed to already exist.
-impl Repository for RocksRepository {
-    fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
-        let timelines = self.timelines.lock().unwrap();
-
-        match timelines.get(&timelineid) {
-            Some(timeline) => Ok(timeline.clone()),
-            None => bail!("timeline not found"),
-        }
-    }
-
-    fn get_or_restore_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
+impl RocksRepository {
+    fn get_rocks_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<RocksTimeline>> {
         let mut timelines = self.timelines.lock().unwrap();

         match timelines.get(&timelineid) {
             Some(timeline) => Ok(timeline.clone()),
             None => {
-                let timeline = RocksTimeline::new(self.conf, timelineid, self.walredo_mgr.clone());
+                let timeline =
+                    RocksTimeline::open(self.conf, timelineid, self.walredo_mgr.clone())?;

-                restore_timeline(self.conf, &timeline, timelineid)?;
+                // Load any new WAL after the last checkpoint into the repository.
+                info!(
+                    "Loading WAL for timeline {} starting at {}",
+                    timelineid,
+                    timeline.get_last_record_lsn()
+                );
+                let wal_dir = self.conf.timeline_path(timelineid).join("wal");
+                import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;

                 let timeline_rc = Arc::new(timeline);

+                if self.conf.gc_horizon != 0 {
+                    RocksTimeline::launch_gc_thread(self.conf, timeline_rc.clone());
+                }
+
                 timelines.insert(timelineid, timeline_rc.clone());

-                if self.conf.gc_horizon != 0 {
-                    let timeline_rc_copy = timeline_rc.clone();
-                    let conf = self.conf;
-                    let _gc_thread = thread::Builder::new()
-                        .name("Garbage collection thread".into())
-                        .spawn(move || {
-                            // FIXME
-                            timeline_rc_copy.do_gc(conf).expect("GC thread died");
-                        })
-                        .unwrap();
-                }
                 Ok(timeline_rc)
             }
         }
     }
+}

-    #[cfg(test)]
-    fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
+// Get handle to a given timeline. It is assumed to already exist.
+impl Repository for RocksRepository {
+    fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
+        Ok(self.get_rocks_timeline(timelineid)?)
+    }
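get_rocks_timeline() above is a classic get-or-open cache: a mutex-protected map of Arc handles, where the first caller pays the cost of opening the database and importing WAL, and later callers share the same handle. The same pattern in a self-contained toy (a String stands in for RocksTimeline):

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    struct Registry {
        timelines: Mutex<HashMap<u64, Arc<String>>>,
    }

    impl Registry {
        fn get_or_open(&self, id: u64) -> Arc<String> {
            let mut map = self.timelines.lock().unwrap();
            map.entry(id)
                .or_insert_with(|| Arc::new(format!("timeline-{}", id))) // "open" on first use
                .clone()
        }
    }

    fn main() {
        let reg = Registry { timelines: Mutex::new(HashMap::new()) };
        let a = reg.get_or_open(7);
        let b = reg.get_or_open(7);
        assert!(Arc::ptr_eq(&a, &b)); // the second call reuses the cached handle
    }
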
+
+    fn create_empty_timeline(
+        &self,
+        timelineid: ZTimelineId,
+        start_lsn: Lsn,
+    ) -> Result<Arc<dyn Timeline>> {
         let mut timelines = self.timelines.lock().unwrap();

-        let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
+        let timeline =
+            RocksTimeline::create(&self.conf, timelineid, self.walredo_mgr.clone(), start_lsn)?;

         let timeline_rc = Arc::new(timeline);
         let r = timelines.insert(timelineid, timeline_rc.clone());
@@ -244,13 +283,39 @@ impl Repository for RocksRepository {

         Ok(timeline_rc)
     }
+
+    fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
+        let src_timeline = self.get_rocks_timeline(src)?;
+
+        info!("branching at {}", at_lsn);
+
+        let dst_timeline =
+            RocksTimeline::create(&self.conf, dst, self.walredo_mgr.clone(), at_lsn)?;
+
+        // Copy all entries <= LSN
+        //
+        // This is very inefficient, a far cry from the promise of cheap copy-on-write
+        // branching. But it will do for now.
+        let mut iter = src_timeline.db.raw_iterator();
+        iter.seek_to_first();
+        while iter.valid() {
+            let k = iter.key().unwrap();
+            let key = CacheKey::from_slice(k);
+
+            if !key.is_special() && key.lsn <= at_lsn {
+                let v = iter.value().unwrap();
+                dst_timeline.db.put(k, v)?;
+            }
+            iter.next();
+        }
+        Ok(())
+    }
 }

 impl RocksTimeline {
-    fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
-        let path = conf.timeline_path(timelineid);
+    /// common options used by `open` and `create`
+    fn get_rocksdb_opts() -> rocksdb::Options {
         let mut opts = rocksdb::Options::default();
-        opts.create_if_missing(true);
         opts.set_use_fsync(true);
         opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
         opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
             if (val[0] & UNUSED_VERSION_FLAG) != 0 {
                 rocksdb::compaction_filter::Decision::Remove
             } else {
                 rocksdb::compaction_filter::Decision::Keep
             }
         });
-        rocksdb::DB::open(&opts, &path).unwrap()
+        opts
     }

-    fn new(
-        conf: &'static PageServerConf,
+    /// Open a RocksDB database, and load the last valid and record LSNs into memory.
+    fn open(
+        conf: &PageServerConf,
         timelineid: ZTimelineId,
         walredo_mgr: Arc<dyn WalRedoManager>,
-    ) -> RocksTimeline {
-        RocksTimeline {
-            db: RocksTimeline::open_rocksdb(conf, timelineid),
+    ) -> Result<RocksTimeline> {
+        let path = conf.timeline_path(timelineid);
+        let db = rocksdb::DB::open(&RocksTimeline::get_rocksdb_opts(), path)?;
+        // Load these into memory
+        let lsnstr = db
+            .get(LAST_VALID_LSN_KEY.to_bytes())
+            .with_context(|| "last_valid_lsn not found in repository")?
+            .ok_or(anyhow!("empty last_valid_lsn"))?;
+        let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
+        let lsnstr = db
+            .get(LAST_VALID_RECORD_LSN_KEY.to_bytes())
+            .with_context(|| "last_record_lsn not found in repository")?
+            .ok_or(anyhow!("empty last_record_lsn"))?;
+        let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
+
+        let timeline = RocksTimeline {
+            db,

             walredo_mgr,

-            last_valid_lsn: SeqWait::new(Lsn(0)),
-            last_record_lsn: AtomicLsn::new(0),
+            last_valid_lsn: SeqWait::new(last_valid_lsn),
+            last_record_lsn: AtomicLsn::new(last_record_lsn.0),

             num_entries: AtomicU64::new(0),
             num_page_images: AtomicU64::new(0),
             num_wal_records: AtomicU64::new(0),
             num_getpage_requests: AtomicU64::new(0),
-        }
+        };
+        Ok(timeline)
+    }
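branch_timeline() above implements branching by physically copying every non-special entry with lsn <= at_lsn into the new database. A self-contained sketch of those semantics, with a BTreeMap standing in for RocksDB and (blknum, lsn) pairs as simplified keys:

    use std::collections::BTreeMap;

    type Key = (u32, u64); // (blknum, lsn), heavily simplified

    fn branch(src: &BTreeMap<Key, Vec<u8>>, at_lsn: u64) -> BTreeMap<Key, Vec<u8>> {
        src.iter()
            .filter(|((_blknum, lsn), _)| *lsn <= at_lsn) // skip entries past the branch point
            .map(|(k, v)| (*k, v.clone()))
            .collect()
    }

    fn main() {
        let mut src = BTreeMap::new();
        src.insert((1, 100), vec![0xAA]);
        src.insert((1, 200), vec![0xBB]); // newer than the branch point
        let dst = branch(&src, 150);
        assert_eq!(dst.len(), 1);
        assert!(dst.contains_key(&(1, 100)));
    }
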
+
+    /// Create a new RocksDB database. It is initially empty, except for the last
+    /// valid and last record LSNs, which are set to 'start_lsn'.
+    fn create(
+        conf: &PageServerConf,
+        timelineid: ZTimelineId,
+        walredo_mgr: Arc<dyn WalRedoManager>,
+        start_lsn: Lsn,
+    ) -> Result<RocksTimeline> {
+        let path = conf.timeline_path(timelineid);
+        let mut opts = RocksTimeline::get_rocksdb_opts();
+        opts.create_if_missing(true);
+        opts.set_error_if_exists(true);
+        let db = rocksdb::DB::open(&opts, path)?;
+
+        let timeline = RocksTimeline {
+            db,
+
+            walredo_mgr,
+
+            last_valid_lsn: SeqWait::new(start_lsn),
+            last_record_lsn: AtomicLsn::new(start_lsn.0),
+
+            num_entries: AtomicU64::new(0),
+            num_page_images: AtomicU64::new(0),
+            num_wal_records: AtomicU64::new(0),
+            num_getpage_requests: AtomicU64::new(0),
+        };
+        // Write the initial last_valid/record_lsn values
+        timeline.checkpoint()?;
+        Ok(timeline)
+    }
+
+    fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc<RocksTimeline>) {
+        let timeline_rc_copy = timeline_rc.clone();
+        let _gc_thread = thread::Builder::new()
+            .name("Garbage collection thread".into())
+            .spawn(move || {
+                // FIXME
+                timeline_rc_copy.do_gc(conf).expect("GC thread died");
+            })
+            .unwrap();
+    }

-}
-impl RocksTimeline {
     ///
     /// Collect all the WAL records that are needed to reconstruct a page
     /// image for the given cache entry.
@@ -813,6 +934,23 @@ impl Timeline for RocksTimeline {
         self.last_valid_lsn.load()
     }

+    // Flush all the changes written so far with the put_* functions to disk.
+    // RocksDB writes out things as we go (?), so we don't need to do much here. We just
+    // write out the last valid and record LSNs.
+    fn checkpoint(&self) -> Result<()> {
+        let last_valid_lsn = self.last_valid_lsn.load();
+        self.db
+            .put(LAST_VALID_LSN_KEY.to_bytes(), last_valid_lsn.to_string())?;
+        self.db.put(
+            LAST_VALID_RECORD_LSN_KEY.to_bytes(),
+            self.last_record_lsn.load().to_string(),
+        )?;
+
+        trace!("checkpoint at {}", last_valid_lsn);
+
+        Ok(())
+    }
+
     //
     // Get statistics to be displayed in the user interface.
     //
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index d6d822a1bf..9610c1b2cb 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -1,15 +1,7 @@
-//
-// Restore chunks from local Zenith repository
-//
-// This runs once at Page Server startup. It loads all the "snapshots" and all
-// WAL from all timelines from the local zenith repository into the in-memory page
-// cache.
-//
-// This also initializes the "last valid LSN" in the page cache to the last LSN
-// seen in the WAL, so that when the WAL receiver is started, it starts
-// streaming from that LSN.
-//
-
+//!
+//! Import data and WAL from a PostgreSQL data directory and WAL segments into
+//! a zenith repository.
+//!
 use log::*;
 use std::cmp::max;
 use std::fs;
@@ -31,53 +23,6 @@ use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
 use zenith_utils::lsn::Lsn;

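Back in rocksdb.rs, the checkpoint()/open() pair above persists the two LSNs as human-readable strings and parses them back with Lsn::from_str(). A self-contained illustration of that round-trip, with a plain u64 and a simplified "hi/lo" hex rendering standing in for zenith's Lsn type:

    fn lsn_to_string(lsn: u64) -> String {
        format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
    }

    fn lsn_from_str(s: &str) -> Option<u64> {
        let (hi, lo) = s.split_once('/')?;
        let hi = u64::from_str_radix(hi, 16).ok()?;
        let lo = u64::from_str_radix(lo, 16).ok()?;
        Some((hi << 32) | lo)
    }

    fn main() {
        let lsn = (1u64 << 32) | 0x16D69C8;
        let s = lsn_to_string(lsn); // "1/16D69C8"
        assert_eq!(lsn_from_str(&s), Some(lsn));
    }

Storing the values through the ordinary key space (the special CacheKeys) is what lets open() fail loudly when a database was created without ever being checkpointed.
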
-///
-/// Load all WAL and all relation data pages from local disk into the repository.
-///
-pub fn restore_timeline(
-    conf: &PageServerConf,
-    timeline: &dyn Timeline,
-    timelineid: ZTimelineId,
-) -> Result<()> {
-    let timelinepath = PathBuf::from("timelines").join(timelineid.to_string());
-
-    if !timelinepath.exists() {
-        anyhow::bail!("timeline {} does not exist in the page server's repository");
-    }
-
-    // Scan .zenith/timelines/<timelineid>/snapshots
-    let snapshotspath = PathBuf::from("timelines")
-        .join(timelineid.to_string())
-        .join("snapshots");
-
-    let mut last_snapshot_lsn: Lsn = Lsn(0);
-
-    for direntry in fs::read_dir(&snapshotspath).unwrap() {
-        let direntry = direntry?;
-        let filename = direntry.file_name();
-        let lsn = Lsn::from_filename(&filename)?;
-        last_snapshot_lsn = max(lsn, last_snapshot_lsn);
-
-        // FIXME: pass filename as Path instead of str?
-        let filename_str = filename.into_string().unwrap();
-        restore_snapshot(conf, timeline, timelineid, &filename_str)?;
-        info!("restored snapshot at {:?}", filename_str);
-    }
-
-    if last_snapshot_lsn == Lsn(0) {
-        error!(
-            "could not find valid snapshot in {}",
-            snapshotspath.display()
-        );
-        // TODO return error?
-    }
-    timeline.init_valid_lsn(last_snapshot_lsn);
-
-    restore_wal(timeline, timelineid, last_snapshot_lsn)?;
-
-    Ok(())
-}
-
 ///
 /// Find latest snapshot in a timeline's 'snapshots' directory
 ///
@@ -102,19 +47,16 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
     Ok(last_snapshot_lsn)
 }

-fn restore_snapshot(
-    _conf: &PageServerConf,
+///
+/// Import all relation data pages from local disk into the repository.
+///
+pub fn import_timeline_from_postgres_datadir(
+    path: &Path,
     timeline: &dyn Timeline,
-    timelineid: ZTimelineId,
-    snapshot: &str,
+    lsn: Lsn,
 ) -> Result<()> {
-    let snapshotpath = PathBuf::from("timelines")
-        .join(timelineid.to_string())
-        .join("snapshots")
-        .join(snapshot);
-
     // Scan 'global'
-    for direntry in fs::read_dir(snapshotpath.join("global"))? {
+    for direntry in fs::read_dir(path.join("global"))? {
         let direntry = direntry?;
         match direntry.file_name().to_str() {
             None => continue,
             Some("pg_filenode.map") => continue,

             // Load any relation files into the page server
-            _ => restore_relfile(
+            _ => import_relfile(
+                &direntry.path(),
                 timeline,
-                snapshot,
+                lsn,
                 pg_constants::GLOBALTABLESPACE_OID,
                 0,
-                &direntry.path(),
             )?,
         }
     }

     // Scan 'base'. It contains database dirs, the database OID is the filename.
     // E.g. 'base/12345', where 12345 is the database OID.
-    for direntry in fs::read_dir(snapshotpath.join("base"))? {
+    for direntry in fs::read_dir(path.join("base"))? {
         let direntry = direntry?;

         let dboid = direntry.file_name().to_str().unwrap().parse::<u32>()?;

         for direntry in fs::read_dir(direntry.path())? {
             let direntry = direntry?;
             match direntry.file_name().to_str() {
                 None => continue,
                 Some("pg_filenode.map") => continue,

                 // Load any relation files into the page server
-                _ => restore_relfile(
+                _ => import_relfile(
+                    &direntry.path(),
                     timeline,
-                    snapshot,
+                    lsn,
                     pg_constants::DEFAULTTABLESPACE_OID,
                     dboid,
-                    &direntry.path(),
                 )?,
             }
         }
     }

     // TODO: Scan pg_tblspc

+    timeline.checkpoint()?;
+
     Ok(())
 }

-fn restore_relfile(
+// Subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
+fn import_relfile(
+    path: &Path,
     timeline: &dyn Timeline,
-    snapshot: &str,
+    lsn: Lsn,
     spcoid: Oid,
     dboid: Oid,
-    path: &Path,
 ) -> Result<()> {
-    let lsn = Lsn::from_hex(snapshot)?;
-
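import_relfile() relies on PostgreSQL's relation-file naming, roughly "<relnode>[_<fork>][.<segno>]". A simplified stand-in for postgres_ffi's parse_relfilename() (not the real parser, just an illustration of the convention it handles):

    fn parse(name: &str) -> Option<(u32, &str, u32)> {
        // Split off an optional segment number after '.'
        let (main, segno) = match name.split_once('.') {
            Some((m, s)) => (m, s.parse().ok()?),
            None => (name, 0),
        };
        // Split off an optional fork suffix after '_'
        let (relnode, fork) = match main.split_once('_') {
            Some((r, f)) => (r, f),
            None => (main, "main"),
        };
        Some((relnode.parse().ok()?, fork, segno))
    }

    fn main() {
        assert_eq!(parse("16384"), Some((16384, "main", 0)));
        assert_eq!(parse("16384_fsm"), Some((16384, "fsm", 0)));
        assert_eq!(parse("16384.2"), Some((16384, "main", 2)));
    }
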
     // Does it look like a relation file?
     let p = parse_relfilename(path.file_name().unwrap().to_str().unwrap());
@@ -228,24 +171,22 @@ fn restore_relfile(
     Ok(())
 }

-// Scan WAL on a timeline, starting from given LSN, and load all the records
-// into the page cache.
-fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
-    let walpath = format!("timelines/{}/wal", timelineid);
-
+/// Scan PostgreSQL WAL files in the given directory, and load all records >= 'startpoint' into
+/// the repository.
+pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
     let mut waldecoder = WalStreamDecoder::new(startpoint);

     let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
-    let mut last_lsn = Lsn(0);
+    let mut last_lsn = startpoint;
     loop {
         // FIXME: assume postgresql tli 1 for now
         let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
-        let mut path = walpath.clone() + "/" + &filename;
+        let mut path = walpath.join(&filename);

         // It could be as .partial
         if !PathBuf::from(&path).exists() {
-            path += ".partial";
+            path = walpath.join(filename + ".partial");
         }

         // Slurp the WAL file
@@ -287,12 +228,16 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
             nrecords += 1;
         }

-        info!("restored {} records from WAL file {}", nrecords, filename);
+        info!(
+            "imported {} records from WAL file {} up to {}",
+            nrecords,
+            path.display(),
+            last_lsn
+        );

         segno += 1;
         offset = 0;
     }
     info!("reached end of WAL at {}", last_lsn);
-
     Ok(())
 }
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index fbcf67eb39..dbdf69a03e 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -10,7 +10,7 @@ use crate::page_cache;
 use crate::waldecoder::*;
 use crate::PageServerConf;
 use crate::ZTimelineId;
-use anyhow::Error;
+use anyhow::{Error, Result};
 use lazy_static::lazy_static;
 use log::*;
 use postgres::fallible_iterator::FallibleIterator;
@@ -20,6 +20,7 @@ use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils::*;
 use postgres_protocol::message::backend::ReplicationMessage;
 use postgres_types::PgLsn;
+use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::fs;
 use std::fs::{File, OpenOptions};
@@ -139,7 +140,7 @@ fn walreceiver_main(
     // If we had previously received WAL up to some point in the middle of a WAL record, we
     // better start from the end of last full WAL record, not in the middle of one. Hence,
     // use 'last_record_lsn' rather than 'last_valid_lsn' here.
-    let last_rec_lsn = timeline.get_last_record_lsn();
+    let mut last_rec_lsn = timeline.get_last_record_lsn();
     let mut startpoint = last_rec_lsn;

     if startpoint == Lsn(0) {
@@ -173,6 +174,7 @@ fn walreceiver_main(
                 let data = xlog_data.data();
                 let startlsn = Lsn::from(xlog_data.wal_start());
                 let endlsn = startlsn + data.len() as u64;
+                let prev_last_rec_lsn = last_rec_lsn;

                 write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?;

@@ -187,6 +189,7 @@ fn walreceiver_main(
                     // Now that this record has been handled, let the page cache know that
                     // it is up-to-date to this LSN
                     timeline.advance_last_record_lsn(lsn);
+                    last_rec_lsn = lsn;
                 }

                 // Update the last_valid LSN value in the page cache one more time. We updated
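Both import_timeline_wal() above and the checkpointing logic added below lean on the same segment arithmetic: with 16 MB WAL segments, an LSN maps to a segment number and an offset within that segment. A self-contained sketch (plain u64 standing in for Lsn):

    const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // pg_constants::WAL_SEGMENT_SIZE

    fn segment_number(lsn: u64) -> u64 {
        lsn / WAL_SEGMENT_SIZE
    }

    fn segment_offset(lsn: u64) -> u64 {
        lsn % WAL_SEGMENT_SIZE
    }

    fn main() {
        let lsn: u64 = 0x16D_69C8; // a made-up LSN just past the first 16 MB
        assert_eq!(segment_number(lsn), 1);
        assert_eq!(segment_offset(lsn), 0x6D_69C8);
    }
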
@@ -197,6 +200,33 @@ fn walreceiver_main(
                 // flush ptr.
                 timeline.advance_last_valid_lsn(endlsn);

+                // Somewhat arbitrarily, if we have at least 10 complete WAL segments (16 MB each),
+                // "checkpoint" the repository to flush all the changes from WAL we've processed
+                // so far to disk. After this, we don't need the original WAL anymore, and it
+                // could be removed.
+                //
+                // TODO: We don't actually dare to remove the WAL yet. It's useful for debugging,
+                // and we might need it for logical decoding or other things in the future. Although
+                // we should also be able to fetch it back from the WAL safekeepers or S3 if
+                // needed.
+                if prev_last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
+                    != last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
+                {
+                    info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn);
+                    let (oldest_segno, newest_segno) = find_wal_file_range(
+                        timelineid,
+                        pg_constants::WAL_SEGMENT_SIZE,
+                        last_rec_lsn,
+                    )?;
+
+                    if newest_segno - oldest_segno >= 10 {
+                        timeline.checkpoint()?;
+
+                        // TODO: This is where we could remove WAL older than last_rec_lsn.
+                        //remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?;
+                    }
+                }
+
                 if !caught_up && endlsn >= end_of_wal {
                     info!("caught up at LSN {}", endlsn);
                     caught_up = true;
@@ -233,6 +263,45 @@ fn walreceiver_main(
     Ok(())
 }

+fn find_wal_file_range(
+    timeline: ZTimelineId,
+    wal_seg_size: usize,
+    written_upto: Lsn,
+) -> Result<(u64, u64)> {
+    let written_upto_segno = written_upto.segment_number(wal_seg_size);
+
+    let mut oldest_segno = written_upto_segno;
+    let mut newest_segno = written_upto_segno;
+    // Scan the WAL directory, and count how many WAL files we could remove
+    let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));
+    for entry in fs::read_dir(wal_dir)? {
+        let entry = entry?;
+        let path = entry.path();
+
+        if path.is_dir() {
+            continue;
+        }
+
+        let filename = path.file_name().unwrap().to_str().unwrap();
+
+        if IsXLogFileName(filename) {
+            let (segno, _tli) = XLogFromFileName(filename, wal_seg_size);
+
+            if segno > written_upto_segno {
+                // That's strange.
+                warn!("there is a WAL file from the future at {}", path.display());
+                continue;
+            }
+
+            oldest_segno = min(oldest_segno, segno);
+            newest_segno = max(newest_segno, segno);
+        }
+    }
+    // FIXME: would be good to assert that there are no gaps in the WAL files
+
+    Ok((oldest_segno, newest_segno))
+}
+
 /// Data returned from the postgres `IDENTIFY_SYSTEM` command
 ///
 /// See the [postgres docs] for more details.
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 97ef72ad2d..9ba4e6f515 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -58,6 +58,24 @@ pub trait WalRedoManager: Send + Sync {
     ) -> Result<Bytes, WalRedoError>;
 }

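The checkpoint trigger added to walreceiver.rs above fires only when the last-record LSN crosses a segment boundary, and then only if at least 10 full segments have accumulated on disk. The boundary test in isolation, as a self-contained sketch:

    const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;

    fn crossed_segment_boundary(prev: u64, cur: u64) -> bool {
        // Mirrors the prev_last_rec_lsn / last_rec_lsn comparison above.
        prev / WAL_SEGMENT_SIZE != cur / WAL_SEGMENT_SIZE
    }

    fn main() {
        let seg = WAL_SEGMENT_SIZE;
        assert!(crossed_segment_boundary(9 * seg + 100, 10 * seg + 8));
        assert!(!crossed_segment_boundary(10 * seg + 8, 10 * seg + 9)); // same segment
    }

Checking the boundary first keeps the directory scan in find_wal_file_range() off the per-message hot path.
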
+///
+/// A dummy WAL Redo Manager implementation that doesn't allow replaying
+/// anything. Currently used during bootstrapping (zenith init), to create
+/// a Repository object without launching the real WAL redo process.
+///
+pub struct DummyRedoManager {}
+impl crate::walredo::WalRedoManager for DummyRedoManager {
+    fn request_redo(
+        &self,
+        _tag: BufferTag,
+        _lsn: Lsn,
+        _base_img: Option<Bytes>,
+        _records: Vec<WALRecord>,
+    ) -> Result<Bytes, WalRedoError> {
+        Err(WalRedoError::InvalidState)
+    }
+}
+
 static TIMEOUT: Duration = Duration::from_secs(20);

 ///
@@ -92,6 +110,9 @@ struct WalRedoRequest {
 pub enum WalRedoError {
     #[error(transparent)]
     IoError(#[from] std::io::Error),
+
+    #[error("cannot perform WAL redo now")]
+    InvalidState,
 }

 ///
diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs
index 19abe5f741..16f603acca 100644
--- a/postgres_ffi/src/pg_constants.rs
+++ b/postgres_ffi/src/pg_constants.rs
@@ -16,6 +16,8 @@ pub const FSM_FORKNUM: u8 = 1;
 pub const VISIBILITYMAP_FORKNUM: u8 = 2;
 pub const INIT_FORKNUM: u8 = 3;

+pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
+
 // From storage_xlog.h
 pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
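Finally, the DummyRedoManager pattern above (refuse every redo request during bootstrap, so a misuse fails loudly instead of spawning the redo process) in a self-contained toy, with simplified types in place of BufferTag/Lsn/WALRecord:

    #[derive(Debug, PartialEq)]
    enum RedoError {
        InvalidState,
    }

    trait RedoManager {
        fn request_redo(&self, lsn: u64) -> Result<Vec<u8>, RedoError>;
    }

    struct DummyRedo;
    impl RedoManager for DummyRedo {
        fn request_redo(&self, _lsn: u64) -> Result<Vec<u8>, RedoError> {
            // Bootstrap never needs real redo; any request is a bug.
            Err(RedoError::InvalidState)
        }
    }

    fn main() {
        let mgr = DummyRedo;
        assert_eq!(mgr.request_redo(0), Err(RedoError::InvalidState));
    }
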