diff --git a/.circleci/ansible/systemd/pageserver.service b/.circleci/ansible/systemd/pageserver.service index d346643e58..54a7b1ba0a 100644 --- a/.circleci/ansible/systemd/pageserver.service +++ b/.circleci/ansible/systemd/pageserver.service @@ -6,7 +6,7 @@ After=network.target auditd.service Type=simple User=pageserver Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib -ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /storage/pageserver/data +ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 35167ebabf..a8636f9073 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -97,6 +97,7 @@ pub struct PageServerConf { // jwt auth token used for communication with pageserver pub auth_token: String, + pub broker_endpoints: Vec, } impl Default for PageServerConf { @@ -107,6 +108,7 @@ impl Default for PageServerConf { listen_http_addr: String::new(), auth_type: AuthType::Trust, auth_token: String::new(), + broker_endpoints: Vec::new(), } } } @@ -401,6 +403,7 @@ impl LocalEnv { self.pageserver.auth_token = self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; + self.pageserver.broker_endpoints = self.broker_endpoints.clone(); fs::create_dir_all(self.pg_data_dirs_path())?; diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index aeeb4a50ec..c5b7f830bf 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -137,6 +137,7 @@ impl SafekeeperNode { .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) 
.args(&["--recall", "1 second"]) + .args(&["--broker-endpoints", &self.broker_endpoints.join(",")]) .arg("--daemonize"), ); if !self.conf.sync { diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index d2e63a22de..0b9fddd64a 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -121,6 +121,16 @@ impl PageServerNode { ); let listen_pg_addr_param = format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr); + let broker_endpoints_param = format!( + "broker_endpoints=[{}]", + self.env + .pageserver + .broker_endpoints + .iter() + .map(|url| format!("'{url}'")) + .collect::<Vec<_>>() + .join(",") + ); let mut args = Vec::with_capacity(20); args.push("--init"); @@ -129,6 +139,7 @@ impl PageServerNode { args.extend(["-c", &authg_type_param]); args.extend(["-c", &listen_http_addr_param]); args.extend(["-c", &listen_pg_addr_param]); + args.extend(["-c", &broker_endpoints_param]); args.extend(["-c", &id]); for config_override in config_overrides { diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 93bb5f9cd7..0e4cf45f29 100755 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -7,7 +7,11 @@ if [ "$1" = 'pageserver' ]; then pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" fi echo "Staring pageserver at 0.0.0.0:6400" - pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data + if [ -z "${BROKER_ENDPOINTS}" ]; then + pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data + else + pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['${BROKER_ENDPOINTS}']" -D /data + fi else "$@" fi diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 5257732c5c..8748683f32 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -13,6 +13,7 @@ use std::str::FromStr; use std::time::Duration; use toml_edit; use toml_edit::{Document, Item}; +use url::Url;
use utils::{ postgres_backend::AuthType, zid::{ZNodeId, ZTenantId, ZTimelineId}, @@ -111,6 +112,9 @@ pub struct PageServerConf { pub profiling: ProfilingConfig, pub default_tenant_conf: TenantConf, + + /// Etcd broker endpoints to connect to. + pub broker_endpoints: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -175,6 +179,7 @@ struct PageServerConfigBuilder { id: BuilderValue, profiling: BuilderValue, + broker_endpoints: BuilderValue>, } impl Default for PageServerConfigBuilder { @@ -200,6 +205,7 @@ impl Default for PageServerConfigBuilder { remote_storage_config: Set(None), id: NotSet, profiling: Set(ProfilingConfig::Disabled), + broker_endpoints: NotSet, } } } @@ -256,6 +262,10 @@ impl PageServerConfigBuilder { self.remote_storage_config = BuilderValue::Set(remote_storage_config) } + pub fn broker_endpoints(&mut self, broker_endpoints: Vec) { + self.broker_endpoints = BuilderValue::Set(broker_endpoints) + } + pub fn id(&mut self, node_id: ZNodeId) { self.id = BuilderValue::Set(node_id) } @@ -264,7 +274,15 @@ impl PageServerConfigBuilder { self.profiling = BuilderValue::Set(profiling) } - pub fn build(self) -> Result { + pub fn build(self) -> anyhow::Result { + let broker_endpoints = self + .broker_endpoints + .ok_or(anyhow!("No broker endpoints provided"))?; + ensure!( + !broker_endpoints.is_empty(), + "Empty broker endpoints collection provided" + ); + Ok(PageServerConf { listen_pg_addr: self .listen_pg_addr @@ -300,6 +318,7 @@ impl PageServerConfigBuilder { profiling: self.profiling.ok_or(anyhow!("missing profiling"))?, // TenantConf is handled separately default_tenant_conf: TenantConf::default(), + broker_endpoints, }) } } @@ -341,7 +360,7 @@ impl PageServerConf { /// validating the input and failing on errors. /// /// This leaves any options not present in the file in the built-in defaults. 
- pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result { + pub fn parse_and_validate(toml: &Document, workdir: &Path) -> anyhow::Result { let mut builder = PageServerConfigBuilder::default(); builder.workdir(workdir.to_owned()); @@ -373,6 +392,16 @@ impl PageServerConf { } "id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)), "profiling" => builder.profiling(parse_toml_from_str(key, item)?), + "broker_endpoints" => builder.broker_endpoints( + parse_toml_array(key, item)? + .into_iter() + .map(|endpoint_str| { + endpoint_str.parse::().with_context(|| { + format!("Array item {endpoint_str} for key {key} is not a valid url endpoint") + }) + }) + .collect::>()?, + ), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -526,6 +555,7 @@ impl PageServerConf { remote_storage_config: None, profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::dummy_conf(), + broker_endpoints: Vec::new(), } } } @@ -576,14 +606,36 @@ fn parse_toml_duration(name: &str, item: &Item) -> Result { Ok(humantime::parse_duration(s)?) 
} -fn parse_toml_from_str(name: &str, item: &Item) -> Result +fn parse_toml_from_str(name: &str, item: &Item) -> anyhow::Result where - T: FromStr, + T: FromStr, + ::Err: std::fmt::Display, { let v = item .as_str() .with_context(|| format!("configure option {name} is not a string"))?; - T::from_str(v) + T::from_str(v).map_err(|e| { + anyhow!( + "Failed to parse string as {parse_type} for configure option {name}: {e}", + parse_type = stringify!(T) + ) + }) +} + +fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result> { + let array = item + .as_array() + .with_context(|| format!("configure option {name} is not an array"))?; + + array + .iter() + .map(|value| { + value + .as_str() + .map(str::to_string) + .with_context(|| format!("Array item {value:?} for key {name} is not a string")) + }) + .collect() } #[cfg(test)] @@ -616,12 +668,16 @@ id = 10 fn parse_defaults() -> anyhow::Result<()> { let tempdir = tempdir()?; let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?; - // we have to create dummy pathes to overcome the validation errors - let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display()); + let broker_endpoint = "http://127.0.0.1:7777"; + // we have to create dummy values to overcome the validation errors + let config_string = format!( + "pg_distrib_dir='{}'\nid=10\nbroker_endpoints = ['{broker_endpoint}']", + pg_distrib_dir.display() + ); let toml = config_string.parse()?; let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")); + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}")); assert_eq!( parsed_config, @@ -641,6 +697,9 @@ id = 10 remote_storage_config: None, profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::default(), + broker_endpoints: vec![broker_endpoint + .parse() + .expect("Failed to parse a valid broker endpoint URL")], }, "Correct defaults should be used 
when no config values are provided" ); @@ -652,15 +711,16 @@ id = 10 fn parse_basic_config() -> anyhow::Result<()> { let tempdir = tempdir()?; let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?; + let broker_endpoint = "http://127.0.0.1:7777"; let config_string = format!( - "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'", + "{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoints = ['{broker_endpoint}']", pg_distrib_dir.display() ); let toml = config_string.parse()?; let parsed_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")); + .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e:?}")); assert_eq!( parsed_config, @@ -680,6 +740,9 @@ id = 10 remote_storage_config: None, profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::default(), + broker_endpoints: vec![broker_endpoint + .parse() + .expect("Failed to parse a valid broker endpoint URL")], }, "Should be able to parse all basic config values correctly" ); @@ -691,6 +754,7 @@ id = 10 fn parse_remote_fs_storage_config() -> anyhow::Result<()> { let tempdir = tempdir()?; let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?; + let broker_endpoint = "http://127.0.0.1:7777"; let local_storage_path = tempdir.path().join("local_remote_storage"); @@ -710,6 +774,7 @@ local_path = '{}'"#, let config_string = format!( r#"{ALL_BASE_VALUES_TOML} pg_distrib_dir='{}' +broker_endpoints = ['{broker_endpoint}'] {remote_storage_config_str}"#, pg_distrib_dir.display(), @@ -718,7 +783,9 @@ pg_distrib_dir='{}' let toml = config_string.parse()?; let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")) + .unwrap_or_else(|e| { + panic!("Failed to parse config '{config_string}', reason: {e:?}") + }) .remote_storage_config .expect("Should have remote storage config for the local 
FS"); @@ -751,6 +818,7 @@ pg_distrib_dir='{}' let max_concurrent_syncs = NonZeroUsize::new(111).unwrap(); let max_sync_errors = NonZeroU32::new(222).unwrap(); let s3_concurrency_limit = NonZeroUsize::new(333).unwrap(); + let broker_endpoint = "http://127.0.0.1:7777"; let identical_toml_declarations = &[ format!( @@ -773,6 +841,7 @@ concurrency_limit = {s3_concurrency_limit}"# let config_string = format!( r#"{ALL_BASE_VALUES_TOML} pg_distrib_dir='{}' +broker_endpoints = ['{broker_endpoint}'] {remote_storage_config_str}"#, pg_distrib_dir.display(), @@ -781,7 +850,9 @@ pg_distrib_dir='{}' let toml = config_string.parse()?; let parsed_remote_storage_config = PageServerConf::parse_and_validate(&toml, &workdir) - .unwrap_or_else(|e| panic!("Failed to parse config '{config_string}', reason: {e}")) + .unwrap_or_else(|e| { + panic!("Failed to parse config '{config_string}', reason: {e:?}") + }) .remote_storage_config .expect("Should have remote storage config for S3"); diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 6955d2aa5c..d7875a9069 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -1,7 +1,7 @@ // // Main entry point for the safekeeper executable // -use anyhow::{bail, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use clap::{App, Arg}; use const_format::formatcp; use daemonize::Daemonize; @@ -31,7 +31,7 @@ const LOCK_FILE_NAME: &str = "safekeeper.lock"; const ID_FILE_NAME: &str = "safekeeper.id"; project_git_version!(GIT_VERSION); -fn main() -> Result<()> { +fn main() -> anyhow::Result<()> { metrics::set_common_metrics_prefix("safekeeper"); let arg_matches = App::new("Zenith safekeeper") .about("Store WAL stream to local file system and push it to WAL receivers") @@ -177,8 +177,12 @@ fn main() -> Result<()> { if let Some(addr) = arg_matches.value_of("broker-endpoints") { let collected_ep: Result, ParseError> = addr.split(',').map(Url::parse).collect(); - 
conf.broker_endpoints = Some(collected_ep?); + conf.broker_endpoints = collected_ep.context("Failed to parse broker endpoint urls")?; } + ensure!( + !conf.broker_endpoints.is_empty(), + "No broker endpoints provided" + ); if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") { conf.broker_etcd_prefix = prefix.to_string(); } @@ -309,16 +313,14 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b .unwrap(); threads.push(callmemaybe_thread); - if conf.broker_endpoints.is_some() { - let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("broker thread".into()) - .spawn(|| { - broker::thread_main(conf_); - })?, - ); - } + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("broker thread".into()) + .spawn(|| { + broker::thread_main(conf_); + })?, + ); let conf_ = conf.clone(); threads.push( diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index d9c60c9db0..c906bc1e74 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -46,7 +46,7 @@ fn timeline_safekeeper_path( /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { - let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?; + let mut client = Client::connect(&conf.broker_endpoints, None).await?; // Get and maintain lease to automatically delete obsolete data let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; @@ -91,7 +91,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { /// Subscribe and fetch all the interesting data from the broker. 
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { - let mut client = Client::connect(&conf.broker_endpoints.as_ref().unwrap(), None).await?; + let mut client = Client::connect(&conf.broker_endpoints, None).await?; let mut subscription = etcd_broker::subscribe_to_safekeeper_timeline_updates( &mut client, @@ -99,7 +99,6 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { ) .await .context("failed to subscribe for safekeeper info")?; - loop { match subscription.fetch_data().await { Some(new_info) => { diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 09b2e68a49..131076fab6 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -51,7 +51,7 @@ pub struct SafeKeeperConf { pub ttl: Option, pub recall_period: Duration, pub my_id: ZNodeId, - pub broker_endpoints: Option>, + pub broker_endpoints: Vec, pub broker_etcd_prefix: String, pub s3_offload_enabled: bool, } @@ -81,7 +81,7 @@ impl Default for SafeKeeperConf { ttl: None, recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: ZNodeId(0), - broker_endpoints: None, + broker_endpoints: Vec::new(), broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(), s3_offload_enabled: true, } diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index c07b9d6dd1..5dbd6d2e26 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -10,13 +10,6 @@ from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserv # Create ancestor branches off the main branch. # def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder): - - # Use safekeeper in this test to avoid a subtle race condition. - # Without safekeeper, walreceiver reconnection can stuck - # because of IO deadlock. 
- # - # See https://github.com/zenithdb/zenith/issues/1068 - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() # Override defaults, 1M gc_horizon and 4M checkpoint_distance. diff --git a/test_runner/batch_others/test_backpressure.py b/test_runner/batch_others/test_backpressure.py index 6658b337ec..81f45b749b 100644 --- a/test_runner/batch_others/test_backpressure.py +++ b/test_runner/batch_others/test_backpressure.py @@ -94,7 +94,6 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv @pytest.mark.skip("See https://github.com/neondatabase/neon/issues/1587") def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder): - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() # Create a branch for us env.zenith_cli.create_branch('test_backpressure') diff --git a/test_runner/batch_others/test_next_xid.py b/test_runner/batch_others/test_next_xid.py index 03c27bcd70..1ab1addad3 100644 --- a/test_runner/batch_others/test_next_xid.py +++ b/test_runner/batch_others/test_next_xid.py @@ -6,8 +6,6 @@ from fixtures.zenith_fixtures import ZenithEnvBuilder # Test restarting page server, while safekeeper and compute node keep # running. def test_next_xid(zenith_env_builder: ZenithEnvBuilder): - # One safekeeper is enough for this test. - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() pg = env.postgres.create_start('main') diff --git a/test_runner/batch_others/test_pageserver_restart.py b/test_runner/batch_others/test_pageserver_restart.py index 20e6f4467e..69f5ea85ce 100644 --- a/test_runner/batch_others/test_pageserver_restart.py +++ b/test_runner/batch_others/test_pageserver_restart.py @@ -5,8 +5,6 @@ from fixtures.log_helper import log # Test restarting page server, while safekeeper and compute node keep # running. def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder): - # One safekeeper is enough for this test. 
- zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() env.zenith_cli.create_branch('test_pageserver_restart') diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index e205f79957..3c7bd08996 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -32,7 +32,6 @@ import pytest @pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3']) def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str): # zenith_env_builder.rust_log_override = 'debug' - zenith_env_builder.num_safekeepers = 1 if storage_type == 'local_fs': zenith_env_builder.enable_local_fs_remote_storage() elif storage_type == 'mock_s3': diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 279b3a0a25..85a91b9ce1 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -8,7 +8,7 @@ from fixtures.log_helper import log import signal import pytest -from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir +from fixtures.zenith_fixtures import PgProtocol, PortDistributor, Postgres, ZenithEnvBuilder, Etcd, ZenithPageserverHttpClient, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload, zenith_binpath, pg_distrib_dir from fixtures.utils import lsn_from_hex @@ -21,7 +21,8 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path, pageserver_bin: pathlib.Path, remote_storage_mock_path: pathlib.Path, pg_port: int, - http_port: int): + http_port: int, + broker: Etcd): """ cannot use ZenithPageserver yet because it depends on zenith cli which currently lacks support for multiple pageservers @@ -36,6 +37,7 @@ def 
new_pageserver_helper(new_pageserver_dir: pathlib.Path, f"-c pg_distrib_dir='{pg_distrib_dir}'", f"-c id=2", f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}", + f"-c broker_endpoints=['{broker.client_url()}']", ] subprocess.check_output(cmd, text=True) @@ -103,7 +105,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, port_distributor: PortDistributor, with_load: str): - zenith_env_builder.num_safekeepers = 1 zenith_env_builder.enable_local_fs_remote_storage() env = zenith_env_builder.init_start() @@ -180,7 +181,8 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, pageserver_bin, remote_storage_mock_path, new_pageserver_pg_port, - new_pageserver_http_port): + new_pageserver_http_port, + zenith_env_builder.broker): # call to attach timeline to new pageserver new_pageserver_http.timeline_attach(tenant, timeline) diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py index db33493d61..0b33b56df3 100644 --- a/test_runner/batch_others/test_timeline_size.py +++ b/test_runner/batch_others/test_timeline_size.py @@ -70,7 +70,6 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60 def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder): - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() new_timeline_id = env.zenith_cli.create_branch('test_timeline_size_quota') diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 67c9d6070e..85798156a7 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -12,7 +12,7 @@ from contextlib import closing from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path -from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, 
ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol +from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -22,7 +22,6 @@ from typing import List, Optional, Any # succeed and data is written def test_normal_work(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - zenith_env_builder.broker = True env = zenith_env_builder.init_start() env.zenith_cli.create_branch('test_safekeepers_normal_work') @@ -331,7 +330,6 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): @pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH") def test_broker(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - zenith_env_builder.broker = True zenith_env_builder.enable_local_fs_remote_storage() env = zenith_env_builder.init_start() @@ -374,7 +372,6 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder): @pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH") def test_wal_removal(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 2 - zenith_env_builder.broker = True # to advance remote_consistent_llsn zenith_env_builder.enable_local_fs_remote_storage() env = zenith_env_builder.init_start() @@ -557,8 +554,6 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): - - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() env.zenith_cli.create_branch('test_timeline_status') @@ -599,6 +594,9 @@ class SafekeeperEnv: num_safekeepers: int = 1): self.repo_dir = repo_dir self.port_distributor = port_distributor + 
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"), + port=self.port_distributor.get_port(), + peer_port=self.port_distributor.get_port()) self.pg_bin = pg_bin self.num_safekeepers = num_safekeepers self.bin_safekeeper = os.path.join(str(zenith_binpath), 'safekeeper') @@ -645,6 +643,8 @@ class SafekeeperEnv: safekeeper_dir, "--id", str(i), + "--broker-endpoints", + self.broker.client_url(), "--daemonize" ] @@ -698,7 +698,6 @@ def test_safekeeper_without_pageserver(test_output_dir: str, repo_dir, port_distributor, pg_bin, - num_safekeepers=1, ) with env: diff --git a/test_runner/batch_others/test_wal_restore.py b/test_runner/batch_others/test_wal_restore.py index b0f34f4aae..f4aceac5e8 100644 --- a/test_runner/batch_others/test_wal_restore.py +++ b/test_runner/batch_others/test_wal_restore.py @@ -15,7 +15,6 @@ def test_wal_restore(zenith_env_builder: ZenithEnvBuilder, pg_bin: PgBin, test_output_dir, port_distributor: PortDistributor): - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_wal_restore") pg = env.postgres.create_start('test_wal_restore') diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index bff17fa679..103d51aae5 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -94,8 +94,6 @@ def test_cli_tenant_create(zenith_simple_env: ZenithEnv): def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder): - # Start with single sk - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() # Connect to sk port on v4 loopback @@ -111,8 +109,6 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder): def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder): - # Start with single sk - zenith_env_builder.num_safekeepers = 1 env = zenith_env_builder.init_start() # Stop default ps/sk diff --git a/test_runner/fixtures/zenith_fixtures.py 
b/test_runner/fixtures/zenith_fixtures.py index 14eae60248..09f7f26588 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -412,11 +412,10 @@ class ZenithEnvBuilder: port_distributor: PortDistributor, pageserver_remote_storage: Optional[RemoteStorage] = None, pageserver_config_override: Optional[str] = None, - num_safekeepers: int = 0, + num_safekeepers: int = 1, pageserver_auth_enabled: bool = False, rust_log_override: Optional[str] = None, - default_branch_name=DEFAULT_BRANCH_NAME, - broker: bool = False): + default_branch_name=DEFAULT_BRANCH_NAME): self.repo_dir = repo_dir self.rust_log_override = rust_log_override self.port_distributor = port_distributor @@ -425,7 +424,10 @@ class ZenithEnvBuilder: self.num_safekeepers = num_safekeepers self.pageserver_auth_enabled = pageserver_auth_enabled self.default_branch_name = default_branch_name - self.broker = broker + # keep etcd datadir inside 'repo' + self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"), + port=self.port_distributor.get_port(), + peer_port=self.port_distributor.get_port()) self.env: Optional[ZenithEnv] = None self.s3_mock_server: Optional[MockS3Server] = None @@ -551,14 +553,9 @@ class ZenithEnv: default_tenant_id = '{self.initial_tenant.hex}' """) - self.broker = None - if config.broker: - # keep etcd datadir inside 'repo' - self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"), - port=self.port_distributor.get_port(), - peer_port=self.port_distributor.get_port()) - toml += textwrap.dedent(f""" - broker_endpoints = ['http://127.0.0.1:{self.broker.port}'] + self.broker = config.broker + toml += textwrap.dedent(f""" + broker_endpoints = ['{self.broker.client_url()}'] """) # Create config for pageserver @@ -1851,24 +1848,29 @@ class Etcd: peer_port: int handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon + def client_url(self): + return f'http://127.0.0.1:{self.port}' + def check_status(self): s = 
requests.Session() s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry - s.get(f"http://localhost:{self.port}/health").raise_for_status() + s.get(f"{self.client_url()}/health").raise_for_status() def start(self): pathlib.Path(self.datadir).mkdir(exist_ok=True) etcd_full_path = etcd_path() if etcd_full_path is None: - raise Exception('etcd not found') + raise Exception('etcd binary not found locally') + client_url = self.client_url() + log.info(f'Starting etcd to listen incoming connections at "{client_url}"') with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file: args = [ etcd_full_path, f"--data-dir={self.datadir}", - f"--listen-client-urls=http://localhost:{self.port}", - f"--advertise-client-urls=http://localhost:{self.port}", - f"--listen-peer-urls=http://localhost:{self.peer_port}" + f"--listen-client-urls={client_url}", + f"--advertise-client-urls={client_url}", + f"--listen-peer-urls=http://127.0.0.1:{self.peer_port}" ] self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file) @@ -1920,7 +1922,13 @@ def test_output_dir(request: Any) -> str: return test_dir -SKIP_DIRS = frozenset(('pg_wal', 'pg_stat', 'pg_stat_tmp', 'pg_subtrans', 'pg_logical')) +SKIP_DIRS = frozenset(('pg_wal', + 'pg_stat', + 'pg_stat_tmp', + 'pg_subtrans', + 'pg_logical', + 'pg_replslot/wal_proposer_slot', + 'pg_xact')) SKIP_FILES = frozenset(('pg_internal.init', 'pg.log',