Address comments

Bojan Serafimov
2022-06-20 10:17:46 -04:00
parent cdab7bc83b
commit bdfe7dacef
2 changed files with 37 additions and 32 deletions


@@ -124,7 +124,7 @@ fn import_rel<R: Repository, Reader: Read>(
// ignore "relation already exists" error
if let Err(e) = modification.put_rel_creation(rel, nblocks as u32) {
if e.to_string().contains("already exists") {
info!("relation {} already exists. we must be extending it", rel);
debug!("relation {} already exists. we must be extending it", rel);
} else {
return Err(e);
}
@@ -278,7 +278,7 @@ fn import_wal<R: Repository>(
}
if last_lsn != startpoint {
debug!("reached end of WAL at {}", last_lsn);
info!("reached end of WAL at {}", last_lsn);
} else {
info!("no WAL to import at {}", last_lsn);
}
@@ -315,10 +315,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
}
}
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
if file_path.starts_with("pg_wal") {
info!("found pg_wal in base lol");
}
debug!("directory {:?}", file_path);
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
@@ -363,11 +360,11 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
let file_name = file_path.file_name().unwrap().to_string_lossy();
ensure!(expected_filename == file_name);
info!("processing wal file {:?}", file_path);
debug!("processing wal file {:?}", file_path);
read_all_bytes(entry)?
}
tar::EntryType::Directory => {
info!("directory {:?}", file_path);
debug!("directory {:?}", file_path);
continue;
}
_ => {
@@ -383,11 +380,11 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
walingest.ingest_record(tline, recdata, lsn)?;
last_lsn = lsn;
info!("imported record at {} (end {})", lsn, end_lsn);
debug!("imported record at {} (end {})", lsn, end_lsn);
}
}
info!("imported records up to {}", last_lsn);
debug!("imported records up to {}", last_lsn);
segno += 1;
offset = 0;
}
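The ensure!(expected_filename == file_name) check above verifies that each WAL file in the tarball is the segment the importer expects next. For reference, WAL segments follow the standard Postgres naming scheme (timeline ID plus the segment number split into two hex fields); the sketch below illustrates that scheme with an invented helper name and is not the pageserver's actual code.

// Hedged illustration of standard Postgres WAL segment naming; the function
// name is invented and this is not the helper the importer actually uses.
fn wal_segment_name(tli: u32, segno: u64, wal_segsz_bytes: u64) -> String {
    // Each "xlogid" spans 0x1_0000_0000 bytes of WAL, so with the default
    // 16MB segment size there are 256 segments per xlogid.
    let segs_per_xlogid = 0x1_0000_0000u64 / wal_segsz_bytes;
    format!(
        "{:08X}{:08X}{:08X}",
        tli,
        segno / segs_per_xlogid,
        segno % segs_per_xlogid
    )
}
// e.g. wal_segment_name(1, 1, 16 * 1024 * 1024) == "000000010000000000000001"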
@@ -415,7 +412,7 @@ pub fn import_file<R: Repository, Reader: Read>(
reader: Reader,
len: usize,
) -> Result<Option<ControlFileData>> {
info!("looking at {:?}", file_path);
debug!("looking at {:?}", file_path);
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
@@ -429,7 +426,7 @@ pub fn import_file<R: Repository, Reader: Read>(
let pg_control = ControlFileData::decode(&bytes[..])?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
info!("imported control file");
debug!("imported control file");
// Import it as ControlFile
modification.put_control_file(bytes)?;
@@ -438,14 +435,14 @@ pub fn import_file<R: Repository, Reader: Read>(
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
info!("imported relmap file")
debug!("imported relmap file")
}
"PG_VERSION" => {
info!("ignored");
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
info!("imported rel creation");
debug!("imported rel creation");
}
}
} else if file_path.starts_with("base") {
@@ -462,39 +459,39 @@ pub fn import_file<R: Repository, Reader: Read>(
"pg_filenode.map" => {
let bytes = read_all_bytes(reader)?;
modification.put_relmap_file(spcnode, dbnode, bytes)?;
info!("imported relmap file")
debug!("imported relmap file")
}
"PG_VERSION" => {
info!("ignored");
debug!("ignored");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
info!("imported rel creation");
debug!("imported rel creation");
}
}
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported clog slru");
debug!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported multixact offsets slru");
debug!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
import_slru(modification, slru, file_path, reader, len)?;
info!("imported multixact members slru");
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
let bytes = read_all_bytes(reader)?;
modification.put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]))?;
info!("imported twophase file");
debug!("imported twophase file");
} else if file_path.starts_with("pg_wal") {
info!("found wal file in base section. ignore it");
debug!("found wal file in base section. ignore it");
} else if file_path.starts_with("zenith.signal") {
// Parse zenith signal file to set correct previous LSN
let bytes = read_all_bytes(reader)?;
@@ -506,13 +503,13 @@ pub fn import_file<R: Repository, Reader: Read>(
let writer = modification.tline.tline.writer();
writer.finish_write(prev_lsn);
info!("imported zenith signal {}", prev_lsn);
debug!("imported zenith signal {}", prev_lsn);
} else if file_path.starts_with("pg_tblspc") {
// TODO Backups exported from neon won't have pg_tblspc, but we will need
// this to import arbitrary postgres databases.
bail!("Importing pg_tblspc is not implemented");
} else {
info!("ignored");
debug!("ignored");
}
Ok(None)
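The changes above downgrade the per-file import messages from info! to debug!, leaving only summary lines at info level. As a hedged sketch (the logging setup is not part of this diff; the tracing and tracing_subscriber crates and the "pageserver" target are assumptions), the downgraded messages stay reachable by raising the filter for that target:

// Minimal sketch, assuming the info!/debug! macros come from `tracing` and the
// binary installs an env-filter based subscriber; crate and target names are
// assumptions, not taken from this diff.
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;

fn main() {
    // Running with RUST_LOG=info,pageserver=debug keeps global output at info
    // while re-enabling the downgraded import messages without a rebuild.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    info!("importing basebackup");  // still emitted at the default level
    debug!("imported relmap file"); // only emitted when debug is enabled
}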


@@ -559,10 +559,15 @@ impl PageServerHandler {
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
-// TODO mark timeline as not ready until it reaches end_lsn?
-// This would prevent compute node from connecting to it and writing conflicting wal.
+// TODO mark timeline as not ready until it reaches end_lsn.
+// We might have some wal to import as well, and we should prevent compute
+// from connecting before that and writing conflicting wal.
+//
+// This is not relevant for pageserver->pageserver migrations, since there's
+// no wal to import. But should be fixed if we want to import from postgres.
-// TODO leave clean state on error
+// TODO leave clean state on error. For now you can use detach to clean
+// up broken state from a failed import.
// Import basebackup provided via CopyData
info!("importing basebackup");
@@ -601,13 +606,14 @@ impl PageServerHandler {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.get_timeline_load(timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
// TODO ensure start_lsn matches current lsn
-// TODO leave clean state on error
+// TODO leave clean state on error. For now you can use detach to clean
+// up broken state from a failed import.
// Import wal provided via CopyData
info!("importing wal");
@@ -615,7 +621,9 @@ impl PageServerHandler {
let reader = CopyInReader::new(pgb);
import_wal_from_tar(&mut datadir_timeline, reader, start_lsn, end_lsn)?;
-// Flush data to disk, then upload to s3
+// Flush data to disk, then upload to s3. No need for a forced checkpoint.
+// We only want to persist the data, and it doesn't matter if it's in the
+// shape of deltas or images.
info!("flushing layers");
datadir_timeline.tline.checkpoint(CheckpointConfig::Flush)?;
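The reworded comment above draws a line between a plain flush and a forced checkpoint: the import only needs the ingested data to be durable before upload, not reorganized into a particular layer shape. A toy sketch of that distinction follows, using invented types that merely mirror the comment rather than the pageserver's real CheckpointConfig.

// Toy illustration of the comment above; these are not the pageserver's types.
enum CheckpointConfig {
    Flush,  // persist in-memory layers as-is (deltas are fine), then upload
    Forced, // additionally reshape the data, which the import does not need
}

fn checkpoint(cfg: CheckpointConfig) {
    match cfg {
        CheckpointConfig::Flush => {
            // Write whatever is in memory to disk so it survives a restart
            // and can be scheduled for S3 upload.
        }
        CheckpointConfig::Forced => {
            // Also do the extra reorganization (e.g. turning deltas into
            // images), which durability alone does not require.
        }
    }
}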