Compare commits

...

4 Commits

Author SHA1 Message Date
Arseny Sher
23644ed251 set pageserver id in dockerfile 2022-02-23 09:17:45 +03:00
Dmitry Rodionov
99e0f07a1d review adjustments, fancy enum for builder, minor cleanups 2022-02-23 08:33:50 +03:00
Dmitry Rodionov
5d490babf8 add node id to pageserver
This adds a node id parameter to the pageserver configuration. Also I use a
simple builder to construct the pageserver config struct to avoid setting the
node id to some temporary invalid value. Some of the changes in the test
fixtures are needed to split the init and start operations for the environment.
2022-02-23 08:33:50 +03:00
Arseny Sher
5865f85ae2 Add --id argument to safekeeper setting its unique u64 id.
In preparation for storage node messaging. IDs are supposed to be monotonically
assigned by the console. In tests they are issued by ZenithEnv; at the zenith cli
level and in the fixtures, the string name is completely replaced by an integer id.
Example TOML configs are adjusted accordingly.

Sequential ids are chosen over Zid mainly because they are compact and easy to
type/remember.
2022-02-23 08:33:50 +03:00
33 changed files with 467 additions and 164 deletions
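The "fancy enum for builder" mentioned in 99e0f07a1d and 5d490babf8 boils down to tracking each required field as Set/NotSet instead of pre-filling it with a dummy value. A minimal, illustrative sketch of that pattern (names here are simplified and hypothetical; the real BuilderValue and PageServerConfigBuilder appear in pageserver/src/config.rs further down this diff):

enum BuilderValue<T> {
    Set(T),
    NotSet,
}

struct NodeConfBuilder {
    id: BuilderValue<u64>, // required: no placeholder node id is ever stored
}

impl NodeConfBuilder {
    fn new() -> Self {
        NodeConfBuilder { id: BuilderValue::NotSet }
    }

    fn id(&mut self, id: u64) {
        self.id = BuilderValue::Set(id);
    }

    // build() fails if a required field was never set, instead of silently
    // falling back to an invalid value.
    fn build(self) -> Result<u64, &'static str> {
        match self.id {
            BuilderValue::Set(id) => Ok(id),
            BuilderValue::NotSet => Err("missing node id"),
        }
    }
}

fn main() {
    let mut builder = NodeConfBuilder::new();
    builder.id(10);
    assert_eq!(builder.build(), Ok(10));
}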

View File

@@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898'
 auth_type = 'Trust'
 [[safekeepers]]
-name = 'sk1'
+id = 1
 pg_port = 5454
 http_port = 7676
 [[safekeepers]]
-name = 'sk2'
+id = 2
 pg_port = 5455
 http_port = 7677
 [[safekeepers]]
-name = 'sk3'
+id = 3
 pg_port = 5456
 http_port = 7678

View File

@@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898'
 auth_type = 'Trust'
 [[safekeepers]]
-name = 'single'
+id = 1
 pg_port = 5454
 http_port = 7676

View File

@@ -12,7 +12,9 @@ use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{opt_display_serde, ZTenantId};
+use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
+use crate::safekeeper::SafekeeperNode;
 //
 // This data structures represents zenith CLI config
@@ -62,6 +64,8 @@ pub struct LocalEnv {
 #[derive(Serialize, Deserialize, Clone, Debug)]
 #[serde(default)]
 pub struct PageServerConf {
+// node id
+pub id: ZNodeId,
 // Pageserver connection settings
 pub listen_pg_addr: String,
 pub listen_http_addr: String,
@@ -76,6 +80,7 @@ pub struct PageServerConf {
 impl Default for PageServerConf {
 fn default() -> Self {
 Self {
+id: ZNodeId(0),
 listen_pg_addr: String::new(),
 listen_http_addr: String::new(),
 auth_type: AuthType::Trust,
@@ -87,7 +92,7 @@ impl Default for PageServerConf {
 #[derive(Serialize, Deserialize, Clone, Debug)]
 #[serde(default)]
 pub struct SafekeeperConf {
-pub name: String,
+pub id: ZNodeId,
 pub pg_port: u16,
 pub http_port: u16,
 pub sync: bool,
@@ -96,7 +101,7 @@ pub struct SafekeeperConf {
 impl Default for SafekeeperConf {
 fn default() -> Self {
 Self {
-name: String::new(),
+id: ZNodeId(0),
 pg_port: 0,
 http_port: 0,
 sync: true,
@@ -136,8 +141,8 @@ impl LocalEnv {
 self.base_data_dir.clone()
 }
-pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
-self.base_data_dir.join("safekeepers").join(node_name)
+pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
+self.base_data_dir.join("safekeepers").join(data_dir_name)
 }
 /// Create a LocalEnv from a config file.
@@ -285,7 +290,7 @@ impl LocalEnv {
 fs::create_dir_all(self.pg_data_dirs_path())?;
 for safekeeper in &self.safekeepers {
-fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
+fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
 }
 let mut conf_content = String::new();

View File

@@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
 use thiserror::Error;
 use zenith_utils::http::error::HttpErrorBody;
+use zenith_utils::zid::ZNodeId;
 use crate::local_env::{LocalEnv, SafekeeperConf};
 use crate::storage::PageServerNode;
@@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response {
 //
 #[derive(Debug)]
 pub struct SafekeeperNode {
-pub name: String,
+pub id: ZNodeId,
 pub conf: SafekeeperConf,
@@ -77,10 +78,10 @@ impl SafekeeperNode {
 pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
 let pageserver = Arc::new(PageServerNode::from_env(env));
-println!("initializing for {} for {}", conf.name, conf.http_port);
+println!("initializing for sk {} for {}", conf.id, conf.http_port);
 SafekeeperNode {
-name: conf.name.clone(),
+id: conf.id,
 conf: conf.clone(),
 pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
 env: env.clone(),
@@ -98,8 +99,12 @@ impl SafekeeperNode {
 .unwrap()
 }
+pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
+env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
+}
 pub fn datadir_path(&self) -> PathBuf {
-self.env.safekeeper_data_dir(&self.name)
+SafekeeperNode::datadir_path_by_id(&self.env, self.id)
 }
 pub fn pid_file(&self) -> PathBuf {
@@ -120,6 +125,7 @@ impl SafekeeperNode {
 let mut cmd = Command::new(self.env.safekeeper_bin()?);
 fill_rust_env_vars(
 cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
+.args(&["--id", self.id.to_string().as_ref()])
 .args(&["--listen-pg", &listen_pg])
 .args(&["--listen-http", &listen_http])
 .args(&["--recall", "1 second"])
@@ -183,7 +189,7 @@ impl SafekeeperNode {
 pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
 let pid_file = self.pid_file();
 if !pid_file.exists() {
-println!("Safekeeper {} is already stopped", self.name);
+println!("Safekeeper {} is already stopped", self.id);
 return Ok(());
 }
 let pid = read_pidfile(&pid_file)?;

View File

@@ -103,6 +103,8 @@ impl PageServerNode {
 ) -> anyhow::Result<()> {
 let mut cmd = Command::new(self.env.pageserver_bin()?);
+let id = format!("id={}", self.env.pageserver.id);
 // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
 let base_data_dir_param = self.env.base_data_dir.display().to_string();
 let pg_distrib_dir_param =
@@ -122,6 +124,7 @@ impl PageServerNode {
 args.extend(["-c", &authg_type_param]);
 args.extend(["-c", &listen_http_addr_param]);
 args.extend(["-c", &listen_pg_addr_param]);
+args.extend(["-c", &id]);
 for config_override in config_overrides {
 args.extend(["-c", config_override]);

View File

@@ -4,7 +4,7 @@ set -eux
 if [ "$1" = 'pageserver' ]; then
 if [ ! -d "/data/tenants" ]; then
 echo "Initializing pageserver data directory"
-pageserver --init -D /data -c "pg_distrib_dir='/usr/local'"
+pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
 fi
 echo "Staring pageserver at 0.0.0.0:6400"
 pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data

View File

@@ -61,7 +61,7 @@ fn main() -> Result<()> {
 .number_of_values(1)
 .multiple_occurrences(true)
 .help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
-Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
+Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
 )
 .get_matches();
@@ -115,7 +115,14 @@ fn main() -> Result<()> {
 option_line
 )
 })?;
 for (key, item) in doc.iter() {
+if key == "id" {
+anyhow::ensure!(
+init,
+"node id can only be set during pageserver init and cannot be overridden"
+);
+}
 toml.insert(key, item.clone());
 }
 }
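The guard above makes the node id a one-time, init-only setting: passing -c "id=10" together with --init (as the Docker entrypoint below does) records the id, while supplying -c "id=..." to a later pageserver start is rejected with the "node id can only be set during pageserver init and cannot be overridden" error. The new test_pageserver_init_node_id test further down exercises exactly that path.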

View File

@@ -8,7 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
 use toml_edit;
 use toml_edit::{Document, Item};
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
 use std::convert::TryInto;
 use std::env;
@@ -72,6 +72,10 @@ pub mod defaults {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PageServerConf {
+// Identifier of that particular pageserver so e g safekeepers
+// can safely distinguish different pageservers
+pub id: ZNodeId,
 /// Example (default): 127.0.0.1:64000
 pub listen_pg_addr: String,
 /// Example (default): 127.0.0.1:9898
 pub listen_http_addr: String,
@@ -106,6 +110,184 @@ pub struct PageServerConf {
 pub remote_storage_config: Option<RemoteStorageConfig>,
 }
// use dedicated enum for builder to better indicate the intention
// and avoid possible confusion with nested options
pub enum BuilderValue<T> {
Set(T),
NotSet,
}
impl<T> BuilderValue<T> {
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
match self {
Self::Set(v) => Ok(v),
Self::NotSet => Err(err),
}
}
}
// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
checkpoint_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
superuser: BuilderValue<String>,
page_cache_size: BuilderValue<usize>,
max_file_descriptors: BuilderValue<usize>,
workdir: BuilderValue<PathBuf>,
pg_distrib_dir: BuilderValue<PathBuf>,
auth_type: BuilderValue<AuthType>,
//
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
id: BuilderValue<ZNodeId>,
}
impl Default for PageServerConfigBuilder {
fn default() -> Self {
use self::BuilderValue::*;
use defaults::*;
Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)
.expect("cannot parse default checkpoint period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("tmp_install")),
auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
id: NotSet,
}
}
}
impl PageServerConfigBuilder {
pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
}
pub fn listen_http_addr(&mut self, listen_http_addr: String) {
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
}
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn checkpoint_period(&mut self, checkpoint_period: Duration) {
self.checkpoint_period = BuilderValue::Set(checkpoint_period)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
self.gc_horizon = BuilderValue::Set(gc_horizon)
}
pub fn gc_period(&mut self, gc_period: Duration) {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn superuser(&mut self, superuser: String) {
self.superuser = BuilderValue::Set(superuser)
}
pub fn page_cache_size(&mut self, page_cache_size: usize) {
self.page_cache_size = BuilderValue::Set(page_cache_size)
}
pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
}
pub fn workdir(&mut self, workdir: PathBuf) {
self.workdir = BuilderValue::Set(workdir)
}
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
}
pub fn auth_type(&mut self, auth_type: AuthType) {
self.auth_type = BuilderValue::Set(auth_type)
}
pub fn auth_validation_public_key_path(
&mut self,
auth_validation_public_key_path: Option<PathBuf>,
) {
self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
}
pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
}
pub fn id(&mut self, node_id: ZNodeId) {
self.id = BuilderValue::Set(node_id)
}
pub fn build(self) -> Result<PageServerConf> {
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?,
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
checkpoint_period: self
.checkpoint_period
.ok_or(anyhow::anyhow!("missing checkpoint_period"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
.ok_or(anyhow::anyhow!("missing page_cache_size"))?,
max_file_descriptors: self
.max_file_descriptors
.ok_or(anyhow::anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?,
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?,
auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?,
remote_storage_config: self
.remote_storage_config
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
})
}
}
 /// External backup storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct RemoteStorageConfig {
@@ -221,57 +403,39 @@ impl PageServerConf {
 ///
 /// This leaves any options not present in the file in the built-in defaults.
 pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
-use defaults::*;
-let mut conf = PageServerConf {
-workdir: workdir.to_path_buf(),
-listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
-listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
-checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
-checkpoint_period: humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)?,
-gc_horizon: DEFAULT_GC_HORIZON,
-gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)?,
-page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
-max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
-pg_distrib_dir: PathBuf::new(),
-auth_validation_public_key_path: None,
-auth_type: AuthType::Trust,
-remote_storage_config: None,
-superuser: DEFAULT_SUPERUSER.to_string(),
-};
+let mut builder = PageServerConfigBuilder::default();
+builder.workdir(workdir.to_owned());
 for (key, item) in toml.iter() {
 match key {
-"listen_pg_addr" => conf.listen_pg_addr = parse_toml_string(key, item)?,
+"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
-"listen_http_addr" => conf.listen_http_addr = parse_toml_string(key, item)?,
+"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
-"checkpoint_distance" => conf.checkpoint_distance = parse_toml_u64(key, item)?,
+"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
-"checkpoint_period" => conf.checkpoint_period = parse_toml_duration(key, item)?,
+"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
-"gc_horizon" => conf.gc_horizon = parse_toml_u64(key, item)?,
+"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
-"gc_period" => conf.gc_period = parse_toml_duration(key, item)?,
+"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
-"initial_superuser_name" => conf.superuser = parse_toml_string(key, item)?,
+"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
-"page_cache_size" => conf.page_cache_size = parse_toml_u64(key, item)? as usize,
+"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
 "max_file_descriptors" => {
-conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
+builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
 }
 "pg_distrib_dir" => {
-conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
+builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
 }
-"auth_validation_public_key_path" => {
-conf.auth_validation_public_key_path =
-Some(PathBuf::from(parse_toml_string(key, item)?))
-}
-"auth_type" => conf.auth_type = parse_toml_auth_type(key, item)?,
+"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
+PathBuf::from(parse_toml_string(key, item)?),
+)),
+"auth_type" => builder.auth_type(parse_toml_auth_type(key, item)?),
 "remote_storage" => {
-conf.remote_storage_config = Some(Self::parse_remote_storage_config(item)?)
+builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
 }
+"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
 _ => bail!("unrecognized pageserver option '{}'", key),
 }
 }
+let mut conf = builder.build().context("invalid config")?;
 if conf.auth_type == AuthType::ZenithJWT {
 let auth_validation_public_key_path = conf
 .auth_validation_public_key_path
@@ -285,9 +449,6 @@ impl PageServerConf {
 );
 }
-if conf.pg_distrib_dir == PathBuf::new() {
-conf.pg_distrib_dir = env::current_dir()?.join("tmp_install")
-};
 if !conf.pg_distrib_dir.join("bin/postgres").exists() {
 bail!(
 "Can't find postgres binary at {}",
@@ -382,6 +543,7 @@ impl PageServerConf {
 #[cfg(test)]
 pub fn dummy_conf(repo_dir: PathBuf) -> Self {
 PageServerConf {
+id: ZNodeId(0),
 checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
 checkpoint_period: Duration::from_secs(10),
 gc_horizon: defaults::DEFAULT_GC_HORIZON,
@@ -461,15 +623,16 @@ max_file_descriptors = 333
 # initial superuser role name to use when creating a new tenant
 initial_superuser_name = 'zzzz'
+id = 10
 "#;
 #[test]
 fn parse_defaults() -> anyhow::Result<()> {
 let tempdir = tempdir()?;
 let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
 // we have to create dummy pathes to overcome the validation errors
-let config_string = format!("pg_distrib_dir='{}'", pg_distrib_dir.display());
+let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
 let toml = config_string.parse()?;
 let parsed_config =
@@ -480,6 +643,7 @@ initial_superuser_name = 'zzzz'
 assert_eq!(
 parsed_config,
 PageServerConf {
+id: ZNodeId(10),
 listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
 listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
 checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
@@ -521,6 +685,7 @@ initial_superuser_name = 'zzzz'
 assert_eq!(
 parsed_config,
 PageServerConf {
+id: ZNodeId(10),
 listen_pg_addr: "127.0.0.1:64000".to_string(),
 listen_http_addr: "127.0.0.1:9898".to_string(),
 checkpoint_distance: 111,

View File

@@ -1,6 +1,7 @@
 use serde::{Deserialize, Serialize};
 use crate::ZTenantId;
+use zenith_utils::zid::ZNodeId;
 #[derive(Serialize, Deserialize)]
 pub struct BranchCreateRequest {
@@ -15,3 +16,8 @@ pub struct TenantCreateRequest {
 #[serde(with = "hex")]
 pub tenant_id: ZTenantId,
 }
+#[derive(Serialize)]
+pub struct StatusResponse {
+pub id: ZNodeId,
+}

View File

@@ -17,6 +17,11 @@ paths:
 application/json:
 schema:
 type: object
+required:
+- id
+properties:
+id:
+type: integer
 /v1/timeline/{tenant_id}:
 parameters:
 - name: tenant_id

View File

@@ -1,7 +1,6 @@
 use std::sync::Arc;
 use anyhow::{Context, Result};
-use hyper::header;
 use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use serde::Serialize;
@@ -23,6 +22,7 @@ use zenith_utils::lsn::Lsn;
 use zenith_utils::zid::{opt_display_serde, ZTimelineId};
 use super::models::BranchCreateRequest;
+use super::models::StatusResponse;
 use super::models::TenantCreateRequest;
 use crate::branches::BranchInfo;
 use crate::repository::RepositoryTimeline;
@@ -64,12 +64,12 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
 }
 // healthcheck handler
-async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
-Ok(Response::builder()
-.status(StatusCode::OK)
-.header(header::CONTENT_TYPE, "application/json")
-.body(Body::from("{}"))
-.map_err(ApiError::from_err)?)
+async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+let config = get_config(&request);
+Ok(json_response(
+StatusCode::OK,
+StatusResponse { id: config.id },
+)?)
 }
 async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
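With this handler change, GET /v1/status on the pageserver no longer returns an empty JSON object: it serializes StatusResponse, so the body now looks like {"id": 1} (whichever ZNodeId the server was initialized with), matching the OpenAPI schema change above.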

View File

@@ -89,7 +89,7 @@ def test_foobar(zenith_env_builder: ZenithEnvBuilder):
 # Now create the environment. This initializes the repository, and starts
 # up the page server and the safekeepers
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 # Run the test
 ...

View File

@@ -8,7 +8,7 @@ import pytest
 def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.pageserver_auth_enabled = True
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 ps = env.pageserver
@@ -51,7 +51,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
 zenith_env_builder.pageserver_auth_enabled = True
 if with_wal_acceptors:
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
 env.zenith_cli.create_branch(branch, "main")

View File

@@ -19,7 +19,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
 #
 # See https://github.com/zenithdb/zenith/issues/1068
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 # Branch at the point where only 100 rows were inserted
 env.zenith_cli.create_branch("test_branch_behind", "main")

View File

@@ -11,7 +11,7 @@ from fixtures.log_helper import log
 def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
 # One safekeeper is enough for this test.
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 pg = env.postgres.create_start('main')

View File

@@ -1,8 +1,15 @@
-import json
 from uuid import uuid4, UUID
-from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
-from typing import cast
-import pytest, psycopg2
+import pytest
+from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath
+# test that we cannot override node id
+def test_pageserver_init_node_id(zenith_env_builder: ZenithEnvBuilder):
+env = zenith_env_builder.init()
+with pytest.raises(
+Exception,
+match="node id can only be set during pageserver init and cannot be overridden"):
+env.pageserver.start(overrides=['--pageserver-config-override=id=10'])
 def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
@@ -41,7 +48,7 @@ def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
 def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.pageserver_auth_enabled = True
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 management_token = env.auth_keys.generate_management_token()

View File

@@ -14,7 +14,7 @@ from fixtures.log_helper import log
 # and new compute node contains all data.
 def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main")
 pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')

View File

@@ -13,7 +13,7 @@ from fixtures.log_helper import log
 def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
 # One safekeeper is enough for this test.
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_pageserver_restart", "main")
 pg = env.postgres.create_start('test_pageserver_restart')

View File

@@ -42,7 +42,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
 data_secret = 'very secret secret'
 ##### First start, insert secret data and upload it to the remote storage
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 pg = env.postgres.create_start()
 tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]

View File

@@ -13,7 +13,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
 zenith_env_builder.pageserver_auth_enabled = True
 if with_wal_acceptors:
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_restart_compute", "main")

View File

@@ -122,7 +122,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
 zenith_env_builder.num_safekeepers = 1
 zenith_env_builder.enable_local_fs_remote_storage()
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 # create folder for remote storage mock
 remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'

View File

@@ -10,7 +10,7 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
 if with_wal_acceptors:
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 """Tests tenants with and without wal acceptors"""
 tenant_1 = env.create_tenant()
 tenant_2 = env.create_tenant()

View File

@@ -67,7 +67,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
 def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_timeline_size_quota", "main")
 client = env.pageserver.http_client()

View File

@@ -22,7 +22,7 @@ from typing import List, Optional, Any
 # succeed and data is written
 def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main")
@@ -51,7 +51,7 @@ class BranchMetrics:
 # against different timelines.
 def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 n_timelines = 3
@@ -181,7 +181,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
 n_acceptors = 3
 zenith_env_builder.num_safekeepers = n_acceptors
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main")
 pg = env.postgres.create_start('test_wal_acceptors_restarts')
@@ -218,7 +218,7 @@ def delayed_wal_acceptor_start(wa):
 # When majority of acceptors is offline, commits are expected to be frozen
 def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 2
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main")
 pg = env.postgres.create_start('test_wal_acceptors_unavailability')
@@ -289,7 +289,7 @@ def stop_value():
 def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main")
 pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
@@ -404,7 +404,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
 # We don't really need the full environment for this test, just the
 # safekeepers would be enough.
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 timeline_id = uuid.uuid4()
 tenant_id = uuid.uuid4()
@@ -454,7 +454,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
 def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_timeline_status", "main")
 pg = env.postgres.create_start('test_timeline_status')
@@ -521,12 +521,7 @@ class SafekeeperEnv:
 http=self.port_distributor.get_port(),
 )
-if self.num_safekeepers == 1:
-name = "single"
-else:
-name = f"sk{i}"
-safekeeper_dir = os.path.join(self.repo_dir, name)
+safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
 mkdir_if_needed(safekeeper_dir)
 args = [
@@ -537,6 +532,8 @@ class SafekeeperEnv:
 f"127.0.0.1:{port.http}",
 "-D",
 safekeeper_dir,
+"--id",
+str(i),
 "--daemonize"
 ]
@@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
 def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
-def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
-return ','.join(
-[f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
+def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str:
+return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names])
 def execute_payload(pg: Postgres):
 with closing(pg.connect()) as conn:
@@ -628,17 +624,17 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
 http_cli = sk.http_client()
 try:
 status = http_cli.timeline_status(tenant_id, timeline_id)
-log.info(f"Safekeeper {sk.name} status: {status}")
+log.info(f"Safekeeper {sk.id} status: {status}")
 except Exception as e:
-log.info(f"Safekeeper {sk.name} status error: {e}")
+log.info(f"Safekeeper {sk.id} status error: {e}")
 zenith_env_builder.num_safekeepers = 4
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_replace_safekeeper", "main")
 log.info("Use only first 3 safekeepers")
 env.safekeepers[3].stop()
-active_safekeepers = ['sk1', 'sk2', 'sk3']
+active_safekeepers = [1, 2, 3]
 pg = env.postgres.create('test_replace_safekeeper')
 pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
 pg.start()
@@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
 log.info("Recreate postgres to replace failed sk1 with new sk4")
 pg.stop_and_destroy().create('test_replace_safekeeper')
-active_safekeepers = ['sk2', 'sk3', 'sk4']
+active_safekeepers = [2, 3, 4]
 env.safekeepers[3].start()
 pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
 pg.start()

View File

@@ -200,7 +200,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w
 # restart acceptors one by one, while executing and validating bank transactions
 def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
 pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')

View File

@@ -97,7 +97,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
 def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
 # Start with single sk
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 # Connect to sk port on v4 loopback
 res = requests.get(f'http://127.0.0.1:{env.safekeepers[0].port.http}/v1/status')
@@ -114,7 +114,7 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
 def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder):
 # Start with single sk
 zenith_env_builder.num_safekeepers = 1
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 # Stop default ps/sk
 env.zenith_cli.pageserver_stop()

View File

@@ -27,7 +27,7 @@ from dataclasses import dataclass
 # Type-related stuff
 from psycopg2.extensions import connection as PgConnection
-from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
+from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
 from typing_extensions import Literal
 import pytest
@@ -425,6 +425,14 @@ class ZenithEnvBuilder:
 self.env = ZenithEnv(self)
 return self.env
+def start(self):
+self.env.start()
+def init_start(self) -> ZenithEnv:
+env = self.init()
+self.start()
+return env
 """
 Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
 Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
@@ -540,6 +548,7 @@ class ZenithEnv:
 toml += textwrap.dedent(f"""
 [pageserver]
+id=1
 listen_pg_addr = 'localhost:{pageserver_port.pg}'
 listen_http_addr = 'localhost:{pageserver_port.http}'
 auth_type = '{pageserver_auth_type}'
@@ -556,25 +565,22 @@ class ZenithEnv:
 pg=self.port_distributor.get_port(),
 http=self.port_distributor.get_port(),
 )
-if config.num_safekeepers == 1:
-name = "single"
-else:
-name = f"sk{i}"
-toml += f"""
-[[safekeepers]]
-name = '{name}'
-pg_port = {port.pg}
-http_port = {port.http}
-sync = false # Disable fsyncs to make the tests go faster
-"""
-safekeeper = Safekeeper(env=self, name=name, port=port)
+id = i # assign ids sequentially
+toml += textwrap.dedent(f"""
+[[safekeepers]]
+id = {id}
+pg_port = {port.pg}
+http_port = {port.http}
+sync = false # Disable fsyncs to make the tests go faster
+""")
+safekeeper = Safekeeper(env=self, id=id, port=port)
 self.safekeepers.append(safekeeper)
 log.info(f"Config: {toml}")
 self.zenith_cli.init(toml)
+def start(self):
 # Start up the page server and all the safekeepers
 self.pageserver.start()
@@ -615,7 +621,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
 with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
-env = builder.init()
+env = builder.init_start()
 # For convenience in tests, create a branch from the freshly-initialized cluster.
 env.zenith_cli.create_branch("empty", "main")
@@ -649,7 +655,7 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB
 To use, define 'zenith_env_builder' fixture in your test to get access to the
 builder object. Set properties on it to describe the environment.
 Finally, initialize and start up the environment by calling
-zenith_env_builder.init().
+zenith_env_builder.init_start().
 After the initialization, you can launch compute nodes by calling
 the functions in the 'env.postgres' factory object, stop/start the
@@ -835,8 +841,9 @@ class ZenithCli:
 return self.raw_cli(cmd)
-def pageserver_start(self) -> 'subprocess.CompletedProcess[str]':
-start_args = ['pageserver', 'start']
+def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
+start_args = ['pageserver', 'start', *overrides]
 append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
 return self.raw_cli(start_args)
@@ -848,17 +855,17 @@ class ZenithCli:
 log.info(f"Stopping pageserver with {cmd}")
 return self.raw_cli(cmd)
-def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
-return self.raw_cli(['safekeeper', 'start', name])
+def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
+return self.raw_cli(['safekeeper', 'start', str(id)])
 def safekeeper_stop(self,
-name: Optional[str] = None,
+id: Optional[int] = None,
 immediate=False) -> 'subprocess.CompletedProcess[str]':
 args = ['safekeeper', 'stop']
+if id is not None:
+args.extend(str(id))
 if immediate:
 args.extend(['-m', 'immediate'])
-if name is not None:
-args.append(name)
 return self.raw_cli(args)
 def pg_create(
@@ -989,14 +996,14 @@ class ZenithPageserver(PgProtocol):
 self.service_port = port # do not shadow PgProtocol.port which is just int
 self.remote_storage = remote_storage
-def start(self) -> 'ZenithPageserver':
+def start(self, overrides=()) -> 'ZenithPageserver':
 """
 Start the page server.
 Returns self.
 """
 assert self.running == False
-self.env.zenith_cli.pageserver_start()
+self.env.zenith_cli.pageserver_start(overrides=overrides)
 self.running = True
 return self
@@ -1397,12 +1404,14 @@ class Safekeeper:
 """ An object representing a running safekeeper daemon. """
 env: ZenithEnv
 port: SafekeeperPort
-name: str # identifier for logging
+id: int
 auth_token: Optional[str] = None
+running: bool = False
 def start(self) -> 'Safekeeper':
-self.env.zenith_cli.safekeeper_start(self.name)
+assert self.running == False
+self.env.zenith_cli.safekeeper_start(self.id)
+self.running = True
 # wait for wal acceptor start by checking its status
 started_at = time.time()
 while True:
@@ -1420,8 +1429,9 @@ class Safekeeper:
 return self
 def stop(self, immediate=False) -> 'Safekeeper':
-log.info('Stopping safekeeper {}'.format(self.name))
-self.env.zenith_cli.safekeeper_stop(self.name, immediate)
+log.info('Stopping safekeeper {}'.format(self.id))
+self.env.zenith_cli.safekeeper_stop(self.id, immediate)
+self.running = False
 return self
 def append_logical_message(self,
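The init()/init_start() split keeps repository initialization separate from process startup: ZenithEnvBuilder.init() only builds the config and runs zenith init, while init_start() additionally starts the pageserver and safekeepers. That is why most tests in this change switch to init_start(), whereas test_pageserver_init_node_id deliberately stays on init() so it can start the pageserver itself with an id override and assert that the override is rejected.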

View File

@@ -23,7 +23,7 @@ def test_bulk_tenant_create(
 """Measure tenant creation time (with and without wal acceptors)"""
 if use_wal_acceptors == 'with_wa':
 zenith_env_builder.num_safekeepers = 3
-env = zenith_env_builder.init()
+env = zenith_env_builder.init_start()
 time_slices = []

View File

@@ -1,17 +1,19 @@
 //
 // Main entry point for the safekeeper executable
 //
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use clap::{App, Arg};
 use const_format::formatcp;
 use daemonize::Daemonize;
 use fs2::FileExt;
-use std::fs::File;
+use std::fs::{self, File};
+use std::io::{ErrorKind, Write};
 use std::path::{Path, PathBuf};
 use std::thread;
 use tracing::*;
 use walkeeper::control_file::{self, CreateControlFile};
 use zenith_utils::http::endpoint;
+use zenith_utils::zid::ZNodeId;
 use zenith_utils::{logging, tcp_listener, GIT_VERSION};
 use tokio::sync::mpsc;
@@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now;
 use zenith_utils::signals;
 const LOCK_FILE_NAME: &str = "safekeeper.lock";
+const ID_FILE_NAME: &str = "safekeeper.id";
 fn main() -> Result<()> {
 zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -93,6 +96,9 @@ fn main() -> Result<()> {
 .takes_value(true)
 .help("Dump control file at path specifed by this argument and exit"),
 )
+.arg(
+Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
+)
 .get_matches();
 if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -136,10 +142,19 @@ fn main() -> Result<()> {
 conf.recall_period = humantime::parse_duration(recall)?;
 }
-start_safekeeper(conf)
+let mut given_id = None;
+if let Some(given_id_str) = arg_matches.value_of("id") {
+given_id = Some(ZNodeId(
+given_id_str
+.parse()
+.context("failed to parse safekeeper id")?,
+));
+}
+start_safekeeper(conf, given_id)
 }
-fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
+fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
 let log_file = logging::init("safekeeper.log", conf.daemonize)?;
 info!("version: {}", GIT_VERSION);
@@ -154,6 +169,9 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
 )
 })?;
+// Set or read our ID.
+set_id(&mut conf, given_id)?;
 let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
 error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
 e
@@ -260,3 +278,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
 std::process::exit(111);
 })
 }
/// Determine safekeeper id and set it in config.
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
let id_file_path = conf.workdir.join(ID_FILE_NAME);
let my_id: ZNodeId;
// If ID exists, read it in; otherwise set one passed
match fs::read(&id_file_path) {
Ok(id_serialized) => {
my_id = ZNodeId(
std::str::from_utf8(&id_serialized)
.context("failed to parse safekeeper id")?
.parse()
.context("failed to parse safekeeper id")?,
);
if let Some(given_id) = given_id {
if given_id != my_id {
bail!(
"safekeeper already initialized with id {}, can't set {}",
my_id,
given_id
);
}
}
info!("safekeeper ID {}", my_id);
}
Err(error) => match error.kind() {
ErrorKind::NotFound => {
my_id = if let Some(given_id) = given_id {
given_id
} else {
bail!("safekeeper id is not specified");
};
let mut f = File::create(&id_file_path)?;
f.write_all(my_id.to_string().as_bytes())?;
f.sync_all()?;
info!("initialized safekeeper ID {}", my_id);
}
_ => {
return Err(error.into());
}
},
}
conf.my_id = my_id;
Ok(())
}
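Taken together, set_id makes the safekeeper id sticky: the first start must be given --id and persists it to safekeeper.id in the data directory, later starts may omit --id and read it back, and a conflicting --id is rejected with an error rather than silently renumbering the node.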

View File

@@ -5,6 +5,7 @@ use std::fmt::Display;
 use std::sync::Arc;
 use zenith_utils::http::{RequestExt, RouterBuilder};
 use zenith_utils::lsn::Lsn;
+use zenith_utils::zid::ZNodeId;
 use zenith_utils::zid::ZTenantTimelineId;
 use crate::control_file::CreateControlFile;
@@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response;
 use zenith_utils::http::request::parse_request_param;
 use zenith_utils::zid::{ZTenantId, ZTimelineId};
+#[derive(Debug, Serialize)]
+struct SafekeeperStatus {
+id: ZNodeId,
+}
 /// Healthcheck handler.
-async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
-Ok(json_response(StatusCode::OK, "")?)
+async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+let conf = get_conf(&request);
+let status = SafekeeperStatus { id: conf.my_id };
+Ok(json_response(StatusCode::OK, status)?)
 }
 fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {

View File

@@ -2,7 +2,7 @@
 use std::path::PathBuf;
 use std::time::Duration;
-use zenith_utils::zid::ZTenantTimelineId;
+use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
 pub mod callmemaybe;
 pub mod control_file;
@@ -46,6 +46,7 @@ pub struct SafeKeeperConf {
 pub listen_http_addr: String,
 pub ttl: Option<Duration>,
 pub recall_period: Duration,
+pub my_id: ZNodeId,
 }
 impl SafeKeeperConf {
@@ -69,6 +70,7 @@ impl Default for SafeKeeperConf {
 listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
 ttl: None,
 recall_period: defaults::DEFAULT_RECALL_PERIOD,
+my_id: ZNodeId(0),
 }
 }
 }

View File

@@ -18,32 +18,35 @@ use walkeeper::defaults::{
 };
 use zenith_utils::auth::{Claims, Scope};
 use zenith_utils::postgres_backend::AuthType;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
 use zenith_utils::GIT_VERSION;
 use pageserver::branches::BranchInfo;
-// Default name of a safekeeper node, if not specified on the command line.
-const DEFAULT_SAFEKEEPER_NAME: &str = "single";
+// Default id of a safekeeper node, if not specified on the command line.
+const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
+const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
 fn default_conf() -> String {
 format!(
 r#"
 # Default built-in configuration, defined in main.rs
 [pageserver]
+id = {pageserver_id}
 listen_pg_addr = '{pageserver_pg_addr}'
 listen_http_addr = '{pageserver_http_addr}'
 auth_type = '{pageserver_auth_type}'
 [[safekeepers]]
-name = '{safekeeper_name}'
+id = {safekeeper_id}
 pg_port = {safekeeper_pg_port}
 http_port = {safekeeper_http_port}
 "#,
+pageserver_id = DEFAULT_PAGESERVER_ID,
 pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
 pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
 pageserver_auth_type = AuthType::Trust,
-safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
+safekeeper_id = DEFAULT_SAFEKEEPER_ID,
 safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
 safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
 )
@@ -74,9 +77,9 @@ fn main() -> Result<()> {
 .required(true);
 #[rustfmt::skip]
-let safekeeper_node_arg = Arg::new("node")
+let safekeeper_id_arg = Arg::new("id")
 .index(1)
-.help("Node name")
+.help("safekeeper id")
 .required(false);
 let timeline_arg = Arg::new("timeline")
@@ -154,16 +157,16 @@ fn main() -> Result<()> {
 .about("Manage safekeepers")
 .subcommand(App::new("start")
 .about("Start local safekeeper")
-.arg(safekeeper_node_arg.clone())
+.arg(safekeeper_id_arg.clone())
 )
 .subcommand(App::new("stop")
 .about("Stop local safekeeper")
-.arg(safekeeper_node_arg.clone())
+.arg(safekeeper_id_arg.clone())
 .arg(stop_mode_arg.clone())
 )
 .subcommand(App::new("restart")
 .about("Restart local safekeeper")
-.arg(safekeeper_node_arg.clone())
+.arg(safekeeper_id_arg.clone())
 .arg(stop_mode_arg.clone())
 )
 )
@@ -628,11 +631,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
 Ok(())
 }
-fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
-if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
+fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
+if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
 Ok(SafekeeperNode::from_env(env, node))
 } else {
-bail!("could not find safekeeper '{}'", name)
+bail!("could not find safekeeper '{}'", id)
 }
 }
@@ -643,8 +646,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
 };
 // All the commands take an optional safekeeper name argument
-let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
-let safekeeper = get_safekeeper(env, node_name)?;
+let sk_id = if let Some(id_str) = sub_args.value_of("id") {
+ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
+} else {
+DEFAULT_SAFEKEEPER_ID
+};
+let safekeeper = get_safekeeper(env, sk_id)?;
 match sub_name {
 "start" => {
@@ -697,7 +704,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
 for node in env.safekeepers.iter() {
 let safekeeper = SafekeeperNode::from_env(env, node);
 if let Err(e) = safekeeper.start() {
-eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
+eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
 exit(1);
 }
 }
@@ -724,7 +731,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
 for node in env.safekeepers.iter() {
 let safekeeper = SafekeeperNode::from_env(env, node);
 if let Err(e) = safekeeper.stop(immediate) {
-eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
+eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
 }
 }
 Ok(())

View File

@@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId {
 }
 }
+// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
+// by the console.
+#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct ZNodeId(pub u64);
+impl fmt::Display for ZNodeId {
+fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+write!(f, "{}", self.0)
+}
+}
 #[cfg(test)]
 mod tests {
 use std::fmt::Display;
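Because ZNodeId is a #[serde(transparent)] newtype over u64, it serializes as a bare integer, which is what lets id = 10 in the TOML configs and {"id": 10} in the HTTP status responses map directly onto it. A hypothetical check of that behaviour (not part of this diff, and assuming serde_json is available as a dependency):

use zenith_utils::zid::ZNodeId;

fn main() {
    let id = ZNodeId(10);
    // serde(transparent) makes the newtype serialize as its inner u64
    assert_eq!(serde_json::to_string(&id).unwrap(), "10");
    // Display is implemented in the hunk above
    assert_eq!(id.to_string(), "10");
}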