From 1a9b8d92554e5e4c74947dbf5a30160cd6f6d50b Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Tue, 7 Jun 2022 14:10:03 +0300 Subject: [PATCH] Backported 36ee182d. Implement page service 'fullbackup' endpoint that works like basebackup, but also sends relational files --- Cargo.lock | 3 +- pageserver/Cargo.toml | 3 +- pageserver/src/basebackup.rs | 93 +++++++++++++++++---- pageserver/src/page_service.rs | 32 ++++++- pageserver/src/relish.rs | 25 ++++++ test_runner/batch_others/test_fullbackup.py | 73 ++++++++++++++++ test_runner/fixtures/zenith_fixtures.py | 5 +- 7 files changed, 212 insertions(+), 22 deletions(-) create mode 100644 test_runner/batch_others/test_fullbackup.py diff --git a/Cargo.lock b/Cargo.lock index a9de71420b..7f88b8e63b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,7 +246,7 @@ dependencies = [ [[package]] name = "bookfile" version = "0.3.0" -source = "git+https://github.com/zenithdb/bookfile.git?branch=generic-readext#d51a99c7a0be48c3d9cc7cb85c9b7fb05ce1100c" +source = "git+https://github.com/neondatabase/bookfile.git?branch=main#bf6e43825dfb6e749ae9b80e8372c8fea76cec2f" dependencies = [ "aversion", "byteorder", @@ -1467,6 +1467,7 @@ dependencies = [ "hex-literal", "humantime", "hyper", + "itertools", "lazy_static", "log", "nix", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index efd2fa4a38..1e2be66680 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -bookfile = { git = "https://github.com/zenithdb/bookfile.git", branch="generic-readext" } +bookfile = { git = "https://github.com/neondatabase/bookfile.git", branch="main" } chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" @@ -16,6 +16,7 @@ lazy_static = "1.4.0" log = "0.4.14" clap = "3.0" daemonize = "0.4.1" +itertools = "0.10.3" tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } postgres-types = { git = 
"https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 1ee48eb2fc..740f48acd0 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -10,8 +10,9 @@ //! This module is responsible for creation of such tarball //! from data stored in object storage. //! -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use bytes::{BufMut, BytesMut}; +use itertools::Itertools; use log::*; use std::fmt::Write as FmtWrite; use std::io; @@ -34,9 +35,11 @@ pub struct Basebackup<'a> { timeline: &'a Arc, pub lsn: Lsn, prev_record_lsn: Lsn, + full_backup: bool, } -// Create basebackup with non-rel data in it. Omit relational data. +// Create basebackup with non-rel data in it. +// Only include relational data if 'full_backup' is true. // // Currently we use empty lsn in two cases: // * During the basebackup right after timeline creation @@ -48,6 +51,7 @@ impl<'a> Basebackup<'a> { write: &'a mut dyn Write, timeline: &'a Arc, req_lsn: Option, + full_backup: bool, ) -> Result> { // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the @@ -83,8 +87,8 @@ impl<'a> Basebackup<'a> { }; info!( - "taking basebackup lsn={}, prev_lsn={}", - backup_lsn, backup_prev + "taking basebackup lsn={}, prev_lsn={} (full_backup={})", + backup_lsn, backup_prev, full_backup ); Ok(Basebackup { @@ -92,6 +96,7 @@ impl<'a> Basebackup<'a> { timeline, lsn: backup_lsn, prev_record_lsn: backup_prev, + full_backup, }) } @@ -130,6 +135,14 @@ impl<'a> Basebackup<'a> { } } + // Gather relational files if we are doing a full backup. 
+ if self.full_backup { + let all_rels = self.timeline.list_rels(0, 0, self.lsn)?; + for rel in all_rels { + self.add_rel(rel)?; + } + } + // Generate pg_control and bootstrap WAL segment. self.add_pgcontrol_file()?; self.ar.finish()?; @@ -137,6 +150,51 @@ impl<'a> Basebackup<'a> { Ok(()) } + fn add_rel(&mut self, rel: RelishTag) -> anyhow::Result<()> { + let tag = match rel { + RelishTag::Relation(tag) => tag, + _ => { + return Err(anyhow!("expected RelishTag::Rel, got {:?}", rel)); + } + }; + + // Function that adds relation segment data to archive + let mut add_file = |segment_index, data: &Vec| -> anyhow::Result<()> { + let file_name = tag.to_segfile_name(segment_index as u32); + let header = new_tar_header(&file_name, data.len() as u64)?; + self.ar.append(&header, data.as_slice())?; + Ok(()) + }; + + let nblocks = match self.timeline.get_relish_size(rel, self.lsn)? { + Some(nblocks) => nblocks, + None => { + warn!("rel {} is truncated in timeline", tag); + return Ok(()); + } + }; + + // If the relation is empty, create an empty file + if nblocks == 0 { + add_file(0, &vec![])?; + return Ok(()); + } + + // Add a file for each chunk of blocks (aka segment) + let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize); + for (seg, blocks) in chunks.into_iter().enumerate() { + let mut segment_data: Vec = vec![]; + for blknum in blocks { + let img = self.timeline.get_page_at_lsn(rel, blknum, self.lsn)?; + segment_data.extend_from_slice(&img[..]); + } + + add_file(seg, &segment_data)?; + } + + Ok(()) + } + // // Generate SLRU segment files from repository. 
// @@ -264,21 +322,24 @@ impl<'a> Basebackup<'a> { pg_control.checkPointCopy = checkpoint; pg_control.state = pg_constants::DB_SHUTDOWNED; - // add zenith.signal file - let mut zenith_signal = String::new(); - if self.prev_record_lsn == Lsn(0) { - if self.lsn == self.timeline.get_ancestor_lsn() { - write!(zenith_signal, "PREV LSN: none")?; + // Postgres doesn't recognize the zenith.signal file and doesn't need it. + if !self.full_backup { + // add zenith.signal file + let mut zenith_signal = String::new(); + if self.prev_record_lsn == Lsn(0) { + if self.lsn == self.timeline.get_ancestor_lsn() { + write!(zenith_signal, "PREV LSN: none")?; + } else { + write!(zenith_signal, "PREV LSN: invalid")?; + } } else { - write!(zenith_signal, "PREV LSN: invalid")?; + write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; } - } else { - write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; + self.ar.append( + &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, + zenith_signal.as_bytes(), + )?; } - self.ar.append( - &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, - zenith_signal.as_bytes(), - )?; //send pg_control let pg_control_bytes = pg_control.encode(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6e6b6415f3..e3feb52d4d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -515,6 +515,7 @@ impl PageServerHandler { timelineid: ZTimelineId, lsn: Option, tenantid: ZTenantId, + full_backup: bool, ) -> anyhow::Result<()> { let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty); let _enter = span.enter(); @@ -535,7 +536,8 @@ impl PageServerHandler { /* Send a tarball of the latest layer on the timeline */ { let mut writer = CopyDataSink { pgb }; - let mut basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?; + let mut basebackup = + basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?; 
span.record("lsn", &basebackup.lsn.to_string().as_str()); basebackup.send_tarball()?; } @@ -635,7 +637,33 @@ impl postgres_backend::Handler for PageServerHandler { }; // Check that the timeline exists - self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?; + self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } + // same as basebackup, but result includes relational data as well + else if query_string.starts_with("fullbackup ") { + let (_, params_raw) = query_string.split_at("fullbackup ".len()); + let params = params_raw.split_whitespace().collect::>(); + + ensure!( + params.len() == 3, + "invalid param number for fullbackup command" + ); + + let tenantid = ZTenantId::from_str(params[0])?; + let timelineid = ZTimelineId::from_str(params[1])?; + + self.check_permission(Some(tenantid))?; + + // Lsn is required for fullbackup, because otherwise we would not know + // at which lsn to upload this backup. + // + // The caller is responsible for providing a valid lsn + // and using it in the subsequent import. 
+ let lsn = Some(Lsn::from_str(params[2])?); + + // Check that the timeline exists + self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("callmemaybe ") { // callmemaybe diff --git a/pageserver/src/relish.rs b/pageserver/src/relish.rs index 9228829aef..2a8ab81266 100644 --- a/pageserver/src/relish.rs +++ b/pageserver/src/relish.rs @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::forknumber_to_name; use postgres_ffi::{Oid, TransactionId}; @@ -170,6 +171,30 @@ impl fmt::Display for RelTag { } } +impl RelTag { + pub fn to_segfile_name(&self, segno: u32) -> String { + let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID { + "global/".to_string() + } else { + format!("base/{}/", self.dbnode) + }; + + name += &self.relnode.to_string(); + + if let Some(fork_name) = forknumber_to_name(self.forknum) { + name += "_"; + name += fork_name; + } + + if segno != 0 { + name += "."; + name += &segno.to_string(); + } + + name + } +} + /// Display RelTag in the same format that's used in most PostgreSQL debug messages: /// /// //[_fsm|_vm|_init] diff --git a/test_runner/batch_others/test_fullbackup.py b/test_runner/batch_others/test_fullbackup.py new file mode 100644 index 0000000000..f3090278dd --- /dev/null +++ b/test_runner/batch_others/test_fullbackup.py @@ -0,0 +1,73 @@ +import subprocess +from contextlib import closing + +import psycopg2.extras +import pytest +from fixtures.log_helper import log +from fixtures.zenith_fixtures import ZenithEnvBuilder, PgBin, PortDistributor, VanillaPostgres +from fixtures.zenith_fixtures import pg_distrib_dir +import os +from fixtures.utils import mkdir_if_needed, subprocess_capture +import shutil +import getpass +import pwd + +num_rows = 1000 + + +# Ensure that regular postgres can start from fullbackup +def 
test_fullbackup(zenith_env_builder: ZenithEnvBuilder, + pg_bin: PgBin, + port_distributor: PortDistributor): + + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init_start() + + env.zenith_cli.create_branch('test_fullbackup') + pgmain = env.postgres.create_start('test_fullbackup') + log.info("postgres is running on 'test_fullbackup' branch") + + timeline = pgmain.safe_psql("SHOW zenith.zenith_timeline")[0][0] + + with closing(pgmain.connect()) as conn: + with conn.cursor() as cur: + # data loading may take a while, so increase statement timeout + cur.execute("SET statement_timeout='300s'") + cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g + from generate_series(1,{num_rows}) g''') + cur.execute("CHECKPOINT") + + cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn = cur.fetchone()[0] + log.info(f"start_backup_lsn = {lsn}") + + # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. + # PgBin sets it automatically, but here we need to pipe psql output to the tar command. 
+ psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')} + + # Get and unpack fullbackup from pageserver + restored_dir_path = os.path.join(env.repo_dir, "restored_datadir") + os.mkdir(restored_dir_path, 0o750) + query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}" + cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] + result_basepath = pg_bin.run_capture(cmd, env=psql_env) + tar_output_file = result_basepath + ".stdout" + subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path]) + + # HACK + # fullbackup returns zenith specific pg_control and first WAL segment + # use resetwal to overwrite it + pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal') + cmd = [pg_resetwal_path, "-D", restored_dir_path] + pg_bin.run_capture(cmd, env=psql_env) + + # Restore from the backup and find the data we inserted + port = port_distributor.get_port() + with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg: + # TODO make port an optional argument + vanilla_pg.configure([ + f"port={port}", + ]) + vanilla_pg.start() + num_rows_found = vanilla_pg.safe_psql('select count(*) from tbl;', user="cloud_admin")[0][0] + assert num_rows == num_rows_found diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index fa68c4f476..70f504287d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1274,12 +1274,13 @@ def pg_bin(test_output_dir: str) -> PgBin: class VanillaPostgres(PgProtocol): - def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int): + def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True): super().__init__(host='localhost', port=port) self.pgdatadir = pgdatadir self.pg_bin = pg_bin self.running = False - self.pg_bin.run_capture(['initdb', '-D', pgdatadir]) + if init: + self.pg_bin.run_capture(['initdb', '-D', pgdatadir]) def configure(self, options: 
List[str]): """Append lines into postgresql.conf file."""