mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
Prohibit branch creation and basebackup at out of scope lsns
Out of scope LSNs include pre initdb LSNs, and LSNs prior to latest_gc_cutoff. To get there there was also two cleanups: * Fix error handling in Execute message handler. This fixes behaviour when basebackup retured an error. Previously pageserver thread just died. * Remove "ancestor" file which previously contained ancestor id and branch lsn. Currently the same data can be obtained from metadata file. And just the way we handled ancestor file in the code introduced the case when branching fails timeline directory is created but there is no data in it except ancestor file. And this confused gc because it scans directories. So it is better to just remove ancestor file and clean up this timeline directory creation so it happens after all validity checks have passed
This commit is contained in:
committed by
Dmitry Rodionov
parent
d47f610606
commit
130184fee9
@@ -10,7 +10,7 @@
|
||||
//! This module is responsible for creation of such tarball
|
||||
//! from data stored in object storage.
|
||||
//!
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use log::*;
|
||||
use std::fmt::Write as FmtWrite;
|
||||
@@ -242,10 +242,12 @@ impl<'a> Basebackup<'a> {
|
||||
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
|
||||
let checkpoint_bytes = self
|
||||
.timeline
|
||||
.get_page_at_lsn(RelishTag::Checkpoint, 0, self.lsn)?;
|
||||
let pg_control_bytes =
|
||||
self.timeline
|
||||
.get_page_at_lsn(RelishTag::ControlFile, 0, self.lsn)?;
|
||||
.get_page_at_lsn(RelishTag::Checkpoint, 0, self.lsn)
|
||||
.context("failed to get checkpoint bytes")?;
|
||||
let pg_control_bytes = self
|
||||
.timeline
|
||||
.get_page_at_lsn(RelishTag::ControlFile, 0, self.lsn)
|
||||
.context("failed get control bytes")?;
|
||||
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
|
||||
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
|
||||
|
||||
@@ -42,8 +42,6 @@ pub struct BranchInfo {
|
||||
impl BranchInfo {
|
||||
pub fn from_path<T: AsRef<Path>>(
|
||||
path: T,
|
||||
conf: &PageServerConf,
|
||||
tenantid: &ZTenantId,
|
||||
repo: &Arc<dyn Repository>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
) -> Result<Self> {
|
||||
@@ -58,27 +56,14 @@ impl BranchInfo {
|
||||
|
||||
let timeline = repo.get_timeline(timeline_id)?;
|
||||
|
||||
let ancestor_path = conf.ancestor_path(&timeline_id, tenantid);
|
||||
let mut ancestor_id: Option<String> = None;
|
||||
let mut ancestor_lsn: Option<String> = None;
|
||||
|
||||
if ancestor_path.exists() {
|
||||
let ancestor = std::fs::read_to_string(ancestor_path)?;
|
||||
let mut strings = ancestor.split('@');
|
||||
|
||||
ancestor_id = Some(
|
||||
strings
|
||||
.next()
|
||||
.with_context(|| "wrong branch ancestor point in time format")?
|
||||
.to_owned(),
|
||||
);
|
||||
ancestor_lsn = Some(
|
||||
strings
|
||||
.next()
|
||||
.with_context(|| "wrong branch ancestor point in time format")?
|
||||
.to_owned(),
|
||||
);
|
||||
}
|
||||
// we use ancestor lsn zero if we don't have an ancestor, so turn this into an option based on timeline id
|
||||
let (ancestor_id, ancestor_lsn) = match timeline.get_ancestor_timeline_id() {
|
||||
Some(ancestor_id) => (
|
||||
Some(ancestor_id.to_string()),
|
||||
Some(timeline.get_ancestor_lsn().to_string()),
|
||||
),
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
// non incremental size calculation can be heavy, so let it be optional
|
||||
// needed for tests to check size calculation
|
||||
@@ -154,7 +139,11 @@ pub fn create_repo(
|
||||
|
||||
info!("created directory structure in {}", repo_dir.display());
|
||||
|
||||
let tli = create_timeline(conf, None, &tenantid)?;
|
||||
// create a new timeline directory
|
||||
let timeline_id = ZTimelineId::generate();
|
||||
let timelinedir = conf.timeline_path(&timeline_id, &tenantid);
|
||||
|
||||
crashsafe_dir::create_dir(&timelinedir)?;
|
||||
|
||||
let repo = Arc::new(crate::layered_repository::LayeredRepository::new(
|
||||
conf,
|
||||
@@ -166,7 +155,7 @@ pub fn create_repo(
|
||||
// Load data into pageserver
|
||||
// TODO To implement zenith import we need to
|
||||
// move data loading out of create_repo()
|
||||
bootstrap_timeline(conf, tenantid, tli, repo.as_ref())?;
|
||||
bootstrap_timeline(conf, tenantid, timeline_id, repo.as_ref())?;
|
||||
|
||||
Ok(repo)
|
||||
}
|
||||
@@ -233,7 +222,9 @@ fn bootstrap_timeline(
|
||||
|
||||
// Import the contents of the data directory at the initial checkpoint
|
||||
// LSN, and any WAL after that.
|
||||
let timeline = repo.create_empty_timeline(tli)?;
|
||||
// Initdb lsn will be equal to last_record_lsn which will be set after import.
|
||||
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
|
||||
let timeline = repo.create_empty_timeline(tli, lsn)?;
|
||||
restore_local_repo::import_timeline_from_postgres_datadir(
|
||||
&pgdata_path,
|
||||
timeline.writer().as_ref(),
|
||||
@@ -286,8 +277,6 @@ pub(crate) fn get_branches(
|
||||
})?;
|
||||
BranchInfo::from_path(
|
||||
dir_entry.path(),
|
||||
conf,
|
||||
tenantid,
|
||||
&repo,
|
||||
include_non_incremental_logical_size,
|
||||
)
|
||||
@@ -333,24 +322,24 @@ pub(crate) fn create_branch(
|
||||
);
|
||||
}
|
||||
|
||||
// create a new timeline directory for it
|
||||
let newtli = create_timeline(conf, Some(startpoint), tenantid)?;
|
||||
let new_timeline_id = ZTimelineId::generate();
|
||||
|
||||
// Let the Repository backend do its initialization
|
||||
repo.branch_timeline(startpoint.timelineid, newtli, startpoint.lsn)?;
|
||||
// Forward entire timeline creation routine to repository
|
||||
// backend, so it can do all needed initialization
|
||||
repo.branch_timeline(startpoint.timelineid, new_timeline_id, startpoint.lsn)?;
|
||||
|
||||
// Remember the human-readable branch name for the new timeline.
|
||||
// FIXME: there's a race condition, if you create a branch with the same
|
||||
// name concurrently.
|
||||
let data = newtli.to_string();
|
||||
let data = new_timeline_id.to_string();
|
||||
fs::write(conf.branch_path(branchname, tenantid), data)?;
|
||||
|
||||
Ok(BranchInfo {
|
||||
name: branchname.to_string(),
|
||||
timeline_id: newtli,
|
||||
timeline_id: new_timeline_id,
|
||||
latest_valid_lsn: startpoint.lsn,
|
||||
ancestor_id: None,
|
||||
ancestor_lsn: None,
|
||||
ancestor_id: Some(startpoint.timelineid.to_string()),
|
||||
ancestor_lsn: Some(startpoint.lsn.to_string()),
|
||||
current_logical_size: 0,
|
||||
current_logical_size_non_incremental: Some(0),
|
||||
})
|
||||
@@ -428,24 +417,3 @@ fn parse_point_in_time(
|
||||
|
||||
bail!("could not parse point-in-time {}", s);
|
||||
}
|
||||
|
||||
fn create_timeline(
|
||||
conf: &PageServerConf,
|
||||
ancestor: Option<PointInTime>,
|
||||
tenantid: &ZTenantId,
|
||||
) -> Result<ZTimelineId> {
|
||||
// Create initial timeline
|
||||
|
||||
let timelineid = ZTimelineId::generate();
|
||||
|
||||
let timelinedir = conf.timeline_path(&timelineid, tenantid);
|
||||
|
||||
fs::create_dir(&timelinedir)?;
|
||||
|
||||
if let Some(ancestor) = ancestor {
|
||||
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
|
||||
fs::write(timelinedir.join("ancestor"), data)?;
|
||||
}
|
||||
|
||||
Ok(timelineid)
|
||||
}
|
||||
|
||||
@@ -132,13 +132,7 @@ async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
let response_data = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("branch_detail", tenant = %tenantid, branch=%branch_name).entered();
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
BranchInfo::from_path(
|
||||
path,
|
||||
conf,
|
||||
&tenantid,
|
||||
&repo,
|
||||
include_non_incremental_logical_size,
|
||||
)
|
||||
BranchInfo::from_path(path, &repo, include_non_incremental_logical_size)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
@@ -140,13 +140,17 @@ impl Repository for LayeredRepository {
|
||||
Ok(self.get_timeline_locked(timelineid, &mut timelines)?)
|
||||
}
|
||||
|
||||
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
) -> Result<Arc<dyn Timeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
// Create the timeline directory, and write initial metadata to file.
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
|
||||
|
||||
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0));
|
||||
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0), initdb_lsn);
|
||||
Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
|
||||
|
||||
let timeline = LayeredTimeline::new(
|
||||
@@ -174,13 +178,9 @@ impl Repository for LayeredRepository {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let src_timeline = self.get_timeline_locked(src, &mut timelines)?;
|
||||
|
||||
let latest_gc_cutoff = src_timeline.latest_gc_cutoff.load();
|
||||
ensure!(
|
||||
start_lsn >= latest_gc_cutoff,
|
||||
"Branch start LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
|
||||
start_lsn,
|
||||
latest_gc_cutoff,
|
||||
);
|
||||
src_timeline
|
||||
.check_lsn_is_in_scope(start_lsn)
|
||||
.context("invalid branch start lsn")?;
|
||||
|
||||
let RecordLsn {
|
||||
last: src_last,
|
||||
@@ -194,6 +194,11 @@ impl Repository for LayeredRepository {
|
||||
None
|
||||
};
|
||||
|
||||
// create a new timeline directory
|
||||
let timelinedir = self.conf.timeline_path(&dst, &self.tenantid);
|
||||
|
||||
crashsafe_dir::create_dir(&timelinedir)?;
|
||||
|
||||
// Create the metadata file, noting the ancestor of the new timeline.
|
||||
// There is initially no data in it, but all the read-calls know to look
|
||||
// into the ancestor.
|
||||
@@ -202,15 +207,13 @@ impl Repository for LayeredRepository {
|
||||
dst_prev,
|
||||
Some(src),
|
||||
start_lsn,
|
||||
src_timeline.latest_gc_cutoff.load(),
|
||||
src_timeline.latest_gc_cutoff_lsn.load(),
|
||||
src_timeline.initdb_lsn,
|
||||
);
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
|
||||
Self::save_metadata(self.conf, dst, self.tenantid, &metadata, true)?;
|
||||
|
||||
info!(
|
||||
"branched timeline {} from {} at {} latest_gc_cutoff {}",
|
||||
dst, src, start_lsn, latest_gc_cutoff
|
||||
);
|
||||
info!("branched timeline {} from {} at {}", dst, src, start_lsn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -277,7 +280,8 @@ impl LayeredRepository {
|
||||
match timelines.get(&timelineid) {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => {
|
||||
let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?;
|
||||
let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)
|
||||
.context("failed to load metadata")?;
|
||||
let disk_consistent_lsn = metadata.disk_consistent_lsn();
|
||||
|
||||
// Recurse to look up the ancestor timeline.
|
||||
@@ -307,7 +311,9 @@ impl LayeredRepository {
|
||||
)?;
|
||||
|
||||
// List the layers on disk, and load them into the layer map
|
||||
let loaded_layers = timeline.load_layer_map(disk_consistent_lsn)?;
|
||||
let loaded_layers = timeline
|
||||
.load_layer_map(disk_consistent_lsn)
|
||||
.context("failed to load layermap")?;
|
||||
if self.upload_relishes {
|
||||
schedule_timeline_upload(self.tenantid, timelineid, loaded_layers, metadata);
|
||||
}
|
||||
@@ -379,6 +385,7 @@ impl LayeredRepository {
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<TimelineMetadata> {
|
||||
let path = metadata_path(conf, timelineid, tenantid);
|
||||
info!("loading metadata from {}", path.display());
|
||||
let metadata_bytes = std::fs::read(&path)?;
|
||||
TimelineMetadata::from_bytes(&metadata_bytes)
|
||||
}
|
||||
@@ -575,7 +582,15 @@ pub struct LayeredTimeline {
|
||||
write_lock: Mutex<()>,
|
||||
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
latest_gc_cutoff: AtomicLsn,
|
||||
latest_gc_cutoff_lsn: AtomicLsn,
|
||||
|
||||
// It may change across major versions so for simplicity
|
||||
// keep it after running initdb for a timeline.
|
||||
// It is needed in checks when we want to error on some operations
|
||||
// when they are requested for pre-initdb lsn.
|
||||
// It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
|
||||
// though lets keep them both for better error visibility.
|
||||
initdb_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
@@ -584,6 +599,10 @@ impl Timeline for LayeredTimeline {
|
||||
self.ancestor_lsn
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId> {
|
||||
self.ancestor_timeline.as_ref().map(|x| x.timelineid)
|
||||
}
|
||||
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<()> {
|
||||
// This should never be called from the WAL receiver thread, because that could lead
|
||||
@@ -615,12 +634,12 @@ impl Timeline for LayeredTimeline {
|
||||
);
|
||||
}
|
||||
debug_assert!(lsn <= self.get_last_record_lsn());
|
||||
let latest_gc_cutoff = self.latest_gc_cutoff.load();
|
||||
let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
|
||||
// error instead of assert to simplify testing
|
||||
ensure!(
|
||||
lsn >= latest_gc_cutoff,
|
||||
lsn >= latest_gc_cutoff_lsn,
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, latest_gc_cutoff
|
||||
lsn, latest_gc_cutoff_lsn
|
||||
);
|
||||
|
||||
let seg = SegmentTag::from_blknum(rel, blknum);
|
||||
@@ -781,6 +800,28 @@ impl Timeline for LayeredTimeline {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Validate lsn against initdb_lsn and latest_gc_cutoff_lsn.
|
||||
///
|
||||
fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()> {
|
||||
let initdb_lsn = self.initdb_lsn;
|
||||
ensure!(
|
||||
lsn >= initdb_lsn,
|
||||
"LSN {} is earlier than initdb lsn {}",
|
||||
lsn,
|
||||
initdb_lsn,
|
||||
);
|
||||
|
||||
let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
|
||||
ensure!(
|
||||
lsn >= latest_gc_cutoff_lsn,
|
||||
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
|
||||
lsn,
|
||||
latest_gc_cutoff_lsn,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
}
|
||||
@@ -889,7 +930,8 @@ impl LayeredTimeline {
|
||||
|
||||
write_lock: Mutex::new(()),
|
||||
|
||||
latest_gc_cutoff: AtomicLsn::from(metadata.latest_gc_cutoff()),
|
||||
latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
};
|
||||
Ok(timeline)
|
||||
}
|
||||
@@ -1270,7 +1312,8 @@ impl LayeredTimeline {
|
||||
ondisk_prev_record_lsn,
|
||||
ancestor_timelineid,
|
||||
self.ancestor_lsn,
|
||||
self.latest_gc_cutoff.load(),
|
||||
self.latest_gc_cutoff_lsn.load(),
|
||||
self.initdb_lsn,
|
||||
);
|
||||
|
||||
LayeredRepository::save_metadata(
|
||||
@@ -1368,9 +1411,9 @@ impl LayeredTimeline {
|
||||
|
||||
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
|
||||
|
||||
// We need to ensure that no one branches at a point before latest_gc_cutoff.
|
||||
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
|
||||
// See branch_timeline() for details.
|
||||
self.latest_gc_cutoff.store(cutoff);
|
||||
self.latest_gc_cutoff_lsn.store(cutoff);
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
|
||||
@@ -292,7 +292,7 @@ pub fn list_files(
|
||||
deltafiles.push(deltafilename);
|
||||
} else if let Some(imgfilename) = ImageFileName::parse_str(fname) {
|
||||
imgfiles.push(imgfilename);
|
||||
} else if fname == METADATA_FILE_NAME || fname == "ancestor" || fname.ends_with(".old") {
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
} else {
|
||||
warn!("unrecognized filename in timeline dir: {}", fname);
|
||||
|
||||
@@ -42,7 +42,8 @@ pub struct TimelineMetadata {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<ZTimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
latest_gc_cutoff: Lsn,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
initdb_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
@@ -62,14 +63,16 @@ impl TimelineMetadata {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<ZTimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
latest_gc_cutoff: Lsn,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
initdb_lsn: Lsn,
|
||||
) -> Self {
|
||||
Self {
|
||||
disk_consistent_lsn,
|
||||
prev_record_lsn,
|
||||
ancestor_timeline,
|
||||
ancestor_lsn,
|
||||
latest_gc_cutoff,
|
||||
latest_gc_cutoff_lsn,
|
||||
initdb_lsn,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,8 +128,12 @@ impl TimelineMetadata {
|
||||
self.ancestor_lsn
|
||||
}
|
||||
|
||||
pub fn latest_gc_cutoff(&self) -> Lsn {
|
||||
self.latest_gc_cutoff
|
||||
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {
|
||||
self.latest_gc_cutoff_lsn
|
||||
}
|
||||
|
||||
pub fn initdb_lsn(&self) -> Lsn {
|
||||
self.initdb_lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,7 +153,8 @@ mod serialize {
|
||||
prev_record_lsn: &'a Option<Lsn>,
|
||||
ancestor_timeline: &'a Option<ZTimelineId>,
|
||||
ancestor_lsn: &'a Lsn,
|
||||
latest_gc_cutoff: &'a Lsn,
|
||||
latest_gc_cutoff_lsn: &'a Lsn,
|
||||
initdb_lsn: &'a Lsn,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a TimelineMetadata> for SeTimelineMetadata<'a> {
|
||||
@@ -156,7 +164,8 @@ mod serialize {
|
||||
prev_record_lsn: &other.prev_record_lsn,
|
||||
ancestor_timeline: &other.ancestor_timeline,
|
||||
ancestor_lsn: &other.ancestor_lsn,
|
||||
latest_gc_cutoff: &other.latest_gc_cutoff,
|
||||
latest_gc_cutoff_lsn: &other.latest_gc_cutoff_lsn,
|
||||
initdb_lsn: &other.initdb_lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -167,7 +176,8 @@ mod serialize {
|
||||
prev_record_lsn: Option<Lsn>,
|
||||
ancestor_timeline: Option<ZTimelineId>,
|
||||
ancestor_lsn: Lsn,
|
||||
latest_gc_cutoff: Lsn,
|
||||
latest_gc_cutoff_lsn: Lsn,
|
||||
initdb_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl From<DeTimelineMetadata> for TimelineMetadata {
|
||||
@@ -177,7 +187,8 @@ mod serialize {
|
||||
prev_record_lsn: other.prev_record_lsn,
|
||||
ancestor_timeline: other.ancestor_timeline,
|
||||
ancestor_lsn: other.ancestor_lsn,
|
||||
latest_gc_cutoff: other.latest_gc_cutoff,
|
||||
latest_gc_cutoff_lsn: other.latest_gc_cutoff_lsn,
|
||||
initdb_lsn: other.initdb_lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -196,7 +207,8 @@ mod tests {
|
||||
prev_record_lsn: Some(Lsn(0x100)),
|
||||
ancestor_timeline: Some(TIMELINE_ID),
|
||||
ancestor_lsn: Lsn(0),
|
||||
latest_gc_cutoff: Lsn(0),
|
||||
latest_gc_cutoff_lsn: Lsn(0),
|
||||
initdb_lsn: Lsn(0),
|
||||
};
|
||||
|
||||
let metadata_bytes = original_metadata
|
||||
|
||||
@@ -134,10 +134,6 @@ impl PageServerConf {
|
||||
self.timelines_path(tenantid).join(timelineid.to_string())
|
||||
}
|
||||
|
||||
fn ancestor_path(&self, timelineid: &ZTimelineId, tenantid: &ZTenantId) -> PathBuf {
|
||||
self.timeline_path(timelineid, tenantid).join("ancestor")
|
||||
}
|
||||
|
||||
//
|
||||
// Postgres distribution paths
|
||||
//
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
// *callmemaybe <zenith timelineid> $url* -- ask pageserver to start walreceiver on $url
|
||||
//
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Result};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
@@ -456,6 +456,11 @@ impl PageServerHandler {
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)?;
|
||||
if let Some(lsn) = lsn {
|
||||
timeline
|
||||
.check_lsn_is_in_scope(lsn)
|
||||
.context("invalid basebackup lsn")?;
|
||||
}
|
||||
|
||||
// switch client to COPYOUT
|
||||
pgb.write_message(&BeMessage::CopyOutResponse)?;
|
||||
@@ -691,9 +696,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
.unwrap_or(Ok(self.conf.gc_horizon))?;
|
||||
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
|
||||
|
||||
let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?;
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[
|
||||
RowDescriptor::int8_col(b"layer_relfiles_total"),
|
||||
RowDescriptor::int8_col(b"layer_relfiles_needed_by_cutoff"),
|
||||
|
||||
@@ -1623,6 +1623,6 @@ mod tests {
|
||||
}
|
||||
|
||||
fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
|
||||
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0))
|
||||
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,12 @@ pub trait Repository: Send + Sync {
|
||||
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
|
||||
|
||||
/// Create a new, empty timeline. The caller is responsible for loading data into it
|
||||
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
|
||||
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
|
||||
fn create_empty_timeline(
|
||||
&self,
|
||||
timelineid: ZTimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
) -> Result<Arc<dyn Timeline>>;
|
||||
|
||||
/// Branch a timeline
|
||||
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
|
||||
@@ -124,6 +129,9 @@ pub trait Timeline: Send + Sync {
|
||||
/// Get a list of all existing non-relational objects
|
||||
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
|
||||
|
||||
/// Get the ancestor's timeline id
|
||||
fn get_ancestor_timeline_id(&self) -> Option<ZTimelineId>;
|
||||
|
||||
/// Get the LSN where this branch was created
|
||||
fn get_ancestor_lsn(&self) -> Lsn;
|
||||
|
||||
@@ -151,6 +159,10 @@ pub trait Timeline: Send + Sync {
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>;
|
||||
|
||||
///
|
||||
/// Check that it is valid to request operations with that lsn.
|
||||
fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()>;
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
/// NOTE: counted incrementally, includes ancestors,
|
||||
@@ -347,7 +359,7 @@ mod tests {
|
||||
//repo.get_timeline("11223344556677881122334455667788");
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
@@ -465,7 +477,7 @@ mod tests {
|
||||
let repo = RepoHarness::create("test_drop_extend")?.load();
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
|
||||
writer.put_page_image(TESTREL_A, 0, Lsn(0x20), TEST_IMG("foo blk 0 at 2"))?;
|
||||
@@ -502,7 +514,7 @@ mod tests {
|
||||
let repo = RepoHarness::create("test_truncate_extend")?.load();
|
||||
|
||||
// Create timeline to work on
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
|
||||
//from storage_layer.rs
|
||||
@@ -601,7 +613,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_large_rel() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_large_rel")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
|
||||
let mut lsn = 0x10;
|
||||
@@ -664,7 +676,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_list_rels_drop() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
const TESTDB: u32 = 111;
|
||||
|
||||
@@ -722,7 +734,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_branch() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_branch")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
let writer = tline.writer();
|
||||
|
||||
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
||||
@@ -818,7 +830,7 @@ mod tests {
|
||||
let repo =
|
||||
RepoHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?.load();
|
||||
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
make_some_layers(&tline, Lsn(0x20))?;
|
||||
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
@@ -827,9 +839,35 @@ mod tests {
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("we might've already garbage collected needed data")),
|
||||
Err(err) => {
|
||||
assert!(err.to_string().contains("invalid branch start lsn"));
|
||||
assert!(err
|
||||
.source()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.contains("we might've already garbage collected needed data"))
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
|
||||
let repo = RepoHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
|
||||
|
||||
repo.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?;
|
||||
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
|
||||
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x25)) {
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
Err(err) => {
|
||||
assert!(&err.to_string().contains("invalid branch start lsn"));
|
||||
assert!(&err
|
||||
.source()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.contains("is earlier than initdb lsn"));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -841,7 +879,7 @@ mod tests {
|
||||
RepoHarness::create("test_prohibit_get_page_at_lsn_for_garbage_collected_pages")?
|
||||
.load();
|
||||
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
make_some_layers(&tline, Lsn(0x20))?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
@@ -859,7 +897,7 @@ mod tests {
|
||||
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
|
||||
let repo =
|
||||
RepoHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
|
||||
make_some_layers(&tline, Lsn(0x20))?;
|
||||
|
||||
@@ -877,7 +915,7 @@ mod tests {
|
||||
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
|
||||
let harness = RepoHarness::create("test_parent_keeps_data_forever_after_branching")?;
|
||||
let repo = harness.load();
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
|
||||
make_some_layers(&tline, Lsn(0x20))?;
|
||||
|
||||
@@ -913,7 +951,7 @@ mod tests {
|
||||
let harness = RepoHarness::create(TEST_NAME)?;
|
||||
let repo = harness.load();
|
||||
|
||||
repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
drop(repo);
|
||||
|
||||
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
|
||||
@@ -927,7 +965,11 @@ mod tests {
|
||||
|
||||
let new_repo = harness.load();
|
||||
let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
|
||||
assert!(err.to_string().contains("checksum"));
|
||||
assert_eq!(err.to_string(), "failed to load metadata");
|
||||
assert_eq!(
|
||||
err.source().unwrap().to_string(),
|
||||
"metadata checksum mismatch"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -938,7 +980,7 @@ mod tests {
|
||||
let harness = RepoHarness::create(TEST_NAME)?;
|
||||
let repo = harness.load();
|
||||
|
||||
repo.create_empty_timeline(TIMELINE_ID)?;
|
||||
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
drop(repo);
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
@@ -100,23 +100,18 @@ def test_branch_behind(zenith_simple_env: ZenithEnv):
|
||||
assert cur.fetchone() == (1, )
|
||||
|
||||
# branch at pre-initdb lsn
|
||||
#
|
||||
# FIXME: This works currently, but probably shouldn't be allowed
|
||||
try:
|
||||
with pytest.raises(Exception, match="invalid branch start lsn"):
|
||||
env.zenith_cli(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"])
|
||||
# FIXME: assert false, "branch with invalid LSN should have failed"
|
||||
except subprocess.CalledProcessError:
|
||||
log.info("Branch creation with pre-initdb LSN failed (as expected)")
|
||||
|
||||
# check that we cannot create branch based on garbage collected data
|
||||
with closing(env.pageserver.connect()) as psconn:
|
||||
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
|
||||
# call gc to advace latest_gc_cutoff
|
||||
# call gc to advace latest_gc_cutoff_lsn
|
||||
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
row = pscur.fetchone()
|
||||
print_gc_result(row)
|
||||
|
||||
with pytest.raises(Exception, match="(we might've already garbage collected needed data)"):
|
||||
with pytest.raises(Exception, match="invalid branch start lsn"):
|
||||
# this gced_lsn is pretty random, so if gc is disabled this woudln't fail
|
||||
env.zenith_cli(["branch", "test_branch_create_fail", f"test_branch_behind@{gced_lsn}"])
|
||||
|
||||
|
||||
@@ -86,6 +86,6 @@ def test_readonly_node(zenith_simple_env: ZenithEnv):
|
||||
assert cur.fetchone() == (1, )
|
||||
|
||||
# Create node at pre-initdb lsn
|
||||
with pytest.raises(Exception, match='extracting base backup failed'):
|
||||
with pytest.raises(Exception, match="invalid basebackup lsn"):
|
||||
# compute node startup with invalid LSN should fail
|
||||
env.zenith_cli(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"])
|
||||
env.zenith_cli(["pg", "start", "test_readonly_node_preinitdb", "test_readonly_node@0/42"])
|
||||
|
||||
@@ -233,7 +233,7 @@ fn main() -> Result<()> {
|
||||
}
|
||||
};
|
||||
if let Err(e) = subcmd_result {
|
||||
eprintln!("command failed: {}", e);
|
||||
eprintln!("command failed: {:#}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
||||
@@ -450,7 +450,21 @@ impl PostgresBackend {
|
||||
}
|
||||
|
||||
FeMessage::Execute(_) => {
|
||||
handler.process_query(self, unnamed_query_string.clone())?;
|
||||
trace!("got execute {:?}", unnamed_query_string);
|
||||
// xxx distinguish fatal and recoverable errors?
|
||||
if let Err(e) = handler.process_query(self, unnamed_query_string.clone()) {
|
||||
let errmsg = format!("{}", e);
|
||||
|
||||
warn!(
|
||||
"query handler for {:?} failed: {:#}",
|
||||
unnamed_query_string, e
|
||||
);
|
||||
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
|
||||
}
|
||||
// NOTE there is no ReadyForQuery message. This handler is used
|
||||
// for basebackup and it uses CopyOut which doesnt require
|
||||
// ReadyForQuery message and backend just switches back to
|
||||
// processing mode after sending CopyDone or ErrorResponse.
|
||||
}
|
||||
|
||||
FeMessage::Sync => {
|
||||
|
||||
Reference in New Issue
Block a user