From 21ad98ae4e44b6ee9d2b806368e795337c64d36e Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Wed, 8 Jun 2022 22:22:43 +0300
Subject: [PATCH] WIP import_timeline_from_tar

---
 pageserver/src/import_datadir.rs | 125 +++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index 703ee8f1b1..db637c0df7 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -22,6 +22,8 @@ use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
 use postgres_ffi::{Oid, TransactionId};
 use utils::lsn::Lsn;
 
+use postgres::CopyOutReader;
+
 ///
 /// Import all relation data pages from local disk into the repository.
 ///
@@ -403,3 +405,126 @@ fn import_wal(
 
     Ok(())
 }
+
+pub fn import_timeline_from_tar(
+    tline: &mut DatadirTimeline,
+    copyreader: CopyOutReader,
+    lsn: Lsn,
+) -> Result<()> {
+    let mut ar = tar::Archive::new(copyreader);
+
+    let mut modification = tline.begin_modification(lsn);
+    modification.init_empty()?;
+
+    for e in ar.entries().unwrap() {
+        let mut entry = e.unwrap();
+        let header = entry.header();
+        let file_path = header.path().unwrap().into_owned();
+
+        match header.entry_type() {
+            tar::EntryType::Regular => {
+                let mut buffer = Vec::new();
+                // read the whole entry
+                entry.read_to_end(&mut buffer).unwrap();
+
+                import_file(&mut modification, &file_path, &buffer)?;
+            }
+            tar::EntryType::Directory => {
+                println!("tar::EntryType::Directory {}", file_path.display());
+            }
+            _ => {
+                panic!("tar::EntryType::?? {}", file_path.display());
+            }
+        }
+    }
+    Ok(())
+}
+
+pub fn import_file(
+    modification: &mut DatadirModification,
+    file_path: &Path,
+    buffer: &[u8],
+) -> Result<()> {
+    if file_path.starts_with("global") {
+        let spcnode = pg_constants::GLOBALTABLESPACE_OID;
+        let dbnode = 0;
+
+        match file_path.file_name().unwrap().to_string_lossy().as_ref() {
+            "pg_control" => {
+                println!("pg_control file {}", file_path.display());
+
+                // Import it as ControlFile
+                modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?;
+
+                // Extract the checkpoint record and import it separately.
+                let pg_control = ControlFileData::decode(&buffer)?;
+                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
+                modification.put_checkpoint(checkpoint_bytes)?;
+            }
+            "pg_filenode.map" => {
+                println!("pg_filenode.map file {}", file_path.display());
+            }
+            _ => {
+                println!("global relfile {}", file_path.display());
+                //TODO
+            }
+        }
+    } else if file_path.starts_with("base") {
+        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
+        let dbnode: u32 = file_path
+            .iter()
+            .skip(1)
+            .next()
+            .unwrap()
+            .to_string_lossy()
+            .parse()
+            .unwrap();
+
+        match file_path.file_name().unwrap().to_string_lossy().as_ref() {
+            "pg_filenode.map" => {
+                println!(
+                    "dbnode {} pg_filenode.map file {}",
+                    dbnode,
+                    file_path.display()
+                );
+                modification.put_relmap_file(
+                    spcnode,
+                    dbnode,
+                    Bytes::copy_from_slice(&buffer[..]),
+                )?;
+            }
+            _ => {
+                println!("dbnode {} relfile {}", dbnode, file_path.display());
+                //TODO
+            }
+        }
+    } else if file_path.starts_with("pg_xact") {
+        println!(
+            "pg_xact {}",
+            file_path.file_name().unwrap().to_string_lossy().as_ref()
+        );
+        // TODO
+    } else if file_path.starts_with("pg_multixact/offset") {
+        println!(
+            "pg_multixact/offset {}",
+            file_path.file_name().unwrap().to_string_lossy().as_ref()
+        );
+        // TODO
+    } else if file_path.starts_with("pg_multixact/members") {
+        println!(
+            "pg_multixact/members {}",
+            file_path.file_name().unwrap().to_string_lossy().as_ref()
+        );
+        // TODO
+    } else if file_path.starts_with("pg_twophase") {
+        let xid = u32::from_str_radix(&file_path.file_name().unwrap().to_string_lossy(), 16)?;
+
+        println!(
+            "xid {} pg_twophase {}",
+            xid,
+            file_path.file_name().unwrap().to_string_lossy().as_ref()
+        );
+        modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?;
+    }
+    Ok(())
+}
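
A minimal usage sketch, not part of the patch above: assuming the tar stream is obtained
via a libpq COPY OUT on some source connection, the blocking postgres crate's
CopyOutReader implements std::io::Read, so it can be handed straight to
import_timeline_from_tar. The connection string, the COPY statement, and the function
name import_from_copy below are placeholders for illustration, not an interface this
patch defines.

    use anyhow::Result;
    use postgres::{Client, NoTls};
    use utils::lsn::Lsn;

    // Hypothetical caller: stream a tarball of a data directory over COPY OUT
    // and feed it into the new import path at the given LSN.
    fn import_from_copy(tline: &mut DatadirTimeline, lsn: Lsn) -> Result<()> {
        // Placeholder connection to whatever serves the tar stream.
        let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
        // Placeholder statement standing in for whatever command streams the tarball;
        // import_timeline_from_tar only needs something that implements std::io::Read.
        let copyreader = client.copy_out("COPY basebackup_tar TO STDOUT")?;
        import_timeline_from_tar(tline, copyreader, lsn)?;
        Ok(())
    }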