Restore pageserver from s3 or local datadir (#9)

* change pageserver --skip-recovery option to --restore-from=[s3|local]
* implement restore from local pgdata 
* add simple test for local restore
This commit is contained in:
lubennikovaav
2021-04-14 21:14:10 +03:00
committed by GitHub
parent 2e9c730dd1
commit 82dc1e82ba
7 changed files with 352 additions and 15 deletions

30
Cargo.lock generated
View File

@@ -1222,6 +1222,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tui",
"walkdir",
]
[[package]]
@@ -1667,6 +1668,15 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.19"
@@ -2268,6 +2278,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "walkeeper"
version = "0.1.0"
@@ -2433,6 +2454,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

View File

@@ -31,7 +31,7 @@ pub struct TestStorageControlPlane {
impl TestStorageControlPlane {
// postgres <-> page_server
pub fn one_page_server() -> TestStorageControlPlane {
pub fn one_page_server(pgdata_base_path: String) -> TestStorageControlPlane {
let env = local_env::test_env();
let pserver = Arc::new(PageServerNode {
@@ -40,7 +40,15 @@ impl TestStorageControlPlane {
listen_address: None,
});
pserver.init();
pserver.start().unwrap();
if pgdata_base_path.is_empty()
{
pserver.start().unwrap();
}
else
{
pserver.start_fromdatadir(pgdata_base_path).unwrap();
}
TestStorageControlPlane {
wal_acceptors: Vec::new(),
@@ -142,7 +150,6 @@ impl PageServerNode {
.args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
.args(&["-l", self.address().to_string().as_str()])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
@@ -158,6 +165,30 @@ impl PageServerNode {
}
}
pub fn start_fromdatadir(&self, pgdata_base_path: String) -> Result<()> {
println!("Starting pageserver at '{}'", self.address());
let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
.args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
.args(&["-l", self.address().to_string().as_str()])
.arg("-d")
.args(&["--restore-from", "local"])
.env_clear()
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGDATA_BASE_PATH", pgdata_base_path)
.status()?;
if !status.success() {
return Err(Box::<dyn error::Error>::from(format!(
"Pageserver failed to start. See '{}' for details.",
self.env.pageserver_log().to_str().unwrap()
)));
} else {
return Ok(());
}
}
pub fn stop(&self) -> Result<()> {
let pidfile = self.env.pageserver_pidfile();
let pid = read_pidfile(&pidfile)?;

View File

@@ -11,7 +11,7 @@ use control_plane::storage::TestStorageControlPlane;
#[test]
fn test_redo_cases() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server();
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
@@ -51,7 +51,7 @@ fn test_redo_cases() {
#[ignore]
fn test_regress() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server();
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
@@ -63,9 +63,10 @@ fn test_regress() {
// Run two postgres instances on one pageserver
#[test]
#[ignore]
fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server();
let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// Allocate postgres instance, but don't start

View File

@@ -34,3 +34,4 @@ postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch =
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"

View File

@@ -18,6 +18,7 @@ use slog_scope;
use slog_stdlog;
use pageserver::page_service;
use pageserver::restore_datadir;
use pageserver::restore_s3;
use pageserver::tui;
use pageserver::walreceiver;
@@ -51,10 +52,9 @@ fn main() -> Result<(), io::Error> {
.long("daemonize")
.takes_value(false)
.help("Run in the background"))
.arg(Arg::with_name("skip_recovery")
.long("skip-recovery")
.takes_value(false)
.help("Skip S3 recovery procedy and start empty"))
.arg(Arg::with_name("restore-from")
.takes_value(true)
.help("Upload data from s3 or datadir"))
.get_matches();
let mut conf = PageServerConf {
@@ -63,7 +63,7 @@ fn main() -> Result<(), io::Error> {
interactive: false,
wal_producer_connstr: None,
listen_addr: "127.0.0.1:5430".parse().unwrap(),
skip_recovery: false,
restore_from: String::new(),
};
if let Some(dir) = arg_matches.value_of("datadir") {
@@ -85,8 +85,8 @@ fn main() -> Result<(), io::Error> {
));
}
if arg_matches.is_present("skip_recovery") {
conf.skip_recovery = true;
if let Some(restore_from) = arg_matches.value_of("restore_from") {
conf.restore_from = String::from(restore_from);
}
if let Some(addr) = arg_matches.value_of("wal_producer") {
@@ -159,10 +159,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> {
// Before opening up for connections, restore the latest base backup from S3.
// (We don't persist anything to local disk at the moment, so we need to do
// this at every startup)
if !conf.skip_recovery {
if conf.restore_from.eq("s3") {
restore_s3::restore_main(&conf);
} else if conf.restore_from.eq("local") {
restore_datadir::restore_main(&conf);
}
// Create directory for wal-redo datadirs
match fs::create_dir(conf.data_dir.join("wal-redo")) {
Ok(_) => {}

View File

@@ -3,6 +3,7 @@ use std::path::PathBuf;
pub mod page_cache;
pub mod page_service;
pub mod restore_datadir;
pub mod restore_s3;
pub mod tui;
pub mod tui_event;
@@ -19,5 +20,5 @@ pub struct PageServerConf {
pub interactive: bool,
pub wal_producer_connstr: Option<String>,
pub listen_addr: SocketAddr,
pub skip_recovery: bool,
pub restore_from: String,
}

View File

@@ -0,0 +1,270 @@
//
// Restore chunks from S3
//
// This runs once at Page Server startup. It loads all the "base images" from
// S3 into the in-memory page cache. It also initializes the "last valid LSN"
// in the page cache to the LSN of the base image, so that when the WAL receiver
// is started, it starts streaming from that LSN.
//
use bytes::{Buf, BytesMut};
use log::*;
use regex::Regex;
use std::env;
use std::fmt;
use tokio::runtime;
use futures::future;
use crate::{page_cache, PageServerConf};
use std::fs;
use walkdir::WalkDir;
pub fn restore_main(conf: &PageServerConf) {
// Create a new thread pool
let runtime = runtime::Runtime::new().unwrap();
runtime.block_on(async {
let result = restore_chunk(conf).await;
match result {
Ok(_) => {
return;
}
Err(err) => {
error!("error: {}", err);
return;
}
}
});
}
async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> {
let pgdata_base_path = env::var("PGDATA_BASE_PATH").unwrap();
info!("Restoring from local dir...");
let sys_id: u64 = 42;
let control_lsn = 0; //TODO get it from sysid
let mut slurp_futures: Vec<_> = Vec::new();
for e in WalkDir::new(pgdata_base_path.clone()) {
let entry = e.unwrap();
if !entry.path().is_dir() {
let path = entry.path().to_str().unwrap();
let relpath = path
.strip_prefix(&format!("{}/", pgdata_base_path))
.unwrap();
info!(
"Restoring file {} relpath {}",
entry.path().display(),
relpath
);
let parsed = parse_rel_file_path(&relpath);
match parsed {
Ok(mut p) => {
p.lsn = control_lsn;
let f = slurp_base_file(conf, sys_id, path.to_string(), p);
slurp_futures.push(f);
}
Err(e) => {
warn!("unrecognized file: {} ({})", relpath, e);
}
};
}
}
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
pcache.init_valid_lsn(control_lsn);
info!("{} files to restore...", slurp_futures.len());
future::join_all(slurp_futures).await;
info!("restored!");
Ok(())
}
// From pg_tablespace_d.h
//
// FIXME: we'll probably need these elsewhere too, move to some common location
const DEFAULTTABLESPACE_OID: u32 = 1663;
const GLOBALTABLESPACE_OID: u32 = 1664;
#[derive(Debug)]
struct FilePathError {
msg: String,
}
impl FilePathError {
fn new(msg: &str) -> FilePathError {
FilePathError {
msg: msg.to_string(),
}
}
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(e: core::num::ParseIntError) -> Self {
return FilePathError {
msg: format!("invalid filename: {}", e),
};
}
}
impl fmt::Display for FilePathError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "invalid filename")
}
}
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(0),
Some("fsm") => Ok(1),
Some("vm") => Ok(2),
Some("init") => Ok(3),
Some(_) => Err(FilePathError::new("invalid forkname")),
}
}
#[derive(Debug)]
struct ParsedBaseImageFileName {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u32,
pub segno: u32,
pub lsn: u64,
}
// formats:
// <oid>
// <oid>_<fork name>
// <oid>.<segment number>
// <oid>_<fork name>.<segment number>
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
let caps = re
.captures(fname)
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let relnode_str = caps.name("relnode").unwrap().as_str();
let relnode = u32::from_str_radix(relnode_str, 10)?;
let forkname_match = caps.name("forkname");
let forkname = if forkname_match.is_none() {
None
} else {
Some(forkname_match.unwrap().as_str())
};
let forknum = forkname_to_forknum(forkname)?;
let segno_match = caps.name("segno");
let segno = if segno_match.is_none() {
0
} else {
u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
};
return Ok((relnode, forknum, segno, 0));
}
fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
/*
* Relation data files can be in one of the following directories:
*
* global/
* shared relations
*
* base/<db oid>/
* regular relations, default tablespace
*
* pg_tblspc/<tblspc oid>/<tblspc version>/
* within a non-default tablespace (the name of the directory
* depends on version)
*
* And the relation data files themselves have a filename like:
*
* <oid>.<segment number>
*/
if let Some(fname) = path.strip_prefix("global/") {
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: GLOBALTABLESPACE_OID,
dbnode: 0,
relnode,
forknum,
segno,
lsn,
});
} else if let Some(dbpath) = path.strip_prefix("base/") {
let mut s = dbpath.split("/");
let dbnode_str = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
let dbnode = u32::from_str_radix(dbnode_str, 10)?;
let fname = s
.next()
.ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
if s.next().is_some() {
return Err(FilePathError::new("invalid relation data file name"));
};
let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
return Ok(ParsedBaseImageFileName {
spcnode: DEFAULTTABLESPACE_OID,
dbnode,
relnode,
forknum,
segno,
lsn,
});
} else if let Some(_) = path.strip_prefix("pg_tblspc/") {
// TODO
return Err(FilePathError::new("tablespaces not supported"));
} else {
return Err(FilePathError::new("invalid relation data file name"));
}
}
async fn slurp_base_file(
conf: &PageServerConf,
sys_id: u64,
file_path: String,
parsed: ParsedBaseImageFileName,
) {
trace!("slurp_base_file local path {}", file_path);
let data = fs::read(file_path).unwrap();
let data_bytes: &[u8] = &data;
let mut bytes = BytesMut::from(data_bytes).freeze();
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let pcache = page_cache::get_pagecache(conf.clone(), sys_id);
while bytes.remaining() >= 8192 {
let tag = page_cache::BufferTag {
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
blknum: blknum,
};
pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192));
blknum += 1;
}
}