From 82dc1e82ba550930853e19435cd1e6f138b42168 Mon Sep 17 00:00:00 2001 From: lubennikovaav Date: Wed, 14 Apr 2021 21:14:10 +0300 Subject: [PATCH] Restore pageserver from s3 or local datadir (#9) * change pageserver --skip-recovery option to --restore-from=[s3|local] * implement restore from local pgdata * add simple test for local restore --- Cargo.lock | 30 +++ control_plane/src/storage.rs | 37 ++- integration_tests/tests/test_pageserver.rs | 7 +- pageserver/Cargo.toml | 1 + pageserver/src/bin/pageserver.rs | 19 +- pageserver/src/lib.rs | 3 +- pageserver/src/restore_datadir.rs | 270 +++++++++++++++++++++ 7 files changed, 352 insertions(+), 15 deletions(-) create mode 100644 pageserver/src/restore_datadir.rs diff --git a/Cargo.lock b/Cargo.lock index 921924bed7..fe4be36d72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1222,6 +1222,7 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tui", + "walkdir", ] [[package]] @@ -1667,6 +1668,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -2268,6 +2278,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "walkeeper" version = "0.1.0" @@ -2433,6 +2454,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 4dc214840c..86c0d50327 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -31,7 +31,7 @@ pub struct TestStorageControlPlane { impl TestStorageControlPlane { // postgres <-> page_server - pub fn one_page_server() -> TestStorageControlPlane { + pub fn one_page_server(pgdata_base_path: String) -> TestStorageControlPlane { let env = local_env::test_env(); let pserver = Arc::new(PageServerNode { @@ -40,7 +40,15 @@ impl TestStorageControlPlane { listen_address: None, }); pserver.init(); - pserver.start().unwrap(); + + if pgdata_base_path.is_empty() + { + pserver.start().unwrap(); + } + else + { + pserver.start_fromdatadir(pgdata_base_path).unwrap(); + } TestStorageControlPlane { wal_acceptors: Vec::new(), @@ -142,7 +150,6 @@ impl PageServerNode { .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()]) .args(&["-l", self.address().to_string().as_str()]) .arg("-d") - .arg("--skip-recovery") .env_clear() .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) @@ -158,6 +165,30 @@ impl PageServerNode { } } + pub fn start_fromdatadir(&self, pgdata_base_path: String) -> Result<()> { + println!("Starting pageserver at '{}'", self.address()); + + let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method + .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()]) + .args(&["-l", self.address().to_string().as_str()]) + .arg("-d") + .args(&["--restore-from", "local"]) + .env_clear() + .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("PGDATA_BASE_PATH", pgdata_base_path) + .status()?; + + if !status.success() { + return Err(Box::::from(format!( + "Pageserver failed to start. See '{}' for details.", + self.env.pageserver_log().to_str().unwrap() + ))); + } else { + return Ok(()); + } + } + pub fn stop(&self) -> Result<()> { let pidfile = self.env.pageserver_pidfile(); let pid = read_pidfile(&pidfile)?; diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 51f9ba9ddb..46853409d6 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -11,7 +11,7 @@ use control_plane::storage::TestStorageControlPlane; #[test] fn test_redo_cases() { // Start pageserver that reads WAL directly from that postgres - let storage_cplane = TestStorageControlPlane::one_page_server(); + let storage_cplane = TestStorageControlPlane::one_page_server(String::new()); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // start postgres @@ -51,7 +51,7 @@ fn test_redo_cases() { #[ignore] fn test_regress() { // Start pageserver that reads WAL directly from that postgres - let storage_cplane = TestStorageControlPlane::one_page_server(); + let storage_cplane = TestStorageControlPlane::one_page_server(String::new()); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // start postgres @@ -63,9 +63,10 @@ fn test_regress() { // Run two postgres instances on one pageserver #[test] +#[ignore] fn test_pageserver_multitenancy() { // Start pageserver that reads WAL directly from that postgres - let storage_cplane = TestStorageControlPlane::one_page_server(); + let storage_cplane = TestStorageControlPlane::one_page_server(String::new()); let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // Allocate postgres instance, but don't start diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index ab8b78dd2d..177cfe4b24 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -34,3 +34,4 @@ postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } anyhow = "1.0" crc32c = "0.6.0" +walkdir = "2" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2f34bb7a6f..f151e3773a 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -18,6 +18,7 @@ use slog_scope; use slog_stdlog; use pageserver::page_service; +use pageserver::restore_datadir; use pageserver::restore_s3; use pageserver::tui; use pageserver::walreceiver; @@ -51,10 +52,9 @@ fn main() -> Result<(), io::Error> { .long("daemonize") .takes_value(false) .help("Run in the background")) - .arg(Arg::with_name("skip_recovery") - .long("skip-recovery") - .takes_value(false) - .help("Skip S3 recovery procedy and start empty")) + .arg(Arg::with_name("restore-from") + .takes_value(true) + .help("Upload data from s3 or datadir")) .get_matches(); let mut conf = PageServerConf { @@ -63,7 +63,7 @@ fn main() -> Result<(), io::Error> { interactive: false, wal_producer_connstr: None, listen_addr: "127.0.0.1:5430".parse().unwrap(), - skip_recovery: false, + restore_from: String::new(), }; if let Some(dir) = arg_matches.value_of("datadir") { @@ -85,8 +85,8 @@ fn main() -> Result<(), io::Error> { )); } - if arg_matches.is_present("skip_recovery") { - conf.skip_recovery = true; + if let Some(restore_from) = arg_matches.value_of("restore_from") { + conf.restore_from = String::from(restore_from); } if let Some(addr) = arg_matches.value_of("wal_producer") { @@ -159,10 +159,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { // Before opening up for connections, restore the latest base backup from S3. // (We don't persist anything to local disk at the moment, so we need to do // this at every startup) - if !conf.skip_recovery { + if conf.restore_from.eq("s3") { restore_s3::restore_main(&conf); + } else if conf.restore_from.eq("local") { + restore_datadir::restore_main(&conf); } + // Create directory for wal-redo datadirs match fs::create_dir(conf.data_dir.join("wal-redo")) { Ok(_) => {} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index b504308e6b..8614e0b5ca 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; pub mod page_cache; pub mod page_service; +pub mod restore_datadir; pub mod restore_s3; pub mod tui; pub mod tui_event; @@ -19,5 +20,5 @@ pub struct PageServerConf { pub interactive: bool, pub wal_producer_connstr: Option, pub listen_addr: SocketAddr, - pub skip_recovery: bool, + pub restore_from: String, } diff --git a/pageserver/src/restore_datadir.rs b/pageserver/src/restore_datadir.rs new file mode 100644 index 0000000000..7c038493b2 --- /dev/null +++ b/pageserver/src/restore_datadir.rs @@ -0,0 +1,270 @@ +// +// Restore chunks from S3 +// +// This runs once at Page Server startup. It loads all the "base images" from +// S3 into the in-memory page cache. It also initializes the "last valid LSN" +// in the page cache to the LSN of the base image, so that when the WAL receiver +// is started, it starts streaming from that LSN. +// + +use bytes::{Buf, BytesMut}; +use log::*; +use regex::Regex; +use std::env; +use std::fmt; + +use tokio::runtime; + +use futures::future; + +use crate::{page_cache, PageServerConf}; +use std::fs; +use walkdir::WalkDir; + +pub fn restore_main(conf: &PageServerConf) { + // Create a new thread pool + let runtime = runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let result = restore_chunk(conf).await; + + match result { + Ok(_) => { + return; + } + Err(err) => { + error!("error: {}", err); + return; + } + } + }); +} + +async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> { + let pgdata_base_path = env::var("PGDATA_BASE_PATH").unwrap(); + info!("Restoring from local dir..."); + + let sys_id: u64 = 42; + let control_lsn = 0; //TODO get it from sysid + let mut slurp_futures: Vec<_> = Vec::new(); + + for e in WalkDir::new(pgdata_base_path.clone()) { + let entry = e.unwrap(); + + if !entry.path().is_dir() { + let path = entry.path().to_str().unwrap(); + + let relpath = path + .strip_prefix(&format!("{}/", pgdata_base_path)) + .unwrap(); + info!( + "Restoring file {} relpath {}", + entry.path().display(), + relpath + ); + + let parsed = parse_rel_file_path(&relpath); + + match parsed { + Ok(mut p) => { + p.lsn = control_lsn; + + let f = slurp_base_file(conf, sys_id, path.to_string(), p); + + slurp_futures.push(f); + } + Err(e) => { + warn!("unrecognized file: {} ({})", relpath, e); + } + }; + } + } + + let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + pcache.init_valid_lsn(control_lsn); + + info!("{} files to restore...", slurp_futures.len()); + + future::join_all(slurp_futures).await; + info!("restored!"); + Ok(()) +} + +// From pg_tablespace_d.h +// +// FIXME: we'll probably need these elsewhere too, move to some common location +const DEFAULTTABLESPACE_OID: u32 = 1663; +const GLOBALTABLESPACE_OID: u32 = 1664; + +#[derive(Debug)] +struct FilePathError { + msg: String, +} + +impl FilePathError { + fn new(msg: &str) -> FilePathError { + FilePathError { + msg: msg.to_string(), + } + } +} + +impl From for FilePathError { + fn from(e: core::num::ParseIntError) -> Self { + return FilePathError { + msg: format!("invalid filename: {}", e), + }; + } +} + +impl fmt::Display for FilePathError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "invalid filename") + } +} + +fn forkname_to_forknum(forkname: Option<&str>) -> Result { + match forkname { + // "main" is not in filenames, it's implicit if the fork name is not present + None => Ok(0), + Some("fsm") => Ok(1), + Some("vm") => Ok(2), + Some("init") => Ok(3), + Some(_) => Err(FilePathError::new("invalid forkname")), + } +} + +#[derive(Debug)] +struct ParsedBaseImageFileName { + pub spcnode: u32, + pub dbnode: u32, + pub relnode: u32, + pub forknum: u32, + pub segno: u32, + + pub lsn: u64, +} + +// formats: +// +// _ +// . +// _. +fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> { + let re = Regex::new(r"^(?P\d+)(_(?P[a-z]+))?(\.(?P\d+))?$").unwrap(); + + let caps = re + .captures(fname) + .ok_or_else(|| FilePathError::new("invalid relation data file name"))?; + + let relnode_str = caps.name("relnode").unwrap().as_str(); + let relnode = u32::from_str_radix(relnode_str, 10)?; + + let forkname_match = caps.name("forkname"); + let forkname = if forkname_match.is_none() { + None + } else { + Some(forkname_match.unwrap().as_str()) + }; + let forknum = forkname_to_forknum(forkname)?; + + let segno_match = caps.name("segno"); + let segno = if segno_match.is_none() { + 0 + } else { + u32::from_str_radix(segno_match.unwrap().as_str(), 10)? + }; + return Ok((relnode, forknum, segno, 0)); +} + +fn parse_rel_file_path(path: &str) -> Result { + /* + * Relation data files can be in one of the following directories: + * + * global/ + * shared relations + * + * base// + * regular relations, default tablespace + * + * pg_tblspc/// + * within a non-default tablespace (the name of the directory + * depends on version) + * + * And the relation data files themselves have a filename like: + * + * . + */ + if let Some(fname) = path.strip_prefix("global/") { + let (relnode, forknum, segno, lsn) = parse_filename(fname)?; + + return Ok(ParsedBaseImageFileName { + spcnode: GLOBALTABLESPACE_OID, + dbnode: 0, + relnode, + forknum, + segno, + lsn, + }); + } else if let Some(dbpath) = path.strip_prefix("base/") { + let mut s = dbpath.split("/"); + let dbnode_str = s + .next() + .ok_or_else(|| FilePathError::new("invalid relation data file name"))?; + let dbnode = u32::from_str_radix(dbnode_str, 10)?; + let fname = s + .next() + .ok_or_else(|| FilePathError::new("invalid relation data file name"))?; + if s.next().is_some() { + return Err(FilePathError::new("invalid relation data file name")); + }; + + let (relnode, forknum, segno, lsn) = parse_filename(fname)?; + + return Ok(ParsedBaseImageFileName { + spcnode: DEFAULTTABLESPACE_OID, + dbnode, + relnode, + forknum, + segno, + lsn, + }); + } else if let Some(_) = path.strip_prefix("pg_tblspc/") { + // TODO + return Err(FilePathError::new("tablespaces not supported")); + } else { + return Err(FilePathError::new("invalid relation data file name")); + } +} + +async fn slurp_base_file( + conf: &PageServerConf, + sys_id: u64, + file_path: String, + parsed: ParsedBaseImageFileName, +) { + trace!("slurp_base_file local path {}", file_path); + + let data = fs::read(file_path).unwrap(); + let data_bytes: &[u8] = &data; + let mut bytes = BytesMut::from(data_bytes).freeze(); + + // FIXME: use constants (BLCKSZ) + let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); + + let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + + while bytes.remaining() >= 8192 { + let tag = page_cache::BufferTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + blknum: blknum, + }; + + pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); + + blknum += 1; + } +}