Refactor CLI and CLI<->pageserver interfaces to support remote pageserver

This patch started as an effort to let the CLI work against a remote
pageserver, but turned into a fairly large refactoring.

* The CLI no longer looks into repository files directly. New commands
'branch_create' and 'identify_system' were introduced into page_service to
support that (a usage sketch follows this list).
* Branch management that was scattered between local_env and
zenith/main.rs is moved into pageserver/branches.rs. That code would fit better
in the Repository/Timeline impls, but I'll leave that for a different patch.
* All test-related code from local_env went into integration_tests/src/lib.rs as an
extension trait on PostgresNode.
* Path-generating functions were concentrated around the corresponding config
types (LocalEnv and PageserverConf).
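
Rough sketch of how a client drives the new page_service commands over an
ordinary libpq connection (connection parameters and error handling are
illustrative; the command strings are the ones used by PageServerNode further
down in this patch):

    use anyhow::Result;
    use postgres::{Client, NoTls, SimpleQueryMessage};

    fn main() -> Result<()> {
        // The pageserver speaks the postgres wire protocol on its own port.
        let mut client = Client::connect("host=127.0.0.1 port=6400 user=zenith", NoTls)?;

        // 'identify_system' returns the PostgreSQL system identifier as a single row.
        for msg in client.simple_query("identify_system")? {
            if let SimpleQueryMessage::Row(row) = msg {
                println!("systemid: {}", row.get(0).unwrap_or(""));
            }
        }

        // 'branch_create <name> <startpoint>' forks a new branch and returns
        // its description as JSON.
        client.simple_query("branch_create experimental main")?;
        Ok(())
    }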
Author: Stas Kelvich
Date:   2021-05-16 17:19:36 +03:00
Parent: 53ea6702bd
Commit: 746f667311
27 changed files with 1317 additions and 1082 deletions


@@ -141,7 +141,7 @@ jobs:
    working_directory: test_runner
    environment:
      - ZENITH_BIN: /tmp/zenith/bin
-     - POSTGRES_BIN: /tmp/zenith/pg_install
+     - POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
      - TEST_OUTPUT: /tmp/test_output
    command: |
      TEST_FILE="<< parameters.test_file >>"

Cargo.lock (generated)

@@ -263,8 +263,6 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "bytes",
- "fs_extra",
- "hex",
  "lazy_static",
  "nix",
  "pageserver",
@@ -273,9 +271,10 @@ dependencies = [
"rand", "rand",
"regex", "regex",
"serde", "serde",
"serde_json",
"tar", "tar",
"thiserror",
"toml", "toml",
"url",
"walkeeper", "walkeeper",
"workspace_hack", "workspace_hack",
"zenith_utils", "zenith_utils",
@@ -788,8 +787,10 @@ dependencies = [
name = "integration_tests" name = "integration_tests"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"control_plane", "control_plane",
"lazy_static", "lazy_static",
"nix",
"pageserver", "pageserver",
"postgres", "postgres",
"rand", "rand",
@@ -1169,6 +1170,7 @@ dependencies = [
"clap", "clap",
"crc32c", "crc32c",
"daemonize", "daemonize",
"fs_extra",
"futures", "futures",
"hex", "hex",
"lazy_static", "lazy_static",
@@ -2190,9 +2192,9 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
 [[package]]
 name = "url"
-version = "2.2.1"
+version = "2.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
+checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
 dependencies = [
  "form_urlencoded",
  "idna",


@@ -11,15 +11,17 @@ rand = "0.8.3"
tar = "0.4.33" tar = "0.4.33"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
toml = "0.5" toml = "0.5"
lazy_static = "1.4" lazy_static = "1.4"
regex = "1" regex = "1"
anyhow = "1.0" anyhow = "1.0"
hex = "0.4.3" # hex = "0.4.3"
bytes = "1.0.1" bytes = "1.0.1"
fs_extra = "1.2.0" # fs_extra = "1.2.0"
nix = "0.20" nix = "0.20"
thiserror = "1" # thiserror = "1"
url = "2.2.2"
pageserver = { path = "../pageserver" } pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" } walkeeper = { path = "../walkeeper" }


@@ -1,23 +1,24 @@
-use std::fs::{self, File, OpenOptions};
-use std::io::{Read, Write};
+use std::io::Write;
 use std::net::SocketAddr;
 use std::net::TcpStream;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
-use std::process::{Command, ExitStatus};
+use std::process::Command;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{collections::BTreeMap, path::PathBuf};
+use std::{
+    fs::{self, OpenOptions},
+    io::Read,
+};
 use anyhow::{Context, Result};
 use lazy_static::lazy_static;
 use regex::Regex;
-use postgres::{Client, NoTls};
 use crate::local_env::LocalEnv;
-use crate::storage::{PageServerNode, WalProposerNode};
-use pageserver::{zenith_repo_dir, ZTimelineId};
+use pageserver::ZTimelineId;
+use crate::storage::PageServerNode;
//
// ComputeControlPlane
@@ -36,8 +37,8 @@ impl ComputeControlPlane {
        // it is running on default port. Change that when pageserver will have config.
        let pageserver = Arc::new(PageServerNode::from_env(&env));

-        let pgdatadirspath = env.repo_path.join("pgdatadirs");
-        let nodes: Result<BTreeMap<_, _>> = fs::read_dir(&pgdatadirspath)
+        let pgdatadirspath = &env.pg_data_dirs_path();
+        let nodes: Result<BTreeMap<_, _>> = fs::read_dir(pgdatadirspath)
            .with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
            .into_iter()
            .map(|f| {
@@ -97,8 +98,14 @@ impl ComputeControlPlane {
        Ok(node)
    }

-    pub fn new_test_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
-        let node = self.new_from_page_server(true, timelineid);
+    pub fn new_test_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
+        let timeline_id = self
+            .pageserver
+            .branch_get_by_name(branch_name)
+            .expect("failed to get timeline_id")
+            .timeline_id;
+        let node = self.new_from_page_server(true, timeline_id);
        let node = node.unwrap();

        // Configure the node to stream WAL directly to the pageserver
@@ -115,8 +122,14 @@ impl ComputeControlPlane {
        node
    }

-    pub fn new_test_master_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
-        let node = self.new_from_page_server(true, timelineid).unwrap();
+    pub fn new_test_master_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
+        let timeline_id = self
+            .pageserver
+            .branch_get_by_name(branch_name)
+            .expect("failed to get timeline_id")
+            .timeline_id;
+        let node = self.new_from_page_server(true, timeline_id).unwrap();

        node.append_conf(
            "postgresql.conf",
@@ -126,8 +139,14 @@ impl ComputeControlPlane {
        node
    }

-    pub fn new_node(&mut self, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
-        let node = self.new_from_page_server(false, timelineid).unwrap();
+    pub fn new_node(&mut self, branch_name: &str) -> Result<Arc<PostgresNode>> {
+        let timeline_id = self
+            .pageserver
+            .branch_get_by_name(branch_name)
+            .expect("failed to get timeline_id")
+            .timeline_id;
+        let node = self.new_from_page_server(false, timeline_id).unwrap();

        // Configure the node to stream WAL directly to the pageserver
        node.append_conf(
@@ -291,9 +310,9 @@ impl PostgresNode {
            max_replication_slots = 10\n\
            hot_standby = on\n\
            shared_buffers = 1MB\n\
            fsync = off\n\
            max_connections = 100\n\
            wal_sender_timeout = 0\n\
            wal_level = replica\n\
            listen_addresses = '{address}'\n\
            port = {port}\n",
@@ -326,8 +345,8 @@ impl PostgresNode {
        Ok(())
    }

-    fn pgdata(&self) -> PathBuf {
-        self.env.repo_path.join("pgdatadirs").join(&self.name)
+    pub fn pgdata(&self) -> PathBuf {
+        self.env.pg_data_dir(&self.name)
    }

    pub fn status(&self) -> &str {
@@ -413,152 +432,6 @@ impl PostgresNode {
        String::from_utf8(output.stdout).unwrap().trim().to_string()
    }
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pageserver.log:\n{}", buffer);
}
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
let result = client.query(sql, &[]);
if result.is_err() {
self.dump_log_file();
}
result.unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
Client::connect(connstring.as_str(), NoTls).unwrap()
}
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(self.pgdata().join("safekeeper_proxy.log"))
.unwrap(),
)
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
pub fn pg_regress(&self) -> ExitStatus {
self.safe_psql("postgres", "CREATE DATABASE regression");
let data_dir = zenith_repo_dir();
let regress_run_path = data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
if !regress_check.success() {
if let Ok(mut file) = File::open("regression.diffs") {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- regression.diffs:\n{}", buffer);
}
self.dump_log_file();
if let Ok(mut file) = File::open(
self.env
.repo_path
.join("pgdatadirs")
.join("pg1")
.join("log"),
) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
}
}
regress_check
}
pub fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
pg_bench_run
}
}

impl Drop for PostgresNode {

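Net effect of the compute.rs changes above: node creation is now keyed by
branch name, and the timeline ID is resolved through the pageserver instead of
by reading repository files. A minimal usage sketch (assuming a LocalEnv
already written by 'zenith init' and a running pageserver; error handling and
the exported paths are as in this tree at this commit):

    use std::sync::Arc;
    use anyhow::Result;
    use control_plane::compute::ComputeControlPlane;
    use control_plane::local_env;
    use control_plane::storage::PageServerNode;

    fn start_main_node() -> Result<()> {
        let env = local_env::load_config()?;
        let pageserver = Arc::new(PageServerNode::from_env(&env));
        let mut cplane = ComputeControlPlane::local(&env, &pageserver);
        // The branch name is resolved to a timeline ID via branch_get_by_name,
        // which issues 'pg_list' to the pageserver behind the scenes.
        let node = cplane.new_node("main")?;
        node.start()?;
        Ok(())
    }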

@@ -6,7 +6,26 @@
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
+use anyhow::{anyhow, bail, Context, Result};
+use std::fs;
+use std::path::Path;

pub mod compute;
pub mod local_env;
pub mod storage;
/// Read a PID file
///
/// We expect a file that contains a single integer.
/// We return an i32 for compatibility with libc and nix.
pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
let pid_str = fs::read_to_string(pidfile)
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
let pid: i32 = pid_str
.parse()
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
if pid < 1 {
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
}
Ok(pid)
}


@@ -4,37 +4,23 @@
// Now it also provides init method which acts like a stub for proper installation
// script which will use local paths.
//
-use anyhow::Context;
-use bytes::Bytes;
-use rand::Rng;
+use anyhow::{anyhow, Result};
+use serde::{Deserialize, Serialize};
 use std::env;
 use std::fs;
-use std::fs::File;
-use std::io::Read;
-use std::path::{Path, PathBuf};
-use std::process::{Command, Stdio};
-use anyhow::Result;
-use serde::{Deserialize, Serialize};
-use pageserver::zenith_repo_dir;
-use pageserver::ZTimelineId;
-use postgres_ffi::xlog_utils;
-use zenith_utils::lsn::Lsn;
+use std::path::PathBuf;
+use url::Url;

 //
-// This data structure represents deserialized zenith config, which should be
-// located in ~/.zenith
-//
-// TODO: should we also support ZENITH_CONF env var?
+// This data structures represent deserialized zenith CLI config
 //
 #[derive(Serialize, Deserialize, Clone)]
 pub struct LocalEnv {
-    // Path to the Repository. Here page server and compute nodes will create and store their data.
-    pub repo_path: PathBuf,
+    // Pageserver connection strings
+    pub pageserver_connstring: String,

-    // System identifier, from the PostgreSQL control file
-    pub systemid: u64,
+    // Base directory for both pageserver and compute nodes
+    pub base_data_dir: PathBuf,

    // Path to postgres distribution. It's expected that "bin", "include",
    // "lib", "share" from postgres distribution are there. If at some point
@@ -42,38 +28,66 @@ pub struct LocalEnv {
    // to four separate paths and match OS-specific installation layout.
    pub pg_distrib_dir: PathBuf,

-    // Path to pageserver binary.
-    pub zenith_distrib_dir: PathBuf,
+    // Path to pageserver binary. Empty for remote pageserver.
+    pub zenith_distrib_dir: Option<PathBuf>,
}
impl LocalEnv {
-    // postgres installation
+    // postgres installation paths
    pub fn pg_bin_dir(&self) -> PathBuf {
        self.pg_distrib_dir.join("bin")
    }
    pub fn pg_lib_dir(&self) -> PathBuf {
        self.pg_distrib_dir.join("lib")
    }
pub fn pageserver_bin(&self) -> Result<PathBuf> {
Ok(self
.zenith_distrib_dir
.as_ref()
.ok_or(anyhow!("Can not manage remote pageserver"))?
.join("pageserver"))
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs")
}
pub fn pg_data_dir(&self, name: &str) -> PathBuf {
self.pg_data_dirs_path().join(name)
}
// TODO: move pageserver files into ./pageserver
pub fn pageserver_data_dir(&self) -> PathBuf {
self.base_data_dir.clone()
}
}
fn base_path() -> PathBuf {
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
//
// Initialize a new Zenith repository
//
-pub fn init() -> Result<()> {
+pub fn init(remote_pageserver: Option<&str>) -> Result<()> {
    // check if config already exists
-    let repo_path = zenith_repo_dir();
-    if repo_path.exists() {
+    let base_path = base_path();
+    if base_path.exists() {
        anyhow::bail!(
            "{} already exists. Perhaps already initialized?",
-            repo_path.to_str().unwrap()
+            base_path.to_str().unwrap()
        );
    }

    // ok, now check that expected binaries are present
-    // Find postgres binaries. Follow POSTGRES_BIN if set, otherwise look in "tmp_install".
+    // Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install".
    let pg_distrib_dir: PathBuf = {
-        if let Some(postgres_bin) = env::var_os("POSTGRES_BIN") {
+        if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
            postgres_bin.into()
        } else {
            let cwd = env::current_dir()?;
@@ -84,137 +98,45 @@ pub fn init() -> Result<()> {
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
} }
// Find zenith binaries. fs::create_dir(&base_path)?;
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); fs::create_dir(base_path.join("pgdatadirs"))?;
if !zenith_distrib_dir.join("pageserver").exists() {
anyhow::bail!("Can't find pageserver binary.",);
}
// ok, we are good to go let conf = if let Some(addr) = remote_pageserver {
let mut conf = LocalEnv { // check that addr is parsable
repo_path, let _uri = Url::parse(addr)
pg_distrib_dir, .map_err(|e| anyhow!("{}: {}", addr, e))?;
zenith_distrib_dir,
systemid: 0, LocalEnv {
pageserver_connstring: format!("postgresql://{}/", addr),
pg_distrib_dir,
zenith_distrib_dir: None,
base_data_dir: base_path,
}
} else {
// Find zenith binaries.
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
if !zenith_distrib_dir.join("pageserver").exists() {
anyhow::bail!("Can't find pageserver binary.",);
}
LocalEnv {
pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(),
pg_distrib_dir,
zenith_distrib_dir: Some(zenith_distrib_dir),
base_data_dir: base_path,
}
}; };
init_repo(&mut conf)?;
let toml = toml::to_string(&conf)?;
fs::write(conf.base_data_dir.join("config"), toml)?;
Ok(()) Ok(())
} }
pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
    let repopath = &local_env.repo_path;

    fs::create_dir(&repopath)
.with_context(|| format!("could not create directory {}", repopath.display()))?;
fs::create_dir(repopath.join("pgdatadirs"))?;
fs::create_dir(repopath.join("timelines"))?;
fs::create_dir(repopath.join("refs"))?;
fs::create_dir(repopath.join("refs").join("branches"))?;
fs::create_dir(repopath.join("refs").join("tags"))?;
println!("created directory structure in {}", repopath.display());
// Create initial timeline
let tli = create_timeline(&local_env, None)?;
let timelinedir = repopath.join("timelines").join(tli.to_string());
println!("created initial timeline {}", timelinedir.display());
// Run initdb
//
// We create the cluster temporarily in a "tmp" directory inside the repository,
// and move it to the right location from there.
let tmppath = repopath.join("tmp");
let initdb_path = local_env.pg_bin_dir().join("initdb");
let initdb = Command::new(initdb_path)
.args(&["-D", tmppath.to_str().unwrap()])
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
local_env.pg_lib_dir().to_str().unwrap(),
)
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;
if !initdb.success() {
anyhow::bail!("initdb failed");
}
println!("initdb succeeded");
// Read control file to extract the LSN and system id
let controlfile_path = tmppath.join("global").join("pg_control");
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
let systemid = controlfile.system_identifier;
let lsn = controlfile.checkPoint;
let lsnstr = format!("{:016X}", lsn);
// Move the initial WAL file
fs::rename(
tmppath.join("pg_wal").join("000000010000000000000001"),
timelinedir
.join("wal")
.join("000000010000000000000001.partial"),
)?;
println!("moved initial WAL file");
// Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename(tmppath, &target)?;
println!("moved 'tmp' to {}", target.display());
// Create 'main' branch to refer to the initial timeline
let data = tli.to_string();
fs::write(repopath.join("refs").join("branches").join("main"), data)?;
println!("created main branch");
// Also update the system id in the LocalEnv
local_env.systemid = systemid;
// write config
let toml = toml::to_string(&local_env)?;
fs::write(repopath.join("config"), toml)?;
println!(
"new zenith repository was created in {}",
repopath.display()
);
Ok(())
}
// If control file says the cluster was shut down cleanly, modify it, to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile =
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(
controlfilepath.as_path(),
postgres_ffi::encode_pg_control(controlfile),
)?;
Ok(())
}
-// check that config file is present
-pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
+// Locate and load config
+pub fn load_config() -> Result<LocalEnv> {
+    let repopath = base_path();
    if !repopath.exists() {
        anyhow::bail!(
            "Zenith config is not found in {}. You need to run 'zenith init' first",
@@ -222,32 +144,13 @@ pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
        );
    }

-    // TODO: check that it looks like a zenith repository
    // load and parse file
    let config = fs::read_to_string(repopath.join("config"))?;
    toml::from_str(config.as_str()).map_err(|e| e.into())
}
// local env for tests
pub fn test_env(testname: &str) -> LocalEnv {
fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check");
let repo_path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_check/")
.join(testname);
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&repo_path);
let mut local_env = LocalEnv {
repo_path,
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: cargo_bin_dir(),
systemid: 0,
};
init_repo(&mut local_env).expect("could not initialize zenith repository");
local_env
}
// Find the directory where the binaries were put (i.e. target/debug/)
pub fn cargo_bin_dir() -> PathBuf {
    let mut pathbuf = std::env::current_exe().unwrap();
@@ -259,155 +162,3 @@ pub fn cargo_bin_dir() -> PathBuf {
    pathbuf
}
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timelineid: ZTimelineId,
pub lsn: Lsn,
}
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
let repopath = &local_env.repo_path;
// Create initial timeline
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
let timelineid = ZTimelineId::from(tli_buf);
let timelinedir = repopath.join("timelines").join(timelineid.to_string());
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("snapshots"))?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
fs::write(timelinedir.join("ancestor"), data)?;
}
Ok(timelineid)
}
// Create a new branch in the repository (for the "zenith branch" subcommand)
pub fn create_branch(
local_env: &LocalEnv,
branchname: &str,
startpoint: PointInTime,
) -> Result<()> {
let repopath = &local_env.repo_path;
// create a new timeline for it
let newtli = create_timeline(local_env, Some(startpoint))?;
let newtimelinedir = repopath.join("timelines").join(newtli.to_string());
let data = newtli.to_string();
fs::write(
repopath.join("refs").join("branches").join(branchname),
data,
)?;
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
// TODO: be smarter and avoid the copying...
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
let oldtimelinedir = repopath
.join("timelines")
.join(startpoint.timelineid.to_string());
copy_wal(
&oldtimelinedir.join("wal"),
&newtimelinedir.join("wal"),
startpoint.lsn,
16 * 1024 * 1024 // FIXME: assume default WAL segment size
)?;
Ok(())
}
///
/// Copy all WAL segments from one directory to another, up to given LSN.
///
/// If the given LSN is in the middle of a segment, the last segment containing it
/// is written out as .partial, and padded with zeros.
///
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
let last_segno = upto.segment_number(wal_seg_size);
let last_segoff = upto.segment_offset(wal_seg_size);
for entry in fs::read_dir(src_dir).unwrap() {
if let Ok(entry) = entry {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
// Check if the filename looks like an xlog file, or a .partial file.
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
continue
}
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
let copylen;
let mut dst_fname = PathBuf::from(fname);
if segno > last_segno {
// future segment, skip
continue;
} else if segno < last_segno {
copylen = wal_seg_size;
dst_fname.set_extension("");
} else {
copylen = last_segoff;
dst_fname.set_extension("partial");
}
let src_file = File::open(entry.path())?;
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
if copylen < wal_seg_size {
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
}
}
}
Ok(())
}
// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<Lsn> {
let repopath = &local_env.repo_path;
let waldir = repopath
.join("timelines")
.join(timeline.to_string())
.join("wal");
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
Ok(Lsn(lsn))
}
// Find the latest snapshot for a timeline
fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
let repopath = &local_env.repo_path;
let snapshotsdir = repopath
.join("timelines")
.join(timeline.to_string())
.join("snapshots");
let paths = fs::read_dir(&snapshotsdir)?;
let mut maxsnapshot = Lsn(0);
let mut snapshotdir: Option<PathBuf> = None;
for path in paths {
let path = path?;
let filename = path.file_name().to_str().unwrap().to_owned();
if let Ok(lsn) = Lsn::from_hex(&filename) {
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
snapshotdir = Some(path.path());
}
}
if maxsnapshot == Lsn(0) {
// TODO: check ancestor timeline
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
}
Ok((maxsnapshot, snapshotdir.unwrap()))
}

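After this rewrite, 'zenith init' only materializes a small config file plus an
empty pgdatadirs directory; the repository itself (timelines, branches, WAL,
snapshots) now belongs to the pageserver. An illustrative round trip of the new
config, using the LocalEnv struct from above (all values are made up):

    use std::path::PathBuf;
    use control_plane::local_env::LocalEnv;

    fn example_config() -> anyhow::Result<String> {
        let conf = LocalEnv {
            pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(),
            pg_distrib_dir: PathBuf::from("/home/me/zenith/tmp_install"),
            // None means "remote pageserver, nothing to spawn locally"
            zenith_distrib_dir: None,
            base_data_dir: PathBuf::from(".zenith"),
        };
        // This mirrors what init() writes to <base_data_dir>/config.
        Ok(toml::to_string(&conf)?)
    }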

@@ -1,123 +1,18 @@
-use anyhow::{anyhow, bail, Context, Result};
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
-use std::convert::TryInto;
-use std::fs;
 use std::net::{SocketAddr, TcpStream};
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
 use std::process::Command;
-use std::str::FromStr;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
+use std::collections::HashMap;
+
+use anyhow::{anyhow, bail, Result};
+use nix::sys::signal::{kill, Signal};
+use nix::unistd::Pid;
 use postgres::{Client, NoTls};
+use pageserver::branches::BranchInfo;
 use crate::local_env::LocalEnv;
-use pageserver::ZTimelineId;
+use crate::read_pidfile;
//
// Collection of several example deployments useful for tests.
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
pub repopath: PathBuf,
}
impl TestStorageControlPlane {
// Peek into the repository, to grab the timeline ID of given branch
pub fn get_branch_timeline(&self, branchname: &str) -> ZTimelineId {
let branchpath = self.repopath.join("refs/branches/".to_owned() + branchname);
ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
}
// postgres <-> page_server
//
// Initialize a new repository and configure a page server to run in it
//
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
let repopath = local_env.repo_path.clone();
let pserver = Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
});
pserver.start().unwrap();
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
repopath,
}
}
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
let repopath = local_env.repo_path.clone();
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
}),
test_done: AtomicBool::new(false),
repopath,
};
cplane.pageserver.start().unwrap();
const WAL_ACCEPTOR_PORT: usize = 54321;
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: local_env.repo_path.join(format!("wal_acceptor_{}", i)),
env: local_env.clone(),
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
//
// Control routines for pageserver.
@@ -125,8 +20,8 @@ impl Drop for TestStorageControlPlane {
// Used in CLI and tests.
//

pub struct PageServerNode {
-    kill_on_exit: bool,
-    listen_address: Option<SocketAddr>,
+    pub kill_on_exit: bool,
+    pub listen_address: Option<SocketAddr>,
    pub env: LocalEnv,
}
@@ -146,12 +41,32 @@ impl PageServerNode {
        }
    }
pub fn init(&self) -> Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()])
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pageserver init failed");
if status.success() {
Ok(())
} else {
Err(anyhow!("pageserver init failed"))
}
}
    pub fn repo_path(&self) -> PathBuf {
-        self.env.repo_path.clone()
+        self.env.pageserver_data_dir()
    }

    pub fn pid_file(&self) -> PathBuf {
-        self.env.repo_path.join("pageserver.pid")
+        self.repo_path().join("pageserver.pid")
    }

    pub fn start(&self) -> Result<()> {
@@ -161,11 +76,12 @@ impl PageServerNode {
            self.repo_path().display()
        );

-        let mut cmd = Command::new(self.env.zenith_distrib_dir.join("pageserver"));
-        cmd.args(&["-l", self.address().to_string().as_str()])
+        let mut cmd = Command::new(self.env.pageserver_bin()?);
+        cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()])
            .arg("-d")
            .env_clear()
            .env("RUST_BACKTRACE", "1")
+            .env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
            .env("ZENITH_REPO_DIR", self.repo_path())
            .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
@@ -226,9 +142,7 @@ impl PageServerNode {
        client.simple_query(sql).unwrap()
    }

-    pub fn page_server_psql_client(
-        &self,
-    ) -> std::result::Result<postgres::Client, postgres::Error> {
+    pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
        let connstring = format!(
            "host={} port={} dbname={} user={}",
            self.address().ip(),
@@ -238,6 +152,74 @@ impl PageServerNode {
        );
        Client::connect(connstring.as_str(), NoTls)
    }
pub fn branches_list(&self) -> Result<Vec<BranchInfo>> {
let mut client = self.page_server_psql_client()?;
let query_result = client.simple_query("pg_list")?;
let branches_json = query_result
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("missing branches"))?;
let res: Vec<BranchInfo> = serde_json::from_str(branches_json)?;
Ok(res)
}
pub fn branch_create(&self, name: &str, startpoint: &str) -> Result<BranchInfo> {
let mut client = self.page_server_psql_client()?;
let query_result =
client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?;
let branch_json = query_result
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("missing branch"))?;
let res: BranchInfo = serde_json::from_str(branch_json)
.map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?;
Ok(res)
}
// TODO: make this a separate request type and avoid loading all the branches
pub fn branch_get_by_name(&self, name: &str) -> Result<BranchInfo> {
let branch_infos = self.branches_list()?;
let branche_by_name: Result<HashMap<String, BranchInfo>> = branch_infos
.into_iter()
.map(|branch_info| Ok((branch_info.name.clone(), branch_info)))
.collect();
let branche_by_name = branche_by_name?;
let branch = branche_by_name
.get(name)
.ok_or_else(|| anyhow!("Branch {} not found", name))?;
Ok(branch.clone())
}
pub fn system_id_get(&self) -> Result<u64> {
let mut client = self.page_server_psql_client()?;
let query_result = client
.simple_query("identify_system")?
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("failed to get system_id"))?
.parse::<u64>()?;
Ok(query_result)
}
}
impl Drop for PageServerNode {
@@ -247,104 +229,3 @@ impl Drop for PageServerNode {
        }
    }
}
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
env: LocalEnv,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", &self.env.systemid.to_string()])
// Tell page server it can receive WAL from this WAL safekeeper
// FIXME: If there are multiple safekeepers, they will all inform
// the page server. Only the last "notification" will stay in effect.
// So it's pretty random which safekeeper the page server will connect to
.args(&["--pageserver", "127.0.0.1:64000"])
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) -> Result<()> {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
let pid = read_pidfile(&pidfile)?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to kill wal_acceptor with pid {}", pid);
}
Ok(())
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
// Ignore errors.
let _ = self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pub pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
// std::process::Child::id() returns u32, we need i32.
let pid: i32 = self.pid.try_into().unwrap();
let pid = Pid::from_raw(pid);
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}
/// Read a PID file
///
/// We expect a file that contains a single integer.
/// We return an i32 for compatibility with libc and nix.
fn read_pidfile(pidfile: &Path) -> Result<i32> {
let pid_str = fs::read_to_string(pidfile)
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
let pid: i32 = pid_str
.parse()
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
if pid < 1 {
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
}
Ok(pid)
}

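The new PageServerNode methods above make the pageserver the source of truth
for branches and the system identifier. A usage sketch (assuming a pageserver
initialized and started through this same API; BranchInfo fields are the ones
this patch defines in pageserver/branches.rs):

    use anyhow::Result;
    use control_plane::local_env;
    use control_plane::storage::PageServerNode;

    fn branch_demo() -> Result<()> {
        let env = local_env::load_config()?;
        let pageserver = PageServerNode::from_env(&env);
        pageserver.init()?;
        pageserver.start()?;

        // 'identify_system' replaces reading pg_control locally.
        println!("system id: {}", pageserver.system_id_get()?);

        // 'branch_create <name> <startpoint>' and 'pg_list' replace the old
        // refs/branches files.
        let info = pageserver.branch_create("experimental", "main")?;
        println!("created branch {} on timeline {}", info.name, info.timeline_id);
        for b in pageserver.branches_list()? {
            println!("branch: {}", b.name);
        }
        Ok(())
    }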

@@ -9,7 +9,10 @@ edition = "2018"
 [dependencies]
 lazy_static = "1.4.0"
 rand = "0.8.3"
+anyhow = "1.0"
+nix = "0.20"
 postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
 pageserver = { path = "../pageserver" }
 walkeeper = { path = "../walkeeper" }
 control_plane = { path = "../control_plane" }


@@ -0,0 +1,403 @@
use anyhow::{bail, Result};
use std::sync::{atomic::AtomicBool, Arc};
use std::{
convert::TryInto,
fs::{self, File, OpenOptions},
io::Read,
net::SocketAddr,
path::{Path, PathBuf},
process::{Command, ExitStatus},
sync::atomic::Ordering,
};
use control_plane::compute::PostgresNode;
use control_plane::local_env;
use control_plane::read_pidfile;
use control_plane::{local_env::LocalEnv, storage::PageServerNode};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres;
// local compute env for tests
pub fn create_test_env(testname: &str) -> LocalEnv {
let base_path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_check/")
.join(testname);
let base_path_str = base_path.to_str().unwrap();
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&base_path);
fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str());
let pgdatadirs_path = base_path.join("pgdatadirs");
fs::create_dir(&pgdatadirs_path)
.expect(format!("could not create directory {:?}", pgdatadirs_path).as_str());
LocalEnv {
pageserver_connstring: "postgresql://127.0.0.1:64000".to_string(),
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: Some(local_env::cargo_bin_dir()),
base_data_dir: base_path,
}
}
//
// Collection of several example deployments useful for tests.
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
}
impl TestStorageControlPlane {
// postgres <-> page_server
//
// Initialize a new repository and configure a page server to run in it
//
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
let pserver = Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
});
pserver.init().unwrap();
pserver.start().unwrap();
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
}
}
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
}),
test_done: AtomicBool::new(false),
// repopath,
};
cplane.pageserver.init().unwrap();
cplane.pageserver.start().unwrap();
let systemid = cplane.pageserver.system_id_get().unwrap();
const WAL_ACCEPTOR_PORT: usize = 54321;
let datadir_base = local_env.base_data_dir.join("safekeepers");
fs::create_dir_all(&datadir_base).unwrap();
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: datadir_base.join(format!("wal_acceptor_{}", i)),
systemid,
env: local_env.clone(),
pass_to_pageserver: i == 0
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
//
// PostgresNodeExt
//
///////////////////////////////////////////////////////////////////////////////
///
/// Testing utilities for PostgresNode type
///
pub trait PostgresNodeExt {
fn pg_regress(&self) -> ExitStatus;
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus;
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode;
fn open_psql(&self, db: &str) -> postgres::Client;
fn dump_log_file(&self);
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row>;
}
impl PostgresNodeExt for PostgresNode {
fn pg_regress(&self) -> ExitStatus {
self.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = self.env.base_data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
if !regress_check.success() {
if let Ok(mut file) = File::open("regression.diffs") {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- regression.diffs:\n{}", buffer);
}
// self.dump_log_file();
if let Ok(mut file) = File::open(self.env.pg_data_dir("pg1").join("log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
}
}
regress_check
}
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
pg_bench_run
}
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(self.pgdata().join("safekeeper_proxy.log"))
.unwrap(),
)
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pageserver.log:\n{}", buffer);
}
}
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
let mut client = postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap();
println!("Running {}", sql);
let result = client.query(sql, &[]);
if result.is_err() {
// self.dump_log_file();
}
result.unwrap()
}
fn open_psql(&self, db: &str) -> postgres::Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap()
}
}
///////////////////////////////////////////////////////////////////////////////
//
// WalAcceptorNode
//
///////////////////////////////////////////////////////////////////////////////
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
systemid: u64,
env: LocalEnv,
pass_to_pageserver: bool,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let ps_arg = if self.pass_to_pageserver {
// Tell page server it can receive WAL from this WAL safekeeper
["--pageserver", "127.0.0.1:64000"].to_vec()
} else {
[].to_vec()
};
let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", self.systemid.to_string().as_str()])
.args(&ps_arg)
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) -> Result<()> {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
let pid = read_pidfile(&pidfile)?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to kill wal_acceptor with pid {}", pid);
}
Ok(())
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
// Ignore errors.
let _ = self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
//
// WalProposerNode
//
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pub pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
// std::process::Child::id() returns u32, we need i32.
let pid: i32 = self.pid.try_into().unwrap();
let pid = Pid::from_raw(pid);
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}

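PostgresNodeExt is the extension-trait half of the split: control_plane keeps
the production PostgresNode, while test-only helpers live here and become
available to any test that imports the trait. A sketch of a test using it
(modeled on test_redo_cases below; the query is illustrative):

    use control_plane::compute::ComputeControlPlane;
    use integration_tests::{PostgresNodeExt, TestStorageControlPlane};

    #[test]
    fn smoke() {
        let env = integration_tests::create_test_env("smoke");
        let storage = TestStorageControlPlane::one_page_server(&env);
        let mut compute = ComputeControlPlane::local(&env, &storage.pageserver);
        let node = compute.new_test_node("main");
        node.start().unwrap();
        // safe_psql comes from PostgresNodeExt, not from PostgresNode itself.
        let rows = node.safe_psql("postgres", "SELECT 1::int");
        assert_eq!(rows.len(), 1);
    }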

@@ -1,11 +0,0 @@
// test node resettlement to an empty datadir
// TODO
/*
#[test]
fn test_resettlement() {}
// test seq scan of everythin after restart
#[test]
fn test_cold_seqscan() {}
*/


@@ -1,8 +0,0 @@
// TODO
/*
#[test]
fn test_actions() {}
#[test]
fn test_regress() {}
*/


@@ -1,23 +1,22 @@
-// mod control_plane;
 use control_plane::compute::ComputeControlPlane;
-use control_plane::local_env;
-use control_plane::local_env::PointInTime;
-use control_plane::storage::TestStorageControlPlane;
+use integration_tests;
+use integration_tests::TestStorageControlPlane;
+use integration_tests::PostgresNodeExt;
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff
// -- pageserver api endpoint to check all rels

#[test]
fn test_redo_cases() {
-    let local_env = local_env::test_env("test_redo_cases");
+    let local_env = integration_tests::create_test_env("test_redo_cases");

    // Start pageserver that reads WAL directly from that postgres
    let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
    let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_node(maintli);
+    let node = compute_cplane.new_test_node("main");
    node.start().unwrap();

    // check basic work with table
@@ -51,15 +50,14 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
fn test_regress() {
-    let local_env = local_env::test_env("test_regress");
+    let local_env = integration_tests::create_test_env("test_regress");

    // Start pageserver that reads WAL directly from that postgres
    let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
    let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_node(maintli);
+    let node = compute_cplane.new_test_node("main");
    node.start().unwrap();

    let status = node.pg_regress();
@@ -69,15 +67,14 @@ fn test_regress() {
// Runs pg_bench on a compute node
#[test]
fn pgbench() {
-    let local_env = local_env::test_env("pgbench");
+    let local_env = integration_tests::create_test_env("pgbench");

    // Start pageserver that reads WAL directly from that postgres
    let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
    let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_node(maintli);
+    let node = compute_cplane.new_test_node("main");
    node.start().unwrap();

    let status = node.pg_bench(10, 5);
@@ -87,30 +84,21 @@ fn pgbench() {
// Run two postgres instances on one pageserver, on different timelines
#[test]
fn test_pageserver_two_timelines() {
-    let local_env = local_env::test_env("test_pageserver_two_timelines");
+    let local_env = integration_tests::create_test_env("test_pageserver_two_timelines");

    // Start pageserver that reads WAL directly from that postgres
    let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
    let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);

-    let maintli = storage_cplane.get_branch_timeline("main");
-
    // Create new branch at the end of 'main'
-    let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
-    local_env::create_branch(
-        &local_env,
-        "experimental",
-        PointInTime {
-            timelineid: maintli,
-            lsn: startpoint,
-        },
-    )
-    .unwrap();
-    let experimentaltli = storage_cplane.get_branch_timeline("experimental");
+    storage_cplane
+        .pageserver
+        .branch_create("experimental", "main")
+        .unwrap();

    // Launch postgres instances on both branches
-    let node1 = compute_cplane.new_test_node(maintli);
-    let node2 = compute_cplane.new_test_node(experimentaltli);
+    let node1 = compute_cplane.new_test_node("main");
+    let node2 = compute_cplane.new_test_node("experimental");
    node1.start().unwrap();
    node2.start().unwrap();


@@ -1,21 +1,20 @@
// Restart acceptors one by one while compute is under the load.

-use control_plane::compute::ComputeControlPlane;
-use control_plane::local_env;
-use control_plane::local_env::PointInTime;
-use control_plane::storage::TestStorageControlPlane;
-use pageserver::ZTimelineId;
 use rand::Rng;
 use std::sync::Arc;
 use std::time::SystemTime;
 use std::{thread, time};
+use control_plane::compute::ComputeControlPlane;
+use integration_tests;
+use integration_tests::TestStorageControlPlane;
+use integration_tests::PostgresNodeExt;
const DOWNTIME: u64 = 2; const DOWNTIME: u64 = 2;
#[test] #[test]
//#[ignore] //#[ignore]
fn test_embedded_wal_proposer() { fn test_embedded_wal_proposer() {
let local_env = local_env::test_env("test_embedded_wal_proposer"); let local_env = integration_tests::create_test_env("test_embedded_wal_proposer");
const REDUNDANCY: usize = 3; const REDUNDANCY: usize = 3;
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
@@ -23,8 +22,7 @@ fn test_embedded_wal_proposer() {
    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_master_node(maintli);
+    let node = compute_cplane.new_test_master_node("main");
    node.append_conf(
        "postgresql.conf",
        &format!("wal_acceptors='{}'\n", wal_acceptors),
@@ -52,7 +50,7 @@ fn test_embedded_wal_proposer() {
#[test]
fn test_acceptors_normal_work() {
-    let local_env = local_env::test_env("test_acceptors_normal_work");
+    let local_env = integration_tests::create_test_env("test_acceptors_normal_work");
    const REDUNDANCY: usize = 3;

    let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
@@ -60,8 +58,7 @@ fn test_acceptors_normal_work() {
    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_master_node(maintli);
+    let node = compute_cplane.new_test_master_node("main");
    node.start().unwrap();

    // start proxy
@@ -93,36 +90,25 @@ fn test_many_timelines() {
    // Initialize a new repository, and set up WAL safekeepers and page server.
    const REDUNDANCY: usize = 3;
    const N_TIMELINES: usize = 5;

-    let local_env = local_env::test_env("test_many_timelines");
+    let local_env = integration_tests::create_test_env("test_many_timelines");

    let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
    let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();

    // Create branches
-    let mut timelines: Vec<ZTimelineId> = Vec::new();
-    let maintli = storage_cplane.get_branch_timeline("main");
-    timelines.push(maintli);
-    let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
+    let mut timelines: Vec<String> = Vec::new();
+    // main branch
+    timelines.push("main".to_string());
    for i in 1..N_TIMELINES {
+        // additional branches
        let branchname = format!("experimental{}", i);
-        local_env::create_branch(
-            &local_env,
-            &branchname,
-            PointInTime {
-                timelineid: maintli,
-                lsn: startpoint,
-            },
-        )
-        .unwrap();
-        let tli = storage_cplane.get_branch_timeline(&branchname);
-        timelines.push(tli);
+        storage_cplane.pageserver.branch_create(&branchname, "main").unwrap();
+        timelines.push(branchname);
    }

    // start postgres on each timeline
    let mut nodes = Vec::new();
-    for tli in timelines {
-        let node = compute_cplane.new_test_node(tli);
+    for tli_name in timelines {
+        let node = compute_cplane.new_test_node(&tli_name);
        nodes.push(node.clone());
        node.start().unwrap();
        node.start_proxy(&wal_acceptors);
@@ -159,7 +145,7 @@ fn test_many_timelines() {
// Majority is always alive
#[test]
fn test_acceptors_restarts() {
-    let local_env = local_env::test_env("test_acceptors_restarts");
+    let local_env = integration_tests::create_test_env("test_acceptors_restarts");

    // Start pageserver that reads WAL directly from that postgres
    const REDUNDANCY: usize = 3;
@@ -171,8 +157,7 @@ fn test_acceptors_restarts() {
    let mut rng = rand::thread_rng();

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_master_node(maintli);
+    let node = compute_cplane.new_test_master_node("main");
    node.start().unwrap();

    // start proxy
@@ -222,7 +207,7 @@ fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
// N_CRASHES env var
#[test]
fn test_acceptors_unavailability() {
-    let local_env = local_env::test_env("test_acceptors_unavailability");
+    let local_env = integration_tests::create_test_env("test_acceptors_unavailability");

    // Start pageserver that reads WAL directly from that postgres
    const REDUNDANCY: usize = 2;
@@ -232,8 +217,7 @@ fn test_acceptors_unavailability() {
    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_master_node(maintli);
+    let node = compute_cplane.new_test_master_node("main");
    node.start().unwrap();

    // start proxy
@@ -307,7 +291,7 @@ fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
// Race condition test
#[test]
fn test_race_conditions() {
-    let local_env = local_env::test_env("test_race_conditions");
+    let local_env = integration_tests::create_test_env("test_race_conditions");

    // Start pageserver that reads WAL directly from that postgres
    const REDUNDANCY: usize = 3;
@@ -319,8 +303,7 @@ fn test_race_conditions() {
    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();

    // start postgres
-    let maintli = storage_cplane.get_branch_timeline("main");
-    let node = compute_cplane.new_test_master_node(maintli);
+    let node = compute_cplane.new_test_master_node("main");
    node.start().unwrap();

    // start proxy
@@ -40,6 +40,7 @@ tar = "0.4.33"
parse_duration = "2.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
+fs_extra = "1.2.0"
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }
@@ -4,7 +4,8 @@
use log::*;
use parse_duration::parse;
-use std::fs::{self, File, OpenOptions};
+use std::fs::{File, OpenOptions};
+use std::{env, path::PathBuf};
use std::io;
use std::process::exit;
use std::thread;
@@ -16,7 +17,7 @@ use daemonize::Daemonize;
use slog::{Drain, FnValue};

-use pageserver::{page_cache, page_service, tui, zenith_repo_dir, PageServerConf};
+use pageserver::{page_cache, page_service, tui, PageServerConf, branches};

const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
@@ -47,6 +48,12 @@ fn main() -> Result<()> {
                .takes_value(false)
                .help("Run in the background"),
        )
+        .arg(
+            Arg::with_name("init")
+                .long("init")
+                .takes_value(false)
+                .help("Initialize pageserver repo"),
+        )
        .arg(
            Arg::with_name("gc_horizon")
                .long("gc_horizon")
@@ -59,16 +66,55 @@ fn main() -> Result<()> {
                .takes_value(true)
                .help("Interval between garbage collector iterations"),
        )
+        .arg(
+            Arg::with_name("workdir")
+                .short("D")
+                .long("workdir")
+                .takes_value(true)
+                .help("Working directory for the pageserver"),
+        )
        .get_matches();

+    let workdir = if let Some(workdir_arg) = arg_matches.value_of("workdir") {
+        PathBuf::from(workdir_arg)
+    } else if let Some(workdir_arg) = std::env::var_os("ZENITH_REPO_DIR") {
+        PathBuf::from(workdir_arg.to_str().unwrap())
+    } else {
+        PathBuf::from(".zenith")
+    };
+
+    let pg_distrib_dir: PathBuf = {
+        if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
+            postgres_bin.into()
+        } else {
+            let cwd = env::current_dir()?;
+            cwd.join("tmp_install")
+        }
+    };
+
+    if !pg_distrib_dir.join("bin/postgres").exists() {
+        anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
+    }
+
    let mut conf = PageServerConf {
        daemonize: false,
        interactive: false,
        gc_horizon: DEFAULT_GC_HORIZON,
        gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
-        listen_addr: "127.0.0.1:5430".parse().unwrap(),
+        listen_addr: "127.0.0.1:64000".parse().unwrap(),
+        workdir,
+        pg_distrib_dir,
    };

+    // Create repo and exit if init was requested
+    if arg_matches.is_present("init") {
+        branches::init_repo(&conf)?;
+        return Ok(());
+    }
+
+    // Set CWD to workdir for non-daemon modes
+    env::set_current_dir(&conf.workdir)?;
+
    if arg_matches.is_present("daemonize") {
        conf.daemonize = true;
    }
@@ -98,8 +144,7 @@ fn main() -> Result<()> {
}

fn start_pageserver(conf: &PageServerConf) -> Result<()> {
-    let repodir = zenith_repo_dir();
-    let log_filename = repodir.join("pageserver.log");
+    let log_filename = "pageserver.log";

    // Don't open the same file for output multiple times;
    // the different fds could overwrite each other's output.
    let log_file = OpenOptions::new()
@@ -141,8 +186,8 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
        let stderr = log_file;

        let daemonize = Daemonize::new()
-            .pid_file(repodir.join("pageserver.pid"))
-            .working_directory(repodir)
+            .pid_file("pageserver.pid")
+            .working_directory(".")
            .stdout(stdout)
            .stderr(stderr);
@@ -153,26 +198,14 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
    } else {
        // change into the repository directory. In daemon mode, Daemonize
        // does this for us.
-        let repodir = zenith_repo_dir();
-        std::env::set_current_dir(&repodir)?;
-        info!("Changed current directory to repository in {:?}", &repodir);
+        std::env::set_current_dir(&conf.workdir)?;
+        info!("Changed current directory to repository in {:?}", &conf.workdir);
    }

    let mut threads = Vec::new();

    // TODO: Check that it looks like a valid repository before going further

-    // Create directory for wal-redo datadirs
-    match fs::create_dir("wal-redo") {
-        Ok(_) => {}
-        Err(e) => match e.kind() {
-            io::ErrorKind::AlreadyExists => {}
-            _ => {
-                anyhow::bail!("Failed to create wal-redo data directory: {}", e);
-            }
-        },
-    }
-
    page_cache::init(conf);

    // GetPage@LSN requests are served by another thread. (It uses async I/O,
@@ -1,17 +1,135 @@
-use anyhow::Result;
-use serde::{Deserialize, Serialize};
+//
+// Branch management code
+//
+// TODO: move all paths construction to conf impl
+//
+use anyhow::{Context, Result, anyhow};
+use bytes::Bytes;
+use postgres_ffi::xlog_utils;
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr};
+use fs_extra;
+use fs::File;
+use std::io::Read;
+use std::env;
use zenith_utils::lsn::Lsn;

-use crate::{repository::Repository, ZTimelineId};
+use crate::{repository::Repository, ZTimelineId, PageServerConf};

-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
    pub name: String,
-    pub timeline_id: String,
+    pub timeline_id: ZTimelineId,
    pub latest_valid_lsn: Option<Lsn>,
}
+// impl BranchInfo {
+//     pub fn lsn_string(&self) -> String {
+//         let lsn_string_opt = self.latest_valid_lsn.map(|lsn| lsn.to_string());
+//         let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
+//         format!("{}@{}", self.name, lsn_str)
+//     }
+// }
+
+#[derive(Debug, Clone, Copy)]
+pub struct PointInTime {
+    pub timelineid: ZTimelineId,
+    pub lsn: Lsn,
+}
+pub fn init_repo(conf: &PageServerConf) -> Result<()> {
+    // top-level dir may exist if we are creating it through CLI
+    fs::create_dir_all(&conf.workdir)
+        .with_context(|| format!("could not create directory {}", &conf.workdir.display()))?;
+    env::set_current_dir(&conf.workdir)?;
+
+    fs::create_dir(std::path::Path::new("timelines"))?;
+    fs::create_dir(std::path::Path::new("refs"))?;
+    fs::create_dir(std::path::Path::new("refs").join("branches"))?;
+    fs::create_dir(std::path::Path::new("refs").join("tags"))?;
+    fs::create_dir(std::path::Path::new("wal-redo"))?;
+    println!("created directory structure in {}", &conf.workdir.display());
+
+    // Create initial timeline
+    let tli = create_timeline(conf, None)?;
+    let timelinedir = conf.timeline_path(tli);
+    println!("created initial timeline {}", timelinedir.display());
+
+    // Run initdb
+    //
+    // We create the cluster temporarily in a "tmp" directory inside the repository,
+    // and move it to the right location from there.
+    let tmppath = std::path::Path::new("tmp");
+
+    let initdb_path = conf.pg_bin_dir().join("initdb");
+    let initdb = Command::new(initdb_path)
+        .args(&["-D", tmppath.to_str().unwrap()])
+        .arg("--no-instructions")
+        .env_clear()
+        .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+        .env(
+            "DYLD_LIBRARY_PATH",
+            conf.pg_lib_dir().to_str().unwrap(),
+        )
+        .stdout(Stdio::null())
+        .status()
+        .with_context(|| "failed to execute initdb")?;
+    if !initdb.success() {
+        anyhow::bail!("initdb failed");
+    }
+    println!("initdb succeeded");
+
+    // Read control file to extract the LSN and system id
+    let controlfile_path = tmppath.join("global").join("pg_control");
+    let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
+    // let systemid = controlfile.system_identifier;
+    let lsn = controlfile.checkPoint;
+    let lsnstr = format!("{:016X}", lsn);
+
+    // Move the initial WAL file
+    fs::rename(
+        tmppath.join("pg_wal").join("000000010000000000000001"),
+        timelinedir
+            .join("wal")
+            .join("000000010000000000000001.partial"),
+    )?;
+    println!("moved initial WAL file");
+
+    // Remove pg_wal
+    fs::remove_dir_all(tmppath.join("pg_wal"))?;
+
+    force_crash_recovery(&tmppath)?;
+    println!("updated pg_control");
+
+    let target = timelinedir.join("snapshots").join(&lsnstr);
+    fs::rename(tmppath, &target)?;
+    println!("moved 'tmp' to {}", target.display());
+
+    // Create 'main' branch to refer to the initial timeline
+    let data = tli.to_string();
+    fs::write(conf.branch_path("main"), data)?;
+    println!("created main branch");
+
+    // XXX: do we need that now? -- yep, for test only
+    // // Also update the system id in the LocalEnv
+    // local_env.systemid = systemid;
+    // // write config
+    // let toml = toml::to_string(&local_env)?;
+    // fs::write(repopath.join("config"), toml)?;
+
+    println!(
+        "new zenith repository was created in {}",
+        conf.workdir.display()
+    );
+
+    Ok(())
+}
pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>> {
    // adapted from CLI code
    let branches_dir = std::path::Path::new("refs").join("branches");
@@ -28,9 +146,263 @@ pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo
            Ok(BranchInfo {
                name,
-                timeline_id: timeline_id.to_string(),
+                timeline_id,
                latest_valid_lsn,
            })
        })
        .collect()
}
+pub(crate) fn get_system_id(conf: &PageServerConf) -> Result<u64> {
+    // let branches = get_branches();
+    let branches_dir = std::path::Path::new("refs").join("branches");
+    let branches = std::fs::read_dir(&branches_dir)?
+        .map(|dir_entry_res| {
+            let dir_entry = dir_entry_res?;
+            let name = dir_entry.file_name().to_str().unwrap().to_string();
+            let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
+            Ok((name, timeline_id))
+        })
+        .collect::<Result<HashMap<String, ZTimelineId>>>()?;
+
+    let main_tli = branches
+        .get("main")
+        .ok_or_else(|| anyhow!("Branch main not found"))?;
+
+    let (_, main_snap_dir) = find_latest_snapshot(conf, *main_tli)?;
+    let controlfile_path = main_snap_dir.join("global").join("pg_control");
+    let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
+    Ok(controlfile.system_identifier)
+}
+
+pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result<BranchInfo> {
+    if conf.branch_path(&branchname).exists() {
+        anyhow::bail!("branch {} already exists", branchname);
+    }
+
+    let mut startpoint = parse_point_in_time(conf, startpoint_str)?;
+    if startpoint.lsn == Lsn(0) {
+        // Find end of WAL on the old timeline
+        let end_of_wal = find_end_of_wal(conf, startpoint.timelineid)?;
+        println!("branching at end of WAL: {}", end_of_wal);
+        startpoint.lsn = end_of_wal;
+    }
+
+    // create a new timeline for it
+    let newtli = create_timeline(conf, Some(startpoint))?;
+    let newtimelinedir = conf.timeline_path(newtli);
+
+    let data = newtli.to_string();
+    fs::write(conf.branch_path(&branchname), data)?;
+
+    // Copy the latest snapshot (TODO: before the startpoint) and all WAL
+    // TODO: be smarter and avoid the copying...
+    let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?;
+    let copy_opts = fs_extra::dir::CopyOptions::new();
+    fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
+
+    let oldtimelinedir = conf.timeline_path(startpoint.timelineid);
+    copy_wal(
+        &oldtimelinedir.join("wal"),
+        &newtimelinedir.join("wal"),
+        startpoint.lsn,
+        16 * 1024 * 1024, // FIXME: assume default WAL segment size
+    )?;
+
+    Ok(BranchInfo {
+        name: branchname.to_string(),
+        timeline_id: newtli,
+        latest_valid_lsn: Some(startpoint.lsn),
+    })
+}
+
+//
+// Parse user-given string that represents a point-in-time.
+//
+// We support multiple variants:
+//
+// Raw timeline id in hex, meaning the end of that timeline:
+//    bc62e7d612d0e6fe8f99a6dd2f281f9d
+//
+// A specific LSN on a timeline:
+//    bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
+//
+// Same, with a human-friendly branch name:
+//    main
+//    main@2/15D3DD8
+//
+// Human-friendly tag name:
+//    mytag
+//
+fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
+    let mut strings = s.split('@');
+    let name = strings.next().unwrap();
+
+    let lsn: Option<Lsn>;
+    if let Some(lsnstr) = strings.next() {
+        lsn = Some(Lsn::from_str(lsnstr)
+            .with_context(|| "invalid LSN in point-in-time specification")?);
+    } else {
+        lsn = None
+    }
+
+    // Check if it's a tag
+    if lsn.is_none() {
+        let tagpath = conf.tag_path(name);
+        if tagpath.exists() {
+            let pointstr = fs::read_to_string(tagpath)?;
+            return parse_point_in_time(conf, &pointstr);
+        }
+    }
+
+    // Check if it's a branch
+    // Check if it's branch @ LSN
+    let branchpath = conf.branch_path(name);
+    if branchpath.exists() {
+        let pointstr = fs::read_to_string(branchpath)?;
+        let mut result = parse_point_in_time(conf, &pointstr)?;
+        result.lsn = lsn.unwrap_or(Lsn(0));
+        return Ok(result);
+    }
+
+    // Check if it's a timelineid
+    // Check if it's timelineid @ LSN
+    if let Ok(timelineid) = ZTimelineId::from_str(name) {
+        let tlipath = conf.timeline_path(timelineid);
+        if tlipath.exists() {
+            return Ok(PointInTime {
+                timelineid,
+                lsn: lsn.unwrap_or(Lsn(0)),
+            });
+        }
+    }
+
+    panic!("could not parse point-in-time {}", s);
+}
+
+// If control file says the cluster was shut down cleanly, modify it, to mark
+// it as crashed. That forces crash recovery when you start the cluster.
+//
+// FIXME:
+// We currently do this to the initial snapshot in "zenith init". It would
+// be more natural to do this when the snapshot is restored instead, but we
+// currently don't have any code to create new snapshots, so it doesn't matter.
+// Or better yet, use a less hacky way of putting the cluster into recovery.
+// Perhaps create a backup label file in the data directory when it's restored.
+fn force_crash_recovery(datadir: &Path) -> Result<()> {
+    // Read in the control file
+    let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
+    let mut controlfile =
+        postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
+
+    controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
+
+    fs::write(
+        controlfilepath.as_path(),
+        postgres_ffi::encode_pg_control(controlfile),
+    )?;
+
+    Ok(())
+}
+
+fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
+    // Create initial timeline
+    let mut tli_buf = [0u8; 16];
+    rand::thread_rng().fill(&mut tli_buf);
+    let timelineid = ZTimelineId::from(tli_buf);
+
+    let timelinedir = conf.timeline_path(timelineid);
+
+    fs::create_dir(&timelinedir)?;
+    fs::create_dir(&timelinedir.join("snapshots"))?;
+    fs::create_dir(&timelinedir.join("wal"))?;
+
+    if let Some(ancestor) = ancestor {
+        let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
+        fs::write(timelinedir.join("ancestor"), data)?;
+    }
+
+    Ok(timelineid)
+}
+
+///
+/// Copy all WAL segments from one directory to another, up to given LSN.
+///
+/// If the given LSN is in the middle of a segment, the last segment containing it
+/// is written out as .partial, and padded with zeros.
+///
+fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> {
+    let last_segno = upto.segment_number(wal_seg_size);
+    let last_segoff = upto.segment_offset(wal_seg_size);
+
+    for entry in fs::read_dir(src_dir).unwrap() {
+        if let Ok(entry) = entry {
+            let entry_name = entry.file_name();
+            let fname = entry_name.to_str().unwrap();
+
+            // Check if the filename looks like an xlog file, or a .partial file.
+            if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
+                continue;
+            }
+            let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
+
+            let copylen;
+            let mut dst_fname = PathBuf::from(fname);
+            if segno > last_segno {
+                // future segment, skip
+                continue;
+            } else if segno < last_segno {
+                copylen = wal_seg_size;
+                dst_fname.set_extension("");
+            } else {
+                copylen = last_segoff;
+                dst_fname.set_extension("partial");
+            }
+
+            let src_file = File::open(entry.path())?;
+            let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
+            std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
+
+            if copylen < wal_seg_size {
+                std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
+            }
+        }
+    }
+    Ok(())
+}
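// Editor's illustration (not part of this patch): a quick sanity check of the
// copy_wal cut-off math, assuming the Lsn helpers are the usual div/mod by the
// segment size. An LSN inside segment 3 keeps segments 0..2 whole and turns
// segment 3 into a zero-padded .partial file:
//
//     let wal_seg_size: u64 = 16 * 1024 * 1024;
//     let upto = Lsn(3 * wal_seg_size + 0x40);
//     assert_eq!(upto.segment_number(wal_seg_size), 3);    // last_segno
//     assert_eq!(upto.segment_offset(wal_seg_size), 0x40); // bytes kept in .partial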
+// Find the end of valid WAL in a wal directory
+pub fn find_end_of_wal(conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
+    let waldir = conf.timeline_path(timeline).join("wal");
+    let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
+    Ok(Lsn(lsn))
+}
+
+// Find the latest snapshot for a timeline
+fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
+    let snapshotsdir = conf.snapshots_path(timeline);
+    let paths = fs::read_dir(&snapshotsdir)?;
+    let mut maxsnapshot = Lsn(0);
+    let mut snapshotdir: Option<PathBuf> = None;
+    for path in paths {
+        let path = path?;
+        let filename = path.file_name().to_str().unwrap().to_owned();
+        if let Ok(lsn) = Lsn::from_hex(&filename) {
+            maxsnapshot = std::cmp::max(lsn, maxsnapshot);
+            snapshotdir = Some(path.path());
+        }
+    }
+    if maxsnapshot == Lsn(0) {
+        // TODO: check ancestor timeline
+        anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
+    }
+
+    Ok((maxsnapshot, snapshotdir.unwrap()))
+}
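For orientation, the repository layout that init_repo and create_branch produce looks roughly like this (reconstructed from the code above):

//  <workdir>/
//    timelines/
//      <timelineid>/          128-bit random id in hex
//        snapshots/
//          <lsn-hex>/         initdb'd datadir, forced into crash recovery
//        wal/
//          000000010000000000000001.partial
//        ancestor             "timelineid@lsn", only for branched timelines
//    refs/
//      branches/
//        main                 contains the timeline id
//      tags/
//    wal-redo/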
@@ -26,8 +26,47 @@ pub struct PageServerConf {
    pub listen_addr: SocketAddr,
    pub gc_horizon: u64,
    pub gc_period: Duration,
+    pub workdir: PathBuf,
+    pub pg_distrib_dir: PathBuf,
}
+impl PageServerConf {
+    //
+    // Repository paths, relative to workdir.
+    //
+
+    fn tag_path(&self, name: &str) -> PathBuf {
+        std::path::Path::new("refs").join("tags").join(name)
+    }
+
+    fn branch_path(&self, name: &str) -> PathBuf {
+        std::path::Path::new("refs").join("branches").join(name)
+    }
+
+    fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf {
+        std::path::Path::new("timelines").join(timelineid.to_string())
+    }
+
+    fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf {
+        std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots")
+    }
+
+    //
+    // Postgres distribution paths
+    //
+
+    pub fn pg_bin_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("bin")
+    }
+
+    pub fn pg_lib_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("lib")
+    }
+}
/// Zenith Timeline ID is a 128-bit random ID.
///
/// Zenith timeline IDs are different from PostgreSQL timeline
@@ -89,10 +128,3 @@ impl fmt::Display for ZTimelineId {
    }
}

-pub fn zenith_repo_dir() -> PathBuf {
-    // Find repository path
-    match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => PathBuf::from(val.to_str().unwrap()),
-        None => ".zenith".into(),
-    }
-}
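Note that the new path helpers return paths relative to workdir; they are only valid because both the pageserver binary and init_repo chdir into conf.workdir first. Illustrative results, with the CWD already set to the workdir:

//     conf.branch_path("main")   -> "refs/branches/main"
//     conf.tag_path("mytag")     -> "refs/tags/mytag"
//     conf.timeline_path(tli)    -> "timelines/<tli-hex>"
//     conf.snapshots_path(tli)   -> "timelines/<tli-hex>/snapshots"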
@@ -8,7 +8,6 @@ use crate::repository::Repository;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use lazy_static::lazy_static;
-use std::path::Path;
use std::sync::{Arc, Mutex};

lazy_static! {
@@ -22,7 +21,7 @@ pub fn init(conf: &PageServerConf) {
    let walredo_mgr = PostgresRedoManager::new(conf);

    // we have already changed current dir to the repository.
-    let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
+    let repo = RocksRepository::new(conf, Arc::new(walredo_mgr));

    *m = Some(Arc::new(repo));
}
@@ -30,6 +30,7 @@ use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
use crate::ZTimelineId;
+use crate::branches;

#[derive(Debug)]
enum FeMessage {
@@ -690,6 +691,28 @@ impl Connection {
            self.write_message_noflush(&BeMessage::CommandComplete)?;
            self.write_message(&BeMessage::ReadyForQuery)?;
+        } else if query_string.starts_with(b"branch_create ") {
+            let query_str = String::from_utf8(query_string.to_vec())?;
+            let err = || anyhow!("invalid branch_create: '{}'", query_str);
+
+            // branch_create <branchname> <startpoint>
+            // TODO lazy static
+            let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap();
+            let caps = re
+                .captures(&query_str)
+                .ok_or_else(err)?;
+
+            let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str());
+            let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str());
+
+            let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?;
+            let branch = serde_json::to_vec(&branch)?;
+
+            self.write_message_noflush(&BeMessage::RowDescription)?;
+            self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
+            self.write_message_noflush(&BeMessage::CommandComplete)?;
+            self.write_message(&BeMessage::ReadyForQuery)?;
        } else if query_string.starts_with(b"pg_list") {
            let branches = crate::branches::get_branches(&*page_cache::get_repository())?;
            let branches_buf = serde_json::to_vec(&branches)?;
@@ -708,6 +731,15 @@ impl Connection {
            // on connect
            self.write_message_noflush(&BeMessage::CommandComplete)?;
            self.write_message(&BeMessage::ReadyForQuery)?;
+        } else if query_string.to_ascii_lowercase().starts_with(b"identify_system") {
+            // TODO: match postgres response format for 'identify_system'
+            let system_id = crate::branches::get_system_id(&self.conf)?
+                .to_string();
+
+            self.write_message_noflush(&BeMessage::RowDescription)?;
+            self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?;
+            self.write_message_noflush(&BeMessage::CommandComplete)?;
+            self.write_message(&BeMessage::ReadyForQuery)?;
        } else {
            self.write_message_noflush(&BeMessage::RowDescription)?;
            self.write_message_noflush(&HELLO_WORLD_ROW)?;
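Since both new commands ride on the plain postgres query protocol, any libpq-style client can drive them. A minimal client-side sketch, assuming a pageserver on the new default 127.0.0.1:64000 (the user name here is an arbitrary placeholder; the CLI itself goes through its page_server_psql_client() wrapper for the same thing):

use postgres::{Client, NoTls, SimpleQueryMessage};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("host=127.0.0.1 port=64000 user=zenith", NoTls)?;

    // "branch_create <branchname> <startpoint>" answers with BranchInfo as JSON
    for msg in client.simple_query("branch_create experimental main")? {
        if let SimpleQueryMessage::Row(row) = msg {
            println!("new branch: {}", row.get(0).unwrap_or("?"));
        }
    }

    // "identify_system" answers with the system identifier of the 'main' branch
    for msg in client.simple_query("identify_system")? {
        if let SimpleQueryMessage::Row(row) = msg {
            println!("system id: {}", row.get(0).unwrap_or("?"));
        }
    }
    Ok(())
}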
@@ -291,6 +291,7 @@ mod tests {
    use std::path::Path;
    use std::str::FromStr;
    use std::time::Duration;
+    use std::env;

    fn get_test_conf() -> PageServerConf {
        PageServerConf {
@@ -299,6 +300,8 @@ mod tests {
            gc_horizon: 64 * 1024 * 1024,
            gc_period: Duration::from_secs(10),
            listen_addr: "127.0.0.1:5430".parse().unwrap(),
+            workdir: "".into(),
+            pg_distrib_dir: "".into(),
        }
    }
@@ -345,7 +348,9 @@ mod tests {
        let repo_dir = Path::new("../tmp_check/test_relsize_repo");
        let _ = fs::remove_dir_all(repo_dir);
        fs::create_dir_all(repo_dir)?;
+        env::set_current_dir(repo_dir)?;

-        let repo = rocksdb::RocksRepository::new(&get_test_conf(), repo_dir, Arc::new(walredo_mgr));
+        let repo = rocksdb::RocksRepository::new(&get_test_conf(), Arc::new(walredo_mgr));

        // get_timeline() with non-existent timeline id should fail
        //repo.get_timeline("11223344556677881122334455667788");
@@ -11,6 +11,8 @@ use crate::waldecoder::{Oid, TransactionId};
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
+// use crate::PageServerConf;
+// use crate::branches;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
@@ -18,7 +20,6 @@ use postgres_ffi::pg_constants;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
@@ -31,7 +32,6 @@ use zenith_utils::seqwait::SeqWait;
static TIMEOUT: Duration = Duration::from_secs(60);

pub struct RocksRepository {
-    repo_dir: PathBuf,
    conf: PageServerConf,
    timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
@@ -158,11 +158,9 @@ impl CacheEntryContent {
impl RocksRepository {
    pub fn new(
        conf: &PageServerConf,
-        repo_dir: &Path,
        walredo_mgr: Arc<dyn WalRedoManager>,
    ) -> RocksRepository {
        RocksRepository {
-            repo_dir: PathBuf::from(repo_dir),
            conf: conf.clone(),
            timelines: Mutex::new(HashMap::new()),
            walredo_mgr,
@@ -188,7 +186,7 @@ impl Repository for RocksRepository {
            Some(timeline) => Ok(timeline.clone()),
            None => {
                let timeline =
-                    RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
+                    RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());

                restore_timeline(&self.conf, &timeline, timelineid)?;
@@ -216,7 +214,7 @@ impl Repository for RocksRepository {
    fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
        let mut timelines = self.timelines.lock().unwrap();

-        let timeline = RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
+        let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());

        let timeline_rc = Arc::new(timeline);
        let r = timelines.insert(timelineid, timeline_rc.clone());
@@ -229,8 +227,8 @@ impl Repository for RocksRepository {
}

impl RocksTimeline {
-    fn open_rocksdb(repo_dir: &Path, timelineid: ZTimelineId) -> rocksdb::DB {
-        let path = repo_dir.join("timelines").join(timelineid.to_string());
+    fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
+        let path = conf.timeline_path(timelineid);
        let mut opts = rocksdb::Options::default();
        opts.create_if_missing(true);
        opts.set_use_fsync(true);
@@ -246,12 +244,12 @@ impl RocksTimeline {
    }

    fn new(
-        repo_dir: &Path,
+        conf: &PageServerConf,
        timelineid: ZTimelineId,
        walredo_mgr: Arc<dyn WalRedoManager>,
    ) -> RocksTimeline {
        RocksTimeline {
-            db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
+            db: RocksTimeline::open_rocksdb(conf, timelineid),
            walredo_mgr,
@@ -457,7 +457,9 @@ impl PostgresRedoProcess {
    // Start postgres binary in special WAL redo mode.
    //
    // Tests who run pageserver binary are setting proper PG_BIN_DIR
-    // and PG_LIB_DIR so that WalRedo would start right postgres. We may later
-    // do that:
+    // and PG_LIB_DIR so that WalRedo would start right postgres.
+    // We may later
    // switch to setting same things in pageserver config file.
    async fn launch(datadir: &str) -> Result<PostgresRedoProcess, Error> {
        // Create empty data directory for wal-redo postgres deleting old one.
@@ -41,7 +41,7 @@ If you want to run all tests that have the string "bench" in their names:
Useful environment variables:

`ZENITH_BIN`: The directory where zenith binaries can be found.
-`POSTGRES_BIN`: The directory where postgres binaries can be found.
+`POSTGRES_DISTRIB_DIR`: The directory where the postgres distribution can be found.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single postgres and pageserver
@@ -16,7 +16,7 @@ A fixture is created with the decorator @zenfixture, which is a wrapper around
the standard pytest.fixture with some extra behavior.

There are several environment variables that can control the running of tests:
-ZENITH_BIN, POSTGRES_BIN, etc. See README.md for more information.
+ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.

To use fixtures in a test file, add this line of code:
@@ -78,7 +78,7 @@ class ZenithCli:
        self.bin_zenith = os.path.join(binpath, 'zenith')
        self.env = os.environ.copy()
        self.env['ZENITH_REPO_DIR'] = repo_dir
-        self.env['POSTGRES_BIN'] = pg_distrib_dir
+        self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir

    def run(self, arguments):
        """ Run "zenith" with the specified arguments.
@@ -108,11 +108,11 @@ class ZenithPageserver:
        self.running = False

    def start(self):
-        self.zenith_cli.run(['pageserver', 'start'])
+        self.zenith_cli.run(['start'])
        self.running = True

    def stop(self):
-        self.zenith_cli.run(['pageserver', 'stop'])
+        self.zenith_cli.run(['stop'])
        self.running = True
@@ -316,7 +316,7 @@ def zenith_binpath(base_dir):
@zenfixture
def pg_distrib_dir(base_dir):
    """ find the postgres install """
-    env_postgres_bin = os.environ.get('POSTGRES_BIN')
+    env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
    if env_postgres_bin:
        pg_dir = env_postgres_bin
    else:
@@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

-mod pq_protocol;
+pub mod pq_protocol;
pub mod s3_offload;
pub mod wal_service;
@@ -10,6 +10,18 @@ edition = "2018"
clap = "2.33.0"
anyhow = "1.0"
serde_json = "1"
+# rand = "0.8.3"
+# tar = "0.4.33"
+# serde = { version = "1.0", features = ["derive"] }
+# toml = "0.5"
+# lazy_static = "1.4"
+# regex = "1"
+# # hex = "0.4.3"
+# bytes = "1.0.1"
+# # fs_extra = "1.2.0"
+# nix = "0.20"
+# # thiserror = "1"
+# url = "2.2.2"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
@@ -1,55 +1,51 @@
-use std::path::{Path, PathBuf};
+use std::collections::HashMap;
use std::process::exit;
-use std::str::FromStr;
-use std::{collections::HashMap, fs};
-use anyhow::{Result, Context};
-use anyhow::{anyhow, bail};
+use anyhow::{Context, anyhow};
+use anyhow::Result;
use clap::{App, Arg, ArgMatches, SubCommand};
-use control_plane::local_env::LocalEnv;
-use control_plane::{compute::ComputeControlPlane, local_env, storage};
-use pageserver::{branches::BranchInfo, ZTimelineId};
+use control_plane::local_env;
+use control_plane::compute::ComputeControlPlane;
use control_plane::storage::PageServerNode;
+use pageserver::{ZTimelineId, branches::BranchInfo};
use zenith_utils::lsn::Lsn;

-fn zenith_repo_dir() -> PathBuf {
-    // Find repository path
-    match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => PathBuf::from(val.to_str().unwrap()),
-        None => ".zenith".into(),
-    }
-}
-
// Main entry point for the 'zenith' CLI utility
//
-// This utility can be used to work with a local zenith repository.
-// In order to run queries in it, you need to launch the page server,
-// and a compute node against the page server
+// This utility helps to manage a zenith installation. That includes the following:
+// * Management of local postgres installations running on top of the
+//   pageserver.
+// * Providing a CLI API to the pageserver (local or remote)
+// * TODO: export/import to/from usual postgres
fn main() -> Result<()> {
    let name_arg = Arg::with_name("NAME")
        .short("n")
        .index(1)
        .help("name of this postgres instance")
        .required(true);

    let matches = App::new("zenith")
        .about("Zenith CLI")
-        .subcommand(SubCommand::with_name("init").about("Initialize a new Zenith repository"))
+        .subcommand(
+            SubCommand::with_name("init")
+                .about("Initialize a new Zenith repository")
+                .arg(
+                    Arg::with_name("remote-pageserver")
+                        .long("remote-pageserver")
+                        .required(false)
+                        .value_name("pageserver-url")
+                ),
+        )
        .subcommand(
            SubCommand::with_name("branch")
                .about("Create a new branch")
                .arg(Arg::with_name("branchname").required(false).index(1))
                .arg(Arg::with_name("start-point").required(false).index(2)),
        )
-        .subcommand(
-            SubCommand::with_name("pageserver")
-                .about("Manage pageserver instance")
-                .subcommand(SubCommand::with_name("status"))
-                .subcommand(SubCommand::with_name("start"))
-                .subcommand(SubCommand::with_name("stop")),
-        )
+        .subcommand(SubCommand::with_name("status"))
+        .subcommand(SubCommand::with_name("start"))
+        .subcommand(SubCommand::with_name("stop"))
+        .subcommand(SubCommand::with_name("restart"))
        .subcommand(
            SubCommand::with_name("pg")
                .about("Manage postgres instances")
@@ -67,52 +63,74 @@ fn main() -> Result<()> {
        )
        .get_matches();

-    // handle init separately and exit
+    // Create config file
    if let ("init", Some(sub_args)) = matches.subcommand() {
-        run_init_cmd(sub_args.clone())?;
-        exit(0);
+        let pageserver_uri = sub_args.value_of("pageserver-url");
+        local_env::init(pageserver_uri)
+            .with_context(|| "Failed to create config file")?;
    }

    // all other commands would need config
-    let repopath = zenith_repo_dir();
-    if !repopath.exists() {
-        bail!(
-            "Zenith repository does not exist in {}.\n\
-            Set ZENITH_REPO_DIR or initialize a new repository with 'zenith init'",
-            repopath.display()
-        );
-    }
-    // TODO: check that it looks like a zenith repository
-    let env = match local_env::load_config(&repopath) {
+    let env = match local_env::load_config() {
        Ok(conf) => conf,
        Err(e) => {
-            eprintln!("Error loading config from {}: {}", repopath.display(), e);
+            eprintln!("Error loading config: {}", e);
            exit(1);
        }
    };
    match matches.subcommand() {
        ("init", Some(_)) => {
-            panic!() /* Should not happen. Init was handled before */
+            let pageserver = PageServerNode::from_env(&env);
+            pageserver.init()?;
        }
-        ("branch", Some(sub_args)) => run_branch_cmd(&env, sub_args.clone())?,
-        ("pageserver", Some(sub_args)) => run_pageserver_cmd(&env, sub_args.clone())?,
+        ("branch", Some(sub_args)) => {
+            let pageserver = PageServerNode::from_env(&env);
+
+            if let Some(branchname) = sub_args.value_of("branchname") {
+                if let Some(startpoint_str) = sub_args.value_of("start-point") {
+                    let branch = pageserver.branch_create(branchname, startpoint_str)?;
+                    println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0)));
+                } else {
+                    panic!("Missing start-point");
+                }
+            } else {
+                // No arguments, list branches
+                for branch in pageserver.branches_list()? {
+                    println!(" {}", branch.name);
+                }
+            }
+        }
        ("start", Some(_sub_m)) => {
-            let pageserver = storage::PageServerNode::from_env(&env);
+            let pageserver = PageServerNode::from_env(&env);

            if let Err(e) = pageserver.start() {
-                eprintln!("pageserver start: {}", e);
+                eprintln!("pageserver start failed: {}", e);
                exit(1);
            }
        }
        ("stop", Some(_sub_m)) => {
-            let pageserver = storage::PageServerNode::from_env(&env);
+            let pageserver = PageServerNode::from_env(&env);

            if let Err(e) = pageserver.stop() {
-                eprintln!("pageserver stop: {}", e);
+                eprintln!("pageserver stop failed: {}", e);
                exit(1);
            }
        }
+        ("restart", Some(_sub_m)) => {
+            let pageserver = PageServerNode::from_env(&env);
+
+            if let Err(e) = pageserver.stop() {
+                eprintln!("pageserver stop failed: {}", e);
+                exit(1);
+            }
+
+            if let Err(e) = pageserver.start() {
+                eprintln!("pageserver start failed: {}", e);
+                exit(1);
+            }
+        }
@@ -131,38 +149,10 @@ fn main() -> Result<()> {
    Ok(())
}

-fn run_pageserver_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
-    match args.subcommand() {
-        ("status", Some(_sub_m)) => {
-            todo!();
-        }
-        ("start", Some(_sub_m)) => {
-            let psnode = PageServerNode::from_env(local_env);
-            psnode.start()?;
-            println!("Page server started");
-        }
-        ("stop", Some(_sub_m)) => {
-            let psnode = PageServerNode::from_env(local_env);
-            psnode.stop()?;
-            println!("Page server stopped");
-        }
-        _ => unreachable!(),
-    };
-    Ok(())
-}
-
-// Peek into the repository, to grab the timeline ID of given branch
-pub fn get_branch_timeline(repopath: &Path, branchname: &str) -> ZTimelineId {
-    let branchpath = repopath.join("refs/branches/".to_owned() + branchname);
-
-    ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
-}
-
/// Returns a map of timeline IDs to branch_name@lsn strings.
/// Connects to the pageserver to query this information.
-fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
-    let page_server = storage::PageServerNode::from_env(env);
+fn get_branch_infos(env: &local_env::LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
+    let page_server = PageServerNode::from_env(env);
    let mut client = page_server.page_server_psql_client()?;

    let branches_msgs = client.simple_query("pg_list")?;
@@ -179,11 +169,10 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
    let branch_infos: Result<HashMap<ZTimelineId, String>> = branch_infos
        .into_iter()
        .map(|branch_info| {
-            let timeline_id = ZTimelineId::from_str(&branch_info.timeline_id)?;
            let lsn_string_opt = branch_info.latest_valid_lsn.map(|lsn| lsn.to_string());
            let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
            let branch_lsn_string = format!("{}@{}", branch_info.name, lsn_str);
-            Ok((timeline_id, branch_lsn_string))
+            Ok((branch_info.timeline_id, branch_lsn_string))
        })
        .collect();
@@ -193,19 +182,11 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    let mut cplane = ComputeControlPlane::load(env.clone())?;

-    // FIXME: cheat and resolve the timeline by peeking into the
-    // repository. In reality, when you're launching a compute node
-    // against a possibly-remote page server, we wouldn't know what
-    // branches exist in the remote repository. Or would we require
-    // that you "zenith fetch" them into a local repository first?
    match pg_match.subcommand() {
        ("create", Some(sub_m)) => {
            let timeline_arg = sub_m.value_of("timeline").unwrap_or("main");
-            let timeline = get_branch_timeline(&env.repo_path, timeline_arg);
-
-            println!("Initializing Postgres on timeline {}...", timeline);
-            cplane.new_node(timeline)?;
+            println!("Initializing Postgres on timeline {}...", timeline_arg);
+            cplane.new_node(timeline_arg)?;
        }
        ("list", Some(_sub_m)) => {
            let branch_infos = get_branch_infos(env).unwrap_or_else(|e| {
@@ -249,121 +230,3 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
    Ok(())
}
// "zenith init" - Initialize a new Zenith repository in current dir
fn run_init_cmd(_args: ArgMatches) -> Result<()> {
local_env::init()?;
Ok(())
}
// handle "zenith branch" subcommand
fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
let repopath = local_env.repo_path.to_str().unwrap();
if let Some(branchname) = args.value_of("branchname") {
if PathBuf::from(format!("{}/refs/branches/{}", repopath, branchname)).exists() {
anyhow::bail!("branch {} already exists", branchname);
}
if let Some(startpoint_str) = args.value_of("start-point") {
let mut startpoint = parse_point_in_time(startpoint_str)?;
if startpoint.lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = local_env::find_end_of_wal(local_env, startpoint.timelineid)?;
println!("branching at end of WAL: {}", end_of_wal);
startpoint.lsn = end_of_wal;
}
return local_env::create_branch(local_env, branchname, startpoint);
} else {
panic!("Missing start-point");
}
} else {
// No arguments, list branches
list_branches()?;
}
Ok(())
}
fn list_branches() -> Result<()> {
// list branches
let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?;
for path in paths {
println!(" {}", path?.file_name().to_str().unwrap());
}
Ok(())
}
//
// Parse user-given string that represents a point-in-time.
//
// We support multiple variants:
//
// Raw timeline id in hex, meaning the end of that timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d
//
// A specific LSN on a timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
//
// Same, with a human-friendly branch name:
// main
// main@2/15D3DD8
//
// Human-friendly tag name:
// mytag
//
//
fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
let mut strings = s.split('@');
let name = strings.next().unwrap();
let lsn: Option<Lsn>;
if let Some(lsnstr) = strings.next() {
lsn = Some(
Lsn::from_str(lsnstr)
.with_context(|| "invalid LSN in point-in-time specification")?
);
} else {
lsn = None
}
// Check if it's a tag
if lsn.is_none() {
let tagpath = zenith_repo_dir().join("refs").join("tags").join(name);
if tagpath.exists() {
let pointstr = fs::read_to_string(tagpath)?;
return parse_point_in_time(&pointstr);
}
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath = zenith_repo_dir().join("refs").join("branches").join(name);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(&pointstr)?;
result.lsn = lsn.unwrap_or(Lsn(0));
return Ok(result);
}
// Check if it's a timelineid
// Check if it's timelineid @ LSN
let tlipath = zenith_repo_dir().join("timelines").join(name);
if tlipath.exists() {
let result = local_env::PointInTime {
timelineid: ZTimelineId::from_str(name)?,
lsn: lsn.unwrap_or(Lsn(0)),
};
return Ok(result);
}
panic!("could not parse point-in-time {}", s);
}