WIP import_timeline_from_tar

This commit is contained in:
Anastasia Lubennikova
2022-06-08 22:22:43 +03:00
parent 6cfebc096f
commit 21ad98ae4e

View File

@@ -22,6 +22,8 @@ use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::{Oid, TransactionId};
use utils::lsn::Lsn;
use postgres::CopyOutReader;
///
/// Import all relation data pages from local disk into the repository.
///
@@ -403,3 +405,126 @@ fn import_wal<R: Repository>(
Ok(())
}
pub fn import_timeline_from_tar<R: Repository>(
tline: &mut DatadirTimeline<R>,
copyreader: CopyOutReader,
lsn: Lsn,
) -> Result<()> {
let mut ar = tar::Archive::new(copyreader);
let mut modification = tline.begin_modification(lsn);
modification.init_empty()?;
for e in ar.entries().unwrap() {
let mut entry = e.unwrap();
let header = entry.header();
let file_path = header.path().unwrap().into_owned();
match header.entry_type() {
tar::EntryType::Regular => {
let mut buffer = Vec::new();
// read the whole entry
entry.read_to_end(&mut buffer).unwrap();
import_file(&mut modification, &file_path.as_ref(), &buffer)?;
}
tar::EntryType::Directory => {
println!("tar::EntryType::Directory {}", file_path.display());
}
_ => {
panic!("tar::EntryType::?? {}", file_path.display());
}
}
}
Ok(())
}
pub fn import_file<R: Repository>(
modification: &mut DatadirModification<R>,
file_path: &Path,
buffer: &[u8],
) -> Result<()> {
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_control" => {
println!("pg_control file {}", file_path.display());
// Import it as ControlFile
modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&buffer)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
}
"pg_filenode.map" => {
("pg_filenode.map file {}", file_path.display());
}
_ => {
println!("global relfile {}", file_path.display());
//TODO
}
}
} else if file_path.starts_with("base") {
let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
let dbnode: u32 = file_path
.iter()
.skip(1)
.next()
.unwrap()
.to_string_lossy()
.parse()
.unwrap();
match file_path.file_name().unwrap().to_string_lossy().as_ref() {
"pg_filenode.map" => {
println!(
"dbnode {} pg_filenode.map file {}",
dbnode,
file_path.display()
);
modification.put_relmap_file(
spcnode,
dbnode,
Bytes::copy_from_slice(&buffer[..]),
)?;
}
_ => {
println!("dbnode {} relfile {}", dbnode, file_path.display());
//TODO
}
}
} else if file_path.starts_with("pg_xact") {
println!(
"pg_xact {} ",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_multixact/offset") {
println!(
"pg_multixact/offset {}",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_multixact/members") {
println!(
"pg_multixact/members {}",
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
// TODO
} else if file_path.starts_with("pg_twophase") {
let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
println!(
"xid {} pg_twophase {}",
xid,
file_path.file_name().unwrap().to_string_lossy().as_ref()
);
modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
}
Ok(())
}