diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 46d824b2e2..44a6442522 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -13,6 +13,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{BufMut, BytesMut};
 use fail::fail_point;
+use itertools::Itertools;
 use std::fmt::Write as FmtWrite;
 use std::io;
 use std::io::Write;
@@ -21,7 +22,7 @@
 use std::time::SystemTime;
 use tar::{Builder, EntryType, Header};
 use tracing::*;
-use crate::reltag::SlruKind;
+use crate::reltag::{RelTag, SlruKind};
 use crate::repository::Timeline;
 use crate::DatadirTimelineImpl;
 use postgres_ffi::xlog_utils::*;
@@ -39,11 +40,12 @@ where
     timeline: &'a Arc<DatadirTimelineImpl>,
     pub lsn: Lsn,
     prev_record_lsn: Lsn,
-
+    full_backup: bool,
     finished: bool,
 }
 
-// Create basebackup with non-rel data in it. Omit relational data.
+// Create basebackup with non-rel data in it.
+// Only include relational data if 'full_backup' is true.
 //
 // Currently we use empty lsn in two cases:
 // * During the basebackup right after timeline creation
@@ -58,6 +60,7 @@ where
         write: W,
         timeline: &'a Arc<DatadirTimelineImpl>,
         req_lsn: Option<Lsn>,
+        full_backup: bool,
     ) -> Result<Basebackup<'a, W>> {
         // Compute postgres doesn't have any previous WAL files, but the first
         // record that it's going to write needs to include the LSN of the
@@ -94,8 +97,8 @@ where
         };
 
         info!(
-            "taking basebackup lsn={}, prev_lsn={}",
-            backup_lsn, backup_prev
+            "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
+            backup_lsn, backup_prev, full_backup
         );
 
         Ok(Basebackup {
@@ -103,6 +106,7 @@ where
             timeline,
             lsn: backup_lsn,
             prev_record_lsn: backup_prev,
+            full_backup,
             finished: false,
         })
     }
@@ -140,6 +144,13 @@ where
         // Create tablespace directories
         for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn)? {
             self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
+
+            // Gather and send relational files in each database if full backup is requested.
+            if self.full_backup {
+                for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn)? {
+                    self.add_rel(rel)?;
+                }
+            }
         }
         for xid in self.timeline.list_twophase_files(self.lsn)? {
             self.add_twophase_file(xid)?;
@@ -157,6 +168,38 @@ where
         Ok(())
     }
 
+    fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
+        let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
+
+        // Closure that adds one relation segment file to the archive
+        let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
+            let file_name = tag.to_segfile_name(segment_index as u32);
+            let header = new_tar_header(&file_name, data.len() as u64)?;
+            self.ar.append(&header, data.as_slice())?;
+            Ok(())
+        };
+
+        // If the relation is empty, create an empty file
+        if nblocks == 0 {
+            add_file(0, &vec![])?;
+            return Ok(());
+        }
+
+        // Add a file for each chunk of blocks (aka segment)
+        let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
+        for (seg, blocks) in chunks.into_iter().enumerate() {
+            let mut segment_data: Vec<u8> = vec![];
+            for blknum in blocks {
+                let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
+                segment_data.extend_from_slice(&img[..]);
+            }
+
+            add_file(seg, &segment_data)?;
+        }
+
+        Ok(())
+    }
+
     //
     // Generate SLRU segment files from repository.
     //
@@ -312,21 +355,24 @@ where
         pg_control.checkPointCopy = checkpoint;
         pg_control.state = pg_constants::DB_SHUTDOWNED;
 
-        // add zenith.signal file
-        let mut zenith_signal = String::new();
-        if self.prev_record_lsn == Lsn(0) {
-            if self.lsn == self.timeline.tline.get_ancestor_lsn() {
-                write!(zenith_signal, "PREV LSN: none")?;
+        // Postgres doesn't recognize the zenith.signal file and doesn't need it.
+        if !self.full_backup {
+            // add zenith.signal file
+            let mut zenith_signal = String::new();
+            if self.prev_record_lsn == Lsn(0) {
+                if self.lsn == self.timeline.tline.get_ancestor_lsn() {
+                    write!(zenith_signal, "PREV LSN: none")?;
+                } else {
+                    write!(zenith_signal, "PREV LSN: invalid")?;
+                }
             } else {
-                write!(zenith_signal, "PREV LSN: invalid")?;
+                write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
             }
-        } else {
-            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
+            self.ar.append(
+                &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
+                zenith_signal.as_bytes(),
+            )?;
         }
-        self.ar.append(
-            &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
-            zenith_signal.as_bytes(),
-        )?;
 
         //send pg_control
         let pg_control_bytes = pg_control.encode();
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index b739e55566..fc8205284a 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -755,6 +755,7 @@ impl PageServerHandler {
         timelineid: ZTimelineId,
         lsn: Option<Lsn>,
         tenantid: ZTenantId,
+        full_backup: bool,
     ) -> anyhow::Result<()> {
         let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
         let _enter = span.enter();
@@ -777,7 +778,7 @@ impl PageServerHandler {
         {
             let mut writer = CopyDataSink { pgb };
 
-            let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
+            let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
             span.record("lsn", &basebackup.lsn.to_string().as_str());
             basebackup.send_tarball()?;
         }
@@ -880,7 +881,33 @@ impl postgres_backend::Handler for PageServerHandler {
             };
 
             // Check that the timeline exists
-            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
+            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
+            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+        }
+        // same as basebackup, but result includes relational data as well
+        else if query_string.starts_with("fullbackup ") {
+            let (_, params_raw) = query_string.split_at("fullbackup ".len());
+            let params = params_raw.split_whitespace().collect::<Vec<_>>();
+
+            ensure!(
+                params.len() == 3,
+                "invalid param number for fullbackup command"
+            );
+
+            let tenantid = ZTenantId::from_str(params[0])?;
+            let timelineid = ZTimelineId::from_str(params[1])?;
+
+            self.check_permission(Some(tenantid))?;
+
+            // Lsn is required for fullbackup, because otherwise we would not know
+            // at which lsn to upload this backup.
+            //
+            // The caller is responsible for providing a valid lsn
+            // and using it in the subsequent import.
+            let lsn = Some(Lsn::from_str(params[2])?);
+
+            // Check that the timeline exists
+            self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("import basebackup ") {
             // Import the `base` section (everything but the wal) of a basebackup.
diff --git a/pageserver/src/reltag.rs b/pageserver/src/reltag.rs
index 18e26cc37a..fadd41f547 100644
--- a/pageserver/src/reltag.rs
+++ b/pageserver/src/reltag.rs
@@ -3,7 +3,7 @@ use std::cmp::Ordering;
 use std::fmt;
 
 use postgres_ffi::relfile_utils::forknumber_to_name;
-use postgres_ffi::Oid;
+use postgres_ffi::{pg_constants, Oid};
 
 ///
 /// Relation data file segment id throughout the Postgres cluster.
@@ -75,6 +75,30 @@ impl fmt::Display for RelTag {
     }
 }
 
+impl RelTag {
+    pub fn to_segfile_name(&self, segno: u32) -> String {
+        let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
+            "global/".to_string()
+        } else {
+            format!("base/{}/", self.dbnode)
+        };
+
+        name += &self.relnode.to_string();
+
+        if let Some(fork_name) = forknumber_to_name(self.forknum) {
+            name += "_";
+            name += fork_name;
+        }
+
+        if segno != 0 {
+            name += ".";
+            name += &segno.to_string();
+        }
+
+        name
+    }
+}
+
 ///
 /// Non-relation transaction status files (clog (a.k.a. pg_xact) and
 /// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,
diff --git a/test_runner/batch_others/test_fullbackup.py b/test_runner/batch_others/test_fullbackup.py
new file mode 100644
index 0000000000..e5d705beab
--- /dev/null
+++ b/test_runner/batch_others/test_fullbackup.py
@@ -0,0 +1,73 @@
+import subprocess
+from contextlib import closing
+
+import psycopg2.extras
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
+from fixtures.neon_fixtures import pg_distrib_dir
+import os
+from fixtures.utils import mkdir_if_needed, subprocess_capture
+import shutil
+import getpass
+import pwd
+
+num_rows = 1000
+
+
+# Ensure that regular postgres can start from fullbackup
+def test_fullbackup(neon_env_builder: NeonEnvBuilder,
+                    pg_bin: PgBin,
+                    port_distributor: PortDistributor):
+
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch('test_fullbackup')
+    pgmain = env.postgres.create_start('test_fullbackup')
+    log.info("postgres is running on 'test_fullbackup' branch")
+
+    timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
+
+    with closing(pgmain.connect()) as conn:
+        with conn.cursor() as cur:
+            # data loading may take a while, so increase statement timeout
+            cur.execute("SET statement_timeout='300s'")
+            cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
+                            from generate_series(1,{num_rows}) g''')
+            cur.execute("CHECKPOINT")
+
+            cur.execute('SELECT pg_current_wal_insert_lsn()')
+            lsn = cur.fetchone()[0]
+            log.info(f"start_backup_lsn = {lsn}")
+
+    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
+    # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
+    psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
+
+    # Get and unpack fullbackup from pageserver
+    restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
+    os.mkdir(restored_dir_path, 0o750)
+    query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}"
+    cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
+    result_basepath = pg_bin.run_capture(cmd, env=psql_env)
+    tar_output_file = result_basepath + ".stdout"
+    subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path])
+
+    # HACK
+    # fullbackup returns neon specific pg_control and first WAL segment
+    # use resetwal to overwrite it
+    pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
+    cmd = [pg_resetwal_path, "-D", restored_dir_path]
+    pg_bin.run_capture(cmd, env=psql_env)
+
+    # Restore from the backup and find the data we inserted
+    port = port_distributor.get_port()
+    with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg:
+        # TODO make port an optional argument
+        vanilla_pg.configure([
+            f"port={port}",
+        ])
+        vanilla_pg.start()
+        num_rows_found = vanilla_pg.safe_psql('select count(*) from tbl;', user="cloud_admin")[0][0]
+        assert num_rows == num_rows_found
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 3d48f20f11..b3b76fafed 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -29,7 +29,7 @@ from dataclasses import dataclass
 # Type-related stuff
 from psycopg2.extensions import connection as PgConnection
 from psycopg2.extensions import make_dsn, parse_dsn
-from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
 from typing_extensions import Literal
 
 import requests
@@ -1373,12 +1373,14 @@ def pg_bin(test_output_dir: str) -> PgBin:
 
 class VanillaPostgres(PgProtocol):
-    def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int):
+    def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True):
         super().__init__(host='localhost', port=port, dbname='postgres')
         self.pgdatadir = pgdatadir
        self.pg_bin = pg_bin
         self.running = False
-        self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
+        if init:
+            self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
+        self.configure([f"port = {port}\n"])
 
     def configure(self, options: List[str]):
         """Append lines into postgresql.conf file."""
@@ -1413,10 +1415,12 @@ class VanillaPostgres(PgProtocol):
 
 
 @pytest.fixture(scope='function')
-def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
+def vanilla_pg(test_output_dir: str,
+               port_distributor: PortDistributor) -> Iterator[VanillaPostgres]:
     pgdatadir = os.path.join(test_output_dir, "pgdata-vanilla")
     pg_bin = PgBin(test_output_dir)
-    with VanillaPostgres(pgdatadir, pg_bin, 5432) as vanilla_pg:
+    port = port_distributor.get_port()
+    with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
         yield vanilla_pg
 
 
@@ -1462,7 +1466,7 @@ def remote_pg(test_output_dir: str) -> Iterator[RemotePostgres]:
 
 class NeonProxy(PgProtocol):
-    def __init__(self, port: int):
+    def __init__(self, port: int, pg_port: int):
         super().__init__(host="127.0.0.1",
                          user="proxy_user",
                          password="pytest2",
@@ -1471,9 +1475,10 @@ class NeonProxy(PgProtocol):
         self.http_port = 7001
         self.host = "127.0.0.1"
         self.port = port
+        self.pg_port = pg_port
         self._popen: Optional[subprocess.Popen[bytes]] = None
 
-    def start_static(self, addr="127.0.0.1:5432") -> None:
+    def start(self) -> None:
         assert self._popen is None
 
         # Start proxy
@@ -1482,7 +1487,8 @@ class NeonProxy(PgProtocol):
         args.extend(["--http", f"{self.host}:{self.http_port}"])
         args.extend(["--proxy", f"{self.host}:{self.port}"])
         args.extend(["--auth-backend", "postgres"])
-        args.extend(["--auth-endpoint", "postgres://proxy_auth:pytest1@localhost:5432/postgres"])
+        args.extend(
+            ["--auth-endpoint", f"postgres://proxy_auth:pytest1@localhost:{self.pg_port}/postgres"])
         self._popen = subprocess.Popen(args)
         self._wait_until_ready()
@@ -1501,14 +1507,16 @@ class NeonProxy(PgProtocol):
 
 
 @pytest.fixture(scope='function')
-def static_proxy(vanilla_pg) -> Iterator[NeonProxy]:
+def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
     """Neon proxy that routes directly to vanilla postgres."""
     vanilla_pg.start()
     vanilla_pg.safe_psql("create user proxy_auth with password 'pytest1' superuser")
     vanilla_pg.safe_psql("create user proxy_user with password 'pytest2'")
 
-    with NeonProxy(4432) as proxy:
-        proxy.start_static()
+    port = port_distributor.get_port()
+    pg_port = vanilla_pg.default_options['port']
+    with NeonProxy(port, pg_port) as proxy:
+        proxy.start()
         yield proxy
 
 
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index ba9bc6e113..bfa57373b3 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -3,7 +3,7 @@ import shutil
 import subprocess
 from pathlib import Path
 
-from typing import Any, List, Optional
+from typing import Any, List
 
 from fixtures.log_helper import log
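
Note for reviewers: to exercise the new `fullbackup` command by hand, the flow from test_fullbackup above condenses to the sketch below. This is an illustration, not code from the patch; it assumes a pageserver whose libpq endpoint is at localhost:64000, and the tenant id, timeline id, and LSN are hypothetical placeholders that must come from a running cluster.

# Sketch: take a fullbackup with plain psql and restore it, mirroring
# test_fullbackup. The tarball arrives as the query result on stdout.
import os
import subprocess

PAGESERVER = "postgresql://localhost:64000"    # assumed pageserver libpq address
TENANT = "0123456789abcdef0123456789abcdef"    # hypothetical tenant id
TIMELINE = "fedcba9876543210fedcba9876543210"  # hypothetical timeline id
LSN = "0/169C3C8"                              # hypothetical flushed LSN

restored = "restored_datadir"
os.mkdir(restored, 0o750)

tar_bytes = subprocess.run(
    ["psql", "--no-psqlrc", PAGESERVER, "-c", f"fullbackup {TENANT} {TIMELINE} {LSN}"],
    check=True,
    stdout=subprocess.PIPE,
).stdout
with open("fullbackup.tar", "wb") as f:
    f.write(tar_bytes)
subprocess.run(["tar", "-xf", "fullbackup.tar", "-C", restored], check=True)

# Same HACK as in the test: the returned pg_control and WAL segment are
# neon-specific, so rewrite them before starting vanilla postgres.
subprocess.run(["pg_resetwal", "-D", restored], check=True)

After pg_resetwal the directory starts under a vanilla postgres, which is exactly what the test does via VanillaPostgres(..., init=False).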
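
A second note on naming: RelTag::to_segfile_name reproduces the standard Postgres datadir layout, so the unpacked tarball looks like an ordinary cluster. The rule re-expressed in Python, for illustration only (GLOBALTABLESPACE_OID and the fork suffixes follow Postgres conventions; the main fork gets no suffix, matching forknumber_to_name returning None for it):

GLOBALTABLESPACE_OID = 1664
FORK_SUFFIXES = {0: None, 1: "fsm", 2: "vm", 3: "init"}  # forknum -> suffix

def segfile_name(spcnode: int, dbnode: int, relnode: int, forknum: int, segno: int) -> str:
    # Shared catalogs live under global/, everything else under base/<dbnode>/
    name = "global/" if spcnode == GLOBALTABLESPACE_OID else f"base/{dbnode}/"
    name += str(relnode)
    suffix = FORK_SUFFIXES[forknum]
    if suffix is not None:
        name += f"_{suffix}"
    if segno != 0:
        name += f".{segno}"
    return name

assert segfile_name(1663, 13008, 16396, 0, 0) == "base/13008/16396"
assert segfile_name(1663, 13008, 16396, 2, 0) == "base/13008/16396_vm"
assert segfile_name(1663, 13008, 16396, 0, 1) == "base/13008/16396.1"
assert segfile_name(1664, 0, 1262, 0, 0) == "global/1262"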