Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-14 11:40:38 +00:00)
Implement page service 'fullbackup' endpoint (#1923)

* Implement page service 'fullbackup' endpoint that works like basebackup, but also sends relational files
* Add test_runner/batch_others/test_fullbackup.py

Co-authored-by: bojanserafimov <bojan.serafimov7@gmail.com>
commit 36ee182d26 (parent d11c9f9fcb), committed by GitHub
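The endpoint is exercised the same way as 'basebackup': issue the command over the pageserver's libpq port and capture the resulting tar stream. A minimal client-side sketch of that flow, assuming placeholder connection details and IDs (the test at the end of this commit obtains real values from the running neon environment):

    import subprocess

    # Placeholder connection string and IDs; real values come from the environment.
    pageserver_connstr = "postgresql://localhost:64000"
    tenant, timeline, lsn = "<tenant-hex>", "<timeline-hex>", "0/169C3C8"

    # psql prints the COPY payload (a tar archive) to stdout.
    with open("fullbackup.tar", "wb") as out:
        subprocess.run(
            ["psql", "--no-psqlrc", pageserver_connstr,
             "-c", f"fullbackup {tenant} {timeline} {lsn}"],
            stdout=out, check=True)

    # Unpack it as a Postgres data directory.
    subprocess.run(["tar", "-xf", "fullbackup.tar", "-C", "restored_datadir"], check=True)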
@@ -13,6 +13,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{BufMut, BytesMut};
 use fail::fail_point;
+use itertools::Itertools;
 use std::fmt::Write as FmtWrite;
 use std::io;
 use std::io::Write;
@@ -21,7 +22,7 @@ use std::time::SystemTime;
 use tar::{Builder, EntryType, Header};
 use tracing::*;
 
-use crate::reltag::SlruKind;
+use crate::reltag::{RelTag, SlruKind};
 use crate::repository::Timeline;
 use crate::DatadirTimelineImpl;
 use postgres_ffi::xlog_utils::*;
@@ -39,11 +40,12 @@ where
     timeline: &'a Arc<DatadirTimelineImpl>,
     pub lsn: Lsn,
     prev_record_lsn: Lsn,
+    full_backup: bool,
     finished: bool,
 }
 
-// Create basebackup with non-rel data in it. Omit relational data.
+// Create basebackup with non-rel data in it.
+// Only include relational data if 'full_backup' is true.
 //
 // Currently we use empty lsn in two cases:
 // * During the basebackup right after timeline creation
@@ -58,6 +60,7 @@ where
         write: W,
         timeline: &'a Arc<DatadirTimelineImpl>,
         req_lsn: Option<Lsn>,
+        full_backup: bool,
     ) -> Result<Basebackup<'a, W>> {
         // Compute postgres doesn't have any previous WAL files, but the first
         // record that it's going to write needs to include the LSN of the
@@ -94,8 +97,8 @@ where
         };
 
         info!(
-            "taking basebackup lsn={}, prev_lsn={}",
-            backup_lsn, backup_prev
+            "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
+            backup_lsn, backup_prev, full_backup
         );
 
         Ok(Basebackup {
@@ -103,6 +106,7 @@ where
             timeline,
             lsn: backup_lsn,
             prev_record_lsn: backup_prev,
+            full_backup,
             finished: false,
         })
     }
@@ -140,6 +144,13 @@ where
         // Create tablespace directories
         for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn)? {
             self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
+
+            // Gather and send relational files in each database if full backup is requested.
+            if self.full_backup {
+                for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn)? {
+                    self.add_rel(rel)?;
+                }
+            }
         }
         for xid in self.timeline.list_twophase_files(self.lsn)? {
             self.add_twophase_file(xid)?;
@@ -157,6 +168,38 @@ where
         Ok(())
     }
 
+    fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
+        let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
+
+        // Function that adds relation segment data to archive
+        let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
+            let file_name = tag.to_segfile_name(segment_index as u32);
+            let header = new_tar_header(&file_name, data.len() as u64)?;
+            self.ar.append(&header, data.as_slice())?;
+            Ok(())
+        };
+
+        // If the relation is empty, create an empty file
+        if nblocks == 0 {
+            add_file(0, &vec![])?;
+            return Ok(());
+        }
+
+        // Add a file for each chunk of blocks (aka segment)
+        let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
+        for (seg, blocks) in chunks.into_iter().enumerate() {
+            let mut segment_data: Vec<u8> = vec![];
+            for blknum in blocks {
+                let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
+                segment_data.extend_from_slice(&img[..]);
+            }
+
+            add_file(seg, &segment_data)?;
+        }
+
+        Ok(())
+    }
+
     //
     // Generate SLRU segment files from repository.
     //
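add_rel follows Postgres's own relation segmentation: RELSEG_SIZE blocks per segment file, by default 131072 blocks of 8 KiB, i.e. 1 GiB per segment. A rough Python sketch of the same grouping, assuming the default constant:

    from itertools import islice

    RELSEG_SIZE = 131072  # default blocks per 1 GiB segment file

    def segments(nblocks):
        # Yield (segno, list_of_block_numbers) pairs, like the chunks() loop in add_rel.
        blocks = iter(range(nblocks))
        segno = 0
        while chunk := list(islice(blocks, RELSEG_SIZE)):
            yield segno, chunk
            segno += 1

    # A 300000-block relation splits into segments of 131072, 131072 and 37856 blocks.
    assert [len(c) for _, c in segments(300000)] == [131072, 131072, 37856]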
@@ -312,21 +355,24 @@ where
         pg_control.checkPointCopy = checkpoint;
         pg_control.state = pg_constants::DB_SHUTDOWNED;
 
-        // add zenith.signal file
-        let mut zenith_signal = String::new();
-        if self.prev_record_lsn == Lsn(0) {
-            if self.lsn == self.timeline.tline.get_ancestor_lsn() {
-                write!(zenith_signal, "PREV LSN: none")?;
-            } else {
-                write!(zenith_signal, "PREV LSN: invalid")?;
-            }
-        } else {
-            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
-        }
-        self.ar.append(
-            &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
-            zenith_signal.as_bytes(),
-        )?;
+        // Postgres doesn't recognize the zenith.signal file and doesn't need it.
+        if !self.full_backup {
+            // add zenith.signal file
+            let mut zenith_signal = String::new();
+            if self.prev_record_lsn == Lsn(0) {
+                if self.lsn == self.timeline.tline.get_ancestor_lsn() {
+                    write!(zenith_signal, "PREV LSN: none")?;
+                } else {
+                    write!(zenith_signal, "PREV LSN: invalid")?;
+                }
+            } else {
+                write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
+            }
+            self.ar.append(
+                &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
+                zenith_signal.as_bytes(),
+            )?;
+        }
 
         //send pg_control
         let pg_control_bytes = pg_control.encode();
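The restructuring above only changes when zenith.signal is written (it is skipped for full backups); the contents are unchanged. A compact Python rendering of the decision table, with LSNs simplified to integers:

    def zenith_signal(prev_record_lsn, lsn, ancestor_lsn):
        # Mirrors the Rust logic: prev LSN is unknown right after branch creation,
        # otherwise it is the real previous record LSN.
        if prev_record_lsn == 0:
            return "PREV LSN: none" if lsn == ancestor_lsn else "PREV LSN: invalid"
        return f"PREV LSN: {prev_record_lsn}"

    assert zenith_signal(0, 100, 100) == "PREV LSN: none"
    assert zenith_signal(0, 200, 100) == "PREV LSN: invalid"
    assert zenith_signal(150, 200, 100) == "PREV LSN: 150"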
@@ -596,6 +596,7 @@ impl PageServerHandler {
         timelineid: ZTimelineId,
         lsn: Option<Lsn>,
         tenantid: ZTenantId,
+        full_backup: bool,
     ) -> anyhow::Result<()> {
         let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
         let _enter = span.enter();
@@ -618,7 +619,7 @@ impl PageServerHandler {
         {
             let mut writer = CopyDataSink { pgb };
 
-            let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
+            let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
             span.record("lsn", &basebackup.lsn.to_string().as_str());
             basebackup.send_tarball()?;
         }
@@ -721,7 +722,33 @@ impl postgres_backend::Handler for PageServerHandler {
            };
 
            // Check that the timeline exists
-            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
+            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
+            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        }
+        // same as basebackup, but result includes relational data as well
+        else if query_string.starts_with("fullbackup ") {
+            let (_, params_raw) = query_string.split_at("fullbackup ".len());
+            let params = params_raw.split_whitespace().collect::<Vec<_>>();
+
+            ensure!(
+                params.len() == 3,
+                "invalid param number for fullbackup command"
+            );
+
+            let tenantid = ZTenantId::from_str(params[0])?;
+            let timelineid = ZTimelineId::from_str(params[1])?;
+
+            self.check_permission(Some(tenantid))?;
+
+            // Lsn is required for fullbackup, because otherwise we would not know
+            // at which lsn to upload this backup.
+            //
+            // The caller is responsible for providing a valid lsn
+            // and using it in the subsequent import.
+            let lsn = Some(Lsn::from_str(params[2])?);
+
+            // Check that the timeline exists
+            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.to_ascii_lowercase().starts_with("set ") {
             // important because psycopg2 executes "SET datestyle TO 'ISO'"
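The new command is positional, 'fullbackup <tenantid> <timelineid> <lsn>', and unlike basebackup the LSN is mandatory. A small sketch of composing the query string; the hex IDs below are hypothetical:

    tenant_id = "8400f9f5d21ed9bbaca27e9c04b03cf2"    # hypothetical hex tenant id
    timeline_id = "5a2e45a9a7d21f75ae1ab0b2c7f3c0dc"  # hypothetical hex timeline id
    lsn = "0/169C3C8"                                 # caller-provided, must be valid

    query = f"fullbackup {tenant_id} {timeline_id} {lsn}"
    # The handler splits the remainder on whitespace and requires exactly 3 params.
    assert len(query.split(" ", 1)[1].split()) == 3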
@@ -3,7 +3,7 @@ use std::cmp::Ordering;
 use std::fmt;
 
 use postgres_ffi::relfile_utils::forknumber_to_name;
-use postgres_ffi::Oid;
+use postgres_ffi::{pg_constants, Oid};
 
 ///
 /// Relation data file segment id throughout the Postgres cluster.
@@ -75,6 +75,30 @@ impl fmt::Display for RelTag {
     }
 }
 
+impl RelTag {
+    pub fn to_segfile_name(&self, segno: u32) -> String {
+        let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
+            "global/".to_string()
+        } else {
+            format!("base/{}/", self.dbnode)
+        };
+
+        name += &self.relnode.to_string();
+
+        if let Some(fork_name) = forknumber_to_name(self.forknum) {
+            name += "_";
+            name += fork_name;
+        }
+
+        if segno != 0 {
+            name += ".";
+            name += &segno.to_string();
+        }
+
+        name
+    }
+}
+
 ///
 /// Non-relation transaction status files (clog (a.k.a. pg_xact) and
 /// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,
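to_segfile_name reproduces the standard layout of a Postgres data directory: relations in the global tablespace live under global/, everything else under base/<dbnode>/; non-main forks get a suffix, and segment 0 has no numeric suffix. A small Python rendering with example outputs; the OIDs and fork names are the usual Postgres conventions, assumed here rather than taken from this diff:

    GLOBALTABLESPACE_OID = 1664  # pg_global

    FORK_NAMES = {1: "fsm", 2: "vm", 3: "init"}  # the main fork (0) has no suffix

    def segfile_name(spcnode, dbnode, relnode, forknum, segno):
        name = "global/" if spcnode == GLOBALTABLESPACE_OID else f"base/{dbnode}/"
        name += str(relnode)
        if forknum in FORK_NAMES:
            name += "_" + FORK_NAMES[forknum]
        if segno != 0:
            name += "." + str(segno)
        return name

    assert segfile_name(1664, 0, 1262, 0, 0) == "global/1262"
    assert segfile_name(1663, 16384, 16385, 1, 0) == "base/16384/16385_fsm"
    assert segfile_name(1663, 16384, 16385, 0, 2) == "base/16384/16385.2"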
test_runner/batch_others/test_fullbackup.py (new file, 73 lines)
@@ -0,0 +1,73 @@
+import subprocess
+from contextlib import closing
+
+import psycopg2.extras
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
+from fixtures.neon_fixtures import pg_distrib_dir
+import os
+from fixtures.utils import mkdir_if_needed, subprocess_capture
+import shutil
+import getpass
+import pwd
+
+num_rows = 1000
+
+
+# Ensure that regular postgres can start from fullbackup
+def test_fullbackup(neon_env_builder: NeonEnvBuilder,
+                    pg_bin: PgBin,
+                    port_distributor: PortDistributor):
+
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_fullbackup')
+    pgmain = env.postgres.create_start('test_fullbackup')
+    log.info("postgres is running on 'test_fullbackup' branch")
+
+    timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
+
+    with closing(pgmain.connect()) as conn:
+        with conn.cursor() as cur:
+            # data loading may take a while, so increase statement timeout
+            cur.execute("SET statement_timeout='300s'")
+            cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
+                        from generate_series(1,{num_rows}) g''')
+            cur.execute("CHECKPOINT")
+
+            cur.execute('SELECT pg_current_wal_insert_lsn()')
+            lsn = cur.fetchone()[0]
+            log.info(f"start_backup_lsn = {lsn}")
+
+    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
+    # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
+    psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
+
+    # Get and unpack fullbackup from pageserver
+    restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
+    os.mkdir(restored_dir_path, 0o750)
+    query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}"
+    cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
+    result_basepath = pg_bin.run_capture(cmd, env=psql_env)
+    tar_output_file = result_basepath + ".stdout"
+    subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path])
+
+    # HACK
+    # fullbackup returns neon specific pg_control and first WAL segment
+    # use resetwal to overwrite it
+    pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
+    cmd = [pg_resetwal_path, "-D", restored_dir_path]
+    pg_bin.run_capture(cmd, env=psql_env)
+
+    # Restore from the backup and find the data we inserted
+    port = port_distributor.get_port()
+    with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg:
+        # TODO make port an optional argument
+        vanilla_pg.configure([
+            f"port={port}",
+        ])
+        vanilla_pg.start()
+        num_rows_found = vanilla_pg.safe_psql('select count(*) from tbl;', user="cloud_admin")[0][0]
+        assert num_rows == num_rows_found
@@ -1373,12 +1373,13 @@ def pg_bin(test_output_dir: str) -> PgBin:
 
 
 class VanillaPostgres(PgProtocol):
-    def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
+    def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True):
         super().__init__(host='localhost', port=port, dbname='postgres')
         self.pgdatadir = pgdatadir
         self.pg_bin = pg_bin
         self.running = False
-        self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
+        if init:
+            self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
         self.configure([f"port = {port}\n"])
 
     def configure(self, options: List[str]):
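With the new init flag the fixture can wrap a pre-existing data directory, such as one unpacked from a fullbackup, instead of always running initdb. A brief usage sketch, assuming pg_bin and port_distributor fixture objects and a restored_dir_path from a test environment:

    # Previous behavior, still the default: initdb a fresh cluster.
    fresh = VanillaPostgres("/tmp/fresh_datadir", pg_bin, port_distributor.get_port())

    # New: reuse an existing datadir without initdb, as test_fullbackup does.
    port = port_distributor.get_port()
    with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as pg:
        pg.configure([f"port={port}"])  # no initdb ran, so set the port explicitly
        pg.start()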