From 8ca3faa61e0ef33c48f9a7f14492235a68a03bda Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 21 Jun 2022 11:04:10 -0400 Subject: [PATCH] Import basebackup into pageserver (#1925) Allow importing basebackup taken from vanilla postgres or another pageserver via psql copy in protocol. --- Cargo.lock | 12 + control_plane/src/storage.rs | 53 ++- neon_local/src/main.rs | 53 ++- pageserver/Cargo.toml | 2 + pageserver/src/basebackup.rs | 4 +- pageserver/src/import_datadir.rs | 493 +++++++++++++++--------- pageserver/src/layered_repository.rs | 16 +- pageserver/src/page_service.rs | 231 ++++++++++- pageserver/src/pgdatadir_mapping.rs | 10 +- test_runner/batch_others/test_import.py | 193 ++++++++++ test_runner/fixtures/zenith_fixtures.py | 4 +- 11 files changed, 876 insertions(+), 195 deletions(-) create mode 100644 test_runner/batch_others/test_import.py diff --git a/Cargo.lock b/Cargo.lock index 6acad6dac8..1281734609 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,6 +363,16 @@ dependencies = [ "textwrap 0.14.2", ] +[[package]] +name = "close_fds" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "cmake" version = "0.1.48" @@ -1789,6 +1799,7 @@ dependencies = [ "bytes", "chrono", "clap 3.0.14", + "close_fds", "const_format", "crc32c", "crossbeam-utils", @@ -1830,6 +1841,7 @@ dependencies = [ "tracing", "url", "utils", + "walkdir", "workspace_hack", ] diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 355c7c250d..1cb19abf39 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::io::Write; +use std::fs::File; +use std::io::{BufReader, Write}; use std::net::TcpStream; use std::path::PathBuf; use std::process::Command; @@ -492,6 +493,56 @@ impl PageServerNode { Ok(timeline_info_response) } + + /// Import a basebackup prepared using either: + /// a) `pg_basebackup -F tar`, or + /// b) The `fullbackup` pageserver endpoint + /// + /// # Arguments + /// * `tenant_id` - tenant to import into. Created if not exists + /// * `timeline_id` - id to assign to imported timeline + /// * `base` - (start lsn of basebackup, path to `base.tar` file) + /// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`) + pub fn timeline_import( + &self, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + base: (Lsn, PathBuf), + pg_wal: Option<(Lsn, PathBuf)>, + ) -> anyhow::Result<()> { + let mut client = self.pg_connection_config.connect(NoTls).unwrap(); + + // Init base reader + let (start_lsn, base_tarfile_path) = base; + let base_tarfile = File::open(base_tarfile_path)?; + let mut base_reader = BufReader::new(base_tarfile); + + // Init wal reader if necessary + let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal { + let wal_tarfile = File::open(wal_tarfile_path)?; + let wal_reader = BufReader::new(wal_tarfile); + (end_lsn, Some(wal_reader)) + } else { + (start_lsn, None) + }; + + // Import base + let import_cmd = + format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); + let mut writer = client.copy_in(&import_cmd)?; + io::copy(&mut base_reader, &mut writer)?; + writer.finish()?; + + // Import wal if necessary + if let Some(mut wal_reader) = wal_reader { + let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); + let mut writer = client.copy_in(&import_cmd)?; + io::copy(&mut wal_reader, &mut writer)?; + writer.finish()?; + } + + Ok(()) + } } fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index f04af9cfdd..86ed98e34c 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -14,7 +14,7 @@ use safekeeper::defaults::{ DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, }; use std::collections::{BTreeSet, HashMap}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::process::exit; use std::str::FromStr; use utils::{ @@ -159,6 +159,20 @@ fn main() -> Result<()> { .about("Create a new blank timeline") .arg(tenant_id_arg.clone()) .arg(branch_name_arg.clone())) + .subcommand(App::new("import") + .about("Import timeline from basebackup directory") + .arg(tenant_id_arg.clone()) + .arg(timeline_id_arg.clone()) + .arg(Arg::new("node-name").long("node-name").takes_value(true) + .help("Name to assign to the imported timeline")) + .arg(Arg::new("base-tarfile").long("base-tarfile").takes_value(true) + .help("Basebackup tarfile to import")) + .arg(Arg::new("base-lsn").long("base-lsn").takes_value(true) + .help("Lsn the basebackup starts at")) + .arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true) + .help("Wal to add after base")) + .arg(Arg::new("end-lsn").long("end-lsn").takes_value(true) + .help("Lsn the basebackup ends at"))) ).subcommand( App::new("tenant") .setting(AppSettings::ArgRequiredElseHelp) @@ -613,6 +627,43 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - timeline.timeline_id, last_record_lsn, tenant_id, ); } + Some(("import", import_match)) => { + let tenant_id = get_tenant_id(import_match, env)?; + let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided"); + let name = import_match + .value_of("node-name") + .ok_or_else(|| anyhow!("No node name provided"))?; + + // Parse base inputs + let base_tarfile = import_match + .value_of("base-tarfile") + .map(|s| PathBuf::from_str(s).unwrap()) + .ok_or_else(|| anyhow!("No base-tarfile provided"))?; + let base_lsn = Lsn::from_str( + import_match + .value_of("base-lsn") + .ok_or_else(|| anyhow!("No base-lsn provided"))?, + )?; + let base = (base_lsn, base_tarfile); + + // Parse pg_wal inputs + let wal_tarfile = import_match + .value_of("wal-tarfile") + .map(|s| PathBuf::from_str(s).unwrap()); + let end_lsn = import_match + .value_of("end-lsn") + .map(|s| Lsn::from_str(s).unwrap()); + // TODO validate both or none are provided + let pg_wal = end_lsn.zip(wal_tarfile); + + let mut cplane = ComputeControlPlane::load(env.clone())?; + println!("Importing timeline into pageserver ..."); + pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?; + println!("Creating node for imported timeline ..."); + env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; + cplane.new_node(tenant_id, name, timeline_id, None, None)?; + println!("Done"); + } Some(("branch", branch_match)) => { let tenant_id = get_tenant_id(branch_match, env)?; let new_branch_name = branch_match diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 290f52e0b2..1652356afa 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -60,6 +60,8 @@ metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } remote_storage = { path = "../libs/remote_storage" } workspace_hack = { version = "0.1", path = "../workspace_hack" } +close_fds = "0.3.2" +walkdir = "2.3.2" [dev-dependencies] hex-literal = "0.3" diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 92d35130d8..6c1dc29a43 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -97,7 +97,9 @@ impl<'a> Basebackup<'a> { }) } - pub fn send_tarball(&mut self) -> anyhow::Result<()> { + pub fn send_tarball(mut self) -> anyhow::Result<()> { + // TODO include checksum + // Create pgdata subdirs structure for dir in pg_constants::PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(*dir)?; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 703ee8f1b1..3ede949885 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -2,7 +2,6 @@ //! Import data and WAL from a PostgreSQL data directory and WAL segments into //! a zenith Timeline. //! -use std::fs; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; @@ -10,16 +9,18 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use tracing::*; +use walkdir::WalkDir; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::repository::Repository; +use crate::repository::Timeline; use crate::walingest::WalIngest; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::Oid; use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; -use postgres_ffi::{Oid, TransactionId}; use utils::lsn::Lsn; /// @@ -35,100 +36,29 @@ pub fn import_timeline_from_postgres_datadir( ) -> Result<()> { let mut pg_control: Option = None; + // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn) + // Then fishing out pg_control would be unnecessary let mut modification = tline.begin_modification(lsn); modification.init_empty()?; - // Scan 'global' - let mut relfiles: Vec = Vec::new(); - for direntry in fs::read_dir(path.join("global"))? { - let direntry = direntry?; - match direntry.file_name().to_str() { - None => continue, + // Import all but pg_wal + let all_but_wal = WalkDir::new(path) + .into_iter() + .filter_entry(|entry| !entry.path().ends_with("pg_wal")); + for entry in all_but_wal { + let entry = entry?; + let metadata = entry.metadata().expect("error getting dir entry metadata"); + if metadata.is_file() { + let absolute_path = entry.path(); + let relative_path = absolute_path.strip_prefix(path)?; - Some("pg_control") => { - pg_control = Some(import_control_file(&mut modification, &direntry.path())?); - } - Some("pg_filenode.map") => { - import_relmap_file( - &mut modification, - pg_constants::GLOBALTABLESPACE_OID, - 0, - &direntry.path(), - )?; - } - - // Load any relation files into the page server (but only after the other files) - _ => relfiles.push(direntry.path()), - } - } - for relfile in relfiles { - import_relfile( - &mut modification, - &relfile, - pg_constants::GLOBALTABLESPACE_OID, - 0, - )?; - } - - // Scan 'base'. It contains database dirs, the database OID is the filename. - // E.g. 'base/12345', where 12345 is the database OID. - for direntry in fs::read_dir(path.join("base"))? { - let direntry = direntry?; - - //skip all temporary files - if direntry.file_name().to_string_lossy() == "pgsql_tmp" { - continue; - } - - let dboid = direntry.file_name().to_string_lossy().parse::()?; - - let mut relfiles: Vec = Vec::new(); - for direntry in fs::read_dir(direntry.path())? { - let direntry = direntry?; - match direntry.file_name().to_str() { - None => continue, - - Some("PG_VERSION") => { - //modification.put_dbdir_creation(pg_constants::DEFAULTTABLESPACE_OID, dboid)?; - } - Some("pg_filenode.map") => import_relmap_file( - &mut modification, - pg_constants::DEFAULTTABLESPACE_OID, - dboid, - &direntry.path(), - )?, - - // Load any relation files into the page server - _ => relfiles.push(direntry.path()), + let file = File::open(absolute_path)?; + let len = metadata.len() as usize; + if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { + pg_control = Some(control_file); } } - for relfile in relfiles { - import_relfile( - &mut modification, - &relfile, - pg_constants::DEFAULTTABLESPACE_OID, - dboid, - )?; - } } - for entry in fs::read_dir(path.join("pg_xact"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::Clog, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_multixact").join("members"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::MultiXactMembers, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::MultiXactOffsets, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_twophase"))? { - let entry = entry?; - let xid = u32::from_str_radix(&entry.path().to_string_lossy(), 16)?; - import_twophase_file(&mut modification, xid, &entry.path())?; - } - // TODO: Scan pg_tblspc // We're done importing all the data files. modification.commit()?; @@ -158,31 +88,30 @@ pub fn import_timeline_from_postgres_datadir( } // subroutine of import_timeline_from_postgres_datadir(), to load one relation file. -fn import_relfile( +fn import_rel( modification: &mut DatadirModification, path: &Path, spcoid: Oid, dboid: Oid, + mut reader: Reader, + len: usize, ) -> anyhow::Result<()> { // Does it look like a relation file? trace!("importing rel file {}", path.display()); - let (relnode, forknum, segno) = parse_relfilename(&path.file_name().unwrap().to_string_lossy()) - .map_err(|e| { - warn!("unrecognized file in postgres datadir: {:?} ({})", path, e); - e - })?; + let filename = &path + .file_name() + .expect("missing rel filename") + .to_string_lossy(); + let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| { + warn!("unrecognized file in postgres datadir: {:?} ({})", path, e); + e + })?; - let mut file = File::open(path)?; let mut buf: [u8; 8192] = [0u8; 8192]; - let len = file.metadata().unwrap().len(); - ensure!(len % pg_constants::BLCKSZ as u64 == 0); - let nblocks = len / pg_constants::BLCKSZ as u64; - - if segno != 0 { - todo!(); - } + ensure!(len % pg_constants::BLCKSZ as usize == 0); + let nblocks = len / pg_constants::BLCKSZ as usize; let rel = RelTag { spcnode: spcoid, @@ -190,11 +119,22 @@ fn import_relfile( relnode, forknum, }; - modification.put_rel_creation(rel, nblocks as u32)?; let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + + // Call put_rel_creation for every segment of the relation, + // because there is no guarantee about the order in which we are processing segments. + // ignore "relation already exists" error + if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) { + if e.to_string().contains("already exists") { + debug!("relation {} already exists. we must be extending it", rel); + } else { + return Err(e); + } + } + loop { - let r = file.read_exact(&mut buf); + let r = reader.read_exact(&mut buf); match r { Ok(_) => { modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?; @@ -204,7 +144,9 @@ fn import_relfile( Err(err) => match err.kind() { std::io::ErrorKind::UnexpectedEof => { // reached EOF. That's expected. - ensure!(blknum == nblocks as u32, "unexpected EOF"); + let relative_blknum = + blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); + ensure!(relative_blknum == nblocks as u32, "unexpected EOF"); break; } _ => { @@ -215,96 +157,43 @@ fn import_relfile( blknum += 1; } + // Update relation size + // + // If we process rel segments out of order, + // put_rel_extend will skip the update. + modification.put_rel_extend(rel, blknum)?; + Ok(()) } -/// Import a relmapper (pg_filenode.map) file into the repository -fn import_relmap_file( - modification: &mut DatadirModification, - spcnode: Oid, - dbnode: Oid, - path: &Path, -) -> Result<()> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing relmap file {}", path.display()); - - modification.put_relmap_file(spcnode, dbnode, Bytes::copy_from_slice(&buffer[..]))?; - Ok(()) -} - -/// Import a twophase state file (pg_twophase/) into the repository -fn import_twophase_file( - modification: &mut DatadirModification, - xid: TransactionId, - path: &Path, -) -> Result<()> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing non-rel file {}", path.display()); - - modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?; - Ok(()) -} - -/// -/// Import pg_control file into the repository. -/// -/// The control file is imported as is, but we also extract the checkpoint record -/// from it and store it separated. -fn import_control_file( - modification: &mut DatadirModification, - path: &Path, -) -> Result { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing control file {}", path.display()); - - // Import it as ControlFile - modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?; - - // Extract the checkpoint record and import it separately. - let pg_control = ControlFileData::decode(&buffer)?; - let checkpoint_bytes = pg_control.checkPointCopy.encode()?; - modification.put_checkpoint(checkpoint_bytes)?; - - Ok(pg_control) -} - -/// /// Import an SLRU segment file /// -fn import_slru_file( +fn import_slru( modification: &mut DatadirModification, slru: SlruKind, path: &Path, + mut reader: Reader, + len: usize, ) -> Result<()> { trace!("importing slru file {}", path.display()); - let mut file = File::open(path)?; let mut buf: [u8; 8192] = [0u8; 8192]; - let segno = u32::from_str_radix(&path.file_name().unwrap().to_string_lossy(), 16)?; + let filename = &path + .file_name() + .expect("missing slru filename") + .to_string_lossy(); + let segno = u32::from_str_radix(filename, 16)?; - let len = file.metadata().unwrap().len(); - ensure!(len % pg_constants::BLCKSZ as u64 == 0); // we assume SLRU block size is the same as BLCKSZ - let nblocks = len / pg_constants::BLCKSZ as u64; + ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ + let nblocks = len / pg_constants::BLCKSZ as usize; - ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as u64); + ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize); modification.put_slru_segment_creation(slru, segno, nblocks as u32)?; let mut rpageno = 0; loop { - let r = file.read_exact(&mut buf); + let r = reader.read_exact(&mut buf); match r { Ok(_) => { modification.put_slru_page_image( @@ -396,10 +285,258 @@ fn import_wal( } if last_lsn != startpoint { - debug!("reached end of WAL at {}", last_lsn); + info!("reached end of WAL at {}", last_lsn); } else { info!("no WAL to import at {}", last_lsn); } Ok(()) } + +pub fn import_basebackup_from_tar( + tline: &mut DatadirTimeline, + reader: Reader, + base_lsn: Lsn, +) -> Result<()> { + info!("importing base at {}", base_lsn); + let mut modification = tline.begin_modification(base_lsn); + modification.init_empty()?; + + let mut pg_control: Option = None; + + // Import base + for base_tar_entry in tar::Archive::new(reader).entries()? { + let entry = base_tar_entry?; + let header = entry.header(); + let len = header.entry_size()? as usize; + let file_path = header.path()?.into_owned(); + + match header.entry_type() { + tar::EntryType::Regular => { + if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? { + // We found the pg_control file. + pg_control = Some(res); + } + } + tar::EntryType::Directory => { + debug!("directory {:?}", file_path); + } + _ => { + panic!("tar::EntryType::?? {}", file_path.display()); + } + } + } + + // sanity check: ensure that pg_control is loaded + let _pg_control = pg_control.context("pg_control file not found")?; + + modification.commit()?; + Ok(()) +} + +pub fn import_wal_from_tar( + tline: &mut DatadirTimeline, + reader: Reader, + start_lsn: Lsn, + end_lsn: Lsn, +) -> Result<()> { + // Set up walingest mutable state + let mut waldecoder = WalStreamDecoder::new(start_lsn); + let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE); + let mut last_lsn = start_lsn; + let mut walingest = WalIngest::new(tline, start_lsn)?; + + // Ingest wal until end_lsn + info!("importing wal until {}", end_lsn); + let mut pg_wal_tar = tar::Archive::new(reader); + let mut pg_wal_entries_iter = pg_wal_tar.entries()?; + while last_lsn <= end_lsn { + let bytes = { + let entry = pg_wal_entries_iter.next().expect("expected more wal")?; + let header = entry.header(); + let file_path = header.path()?.into_owned(); + + match header.entry_type() { + tar::EntryType::Regular => { + // FIXME: assume postgresql tli 1 for now + let expected_filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); + let file_name = file_path + .file_name() + .expect("missing wal filename") + .to_string_lossy(); + ensure!(expected_filename == file_name); + + debug!("processing wal file {:?}", file_path); + read_all_bytes(entry)? + } + tar::EntryType::Directory => { + debug!("directory {:?}", file_path); + continue; + } + _ => { + panic!("tar::EntryType::?? {}", file_path.display()); + } + } + }; + + waldecoder.feed_bytes(&bytes[offset..]); + + while last_lsn <= end_lsn { + if let Some((lsn, recdata)) = waldecoder.poll_decode()? { + walingest.ingest_record(tline, recdata, lsn)?; + last_lsn = lsn; + + debug!("imported record at {} (end {})", lsn, end_lsn); + } + } + + debug!("imported records up to {}", last_lsn); + segno += 1; + offset = 0; + } + + if last_lsn != start_lsn { + info!("reached end of WAL at {}", last_lsn); + } else { + info!("there was no WAL to import at {}", last_lsn); + } + + // Log any extra unused files + for e in &mut pg_wal_entries_iter { + let entry = e?; + let header = entry.header(); + let file_path = header.path()?.into_owned(); + info!("skipping {:?}", file_path); + } + + Ok(()) +} + +pub fn import_file( + modification: &mut DatadirModification, + file_path: &Path, + reader: Reader, + len: usize, +) -> Result> { + debug!("looking at {:?}", file_path); + + if file_path.starts_with("global") { + let spcnode = pg_constants::GLOBALTABLESPACE_OID; + let dbnode = 0; + + match file_path + .file_name() + .expect("missing filename") + .to_string_lossy() + .as_ref() + { + "pg_control" => { + let bytes = read_all_bytes(reader)?; + + // Extract the checkpoint record and import it separately. + let pg_control = ControlFileData::decode(&bytes[..])?; + let checkpoint_bytes = pg_control.checkPointCopy.encode()?; + modification.put_checkpoint(checkpoint_bytes)?; + debug!("imported control file"); + + // Import it as ControlFile + modification.put_control_file(bytes)?; + return Ok(Some(pg_control)); + } + "pg_filenode.map" => { + let bytes = read_all_bytes(reader)?; + modification.put_relmap_file(spcnode, dbnode, bytes)?; + debug!("imported relmap file") + } + "PG_VERSION" => { + debug!("ignored"); + } + _ => { + import_rel(modification, file_path, spcnode, dbnode, reader, len)?; + debug!("imported rel creation"); + } + } + } else if file_path.starts_with("base") { + let spcnode = pg_constants::DEFAULTTABLESPACE_OID; + let dbnode: u32 = file_path + .iter() + .nth(1) + .expect("invalid file path, expected dbnode") + .to_string_lossy() + .parse()?; + + match file_path + .file_name() + .expect("missing base filename") + .to_string_lossy() + .as_ref() + { + "pg_filenode.map" => { + let bytes = read_all_bytes(reader)?; + modification.put_relmap_file(spcnode, dbnode, bytes)?; + debug!("imported relmap file") + } + "PG_VERSION" => { + debug!("ignored"); + } + _ => { + import_rel(modification, file_path, spcnode, dbnode, reader, len)?; + debug!("imported rel creation"); + } + } + } else if file_path.starts_with("pg_xact") { + let slru = SlruKind::Clog; + + import_slru(modification, slru, file_path, reader, len)?; + debug!("imported clog slru"); + } else if file_path.starts_with("pg_multixact/offsets") { + let slru = SlruKind::MultiXactOffsets; + + import_slru(modification, slru, file_path, reader, len)?; + debug!("imported multixact offsets slru"); + } else if file_path.starts_with("pg_multixact/members") { + let slru = SlruKind::MultiXactMembers; + + import_slru(modification, slru, file_path, reader, len)?; + debug!("imported multixact members slru"); + } else if file_path.starts_with("pg_twophase") { + let file_name = &file_path + .file_name() + .expect("missing twophase filename") + .to_string_lossy(); + let xid = u32::from_str_radix(file_name, 16)?; + + let bytes = read_all_bytes(reader)?; + modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?; + debug!("imported twophase file"); + } else if file_path.starts_with("pg_wal") { + debug!("found wal file in base section. ignore it"); + } else if file_path.starts_with("zenith.signal") { + // Parse zenith signal file to set correct previous LSN + let bytes = read_all_bytes(reader)?; + // zenith.signal format is "PREV LSN: prev_lsn" + let zenith_signal = std::str::from_utf8(&bytes)?; + let zenith_signal = zenith_signal.split(':').collect::>(); + let prev_lsn = zenith_signal[1].trim().parse::()?; + + let writer = modification.tline.tline.writer(); + writer.finish_write(prev_lsn); + + debug!("imported zenith signal {}", prev_lsn); + } else if file_path.starts_with("pg_tblspc") { + // TODO Backups exported from neon won't have pg_tblspc, but we will need + // this to import arbitrary postgres databases. + bail!("Importing pg_tblspc is not implemented"); + } else { + debug!("ignored"); + } + + Ok(None) +} + +fn read_all_bytes(mut reader: Reader) -> Result { + let mut buf: Vec = vec![]; + reader.read_to_end(&mut buf)?; + Ok(Bytes::copy_from_slice(&buf[..])) +} diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index a83907430e..f001ca0964 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -242,15 +242,15 @@ impl Repository for LayeredRepository { ); timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); + // Insert if not exists let timeline = Arc::new(timeline); - let r = timelines.insert( - timelineid, - LayeredTimelineEntry::Loaded(Arc::clone(&timeline)), - ); - ensure!( - r.is_none(), - "assertion failure, inserted duplicate timeline" - ); + match timelines.entry(timelineid) { + Entry::Occupied(_) => bail!("Timeline already exists"), + Entry::Vacant(vacant) => { + vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline))) + } + }; + Ok(timeline) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 03264c9782..0a9f8d1057 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -14,7 +14,7 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use regex::Regex; -use std::io; +use std::io::{self, Read}; use std::net::TcpListener; use std::str; use std::str::FromStr; @@ -30,6 +30,8 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; +use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar}; +use crate::layered_repository::LayeredRepository; use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; use crate::profiling::profpoint_start; use crate::reltag::RelTag; @@ -202,6 +204,96 @@ impl PagestreamBeMessage { } } +/// Implements Read for the server side of CopyIn +struct CopyInReader<'a> { + pgb: &'a mut PostgresBackend, + + /// Overflow buffer for bytes sent in CopyData messages + /// that the reader (caller of read) hasn't asked for yet. + /// TODO use BytesMut? + buf: Vec, + + /// Bytes before `buf_begin` are considered as dropped. + /// This allows us to implement O(1) pop_front on Vec. + /// The Vec won't grow large because we only add to it + /// when it's empty. + buf_begin: usize, +} + +impl<'a> CopyInReader<'a> { + // NOTE: pgb should be in copy in state already + fn new(pgb: &'a mut PostgresBackend) -> Self { + Self { + pgb, + buf: Vec::<_>::new(), + buf_begin: 0, + } + } +} + +impl<'a> Drop for CopyInReader<'a> { + fn drop(&mut self) { + // Finalize copy protocol so that self.pgb can be reused + // TODO instead, maybe take ownership of pgb and give it back at the end + let mut buf: Vec = vec![]; + let _ = self.read_to_end(&mut buf); + } +} + +impl<'a> Read for CopyInReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + while !thread_mgr::is_shutdown_requested() { + // Return from buffer if nonempty + if self.buf_begin < self.buf.len() { + let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin); + buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]); + self.buf_begin += bytes_to_read; + return Ok(bytes_to_read); + } + + // Delete garbage + self.buf.clear(); + self.buf_begin = 0; + + // Wait for client to send CopyData bytes + match self.pgb.read_message() { + Ok(Some(message)) => { + let copy_data_bytes = match message { + FeMessage::CopyData(bytes) => bytes, + FeMessage::CopyDone => return Ok(0), + FeMessage::Sync => continue, + m => { + let msg = format!("unexpected message {:?}", m); + self.pgb.write_message(&BeMessage::ErrorResponse(&msg))?; + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + }; + + // Return as much as we can, saving the rest in self.buf + let mut reader = copy_data_bytes.reader(); + let bytes_read = reader.read(buf)?; + reader.read_to_end(&mut self.buf)?; + return Ok(bytes_read); + } + Ok(None) => { + let msg = "client closed connection"; + self.pgb.write_message(&BeMessage::ErrorResponse(msg))?; + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } + Err(e) => { + if !is_socket_read_timed_out(&e) { + return Err(io::Error::new(io::ErrorKind::Other, e)); + } + } + } + } + + // Shutting down + let msg = "Importer thread was shut down"; + Err(io::Error::new(io::ErrorKind::Other, msg)) + } +} + /////////////////////////////////////////////////////////////////////////////// /// @@ -423,6 +515,98 @@ impl PageServerHandler { Ok(()) } + fn handle_import_basebackup( + &self, + pgb: &mut PostgresBackend, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + base_lsn: Lsn, + _end_lsn: Lsn, + ) -> anyhow::Result<()> { + thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); + let _enter = + info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered(); + + // Create empty timeline + info!("creating new timeline"); + let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; + let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?; + let repartition_distance = repo.get_checkpoint_distance(); + let mut datadir_timeline = + DatadirTimeline::::new(timeline, repartition_distance); + + // TODO mark timeline as not ready until it reaches end_lsn. + // We might have some wal to import as well, and we should prevent compute + // from connecting before that and writing conflicting wal. + // + // This is not relevant for pageserver->pageserver migrations, since there's + // no wal to import. But should be fixed if we want to import from postgres. + + // TODO leave clean state on error. For now you can use detach to clean + // up broken state from a failed import. + + // Import basebackup provided via CopyData + info!("importing basebackup"); + pgb.write_message(&BeMessage::CopyInResponse)?; + let reader = CopyInReader::new(pgb); + import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?; + + // TODO check checksum + // Meanwhile you can verify client-side by taking fullbackup + // and checking that it matches in size with what was imported. + // It wouldn't work if base came from vanilla postgres though, + // since we discard some log files. + + // Flush data to disk, then upload to s3 + info!("flushing layers"); + datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?; + + info!("done"); + Ok(()) + } + + fn handle_import_wal( + &self, + pgb: &mut PostgresBackend, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + start_lsn: Lsn, + end_lsn: Lsn, + ) -> anyhow::Result<()> { + thread_mgr::associate_with(Some(tenant_id), Some(timeline_id)); + let _enter = + info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered(); + + let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; + let timeline = repo.get_timeline_load(timeline_id)?; + ensure!(timeline.get_last_record_lsn() == start_lsn); + + let repartition_distance = repo.get_checkpoint_distance(); + let mut datadir_timeline = + DatadirTimeline::::new(timeline, repartition_distance); + + // TODO leave clean state on error. For now you can use detach to clean + // up broken state from a failed import. + + // Import wal provided via CopyData + info!("importing wal"); + pgb.write_message(&BeMessage::CopyInResponse)?; + let reader = CopyInReader::new(pgb); + import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?; + + // TODO Does it make sense to overshoot? + ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn); + + // Flush data to disk, then upload to s3. No need for a forced checkpoint. + // We only want to persist the data, and it doesn't matter if it's in the + // shape of deltas or images. + info!("flushing layers"); + datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?; + + info!("done"); + Ok(()) + } + /// Helper function to handle the LSN from client request. /// /// Each GetPage (and Exists and Nblocks) request includes information about @@ -718,6 +902,51 @@ impl postgres_backend::Handler for PageServerHandler { walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("import basebackup ") { + // Import the `base` section (everything but the wal) of a basebackup. + // Assumes the tenant already exists on this pageserver. + // + // Files are scheduled to be persisted to remote storage, and the + // caller should poll the http api to check when that is done. + // + // Example import command: + // 1. Get start/end LSN from backup_manifest file + // 2. Run: + // cat my_backup/base.tar | psql -h $PAGESERVER \ + // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN" + let (_, params_raw) = query_string.split_at("import basebackup ".len()); + let params = params_raw.split_whitespace().collect::>(); + ensure!(params.len() == 4); + let tenant = ZTenantId::from_str(params[0])?; + let timeline = ZTimelineId::from_str(params[1])?; + let base_lsn = Lsn::from_str(params[2])?; + let end_lsn = Lsn::from_str(params[3])?; + + self.check_permission(Some(tenant))?; + + match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) { + Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, + Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?, + }; + } else if query_string.starts_with("import wal ") { + // Import the `pg_wal` section of a basebackup. + // + // Files are scheduled to be persisted to remote storage, and the + // caller should poll the http api to check when that is done. + let (_, params_raw) = query_string.split_at("import wal ".len()); + let params = params_raw.split_whitespace().collect::>(); + ensure!(params.len() == 4); + let tenant = ZTenantId::from_str(params[0])?; + let timeline = ZTimelineId::from_str(params[1])?; + let start_lsn = Lsn::from_str(params[2])?; + let end_lsn = Lsn::from_str(params[3])?; + + self.check_permission(Some(tenant))?; + + match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) { + Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, + Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?, + }; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c052aa3d69..6ff5949c7e 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -749,6 +749,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> { } /// Extend relation + /// If new size is smaller, do nothing. pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> { ensure!(rel.relnode != 0, "invalid relnode"); @@ -756,10 +757,13 @@ impl<'a, R: Repository> DatadirModification<'a, R> { let size_key = rel_size_to_key(rel); let old_size = self.get(size_key)?.get_u32_le(); - let buf = nblocks.to_le_bytes(); - self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); + // only extend relation here. never decrease the size + if nblocks > old_size { + let buf = nblocks.to_le_bytes(); + self.put(size_key, Value::Image(Bytes::from(buf.to_vec()))); - self.pending_nblocks += nblocks as isize - old_size as isize; + self.pending_nblocks += nblocks as isize - old_size as isize; + } Ok(()) } diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py new file mode 100644 index 0000000000..e478103313 --- /dev/null +++ b/test_runner/batch_others/test_import.py @@ -0,0 +1,193 @@ +import pytest +from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn +from fixtures.utils import lsn_from_hex, lsn_to_hex +from uuid import UUID, uuid4 +import tarfile +import os +import shutil +from pathlib import Path +import json +from fixtures.utils import subprocess_capture +from fixtures.log_helper import log +from contextlib import closing +from fixtures.neon_fixtures import pg_distrib_dir + + +@pytest.mark.timeout(600) +def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): + # Put data in vanilla pg + vanilla_pg.start() + vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser") + vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g + from generate_series(1,300000) g''') + assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )] + + # Take basebackup + basebackup_dir = os.path.join(test_output_dir, "basebackup") + base_tar = os.path.join(basebackup_dir, "base.tar") + wal_tar = os.path.join(basebackup_dir, "pg_wal.tar") + os.mkdir(basebackup_dir) + vanilla_pg.safe_psql("CHECKPOINT") + pg_bin.run([ + "pg_basebackup", + "-F", + "tar", + "-d", + vanilla_pg.connstr(), + "-D", + basebackup_dir, + ]) + + # Make corrupt base tar with missing pg_control + unpacked_base = os.path.join(basebackup_dir, "unpacked-base") + corrupt_base_tar = os.path.join(unpacked_base, "corrupt-base.tar") + os.mkdir(unpacked_base, 0o750) + subprocess_capture(str(test_output_dir), ["tar", "-xf", base_tar, "-C", unpacked_base]) + os.remove(os.path.join(unpacked_base, "global/pg_control")) + subprocess_capture(str(test_output_dir), + ["tar", "-cf", "corrupt-base.tar"] + os.listdir(unpacked_base), + cwd=unpacked_base) + + # Get start_lsn and end_lsn + with open(os.path.join(basebackup_dir, "backup_manifest")) as f: + manifest = json.load(f) + start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"] + end_lsn = manifest["WAL-Ranges"][0]["End-LSN"] + + node_name = "import_from_vanilla" + tenant = uuid4() + timeline = uuid4() + + # Set up pageserver for import + neon_env_builder.enable_local_fs_remote_storage() + env = neon_env_builder.init_start() + env.pageserver.http_client().tenant_create(tenant) + + def import_tar(base, wal): + env.neon_cli.raw_cli([ + "timeline", + "import", + "--tenant-id", + tenant.hex, + "--timeline-id", + timeline.hex, + "--node-name", + node_name, + "--base-lsn", + start_lsn, + "--base-tarfile", + base, + "--end-lsn", + end_lsn, + "--wal-tarfile", + wal, + ]) + + # Importing corrupt backup fails + with pytest.raises(Exception): + import_tar(corrupt_base_tar, wal_tar) + + # Clean up + # TODO it should clean itself + client = env.pageserver.http_client() + client.timeline_detach(tenant, timeline) + + # Importing correct backup works + import_tar(base_tar, wal_tar) + + # Wait for data to land in s3 + wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(end_lsn)) + wait_for_upload(client, tenant, timeline, lsn_from_hex(end_lsn)) + + # Check it worked + pg = env.postgres.create_start(node_name, tenant_id=tenant) + assert pg.safe_psql('select count(*) from t') == [(300000, )] + + +@pytest.mark.timeout(600) +def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): + + num_rows = 3000 + neon_env_builder.num_safekeepers = 1 + neon_env_builder.enable_local_fs_remote_storage() + env = neon_env_builder.init_start() + + env.neon_cli.create_branch('test_import_from_pageserver') + pgmain = env.postgres.create_start('test_import_from_pageserver') + log.info("postgres is running on 'test_import_from_pageserver' branch") + + timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0] + + with closing(pgmain.connect()) as conn: + with conn.cursor() as cur: + # data loading may take a while, so increase statement timeout + cur.execute("SET statement_timeout='300s'") + cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g + from generate_series(1,{num_rows}) g''') + cur.execute("CHECKPOINT") + + cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn = cur.fetchone()[0] + log.info(f"start_backup_lsn = {lsn}") + + # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. + # PgBin sets it automatically, but here we need to pipe psql output to the tar command. + psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')} + + # Get a fullbackup from pageserver + query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}" + cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] + result_basepath = pg_bin.run_capture(cmd, env=psql_env) + tar_output_file = result_basepath + ".stdout" + + # Stop the first pageserver instance, erase all its data + env.postgres.stop_all() + env.pageserver.stop() + + dir_to_clear = Path(env.repo_dir) / 'tenants' + shutil.rmtree(dir_to_clear) + os.mkdir(dir_to_clear) + + #start the pageserver again + env.pageserver.start() + + # Import using another tenantid, because we use the same pageserver. + # TODO Create another pageserver to maeke test more realistic. + tenant = uuid4() + + # Import to pageserver + node_name = "import_from_pageserver" + client = env.pageserver.http_client() + client.tenant_create(tenant) + env.neon_cli.raw_cli([ + "timeline", + "import", + "--tenant-id", + tenant.hex, + "--timeline-id", + timeline, + "--node-name", + node_name, + "--base-lsn", + lsn, + "--base-tarfile", + os.path.join(tar_output_file), + ]) + + # Wait for data to land in s3 + wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn)) + wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn)) + + # Check it worked + pg = env.postgres.create_start(node_name, tenant_id=tenant) + assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )] + + # Take another fullbackup + query = f"fullbackup { tenant.hex} {timeline} {lsn}" + cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query] + result_basepath = pg_bin.run_capture(cmd, env=psql_env) + new_tar_output_file = result_basepath + ".stdout" + + # Check it's the same as the first fullbackup + # TODO pageserver should be checking checksum + assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 8f9bf1c11b..ad09d51272 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1349,12 +1349,12 @@ class VanillaPostgres(PgProtocol): if log_path is None: log_path = os.path.join(self.pgdatadir, "pg.log") - self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, '-l', log_path, 'start']) + self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, '-l', log_path, 'start']) def stop(self): assert self.running self.running = False - self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'stop']) + self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, 'stop']) def get_subdir_size(self, subdir) -> int: """Return size of pgdatadir subdirectory in bytes."""