Rework control_plane code to reuse it in CLI.

Move all paths from control_plane to local_env which can set them
for testing environment or for local installation.
This commit is contained in:
Stas Kelvich
2021-04-10 11:55:50 +03:00
parent ba4f8e94aa
commit 59163cf3b3
13 changed files with 648 additions and 223 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,3 @@
/target
/tmp_check
/tmp_check/
/tmp_install

42
Cargo.lock generated
View File

@@ -378,6 +378,21 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "control_plane"
version = "0.1.0"
dependencies = [
"home",
"pageserver",
"postgres",
"rand 0.8.3",
"serde",
"serde_derive",
"tokio-postgres",
"toml",
"walkeeper",
]
[[package]]
name = "core-foundation"
version = "0.9.1"
@@ -795,6 +810,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654"
dependencies = [
"winapi",
]
[[package]]
name = "http"
version = "0.2.3"
@@ -900,6 +924,7 @@ dependencies = [
name = "integration_tests"
version = "0.1.0"
dependencies = [
"control_plane",
"lazy_static",
"pageserver",
"postgres",
@@ -2094,6 +2119,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]]
name = "tower-service"
version = "0.3.1"
@@ -2417,3 +2451,11 @@ name = "xml-rs"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "zenith"
version = "0.1.0"
dependencies = [
"clap",
"control_plane",
]

View File

@@ -3,4 +3,6 @@ members = [
"integration_tests",
"pageserver",
"walkeeper",
"zenith",
"control_plane",
]

1
control_plane/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
tmp_check/

20
control_plane/Cargo.toml Normal file
View File

@@ -0,0 +1,20 @@
[package]
name = "control_plane"
version = "0.1.0"
authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8.3"
postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
serde = ""
serde_derive = ""
toml = ""
home = "0.5.3"
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }

View File

@@ -9,76 +9,68 @@
use std::fs::File;
use std::fs::{self, OpenOptions};
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::str;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::{
io::Write,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use lazy_static::lazy_static;
pub mod local_env;
use local_env::LocalEnv;
use postgres::{Client, NoTls};
lazy_static! {
// postgres would be there if it was build by 'make postgres' here in the repo
pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_install/bin");
pub static ref PG_LIB_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../tmp_install/lib");
pub static ref BIN_DIR : PathBuf = cargo_bin_dir();
pub static ref TEST_WORKDIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tmp_check");
}
// Find the directory where the binaries were put (i.e. target/debug/)
pub fn cargo_bin_dir() -> PathBuf {
let mut pathbuf = std::env::current_exe().ok().unwrap();
pathbuf.pop();
if pathbuf.ends_with("deps") {
pathbuf.pop();
}
return pathbuf;
}
//
// Collection of several example deployments useful for tests.
//
// I'm intendedly modelling storage and compute control planes as a separate entities
// as it is closer to the actual setup.
//
pub struct StorageControlPlane {
pub struct TestStorageControlPlane {
pub wal_acceptors: Vec<WalAcceptorNode>,
pub page_servers: Vec<PageServerNode>,
pub pageserver: Arc<PageServerNode>,
pub test_done: AtomicBool,
}
impl StorageControlPlane {
impl TestStorageControlPlane {
// postgres <-> page_server
pub fn one_page_server() -> StorageControlPlane {
let mut cplane = StorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
};
pub fn one_page_server() -> TestStorageControlPlane {
let env = local_env::test_env();
let pserver = PageServerNode {
page_service_addr: "127.0.0.1:65200".parse().unwrap(),
data_dir: TEST_WORKDIR.join("pageserver"),
};
let pserver = Arc::new(PageServerNode {
env: env.clone(),
kill_on_exit: true,
});
pserver.init();
pserver.start();
cplane.page_servers.push(pserver);
cplane
TestStorageControlPlane {
wal_acceptors: Vec::new(),
pageserver: pserver,
test_done: AtomicBool::new(false),
}
}
pub fn fault_tolerant(redundancy: usize) -> StorageControlPlane {
let mut cplane = StorageControlPlane {
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
pub fn fault_tolerant(redundancy: usize) -> TestStorageControlPlane {
let env = local_env::test_env();
let mut cplane = TestStorageControlPlane {
wal_acceptors: Vec::new(),
page_servers: Vec::new(),
pageserver: Arc::new(PageServerNode {
env: env.clone(),
kill_on_exit: true,
}),
test_done: AtomicBool::new(false),
};
cplane.pageserver.init();
cplane.pageserver.start();
const WAL_ACCEPTOR_PORT: usize = 54321;
for i in 0..redundancy {
@@ -86,7 +78,8 @@ impl StorageControlPlane {
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
.parse()
.unwrap(),
data_dir: TEST_WORKDIR.join(format!("wal_acceptor_{}", i)),
data_dir: env.data_dir.join(format!("wal_acceptor_{}", i)),
env: env.clone(),
};
wal_acceptor.init();
wal_acceptor.start();
@@ -96,17 +89,7 @@ impl StorageControlPlane {
}
pub fn stop(&self) {
for wa in self.wal_acceptors.iter() {
wa.stop();
}
}
// // postgres <-> wal_acceptor x3 <-> page_server
// fn local(&mut self) -> StorageControlPlane {
// }
pub fn page_server_addr(&self) -> &SocketAddr {
&self.page_servers[0].page_service_addr
self.test_done.store(true, Ordering::Relaxed);
}
pub fn get_wal_acceptor_conn_info(&self) -> String {
@@ -117,13 +100,99 @@ impl StorageControlPlane {
.join(",")
}
pub fn is_running(&self) -> bool {
self.test_done.load(Ordering::Relaxed)
}
}
impl Drop for TestStorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
//
// Control routines for pageserver.
//
// Used in CLI and tests.
//
pub struct PageServerNode {
kill_on_exit: bool,
env: LocalEnv,
}
impl PageServerNode {
pub fn init(&self) {
fs::create_dir_all(self.env.pageserver_data_dir()).unwrap();
}
pub fn start(&self) {
println!(
"Starting pageserver at '{}'",
self.env.pageserver.listen_address
);
let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
.args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
.args(&[
"-l",
self.env.pageserver.listen_address.to_string().as_str(),
])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("failed to start pageserver");
if !status.success() {
panic!("pageserver start failed");
}
}
pub fn stop(&self) {
let pidfile = self.env.pageserver_pidfile();
let pid = fs::read_to_string(pidfile).unwrap();
let status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
// await for pageserver stop
for _ in 0..5 {
let stream = TcpStream::connect(self.env.pageserver.listen_address);
if let Err(_e) = stream {
return;
}
println!(
"Stopping pageserver on {}",
self.env.pageserver.listen_address
);
thread::sleep(Duration::from_secs(1));
}
// ok, we failed to stop pageserver, let's panic
panic!("Failed to stop pageserver");
}
pub fn address(&self) -> &std::net::SocketAddr {
&self.env.pageserver.listen_address
}
pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
let addr = &self.page_servers[0].page_service_addr;
// let addr = &self.page_servers[0].env.pageserver.listen_address;
let connstring = format!(
"host={} port={} dbname={} user={}",
addr.ip(),
addr.port(),
self.address().ip(),
self.address().port(),
"no_db",
"no_user",
);
@@ -134,83 +203,23 @@ impl StorageControlPlane {
}
}
impl Drop for StorageControlPlane {
fn drop(&mut self) {
self.stop();
}
}
pub struct PageServerNode {
page_service_addr: SocketAddr,
data_dir: PathBuf,
}
impl PageServerNode {
// TODO: method to force redo on a specific relation
// TODO: make wal-redo-postgres workable without data directory?
pub fn init(&self) {
fs::create_dir_all(self.data_dir.clone()).unwrap();
let datadir_path = self.data_dir.join("wal_redo_pgdata");
fs::remove_dir_all(datadir_path.to_str().unwrap()).ok();
let initdb = Command::new(PG_BIN_DIR.join("initdb"))
.args(&["-D", datadir_path.to_str().unwrap()])
.arg("-N")
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.status()
.expect("failed to execute initdb");
if !initdb.success() {
panic!("initdb failed");
}
}
pub fn start(&self) {
println!("Starting pageserver at '{}'", self.page_service_addr);
let status = Command::new(BIN_DIR.join("pageserver"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.page_service_addr.to_string().as_str()])
.arg("-d")
.arg("--skip-recovery")
.env_clear()
.env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
.status()
.expect("failed to start pageserver");
if !status.success() {
panic!("pageserver start failed");
}
}
pub fn stop(&self) {
let pidfile = self.data_dir.join("pageserver.pid");
let pid = fs::read_to_string(pidfile).unwrap();
let status = Command::new("kill")
.arg(pid)
.env_clear()
.status()
.expect("failed to execute kill");
if !status.success() {
panic!("kill start failed");
}
}
}
impl Drop for PageServerNode {
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.data_dir.clone()).unwrap();
if self.kill_on_exit {
self.stop();
}
}
}
//
// Control routines for WalAcceptor.
//
// Now used only in test setups.
//
pub struct WalAcceptorNode {
listen: SocketAddr,
data_dir: PathBuf,
env: LocalEnv,
}
impl WalAcceptorNode {
@@ -228,7 +237,7 @@ impl WalAcceptorNode {
self.listen
);
let status = Command::new(BIN_DIR.join("wal_acceptor"))
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
.args(&["-D", self.data_dir.to_str().unwrap()])
.args(&["-l", self.listen.to_string().as_str()])
.arg("-d")
@@ -242,6 +251,7 @@ impl WalAcceptorNode {
}
pub fn stop(&self) {
println!("Stopping wal acceptor on {}", self.listen);
let pidfile = self.data_dir.join("wal_acceptor.pid");
if let Ok(pid) = fs::read_to_string(pidfile) {
let _status = Command::new("kill")
@@ -256,7 +266,6 @@ impl WalAcceptorNode {
impl Drop for WalAcceptorNode {
fn drop(&mut self) {
self.stop();
// fs::remove_dir_all(self.data_dir.clone()).unwrap();
}
}
@@ -265,22 +274,25 @@ impl Drop for WalAcceptorNode {
//
// ComputeControlPlane
//
pub struct ComputeControlPlane<'a> {
pub struct ComputeControlPlane {
pg_bin_dir: PathBuf,
work_dir: PathBuf,
last_assigned_port: u16,
storage_cplane: &'a StorageControlPlane,
pageserver: Arc<PageServerNode>,
nodes: Vec<Arc<PostgresNode>>,
env: LocalEnv,
}
impl ComputeControlPlane<'_> {
pub fn local(storage_cplane: &StorageControlPlane) -> ComputeControlPlane {
impl ComputeControlPlane {
pub fn local(pageserver: &Arc<PageServerNode>) -> ComputeControlPlane {
let env = local_env::test_env();
ComputeControlPlane {
pg_bin_dir: PG_BIN_DIR.to_path_buf(),
work_dir: TEST_WORKDIR.to_path_buf(),
pg_bin_dir: env.pg_bin_dir(),
work_dir: env.data_dir.clone(),
last_assigned_port: 65431,
storage_cplane: storage_cplane,
pageserver: Arc::clone(pageserver),
nodes: Vec::new(),
env: env.clone(),
}
}
@@ -296,24 +308,29 @@ impl ComputeControlPlane<'_> {
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
_node_id: node_id,
port: self.get_port(),
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()),
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
};
self.nodes.push(Arc::new(node));
let node = self.nodes.last().unwrap();
println!(
"Creating new postgres: path={} port={}",
node.pgdata.to_str().unwrap(),
node.address.port()
);
// initialize data directory
fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
let initdb_path = self.pg_bin_dir.join("initdb");
println!("initdb_path: {}", initdb_path.to_str().unwrap());
let initdb = Command::new(initdb_path)
.args(&["-D", node.pgdata.to_str().unwrap()])
.arg("-N")
.arg("--no-instructions")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("failed to execute initdb");
@@ -340,8 +357,8 @@ impl ComputeControlPlane<'_> {
listen_addresses = '{address}'\n\
port = {port}\n\
",
address = node.ip,
port = node.port
address = node.address.ip(),
port = node.address.port()
)
.as_str(),
);
@@ -357,10 +374,10 @@ impl ComputeControlPlane<'_> {
let node_id = self.nodes.len() + 1;
let node = PostgresNode {
_node_id: node_id,
port: self.get_port(),
ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()),
pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
pg_bin_dir: self.pg_bin_dir.clone(),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
};
self.nodes.push(Arc::new(node));
let node = self.nodes.last().unwrap();
@@ -375,7 +392,7 @@ impl ComputeControlPlane<'_> {
.arg("--no-instructions")
.arg("--compute-node")
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("failed to execute initdb");
@@ -398,8 +415,8 @@ impl ComputeControlPlane<'_> {
port = {port}\n\
computenode_mode = true\n\
",
address = node.ip,
port = node.port
address = node.address.ip(),
port = node.address.port()
)
.as_str(),
);
@@ -408,20 +425,18 @@ impl ComputeControlPlane<'_> {
}
pub fn new_node(&mut self) -> Arc<PostgresNode> {
let storage_cplane = self.storage_cplane;
let addr = self.pageserver.address().clone();
let node = self.new_vanilla_node();
let pserver = storage_cplane.page_server_addr();
// Configure that node to take pages from pageserver
node.append_conf(
"postgresql.conf",
format!(
"\
page_server_connstring = 'host={} port={}'\n\
",
pserver.ip(),
pserver.port()
page_server_connstring = 'host={} port={}'\n\
",
addr.ip(),
addr.port()
)
.as_str(),
);
@@ -434,8 +449,7 @@ impl ComputeControlPlane<'_> {
node.append_conf(
"postgresql.conf",
"synchronous_standby_names = 'safekeeper_proxy'\n\
",
"synchronous_standby_names = 'safekeeper_proxy'\n",
);
node.clone()
}
@@ -470,11 +484,11 @@ impl Drop for WalProposerNode {
///////////////////////////////////////////////////////////////////////////////
pub struct PostgresNode {
pub address: SocketAddr,
_node_id: usize,
pub port: u16,
pub ip: IpAddr,
pgdata: PathBuf,
pg_bin_dir: PathBuf,
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl PostgresNode {
@@ -488,7 +502,7 @@ impl PostgresNode {
}
fn pg_ctl(&self, args: &[&str], check_ok: bool) {
let pg_ctl_path = self.pg_bin_dir.join("pg_ctl");
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let pg_ctl = Command::new(pg_ctl_path)
.args(
[
@@ -503,7 +517,7 @@ impl PostgresNode {
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.status()
.expect("failed to execute pg_ctl");
@@ -512,11 +526,10 @@ impl PostgresNode {
}
}
pub fn start(&self, storage_cplane: &StorageControlPlane) {
if storage_cplane.page_servers.len() != 0 {
let _res =
storage_cplane.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
}
pub fn start(&self) {
let _res = self
.pageserver
.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
println!("Starting postgres node at '{}'", self.connstr());
self.pg_ctl(&["start"], true);
}
@@ -530,7 +543,12 @@ impl PostgresNode {
}
pub fn connstr(&self) -> String {
format!("host={} port={} user={}", self.ip, self.port, self.whoami())
format!(
"host={} port={} user={}",
self.address.ip(),
self.address.port(),
self.whoami()
)
}
// XXX: cache that in control plane
@@ -549,8 +567,8 @@ impl PostgresNode {
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<tokio_postgres::Row> {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,
self.port,
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
@@ -563,8 +581,8 @@ impl PostgresNode {
pub fn open_psql(&self, db: &str) -> Client {
let connstring = format!(
"host={} port={} dbname={} user={}",
self.ip,
self.port,
self.address.ip(),
self.address.port(),
db,
self.whoami()
);
@@ -583,7 +601,7 @@ impl PostgresNode {
File::create(filepath).unwrap();
}
let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
let pg_resetwal = Command::new(pg_resetwal_path)
.args(&["-D", self.pgdata.to_str().unwrap()])
@@ -599,13 +617,13 @@ impl PostgresNode {
}
pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
let proxy_path = PG_BIN_DIR.join("safekeeper_proxy");
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
match Command::new(proxy_path.as_path())
.args(&["-s", &wal_acceptors])
.args(&["-h", &self.ip.to_string()])
.args(&["-p", &self.port.to_string()])
.args(&["-h", &self.address.ip().to_string()])
.args(&["-p", &self.address.port().to_string()])
.arg("-v")
.stderr(File::create(TEST_WORKDIR.join("safepkeeper_proxy.log")).unwrap())
.stderr(File::create(self.env.data_dir.join("safepkeeper_proxy.log")).unwrap())
.spawn()
{
Ok(child) => WalProposerNode { pid: child.id() },
@@ -644,7 +662,7 @@ pub fn regress_check(pg: &PostgresNode) {
.args(&[
"--bindir=''",
"--use-existing",
format!("--bindir={}", PG_BIN_DIR.to_str().unwrap()).as_str(),
format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
format!(
"--schedule={}",
@@ -654,10 +672,10 @@ pub fn regress_check(pg: &PostgresNode) {
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
.env("PGPORT", pg.port.to_string())
.env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
.env("PGHOST", pg.address.ip().to_string())
.env("PGPORT", pg.address.port().to_string())
.env("PGUSER", pg.whoami())
.env("PGHOST", pg.ip.to_string())
.status()
.expect("pg_regress failed");
}

View File

@@ -0,0 +1,210 @@
//
// This module is responsible for locating and loading paths in a local setup.
//
// Now it also provides init method which acts like a stub for proper installation
// script which will use local paths.
//
use std::env;
use std::error;
use std::fs;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use home;
use serde_derive::{Deserialize, Serialize};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
//
// This data structure represents the deserialized zenith config, which should be
// located in ~/.zenith
//
// TODO: should we also support a ZENITH_CONF env var?
//
#[derive(Serialize, Deserialize, Clone)]
pub struct LocalEnv {
    /// Here the page server and compute nodes will create and store their data.
    pub data_dir: PathBuf,

    /// Path to the postgres distribution. It is expected that "bin", "include",
    /// "lib", "share" from a postgres distribution will be there. If at some point
    /// in time we are able to run against vanilla postgres, we may split that
    /// into four separate paths and match the OS-specific installation layout.
    pub pg_distrib_dir: PathBuf,

    /// Path to the directory containing the pageserver binary.
    pub zenith_distrib_dir: PathBuf,

    /// PageServer-specific options.
    pub pageserver: PageServerConf,
}
impl LocalEnv {
    /// Directory with postgres executables: `<pg_distrib_dir>/bin`.
    pub fn pg_bin_dir(&self) -> PathBuf {
        self.pg_distrib_dir.join("bin")
    }

    /// Directory with postgres shared libraries: `<pg_distrib_dir>/lib`.
    pub fn pg_lib_dir(&self) -> PathBuf {
        self.pg_distrib_dir.join("lib")
    }

    /// Data directory used by the page server: `<data_dir>/pageserver`.
    pub fn pageserver_data_dir(&self) -> PathBuf {
        self.data_dir.join("pageserver")
    }

    /// Page server log file, inside its data directory.
    pub fn pageserver_log(&self) -> PathBuf {
        self.pageserver_file("pageserver.log")
    }

    /// Page server pid file, inside its data directory.
    pub fn pageserver_pidfile(&self) -> PathBuf {
        self.pageserver_file("pageserver.pid")
    }

    // Helper: a file located directly in the page server data directory.
    fn pageserver_file(&self, name: &str) -> PathBuf {
        self.pageserver_data_dir().join(name)
    }
}
// PageServer-specific part of the config.
#[derive(Serialize, Deserialize, Clone)]
pub struct PageServerConf {
    /// Socket address the page server listens on for client connections.
    pub listen_address: SocketAddr,
}
//
// Issues in the rust-lang repo have several discussions about the proper library
// to determine the home directory in a cross-platform way. The current consensus
// seems to be around the `home` crate, and cargo uses it too.
//
/// Returns the current user's home directory, or an error if it cannot be
/// determined on this platform.
fn get_home() -> Result<PathBuf> {
    // `home::home_dir` returns None when the platform gives us no usable path.
    home::home_dir()
        .ok_or_else(|| Box::<dyn error::Error>::from("can not determine home directory path"))
}
/// Initialize the local zenith installation: sanity-check that the postgres and
/// pageserver binaries are present, create the data directory, and write the
/// config file to `~/.zenith`.
///
/// Acts like a stub for a proper installation script; it can only be run from
/// the zenith repo root.
///
/// Returns an error if a config already exists, the current directory is not
/// the repo root, required binaries are missing, or any filesystem/serialization
/// step fails.
pub fn init() -> Result<()> {
    let home_dir = get_home()?;

    // Refuse to clobber an existing config.
    let cfg_path = home_dir.join(".zenith");
    if cfg_path.exists() {
        let err_msg = format!(
            "{} already exists. Perhaps already initialized?",
            cfg_path.to_str().unwrap()
        );
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // Now we can run init only from the crate directory, so check that the current
    // dir is our crate. Use 'pageserver/Cargo.toml' existence as evidence.
    let cargo_path = env::current_dir()?;
    if !cargo_path.join("pageserver/Cargo.toml").exists() {
        let err_msg = "Current directory does not look like a zenith repo. \
            Please, run 'init' from zenith repo root.";
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // ok, now check that the expected binaries are present

    // check postgres
    let pg_distrib_dir = cargo_path.join("tmp_install");
    let pg_path = pg_distrib_dir.join("bin/postgres");
    if !pg_path.exists() {
        let err_msg = format!(
            "Can't find postgres binary at {}. \
            Perhaps './pgbuild.sh' is needed to build it first.",
            pg_path.to_str().unwrap()
        );
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // check pageserver
    let zenith_distrib_dir = cargo_path.join("target/debug/");
    let pageserver_path = zenith_distrib_dir.join("pageserver");
    if !pageserver_path.exists() {
        let err_msg = format!(
            "Can't find pageserver binary at {}. Please build it.",
            pageserver_path.to_str().unwrap()
        );
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // ok, we are good to go

    // Create the data dir. BUGFIX: this used to be `cargo_path.join("tmp_install")`,
    // which is the same directory as `pg_distrib_dir` checked above, so node data
    // would have been mixed into the postgres installation. Use the dedicated
    // (and gitignored) "tmp_check" directory instead, matching test_env().
    let data_dir = cargo_path.join("tmp_check");
    // create_dir_all succeeds if the directory already exists, which replaces
    // the manual AlreadyExists handling.
    if let Err(e) = fs::create_dir_all(&data_dir) {
        let err_msg = format!(
            "Failed to create data directory in '{}': {}",
            data_dir.to_str().unwrap(),
            e
        );
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // write config
    let conf = LocalEnv {
        data_dir,
        pg_distrib_dir,
        zenith_distrib_dir,
        pageserver: PageServerConf {
            listen_address: "127.0.0.1:5430".parse().unwrap(),
        },
    };
    let toml = toml::to_string(&conf)?;
    fs::write(cfg_path, toml)?;

    Ok(())
}
/// Loads and parses the zenith config from `~/.zenith`.
///
/// Returns an error if the config file does not exist (i.e. 'zenith init'
/// was never run) or cannot be read or parsed.
pub fn load_config() -> Result<LocalEnv> {
    let home_dir = get_home()?;

    // check that the config file exists
    let cfg_path = home_dir.join(".zenith");
    if !cfg_path.exists() {
        let err_msg = format!(
            "Zenith config is not found in {}. You need to run 'zenith init' first",
            cfg_path.to_str().unwrap()
        );
        return Err(Box::<dyn error::Error>::from(err_msg));
    }

    // Load and parse the file. Both std::io::Error and toml::de::Error convert
    // into Box<dyn Error> via `?`, so the explicit match is unnecessary.
    let config = fs::read_to_string(cfg_path)?;
    Ok(toml::from_str(config.as_str())?)
}
/// Builds a LocalEnv suitable for tests: data goes to `<crate>/tmp_check`,
/// postgres is taken from `<repo>/tmp_install`, and zenith binaries from the
/// cargo target directory. The data directory is created if missing.
pub fn test_env() -> LocalEnv {
    let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
    let data_dir = manifest_dir.join("tmp_check");
    fs::create_dir_all(&data_dir).unwrap();

    LocalEnv {
        pg_distrib_dir: manifest_dir.join("../tmp_install"),
        zenith_distrib_dir: cargo_bin_dir(),
        pageserver: PageServerConf {
            listen_address: "127.0.0.1:65200".parse().unwrap(),
        },
        data_dir,
    }
}
/// Finds the directory where cargo put the binaries (i.e. `target/debug/`).
///
/// Works by taking the directory of the currently running executable and
/// stepping out of cargo's `deps/` subdirectory when the binary was placed
/// there (as happens for test binaries).
pub fn cargo_bin_dir() -> PathBuf {
    // `expect` instead of `.ok().unwrap()` so the underlying OS error is
    // not silently discarded if the executable path cannot be determined.
    let mut dir = std::env::current_exe().expect("cannot determine path of current executable");
    dir.pop(); // drop the executable file name, keep its directory
    if dir.ends_with("deps") {
        dir.pop(); // test binaries live in target/<profile>/deps
    }
    dir
}

1
integration_tests/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
tmp_check/

View File

@@ -14,3 +14,4 @@ tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "r
pageserver = { path = "../pageserver" }
walkeeper = { path = "../walkeeper" }
control_plane = { path = "../control_plane" }

View File

@@ -1,8 +1,7 @@
#[allow(dead_code)]
mod control_plane;
// mod control_plane;
use control_plane::ComputeControlPlane;
use control_plane::StorageControlPlane;
use control_plane::TestStorageControlPlane;
// XXX: force all redo at the end
// -- restart + seqscan won't read deleted stuff
@@ -12,12 +11,12 @@ use control_plane::StorageControlPlane;
#[test]
fn test_redo_cases() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
node.start();
// check basic work with table
node.safe_psql(
@@ -49,14 +48,15 @@ fn test_redo_cases() {
// Runs pg_regress on a compute node
#[test]
#[ignore]
fn test_regress() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// start postgres
let node = compute_cplane.new_node();
node.start(&storage_cplane);
node.start();
control_plane::regress_check(&node);
}
@@ -65,14 +65,14 @@ fn test_regress() {
#[test]
fn test_pageserver_multitenancy() {
// Start pageserver that reads WAL directly from that postgres
let storage_cplane = StorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::one_page_server();
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
// Allocate postgres instance, but don't start
let node1 = compute_cplane.new_node();
let node2 = compute_cplane.new_node();
node1.start(&storage_cplane);
node2.start(&storage_cplane);
node1.start();
node2.start();
// check node1
node1.safe_psql(

View File

@@ -1,8 +1,6 @@
// Restart acceptors one by one while compute is under the load.
#[allow(dead_code)]
mod control_plane;
use control_plane::ComputeControlPlane;
use control_plane::StorageControlPlane;
use control_plane::TestStorageControlPlane;
use rand::Rng;
use std::sync::Arc;
@@ -13,13 +11,13 @@ use std::{thread, time};
fn test_acceptors_normal_work() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
// start postgres
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
node.start();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -50,14 +48,14 @@ fn test_acceptors_restarts() {
const REDUNDANCY: usize = 3;
const FAULT_PROBABILITY: f32 = 0.01;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
let mut rng = rand::thread_rng();
// start postgre
// start postgres
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
node.start();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -93,7 +91,7 @@ fn test_acceptors_restarts() {
assert_eq!(count, 500500);
}
fn start_acceptor(cplane: &Arc<StorageControlPlane>, no: usize) {
fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
let cp = cplane.clone();
thread::spawn(move || {
thread::sleep(time::Duration::from_secs(1));
@@ -109,13 +107,13 @@ fn test_acceptors_unavalability() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 2;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
// start postgres
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
node.start();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -157,11 +155,11 @@ fn test_acceptors_unavalability() {
assert_eq!(count, 15);
}
fn simulate_failures(cplane: &Arc<StorageControlPlane>) {
fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
let mut rng = rand::thread_rng();
let n_acceptors = cplane.wal_acceptors.len();
let failure_period = time::Duration::from_secs(1);
loop {
while cplane.is_running() {
thread::sleep(failure_period);
let mask: u32 = rng.gen_range(0..(1 << n_acceptors));
for i in 0..n_acceptors {
@@ -184,13 +182,13 @@ fn test_race_conditions() {
// Start pageserver that reads WAL directly from that postgres
const REDUNDANCY: usize = 3;
let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant(REDUNDANCY));
let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
// start postgre
// start postgres
let node = compute_cplane.new_master_node();
node.start(&storage_cplane);
node.start();
// start proxy
let _proxy = node.start_proxy(wal_acceptors);
@@ -200,10 +198,10 @@ fn test_race_conditions() {
"postgres",
"CREATE TABLE t(key int primary key, value text)",
);
let cplane = Arc::new(storage_cplane);
let cp = cplane.clone();
thread::spawn(move || {
simulate_failures(&cp);
let cp = storage_cplane.clone();
let failures_thread = thread::spawn(move || {
simulate_failures(cp);
});
let mut psql = node.open_psql("postgres");
@@ -218,5 +216,7 @@ fn test_race_conditions() {
.get(0);
println!("sum = {}", count);
assert_eq!(count, 500500);
cplane.stop();
storage_cplane.stop();
failures_thread.join().unwrap();
}

11
zenith/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "zenith"
version = "0.1.0"
authors = ["Stas Kelvich <stas@zenith.tech>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.0"
control_plane = { path = "../control_plane" }

119
zenith/src/main.rs Normal file
View File

@@ -0,0 +1,119 @@
use clap::{App, SubCommand};
use std::fs;
use std::process::exit;
use std::process::Command;
use control_plane::local_env;
// Entry point for the `zenith` CLI. Parses subcommands with clap and
// dispatches: `init` bootstraps ~/.zenith and exits; every other command
// first loads the config and then controls the local pageserver.
fn main() {
    // Declare the CLI surface. `pg` and `snapshot` subcommands are declared
    // but mostly unimplemented below (only stubs / empty arms).
    let matches = App::new("zenith")
        .subcommand(SubCommand::with_name("init"))
        .subcommand(SubCommand::with_name("start"))
        .subcommand(SubCommand::with_name("stop"))
        .subcommand(SubCommand::with_name("status"))
        .subcommand(
            SubCommand::with_name("pg")
                .about("Manage postgres instances")
                .subcommand(SubCommand::with_name("create"))
                .subcommand(SubCommand::with_name("start"))
                .subcommand(SubCommand::with_name("stop"))
                .subcommand(SubCommand::with_name("destroy")),
        )
        .subcommand(
            SubCommand::with_name("snapshot")
                .about("Manage database snapshots")
                .subcommand(SubCommand::with_name("create"))
                .subcommand(SubCommand::with_name("start"))
                .subcommand(SubCommand::with_name("stop"))
                .subcommand(SubCommand::with_name("destroy")),
        )
        .get_matches();

    // handle init separately and exit — it must run before load_config,
    // since init is what creates the config in the first place
    if let Some("init") = matches.subcommand_name() {
        match local_env::init() {
            Ok(_) => {
                println!("Initialization complete! You may start zenith with 'zenith start' now.");
                exit(0);
            }
            Err(e) => {
                eprintln!("Error during init: {}", e);
                exit(1);
            }
        }
    }

    // all other commands need the config
    let conf = match local_env::load_config() {
        Ok(conf) => conf,
        Err(e) => {
            eprintln!("Error loading config from ~/.zenith: {}", e);
            exit(1);
        }
    };

    match matches.subcommand() {
        ("init", Some(_)) => {
            // unreachable: the init branch above always exits the process
            panic!() /* init was handled before */
        }

        // Spawn the pageserver as a daemon (-d) using paths from the config.
        ("start", Some(_sub_m)) => {
            println!(
                "Starting pageserver at '{}'",
                conf.pageserver.listen_address
            );

            let status = Command::new(conf.zenith_distrib_dir.join("pageserver"))
                .args(&["-D", conf.data_dir.to_str().unwrap()])
                .args(&["-l", conf.pageserver.listen_address.to_string().as_str()])
                .arg("-d")
                .arg("--skip-recovery")
                .env_clear()
                .env("PATH", conf.pg_bin_dir().to_str().unwrap()) // pageserver needs the postgres-wal-redo binary
                .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
                .status()
                .expect("failed to start pageserver");

            if !status.success() {
                eprintln!(
                    "Pageserver failed to start. See '{}' for details.",
                    conf.pageserver_log().to_str().unwrap()
                );
                exit(1);
            }

            // TODO: check it's actually started, or run status
            println!("Done!");
        }

        // Stop the pageserver by sending a signal to the pid from its pidfile.
        // NOTE(review): read_to_string().unwrap() panics if the pidfile is
        // missing (e.g. pageserver never started) — worth a friendlier error.
        ("stop", Some(_sub_m)) => {
            let pid = fs::read_to_string(conf.pageserver_pidfile()).unwrap();
            let status = Command::new("kill")
                .arg(pid)
                .env_clear()
                .status()
                .expect("failed to execute kill");

            if !status.success() {
                eprintln!("Failed to kill pageserver");
                exit(1);
            }
            println!("Done!");
        }

        // Not implemented yet.
        ("status", Some(_sub_m)) => {}

        // Stub: only `pg start` prints a placeholder; other pg subcommands
        // fall through silently.
        ("pg", Some(pg_match)) => {
            match pg_match.subcommand() {
                ("start", Some(_sub_m)) => {
                    println!("xxx: pg start");
                    // Ok(())
                }
                _ => {}
            }
        }

        // Unknown/empty subcommand: clap already validated input, do nothing.
        _ => {}
    }
}
}