mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-24 22:59:59 +00:00
Refactor CLI and CLI<->pageserver interfaces to support remote pageserver
This patch started as an effort to support CLI working against remote pageserver, but turned into a pretty big refactoring. * CLI now does not look into repository files directly. New commands 'branch_create' and 'identify_system' were introduced into page_service to support that. * Branch management that was scattered between local_env and zenith/main.rs is moved into pageserver/branches.rs. That code could better fit in Repository/Timeline impl, but I'll leave that for a different patch. * All tests-related code from local_env went into integration_tests/src/lib.rs as an extension to PostgresNode trait. * Paths-generating functions were concentrated around corresponding config types (LocalEnv and PageserverConf).
This commit is contained in:
@@ -141,7 +141,7 @@ jobs:
|
||||
working_directory: test_runner
|
||||
environment:
|
||||
- ZENITH_BIN: /tmp/zenith/bin
|
||||
- POSTGRES_BIN: /tmp/zenith/pg_install
|
||||
- POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install
|
||||
- TEST_OUTPUT: /tmp/test_output
|
||||
command: |
|
||||
TEST_FILE="<< parameters.test_file >>"
|
||||
|
||||
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -263,8 +263,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"fs_extra",
|
||||
"hex",
|
||||
"lazy_static",
|
||||
"nix",
|
||||
"pageserver",
|
||||
@@ -273,9 +271,10 @@ dependencies = [
|
||||
"rand",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"thiserror",
|
||||
"toml",
|
||||
"url",
|
||||
"walkeeper",
|
||||
"workspace_hack",
|
||||
"zenith_utils",
|
||||
@@ -788,8 +787,10 @@ dependencies = [
|
||||
name = "integration_tests"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"control_plane",
|
||||
"lazy_static",
|
||||
"nix",
|
||||
"pageserver",
|
||||
"postgres",
|
||||
"rand",
|
||||
@@ -1169,6 +1170,7 @@ dependencies = [
|
||||
"clap",
|
||||
"crc32c",
|
||||
"daemonize",
|
||||
"fs_extra",
|
||||
"futures",
|
||||
"hex",
|
||||
"lazy_static",
|
||||
@@ -2190,9 +2192,9 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.2.1"
|
||||
version = "2.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
|
||||
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"idna",
|
||||
|
||||
@@ -11,15 +11,17 @@ rand = "0.8.3"
|
||||
tar = "0.4.33"
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
toml = "0.5"
|
||||
lazy_static = "1.4"
|
||||
regex = "1"
|
||||
anyhow = "1.0"
|
||||
hex = "0.4.3"
|
||||
# hex = "0.4.3"
|
||||
bytes = "1.0.1"
|
||||
fs_extra = "1.2.0"
|
||||
# fs_extra = "1.2.0"
|
||||
nix = "0.20"
|
||||
thiserror = "1"
|
||||
# thiserror = "1"
|
||||
url = "2.2.2"
|
||||
|
||||
pageserver = { path = "../pageserver" }
|
||||
walkeeper = { path = "../walkeeper" }
|
||||
|
||||
@@ -1,23 +1,24 @@
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpStream;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Command, ExitStatus};
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{collections::BTreeMap, path::PathBuf};
|
||||
use std::{
|
||||
fs::{self, OpenOptions},
|
||||
io::Read,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::storage::{PageServerNode, WalProposerNode};
|
||||
use pageserver::{zenith_repo_dir, ZTimelineId};
|
||||
use pageserver::ZTimelineId;
|
||||
|
||||
use crate::storage::PageServerNode;
|
||||
|
||||
//
|
||||
// ComputeControlPlane
|
||||
@@ -36,8 +37,8 @@ impl ComputeControlPlane {
|
||||
// it is running on default port. Change that when pageserver will have config.
|
||||
let pageserver = Arc::new(PageServerNode::from_env(&env));
|
||||
|
||||
let pgdatadirspath = env.repo_path.join("pgdatadirs");
|
||||
let nodes: Result<BTreeMap<_, _>> = fs::read_dir(&pgdatadirspath)
|
||||
let pgdatadirspath = &env.pg_data_dirs_path();
|
||||
let nodes: Result<BTreeMap<_, _>> = fs::read_dir(pgdatadirspath)
|
||||
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
|
||||
.into_iter()
|
||||
.map(|f| {
|
||||
@@ -97,8 +98,14 @@ impl ComputeControlPlane {
|
||||
Ok(node)
|
||||
}
|
||||
|
||||
pub fn new_test_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
|
||||
let node = self.new_from_page_server(true, timelineid);
|
||||
pub fn new_test_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
|
||||
let timeline_id = self
|
||||
.pageserver
|
||||
.branch_get_by_name(branch_name)
|
||||
.expect("failed to get timeline_id")
|
||||
.timeline_id;
|
||||
|
||||
let node = self.new_from_page_server(true, timeline_id);
|
||||
let node = node.unwrap();
|
||||
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
@@ -115,8 +122,14 @@ impl ComputeControlPlane {
|
||||
node
|
||||
}
|
||||
|
||||
pub fn new_test_master_node(&mut self, timelineid: ZTimelineId) -> Arc<PostgresNode> {
|
||||
let node = self.new_from_page_server(true, timelineid).unwrap();
|
||||
pub fn new_test_master_node(&mut self, branch_name: &str) -> Arc<PostgresNode> {
|
||||
let timeline_id = self
|
||||
.pageserver
|
||||
.branch_get_by_name(branch_name)
|
||||
.expect("failed to get timeline_id")
|
||||
.timeline_id;
|
||||
|
||||
let node = self.new_from_page_server(true, timeline_id).unwrap();
|
||||
|
||||
node.append_conf(
|
||||
"postgresql.conf",
|
||||
@@ -126,8 +139,14 @@ impl ComputeControlPlane {
|
||||
node
|
||||
}
|
||||
|
||||
pub fn new_node(&mut self, timelineid: ZTimelineId) -> Result<Arc<PostgresNode>> {
|
||||
let node = self.new_from_page_server(false, timelineid).unwrap();
|
||||
pub fn new_node(&mut self, branch_name: &str) -> Result<Arc<PostgresNode>> {
|
||||
let timeline_id = self
|
||||
.pageserver
|
||||
.branch_get_by_name(branch_name)
|
||||
.expect("failed to get timeline_id")
|
||||
.timeline_id;
|
||||
|
||||
let node = self.new_from_page_server(false, timeline_id).unwrap();
|
||||
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
node.append_conf(
|
||||
@@ -291,9 +310,9 @@ impl PostgresNode {
|
||||
max_replication_slots = 10\n\
|
||||
hot_standby = on\n\
|
||||
shared_buffers = 1MB\n\
|
||||
fsync = off\n\
|
||||
fsync = off\n\
|
||||
max_connections = 100\n\
|
||||
wal_sender_timeout = 0\n\
|
||||
wal_sender_timeout = 0\n\
|
||||
wal_level = replica\n\
|
||||
listen_addresses = '{address}'\n\
|
||||
port = {port}\n",
|
||||
@@ -326,8 +345,8 @@ impl PostgresNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn pgdata(&self) -> PathBuf {
|
||||
self.env.repo_path.join("pgdatadirs").join(&self.name)
|
||||
pub fn pgdata(&self) -> PathBuf {
|
||||
self.env.pg_data_dir(&self.name)
|
||||
}
|
||||
|
||||
pub fn status(&self) -> &str {
|
||||
@@ -413,152 +432,6 @@ impl PostgresNode {
|
||||
|
||||
String::from_utf8(output.stdout).unwrap().trim().to_string()
|
||||
}
|
||||
|
||||
fn dump_log_file(&self) {
|
||||
if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- pageserver.log:\n{}", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
|
||||
let connstring = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.address.ip(),
|
||||
self.address.port(),
|
||||
db,
|
||||
self.whoami()
|
||||
);
|
||||
let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
|
||||
|
||||
println!("Running {}", sql);
|
||||
let result = client.query(sql, &[]);
|
||||
if result.is_err() {
|
||||
self.dump_log_file();
|
||||
}
|
||||
result.unwrap()
|
||||
}
|
||||
|
||||
pub fn open_psql(&self, db: &str) -> Client {
|
||||
let connstring = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.address.ip(),
|
||||
self.address.port(),
|
||||
db,
|
||||
self.whoami()
|
||||
);
|
||||
Client::connect(connstring.as_str(), NoTls).unwrap()
|
||||
}
|
||||
|
||||
pub fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
|
||||
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
|
||||
match Command::new(proxy_path.as_path())
|
||||
.args(&["--ztimelineid", &self.timelineid.to_string()])
|
||||
.args(&["-s", wal_acceptors])
|
||||
.args(&["-h", &self.address.ip().to_string()])
|
||||
.args(&["-p", &self.address.port().to_string()])
|
||||
.arg("-v")
|
||||
.stderr(
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(self.pgdata().join("safekeeper_proxy.log"))
|
||||
.unwrap(),
|
||||
)
|
||||
.spawn()
|
||||
{
|
||||
Ok(child) => WalProposerNode { pid: child.id() },
|
||||
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pg_regress(&self) -> ExitStatus {
|
||||
self.safe_psql("postgres", "CREATE DATABASE regression");
|
||||
let data_dir = zenith_repo_dir();
|
||||
let regress_run_path = data_dir.join("regress");
|
||||
fs::create_dir_all(®ress_run_path).unwrap();
|
||||
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
|
||||
std::env::set_current_dir(regress_run_path).unwrap();
|
||||
|
||||
let regress_build_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
|
||||
let regress_src_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
|
||||
|
||||
let regress_check = Command::new(regress_build_path.join("pg_regress"))
|
||||
.args(&[
|
||||
"--bindir=''",
|
||||
"--use-existing",
|
||||
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
|
||||
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
|
||||
format!(
|
||||
"--schedule={}",
|
||||
regress_src_path.join("parallel_schedule").to_str().unwrap()
|
||||
)
|
||||
.as_str(),
|
||||
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
|
||||
])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGPORT", self.address.port().to_string())
|
||||
.env("PGUSER", self.whoami())
|
||||
.env("PGHOST", self.address.ip().to_string())
|
||||
.status()
|
||||
.expect("pg_regress failed");
|
||||
if !regress_check.success() {
|
||||
if let Ok(mut file) = File::open("regression.diffs") {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- regression.diffs:\n{}", buffer);
|
||||
}
|
||||
self.dump_log_file();
|
||||
if let Ok(mut file) = File::open(
|
||||
self.env
|
||||
.repo_path
|
||||
.join("pgdatadirs")
|
||||
.join("pg1")
|
||||
.join("log"),
|
||||
) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
|
||||
}
|
||||
}
|
||||
regress_check
|
||||
}
|
||||
|
||||
pub fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
|
||||
let port = self.address.port().to_string();
|
||||
let clients = clients.to_string();
|
||||
let seconds = seconds.to_string();
|
||||
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&["-i", "-p", port.as_str(), "postgres"])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench -i");
|
||||
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&[
|
||||
"-p",
|
||||
port.as_str(),
|
||||
"-T",
|
||||
seconds.as_str(),
|
||||
"-P",
|
||||
"1",
|
||||
"-c",
|
||||
clients.as_str(),
|
||||
"-M",
|
||||
"prepared",
|
||||
"postgres",
|
||||
])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench run");
|
||||
pg_bench_run
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PostgresNode {
|
||||
|
||||
@@ -6,7 +6,26 @@
|
||||
// Intended to be used in integration tests and in CLI tools for
|
||||
// local installations.
|
||||
//
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
pub mod compute;
|
||||
pub mod local_env;
|
||||
pub mod storage;
|
||||
|
||||
/// Read a PID file
|
||||
///
|
||||
/// We expect a file that contains a single integer.
|
||||
/// We return an i32 for compatibility with libc and nix.
|
||||
pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
|
||||
let pid_str = fs::read_to_string(pidfile)
|
||||
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
|
||||
let pid: i32 = pid_str
|
||||
.parse()
|
||||
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
|
||||
if pid < 1 {
|
||||
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
|
||||
}
|
||||
Ok(pid)
|
||||
}
|
||||
|
||||
@@ -4,37 +4,23 @@
|
||||
// Now it also provides init method which acts like a stub for proper installation
|
||||
// script which will use local paths.
|
||||
//
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use rand::Rng;
|
||||
use anyhow::{anyhow, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Command, Stdio};
|
||||
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use pageserver::zenith_repo_dir;
|
||||
use pageserver::ZTimelineId;
|
||||
use postgres_ffi::xlog_utils;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use std::path::PathBuf;
|
||||
use url::Url;
|
||||
|
||||
//
|
||||
// This data structure represents deserialized zenith config, which should be
|
||||
// located in ~/.zenith
|
||||
//
|
||||
// TODO: should we also support ZENITH_CONF env var?
|
||||
// This data structures represent deserialized zenith CLI config
|
||||
//
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct LocalEnv {
|
||||
// Path to the Repository. Here page server and compute nodes will create and store their data.
|
||||
pub repo_path: PathBuf,
|
||||
// Pageserver connection strings
|
||||
pub pageserver_connstring: String,
|
||||
|
||||
// System identifier, from the PostgreSQL control file
|
||||
pub systemid: u64,
|
||||
// Base directory for both pageserver and compute nodes
|
||||
pub base_data_dir: PathBuf,
|
||||
|
||||
// Path to postgres distribution. It's expected that "bin", "include",
|
||||
// "lib", "share" from postgres distribution are there. If at some point
|
||||
@@ -42,38 +28,66 @@ pub struct LocalEnv {
|
||||
// to four separate paths and match OS-specific installation layout.
|
||||
pub pg_distrib_dir: PathBuf,
|
||||
|
||||
// Path to pageserver binary.
|
||||
pub zenith_distrib_dir: PathBuf,
|
||||
// Path to pageserver binary. Empty for remote pageserver.
|
||||
pub zenith_distrib_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl LocalEnv {
|
||||
// postgres installation
|
||||
// postgres installation paths
|
||||
pub fn pg_bin_dir(&self) -> PathBuf {
|
||||
self.pg_distrib_dir.join("bin")
|
||||
}
|
||||
pub fn pg_lib_dir(&self) -> PathBuf {
|
||||
self.pg_distrib_dir.join("lib")
|
||||
}
|
||||
|
||||
pub fn pageserver_bin(&self) -> Result<PathBuf> {
|
||||
Ok(self
|
||||
.zenith_distrib_dir
|
||||
.as_ref()
|
||||
.ok_or(anyhow!("Can not manage remote pageserver"))?
|
||||
.join("pageserver"))
|
||||
}
|
||||
|
||||
pub fn pg_data_dirs_path(&self) -> PathBuf {
|
||||
self.base_data_dir.join("pgdatadirs")
|
||||
}
|
||||
|
||||
pub fn pg_data_dir(&self, name: &str) -> PathBuf {
|
||||
self.pg_data_dirs_path().join(name)
|
||||
}
|
||||
|
||||
// TODO: move pageserver files into ./pageserver
|
||||
pub fn pageserver_data_dir(&self) -> PathBuf {
|
||||
self.base_data_dir.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn base_path() -> PathBuf {
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => PathBuf::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Initialize a new Zenith repository
|
||||
//
|
||||
pub fn init() -> Result<()> {
|
||||
pub fn init(remote_pageserver: Option<&str>) -> Result<()> {
|
||||
// check if config already exists
|
||||
let repo_path = zenith_repo_dir();
|
||||
if repo_path.exists() {
|
||||
let base_path = base_path();
|
||||
if base_path.exists() {
|
||||
anyhow::bail!(
|
||||
"{} already exists. Perhaps already initialized?",
|
||||
repo_path.to_str().unwrap()
|
||||
base_path.to_str().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
// ok, now check that expected binaries are present
|
||||
|
||||
// Find postgres binaries. Follow POSTGRES_BIN if set, otherwise look in "tmp_install".
|
||||
// Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install".
|
||||
let pg_distrib_dir: PathBuf = {
|
||||
if let Some(postgres_bin) = env::var_os("POSTGRES_BIN") {
|
||||
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
|
||||
postgres_bin.into()
|
||||
} else {
|
||||
let cwd = env::current_dir()?;
|
||||
@@ -84,137 +98,45 @@ pub fn init() -> Result<()> {
|
||||
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
|
||||
}
|
||||
|
||||
// Find zenith binaries.
|
||||
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
|
||||
if !zenith_distrib_dir.join("pageserver").exists() {
|
||||
anyhow::bail!("Can't find pageserver binary.",);
|
||||
}
|
||||
fs::create_dir(&base_path)?;
|
||||
fs::create_dir(base_path.join("pgdatadirs"))?;
|
||||
|
||||
// ok, we are good to go
|
||||
let mut conf = LocalEnv {
|
||||
repo_path,
|
||||
pg_distrib_dir,
|
||||
zenith_distrib_dir,
|
||||
systemid: 0,
|
||||
let conf = if let Some(addr) = remote_pageserver {
|
||||
// check that addr is parsable
|
||||
let _uri = Url::parse(addr)
|
||||
.map_err(|e| anyhow!("{}: {}", addr, e))?;
|
||||
|
||||
LocalEnv {
|
||||
pageserver_connstring: format!("postgresql://{}/", addr),
|
||||
pg_distrib_dir,
|
||||
zenith_distrib_dir: None,
|
||||
base_data_dir: base_path,
|
||||
}
|
||||
} else {
|
||||
// Find zenith binaries.
|
||||
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
|
||||
if !zenith_distrib_dir.join("pageserver").exists() {
|
||||
anyhow::bail!("Can't find pageserver binary.",);
|
||||
}
|
||||
|
||||
LocalEnv {
|
||||
pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(),
|
||||
pg_distrib_dir,
|
||||
zenith_distrib_dir: Some(zenith_distrib_dir),
|
||||
base_data_dir: base_path,
|
||||
}
|
||||
};
|
||||
init_repo(&mut conf)?;
|
||||
|
||||
let toml = toml::to_string(&conf)?;
|
||||
fs::write(conf.base_data_dir.join("config"), toml)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> {
|
||||
let repopath = &local_env.repo_path;
|
||||
fs::create_dir(&repopath)
|
||||
.with_context(|| format!("could not create directory {}", repopath.display()))?;
|
||||
fs::create_dir(repopath.join("pgdatadirs"))?;
|
||||
fs::create_dir(repopath.join("timelines"))?;
|
||||
fs::create_dir(repopath.join("refs"))?;
|
||||
fs::create_dir(repopath.join("refs").join("branches"))?;
|
||||
fs::create_dir(repopath.join("refs").join("tags"))?;
|
||||
println!("created directory structure in {}", repopath.display());
|
||||
// Locate and load config
|
||||
pub fn load_config() -> Result<LocalEnv> {
|
||||
let repopath = base_path();
|
||||
|
||||
// Create initial timeline
|
||||
let tli = create_timeline(&local_env, None)?;
|
||||
let timelinedir = repopath.join("timelines").join(tli.to_string());
|
||||
println!("created initial timeline {}", timelinedir.display());
|
||||
|
||||
// Run initdb
|
||||
//
|
||||
// We create the cluster temporarily in a "tmp" directory inside the repository,
|
||||
// and move it to the right location from there.
|
||||
let tmppath = repopath.join("tmp");
|
||||
|
||||
let initdb_path = local_env.pg_bin_dir().join("initdb");
|
||||
let initdb = Command::new(initdb_path)
|
||||
.args(&["-D", tmppath.to_str().unwrap()])
|
||||
.arg("--no-instructions")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap())
|
||||
.env(
|
||||
"DYLD_LIBRARY_PATH",
|
||||
local_env.pg_lib_dir().to_str().unwrap(),
|
||||
)
|
||||
.stdout(Stdio::null())
|
||||
.status()
|
||||
.with_context(|| "failed to execute initdb")?;
|
||||
if !initdb.success() {
|
||||
anyhow::bail!("initdb failed");
|
||||
}
|
||||
println!("initdb succeeded");
|
||||
|
||||
// Read control file to extract the LSN and system id
|
||||
let controlfile_path = tmppath.join("global").join("pg_control");
|
||||
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
|
||||
let systemid = controlfile.system_identifier;
|
||||
let lsn = controlfile.checkPoint;
|
||||
let lsnstr = format!("{:016X}", lsn);
|
||||
|
||||
// Move the initial WAL file
|
||||
fs::rename(
|
||||
tmppath.join("pg_wal").join("000000010000000000000001"),
|
||||
timelinedir
|
||||
.join("wal")
|
||||
.join("000000010000000000000001.partial"),
|
||||
)?;
|
||||
println!("moved initial WAL file");
|
||||
|
||||
// Remove pg_wal
|
||||
fs::remove_dir_all(tmppath.join("pg_wal"))?;
|
||||
|
||||
force_crash_recovery(&tmppath)?;
|
||||
println!("updated pg_control");
|
||||
|
||||
let target = timelinedir.join("snapshots").join(&lsnstr);
|
||||
fs::rename(tmppath, &target)?;
|
||||
println!("moved 'tmp' to {}", target.display());
|
||||
|
||||
// Create 'main' branch to refer to the initial timeline
|
||||
let data = tli.to_string();
|
||||
fs::write(repopath.join("refs").join("branches").join("main"), data)?;
|
||||
println!("created main branch");
|
||||
|
||||
// Also update the system id in the LocalEnv
|
||||
local_env.systemid = systemid;
|
||||
|
||||
// write config
|
||||
let toml = toml::to_string(&local_env)?;
|
||||
fs::write(repopath.join("config"), toml)?;
|
||||
|
||||
println!(
|
||||
"new zenith repository was created in {}",
|
||||
repopath.display()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// If control file says the cluster was shut down cleanly, modify it, to mark
|
||||
// it as crashed. That forces crash recovery when you start the cluster.
|
||||
//
|
||||
// FIXME:
|
||||
// We currently do this to the initial snapshot in "zenith init". It would
|
||||
// be more natural to do this when the snapshot is restored instead, but we
|
||||
// currently don't have any code to create new snapshots, so it doesn't matter
|
||||
// Or better yet, use a less hacky way of putting the cluster into recovery.
|
||||
// Perhaps create a backup label file in the data directory when it's restored.
|
||||
fn force_crash_recovery(datadir: &Path) -> Result<()> {
|
||||
// Read in the control file
|
||||
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
|
||||
let mut controlfile =
|
||||
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
|
||||
|
||||
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
|
||||
|
||||
fs::write(
|
||||
controlfilepath.as_path(),
|
||||
postgres_ffi::encode_pg_control(controlfile),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// check that config file is present
|
||||
pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
|
||||
if !repopath.exists() {
|
||||
anyhow::bail!(
|
||||
"Zenith config is not found in {}. You need to run 'zenith init' first",
|
||||
@@ -222,32 +144,13 @@ pub fn load_config(repopath: &Path) -> Result<LocalEnv> {
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: check that it looks like a zenith repository
|
||||
|
||||
// load and parse file
|
||||
let config = fs::read_to_string(repopath.join("config"))?;
|
||||
toml::from_str(config.as_str()).map_err(|e| e.into())
|
||||
}
|
||||
|
||||
// local env for tests
|
||||
pub fn test_env(testname: &str) -> LocalEnv {
|
||||
fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check");
|
||||
|
||||
let repo_path = Path::new(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("../tmp_check/")
|
||||
.join(testname);
|
||||
|
||||
// Remove remnants of old test repo
|
||||
let _ = fs::remove_dir_all(&repo_path);
|
||||
|
||||
let mut local_env = LocalEnv {
|
||||
repo_path,
|
||||
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
|
||||
zenith_distrib_dir: cargo_bin_dir(),
|
||||
systemid: 0,
|
||||
};
|
||||
init_repo(&mut local_env).expect("could not initialize zenith repository");
|
||||
local_env
|
||||
}
|
||||
|
||||
// Find the directory where the binaries were put (i.e. target/debug/)
|
||||
pub fn cargo_bin_dir() -> PathBuf {
|
||||
let mut pathbuf = std::env::current_exe().unwrap();
|
||||
@@ -259,155 +162,3 @@ pub fn cargo_bin_dir() -> PathBuf {
|
||||
|
||||
pathbuf
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PointInTime {
|
||||
pub timelineid: ZTimelineId,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
fn create_timeline(local_env: &LocalEnv, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
|
||||
let repopath = &local_env.repo_path;
|
||||
|
||||
// Create initial timeline
|
||||
let mut tli_buf = [0u8; 16];
|
||||
rand::thread_rng().fill(&mut tli_buf);
|
||||
let timelineid = ZTimelineId::from(tli_buf);
|
||||
|
||||
let timelinedir = repopath.join("timelines").join(timelineid.to_string());
|
||||
|
||||
fs::create_dir(&timelinedir)?;
|
||||
fs::create_dir(&timelinedir.join("snapshots"))?;
|
||||
fs::create_dir(&timelinedir.join("wal"))?;
|
||||
|
||||
if let Some(ancestor) = ancestor {
|
||||
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
|
||||
fs::write(timelinedir.join("ancestor"), data)?;
|
||||
}
|
||||
|
||||
Ok(timelineid)
|
||||
}
|
||||
|
||||
// Create a new branch in the repository (for the "zenith branch" subcommand)
|
||||
pub fn create_branch(
|
||||
local_env: &LocalEnv,
|
||||
branchname: &str,
|
||||
startpoint: PointInTime,
|
||||
) -> Result<()> {
|
||||
let repopath = &local_env.repo_path;
|
||||
|
||||
// create a new timeline for it
|
||||
let newtli = create_timeline(local_env, Some(startpoint))?;
|
||||
let newtimelinedir = repopath.join("timelines").join(newtli.to_string());
|
||||
|
||||
let data = newtli.to_string();
|
||||
fs::write(
|
||||
repopath.join("refs").join("branches").join(branchname),
|
||||
data,
|
||||
)?;
|
||||
|
||||
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
|
||||
// TODO: be smarter and avoid the copying...
|
||||
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?;
|
||||
let copy_opts = fs_extra::dir::CopyOptions::new();
|
||||
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?;
|
||||
|
||||
let oldtimelinedir = repopath
|
||||
.join("timelines")
|
||||
.join(startpoint.timelineid.to_string());
|
||||
copy_wal(
|
||||
&oldtimelinedir.join("wal"),
|
||||
&newtimelinedir.join("wal"),
|
||||
startpoint.lsn,
|
||||
16 * 1024 * 1024 // FIXME: assume default WAL segment size
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Copy all WAL segments from one directory to another, up to given LSN.
|
||||
///
|
||||
/// If the given LSN is in the middle of a segment, the last segment containing it
|
||||
/// is written out as .partial, and padded with zeros.
|
||||
///
|
||||
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
|
||||
|
||||
let last_segno = upto.segment_number(wal_seg_size);
|
||||
let last_segoff = upto.segment_offset(wal_seg_size);
|
||||
|
||||
for entry in fs::read_dir(src_dir).unwrap() {
|
||||
if let Ok(entry) = entry {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
|
||||
// Check if the filename looks like an xlog file, or a .partial file.
|
||||
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
|
||||
continue
|
||||
}
|
||||
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
|
||||
|
||||
let copylen;
|
||||
let mut dst_fname = PathBuf::from(fname);
|
||||
if segno > last_segno {
|
||||
// future segment, skip
|
||||
continue;
|
||||
} else if segno < last_segno {
|
||||
copylen = wal_seg_size;
|
||||
dst_fname.set_extension("");
|
||||
} else {
|
||||
copylen = last_segoff;
|
||||
dst_fname.set_extension("partial");
|
||||
}
|
||||
|
||||
let src_file = File::open(entry.path())?;
|
||||
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
|
||||
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
|
||||
|
||||
if copylen < wal_seg_size {
|
||||
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Find the end of valid WAL in a wal directory
|
||||
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<Lsn> {
|
||||
let repopath = &local_env.repo_path;
|
||||
let waldir = repopath
|
||||
.join("timelines")
|
||||
.join(timeline.to_string())
|
||||
.join("wal");
|
||||
|
||||
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
|
||||
|
||||
Ok(Lsn(lsn))
|
||||
}
|
||||
|
||||
// Find the latest snapshot for a timeline
|
||||
fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
|
||||
let repopath = &local_env.repo_path;
|
||||
|
||||
let snapshotsdir = repopath
|
||||
.join("timelines")
|
||||
.join(timeline.to_string())
|
||||
.join("snapshots");
|
||||
let paths = fs::read_dir(&snapshotsdir)?;
|
||||
let mut maxsnapshot = Lsn(0);
|
||||
let mut snapshotdir: Option<PathBuf> = None;
|
||||
for path in paths {
|
||||
let path = path?;
|
||||
let filename = path.file_name().to_str().unwrap().to_owned();
|
||||
if let Ok(lsn) = Lsn::from_hex(&filename) {
|
||||
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
|
||||
snapshotdir = Some(path.path());
|
||||
}
|
||||
}
|
||||
if maxsnapshot == Lsn(0) {
|
||||
// TODO: check ancestor timeline
|
||||
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
|
||||
}
|
||||
|
||||
Ok((maxsnapshot, snapshotdir.unwrap()))
|
||||
}
|
||||
|
||||
@@ -1,123 +1,18 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use std::convert::TryInto;
|
||||
use std::fs;
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use postgres::{Client, NoTls};
|
||||
|
||||
use pageserver::branches::BranchInfo;
|
||||
use crate::local_env::LocalEnv;
|
||||
use pageserver::ZTimelineId;
|
||||
|
||||
//
|
||||
// Collection of several example deployments useful for tests.
|
||||
//
|
||||
// I'm intendedly modelling storage and compute control planes as a separate entities
|
||||
// as it is closer to the actual setup.
|
||||
//
|
||||
pub struct TestStorageControlPlane {
|
||||
pub wal_acceptors: Vec<WalAcceptorNode>,
|
||||
pub pageserver: Arc<PageServerNode>,
|
||||
pub test_done: AtomicBool,
|
||||
pub repopath: PathBuf,
|
||||
}
|
||||
|
||||
impl TestStorageControlPlane {
|
||||
// Peek into the repository, to grab the timeline ID of given branch
|
||||
pub fn get_branch_timeline(&self, branchname: &str) -> ZTimelineId {
|
||||
let branchpath = self.repopath.join("refs/branches/".to_owned() + branchname);
|
||||
|
||||
ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
|
||||
}
|
||||
|
||||
// postgres <-> page_server
|
||||
//
|
||||
// Initialize a new repository and configure a page server to run in it
|
||||
//
|
||||
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
|
||||
let repopath = local_env.repo_path.clone();
|
||||
|
||||
let pserver = Arc::new(PageServerNode {
|
||||
env: local_env.clone(),
|
||||
kill_on_exit: true,
|
||||
listen_address: None,
|
||||
});
|
||||
pserver.start().unwrap();
|
||||
|
||||
TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: pserver,
|
||||
test_done: AtomicBool::new(false),
|
||||
repopath,
|
||||
}
|
||||
}
|
||||
|
||||
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
|
||||
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
|
||||
let repopath = local_env.repo_path.clone();
|
||||
|
||||
let mut cplane = TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: Arc::new(PageServerNode {
|
||||
env: local_env.clone(),
|
||||
kill_on_exit: true,
|
||||
listen_address: None,
|
||||
}),
|
||||
test_done: AtomicBool::new(false),
|
||||
repopath,
|
||||
};
|
||||
cplane.pageserver.start().unwrap();
|
||||
|
||||
const WAL_ACCEPTOR_PORT: usize = 54321;
|
||||
|
||||
for i in 0..redundancy {
|
||||
let wal_acceptor = WalAcceptorNode {
|
||||
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
data_dir: local_env.repo_path.join(format!("wal_acceptor_{}", i)),
|
||||
env: local_env.clone(),
|
||||
};
|
||||
wal_acceptor.init();
|
||||
wal_acceptor.start();
|
||||
cplane.wal_acceptors.push(wal_acceptor);
|
||||
}
|
||||
cplane
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
for wa in self.wal_acceptors.iter() {
|
||||
let _ = wa.stop();
|
||||
}
|
||||
self.test_done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_wal_acceptor_conn_info(&self) -> String {
|
||||
self.wal_acceptors
|
||||
.iter()
|
||||
.map(|wa| wa.listen.to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(",")
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.test_done.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestStorageControlPlane {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
use crate::read_pidfile;
|
||||
|
||||
//
|
||||
// Control routines for pageserver.
|
||||
@@ -125,8 +20,8 @@ impl Drop for TestStorageControlPlane {
|
||||
// Used in CLI and tests.
|
||||
//
|
||||
pub struct PageServerNode {
|
||||
kill_on_exit: bool,
|
||||
listen_address: Option<SocketAddr>,
|
||||
pub kill_on_exit: bool,
|
||||
pub listen_address: Option<SocketAddr>,
|
||||
pub env: LocalEnv,
|
||||
}
|
||||
|
||||
@@ -146,12 +41,32 @@ impl PageServerNode {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(&self) -> Result<()> {
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()])
|
||||
.env_clear()
|
||||
.env("RUST_BACKTRACE", "1")
|
||||
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
|
||||
.env("ZENITH_REPO_DIR", self.repo_path())
|
||||
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pageserver init failed");
|
||||
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("pageserver init failed"))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn repo_path(&self) -> PathBuf {
|
||||
self.env.repo_path.clone()
|
||||
self.env.pageserver_data_dir()
|
||||
}
|
||||
|
||||
pub fn pid_file(&self) -> PathBuf {
|
||||
self.env.repo_path.join("pageserver.pid")
|
||||
self.repo_path().join("pageserver.pid")
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Result<()> {
|
||||
@@ -161,11 +76,12 @@ impl PageServerNode {
|
||||
self.repo_path().display()
|
||||
);
|
||||
|
||||
let mut cmd = Command::new(self.env.zenith_distrib_dir.join("pageserver"));
|
||||
cmd.args(&["-l", self.address().to_string().as_str()])
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()])
|
||||
.arg("-d")
|
||||
.env_clear()
|
||||
.env("RUST_BACKTRACE", "1")
|
||||
.env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap())
|
||||
.env("ZENITH_REPO_DIR", self.repo_path())
|
||||
.env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
@@ -226,9 +142,7 @@ impl PageServerNode {
|
||||
client.simple_query(sql).unwrap()
|
||||
}
|
||||
|
||||
pub fn page_server_psql_client(
|
||||
&self,
|
||||
) -> std::result::Result<postgres::Client, postgres::Error> {
|
||||
pub fn page_server_psql_client(&self) -> Result<postgres::Client, postgres::Error> {
|
||||
let connstring = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.address().ip(),
|
||||
@@ -238,6 +152,74 @@ impl PageServerNode {
|
||||
);
|
||||
Client::connect(connstring.as_str(), NoTls)
|
||||
}
|
||||
|
||||
pub fn branches_list(&self) -> Result<Vec<BranchInfo>> {
|
||||
let mut client = self.page_server_psql_client()?;
|
||||
let query_result = client.simple_query("pg_list")?;
|
||||
let branches_json = query_result
|
||||
.first()
|
||||
.map(|msg| match msg {
|
||||
postgres::SimpleQueryMessage::Row(row) => row.get(0),
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.ok_or_else(|| anyhow!("missing branches"))?;
|
||||
|
||||
let res: Vec<BranchInfo> = serde_json::from_str(branches_json)?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn branch_create(&self, name: &str, startpoint: &str) -> Result<BranchInfo> {
|
||||
let mut client = self.page_server_psql_client()?;
|
||||
let query_result =
|
||||
client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?;
|
||||
|
||||
let branch_json = query_result
|
||||
.first()
|
||||
.map(|msg| match msg {
|
||||
postgres::SimpleQueryMessage::Row(row) => row.get(0),
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.ok_or_else(|| anyhow!("missing branch"))?;
|
||||
|
||||
let res: BranchInfo = serde_json::from_str(branch_json)
|
||||
.map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
// TODO: make this a separate request type and avoid loading all the branches
|
||||
pub fn branch_get_by_name(&self, name: &str) -> Result<BranchInfo> {
|
||||
let branch_infos = self.branches_list()?;
|
||||
let branche_by_name: Result<HashMap<String, BranchInfo>> = branch_infos
|
||||
.into_iter()
|
||||
.map(|branch_info| Ok((branch_info.name.clone(), branch_info)))
|
||||
.collect();
|
||||
let branche_by_name = branche_by_name?;
|
||||
|
||||
let branch = branche_by_name
|
||||
.get(name)
|
||||
.ok_or_else(|| anyhow!("Branch {} not found", name))?;
|
||||
|
||||
Ok(branch.clone())
|
||||
}
|
||||
|
||||
pub fn system_id_get(&self) -> Result<u64> {
|
||||
let mut client = self.page_server_psql_client()?;
|
||||
let query_result = client
|
||||
.simple_query("identify_system")?
|
||||
.first()
|
||||
.map(|msg| match msg {
|
||||
postgres::SimpleQueryMessage::Row(row) => row.get(0),
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.ok_or_else(|| anyhow!("failed to get system_id"))?
|
||||
.parse::<u64>()?;
|
||||
|
||||
Ok(query_result)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageServerNode {
|
||||
@@ -247,104 +229,3 @@ impl Drop for PageServerNode {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Control routines for WalAcceptor.
|
||||
//
|
||||
// Now used only in test setups.
|
||||
//
|
||||
pub struct WalAcceptorNode {
|
||||
listen: SocketAddr,
|
||||
data_dir: PathBuf,
|
||||
env: LocalEnv,
|
||||
}
|
||||
|
||||
impl WalAcceptorNode {
|
||||
pub fn init(&self) {
|
||||
if self.data_dir.exists() {
|
||||
fs::remove_dir_all(self.data_dir.clone()).unwrap();
|
||||
}
|
||||
fs::create_dir_all(self.data_dir.clone()).unwrap();
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
println!(
|
||||
"Starting wal_acceptor in {} listening '{}'",
|
||||
self.data_dir.to_str().unwrap(),
|
||||
self.listen
|
||||
);
|
||||
|
||||
let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
|
||||
.args(&["-D", self.data_dir.to_str().unwrap()])
|
||||
.args(&["-l", self.listen.to_string().as_str()])
|
||||
.args(&["--systemid", &self.env.systemid.to_string()])
|
||||
// Tell page server it can receive WAL from this WAL safekeeper
|
||||
// FIXME: If there are multiple safekeepers, they will all inform
|
||||
// the page server. Only the last "notification" will stay in effect.
|
||||
// So it's pretty random which safekeeper the page server will connect to
|
||||
.args(&["--pageserver", "127.0.0.1:64000"])
|
||||
.arg("-d")
|
||||
.arg("-n")
|
||||
.status()
|
||||
.expect("failed to start wal_acceptor");
|
||||
|
||||
if !status.success() {
|
||||
panic!("wal_acceptor start failed");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self) -> Result<()> {
|
||||
println!("Stopping wal acceptor on {}", self.listen);
|
||||
let pidfile = self.data_dir.join("wal_acceptor.pid");
|
||||
let pid = read_pidfile(&pidfile)?;
|
||||
let pid = Pid::from_raw(pid);
|
||||
if kill(pid, Signal::SIGTERM).is_err() {
|
||||
bail!("Failed to kill wal_acceptor with pid {}", pid);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalAcceptorNode {
|
||||
fn drop(&mut self) {
|
||||
// Ignore errors.
|
||||
let _ = self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct WalProposerNode {
|
||||
pub pid: u32,
|
||||
}
|
||||
|
||||
impl WalProposerNode {
|
||||
pub fn stop(&self) {
|
||||
// std::process::Child::id() returns u32, we need i32.
|
||||
let pid: i32 = self.pid.try_into().unwrap();
|
||||
let pid = Pid::from_raw(pid);
|
||||
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalProposerNode {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/// Read a PID file
|
||||
///
|
||||
/// We expect a file that contains a single integer.
|
||||
/// We return an i32 for compatibility with libc and nix.
|
||||
fn read_pidfile(pidfile: &Path) -> Result<i32> {
|
||||
let pid_str = fs::read_to_string(pidfile)
|
||||
.with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
|
||||
let pid: i32 = pid_str
|
||||
.parse()
|
||||
.map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
|
||||
if pid < 1 {
|
||||
bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
|
||||
}
|
||||
Ok(pid)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,10 @@ edition = "2018"
|
||||
[dependencies]
|
||||
lazy_static = "1.4.0"
|
||||
rand = "0.8.3"
|
||||
anyhow = "1.0"
|
||||
nix = "0.20"
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
|
||||
pageserver = { path = "../pageserver" }
|
||||
walkeeper = { path = "../walkeeper" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
|
||||
403
integration_tests/src/lib.rs
Normal file
403
integration_tests/src/lib.rs
Normal file
@@ -0,0 +1,403 @@
|
||||
use anyhow::{bail, Result};
|
||||
use std::sync::{atomic::AtomicBool, Arc};
|
||||
use std::{
|
||||
convert::TryInto,
|
||||
fs::{self, File, OpenOptions},
|
||||
io::Read,
|
||||
net::SocketAddr,
|
||||
path::{Path, PathBuf},
|
||||
process::{Command, ExitStatus},
|
||||
sync::atomic::Ordering,
|
||||
};
|
||||
|
||||
use control_plane::compute::PostgresNode;
|
||||
use control_plane::local_env;
|
||||
use control_plane::read_pidfile;
|
||||
use control_plane::{local_env::LocalEnv, storage::PageServerNode};
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
|
||||
use postgres;
|
||||
|
||||
// local compute env for tests
|
||||
pub fn create_test_env(testname: &str) -> LocalEnv {
|
||||
let base_path = Path::new(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("../tmp_check/")
|
||||
.join(testname);
|
||||
|
||||
let base_path_str = base_path.to_str().unwrap();
|
||||
|
||||
// Remove remnants of old test repo
|
||||
let _ = fs::remove_dir_all(&base_path);
|
||||
|
||||
fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str());
|
||||
|
||||
let pgdatadirs_path = base_path.join("pgdatadirs");
|
||||
fs::create_dir(&pgdatadirs_path)
|
||||
.expect(format!("could not create directory {:?}", pgdatadirs_path).as_str());
|
||||
|
||||
LocalEnv {
|
||||
pageserver_connstring: "postgresql://127.0.0.1:64000".to_string(),
|
||||
pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
|
||||
zenith_distrib_dir: Some(local_env::cargo_bin_dir()),
|
||||
base_data_dir: base_path,
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Collection of several example deployments useful for tests.
|
||||
//
|
||||
// I'm intendedly modelling storage and compute control planes as a separate entities
|
||||
// as it is closer to the actual setup.
|
||||
//
|
||||
pub struct TestStorageControlPlane {
|
||||
pub wal_acceptors: Vec<WalAcceptorNode>,
|
||||
pub pageserver: Arc<PageServerNode>,
|
||||
pub test_done: AtomicBool,
|
||||
}
|
||||
|
||||
impl TestStorageControlPlane {
|
||||
// postgres <-> page_server
|
||||
//
|
||||
// Initialize a new repository and configure a page server to run in it
|
||||
//
|
||||
pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane {
|
||||
let pserver = Arc::new(PageServerNode {
|
||||
env: local_env.clone(),
|
||||
kill_on_exit: true,
|
||||
listen_address: None,
|
||||
});
|
||||
pserver.init().unwrap();
|
||||
pserver.start().unwrap();
|
||||
|
||||
TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: pserver,
|
||||
test_done: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
// postgres <-> {wal_acceptor1, wal_acceptor2, ...}
|
||||
pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane {
|
||||
let mut cplane = TestStorageControlPlane {
|
||||
wal_acceptors: Vec::new(),
|
||||
pageserver: Arc::new(PageServerNode {
|
||||
env: local_env.clone(),
|
||||
kill_on_exit: true,
|
||||
listen_address: None,
|
||||
}),
|
||||
test_done: AtomicBool::new(false),
|
||||
// repopath,
|
||||
};
|
||||
cplane.pageserver.init().unwrap();
|
||||
cplane.pageserver.start().unwrap();
|
||||
|
||||
let systemid = cplane.pageserver.system_id_get().unwrap();
|
||||
|
||||
const WAL_ACCEPTOR_PORT: usize = 54321;
|
||||
|
||||
let datadir_base = local_env.base_data_dir.join("safekeepers");
|
||||
fs::create_dir_all(&datadir_base).unwrap();
|
||||
|
||||
for i in 0..redundancy {
|
||||
let wal_acceptor = WalAcceptorNode {
|
||||
listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
data_dir: datadir_base.join(format!("wal_acceptor_{}", i)),
|
||||
systemid,
|
||||
env: local_env.clone(),
|
||||
pass_to_pageserver: i == 0
|
||||
};
|
||||
wal_acceptor.init();
|
||||
wal_acceptor.start();
|
||||
cplane.wal_acceptors.push(wal_acceptor);
|
||||
}
|
||||
cplane
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
for wa in self.wal_acceptors.iter() {
|
||||
let _ = wa.stop();
|
||||
}
|
||||
self.test_done.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_wal_acceptor_conn_info(&self) -> String {
|
||||
self.wal_acceptors
|
||||
.iter()
|
||||
.map(|wa| wa.listen.to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(",")
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.test_done.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestStorageControlPlane {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// PostgresNodeExt
|
||||
//
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
///
|
||||
/// Testing utilities for PostgresNode type
|
||||
///
|
||||
pub trait PostgresNodeExt {
|
||||
fn pg_regress(&self) -> ExitStatus;
|
||||
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus;
|
||||
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode;
|
||||
fn open_psql(&self, db: &str) -> postgres::Client;
|
||||
fn dump_log_file(&self);
|
||||
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row>;
|
||||
}
|
||||
|
||||
impl PostgresNodeExt for PostgresNode {
|
||||
fn pg_regress(&self) -> ExitStatus {
|
||||
self.safe_psql("postgres", "CREATE DATABASE regression");
|
||||
|
||||
let regress_run_path = self.env.base_data_dir.join("regress");
|
||||
fs::create_dir_all(®ress_run_path).unwrap();
|
||||
fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap();
|
||||
std::env::set_current_dir(regress_run_path).unwrap();
|
||||
|
||||
let regress_build_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
|
||||
let regress_src_path =
|
||||
Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
|
||||
|
||||
let regress_check = Command::new(regress_build_path.join("pg_regress"))
|
||||
.args(&[
|
||||
"--bindir=''",
|
||||
"--use-existing",
|
||||
format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(),
|
||||
format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
|
||||
format!(
|
||||
"--schedule={}",
|
||||
regress_src_path.join("parallel_schedule").to_str().unwrap()
|
||||
)
|
||||
.as_str(),
|
||||
format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
|
||||
])
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGPORT", self.address.port().to_string())
|
||||
.env("PGUSER", self.whoami())
|
||||
.env("PGHOST", self.address.ip().to_string())
|
||||
.status()
|
||||
.expect("pg_regress failed");
|
||||
if !regress_check.success() {
|
||||
if let Ok(mut file) = File::open("regression.diffs") {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- regression.diffs:\n{}", buffer);
|
||||
}
|
||||
// self.dump_log_file();
|
||||
if let Ok(mut file) = File::open(self.env.pg_data_dir("pg1").join("log")) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- pgdatadirs/pg1/log:\n{}", buffer);
|
||||
}
|
||||
}
|
||||
regress_check
|
||||
}
|
||||
|
||||
fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus {
|
||||
let port = self.address.port().to_string();
|
||||
let clients = clients.to_string();
|
||||
let seconds = seconds.to_string();
|
||||
let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&["-i", "-p", port.as_str(), "postgres"])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench -i");
|
||||
let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench"))
|
||||
.args(&[
|
||||
"-p",
|
||||
port.as_str(),
|
||||
"-T",
|
||||
seconds.as_str(),
|
||||
"-P",
|
||||
"1",
|
||||
"-c",
|
||||
clients.as_str(),
|
||||
"-M",
|
||||
"prepared",
|
||||
"postgres",
|
||||
])
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.status()
|
||||
.expect("pgbench run");
|
||||
pg_bench_run
|
||||
}
|
||||
|
||||
fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode {
|
||||
let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
|
||||
match Command::new(proxy_path.as_path())
|
||||
.args(&["--ztimelineid", &self.timelineid.to_string()])
|
||||
.args(&["-s", wal_acceptors])
|
||||
.args(&["-h", &self.address.ip().to_string()])
|
||||
.args(&["-p", &self.address.port().to_string()])
|
||||
.arg("-v")
|
||||
.stderr(
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(self.pgdata().join("safekeeper_proxy.log"))
|
||||
.unwrap(),
|
||||
)
|
||||
.spawn()
|
||||
{
|
||||
Ok(child) => WalProposerNode { pid: child.id() },
|
||||
Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e),
|
||||
}
|
||||
}
|
||||
|
||||
fn dump_log_file(&self) {
|
||||
if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) {
|
||||
let mut buffer = String::new();
|
||||
file.read_to_string(&mut buffer).unwrap();
|
||||
println!("--------------- pageserver.log:\n{}", buffer);
|
||||
}
|
||||
}
|
||||
|
||||
fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
|
||||
let connstring = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.address.ip(),
|
||||
self.address.port(),
|
||||
db,
|
||||
self.whoami()
|
||||
);
|
||||
let mut client = postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap();
|
||||
|
||||
println!("Running {}", sql);
|
||||
let result = client.query(sql, &[]);
|
||||
if result.is_err() {
|
||||
// self.dump_log_file();
|
||||
}
|
||||
result.unwrap()
|
||||
}
|
||||
|
||||
fn open_psql(&self, db: &str) -> postgres::Client {
|
||||
let connstring = format!(
|
||||
"host={} port={} dbname={} user={}",
|
||||
self.address.ip(),
|
||||
self.address.port(),
|
||||
db,
|
||||
self.whoami()
|
||||
);
|
||||
postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// WalAcceptorNode
|
||||
//
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
//
|
||||
// Control routines for WalAcceptor.
|
||||
//
|
||||
// Now used only in test setups.
|
||||
//
|
||||
pub struct WalAcceptorNode {
|
||||
listen: SocketAddr,
|
||||
data_dir: PathBuf,
|
||||
systemid: u64,
|
||||
env: LocalEnv,
|
||||
pass_to_pageserver: bool,
|
||||
}
|
||||
|
||||
impl WalAcceptorNode {
|
||||
pub fn init(&self) {
|
||||
if self.data_dir.exists() {
|
||||
fs::remove_dir_all(self.data_dir.clone()).unwrap();
|
||||
}
|
||||
fs::create_dir_all(self.data_dir.clone()).unwrap();
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
println!(
|
||||
"Starting wal_acceptor in {} listening '{}'",
|
||||
self.data_dir.to_str().unwrap(),
|
||||
self.listen
|
||||
);
|
||||
|
||||
let ps_arg = if self.pass_to_pageserver {
|
||||
// Tell page server it can receive WAL from this WAL safekeeper
|
||||
["--pageserver", "127.0.0.1:64000"].to_vec()
|
||||
} else {
|
||||
[].to_vec()
|
||||
};
|
||||
|
||||
let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor"))
|
||||
.args(&["-D", self.data_dir.to_str().unwrap()])
|
||||
.args(&["-l", self.listen.to_string().as_str()])
|
||||
.args(&["--systemid", self.systemid.to_string().as_str()])
|
||||
.args(&ps_arg)
|
||||
.arg("-d")
|
||||
.arg("-n")
|
||||
.status()
|
||||
.expect("failed to start wal_acceptor");
|
||||
|
||||
if !status.success() {
|
||||
panic!("wal_acceptor start failed");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self) -> Result<()> {
|
||||
println!("Stopping wal acceptor on {}", self.listen);
|
||||
let pidfile = self.data_dir.join("wal_acceptor.pid");
|
||||
let pid = read_pidfile(&pidfile)?;
|
||||
let pid = Pid::from_raw(pid);
|
||||
if kill(pid, Signal::SIGTERM).is_err() {
|
||||
bail!("Failed to kill wal_acceptor with pid {}", pid);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalAcceptorNode {
|
||||
fn drop(&mut self) {
|
||||
// Ignore errors.
|
||||
let _ = self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// WalProposerNode
|
||||
//
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct WalProposerNode {
|
||||
pub pid: u32,
|
||||
}
|
||||
|
||||
impl WalProposerNode {
|
||||
pub fn stop(&self) {
|
||||
// std::process::Child::id() returns u32, we need i32.
|
||||
let pid: i32 = self.pid.try_into().unwrap();
|
||||
let pid = Pid::from_raw(pid);
|
||||
kill(pid, Signal::SIGTERM).expect("failed to execute kill");
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalProposerNode {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
// test node resettlement to an empty datadir
|
||||
|
||||
// TODO
|
||||
/*
|
||||
#[test]
|
||||
fn test_resettlement() {}
|
||||
|
||||
// test seq scan of everythin after restart
|
||||
#[test]
|
||||
fn test_cold_seqscan() {}
|
||||
*/
|
||||
@@ -1,8 +0,0 @@
|
||||
// TODO
|
||||
/*
|
||||
#[test]
|
||||
fn test_actions() {}
|
||||
|
||||
#[test]
|
||||
fn test_regress() {}
|
||||
*/
|
||||
@@ -1,23 +1,22 @@
|
||||
// mod control_plane;
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::PointInTime;
|
||||
use control_plane::storage::TestStorageControlPlane;
|
||||
|
||||
use integration_tests;
|
||||
use integration_tests::TestStorageControlPlane;
|
||||
use integration_tests::PostgresNodeExt;
|
||||
|
||||
// XXX: force all redo at the end
|
||||
// -- restart + seqscan won't read deleted stuff
|
||||
// -- pageserver api endpoint to check all rels
|
||||
#[test]
|
||||
fn test_redo_cases() {
|
||||
let local_env = local_env::test_env("test_redo_cases");
|
||||
let local_env = integration_tests::create_test_env("test_redo_cases");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_node(maintli);
|
||||
let node = compute_cplane.new_test_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
// check basic work with table
|
||||
@@ -51,15 +50,14 @@ fn test_redo_cases() {
|
||||
// Runs pg_regress on a compute node
|
||||
#[test]
|
||||
fn test_regress() {
|
||||
let local_env = local_env::test_env("test_regress");
|
||||
let local_env = integration_tests::create_test_env("test_regress");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_node(maintli);
|
||||
let node = compute_cplane.new_test_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
let status = node.pg_regress();
|
||||
@@ -69,15 +67,14 @@ fn test_regress() {
|
||||
// Runs pg_bench on a compute node
|
||||
#[test]
|
||||
fn pgbench() {
|
||||
let local_env = local_env::test_env("pgbench");
|
||||
let local_env = integration_tests::create_test_env("pgbench");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_node(maintli);
|
||||
let node = compute_cplane.new_test_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
let status = node.pg_bench(10, 5);
|
||||
@@ -87,30 +84,21 @@ fn pgbench() {
|
||||
// Run two postgres instances on one pageserver, on different timelines
|
||||
#[test]
|
||||
fn test_pageserver_two_timelines() {
|
||||
let local_env = local_env::test_env("test_pageserver_two_timelines");
|
||||
let local_env = integration_tests::create_test_env("test_pageserver_two_timelines");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
let storage_cplane = TestStorageControlPlane::one_page_server(&local_env);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
|
||||
// Create new branch at the end of 'main'
|
||||
let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
|
||||
local_env::create_branch(
|
||||
&local_env,
|
||||
"experimental",
|
||||
PointInTime {
|
||||
timelineid: maintli,
|
||||
lsn: startpoint,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
let experimentaltli = storage_cplane.get_branch_timeline("experimental");
|
||||
storage_cplane
|
||||
.pageserver
|
||||
.branch_create("experimental", "main")
|
||||
.unwrap();
|
||||
|
||||
// Launch postgres instances on both branches
|
||||
let node1 = compute_cplane.new_test_node(maintli);
|
||||
let node2 = compute_cplane.new_test_node(experimentaltli);
|
||||
let node1 = compute_cplane.new_test_node("main");
|
||||
let node2 = compute_cplane.new_test_node("experimental");
|
||||
node1.start().unwrap();
|
||||
node2.start().unwrap();
|
||||
|
||||
|
||||
@@ -1,21 +1,20 @@
|
||||
// Restart acceptors one by one while compute is under the load.
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::PointInTime;
|
||||
use control_plane::storage::TestStorageControlPlane;
|
||||
use pageserver::ZTimelineId;
|
||||
|
||||
use rand::Rng;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use std::{thread, time};
|
||||
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
|
||||
use integration_tests;
|
||||
use integration_tests::TestStorageControlPlane;
|
||||
use integration_tests::PostgresNodeExt;
|
||||
|
||||
const DOWNTIME: u64 = 2;
|
||||
|
||||
#[test]
|
||||
//#[ignore]
|
||||
fn test_embedded_wal_proposer() {
|
||||
let local_env = local_env::test_env("test_embedded_wal_proposer");
|
||||
let local_env = integration_tests::create_test_env("test_embedded_wal_proposer");
|
||||
|
||||
const REDUNDANCY: usize = 3;
|
||||
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
|
||||
@@ -23,8 +22,7 @@ fn test_embedded_wal_proposer() {
|
||||
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_master_node(maintli);
|
||||
let node = compute_cplane.new_test_master_node("main");
|
||||
node.append_conf(
|
||||
"postgresql.conf",
|
||||
&format!("wal_acceptors='{}'\n", wal_acceptors),
|
||||
@@ -52,7 +50,7 @@ fn test_embedded_wal_proposer() {
|
||||
|
||||
#[test]
|
||||
fn test_acceptors_normal_work() {
|
||||
let local_env = local_env::test_env("test_acceptors_normal_work");
|
||||
let local_env = integration_tests::create_test_env("test_acceptors_normal_work");
|
||||
|
||||
const REDUNDANCY: usize = 3;
|
||||
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
|
||||
@@ -60,8 +58,7 @@ fn test_acceptors_normal_work() {
|
||||
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_master_node(maintli);
|
||||
let node = compute_cplane.new_test_master_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
// start proxy
|
||||
@@ -93,36 +90,25 @@ fn test_many_timelines() {
|
||||
// Initialize a new repository, and set up WAL safekeepers and page server.
|
||||
const REDUNDANCY: usize = 3;
|
||||
const N_TIMELINES: usize = 5;
|
||||
let local_env = local_env::test_env("test_many_timelines");
|
||||
let local_env = integration_tests::create_test_env("test_many_timelines");
|
||||
let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY);
|
||||
let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver);
|
||||
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
|
||||
|
||||
// Create branches
|
||||
let mut timelines: Vec<ZTimelineId> = Vec::new();
|
||||
let maintli = storage_cplane.get_branch_timeline("main"); // main branch
|
||||
timelines.push(maintli);
|
||||
let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap();
|
||||
let mut timelines: Vec<String> = Vec::new();
|
||||
timelines.push("main".to_string());
|
||||
|
||||
for i in 1..N_TIMELINES {
|
||||
// additional branches
|
||||
let branchname = format!("experimental{}", i);
|
||||
local_env::create_branch(
|
||||
&local_env,
|
||||
&branchname,
|
||||
PointInTime {
|
||||
timelineid: maintli,
|
||||
lsn: startpoint,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
let tli = storage_cplane.get_branch_timeline(&branchname);
|
||||
timelines.push(tli);
|
||||
storage_cplane.pageserver.branch_create(&branchname, "main").unwrap();
|
||||
timelines.push(branchname);
|
||||
}
|
||||
|
||||
// start postgres on each timeline
|
||||
let mut nodes = Vec::new();
|
||||
for tli in timelines {
|
||||
let node = compute_cplane.new_test_node(tli);
|
||||
for tli_name in timelines {
|
||||
let node = compute_cplane.new_test_node(&tli_name);
|
||||
nodes.push(node.clone());
|
||||
node.start().unwrap();
|
||||
node.start_proxy(&wal_acceptors);
|
||||
@@ -159,7 +145,7 @@ fn test_many_timelines() {
|
||||
// Majority is always alive
|
||||
#[test]
|
||||
fn test_acceptors_restarts() {
|
||||
let local_env = local_env::test_env("test_acceptors_restarts");
|
||||
let local_env = integration_tests::create_test_env("test_acceptors_restarts");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
const REDUNDANCY: usize = 3;
|
||||
@@ -171,8 +157,7 @@ fn test_acceptors_restarts() {
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_master_node(maintli);
|
||||
let node = compute_cplane.new_test_master_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
// start proxy
|
||||
@@ -222,7 +207,7 @@ fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
|
||||
// N_CRASHES env var
|
||||
#[test]
|
||||
fn test_acceptors_unavailability() {
|
||||
let local_env = local_env::test_env("test_acceptors_unavailability");
|
||||
let local_env = integration_tests::create_test_env("test_acceptors_unavailability");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
const REDUNDANCY: usize = 2;
|
||||
@@ -232,8 +217,7 @@ fn test_acceptors_unavailability() {
|
||||
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_master_node(maintli);
|
||||
let node = compute_cplane.new_test_master_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
// start proxy
|
||||
@@ -307,7 +291,7 @@ fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
|
||||
// Race condition test
|
||||
#[test]
|
||||
fn test_race_conditions() {
|
||||
let local_env = local_env::test_env("test_race_conditions");
|
||||
let local_env = integration_tests::create_test_env("test_race_conditions");
|
||||
|
||||
// Start pageserver that reads WAL directly from that postgres
|
||||
const REDUNDANCY: usize = 3;
|
||||
@@ -319,8 +303,7 @@ fn test_race_conditions() {
|
||||
let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
|
||||
|
||||
// start postgres
|
||||
let maintli = storage_cplane.get_branch_timeline("main");
|
||||
let node = compute_cplane.new_test_master_node(maintli);
|
||||
let node = compute_cplane.new_test_master_node("main");
|
||||
node.start().unwrap();
|
||||
|
||||
// start proxy
|
||||
|
||||
@@ -40,6 +40,7 @@ tar = "0.4.33"
|
||||
parse_duration = "2.1.1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
fs_extra = "1.2.0"
|
||||
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
|
||||
use log::*;
|
||||
use parse_duration::parse;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::{env, path::PathBuf};
|
||||
use std::io;
|
||||
use std::process::exit;
|
||||
use std::thread;
|
||||
@@ -16,7 +17,7 @@ use daemonize::Daemonize;
|
||||
|
||||
use slog::{Drain, FnValue};
|
||||
|
||||
use pageserver::{page_cache, page_service, tui, zenith_repo_dir, PageServerConf};
|
||||
use pageserver::{page_cache, page_service, tui, PageServerConf, branches};
|
||||
|
||||
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
const DEFAULT_GC_PERIOD_SEC: u64 = 10;
|
||||
@@ -47,6 +48,12 @@ fn main() -> Result<()> {
|
||||
.takes_value(false)
|
||||
.help("Run in the background"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("init")
|
||||
.long("init")
|
||||
.takes_value(false)
|
||||
.help("Initialize pageserver repo"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("gc_horizon")
|
||||
.long("gc_horizon")
|
||||
@@ -59,16 +66,55 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Interval between garbage collector iterations"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("workdir")
|
||||
.short("D")
|
||||
.long("workdir")
|
||||
.takes_value(true)
|
||||
.help("Working directory for the pageserver"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let workdir = if let Some(workdir_arg) = arg_matches.value_of("workdir") {
|
||||
PathBuf::from(workdir_arg)
|
||||
} else if let Some(workdir_arg) = std::env::var_os("ZENITH_REPO_DIR") {
|
||||
PathBuf::from(workdir_arg.to_str().unwrap())
|
||||
} else {
|
||||
PathBuf::from(".zenith")
|
||||
};
|
||||
|
||||
let pg_distrib_dir: PathBuf = {
|
||||
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
|
||||
postgres_bin.into()
|
||||
} else {
|
||||
let cwd = env::current_dir()?;
|
||||
cwd.join("tmp_install")
|
||||
}
|
||||
};
|
||||
|
||||
if !pg_distrib_dir.join("bin/postgres").exists() {
|
||||
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
|
||||
}
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
daemonize: false,
|
||||
interactive: false,
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC),
|
||||
listen_addr: "127.0.0.1:5430".parse().unwrap(),
|
||||
listen_addr: "127.0.0.1:64000".parse().unwrap(),
|
||||
workdir,
|
||||
pg_distrib_dir,
|
||||
};
|
||||
|
||||
// Create repo and exit if init was requested
|
||||
if arg_matches.is_present("init") {
|
||||
branches::init_repo(&conf)?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Set CWD to workdir for non-daemon modes
|
||||
env::set_current_dir(&conf.workdir)?;
|
||||
|
||||
if arg_matches.is_present("daemonize") {
|
||||
conf.daemonize = true;
|
||||
}
|
||||
@@ -98,8 +144,7 @@ fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
let repodir = zenith_repo_dir();
|
||||
let log_filename = repodir.join("pageserver.log");
|
||||
let log_filename = "pageserver.log";
|
||||
// Don't open the same file for output multiple times;
|
||||
// the different fds could overwrite each other's output.
|
||||
let log_file = OpenOptions::new()
|
||||
@@ -141,8 +186,8 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
let stderr = log_file;
|
||||
|
||||
let daemonize = Daemonize::new()
|
||||
.pid_file(repodir.join("pageserver.pid"))
|
||||
.working_directory(repodir)
|
||||
.pid_file("pageserver.pid")
|
||||
.working_directory(".")
|
||||
.stdout(stdout)
|
||||
.stderr(stderr);
|
||||
|
||||
@@ -153,26 +198,14 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> {
|
||||
} else {
|
||||
// change into the repository directory. In daemon mode, Daemonize
|
||||
// does this for us.
|
||||
let repodir = zenith_repo_dir();
|
||||
std::env::set_current_dir(&repodir)?;
|
||||
info!("Changed current directory to repository in {:?}", &repodir);
|
||||
std::env::set_current_dir(&conf.workdir)?;
|
||||
info!("Changed current directory to repository in {:?}", &conf.workdir);
|
||||
}
|
||||
|
||||
let mut threads = Vec::new();
|
||||
|
||||
// TODO: Check that it looks like a valid repository before going further
|
||||
|
||||
// Create directory for wal-redo datadirs
|
||||
match fs::create_dir("wal-redo") {
|
||||
Ok(_) => {}
|
||||
Err(e) => match e.kind() {
|
||||
io::ErrorKind::AlreadyExists => {}
|
||||
_ => {
|
||||
anyhow::bail!("Failed to create wal-redo data directory: {}", e);
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
page_cache::init(conf);
|
||||
|
||||
// GetPage@LSN requests are served by another thread. (It uses async I/O,
|
||||
|
||||
@@ -1,17 +1,135 @@
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
//
|
||||
// Branch management code
|
||||
//
|
||||
// TODO: move all paths construction to conf impl
|
||||
//
|
||||
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::xlog_utils;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr};
|
||||
use fs_extra;
|
||||
use fs::File;
|
||||
use std::io::Read;
|
||||
use std::env;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use crate::{repository::Repository, ZTimelineId};
|
||||
use crate::{repository::Repository, ZTimelineId, PageServerConf};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct BranchInfo {
|
||||
pub name: String,
|
||||
pub timeline_id: String,
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub latest_valid_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
// impl BranchInfo {
|
||||
// pub fn lsn_string(&self) -> String {
|
||||
// let lsn_string_opt = self.latest_valid_lsn.map(|lsn| lsn.to_string());
|
||||
// let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
|
||||
// format!("{}@{}", self.name, lsn_str)
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PointInTime {
|
||||
pub timelineid: ZTimelineId,
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
pub fn init_repo(conf: &PageServerConf) -> Result<()> {
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
fs::create_dir_all(&conf.workdir)
|
||||
.with_context(|| format!("could not create directory {}", &conf.workdir.display()))?;
|
||||
|
||||
env::set_current_dir(&conf.workdir)?;
|
||||
|
||||
fs::create_dir(std::path::Path::new("timelines"))?;
|
||||
fs::create_dir(std::path::Path::new("refs"))?;
|
||||
fs::create_dir(std::path::Path::new("refs").join("branches"))?;
|
||||
fs::create_dir(std::path::Path::new("refs").join("tags"))?;
|
||||
fs::create_dir(std::path::Path::new("wal-redo"))?;
|
||||
|
||||
println!("created directory structure in {}", &conf.workdir.display());
|
||||
|
||||
// Create initial timeline
|
||||
let tli = create_timeline(conf, None)?;
|
||||
let timelinedir = conf.timeline_path(tli);
|
||||
println!("created initial timeline {}", timelinedir.display());
|
||||
|
||||
// Run initdb
|
||||
//
|
||||
// We create the cluster temporarily in a "tmp" directory inside the repository,
|
||||
// and move it to the right location from there.
|
||||
let tmppath = std::path::Path::new("tmp");
|
||||
|
||||
let initdb_path = conf.pg_bin_dir().join("initdb");
|
||||
let initdb = Command::new(initdb_path)
|
||||
.args(&["-D", tmppath.to_str().unwrap()])
|
||||
.arg("--no-instructions")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
|
||||
.env(
|
||||
"DYLD_LIBRARY_PATH",
|
||||
conf.pg_lib_dir().to_str().unwrap(),
|
||||
)
|
||||
.stdout(Stdio::null())
|
||||
.status()
|
||||
.with_context(|| "failed to execute initdb")?;
|
||||
if !initdb.success() {
|
||||
anyhow::bail!("initdb failed");
|
||||
}
|
||||
println!("initdb succeeded");
|
||||
|
||||
// Read control file to extract the LSN and system id
|
||||
let controlfile_path = tmppath.join("global").join("pg_control");
|
||||
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
|
||||
// let systemid = controlfile.system_identifier;
|
||||
let lsn = controlfile.checkPoint;
|
||||
let lsnstr = format!("{:016X}", lsn);
|
||||
|
||||
// Move the initial WAL file
|
||||
fs::rename(
|
||||
tmppath.join("pg_wal").join("000000010000000000000001"),
|
||||
timelinedir
|
||||
.join("wal")
|
||||
.join("000000010000000000000001.partial"),
|
||||
)?;
|
||||
println!("moved initial WAL file");
|
||||
|
||||
// Remove pg_wal
|
||||
fs::remove_dir_all(tmppath.join("pg_wal"))?;
|
||||
|
||||
force_crash_recovery(&tmppath)?;
|
||||
println!("updated pg_control");
|
||||
|
||||
let target = timelinedir.join("snapshots").join(&lsnstr);
|
||||
fs::rename(tmppath, &target)?;
|
||||
println!("moved 'tmp' to {}", target.display());
|
||||
|
||||
// Create 'main' branch to refer to the initial timeline
|
||||
let data = tli.to_string();
|
||||
fs::write(conf.branch_path("main"), data)?;
|
||||
println!("created main branch");
|
||||
|
||||
// XXX: do we need that now? -- yep, for test only
|
||||
|
||||
// // Also update the system id in the LocalEnv
|
||||
// local_env.systemid = systemid;
|
||||
// // write config
|
||||
// let toml = toml::to_string(&local_env)?;
|
||||
// fs::write(repopath.join("config"), toml)?;
|
||||
|
||||
println!(
|
||||
"new zenith repository was created in {}",
|
||||
conf.workdir.display()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>> {
|
||||
// adapted from CLI code
|
||||
let branches_dir = std::path::Path::new("refs").join("branches");
|
||||
@@ -28,9 +146,263 @@ pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo
|
||||
|
||||
Ok(BranchInfo {
|
||||
name,
|
||||
timeline_id: timeline_id.to_string(),
|
||||
timeline_id,
|
||||
latest_valid_lsn,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn get_system_id(conf: &PageServerConf) -> Result<u64> {
|
||||
// let branches = get_branches();
|
||||
|
||||
let branches_dir = std::path::Path::new("refs").join("branches");
|
||||
let branches = std::fs::read_dir(&branches_dir)?
|
||||
.map(|dir_entry_res| {
|
||||
let dir_entry = dir_entry_res?;
|
||||
let name = dir_entry.file_name().to_str().unwrap().to_string();
|
||||
let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
|
||||
Ok((name, timeline_id))
|
||||
})
|
||||
.collect::<Result<HashMap<String, ZTimelineId>>>()?;
|
||||
|
||||
let main_tli = branches
|
||||
.get("main")
|
||||
.ok_or_else(|| anyhow!("Branch main not found"))?;
|
||||
|
||||
let (_, main_snap_dir) = find_latest_snapshot(conf, *main_tli)?;
|
||||
let controlfile_path = main_snap_dir.join("global").join("pg_control");
|
||||
let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
|
||||
Ok(controlfile.system_identifier)
|
||||
}
|
||||
|
||||
pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result<BranchInfo> {
|
||||
if conf.branch_path(&branchname).exists() {
|
||||
anyhow::bail!("branch {} already exists", branchname);
|
||||
}
|
||||
|
||||
let mut startpoint = parse_point_in_time(conf, startpoint_str)?;
|
||||
|
||||
if startpoint.lsn == Lsn(0) {
|
||||
// Find end of WAL on the old timeline
|
||||
let end_of_wal = find_end_of_wal(conf, startpoint.timelineid)?;
|
||||
println!("branching at end of WAL: {}", end_of_wal);
|
||||
startpoint.lsn = end_of_wal;
|
||||
}
|
||||
|
||||
// create a new timeline for it
|
||||
let newtli = create_timeline(conf, Some(startpoint))?;
|
||||
let newtimelinedir = conf.timeline_path(newtli);
|
||||
|
||||
let data = newtli.to_string();
|
||||
fs::write(conf.branch_path(&branchname), data)?;
|
||||
|
||||
// Copy the latest snapshot (TODO: before the startpoint) and all WAL
|
||||
// TODO: be smarter and avoid the copying...
|
||||
let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?;
|
||||
let copy_opts = fs_extra::dir::CopyOptions::new();
|
||||
fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?;
|
||||
|
||||
let oldtimelinedir = conf.timeline_path(startpoint.timelineid);
|
||||
copy_wal(
|
||||
&oldtimelinedir.join("wal"),
|
||||
&newtimelinedir.join("wal"),
|
||||
startpoint.lsn,
|
||||
16 * 1024 * 1024 // FIXME: assume default WAL segment size
|
||||
)?;
|
||||
|
||||
Ok(BranchInfo {
|
||||
name: branchname.to_string(),
|
||||
timeline_id: newtli,
|
||||
latest_valid_lsn: Some(startpoint.lsn),
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
// Parse user-given string that represents a point-in-time.
|
||||
//
|
||||
// We support multiple variants:
|
||||
//
|
||||
// Raw timeline id in hex, meaning the end of that timeline:
|
||||
// bc62e7d612d0e6fe8f99a6dd2f281f9d
|
||||
//
|
||||
// A specific LSN on a timeline:
|
||||
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
|
||||
//
|
||||
// Same, with a human-friendly branch name:
|
||||
// main
|
||||
// main@2/15D3DD8
|
||||
//
|
||||
// Human-friendly tag name:
|
||||
// mytag
|
||||
//
|
||||
//
|
||||
fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
|
||||
let mut strings = s.split('@');
|
||||
let name = strings.next().unwrap();
|
||||
|
||||
let lsn: Option<Lsn>;
|
||||
if let Some(lsnstr) = strings.next() {
|
||||
lsn = Some(Lsn::from_str(lsnstr)
|
||||
.with_context(|| "invalid LSN in point-in-time specification")?);
|
||||
} else {
|
||||
lsn = None
|
||||
}
|
||||
|
||||
// Check if it's a tag
|
||||
if lsn.is_none() {
|
||||
let tagpath = conf.tag_path(name);
|
||||
if tagpath.exists() {
|
||||
let pointstr = fs::read_to_string(tagpath)?;
|
||||
|
||||
return parse_point_in_time(conf, &pointstr);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if it's a branch
|
||||
// Check if it's branch @ LSN
|
||||
let branchpath = conf.branch_path(name);
|
||||
if branchpath.exists() {
|
||||
let pointstr = fs::read_to_string(branchpath)?;
|
||||
|
||||
let mut result = parse_point_in_time(conf, &pointstr)?;
|
||||
|
||||
result.lsn = lsn.unwrap_or(Lsn(0));
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
// Check if it's a timelineid
|
||||
// Check if it's timelineid @ LSN
|
||||
if let Ok(timelineid) = ZTimelineId::from_str(name) {
|
||||
let tlipath = conf.timeline_path(timelineid);
|
||||
if tlipath.exists() {
|
||||
return Ok(PointInTime {
|
||||
timelineid,
|
||||
lsn: lsn.unwrap_or(Lsn(0)),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
panic!("could not parse point-in-time {}", s);
|
||||
}
|
||||
|
||||
// If control file says the cluster was shut down cleanly, modify it, to mark
|
||||
// it as crashed. That forces crash recovery when you start the cluster.
|
||||
//
|
||||
// FIXME:
|
||||
// We currently do this to the initial snapshot in "zenith init". It would
|
||||
// be more natural to do this when the snapshot is restored instead, but we
|
||||
// currently don't have any code to create new snapshots, so it doesn't matter
|
||||
// Or better yet, use a less hacky way of putting the cluster into recovery.
|
||||
// Perhaps create a backup label file in the data directory when it's restored.
|
||||
fn force_crash_recovery(datadir: &Path) -> Result<()> {
|
||||
// Read in the control file
|
||||
let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
|
||||
let mut controlfile =
|
||||
postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
|
||||
|
||||
controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
|
||||
|
||||
fs::write(
|
||||
controlfilepath.as_path(),
|
||||
postgres_ffi::encode_pg_control(controlfile),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
|
||||
// Create initial timeline
|
||||
let mut tli_buf = [0u8; 16];
|
||||
rand::thread_rng().fill(&mut tli_buf);
|
||||
let timelineid = ZTimelineId::from(tli_buf);
|
||||
|
||||
let timelinedir = conf.timeline_path(timelineid);
|
||||
|
||||
fs::create_dir(&timelinedir)?;
|
||||
fs::create_dir(&timelinedir.join("snapshots"))?;
|
||||
fs::create_dir(&timelinedir.join("wal"))?;
|
||||
|
||||
if let Some(ancestor) = ancestor {
|
||||
let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
|
||||
fs::write(timelinedir.join("ancestor"), data)?;
|
||||
}
|
||||
|
||||
Ok(timelineid)
|
||||
}
|
||||
|
||||
///
|
||||
/// Copy all WAL segments from one directory to another, up to given LSN.
|
||||
///
|
||||
/// If the given LSN is in the middle of a segment, the last segment containing it
|
||||
/// is written out as .partial, and padded with zeros.
|
||||
///
|
||||
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
|
||||
|
||||
let last_segno = upto.segment_number(wal_seg_size);
|
||||
let last_segoff = upto.segment_offset(wal_seg_size);
|
||||
|
||||
for entry in fs::read_dir(src_dir).unwrap() {
|
||||
if let Ok(entry) = entry {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
|
||||
// Check if the filename looks like an xlog file, or a .partial file.
|
||||
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
|
||||
continue
|
||||
}
|
||||
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
|
||||
|
||||
let copylen;
|
||||
let mut dst_fname = PathBuf::from(fname);
|
||||
if segno > last_segno {
|
||||
// future segment, skip
|
||||
continue;
|
||||
} else if segno < last_segno {
|
||||
copylen = wal_seg_size;
|
||||
dst_fname.set_extension("");
|
||||
} else {
|
||||
copylen = last_segoff;
|
||||
dst_fname.set_extension("partial");
|
||||
}
|
||||
|
||||
let src_file = File::open(entry.path())?;
|
||||
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
|
||||
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
|
||||
|
||||
if copylen < wal_seg_size {
|
||||
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Find the end of valid WAL in a wal directory
|
||||
pub fn find_end_of_wal(conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
|
||||
let waldir = conf.timeline_path(timeline).join("wal");
|
||||
let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
|
||||
Ok(Lsn(lsn))
|
||||
}
|
||||
|
||||
// Find the latest snapshot for a timeline
|
||||
fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
|
||||
let snapshotsdir = conf.snapshots_path(timeline);
|
||||
let paths = fs::read_dir(&snapshotsdir)?;
|
||||
let mut maxsnapshot = Lsn(0);
|
||||
let mut snapshotdir: Option<PathBuf> = None;
|
||||
for path in paths {
|
||||
let path = path?;
|
||||
let filename = path.file_name().to_str().unwrap().to_owned();
|
||||
if let Ok(lsn) = Lsn::from_hex(&filename) {
|
||||
maxsnapshot = std::cmp::max(lsn, maxsnapshot);
|
||||
snapshotdir = Some(path.path());
|
||||
}
|
||||
}
|
||||
if maxsnapshot == Lsn(0) {
|
||||
// TODO: check ancestor timeline
|
||||
anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
|
||||
}
|
||||
|
||||
Ok((maxsnapshot, snapshotdir.unwrap()))
|
||||
}
|
||||
|
||||
@@ -26,8 +26,47 @@ pub struct PageServerConf {
|
||||
pub listen_addr: SocketAddr,
|
||||
pub gc_horizon: u64,
|
||||
pub gc_period: Duration,
|
||||
pub workdir: PathBuf,
|
||||
|
||||
pub pg_distrib_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl PageServerConf {
|
||||
|
||||
//
|
||||
// Repository paths, relative to workdir.
|
||||
//
|
||||
|
||||
fn tag_path(&self, name: &str) -> PathBuf {
|
||||
std::path::Path::new("refs").join("tags").join(name)
|
||||
}
|
||||
|
||||
fn branch_path(&self, name: &str) -> PathBuf {
|
||||
std::path::Path::new("refs").join("branches").join(name)
|
||||
}
|
||||
|
||||
fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf {
|
||||
std::path::Path::new("timelines").join(timelineid.to_string())
|
||||
}
|
||||
|
||||
fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf {
|
||||
std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots")
|
||||
}
|
||||
|
||||
//
|
||||
// Postgres distribution paths
|
||||
//
|
||||
|
||||
pub fn pg_bin_dir(&self) -> PathBuf {
|
||||
self.pg_distrib_dir.join("bin")
|
||||
}
|
||||
|
||||
pub fn pg_lib_dir(&self) -> PathBuf {
|
||||
self.pg_distrib_dir.join("lib")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Zenith Timeline ID is a 128-bit random ID.
|
||||
///
|
||||
/// Zenith timeline IDs are different from PostgreSQL timeline
|
||||
@@ -89,10 +128,3 @@ impl fmt::Display for ZTimelineId {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn zenith_repo_dir() -> PathBuf {
|
||||
// Find repository path
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => PathBuf::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::repository::Repository;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use lazy_static::lazy_static;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
lazy_static! {
|
||||
@@ -22,7 +21,7 @@ pub fn init(conf: &PageServerConf) {
|
||||
let walredo_mgr = PostgresRedoManager::new(conf);
|
||||
|
||||
// we have already changed current dir to the repository.
|
||||
let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
|
||||
let repo = RocksRepository::new(conf, Arc::new(walredo_mgr));
|
||||
|
||||
*m = Some(Arc::new(repo));
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::restore_local_repo;
|
||||
use crate::walreceiver;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use crate::branches;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum FeMessage {
|
||||
@@ -690,6 +691,28 @@ impl Connection {
|
||||
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)?;
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
|
||||
} else if query_string.starts_with(b"branch_create ") {
|
||||
let query_str = String::from_utf8(query_string.to_vec())?;
|
||||
let err = || anyhow!("invalid branch_create: '{}'", query_str);
|
||||
|
||||
// branch_create <branchname> <startpoint>
|
||||
// TODO lazy static
|
||||
let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap();
|
||||
let caps = re
|
||||
.captures(&query_str)
|
||||
.ok_or_else(err)?;
|
||||
|
||||
let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str());
|
||||
let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str());
|
||||
|
||||
let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?;
|
||||
let branch = serde_json::to_vec(&branch)?;
|
||||
|
||||
self.write_message_noflush(&BeMessage::RowDescription)?;
|
||||
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)?;
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
} else if query_string.starts_with(b"pg_list") {
|
||||
let branches = crate::branches::get_branches(&*page_cache::get_repository())?;
|
||||
let branches_buf = serde_json::to_vec(&branches)?;
|
||||
@@ -708,6 +731,15 @@ impl Connection {
|
||||
// on connect
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)?;
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
} else if query_string.to_ascii_lowercase().starts_with(b"identify_system") {
|
||||
// TODO: match postgres response formarmat for 'identify_system'
|
||||
let system_id = crate::branches::get_system_id(&self.conf)?
|
||||
.to_string();
|
||||
|
||||
self.write_message_noflush(&BeMessage::RowDescription)?;
|
||||
self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?;
|
||||
self.write_message_noflush(&BeMessage::CommandComplete)?;
|
||||
self.write_message(&BeMessage::ReadyForQuery)?;
|
||||
} else {
|
||||
self.write_message_noflush(&BeMessage::RowDescription)?;
|
||||
self.write_message_noflush(&HELLO_WORLD_ROW)?;
|
||||
|
||||
@@ -291,6 +291,7 @@ mod tests {
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
use std::env;
|
||||
|
||||
fn get_test_conf() -> PageServerConf {
|
||||
PageServerConf {
|
||||
@@ -299,6 +300,8 @@ mod tests {
|
||||
gc_horizon: 64 * 1024 * 1024,
|
||||
gc_period: Duration::from_secs(10),
|
||||
listen_addr: "127.0.0.1:5430".parse().unwrap(),
|
||||
workdir: "".into(),
|
||||
pg_distrib_dir: "".into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,7 +348,9 @@ mod tests {
|
||||
let repo_dir = Path::new("../tmp_check/test_relsize_repo");
|
||||
let _ = fs::remove_dir_all(repo_dir);
|
||||
fs::create_dir_all(repo_dir)?;
|
||||
let repo = rocksdb::RocksRepository::new(&get_test_conf(), repo_dir, Arc::new(walredo_mgr));
|
||||
env::set_current_dir(repo_dir)?;
|
||||
|
||||
let repo = rocksdb::RocksRepository::new(&get_test_conf(), Arc::new(walredo_mgr));
|
||||
|
||||
// get_timeline() with non-existent timeline id should fail
|
||||
//repo.get_timeline("11223344556677881122334455667788");
|
||||
|
||||
@@ -11,6 +11,8 @@ use crate::waldecoder::{Oid, TransactionId};
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
// use crate::PageServerConf;
|
||||
// use crate::branches;
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
@@ -18,7 +20,6 @@ use postgres_ffi::pg_constants;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -31,7 +32,6 @@ use zenith_utils::seqwait::SeqWait;
|
||||
static TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub struct RocksRepository {
|
||||
repo_dir: PathBuf,
|
||||
conf: PageServerConf,
|
||||
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
|
||||
|
||||
@@ -158,11 +158,9 @@ impl CacheEntryContent {
|
||||
impl RocksRepository {
|
||||
pub fn new(
|
||||
conf: &PageServerConf,
|
||||
repo_dir: &Path,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksRepository {
|
||||
RocksRepository {
|
||||
repo_dir: PathBuf::from(repo_dir),
|
||||
conf: conf.clone(),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
walredo_mgr,
|
||||
@@ -188,7 +186,7 @@ impl Repository for RocksRepository {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => {
|
||||
let timeline =
|
||||
RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
|
||||
RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
|
||||
|
||||
restore_timeline(&self.conf, &timeline, timelineid)?;
|
||||
|
||||
@@ -216,7 +214,7 @@ impl Repository for RocksRepository {
|
||||
fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
|
||||
let timeline = RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
|
||||
let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());
|
||||
|
||||
let timeline_rc = Arc::new(timeline);
|
||||
let r = timelines.insert(timelineid, timeline_rc.clone());
|
||||
@@ -229,8 +227,8 @@ impl Repository for RocksRepository {
|
||||
}
|
||||
|
||||
impl RocksTimeline {
|
||||
fn open_rocksdb(repo_dir: &Path, timelineid: ZTimelineId) -> rocksdb::DB {
|
||||
let path = repo_dir.join("timelines").join(timelineid.to_string());
|
||||
fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
|
||||
let path = conf.timeline_path(timelineid);
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.set_use_fsync(true);
|
||||
@@ -246,12 +244,12 @@ impl RocksTimeline {
|
||||
}
|
||||
|
||||
fn new(
|
||||
repo_dir: &Path,
|
||||
conf: &PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksTimeline {
|
||||
RocksTimeline {
|
||||
db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
|
||||
db: RocksTimeline::open_rocksdb(conf, timelineid),
|
||||
|
||||
walredo_mgr,
|
||||
|
||||
|
||||
@@ -457,7 +457,9 @@ impl PostgresRedoProcess {
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
// Tests who run pageserver binary are setting proper PG_BIN_DIR
|
||||
// and PG_LIB_DIR so that WalRedo would start right postgres. We may later
|
||||
// and PG_LIB_DIR so that WalRedo would start right postgres.
|
||||
|
||||
// do that: We may later
|
||||
// switch to setting same things in pageserver config file.
|
||||
async fn launch(datadir: &str) -> Result<PostgresRedoProcess, Error> {
|
||||
// Create empty data directory for wal-redo postgres deleting old one.
|
||||
|
||||
@@ -41,7 +41,7 @@ If you want to run all tests that have the string "bench" in their names:
|
||||
Useful environment variables:
|
||||
|
||||
`ZENITH_BIN`: The directory where zenith binaries can be found.
|
||||
`POSTGRES_BIN`: The directory where postgres binaries can be found.
|
||||
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
|
||||
`TEST_OUTPUT`: Set the directory where test state and test output files
|
||||
should go.
|
||||
`TEST_SHARED_FIXTURES`: Try to re-use a single postgres and pageserver
|
||||
|
||||
@@ -16,7 +16,7 @@ A fixture is created with the decorator @zenfixture, which is a wrapper around
|
||||
the standard pytest.fixture with some extra behavior.
|
||||
|
||||
There are several environment variables that can control the running of tests:
|
||||
ZENITH_BIN, POSTGRES_BIN, etc. See README.md for more information.
|
||||
ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
|
||||
|
||||
To use fixtures in a test file, add this line of code:
|
||||
|
||||
@@ -78,7 +78,7 @@ class ZenithCli:
|
||||
self.bin_zenith = os.path.join(binpath, 'zenith')
|
||||
self.env = os.environ.copy()
|
||||
self.env['ZENITH_REPO_DIR'] = repo_dir
|
||||
self.env['POSTGRES_BIN'] = pg_distrib_dir
|
||||
self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir
|
||||
|
||||
def run(self, arguments):
|
||||
""" Run "zenith" with the specified arguments.
|
||||
@@ -108,11 +108,11 @@ class ZenithPageserver:
|
||||
self.running = False
|
||||
|
||||
def start(self):
|
||||
self.zenith_cli.run(['pageserver', 'start'])
|
||||
self.zenith_cli.run(['start'])
|
||||
self.running = True
|
||||
|
||||
def stop(self):
|
||||
self.zenith_cli.run(['pageserver', 'stop'])
|
||||
self.zenith_cli.run(['stop'])
|
||||
self.running = True
|
||||
|
||||
|
||||
@@ -316,7 +316,7 @@ def zenith_binpath(base_dir):
|
||||
@zenfixture
|
||||
def pg_distrib_dir(base_dir):
|
||||
""" find the postgress install """
|
||||
env_postgres_bin = os.environ.get('POSTGRES_BIN')
|
||||
env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
|
||||
if env_postgres_bin:
|
||||
pg_dir = env_postgres_bin
|
||||
else:
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
mod pq_protocol;
|
||||
pub mod pq_protocol;
|
||||
pub mod s3_offload;
|
||||
pub mod wal_service;
|
||||
|
||||
|
||||
@@ -10,6 +10,18 @@ edition = "2018"
|
||||
clap = "2.33.0"
|
||||
anyhow = "1.0"
|
||||
serde_json = "1"
|
||||
# rand = "0.8.3"
|
||||
# tar = "0.4.33"
|
||||
# serde = { version = "1.0", features = ["derive"] }
|
||||
# toml = "0.5"
|
||||
# lazy_static = "1.4"
|
||||
# regex = "1"
|
||||
# # hex = "0.4.3"
|
||||
# bytes = "1.0.1"
|
||||
# # fs_extra = "1.2.0"
|
||||
# nix = "0.20"
|
||||
# # thiserror = "1"
|
||||
# url = "2.2.2"
|
||||
|
||||
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
|
||||
|
||||
|
||||
@@ -1,55 +1,51 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::collections::HashMap;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use std::{collections::HashMap, fs};
|
||||
|
||||
use anyhow::{Result, Context};
|
||||
use anyhow::{anyhow, bail};
|
||||
use anyhow::{Context, anyhow};
|
||||
use anyhow::Result;
|
||||
use clap::{App, Arg, ArgMatches, SubCommand};
|
||||
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::local_env;
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::storage::PageServerNode;
|
||||
use control_plane::{compute::ComputeControlPlane, local_env, storage};
|
||||
|
||||
use pageserver::{branches::BranchInfo, ZTimelineId};
|
||||
|
||||
use pageserver::{ZTimelineId, branches::BranchInfo};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
fn zenith_repo_dir() -> PathBuf {
|
||||
// Find repository path
|
||||
match std::env::var_os("ZENITH_REPO_DIR") {
|
||||
Some(val) => PathBuf::from(val.to_str().unwrap()),
|
||||
None => ".zenith".into(),
|
||||
}
|
||||
}
|
||||
|
||||
// Main entry point for the 'zenith' CLI utility
|
||||
//
|
||||
// This utility can used to work with a local zenith repository.
|
||||
// In order to run queries in it, you need to launch the page server,
|
||||
// and a compute node against the page server
|
||||
// This utility helps to manage zenith installation. That includes following:
|
||||
// * Management of local postgres installations running on top of the
|
||||
// pageserver.
|
||||
// * Providing CLI api to the pageserver (local or remote)
|
||||
// * TODO: export/import to/from usual postgres
|
||||
fn main() -> Result<()> {
|
||||
let name_arg = Arg::with_name("NAME")
|
||||
.short("n")
|
||||
.index(1)
|
||||
.help("name of this postgres instance")
|
||||
.required(true);
|
||||
|
||||
let matches = App::new("zenith")
|
||||
.about("Zenith CLI")
|
||||
.subcommand(SubCommand::with_name("init").about("Initialize a new Zenith repository"))
|
||||
.subcommand(
|
||||
SubCommand::with_name("init")
|
||||
.about("Initialize a new Zenith repository")
|
||||
.arg(
|
||||
Arg::with_name("remote-pageserver")
|
||||
.long("remote-pageserver")
|
||||
.required(false)
|
||||
.value_name("pageserver-url")
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("branch")
|
||||
.about("Create a new branch")
|
||||
.arg(Arg::with_name("branchname").required(false).index(1))
|
||||
.arg(Arg::with_name("start-point").required(false).index(2)),
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("pageserver")
|
||||
.about("Manage pageserver instance")
|
||||
.subcommand(SubCommand::with_name("status"))
|
||||
.subcommand(SubCommand::with_name("start"))
|
||||
.subcommand(SubCommand::with_name("stop")),
|
||||
)
|
||||
.subcommand(SubCommand::with_name("status"))
|
||||
.subcommand(SubCommand::with_name("start"))
|
||||
.subcommand(SubCommand::with_name("stop"))
|
||||
.subcommand(SubCommand::with_name("restart"))
|
||||
.subcommand(
|
||||
SubCommand::with_name("pg")
|
||||
.about("Manage postgres instances")
|
||||
@@ -67,52 +63,74 @@ fn main() -> Result<()> {
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// handle init separately and exit
|
||||
// Create config file
|
||||
if let ("init", Some(sub_args)) = matches.subcommand() {
|
||||
run_init_cmd(sub_args.clone())?;
|
||||
exit(0);
|
||||
let pageserver_uri = sub_args.value_of("pageserver-url");
|
||||
local_env::init(pageserver_uri)
|
||||
.with_context(|| "Failed to create cofig file")?;
|
||||
}
|
||||
|
||||
// all other commands would need config
|
||||
|
||||
let repopath = zenith_repo_dir();
|
||||
if !repopath.exists() {
|
||||
bail!(
|
||||
"Zenith repository does not exist in {}.\n\
|
||||
Set ZENITH_REPO_DIR or initialize a new repository with 'zenith init'",
|
||||
repopath.display()
|
||||
);
|
||||
}
|
||||
// TODO: check that it looks like a zenith repository
|
||||
let env = match local_env::load_config(&repopath) {
|
||||
let env = match local_env::load_config() {
|
||||
Ok(conf) => conf,
|
||||
Err(e) => {
|
||||
eprintln!("Error loading config from {}: {}", repopath.display(), e);
|
||||
eprintln!("Error loading config: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
match matches.subcommand() {
|
||||
("init", Some(_)) => {
|
||||
panic!() /* Should not happen. Init was handled before */
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
pageserver.init()?;
|
||||
}
|
||||
|
||||
("branch", Some(sub_args)) => run_branch_cmd(&env, sub_args.clone())?,
|
||||
("pageserver", Some(sub_args)) => run_pageserver_cmd(&env, sub_args.clone())?,
|
||||
("branch", Some(sub_args)) => {
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
if let Some(branchname) = sub_args.value_of("branchname") {
|
||||
if let Some(startpoint_str) = sub_args.value_of("start-point") {
|
||||
let branch = pageserver.branch_create(branchname, startpoint_str)?;
|
||||
println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0)));
|
||||
} else {
|
||||
panic!("Missing start-point");
|
||||
}
|
||||
} else {
|
||||
// No arguments, list branches
|
||||
for branch in pageserver.branches_list()? {
|
||||
println!(" {}", branch.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
("start", Some(_sub_m)) => {
|
||||
let pageserver = storage::PageServerNode::from_env(&env);
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start: {}", e);
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(_sub_m)) => {
|
||||
let pageserver = storage::PageServerNode::from_env(&env);
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
if let Err(e) = pageserver.stop() {
|
||||
eprintln!("pageserver stop: {}", e);
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(_sub_m)) => {
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
if let Err(e) = pageserver.stop() {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -131,38 +149,10 @@ fn main() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_pageserver_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
|
||||
match args.subcommand() {
|
||||
("status", Some(_sub_m)) => {
|
||||
todo!();
|
||||
}
|
||||
("start", Some(_sub_m)) => {
|
||||
let psnode = PageServerNode::from_env(local_env);
|
||||
psnode.start()?;
|
||||
println!("Page server started");
|
||||
}
|
||||
("stop", Some(_sub_m)) => {
|
||||
let psnode = PageServerNode::from_env(local_env);
|
||||
psnode.stop()?;
|
||||
println!("Page server stopped");
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Peek into the repository, to grab the timeline ID of given branch
|
||||
pub fn get_branch_timeline(repopath: &Path, branchname: &str) -> ZTimelineId {
|
||||
let branchpath = repopath.join("refs/branches/".to_owned() + branchname);
|
||||
|
||||
ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
|
||||
}
|
||||
|
||||
/// Returns a map of timeline IDs to branch_name@lsn strings.
|
||||
/// Connects to the pageserver to query this information.
|
||||
fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
|
||||
let page_server = storage::PageServerNode::from_env(env);
|
||||
fn get_branch_infos(env: &local_env::LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
|
||||
let page_server = PageServerNode::from_env(env);
|
||||
let mut client = page_server.page_server_psql_client()?;
|
||||
let branches_msgs = client.simple_query("pg_list")?;
|
||||
|
||||
@@ -179,11 +169,10 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
|
||||
let branch_infos: Result<HashMap<ZTimelineId, String>> = branch_infos
|
||||
.into_iter()
|
||||
.map(|branch_info| {
|
||||
let timeline_id = ZTimelineId::from_str(&branch_info.timeline_id)?;
|
||||
let lsn_string_opt = branch_info.latest_valid_lsn.map(|lsn| lsn.to_string());
|
||||
let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
|
||||
let branch_lsn_string = format!("{}@{}", branch_info.name, lsn_str);
|
||||
Ok((timeline_id, branch_lsn_string))
|
||||
Ok((branch_info.timeline_id, branch_lsn_string))
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -193,19 +182,11 @@ fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
|
||||
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
|
||||
// FIXME: cheat and resolve the timeline by peeking into the
|
||||
// repository. In reality, when you're launching a compute node
|
||||
// against a possibly-remote page server, we wouldn't know what
|
||||
// branches exist in the remote repository. Or would we require
|
||||
// that you "zenith fetch" them into a local repoitory first?
|
||||
match pg_match.subcommand() {
|
||||
("create", Some(sub_m)) => {
|
||||
let timeline_arg = sub_m.value_of("timeline").unwrap_or("main");
|
||||
let timeline = get_branch_timeline(&env.repo_path, timeline_arg);
|
||||
|
||||
println!("Initializing Postgres on timeline {}...", timeline);
|
||||
|
||||
cplane.new_node(timeline)?;
|
||||
println!("Initializing Postgres on timeline {}...", timeline_arg);
|
||||
cplane.new_node(timeline_arg)?;
|
||||
}
|
||||
("list", Some(_sub_m)) => {
|
||||
let branch_infos = get_branch_infos(env).unwrap_or_else(|e| {
|
||||
@@ -249,121 +230,3 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// "zenith init" - Initialize a new Zenith repository in current dir
|
||||
fn run_init_cmd(_args: ArgMatches) -> Result<()> {
|
||||
local_env::init()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// handle "zenith branch" subcommand
|
||||
fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
|
||||
let repopath = local_env.repo_path.to_str().unwrap();
|
||||
|
||||
if let Some(branchname) = args.value_of("branchname") {
|
||||
if PathBuf::from(format!("{}/refs/branches/{}", repopath, branchname)).exists() {
|
||||
anyhow::bail!("branch {} already exists", branchname);
|
||||
}
|
||||
|
||||
if let Some(startpoint_str) = args.value_of("start-point") {
|
||||
let mut startpoint = parse_point_in_time(startpoint_str)?;
|
||||
|
||||
if startpoint.lsn == Lsn(0) {
|
||||
// Find end of WAL on the old timeline
|
||||
let end_of_wal = local_env::find_end_of_wal(local_env, startpoint.timelineid)?;
|
||||
|
||||
println!("branching at end of WAL: {}", end_of_wal);
|
||||
|
||||
startpoint.lsn = end_of_wal;
|
||||
}
|
||||
|
||||
return local_env::create_branch(local_env, branchname, startpoint);
|
||||
} else {
|
||||
panic!("Missing start-point");
|
||||
}
|
||||
} else {
|
||||
// No arguments, list branches
|
||||
list_branches()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn list_branches() -> Result<()> {
|
||||
// list branches
|
||||
let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?;
|
||||
|
||||
for path in paths {
|
||||
println!(" {}", path?.file_name().to_str().unwrap());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
//
|
||||
// Parse user-given string that represents a point-in-time.
|
||||
//
|
||||
// We support multiple variants:
|
||||
//
|
||||
// Raw timeline id in hex, meaning the end of that timeline:
|
||||
// bc62e7d612d0e6fe8f99a6dd2f281f9d
|
||||
//
|
||||
// A specific LSN on a timeline:
|
||||
// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
|
||||
//
|
||||
// Same, with a human-friendly branch name:
|
||||
// main
|
||||
// main@2/15D3DD8
|
||||
//
|
||||
// Human-friendly tag name:
|
||||
// mytag
|
||||
//
|
||||
//
|
||||
fn parse_point_in_time(s: &str) -> Result<local_env::PointInTime> {
|
||||
let mut strings = s.split('@');
|
||||
let name = strings.next().unwrap();
|
||||
|
||||
let lsn: Option<Lsn>;
|
||||
if let Some(lsnstr) = strings.next() {
|
||||
lsn = Some(
|
||||
Lsn::from_str(lsnstr)
|
||||
.with_context(|| "invalid LSN in point-in-time specification")?
|
||||
);
|
||||
} else {
|
||||
lsn = None
|
||||
}
|
||||
|
||||
// Check if it's a tag
|
||||
if lsn.is_none() {
|
||||
let tagpath = zenith_repo_dir().join("refs").join("tags").join(name);
|
||||
if tagpath.exists() {
|
||||
let pointstr = fs::read_to_string(tagpath)?;
|
||||
|
||||
return parse_point_in_time(&pointstr);
|
||||
}
|
||||
}
|
||||
// Check if it's a branch
|
||||
// Check if it's branch @ LSN
|
||||
let branchpath = zenith_repo_dir().join("refs").join("branches").join(name);
|
||||
if branchpath.exists() {
|
||||
let pointstr = fs::read_to_string(branchpath)?;
|
||||
|
||||
let mut result = parse_point_in_time(&pointstr)?;
|
||||
|
||||
result.lsn = lsn.unwrap_or(Lsn(0));
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
// Check if it's a timelineid
|
||||
// Check if it's timelineid @ LSN
|
||||
let tlipath = zenith_repo_dir().join("timelines").join(name);
|
||||
if tlipath.exists() {
|
||||
let result = local_env::PointInTime {
|
||||
timelineid: ZTimelineId::from_str(name)?,
|
||||
lsn: lsn.unwrap_or(Lsn(0)),
|
||||
};
|
||||
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
panic!("could not parse point-in-time {}", s);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user