Import basebackup into pageserver (#1925)

Allow importing basebackup taken from vanilla postgres or another pageserver via psql copy in protocol.
This commit is contained in:
bojanserafimov
2022-06-21 11:04:10 -04:00
committed by GitHub
parent 6c4d6a2183
commit 1ca28e6f3c
11 changed files with 875 additions and 209 deletions

1
Cargo.lock generated
View File

@@ -1842,6 +1842,7 @@ dependencies = [
"tracing",
"url",
"utils",
"walkdir",
"workspace_hack",
]

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io::Write;
use std::fs::File;
use std::io::{BufReader, Write};
use std::net::TcpStream;
use std::num::NonZeroU64;
use std::path::PathBuf;
@@ -527,4 +528,54 @@ impl PageServerNode {
Ok(timeline_info_response)
}
/// Import a basebackup prepared using either:
/// a) `pg_basebackup -F tar`, or
/// b) The `fullbackup` pageserver endpoint
///
/// # Arguments
/// * `tenant_id` - tenant to import into. Created if not exists
/// * `timeline_id` - id to assign to imported timeline
/// * `base` - (start lsn of basebackup, path to `base.tar` file)
/// * `pg_wal` - if there's any wal to import: (end lsn, path to `pg_wal.tar`)
pub fn timeline_import(
&self,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
base: (Lsn, PathBuf),
pg_wal: Option<(Lsn, PathBuf)>,
) -> anyhow::Result<()> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = File::open(base_tarfile_path)?;
let mut base_reader = BufReader::new(base_tarfile);
// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = File::open(wal_tarfile_path)?;
let wal_reader = BufReader::new(wal_tarfile);
(end_lsn, Some(wal_reader))
} else {
(start_lsn, None)
};
// Import base
let import_cmd =
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut base_reader, &mut writer)?;
writer.finish()?;
// Import wal if necessary
if let Some(mut wal_reader) = wal_reader {
let import_cmd = format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut wal_reader, &mut writer)?;
writer.finish()?;
}
Ok(())
}
}

View File

@@ -14,7 +14,7 @@ use safekeeper::defaults::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::str::FromStr;
use utils::{
@@ -159,6 +159,20 @@ fn main() -> Result<()> {
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone()))
.subcommand(App::new("import")
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(Arg::new("node-name").long("node-name").takes_value(true)
.help("Name to assign to the imported timeline"))
.arg(Arg::new("base-tarfile").long("base-tarfile").takes_value(true)
.help("Basebackup tarfile to import"))
.arg(Arg::new("base-lsn").long("base-lsn").takes_value(true)
.help("Lsn the basebackup starts at"))
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
.help("Wal to add after base"))
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
.help("Lsn the basebackup ends at")))
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -613,6 +627,43 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline.timeline_id, last_record_lsn, tenant_id,
);
}
Some(("import", import_match)) => {
let tenant_id = get_tenant_id(import_match, env)?;
let timeline_id = parse_timeline_id(import_match)?.expect("No timeline id provided");
let name = import_match
.value_of("node-name")
.ok_or_else(|| anyhow!("No node name provided"))?;
// Parse base inputs
let base_tarfile = import_match
.value_of("base-tarfile")
.map(|s| PathBuf::from_str(s).unwrap())
.ok_or_else(|| anyhow!("No base-tarfile provided"))?;
let base_lsn = Lsn::from_str(
import_match
.value_of("base-lsn")
.ok_or_else(|| anyhow!("No base-lsn provided"))?,
)?;
let base = (base_lsn, base_tarfile);
// Parse pg_wal inputs
let wal_tarfile = import_match
.value_of("wal-tarfile")
.map(|s| PathBuf::from_str(s).unwrap());
let end_lsn = import_match
.value_of("end-lsn")
.map(|s| Lsn::from_str(s).unwrap());
// TODO validate both or none are provided
let pg_wal = end_lsn.zip(wal_tarfile);
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
println!("Done");
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_branch_name = branch_match

View File

@@ -61,6 +61,7 @@ utils = { path = "../libs/utils" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
[dev-dependencies]
hex-literal = "0.3"

View File

@@ -112,6 +112,8 @@ where
}
pub fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
// Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?;
@@ -355,24 +357,21 @@ where
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// Postgres doesn't recognize the zenith.signal file and doesn't need it.
if !self.full_backup {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
}
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
write!(zenith_signal, "PREV LSN: invalid")?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();

View File

@@ -2,7 +2,6 @@
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a zenith Timeline.
//!
use std::fs;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
@@ -10,16 +9,18 @@ use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use tracing::*;
use walkdir::WalkDir;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Repository;
use crate::repository::Timeline;
use crate::walingest::WalIngest;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::Oid;
use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
///
@@ -35,100 +36,29 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
// Then fishing out pg_control would be unnecessary
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
// Scan 'global'
let mut relfiles: Vec<PathBuf> = Vec::new();
for direntry in fs::read_dir(path.join("global"))? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
// Import all but pg_wal
let all_but_wal = WalkDir::new(path)
.into_iter()
.filter_entry(|entry| !entry.path().ends_with("pg_wal"));
for entry in all_but_wal {
let entry = entry?;
let metadata = entry.metadata().expect("error getting dir entry metadata");
if metadata.is_file() {
let absolute_path = entry.path();
let relative_path = absolute_path.strip_prefix(path)?;
Some("pg_control") => {
pg_control = Some(import_control_file(&mut modification, &direntry.path())?);
}
Some("pg_filenode.map") => {
import_relmap_file(
&mut modification,
pg_constants::GLOBALTABLESPACE_OID,
0,
&direntry.path(),
)?;
}
// Load any relation files into the page server (but only after the other files)
_ => relfiles.push(direntry.path()),
}
}
for relfile in relfiles {
import_relfile(
&mut modification,
&relfile,
pg_constants::GLOBALTABLESPACE_OID,
0,
)?;
}
// Scan 'base'. It contains database dirs, the database OID is the filename.
// E.g. 'base/12345', where 12345 is the database OID.
for direntry in fs::read_dir(path.join("base"))? {
let direntry = direntry?;
//skip all temporary files
if direntry.file_name().to_string_lossy() == "pgsql_tmp" {
continue;
}
let dboid = direntry.file_name().to_string_lossy().parse::<u32>()?;
let mut relfiles: Vec<PathBuf> = Vec::new();
for direntry in fs::read_dir(direntry.path())? {
let direntry = direntry?;
match direntry.file_name().to_str() {
None => continue,
Some("PG_VERSION") => {
//modification.put_dbdir_creation(pg_constants::DEFAULTTABLESPACE_OID, dboid)?;
}
Some("pg_filenode.map") => import_relmap_file(
&mut modification,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
&direntry.path(),
)?,
// Load any relation files into the page server
_ => relfiles.push(direntry.path()),
let file = File::open(absolute_path)?;
let len = metadata.len() as usize;
if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? {
pg_control = Some(control_file);
}
}
for relfile in relfiles {
import_relfile(
&mut modification,
&relfile,
pg_constants::DEFAULTTABLESPACE_OID,
dboid,
)?;
}
}
for entry in fs::read_dir(path.join("pg_xact"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::Clog, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("members"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::MultiXactMembers, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? {
let entry = entry?;
import_slru_file(&mut modification, SlruKind::MultiXactOffsets, &entry.path())?;
}
for entry in fs::read_dir(path.join("pg_twophase"))? {
let entry = entry?;
let xid = u32::from_str_radix(&entry.path().to_string_lossy(), 16)?;
import_twophase_file(&mut modification, xid, &entry.path())?;
}
// TODO: Scan pg_tblspc
// We're done importing all the data files.
modification.commit()?;
@@ -158,31 +88,30 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
}
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
fn import_relfile<R: Repository>(
fn import_rel<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
path: &Path,
spcoid: Oid,
dboid: Oid,
mut reader: Reader,
len: usize,
) -> anyhow::Result<()> {
// Does it look like a relation file?
trace!("importing rel file {}", path.display());
let (relnode, forknum, segno) = parse_relfilename(&path.file_name().unwrap().to_string_lossy())
.map_err(|e| {
warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
e
})?;
let filename = &path
.file_name()
.expect("missing rel filename")
.to_string_lossy();
let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
e
})?;
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let len = file.metadata().unwrap().len();
ensure!(len % pg_constants::BLCKSZ as u64 == 0);
let nblocks = len / pg_constants::BLCKSZ as u64;
if segno != 0 {
todo!();
}
ensure!(len % pg_constants::BLCKSZ as usize == 0);
let nblocks = len / pg_constants::BLCKSZ as usize;
let rel = RelTag {
spcnode: spcoid,
@@ -190,11 +119,22 @@ fn import_relfile<R: Repository>(
relnode,
forknum,
};
modification.put_rel_creation(rel, nblocks as u32)?;
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
// Call put_rel_creation for every segment of the relation,
// because there is no guarantee about the order in which we are processing segments.
// ignore "relation already exists" error
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
if e.to_string().contains("already exists") {
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
}
}
loop {
let r = file.read_exact(&mut buf);
let r = reader.read_exact(&mut buf);
match r {
Ok(_) => {
modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
@@ -204,7 +144,9 @@ fn import_relfile<R: Repository>(
Err(err) => match err.kind() {
std::io::ErrorKind::UnexpectedEof => {
// reached EOF. That's expected.
ensure!(blknum == nblocks as u32, "unexpected EOF");
let relative_blknum =
blknum - segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
break;
}
_ => {
@@ -215,96 +157,43 @@ fn import_relfile<R: Repository>(
blknum += 1;
}
// Update relation size
//
// If we process rel segments out of order,
// put_rel_extend will skip the update.
modification.put_rel_extend(rel, blknum)?;
Ok(())
}
/// Import a relmapper (pg_filenode.map) file into the repository
fn import_relmap_file<R: Repository>(
modification: &mut DatadirModification<R>,
spcnode: Oid,
dbnode: Oid,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing relmap file {}", path.display());
modification.put_relmap_file(spcnode, dbnode, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
/// Import a twophase state file (pg_twophase/<xid>) into the repository
fn import_twophase_file<R: Repository>(
modification: &mut DatadirModification<R>,
xid: TransactionId,
path: &Path,
) -> Result<()> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing non-rel file {}", path.display());
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
Ok(())
}
///
/// Import pg_control file into the repository.
///
/// The control file is imported as is, but we also extract the checkpoint record
/// from it and store it separated.
fn import_control_file<R: Repository>(
modification: &mut DatadirModification<R>,
path: &Path,
) -> Result<ControlFileData> {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
file.read_to_end(&mut buffer)?;
trace!("importing control file {}", path.display());
// Import it as ControlFile
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
Ok(pg_control)
}
///
/// Import an SLRU segment file
///
fn import_slru_file<R: Repository>(
fn import_slru<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
slru: SlruKind,
path: &Path,
mut reader: Reader,
len: usize,
) -> Result<()> {
trace!("importing slru file {}", path.display());
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(&path.file_name().unwrap().to_string_lossy(), 16)?;
let filename = &path
.file_name()
.expect("missing slru filename")
.to_string_lossy();
let segno = u32::from_str_radix(filename, 16)?;
let len = file.metadata().unwrap().len();
ensure!(len % pg_constants::BLCKSZ as u64 == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as u64;
ensure!(len % pg_constants::BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
let nblocks = len / pg_constants::BLCKSZ as usize;
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as u64);
ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
modification.put_slru_segment_creation(slru, segno, nblocks as u32)?;
let mut rpageno = 0;
loop {
let r = file.read_exact(&mut buf);
let r = reader.read_exact(&mut buf);
match r {
Ok(_) => {
modification.put_slru_page_image(
@@ -396,10 +285,258 @@ fn import_wal<R: Repository>(
}
if last_lsn != startpoint {
debug!("reached end of WAL at {}", last_lsn);
info!("reached end of WAL at {}", last_lsn);
} else {
info!("no WAL to import at {}", last_lsn);
}
Ok(())
}
pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
base_lsn: Lsn,
) -> Result<()> {
info!("importing base at {}", base_lsn);
let mut modification = tline.begin_modification(base_lsn);
modification.init_empty()?;
let mut pg_control: Option<ControlFileData> = None;
// Import base
for base_tar_entry in tar::Archive::new(reader).entries()? {
let entry = base_tar_entry?;
let header = entry.header();
let len = header.entry_size()? as usize;
let file_path = header.path()?.into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
if let Some(res) = import_file(&mut modification, file_path.as_ref(), entry, len)? {
// We found the pg_control file.
pg_control = Some(res);
}
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
}
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
Ok(())
}
pub fn import_wal_from_tar<R: Repository, Reader: Read>(
tline: &mut DatadirTimeline<R>,
reader: Reader,
start_lsn: Lsn,
end_lsn: Lsn,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn);
let mut segno = start_lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
let mut walingest = WalIngest::new(tline, start_lsn)?;
// Ingest wal until end_lsn
info!("importing wal until {}", end_lsn);
let mut pg_wal_tar = tar::Archive::new(reader);
let mut pg_wal_entries_iter = pg_wal_tar.entries()?;
while last_lsn <= end_lsn {
let bytes = {
let entry = pg_wal_entries_iter.next().expect("expected more wal")?;
let header = entry.header();
let file_path = header.path()?.into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
// FIXME: assume postgresql tli 1 for now
let expected_filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
let file_name = file_path
.file_name()
.expect("missing wal filename")
.to_string_lossy();
ensure!(expected_filename == file_name);
debug!("processing wal file {:?}", file_path);
read_all_bytes(entry)?
}
tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
continue;
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
};
waldecoder.feed_bytes(&bytes[offset..]);
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
debug!("imported record at {} (end {})", lsn, end_lsn);
}
}
debug!("imported records up to {}", last_lsn);
segno += 1;
offset = 0;
}
if last_lsn != start_lsn {
info!("reached end of WAL at {}", last_lsn);
} else {
info!("there was no WAL to import at {}", last_lsn);
}
// Log any extra unused files
for e in &mut pg_wal_entries_iter {
let entry = e?;
let header = entry.header();
let file_path = header.path()?.into_owned();
info!("skipping {:?}", file_path);
}
Ok(())
}
pub fn import_file<R: Repository, Reader: Read>(
modification: &mut DatadirModification<R>,
file_path: &Path,
reader: Reader,
len: usize,
) -> Result<Option<ControlFileData>> {
debug!("looking at {:?}", file_path);
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path
.file_name()
.expect("missing filename")
.to_string_lossy()
.as_ref()
{
"pg_control" => {
let bytes = read_all_bytes(reader)?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&bytes[..])?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
debug!("imported control file");
// Import it as ControlFile
modification.put_control_file(bytes)?;
return Ok(Some(pg_control));
}
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
debug!("imported rel creation");
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.nth(1)
.expect("invalid file path, expected dbnode")
.to_string_lossy()
.parse()?;
match file_path
.file_name()
.expect("missing base filename")
.to_string_lossy()
.as_ref()
{
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
debug!("imported rel creation");
}
}
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
import_slru(modification, slru, file_path, reader, len)?;
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let file_name = &file_path
.file_name()
.expect("missing twophase filename")
.to_string_lossy();
let xid = u32::from_str_radix(file_name, 16)?;
let bytes = read_all_bytes(reader)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
// zenith.signal format is "PREV LSN: prev_lsn"
let zenith_signal = std::str::from_utf8(&bytes)?;
let zenith_signal = zenith_signal.split(':').collect::<Vec<_>>();
let prev_lsn = zenith_signal[1].trim().parse::<Lsn>()?;
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.
bail!("Importing pg_tblspc is not implemented");
} else {
debug!("ignored");
}
Ok(None)
}
fn read_all_bytes<Reader: Read>(mut reader: Reader) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf)?;
Ok(Bytes::copy_from_slice(&buf[..]))
}

View File

@@ -243,15 +243,15 @@ impl Repository for LayeredRepository {
);
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
// Insert if not exists
let timeline = Arc::new(timeline);
let r = timelines.insert(
timelineid,
LayeredTimelineEntry::Loaded(Arc::clone(&timeline)),
);
ensure!(
r.is_none(),
"assertion failure, inserted duplicate timeline"
);
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
Ok(timeline)
}

View File

@@ -13,7 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use regex::Regex;
use std::io;
use std::io::{self, Read};
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
@@ -29,6 +29,8 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp};
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
@@ -200,6 +202,96 @@ impl PagestreamBeMessage {
}
}
/// Implements Read for the server side of CopyIn
struct CopyInReader<'a> {
pgb: &'a mut PostgresBackend,
/// Overflow buffer for bytes sent in CopyData messages
/// that the reader (caller of read) hasn't asked for yet.
/// TODO use BytesMut?
buf: Vec<u8>,
/// Bytes before `buf_begin` are considered as dropped.
/// This allows us to implement O(1) pop_front on Vec<u8>.
/// The Vec won't grow large because we only add to it
/// when it's empty.
buf_begin: usize,
}
impl<'a> CopyInReader<'a> {
// NOTE: pgb should be in copy in state already
fn new(pgb: &'a mut PostgresBackend) -> Self {
Self {
pgb,
buf: Vec::<_>::new(),
buf_begin: 0,
}
}
}
impl<'a> Drop for CopyInReader<'a> {
fn drop(&mut self) {
// Finalize copy protocol so that self.pgb can be reused
// TODO instead, maybe take ownership of pgb and give it back at the end
let mut buf: Vec<u8> = vec![];
let _ = self.read_to_end(&mut buf);
}
}
impl<'a> Read for CopyInReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
while !thread_mgr::is_shutdown_requested() {
// Return from buffer if nonempty
if self.buf_begin < self.buf.len() {
let bytes_to_read = std::cmp::min(buf.len(), self.buf.len() - self.buf_begin);
buf[..bytes_to_read].copy_from_slice(&self.buf[self.buf_begin..][..bytes_to_read]);
self.buf_begin += bytes_to_read;
return Ok(bytes_to_read);
}
// Delete garbage
self.buf.clear();
self.buf_begin = 0;
// Wait for client to send CopyData bytes
match self.pgb.read_message() {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => return Ok(0),
FeMessage::Sync => continue,
m => {
let msg = format!("unexpected message {:?}", m);
self.pgb.write_message(&BeMessage::ErrorResponse(&msg))?;
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
};
// Return as much as we can, saving the rest in self.buf
let mut reader = copy_data_bytes.reader();
let bytes_read = reader.read(buf)?;
reader.read_to_end(&mut self.buf)?;
return Ok(bytes_read);
}
Ok(None) => {
let msg = "client closed connection";
self.pgb.write_message(&BeMessage::ErrorResponse(msg))?;
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
Err(e) => {
if !is_socket_read_timed_out(&e) {
return Err(io::Error::new(io::ErrorKind::Other, e));
}
}
}
}
// Shutting down
let msg = "Importer thread was shut down";
Err(io::Error::new(io::ErrorKind::Other, msg))
}
}
///////////////////////////////////////////////////////////////////////////////
///
@@ -447,6 +539,98 @@ impl PageServerHandler {
Ok(())
}
fn handle_import_basebackup(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import basebackup", timeline = %timeline_id, tenant = %tenant_id).entered();
// Create empty timeline
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
// from connecting before that and writing conflicting wal.
//
// This is not relevant for pageserver->pageserver migrations, since there's
// no wal to import. But should be fixed if we want to import from postgres.
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_basebackup_from_tar(&mut datadir_timeline, reader, base_lsn)?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
// and checking that it matches in size with what was imported.
// It wouldn't work if base came from vanilla postgres though,
// since we discard some log files.
// Flush data to disk, then upload to s3
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
}
fn handle_import_wal(
&self,
pgb: &mut PostgresBackend,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
) -> anyhow::Result<()> {
thread_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let _enter =
info_span!("import wal", timeline = %timeline_id, tenant = %tenant_id).entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.get_timeline_load(timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message(&BeMessage::CopyInResponse)?;
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
// TODO Does it make sense to overshoot?
ensure!(datadir_timeline.tline.get_last_record_lsn() >= end_lsn);
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
}
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
@@ -750,6 +934,51 @@ impl postgres_backend::Handler for PageServerHandler {
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
// Import the `base` section (everything but the wal) of a basebackup.
// Assumes the tenant already exists on this pageserver.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
//
// Example import command:
// 1. Get start/end LSN from backup_manifest file
// 2. Run:
// cat my_backup/base.tar | psql -h $PAGESERVER \
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN"
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
let timeline = ZTimelineId::from_str(params[1])?;
let base_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
self.check_permission(Some(tenant))?;
match self.handle_import_basebackup(pgb, tenant, timeline, base_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
};
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
let (_, params_raw) = query_string.split_at("import wal ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
let tenant = ZTenantId::from_str(params[0])?;
let timeline = ZTimelineId::from_str(params[1])?;
let start_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
self.check_permission(Some(tenant))?;
match self.handle_import_wal(pgb, tenant, timeline, start_lsn, end_lsn) {
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => pgb.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?,
};
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect

View File

@@ -749,6 +749,7 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
}
/// Extend relation
/// If new size is smaller, do nothing.
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
ensure!(rel.relnode != 0, "invalid relnode");
@@ -756,10 +757,13 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
let size_key = rel_size_to_key(rel);
let old_size = self.get(size_key)?.get_u32_le();
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
// only extend relation here. never decrease the size
if nblocks > old_size {
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
self.pending_nblocks += nblocks as isize - old_size as isize;
self.pending_nblocks += nblocks as isize - old_size as isize;
}
Ok(())
}

View File

@@ -0,0 +1,193 @@
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_upload, wait_for_last_record_lsn
from fixtures.utils import lsn_from_hex, lsn_to_hex
from uuid import UUID, uuid4
import tarfile
import os
import shutil
from pathlib import Path
import json
from fixtures.utils import subprocess_capture
from fixtures.log_helper import log
from contextlib import closing
from fixtures.neon_fixtures import pg_distrib_dir
@pytest.mark.timeout(600)
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# Put data in vanilla pg
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql('''create table t as select 'long string to consume some space' || g
from generate_series(1,300000) g''')
assert vanilla_pg.safe_psql('select count(*) from t') == [(300000, )]
# Take basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
vanilla_pg.safe_psql("CHECKPOINT")
pg_bin.run([
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg.connstr(),
"-D",
basebackup_dir,
])
# Make corrupt base tar with missing pg_control
unpacked_base = os.path.join(basebackup_dir, "unpacked-base")
corrupt_base_tar = os.path.join(unpacked_base, "corrupt-base.tar")
os.mkdir(unpacked_base, 0o750)
subprocess_capture(str(test_output_dir), ["tar", "-xf", base_tar, "-C", unpacked_base])
os.remove(os.path.join(unpacked_base, "global/pg_control"))
subprocess_capture(str(test_output_dir),
["tar", "-cf", "corrupt-base.tar"] + os.listdir(unpacked_base),
cwd=unpacked_base)
# Get start_lsn and end_lsn
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
node_name = "import_from_vanilla"
tenant = uuid4()
timeline = uuid4()
# Set up pageserver for import
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
env.pageserver.http_client().tenant_create(tenant)
def import_tar(base, wal):
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id",
tenant.hex,
"--timeline-id",
timeline.hex,
"--node-name",
node_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
base,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal,
])
# Importing corrupt backup fails
with pytest.raises(Exception):
import_tar(corrupt_base_tar, wal_tar)
# Clean up
# TODO it should clean itself
client = env.pageserver.http_client()
client.timeline_detach(tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)
# Wait for data to land in s3
wait_for_last_record_lsn(client, tenant, timeline, lsn_from_hex(end_lsn))
wait_for_upload(client, tenant, timeline, lsn_from_hex(end_lsn))
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from t') == [(300000, )]
@pytest.mark.timeout(600)
def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
num_rows = 3000
neon_env_builder.num_safekeepers = 1
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_import_from_pageserver')
pgmain = env.postgres.create_start('test_import_from_pageserver')
log.info("postgres is running on 'test_import_from_pageserver' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get a fullbackup from pageserver
query = f"fullbackup { env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
# Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / 'tenants'
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
#start the pageserver again
env.pageserver.start()
# Import using another tenantid, because we use the same pageserver.
# TODO Create another pageserver to maeke test more realistic.
tenant = uuid4()
# Import to pageserver
node_name = "import_from_pageserver"
client = env.pageserver.http_client()
client.tenant_create(tenant)
env.neon_cli.raw_cli([
"timeline",
"import",
"--tenant-id",
tenant.hex,
"--timeline-id",
timeline,
"--node-name",
node_name,
"--base-lsn",
lsn,
"--base-tarfile",
os.path.join(tar_output_file),
])
# Wait for data to land in s3
wait_for_last_record_lsn(client, tenant, UUID(timeline), lsn_from_hex(lsn))
wait_for_upload(client, tenant, UUID(timeline), lsn_from_hex(lsn))
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql('select count(*) from tbl') == [(num_rows, )]
# Take another fullbackup
query = f"fullbackup { tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
new_tar_output_file = result_basepath + ".stdout"
# Check it's the same as the first fullbackup
# TODO pageserver should be checking checksum
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)

View File

@@ -1398,12 +1398,12 @@ class VanillaPostgres(PgProtocol):
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, '-l', log_path, 'start'])
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, '-l', log_path, 'start'])
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-D', self.pgdatadir, 'stop'])
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, 'stop'])
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""