diff --git a/Cargo.lock b/Cargo.lock index 0d45f7133a..526b802356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2549,6 +2549,7 @@ dependencies = [ "postgres", "postgres_ffi", "serde_json", + "walkeeper", "workspace_hack", "zenith_utils", ] diff --git a/README.md b/README.md index 3d4392fe9d..c2822dfadf 100644 --- a/README.md +++ b/README.md @@ -47,17 +47,26 @@ make -j5 # Create repository in .zenith with proper paths to binaries and data # Later that would be responsibility of a package install script > ./target/debug/zenith init +initializing tenantid c03ba6b7ad4c5e9cf556f059ade44229 +created initial timeline 5b014a9e41b4b63ce1a1febc04503636 timeline.lsn 0/169C3C8 +created main branch pageserver init succeeded -# start pageserver +# start pageserver and safekeeper > ./target/debug/zenith start -Starting pageserver at '127.0.0.1:64000' in .zenith +Starting pageserver at 'localhost:64000' in '.zenith' Pageserver started +initializing for single for 7676 +Starting safekeeper at 'localhost:5454' in '.zenith/safekeepers/single' +Safekeeper started -# start postgres on top on the pageserver +# start postgres compute node > ./target/debug/zenith pg start main -Starting postgres node at 'host=127.0.0.1 port=55432 user=stas' +Starting new postgres main on main... +Extracting base backup to create postgres instance: path=.zenith/pgdatadirs/tenants/c03ba6b7ad4c5e9cf556f059ade44229/main port=55432 +Starting postgres node at 'host=127.0.0.1 port=55432 user=zenith_admin dbname=postgres' waiting for server to start.... done +server started # check list of running postgres instances > ./target/debug/zenith pg list @@ -108,10 +117,9 @@ postgres=# insert into t values(2,2); INSERT 0 1 ``` -6. If you want to run tests afterwards (see below), you have to stop pageserver and all postgres instances you have just started: +6. If you want to run tests afterwards (see below), you have to stop all the running the pageserver, safekeeper and postgres instances + you have just started. You can stop them all with one command: ```sh -> ./target/debug/zenith pg stop migration_check -> ./target/debug/zenith pg stop main > ./target/debug/zenith stop ``` diff --git a/control_plane/safekeepers.conf b/control_plane/safekeepers.conf new file mode 100644 index 0000000000..99f4545990 --- /dev/null +++ b/control_plane/safekeepers.conf @@ -0,0 +1,20 @@ +# Page server and three safekeepers. +[pageserver] +pg_port = 64000 +http_port = 9898 +auth_type = 'Trust' + +[[safekeepers]] +name = 'sk1' +pg_port = 5454 +http_port = 7676 + +[[safekeepers]] +name = 'sk2' +pg_port = 5455 +http_port = 7677 + +[[safekeepers]] +name = 'sk3' +pg_port = 5456 +http_port = 7678 diff --git a/control_plane/simple.conf b/control_plane/simple.conf new file mode 100644 index 0000000000..d482e111c1 --- /dev/null +++ b/control_plane/simple.conf @@ -0,0 +1,11 @@ +# Minimal zenith environment with one safekeeper. This is equivalent to the built-in +# defaults that you get with no --config +[pageserver] +pg_port = 64000 +http_port = 9898 +auth_type = 'Trust' + +[[safekeepers]] +name = 'single' +pg_port = 5454 +http_port = 7676 diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 1df7249361..282e94aed5 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -125,7 +125,7 @@ impl ComputeControlPlane { }); node.create_pgdata()?; - node.setup_pg_conf(self.env.auth_type)?; + node.setup_pg_conf(self.env.pageserver.auth_type)?; self.nodes .insert((tenantid, node.name.clone()), Arc::clone(&node)); @@ -328,9 +328,25 @@ impl PostgresNode { } conf.append_line(""); - // Configure the node to stream WAL directly to the pageserver - conf.append("synchronous_standby_names", "pageserver"); // TODO: add a new function arg? - conf.append("zenith.callmemaybe_connstring", &self.connstr()); + if !self.env.safekeepers.is_empty() { + // Configure the node to connect to the safekeepers + conf.append("synchronous_standby_names", "walproposer"); + + let wal_acceptors = self + .env + .safekeepers + .iter() + .map(|sk| format!("localhost:{}", sk.pg_port)) + .collect::>() + .join(","); + conf.append("wal_acceptors", &wal_acceptors); + } else { + // Configure the node to stream WAL directly to the pageserver + // This isn't really a supported configuration, but can be useful for + // testing. + conf.append("synchronous_standby_names", "pageserver"); + conf.append("zenith.callmemaybe_connstring", &self.connstr()); + } let mut file = File::create(self.pgdata().join("postgresql.conf"))?; file.write_all(conf.to_string().as_bytes())?; diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 287f30be63..d2d16312e2 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -13,6 +13,7 @@ use std::path::Path; pub mod compute; pub mod local_env; pub mod postgresql_conf; +pub mod safekeeper; pub mod storage; /// Read a PID file diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 38c75948c6..d3bf4d304b 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -7,36 +7,41 @@ use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use std::env; +use std::fmt::Write; use std::fs; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -use zenith_utils::auth::{encode_from_key_path, Claims, Scope}; +use zenith_utils::auth::{encode_from_key_file, Claims, Scope}; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::ZTenantId; // -// This data structures represent deserialized zenith CLI config +// This data structures represents zenith CLI config +// +// It is deserialized from the .zenith/config file, or the config file passed +// to 'zenith init --config=' option. See control_plane/simple.conf for +// an example. // #[derive(Serialize, Deserialize, Clone, Debug)] pub struct LocalEnv { - // Pageserver connection settings - pub pageserver_pg_port: u16, - pub pageserver_http_port: u16, - - // Base directory for both pageserver and compute nodes + // Base directory for all the nodes (the pageserver, safekeepers and + // compute nodes). // // This is not stored in the config file. Rather, this is the path where the // config file itself is. It is read from the ZENITH_REPO_DIR env variable or // '.zenith' if not given. + #[serde(skip)] pub base_data_dir: PathBuf, // Path to postgres distribution. It's expected that "bin", "include", // "lib", "share" from postgres distribution are there. If at some point // in time we will be able to run against vanilla postgres we may split that // to four separate paths and match OS-specific installation layout. + #[serde(default)] pub pg_distrib_dir: PathBuf, // Path to pageserver binary. + #[serde(default)] pub zenith_distrib_dir: PathBuf, // Default tenant ID to use with the 'zenith' command line utility, when @@ -45,14 +50,59 @@ pub struct LocalEnv { #[serde(default)] pub default_tenantid: Option, - // jwt auth token used for communication with pageserver - pub auth_token: String, + // used to issue tokens during e.g pg start + #[serde(default)] + pub private_key_path: PathBuf, + + pub pageserver: PageServerConf, + + #[serde(default)] + pub safekeepers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] +pub struct PageServerConf { + // Pageserver connection settings + pub pg_port: u16, + pub http_port: u16, // used to determine which auth type is used pub auth_type: AuthType, - // used to issue tokens during e.g pg start - pub private_key_path: PathBuf, + // jwt auth token used for communication with pageserver + pub auth_token: String, +} + +impl Default for PageServerConf { + fn default() -> Self { + Self { + pg_port: 0, + http_port: 0, + auth_type: AuthType::Trust, + auth_token: "".to_string(), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] +pub struct SafekeeperConf { + pub name: String, + pub pg_port: u16, + pub http_port: u16, + pub sync: bool, +} + +impl Default for SafekeeperConf { + fn default() -> Self { + Self { + name: "".to_string(), + pg_port: 0, + http_port: 0, + sync: true, + } + } } impl LocalEnv { @@ -68,6 +118,10 @@ impl LocalEnv { Ok(self.zenith_distrib_dir.join("pageserver")) } + pub fn safekeeper_bin(&self) -> Result { + Ok(self.zenith_distrib_dir.join("safekeeper")) + } + pub fn pg_data_dirs_path(&self) -> PathBuf { self.base_data_dir.join("pgdatadirs").join("tenants") } @@ -82,6 +136,187 @@ impl LocalEnv { pub fn pageserver_data_dir(&self) -> PathBuf { self.base_data_dir.clone() } + + pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf { + self.base_data_dir.join("safekeepers").join(node_name) + } + + /// Create a LocalEnv from a config file. + /// + /// Unlike 'load_config', this function fills in any defaults that are missing + /// from the config file. + pub fn create_config(toml: &str) -> Result { + let mut env: LocalEnv = toml::from_str(toml)?; + + // Find postgres binaries. + // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install". + if env.pg_distrib_dir == Path::new("") { + if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { + env.pg_distrib_dir = postgres_bin.into(); + } else { + let cwd = env::current_dir()?; + env.pg_distrib_dir = cwd.join("tmp_install") + } + } + if !env.pg_distrib_dir.join("bin/postgres").exists() { + anyhow::bail!( + "Can't find postgres binary at {}", + env.pg_distrib_dir.display() + ); + } + + // Find zenith binaries. + if env.zenith_distrib_dir == Path::new("") { + env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); + } + if !env.zenith_distrib_dir.join("pageserver").exists() { + anyhow::bail!("Can't find pageserver binary."); + } + if !env.zenith_distrib_dir.join("safekeeper").exists() { + anyhow::bail!("Can't find safekeeper binary."); + } + + // If no initial tenant ID was given, generate it. + if env.default_tenantid.is_none() { + env.default_tenantid = Some(ZTenantId::generate()); + } + + env.base_data_dir = base_path(); + + Ok(env) + } + + /// Locate and load config + pub fn load_config() -> Result { + let repopath = base_path(); + + if !repopath.exists() { + anyhow::bail!( + "Zenith config is not found in {}. You need to run 'zenith init' first", + repopath.to_str().unwrap() + ); + } + + // TODO: check that it looks like a zenith repository + + // load and parse file + let config = fs::read_to_string(repopath.join("config"))?; + let mut env: LocalEnv = toml::from_str(config.as_str())?; + + env.base_data_dir = repopath; + + Ok(env) + } + + // this function is used only for testing purposes in CLI e g generate tokens during init + pub fn generate_auth_token(&self, claims: &Claims) -> Result { + let private_key_path = if self.private_key_path.is_absolute() { + self.private_key_path.to_path_buf() + } else { + self.base_data_dir.join(&self.private_key_path) + }; + + let key_data = fs::read(private_key_path)?; + encode_from_key_file(claims, &key_data) + } + + // + // Initialize a new Zenith repository + // + pub fn init(&mut self) -> Result<()> { + // check if config already exists + let base_path = &self.base_data_dir; + if base_path == Path::new("") { + anyhow::bail!("repository base path is missing"); + } + if base_path.exists() { + anyhow::bail!( + "directory '{}' already exists. Perhaps already initialized?", + base_path.to_str().unwrap() + ); + } + + fs::create_dir(&base_path)?; + + // generate keys for jwt + // openssl genrsa -out private_key.pem 2048 + let private_key_path; + if self.private_key_path == PathBuf::new() { + private_key_path = base_path.join("auth_private_key.pem"); + let keygen_output = Command::new("openssl") + .arg("genrsa") + .args(&["-out", private_key_path.to_str().unwrap()]) + .arg("2048") + .stdout(Stdio::null()) + .output() + .with_context(|| "failed to generate auth private key")?; + if !keygen_output.status.success() { + anyhow::bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + self.private_key_path = Path::new("auth_private_key.pem").to_path_buf(); + + let public_key_path = base_path.join("auth_public_key.pem"); + // openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem + let keygen_output = Command::new("openssl") + .arg("rsa") + .args(&["-in", private_key_path.to_str().unwrap()]) + .arg("-pubout") + .args(&["-outform", "PEM"]) + .args(&["-out", public_key_path.to_str().unwrap()]) + .stdout(Stdio::null()) + .output() + .with_context(|| "failed to generate auth private key")?; + if !keygen_output.status.success() { + anyhow::bail!( + "openssl failed: '{}'", + String::from_utf8_lossy(&keygen_output.stderr) + ); + } + } + + self.pageserver.auth_token = + self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; + + fs::create_dir_all(self.pg_data_dirs_path())?; + + for safekeeper in self.safekeepers.iter() { + fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?; + } + + let mut conf_content = String::new(); + + // Currently, the user first passes a config file with 'zenith init --config=' + // We read that in, in `create_config`, and fill any missing defaults. Then it's saved + // to .zenith/config. TODO: We lose any formatting and comments along the way, which is + // a bit sad. + write!( + &mut conf_content, + r#"# This file describes a locale deployment of the page server +# and safekeeeper node. It is read by the 'zenith' command-line +# utility. +"# + )?; + + // Convert the LocalEnv to a toml file. + // + // This could be as simple as this: + // + // conf_content += &toml::to_string_pretty(env)?; + // + // But it results in a "values must be emitted before tables". I'm not sure + // why, AFAICS the table, i.e. 'safekeepers: Vec' is last. + // Maybe rust reorders the fields to squeeze avoid padding or something? + // In any case, converting to toml::Value first, and serializing that, works. + // See https://github.com/alexcrichton/toml-rs/issues/142 + conf_content += &toml::to_string_pretty(&toml::Value::try_from(&self)?)?; + + fs::write(base_path.join("config"), conf_content)?; + + Ok(()) + } } fn base_path() -> PathBuf { @@ -91,122 +326,6 @@ fn base_path() -> PathBuf { } } -// -// Initialize a new Zenith repository -// -pub fn init( - pageserver_pg_port: u16, - pageserver_http_port: u16, - tenantid: ZTenantId, - auth_type: AuthType, -) -> Result<()> { - // check if config already exists - let base_path = base_path(); - if base_path.exists() { - anyhow::bail!( - "{} already exists. Perhaps already initialized?", - base_path.to_str().unwrap() - ); - } - fs::create_dir(&base_path)?; - - // ok, now check that expected binaries are present - - // Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install". - let pg_distrib_dir: PathBuf = { - if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { - postgres_bin.into() - } else { - let cwd = env::current_dir()?; - cwd.join("tmp_install") - } - }; - if !pg_distrib_dir.join("bin/postgres").exists() { - anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); - } - - // generate keys for jwt - // openssl genrsa -out private_key.pem 2048 - let private_key_path = base_path.join("auth_private_key.pem"); - let keygen_output = Command::new("openssl") - .arg("genrsa") - .args(&["-out", private_key_path.to_str().unwrap()]) - .arg("2048") - .stdout(Stdio::null()) - .output() - .with_context(|| "failed to generate auth private key")?; - if !keygen_output.status.success() { - anyhow::bail!( - "openssl failed: '{}'", - String::from_utf8_lossy(&keygen_output.stderr) - ); - } - - let public_key_path = base_path.join("auth_public_key.pem"); - // openssl rsa -in private_key.pem -pubout -outform PEM -out public_key.pem - let keygen_output = Command::new("openssl") - .arg("rsa") - .args(&["-in", private_key_path.to_str().unwrap()]) - .arg("-pubout") - .args(&["-outform", "PEM"]) - .args(&["-out", public_key_path.to_str().unwrap()]) - .stdout(Stdio::null()) - .output() - .with_context(|| "failed to generate auth private key")?; - if !keygen_output.status.success() { - anyhow::bail!( - "openssl failed: '{}'", - String::from_utf8_lossy(&keygen_output.stderr) - ); - } - - let auth_token = - encode_from_key_path(&Claims::new(None, Scope::PageServerApi), &private_key_path)?; - - // Find zenith binaries. - let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); - if !zenith_distrib_dir.join("pageserver").exists() { - anyhow::bail!("Can't find pageserver binary.",); - } - - let conf = LocalEnv { - pageserver_pg_port, - pageserver_http_port, - pg_distrib_dir, - zenith_distrib_dir, - base_data_dir: base_path, - default_tenantid: Some(tenantid), - auth_token, - auth_type, - private_key_path, - }; - - fs::create_dir_all(conf.pg_data_dirs_path())?; - - let toml = toml::to_string_pretty(&conf)?; - fs::write(conf.base_data_dir.join("config"), toml)?; - - Ok(()) -} - -// Locate and load config -pub fn load_config() -> Result { - let repopath = base_path(); - - if !repopath.exists() { - anyhow::bail!( - "Zenith config is not found in {}. You need to run 'zenith init' first", - repopath.to_str().unwrap() - ); - } - - // TODO: check that it looks like a zenith repository - - // load and parse file - let config = fs::read_to_string(repopath.join("config"))?; - toml::from_str(config.as_str()).map_err(|e| e.into()) -} - /// Serde routines for Option. The serialized form is a hex string. mod opt_tenantid_serde { use serde::{Deserialize, Deserializer, Serialize, Serializer}; diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs new file mode 100644 index 0000000000..2ed9749ce4 --- /dev/null +++ b/control_plane/src/safekeeper.rs @@ -0,0 +1,256 @@ +use std::io::Write; +use std::net::TcpStream; +use std::path::PathBuf; +use std::process::Command; +use std::sync::Arc; +use std::time::Duration; +use std::{io, result, thread}; + +use anyhow::bail; +use nix::sys::signal::{kill, Signal}; +use nix::unistd::Pid; +use postgres::Config; +use reqwest::blocking::{Client, RequestBuilder, Response}; +use reqwest::{IntoUrl, Method}; +use thiserror::Error; +use zenith_utils::http::error::HttpErrorBody; +use zenith_utils::postgres_backend::AuthType; + +use crate::local_env::{LocalEnv, SafekeeperConf}; +use crate::read_pidfile; +use crate::storage::PageServerNode; +use zenith_utils::connstring::connection_address; +use zenith_utils::connstring::connection_host_port; + +#[derive(Error, Debug)] +pub enum SafekeeperHttpError { + #[error("Reqwest error: {0}")] + Transport(#[from] reqwest::Error), + + #[error("Error: {0}")] + Response(String), +} + +type Result = result::Result; + +pub trait ResponseErrorMessageExt: Sized { + fn error_from_body(self) -> Result; +} + +impl ResponseErrorMessageExt for Response { + fn error_from_body(self) -> Result { + let status = self.status(); + if !(status.is_client_error() || status.is_server_error()) { + return Ok(self); + } + + // reqwest do not export it's error construction utility functions, so lets craft the message ourselves + let url = self.url().to_owned(); + Err(SafekeeperHttpError::Response( + match self.json::() { + Ok(err_body) => format!("Error: {}", err_body.msg), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }, + )) + } +} + +// +// Control routines for safekeeper. +// +// Used in CLI and tests. +// +#[derive(Debug)] +pub struct SafekeeperNode { + pub name: String, + + pub conf: SafekeeperConf, + + pub pg_connection_config: Config, + pub env: LocalEnv, + pub http_client: Client, + pub http_base_url: String, + + pub pageserver: Arc, +} + +impl SafekeeperNode { + pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode { + let pageserver = Arc::new(PageServerNode::from_env(env)); + + println!("initializing for {} for {}", conf.name, conf.http_port); + + SafekeeperNode { + name: conf.name.clone(), + conf: conf.clone(), + pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), + env: env.clone(), + http_client: Client::new(), + http_base_url: format!("http://localhost:{}/v1", conf.http_port), + pageserver, + } + } + + /// Construct libpq connection string for connecting to this safekeeper. + fn safekeeper_connection_config(port: u16) -> Config { + // TODO safekeeper authentication not implemented yet + format!("postgresql://no_user@localhost:{}/no_db", port) + .parse() + .unwrap() + } + + pub fn datadir_path(&self) -> PathBuf { + self.env.safekeeper_data_dir(&self.name) + } + + pub fn pid_file(&self) -> PathBuf { + self.datadir_path().join("safekeeper.pid") + } + + pub fn start(&self) -> anyhow::Result<()> { + print!( + "Starting safekeeper at '{}' in '{}'", + connection_address(&self.pg_connection_config), + self.datadir_path().display() + ); + io::stdout().flush().unwrap(); + + // Configure connection to page server + // + // FIXME: We extract the host and port from the connection string instead of using + // the connection string directly, because the 'safekeeper' binary expects + // host:port format. That's a bit silly when we already have a full libpq connection + // string at hand. + let pageserver_conn = { + let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); + format!("{}:{}", host, port) + }; + + let listen_pg = format!("localhost:{}", self.conf.pg_port); + let listen_http = format!("localhost:{}", self.conf.http_port); + + let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?); + cmd = cmd + .args(&["-D", self.datadir_path().to_str().unwrap()]) + .args(&["--listen-pg", &listen_pg]) + .args(&["--listen-http", &listen_http]) + .args(&["--pageserver", &pageserver_conn]) + .args(&["--recall", "1 second"]) + .arg("--daemonize") + .env_clear() + .env("RUST_BACKTRACE", "1"); + if !self.conf.sync { + cmd = cmd.arg("--no-sync"); + } + + if self.env.pageserver.auth_type == AuthType::ZenithJWT { + cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); + } + + if !cmd.status()?.success() { + bail!( + "Safekeeper failed to start. See '{}' for details.", + self.datadir_path().join("safekeeper.log").display() + ); + } + + // It takes a while for the safekeeper to start up. Wait until it is + // open for business. + const RETRIES: i8 = 15; + for retries in 1..RETRIES { + match self.check_status() { + Ok(_) => { + println!("\nSafekeeper started"); + return Ok(()); + } + Err(err) => { + match err { + SafekeeperHttpError::Transport(err) => { + if err.is_connect() && retries < 5 { + print!("."); + io::stdout().flush().unwrap(); + } else { + if retries == 5 { + println!() // put a line break after dots for second message + } + println!( + "Safekeeper not responding yet, err {} retrying ({})...", + err, retries + ); + } + } + SafekeeperHttpError::Response(msg) => { + bail!("safekeeper failed to start: {} ", msg) + } + } + thread::sleep(Duration::from_secs(1)); + } + } + } + bail!("safekeeper failed to start in {} seconds", RETRIES); + } + + pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { + let pid_file = self.pid_file(); + if !pid_file.exists() { + println!("Safekeeper {} is already stopped", self.name); + return Ok(()) + } + let pid = read_pidfile(&pid_file)?; + let pid = Pid::from_raw(pid); + if immediate { + println!("Stop safekeeper immediately"); + if kill(pid, Signal::SIGQUIT).is_err() { + bail!("Failed to kill safekeeper with pid {}", pid); + } + } else { + println!("Stop safekeeper gracefully"); + if kill(pid, Signal::SIGTERM).is_err() { + bail!("Failed to stop safekeeper with pid {}", pid); + } + } + + let address = connection_address(&self.pg_connection_config); + + // TODO Remove this "timeout" and handle it on caller side instead. + // Shutting down may take a long time, + // if safekeeper flushes a lot of data + for _ in 0..100 { + if let Err(_e) = TcpStream::connect(&address) { + println!("Safekeeper stopped receiving connections"); + + //Now check status + match self.check_status() { + Ok(_) => { + println!("Safekeeper status is OK. Wait a bit."); + thread::sleep(Duration::from_secs(1)); + } + Err(err) => { + println!("Safekeeper status is: {}", err); + return Ok(()); + } + } + } else { + println!("Safekeeper still receives connections"); + thread::sleep(Duration::from_secs(1)); + } + } + + bail!("Failed to stop safekeeper with pid {}", pid); + } + + fn http_request(&self, method: Method, url: U) -> RequestBuilder { + // TODO: authentication + //if self.env.auth_type == AuthType::ZenithJWT { + // builder = builder.bearer_auth(&self.env.safekeeper_auth_token) + //} + self.http_client.request(method, url) + } + + pub fn check_status(&self) -> Result<()> { + self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) + .send()? + .error_from_body()?; + Ok(()) + } +} diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 92de4752dc..088163258a 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -71,8 +71,8 @@ pub struct PageServerNode { impl PageServerNode { pub fn from_env(env: &LocalEnv) -> PageServerNode { - let password = if env.auth_type == AuthType::ZenithJWT { - &env.auth_token + let password = if env.pageserver.auth_type == AuthType::ZenithJWT { + &env.pageserver.auth_token } else { "" }; @@ -80,24 +80,25 @@ impl PageServerNode { PageServerNode { pg_connection_config: Self::pageserver_connection_config( password, - env.pageserver_pg_port, + env.pageserver.pg_port, ), env: env.clone(), http_client: Client::new(), - http_base_url: format!("http://localhost:{}/v1", env.pageserver_http_port), + http_base_url: format!("http://localhost:{}/v1", env.pageserver.http_port), } } + /// Construct libpq connection string for connecting to the pageserver. fn pageserver_connection_config(password: &str, port: u16) -> Config { format!("postgresql://no_user:{}@localhost:{}/no_db", password, port) .parse() .unwrap() } - pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> { + pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); - let listen_pg = format!("localhost:{}", self.env.pageserver_pg_port); - let listen_http = format!("localhost:{}", self.env.pageserver_http_port); + let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port); + let listen_http = format!("localhost:{}", self.env.pageserver.http_port); let mut args = vec![ "--init", "-D", @@ -110,10 +111,11 @@ impl PageServerNode { &listen_http, ]; - if enable_auth { + let auth_type_str = &self.env.pageserver.auth_type.to_string(); + if self.env.pageserver.auth_type != AuthType::Trust { args.extend(&["--auth-validation-public-key-path", "auth_public_key.pem"]); - args.extend(&["--auth-type", "ZenithJWT"]); } + args.extend(&["--auth-type", auth_type_str]); if let Some(tenantid) = create_tenant { args.extend(&["--create-tenant", tenantid]) @@ -267,8 +269,8 @@ impl PageServerNode { fn http_request(&self, method: Method, url: U) -> RequestBuilder { let mut builder = self.http_client.request(method, url); - if self.env.auth_type == AuthType::ZenithJWT { - builder = builder.bearer_auth(&self.env.auth_token) + if self.env.pageserver.auth_type == AuthType::ZenithJWT { + builder = builder.bearer_auth(&self.env.pageserver.auth_token) } builder } diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index b71317f37d..f3fd98ca31 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -16,6 +16,7 @@ import socket import subprocess import time import filecmp +import tempfile from contextlib import closing, suppress from pathlib import Path @@ -336,7 +337,7 @@ class ZenithEnvBuilder: def __exit__(self, exc_type, exc_value, traceback): - # After the yield comes any cleanup code we need. Stop all the nodes. + # Stop all the nodes. if self.env: log.info('Cleaning up all storage and compute nodes') self.env.postgres.stop_all() @@ -385,36 +386,68 @@ class ZenithEnv: self.safekeepers: List[Safekeeper] = [] - # Create and start up the pageserver, and safekeepers if any + # generate initial tenant ID here instead of letting 'zenith init' generate it, + # so that we don't need to dig it out of the config file afterwards. + self.initial_tenant = uuid.uuid4().hex + + # Create a config file corresponding to the options + toml = f""" +default_tenantid = '{self.initial_tenant}' + """ + + # Create config for pageserver pageserver_port = PageserverPort( pg=self.port_distributor.get_port(), http=self.port_distributor.get_port(), ) - self.pageserver = ZenithPageserver(self, - port=pageserver_port, - enable_auth=config.pageserver_auth_enabled) + pageserver_auth_type = "ZenithJWT" if config.pageserver_auth_enabled else "Trust" + + toml += f""" +[pageserver] +pg_port = {pageserver_port.pg} +http_port = {pageserver_port.http} +auth_type = '{pageserver_auth_type}' + """ + + # Create a corresponding ZenithPageserver object + self.pageserver = ZenithPageserver(self, port=pageserver_port) + + # Create config and a Safekeeper object for each safekeeper + for i in range(1, config.num_safekeepers + 1): + port = SafekeeperPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ) + + if config.num_safekeepers == 1: + name = "single" + else: + name = f"sk{i}" + toml += f""" +[[safekeepers]] +name = '{name}' +pg_port = {port.pg} +http_port = {port.http} +sync = false # Disable fsyncs to make the tests go faster + """ + safekeeper = Safekeeper(env=self, name=name, port=port) + self.safekeepers.append(safekeeper) + + log.info(f"Config: {toml}") + + # Run 'zenith init' using the config file we constructed + with tempfile.NamedTemporaryFile(mode='w+') as tmp: + tmp.write(toml) + tmp.flush() + + cmd = ['init', f'--config={tmp.name}'] + self.zenith_cli(cmd) + + # Start up the page server and all the safekeepers self.pageserver.start() - # since we are in progress of refactoring protocols between compute safekeeper - # and page server use hardcoded management token in safekeeper - management_token = self.auth_keys.generate_management_token() \ - if config.pageserver_auth_enabled else None - - # get newly created tenant id - self.initial_tenant = self.zenith_cli(['tenant', 'list']).stdout.split()[0] - - # Start up safekeepers - for wa_num in range(config.num_safekeepers): - wa = Safekeeper(env=self, - data_dir=Path(os.path.join(self.repo_dir, f"safekeeper_{wa_num}")), - port=SafekeeperPort( - pg=self.port_distributor.get_port(), - http=self.port_distributor.get_port(), - ), - num=wa_num, - auth_token=management_token) - wa.start() - self.safekeepers.append(wa) + for safekeeper in self.safekeepers: + safekeeper.start() def get_safekeeper_connstrs(self) -> str: """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """ @@ -624,15 +657,6 @@ class ZenithPageserver(PgProtocol): self.running = False self.service_port = port # do not shadow PgProtocol.port which is just int - cmd = [ - 'init', - f'--pageserver-pg-port={self.service_port.pg}', - f'--pageserver-http-port={self.service_port.http}' - ] - if enable_auth: - cmd.append('--enable-auth') - self.env.zenith_cli(cmd) - def start(self) -> 'ZenithPageserver': """ Start the page server. @@ -640,7 +664,7 @@ class ZenithPageserver(PgProtocol): """ assert self.running == False - self.env.zenith_cli(['start']) + self.env.zenith_cli(['pageserver', 'start']) self.running = True return self @@ -649,7 +673,7 @@ class ZenithPageserver(PgProtocol): Stop the page server. Returns self. """ - cmd = ['stop'] + cmd = ['pageserver', 'stop'] if immediate: cmd.append('immediate') @@ -977,29 +1001,12 @@ class SafekeeperPort: class Safekeeper: """ An object representing a running safekeeper daemon. """ env: ZenithEnv - data_dir: Path port: SafekeeperPort - num: int # identifier for logging + name: str # identifier for logging auth_token: Optional[str] = None def start(self) -> 'Safekeeper': - # create data directory if not exists - self.data_dir.mkdir(parents=True, exist_ok=True) - with suppress(FileNotFoundError): - self.pidfile.unlink() - - cmd = [os.path.join(str(zenith_binpath), "safekeeper")] - cmd.extend(["-D", str(self.data_dir)]) - cmd.extend(["--listen-pg", f"localhost:{self.port.pg}"]) - cmd.extend(["--listen-http", f"localhost:{self.port.http}"]) - cmd.append("--daemonize") - cmd.append("--no-sync") - # Tell page server it can receive WAL from this WAL safekeeper - cmd.extend(["--pageserver", f"localhost:{self.env.pageserver.service_port.pg}"]) - cmd.extend(["--recall", "1 second"]) - log.info('Running command "{}"'.format(' '.join(cmd))) - env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None - subprocess.run(cmd, check=True, env=env) + self.env.zenith_cli(['safekeeper', 'start', self.name]) # wait for wal acceptor start by checking its status started_at = time.time() @@ -1017,33 +1024,9 @@ class Safekeeper: break # success return self - @property - def pidfile(self) -> Path: - return self.data_dir / "safekeeper.pid" - - def get_pid(self) -> Optional[int]: - if not self.pidfile.exists(): - return None - - try: - pid = read_pid(self.pidfile) - except ValueError: - return None - - return pid - def stop(self) -> 'Safekeeper': - log.info('Stopping wal acceptor {}'.format(self.num)) - pid = self.get_pid() - if pid is None: - log.info("Wal acceptor {} is not running".format(self.num)) - return self - - try: - os.kill(pid, signal.SIGTERM) - except Exception: - # TODO: cleanup pid file on exit in wal acceptor - pass # pidfile might be obsolete + log.info('Stopping safekeeper {}'.format(self.name)) + self.env.zenith_cli(['safekeeper', 'stop', self.name]) return self def append_logical_message(self, tenant_id: str, timeline_id: str, diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml index 6b2f54582d..207477b5f9 100644 --- a/zenith/Cargo.toml +++ b/zenith/Cargo.toml @@ -15,6 +15,7 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbf # FIXME: 'pageserver' is needed for BranchInfo. Refactor pageserver = { path = "../pageserver" } control_plane = { path = "../control_plane" } +walkeeper = { path = "../walkeeper" } postgres_ffi = { path = "../postgres_ffi" } zenith_utils = { path = "../zenith_utils" } workspace_hack = { path = "../workspace_hack" } diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 63a6a72808..eefb62baa3 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -3,17 +3,52 @@ use anyhow::{Context, Result}; use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env; +use control_plane::local_env::LocalEnv; +use control_plane::safekeeper::SafekeeperNode; use control_plane::storage::PageServerNode; -use pageserver::defaults::{DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_PORT}; +use pageserver::defaults::{ + DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, + DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, +}; use std::collections::HashMap; use std::process::exit; use std::str::FromStr; -use zenith_utils::auth::{encode_from_key_path, Claims, Scope}; +use walkeeper::defaults::{ + DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, + DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, +}; +use zenith_utils::auth::{Claims, Scope}; use zenith_utils::postgres_backend::AuthType; use zenith_utils::zid::{ZTenantId, ZTimelineId}; use pageserver::branches::BranchInfo; +// Default name of a safekeeper node, if not specified on the command line. +const DEFAULT_SAFEKEEPER_NAME: &str = "single"; + +fn default_conf() -> String { + format!( + r#" +# Default built-in configuration, defined in main.rs +[pageserver] +pg_port = {pageserver_pg_port} +http_port = {pageserver_http_port} +auth_type = '{pageserver_auth_type}' + +[[safekeepers]] +name = '{safekeeper_name}' +pg_port = {safekeeper_pg_port} +http_port = {safekeeper_http_port} +"#, + pageserver_pg_port = DEFAULT_PAGESERVER_PG_PORT, + pageserver_http_port = DEFAULT_PAGESERVER_HTTP_PORT, + pageserver_auth_type = AuthType::Trust, + safekeeper_name = DEFAULT_SAFEKEEPER_NAME, + safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT, + safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT, + ) +} + /// /// Branches tree element used as a value in the HashMap. /// @@ -32,11 +67,16 @@ struct BranchTreeEl { // * Providing CLI api to the pageserver // * TODO: export/import to/from usual postgres fn main() -> Result<()> { - let node_arg = Arg::with_name("node") + let pg_node_arg = Arg::with_name("node") .index(1) .help("Node name") .required(true); + let safekeeper_node_arg = Arg::with_name("node") + .index(1) + .help("Node name") + .required(false); + let timeline_arg = Arg::with_name("timeline") .index(2) .help("Branch name or a point-in time specification") @@ -59,23 +99,11 @@ fn main() -> Result<()> { SubCommand::with_name("init") .about("Initialize a new Zenith repository") .arg( - Arg::with_name("pageserver-pg-port") - .long("pageserver-pg-port") + Arg::with_name("config") + .long("config") .required(false) - .value_name("pageserver-pg-port"), + .value_name("config"), ) - .arg( - Arg::with_name("pageserver-http-port") - .long("pageserver-http-port") - .required(false) - .value_name("pageserver-http-port"), - ) - .arg( - Arg::with_name("enable-auth") - .long("enable-auth") - .takes_value(false) - .help("Enable authentication using ZenithJWT") - ), ) .subcommand( SubCommand::with_name("branch") @@ -90,15 +118,39 @@ fn main() -> Result<()> { .subcommand(SubCommand::with_name("list")) .subcommand(SubCommand::with_name("create").arg(Arg::with_name("tenantid").required(false).index(1))) ) - .subcommand(SubCommand::with_name("status")) - .subcommand(SubCommand::with_name("start").about("Start local pageserver")) - .subcommand(SubCommand::with_name("stop").about("Stop local pageserver") - .arg(Arg::with_name("immediate") - .help("Don't flush repository data at shutdown") - .required(false) - ) + .subcommand( + SubCommand::with_name("pageserver") + .setting(AppSettings::ArgRequiredElseHelp) + .about("Manage page server") + .subcommand(SubCommand::with_name("status")) + .subcommand(SubCommand::with_name("start").about("Start local pageserver")) + .subcommand(SubCommand::with_name("stop").about("Stop local pageserver") + .arg(Arg::with_name("immediate") + .help("Don't flush repository data at shutdown") + .required(false) + )) + .subcommand(SubCommand::with_name("restart").about("Restart local pageserver")) + ) + .subcommand( + SubCommand::with_name("safekeeper") + .setting(AppSettings::ArgRequiredElseHelp) + .about("Manage safekeepers") + .subcommand(SubCommand::with_name("start") + .about("Start local safekeeper") + .arg(safekeeper_node_arg.clone()) + ) + .subcommand(SubCommand::with_name("stop") + .about("Stop local safekeeper") + .arg(safekeeper_node_arg.clone()) + .arg(Arg::with_name("immediate") + .help("Don't flush data at shutdown") + .required(false) + )) + .subcommand(SubCommand::with_name("restart") + .about("Restart local safekeeper") + .arg(safekeeper_node_arg.clone()) + ) ) - .subcommand(SubCommand::with_name("restart").about("Restart local pageserver")) .subcommand( SubCommand::with_name("pg") .setting(AppSettings::ArgRequiredElseHelp) @@ -106,7 +158,7 @@ fn main() -> Result<()> { .subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone())) .subcommand(SubCommand::with_name("create") .about("Create a postgres compute node") - .arg(node_arg.clone()) + .arg(pg_node_arg.clone()) .arg(timeline_arg.clone()) .arg(tenantid_arg.clone()) .arg(port_arg.clone()) @@ -118,13 +170,13 @@ fn main() -> Result<()> { )) .subcommand(SubCommand::with_name("start") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") - .arg(node_arg.clone()) + .arg(pg_node_arg.clone()) .arg(timeline_arg.clone()) .arg(tenantid_arg.clone()) .arg(port_arg.clone())) .subcommand( SubCommand::with_name("stop") - .arg(node_arg.clone()) + .arg(pg_node_arg.clone()) .arg(timeline_arg.clone()) .arg(tenantid_arg.clone()) .arg( @@ -136,37 +188,39 @@ fn main() -> Result<()> { ) ) + .subcommand( + SubCommand::with_name("start") + .about("Start page server and safekeepers") + ) + .subcommand( + SubCommand::with_name("stop") + .about("Stop page server and safekeepers") + .arg(Arg::with_name("immediate") + .help("Don't flush repository data at shutdown") + .required(false) + ) + ) .get_matches(); // Create config file if let ("init", Some(init_match)) = matches.subcommand() { - let tenantid = ZTenantId::generate(); - let pageserver_pg_port = match init_match.value_of("pageserver-pg-port") { - Some(v) => v.parse()?, - None => DEFAULT_PG_LISTEN_PORT, - }; - let pageserver_http_port = match init_match.value_of("pageserver-http-port") { - Some(v) => v.parse()?, - None => DEFAULT_HTTP_LISTEN_PORT, - }; - - let auth_type = if init_match.is_present("enable-auth") { - AuthType::ZenithJWT + let toml_file: String = if let Some(config_path) = init_match.value_of("config") { + // load and parse the file + std::fs::read_to_string(std::path::Path::new(config_path)) + .with_context(|| format!("Could not read configuration file \"{}\"", config_path))? } else { - AuthType::Trust + // Built-in default config + default_conf() }; - local_env::init( - pageserver_pg_port, - pageserver_http_port, - tenantid, - auth_type, - ) - .with_context(|| "Failed to create config file")?; + let mut env = LocalEnv::create_config(&toml_file) + .with_context(|| "Failed to create zenith configuration")?; + env.init() + .with_context(|| "Failed to initialize zenith repository")?; } // all other commands would need config - let env = match local_env::load_config() { + let env = match LocalEnv::load_config() { Ok(conf) => conf, Err(e) => { eprintln!("Error loading config: {}", e); @@ -175,12 +229,12 @@ fn main() -> Result<()> { }; match matches.subcommand() { - ("init", Some(init_match)) => { + ("init", Some(_sub_m)) => { + // The options were handled above already let pageserver = PageServerNode::from_env(&env); if let Err(e) = pageserver.init( - // default_tenantid was generated before the `local_env::init` call above + // default_tenantid was generated by the `env.init()` call above Some(&env.default_tenantid.unwrap().to_string()), - init_match.is_present("enable-auth"), ) { eprintln!("pageserver init failed: {}", e); exit(1); @@ -200,43 +254,27 @@ fn main() -> Result<()> { } } - ("start", Some(_sub_m)) => { - let pageserver = PageServerNode::from_env(&env); - - if let Err(e) = pageserver.start() { - eprintln!("pageserver start failed: {}", e); + ("start", Some(sub_match)) => { + if let Err(e) = handle_start_all(sub_match, &env) { + eprintln!("start command failed: {}", e); exit(1); } } - ("stop", Some(stop_match)) => { - let pageserver = PageServerNode::from_env(&env); - - let immediate = stop_match.is_present("immediate"); - - if let Err(e) = pageserver.stop(immediate) { - eprintln!("pageserver stop failed: {}", e); + ("stop", Some(sub_match)) => { + if let Err(e) = handle_stop_all(sub_match, &env) { + eprintln!("stop command failed: {}", e); exit(1); } } - ("restart", Some(_sub_m)) => { - let pageserver = PageServerNode::from_env(&env); - - //TODO what shutdown strategy should we use here? - if let Err(e) = pageserver.stop(false) { - eprintln!("pageserver stop failed: {}", e); - exit(1); - } - - if let Err(e) = pageserver.start() { - eprintln!("pageserver start failed: {}", e); + ("pageserver", Some(sub_match)) => { + if let Err(e) = handle_pageserver(sub_match, &env) { + eprintln!("branch command failed: {}", e); exit(1); } } - ("status", Some(_sub_m)) => {} - ("pg", Some(pg_match)) => { if let Err(e) = handle_pg(pg_match, &env) { eprintln!("pg operation failed: {:?}", e); @@ -244,6 +282,13 @@ fn main() -> Result<()> { } } + ("safekeeper", Some(sub_match)) => { + if let Err(e) = handle_safekeeper(sub_match, &env) { + eprintln!("branch command failed: {}", e); + exit(1); + } + } + _ => {} }; @@ -496,9 +541,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane.nodes.get(&(tenantid, node_name.to_owned())); - let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) { + let auth_token = if matches!(env.pageserver.auth_type, AuthType::ZenithJWT) { let claims = Claims::new(Some(tenantid), Scope::Tenant); - Some(encode_from_key_path(&claims, &env.private_key_path)?) + + Some(env.generate_auth_token(&claims)?) } else { None }; @@ -541,3 +587,147 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { Ok(()) } + +fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let pageserver = PageServerNode::from_env(env); + + match sub_match.subcommand() { + ("start", Some(_sub_m)) => { + if let Err(e) = pageserver.start() { + eprintln!("pageserver start failed: {}", e); + exit(1); + } + } + + ("stop", Some(stop_match)) => { + let immediate = stop_match.is_present("immediate"); + + if let Err(e) = pageserver.stop(immediate) { + eprintln!("pageserver stop failed: {}", e); + exit(1); + } + } + + ("restart", Some(_sub_m)) => { + //TODO what shutdown strategy should we use here? + if let Err(e) = pageserver.stop(false) { + eprintln!("pageserver stop failed: {}", e); + exit(1); + } + + if let Err(e) = pageserver.start() { + eprintln!("pageserver start failed: {}", e); + exit(1); + } + } + + _ => {} + } + Ok(()) +} + +fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result { + if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) { + Ok(SafekeeperNode::from_env(env, node)) + } else { + bail!("could not find safekeeper '{}'", name) + } +} + +fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + match sub_match.subcommand() { + ("start", Some(sub_match)) => { + let node_name = sub_match + .value_of("node") + .unwrap_or(DEFAULT_SAFEKEEPER_NAME); + let safekeeper = get_safekeeper(env, node_name)?; + + if let Err(e) = safekeeper.start() { + eprintln!("safekeeper start failed: {}", e); + exit(1); + } + } + + ("stop", Some(sub_match)) => { + let node_name = sub_match + .value_of("node") + .unwrap_or(DEFAULT_SAFEKEEPER_NAME); + let immediate = sub_match.is_present("immediate"); + + let safekeeper = get_safekeeper(env, node_name)?; + + if let Err(e) = safekeeper.stop(immediate) { + eprintln!("safekeeper stop failed: {}", e); + exit(1); + } + } + + ("restart", Some(sub_match)) => { + let node_name = sub_match + .value_of("node") + .unwrap_or(DEFAULT_SAFEKEEPER_NAME); + + let safekeeper = get_safekeeper(env, node_name)?; + + //TODO what shutdown strategy should we use here? + if let Err(e) = safekeeper.stop(false) { + eprintln!("safekeeper stop failed: {}", e); + exit(1); + } + + if let Err(e) = safekeeper.start() { + eprintln!("safekeeper start failed: {}", e); + exit(1); + } + } + + _ => {} + } + Ok(()) +} + +fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let pageserver = PageServerNode::from_env(env); + + // Postgres nodes are not started automatically + + if let Err(e) = pageserver.start() { + eprintln!("pageserver start failed: {}", e); + exit(1); + } + + for node in env.safekeepers.iter() { + let safekeeper = SafekeeperNode::from_env(env, node); + if let Err(e) = safekeeper.start() { + eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e); + exit(1); + } + } + Ok(()) +} + +fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let immediate = sub_match.is_present("immediate"); + + let pageserver = PageServerNode::from_env(env); + + // Stop all compute nodes + let cplane = ComputeControlPlane::load(env.clone())?; + for (_k, node) in cplane.nodes { + if let Err(e) = node.stop(false) { + eprintln!("postgres stop failed: {}", e); + } + } + + if let Err(e) = pageserver.stop(immediate) { + eprintln!("pageserver stop failed: {}", e); + } + + for node in env.safekeepers.iter() { + let safekeeper = SafekeeperNode::from_env(env, node); + if let Err(e) = safekeeper.stop(immediate) { + eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e); + } + } + Ok(()) +} diff --git a/zenith_utils/src/auth.rs b/zenith_utils/src/auth.rs index 23ed320de8..3ab1d0d9b1 100644 --- a/zenith_utils/src/auth.rs +++ b/zenith_utils/src/auth.rs @@ -104,8 +104,8 @@ impl JwtAuth { } pub fn from_key_path(key_path: &Path) -> Result { - let public_key = fs::read_to_string(key_path)?; - Ok(Self::new(DecodingKey::from_rsa_pem(public_key.as_bytes())?)) + let public_key = fs::read(key_path)?; + Ok(Self::new(DecodingKey::from_rsa_pem(&public_key)?)) } pub fn decode(&self, token: &str) -> Result> { @@ -114,8 +114,7 @@ impl JwtAuth { } // this function is used only for testing purposes in CLI e g generate tokens during init -pub fn encode_from_key_path(claims: &Claims, key_path: &Path) -> Result { - let key_data = fs::read_to_string(key_path)?; - let key = EncodingKey::from_rsa_pem(key_data.as_bytes())?; +pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result { + let key = EncodingKey::from_rsa_pem(key_data)?; Ok(encode(&Header::new(JWT_ALGORITHM), claims, &key)?) } diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 7950eee7bd..28636d7811 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -9,6 +9,7 @@ use anyhow::{anyhow, bail, ensure, Result}; use bytes::{Bytes, BytesMut}; use rand::Rng; use serde::{Deserialize, Serialize}; +use std::fmt; use std::io::{self, Write}; use std::net::{Shutdown, SocketAddr, TcpStream}; use std::str::FromStr; @@ -77,6 +78,16 @@ impl FromStr for AuthType { } } +impl fmt::Display for AuthType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + AuthType::Trust => "Trust", + AuthType::MD5 => "MD5", + AuthType::ZenithJWT => "ZenithJWT", + }) + } +} + #[derive(Clone, Copy)] pub enum ProcessMsgResult { Continue,