mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Implement "checkpointing" in the page server.
- Previously, we checked on first use of a timeline, whether there is a snapshot and WAL for the timeline, and loaded it all into the (rocksdb) repository. That's a waste of effort if we had done that earlier already, and stopped and restarted the server. Track the last LSN that we have loaded into the repository, and only load the recent missing WAL after that. - When you create a new zenith repository with "zenith init", immediately load the initial empty postgres cluster into the rocksdb repository. Previously, we only did that on the first connection. This way, we don't need any "load from filesystem" codepath during normal operation, we can assume that the repository for a timeline is always up to date. (We might still want to use the functionality to import an existing PostgreSQL data directory into the repository in the future, as a separate Import feature, but not today.)
This commit is contained in:
@@ -21,6 +21,8 @@ use std::{
|
||||
};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::restore_local_repo;
|
||||
use crate::{repository::Repository, PageServerConf, ZTimelineId};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
@@ -38,7 +40,7 @@ pub struct PointInTime {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
fs::create_dir_all(repo_dir)
|
||||
.with_context(|| format!("could not create directory {}", repo_dir.display()))?;
|
||||
@@ -49,15 +51,9 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
fs::create_dir(std::path::Path::new("refs"))?;
|
||||
fs::create_dir(std::path::Path::new("refs").join("branches"))?;
|
||||
fs::create_dir(std::path::Path::new("refs").join("tags"))?;
|
||||
fs::create_dir(std::path::Path::new("wal-redo"))?;
|
||||
|
||||
println!("created directory structure in {}", repo_dir.display());
|
||||
|
||||
// Create initial timeline
|
||||
let tli = create_timeline(conf, None)?;
|
||||
let timelinedir = conf.timeline_path(tli);
|
||||
println!("created initial timeline {}", tli);
|
||||
|
||||
// Run initdb
|
||||
//
|
||||
// We create the cluster temporarily in a "tmp" directory inside the repository,
|
||||
@@ -89,6 +85,27 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
let lsn = controlfile.checkPoint;
|
||||
let lsnstr = format!("{:016X}", lsn);
|
||||
|
||||
// Bootstrap the repository by loading the newly-initdb'd cluster into 'main' branch.
|
||||
let tli = create_timeline(conf, None)?;
|
||||
let timelinedir = conf.timeline_path(tli);
|
||||
|
||||
// We don't use page_cache here, because we don't want to spawn the WAL redo thread during
|
||||
// repository initialization.
|
||||
//
|
||||
// FIXME: That caused trouble, because the WAL redo thread launched initdb in the background,
|
||||
// and it kept running even after the "zenith init" had exited. In tests, we started the
|
||||
// page server immediately after that, so that initdb was still running in the background,
|
||||
// and we failed to run initdb again in the same directory. This has been solved for the
|
||||
// rapid init+start case now, but the general race condition remains if you restart the the
|
||||
// server quickly.
|
||||
let repo = crate::repository::rocksdb::RocksRepository::new(
|
||||
conf,
|
||||
std::sync::Arc::new(crate::walredo::DummyRedoManager {}),
|
||||
);
|
||||
let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?;
|
||||
|
||||
restore_local_repo::import_timeline_from_postgres_datadir(&tmppath, &*timeline, Lsn(lsn))?;
|
||||
|
||||
// Move the initial WAL file
|
||||
fs::rename(
|
||||
tmppath.join("pg_wal").join("000000010000000000000001"),
|
||||
@@ -96,7 +113,11 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
.join("wal")
|
||||
.join("000000010000000000000001.partial"),
|
||||
)?;
|
||||
println!("moved initial WAL file");
|
||||
println!("created initial timeline {}", tli);
|
||||
|
||||
let data = tli.to_string();
|
||||
fs::write(conf.branch_path("main"), data)?;
|
||||
println!("created main branch");
|
||||
|
||||
// Remove pg_wal
|
||||
fs::remove_dir_all(tmppath.join("pg_wal"))?;
|
||||
@@ -104,14 +125,12 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
|
||||
force_crash_recovery(&tmppath)?;
|
||||
println!("updated pg_control");
|
||||
|
||||
// Move the data directory as an initial base backup.
|
||||
// FIXME: It would be enough to only copy the non-relation files here, the relation
|
||||
// data was already loaded into the repository.
|
||||
let target = timelinedir.join("snapshots").join(&lsnstr);
|
||||
fs::rename(tmppath, &target)?;
|
||||
|
||||
// Create 'main' branch to refer to the initial timeline
|
||||
let data = tli.to_string();
|
||||
fs::write(conf.branch_path("main"), data)?;
|
||||
println!("created main branch");
|
||||
|
||||
println!(
|
||||
"new zenith repository was created in {}",
|
||||
repo_dir.display()
|
||||
@@ -213,12 +232,13 @@ pub(crate) fn create_branch(
|
||||
startpoint.lsn = end_of_wal;
|
||||
}
|
||||
|
||||
// create a new timeline for it
|
||||
// create a new timeline directory for it
|
||||
let newtli = create_timeline(conf, Some(startpoint))?;
|
||||
let newtimelinedir = conf.timeline_path(newtli);
|
||||
|
||||
let data = newtli.to_string();
|
||||
fs::write(conf.branch_path(&branchname), data)?;
|
||||
// Let the Repository backend do its initialization
|
||||
let repo = page_cache::get_repository();
|
||||
repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?;
|
||||
|
||||
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
|
||||
// TODO: be smarter and avoid the copying...
|
||||
@@ -234,6 +254,12 @@ pub(crate) fn create_branch(
|
||||
pg_constants::WAL_SEGMENT_SIZE,
|
||||
)?;
|
||||
|
||||
// Remember the human-readable branch name for the new timeline.
|
||||
// FIXME: there's a race condition, if you create a branch with the same
|
||||
// name concurrently.
|
||||
let data = newtli.to_string();
|
||||
fs::write(conf.branch_path(&branchname), data)?;
|
||||
|
||||
Ok(BranchInfo {
|
||||
name: branchname.to_string(),
|
||||
timeline_id: newtli,
|
||||
|
||||
@@ -732,7 +732,7 @@ impl Connection {
|
||||
|
||||
// Check that the timeline exists
|
||||
let repository = page_cache::get_repository();
|
||||
if repository.get_or_restore_timeline(timelineid).is_err() {
|
||||
if repository.get_timeline(timelineid).is_err() {
|
||||
bail!("client requested callmemaybe on timeline {} which does not exist in page server", timelineid);
|
||||
}
|
||||
|
||||
@@ -899,14 +899,13 @@ impl Connection {
|
||||
) -> anyhow::Result<()> {
|
||||
// check that the timeline exists
|
||||
let repository = page_cache::get_repository();
|
||||
let timeline = repository
|
||||
.get_or_restore_timeline(timelineid)
|
||||
.map_err(|_| {
|
||||
anyhow!(
|
||||
let timeline = repository.get_timeline(timelineid).map_err(|e| {
|
||||
error!("error fetching timeline: {:?}", e);
|
||||
anyhow!(
|
||||
"client requested basebackup on timeline {} which does not exist in page server",
|
||||
timelineid
|
||||
)
|
||||
})?;
|
||||
})?;
|
||||
/* switch client to COPYOUT */
|
||||
let stream = &mut self.stream;
|
||||
stream.write_u8(b'H')?;
|
||||
|
||||
@@ -15,21 +15,17 @@ use zenith_utils::lsn::Lsn;
|
||||
/// timelines, forked off from the same initial call to 'initdb'.
|
||||
pub trait Repository {
|
||||
/// Get Timeline handle for given zenith timeline ID.
|
||||
///
|
||||
/// The Timeline is expected to be already "open", i.e. `get_or_restore_timeline`
|
||||
/// should've been called on it earlier already.
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
|
||||
|
||||
/// Get Timeline handle for given zenith timeline ID.
|
||||
///
|
||||
/// Creates a new Timeline object if it's not "open" already.
|
||||
fn get_or_restore_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
|
||||
/// Create a new, empty timeline. The caller is responsible for loading data into it
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> Result<Arc<dyn Timeline>>;
|
||||
|
||||
/// Create an empty timeline, without loading any data into it from possible on-disk snapshot.
|
||||
///
|
||||
/// For unit tests.
|
||||
#[cfg(test)]
|
||||
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
|
||||
/// Branch a timeline
|
||||
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
|
||||
|
||||
//fn get_stats(&self) -> RepositoryStats;
|
||||
}
|
||||
@@ -163,6 +159,13 @@ pub trait Timeline {
|
||||
/// valid LSN, so that the WAL receiver knows where to restart streaming.
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
|
||||
///
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
///
|
||||
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -352,7 +355,7 @@ mod tests {
|
||||
// Create timeline to work on
|
||||
let repo = get_test_repo("test_relsize")?;
|
||||
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
|
||||
let tline = repo.create_empty_timeline(timelineid)?;
|
||||
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
|
||||
|
||||
tline.init_valid_lsn(Lsn(1));
|
||||
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
|
||||
@@ -439,7 +442,7 @@ mod tests {
|
||||
fn test_large_rel() -> Result<()> {
|
||||
let repo = get_test_repo("test_large_rel")?;
|
||||
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
|
||||
let tline = repo.create_empty_timeline(timelineid)?;
|
||||
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
|
||||
|
||||
tline.init_valid_lsn(Lsn(1));
|
||||
|
||||
|
||||
@@ -6,19 +6,19 @@
|
||||
// LSN.
|
||||
|
||||
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
|
||||
use crate::restore_local_repo::restore_timeline;
|
||||
use crate::restore_local_repo::import_timeline_wal;
|
||||
use crate::waldecoder::Oid;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
// use crate::PageServerConf;
|
||||
// use crate::branches;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use postgres_ffi::pg_constants;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -88,6 +88,43 @@ struct CacheKey {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
//
|
||||
// In addition to those per-page entries, the 'last_valid_lsn' and 'last_record_lsn'
|
||||
// values are also persisted in the rocskdb repository. They are stored with CacheKeys
|
||||
// with ROCKSDB_SPECIAL_FORKNUM, and 'blknum' indicates which value it is. The
|
||||
// rest of the key fields are zero. We use a CacheKey as the key for these too,
|
||||
// so that whenever we iterate through keys in the repository, we can safely parse
|
||||
// the key blob as CacheKey without checking for these special values first.
|
||||
//
|
||||
// FIXME: This is quite a similar concept to the special entries created by
|
||||
// `BufferTag::fork` function. Merge them somehow? These special keys are specific
|
||||
// to the rocksb implementation, not exposed to the rest of the system, but the
|
||||
// other special forks created by `BufferTag::fork` are also used elsewhere.
|
||||
//
|
||||
impl CacheKey {
|
||||
const fn special(id: u32) -> CacheKey {
|
||||
CacheKey {
|
||||
tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: id,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_special(&self) -> bool {
|
||||
self.tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM
|
||||
}
|
||||
}
|
||||
|
||||
static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
|
||||
static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
|
||||
|
||||
impl CacheKey {
|
||||
fn pack(&self, buf: &mut BytesMut) {
|
||||
self.tag.pack(buf);
|
||||
@@ -189,52 +226,54 @@ impl RocksRepository {
|
||||
}
|
||||
}
|
||||
|
||||
// Get handle to a given timeline. It is assumed to already exist.
|
||||
impl Repository for RocksRepository {
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
|
||||
match timelines.get(&timelineid) {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => bail!("timeline not found"),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_or_restore_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
impl RocksRepository {
|
||||
fn get_rocks_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<RocksTimeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
match timelines.get(&timelineid) {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => {
|
||||
let timeline = RocksTimeline::new(self.conf, timelineid, self.walredo_mgr.clone());
|
||||
let timeline =
|
||||
RocksTimeline::open(self.conf, timelineid, self.walredo_mgr.clone())?;
|
||||
|
||||
restore_timeline(self.conf, &timeline, timelineid)?;
|
||||
// Load any new WAL after the last checkpoint into the repository.
|
||||
info!(
|
||||
"Loading WAL for timeline {} starting at {}",
|
||||
timelineid,
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
let wal_dir = self.conf.timeline_path(timelineid).join("wal");
|
||||
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
|
||||
if self.conf.gc_horizon != 0 {
|
||||
RocksTimeline::launch_gc_thread(self.conf, timeline_rc.clone());
|
||||
}
|
||||
|
||||
timelines.insert(timelineid, timeline_rc.clone());
|
||||
|
||||
if self.conf.gc_horizon != 0 {
|
||||
let timeline_rc_copy = timeline_rc.clone();
|
||||
let conf = self.conf;
|
||||
let _gc_thread = thread::Builder::new()
|
||||
.name("Garbage collection thread".into())
|
||||
.spawn(move || {
|
||||
// FIXME
|
||||
timeline_rc_copy.do_gc(conf).expect("GC thread died");
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
Ok(timeline_rc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
// Get handle to a given timeline. It is assumed to already exist.
|
||||
impl Repository for RocksRepository {
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
Ok(self.get_rocks_timeline(timelineid)?)
|
||||
}
|
||||
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
start_lsn: Lsn,
|
||||
) -> Result<Arc<dyn Timeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
|
||||
let timeline =
|
||||
RocksTimeline::create(&self.conf, timelineid, self.walredo_mgr.clone(), start_lsn)?;
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
let r = timelines.insert(timelineid, timeline_rc.clone());
|
||||
@@ -244,13 +283,39 @@ impl Repository for RocksRepository {
|
||||
|
||||
Ok(timeline_rc)
|
||||
}
|
||||
|
||||
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
|
||||
let src_timeline = self.get_rocks_timeline(src)?;
|
||||
|
||||
info!("branching at {}", at_lsn);
|
||||
|
||||
let dst_timeline =
|
||||
RocksTimeline::create(&self.conf, dst, self.walredo_mgr.clone(), at_lsn)?;
|
||||
|
||||
// Copy all entries <= LSN
|
||||
//
|
||||
// This is very inefficient, a far cry from the promise of cheap copy-on-write
|
||||
// branching. But it will do for now.
|
||||
let mut iter = src_timeline.db.raw_iterator();
|
||||
iter.seek_to_first();
|
||||
while iter.valid() {
|
||||
let k = iter.key().unwrap();
|
||||
let key = CacheKey::from_slice(k);
|
||||
|
||||
if !key.is_special() && key.lsn <= at_lsn {
|
||||
let v = iter.value().unwrap();
|
||||
dst_timeline.db.put(k, v)?;
|
||||
}
|
||||
iter.next();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl RocksTimeline {
|
||||
fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
|
||||
let path = conf.timeline_path(timelineid);
|
||||
/// common options used by `open` and `create`
|
||||
fn get_rocksdb_opts() -> rocksdb::Options {
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_use_fsync(true);
|
||||
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
|
||||
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
|
||||
@@ -260,31 +325,87 @@ impl RocksTimeline {
|
||||
rocksdb::compaction_filter::Decision::Keep
|
||||
}
|
||||
});
|
||||
rocksdb::DB::open(&opts, &path).unwrap()
|
||||
opts
|
||||
}
|
||||
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
/// Open a RocksDB database, and load the last valid and record LSNs into memory.
|
||||
fn open(
|
||||
conf: &PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksTimeline {
|
||||
RocksTimeline {
|
||||
db: RocksTimeline::open_rocksdb(conf, timelineid),
|
||||
) -> Result<RocksTimeline> {
|
||||
let path = conf.timeline_path(timelineid);
|
||||
let db = rocksdb::DB::open(&RocksTimeline::get_rocksdb_opts(), path)?;
|
||||
|
||||
// Load these into memory
|
||||
let lsnstr = db
|
||||
.get(LAST_VALID_LSN_KEY.to_bytes())
|
||||
.with_context(|| "last_valid_lsn not found in repository")?
|
||||
.ok_or(anyhow!("empty last_valid_lsn"))?;
|
||||
let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
|
||||
let lsnstr = db
|
||||
.get(LAST_VALID_RECORD_LSN_KEY.to_bytes())
|
||||
.with_context(|| "last_record_lsn not found in repository")?
|
||||
.ok_or(anyhow!("empty last_record_lsn"))?;
|
||||
let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
|
||||
|
||||
let timeline = RocksTimeline {
|
||||
db,
|
||||
walredo_mgr,
|
||||
|
||||
last_valid_lsn: SeqWait::new(Lsn(0)),
|
||||
last_record_lsn: AtomicLsn::new(0),
|
||||
last_valid_lsn: SeqWait::new(last_valid_lsn),
|
||||
last_record_lsn: AtomicLsn::new(last_record_lsn.0),
|
||||
|
||||
num_entries: AtomicU64::new(0),
|
||||
num_page_images: AtomicU64::new(0),
|
||||
num_wal_records: AtomicU64::new(0),
|
||||
num_getpage_requests: AtomicU64::new(0),
|
||||
}
|
||||
};
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
/// Create a new RocksDB database. It is initally empty, except for the last
|
||||
/// valid and last record LSNs, which are set to 'start_lsn'.
|
||||
fn create(
|
||||
conf: &PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
start_lsn: Lsn,
|
||||
) -> Result<RocksTimeline> {
|
||||
let path = conf.timeline_path(timelineid);
|
||||
let mut opts = RocksTimeline::get_rocksdb_opts();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_error_if_exists(true);
|
||||
let db = rocksdb::DB::open(&opts, path)?;
|
||||
|
||||
let timeline = RocksTimeline {
|
||||
db,
|
||||
walredo_mgr,
|
||||
|
||||
last_valid_lsn: SeqWait::new(start_lsn),
|
||||
last_record_lsn: AtomicLsn::new(start_lsn.0),
|
||||
|
||||
num_entries: AtomicU64::new(0),
|
||||
num_page_images: AtomicU64::new(0),
|
||||
num_wal_records: AtomicU64::new(0),
|
||||
num_getpage_requests: AtomicU64::new(0),
|
||||
};
|
||||
// Write the initial last_valid/record_lsn values
|
||||
timeline.checkpoint()?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc<RocksTimeline>) {
|
||||
let timeline_rc_copy = timeline_rc.clone();
|
||||
let _gc_thread = thread::Builder::new()
|
||||
.name("Garbage collection thread".into())
|
||||
.spawn(move || {
|
||||
// FIXME
|
||||
timeline_rc_copy.do_gc(conf).expect("GC thread died");
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl RocksTimeline {
|
||||
///
|
||||
/// Collect all the WAL records that are needed to reconstruct a page
|
||||
/// image for the given cache entry.
|
||||
@@ -813,6 +934,23 @@ impl Timeline for RocksTimeline {
|
||||
self.last_valid_lsn.load()
|
||||
}
|
||||
|
||||
// Flush all the changes written so far with PUT functions to disk.
|
||||
// RocksDB writes out things as we go (?), so we don't need to do much here. We just
|
||||
// write out the last valid and record LSNs.
|
||||
fn checkpoint(&self) -> Result<()> {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
self.db
|
||||
.put(LAST_VALID_LSN_KEY.to_bytes(), last_valid_lsn.to_string())?;
|
||||
self.db.put(
|
||||
LAST_VALID_RECORD_LSN_KEY.to_bytes(),
|
||||
self.last_record_lsn.load().to_string(),
|
||||
)?;
|
||||
|
||||
trace!("checkpoint at {}", last_valid_lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Get statistics to be displayed in the user interface.
|
||||
//
|
||||
|
||||
@@ -1,15 +1,7 @@
|
||||
//
|
||||
// Restore chunks from local Zenith repository
|
||||
//
|
||||
// This runs once at Page Server startup. It loads all the "snapshots" and all
|
||||
// WAL from all timelines from the local zenith repository into the in-memory page
|
||||
// cache.
|
||||
//
|
||||
// This also initializes the "last valid LSN" in the page cache to the last LSN
|
||||
// seen in the WAL, so that when the WAL receiver is started, it starts
|
||||
// streaming from that LSN.
|
||||
//
|
||||
|
||||
//!
|
||||
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
|
||||
//! zenith repository
|
||||
//!
|
||||
use log::*;
|
||||
use std::cmp::max;
|
||||
use std::fs;
|
||||
@@ -31,53 +23,6 @@ use postgres_ffi::relfile_utils::*;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
///
|
||||
/// Load all WAL and all relation data pages from local disk into the repository.
|
||||
///
|
||||
pub fn restore_timeline(
|
||||
conf: &PageServerConf,
|
||||
timeline: &dyn Timeline,
|
||||
timelineid: ZTimelineId,
|
||||
) -> Result<()> {
|
||||
let timelinepath = PathBuf::from("timelines").join(timelineid.to_string());
|
||||
|
||||
if !timelinepath.exists() {
|
||||
anyhow::bail!("timeline {} does not exist in the page server's repository");
|
||||
}
|
||||
|
||||
// Scan .zenith/timelines/<timeline>/snapshots
|
||||
let snapshotspath = PathBuf::from("timelines")
|
||||
.join(timelineid.to_string())
|
||||
.join("snapshots");
|
||||
|
||||
let mut last_snapshot_lsn: Lsn = Lsn(0);
|
||||
|
||||
for direntry in fs::read_dir(&snapshotspath).unwrap() {
|
||||
let direntry = direntry?;
|
||||
let filename = direntry.file_name();
|
||||
let lsn = Lsn::from_filename(&filename)?;
|
||||
last_snapshot_lsn = max(lsn, last_snapshot_lsn);
|
||||
|
||||
// FIXME: pass filename as Path instead of str?
|
||||
let filename_str = filename.into_string().unwrap();
|
||||
restore_snapshot(conf, timeline, timelineid, &filename_str)?;
|
||||
info!("restored snapshot at {:?}", filename_str);
|
||||
}
|
||||
|
||||
if last_snapshot_lsn == Lsn(0) {
|
||||
error!(
|
||||
"could not find valid snapshot in {}",
|
||||
snapshotspath.display()
|
||||
);
|
||||
// TODO return error?
|
||||
}
|
||||
timeline.init_valid_lsn(last_snapshot_lsn);
|
||||
|
||||
restore_wal(timeline, timelineid, last_snapshot_lsn)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Find latest snapshot in a timeline's 'snapshots' directory
|
||||
///
|
||||
@@ -102,19 +47,16 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
|
||||
Ok(last_snapshot_lsn)
|
||||
}
|
||||
|
||||
fn restore_snapshot(
|
||||
_conf: &PageServerConf,
|
||||
///
|
||||
/// Import all relation data pages from local disk into the repository.
|
||||
///
|
||||
pub fn import_timeline_from_postgres_datadir(
|
||||
path: &Path,
|
||||
timeline: &dyn Timeline,
|
||||
timelineid: ZTimelineId,
|
||||
snapshot: &str,
|
||||
lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
let snapshotpath = PathBuf::from("timelines")
|
||||
.join(timelineid.to_string())
|
||||
.join("snapshots")
|
||||
.join(snapshot);
|
||||
|
||||
// Scan 'global'
|
||||
for direntry in fs::read_dir(snapshotpath.join("global"))? {
|
||||
for direntry in fs::read_dir(path.join("global"))? {
|
||||
let direntry = direntry?;
|
||||
match direntry.file_name().to_str() {
|
||||
None => continue,
|
||||
@@ -124,19 +66,19 @@ fn restore_snapshot(
|
||||
Some("pg_filenode.map") => continue,
|
||||
|
||||
// Load any relation files into the page server
|
||||
_ => restore_relfile(
|
||||
_ => import_relfile(
|
||||
&direntry.path(),
|
||||
timeline,
|
||||
snapshot,
|
||||
lsn,
|
||||
pg_constants::GLOBALTABLESPACE_OID,
|
||||
0,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
}
|
||||
}
|
||||
|
||||
// Scan 'base'. It contains database dirs, the database OID is the filename.
|
||||
// E.g. 'base/12345', where 12345 is the database OID.
|
||||
for direntry in fs::read_dir(snapshotpath.join("base"))? {
|
||||
for direntry in fs::read_dir(path.join("base"))? {
|
||||
let direntry = direntry?;
|
||||
|
||||
let dboid = direntry.file_name().to_str().unwrap().parse::<u32>()?;
|
||||
@@ -151,30 +93,31 @@ fn restore_snapshot(
|
||||
Some("pg_filenode.map") => continue,
|
||||
|
||||
// Load any relation files into the page server
|
||||
_ => restore_relfile(
|
||||
_ => import_relfile(
|
||||
&direntry.path(),
|
||||
timeline,
|
||||
snapshot,
|
||||
lsn,
|
||||
pg_constants::DEFAULTTABLESPACE_OID,
|
||||
dboid,
|
||||
&direntry.path(),
|
||||
)?,
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: Scan pg_tblspc
|
||||
|
||||
timeline.checkpoint()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_relfile(
|
||||
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
|
||||
fn import_relfile(
|
||||
path: &Path,
|
||||
timeline: &dyn Timeline,
|
||||
snapshot: &str,
|
||||
lsn: Lsn,
|
||||
spcoid: Oid,
|
||||
dboid: Oid,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let lsn = Lsn::from_hex(snapshot)?;
|
||||
|
||||
// Does it look like a relation file?
|
||||
|
||||
let p = parse_relfilename(path.file_name().unwrap().to_str().unwrap());
|
||||
@@ -228,24 +171,22 @@ fn restore_relfile(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Scan WAL on a timeline, starting from given LSN, and load all the records
|
||||
// into the page cache.
|
||||
fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
|
||||
let walpath = format!("timelines/{}/wal", timelineid);
|
||||
|
||||
/// Scan PostgreSQL WAL files in given directory, and load all records >= 'startpoint' into
|
||||
/// the repository.
|
||||
pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: Lsn) -> Result<()> {
|
||||
let mut waldecoder = WalStreamDecoder::new(startpoint);
|
||||
|
||||
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut last_lsn = Lsn(0);
|
||||
let mut last_lsn = startpoint;
|
||||
loop {
|
||||
// FIXME: assume postgresql tli 1 for now
|
||||
let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
|
||||
let mut path = walpath.clone() + "/" + &filename;
|
||||
let mut path = walpath.join(&filename);
|
||||
|
||||
// It could be as .partial
|
||||
if !PathBuf::from(&path).exists() {
|
||||
path += ".partial";
|
||||
path = walpath.join(filename + ".partial");
|
||||
}
|
||||
|
||||
// Slurp the WAL file
|
||||
@@ -287,12 +228,16 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
|
||||
nrecords += 1;
|
||||
}
|
||||
|
||||
info!("restored {} records from WAL file {}", nrecords, filename);
|
||||
info!(
|
||||
"imported {} records from WAL file {} up to {}",
|
||||
nrecords,
|
||||
path.display(),
|
||||
last_lsn
|
||||
);
|
||||
|
||||
segno += 1;
|
||||
offset = 0;
|
||||
}
|
||||
info!("reached end of WAL at {}", last_lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::page_cache;
|
||||
use crate::waldecoder::*;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use anyhow::Error;
|
||||
use anyhow::{Error, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use postgres::fallible_iterator::FallibleIterator;
|
||||
@@ -20,6 +20,7 @@ use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::fs::{File, OpenOptions};
|
||||
@@ -139,7 +140,7 @@ fn walreceiver_main(
|
||||
// If we had previously received WAL up to some point in the middle of a WAL record, we
|
||||
// better start from the end of last full WAL record, not in the middle of one. Hence,
|
||||
// use 'last_record_lsn' rather than 'last_valid_lsn' here.
|
||||
let last_rec_lsn = timeline.get_last_record_lsn();
|
||||
let mut last_rec_lsn = timeline.get_last_record_lsn();
|
||||
let mut startpoint = last_rec_lsn;
|
||||
|
||||
if startpoint == Lsn(0) {
|
||||
@@ -173,6 +174,7 @@ fn walreceiver_main(
|
||||
let data = xlog_data.data();
|
||||
let startlsn = Lsn::from(xlog_data.wal_start());
|
||||
let endlsn = startlsn + data.len() as u64;
|
||||
let prev_last_rec_lsn = last_rec_lsn;
|
||||
|
||||
write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?;
|
||||
|
||||
@@ -187,6 +189,7 @@ fn walreceiver_main(
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
timeline.advance_last_record_lsn(lsn);
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
|
||||
// Update the last_valid LSN value in the page cache one more time. We updated
|
||||
@@ -197,6 +200,33 @@ fn walreceiver_main(
|
||||
// flush ptr.
|
||||
timeline.advance_last_valid_lsn(endlsn);
|
||||
|
||||
// Somewhat arbitrarily, if we have at least 10 complete wal segments (16 MB each),
|
||||
// "checkpoint" the repository to flush all the changes from WAL we've processed
|
||||
// so far to disk. After this, we don't need the original WAL anymore, and it
|
||||
// can be removed.
|
||||
//
|
||||
// TODO: We don't actually dare to remove the WAL. It's useful for debugging,
|
||||
// and we might it for logical decoiding other things in the future. Although
|
||||
// we should also be able to fetch it back from the WAL safekeepers or S3 if
|
||||
// needed.
|
||||
if prev_last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
|
||||
!= last_rec_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE)
|
||||
{
|
||||
info!("switched segment {} to {}", prev_last_rec_lsn, last_rec_lsn);
|
||||
let (oldest_segno, newest_segno) = find_wal_file_range(
|
||||
timelineid,
|
||||
pg_constants::WAL_SEGMENT_SIZE,
|
||||
last_rec_lsn,
|
||||
)?;
|
||||
|
||||
if newest_segno - oldest_segno >= 10 {
|
||||
timeline.checkpoint()?;
|
||||
|
||||
// TODO: This is where we could remove WAL older than last_rec_lsn.
|
||||
//remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?;
|
||||
}
|
||||
}
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
info!("caught up at LSN {}", endlsn);
|
||||
caught_up = true;
|
||||
@@ -233,6 +263,45 @@ fn walreceiver_main(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_wal_file_range(
|
||||
timeline: ZTimelineId,
|
||||
wal_seg_size: usize,
|
||||
written_upto: Lsn,
|
||||
) -> Result<(u64, u64)> {
|
||||
let written_upto_segno = written_upto.segment_number(wal_seg_size);
|
||||
|
||||
let mut oldest_segno = written_upto_segno;
|
||||
let mut newest_segno = written_upto_segno;
|
||||
// Scan the wal directory, and count how many WAL filed we could remove
|
||||
let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline));
|
||||
for entry in fs::read_dir(wal_dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
|
||||
if path.is_dir() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let filename = path.file_name().unwrap().to_str().unwrap();
|
||||
|
||||
if IsXLogFileName(filename) {
|
||||
let (segno, _tli) = XLogFromFileName(filename, wal_seg_size);
|
||||
|
||||
if segno > written_upto_segno {
|
||||
// that's strange.
|
||||
warn!("there is a WAL file from future at {}", path.display());
|
||||
continue;
|
||||
}
|
||||
|
||||
oldest_segno = min(oldest_segno, segno);
|
||||
newest_segno = max(newest_segno, segno);
|
||||
}
|
||||
}
|
||||
// FIXME: would be good to assert that there are no gaps in the WAL files
|
||||
|
||||
Ok((oldest_segno, newest_segno))
|
||||
}
|
||||
|
||||
/// Data returned from the postgres `IDENTIFY_SYSTEM` command
|
||||
///
|
||||
/// See the [postgres docs] for more details.
|
||||
|
||||
@@ -58,6 +58,24 @@ pub trait WalRedoManager: Send + Sync {
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
}
|
||||
|
||||
///
|
||||
/// A dummy WAL Redo Manager implementation that doesn't allow replaying
|
||||
/// anything. Currently used during bootstrapping (zenith init), to create
|
||||
/// a Repository object without launching the real WAL redo process.
|
||||
///
|
||||
pub struct DummyRedoManager {}
|
||||
impl crate::walredo::WalRedoManager for DummyRedoManager {
|
||||
fn request_redo(
|
||||
&self,
|
||||
_tag: BufferTag,
|
||||
_lsn: Lsn,
|
||||
_base_img: Option<Bytes>,
|
||||
_records: Vec<WALRecord>,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
Err(WalRedoError::InvalidState)
|
||||
}
|
||||
}
|
||||
|
||||
static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
///
|
||||
@@ -92,6 +110,9 @@ struct WalRedoRequest {
|
||||
pub enum WalRedoError {
|
||||
#[error(transparent)]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("cannot perform WAL redo now")]
|
||||
InvalidState,
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -16,6 +16,8 @@ pub const FSM_FORKNUM: u8 = 1;
|
||||
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||
pub const INIT_FORKNUM: u8 = 3;
|
||||
|
||||
pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
|
||||
|
||||
// From storage_xlog.h
|
||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user