Enable at least 1 safekeeper in every test

This commit is contained in:
Kirill Bulatov
2022-05-03 14:11:29 +03:00
committed by Kirill Bulatov
parent bea84150b2
commit 9a0fed0880
20 changed files with 161 additions and 80 deletions

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /storage/pageserver/data
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -97,6 +97,7 @@ pub struct PageServerConf {
// jwt auth token used for communication with pageserver
pub auth_token: String,
pub broker_endpoints: Vec<String>,
}
impl Default for PageServerConf {
@@ -107,6 +108,7 @@ impl Default for PageServerConf {
listen_http_addr: String::new(),
auth_type: AuthType::Trust,
auth_token: String::new(),
broker_endpoints: Vec::new(),
}
}
}
@@ -401,6 +403,7 @@ impl LocalEnv {
self.pageserver.auth_token =
self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
self.pageserver.broker_endpoints = self.broker_endpoints.clone();
fs::create_dir_all(self.pg_data_dirs_path())?;

View File

@@ -137,6 +137,7 @@ impl SafekeeperNode {
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.args(&["--broker-endpoints", &self.broker_endpoints.join(",")])
.arg("--daemonize"),
);
if !self.conf.sync {

View File

@@ -121,6 +121,16 @@ impl PageServerNode {
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let broker_endpoints_param = format!(
"broker_endpoints=[{}]",
self.env
.pageserver
.broker_endpoints
.iter()
.map(|url| format!("'{url}'"))
.collect::<Vec<_>>()
.join(",")
);
let mut args = Vec::with_capacity(20);
args.push("--init");
@@ -129,6 +139,7 @@ impl PageServerNode {
args.extend(["-c", &authg_type_param]);
args.extend(["-c", &listen_http_addr_param]);
args.extend(["-c", &listen_pg_addr_param]);
args.extend(["-c", &broker_endpoints_param]);
args.extend(["-c", &id]);
for config_override in config_overrides {

View File

@@ -7,7 +7,11 @@ if [ "$1" = 'pageserver' ]; then
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
fi
echo "Staring pageserver at 0.0.0.0:6400"
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data
# Pass broker_endpoints to pageserver only when BROKER_ENDPOINTS is set and non-empty.
# The variable has to be double-quoted: inside single quotes the shell never expands it,
# so `-z` would test a fixed non-empty string and the first branch could never run.
# `:-` keeps the check safe if the script is ever run with `set -u`.
if [ -z "${BROKER_ENDPOINTS:-}" ]; then
    pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data
else
    pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['${BROKER_ENDPOINTS}']" -D /data
fi
else
"$@"
fi

View File

@@ -13,6 +13,7 @@ use std::str::FromStr;
use std::time::Duration;
use toml_edit;
use toml_edit::{Document, Item};
use url::Url;
use utils::{
postgres_backend::AuthType,
zid::{ZNodeId, ZTenantId, ZTimelineId},
@@ -111,6 +112,9 @@ pub struct PageServerConf {
pub profiling: ProfilingConfig,
pub default_tenant_conf: TenantConf,
/// Etcd broker endpoints to connect to.
pub broker_endpoints: Vec<Url>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -175,6 +179,7 @@ struct PageServerConfigBuilder {
id: BuilderValue<ZNodeId>,
profiling: BuilderValue<ProfilingConfig>,
broker_endpoints: BuilderValue<Vec<Url>>,
}
impl Default for PageServerConfigBuilder {
@@ -200,6 +205,7 @@ impl Default for PageServerConfigBuilder {
remote_storage_config: Set(None),
id: NotSet,
profiling: Set(ProfilingConfig::Disabled),
broker_endpoints: NotSet,
}
}
}
@@ -256,6 +262,10 @@ impl PageServerConfigBuilder {
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
}
pub fn broker_endpoints(&mut self, broker_endpoints: Vec<Url>) {
self.broker_endpoints = BuilderValue::Set(broker_endpoints)
}
pub fn id(&mut self, node_id: ZNodeId) {
self.id = BuilderValue::Set(node_id)
}
@@ -264,7 +274,15 @@ impl PageServerConfigBuilder {
self.profiling = BuilderValue::Set(profiling)
}
pub fn build(self) -> Result<PageServerConf> {
pub fn build(self) -> anyhow::Result<PageServerConf> {
let broker_endpoints = self
.broker_endpoints
.ok_or(anyhow!("No broker endpoints provided"))?;
ensure!(
!broker_endpoints.is_empty(),
"Empty broker endpoints collection provided"
);
Ok(PageServerConf {
listen_pg_addr: self
.listen_pg_addr
@@ -300,6 +318,7 @@ impl PageServerConfigBuilder {
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
broker_endpoints,
})
}
}
@@ -341,7 +360,7 @@ impl PageServerConf {
/// validating the input and failing on errors.
///
/// This leaves any options not present in the file in the built-in defaults.
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result<Self> {
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> anyhow::Result<Self> {
let mut builder = PageServerConfigBuilder::default();
builder.workdir(workdir.to_owned());
@@ -373,6 +392,16 @@ impl PageServerConf {
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
"broker_endpoints" => builder.broker_endpoints(
parse_toml_array(key, item)?
.into_iter()
.map(|endpoint_str| {
endpoint_str.parse::<Url>().with_context(|| {
format!("Array item {endpoint_str} for key {key} is not a valid url endpoint")
})
})
.collect::<anyhow::Result<_>>()?,
),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -526,6 +555,7 @@ impl PageServerConf {
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoints: Vec::new(),
}
}
}
@@ -576,14 +606,36 @@ fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
Ok(humantime::parse_duration(s)?)
}
fn parse_toml_from_str<T>(name: &str, item: &Item) -> Result<T>
fn parse_toml_from_str<T>(name: &str, item: &Item) -> anyhow::Result<T>
where
T: FromStr<Err = anyhow::Error>,
T: FromStr,
<T as FromStr>::Err: std::fmt::Display,
{
let v = item
.as_str()
.with_context(|| format!("configure option {name} is not a string"))?;
T::from_str(v)
T::from_str(v).map_err(|e| {
anyhow!(
"Failed to parse string as {parse_type} for configure option {name}: {e}",
parse_type = stringify!(T)
)
})
}
/// Reads `item` as a TOML array of strings and returns the elements, in order, as owned
/// `String`s.
///
/// `name` is the configuration key and appears only in error messages. Fails if `item` is
/// not an array, or on the first element that is not a string.
fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
    let values = item
        .as_array()
        .with_context(|| format!("configure option {name} is not an array"))?;

    let mut strings = Vec::new();
    for value in values.iter() {
        let text = value
            .as_str()
            .with_context(|| format!("Array item {value:?} for key {name} is not a string"))?;
        strings.push(text.to_string());
    }
    Ok(strings)
}
#[cfg(test)]
@@ -616,12 +668,16 @@ id = 10
fn parse_defaults() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
// we have to create dummy pathes to overcome the validation errors
let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display());
let broker_endpoint = "http://127.0.0.1:7777";
// we have to create dummy values to overcome the validation errors
let config_string = format!(
"pg_distrib_dir='{}'\nid=10\nbroker_endpoints = ['{broker_endpoint}']",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
assert_eq!(
parsed_config,
@@ -641,6 +697,9 @@ id = 10
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
},
"Correct defaults should be used when no config values are provided"
);
@@ -652,15 +711,16 @@ id = 10
fn parse_basic_config() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let broker_endpoint = "http://127.0.0.1:7777";
let config_string = format!(
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'",
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoints = ['{broker_endpoint}']",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"));
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}"));
assert_eq!(
parsed_config,
@@ -680,6 +740,9 @@ id = 10
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
},
"Should be able to parse all basic config values correctly"
);
@@ -691,6 +754,7 @@ id = 10
fn parse_remote_fs_storage_config() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let broker_endpoint = "http://127.0.0.1:7777";
let local_storage_path = tempdir.path().join("local_remote_storage");
@@ -710,6 +774,7 @@ local_path = '{}'"#,
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoints = ['{broker_endpoint}']
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
@@ -718,7 +783,9 @@ pg_distrib_dir='{}'
let toml = config_string.parse()?;
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
.unwrap_or_else(|e| {
panic!("Failed to parse config '{config_string}', reason: {e:?}")
})
.remote_storage_config
.expect("Should have remote storage config for the local FS");
@@ -751,6 +818,7 @@ pg_distrib_dir='{}'
let max_concurrent_syncs = NonZeroUsize::new(111).unwrap();
let max_sync_errors = NonZeroU32::new(222).unwrap();
let s3_concurrency_limit = NonZeroUsize::new(333).unwrap();
let broker_endpoint = "http://127.0.0.1:7777";
let identical_toml_declarations = &[
format!(
@@ -773,6 +841,7 @@ concurrency_limit = {s3_concurrency_limit}"#
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoints = ['{broker_endpoint}']
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
@@ -781,7 +850,9 @@ pg_distrib_dir='{}'
let toml = config_string.parse()?;
let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir)
.unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}"))
.unwrap_or_else(|e| {
panic!("Failed to parse config '{config_string}', reason: {e:?}")
})
.remote_storage_config
.expect("Should have remote storage config for S3");

View File

@@ -1,7 +1,7 @@
//
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
@@ -31,7 +31,7 @@ const LOCK_FILE_NAME: &str = "safekeeper.lock";
const ID_FILE_NAME: &str = "safekeeper.id";
project_git_version!(GIT_VERSION);
fn main() -> Result<()> {
fn main() -> anyhow::Result<()> {
metrics::set_common_metrics_prefix("safekeeper");
let arg_matches = App::new("Zenith safekeeper")
.about("Store WAL stream to local file system and push it to WAL receivers")
@@ -177,8 +177,12 @@ fn main() -> Result<()> {
if let Some(addr) = arg_matches.value_of("broker-endpoints") {
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
conf.broker_endpoints = Some(collected_ep?);
conf.broker_endpoints = collected_ep.context("Failed to parse broker endpoint urls")?;
}
ensure!(
!conf.broker_endpoints.is_empty(),
"No broker endpoints provided"
);
if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") {
conf.broker_etcd_prefix = prefix.to_string();
}
@@ -309,16 +313,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
.unwrap();
threads.push(callmemaybe_thread);
if conf.broker_endpoints.is_some() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
}
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
let conf_ = conf.clone();
threads.push(

View File

@@ -46,7 +46,7 @@ fn timeline_safekeeper_path(
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?;
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
// Get and maintain lease to automatically delete obsolete data
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
@@ -91,7 +91,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?;
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates(
&mut client,
@@ -99,7 +99,6 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
)
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.fetch_data().await {
Some(new_info) => {

View File

@@ -51,7 +51,7 @@ pub struct SafeKeeperConf {
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub my_id: ZNodeId,
pub broker_endpoints: Option<Vec<Url>>,
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub s3_offload_enabled: bool,
}
@@ -81,7 +81,7 @@ impl Default for SafeKeeperConf {
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
broker_endpoints: None,
broker_endpoints: Vec::new(),
broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(),
s3_offload_enabled: true,
}

View File

@@ -10,13 +10,6 @@ from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserv
# Create ancestor branches off the main branch.
#
def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder):
# Use safekeeper in this test to avoid a subtle race condition.
# Without safekeeper, walreceiver reconnection can stuck
# because of IO deadlock.
#
# See https://github.com/zenithdb/zenith/issues/1068
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.

View File

@@ -94,7 +94,6 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/1587")
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Create a branch for us
env.zenith_cli.create_branch('test_backpressure')

View File

@@ -6,8 +6,6 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_next_xid(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
pg = env.postgres.create_start('main')

View File

@@ -5,8 +5,6 @@ from fixtures.log_helper import log
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder):
# One safekeeper is enough for this test.
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_pageserver_restart')

View File

@@ -32,7 +32,6 @@ import pytest
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
# zenith_env_builder.rust_log_override = 'debug'
zenith_env_builder.num_safekeepers = 1
if storage_type == 'local_fs':
zenith_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':

View File

@@ -8,7 +8,7 @@ from fixtures.log_helper import log
import signal
import pytest
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, Etcd, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
@@ -21,7 +21,8 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
pageserver_bin: pathlib.Path,
remote_storage_mock_path: pathlib.Path,
pg_port: int,
http_port: int):
http_port: int,
broker: Etcd):
"""
cannot use ZenithPageserver yet because it depends on zenith cli
which currently lacks support for multiple pageservers
@@ -36,6 +37,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
f"-c broker_endpoints=['{broker.client_url()}']",
]
subprocess.check_output(cmd, text=True)
@@ -103,7 +105,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
port_distributor: PortDistributor,
with_load: str):
zenith_env_builder.num_safekeepers = 1
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
@@ -180,7 +181,8 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
pageserver_bin,
remote_storage_mock_path,
new_pageserver_pg_port,
new_pageserver_http_port):
new_pageserver_http_port,
zenith_env_builder.broker):
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)

View File

@@ -70,7 +70,6 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size_quota')

View File

@@ -12,7 +12,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -22,7 +22,6 @@ from typing import List, Optional, Any
# succeed and data is written
def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_safekeepers_normal_work')
@@ -331,7 +330,6 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_broker(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.broker = True
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
@@ -374,7 +372,6 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder):
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
zenith_env_builder.broker = True
# to advance remote_consistent_lsn
zenith_env_builder.enable_local_fs_remote_storage()
env = zenith_env_builder.init_start()
@@ -557,8 +554,6 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder,
def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_timeline_status')
@@ -599,6 +594,9 @@ class SafekeeperEnv:
num_safekeepers: int = 1):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = os.path.join(str(zenith_binpath), 'safekeeper')
@@ -645,6 +643,8 @@ class SafekeeperEnv:
safekeeper_dir,
"--id",
str(i),
"--broker-endpoints",
self.broker.client_url(),
"--daemonize"
]
@@ -698,7 +698,6 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
repo_dir,
port_distributor,
pg_bin,
num_safekeepers=1,
)
with env:

View File

@@ -15,7 +15,6 @@ def test_wal_restore(zenith_env_builder: ZenithEnvBuilder,
pg_bin: PgBin,
test_output_dir,
port_distributor: PortDistributor):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch("test_wal_restore")
pg = env.postgres.create_start('test_wal_restore')

View File

@@ -94,8 +94,6 @@ def test_cli_tenant_create(zenith_simple_env: ZenithEnv):
def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
# Start with single sk
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Connect to sk port on v4 loopback
@@ -111,8 +109,6 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder):
# Start with single sk
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init_start()
# Stop default ps/sk

View File

@@ -412,11 +412,10 @@ class ZenithEnvBuilder:
port_distributor: PortDistributor,
pageserver_remote_storage: Optional[RemoteStorage] = None,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 0,
num_safekeepers: int = 1,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME,
broker: bool = False):
default_branch_name=DEFAULT_BRANCH_NAME):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
@@ -425,7 +424,10 @@ class ZenithEnvBuilder:
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.default_branch_name = default_branch_name
self.broker = broker
# keep etcd datadir inside 'repo'
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
self.env: Optional[ZenithEnv] = None
self.s3_mock_server: Optional[MockS3Server] = None
@@ -551,14 +553,9 @@ class ZenithEnv:
default_tenant_id = '{self.initial_tenant.hex}'
""")
self.broker = None
if config.broker:
# keep etcd datadir inside 'repo'
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
toml += textwrap.dedent(f"""
broker_endpoints = ['http://127.0.0.1:{self.broker.port}']
self.broker = config.broker
toml += textwrap.dedent(f"""
broker_endpoints = ['{self.broker.client_url()}']
""")
# Create config for pageserver
@@ -1851,24 +1848,29 @@ class Etcd:
peer_port: int
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
# Returns the HTTP URL built from the IPv4 loopback address and this instance's `port`.
def client_url(self):
return f'http://127.0.0.1:{self.port}'
def check_status(self):
s = requests.Session()
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"http://localhost:{self.port}/health").raise_for_status()
s.get(f"{self.client_url()}/health").raise_for_status()
def start(self):
pathlib.Path(self.datadir).mkdir(exist_ok=True)
etcd_full_path = etcd_path()
if etcd_full_path is None:
raise Exception('etcd not found')
raise Exception('etcd binary not found locally')
client_url = self.client_url()
log.info(f'Starting etcd to listen incoming connections at "{client_url}"')
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
args = [
etcd_full_path,
f"--data-dir={self.datadir}",
f"--listen-client-urls=http://localhost:{self.port}",
f"--advertise-client-urls=http://localhost:{self.port}",
f"--listen-peer-urls=http://localhost:{self.peer_port}"
f"--listen-client-urls={client_url}",
f"--advertise-client-urls={client_url}",
f"--listen-peer-urls=http://127.0.0.1:{self.peer_port}"
]
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)
@@ -1920,7 +1922,13 @@ def test_output_dir(request: Any) -> str:
return test_dir
SKIP_DIRS = frozenset(('pg_wal', 'pg_stat', 'pg_stat_tmp', 'pg_subtrans', 'pg_logical'))
SKIP_DIRS = frozenset(('pg_wal',
'pg_stat',
'pg_stat_tmp',
'pg_subtrans',
'pg_logical',
'pg_replslot/wal_proposer_slot',
'pg_xact'))
SKIP_FILES = frozenset(('pg_internal.init',
'pg.log',