mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Implement page servise 'fullbackup' endpoint that works like basebackup, but also sends relational files
This commit is contained in:
@@ -21,7 +21,7 @@ use std::time::SystemTime;
|
||||
use tar::{Builder, EntryType, Header};
|
||||
use tracing::*;
|
||||
|
||||
use crate::reltag::SlruKind;
|
||||
use crate::reltag::{RelTag, SlruKind};
|
||||
use crate::repository::Timeline;
|
||||
use crate::DatadirTimelineImpl;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
@@ -39,11 +39,12 @@ where
|
||||
timeline: &'a Arc<DatadirTimelineImpl>,
|
||||
pub lsn: Lsn,
|
||||
prev_record_lsn: Lsn,
|
||||
|
||||
full_backup: bool,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
// Create basebackup with non-rel data in it. Omit relational data.
|
||||
// Create basebackup with non-rel data in it.
|
||||
// Only include relational data if 'full_backup' is true.
|
||||
//
|
||||
// Currently we use empty lsn in two cases:
|
||||
// * During the basebackup right after timeline creation
|
||||
@@ -58,6 +59,7 @@ where
|
||||
write: W,
|
||||
timeline: &'a Arc<DatadirTimelineImpl>,
|
||||
req_lsn: Option<Lsn>,
|
||||
full_backup: bool,
|
||||
) -> Result<Basebackup<'a, W>> {
|
||||
// Compute postgres doesn't have any previous WAL files, but the first
|
||||
// record that it's going to write needs to include the LSN of the
|
||||
@@ -94,8 +96,8 @@ where
|
||||
};
|
||||
|
||||
info!(
|
||||
"taking basebackup lsn={}, prev_lsn={}",
|
||||
backup_lsn, backup_prev
|
||||
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
|
||||
backup_lsn, backup_prev, full_backup
|
||||
);
|
||||
|
||||
Ok(Basebackup {
|
||||
@@ -103,6 +105,7 @@ where
|
||||
timeline,
|
||||
lsn: backup_lsn,
|
||||
prev_record_lsn: backup_prev,
|
||||
full_backup,
|
||||
finished: false,
|
||||
})
|
||||
}
|
||||
@@ -140,6 +143,13 @@ where
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn)? {
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
|
||||
|
||||
// Gather and send relational files in each database if full backup is requested.
|
||||
if self.full_backup {
|
||||
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn)? {
|
||||
self.add_rel(rel)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
for xid in self.timeline.list_twophase_files(self.lsn)? {
|
||||
self.add_twophase_file(xid)?;
|
||||
@@ -157,6 +167,58 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
|
||||
|
||||
let mut rel_seg_buf: Vec<u8> =
|
||||
Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize);
|
||||
let mut blknum = 0;
|
||||
|
||||
loop {
|
||||
if nblocks > 0 {
|
||||
let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
|
||||
|
||||
ensure!(img.len() == pg_constants::BLCKSZ as usize);
|
||||
|
||||
rel_seg_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]);
|
||||
}
|
||||
|
||||
if (blknum != 0 && blknum % pg_constants::RELSEG_SIZE == 0) || blknum == nblocks {
|
||||
let segname = if tag.spcnode == pg_constants::GLOBALTABLESPACE_OID {
|
||||
format!(
|
||||
"global/{}",
|
||||
tag.to_segfile_name(blknum / pg_constants::RELSEG_SIZE)
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"base/{}/{}",
|
||||
tag.dbnode,
|
||||
tag.to_segfile_name(blknum / pg_constants::RELSEG_SIZE)
|
||||
)
|
||||
};
|
||||
|
||||
let header = new_tar_header(&segname, rel_seg_buf.len() as u64)?;
|
||||
self.ar.append(&header, rel_seg_buf.as_slice())?;
|
||||
debug!(
|
||||
"Added to fullbackup relseg {} blknum {}/nblocks {}, len {}",
|
||||
segname,
|
||||
blknum,
|
||||
nblocks,
|
||||
rel_seg_buf.len()
|
||||
);
|
||||
rel_seg_buf.clear();
|
||||
}
|
||||
|
||||
if blknum == nblocks {
|
||||
break;
|
||||
}
|
||||
|
||||
blknum += 1;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Generate SLRU segment files from repository.
|
||||
//
|
||||
@@ -312,21 +374,24 @@ where
|
||||
pg_control.checkPointCopy = checkpoint;
|
||||
pg_control.state = pg_constants::DB_SHUTDOWNED;
|
||||
|
||||
// add zenith.signal file
|
||||
let mut zenith_signal = String::new();
|
||||
if self.prev_record_lsn == Lsn(0) {
|
||||
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
|
||||
write!(zenith_signal, "PREV LSN: none")?;
|
||||
// Postgres doesn't recognize the zenith.signal file and doesn't need it.
|
||||
if !self.full_backup {
|
||||
// add zenith.signal file
|
||||
let mut zenith_signal = String::new();
|
||||
if self.prev_record_lsn == Lsn(0) {
|
||||
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
|
||||
write!(zenith_signal, "PREV LSN: none")?;
|
||||
} else {
|
||||
write!(zenith_signal, "PREV LSN: invalid")?;
|
||||
}
|
||||
} else {
|
||||
write!(zenith_signal, "PREV LSN: invalid")?;
|
||||
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
|
||||
}
|
||||
} else {
|
||||
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
|
||||
self.ar.append(
|
||||
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
|
||||
zenith_signal.as_bytes(),
|
||||
)?;
|
||||
}
|
||||
self.ar.append(
|
||||
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
|
||||
zenith_signal.as_bytes(),
|
||||
)?;
|
||||
|
||||
//send pg_control
|
||||
let pg_control_bytes = pg_control.encode();
|
||||
|
||||
@@ -596,6 +596,7 @@ impl PageServerHandler {
|
||||
timelineid: ZTimelineId,
|
||||
lsn: Option<Lsn>,
|
||||
tenantid: ZTenantId,
|
||||
full_backup: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
|
||||
let _enter = span.enter();
|
||||
@@ -618,7 +619,7 @@ impl PageServerHandler {
|
||||
{
|
||||
let mut writer = CopyDataSink { pgb };
|
||||
|
||||
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
|
||||
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
|
||||
span.record("lsn", &basebackup.lsn.to_string().as_str());
|
||||
basebackup.send_tarball()?;
|
||||
}
|
||||
@@ -721,7 +722,33 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
|
||||
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
// same as basebackup, but result includes relational data as well
|
||||
else if query_string.starts_with("fullbackup ") {
|
||||
let (_, params_raw) = query_string.split_at("fullbackup ".len());
|
||||
let params = params_raw.split_whitespace().collect::<Vec<_>>();
|
||||
|
||||
ensure!(
|
||||
params.len() == 3,
|
||||
"invalid param number for fullbackup command"
|
||||
);
|
||||
|
||||
let tenantid = ZTenantId::from_str(params[0])?;
|
||||
let timelineid = ZTimelineId::from_str(params[1])?;
|
||||
|
||||
self.check_permission(Some(tenantid))?;
|
||||
|
||||
// Lsn is required for fullbackup, because otherwise we would not know
|
||||
// at which lsn to upload this backup.
|
||||
//
|
||||
// The caller is responsible for providing a valid lsn
|
||||
// and using it in the subsequent import.
|
||||
let lsn = Some(Lsn::from_str(params[2])?);
|
||||
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.to_ascii_lowercase().starts_with("set ") {
|
||||
// important because psycopg2 executes "SET datestyle TO 'ISO'"
|
||||
|
||||
@@ -75,6 +75,27 @@ impl fmt::Display for RelTag {
|
||||
}
|
||||
}
|
||||
|
||||
impl RelTag {
|
||||
/// Formats:
|
||||
/// <oid>
|
||||
/// <oid>_<fork name>
|
||||
/// <oid>.<segment number>
|
||||
/// <oid>_<fork name>.<segment number>
|
||||
pub fn to_segfile_name(&self, segno: u32) -> String {
|
||||
if segno == 0 {
|
||||
if let Some(forkname) = forknumber_to_name(self.forknum) {
|
||||
format!("{}_{}", self.relnode, forkname)
|
||||
} else {
|
||||
format!("{}", self.relnode)
|
||||
}
|
||||
} else if let Some(forkname) = forknumber_to_name(self.forknum) {
|
||||
format!("{}_{}.{}", self.relnode, forkname, segno)
|
||||
} else {
|
||||
format!("{}.{}", self.relnode, segno)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Non-relation transaction status files (clog (a.k.a. pg_xact) and
|
||||
/// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,
|
||||
|
||||
Reference in New Issue
Block a user