From 290ad416a5bd02e93d5b8d8aa729893eaaf58443 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Thu, 16 Jun 2022 18:31:07 -0400 Subject: [PATCH] Remove repeated code --- Cargo.lock | 1 + pageserver/Cargo.toml | 1 + pageserver/src/import_datadir.rs | 191 +++---------------------------- 3 files changed, 20 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c615766eb8..dca525941d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1842,6 +1842,7 @@ dependencies = [ "tracing", "url", "utils", + "walkdir", "workspace_hack", ] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 298addb838..b7d97a67c0 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -61,6 +61,7 @@ utils = { path = "../libs/utils" } remote_storage = { path = "../libs/remote_storage" } workspace_hack = { version = "0.1", path = "../workspace_hack" } close_fds = "0.3.2" +walkdir = "2.3.2" [dev-dependencies] hex-literal = "0.3" diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 119c18fcce..db7b0f2c80 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -2,7 +2,6 @@ //! Import data and WAL from a PostgreSQL data directory and WAL segments into //! a zenith Timeline. //! 
-use std::fs; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::path::{Path, PathBuf}; @@ -10,6 +9,7 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use tracing::*; +use walkdir::WalkDir; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; @@ -18,8 +18,8 @@ use crate::walingest::WalIngest; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::Oid; use postgres_ffi::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; -use postgres_ffi::{Oid, TransactionId}; use utils::lsn::Lsn; /// @@ -35,100 +35,29 @@ pub fn import_timeline_from_postgres_datadir( ) -> Result<()> { let mut pg_control: Option<ControlFileData> = None; + // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn) + // Then fishing out pg_control would be unnecessary let mut modification = tline.begin_modification(lsn); modification.init_empty()?; - // Scan 'global' - let mut relfiles: Vec<PathBuf> = Vec::new(); - for direntry in fs::read_dir(path.join("global"))? 
{ - let direntry = direntry?; - match direntry.file_name().to_str() { - None => continue, + // Import all but pg_wal + let all_but_wal = WalkDir::new(path) + .into_iter() + .filter_entry(|entry| !entry.path().ends_with("pg_wal")); + for entry in all_but_wal { + let entry = entry.unwrap(); + let metadata = entry.metadata().unwrap(); + if metadata.is_file() { + let absolute_path = entry.path(); + let relative_path = absolute_path.strip_prefix(path)?; - Some("pg_control") => { - pg_control = Some(import_control_file(&mut modification, &direntry.path())?); - } - Some("pg_filenode.map") => { - import_relmap_file( - &mut modification, - pg_constants::GLOBALTABLESPACE_OID, - 0, - &direntry.path(), - )?; - } - - // Load any relation files into the page server (but only after the other files) - _ => relfiles.push(direntry.path()), - } - } - for relfile in relfiles { - import_relfile( - &mut modification, - &relfile, - pg_constants::GLOBALTABLESPACE_OID, - 0, - )?; - } - - // Scan 'base'. It contains database dirs, the database OID is the filename. - // E.g. 'base/12345', where 12345 is the database OID. - for direntry in fs::read_dir(path.join("base"))? { - let direntry = direntry?; - - //skip all temporary files - if direntry.file_name().to_string_lossy() == "pgsql_tmp" { - continue; - } - - let dboid = direntry.file_name().to_string_lossy().parse::()?; - - let mut relfiles: Vec = Vec::new(); - for direntry in fs::read_dir(direntry.path())? 
{ - let direntry = direntry?; - match direntry.file_name().to_str() { - None => continue, - - Some("PG_VERSION") => { - //modification.put_dbdir_creation(pg_constants::DEFAULTTABLESPACE_OID, dboid)?; - } - Some("pg_filenode.map") => import_relmap_file( - &mut modification, - pg_constants::DEFAULTTABLESPACE_OID, - dboid, - &direntry.path(), - )?, - - // Load any relation files into the page server - _ => relfiles.push(direntry.path()), + let file = File::open(absolute_path)?; + let len = metadata.len() as usize; + if let Some(control_file) = import_file(&mut modification, relative_path, file, len)? { + pg_control = Some(control_file); } } - for relfile in relfiles { - import_relfile( - &mut modification, - &relfile, - pg_constants::DEFAULTTABLESPACE_OID, - dboid, - )?; - } } - for entry in fs::read_dir(path.join("pg_xact"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::Clog, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_multixact").join("members"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::MultiXactMembers, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? { - let entry = entry?; - import_slru_file(&mut modification, SlruKind::MultiXactOffsets, &entry.path())?; - } - for entry in fs::read_dir(path.join("pg_twophase"))? { - let entry = entry?; - let xid = u32::from_str_radix(&entry.path().to_string_lossy(), 16)?; - import_twophase_file(&mut modification, xid, &entry.path())?; - } - // TODO: Scan pg_tblspc // We're done importing all the data files. 
modification.commit()?; @@ -157,18 +86,6 @@ pub fn import_timeline_from_postgres_datadir( Ok(()) } -fn import_relfile( - modification: &mut DatadirModification, - path: &Path, - spcoid: Oid, - dboid: Oid, -) -> anyhow::Result<()> { - let file = File::open(path)?; - let len = file.metadata().unwrap().len() as usize; - - import_rel(modification, path, spcoid, dboid, file, len) -} - // subroutine of import_timeline_from_postgres_datadir(), to load one relation file. fn import_rel( modification: &mut DatadirModification, @@ -245,78 +162,6 @@ fn import_rel( Ok(()) } -/// Import a relmapper (pg_filenode.map) file into the repository -fn import_relmap_file( - modification: &mut DatadirModification, - spcnode: Oid, - dbnode: Oid, - path: &Path, -) -> Result<()> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing relmap file {}", path.display()); - - modification.put_relmap_file(spcnode, dbnode, Bytes::copy_from_slice(&buffer[..]))?; - Ok(()) -} - -/// Import a twophase state file (pg_twophase/) into the repository -fn import_twophase_file( - modification: &mut DatadirModification, - xid: TransactionId, - path: &Path, -) -> Result<()> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing non-rel file {}", path.display()); - - modification.put_twophase_file(xid, Bytes::copy_from_slice(&buffer[..]))?; - Ok(()) -} - -/// -/// Import pg_control file into the repository. -/// -/// The control file is imported as is, but we also extract the checkpoint record -/// from it and store it separated. 
-fn import_control_file( - modification: &mut DatadirModification, - path: &Path, -) -> Result<ControlFileData> { - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - trace!("importing control file {}", path.display()); - - // Import it as ControlFile - modification.put_control_file(Bytes::copy_from_slice(&buffer[..]))?; - - // Extract the checkpoint record and import it separately. - let pg_control = ControlFileData::decode(&buffer)?; - let checkpoint_bytes = pg_control.checkPointCopy.encode()?; - modification.put_checkpoint(checkpoint_bytes)?; - - Ok(pg_control) -} - -fn import_slru_file( - modification: &mut DatadirModification, - slru: SlruKind, - path: &Path, -) -> Result<()> { - let file = File::open(path)?; - let len = file.metadata().unwrap().len() as usize; - import_slru(modification, slru, path, file, len) -} - /// /// Import an SLRU segment file ///