Refactor CLI and CLI<->pageserver interfaces to support remote pageserver

This patch started as an effort to support CLI working against remote
pageserver, but turned into a pretty big refactoring.

* CLI now does not look into repository files directly. New commands
'branch_create' and 'identify_system' were introduced into page_service to
support that.
* Branch management that was scattered between local_env and
zenith/main.rs is moved into pageserver/branches.rs. That code could better fit
in Repository/Timeline impl, but I'll leave that for a different patch.
* All tests-related code from local_env went into integration_tests/src/lib.rs as an
extension to PostgresNode trait.
* Paths-generating functions were concentrated around corresponding config
types (LocalEnv and PageserverConf).
This commit is contained in:
Stas Kelvich
2021-05-16 17:19:36 +03:00
parent 53ea6702bd
commit 746f667311
27 changed files with 1317 additions and 1082 deletions

View File

@@ -141,7 +141,7 @@ jobs:
working_directory: test_runner
environment:
- ZENITH_BIN: /tmp/zenith/bin
- POSTGRES_BIN: /tmp/zenith/pg_install
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
- TEST_OUTPUT: /tmp/test_output
command: |
TEST_FILE="<< parameters.test_file >>"

12
Cargo.lock generated
View File

@@ -263,8 +263,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"fs_extra",
"hex",
"lazy_static",
"nix",
"pageserver",
@@ -273,9 +271,10 @@ dependencies = [
"rand",
"regex",
"serde",
"serde_json",
"tar",
"thiserror",
"toml",
"url",
"walkeeper",
"workspace_hack",
"zenith_utils",
@@ -788,8 +787,10 @@ dependencies = [
name = "integration_tests"
version = "0.1.0"
dependencies = [
"anyhow",
"control_plane",
"lazy_static",
"nix",
"pageserver",
"postgres",
"rand",
@@ -1169,6 +1170,7 @@ dependencies = [
"clap",
"crc32c",
"daemonize",
"fs_extra",
"futures",
"hex",
"lazy_static",
@@ -2190,9 +2192,9 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "url"
version = "2.2.1"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
dependencies = [
"form_urlencoded",
"idna",

View File

@@ -11,15 +11,17 @@ rand = "0.8.3"
tar = "0.4.33"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
toml = "0.5"
lazy_static = "1.4"
regex = "1"
anyhow = "1.0"
hex = "0.4.3"
# hex = "0.4.3"
bytes = "1.0.1"
fs_extra = "1.2.0"
# fs_extra = "1.2.0"
nix = "0.20"
thiserror = "1"
# thiserror = "1"
url = "2.2.2"
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }

View File

@@ -1,23 +1,24 @@
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::io::Write;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, ExitStatus};
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::BTreeMap, path::PathBuf};
use std::{
fs::{self, OpenOptions},
io::Read,
};
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use regex::Regex;
use postgres::{Client, NoTls};
use crate::local_env::LocalEnv;
use crate::storage::{PageServerNode, WalProposerNode};
use pageserver::{zenith_repo_dir, ZTimelineId};
use pageserver::ZTimelineId;
use crate::storage::PageServerNode;
//
// ComputeControlPlane
@@ -36,8 +37,8 @@ impl ComputeControlPlane {
// it is running on default port. Change that when pageserver will have config.
let pageserver = Arc::new(PageServerNode::from_env(&env));
let pgdatadirspath = env.repo_path.join("pgdatadirs");
let nodes: Result<BTreeMap<_, _>> = fs::read_dir(&pgdatadirspath)
let pgdatadirspath = &env.pg_data_dirs_path();
let nodes: Result<BTreeMap<_, _>> = fs::read_dir(pgdatadirspath)
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
.into_iter()
.map(|f| {
@@ -97,8 +98,14 @@ impl ComputeControlPlane {
Ok(node)
}
pub fn new_test_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
let node = self.new_from_page_server(true, timelineid);
pub fn new_test_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
let timeline_id = self
.pageserver
.branch_get_by_name(branch_name)
.expect("failed to get timeline_id")
.timeline_id;
let node = self.new_from_page_server(true, timeline_id);
let node = node.unwrap();
// Configure the node to stream WAL directly to the pageserver
@@ -115,8 +122,14 @@ impl ComputeControlPlane {
node
}
pub fn new_test_master_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
let node = self.new_from_page_server(true, timelineid).unwrap();
pub fn new_test_master_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
let timeline_id = self
.pageserver
.branch_get_by_name(branch_name)
.expect("failed to get timeline_id")
.timeline_id;
let node = self.new_from_page_server(true, timeline_id).unwrap();
node.append_conf(
"postgresql.conf",
@@ -126,8 +139,14 @@ impl ComputeControlPlane {
node
}
pub fn new_node(&mut self, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
let node = self.new_from_page_server(false, timelineid).unwrap();
pub fn new_node(&mut self, branch_name: &str) -> Result<Arc<PostgresNode>> {
let timeline_id = self
.pageserver
.branch_get_by_name(branch_name)
.expect("failed to get timeline_id")
.timeline_id;
let node = self.new_from_page_server(false, timeline_id).unwrap();
// Configure the node to stream WAL directly to the pageserver
node.append_conf(
@@ -291,9 +310,9 @@ impl PostgresNode {
max_replication_slots = 10\n\
hot_standby = on\n\
shared_buffers = 1MB\n\
fsync = off\n\
fsync = off\n\
max_connections = 100\n\
wal_sender_timeout = 0\n\
wal_sender_timeout = 0\n\
wal_level = replica\n\
listen_addresses = '{address}'\n\
port = {port}\n",
@@ -326,8 +345,8 @@ impl PostgresNode {
Ok(())
}
fn pgdata(&self) -> PathBuf {
self.env.repo_path.join("pgdatadirs").join(&self.name)
pub fn pgdata(&self) -> PathBuf {
self.env.pg_data_dir(&self.name)
}
pub fn status(&self) -> &str {
@@ -413,152 +432,6 @@ impl PostgresNode {
String::from_utf8(output.stdout).unwrap().trim().to_string()
}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pageserver.log:\n{}", buffer);
}
}
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
println!("Running {}", sql);
let result = client.query(sql, &[]);
if result.is_err() {
self.dump_log_file();
}
result.unwrap()
}
pub fn open_psql(&self, db: &str) -> Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
Client::connect(connstring.as_str(), NoTls).unwrap()
}
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(self.pgdata().join("safekeeper_proxy.log"))
.unwrap(),
)
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
pub fn pg_regress(&self) -> ExitStatus {
self.safe_psql("postgres", "CREATE DATABASE regression");
let data_dir = zenith_repo_dir();
let regress_run_path = data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
if !regress_check.success() {
if let Ok(mut file) = File::open("regression.diffs") {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- regression.diffs:\n{}", buffer);
}
self.dump_log_file();
if let Ok(mut file) = File::open(
self.env
.repo_path
.join("pgdatadirs")
.join("pg1")
.join("log"),
) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
}
}
regress_check
}
pub fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
pg_bench_run
}
}
impl Drop for PostgresNode {

View File

@@ -6,7 +6,26 @@
// Intended to be used in integration tests and in CLI tools for
// local installations.
//
use anyhow::{anyhow, bail, Context, Result};
use std::fs;
use std::path::Path;
pub mod compute;
pub mod local_env;
pub mod storage;
/// Read a PID file
///
/// We expect a file that contains a single integer.
/// We return an i32 for compatibility with libc and nix.
pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
let pid_str = fs::read_to_string(pidfile)
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
let pid: i32 = pid_str
.parse()
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
if pid < 1 {
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
}
Ok(pid)
}

View File

@@ -4,37 +4,23 @@
// Now it also provides init method which acts like a stub for proper installation
// script which will use local paths.
//
use anyhow::Context;
use bytes::Bytes;
use rand::Rng;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::env;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use pageserver::zenith_repo_dir;
use pageserver::ZTimelineId;
use postgres_ffi::xlog_utils;
use zenith_utils::lsn::Lsn;
use std::path::PathBuf;
use url::Url;
//
// This data structure represents deserialized zenith config, which should be
// located in ~/.zenith
//
// TODO: should we also support ZENITH_CONF env var?
// This data structures represent deserialized zenith CLI config
//
#[derive(Serialize, Deserialize, Clone)]
pub struct LocalEnv {
// Path to the Repository. Here page server and compute nodes will create and store their data.
pub repo_path: PathBuf,
// Pageserver connection strings
pub pageserver_connstring: String,
// System identifier, from the PostgreSQL control file
pub systemid: u64,
// Base directory for both pageserver and compute nodes
pub base_data_dir: PathBuf,
// Path to postgres distribution. It's expected that "bin", "include",
// "lib", "share" from postgres distribution are there. If at some point
@@ -42,38 +28,66 @@ pub struct LocalEnv {
// to four separate paths and match OS-specific installation layout.
pub pg_distrib_dir: PathBuf,
// Path to pageserver binary.
pub zenith_distrib_dir: PathBuf,
// Path to pageserver binary. Empty for remote pageserver.
pub zenith_distrib_dir: Option<PathBuf>,
}
impl LocalEnv {
// postgres installation
// postgres installation paths
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
}
pub fn pageserver_bin(&self) -> Result<PathBuf> {
Ok(self
.zenith_distrib_dir
.as_ref()
.ok_or(anyhow!("Can not manage remote pageserver"))?
.join("pageserver"))
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs")
}
pub fn pg_data_dir(&self, name: &str) -> PathBuf {
self.pg_data_dirs_path().join(name)
}
// TODO: move pageserver files into ./pageserver
pub fn pageserver_data_dir(&self) -> PathBuf {
self.base_data_dir.clone()
}
}
fn base_path() -> PathBuf {
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
//
// Initialize a new Zenith repository
//
pub fn init() -> Result<()> {
pub fn init(remote_pageserver: Option<&str>) -> Result<()> {
// check if config already exists
let repo_path = zenith_repo_dir();
if repo_path.exists() {
let base_path = base_path();
if base_path.exists() {
anyhow::bail!(
"{} already exists. Perhaps already initialized?",
repo_path.to_str().unwrap()
base_path.to_str().unwrap()
);
}
// ok, now check that expected binaries are present
// Find postgres binaries. Follow POSTGRES_BIN if set, otherwise look in "tmp_install".
// Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install".
let pg_distrib_dir: PathBuf = {
if let Some(postgres_bin) = env::var_os("POSTGRES_BIN") {
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
postgres_bin.into()
} else {
let cwd = env::current_dir()?;
@@ -84,137 +98,45 @@ pub fn init() -> Result<()> {
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
}
// Find zenith binaries.
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
if !zenith_distrib_dir.join("pageserver").exists() {
anyhow::bail!("Can't find pageserver binary.",);
}
fs::create_dir(&base_path)?;
fs::create_dir(base_path.join("pgdatadirs"))?;
// ok, we are good to go
let mut conf = LocalEnv {
repo_path,
pg_distrib_dir,
zenith_distrib_dir,
systemid: 0,
let conf = if let Some(addr) = remote_pageserver {
// check that addr is parsable
let _uri = Url::parse(addr)
.map_err(|e| anyhow!("{}: {}", addr, e))?;
LocalEnv {
pageserver_connstring: format!("postgresql://{}/", addr),
pg_distrib_dir,
zenith_distrib_dir: None,
base_data_dir: base_path,
}
} else {
// Find zenith binaries.
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
if !zenith_distrib_dir.join("pageserver").exists() {
anyhow::bail!("Can't find pageserver binary.",);
}
LocalEnv {
pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(),
pg_distrib_dir,
zenith_distrib_dir: Some(zenith_distrib_dir),
base_data_dir: base_path,
}
};
init_repo(&mut conf)?;
let toml = toml::to_string(&conf)?;
fs::write(conf.base_data_dir.join("config"), toml)?;
Ok(())
}
pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
let repopath = &local_env.repo_path;
fs::create_dir(&repopath)
.with_context(|| format!("could not create directory {}", repopath.display()))?;
fs::create_dir(repopath.join("pgdatadirs"))?;
fs::create_dir(repopath.join("timelines"))?;
fs::create_dir(repopath.join("refs"))?;
fs::create_dir(repopath.join("refs").join("branches"))?;
fs::create_dir(repopath.join("refs").join("tags"))?;
println!("created directory structure in {}", repopath.display());
// Locate and load config
pub fn load_config() -> Result<LocalEnv> {
let repopath = base_path();
// Create initial timeline
let tli = create_timeline(&local_env, None)?;
let timelinedir = repopath.join("timelines").join(tli.to_string());
println!("created initial timeline {}", timelinedir.display());
// Run initdb
//
// We create the cluster temporarily in a "tmp" directory inside the repository,
// and move it to the right location from there.
let tmppath = repopath.join("tmp");
let initdb_path = local_env.pg_bin_dir().join("initdb");
let initdb = Command::new(initdb_path)
.args(&["-D", tmppath.to_str().unwrap()])
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
local_env.pg_lib_dir().to_str().unwrap(),
)
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;
if !initdb.success() {
anyhow::bail!("initdb failed");
}
println!("initdb succeeded");
// Read control file to extract the LSN and system id
let controlfile_path = tmppath.join("global").join("pg_control");
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
let systemid = controlfile.system_identifier;
let lsn = controlfile.checkPoint;
let lsnstr = format!("{:016X}", lsn);
// Move the initial WAL file
fs::rename(
tmppath.join("pg_wal").join("000000010000000000000001"),
timelinedir
.join("wal")
.join("000000010000000000000001.partial"),
)?;
println!("moved initial WAL file");
// Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename(tmppath, &target)?;
println!("moved 'tmp' to {}", target.display());
// Create 'main' branch to refer to the initial timeline
let data = tli.to_string();
fs::write(repopath.join("refs").join("branches").join("main"), data)?;
println!("created main branch");
// Also update the system id in the LocalEnv
local_env.systemid = systemid;
// write config
let toml = toml::to_string(&local_env)?;
fs::write(repopath.join("config"), toml)?;
println!(
"new zenith repository was created in {}",
repopath.display()
);
Ok(())
}
// If control file says the cluster was shut down cleanly, modify it, to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile =
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(
controlfilepath.as_path(),
postgres_ffi::encode_pg_control(controlfile),
)?;
Ok(())
}
// check that config file is present
pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
if !repopath.exists() {
anyhow::bail!(
"Zenith config is not found in {}. You need to run 'zenith init' first",
@@ -222,32 +144,13 @@ pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
);
}
// TODO: check that it looks like a zenith repository
// load and parse file
let config = fs::read_to_string(repopath.join("config"))?;
toml::from_str(config.as_str()).map_err(|e| e.into())
}
// local env for tests
pub fn test_env(testname: &str) -> LocalEnv {
fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check");
let repo_path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_check/")
.join(testname);
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&repo_path);
let mut local_env = LocalEnv {
repo_path,
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: cargo_bin_dir(),
systemid: 0,
};
init_repo(&mut local_env).expect("could not initialize zenith repository");
local_env
}
// Find the directory where the binaries were put (i.e. target/debug/)
pub fn cargo_bin_dir() -> PathBuf {
let mut pathbuf = std::env::current_exe().unwrap();
@@ -259,155 +162,3 @@ pub fn cargo_bin_dir() -> PathBuf {
pathbuf
}
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timelineid: ZTimelineId,
pub lsn: Lsn,
}
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
let repopath = &local_env.repo_path;
// Create initial timeline
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
let timelineid = ZTimelineId::from(tli_buf);
let timelinedir = repopath.join("timelines").join(timelineid.to_string());
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("snapshots"))?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
fs::write(timelinedir.join("ancestor"), data)?;
}
Ok(timelineid)
}
// Create a new branch in the repository (for the "zenith branch" subcommand)
pub fn create_branch(
local_env: &LocalEnv,
branchname: &str,
startpoint: PointInTime,
) -> Result<()> {
let repopath = &local_env.repo_path;
// create a new timeline for it
let newtli = create_timeline(local_env, Some(startpoint))?;
let newtimelinedir = repopath.join("timelines").join(newtli.to_string());
let data = newtli.to_string();
fs::write(
repopath.join("refs").join("branches").join(branchname),
data,
)?;
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
// TODO: be smarter and avoid the copying...
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
let oldtimelinedir = repopath
.join("timelines")
.join(startpoint.timelineid.to_string());
copy_wal(
&oldtimelinedir.join("wal"),
&newtimelinedir.join("wal"),
startpoint.lsn,
16 * 1024 * 1024 // FIXME: assume default WAL segment size
)?;
Ok(())
}
///
/// Copy all WAL segments from one directory to another, up to given LSN.
///
/// If the given LSN is in the middle of a segment, the last segment containing it
/// is written out as .partial, and padded with zeros.
///
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
let last_segno = upto.segment_number(wal_seg_size);
let last_segoff = upto.segment_offset(wal_seg_size);
for entry in fs::read_dir(src_dir).unwrap() {
if let Ok(entry) = entry {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
// Check if the filename looks like an xlog file, or a .partial file.
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
continue
}
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
let copylen;
let mut dst_fname = PathBuf::from(fname);
if segno > last_segno {
// future segment, skip
continue;
} else if segno < last_segno {
copylen = wal_seg_size;
dst_fname.set_extension("");
} else {
copylen = last_segoff;
dst_fname.set_extension("partial");
}
let src_file = File::open(entry.path())?;
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
if copylen < wal_seg_size {
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
}
}
}
Ok(())
}
// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<Lsn> {
let repopath = &local_env.repo_path;
let waldir = repopath
.join("timelines")
.join(timeline.to_string())
.join("wal");
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
Ok(Lsn(lsn))
}
// Find the latest snapshot for a timeline
fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
let repopath = &local_env.repo_path;
let snapshotsdir = repopath
.join("timelines")
.join(timeline.to_string())
.join("snapshots");
let paths = fs::read_dir(&snapshotsdir)?;
let mut maxsnapshot = Lsn(0);
let mut snapshotdir: Option<PathBuf> = None;
for path in paths {
let path = path?;
let filename = path.file_name().to_str().unwrap().to_owned();
if let Ok(lsn) = Lsn::from_hex(&filename) {
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
snapshotdir = Some(path.path());
}
}
if maxsnapshot == Lsn(0) {
// TODO: check ancestor timeline
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
}
Ok((maxsnapshot, snapshotdir.unwrap()))
}

View File

@@ -1,123 +1,18 @@
use anyhow::{anyhow, bail, Context, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use std::convert::TryInto;
use std::fs;
use std::net::{SocketAddr, TcpStream};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::collections::HashMap;
use anyhow::{anyhow, bail, Result};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::{Client, NoTls};
use pageserver::branches::BranchInfo;
use crate::local_env::LocalEnv;
use pageserver::ZTimelineId;
//
// Collection of several example deployments useful for tests.
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
pub repopath: PathBuf,
}
impl TestStorageControlPlane {
// Peek into the repository, to grab the timeline ID of given branch
pub fn get_branch_timeline(&self, branchname: &str) -> ZTimelineId {
let branchpath = self.repopath.join("refs/branches/".to_owned() + branchname);
ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
}
// postgres <-> page_server
//
// Initialize a new repository and configure a page server to run in it
//
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
let repopath = local_env.repo_path.clone();
let pserver = Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
});
pserver.start().unwrap();
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
repopath,
}
}
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
let repopath = local_env.repo_path.clone();
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
}),
test_done: AtomicBool::new(false),
repopath,
};
cplane.pageserver.start().unwrap();
const WAL_ACCEPTOR_PORT: usize = 54321;
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: local_env.repo_path.join(format!("wal_acceptor_{}", i)),
env: local_env.clone(),
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
use crate::read_pidfile;
//
// Control routines for pageserver.
@@ -125,8 +20,8 @@ impl Drop for TestStorageControlPlane {
// Used in CLI and tests.
//
pub struct PageServerNode {
kill_on_exit: bool,
listen_address: Option<SocketAddr>,
pub kill_on_exit: bool,
pub listen_address: Option<SocketAddr>,
pub env: LocalEnv,
}
@@ -146,12 +41,32 @@ impl PageServerNode {
}
}
pub fn init(&self) -> Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()])
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pageserver init failed");
if status.success() {
Ok(())
} else {
Err(anyhow!("pageserver init failed"))
}
}
pub fn repo_path(&self) -> PathBuf {
self.env.repo_path.clone()
self.env.pageserver_data_dir()
}
pub fn pid_file(&self) -> PathBuf {
self.env.repo_path.join("pageserver.pid")
self.repo_path().join("pageserver.pid")
}
pub fn start(&self) -> Result<()> {
@@ -161,11 +76,12 @@ impl PageServerNode {
self.repo_path().display()
);
let mut cmd = Command::new(self.env.zenith_distrib_dir.join("pageserver"));
cmd.args(&["-l", self.address().to_string().as_str()])
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()])
.arg("-d")
.env_clear()
.env("RUST_BACKTRACE", "1")
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
.env("ZENITH_REPO_DIR", self.repo_path())
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
@@ -226,9 +142,7 @@ impl PageServerNode {
client.simple_query(sql).unwrap()
}
pub fn page_server_psql_client(
&self,
) -> std::result::Result<postgres::Client, postgres::Error> {
pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address().ip(),
@@ -238,6 +152,74 @@ impl PageServerNode {
);
Client::connect(connstring.as_str(), NoTls)
}
pub fn branches_list(&self) -> Result<Vec<BranchInfo>> {
let mut client = self.page_server_psql_client()?;
let query_result = client.simple_query("pg_list")?;
let branches_json = query_result
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("missing branches"))?;
let res: Vec<BranchInfo> = serde_json::from_str(branches_json)?;
Ok(res)
}
pub fn branch_create(&self, name: &str, startpoint: &str) -> Result<BranchInfo> {
let mut client = self.page_server_psql_client()?;
let query_result =
client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?;
let branch_json = query_result
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("missing branch"))?;
let res: BranchInfo = serde_json::from_str(branch_json)
.map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?;
Ok(res)
}
// TODO: make this a separate request type and avoid loading all the branches
pub fn branch_get_by_name(&self, name: &str) -> Result<BranchInfo> {
let branch_infos = self.branches_list()?;
let branche_by_name: Result<HashMap<String, BranchInfo>> = branch_infos
.into_iter()
.map(|branch_info| Ok((branch_info.name.clone(), branch_info)))
.collect();
let branche_by_name = branche_by_name?;
let branch = branche_by_name
.get(name)
.ok_or_else(|| anyhow!("Branch {} not found", name))?;
Ok(branch.clone())
}
pub fn system_id_get(&self) -> Result<u64> {
let mut client = self.page_server_psql_client()?;
let query_result = client
.simple_query("identify_system")?
.first()
.map(|msg| match msg {
postgres::SimpleQueryMessage::Row(row) => row.get(0),
_ => None,
})
.flatten()
.ok_or_else(|| anyhow!("failed to get system_id"))?
.parse::<u64>()?;
Ok(query_result)
}
}
impl Drop for PageServerNode {
@@ -247,104 +229,3 @@ impl Drop for PageServerNode {
}
}
}
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
env: LocalEnv,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", &self.env.systemid.to_string()])
// Tell page server it can receive WAL from this WAL safekeeper
// FIXME: If there are multiple safekeepers, they will all inform
// the page server. Only the last "notification" will stay in effect.
// So it's pretty random which safekeeper the page server will connect to
.args(&["--pageserver", "127.0.0.1:64000"])
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) -> Result<()> {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
let pid = read_pidfile(&pidfile)?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to kill wal_acceptor with pid {}", pid);
}
Ok(())
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
// Ignore errors.
let _ = self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pub pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
// std::process::Child::id() returns u32, we need i32.
let pid: i32 = self.pid.try_into().unwrap();
let pid = Pid::from_raw(pid);
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}
/// Read a PID file
///
/// We expect a file that contains a single integer.
/// We return an i32 for compatibility with libc and nix.
fn read_pidfile(pidfile: &Path) -> Result<i32> {
let pid_str = fs::read_to_string(pidfile)
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
let pid: i32 = pid_str
.parse()
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
if pid < 1 {
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
}
Ok(pid)
}

View File

@@ -9,7 +9,10 @@ edition = "2018"
[dependencies]
lazy_static = "1.4.0"
rand = "0.8.3"
anyhow = "1.0"
nix = "0.20"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }
control_plane = { path = "../control_plane" }

View File

@@ -0,0 +1,403 @@
use anyhow::{bail, Result};
use std::sync::{atomic::AtomicBool, Arc};
use std::{
convert::TryInto,
fs::{self, File, OpenOptions},
io::Read,
net::SocketAddr,
path::{Path, PathBuf},
process::{Command, ExitStatus},
sync::atomic::Ordering,
};
use control_plane::compute::PostgresNode;
use control_plane::local_env;
use control_plane::read_pidfile;
use control_plane::{local_env::LocalEnv, storage::PageServerNode};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres;
// local compute env for tests
pub fn create_test_env(testname: &str) -> LocalEnv {
let base_path = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_check/")
.join(testname);
let base_path_str = base_path.to_str().unwrap();
// Remove remnants of old test repo
let _ = fs::remove_dir_all(&base_path);
fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str());
let pgdatadirs_path = base_path.join("pgdatadirs");
fs::create_dir(&pgdatadirs_path)
.expect(format!("could not create directory {:?}", pgdatadirs_path).as_str());
LocalEnv {
pageserver_connstring: "postgresql://127.0.0.1:64000".to_string(),
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
zenith_distrib_dir: Some(local_env::cargo_bin_dir()),
base_data_dir: base_path,
}
}
//
// Collection of several example deployments useful for tests.
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
}
impl TestStorageControlPlane {
// postgres <-> page_server
//
// Initialize a new repository and configure a page server to run in it
//
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
let pserver = Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
});
pserver.init().unwrap();
pserver.start().unwrap();
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
}
}
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: local_env.clone(),
kill_on_exit: true,
listen_address: None,
}),
test_done: AtomicBool::new(false),
// repopath,
};
cplane.pageserver.init().unwrap();
cplane.pageserver.start().unwrap();
let systemid = cplane.pageserver.system_id_get().unwrap();
const WAL_ACCEPTOR_PORT: usize = 54321;
let datadir_base = local_env.base_data_dir.join("safekeepers");
fs::create_dir_all(&datadir_base).unwrap();
for i in 0..redundancy {
let wal_acceptor = WalAcceptorNode {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: datadir_base.join(format!("wal_acceptor_{}", i)),
systemid,
env: local_env.clone(),
pass_to_pageserver: i == 0
};
wal_acceptor.init();
wal_acceptor.start();
cplane.wal_acceptors.push(wal_acceptor);
}
cplane
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
let _ = wa.stop();
}
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
self.wal_acceptors
.iter()
.map(|wa| wa.listen.to_string())
.collect::<Vec<String>>()
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
//
// PostgresNodeExt
//
///////////////////////////////////////////////////////////////////////////////
///
/// Testing utilities for PostgresNode type
///
pub trait PostgresNodeExt {
fn pg_regress(&self) -> ExitStatus;
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus;
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode;
fn open_psql(&self, db: &str) -> postgres::Client;
fn dump_log_file(&self);
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row>;
}
impl PostgresNodeExt for PostgresNode {
fn pg_regress(&self) -> ExitStatus {
self.safe_psql("postgres", "CREATE DATABASE regression");
let regress_run_path = self.env.base_data_dir.join("regress");
fs::create_dir_all(&regress_run_path).unwrap();
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
std::env::set_current_dir(regress_run_path).unwrap();
let regress_build_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
let regress_src_path =
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
let regress_check = Command::new(regress_build_path.join("pg_regress"))
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
regress_src_path.join("parallel_schedule").to_str().unwrap()
)
.as_str(),
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGPORT", self.address.port().to_string())
.env("PGUSER", self.whoami())
.env("PGHOST", self.address.ip().to_string())
.status()
.expect("pg_regress failed");
if !regress_check.success() {
if let Ok(mut file) = File::open("regression.diffs") {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- regression.diffs:\n{}", buffer);
}
// self.dump_log_file();
if let Ok(mut file) = File::open(self.env.pg_data_dir("pg1").join("log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
}
}
regress_check
}
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
let port = self.address.port().to_string();
let clients = clients.to_string();
let seconds = seconds.to_string();
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&["-i", "-p", port.as_str(), "postgres"])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench -i");
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
.args(&[
"-p",
port.as_str(),
"-T",
seconds.as_str(),
"-P",
"1",
"-c",
clients.as_str(),
"-M",
"prepared",
"postgres",
])
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("pgbench run");
pg_bench_run
}
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["--ztimelineid", &self.timelineid.to_string()])
.args(&["-s", wal_acceptors])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(self.pgdata().join("safekeeper_proxy.log"))
.unwrap(),
)
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
}
}
fn dump_log_file(&self) {
if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) {
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
println!("--------------- pageserver.log:\n{}", buffer);
}
}
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
let mut client = postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap();
println!("Running {}", sql);
let result = client.query(sql, &[]);
if result.is_err() {
// self.dump_log_file();
}
result.unwrap()
}
fn open_psql(&self, db: &str) -> postgres::Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap()
}
}
///////////////////////////////////////////////////////////////////////////////
//
// WalAcceptorNode
//
///////////////////////////////////////////////////////////////////////////////
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
systemid: u64,
env: LocalEnv,
pass_to_pageserver: bool,
}
impl WalAcceptorNode {
pub fn init(&self) {
if self.data_dir.exists() {
fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
fs::create_dir_all(self.data_dir.clone()).unwrap();
}
pub fn start(&self) {
println!(
"Starting wal_acceptor in {} listening '{}'",
self.data_dir.to_str().unwrap(),
self.listen
);
let ps_arg = if self.pass_to_pageserver {
// Tell page server it can receive WAL from this WAL safekeeper
["--pageserver", "127.0.0.1:64000"].to_vec()
} else {
[].to_vec()
};
let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.args(&["--systemid", self.systemid.to_string().as_str()])
.args(&ps_arg)
.arg("-d")
.arg("-n")
.status()
.expect("failed to start wal_acceptor");
if !status.success() {
panic!("wal_acceptor start failed");
}
}
pub fn stop(&self) -> Result<()> {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
let pid = read_pidfile(&pidfile)?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to kill wal_acceptor with pid {}", pid);
}
Ok(())
}
}
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
// Ignore errors.
let _ = self.stop();
}
}
///////////////////////////////////////////////////////////////////////////////
//
// WalProposerNode
//
///////////////////////////////////////////////////////////////////////////////
pub struct WalProposerNode {
pub pid: u32,
}
impl WalProposerNode {
pub fn stop(&self) {
// std::process::Child::id() returns u32, we need i32.
let pid: i32 = self.pid.try_into().unwrap();
let pid = Pid::from_raw(pid);
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
}
}
impl Drop for WalProposerNode {
fn drop(&mut self) {
self.stop();
}
}

View File

@@ -1,11 +0,0 @@
// test node resettlement to an empty datadir
// TODO
/*
#[test]
fn test_resettlement() {}
// test seq scan of everythin after restart
#[test]
fn test_cold_seqscan() {}
*/

View File

@@ -1,8 +0,0 @@
// TODO
/*
#[test]
fn test_actions() {}
#[test]
fn test_regress() {}
*/

View File

@@ -1,23 +1,22 @@
// mod control_plane;
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::local_env::PointInTime;
use control_plane::storage::TestStorageControlPlane;
use integration_tests;
use integration_tests::TestStorageControlPlane;
use integration_tests::PostgresNodeExt;
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff
// -- pageserver api endpoint to check all rels
#[test]
fn test_redo_cases() {
let local_env = local_env::test_env("test_redo_cases");
let local_env = integration_tests::create_test_env("test_redo_cases");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
let node = compute_cplane.new_test_node("main");
node.start().unwrap();
// check basic work with table
@@ -51,15 +50,14 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
fn test_regress() {
let local_env = local_env::test_env("test_regress");
let local_env = integration_tests::create_test_env("test_regress");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
let node = compute_cplane.new_test_node("main");
node.start().unwrap();
let status = node.pg_regress();
@@ -69,15 +67,14 @@ fn test_regress() {
// Runs pg_bench on a compute node
#[test]
fn pgbench() {
let local_env = local_env::test_env("pgbench");
let local_env = integration_tests::create_test_env("pgbench");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_node(maintli);
let node = compute_cplane.new_test_node("main");
node.start().unwrap();
let status = node.pg_bench(10, 5);
@@ -87,30 +84,21 @@ fn pgbench() {
// Run two postgres instances on one pageserver, on different timelines
#[test]
fn test_pageserver_two_timelines() {
let local_env = local_env::test_env("test_pageserver_two_timelines");
let local_env = integration_tests::create_test_env("test_pageserver_two_timelines");
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
let maintli = storage_cplane.get_branch_timeline("main");
// Create new branch at the end of 'main'
let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
local_env::create_branch(
&local_env,
"experimental",
PointInTime {
timelineid: maintli,
lsn: startpoint,
},
)
.unwrap();
let experimentaltli = storage_cplane.get_branch_timeline("experimental");
storage_cplane
.pageserver
.branch_create("experimental", "main")
.unwrap();
// Launch postgres instances on both branches
let node1 = compute_cplane.new_test_node(maintli);
let node2 = compute_cplane.new_test_node(experimentaltli);
let node1 = compute_cplane.new_test_node("main");
let node2 = compute_cplane.new_test_node("experimental");
node1.start().unwrap();
node2.start().unwrap();

View File

@@ -1,21 +1,20 @@
// Restart acceptors one by one while compute is under the load.
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::local_env::PointInTime;
use control_plane::storage::TestStorageControlPlane;
use pageserver::ZTimelineId;
use rand::Rng;
use std::sync::Arc;
use std::time::SystemTime;
use std::{thread, time};
use control_plane::compute::ComputeControlPlane;
use integration_tests;
use integration_tests::TestStorageControlPlane;
use integration_tests::PostgresNodeExt;
const DOWNTIME: u64 = 2;
#[test]
//#[ignore]
fn test_embedded_wal_proposer() {
let local_env = local_env::test_env("test_embedded_wal_proposer");
let local_env = integration_tests::create_test_env("test_embedded_wal_proposer");
const REDUNDANCY: usize = 3;
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
@@ -23,8 +22,7 @@ fn test_embedded_wal_proposer() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
let node = compute_cplane.new_test_master_node("main");
node.append_conf(
"postgresql.conf",
&format!("wal_acceptors='{}'\n", wal_acceptors),
@@ -52,7 +50,7 @@ fn test_embedded_wal_proposer() {
#[test]
fn test_acceptors_normal_work() {
let local_env = local_env::test_env("test_acceptors_normal_work");
let local_env = integration_tests::create_test_env("test_acceptors_normal_work");
const REDUNDANCY: usize = 3;
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
@@ -60,8 +58,7 @@ fn test_acceptors_normal_work() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
@@ -93,36 +90,25 @@ fn test_many_timelines() {
// Initialize a new repository, and set up WAL safekeepers and page server.
const REDUNDANCY: usize = 3;
const N_TIMELINES: usize = 5;
let local_env = local_env::test_env("test_many_timelines");
let local_env = integration_tests::create_test_env("test_many_timelines");
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// Create branches
let mut timelines: Vec<ZTimelineId> = Vec::new();
let maintli = storage_cplane.get_branch_timeline("main"); // main branch
timelines.push(maintli);
let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
let mut timelines: Vec<String> = Vec::new();
timelines.push("main".to_string());
for i in 1..N_TIMELINES {
// additional branches
let branchname = format!("experimental{}", i);
local_env::create_branch(
&local_env,
&branchname,
PointInTime {
timelineid: maintli,
lsn: startpoint,
},
)
.unwrap();
let tli = storage_cplane.get_branch_timeline(&branchname);
timelines.push(tli);
storage_cplane.pageserver.branch_create(&branchname, "main").unwrap();
timelines.push(branchname);
}
// start postgres on each timeline
let mut nodes = Vec::new();
for tli in timelines {
let node = compute_cplane.new_test_node(tli);
for tli_name in timelines {
let node = compute_cplane.new_test_node(&tli_name);
nodes.push(node.clone());
node.start().unwrap();
node.start_proxy(&wal_acceptors);
@@ -159,7 +145,7 @@ fn test_many_timelines() {
// Majority is always alive
#[test]
fn test_acceptors_restarts() {
let local_env = local_env::test_env("test_acceptors_restarts");
let local_env = integration_tests::create_test_env("test_acceptors_restarts");
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
@@ -171,8 +157,7 @@ fn test_acceptors_restarts() {
let mut rng = rand::thread_rng();
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
@@ -222,7 +207,7 @@ fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
// N_CRASHES env var
#[test]
fn test_acceptors_unavailability() {
let local_env = local_env::test_env("test_acceptors_unavailability");
let local_env = integration_tests::create_test_env("test_acceptors_unavailability");
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 2;
@@ -232,8 +217,7 @@ fn test_acceptors_unavailability() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy
@@ -307,7 +291,7 @@ fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
// Race condition test
#[test]
fn test_race_conditions() {
let local_env = local_env::test_env("test_race_conditions");
let local_env = integration_tests::create_test_env("test_race_conditions");
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
@@ -319,8 +303,7 @@ fn test_race_conditions() {
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgres
let maintli = storage_cplane.get_branch_timeline("main");
let node = compute_cplane.new_test_master_node(maintli);
let node = compute_cplane.new_test_master_node("main");
node.start().unwrap();
// start proxy

View File

@@ -40,6 +40,7 @@ tar = "0.4.33"
parse_duration = "2.1.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
fs_extra = "1.2.0"
postgres_ffi = { path = "../postgres_ffi" }
zenith_utils = { path = "../zenith_utils" }

View File

@@ -4,7 +4,8 @@
use log::*;
use parse_duration::parse;
use std::fs::{self, File, OpenOptions};
use std::fs::{File, OpenOptions};
use std::{env, path::PathBuf};
use std::io;
use std::process::exit;
use std::thread;
@@ -16,7 +17,7 @@ use daemonize::Daemonize;
use slog::{Drain, FnValue};
use pageserver::{page_cache, page_service, tui, zenith_repo_dir, PageServerConf};
use pageserver::{page_cache, page_service, tui, PageServerConf, branches};
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
@@ -47,6 +48,12 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Run in the background"),
)
.arg(
Arg::with_name("init")
.long("init")
.takes_value(false)
.help("Initialize pageserver repo"),
)
.arg(
Arg::with_name("gc_horizon")
.long("gc_horizon")
@@ -59,16 +66,55 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.arg(
Arg::with_name("workdir")
.short("D")
.long("workdir")
.takes_value(true)
.help("Working directory for the pageserver"),
)
.get_matches();
let workdir = if let Some(workdir_arg) = arg_matches.value_of("workdir") {
PathBuf::from(workdir_arg)
} else if let Some(workdir_arg) = std::env::var_os("ZENITH_REPO_DIR") {
PathBuf::from(workdir_arg.to_str().unwrap())
} else {
PathBuf::from(".zenith")
};
let pg_distrib_dir: PathBuf = {
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
postgres_bin.into()
} else {
let cwd = env::current_dir()?;
cwd.join("tmp_install")
}
};
if !pg_distrib_dir.join("bin/postgres").exists() {
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
}
let mut conf = PageServerConf {
daemonize: false,
interactive: false,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
listen_addr: "127.0.0.1:64000".parse().unwrap(),
workdir,
pg_distrib_dir,
};
// Create repo and exit if init was requested
if arg_matches.is_present("init") {
branches::init_repo(&conf)?;
return Ok(());
}
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&conf.workdir)?;
if arg_matches.is_present("daemonize") {
conf.daemonize = true;
}
@@ -98,8 +144,7 @@ fn main() -> Result<()> {
}
fn start_pageserver(conf: &PageServerConf) -> Result<()> {
let repodir = zenith_repo_dir();
let log_filename = repodir.join("pageserver.log");
let log_filename = "pageserver.log";
// Don't open the same file for output multiple times;
// the different fds could overwrite each other's output.
let log_file = OpenOptions::new()
@@ -141,8 +186,8 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
let stderr = log_file;
let daemonize = Daemonize::new()
.pid_file(repodir.join("pageserver.pid"))
.working_directory(repodir)
.pid_file("pageserver.pid")
.working_directory(".")
.stdout(stdout)
.stderr(stderr);
@@ -153,26 +198,14 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
} else {
// change into the repository directory. In daemon mode, Daemonize
// does this for us.
let repodir = zenith_repo_dir();
std::env::set_current_dir(&repodir)?;
info!("Changed current directory to repository in {:?}", &repodir);
std::env::set_current_dir(&conf.workdir)?;
info!("Changed current directory to repository in {:?}", &conf.workdir);
}
let mut threads = Vec::new();
// TODO: Check that it looks like a valid repository before going further
// Create directory for wal-redo datadirs
match fs::create_dir("wal-redo") {
Ok(_) => {}
Err(e) => match e.kind() {
io::ErrorKind::AlreadyExists => {}
_ => {
anyhow::bail!("Failed to create wal-redo data directory: {}", e);
}
},
}
page_cache::init(conf);
// GetPage@LSN requests are served by another thread. (It uses async I/O,

View File

@@ -1,17 +1,135 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
//
// Branch management code
//
// TODO: move all paths construction to conf impl
//
use anyhow::{Context, Result, anyhow};
use bytes::Bytes;
use postgres_ffi::xlog_utils;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr};
use fs_extra;
use fs::File;
use std::io::Read;
use std::env;
use zenith_utils::lsn::Lsn;
use crate::{repository::Repository, ZTimelineId};
use crate::{repository::Repository, ZTimelineId, PageServerConf};
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct BranchInfo {
pub name: String,
pub timeline_id: String,
pub timeline_id: ZTimelineId,
pub latest_valid_lsn: Option<Lsn>,
}
// impl BranchInfo {
// pub fn lsn_string(&self) -> String {
// let lsn_string_opt = self.latest_valid_lsn.map(|lsn| lsn.to_string());
// let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
// format!("{}@{}", self.name, lsn_str)
// }
// }
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timelineid: ZTimelineId,
pub lsn: Lsn,
}
pub fn init_repo(conf: &PageServerConf) -> Result<()> {
// top-level dir may exist if we are creating it through CLI
fs::create_dir_all(&conf.workdir)
.with_context(|| format!("could not create directory {}", &conf.workdir.display()))?;
env::set_current_dir(&conf.workdir)?;
fs::create_dir(std::path::Path::new("timelines"))?;
fs::create_dir(std::path::Path::new("refs"))?;
fs::create_dir(std::path::Path::new("refs").join("branches"))?;
fs::create_dir(std::path::Path::new("refs").join("tags"))?;
fs::create_dir(std::path::Path::new("wal-redo"))?;
println!("created directory structure in {}", &conf.workdir.display());
// Create initial timeline
let tli = create_timeline(conf, None)?;
let timelinedir = conf.timeline_path(tli);
println!("created initial timeline {}", timelinedir.display());
// Run initdb
//
// We create the cluster temporarily in a "tmp" directory inside the repository,
// and move it to the right location from there.
let tmppath = std::path::Path::new("tmp");
let initdb_path = conf.pg_bin_dir().join("initdb");
let initdb = Command::new(initdb_path)
.args(&["-D", tmppath.to_str().unwrap()])
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env(
"DYLD_LIBRARY_PATH",
conf.pg_lib_dir().to_str().unwrap(),
)
.stdout(Stdio::null())
.status()
.with_context(|| "failed to execute initdb")?;
if !initdb.success() {
anyhow::bail!("initdb failed");
}
println!("initdb succeeded");
// Read control file to extract the LSN and system id
let controlfile_path = tmppath.join("global").join("pg_control");
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
// let systemid = controlfile.system_identifier;
let lsn = controlfile.checkPoint;
let lsnstr = format!("{:016X}", lsn);
// Move the initial WAL file
fs::rename(
tmppath.join("pg_wal").join("000000010000000000000001"),
timelinedir
.join("wal")
.join("000000010000000000000001.partial"),
)?;
println!("moved initial WAL file");
// Remove pg_wal
fs::remove_dir_all(tmppath.join("pg_wal"))?;
force_crash_recovery(&tmppath)?;
println!("updated pg_control");
let target = timelinedir.join("snapshots").join(&lsnstr);
fs::rename(tmppath, &target)?;
println!("moved 'tmp' to {}", target.display());
// Create 'main' branch to refer to the initial timeline
let data = tli.to_string();
fs::write(conf.branch_path("main"), data)?;
println!("created main branch");
// XXX: do we need that now? -- yep, for test only
// // Also update the system id in the LocalEnv
// local_env.systemid = systemid;
// // write config
// let toml = toml::to_string(&local_env)?;
// fs::write(repopath.join("config"), toml)?;
println!(
"new zenith repository was created in {}",
conf.workdir.display()
);
Ok(())
}
pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>> {
// adapted from CLI code
let branches_dir = std::path::Path::new("refs").join("branches");
@@ -28,9 +146,263 @@ pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo
Ok(BranchInfo {
name,
timeline_id: timeline_id.to_string(),
timeline_id,
latest_valid_lsn,
})
})
.collect()
}
pub(crate) fn get_system_id(conf: &PageServerConf) -> Result<u64> {
// let branches = get_branches();
let branches_dir = std::path::Path::new("refs").join("branches");
let branches = std::fs::read_dir(&branches_dir)?
.map(|dir_entry_res| {
let dir_entry = dir_entry_res?;
let name = dir_entry.file_name().to_str().unwrap().to_string();
let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
Ok((name, timeline_id))
})
.collect::<Result<HashMap<String, ZTimelineId>>>()?;
let main_tli = branches
.get("main")
.ok_or_else(|| anyhow!("Branch main not found"))?;
let (_, main_snap_dir) = find_latest_snapshot(conf, *main_tli)?;
let controlfile_path = main_snap_dir.join("global").join("pg_control");
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
Ok(controlfile.system_identifier)
}
pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result<BranchInfo> {
if conf.branch_path(&branchname).exists() {
anyhow::bail!("branch {} already exists", branchname);
}
let mut startpoint = parse_point_in_time(conf, startpoint_str)?;
if startpoint.lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = find_end_of_wal(conf, startpoint.timelineid)?;
println!("branching at end of WAL: {}", end_of_wal);
startpoint.lsn = end_of_wal;
}
// create a new timeline for it
let newtli = create_timeline(conf, Some(startpoint))?;
let newtimelinedir = conf.timeline_path(newtli);
let data = newtli.to_string();
fs::write(conf.branch_path(&branchname), data)?;
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
// TODO: be smarter and avoid the copying...
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?;
let copy_opts = fs_extra::dir::CopyOptions::new();
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
let oldtimelinedir = conf.timeline_path(startpoint.timelineid);
copy_wal(
&oldtimelinedir.join("wal"),
&newtimelinedir.join("wal"),
startpoint.lsn,
16 * 1024 * 1024 // FIXME: assume default WAL segment size
)?;
Ok(BranchInfo {
name: branchname.to_string(),
timeline_id: newtli,
latest_valid_lsn: Some(startpoint.lsn),
})
}
//
// Parse user-given string that represents a point-in-time.
//
// We support multiple variants:
//
// Raw timeline id in hex, meaning the end of that timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d
//
// A specific LSN on a timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
//
// Same, with a human-friendly branch name:
// main
// main@2/15D3DD8
//
// Human-friendly tag name:
// mytag
//
//
fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
let mut strings = s.split('@');
let name = strings.next().unwrap();
let lsn: Option<Lsn>;
if let Some(lsnstr) = strings.next() {
lsn = Some(Lsn::from_str(lsnstr)
.with_context(|| "invalid LSN in point-in-time specification")?);
} else {
lsn = None
}
// Check if it's a tag
if lsn.is_none() {
let tagpath = conf.tag_path(name);
if tagpath.exists() {
let pointstr = fs::read_to_string(tagpath)?;
return parse_point_in_time(conf, &pointstr);
}
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath = conf.branch_path(name);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(conf, &pointstr)?;
result.lsn = lsn.unwrap_or(Lsn(0));
return Ok(result);
}
// Check if it's a timelineid
// Check if it's timelineid @ LSN
if let Ok(timelineid) = ZTimelineId::from_str(name) {
let tlipath = conf.timeline_path(timelineid);
if tlipath.exists() {
return Ok(PointInTime {
timelineid,
lsn: lsn.unwrap_or(Lsn(0)),
});
}
}
panic!("could not parse point-in-time {}", s);
}
// If control file says the cluster was shut down cleanly, modify it, to mark
// it as crashed. That forces crash recovery when you start the cluster.
//
// FIXME:
// We currently do this to the initial snapshot in "zenith init". It would
// be more natural to do this when the snapshot is restored instead, but we
// currently don't have any code to create new snapshots, so it doesn't matter
// Or better yet, use a less hacky way of putting the cluster into recovery.
// Perhaps create a backup label file in the data directory when it's restored.
fn force_crash_recovery(datadir: &Path) -> Result<()> {
// Read in the control file
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
let mut controlfile =
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
fs::write(
controlfilepath.as_path(),
postgres_ffi::encode_pg_control(controlfile),
)?;
Ok(())
}
fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
// Create initial timeline
let mut tli_buf = [0u8; 16];
rand::thread_rng().fill(&mut tli_buf);
let timelineid = ZTimelineId::from(tli_buf);
let timelinedir = conf.timeline_path(timelineid);
fs::create_dir(&timelinedir)?;
fs::create_dir(&timelinedir.join("snapshots"))?;
fs::create_dir(&timelinedir.join("wal"))?;
if let Some(ancestor) = ancestor {
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
fs::write(timelinedir.join("ancestor"), data)?;
}
Ok(timelineid)
}
///
/// Copy all WAL segments from one directory to another, up to given LSN.
///
/// If the given LSN is in the middle of a segment, the last segment containing it
/// is written out as .partial, and padded with zeros.
///
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
let last_segno = upto.segment_number(wal_seg_size);
let last_segoff = upto.segment_offset(wal_seg_size);
for entry in fs::read_dir(src_dir).unwrap() {
if let Ok(entry) = entry {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
// Check if the filename looks like an xlog file, or a .partial file.
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
continue
}
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
let copylen;
let mut dst_fname = PathBuf::from(fname);
if segno > last_segno {
// future segment, skip
continue;
} else if segno < last_segno {
copylen = wal_seg_size;
dst_fname.set_extension("");
} else {
copylen = last_segoff;
dst_fname.set_extension("partial");
}
let src_file = File::open(entry.path())?;
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
if copylen < wal_seg_size {
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
}
}
}
Ok(())
}
// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
let waldir = conf.timeline_path(timeline).join("wal");
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
Ok(Lsn(lsn))
}
// Find the latest snapshot for a timeline
fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
let snapshotsdir = conf.snapshots_path(timeline);
let paths = fs::read_dir(&snapshotsdir)?;
let mut maxsnapshot = Lsn(0);
let mut snapshotdir: Option<PathBuf> = None;
for path in paths {
let path = path?;
let filename = path.file_name().to_str().unwrap().to_owned();
if let Ok(lsn) = Lsn::from_hex(&filename) {
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
snapshotdir = Some(path.path());
}
}
if maxsnapshot == Lsn(0) {
// TODO: check ancestor timeline
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
}
Ok((maxsnapshot, snapshotdir.unwrap()))
}

View File

@@ -26,8 +26,47 @@ pub struct PageServerConf {
pub listen_addr: SocketAddr,
pub gc_horizon: u64,
pub gc_period: Duration,
pub workdir: PathBuf,
pub pg_distrib_dir: PathBuf,
}
impl PageServerConf {
//
// Repository paths, relative to workdir.
//
fn tag_path(&self, name: &str) -> PathBuf {
std::path::Path::new("refs").join("tags").join(name)
}
fn branch_path(&self, name: &str) -> PathBuf {
std::path::Path::new("refs").join("branches").join(name)
}
fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf {
std::path::Path::new("timelines").join(timelineid.to_string())
}
fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf {
std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots")
}
//
// Postgres distribution paths
//
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
}
}
/// Zenith Timeline ID is a 128-bit random ID.
///
/// Zenith timeline IDs are different from PostgreSQL timeline
@@ -89,10 +128,3 @@ impl fmt::Display for ZTimelineId {
}
}
pub fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}

View File

@@ -8,7 +8,6 @@ use crate::repository::Repository;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use lazy_static::lazy_static;
use std::path::Path;
use std::sync::{Arc, Mutex};
lazy_static! {
@@ -22,7 +21,7 @@ pub fn init(conf: &PageServerConf) {
let walredo_mgr = PostgresRedoManager::new(conf);
// we have already changed current dir to the repository.
let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
let repo = RocksRepository::new(conf, Arc::new(walredo_mgr));
*m = Some(Arc::new(repo));
}

View File

@@ -30,6 +30,7 @@ use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
use crate::ZTimelineId;
use crate::branches;
#[derive(Debug)]
enum FeMessage {
@@ -690,6 +691,28 @@ impl Connection {
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"branch_create ") {
let query_str = String::from_utf8(query_string.to_vec())?;
let err = || anyhow!("invalid branch_create: '{}'", query_str);
// branch_create <branchname> <startpoint>
// TODO lazy static
let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap();
let caps = re
.captures(&query_str)
.ok_or_else(err)?;
let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str());
let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str());
let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?;
let branch = serde_json::to_vec(&branch)?;
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.starts_with(b"pg_list") {
let branches = crate::branches::get_branches(&*page_cache::get_repository())?;
let branches_buf = serde_json::to_vec(&branches)?;
@@ -708,6 +731,15 @@ impl Connection {
// on connect
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else if query_string.to_ascii_lowercase().starts_with(b"identify_system") {
// TODO: match postgres response formarmat for 'identify_system'
let system_id = crate::branches::get_system_id(&self.conf)?
.to_string();
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?;
self.write_message_noflush(&BeMessage::CommandComplete)?;
self.write_message(&BeMessage::ReadyForQuery)?;
} else {
self.write_message_noflush(&BeMessage::RowDescription)?;
self.write_message_noflush(&HELLO_WORLD_ROW)?;

View File

@@ -291,6 +291,7 @@ mod tests {
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use std::env;
fn get_test_conf() -> PageServerConf {
PageServerConf {
@@ -299,6 +300,8 @@ mod tests {
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
listen_addr: "127.0.0.1:5430".parse().unwrap(),
workdir: "".into(),
pg_distrib_dir: "".into(),
}
}
@@ -345,7 +348,9 @@ mod tests {
let repo_dir = Path::new("../tmp_check/test_relsize_repo");
let _ = fs::remove_dir_all(repo_dir);
fs::create_dir_all(repo_dir)?;
let repo = rocksdb::RocksRepository::new(&get_test_conf(), repo_dir, Arc::new(walredo_mgr));
env::set_current_dir(repo_dir)?;
let repo = rocksdb::RocksRepository::new(&get_test_conf(), Arc::new(walredo_mgr));
// get_timeline() with non-existent timeline id should fail
//repo.get_timeline("11223344556677881122334455667788");

View File

@@ -11,6 +11,8 @@ use crate::waldecoder::{Oid, TransactionId};
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
// use crate::PageServerConf;
// use crate::branches;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
@@ -18,7 +20,6 @@ use postgres_ffi::pg_constants;
use std::cmp::min;
use std::collections::HashMap;
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
@@ -31,7 +32,6 @@ use zenith_utils::seqwait::SeqWait;
static TIMEOUT: Duration = Duration::from_secs(60);
pub struct RocksRepository {
repo_dir: PathBuf,
conf: PageServerConf,
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
@@ -158,11 +158,9 @@ impl CacheEntryContent {
impl RocksRepository {
pub fn new(
conf: &PageServerConf,
repo_dir: &Path,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> RocksRepository {
RocksRepository {
repo_dir: PathBuf::from(repo_dir),
conf: conf.clone(),
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
@@ -188,7 +186,7 @@ impl Repository for RocksRepository {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline =
RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
restore_timeline(&self.conf, &timeline, timelineid)?;
@@ -216,7 +214,7 @@ impl Repository for RocksRepository {
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
let timeline = RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
let timeline_rc = Arc::new(timeline);
let r = timelines.insert(timelineid, timeline_rc.clone());
@@ -229,8 +227,8 @@ impl Repository for RocksRepository {
}
impl RocksTimeline {
fn open_rocksdb(repo_dir: &Path, timelineid: ZTimelineId) -> rocksdb::DB {
let path = repo_dir.join("timelines").join(timelineid.to_string());
fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
let path = conf.timeline_path(timelineid);
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_use_fsync(true);
@@ -246,12 +244,12 @@ impl RocksTimeline {
}
fn new(
repo_dir: &Path,
conf: &PageServerConf,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> RocksTimeline {
RocksTimeline {
db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
db: RocksTimeline::open_rocksdb(conf, timelineid),
walredo_mgr,

View File

@@ -457,7 +457,9 @@ impl PostgresRedoProcess {
// Start postgres binary in special WAL redo mode.
//
// Tests who run pageserver binary are setting proper PG_BIN_DIR
// and PG_LIB_DIR so that WalRedo would start right postgres. We may later
// and PG_LIB_DIR so that WalRedo would start right postgres.
// do that: We may later
// switch to setting same things in pageserver config file.
async fn launch(datadir: &str) -> Result<PostgresRedoProcess, Error> {
// Create empty data directory for wal-redo postgres deleting old one.

View File

@@ -41,7 +41,7 @@ If you want to run all tests that have the string "bench" in their names:
Useful environment variables:
`ZENITH_BIN`: The directory where zenith binaries can be found.
`POSTGRES_BIN`: The directory where postgres binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single postgres and pageserver

View File

@@ -16,7 +16,7 @@ A fixture is created with the decorator @zenfixture, which is a wrapper around
the standard pytest.fixture with some extra behavior.
There are several environment variables that can control the running of tests:
ZENITH_BIN, POSTGRES_BIN, etc. See README.md for more information.
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
To use fixtures in a test file, add this line of code:
@@ -78,7 +78,7 @@ class ZenithCli:
self.bin_zenith = os.path.join(binpath, 'zenith')
self.env = os.environ.copy()
self.env['ZENITH_REPO_DIR'] = repo_dir
self.env['POSTGRES_BIN'] = pg_distrib_dir
self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir
def run(self, arguments):
""" Run "zenith" with the specified arguments.
@@ -108,11 +108,11 @@ class ZenithPageserver:
self.running = False
def start(self):
self.zenith_cli.run(['pageserver', 'start'])
self.zenith_cli.run(['start'])
self.running = True
def stop(self):
self.zenith_cli.run(['pageserver', 'stop'])
self.zenith_cli.run(['stop'])
self.running = True
@@ -316,7 +316,7 @@ def zenith_binpath(base_dir):
@zenfixture
def pg_distrib_dir(base_dir):
""" find the postgress install """
env_postgres_bin = os.environ.get('POSTGRES_BIN')
env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
if env_postgres_bin:
pg_dir = env_postgres_bin
else:

View File

@@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;
mod pq_protocol;
pub mod pq_protocol;
pub mod s3_offload;
pub mod wal_service;

View File

@@ -10,6 +10,18 @@ edition = "2018"
clap = "2.33.0"
anyhow = "1.0"
serde_json = "1"
# rand = "0.8.3"
# tar = "0.4.33"
# serde = { version = "1.0", features = ["derive"] }
# toml = "0.5"
# lazy_static = "1.4"
# regex = "1"
# # hex = "0.4.3"
# bytes = "1.0.1"
# # fs_extra = "1.2.0"
# nix = "0.20"
# # thiserror = "1"
# url = "2.2.2"
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }

View File

@@ -1,55 +1,51 @@
use std::path::{Path, PathBuf};
use std::collections::HashMap;
use std::process::exit;
use std::str::FromStr;
use std::{collections::HashMap, fs};
use anyhow::{Result, Context};
use anyhow::{anyhow, bail};
use anyhow::{Context, anyhow};
use anyhow::Result;
use clap::{App, Arg, ArgMatches, SubCommand};
use control_plane::local_env::LocalEnv;
use control_plane::local_env;
use control_plane::compute::ComputeControlPlane;
use control_plane::storage::PageServerNode;
use control_plane::{compute::ComputeControlPlane, local_env, storage};
use pageserver::{branches::BranchInfo, ZTimelineId};
use pageserver::{ZTimelineId, branches::BranchInfo};
use zenith_utils::lsn::Lsn;
fn zenith_repo_dir() -> PathBuf {
// Find repository path
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
}
}
// Main entry point for the 'zenith' CLI utility
//
// This utility can used to work with a local zenith repository.
// In order to run queries in it, you need to launch the page server,
// and a compute node against the page server
// This utility helps to manage zenith installation. That includes following:
// * Management of local postgres installations running on top of the
// pageserver.
// * Providing CLI api to the pageserver (local or remote)
// * TODO: export/import to/from usual postgres
fn main() -> Result<()> {
let name_arg = Arg::with_name("NAME")
.short("n")
.index(1)
.help("name of this postgres instance")
.required(true);
let matches = App::new("zenith")
.about("Zenith CLI")
.subcommand(SubCommand::with_name("init").about("Initialize a new Zenith repository"))
.subcommand(
SubCommand::with_name("init")
.about("Initialize a new Zenith repository")
.arg(
Arg::with_name("remote-pageserver")
.long("remote-pageserver")
.required(false)
.value_name("pageserver-url")
),
)
.subcommand(
SubCommand::with_name("branch")
.about("Create a new branch")
.arg(Arg::with_name("branchname").required(false).index(1))
.arg(Arg::with_name("start-point").required(false).index(2)),
)
.subcommand(
SubCommand::with_name("pageserver")
.about("Manage pageserver instance")
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start"))
.subcommand(SubCommand::with_name("stop")),
)
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start"))
.subcommand(SubCommand::with_name("stop"))
.subcommand(SubCommand::with_name("restart"))
.subcommand(
SubCommand::with_name("pg")
.about("Manage postgres instances")
@@ -67,52 +63,74 @@ fn main() -> Result<()> {
)
.get_matches();
// handle init separately and exit
// Create config file
if let ("init", Some(sub_args)) = matches.subcommand() {
run_init_cmd(sub_args.clone())?;
exit(0);
let pageserver_uri = sub_args.value_of("pageserver-url");
local_env::init(pageserver_uri)
.with_context(|| "Failed to create cofig file")?;
}
// all other commands would need config
let repopath = zenith_repo_dir();
if !repopath.exists() {
bail!(
"Zenith repository does not exist in {}.\n\
Set ZENITH_REPO_DIR or initialize a new repository with 'zenith init'",
repopath.display()
);
}
// TODO: check that it looks like a zenith repository
let env = match local_env::load_config(&repopath) {
let env = match local_env::load_config() {
Ok(conf) => conf,
Err(e) => {
eprintln!("Error loading config from {}: {}", repopath.display(), e);
eprintln!("Error loading config: {}", e);
exit(1);
}
};
match matches.subcommand() {
("init", Some(_)) => {
panic!() /* Should not happen. Init was handled before */
let pageserver = PageServerNode::from_env(&env);
pageserver.init()?;
}
("branch", Some(sub_args)) => run_branch_cmd(&env, sub_args.clone())?,
("pageserver", Some(sub_args)) => run_pageserver_cmd(&env, sub_args.clone())?,
("branch", Some(sub_args)) => {
let pageserver = PageServerNode::from_env(&env);
if let Some(branchname) = sub_args.value_of("branchname") {
if let Some(startpoint_str) = sub_args.value_of("start-point") {
let branch = pageserver.branch_create(branchname, startpoint_str)?;
println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0)));
} else {
panic!("Missing start-point");
}
} else {
// No arguments, list branches
for branch in pageserver.branches_list()? {
println!(" {}", branch.name);
}
}
}
("start", Some(_sub_m)) => {
let pageserver = storage::PageServerNode::from_env(&env);
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.start() {
eprintln!("pageserver start: {}", e);
eprintln!("pageserver start failed: {}", e);
exit(1);
}
}
("stop", Some(_sub_m)) => {
let pageserver = storage::PageServerNode::from_env(&env);
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.stop() {
eprintln!("pageserver stop: {}", e);
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
}
("restart", Some(_sub_m)) => {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.stop() {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start() {
eprintln!("pageserver start failed: {}", e);
exit(1);
}
}
@@ -131,38 +149,10 @@ fn main() -> Result<()> {
Ok(())
}
fn run_pageserver_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
match args.subcommand() {
("status", Some(_sub_m)) => {
todo!();
}
("start", Some(_sub_m)) => {
let psnode = PageServerNode::from_env(local_env);
psnode.start()?;
println!("Page server started");
}
("stop", Some(_sub_m)) => {
let psnode = PageServerNode::from_env(local_env);
psnode.stop()?;
println!("Page server stopped");
}
_ => unreachable!(),
};
Ok(())
}
// Peek into the repository, to grab the timeline ID of given branch
pub fn get_branch_timeline(repopath: &Path, branchname: &str) -> ZTimelineId {
let branchpath = repopath.join("refs/branches/".to_owned() + branchname);
ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
}
/// Returns a map of timeline IDs to branch_name@lsn strings.
/// Connects to the pageserver to query this information.
fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
let page_server = storage::PageServerNode::from_env(env);
fn get_branch_infos(env: &local_env::LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
let page_server = PageServerNode::from_env(env);
let mut client = page_server.page_server_psql_client()?;
let branches_msgs = client.simple_query("pg_list")?;
@@ -179,11 +169,10 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
let branch_infos: Result<HashMap<ZTimelineId, String>> = branch_infos
.into_iter()
.map(|branch_info| {
let timeline_id = ZTimelineId::from_str(&branch_info.timeline_id)?;
let lsn_string_opt = branch_info.latest_valid_lsn.map(|lsn| lsn.to_string());
let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
let branch_lsn_string = format!("{}@{}", branch_info.name, lsn_str);
Ok((timeline_id, branch_lsn_string))
Ok((branch_info.timeline_id, branch_lsn_string))
})
.collect();
@@ -193,19 +182,11 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let mut cplane = ComputeControlPlane::load(env.clone())?;
// FIXME: cheat and resolve the timeline by peeking into the
// repository. In reality, when you're launching a compute node
// against a possibly-remote page server, we wouldn't know what
// branches exist in the remote repository. Or would we require
// that you "zenith fetch" them into a local repoitory first?
match pg_match.subcommand() {
("create", Some(sub_m)) => {
let timeline_arg = sub_m.value_of("timeline").unwrap_or("main");
let timeline = get_branch_timeline(&env.repo_path, timeline_arg);
println!("Initializing Postgres on timeline {}...", timeline);
cplane.new_node(timeline)?;
println!("Initializing Postgres on timeline {}...", timeline_arg);
cplane.new_node(timeline_arg)?;
}
("list", Some(_sub_m)) => {
let branch_infos = get_branch_infos(env).unwrap_or_else(|e| {
@@ -249,121 +230,3 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
Ok(())
}
// "zenith init" - Initialize a new Zenith repository in current dir
fn run_init_cmd(_args: ArgMatches) -> Result<()> {
local_env::init()?;
Ok(())
}
// handle "zenith branch" subcommand
fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
let repopath = local_env.repo_path.to_str().unwrap();
if let Some(branchname) = args.value_of("branchname") {
if PathBuf::from(format!("{}/refs/branches/{}", repopath, branchname)).exists() {
anyhow::bail!("branch {} already exists", branchname);
}
if let Some(startpoint_str) = args.value_of("start-point") {
let mut startpoint = parse_point_in_time(startpoint_str)?;
if startpoint.lsn == Lsn(0) {
// Find end of WAL on the old timeline
let end_of_wal = local_env::find_end_of_wal(local_env, startpoint.timelineid)?;
println!("branching at end of WAL: {}", end_of_wal);
startpoint.lsn = end_of_wal;
}
return local_env::create_branch(local_env, branchname, startpoint);
} else {
panic!("Missing start-point");
}
} else {
// No arguments, list branches
list_branches()?;
}
Ok(())
}
fn list_branches() -> Result<()> {
// list branches
let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?;
for path in paths {
println!(" {}", path?.file_name().to_str().unwrap());
}
Ok(())
}
//
// Parse user-given string that represents a point-in-time.
//
// We support multiple variants:
//
// Raw timeline id in hex, meaning the end of that timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d
//
// A specific LSN on a timeline:
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
//
// Same, with a human-friendly branch name:
// main
// main@2/15D3DD8
//
// Human-friendly tag name:
// mytag
//
//
fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
let mut strings = s.split('@');
let name = strings.next().unwrap();
let lsn: Option<Lsn>;
if let Some(lsnstr) = strings.next() {
lsn = Some(
Lsn::from_str(lsnstr)
.with_context(|| "invalid LSN in point-in-time specification")?
);
} else {
lsn = None
}
// Check if it's a tag
if lsn.is_none() {
let tagpath = zenith_repo_dir().join("refs").join("tags").join(name);
if tagpath.exists() {
let pointstr = fs::read_to_string(tagpath)?;
return parse_point_in_time(&pointstr);
}
}
// Check if it's a branch
// Check if it's branch @ LSN
let branchpath = zenith_repo_dir().join("refs").join("branches").join(name);
if branchpath.exists() {
let pointstr = fs::read_to_string(branchpath)?;
let mut result = parse_point_in_time(&pointstr)?;
result.lsn = lsn.unwrap_or(Lsn(0));
return Ok(result);
}
// Check if it's a timelineid
// Check if it's timelineid @ LSN
let tlipath = zenith_repo_dir().join("timelines").join(name);
if tlipath.exists() {
let result = local_env::PointInTime {
timelineid: ZTimelineId::from_str(name)?,
lsn: lsn.unwrap_or(Lsn(0)),
};
return Ok(result);
}
panic!("could not parse point-in-time {}", s);
}