basebackup import fixes (#1955)

This commit is contained in:
Anastasia Lubennikova
2022-06-17 22:29:32 +03:00
committed by GitHub
parent 0e556b2782
commit 11d7743b39
4 changed files with 119 additions and 25 deletions

View File

@@ -355,24 +355,21 @@ where
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// Postgres doesn't recognize the zenith.signal file and doesn't need it.
if !self.full_backup {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
}
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
write!(zenith_signal, "PREV LSN: invalid")?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();

View File

@@ -14,6 +14,7 @@ use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
@@ -162,7 +163,6 @@ fn import_rel<R: Repository, Reader: Read>(
Ok(())
}
///
/// Import an SLRU segment file
///
fn import_slru<R: Repository, Reader: Read>(
@@ -295,6 +295,8 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
// Import base
for base_tar_entry in tar::Archive::new(reader).entries()? {
let entry = base_tar_entry.unwrap();
@@ -307,7 +309,10 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
// let mut buffer = Vec::new();
// entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, file_path.as_ref(), entry, len)?;
if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? {
// We found the pg_control file.
pg_control = Some(res);
}
}
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
@@ -321,6 +326,9 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
}
}
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
Ok(())
}
@@ -485,12 +493,24 @@ pub fn import_file<R: Repository, Reader: Read>(
let bytes = read_all_bytes(reader)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
info!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
info!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
let zenith_signal = std::str::from_utf8(&bytes).unwrap();
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>().unwrap();
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
info!("imported zenith signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.
bail!("Importing pg_tblspc is not implemented");
} else if file_path.starts_with("pg_wal") {
panic!("found wal file in base section");
} else {
info!("ignored");
}

View File

@@ -564,6 +564,12 @@ impl PageServerHandler {
// TODO leave clean state on error
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
// Flush data to disk, then upload to s3
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
@@ -571,12 +577,6 @@ impl PageServerHandler {
// TODO Wait for s3 upload to complete
// info!("uploading layers");
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
info!("done");
Ok(())
}

View File

@@ -5,6 +5,10 @@ import os
import shutil
from pathlib import Path
import json
from fixtures.utils import subprocess_capture
from fixtures.log_helper import log
from contextlib import closing
from fixtures.neon_fixtures import pg_distrib_dir
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
@@ -66,3 +70,76 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from t') == [(30000000, )]
def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
num_rows = 3000
neon_env_builder.num_safekeepers = 1
neon_env_builder
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_import_from_pageserver')
pgmain = env.postgres.create_start('test_import_from_pageserver')
log.info("postgres is running on 'test_import_from_pageserver' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get a fullbackup from pageserver
query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
# Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / 'tenants'
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
#start the pageserver again
env.pageserver.start()
# Import using another tenantid, because we use the same pageserver.
# TODO Create another pageserver to maeke test more realistic.
tenant = uuid4()
# Import to pageserver
node_name = "import_from_pageserver"
env.neon_cli.create_tenant(tenant)
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id",
tenant.hex,
"--timeline-id",
timeline,
"--node-name",
node_name,
"--base-lsn",
lsn,
"--base-tarfile",
os.path.join(tar_output_file),
])
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]