Backported 36ee182d. Implement page servise 'fullbackup' endpoint that works like basebackup, but also sends relational files

This commit is contained in:
Anastasia Lubennikova
2022-06-07 14:10:03 +03:00
committed by Bojan Serafimov
parent 063f9ba81d
commit 1a9b8d9255
7 changed files with 212 additions and 22 deletions

3
Cargo.lock generated
View File

@@ -246,7 +246,7 @@ dependencies = [
[[package]]
name = "bookfile"
version = "0.3.0"
source = "git+https://github.com/zenithdb/bookfile.git?branch=generic-readext#d51a99c7a0be48c3d9cc7cb85c9b7fb05ce1100c"
source = "git+https://github.com/neondatabase/bookfile.git?branch=main#bf6e43825dfb6e749ae9b80e8372c8fea76cec2f"
dependencies = [
"aversion",
"byteorder",
@@ -1467,6 +1467,7 @@ dependencies = [
"hex-literal",
"humantime",
"hyper",
"itertools",
"lazy_static",
"log",
"nix",

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
bookfile = { git = "https://github.com/zenithdb/bookfile.git", branch="generic-readext" }
bookfile = { git = "https://github.com/neondatabase/bookfile.git", branch="main" }
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"
@@ -16,6 +16,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
clap = "3.0"
daemonize = "0.4.1"
itertools = "0.10.3"
tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }

View File

@@ -10,8 +10,9 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use bytes::{BufMut, BytesMut};
use itertools::Itertools;
use log::*;
use std::fmt::Write as FmtWrite;
use std::io;
@@ -34,9 +35,11 @@ pub struct Basebackup<'a> {
timeline: &'a Arc<dyn Timeline>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
}
// Create basebackup with non-rel data in it. Omit relational data.
// Create basebackup with non-rel data in it.
// Only include relational data if 'full_backup' is true.
//
// Currently we use empty lsn in two cases:
// * During the basebackup right after timeline creation
@@ -48,6 +51,7 @@ impl<'a> Basebackup<'a> {
write: &'a mut dyn Write,
timeline: &'a Arc<dyn Timeline>,
req_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
@@ -83,8 +87,8 @@ impl<'a> Basebackup<'a> {
};
info!(
"taking basebackup lsn={}, prev_lsn={}",
backup_lsn, backup_prev
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, backup_prev, full_backup
);
Ok(Basebackup {
@@ -92,6 +96,7 @@ impl<'a> Basebackup<'a> {
timeline,
lsn: backup_lsn,
prev_record_lsn: backup_prev,
full_backup,
})
}
@@ -130,6 +135,14 @@ impl<'a> Basebackup<'a> {
}
}
// Gather relational files if we are doing a full backup.
if self.full_backup {
let all_rels = self.timeline.list_rels(0, 0, self.lsn)?;
for rel in all_rels {
self.add_rel(rel)?;
}
}
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file()?;
self.ar.finish()?;
@@ -137,6 +150,51 @@ impl<'a> Basebackup<'a> {
Ok(())
}
fn add_rel(&mut self, rel: RelishTag) -> anyhow::Result<()> {
let tag = match rel {
RelishTag::Relation(tag) => tag,
_ => {
return Err(anyhow!("expected RelishTag::Rel, got {:?}", rel));
}
};
// Function that adds relation segment data to archive
let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
let file_name = tag.to_segfile_name(segment_index as u32);
let header = new_tar_header(&file_name, data.len() as u64)?;
self.ar.append(&header, data.as_slice())?;
Ok(())
};
let nblocks = match self.timeline.get_relish_size(rel, self.lsn)? {
Some(nblocks) => nblocks,
None => {
warn!("rel {} is truncated in timeline", tag);
return Ok(());
}
};
// If the relation is empty, create an empty file
if nblocks == 0 {
add_file(0, &vec![])?;
return Ok(());
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
let img = self.timeline.get_page_at_lsn(rel, blknum, self.lsn)?;
segment_data.extend_from_slice(&img[..]);
}
add_file(seg, &segment_data)?;
}
Ok(())
}
//
// Generate SLRU segment files from repository.
//
@@ -264,21 +322,24 @@ impl<'a> Basebackup<'a> {
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
// Postgres doesn't recognize the zenith.signal file and doesn't need it.
if !self.full_backup {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
}
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
}
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();

View File

@@ -515,6 +515,7 @@ impl PageServerHandler {
timelineid: ZTimelineId,
lsn: Option<Lsn>,
tenantid: ZTenantId,
full_backup: bool,
) -> anyhow::Result<()> {
let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
let _enter = span.enter();
@@ -535,7 +536,8 @@ impl PageServerHandler {
/* Send a tarball of the latest layer on the timeline */
{
let mut writer = CopyDataSink { pgb };
let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
let mut basebackup =
basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
span.record("lsn", &basebackup.lsn.to_string().as_str());
basebackup.send_tarball()?;
}
@@ -635,7 +637,33 @@ impl postgres_backend::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// same as basebackup, but result includes relational data as well
else if query_string.starts_with("fullbackup ") {
let (_, params_raw) = query_string.split_at("fullbackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(
params.len() == 3,
"invalid param number for fullbackup command"
);
let tenantid = ZTenantId::from_str(params[0])?;
let timelineid = ZTimelineId::from_str(params[1])?;
self.check_permission(Some(tenantid))?;
// Lsn is required for fullbackup, because otherwise we would not know
// at which lsn to upload this backup.
//
// The caller is responsible for providing a valid lsn
// and using it in the subsequent import.
let lsn = Some(Lsn::from_str(params[2])?);
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("callmemaybe ") {
// callmemaybe <zenith tenantid as hex string> <zenith timelineid as hex string> <connstr>

View File

@@ -26,6 +26,7 @@
use serde::{Deserialize, Serialize};
use std::fmt;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::{Oid, TransactionId};
@@ -170,6 +171,30 @@ impl fmt::Display for RelTag {
}
}
impl RelTag {
pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
"global/".to_string()
} else {
format!("base/{}/", self.dbnode)
};
name += &self.relnode.to_string();
if let Some(fork_name) = forknumber_to_name(self.forknum) {
name += "_";
name += fork_name;
}
if segno != 0 {
name += ".";
name += &segno.to_string();
}
name
}
}
/// Display RelTag in the same format that's used in most PostgreSQL debug messages:
///
/// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]

View File

@@ -0,0 +1,73 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.zenith_fixtures import ZenithEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.zenith_fixtures import pg_distrib_dir
import os
from fixtures.utils import mkdir_if_needed, subprocess_capture
import shutil
import getpass
import pwd
num_rows = 1000
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(zenith_env_builder: ZenithEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_fullbackup')
pgmain = env.postgres.create_start('test_fullbackup')
log.info("postgres is running on 'test_fullbackup' branch")
timeline = pgmain.safe_psql("SHOW zenith.zenith_timeline")[0][0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get and unpack fullbackup from pageserver
restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
os.mkdir(restored_dir_path, 0o750)
query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path])
# HACK
# fullbackup returns zenith specific pg_control and first WAL segment
# use resetwal to overwrite it
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
cmd = [pg_resetwal_path, "-D", restored_dir_path]
pg_bin.run_capture(cmd, env=psql_env)
# Restore from the backup and find the data we inserted
port = port_distributor.get_port()
with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg:
# TODO make port an optional argument
vanilla_pg.configure([
f"port={port}",
])
vanilla_pg.start()
num_rows_found = vanilla_pg.safe_psql('select count(*) from tbl;', user="cloud_admin")[0][0]
assert num_rows == num_rows_found

View File

@@ -1274,12 +1274,13 @@ def pg_bin(test_output_dir: str) -> PgBin:
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True):
super().__init__(host='localhost', port=port)
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
if init:
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""