Compare commits

...

4 Commits

Author SHA1 Message Date
Arseny Sher
23644ed251 set pageserver id in dockerfile 2022-02-23 09:17:45 +03:00
Dmitry Rodionov
99e0f07a1d review adjustments, fancy enum for builder, minor cleanups 2022-02-23 08:33:50 +03:00
Dmitry Rodionov
5d490babf8 add node id to pageserver
This adds node id parameter to pageserver configuration. Also I use a
simple builder to construct pageserver config struct to avoid setting
node id to some temporary invalid value. Some of the changes in test
fixtures are needed to split init and start operations for envrionment.
2022-02-23 08:33:50 +03:00
Arseny Sher
5865f85ae2 Add --id argument to safekeeper setting its unique u64 id.
In preparation for storage node messaging. IDs are supposed to be monotonically
assigned by the console. In tests it is issued by ZenithEnv; at the zenith cli
level and fixtures, string name is completely replaced by integer id. Example
TOML configs are adjusted accordingly.

Sequential ids are chosen over Zid mainly because they are compact and easy to
type/remember.
2022-02-23 08:33:50 +03:00
33 changed files with 467 additions and 164 deletions

View File

@@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
[[safekeepers]]
name = 'sk1'
id = 1
pg_port = 5454
http_port = 7676
[[safekeepers]]
name = 'sk2'
id = 2
pg_port = 5455
http_port = 7677
[[safekeepers]]
name = 'sk3'
id = 3
pg_port = 5456
http_port = 7678

View File

@@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
[[safekeepers]]
name = 'single'
id = 1
pg_port = 5454
http_port = 7676

View File

@@ -12,7 +12,9 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{opt_display_serde, ZTenantId};
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
use crate::safekeeper::SafekeeperNode;
//
// This data structures represents zenith CLI config
@@ -62,6 +64,8 @@ pub struct LocalEnv {
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
// node id
pub id: ZNodeId,
// Pageserver connection settings
pub listen_pg_addr: String,
pub listen_http_addr: String,
@@ -76,6 +80,7 @@ pub struct PageServerConf {
impl Default for PageServerConf {
fn default() -> Self {
Self {
id: ZNodeId(0),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
auth_type: AuthType::Trust,
@@ -87,7 +92,7 @@ impl Default for PageServerConf {
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
pub name: String,
pub id: ZNodeId,
pub pg_port: u16,
pub http_port: u16,
pub sync: bool,
@@ -96,7 +101,7 @@ pub struct SafekeeperConf {
impl Default for SafekeeperConf {
fn default() -> Self {
Self {
name: String::new(),
id: ZNodeId(0),
pg_port: 0,
http_port: 0,
sync: true,
@@ -136,8 +141,8 @@ impl LocalEnv {
self.base_data_dir.clone()
}
pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
self.base_data_dir.join("safekeepers").join(node_name)
pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
/// Create a LocalEnv from a config file.
@@ -285,7 +290,7 @@ impl LocalEnv {
fs::create_dir_all(self.pg_data_dirs_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
}
let mut conf_content = String::new();

View File

@@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::zid::ZNodeId;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
@@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response {
//
#[derive(Debug)]
pub struct SafekeeperNode {
pub name: String,
pub id: ZNodeId,
pub conf: SafekeeperConf,
@@ -77,10 +78,10 @@ impl SafekeeperNode {
pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
let pageserver = Arc::new(PageServerNode::from_env(env));
println!("initializing for {} for {}", conf.name, conf.http_port);
println!("initializing for sk {} for {}", conf.id, conf.http_port);
SafekeeperNode {
name: conf.name.clone(),
id: conf.id,
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
env: env.clone(),
@@ -98,8 +99,12 @@ impl SafekeeperNode {
.unwrap()
}
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
}
pub fn datadir_path(&self) -> PathBuf {
self.env.safekeeper_data_dir(&self.name)
SafekeeperNode::datadir_path_by_id(&self.env, self.id)
}
pub fn pid_file(&self) -> PathBuf {
@@ -120,6 +125,7 @@ impl SafekeeperNode {
let mut cmd = Command::new(self.env.safekeeper_bin()?);
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--id", self.id.to_string().as_ref()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
@@ -183,7 +189,7 @@ impl SafekeeperNode {
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
let pid_file = self.pid_file();
if !pid_file.exists() {
println!("Safekeeper {} is already stopped", self.name);
println!("Safekeeper {} is already stopped", self.id);
return Ok(());
}
let pid = read_pidfile(&pid_file)?;

View File

@@ -103,6 +103,8 @@ impl PageServerNode {
) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let base_data_dir_param = self.env.base_data_dir.display().to_string();
let pg_distrib_dir_param =
@@ -122,6 +124,7 @@ impl PageServerNode {
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
args.extend(["-c", &id]);
for config_override in config_overrides {
args.extend(["-c", config_override]);

View File

@@ -4,7 +4,7 @@ set -eux
if [ "$1" = 'pageserver' ]; then
if [ ! -d "/data/tenants" ]; then
echo "Initializing pageserver data directory"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
fi
echo "Staring pageserver at 0.0.0.0:6400"
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data

View File

@@ -61,7 +61,7 @@ fn main() -> Result<()> {
.number_of_values(1)
.multiple_occurrences(true)
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
)
.get_matches();
@@ -115,7 +115,14 @@ fn main() -> Result<()> {
option_line
)
})?;
for (key, item) in doc.iter() {
if key == "id" {
anyhow::ensure!(
init,
"node id can only be set during pageserver init and cannot be overridden"
);
}
toml.insert(key, item.clone());
}
}

View File

@@ -8,7 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
use toml_edit;
use toml_edit::{Document, Item};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use std::convert::TryInto;
use std::env;
@@ -72,6 +72,10 @@ pub mod defaults {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageServerConf {
// Identifier of that particular pageserver so e g safekeepers
// can safely distinguish different pageservers
pub id: ZNodeId,
/// Example (default): 127.0.0.1:64000
pub listen_pg_addr: String,
/// Example (default): 127.0.0.1:9898
@@ -106,6 +110,184 @@ pub struct PageServerConf {
pub remote_storage_config: Option<RemoteStorageConfig>,
}
// use dedicated enum for builder to better indicate the intention
// and avoid possible confusion with nested options
pub enum BuilderValue<T> {
Set(T),
NotSet,
}
impl<T> BuilderValue<T> {
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
match self {
Self::Set(v) => Ok(v),
Self::NotSet => Err(err),
}
}
}
// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
listen_http_addr: BuilderValue<String>,
checkpoint_distance: BuilderValue<u64>,
checkpoint_period: BuilderValue<Duration>,
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
superuser: BuilderValue<String>,
page_cache_size: BuilderValue<usize>,
max_file_descriptors: BuilderValue<usize>,
workdir: BuilderValue<PathBuf>,
pg_distrib_dir: BuilderValue<PathBuf>,
auth_type: BuilderValue<AuthType>,
//
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
id: BuilderValue<ZNodeId>,
}
impl Default for PageServerConfigBuilder {
fn default() -> Self {
use self::BuilderValue::*;
use defaults::*;
Self {
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)
.expect("cannot parse default checkpoint period")),
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("tmp_install")),
auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
id: NotSet,
}
}
}
impl PageServerConfigBuilder {
pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
}
pub fn listen_http_addr(&mut self, listen_http_addr: String) {
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
}
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
}
pub fn checkpoint_period(&mut self, checkpoint_period: Duration) {
self.checkpoint_period = BuilderValue::Set(checkpoint_period)
}
pub fn gc_horizon(&mut self, gc_horizon: u64) {
self.gc_horizon = BuilderValue::Set(gc_horizon)
}
pub fn gc_period(&mut self, gc_period: Duration) {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn superuser(&mut self, superuser: String) {
self.superuser = BuilderValue::Set(superuser)
}
pub fn page_cache_size(&mut self, page_cache_size: usize) {
self.page_cache_size = BuilderValue::Set(page_cache_size)
}
pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
}
pub fn workdir(&mut self, workdir: PathBuf) {
self.workdir = BuilderValue::Set(workdir)
}
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
}
pub fn auth_type(&mut self, auth_type: AuthType) {
self.auth_type = BuilderValue::Set(auth_type)
}
pub fn auth_validation_public_key_path(
&mut self,
auth_validation_public_key_path: Option<PathBuf>,
) {
self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
}
pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
}
pub fn id(&mut self, node_id: ZNodeId) {
self.id = BuilderValue::Set(node_id)
}
pub fn build(self) -> Result<PageServerConf> {
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?,
listen_http_addr: self
.listen_http_addr
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
checkpoint_distance: self
.checkpoint_distance
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
checkpoint_period: self
.checkpoint_period
.ok_or(anyhow::anyhow!("missing checkpoint_period"))?,
gc_horizon: self
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
.ok_or(anyhow::anyhow!("missing page_cache_size"))?,
max_file_descriptors: self
.max_file_descriptors
.ok_or(anyhow::anyhow!("missing max_file_descriptors"))?,
workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?,
pg_distrib_dir: self
.pg_distrib_dir
.ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?,
auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?,
auth_validation_public_key_path: self
.auth_validation_public_key_path
.ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?,
remote_storage_config: self
.remote_storage_config
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
})
}
}
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
@@ -221,57 +403,39 @@ impl PageServerConf {
///
/// This leaves any options not present in the file in the built-in defaults.
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
use defaults::*;
let mut conf = PageServerConf {
workdir: workdir.to_path_buf(),
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)?,
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)?,
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
pg_distrib_dir: PathBuf::new(),
auth_validation_public_key_path: None,
auth_type: AuthType::Trust,
remote_storage_config: None,
superuser: DEFAULT_SUPERUSER.to_string(),
};
let mut builder = PageServerConfigBuilder::default();
builder.workdir(workdir.to_owned());
for (key, item) in toml.iter() {
match key {
"listen_pg_addr" => conf.listen_pg_addr = parse_toml_string(key, item)?,
"listen_http_addr" => conf.listen_http_addr = parse_toml_string(key, item)?,
"checkpoint_distance" => conf.checkpoint_distance = parse_toml_u64(key, item)?,
"checkpoint_period" => conf.checkpoint_period = parse_toml_duration(key, item)?,
"gc_horizon" => conf.gc_horizon = parse_toml_u64(key, item)?,
"gc_period" => conf.gc_period = parse_toml_duration(key, item)?,
"initial_superuser_name" => conf.superuser = parse_toml_string(key, item)?,
"page_cache_size" => conf.page_cache_size = parse_toml_u64(key, item)? as usize,
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
"max_file_descriptors" => {
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
}
"pg_distrib_dir" => {
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
}
"auth_validation_public_key_path" => {
conf.auth_validation_public_key_path =
Some(PathBuf::from(parse_toml_string(key, item)?))
}
"auth_type" => conf.auth_type = parse_toml_auth_type(key, item)?,
"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
PathBuf::from(parse_toml_string(key, item)?),
)),
"auth_type" => builder.auth_type(parse_toml_auth_type(key, item)?),
"remote_storage" => {
conf.remote_storage_config = Some(Self::parse_remote_storage_config(item)?)
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
_ => bail!("unrecognized pageserver option '{}'", key),
}
}
let mut conf = builder.build().context("invalid config")?;
if conf.auth_type == AuthType::ZenithJWT {
let auth_validation_public_key_path = conf
.auth_validation_public_key_path
@@ -285,9 +449,6 @@ impl PageServerConf {
);
}
if conf.pg_distrib_dir == PathBuf::new() {
conf.pg_distrib_dir = env::current_dir()?.join("tmp_install")
};
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
@@ -382,6 +543,7 @@ impl PageServerConf {
#[cfg(test)]
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
PageServerConf {
id: ZNodeId(0),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
checkpoint_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
@@ -461,15 +623,16 @@ max_file_descriptors = 333
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
id = 10
"#;
"#;
#[test]
fn parse_defaults() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
// we have to create dummy pathes to overcome the validation errors
let config_string = format!("pg_distrib_dir='{}'", pg_distrib_dir.display());
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
let toml = config_string.parse()?;
let parsed_config =
@@ -480,6 +643,7 @@ initial_superuser_name = 'zzzz'
assert_eq!(
parsed_config,
PageServerConf {
id: ZNodeId(10),
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
@@ -521,6 +685,7 @@ initial_superuser_name = 'zzzz'
assert_eq!(
parsed_config,
PageServerConf {
id: ZNodeId(10),
listen_pg_addr: "127.0.0.1:64000".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
checkpoint_distance: 111,

View File

@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use crate::ZTenantId;
use zenith_utils::zid::ZNodeId;
#[derive(Serialize, Deserialize)]
pub struct BranchCreateRequest {
@@ -15,3 +16,8 @@ pub struct TenantCreateRequest {
#[serde(with = "hex")]
pub tenant_id: ZTenantId,
}
#[derive(Serialize)]
pub struct StatusResponse {
pub id: ZNodeId,
}

View File

@@ -17,6 +17,11 @@ paths:
application/json:
schema:
type: object
required:
- id
properties:
id:
type: integer
/v1/timeline/{tenant_id}:
parameters:
- name: tenant_id

View File

@@ -1,7 +1,6 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use serde::Serialize;
@@ -23,6 +22,7 @@ use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
use super::models::BranchCreateRequest;
use super::models::StatusResponse;
use super::models::TenantCreateRequest;
use crate::branches::BranchInfo;
use crate::repository::RepositoryTimeline;
@@ -64,12 +64,12 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
}
// healthcheck handler
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from("{}"))
.map_err(ApiError::from_err)?)
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let config = get_config(&request);
Ok(json_response(
StatusCode::OK,
StatusResponse { id: config.id },
)?)
}
async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -89,7 +89,7 @@ def test_foobar(zenith_env_builder: ZenithEnvBuilder):
# Now create the environment. This initializes the repository, and starts
# up the page server and the safekeepers
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# Run the test
...

View File

@@ -8,7 +8,7 @@ import pytest
def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.pageserver_auth_enabled = True
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
ps = env.pageserver
@@ -51,7 +51,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
env.zenith_cli.create_branch(branch, "main")

View File

@@ -19,7 +19,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# Branch at the point where only 100 rows were inserted
env.zenith_cli.create_branch("test_branch_behind", "main")

View File

@@ -11,7 +11,7 @@ from fixtures.log_helper import log
def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
pg = env.postgres.create_start('main')

View File

@@ -1,8 +1,15 @@
import json
from uuid import uuid4, UUID
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
from typing import cast
import pytest, psycopg2
import pytest
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath
# test that we cannot override node id
def test_pageserver_init_node_id(zenith_env_builder: ZenithEnvBuilder):
env = zenith_env_builder.init()
with pytest.raises(
Exception,
match="node id can only be set during pageserver init and cannot be overridden"):
env.pageserver.start(overrides=['--pageserver-config-override=id=10'])
def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
@@ -41,7 +48,7 @@ def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.pageserver_auth_enabled = True
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
management_token = env.auth_keys.generate_management_token()

View File

@@ -14,7 +14,7 @@ from fixtures.log_helper import log
# and new compute node contains all data.
def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main")
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')

View File

@@ -13,7 +13,7 @@ from fixtures.log_helper import log
def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_pageserver_restart", "main")
pg = env.postgres.create_start('test_pageserver_restart')

View File

@@ -42,7 +42,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
data_secret = 'very secret secret'
##### First start, insert secret data and upload it to the remote storage
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
pg = env.postgres.create_start()
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]

View File

@@ -13,7 +13,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
zenith_env_builder.pageserver_auth_enabled = True
if with_wal_acceptors:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_restart_compute", "main")

View File

@@ -122,7 +122,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
zenith_env_builder.num_safekeepers = 1
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# create folder for remote storage mock
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'

View File

@@ -10,7 +10,7 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
if with_wal_acceptors:
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
"""Tests tenants with and without wal acceptors"""
tenant_1 = env.create_tenant()
tenant_2 = env.create_tenant()

View File

@@ -67,7 +67,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_timeline_size_quota", "main")
client = env.pageserver.http_client()

View File

@@ -22,7 +22,7 @@ from typing import List, Optional, Any
# succeed and data is written
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main")
@@ -51,7 +51,7 @@ class BranchMetrics:
# against different timelines.
def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
n_timelines = 3
@@ -181,7 +181,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
n_acceptors = 3
zenith_env_builder.num_safekeepers = n_acceptors
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main")
pg = env.postgres.create_start('test_wal_acceptors_restarts')
@@ -218,7 +218,7 @@ def delayed_wal_acceptor_start(wa):
# When majority of acceptors is offline, commits are expected to be frozen
def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main")
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
@@ -289,7 +289,7 @@ def stop_value():
def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main")
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
@@ -404,7 +404,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
# We don't really need the full environment for this test, just the
# safekeepers would be enough.
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
timeline_id = uuid.uuid4()
tenant_id = uuid.uuid4()
@@ -454,7 +454,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_timeline_status", "main")
pg = env.postgres.create_start('test_timeline_status')
@@ -521,12 +521,7 @@ class SafekeeperEnv:
http=self.port_distributor.get_port(),
)
if self.num_safekeepers == 1:
name = "single"
else:
name = f"sk{i}"
safekeeper_dir = os.path.join(self.repo_dir, name)
safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
mkdir_if_needed(safekeeper_dir)
args = [
@@ -537,6 +532,8 @@ class SafekeeperEnv:
f"127.0.0.1:{port.http}",
"-D",
safekeeper_dir,
"--id",
str(i),
"--daemonize"
]
@@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
return ','.join(
[f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str:
return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(pg: Postgres):
with closing(pg.connect()) as conn:
@@ -628,17 +624,17 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
http_cli = sk.http_client()
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.name} status: {status}")
log.info(f"Safekeeper {sk.id} status: {status}")
except Exception as e:
log.info(f"Safekeeper {sk.name} status error: {e}")
log.info(f"Safekeeper {sk.id} status error: {e}")
zenith_env_builder.num_safekeepers = 4
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_replace_safekeeper", "main")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = ['sk1', 'sk2', 'sk3']
active_safekeepers = [1, 2, 3]
pg = env.postgres.create('test_replace_safekeeper')
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.start()
@@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
log.info("Recreate postgres to replace failed sk1 with new sk4")
pg.stop_and_destroy().create('test_replace_safekeeper')
active_safekeepers = ['sk2', 'sk3', 'sk4']
active_safekeepers = [2, 3, 4]
env.safekeepers[3].start()
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.start()

View File

@@ -200,7 +200,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w
# restart acceptors one by one, while executing and validating bank transactions
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')

View File

@@ -97,7 +97,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
# Start with single sk
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# Connect to sk port on v4 loopback
res = requests.get(f'http://127.0.0.1:{env.safekeepers[0].port.http}/v1/status')
@@ -114,7 +114,7 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder):
# Start with single sk
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# Stop default ps/sk
env.zenith_cli.pageserver_stop()

View File

@@ -27,7 +27,7 @@ from dataclasses import dataclass
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
from typing_extensions import Literal
import pytest
@@ -425,6 +425,14 @@ class ZenithEnvBuilder:
self.env = ZenithEnv(self)
return self.env
def start(self):
self.env.start()
def init_start(self) -> ZenithEnv:
env = self.init()
self.start()
return env
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
@@ -540,6 +548,7 @@ class ZenithEnv:
toml += textwrap.dedent(f"""
[pageserver]
id=1
listen_pg_addr = 'localhost:{pageserver_port.pg}'
listen_http_addr = 'localhost:{pageserver_port.http}'
auth_type = '{pageserver_auth_type}'
@@ -556,25 +565,22 @@ class ZenithEnv:
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
if config.num_safekeepers == 1:
name = "single"
else:
name = f"sk{i}"
toml += f"""
[[safekeepers]]
name = '{name}'
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster
"""
safekeeper = Safekeeper(env=self, name=name, port=port)
id = i # assign ids sequentially
toml += textwrap.dedent(f"""
[[safekeepers]]
id = {id}
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster
""")
safekeeper = Safekeeper(env=self, id=id, port=port)
self.safekeepers.append(safekeeper)
log.info(f"Config: {toml}")
self.zenith_cli.init(toml)
def start(self):
# Start up the page server and all the safekeepers
self.pageserver.start()
@@ -615,7 +621,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
env = builder.init()
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
env.zenith_cli.create_branch("empty", "main")
@@ -649,7 +655,7 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB
To use, define 'zenith_env_builder' fixture in your test to get access to the
builder object. Set properties on it to describe the environment.
Finally, initialize and start up the environment by calling
zenith_env_builder.init().
zenith_env_builder.init_start().
After the initialization, you can launch compute nodes by calling
the functions in the 'env.postgres' factory object, stop/start the
@@ -835,8 +841,9 @@ class ZenithCli:
return self.raw_cli(cmd)
def pageserver_start(self) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start']
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start', *overrides]
append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
return self.raw_cli(start_args)
@@ -848,17 +855,17 @@ class ZenithCli:
log.info(f"Stopping pageserver with {cmd}")
return self.raw_cli(cmd)
def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', name])
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', str(id)])
def safekeeper_stop(self,
name: Optional[str] = None,
id: Optional[int] = None,
immediate=False) -> 'subprocess.CompletedProcess[str]':
args = ['safekeeper', 'stop']
if id is not None:
args.extend(str(id))
if immediate:
args.extend(['-m', 'immediate'])
if name is not None:
args.append(name)
return self.raw_cli(args)
def pg_create(
@@ -989,14 +996,14 @@ class ZenithPageserver(PgProtocol):
self.service_port = port # do not shadow PgProtocol.port which is just int
self.remote_storage = remote_storage
def start(self) -> 'ZenithPageserver':
def start(self, overrides=()) -> 'ZenithPageserver':
"""
Start the page server.
Returns self.
"""
assert self.running == False
self.env.zenith_cli.pageserver_start()
self.env.zenith_cli.pageserver_start(overrides=overrides)
self.running = True
return self
@@ -1397,12 +1404,14 @@ class Safekeeper:
""" An object representing a running safekeeper daemon. """
env: ZenithEnv
port: SafekeeperPort
name: str # identifier for logging
id: int
auth_token: Optional[str] = None
running: bool = False
def start(self) -> 'Safekeeper':
self.env.zenith_cli.safekeeper_start(self.name)
assert self.running == False
self.env.zenith_cli.safekeeper_start(self.id)
self.running = True
# wait for wal acceptor start by checking its status
started_at = time.time()
while True:
@@ -1420,8 +1429,9 @@ class Safekeeper:
return self
def stop(self, immediate=False) -> 'Safekeeper':
log.info('Stopping safekeeper {}'.format(self.name))
self.env.zenith_cli.safekeeper_stop(self.name, immediate)
log.info('Stopping safekeeper {}'.format(self.id))
self.env.zenith_cli.safekeeper_stop(self.id, immediate)
self.running = False
return self
def append_logical_message(self,

View File

@@ -23,7 +23,7 @@ def test_bulk_tenant_create(
"""Measure tenant creation time (with and without wal acceptors)"""
if use_wal_acceptors == 'with_wa':
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
time_slices = []

View File

@@ -1,17 +1,19 @@
//
// Main entry point for the safekeeper executable
//
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use fs2::FileExt;
use std::fs::File;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::control_file::{self, CreateControlFile};
use zenith_utils::http::endpoint;
use zenith_utils::zid::ZNodeId;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use tokio::sync::mpsc;
@@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const ID_FILE_NAME: &str = "safekeeper.id";
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -93,6 +96,9 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Dump control file at path specifed by this argument and exit"),
)
.arg(
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -136,10 +142,19 @@ fn main() -> Result<()> {
conf.recall_period = humantime::parse_duration(recall)?;
}
start_safekeeper(conf)
let mut given_id = None;
if let Some(given_id_str) = arg_matches.value_of("id") {
given_id = Some(ZNodeId(
given_id_str
.parse()
.context("failed to parse safekeeper id")?,
));
}
start_safekeeper(conf, given_id)
}
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {}", GIT_VERSION);
@@ -154,6 +169,9 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
)
})?;
// Set or read our ID.
set_id(&mut conf, given_id)?;
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
@@ -260,3 +278,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
std::process::exit(111);
})
}
/// Determine safekeeper id and set it in config.
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
let id_file_path = conf.workdir.join(ID_FILE_NAME);
let my_id: ZNodeId;
// If ID exists, read it in; otherwise set one passed
match fs::read(&id_file_path) {
Ok(id_serialized) => {
my_id = ZNodeId(
std::str::from_utf8(&id_serialized)
.context("failed to parse safekeeper id")?
.parse()
.context("failed to parse safekeeper id")?,
);
if let Some(given_id) = given_id {
if given_id != my_id {
bail!(
"safekeeper already initialized with id {}, can't set {}",
my_id,
given_id
);
}
}
info!("safekeeper ID {}", my_id);
}
Err(error) => match error.kind() {
ErrorKind::NotFound => {
my_id = if let Some(given_id) = given_id {
given_id
} else {
bail!("safekeeper id is not specified");
};
let mut f = File::create(&id_file_path)?;
f.write_all(my_id.to_string().as_bytes())?;
f.sync_all()?;
info!("initialized safekeeper ID {}", my_id);
}
_ => {
return Err(error.into());
}
},
}
conf.my_id = my_id;
Ok(())
}

View File

@@ -5,6 +5,7 @@ use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZNodeId;
use zenith_utils::zid::ZTenantTimelineId;
use crate::control_file::CreateControlFile;
@@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response;
use zenith_utils::http::request::parse_request_param;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: ZNodeId,
}
/// Healthcheck handler.
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let conf = get_conf(&request);
let status = SafekeeperStatus { id: conf.my_id };
Ok(json_response(StatusCode::OK, status)?)
}
fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {

View File

@@ -2,7 +2,7 @@
use std::path::PathBuf;
use std::time::Duration;
use zenith_utils::zid::ZTenantTimelineId;
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
pub mod callmemaybe;
pub mod control_file;
@@ -46,6 +46,7 @@ pub struct SafeKeeperConf {
pub listen_http_addr: String,
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub my_id: ZNodeId,
}
impl SafeKeeperConf {
@@ -69,6 +70,7 @@ impl Default for SafeKeeperConf {
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
}
}
}

View File

@@ -18,32 +18,35 @@ use walkeeper::defaults::{
};
use zenith_utils::auth::{Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use zenith_utils::GIT_VERSION;
use pageserver::branches::BranchInfo;
// Default name of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_NAME: &str = "single";
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
fn default_conf() -> String {
format!(
r#"
# Default built-in configuration, defined in main.rs
[pageserver]
id = {pageserver_id}
listen_pg_addr = '{pageserver_pg_addr}'
listen_http_addr = '{pageserver_http_addr}'
auth_type = '{pageserver_auth_type}'
[[safekeepers]]
name = '{safekeeper_name}'
id = {safekeeper_id}
pg_port = {safekeeper_pg_port}
http_port = {safekeeper_http_port}
"#,
pageserver_id = DEFAULT_PAGESERVER_ID,
pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
pageserver_auth_type = AuthType::Trust,
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
safekeeper_id = DEFAULT_SAFEKEEPER_ID,
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
)
@@ -74,9 +77,9 @@ fn main() -> Result<()> {
.required(true);
#[rustfmt::skip]
let safekeeper_node_arg = Arg::new("node")
let safekeeper_id_arg = Arg::new("id")
.index(1)
.help("Node name")
.help("safekeeper id")
.required(false);
let timeline_arg = Arg::new("timeline")
@@ -154,16 +157,16 @@ fn main() -> Result<()> {
.about("Manage safekeepers")
.subcommand(App::new("start")
.about("Start local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
)
.subcommand(App::new("stop")
.about("Stop local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
.subcommand(App::new("restart")
.about("Restart local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
)
@@ -628,11 +631,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
Ok(SafekeeperNode::from_env(env, node))
} else {
bail!("could not find safekeeper '{}'", name)
bail!("could not find safekeeper '{}'", id)
}
}
@@ -643,8 +646,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
};
// All the commands take an optional safekeeper name argument
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
let sk_id = if let Some(id_str) = sub_args.value_of("id") {
ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
} else {
DEFAULT_SAFEKEEPER_ID
};
let safekeeper = get_safekeeper(env, sk_id)?;
match sub_name {
"start" => {
@@ -697,7 +704,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start() {
eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
exit(1);
}
}
@@ -724,7 +731,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
}
}
Ok(())

View File

@@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId {
}
}
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ZNodeId(pub u64);
impl fmt::Display for ZNodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use std::fmt::Display;