mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Improve 'zenith' CLI utility for safekeepers and a config file.
The 'zenith' CLI utility can now be used to launch safekeepers. By default, one safekeeper is configured. There are new 'safekeeper start/stop' subcommands to manage the safekeepers. Each safekeeper is given a name that can be used to identify the safekeeper to start/stop with the 'zenith start/stop' commands. The safekeeper data is stored in '.zenith/safekeepers/<name>'. The 'zenith start' command now starts the pageserver and also all safekeepers. 'zenith stop' stops pageserver, all safekeepers, and all postgres nodes. Introduce new 'zenith pageserver start/stop' subcommands for starting/stopping just the page server. The biggest change here is to the 'zenith init' command. This adds a new 'zenith init --config=<path to toml file>' option. It takes a toml config file that describes the environment. In the config file, you can specify options for the pageserver, like the pg and http ports, and authentication. For each safekeeper, you can define a name and the pg and http ports. If you don't use the --config option, you get a default configuration with a pageserver and one safekeeper. Note that that's different from the previous default of no safekeepers. Any fields that are omitted in the configuration file are filled with defaults. You can also specify the initial tenant ID in the config file. A couple of sample config files are added in the control_plane/ directory. The --pageserver-pg-port, --pageserver-http-port, and --pageserver-auth options to 'zenith init' are removed. Use a config file instead. Finally, change the python test fixtures to use the new 'zenith' commands and the config file to describe the environment.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2549,6 +2549,7 @@ dependencies = [
|
||||
"postgres",
|
||||
"postgres_ffi",
|
||||
"serde_json",
|
||||
"walkeeper",
|
||||
"workspace_hack",
|
||||
"zenith_utils",
|
||||
]
|
||||
|
||||
22
README.md
22
README.md
@@ -47,17 +47,26 @@ make -j5
|
||||
# Create repository in .zenith with proper paths to binaries and data
|
||||
# Later that would be responsibility of a package install script
|
||||
> ./target/debug/zenith init
|
||||
initializing tenantid c03ba6b7ad4c5e9cf556f059ade44229
|
||||
created initial timeline 5b014a9e41b4b63ce1a1febc04503636 timeline.lsn 0/169C3C8
|
||||
created main branch
|
||||
pageserver init succeeded
|
||||
|
||||
# start pageserver
|
||||
# start pageserver and safekeeper
|
||||
> ./target/debug/zenith start
|
||||
Starting pageserver at '127.0.0.1:64000' in .zenith
|
||||
Starting pageserver at 'localhost:64000' in '.zenith'
|
||||
Pageserver started
|
||||
initializing for single for 7676
|
||||
Starting safekeeper at 'localhost:5454' in '.zenith/safekeepers/single'
|
||||
Safekeeper started
|
||||
|
||||
# start postgres on top on the pageserver
|
||||
# start postgres compute node
|
||||
> ./target/debug/zenith pg start main
|
||||
Starting postgres node at 'host=127.0.0.1 port=55432 user=stas'
|
||||
Starting new postgres main on main...
|
||||
Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/c03ba6b7ad4c5e9cf556f059ade44229/main port=55432
|
||||
Starting postgres node at 'host=127.0.0.1 port=55432 user=zenith_admin dbname=postgres'
|
||||
waiting for server to start.... done
|
||||
server started
|
||||
|
||||
# check list of running postgres instances
|
||||
> ./target/debug/zenith pg list
|
||||
@@ -108,10 +117,9 @@ postgres=# insert into t values(2,2);
|
||||
INSERT 0 1
|
||||
```
|
||||
|
||||
6. If you want to run tests afterwards (see below), you have to stop pageserver and all postgres instances you have just started:
|
||||
6. If you want to run tests afterwards (see below), you have to stop all the running the pageserver, safekeeper and postgres instances
|
||||
you have just started. You can stop them all with one command:
|
||||
```sh
|
||||
> ./target/debug/zenith pg stop migration_check
|
||||
> ./target/debug/zenith pg stop main
|
||||
> ./target/debug/zenith stop
|
||||
```
|
||||
|
||||
|
||||
20
control_plane/safekeepers.conf
Normal file
20
control_plane/safekeepers.conf
Normal file
@@ -0,0 +1,20 @@
|
||||
# Page server and three safekeepers.
|
||||
[pageserver]
|
||||
pg_port = 64000
|
||||
http_port = 9898
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk1'
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk2'
|
||||
pg_port = 5455
|
||||
http_port = 7677
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk3'
|
||||
pg_port = 5456
|
||||
http_port = 7678
|
||||
11
control_plane/simple.conf
Normal file
11
control_plane/simple.conf
Normal file
@@ -0,0 +1,11 @@
|
||||
# Minimal zenith environment with one safekeeper. This is equivalent to the built-in
|
||||
# defaults that you get with no --config
|
||||
[pageserver]
|
||||
pg_port = 64000
|
||||
http_port = 9898
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'single'
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
@@ -125,7 +125,7 @@ impl ComputeControlPlane {
|
||||
});
|
||||
|
||||
node.create_pgdata()?;
|
||||
node.setup_pg_conf(self.env.auth_type)?;
|
||||
node.setup_pg_conf(self.env.pageserver.auth_type)?;
|
||||
|
||||
self.nodes
|
||||
.insert((tenantid, node.name.clone()), Arc::clone(&node));
|
||||
@@ -328,9 +328,25 @@ impl PostgresNode {
|
||||
}
|
||||
conf.append_line("");
|
||||
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
conf.append("synchronous_standby_names", "pageserver"); // TODO: add a new function arg?
|
||||
conf.append("zenith.callmemaybe_connstring", &self.connstr());
|
||||
if !self.env.safekeepers.is_empty() {
|
||||
// Configure the node to connect to the safekeepers
|
||||
conf.append("synchronous_standby_names", "walproposer");
|
||||
|
||||
let wal_acceptors = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.map(|sk| format!("localhost:{}", sk.pg_port))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
conf.append("wal_acceptors", &wal_acceptors);
|
||||
} else {
|
||||
// Configure the node to stream WAL directly to the pageserver
|
||||
// This isn't really a supported configuration, but can be useful for
|
||||
// testing.
|
||||
conf.append("synchronous_standby_names", "pageserver");
|
||||
conf.append("zenith.callmemaybe_connstring", &self.connstr());
|
||||
}
|
||||
|
||||
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
||||
file.write_all(conf.to_string().as_bytes())?;
|
||||
|
||||
@@ -13,6 +13,7 @@ use std::path::Path;
|
||||
pub mod compute;
|
||||
pub mod local_env;
|
||||
pub mod postgresql_conf;
|
||||
pub mod safekeeper;
|
||||
pub mod storage;
|
||||
|
||||
/// Read a PID file
|
||||
|
||||
@@ -7,36 +7,41 @@
|
||||
use anyhow::{Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::env;
|
||||
use std::fmt::Write;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Command, Stdio};
|
||||
use zenith_utils::auth::{encode_from_key_path, Claims, Scope};
|
||||
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
|
||||
//
|
||||
// This data structures represent deserialized zenith CLI config
|
||||
// This data structures represents zenith CLI config
|
||||
//
|
||||
// It is deserialized from the .zenith/config file, or the config file passed
|
||||
// to 'zenith init --config=<path>' option. See control_plane/simple.conf for
|
||||
// an example.
|
||||
//
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct LocalEnv {
|
||||
// Pageserver connection settings
|
||||
pub pageserver_pg_port: u16,
|
||||
pub pageserver_http_port: u16,
|
||||
|
||||
// Base directory for both pageserver and compute nodes
|
||||
// Base directory for all the nodes (the pageserver, safekeepers and
|
||||
// compute nodes).
|
||||
//
|
||||
// This is not stored in the config file. Rather, this is the path where the
|
||||
// config file itself is. It is read from the ZENITH_REPO_DIR env variable or
|
||||
// '.zenith' if not given.
|
||||
#[serde(skip)]
|
||||
pub base_data_dir: PathBuf,
|
||||
|
||||
// Path to postgres distribution. It's expected that "bin", "include",
|
||||
// "lib", "share" from postgres distribution are there. If at some point
|
||||
// in time we will be able to run against vanilla postgres we may split that
|
||||
// to four separate paths and match OS-specific installation layout.
|
||||
#[serde(default)]
|
||||
pub pg_distrib_dir: PathBuf,
|
||||
|
||||
// Path to pageserver binary.
|
||||
#[serde(default)]
|
||||
pub zenith_distrib_dir: PathBuf,
|
||||
|
||||
// Default tenant ID to use with the 'zenith' command line utility, when
|
||||
@@ -45,14 +50,59 @@ pub struct LocalEnv {
|
||||
#[serde(default)]
|
||||
pub default_tenantid: Option<ZTenantId>,
|
||||
|
||||
// jwt auth token used for communication with pageserver
|
||||
pub auth_token: String,
|
||||
// used to issue tokens during e.g pg start
|
||||
#[serde(default)]
|
||||
pub private_key_path: PathBuf,
|
||||
|
||||
pub pageserver: PageServerConf,
|
||||
|
||||
#[serde(default)]
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct PageServerConf {
|
||||
// Pageserver connection settings
|
||||
pub pg_port: u16,
|
||||
pub http_port: u16,
|
||||
|
||||
// used to determine which auth type is used
|
||||
pub auth_type: AuthType,
|
||||
|
||||
// used to issue tokens during e.g pg start
|
||||
pub private_key_path: PathBuf,
|
||||
// jwt auth token used for communication with pageserver
|
||||
pub auth_token: String,
|
||||
}
|
||||
|
||||
impl Default for PageServerConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
pg_port: 0,
|
||||
http_port: 0,
|
||||
auth_type: AuthType::Trust,
|
||||
auth_token: "".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct SafekeeperConf {
|
||||
pub name: String,
|
||||
pub pg_port: u16,
|
||||
pub http_port: u16,
|
||||
pub sync: bool,
|
||||
}
|
||||
|
||||
impl Default for SafekeeperConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: "".to_string(),
|
||||
pg_port: 0,
|
||||
http_port: 0,
|
||||
sync: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalEnv {
|
||||
@@ -68,6 +118,10 @@ impl LocalEnv {
|
||||
Ok(self.zenith_distrib_dir.join("pageserver"))
|
||||
}
|
||||
|
||||
pub fn safekeeper_bin(&self) -> Result<PathBuf> {
|
||||
Ok(self.zenith_distrib_dir.join("safekeeper"))
|
||||
}
|
||||
|
||||
pub fn pg_data_dirs_path(&self) -> PathBuf {
|
||||
self.base_data_dir.join("pgdatadirs").join("tenants")
|
||||
}
|
||||
@@ -82,6 +136,187 @@ impl LocalEnv {
|
||||
pub fn pageserver_data_dir(&self) -> PathBuf {
|
||||
self.base_data_dir.clone()
|
||||
}
|
||||
|
||||
pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
|
||||
self.base_data_dir.join("safekeepers").join(node_name)
|
||||
}
|
||||
|
||||
/// Create a LocalEnv from a config file.
|
||||
///
|
||||
/// Unlike 'load_config', this function fills in any defaults that are missing
|
||||
/// from the config file.
|
||||
pub fn create_config(toml: &str) -> Result<LocalEnv> {
|
||||
let mut env: LocalEnv = toml::from_str(toml)?;
|
||||
|
||||
// Find postgres binaries.
|
||||
// Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install".
|
||||
if env.pg_distrib_dir == Path::new("") {
|
||||
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
|
||||
env.pg_distrib_dir = postgres_bin.into();
|
||||
} else {
|
||||
let cwd = env::current_dir()?;
|
||||
env.pg_distrib_dir = cwd.join("tmp_install")
|
||||
}
|
||||
}
|
||||
if !env.pg_distrib_dir.join("bin/postgres").exists() {
|
||||
anyhow::bail!(
|
||||
"Can't find postgres binary at {}",
|
||||
env.pg_distrib_dir.display()
|
||||
);
|
||||
}
|
||||
|
||||
// Find zenith binaries.
|
||||
if env.zenith_distrib_dir == Path::new("") {
|
||||
env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
|
||||
}
|
||||
if !env.zenith_distrib_dir.join("pageserver").exists() {
|
||||
anyhow::bail!("Can't find pageserver binary.");
|
||||
}
|
||||
if !env.zenith_distrib_dir.join("safekeeper").exists() {
|
||||
anyhow::bail!("Can't find safekeeper binary.");
|
||||
}
|
||||
|
||||
// If no initial tenant ID was given, generate it.
|
||||
if env.default_tenantid.is_none() {
|
||||
env.default_tenantid = Some(ZTenantId::generate());
|
||||
}
|
||||
|
||||
env.base_data_dir = base_path();
|
||||
|
||||
Ok(env)
|
||||
}
|
||||
|
||||
/// Locate and load config
|
||||
pub fn load_config() -> Result<LocalEnv> {
|
||||
let repopath = base_path();
|
||||
|
||||
if !repopath.exists() {
|
||||
anyhow::bail!(
|
||||
"Zenith config is not found in {}. You need to run 'zenith init' first",
|
||||
repopath.to_str().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: check that it looks like a zenith repository
|
||||
|
||||
// load and parse file
|
||||
let config = fs::read_to_string(repopath.join("config"))?;
|
||||
let mut env: LocalEnv = toml::from_str(config.as_str())?;
|
||||
|
||||
env.base_data_dir = repopath;
|
||||
|
||||
Ok(env)
|
||||
}
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn generate_auth_token(&self, claims: &Claims) -> Result<String> {
|
||||
let private_key_path = if self.private_key_path.is_absolute() {
|
||||
self.private_key_path.to_path_buf()
|
||||
} else {
|
||||
self.base_data_dir.join(&self.private_key_path)
|
||||
};
|
||||
|
||||
let key_data = fs::read(private_key_path)?;
|
||||
encode_from_key_file(claims, &key_data)
|
||||
}
|
||||
|
||||
//
|
||||
// Initialize a new Zenith repository
|
||||
//
|
||||
pub fn init(&mut self) -> Result<()> {
|
||||
// check if config already exists
|
||||
let base_path = &self.base_data_dir;
|
||||
if base_path == Path::new("") {
|
||||
anyhow::bail!("repository base path is missing");
|
||||
}
|
||||
if base_path.exists() {
|
||||
anyhow::bail!(
|
||||
"directory '{}' already exists. Perhaps already initialized?",
|
||||
base_path.to_str().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
fs::create_dir(&base_path)?;
|
||||
|
||||
// generate keys for jwt
|
||||
// openssl genrsa -out private_key.pem 2048
|
||||
let private_key_path;
|
||||
if self.private_key_path == PathBuf::new() {
|
||||
private_key_path = base_path.join("auth_private_key.pem");
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("genrsa")
|
||||
.args(&["-out", private_key_path.to_str().unwrap()])
|
||||
.arg("2048")
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.with_context(|| "failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
self.private_key_path = Path::new("auth_private_key.pem").to_path_buf();
|
||||
|
||||
let public_key_path = base_path.join("auth_public_key.pem");
|
||||
// openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("rsa")
|
||||
.args(&["-in", private_key_path.to_str().unwrap()])
|
||||
.arg("-pubout")
|
||||
.args(&["-outform", "PEM"])
|
||||
.args(&["-out", public_key_path.to_str().unwrap()])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.with_context(|| "failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
self.pageserver.auth_token =
|
||||
self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
|
||||
|
||||
fs::create_dir_all(self.pg_data_dirs_path())?;
|
||||
|
||||
for safekeeper in self.safekeepers.iter() {
|
||||
fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
|
||||
}
|
||||
|
||||
let mut conf_content = String::new();
|
||||
|
||||
// Currently, the user first passes a config file with 'zenith init --config=<path>'
|
||||
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
|
||||
// to .zenith/config. TODO: We lose any formatting and comments along the way, which is
|
||||
// a bit sad.
|
||||
write!(
|
||||
&mut conf_content,
|
||||
r#"# This file describes a locale deployment of the page server
|
||||
# and safekeeeper node. It is read by the 'zenith' command-line
|
||||
# utility.
|
||||
"#
|
||||
)?;
|
||||
|
||||
// Convert the LocalEnv to a toml file.
|
||||
//
|
||||
// This could be as simple as this:
|
||||
//
|
||||
// conf_content += &toml::to_string_pretty(env)?;
|
||||
//
|
||||
// But it results in a "values must be emitted before tables". I'm not sure
|
||||
// why, AFAICS the table, i.e. 'safekeepers: Vec<SafekeeperConf>' is last.
|
||||
// Maybe rust reorders the fields to squeeze avoid padding or something?
|
||||
// In any case, converting to toml::Value first, and serializing that, works.
|
||||
// See https://github.com/alexcrichton/toml-rs/issues/142
|
||||
conf_content += &toml::to_string_pretty(&toml::Value::try_from(&self)?)?;
|
||||
|
||||
fs::write(base_path.join("config"), conf_content)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn base_path() -> PathBuf {
|
||||
@@ -91,122 +326,6 @@ fn base_path() -> PathBuf {
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Initialize a new Zenith repository
|
||||
//
|
||||
pub fn init(
|
||||
pageserver_pg_port: u16,
|
||||
pageserver_http_port: u16,
|
||||
tenantid: ZTenantId,
|
||||
auth_type: AuthType,
|
||||
) -> Result<()> {
|
||||
// check if config already exists
|
||||
let base_path = base_path();
|
||||
if base_path.exists() {
|
||||
anyhow::bail!(
|
||||
"{} already exists. Perhaps already initialized?",
|
||||
base_path.to_str().unwrap()
|
||||
);
|
||||
}
|
||||
fs::create_dir(&base_path)?;
|
||||
|
||||
// ok, now check that expected binaries are present
|
||||
|
||||
// Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install".
|
||||
let pg_distrib_dir: PathBuf = {
|
||||
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
|
||||
postgres_bin.into()
|
||||
} else {
|
||||
let cwd = env::current_dir()?;
|
||||
cwd.join("tmp_install")
|
||||
}
|
||||
};
|
||||
if !pg_distrib_dir.join("bin/postgres").exists() {
|
||||
anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir);
|
||||
}
|
||||
|
||||
// generate keys for jwt
|
||||
// openssl genrsa -out private_key.pem 2048
|
||||
let private_key_path = base_path.join("auth_private_key.pem");
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("genrsa")
|
||||
.args(&["-out", private_key_path.to_str().unwrap()])
|
||||
.arg("2048")
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.with_context(|| "failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
let public_key_path = base_path.join("auth_public_key.pem");
|
||||
// openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem
|
||||
let keygen_output = Command::new("openssl")
|
||||
.arg("rsa")
|
||||
.args(&["-in", private_key_path.to_str().unwrap()])
|
||||
.arg("-pubout")
|
||||
.args(&["-outform", "PEM"])
|
||||
.args(&["-out", public_key_path.to_str().unwrap()])
|
||||
.stdout(Stdio::null())
|
||||
.output()
|
||||
.with_context(|| "failed to generate auth private key")?;
|
||||
if !keygen_output.status.success() {
|
||||
anyhow::bail!(
|
||||
"openssl failed: '{}'",
|
||||
String::from_utf8_lossy(&keygen_output.stderr)
|
||||
);
|
||||
}
|
||||
|
||||
let auth_token =
|
||||
encode_from_key_path(&Claims::new(None, Scope::PageServerApi), &private_key_path)?;
|
||||
|
||||
// Find zenith binaries.
|
||||
let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
|
||||
if !zenith_distrib_dir.join("pageserver").exists() {
|
||||
anyhow::bail!("Can't find pageserver binary.",);
|
||||
}
|
||||
|
||||
let conf = LocalEnv {
|
||||
pageserver_pg_port,
|
||||
pageserver_http_port,
|
||||
pg_distrib_dir,
|
||||
zenith_distrib_dir,
|
||||
base_data_dir: base_path,
|
||||
default_tenantid: Some(tenantid),
|
||||
auth_token,
|
||||
auth_type,
|
||||
private_key_path,
|
||||
};
|
||||
|
||||
fs::create_dir_all(conf.pg_data_dirs_path())?;
|
||||
|
||||
let toml = toml::to_string_pretty(&conf)?;
|
||||
fs::write(conf.base_data_dir.join("config"), toml)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Locate and load config
|
||||
pub fn load_config() -> Result<LocalEnv> {
|
||||
let repopath = base_path();
|
||||
|
||||
if !repopath.exists() {
|
||||
anyhow::bail!(
|
||||
"Zenith config is not found in {}. You need to run 'zenith init' first",
|
||||
repopath.to_str().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: check that it looks like a zenith repository
|
||||
|
||||
// load and parse file
|
||||
let config = fs::read_to_string(repopath.join("config"))?;
|
||||
toml::from_str(config.as_str()).map_err(|e| e.into())
|
||||
}
|
||||
|
||||
/// Serde routines for Option<ZTenantId>. The serialized form is a hex string.
|
||||
mod opt_tenantid_serde {
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
256
control_plane/src/safekeeper.rs
Normal file
256
control_plane/src/safekeeper.rs
Normal file
@@ -0,0 +1,256 @@
|
||||
use std::io::Write;
|
||||
use std::net::TcpStream;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{io, result, thread};
|
||||
|
||||
use anyhow::bail;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use postgres::Config;
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use zenith_utils::http::error::HttpErrorBody;
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
use crate::read_pidfile;
|
||||
use crate::storage::PageServerNode;
|
||||
use zenith_utils::connstring::connection_address;
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SafekeeperHttpError {
|
||||
#[error("Reqwest error: {0}")]
|
||||
Transport(#[from] reqwest::Error),
|
||||
|
||||
#[error("Error: {0}")]
|
||||
Response(String),
|
||||
}
|
||||
|
||||
type Result<T> = result::Result<T, SafekeeperHttpError>;
|
||||
|
||||
pub trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> Result<Self>;
|
||||
}
|
||||
|
||||
impl ResponseErrorMessageExt for Response {
|
||||
fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
// reqwest do not export it's error construction utility functions, so lets craft the message ourselves
|
||||
let url = self.url().to_owned();
|
||||
Err(SafekeeperHttpError::Response(
|
||||
match self.json::<HttpErrorBody>() {
|
||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Control routines for safekeeper.
|
||||
//
|
||||
// Used in CLI and tests.
|
||||
//
|
||||
#[derive(Debug)]
|
||||
pub struct SafekeeperNode {
|
||||
pub name: String,
|
||||
|
||||
pub conf: SafekeeperConf,
|
||||
|
||||
pub pg_connection_config: Config,
|
||||
pub env: LocalEnv,
|
||||
pub http_client: Client,
|
||||
pub http_base_url: String,
|
||||
|
||||
pub pageserver: Arc<PageServerNode>,
|
||||
}
|
||||
|
||||
impl SafekeeperNode {
|
||||
pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
|
||||
let pageserver = Arc::new(PageServerNode::from_env(env));
|
||||
|
||||
println!("initializing for {} for {}", conf.name, conf.http_port);
|
||||
|
||||
SafekeeperNode {
|
||||
name: conf.name.clone(),
|
||||
conf: conf.clone(),
|
||||
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
|
||||
env: env.clone(),
|
||||
http_client: Client::new(),
|
||||
http_base_url: format!("http://localhost:{}/v1", conf.http_port),
|
||||
pageserver,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct libpq connection string for connecting to this safekeeper.
|
||||
fn safekeeper_connection_config(port: u16) -> Config {
|
||||
// TODO safekeeper authentication not implemented yet
|
||||
format!("postgresql://no_user@localhost:{}/no_db", port)
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn datadir_path(&self) -> PathBuf {
|
||||
self.env.safekeeper_data_dir(&self.name)
|
||||
}
|
||||
|
||||
pub fn pid_file(&self) -> PathBuf {
|
||||
self.datadir_path().join("safekeeper.pid")
|
||||
}
|
||||
|
||||
pub fn start(&self) -> anyhow::Result<()> {
|
||||
print!(
|
||||
"Starting safekeeper at '{}' in '{}'",
|
||||
connection_address(&self.pg_connection_config),
|
||||
self.datadir_path().display()
|
||||
);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
// Configure connection to page server
|
||||
//
|
||||
// FIXME: We extract the host and port from the connection string instead of using
|
||||
// the connection string directly, because the 'safekeeper' binary expects
|
||||
// host:port format. That's a bit silly when we already have a full libpq connection
|
||||
// string at hand.
|
||||
let pageserver_conn = {
|
||||
let (host, port) = connection_host_port(&self.pageserver.pg_connection_config);
|
||||
format!("{}:{}", host, port)
|
||||
};
|
||||
|
||||
let listen_pg = format!("localhost:{}", self.conf.pg_port);
|
||||
let listen_http = format!("localhost:{}", self.conf.http_port);
|
||||
|
||||
let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?);
|
||||
cmd = cmd
|
||||
.args(&["-D", self.datadir_path().to_str().unwrap()])
|
||||
.args(&["--listen-pg", &listen_pg])
|
||||
.args(&["--listen-http", &listen_http])
|
||||
.args(&["--pageserver", &pageserver_conn])
|
||||
.args(&["--recall", "1 second"])
|
||||
.arg("--daemonize")
|
||||
.env_clear()
|
||||
.env("RUST_BACKTRACE", "1");
|
||||
if !self.conf.sync {
|
||||
cmd = cmd.arg("--no-sync");
|
||||
}
|
||||
|
||||
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
|
||||
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
|
||||
}
|
||||
|
||||
if !cmd.status()?.success() {
|
||||
bail!(
|
||||
"Safekeeper failed to start. See '{}' for details.",
|
||||
self.datadir_path().join("safekeeper.log").display()
|
||||
);
|
||||
}
|
||||
|
||||
// It takes a while for the safekeeper to start up. Wait until it is
|
||||
// open for business.
|
||||
const RETRIES: i8 = 15;
|
||||
for retries in 1..RETRIES {
|
||||
match self.check_status() {
|
||||
Ok(_) => {
|
||||
println!("\nSafekeeper started");
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) => {
|
||||
match err {
|
||||
SafekeeperHttpError::Transport(err) => {
|
||||
if err.is_connect() && retries < 5 {
|
||||
print!(".");
|
||||
io::stdout().flush().unwrap();
|
||||
} else {
|
||||
if retries == 5 {
|
||||
println!() // put a line break after dots for second message
|
||||
}
|
||||
println!(
|
||||
"Safekeeper not responding yet, err {} retrying ({})...",
|
||||
err, retries
|
||||
);
|
||||
}
|
||||
}
|
||||
SafekeeperHttpError::Response(msg) => {
|
||||
bail!("safekeeper failed to start: {} ", msg)
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
bail!("safekeeper failed to start in {} seconds", RETRIES);
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
let pid_file = self.pid_file();
|
||||
if !pid_file.exists() {
|
||||
println!("Safekeeper {} is already stopped", self.name);
|
||||
return Ok(())
|
||||
}
|
||||
let pid = read_pidfile(&pid_file)?;
|
||||
let pid = Pid::from_raw(pid);
|
||||
if immediate {
|
||||
println!("Stop safekeeper immediately");
|
||||
if kill(pid, Signal::SIGQUIT).is_err() {
|
||||
bail!("Failed to kill safekeeper with pid {}", pid);
|
||||
}
|
||||
} else {
|
||||
println!("Stop safekeeper gracefully");
|
||||
if kill(pid, Signal::SIGTERM).is_err() {
|
||||
bail!("Failed to stop safekeeper with pid {}", pid);
|
||||
}
|
||||
}
|
||||
|
||||
let address = connection_address(&self.pg_connection_config);
|
||||
|
||||
// TODO Remove this "timeout" and handle it on caller side instead.
|
||||
// Shutting down may take a long time,
|
||||
// if safekeeper flushes a lot of data
|
||||
for _ in 0..100 {
|
||||
if let Err(_e) = TcpStream::connect(&address) {
|
||||
println!("Safekeeper stopped receiving connections");
|
||||
|
||||
//Now check status
|
||||
match self.check_status() {
|
||||
Ok(_) => {
|
||||
println!("Safekeeper status is OK. Wait a bit.");
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
Err(err) => {
|
||||
println!("Safekeeper status is: {}", err);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("Safekeeper still receives connections");
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
|
||||
bail!("Failed to stop safekeeper with pid {}", pid);
|
||||
}
|
||||
|
||||
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
|
||||
// TODO: authentication
|
||||
//if self.env.auth_type == AuthType::ZenithJWT {
|
||||
// builder = builder.bearer_auth(&self.env.safekeeper_auth_token)
|
||||
//}
|
||||
self.http_client.request(method, url)
|
||||
}
|
||||
|
||||
pub fn check_status(&self) -> Result<()> {
|
||||
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
|
||||
.send()?
|
||||
.error_from_body()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -71,8 +71,8 @@ pub struct PageServerNode {
|
||||
|
||||
impl PageServerNode {
|
||||
pub fn from_env(env: &LocalEnv) -> PageServerNode {
|
||||
let password = if env.auth_type == AuthType::ZenithJWT {
|
||||
&env.auth_token
|
||||
let password = if env.pageserver.auth_type == AuthType::ZenithJWT {
|
||||
&env.pageserver.auth_token
|
||||
} else {
|
||||
""
|
||||
};
|
||||
@@ -80,24 +80,25 @@ impl PageServerNode {
|
||||
PageServerNode {
|
||||
pg_connection_config: Self::pageserver_connection_config(
|
||||
password,
|
||||
env.pageserver_pg_port,
|
||||
env.pageserver.pg_port,
|
||||
),
|
||||
env: env.clone(),
|
||||
http_client: Client::new(),
|
||||
http_base_url: format!("http://localhost:{}/v1", env.pageserver_http_port),
|
||||
http_base_url: format!("http://localhost:{}/v1", env.pageserver.http_port),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct libpq connection string for connecting to the pageserver.
|
||||
fn pageserver_connection_config(password: &str, port: u16) -> Config {
|
||||
format!("postgresql://no_user:{}@localhost:{}/no_db", password, port)
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> {
|
||||
pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
let listen_pg = format!("localhost:{}", self.env.pageserver_pg_port);
|
||||
let listen_http = format!("localhost:{}", self.env.pageserver_http_port);
|
||||
let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port);
|
||||
let listen_http = format!("localhost:{}", self.env.pageserver.http_port);
|
||||
let mut args = vec![
|
||||
"--init",
|
||||
"-D",
|
||||
@@ -110,10 +111,11 @@ impl PageServerNode {
|
||||
&listen_http,
|
||||
];
|
||||
|
||||
if enable_auth {
|
||||
let auth_type_str = &self.env.pageserver.auth_type.to_string();
|
||||
if self.env.pageserver.auth_type != AuthType::Trust {
|
||||
args.extend(&["--auth-validation-public-key-path", "auth_public_key.pem"]);
|
||||
args.extend(&["--auth-type", "ZenithJWT"]);
|
||||
}
|
||||
args.extend(&["--auth-type", auth_type_str]);
|
||||
|
||||
if let Some(tenantid) = create_tenant {
|
||||
args.extend(&["--create-tenant", tenantid])
|
||||
@@ -267,8 +269,8 @@ impl PageServerNode {
|
||||
|
||||
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
|
||||
let mut builder = self.http_client.request(method, url);
|
||||
if self.env.auth_type == AuthType::ZenithJWT {
|
||||
builder = builder.bearer_auth(&self.env.auth_token)
|
||||
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
|
||||
builder = builder.bearer_auth(&self.env.pageserver.auth_token)
|
||||
}
|
||||
builder
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import socket
|
||||
import subprocess
|
||||
import time
|
||||
import filecmp
|
||||
import tempfile
|
||||
|
||||
from contextlib import closing, suppress
|
||||
from pathlib import Path
|
||||
@@ -336,7 +337,7 @@ class ZenithEnvBuilder:
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
|
||||
# After the yield comes any cleanup code we need. Stop all the nodes.
|
||||
# Stop all the nodes.
|
||||
if self.env:
|
||||
log.info('Cleaning up all storage and compute nodes')
|
||||
self.env.postgres.stop_all()
|
||||
@@ -385,36 +386,68 @@ class ZenithEnv:
|
||||
|
||||
self.safekeepers: List[Safekeeper] = []
|
||||
|
||||
# Create and start up the pageserver, and safekeepers if any
|
||||
# generate initial tenant ID here instead of letting 'zenith init' generate it,
|
||||
# so that we don't need to dig it out of the config file afterwards.
|
||||
self.initial_tenant = uuid.uuid4().hex
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
toml = f"""
|
||||
default_tenantid = '{self.initial_tenant}'
|
||||
"""
|
||||
|
||||
# Create config for pageserver
|
||||
pageserver_port = PageserverPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
self.pageserver = ZenithPageserver(self,
|
||||
port=pageserver_port,
|
||||
enable_auth=config.pageserver_auth_enabled)
|
||||
pageserver_auth_type = "ZenithJWT" if config.pageserver_auth_enabled else "Trust"
|
||||
|
||||
toml += f"""
|
||||
[pageserver]
|
||||
pg_port = {pageserver_port.pg}
|
||||
http_port = {pageserver_port.http}
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
"""
|
||||
|
||||
# Create a corresponding ZenithPageserver object
|
||||
self.pageserver = ZenithPageserver(self, port=pageserver_port)
|
||||
|
||||
# Create config and a Safekeeper object for each safekeeper
|
||||
for i in range(1, config.num_safekeepers + 1):
|
||||
port = SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if config.num_safekeepers == 1:
|
||||
name = "single"
|
||||
else:
|
||||
name = f"sk{i}"
|
||||
toml += f"""
|
||||
[[safekeepers]]
|
||||
name = '{name}'
|
||||
pg_port = {port.pg}
|
||||
http_port = {port.http}
|
||||
sync = false # Disable fsyncs to make the tests go faster
|
||||
"""
|
||||
safekeeper = Safekeeper(env=self, name=name, port=port)
|
||||
self.safekeepers.append(safekeeper)
|
||||
|
||||
log.info(f"Config: {toml}")
|
||||
|
||||
# Run 'zenith init' using the config file we constructed
|
||||
with tempfile.NamedTemporaryFile(mode='w+') as tmp:
|
||||
tmp.write(toml)
|
||||
tmp.flush()
|
||||
|
||||
cmd = ['init', f'--config={tmp.name}']
|
||||
self.zenith_cli(cmd)
|
||||
|
||||
# Start up the page server and all the safekeepers
|
||||
self.pageserver.start()
|
||||
|
||||
# since we are in progress of refactoring protocols between compute safekeeper
|
||||
# and page server use hardcoded management token in safekeeper
|
||||
management_token = self.auth_keys.generate_management_token() \
|
||||
if config.pageserver_auth_enabled else None
|
||||
|
||||
# get newly created tenant id
|
||||
self.initial_tenant = self.zenith_cli(['tenant', 'list']).stdout.split()[0]
|
||||
|
||||
# Start up safekeepers
|
||||
for wa_num in range(config.num_safekeepers):
|
||||
wa = Safekeeper(env=self,
|
||||
data_dir=Path(os.path.join(self.repo_dir, f"safekeeper_{wa_num}")),
|
||||
port=SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
),
|
||||
num=wa_num,
|
||||
auth_token=management_token)
|
||||
wa.start()
|
||||
self.safekeepers.append(wa)
|
||||
for safekeeper in self.safekeepers:
|
||||
safekeeper.start()
|
||||
|
||||
def get_safekeeper_connstrs(self) -> str:
|
||||
""" Get list of safekeeper endpoints suitable for wal_acceptors GUC """
|
||||
@@ -624,15 +657,6 @@ class ZenithPageserver(PgProtocol):
|
||||
self.running = False
|
||||
self.service_port = port # do not shadow PgProtocol.port which is just int
|
||||
|
||||
cmd = [
|
||||
'init',
|
||||
f'--pageserver-pg-port={self.service_port.pg}',
|
||||
f'--pageserver-http-port={self.service_port.http}'
|
||||
]
|
||||
if enable_auth:
|
||||
cmd.append('--enable-auth')
|
||||
self.env.zenith_cli(cmd)
|
||||
|
||||
def start(self) -> 'ZenithPageserver':
|
||||
"""
|
||||
Start the page server.
|
||||
@@ -640,7 +664,7 @@ class ZenithPageserver(PgProtocol):
|
||||
"""
|
||||
assert self.running == False
|
||||
|
||||
self.env.zenith_cli(['start'])
|
||||
self.env.zenith_cli(['pageserver', 'start'])
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
@@ -649,7 +673,7 @@ class ZenithPageserver(PgProtocol):
|
||||
Stop the page server.
|
||||
Returns self.
|
||||
"""
|
||||
cmd = ['stop']
|
||||
cmd = ['pageserver', 'stop']
|
||||
if immediate:
|
||||
cmd.append('immediate')
|
||||
|
||||
@@ -977,29 +1001,12 @@ class SafekeeperPort:
|
||||
class Safekeeper:
|
||||
""" An object representing a running safekeeper daemon. """
|
||||
env: ZenithEnv
|
||||
data_dir: Path
|
||||
port: SafekeeperPort
|
||||
num: int # identifier for logging
|
||||
name: str # identifier for logging
|
||||
auth_token: Optional[str] = None
|
||||
|
||||
def start(self) -> 'Safekeeper':
|
||||
# create data directory if not exists
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
with suppress(FileNotFoundError):
|
||||
self.pidfile.unlink()
|
||||
|
||||
cmd = [os.path.join(str(zenith_binpath), "safekeeper")]
|
||||
cmd.extend(["-D", str(self.data_dir)])
|
||||
cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"])
|
||||
cmd.extend(["--listen-http", f"localhost:{self.port.http}"])
|
||||
cmd.append("--daemonize")
|
||||
cmd.append("--no-sync")
|
||||
# Tell page server it can receive WAL from this WAL safekeeper
|
||||
cmd.extend(["--pageserver", f"localhost:{self.env.pageserver.service_port.pg}"])
|
||||
cmd.extend(["--recall", "1 second"])
|
||||
log.info('Running command "{}"'.format(' '.join(cmd)))
|
||||
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
|
||||
subprocess.run(cmd, check=True, env=env)
|
||||
self.env.zenith_cli(['safekeeper', 'start', self.name])
|
||||
|
||||
# wait for wal acceptor start by checking its status
|
||||
started_at = time.time()
|
||||
@@ -1017,33 +1024,9 @@ class Safekeeper:
|
||||
break # success
|
||||
return self
|
||||
|
||||
@property
|
||||
def pidfile(self) -> Path:
|
||||
return self.data_dir / "safekeeper.pid"
|
||||
|
||||
def get_pid(self) -> Optional[int]:
|
||||
if not self.pidfile.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
pid = read_pid(self.pidfile)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
return pid
|
||||
|
||||
def stop(self) -> 'Safekeeper':
|
||||
log.info('Stopping wal acceptor {}'.format(self.num))
|
||||
pid = self.get_pid()
|
||||
if pid is None:
|
||||
log.info("Wal acceptor {} is not running".format(self.num))
|
||||
return self
|
||||
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
except Exception:
|
||||
# TODO: cleanup pid file on exit in wal acceptor
|
||||
pass # pidfile might be obsolete
|
||||
log.info('Stopping safekeeper {}'.format(self.name))
|
||||
self.env.zenith_cli(['safekeeper', 'stop', self.name])
|
||||
return self
|
||||
|
||||
def append_logical_message(self, tenant_id: str, timeline_id: str,
|
||||
|
||||
@@ -15,6 +15,7 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbf
|
||||
# FIXME: 'pageserver' is needed for BranchInfo. Refactor
|
||||
pageserver = { path = "../pageserver" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
walkeeper = { path = "../walkeeper" }
|
||||
postgres_ffi = { path = "../postgres_ffi" }
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
workspace_hack = { path = "../workspace_hack" }
|
||||
|
||||
@@ -3,17 +3,52 @@ use anyhow::{Context, Result};
|
||||
use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
|
||||
use control_plane::compute::ComputeControlPlane;
|
||||
use control_plane::local_env;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage::PageServerNode;
|
||||
use pageserver::defaults::{DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_PORT};
|
||||
use pageserver::defaults::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use zenith_utils::auth::{encode_from_key_path, Claims, Scope};
|
||||
use walkeeper::defaults::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
};
|
||||
use zenith_utils::auth::{Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
use pageserver::branches::BranchInfo;
|
||||
|
||||
// Default name of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_NAME: &str = "single";
|
||||
|
||||
fn default_conf() -> String {
|
||||
format!(
|
||||
r#"
|
||||
# Default built-in configuration, defined in main.rs
|
||||
[pageserver]
|
||||
pg_port = {pageserver_pg_port}
|
||||
http_port = {pageserver_http_port}
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
|
||||
[[safekeepers]]
|
||||
name = '{safekeeper_name}'
|
||||
pg_port = {safekeeper_pg_port}
|
||||
http_port = {safekeeper_http_port}
|
||||
"#,
|
||||
pageserver_pg_port = DEFAULT_PAGESERVER_PG_PORT,
|
||||
pageserver_http_port = DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
pageserver_auth_type = AuthType::Trust,
|
||||
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
|
||||
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
)
|
||||
}
|
||||
|
||||
///
|
||||
/// Branches tree element used as a value in the HashMap.
|
||||
///
|
||||
@@ -32,11 +67,16 @@ struct BranchTreeEl {
|
||||
// * Providing CLI api to the pageserver
|
||||
// * TODO: export/import to/from usual postgres
|
||||
fn main() -> Result<()> {
|
||||
let node_arg = Arg::with_name("node")
|
||||
let pg_node_arg = Arg::with_name("node")
|
||||
.index(1)
|
||||
.help("Node name")
|
||||
.required(true);
|
||||
|
||||
let safekeeper_node_arg = Arg::with_name("node")
|
||||
.index(1)
|
||||
.help("Node name")
|
||||
.required(false);
|
||||
|
||||
let timeline_arg = Arg::with_name("timeline")
|
||||
.index(2)
|
||||
.help("Branch name or a point-in time specification")
|
||||
@@ -59,23 +99,11 @@ fn main() -> Result<()> {
|
||||
SubCommand::with_name("init")
|
||||
.about("Initialize a new Zenith repository")
|
||||
.arg(
|
||||
Arg::with_name("pageserver-pg-port")
|
||||
.long("pageserver-pg-port")
|
||||
Arg::with_name("config")
|
||||
.long("config")
|
||||
.required(false)
|
||||
.value_name("pageserver-pg-port"),
|
||||
.value_name("config"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("pageserver-http-port")
|
||||
.long("pageserver-http-port")
|
||||
.required(false)
|
||||
.value_name("pageserver-http-port"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("enable-auth")
|
||||
.long("enable-auth")
|
||||
.takes_value(false)
|
||||
.help("Enable authentication using ZenithJWT")
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("branch")
|
||||
@@ -90,15 +118,39 @@ fn main() -> Result<()> {
|
||||
.subcommand(SubCommand::with_name("list"))
|
||||
.subcommand(SubCommand::with_name("create").arg(Arg::with_name("tenantid").required(false).index(1)))
|
||||
)
|
||||
.subcommand(SubCommand::with_name("status"))
|
||||
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
|
||||
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
|
||||
.arg(Arg::with_name("immediate")
|
||||
.help("Don't flush repository data at shutdown")
|
||||
.required(false)
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("pageserver")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
.about("Manage page server")
|
||||
.subcommand(SubCommand::with_name("status"))
|
||||
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
|
||||
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
|
||||
.arg(Arg::with_name("immediate")
|
||||
.help("Don't flush repository data at shutdown")
|
||||
.required(false)
|
||||
))
|
||||
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("safekeeper")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
.about("Manage safekeepers")
|
||||
.subcommand(SubCommand::with_name("start")
|
||||
.about("Start local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
)
|
||||
.subcommand(SubCommand::with_name("stop")
|
||||
.about("Stop local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(Arg::with_name("immediate")
|
||||
.help("Don't flush data at shutdown")
|
||||
.required(false)
|
||||
))
|
||||
.subcommand(SubCommand::with_name("restart")
|
||||
.about("Restart local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
)
|
||||
)
|
||||
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
|
||||
.subcommand(
|
||||
SubCommand::with_name("pg")
|
||||
.setting(AppSettings::ArgRequiredElseHelp)
|
||||
@@ -106,7 +158,7 @@ fn main() -> Result<()> {
|
||||
.subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone()))
|
||||
.subcommand(SubCommand::with_name("create")
|
||||
.about("Create a postgres compute node")
|
||||
.arg(node_arg.clone())
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(timeline_arg.clone())
|
||||
.arg(tenantid_arg.clone())
|
||||
.arg(port_arg.clone())
|
||||
@@ -118,13 +170,13 @@ fn main() -> Result<()> {
|
||||
))
|
||||
.subcommand(SubCommand::with_name("start")
|
||||
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
|
||||
.arg(node_arg.clone())
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(timeline_arg.clone())
|
||||
.arg(tenantid_arg.clone())
|
||||
.arg(port_arg.clone()))
|
||||
.subcommand(
|
||||
SubCommand::with_name("stop")
|
||||
.arg(node_arg.clone())
|
||||
.arg(pg_node_arg.clone())
|
||||
.arg(timeline_arg.clone())
|
||||
.arg(tenantid_arg.clone())
|
||||
.arg(
|
||||
@@ -136,37 +188,39 @@ fn main() -> Result<()> {
|
||||
)
|
||||
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("start")
|
||||
.about("Start page server and safekeepers")
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("stop")
|
||||
.about("Stop page server and safekeepers")
|
||||
.arg(Arg::with_name("immediate")
|
||||
.help("Don't flush repository data at shutdown")
|
||||
.required(false)
|
||||
)
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
// Create config file
|
||||
if let ("init", Some(init_match)) = matches.subcommand() {
|
||||
let tenantid = ZTenantId::generate();
|
||||
let pageserver_pg_port = match init_match.value_of("pageserver-pg-port") {
|
||||
Some(v) => v.parse()?,
|
||||
None => DEFAULT_PG_LISTEN_PORT,
|
||||
};
|
||||
let pageserver_http_port = match init_match.value_of("pageserver-http-port") {
|
||||
Some(v) => v.parse()?,
|
||||
None => DEFAULT_HTTP_LISTEN_PORT,
|
||||
};
|
||||
|
||||
let auth_type = if init_match.is_present("enable-auth") {
|
||||
AuthType::ZenithJWT
|
||||
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
|
||||
// load and parse the file
|
||||
std::fs::read_to_string(std::path::Path::new(config_path))
|
||||
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
|
||||
} else {
|
||||
AuthType::Trust
|
||||
// Built-in default config
|
||||
default_conf()
|
||||
};
|
||||
|
||||
local_env::init(
|
||||
pageserver_pg_port,
|
||||
pageserver_http_port,
|
||||
tenantid,
|
||||
auth_type,
|
||||
)
|
||||
.with_context(|| "Failed to create config file")?;
|
||||
let mut env = LocalEnv::create_config(&toml_file)
|
||||
.with_context(|| "Failed to create zenith configuration")?;
|
||||
env.init()
|
||||
.with_context(|| "Failed to initialize zenith repository")?;
|
||||
}
|
||||
|
||||
// all other commands would need config
|
||||
let env = match local_env::load_config() {
|
||||
let env = match LocalEnv::load_config() {
|
||||
Ok(conf) => conf,
|
||||
Err(e) => {
|
||||
eprintln!("Error loading config: {}", e);
|
||||
@@ -175,12 +229,12 @@ fn main() -> Result<()> {
|
||||
};
|
||||
|
||||
match matches.subcommand() {
|
||||
("init", Some(init_match)) => {
|
||||
("init", Some(_sub_m)) => {
|
||||
// The options were handled above already
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
if let Err(e) = pageserver.init(
|
||||
// default_tenantid was generated before the `local_env::init` call above
|
||||
// default_tenantid was generated by the `env.init()` call above
|
||||
Some(&env.default_tenantid.unwrap().to_string()),
|
||||
init_match.is_present("enable-auth"),
|
||||
) {
|
||||
eprintln!("pageserver init failed: {}", e);
|
||||
exit(1);
|
||||
@@ -200,43 +254,27 @@ fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
("start", Some(_sub_m)) => {
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
("start", Some(sub_match)) => {
|
||||
if let Err(e) = handle_start_all(sub_match, &env) {
|
||||
eprintln!("start command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(stop_match)) => {
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
let immediate = stop_match.is_present("immediate");
|
||||
|
||||
if let Err(e) = pageserver.stop(immediate) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
("stop", Some(sub_match)) => {
|
||||
if let Err(e) = handle_stop_all(sub_match, &env) {
|
||||
eprintln!("stop command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(_sub_m)) => {
|
||||
let pageserver = PageServerNode::from_env(&env);
|
||||
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = pageserver.stop(false) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
("pageserver", Some(sub_match)) => {
|
||||
if let Err(e) = handle_pageserver(sub_match, &env) {
|
||||
eprintln!("branch command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("status", Some(_sub_m)) => {}
|
||||
|
||||
("pg", Some(pg_match)) => {
|
||||
if let Err(e) = handle_pg(pg_match, &env) {
|
||||
eprintln!("pg operation failed: {:?}", e);
|
||||
@@ -244,6 +282,13 @@ fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
("safekeeper", Some(sub_match)) => {
|
||||
if let Err(e) = handle_safekeeper(sub_match, &env) {
|
||||
eprintln!("branch command failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
};
|
||||
|
||||
@@ -496,9 +541,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
|
||||
let node = cplane.nodes.get(&(tenantid, node_name.to_owned()));
|
||||
|
||||
let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) {
|
||||
let auth_token = if matches!(env.pageserver.auth_type, AuthType::ZenithJWT) {
|
||||
let claims = Claims::new(Some(tenantid), Scope::Tenant);
|
||||
Some(encode_from_key_path(&claims, &env.private_key_path)?)
|
||||
|
||||
Some(env.generate_auth_token(&claims)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -541,3 +587,147 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
match sub_match.subcommand() {
|
||||
("start", Some(_sub_m)) => {
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(stop_match)) => {
|
||||
let immediate = stop_match.is_present("immediate");
|
||||
|
||||
if let Err(e) = pageserver.stop(immediate) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(_sub_m)) => {
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = pageserver.stop(false) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
|
||||
Ok(SafekeeperNode::from_env(env, node))
|
||||
} else {
|
||||
bail!("could not find safekeeper '{}'", name)
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
match sub_match.subcommand() {
|
||||
("start", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("stop", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let immediate = sub_match.is_present("immediate");
|
||||
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
("restart", Some(sub_match)) => {
|
||||
let node_name = sub_match
|
||||
.value_of("node")
|
||||
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
|
||||
//TODO what shutdown strategy should we use here?
|
||||
if let Err(e) = safekeeper.stop(false) {
|
||||
eprintln!("safekeeper stop failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
// Postgres nodes are not started automatically
|
||||
|
||||
if let Err(e) = pageserver.start() {
|
||||
eprintln!("pageserver start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let immediate = sub_match.is_present("immediate");
|
||||
|
||||
let pageserver = PageServerNode::from_env(env);
|
||||
|
||||
// Stop all compute nodes
|
||||
let cplane = ComputeControlPlane::load(env.clone())?;
|
||||
for (_k, node) in cplane.nodes {
|
||||
if let Err(e) = node.stop(false) {
|
||||
eprintln!("postgres stop failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.stop(immediate) {
|
||||
eprintln!("pageserver stop failed: {}", e);
|
||||
}
|
||||
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -104,8 +104,8 @@ impl JwtAuth {
|
||||
}
|
||||
|
||||
pub fn from_key_path(key_path: &Path) -> Result<Self> {
|
||||
let public_key = fs::read_to_string(key_path)?;
|
||||
Ok(Self::new(DecodingKey::from_rsa_pem(public_key.as_bytes())?))
|
||||
let public_key = fs::read(key_path)?;
|
||||
Ok(Self::new(DecodingKey::from_rsa_pem(&public_key)?))
|
||||
}
|
||||
|
||||
pub fn decode(&self, token: &str) -> Result<TokenData<Claims>> {
|
||||
@@ -114,8 +114,7 @@ impl JwtAuth {
|
||||
}
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn encode_from_key_path(claims: &Claims, key_path: &Path) -> Result<String> {
|
||||
let key_data = fs::read_to_string(key_path)?;
|
||||
let key = EncodingKey::from_rsa_pem(key_data.as_bytes())?;
|
||||
pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String> {
|
||||
let key = EncodingKey::from_rsa_pem(key_data)?;
|
||||
Ok(encode(&Header::new(JWT_ALGORITHM), claims, &key)?)
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use anyhow::{anyhow, bail, ensure, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::io::{self, Write};
|
||||
use std::net::{Shutdown, SocketAddr, TcpStream};
|
||||
use std::str::FromStr;
|
||||
@@ -77,6 +78,16 @@ impl FromStr for AuthType {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for AuthType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(match self {
|
||||
AuthType::Trust => "Trust",
|
||||
AuthType::MD5 => "MD5",
|
||||
AuthType::ZenithJWT => "ZenithJWT",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum ProcessMsgResult {
|
||||
Continue,
|
||||
|
||||
Reference in New Issue
Block a user