mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Rework basebackup.
* add lsn argument * do not expose wait_lsn, wait inside list_nonrels() * fix parameters parsing * expose get_last_record_rlsn() to atomically read (last,prev) pair More work is needed to correctly handle basebackup@old_lsn but current approach already allows to fix test_restart_compute
This commit is contained in:
@@ -22,7 +22,7 @@ use crate::relish::*;
|
||||
use crate::repository::Timeline;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use postgres_ffi::*;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::lsn::{Lsn, RecordLsn};
|
||||
|
||||
/// This is short-living object only for the time of tarball creation,
|
||||
/// created mostly to avoid passing a lot of parameters between various functions
|
||||
@@ -34,13 +34,36 @@ pub struct Basebackup<'a> {
|
||||
prev_record_lsn: Lsn,
|
||||
}
|
||||
|
||||
// Create basebackup with non-rel data in it. Omit relational data.
|
||||
//
|
||||
// Currently we use empty lsn in two cases:
|
||||
// * During the basebackup right after timeline creation
|
||||
// * When working without safekeepers. In this situation it is important to match the lsn
|
||||
// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
|
||||
// to start the replication.
|
||||
impl<'a> Basebackup<'a> {
|
||||
pub fn new(
|
||||
write: &'a mut dyn Write,
|
||||
timeline: &'a Arc<dyn Timeline>,
|
||||
lsn: Lsn,
|
||||
prev_record_lsn: Lsn,
|
||||
req_lsn: Option<Lsn>,
|
||||
) -> Basebackup<'a> {
|
||||
let RecordLsn {
|
||||
last: lsn,
|
||||
prev: prev_record_lsn,
|
||||
} = if let Some(lsn) = req_lsn {
|
||||
// FIXME: that wouldn't work since we don't know prev for old LSN's.
|
||||
// Probably it is better to avoid using prev in compute node start
|
||||
// at all and acept the fact that first WAL record in the timeline would
|
||||
// have zero as prev. https://github.com/zenithdb/zenith/issues/506
|
||||
RecordLsn {
|
||||
last: lsn,
|
||||
prev: lsn,
|
||||
}
|
||||
} else {
|
||||
// Atomically get last and prev LSN's
|
||||
timeline.get_last_record_rlsn()
|
||||
};
|
||||
|
||||
Basebackup {
|
||||
ar: Builder::new(write),
|
||||
timeline,
|
||||
|
||||
@@ -38,8 +38,8 @@ use crate::{ZTenantId, ZTimelineId};
|
||||
use zenith_metrics::{register_histogram, Histogram};
|
||||
use zenith_metrics::{register_histogram_vec, HistogramVec};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::{AtomicLsn, Lsn};
|
||||
use zenith_utils::seqwait::{MonotonicCounter, SeqWait};
|
||||
use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
|
||||
use zenith_utils::seqwait::SeqWait;
|
||||
|
||||
mod blob;
|
||||
mod delta_layer;
|
||||
@@ -147,8 +147,6 @@ impl Repository for LayeredRepository {
|
||||
/// Branch a timeline
|
||||
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> {
|
||||
let src_timeline = self.get_timeline(src)?;
|
||||
// This LSN comes from the user request. Make sure it is aligned.
|
||||
let start_lsn = start_lsn.aligned();
|
||||
|
||||
// Create the metadata file, noting the ancestor of the new timeline.
|
||||
// There is initially no data in it, but all the read-calls know to look
|
||||
@@ -458,24 +456,6 @@ pub struct TimelineMetadata {
|
||||
ancestor_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct RecordLsn {
|
||||
last: Lsn,
|
||||
prev: Lsn,
|
||||
}
|
||||
|
||||
impl MonotonicCounter<Lsn> for RecordLsn {
|
||||
fn cnt_advance(&mut self, lsn: Lsn) {
|
||||
assert!(self.last <= lsn);
|
||||
let new_prev = self.last;
|
||||
self.last = lsn;
|
||||
self.prev = new_prev;
|
||||
}
|
||||
fn cnt_value(&self) -> Lsn {
|
||||
self.last
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LayeredTimeline {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -629,6 +609,8 @@ impl Timeline for LayeredTimeline {
|
||||
fn list_nonrels(&self, lsn: Lsn) -> Result<HashSet<RelishTag>> {
|
||||
info!("list_nonrels called at {}", lsn);
|
||||
|
||||
let lsn = self.wait_lsn(lsn)?;
|
||||
|
||||
// List all nonrels in this timeline, and all its ancestors.
|
||||
let mut all_rels = HashSet::new();
|
||||
let mut timeline = self;
|
||||
@@ -794,6 +776,10 @@ impl Timeline for LayeredTimeline {
|
||||
fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().prev
|
||||
}
|
||||
|
||||
fn get_last_record_rlsn(&self) -> RecordLsn {
|
||||
self.last_record_lsn.load()
|
||||
}
|
||||
}
|
||||
|
||||
impl LayeredTimeline {
|
||||
@@ -1052,20 +1038,23 @@ impl LayeredTimeline {
|
||||
///
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
///
|
||||
fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
/// TODO: change lsn to be an Optional<Lsn> or even force callers to call get_last_record_lsn()
|
||||
/// (since at least basebackup needs to call get_last_and_prev_record_lsns() instead)
|
||||
fn wait_lsn(&self, lsn: Lsn) -> anyhow::Result<Lsn> {
|
||||
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
|
||||
// This is necessary for bootstrap.
|
||||
if lsn == Lsn(0) {
|
||||
let last_valid_lsn = self.last_valid_lsn.load();
|
||||
trace!(
|
||||
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
|
||||
last_valid_lsn,
|
||||
lsn
|
||||
);
|
||||
lsn = last_valid_lsn;
|
||||
return Ok(self.get_last_record_lsn());
|
||||
}
|
||||
|
||||
self.last_valid_lsn
|
||||
// FIXME: we can deadlock if we call wait_lsn() from WAL receiver. And we actually
|
||||
// it a lot from there. Only deadlock that I caught was while trying to add wait_lsn()
|
||||
// in list_rels(). But it makes sense to make all functions in timeline non-waiting;
|
||||
// assert that arg_lsn <= current_record_lsn; call wait_lsn explicetly where it is
|
||||
// needed (page_service and basebackup); uncomment this check:
|
||||
// assert_ne!(thread::current().name(), Some("WAL receiver thread"));
|
||||
|
||||
self.last_record_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -347,17 +347,9 @@ impl PageServerHandler {
|
||||
info!("sent CopyOut");
|
||||
|
||||
/* Send a tarball of the latest snapshot on the timeline */
|
||||
|
||||
let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
|
||||
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
let mut basebackup = basebackup::Basebackup::new(
|
||||
&mut writer,
|
||||
&timeline,
|
||||
req_lsn,
|
||||
timeline.get_prev_record_lsn(),
|
||||
);
|
||||
let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn);
|
||||
basebackup.send_tarball()?;
|
||||
}
|
||||
pgb.write_message(&BeMessage::CopyDone)?;
|
||||
@@ -446,9 +438,10 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
self.handle_pagerequests(pgb, timelineid, tenantid)?;
|
||||
} else if query_string.starts_with("basebackup ") {
|
||||
let (_, params_raw) = query_string.split_at("basebackup ".len());
|
||||
let params = params_raw.split(" ").collect::<Vec<_>>();
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
|
||||
ensure!(
|
||||
params.len() == 2,
|
||||
params.len() >= 2,
|
||||
"invalid param number for basebackup command"
|
||||
);
|
||||
|
||||
@@ -457,12 +450,12 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
|
||||
self.check_permission(Some(tenantid))?;
|
||||
|
||||
// TODO are there any tests with lsn option?
|
||||
let lsn = if params.len() == 3 {
|
||||
Some(Lsn::from_str(params[2])?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
info!(
|
||||
"got basebackup command. tenantid=\"{}\" timelineid=\"{}\" lsn=\"{:#?}\"",
|
||||
tenantid, timelineid, lsn
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::collections::HashSet;
|
||||
use std::ops::AddAssign;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::lsn::{Lsn, RecordLsn};
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
///
|
||||
@@ -142,6 +142,9 @@ pub trait Timeline: Send + Sync {
|
||||
/// Advance requires aligned LSN as an argument and would wake wait_lsn() callers.
|
||||
/// Previous last record LSN is stored alongside the latest and can be read.
|
||||
fn advance_last_record_lsn(&self, lsn: Lsn);
|
||||
/// Atomically get both last and prev.
|
||||
fn get_last_record_rlsn(&self) -> RecordLsn;
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
fn get_last_record_lsn(&self) -> Lsn;
|
||||
fn get_prev_record_lsn(&self) -> Lsn;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user