mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
4 Commits
proxy-asyn
...
ars/tmp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23644ed251 | ||
|
|
99e0f07a1d | ||
|
|
5d490babf8 | ||
|
|
5865f85ae2 |
@@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898'
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk1'
|
||||
id = 1
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk2'
|
||||
id = 2
|
||||
pg_port = 5455
|
||||
http_port = 7677
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk3'
|
||||
id = 3
|
||||
pg_port = 5456
|
||||
http_port = 7678
|
||||
|
||||
@@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898'
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'single'
|
||||
id = 1
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
|
||||
@@ -12,7 +12,9 @@ use std::path::{Path, PathBuf};
|
||||
use std::process::{Command, Stdio};
|
||||
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{opt_display_serde, ZTenantId};
|
||||
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
|
||||
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
//
|
||||
// This data structures represents zenith CLI config
|
||||
@@ -62,6 +64,8 @@ pub struct LocalEnv {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct PageServerConf {
|
||||
// node id
|
||||
pub id: ZNodeId,
|
||||
// Pageserver connection settings
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
@@ -76,6 +80,7 @@ pub struct PageServerConf {
|
||||
impl Default for PageServerConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: ZNodeId(0),
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
auth_type: AuthType::Trust,
|
||||
@@ -87,7 +92,7 @@ impl Default for PageServerConf {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct SafekeeperConf {
|
||||
pub name: String,
|
||||
pub id: ZNodeId,
|
||||
pub pg_port: u16,
|
||||
pub http_port: u16,
|
||||
pub sync: bool,
|
||||
@@ -96,7 +101,7 @@ pub struct SafekeeperConf {
|
||||
impl Default for SafekeeperConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: String::new(),
|
||||
id: ZNodeId(0),
|
||||
pg_port: 0,
|
||||
http_port: 0,
|
||||
sync: true,
|
||||
@@ -136,8 +141,8 @@ impl LocalEnv {
|
||||
self.base_data_dir.clone()
|
||||
}
|
||||
|
||||
pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
|
||||
self.base_data_dir.join("safekeepers").join(node_name)
|
||||
pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
|
||||
self.base_data_dir.join("safekeepers").join(data_dir_name)
|
||||
}
|
||||
|
||||
/// Create a LocalEnv from a config file.
|
||||
@@ -285,7 +290,7 @@ impl LocalEnv {
|
||||
fs::create_dir_all(self.pg_data_dirs_path())?;
|
||||
|
||||
for safekeeper in &self.safekeepers {
|
||||
fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
|
||||
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
|
||||
}
|
||||
|
||||
let mut conf_content = String::new();
|
||||
|
||||
@@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use zenith_utils::http::error::HttpErrorBody;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
use crate::storage::PageServerNode;
|
||||
@@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
//
|
||||
#[derive(Debug)]
|
||||
pub struct SafekeeperNode {
|
||||
pub name: String,
|
||||
pub id: ZNodeId,
|
||||
|
||||
pub conf: SafekeeperConf,
|
||||
|
||||
@@ -77,10 +78,10 @@ impl SafekeeperNode {
|
||||
pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
|
||||
let pageserver = Arc::new(PageServerNode::from_env(env));
|
||||
|
||||
println!("initializing for {} for {}", conf.name, conf.http_port);
|
||||
println!("initializing for sk {} for {}", conf.id, conf.http_port);
|
||||
|
||||
SafekeeperNode {
|
||||
name: conf.name.clone(),
|
||||
id: conf.id,
|
||||
conf: conf.clone(),
|
||||
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
|
||||
env: env.clone(),
|
||||
@@ -98,8 +99,12 @@ impl SafekeeperNode {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
|
||||
env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
|
||||
}
|
||||
|
||||
pub fn datadir_path(&self) -> PathBuf {
|
||||
self.env.safekeeper_data_dir(&self.name)
|
||||
SafekeeperNode::datadir_path_by_id(&self.env, self.id)
|
||||
}
|
||||
|
||||
pub fn pid_file(&self) -> PathBuf {
|
||||
@@ -120,6 +125,7 @@ impl SafekeeperNode {
|
||||
let mut cmd = Command::new(self.env.safekeeper_bin()?);
|
||||
fill_rust_env_vars(
|
||||
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
|
||||
.args(&["--id", self.id.to_string().as_ref()])
|
||||
.args(&["--listen-pg", &listen_pg])
|
||||
.args(&["--listen-http", &listen_http])
|
||||
.args(&["--recall", "1 second"])
|
||||
@@ -183,7 +189,7 @@ impl SafekeeperNode {
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
let pid_file = self.pid_file();
|
||||
if !pid_file.exists() {
|
||||
println!("Safekeeper {} is already stopped", self.name);
|
||||
println!("Safekeeper {} is already stopped", self.id);
|
||||
return Ok(());
|
||||
}
|
||||
let pid = read_pidfile(&pid_file)?;
|
||||
|
||||
@@ -103,6 +103,8 @@ impl PageServerNode {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
|
||||
let id = format!("id={}", self.env.pageserver.id);
|
||||
|
||||
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
|
||||
let base_data_dir_param = self.env.base_data_dir.display().to_string();
|
||||
let pg_distrib_dir_param =
|
||||
@@ -122,6 +124,7 @@ impl PageServerNode {
|
||||
args.extend(["-c", &authg_type_param]);
|
||||
args.extend(["-c", &listen_http_addr_param]);
|
||||
args.extend(["-c", &listen_pg_addr_param]);
|
||||
args.extend(["-c", &id]);
|
||||
|
||||
for config_override in config_overrides {
|
||||
args.extend(["-c", config_override]);
|
||||
|
||||
@@ -4,7 +4,7 @@ set -eux
|
||||
if [ "$1" = 'pageserver' ]; then
|
||||
if [ ! -d "/data/tenants" ]; then
|
||||
echo "Initializing pageserver data directory"
|
||||
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'"
|
||||
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
|
||||
fi
|
||||
echo "Staring pageserver at 0.0.0.0:6400"
|
||||
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data
|
||||
|
||||
@@ -61,7 +61,7 @@ fn main() -> Result<()> {
|
||||
.number_of_values(1)
|
||||
.multiple_occurrences(true)
|
||||
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
|
||||
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
|
||||
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
@@ -115,7 +115,14 @@ fn main() -> Result<()> {
|
||||
option_line
|
||||
)
|
||||
})?;
|
||||
|
||||
for (key, item) in doc.iter() {
|
||||
if key == "id" {
|
||||
anyhow::ensure!(
|
||||
init,
|
||||
"node id can only be set during pageserver init and cannot be overridden"
|
||||
);
|
||||
}
|
||||
toml.insert(key, item.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use toml_edit;
|
||||
use toml_edit::{Document, Item};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::env;
|
||||
@@ -72,6 +72,10 @@ pub mod defaults {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PageServerConf {
|
||||
// Identifier of that particular pageserver so e g safekeepers
|
||||
// can safely distinguish different pageservers
|
||||
pub id: ZNodeId,
|
||||
|
||||
/// Example (default): 127.0.0.1:64000
|
||||
pub listen_pg_addr: String,
|
||||
/// Example (default): 127.0.0.1:9898
|
||||
@@ -106,6 +110,184 @@ pub struct PageServerConf {
|
||||
pub remote_storage_config: Option<RemoteStorageConfig>,
|
||||
}
|
||||
|
||||
// use dedicated enum for builder to better indicate the intention
|
||||
// and avoid possible confusion with nested options
|
||||
pub enum BuilderValue<T> {
|
||||
Set(T),
|
||||
NotSet,
|
||||
}
|
||||
|
||||
impl<T> BuilderValue<T> {
|
||||
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
|
||||
match self {
|
||||
Self::Set(v) => Ok(v),
|
||||
Self::NotSet => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
|
||||
listen_http_addr: BuilderValue<String>,
|
||||
|
||||
checkpoint_distance: BuilderValue<u64>,
|
||||
checkpoint_period: BuilderValue<Duration>,
|
||||
|
||||
gc_horizon: BuilderValue<u64>,
|
||||
gc_period: BuilderValue<Duration>,
|
||||
superuser: BuilderValue<String>,
|
||||
|
||||
page_cache_size: BuilderValue<usize>,
|
||||
max_file_descriptors: BuilderValue<usize>,
|
||||
|
||||
workdir: BuilderValue<PathBuf>,
|
||||
|
||||
pg_distrib_dir: BuilderValue<PathBuf>,
|
||||
|
||||
auth_type: BuilderValue<AuthType>,
|
||||
|
||||
//
|
||||
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
|
||||
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
|
||||
|
||||
id: BuilderValue<ZNodeId>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
fn default() -> Self {
|
||||
use self::BuilderValue::*;
|
||||
use defaults::*;
|
||||
Self {
|
||||
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
|
||||
checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)
|
||||
.expect("cannot parse default checkpoint period")),
|
||||
gc_horizon: Set(DEFAULT_GC_HORIZON),
|
||||
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period")),
|
||||
superuser: Set(DEFAULT_SUPERUSER.to_string()),
|
||||
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
workdir: Set(PathBuf::new()),
|
||||
pg_distrib_dir: Set(env::current_dir()
|
||||
.expect("cannot access current directory")
|
||||
.join("tmp_install")),
|
||||
auth_type: Set(AuthType::Trust),
|
||||
auth_validation_public_key_path: Set(None),
|
||||
remote_storage_config: Set(None),
|
||||
id: NotSet,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerConfigBuilder {
|
||||
pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
|
||||
self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
|
||||
}
|
||||
|
||||
pub fn listen_http_addr(&mut self, listen_http_addr: String) {
|
||||
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
|
||||
}
|
||||
|
||||
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
|
||||
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn checkpoint_period(&mut self, checkpoint_period: Duration) {
|
||||
self.checkpoint_period = BuilderValue::Set(checkpoint_period)
|
||||
}
|
||||
|
||||
pub fn gc_horizon(&mut self, gc_horizon: u64) {
|
||||
self.gc_horizon = BuilderValue::Set(gc_horizon)
|
||||
}
|
||||
|
||||
pub fn gc_period(&mut self, gc_period: Duration) {
|
||||
self.gc_period = BuilderValue::Set(gc_period)
|
||||
}
|
||||
|
||||
pub fn superuser(&mut self, superuser: String) {
|
||||
self.superuser = BuilderValue::Set(superuser)
|
||||
}
|
||||
|
||||
pub fn page_cache_size(&mut self, page_cache_size: usize) {
|
||||
self.page_cache_size = BuilderValue::Set(page_cache_size)
|
||||
}
|
||||
|
||||
pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
|
||||
self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
|
||||
}
|
||||
|
||||
pub fn workdir(&mut self, workdir: PathBuf) {
|
||||
self.workdir = BuilderValue::Set(workdir)
|
||||
}
|
||||
|
||||
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
|
||||
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
|
||||
}
|
||||
|
||||
pub fn auth_type(&mut self, auth_type: AuthType) {
|
||||
self.auth_type = BuilderValue::Set(auth_type)
|
||||
}
|
||||
|
||||
pub fn auth_validation_public_key_path(
|
||||
&mut self,
|
||||
auth_validation_public_key_path: Option<PathBuf>,
|
||||
) {
|
||||
self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
|
||||
}
|
||||
|
||||
pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
|
||||
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: ZNodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<PageServerConf> {
|
||||
Ok(PageServerConf {
|
||||
listen_pg_addr: self
|
||||
.listen_pg_addr
|
||||
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?,
|
||||
listen_http_addr: self
|
||||
.listen_http_addr
|
||||
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
|
||||
checkpoint_distance: self
|
||||
.checkpoint_distance
|
||||
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
|
||||
checkpoint_period: self
|
||||
.checkpoint_period
|
||||
.ok_or(anyhow::anyhow!("missing checkpoint_period"))?,
|
||||
gc_horizon: self
|
||||
.gc_horizon
|
||||
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
|
||||
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
|
||||
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
|
||||
page_cache_size: self
|
||||
.page_cache_size
|
||||
.ok_or(anyhow::anyhow!("missing page_cache_size"))?,
|
||||
max_file_descriptors: self
|
||||
.max_file_descriptors
|
||||
.ok_or(anyhow::anyhow!("missing max_file_descriptors"))?,
|
||||
workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?,
|
||||
pg_distrib_dir: self
|
||||
.pg_distrib_dir
|
||||
.ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?,
|
||||
auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?,
|
||||
auth_validation_public_key_path: self
|
||||
.auth_validation_public_key_path
|
||||
.ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?,
|
||||
remote_storage_config: self
|
||||
.remote_storage_config
|
||||
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
|
||||
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteStorageConfig {
|
||||
@@ -221,57 +403,39 @@ impl PageServerConf {
|
||||
///
|
||||
/// This leaves any options not present in the file in the built-in defaults.
|
||||
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
|
||||
use defaults::*;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
workdir: workdir.to_path_buf(),
|
||||
|
||||
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_period: humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)?,
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)?,
|
||||
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
|
||||
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
|
||||
pg_distrib_dir: PathBuf::new(),
|
||||
auth_validation_public_key_path: None,
|
||||
auth_type: AuthType::Trust,
|
||||
|
||||
remote_storage_config: None,
|
||||
|
||||
superuser: DEFAULT_SUPERUSER.to_string(),
|
||||
};
|
||||
let mut builder = PageServerConfigBuilder::default();
|
||||
builder.workdir(workdir.to_owned());
|
||||
|
||||
for (key, item) in toml.iter() {
|
||||
match key {
|
||||
"listen_pg_addr" => conf.listen_pg_addr = parse_toml_string(key, item)?,
|
||||
"listen_http_addr" => conf.listen_http_addr = parse_toml_string(key, item)?,
|
||||
"checkpoint_distance" => conf.checkpoint_distance = parse_toml_u64(key, item)?,
|
||||
"checkpoint_period" => conf.checkpoint_period = parse_toml_duration(key, item)?,
|
||||
"gc_horizon" => conf.gc_horizon = parse_toml_u64(key, item)?,
|
||||
"gc_period" => conf.gc_period = parse_toml_duration(key, item)?,
|
||||
"initial_superuser_name" => conf.superuser = parse_toml_string(key, item)?,
|
||||
"page_cache_size" => conf.page_cache_size = parse_toml_u64(key, item)? as usize,
|
||||
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
|
||||
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
|
||||
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
|
||||
"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
|
||||
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
|
||||
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
|
||||
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
|
||||
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
|
||||
"max_file_descriptors" => {
|
||||
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
|
||||
builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
|
||||
}
|
||||
"pg_distrib_dir" => {
|
||||
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
|
||||
builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
|
||||
}
|
||||
"auth_validation_public_key_path" => {
|
||||
conf.auth_validation_public_key_path =
|
||||
Some(PathBuf::from(parse_toml_string(key, item)?))
|
||||
}
|
||||
"auth_type" => conf.auth_type = parse_toml_auth_type(key, item)?,
|
||||
"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
|
||||
PathBuf::from(parse_toml_string(key, item)?),
|
||||
)),
|
||||
"auth_type" => builder.auth_type(parse_toml_auth_type(key, item)?),
|
||||
"remote_storage" => {
|
||||
conf.remote_storage_config = Some(Self::parse_remote_storage_config(item)?)
|
||||
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
|
||||
}
|
||||
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
|
||||
_ => bail!("unrecognized pageserver option '{}'", key),
|
||||
}
|
||||
}
|
||||
|
||||
let mut conf = builder.build().context("invalid config")?;
|
||||
|
||||
if conf.auth_type == AuthType::ZenithJWT {
|
||||
let auth_validation_public_key_path = conf
|
||||
.auth_validation_public_key_path
|
||||
@@ -285,9 +449,6 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if conf.pg_distrib_dir == PathBuf::new() {
|
||||
conf.pg_distrib_dir = env::current_dir()?.join("tmp_install")
|
||||
};
|
||||
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
|
||||
bail!(
|
||||
"Can't find postgres binary at {}",
|
||||
@@ -382,6 +543,7 @@ impl PageServerConf {
|
||||
#[cfg(test)]
|
||||
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
|
||||
PageServerConf {
|
||||
id: ZNodeId(0),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_period: Duration::from_secs(10),
|
||||
gc_horizon: defaults::DEFAULT_GC_HORIZON,
|
||||
@@ -461,15 +623,16 @@ max_file_descriptors = 333
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
initial_superuser_name = 'zzzz'
|
||||
id = 10
|
||||
|
||||
"#;
|
||||
"#;
|
||||
|
||||
#[test]
|
||||
fn parse_defaults() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
// we have to create dummy pathes to overcome the validation errors
|
||||
let config_string = format!("pg_distrib_dir='{}'", pg_distrib_dir.display());
|
||||
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_config =
|
||||
@@ -480,6 +643,7 @@ initial_superuser_name = 'zzzz'
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
PageServerConf {
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
@@ -521,6 +685,7 @@ initial_superuser_name = 'zzzz'
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
PageServerConf {
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: "127.0.0.1:64000".to_string(),
|
||||
listen_http_addr: "127.0.0.1:9898".to_string(),
|
||||
checkpoint_distance: 111,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ZTenantId;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct BranchCreateRequest {
|
||||
@@ -15,3 +16,8 @@ pub struct TenantCreateRequest {
|
||||
#[serde(with = "hex")]
|
||||
pub tenant_id: ZTenantId,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct StatusResponse {
|
||||
pub id: ZNodeId,
|
||||
}
|
||||
|
||||
@@ -17,6 +17,11 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
properties:
|
||||
id:
|
||||
type: integer
|
||||
/v1/timeline/{tenant_id}:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use serde::Serialize;
|
||||
@@ -23,6 +22,7 @@ use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
|
||||
|
||||
use super::models::BranchCreateRequest;
|
||||
use super::models::StatusResponse;
|
||||
use super::models::TenantCreateRequest;
|
||||
use crate::branches::BranchInfo;
|
||||
use crate::repository::RepositoryTimeline;
|
||||
@@ -64,12 +64,12 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
|
||||
}
|
||||
|
||||
// healthcheck handler
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.body(Body::from("{}"))
|
||||
.map_err(ApiError::from_err)?)
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let config = get_config(&request);
|
||||
Ok(json_response(
|
||||
StatusCode::OK,
|
||||
StatusResponse { id: config.id },
|
||||
)?)
|
||||
}
|
||||
|
||||
async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
@@ -89,7 +89,7 @@ def test_foobar(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
# Now create the environment. This initializes the repository, and starts
|
||||
# up the page server and the safekeepers
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Run the test
|
||||
...
|
||||
|
||||
@@ -8,7 +8,7 @@ import pytest
|
||||
|
||||
def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
ps = env.pageserver
|
||||
|
||||
@@ -51,7 +51,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
|
||||
env.zenith_cli.create_branch(branch, "main")
|
||||
|
||||
@@ -19,7 +19,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
|
||||
#
|
||||
# See https://github.com/zenithdb/zenith/issues/1068
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
env.zenith_cli.create_branch("test_branch_behind", "main")
|
||||
|
||||
@@ -11,7 +11,7 @@ from fixtures.log_helper import log
|
||||
def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
|
||||
# One safekeeper is enough for this test.
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
pg = env.postgres.create_start('main')
|
||||
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import json
|
||||
from uuid import uuid4, UUID
|
||||
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
|
||||
from typing import cast
|
||||
import pytest, psycopg2
|
||||
import pytest
|
||||
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath
|
||||
|
||||
|
||||
# test that we cannot override node id
|
||||
def test_pageserver_init_node_id(zenith_env_builder: ZenithEnvBuilder):
|
||||
env = zenith_env_builder.init()
|
||||
with pytest.raises(
|
||||
Exception,
|
||||
match="node id can only be set during pageserver init and cannot be overridden"):
|
||||
env.pageserver.start(overrides=['--pageserver-config-override=id=10'])
|
||||
|
||||
|
||||
def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
|
||||
@@ -41,7 +48,7 @@ def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
|
||||
|
||||
def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
management_token = env.auth_keys.generate_management_token()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from fixtures.log_helper import log
|
||||
# and new compute node contains all data.
|
||||
def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main")
|
||||
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')
|
||||
|
||||
@@ -13,7 +13,7 @@ from fixtures.log_helper import log
|
||||
def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
|
||||
# One safekeeper is enough for this test.
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_pageserver_restart", "main")
|
||||
pg = env.postgres.create_start('test_pageserver_restart')
|
||||
|
||||
@@ -42,7 +42,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
|
||||
data_secret = 'very secret secret'
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
pg = env.postgres.create_start()
|
||||
|
||||
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
|
||||
|
||||
@@ -13,7 +13,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_restart_compute", "main")
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
zenith_env_builder.enable_local_fs_remote_storage()
|
||||
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# create folder for remote storage mock
|
||||
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'
|
||||
|
||||
@@ -10,7 +10,7 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
"""Tests tenants with and without wal acceptors"""
|
||||
tenant_1 = env.create_tenant()
|
||||
tenant_2 = env.create_tenant()
|
||||
|
||||
@@ -67,7 +67,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
|
||||
|
||||
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.zenith_cli.create_branch("test_timeline_size_quota", "main")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -22,7 +22,7 @@ from typing import List, Optional, Any
|
||||
# succeed and data is written
|
||||
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main")
|
||||
|
||||
@@ -51,7 +51,7 @@ class BranchMetrics:
|
||||
# against different timelines.
|
||||
def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
n_timelines = 3
|
||||
|
||||
@@ -181,7 +181,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
|
||||
n_acceptors = 3
|
||||
|
||||
zenith_env_builder.num_safekeepers = n_acceptors
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_restarts')
|
||||
@@ -218,7 +218,7 @@ def delayed_wal_acceptor_start(wa):
|
||||
# When majority of acceptors is offline, commits are expected to be frozen
|
||||
def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 2
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
|
||||
@@ -289,7 +289,7 @@ def stop_value():
|
||||
def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
|
||||
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
|
||||
@@ -404,7 +404,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
|
||||
# We don't really need the full environment for this test, just the
|
||||
# safekeepers would be enough.
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
timeline_id = uuid.uuid4()
|
||||
tenant_id = uuid.uuid4()
|
||||
@@ -454,7 +454,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
|
||||
def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_timeline_status", "main")
|
||||
pg = env.postgres.create_start('test_timeline_status')
|
||||
@@ -521,12 +521,7 @@ class SafekeeperEnv:
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if self.num_safekeepers == 1:
|
||||
name = "single"
|
||||
else:
|
||||
name = f"sk{i}"
|
||||
|
||||
safekeeper_dir = os.path.join(self.repo_dir, name)
|
||||
safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
|
||||
mkdir_if_needed(safekeeper_dir)
|
||||
|
||||
args = [
|
||||
@@ -537,6 +532,8 @@ class SafekeeperEnv:
|
||||
f"127.0.0.1:{port.http}",
|
||||
"-D",
|
||||
safekeeper_dir,
|
||||
"--id",
|
||||
str(i),
|
||||
"--daemonize"
|
||||
]
|
||||
|
||||
@@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
|
||||
|
||||
|
||||
def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
|
||||
return ','.join(
|
||||
[f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
|
||||
def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str:
|
||||
return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names])
|
||||
|
||||
def execute_payload(pg: Postgres):
|
||||
with closing(pg.connect()) as conn:
|
||||
@@ -628,17 +624,17 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
http_cli = sk.http_client()
|
||||
try:
|
||||
status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"Safekeeper {sk.name} status: {status}")
|
||||
log.info(f"Safekeeper {sk.id} status: {status}")
|
||||
except Exception as e:
|
||||
log.info(f"Safekeeper {sk.name} status error: {e}")
|
||||
log.info(f"Safekeeper {sk.id} status error: {e}")
|
||||
|
||||
zenith_env_builder.num_safekeepers = 4
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.zenith_cli.create_branch("test_replace_safekeeper", "main")
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
active_safekeepers = ['sk1', 'sk2', 'sk3']
|
||||
active_safekeepers = [1, 2, 3]
|
||||
pg = env.postgres.create('test_replace_safekeeper')
|
||||
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
|
||||
pg.start()
|
||||
@@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
log.info("Recreate postgres to replace failed sk1 with new sk4")
|
||||
pg.stop_and_destroy().create('test_replace_safekeeper')
|
||||
active_safekeepers = ['sk2', 'sk3', 'sk4']
|
||||
active_safekeepers = [2, 3, 4]
|
||||
env.safekeepers[3].start()
|
||||
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
|
||||
pg.start()
|
||||
|
||||
@@ -200,7 +200,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w
|
||||
# restart acceptors one by one, while executing and validating bank transactions
|
||||
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
|
||||
|
||||
@@ -97,7 +97,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
|
||||
def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
|
||||
# Start with single sk
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Connect to sk port on v4 loopback
|
||||
res = requests.get(f'http://127.0.0.1:{env.safekeepers[0].port.http}/v1/status')
|
||||
@@ -114,7 +114,7 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
|
||||
def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder):
|
||||
# Start with single sk
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Stop default ps/sk
|
||||
env.zenith_cli.pageserver_stop()
|
||||
|
||||
@@ -27,7 +27,7 @@ from dataclasses import dataclass
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
|
||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
|
||||
from typing_extensions import Literal
|
||||
import pytest
|
||||
|
||||
@@ -425,6 +425,14 @@ class ZenithEnvBuilder:
|
||||
self.env = ZenithEnv(self)
|
||||
return self.env
|
||||
|
||||
def start(self):
|
||||
self.env.start()
|
||||
|
||||
def init_start(self) -> ZenithEnv:
|
||||
env = self.init()
|
||||
self.start()
|
||||
return env
|
||||
|
||||
"""
|
||||
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
|
||||
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
|
||||
@@ -540,6 +548,7 @@ class ZenithEnv:
|
||||
|
||||
toml += textwrap.dedent(f"""
|
||||
[pageserver]
|
||||
id=1
|
||||
listen_pg_addr = 'localhost:{pageserver_port.pg}'
|
||||
listen_http_addr = 'localhost:{pageserver_port.http}'
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
@@ -556,25 +565,22 @@ class ZenithEnv:
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if config.num_safekeepers == 1:
|
||||
name = "single"
|
||||
else:
|
||||
name = f"sk{i}"
|
||||
toml += f"""
|
||||
[[safekeepers]]
|
||||
name = '{name}'
|
||||
pg_port = {port.pg}
|
||||
http_port = {port.http}
|
||||
sync = false # Disable fsyncs to make the tests go faster
|
||||
"""
|
||||
safekeeper = Safekeeper(env=self, name=name, port=port)
|
||||
id = i # assign ids sequentially
|
||||
toml += textwrap.dedent(f"""
|
||||
[[safekeepers]]
|
||||
id = {id}
|
||||
pg_port = {port.pg}
|
||||
http_port = {port.http}
|
||||
sync = false # Disable fsyncs to make the tests go faster
|
||||
""")
|
||||
safekeeper = Safekeeper(env=self, id=id, port=port)
|
||||
self.safekeepers.append(safekeeper)
|
||||
|
||||
log.info(f"Config: {toml}")
|
||||
|
||||
self.zenith_cli.init(toml)
|
||||
|
||||
def start(self):
|
||||
# Start up the page server and all the safekeepers
|
||||
self.pageserver.start()
|
||||
|
||||
@@ -615,7 +621,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
|
||||
|
||||
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
|
||||
|
||||
env = builder.init()
|
||||
env = builder.init_start()
|
||||
|
||||
# For convenience in tests, create a branch from the freshly-initialized cluster.
|
||||
env.zenith_cli.create_branch("empty", "main")
|
||||
@@ -649,7 +655,7 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB
|
||||
To use, define 'zenith_env_builder' fixture in your test to get access to the
|
||||
builder object. Set properties on it to describe the environment.
|
||||
Finally, initialize and start up the environment by calling
|
||||
zenith_env_builder.init().
|
||||
zenith_env_builder.init_start().
|
||||
|
||||
After the initialization, you can launch compute nodes by calling
|
||||
the functions in the 'env.postgres' factory object, stop/start the
|
||||
@@ -835,8 +841,9 @@ class ZenithCli:
|
||||
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def pageserver_start(self) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start']
|
||||
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start', *overrides]
|
||||
|
||||
append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
|
||||
return self.raw_cli(start_args)
|
||||
|
||||
@@ -848,17 +855,17 @@ class ZenithCli:
|
||||
log.info(f"Stopping pageserver with {cmd}")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
|
||||
return self.raw_cli(['safekeeper', 'start', name])
|
||||
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
|
||||
return self.raw_cli(['safekeeper', 'start', str(id)])
|
||||
|
||||
def safekeeper_stop(self,
|
||||
name: Optional[str] = None,
|
||||
id: Optional[int] = None,
|
||||
immediate=False) -> 'subprocess.CompletedProcess[str]':
|
||||
args = ['safekeeper', 'stop']
|
||||
if id is not None:
|
||||
args.extend(str(id))
|
||||
if immediate:
|
||||
args.extend(['-m', 'immediate'])
|
||||
if name is not None:
|
||||
args.append(name)
|
||||
return self.raw_cli(args)
|
||||
|
||||
def pg_create(
|
||||
@@ -989,14 +996,14 @@ class ZenithPageserver(PgProtocol):
|
||||
self.service_port = port # do not shadow PgProtocol.port which is just int
|
||||
self.remote_storage = remote_storage
|
||||
|
||||
def start(self) -> 'ZenithPageserver':
|
||||
def start(self, overrides=()) -> 'ZenithPageserver':
|
||||
"""
|
||||
Start the page server.
|
||||
Returns self.
|
||||
"""
|
||||
assert self.running == False
|
||||
|
||||
self.env.zenith_cli.pageserver_start()
|
||||
self.env.zenith_cli.pageserver_start(overrides=overrides)
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
@@ -1397,12 +1404,14 @@ class Safekeeper:
|
||||
""" An object representing a running safekeeper daemon. """
|
||||
env: ZenithEnv
|
||||
port: SafekeeperPort
|
||||
name: str # identifier for logging
|
||||
id: int
|
||||
auth_token: Optional[str] = None
|
||||
running: bool = False
|
||||
|
||||
def start(self) -> 'Safekeeper':
|
||||
self.env.zenith_cli.safekeeper_start(self.name)
|
||||
|
||||
assert self.running == False
|
||||
self.env.zenith_cli.safekeeper_start(self.id)
|
||||
self.running = True
|
||||
# wait for wal acceptor start by checking its status
|
||||
started_at = time.time()
|
||||
while True:
|
||||
@@ -1420,8 +1429,9 @@ class Safekeeper:
|
||||
return self
|
||||
|
||||
def stop(self, immediate=False) -> 'Safekeeper':
|
||||
log.info('Stopping safekeeper {}'.format(self.name))
|
||||
self.env.zenith_cli.safekeeper_stop(self.name, immediate)
|
||||
log.info('Stopping safekeeper {}'.format(self.id))
|
||||
self.env.zenith_cli.safekeeper_stop(self.id, immediate)
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def append_logical_message(self,
|
||||
|
||||
@@ -23,7 +23,7 @@ def test_bulk_tenant_create(
|
||||
"""Measure tenant creation time (with and without wal acceptors)"""
|
||||
if use_wal_acceptors == 'with_wa':
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
time_slices = []
|
||||
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
//
|
||||
// Main entry point for the safekeeper executable
|
||||
//
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::{App, Arg};
|
||||
use const_format::formatcp;
|
||||
use daemonize::Daemonize;
|
||||
use fs2::FileExt;
|
||||
use std::fs::File;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use tracing::*;
|
||||
use walkeeper::control_file::{self, CreateControlFile};
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
@@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now;
|
||||
use zenith_utils::signals;
|
||||
|
||||
const LOCK_FILE_NAME: &str = "safekeeper.lock";
|
||||
const ID_FILE_NAME: &str = "safekeeper.id";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
zenith_metrics::set_common_metrics_prefix("safekeeper");
|
||||
@@ -93,6 +96,9 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Dump control file at path specifed by this argument and exit"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("dump-control-file") {
|
||||
@@ -136,10 +142,19 @@ fn main() -> Result<()> {
|
||||
conf.recall_period = humantime::parse_duration(recall)?;
|
||||
}
|
||||
|
||||
start_safekeeper(conf)
|
||||
let mut given_id = None;
|
||||
if let Some(given_id_str) = arg_matches.value_of("id") {
|
||||
given_id = Some(ZNodeId(
|
||||
given_id_str
|
||||
.parse()
|
||||
.context("failed to parse safekeeper id")?,
|
||||
));
|
||||
}
|
||||
|
||||
start_safekeeper(conf, given_id)
|
||||
}
|
||||
|
||||
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
|
||||
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
|
||||
|
||||
info!("version: {}", GIT_VERSION);
|
||||
@@ -154,6 +169,9 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
)
|
||||
})?;
|
||||
|
||||
// Set or read our ID.
|
||||
set_id(&mut conf, given_id)?;
|
||||
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
e
|
||||
@@ -260,3 +278,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
std::process::exit(111);
|
||||
})
|
||||
}
|
||||
|
||||
/// Determine safekeeper id and set it in config.
|
||||
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
|
||||
let id_file_path = conf.workdir.join(ID_FILE_NAME);
|
||||
|
||||
let my_id: ZNodeId;
|
||||
// If ID exists, read it in; otherwise set one passed
|
||||
match fs::read(&id_file_path) {
|
||||
Ok(id_serialized) => {
|
||||
my_id = ZNodeId(
|
||||
std::str::from_utf8(&id_serialized)
|
||||
.context("failed to parse safekeeper id")?
|
||||
.parse()
|
||||
.context("failed to parse safekeeper id")?,
|
||||
);
|
||||
if let Some(given_id) = given_id {
|
||||
if given_id != my_id {
|
||||
bail!(
|
||||
"safekeeper already initialized with id {}, can't set {}",
|
||||
my_id,
|
||||
given_id
|
||||
);
|
||||
}
|
||||
}
|
||||
info!("safekeeper ID {}", my_id);
|
||||
}
|
||||
Err(error) => match error.kind() {
|
||||
ErrorKind::NotFound => {
|
||||
my_id = if let Some(given_id) = given_id {
|
||||
given_id
|
||||
} else {
|
||||
bail!("safekeeper id is not specified");
|
||||
};
|
||||
let mut f = File::create(&id_file_path)?;
|
||||
f.write_all(my_id.to_string().as_bytes())?;
|
||||
f.sync_all()?;
|
||||
info!("initialized safekeeper ID {}", my_id);
|
||||
}
|
||||
_ => {
|
||||
return Err(error.into());
|
||||
}
|
||||
},
|
||||
}
|
||||
conf.my_id = my_id;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::http::{RequestExt, RouterBuilder};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
use zenith_utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::control_file::CreateControlFile;
|
||||
@@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response;
|
||||
use zenith_utils::http::request::parse_request_param;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SafekeeperStatus {
|
||||
id: ZNodeId,
|
||||
}
|
||||
|
||||
/// Healthcheck handler.
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
Ok(json_response(StatusCode::OK, "")?)
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let conf = get_conf(&request);
|
||||
let status = SafekeeperStatus { id: conf.my_id };
|
||||
Ok(json_response(StatusCode::OK, status)?)
|
||||
}
|
||||
|
||||
fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use zenith_utils::zid::ZTenantTimelineId;
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
||||
|
||||
pub mod callmemaybe;
|
||||
pub mod control_file;
|
||||
@@ -46,6 +46,7 @@ pub struct SafeKeeperConf {
|
||||
pub listen_http_addr: String,
|
||||
pub ttl: Option<Duration>,
|
||||
pub recall_period: Duration,
|
||||
pub my_id: ZNodeId,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -69,6 +70,7 @@ impl Default for SafeKeeperConf {
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
ttl: None,
|
||||
recall_period: defaults::DEFAULT_RECALL_PERIOD,
|
||||
my_id: ZNodeId(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,32 +18,35 @@ use walkeeper::defaults::{
|
||||
};
|
||||
use zenith_utils::auth::{Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
|
||||
use zenith_utils::GIT_VERSION;
|
||||
|
||||
use pageserver::branches::BranchInfo;
|
||||
|
||||
// Default name of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_NAME: &str = "single";
|
||||
// Default id of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
|
||||
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
|
||||
|
||||
fn default_conf() -> String {
|
||||
format!(
|
||||
r#"
|
||||
# Default built-in configuration, defined in main.rs
|
||||
[pageserver]
|
||||
id = {pageserver_id}
|
||||
listen_pg_addr = '{pageserver_pg_addr}'
|
||||
listen_http_addr = '{pageserver_http_addr}'
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
|
||||
[[safekeepers]]
|
||||
name = '{safekeeper_name}'
|
||||
id = {safekeeper_id}
|
||||
pg_port = {safekeeper_pg_port}
|
||||
http_port = {safekeeper_http_port}
|
||||
"#,
|
||||
pageserver_id = DEFAULT_PAGESERVER_ID,
|
||||
pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
|
||||
pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
|
||||
pageserver_auth_type = AuthType::Trust,
|
||||
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
|
||||
safekeeper_id = DEFAULT_SAFEKEEPER_ID,
|
||||
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
)
|
||||
@@ -74,9 +77,9 @@ fn main() -> Result<()> {
|
||||
.required(true);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let safekeeper_node_arg = Arg::new("node")
|
||||
let safekeeper_id_arg = Arg::new("id")
|
||||
.index(1)
|
||||
.help("Node name")
|
||||
.help("safekeeper id")
|
||||
.required(false);
|
||||
|
||||
let timeline_arg = Arg::new("timeline")
|
||||
@@ -154,16 +157,16 @@ fn main() -> Result<()> {
|
||||
.about("Manage safekeepers")
|
||||
.subcommand(App::new("start")
|
||||
.about("Start local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("stop")
|
||||
.about("Stop local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("restart")
|
||||
.about("Restart local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
)
|
||||
@@ -628,11 +631,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
|
||||
Ok(SafekeeperNode::from_env(env, node))
|
||||
} else {
|
||||
bail!("could not find safekeeper '{}'", name)
|
||||
bail!("could not find safekeeper '{}'", id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -643,8 +646,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
};
|
||||
|
||||
// All the commands take an optional safekeeper name argument
|
||||
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
let sk_id = if let Some(id_str) = sub_args.value_of("id") {
|
||||
ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
|
||||
} else {
|
||||
DEFAULT_SAFEKEEPER_ID
|
||||
};
|
||||
let safekeeper = get_safekeeper(env, sk_id)?;
|
||||
|
||||
match sub_name {
|
||||
"start" => {
|
||||
@@ -697,7 +704,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
|
||||
eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -724,7 +731,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
|
||||
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId {
|
||||
}
|
||||
}
|
||||
|
||||
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
|
||||
// by the console.
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct ZNodeId(pub u64);
|
||||
|
||||
impl fmt::Display for ZNodeId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fmt::Display;
|
||||
|
||||
Reference in New Issue
Block a user