This commit is contained in:
Bojan Serafimov
2022-06-09 22:51:32 -04:00
parent 1380a1cce1
commit 31cf43724c
3 changed files with 155 additions and 19 deletions

View File

@@ -434,31 +434,39 @@ fn import_wal<R: Repository>(
pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
let mut ar = tar::Archive::new(reader);
let mut modification = tline.begin_modification(lsn);
let mut modification = tline.begin_modification(end_lsn);
modification.init_empty()?;
for e in ar.entries().unwrap() {
let mut entries_iter = ar.entries()?;
let mut pg_control: Option<ControlFileData> = None;
// Import base
for e in &mut entries_iter {
let mut entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
// HACK why does python put absolute file paths inside tar?
let file_path = file_path.strip_prefix("home/bojan/src/neondatabase/neon/test_output/test_import/basebackup/").unwrap();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
// read the whole entry
entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
if let Some(control) = import_file(&mut modification, &file_path.as_ref(), &buffer)? {
pg_control = Some(control);
}
}
tar::EntryType::Directory => {
// info!("tar::EntryType::Directory {}", file_path.display());
info!("directory {:?}", file_path);
if file_path.starts_with("pg_wal") {
info!("found pg_wal");
break;
}
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
@@ -466,9 +474,79 @@ pub fn import_timeline_from_tar<R: Repository, Reader: Read>(
}
}
let pg_control = pg_control.context("pg_control file not found")?;
let startpoint = Lsn(pg_control.checkPointCopy.redo);
info!("wal redo startpoint {}", startpoint);
modification.commit()?;
// TODO import wal.
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let mut walingest = WalIngest::new(tline, startpoint)?;
// Ingest wal until end_lsn
while last_lsn <= end_lsn {
let bytes = {
let mut entry = entries_iter.next().expect("expected more wal")?;
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
entry.read_to_end(&mut buffer).unwrap();
if !file_path.starts_with("pg_wal") {
panic!("found non-wal file in wal section")
}
// TODO assert filename matches segno
info!("processing wal file {:?}", file_path);
buffer
}
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
continue;
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
};
waldecoder.feed_bytes(&bytes[offset..]);
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
info!("imported record at {} (end {})", lsn, end_lsn);
}
}
info!("imported records up to {}", last_lsn);
segno += 1;
offset = 0;
}
if last_lsn != startpoint {
info!("reached end of WAL at {}", last_lsn);
} else {
info!("there was no WAL to import at {}", last_lsn);
}
// Log any extra unused files
for e in &mut entries_iter {
let mut entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
info!("skipping {:?}", file_path);
}
Ok(())
}
@@ -477,7 +555,7 @@ pub fn import_file<R: Repository>(
modification: &mut DatadirModification<R>,
file_path: &Path,
buffer: &[u8],
) -> Result<()> {
) -> Result<Option<ControlFileData>> {
info!("looking at {:?}", file_path);
let bytes = Bytes::copy_from_slice(&buffer[..]);
let bytes_len = bytes.len();
@@ -496,6 +574,7 @@ pub fn import_file<R: Repository>(
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
info!("imported control file");
return Ok(Some(pg_control));
}
"pg_filenode.map" => {
modification.put_relmap_file(spcnode, dbnode, bytes)?;
@@ -553,11 +632,13 @@ pub fn import_file<R: Repository>(
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
info!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
panic!("found wal file in base section");
} else {
info!("ignored");
}
// TODO: pg_tblspc ??
Ok(())
Ok(None)
}

View File

@@ -16,6 +16,7 @@ use regex::Regex;
use std::collections::VecDeque;
use std::io::{self, Read};
use std::net::TcpListener;
use std::path::PathBuf;
use std::str;
use std::str::FromStr;
use std::sync::{Arc, RwLockReadGuard};
@@ -30,7 +31,7 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::import_timeline_from_tar;
use crate::import_datadir::{import_timeline_from_postgres_datadir, import_timeline_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;

View File

@@ -1,17 +1,68 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from uuid import UUID
from uuid import UUID, uuid4
import tarfile
import os
import shutil
from pathlib import Path
import json
def test_import(neon_env_builder,
port_distributor,
default_broker,
mock_s3_server,
test_output_dir,
pg_bin):
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# HACK
basebackup_cache = "/home/bojan/tmp/basebackup"
# basebackup_cache = None
basebackup_dir = os.path.join(test_output_dir, "basebackup")
if basebackup_cache:
basebackup_dir = basebackup_cache
else:
vanilla_pg.start()
vanilla_pg.safe_psql("create table t as select generate_series(1,300000)")
assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )]
vanilla_pg.safe_psql("CHECKPOINT")
os.mkdir(basebackup_dir)
pg_bin.run([
"pg_basebackup",
"-F", "tar",
"-d", vanilla_pg.connstr(),
"-D", basebackup_dir,
])
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
node_name = "import_from_vanilla"
tenant = uuid4()
timeline = uuid4()
env = neon_env_builder.init_start()
env.neon_cli.create_tenant(tenant)
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id", tenant.hex,
"--timeline-id", timeline.hex,
"--node-name", node_name,
# created manually with: tar -cvf basebackup.tar <all files, with pg_wal at the end>
"--tarfile", os.path.join(basebackup_dir, "basebackup.tar"),
"--lsn", end_lsn,
# "--tarfile", os.path.join(basebackup_dir, "base.tar"),
# "--lsn", start_lsn,
])
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from t') == [(300000, )]
def test_import_from_neon(neon_env_builder,
port_distributor,
default_broker,
mock_s3_server,
test_output_dir,
pg_bin):
"""Move a timeline to a new neon stack using pg_basebackup as interface."""
node_name = "test_import"
source_repo_dir = Path(test_output_dir) / "source_repo"
@@ -57,7 +108,10 @@ def test_import(neon_env_builder,
"--tarfile", str(basebackup_tar_path),
"--lsn", lsn,
])
# pg.stop_and_destroy()
# pg = env.postgres.create_start(node_name, tenant_id=UUID(tenant))
assert pg.safe_psql('select count(*) from t') == [(300000, )]