mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
add node id to pageserver (#1310)
* Add --id argument to safekeeper setting its unique u64 id. In preparation for storage node messaging. IDs are supposed to be monotonically assigned by the console. In tests it is issued by ZenithEnv; at the zenith cli level and fixtures, string name is completely replaced by integer id. Example TOML configs are adjusted accordingly. Sequential ids are chosen over Zid mainly because they are compact and easy to type/remember. * add node id to pageserver This adds node id parameter to pageserver configuration. Also I use a simple builder to construct pageserver config struct to avoid setting node id to some temporary invalid value. Some of the changes in test fixtures are needed to split init and start operations for envrionment. Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
This commit is contained in:
@@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898'
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk1'
|
||||
id = 1
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk2'
|
||||
id = 2
|
||||
pg_port = 5455
|
||||
http_port = 7677
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'sk3'
|
||||
id = 3
|
||||
pg_port = 5456
|
||||
http_port = 7678
|
||||
|
||||
@@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898'
|
||||
auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
name = 'single'
|
||||
id = 1
|
||||
pg_port = 5454
|
||||
http_port = 7676
|
||||
|
||||
@@ -12,7 +12,9 @@ use std::path::{Path, PathBuf};
|
||||
use std::process::{Command, Stdio};
|
||||
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{opt_display_serde, ZTenantId};
|
||||
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
|
||||
|
||||
use crate::safekeeper::SafekeeperNode;
|
||||
|
||||
//
|
||||
// This data structures represents zenith CLI config
|
||||
@@ -62,6 +64,8 @@ pub struct LocalEnv {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct PageServerConf {
|
||||
// node id
|
||||
pub id: ZNodeId,
|
||||
// Pageserver connection settings
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
@@ -76,6 +80,7 @@ pub struct PageServerConf {
|
||||
impl Default for PageServerConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
id: ZNodeId(0),
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
auth_type: AuthType::Trust,
|
||||
@@ -87,7 +92,7 @@ impl Default for PageServerConf {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct SafekeeperConf {
|
||||
pub name: String,
|
||||
pub id: ZNodeId,
|
||||
pub pg_port: u16,
|
||||
pub http_port: u16,
|
||||
pub sync: bool,
|
||||
@@ -96,7 +101,7 @@ pub struct SafekeeperConf {
|
||||
impl Default for SafekeeperConf {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
name: String::new(),
|
||||
id: ZNodeId(0),
|
||||
pg_port: 0,
|
||||
http_port: 0,
|
||||
sync: true,
|
||||
@@ -136,8 +141,8 @@ impl LocalEnv {
|
||||
self.base_data_dir.clone()
|
||||
}
|
||||
|
||||
pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
|
||||
self.base_data_dir.join("safekeepers").join(node_name)
|
||||
pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
|
||||
self.base_data_dir.join("safekeepers").join(data_dir_name)
|
||||
}
|
||||
|
||||
/// Create a LocalEnv from a config file.
|
||||
@@ -285,7 +290,7 @@ impl LocalEnv {
|
||||
fs::create_dir_all(self.pg_data_dirs_path())?;
|
||||
|
||||
for safekeeper in &self.safekeepers {
|
||||
fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
|
||||
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
|
||||
}
|
||||
|
||||
let mut conf_content = String::new();
|
||||
|
||||
@@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use zenith_utils::http::error::HttpErrorBody;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
use crate::storage::PageServerNode;
|
||||
@@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
//
|
||||
#[derive(Debug)]
|
||||
pub struct SafekeeperNode {
|
||||
pub name: String,
|
||||
pub id: ZNodeId,
|
||||
|
||||
pub conf: SafekeeperConf,
|
||||
|
||||
@@ -77,10 +78,10 @@ impl SafekeeperNode {
|
||||
pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
|
||||
let pageserver = Arc::new(PageServerNode::from_env(env));
|
||||
|
||||
println!("initializing for {} for {}", conf.name, conf.http_port);
|
||||
println!("initializing for sk {} for {}", conf.id, conf.http_port);
|
||||
|
||||
SafekeeperNode {
|
||||
name: conf.name.clone(),
|
||||
id: conf.id,
|
||||
conf: conf.clone(),
|
||||
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
|
||||
env: env.clone(),
|
||||
@@ -98,8 +99,12 @@ impl SafekeeperNode {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
|
||||
env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
|
||||
}
|
||||
|
||||
pub fn datadir_path(&self) -> PathBuf {
|
||||
self.env.safekeeper_data_dir(&self.name)
|
||||
SafekeeperNode::datadir_path_by_id(&self.env, self.id)
|
||||
}
|
||||
|
||||
pub fn pid_file(&self) -> PathBuf {
|
||||
@@ -120,6 +125,7 @@ impl SafekeeperNode {
|
||||
let mut cmd = Command::new(self.env.safekeeper_bin()?);
|
||||
fill_rust_env_vars(
|
||||
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
|
||||
.args(&["--id", self.id.to_string().as_ref()])
|
||||
.args(&["--listen-pg", &listen_pg])
|
||||
.args(&["--listen-http", &listen_http])
|
||||
.args(&["--recall", "1 second"])
|
||||
@@ -183,7 +189,7 @@ impl SafekeeperNode {
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
let pid_file = self.pid_file();
|
||||
if !pid_file.exists() {
|
||||
println!("Safekeeper {} is already stopped", self.name);
|
||||
println!("Safekeeper {} is already stopped", self.id);
|
||||
return Ok(());
|
||||
}
|
||||
let pid = read_pidfile(&pid_file)?;
|
||||
|
||||
@@ -103,6 +103,8 @@ impl PageServerNode {
|
||||
) -> anyhow::Result<()> {
|
||||
let mut cmd = Command::new(self.env.pageserver_bin()?);
|
||||
|
||||
let id = format!("id={}", self.env.pageserver.id);
|
||||
|
||||
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
|
||||
let base_data_dir_param = self.env.base_data_dir.display().to_string();
|
||||
let pg_distrib_dir_param =
|
||||
@@ -122,6 +124,7 @@ impl PageServerNode {
|
||||
args.extend(["-c", &authg_type_param]);
|
||||
args.extend(["-c", &listen_http_addr_param]);
|
||||
args.extend(["-c", &listen_pg_addr_param]);
|
||||
args.extend(["-c", &id]);
|
||||
|
||||
for config_override in config_overrides {
|
||||
args.extend(["-c", config_override]);
|
||||
|
||||
@@ -4,7 +4,7 @@ set -eux
|
||||
if [ "$1" = 'pageserver' ]; then
|
||||
if [ ! -d "/data/tenants" ]; then
|
||||
echo "Initializing pageserver data directory"
|
||||
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'"
|
||||
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
|
||||
fi
|
||||
echo "Staring pageserver at 0.0.0.0:6400"
|
||||
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data
|
||||
|
||||
@@ -61,7 +61,7 @@ fn main() -> Result<()> {
|
||||
.number_of_values(1)
|
||||
.multiple_occurrences(true)
|
||||
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there).
|
||||
Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"),
|
||||
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
@@ -115,7 +115,14 @@ fn main() -> Result<()> {
|
||||
option_line
|
||||
)
|
||||
})?;
|
||||
|
||||
for (key, item) in doc.iter() {
|
||||
if key == "id" {
|
||||
anyhow::ensure!(
|
||||
init,
|
||||
"node id can only be set during pageserver init and cannot be overridden"
|
||||
);
|
||||
}
|
||||
toml.insert(key, item.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use toml_edit;
|
||||
use toml_edit::{Document, Item};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
|
||||
|
||||
use std::convert::TryInto;
|
||||
use std::env;
|
||||
@@ -78,6 +78,10 @@ pub mod defaults {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PageServerConf {
|
||||
// Identifier of that particular pageserver so e g safekeepers
|
||||
// can safely distinguish different pageservers
|
||||
pub id: ZNodeId,
|
||||
|
||||
/// Example (default): 127.0.0.1:64000
|
||||
pub listen_pg_addr: String,
|
||||
/// Example (default): 127.0.0.1:9898
|
||||
@@ -118,6 +122,206 @@ pub struct PageServerConf {
|
||||
pub remote_storage_config: Option<RemoteStorageConfig>,
|
||||
}
|
||||
|
||||
// use dedicated enum for builder to better indicate the intention
|
||||
// and avoid possible confusion with nested options
|
||||
pub enum BuilderValue<T> {
|
||||
Set(T),
|
||||
NotSet,
|
||||
}
|
||||
|
||||
impl<T> BuilderValue<T> {
|
||||
pub fn ok_or<E>(self, err: E) -> Result<T, E> {
|
||||
match self {
|
||||
Self::Set(v) => Ok(v),
|
||||
Self::NotSet => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
|
||||
listen_http_addr: BuilderValue<String>,
|
||||
|
||||
checkpoint_distance: BuilderValue<u64>,
|
||||
checkpoint_period: BuilderValue<Duration>,
|
||||
|
||||
gc_horizon: BuilderValue<u64>,
|
||||
gc_period: BuilderValue<Duration>,
|
||||
|
||||
wait_lsn_timeout: BuilderValue<Duration>,
|
||||
wal_redo_timeout: BuilderValue<Duration>,
|
||||
|
||||
superuser: BuilderValue<String>,
|
||||
|
||||
page_cache_size: BuilderValue<usize>,
|
||||
max_file_descriptors: BuilderValue<usize>,
|
||||
|
||||
workdir: BuilderValue<PathBuf>,
|
||||
|
||||
pg_distrib_dir: BuilderValue<PathBuf>,
|
||||
|
||||
auth_type: BuilderValue<AuthType>,
|
||||
|
||||
//
|
||||
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
|
||||
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
|
||||
|
||||
id: BuilderValue<ZNodeId>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
fn default() -> Self {
|
||||
use self::BuilderValue::*;
|
||||
use defaults::*;
|
||||
Self {
|
||||
listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE),
|
||||
checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)
|
||||
.expect("cannot parse default checkpoint period")),
|
||||
gc_horizon: Set(DEFAULT_GC_HORIZON),
|
||||
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period")),
|
||||
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
|
||||
.expect("cannot parse default wait lsn timeout")),
|
||||
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: Set(DEFAULT_SUPERUSER.to_string()),
|
||||
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
workdir: Set(PathBuf::new()),
|
||||
pg_distrib_dir: Set(env::current_dir()
|
||||
.expect("cannot access current directory")
|
||||
.join("tmp_install")),
|
||||
auth_type: Set(AuthType::Trust),
|
||||
auth_validation_public_key_path: Set(None),
|
||||
remote_storage_config: Set(None),
|
||||
id: NotSet,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageServerConfigBuilder {
|
||||
pub fn listen_pg_addr(&mut self, listen_pg_addr: String) {
|
||||
self.listen_pg_addr = BuilderValue::Set(listen_pg_addr)
|
||||
}
|
||||
|
||||
pub fn listen_http_addr(&mut self, listen_http_addr: String) {
|
||||
self.listen_http_addr = BuilderValue::Set(listen_http_addr)
|
||||
}
|
||||
|
||||
pub fn checkpoint_distance(&mut self, checkpoint_distance: u64) {
|
||||
self.checkpoint_distance = BuilderValue::Set(checkpoint_distance)
|
||||
}
|
||||
|
||||
pub fn checkpoint_period(&mut self, checkpoint_period: Duration) {
|
||||
self.checkpoint_period = BuilderValue::Set(checkpoint_period)
|
||||
}
|
||||
|
||||
pub fn gc_horizon(&mut self, gc_horizon: u64) {
|
||||
self.gc_horizon = BuilderValue::Set(gc_horizon)
|
||||
}
|
||||
|
||||
pub fn gc_period(&mut self, gc_period: Duration) {
|
||||
self.gc_period = BuilderValue::Set(gc_period)
|
||||
}
|
||||
|
||||
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
|
||||
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
|
||||
}
|
||||
|
||||
pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
|
||||
self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
|
||||
}
|
||||
|
||||
pub fn superuser(&mut self, superuser: String) {
|
||||
self.superuser = BuilderValue::Set(superuser)
|
||||
}
|
||||
|
||||
pub fn page_cache_size(&mut self, page_cache_size: usize) {
|
||||
self.page_cache_size = BuilderValue::Set(page_cache_size)
|
||||
}
|
||||
|
||||
pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) {
|
||||
self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
|
||||
}
|
||||
|
||||
pub fn workdir(&mut self, workdir: PathBuf) {
|
||||
self.workdir = BuilderValue::Set(workdir)
|
||||
}
|
||||
|
||||
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
|
||||
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
|
||||
}
|
||||
|
||||
pub fn auth_type(&mut self, auth_type: AuthType) {
|
||||
self.auth_type = BuilderValue::Set(auth_type)
|
||||
}
|
||||
|
||||
pub fn auth_validation_public_key_path(
|
||||
&mut self,
|
||||
auth_validation_public_key_path: Option<PathBuf>,
|
||||
) {
|
||||
self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
|
||||
}
|
||||
|
||||
pub fn remote_storage_config(&mut self, remote_storage_config: Option<RemoteStorageConfig>) {
|
||||
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
|
||||
}
|
||||
|
||||
pub fn id(&mut self, node_id: ZNodeId) {
|
||||
self.id = BuilderValue::Set(node_id)
|
||||
}
|
||||
|
||||
pub fn build(self) -> Result<PageServerConf> {
|
||||
Ok(PageServerConf {
|
||||
listen_pg_addr: self
|
||||
.listen_pg_addr
|
||||
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?,
|
||||
listen_http_addr: self
|
||||
.listen_http_addr
|
||||
.ok_or(anyhow::anyhow!("missing listen_http_addr"))?,
|
||||
checkpoint_distance: self
|
||||
.checkpoint_distance
|
||||
.ok_or(anyhow::anyhow!("missing checkpoint_distance"))?,
|
||||
checkpoint_period: self
|
||||
.checkpoint_period
|
||||
.ok_or(anyhow::anyhow!("missing checkpoint_period"))?,
|
||||
gc_horizon: self
|
||||
.gc_horizon
|
||||
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
|
||||
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
|
||||
wait_lsn_timeout: self
|
||||
.wait_lsn_timeout
|
||||
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
|
||||
wal_redo_timeout: self
|
||||
.wal_redo_timeout
|
||||
.ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?,
|
||||
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
|
||||
page_cache_size: self
|
||||
.page_cache_size
|
||||
.ok_or(anyhow::anyhow!("missing page_cache_size"))?,
|
||||
max_file_descriptors: self
|
||||
.max_file_descriptors
|
||||
.ok_or(anyhow::anyhow!("missing max_file_descriptors"))?,
|
||||
workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?,
|
||||
pg_distrib_dir: self
|
||||
.pg_distrib_dir
|
||||
.ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?,
|
||||
auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?,
|
||||
auth_validation_public_key_path: self
|
||||
.auth_validation_public_key_path
|
||||
.ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?,
|
||||
remote_storage_config: self
|
||||
.remote_storage_config
|
||||
.ok_or(anyhow::anyhow!("missing remote_storage_config"))?,
|
||||
id: self.id.ok_or(anyhow::anyhow!("missing id"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteStorageConfig {
|
||||
@@ -233,61 +437,41 @@ impl PageServerConf {
|
||||
///
|
||||
/// This leaves any options not present in the file in the built-in defaults.
|
||||
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
|
||||
use defaults::*;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
workdir: workdir.to_path_buf(),
|
||||
|
||||
listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_period: humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)?,
|
||||
gc_horizon: DEFAULT_GC_HORIZON,
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)?,
|
||||
wait_lsn_timeout: humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)?,
|
||||
wal_redo_timeout: humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)?,
|
||||
page_cache_size: DEFAULT_PAGE_CACHE_SIZE,
|
||||
max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS,
|
||||
|
||||
pg_distrib_dir: PathBuf::new(),
|
||||
auth_validation_public_key_path: None,
|
||||
auth_type: AuthType::Trust,
|
||||
|
||||
remote_storage_config: None,
|
||||
|
||||
superuser: DEFAULT_SUPERUSER.to_string(),
|
||||
};
|
||||
let mut builder = PageServerConfigBuilder::default();
|
||||
builder.workdir(workdir.to_owned());
|
||||
|
||||
for (key, item) in toml.iter() {
|
||||
match key {
|
||||
"listen_pg_addr" => conf.listen_pg_addr = parse_toml_string(key, item)?,
|
||||
"listen_http_addr" => conf.listen_http_addr = parse_toml_string(key, item)?,
|
||||
"checkpoint_distance" => conf.checkpoint_distance = parse_toml_u64(key, item)?,
|
||||
"checkpoint_period" => conf.checkpoint_period = parse_toml_duration(key, item)?,
|
||||
"gc_horizon" => conf.gc_horizon = parse_toml_u64(key, item)?,
|
||||
"gc_period" => conf.gc_period = parse_toml_duration(key, item)?,
|
||||
"wait_lsn_timeout" => conf.wait_lsn_timeout = parse_toml_duration(key, item)?,
|
||||
"wal_redo_timeout" => conf.wal_redo_timeout = parse_toml_duration(key, item)?,
|
||||
"initial_superuser_name" => conf.superuser = parse_toml_string(key, item)?,
|
||||
"page_cache_size" => conf.page_cache_size = parse_toml_u64(key, item)? as usize,
|
||||
"listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?),
|
||||
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
|
||||
"checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?),
|
||||
"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
|
||||
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
|
||||
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
|
||||
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
|
||||
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
|
||||
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
|
||||
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
|
||||
"max_file_descriptors" => {
|
||||
conf.max_file_descriptors = parse_toml_u64(key, item)? as usize
|
||||
builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
|
||||
}
|
||||
"pg_distrib_dir" => {
|
||||
conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?)
|
||||
builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
|
||||
}
|
||||
"auth_validation_public_key_path" => {
|
||||
conf.auth_validation_public_key_path =
|
||||
Some(PathBuf::from(parse_toml_string(key, item)?))
|
||||
}
|
||||
"auth_type" => conf.auth_type = parse_toml_auth_type(key, item)?,
|
||||
"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
|
||||
PathBuf::from(parse_toml_string(key, item)?),
|
||||
)),
|
||||
"auth_type" => builder.auth_type(parse_toml_auth_type(key, item)?),
|
||||
"remote_storage" => {
|
||||
conf.remote_storage_config = Some(Self::parse_remote_storage_config(item)?)
|
||||
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
|
||||
}
|
||||
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
|
||||
_ => bail!("unrecognized pageserver option '{}'", key),
|
||||
}
|
||||
}
|
||||
|
||||
let mut conf = builder.build().context("invalid config")?;
|
||||
|
||||
if conf.auth_type == AuthType::ZenithJWT {
|
||||
let auth_validation_public_key_path = conf
|
||||
.auth_validation_public_key_path
|
||||
@@ -301,9 +485,6 @@ impl PageServerConf {
|
||||
);
|
||||
}
|
||||
|
||||
if conf.pg_distrib_dir == PathBuf::new() {
|
||||
conf.pg_distrib_dir = env::current_dir()?.join("tmp_install")
|
||||
};
|
||||
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
|
||||
bail!(
|
||||
"Can't find postgres binary at {}",
|
||||
@@ -398,6 +579,7 @@ impl PageServerConf {
|
||||
#[cfg(test)]
|
||||
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
|
||||
PageServerConf {
|
||||
id: ZNodeId(0),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
checkpoint_period: Duration::from_secs(10),
|
||||
gc_horizon: defaults::DEFAULT_GC_HORIZON,
|
||||
@@ -482,15 +664,16 @@ max_file_descriptors = 333
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
initial_superuser_name = 'zzzz'
|
||||
id = 10
|
||||
|
||||
"#;
|
||||
"#;
|
||||
|
||||
#[test]
|
||||
fn parse_defaults() -> anyhow::Result<()> {
|
||||
let tempdir = tempdir()?;
|
||||
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
|
||||
// we have to create dummy pathes to overcome the validation errors
|
||||
let config_string = format!("pg_distrib_dir='{}'", pg_distrib_dir.display());
|
||||
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
|
||||
let toml = config_string.parse()?;
|
||||
|
||||
let parsed_config =
|
||||
@@ -501,6 +684,7 @@ initial_superuser_name = 'zzzz'
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
PageServerConf {
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
|
||||
@@ -544,6 +728,7 @@ initial_superuser_name = 'zzzz'
|
||||
assert_eq!(
|
||||
parsed_config,
|
||||
PageServerConf {
|
||||
id: ZNodeId(10),
|
||||
listen_pg_addr: "127.0.0.1:64000".to_string(),
|
||||
listen_http_addr: "127.0.0.1:9898".to_string(),
|
||||
checkpoint_distance: 111,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::ZTenantId;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct BranchCreateRequest {
|
||||
@@ -15,3 +16,8 @@ pub struct TenantCreateRequest {
|
||||
#[serde(with = "hex")]
|
||||
pub tenant_id: ZTenantId,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct StatusResponse {
|
||||
pub id: ZNodeId,
|
||||
}
|
||||
|
||||
@@ -17,6 +17,11 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- id
|
||||
properties:
|
||||
id:
|
||||
type: integer
|
||||
/v1/timeline/{tenant_id}:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use serde::Serialize;
|
||||
@@ -23,6 +22,7 @@ use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
|
||||
|
||||
use super::models::BranchCreateRequest;
|
||||
use super::models::StatusResponse;
|
||||
use super::models::TenantCreateRequest;
|
||||
use crate::branches::BranchInfo;
|
||||
use crate::repository::RepositoryTimeline;
|
||||
@@ -64,12 +64,12 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
|
||||
}
|
||||
|
||||
// healthcheck handler
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.body(Body::from("{}"))
|
||||
.map_err(ApiError::from_err)?)
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let config = get_config(&request);
|
||||
Ok(json_response(
|
||||
StatusCode::OK,
|
||||
StatusResponse { id: config.id },
|
||||
)?)
|
||||
}
|
||||
|
||||
async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
@@ -89,7 +89,7 @@ def test_foobar(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
# Now create the environment. This initializes the repository, and starts
|
||||
# up the page server and the safekeepers
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Run the test
|
||||
...
|
||||
|
||||
@@ -8,7 +8,7 @@ import pytest
|
||||
|
||||
def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
ps = env.pageserver
|
||||
|
||||
@@ -51,7 +51,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}"
|
||||
env.zenith_cli.create_branch(branch, "main")
|
||||
|
||||
@@ -93,7 +93,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
|
||||
|
||||
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
# Create a branch for us
|
||||
env.zenith_cli.create_branch("test_backpressure", "main")
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder):
|
||||
#
|
||||
# See https://github.com/zenithdb/zenith/issues/1068
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
env.zenith_cli.create_branch("test_branch_behind", "main")
|
||||
|
||||
@@ -11,7 +11,7 @@ from fixtures.log_helper import log
|
||||
def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
|
||||
# One safekeeper is enough for this test.
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
pg = env.postgres.create_start('main')
|
||||
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import json
|
||||
from uuid import uuid4, UUID
|
||||
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient
|
||||
from typing import cast
|
||||
import pytest, psycopg2
|
||||
import pytest
|
||||
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath
|
||||
|
||||
|
||||
# test that we cannot override node id
|
||||
def test_pageserver_init_node_id(zenith_env_builder: ZenithEnvBuilder):
|
||||
env = zenith_env_builder.init()
|
||||
with pytest.raises(
|
||||
Exception,
|
||||
match="node id can only be set during pageserver init and cannot be overridden"):
|
||||
env.pageserver.start(overrides=['--pageserver-config-override=id=10'])
|
||||
|
||||
|
||||
def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
|
||||
@@ -41,7 +48,7 @@ def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
|
||||
|
||||
def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
management_token = env.auth_keys.generate_management_token()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ from fixtures.log_helper import log
|
||||
# and new compute node contains all data.
|
||||
def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main")
|
||||
pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down')
|
||||
|
||||
@@ -13,7 +13,7 @@ from fixtures.log_helper import log
|
||||
def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
|
||||
# One safekeeper is enough for this test.
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_pageserver_restart", "main")
|
||||
pg = env.postgres.create_start('test_pageserver_restart')
|
||||
|
||||
@@ -42,7 +42,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
|
||||
data_secret = 'very secret secret'
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
pg = env.postgres.create_start()
|
||||
|
||||
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
|
||||
|
||||
@@ -13,7 +13,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, with_wal_acceptor
|
||||
zenith_env_builder.pageserver_auth_enabled = True
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_restart_compute", "main")
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
zenith_env_builder.enable_local_fs_remote_storage()
|
||||
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# create folder for remote storage mock
|
||||
remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage'
|
||||
|
||||
@@ -10,7 +10,7 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce
|
||||
if with_wal_acceptors:
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
"""Tests tenants with and without wal acceptors"""
|
||||
tenant_1 = env.create_tenant()
|
||||
tenant_2 = env.create_tenant()
|
||||
|
||||
@@ -67,7 +67,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
|
||||
|
||||
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.zenith_cli.create_branch("test_timeline_size_quota", "main")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -22,7 +22,7 @@ from typing import List, Optional, Any
|
||||
# succeed and data is written
|
||||
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main")
|
||||
|
||||
@@ -51,7 +51,7 @@ class BranchMetrics:
|
||||
# against different timelines.
|
||||
def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
n_timelines = 3
|
||||
|
||||
@@ -181,7 +181,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder):
|
||||
n_acceptors = 3
|
||||
|
||||
zenith_env_builder.num_safekeepers = n_acceptors
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_restarts')
|
||||
@@ -218,7 +218,7 @@ def delayed_wal_acceptor_start(wa):
|
||||
# When majority of acceptors is offline, commits are expected to be frozen
|
||||
def test_unavailability(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 2
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_unavailability')
|
||||
@@ -289,7 +289,7 @@ def stop_value():
|
||||
def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
|
||||
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_race_conditions')
|
||||
@@ -404,7 +404,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
|
||||
# We don't really need the full environment for this test, just the
|
||||
# safekeepers would be enough.
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
timeline_id = uuid.uuid4()
|
||||
tenant_id = uuid.uuid4()
|
||||
@@ -454,7 +454,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
|
||||
def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_timeline_status", "main")
|
||||
pg = env.postgres.create_start('test_timeline_status')
|
||||
@@ -521,12 +521,7 @@ class SafekeeperEnv:
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if self.num_safekeepers == 1:
|
||||
name = "single"
|
||||
else:
|
||||
name = f"sk{i}"
|
||||
|
||||
safekeeper_dir = os.path.join(self.repo_dir, name)
|
||||
safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
|
||||
mkdir_if_needed(safekeeper_dir)
|
||||
|
||||
args = [
|
||||
@@ -537,6 +532,8 @@ class SafekeeperEnv:
|
||||
f"127.0.0.1:{port.http}",
|
||||
"-D",
|
||||
safekeeper_dir,
|
||||
"--id",
|
||||
str(i),
|
||||
"--daemonize"
|
||||
]
|
||||
|
||||
@@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
|
||||
|
||||
|
||||
def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
|
||||
return ','.join(
|
||||
[f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
|
||||
def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str:
|
||||
return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names])
|
||||
|
||||
def execute_payload(pg: Postgres):
|
||||
with closing(pg.connect()) as conn:
|
||||
@@ -628,17 +624,17 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
http_cli = sk.http_client()
|
||||
try:
|
||||
status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"Safekeeper {sk.name} status: {status}")
|
||||
log.info(f"Safekeeper {sk.id} status: {status}")
|
||||
except Exception as e:
|
||||
log.info(f"Safekeeper {sk.name} status error: {e}")
|
||||
log.info(f"Safekeeper {sk.id} status error: {e}")
|
||||
|
||||
zenith_env_builder.num_safekeepers = 4
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
env.zenith_cli.create_branch("test_replace_safekeeper", "main")
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
active_safekeepers = ['sk1', 'sk2', 'sk3']
|
||||
active_safekeepers = [1, 2, 3]
|
||||
pg = env.postgres.create('test_replace_safekeeper')
|
||||
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
|
||||
pg.start()
|
||||
@@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
log.info("Recreate postgres to replace failed sk1 with new sk4")
|
||||
pg.stop_and_destroy().create('test_replace_safekeeper')
|
||||
active_safekeepers = ['sk2', 'sk3', 'sk4']
|
||||
active_safekeepers = [2, 3, 4]
|
||||
env.safekeepers[3].start()
|
||||
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
|
||||
pg.start()
|
||||
|
||||
@@ -200,7 +200,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w
|
||||
# restart acceptors one by one, while executing and validating bank transactions
|
||||
def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main")
|
||||
pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load')
|
||||
|
||||
@@ -97,7 +97,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
|
||||
def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
|
||||
# Start with single sk
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Connect to sk port on v4 loopback
|
||||
res = requests.get(f'http://127.0.0.1:{env.safekeepers[0].port.http}/v1/status')
|
||||
@@ -114,7 +114,7 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
|
||||
def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder):
|
||||
# Start with single sk
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
# Stop default ps/sk
|
||||
env.zenith_cli.pageserver_stop()
|
||||
|
||||
@@ -27,7 +27,7 @@ from dataclasses import dataclass
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
|
||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple
|
||||
from typing_extensions import Literal
|
||||
import pytest
|
||||
|
||||
@@ -434,6 +434,14 @@ class ZenithEnvBuilder:
|
||||
self.env = ZenithEnv(self)
|
||||
return self.env
|
||||
|
||||
def start(self):
|
||||
self.env.start()
|
||||
|
||||
def init_start(self) -> ZenithEnv:
|
||||
env = self.init()
|
||||
self.start()
|
||||
return env
|
||||
|
||||
"""
|
||||
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
|
||||
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
|
||||
@@ -549,6 +557,7 @@ class ZenithEnv:
|
||||
|
||||
toml += textwrap.dedent(f"""
|
||||
[pageserver]
|
||||
id=1
|
||||
listen_pg_addr = 'localhost:{pageserver_port.pg}'
|
||||
listen_http_addr = 'localhost:{pageserver_port.http}'
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
@@ -566,25 +575,22 @@ class ZenithEnv:
|
||||
pg=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if config.num_safekeepers == 1:
|
||||
name = "single"
|
||||
else:
|
||||
name = f"sk{i}"
|
||||
toml += f"""
|
||||
[[safekeepers]]
|
||||
name = '{name}'
|
||||
pg_port = {port.pg}
|
||||
http_port = {port.http}
|
||||
sync = false # Disable fsyncs to make the tests go faster
|
||||
"""
|
||||
safekeeper = Safekeeper(env=self, name=name, port=port)
|
||||
id = i # assign ids sequentially
|
||||
toml += textwrap.dedent(f"""
|
||||
[[safekeepers]]
|
||||
id = {id}
|
||||
pg_port = {port.pg}
|
||||
http_port = {port.http}
|
||||
sync = false # Disable fsyncs to make the tests go faster
|
||||
""")
|
||||
safekeeper = Safekeeper(env=self, id=id, port=port)
|
||||
self.safekeepers.append(safekeeper)
|
||||
|
||||
log.info(f"Config: {toml}")
|
||||
|
||||
self.zenith_cli.init(toml)
|
||||
|
||||
def start(self):
|
||||
# Start up the page server and all the safekeepers
|
||||
self.pageserver.start()
|
||||
|
||||
@@ -625,7 +631,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
|
||||
|
||||
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
|
||||
|
||||
env = builder.init()
|
||||
env = builder.init_start()
|
||||
|
||||
# For convenience in tests, create a branch from the freshly-initialized cluster.
|
||||
env.zenith_cli.create_branch("empty", "main")
|
||||
@@ -659,7 +665,7 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB
|
||||
To use, define 'zenith_env_builder' fixture in your test to get access to the
|
||||
builder object. Set properties on it to describe the environment.
|
||||
Finally, initialize and start up the environment by calling
|
||||
zenith_env_builder.init().
|
||||
zenith_env_builder.init_start().
|
||||
|
||||
After the initialization, you can launch compute nodes by calling
|
||||
the functions in the 'env.postgres' factory object, stop/start the
|
||||
@@ -847,8 +853,8 @@ class ZenithCli:
|
||||
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def pageserver_start(self) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start']
|
||||
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start', *overrides]
|
||||
append_pageserver_param_overrides(start_args,
|
||||
self.env.pageserver.remote_storage,
|
||||
self.env.pageserver.config_override)
|
||||
@@ -862,17 +868,17 @@ class ZenithCli:
|
||||
log.info(f"Stopping pageserver with {cmd}")
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
|
||||
return self.raw_cli(['safekeeper', 'start', name])
|
||||
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
|
||||
return self.raw_cli(['safekeeper', 'start', str(id)])
|
||||
|
||||
def safekeeper_stop(self,
|
||||
name: Optional[str] = None,
|
||||
id: Optional[int] = None,
|
||||
immediate=False) -> 'subprocess.CompletedProcess[str]':
|
||||
args = ['safekeeper', 'stop']
|
||||
if id is not None:
|
||||
args.extend(str(id))
|
||||
if immediate:
|
||||
args.extend(['-m', 'immediate'])
|
||||
if name is not None:
|
||||
args.append(name)
|
||||
return self.raw_cli(args)
|
||||
|
||||
def pg_create(
|
||||
@@ -1005,14 +1011,15 @@ class ZenithPageserver(PgProtocol):
|
||||
self.remote_storage = remote_storage
|
||||
self.config_override = config_override
|
||||
|
||||
def start(self) -> 'ZenithPageserver':
|
||||
def start(self, overrides=()) -> 'ZenithPageserver':
|
||||
"""
|
||||
Start the page server.
|
||||
`overrides` allows to add some config to this pageserver start.
|
||||
Returns self.
|
||||
"""
|
||||
assert self.running == False
|
||||
|
||||
self.env.zenith_cli.pageserver_start()
|
||||
self.env.zenith_cli.pageserver_start(overrides=overrides)
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
@@ -1466,12 +1473,14 @@ class Safekeeper:
|
||||
""" An object representing a running safekeeper daemon. """
|
||||
env: ZenithEnv
|
||||
port: SafekeeperPort
|
||||
name: str # identifier for logging
|
||||
id: int
|
||||
auth_token: Optional[str] = None
|
||||
running: bool = False
|
||||
|
||||
def start(self) -> 'Safekeeper':
|
||||
self.env.zenith_cli.safekeeper_start(self.name)
|
||||
|
||||
assert self.running == False
|
||||
self.env.zenith_cli.safekeeper_start(self.id)
|
||||
self.running = True
|
||||
# wait for wal acceptor start by checking its status
|
||||
started_at = time.time()
|
||||
while True:
|
||||
@@ -1489,8 +1498,9 @@ class Safekeeper:
|
||||
return self
|
||||
|
||||
def stop(self, immediate=False) -> 'Safekeeper':
|
||||
log.info('Stopping safekeeper {}'.format(self.name))
|
||||
self.env.zenith_cli.safekeeper_stop(self.name, immediate)
|
||||
log.info('Stopping safekeeper {}'.format(self.id))
|
||||
self.env.zenith_cli.safekeeper_stop(self.id, immediate)
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def append_logical_message(self,
|
||||
|
||||
@@ -23,7 +23,7 @@ def test_bulk_tenant_create(
|
||||
"""Measure tenant creation time (with and without wal acceptors)"""
|
||||
if use_wal_acceptors == 'with_wa':
|
||||
zenith_env_builder.num_safekeepers = 3
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
|
||||
time_slices = []
|
||||
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
//
|
||||
// Main entry point for the safekeeper executable
|
||||
//
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::{App, Arg};
|
||||
use const_format::formatcp;
|
||||
use daemonize::Daemonize;
|
||||
use fs2::FileExt;
|
||||
use std::fs::File;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use tracing::*;
|
||||
use walkeeper::control_file::{self, CreateControlFile};
|
||||
use zenith_utils::http::endpoint;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
@@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now;
|
||||
use zenith_utils::signals;
|
||||
|
||||
const LOCK_FILE_NAME: &str = "safekeeper.lock";
|
||||
const ID_FILE_NAME: &str = "safekeeper.id";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
zenith_metrics::set_common_metrics_prefix("safekeeper");
|
||||
@@ -38,6 +41,12 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Path to the safekeeper data directory"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("init")
|
||||
.long("init")
|
||||
.takes_value(false)
|
||||
.help("Initialize safekeeper with ID"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("listen-pg")
|
||||
.short('l')
|
||||
@@ -93,6 +102,9 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Dump control file at path specifed by this argument and exit"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("dump-control-file") {
|
||||
@@ -136,10 +148,19 @@ fn main() -> Result<()> {
|
||||
conf.recall_period = humantime::parse_duration(recall)?;
|
||||
}
|
||||
|
||||
start_safekeeper(conf)
|
||||
let mut given_id = None;
|
||||
if let Some(given_id_str) = arg_matches.value_of("id") {
|
||||
given_id = Some(ZNodeId(
|
||||
given_id_str
|
||||
.parse()
|
||||
.context("failed to parse safekeeper id")?,
|
||||
));
|
||||
}
|
||||
|
||||
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
|
||||
}
|
||||
|
||||
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> {
|
||||
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
|
||||
|
||||
info!("version: {}", GIT_VERSION);
|
||||
@@ -154,6 +175,12 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
)
|
||||
})?;
|
||||
|
||||
// Set or read our ID.
|
||||
set_id(&mut conf, given_id)?;
|
||||
if init {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
e
|
||||
@@ -260,3 +287,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
std::process::exit(111);
|
||||
})
|
||||
}
|
||||
|
||||
/// Determine safekeeper id and set it in config.
|
||||
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
|
||||
let id_file_path = conf.workdir.join(ID_FILE_NAME);
|
||||
|
||||
let my_id: ZNodeId;
|
||||
// If ID exists, read it in; otherwise set one passed
|
||||
match fs::read(&id_file_path) {
|
||||
Ok(id_serialized) => {
|
||||
my_id = ZNodeId(
|
||||
std::str::from_utf8(&id_serialized)
|
||||
.context("failed to parse safekeeper id")?
|
||||
.parse()
|
||||
.context("failed to parse safekeeper id")?,
|
||||
);
|
||||
if let Some(given_id) = given_id {
|
||||
if given_id != my_id {
|
||||
bail!(
|
||||
"safekeeper already initialized with id {}, can't set {}",
|
||||
my_id,
|
||||
given_id
|
||||
);
|
||||
}
|
||||
}
|
||||
info!("safekeeper ID {}", my_id);
|
||||
}
|
||||
Err(error) => match error.kind() {
|
||||
ErrorKind::NotFound => {
|
||||
my_id = if let Some(given_id) = given_id {
|
||||
given_id
|
||||
} else {
|
||||
bail!("safekeeper id is not specified");
|
||||
};
|
||||
let mut f = File::create(&id_file_path)?;
|
||||
f.write_all(my_id.to_string().as_bytes())?;
|
||||
f.sync_all()?;
|
||||
info!("initialized safekeeper ID {}", my_id);
|
||||
}
|
||||
_ => {
|
||||
return Err(error.into());
|
||||
}
|
||||
},
|
||||
}
|
||||
conf.my_id = my_id;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::http::{RequestExt, RouterBuilder};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZNodeId;
|
||||
use zenith_utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::control_file::CreateControlFile;
|
||||
@@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response;
|
||||
use zenith_utils::http::request::parse_request_param;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SafekeeperStatus {
|
||||
id: ZNodeId,
|
||||
}
|
||||
|
||||
/// Healthcheck handler.
|
||||
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
Ok(json_response(StatusCode::OK, "")?)
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let conf = get_conf(&request);
|
||||
let status = SafekeeperStatus { id: conf.my_id };
|
||||
Ok(json_response(StatusCode::OK, status)?)
|
||||
}
|
||||
|
||||
fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use zenith_utils::zid::ZTenantTimelineId;
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
|
||||
|
||||
pub mod callmemaybe;
|
||||
pub mod control_file;
|
||||
@@ -46,6 +46,7 @@ pub struct SafeKeeperConf {
|
||||
pub listen_http_addr: String,
|
||||
pub ttl: Option<Duration>,
|
||||
pub recall_period: Duration,
|
||||
pub my_id: ZNodeId,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -69,6 +70,7 @@ impl Default for SafeKeeperConf {
|
||||
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
|
||||
ttl: None,
|
||||
recall_period: defaults::DEFAULT_RECALL_PERIOD,
|
||||
my_id: ZNodeId(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,32 +18,35 @@ use walkeeper::defaults::{
|
||||
};
|
||||
use zenith_utils::auth::{Claims, Scope};
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
|
||||
use zenith_utils::GIT_VERSION;
|
||||
|
||||
use pageserver::branches::BranchInfo;
|
||||
|
||||
// Default name of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_NAME: &str = "single";
|
||||
// Default id of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
|
||||
const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
|
||||
|
||||
fn default_conf() -> String {
|
||||
format!(
|
||||
r#"
|
||||
# Default built-in configuration, defined in main.rs
|
||||
[pageserver]
|
||||
id = {pageserver_id}
|
||||
listen_pg_addr = '{pageserver_pg_addr}'
|
||||
listen_http_addr = '{pageserver_http_addr}'
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
|
||||
[[safekeepers]]
|
||||
name = '{safekeeper_name}'
|
||||
id = {safekeeper_id}
|
||||
pg_port = {safekeeper_pg_port}
|
||||
http_port = {safekeeper_http_port}
|
||||
"#,
|
||||
pageserver_id = DEFAULT_PAGESERVER_ID,
|
||||
pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
|
||||
pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
|
||||
pageserver_auth_type = AuthType::Trust,
|
||||
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
|
||||
safekeeper_id = DEFAULT_SAFEKEEPER_ID,
|
||||
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
)
|
||||
@@ -74,9 +77,9 @@ fn main() -> Result<()> {
|
||||
.required(true);
|
||||
|
||||
#[rustfmt::skip]
|
||||
let safekeeper_node_arg = Arg::new("node")
|
||||
let safekeeper_id_arg = Arg::new("id")
|
||||
.index(1)
|
||||
.help("Node name")
|
||||
.help("safekeeper id")
|
||||
.required(false);
|
||||
|
||||
let timeline_arg = Arg::new("timeline")
|
||||
@@ -154,16 +157,16 @@ fn main() -> Result<()> {
|
||||
.about("Manage safekeepers")
|
||||
.subcommand(App::new("start")
|
||||
.about("Start local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("stop")
|
||||
.about("Stop local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
.subcommand(App::new("restart")
|
||||
.about("Restart local safekeeper")
|
||||
.arg(safekeeper_node_arg.clone())
|
||||
.arg(safekeeper_id_arg.clone())
|
||||
.arg(stop_mode_arg.clone())
|
||||
)
|
||||
)
|
||||
@@ -628,11 +631,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
|
||||
fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
|
||||
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
|
||||
Ok(SafekeeperNode::from_env(env, node))
|
||||
} else {
|
||||
bail!("could not find safekeeper '{}'", name)
|
||||
bail!("could not find safekeeper '{}'", id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -643,8 +646,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
};
|
||||
|
||||
// All the commands take an optional safekeeper name argument
|
||||
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
|
||||
let safekeeper = get_safekeeper(env, node_name)?;
|
||||
let sk_id = if let Some(id_str) = sub_args.value_of("id") {
|
||||
ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
|
||||
} else {
|
||||
DEFAULT_SAFEKEEPER_ID
|
||||
};
|
||||
let safekeeper = get_safekeeper(env, sk_id)?;
|
||||
|
||||
match sub_name {
|
||||
"start" => {
|
||||
@@ -697,7 +704,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.start() {
|
||||
eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
|
||||
eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
@@ -724,7 +731,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.stop(immediate) {
|
||||
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
|
||||
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId {
|
||||
}
|
||||
}
|
||||
|
||||
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
|
||||
// by the console.
|
||||
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct ZNodeId(pub u64);
|
||||
|
||||
impl fmt::Display for ZNodeId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fmt::Display;
|
||||
|
||||
Reference in New Issue
Block a user