diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 0414ec20f9..8c6e0a4e15 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -434,31 +434,39 @@ fn import_wal( pub fn import_timeline_from_tar( tline: &mut DatadirTimeline, reader: Reader, - lsn: Lsn, + end_lsn: Lsn, ) -> Result<()> { let mut ar = tar::Archive::new(reader); - let mut modification = tline.begin_modification(lsn); + let mut modification = tline.begin_modification(end_lsn); modification.init_empty()?; - for e in ar.entries().unwrap() { + let mut entries_iter = ar.entries()?; + + let mut pg_control: Option = None; + + // Import base + for e in &mut entries_iter { let mut entry = e.unwrap(); let header = entry.header(); let file_path = header.path().unwrap().into_owned(); - // HACK why does python put absolute file paths inside tar? - let file_path = file_path.strip_prefix("home/bojan/src/neondatabase/neon/test_output/test_import/basebackup/").unwrap(); - match header.entry_type() { tar::EntryType::Regular => { let mut buffer = Vec::new(); // read the whole entry entry.read_to_end(&mut buffer).unwrap(); - import_file(&mut modification, &file_path.as_ref(), &buffer)?; + if let Some(control) = import_file(&mut modification, &file_path.as_ref(), &buffer)? { + pg_control = Some(control); + } } tar::EntryType::Directory => { - // info!("tar::EntryType::Directory {}", file_path.display()); + info!("directory {:?}", file_path); + if file_path.starts_with("pg_wal") { + info!("found pg_wal"); + break; + } } _ => { panic!("tar::EntryType::?? {}", file_path.display()); @@ -466,9 +474,79 @@ pub fn import_timeline_from_tar( } } + let pg_control = pg_control.context("pg_control file not found")?; + let startpoint = Lsn(pg_control.checkPointCopy.redo); + info!("wal redo startpoint {}", startpoint); + modification.commit()?; - // TODO import wal. + // Set up walingest mutable state + let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); + let mut last_lsn = startpoint; + let mut walingest = WalIngest::new(tline, startpoint)?; + + // Ingest wal until end_lsn + while last_lsn <= end_lsn { + let bytes = { + let mut entry = entries_iter.next().expect("expected more wal")?; + let header = entry.header(); + let file_path = header.path().unwrap().into_owned(); + + match header.entry_type() { + tar::EntryType::Regular => { + let mut buffer = Vec::new(); + entry.read_to_end(&mut buffer).unwrap(); + + if !file_path.starts_with("pg_wal") { + panic!("found non-wal file in wal section") + } + + // TODO assert filename matches segno + + info!("processing wal file {:?}", file_path); + buffer + } + tar::EntryType::Directory => { + info!("directory {:?}", file_path); + continue; + } + _ => { + panic!("tar::EntryType::?? {}", file_path.display()); + } + } + }; + + waldecoder.feed_bytes(&bytes[offset..]); + + while last_lsn <= end_lsn { + if let Some((lsn, recdata)) = waldecoder.poll_decode()? { + walingest.ingest_record(tline, recdata, lsn)?; + last_lsn = lsn; + + info!("imported record at {} (end {})", lsn, end_lsn); + } + } + + info!("imported records up to {}", last_lsn); + segno += 1; + offset = 0; + } + + if last_lsn != startpoint { + info!("reached end of WAL at {}", last_lsn); + } else { + info!("there was no WAL to import at {}", last_lsn); + } + + // Log any extra unused files + for e in &mut entries_iter { + let mut entry = e.unwrap(); + let header = entry.header(); + let file_path = header.path().unwrap().into_owned(); + info!("skipping {:?}", file_path); + } Ok(()) } @@ -477,7 +555,7 @@ pub fn import_file( modification: &mut DatadirModification, file_path: &Path, buffer: &[u8], -) -> Result<()> { +) -> Result> { info!("looking at {:?}", file_path); let bytes = Bytes::copy_from_slice(&buffer[..]); let bytes_len = bytes.len(); @@ -496,6 +574,7 @@ pub fn import_file( let checkpoint_bytes = pg_control.checkPointCopy.encode()?; modification.put_checkpoint(checkpoint_bytes)?; info!("imported control file"); + return Ok(Some(pg_control)); } "pg_filenode.map" => { modification.put_relmap_file(spcnode, dbnode, bytes)?; @@ -553,11 +632,13 @@ pub fn import_file( modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?; info!("imported twophase file"); + } else if file_path.starts_with("pg_wal") { + panic!("found wal file in base section"); } else { info!("ignored"); } // TODO: pg_tblspc ?? - Ok(()) + Ok(None) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 341c9cd3ae..bd901612f5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -16,6 +16,7 @@ use regex::Regex; use std::collections::VecDeque; use std::io::{self, Read}; use std::net::TcpListener; +use std::path::PathBuf; use std::str; use std::str::FromStr; use std::sync::{Arc, RwLockReadGuard}; @@ -30,7 +31,7 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; -use crate::import_datadir::import_timeline_from_tar; +use crate::import_datadir::{import_timeline_from_postgres_datadir, import_timeline_from_tar}; use crate::layered_repository::LayeredRepository; use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; use crate::profiling::profpoint_start; diff --git a/test_runner/batch_others/test_import.py b/test_runner/batch_others/test_import.py index 72f069f889..e149be6e0b 100644 --- a/test_runner/batch_others/test_import.py +++ b/test_runner/batch_others/test_import.py @@ -1,17 +1,68 @@ from fixtures.neon_fixtures import NeonEnvBuilder -from uuid import UUID +from uuid import UUID, uuid4 import tarfile import os import shutil from pathlib import Path +import json -def test_import(neon_env_builder, - port_distributor, - default_broker, - mock_s3_server, - test_output_dir, - pg_bin): +def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder): + + # HACK + basebackup_cache = "/home/bojan/tmp/basebackup" + # basebackup_cache = None + + basebackup_dir = os.path.join(test_output_dir, "basebackup") + if basebackup_cache: + basebackup_dir = basebackup_cache + else: + vanilla_pg.start() + vanilla_pg.safe_psql("create table t as select generate_series(1,300000)") + assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )] + + vanilla_pg.safe_psql("CHECKPOINT") + os.mkdir(basebackup_dir) + pg_bin.run([ + "pg_basebackup", + "-F", "tar", + "-d", vanilla_pg.connstr(), + "-D", basebackup_dir, + ]) + + with open(os.path.join(basebackup_dir, "backup_manifest")) as f: + manifest = json.load(f) + start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"] + end_lsn = manifest["WAL-Ranges"][0]["End-LSN"] + + node_name = "import_from_vanilla" + tenant = uuid4() + timeline = uuid4() + + env = neon_env_builder.init_start() + env.neon_cli.create_tenant(tenant) + env.neon_cli.raw_cli([ + "timeline", + "import", + "--tenant-id", tenant.hex, + "--timeline-id", timeline.hex, + "--node-name", node_name, + # created manually with: tar -cvf basebackup.tar + "--tarfile", os.path.join(basebackup_dir, "basebackup.tar"), + "--lsn", end_lsn, + # "--tarfile", os.path.join(basebackup_dir, "base.tar"), + # "--lsn", start_lsn, + ]) + pg = env.postgres.create_start(node_name, tenant_id=tenant) + assert pg.safe_psql('select count(*) from t') == [(300000, )] + + +def test_import_from_neon(neon_env_builder, + port_distributor, + default_broker, + mock_s3_server, + test_output_dir, + pg_bin): """Move a timeline to a new neon stack using pg_basebackup as interface.""" node_name = "test_import" source_repo_dir = Path(test_output_dir) / "source_repo" @@ -57,7 +108,10 @@ def test_import(neon_env_builder, "--tarfile", str(basebackup_tar_path), "--lsn", lsn, ]) + + # pg.stop_and_destroy() # pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant)) + assert pg.safe_psql('select count(*) from t') == [(300000, )]