diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 44a6442522..0efd931284 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -355,24 +355,21 @@ where pg_control.checkPointCopy = checkpoint; pg_control.state = pg_constants::DB_SHUTDOWNED; - // Postgres doesn't recognize the zenith.signal file and doesn't need it. - if !self.full_backup { - // add zenith.signal file - let mut zenith_signal = String::new(); - if self.prev_record_lsn == Lsn(0) { - if self.lsn == self.timeline.tline.get_ancestor_lsn() { - write!(zenith_signal, "PREV LSN: none")?; - } else { - write!(zenith_signal, "PREV LSN: invalid")?; - } + // add zenith.signal file + let mut zenith_signal = String::new(); + if self.prev_record_lsn == Lsn(0) { + if self.lsn == self.timeline.tline.get_ancestor_lsn() { + write!(zenith_signal, "PREV LSN: none")?; } else { - write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; + write!(zenith_signal, "PREV LSN: invalid")?; } - self.ar.append( - &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, - zenith_signal.as_bytes(), - )?; + } else { + write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; } + self.ar.append( + &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, + zenith_signal.as_bytes(), + )?; //send pg_control let pg_control_bytes = pg_control.encode(); diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 998f178cd8..20b8b577db 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -14,6 +14,7 @@ use walkdir::WalkDir; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::repository::Repository; +use crate::repository::Timeline; use crate::walingest::WalIngest; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; @@ -162,7 +163,6 @@ fn import_rel( Ok(()) } -/// /// Import an SLRU segment file /// fn import_slru( @@ -295,6 +295,8 @@ pub fn 
import_basebackup_from_tar( let mut modification = tline.begin_modification(base_lsn); modification.init_empty()?; + let mut pg_control: Option = None; + // Import base for base_tar_entry in tar::Archive::new(reader).entries()? { let entry = base_tar_entry.unwrap(); @@ -307,7 +309,10 @@ pub fn import_basebackup_from_tar( // let mut buffer = Vec::new(); // entry.read_to_end(&mut buffer).unwrap(); - import_file(&mut modification, file_path.as_ref(), entry, len)?; + if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? { + // We found the pg_control file. + pg_control = Some(res); + } } tar::EntryType::Directory => { info!("directory {:?}", file_path); @@ -321,6 +326,9 @@ pub fn import_basebackup_from_tar( } } + // sanity check: ensure that pg_control is loaded + let _pg_control = pg_control.context("pg_control file not found")?; + modification.commit()?; Ok(()) } @@ -485,12 +493,24 @@ pub fn import_file( let bytes = read_all_bytes(reader)?; modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?; info!("imported twophase file"); + } else if file_path.starts_with("pg_wal") { + info!("found wal file in base section. ignore it"); + } else if file_path.starts_with("zenith.signal") { + // Parse zenith signal file to set correct previous LSN + let bytes = read_all_bytes(reader)?; + // zenith.signal format is "PREV LSN: prev_lsn" + let zenith_signal = std::str::from_utf8(&bytes).unwrap(); + let zenith_signal = zenith_signal.split(':').collect::>(); + let prev_lsn = zenith_signal[1].trim().parse::().unwrap(); + + let writer = modification.tline.tline.writer(); + writer.finish_write(prev_lsn); + + info!("imported zenith signal {}", prev_lsn); } else if file_path.starts_with("pg_tblspc") { // TODO Backups exported from neon won't have pg_tblspc, but we will need // this to import arbitrary postgres databases. 
bail!("Importing pg_tblspc is not implemented"); - } else if file_path.starts_with("pg_wal") { - panic!("found wal file in base section"); } else { info!("ignored"); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b5395ecc23..296175d9f0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -564,6 +564,12 @@ impl PageServerHandler { // TODO leave clean state on error + // Import basebackup provided via CopyData + info!("importing basebackup"); + pgb.write_message(&BeMessage::CopyInResponse)?; + let reader = CopyInReader::new(pgb); + import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?; + // Flush data to disk, then upload to s3 info!("flushing layers"); datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?; @@ -571,12 +577,6 @@ impl PageServerHandler { // TODO Wait for s3 upload to complete // info!("uploading layers"); - // Import basebackup provided via CopyData - info!("importing basebackup"); - pgb.write_message(&BeMessage::CopyInResponse)?; - let reader = CopyInReader::new(pgb); - import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?; - info!("done"); Ok(()) } diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index 2a916a5685..92277a5792 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -5,6 +5,10 @@ import os import shutil from pathlib import Path import json +from fixtures.utils import subprocess_capture +from fixtures.log_helper import log +from contextlib import closing +from fixtures.neon_fixtures import pg_distrib_dir def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): @@ -66,3 +70,76 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build # Check it worked pg = env.postgres.create_start(node_name, tenant_id=tenant) assert pg.safe_psql('select count(*) from t') == [(30000000, )] + + +def 
test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): + + num_rows = 3000 + neon_env_builder.num_safekeepers = 1 + neon_env_builder + env = neon_env_builder.init_start() + + env.neon_cli.create_branch('test_import_from_pageserver') + pgmain = env.postgres.create_start('test_import_from_pageserver') + log.info("postgres is running on 'test_import_from_pageserver' branch") + + timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0] + + with closing(pgmain.connect()) as conn: + with conn.cursor() as cur: + # data loading may take a while, so increase statement timeout + cur.execute("SET statement_timeout='300s'") + cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g + from generate_series(1,{num_rows}) g''') + cur.execute("CHECKPOINT") + + cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn = cur.fetchone()[0] + log.info(f"start_backup_lsn = {lsn}") + + # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. + # PgBin sets it automatically, but here we need to pipe psql output to the tar command. + psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')} + + # Get a fullbackup from pageserver + query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}" + cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] + result_basepath = pg_bin.run_capture(cmd, env=psql_env) + tar_output_file = result_basepath + ".stdout" + + # Stop the first pageserver instance, erase all its data + env.postgres.stop_all() + env.pageserver.stop() + + dir_to_clear = Path(env.repo_dir) / 'tenants' + shutil.rmtree(dir_to_clear) + os.mkdir(dir_to_clear) + + #start the pageserver again + env.pageserver.start() + + # Import using another tenantid, because we use the same pageserver. + # TODO Create another pageserver to make test more realistic.
+ tenant = uuid4() + + # Import to pageserver + node_name = "import_from_pageserver" + env.neon_cli.create_tenant(tenant) + env.neon_cli.raw_cli([ + "timeline", + "import", + "--tenant-id", + tenant.hex, + "--timeline-id", + timeline, + "--node-name", + node_name, + "--base-lsn", + lsn, + "--base-tarfile", + os.path.join(tar_output_file), + ]) + + # Check it worked + pg = env.postgres.create_start(node_name, tenant_id=tenant) + assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]