diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 342b043387..053aade9c8 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -88,15 +88,17 @@ impl ComputeControlPlane { &mut self, tenantid: ZTenantId, branch_name: &str, + port: Option<u16>, ) -> Result<Arc<PostgresNode>> { let timeline_id = self .pageserver .branch_get_by_name(&tenantid, branch_name)? .timeline_id; + let port = port.unwrap_or(self.get_port()); let node = Arc::new(PostgresNode { name: branch_name.to_owned(), - address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()), + address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), is_test: false, diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 56f9129f2c..6de4e7a9c8 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -20,8 +20,9 @@ use zenith_utils::zid::ZTenantId; // #[derive(Serialize, Deserialize, Clone, Debug)] pub struct LocalEnv { - // Pageserver connection strings - pub pageserver_connstring: String, + // Pageserver connection settings + pub pageserver_pg_port: u16, + pub pageserver_http_port: u16, // Base directory for both pageserver and compute nodes pub base_data_dir: PathBuf, @@ -88,7 +89,12 @@ fn base_path() -> PathBuf { // // Initialize a new Zenith repository // -pub fn init(tenantid: ZTenantId, auth_type: AuthType) -> Result<()> { +pub fn init( + pageserver_pg_port: u16, + pageserver_http_port: u16, + tenantid: ZTenantId, + auth_type: AuthType, +) -> Result<()> { // check if config already exists let base_path = base_path(); if base_path.exists() { @@ -159,7 +165,8 @@ pub fn init(tenantid: ZTenantId, auth_type: AuthType) -> Result<()> { } let conf = LocalEnv { - pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(), + pageserver_pg_port, + pageserver_http_port, pg_distrib_dir, zenith_distrib_dir, base_data_dir: base_path, diff --git
a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 34e7678291..d826836ea1 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -22,7 +22,6 @@ use crate::read_pidfile; use pageserver::branches::BranchInfo; use zenith_utils::connstring::connection_address; -const HTTP_BASE_URL: &str = "http://127.0.0.1:9898/v1"; #[derive(Error, Debug)] pub enum PageserverHttpError { @@ -81,27 +80,36 @@ impl PageServerNode { PageServerNode { kill_on_exit: false, - pg_connection_config: Self::default_config(password), // default + pg_connection_config: Self::pageserver_connection_config( + password, + env.pageserver_pg_port, + ), env: env.clone(), http_client: Client::new(), - http_base_url: HTTP_BASE_URL.to_owned(), + http_base_url: format!("http://localhost:{}/v1", env.pageserver_http_port), } } - fn default_config(password: &str) -> Config { - format!("postgresql://no_user:{}@localhost:64000/no_db", password) + fn pageserver_connection_config(password: &str, port: u16) -> Config { + format!("postgresql://no_user:{}@localhost:{}/no_db", password, port) .parse() .unwrap() } pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); + let listen_pg = format!("localhost:{}", self.env.pageserver_pg_port); + let listen_http = format!("localhost:{}", self.env.pageserver_http_port); let mut args = vec![ "--init", "-D", self.env.base_data_dir.to_str().unwrap(), "--postgres-distrib", self.env.pg_distrib_dir.to_str().unwrap(), + "--listen-pg", + &listen_pg, + "--listen-http", + &listen_http, ]; if enable_auth { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 2da1e677fb..45a697ac5f 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -3,6 +3,7 @@ // use log::*; +use pageserver::defaults::*; use serde::{Deserialize, Serialize}; use std::{ env, @@ -11,7 +12,6 @@ use std::{ process::exit, 
str::FromStr, thread, - time::Duration, }; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType}; @@ -22,19 +22,11 @@ use daemonize::Daemonize; use pageserver::{branches, http, page_service, tenant_mgr, PageServerConf, LOG_FILE_NAME}; use zenith_utils::http::endpoint; -const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; -const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898"; -const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; -const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); -const DEFAULT_SUPERUSER: &str = "zenith_admin"; /// String arguments that can be declared via CLI or config file #[derive(Serialize, Deserialize)] struct CfgFileParams { - listen_addr: Option<String>, - http_endpoint_addr: Option<String>, + listen_pg_addr: Option<String>, + listen_http_addr: Option<String>, gc_horizon: Option<String>, gc_period: Option<String>, pg_distrib_dir: Option<String>, @@ -50,8 +42,8 @@ impl CfgFileParams { }; Self { - listen_addr: get_arg("listen"), - http_endpoint_addr: get_arg("http_endpoint"), + listen_pg_addr: get_arg("listen-pg"), + listen_http_addr: get_arg("listen-http"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), pg_distrib_dir: get_arg("postgres-distrib"), @@ -64,8 +56,8 @@ fn or(self, other: CfgFileParams) -> Self { // TODO cleaner way to do this Self { - listen_addr: self.listen_addr.or(other.listen_addr), - http_endpoint_addr: self.http_endpoint_addr.or(other.http_endpoint_addr), + listen_pg_addr: self.listen_pg_addr.or(other.listen_pg_addr), + listen_http_addr: self.listen_http_addr.or(other.listen_http_addr), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), @@ -80,14 +72,14 @@ fn try_into_config(&self) -> Result<PageServerConf> { let workdir = PathBuf::from("."); - let listen_addr = match self.listen_addr.as_ref() { + let listen_pg_addr = match self.listen_pg_addr.as_ref() { Some(addr) => addr.clone(), - None =>
DEFAULT_LISTEN_ADDR.to_owned(), + None => DEFAULT_PG_LISTEN_ADDR.to_owned(), }; - let http_endpoint_addr = match self.http_endpoint_addr.as_ref() { + let listen_http_addr = match self.listen_http_addr.as_ref() { Some(addr) => addr.clone(), - None => DEFAULT_HTTP_ENDPOINT_ADDR.to_owned(), + None => DEFAULT_HTTP_LISTEN_ADDR.to_owned(), }; let gc_horizon: u64 = match self.gc_horizon.as_ref() { @@ -135,8 +127,8 @@ impl CfgFileParams { Ok(PageServerConf { daemonize: false, - listen_addr, - http_endpoint_addr, + listen_pg_addr, + listen_http_addr, gc_horizon, gc_period, @@ -156,12 +148,20 @@ fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") .about("Materializes WAL stream to pages and serves them to the postgres") .arg( - Arg::with_name("listen") + Arg::with_name("listen-pg") .short("l") - .long("listen") + .long("listen-pg") + .alias("listen") // keep some compatibility .takes_value(true) .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"), ) + .arg( + Arg::with_name("listen-http") + .long("listen-http") + .alias("http_endpoint") // keep some compatibility + .takes_value(true) + .help("http endpoint address for metrics and management API calls ip:port (default: 127.0.0.1:9898)"), + ) .arg( Arg::with_name("daemonize") .short("d") @@ -280,15 +280,15 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // bind sockets before daemonizing so we report errors early and do not return until we are listening info!( "Starting pageserver http handler on {}", - conf.http_endpoint_addr + conf.listen_http_addr ); - let http_listener = TcpListener::bind(conf.http_endpoint_addr.clone())?; + let http_listener = TcpListener::bind(conf.listen_http_addr.clone())?; info!( "Starting pageserver pg protocol handler on {}", - conf.listen_addr + conf.listen_pg_addr ); - let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?; + let pageserver_listener = TcpListener::bind(conf.listen_pg_addr.clone())?; if
conf.daemonize { info!("daemonizing..."); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 2b52f17009..3388f32df1 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -20,6 +20,20 @@ pub mod waldecoder; pub mod walreceiver; pub mod walredo; +pub mod defaults { + use std::time::Duration; + + pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; + pub const DEFAULT_PG_LISTEN_ADDR: &str = "127.0.0.1:64000"; // can't format! const yet... + pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; + pub const DEFAULT_HTTP_LISTEN_ADDR: &str = "127.0.0.1:9898"; + + pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; + pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); + + pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; +} + lazy_static! { static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( "pageserver_live_connections_count", @@ -34,8 +48,8 @@ pub const LOG_FILE_NAME: &str = "pageserver.log"; #[derive(Debug, Clone)] pub struct PageServerConf { pub daemonize: bool, - pub listen_addr: String, - pub http_endpoint_addr: String, + pub listen_pg_addr: String, + pub listen_http_addr: String, pub gc_horizon: u64, pub gc_period: Duration, pub superuser: String, @@ -123,8 +137,8 @@ impl PageServerConf { daemonize: false, gc_horizon: 64 * 1024 * 1024, gc_period: Duration::from_secs(10), - listen_addr: "127.0.0.1:5430".to_string(), - http_endpoint_addr: "127.0.0.1:9898".to_string(), + listen_pg_addr: "127.0.0.1:5430".to_string(), + listen_http_addr: "127.0.0.1:9898".to_string(), superuser: "zenith_admin".to_string(), workdir: repo_dir, pg_distrib_dir: "".into(), diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 56f9cba5ba..3e7467f072 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -263,8 +263,8 @@ mod tests { daemonize: false, gc_horizon: 64 * 1024 * 1024, gc_period: Duration::from_secs(10), - listen_addr: "127.0.0.1:5430".to_string(), - http_endpoint_addr: 
"127.0.0.1:9898".to_string(), + listen_pg_addr: "127.0.0.1:5430".to_string(), + listen_http_addr: "127.0.0.1:9898".to_string(), superuser: "zenith_admin".to_string(), workdir: repo_dir, pg_distrib_dir: "".into(), diff --git a/test_runner/Pipfile b/test_runner/Pipfile index 8b38f3423b..f5250e4051 100644 --- a/test_runner/Pipfile +++ b/test_runner/Pipfile @@ -9,6 +9,7 @@ psycopg2 = "*" typing-extensions = "*" pyjwt = {extras = ["crypto"], version = "*"} requests = "*" +pytest-xdist = "*" [dev-packages] yapf = "*" diff --git a/test_runner/Pipfile.lock b/test_runner/Pipfile.lock index 567fa47d06..4219b91098 100644 --- a/test_runner/Pipfile.lock +++ b/test_runner/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "b666740289d9c82797e5c39b2a7f0074c865c9183ee878ce4fa5cda7928506ea" + "sha256": "480afaf71a214984dac55d128a4f67ec2d9749136e570c64df562c79900a9d83" }, "pipfile-spec": 6, "requires": { @@ -91,22 +91,33 @@ }, "cryptography": { "hashes": [ - "sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d", - "sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959", - "sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6", - "sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873", - "sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2", - "sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713", - "sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1", - "sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177", - "sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250", - "sha256:b01fd6f2737816cb1e08ed4807ae194404790eac7ad030b34f2ce72b332f5586", - "sha256:bf40af59ca2465b24e54f671b2de2c59257ddc4f7e5706dbd6930e26823668d3", - "sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca", - "sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d", - 
"sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9" + "sha256:0a7dcbcd3f1913f664aca35d47c1331fce738d44ec34b7be8b9d332151b0b01e", + "sha256:1eb7bb0df6f6f583dd8e054689def236255161ebbcf62b226454ab9ec663746b", + "sha256:21ca464b3a4b8d8e86ba0ee5045e103a1fcfac3b39319727bc0fc58c09c6aff7", + "sha256:34dae04a0dce5730d8eb7894eab617d8a70d0c97da76b905de9efb7128ad7085", + "sha256:3520667fda779eb788ea00080124875be18f2d8f0848ec00733c0ec3bb8219fc", + "sha256:3fa3a7ccf96e826affdf1a0a9432be74dc73423125c8f96a909e3835a5ef194a", + "sha256:5b0fbfae7ff7febdb74b574055c7466da334a5371f253732d7e2e7525d570498", + "sha256:8695456444f277af73a4877db9fc979849cd3ee74c198d04fc0776ebc3db52b9", + "sha256:94cc5ed4ceaefcbe5bf38c8fba6a21fc1d365bb8fb826ea1688e3370b2e24a1c", + "sha256:94fff993ee9bc1b2440d3b7243d488c6a3d9724cc2b09cdb297f6a886d040ef7", + "sha256:9965c46c674ba8cc572bc09a03f4c649292ee73e1b683adb1ce81e82e9a6a0fb", + "sha256:a00cf305f07b26c351d8d4e1af84ad7501eca8a342dedf24a7acb0e7b7406e14", + "sha256:a305600e7a6b7b855cd798e00278161b681ad6e9b7eca94c721d5f588ab212af", + "sha256:cd65b60cfe004790c795cc35f272e41a3df4631e2fb6b35aa7ac6ef2859d554e", + "sha256:d2a6e5ef66503da51d2110edf6c403dc6b494cc0082f85db12f54e9c5d4c3ec5", + "sha256:d9ec0e67a14f9d1d48dd87a2531009a9b251c02ea42851c060b25c782516ff06", + "sha256:f44d141b8c4ea5eb4dbc9b3ad992d45580c1d22bf5e24363f2fbf50c2d7ae8a7" ], - "version": "==3.4.7" + "version": "==3.4.8" + }, + "execnet": { + "hashes": [ + "sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5", + "sha256:a295f7cc774947aac58dde7fdc85f4aa00c42adf5d8f5468fc630c1acf30a142" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==1.9.0" }, "idna": { "hashes": [ @@ -133,11 +144,11 @@ }, "pluggy": { "hashes": [ - "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", - "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d" + 
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159", + "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==0.13.1" + "markers": "python_version >= '3.6'", + "version": "==1.0.0" }, "psycopg2": { "hashes": [ @@ -191,11 +202,27 @@ }, "pytest": { "hashes": [ - "sha256:50bcad0a0b9c5a72c8e4e7c9855a3ad496ca6a881a3641b4260605450772c54b", - "sha256:91ef2131a9bd6be8f76f1f08eac5c5317221d6ad1e143ae03894b862e8976890" + "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89", + "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134" ], "index": "pypi", - "version": "==6.2.4" + "version": "==6.2.5" + }, + "pytest-forked": { + "hashes": [ + "sha256:6aa9ac7e00ad1a539c41bec6d21011332de671e938c7637378ec9710204e37ca", + "sha256:dc4147784048e70ef5d437951728825a131b81714b398d5d52f17c7c144d8815" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==1.3.0" + }, + "pytest-xdist": { + "hashes": [ + "sha256:e8ecde2f85d88fbcadb7d28cb33da0fa29bca5cf7d5967fa89fc0e97e5299ea5", + "sha256:ed3d7da961070fce2a01818b51f6888327fb88df4379edeb6b9d990e789d9c8d" + ], + "index": "pypi", + "version": "==2.3.0" }, "requests": { "hashes": [ @@ -215,12 +242,12 @@ }, "typing-extensions": { "hashes": [ - "sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497", - "sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342", - "sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84" + "sha256:49f75d16ff11f1cd258e1b988ccff82a3ca5570217d7ad8c5f48205dd99a677e", + "sha256:d8226d10bc02a29bcc81df19a26e56a9647f8b0a6d4a83924139f4a8b01f17b7", + "sha256:f1d25edafde516b146ecd0613dabcc61409817af4766fbbcfb8d1ad4ec441a34" ], "index": "pypi", - "version": "==3.10.0.0" + "version": "==3.10.0.2" }, "urllib3": { "hashes": [ @@ -309,12 +336,12 
@@ }, "typing-extensions": { "hashes": [ - "sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497", - "sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342", - "sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84" + "sha256:49f75d16ff11f1cd258e1b988ccff82a3ca5570217d7ad8c5f48205dd99a677e", + "sha256:d8226d10bc02a29bcc81df19a26e56a9647f8b0a6d4a83924139f4a8b01f17b7", + "sha256:f1d25edafde516b146ecd0613dabcc61409817af4766fbbcfb8d1ad4ec441a34" ], "index": "pypi", - "version": "==3.10.0.0" + "version": "==3.10.0.2" }, "yapf": { "hashes": [ diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index 10e5bb22b5..acde4a4594 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -1,11 +1,15 @@ from contextlib import closing +from typing import Iterator from uuid import uuid4 import psycopg2 from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver, PgBin import pytest +pytest_plugins = ("fixtures.zenith_fixtures") + + def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver): ps = pageserver_auth_enabled @@ -42,7 +46,8 @@ def test_compute_auth_to_pageserver( pageserver_auth_enabled: ZenithPageserver, repo_dir: str, with_wal_acceptors: bool, - pg_bin: PgBin + pg_bin: PgBin, + port_distributor: Iterator[int], ): ps = pageserver_auth_enabled # since we are in progress of refactoring protocols between compute safekeeper and page server @@ -59,7 +64,7 @@ def test_compute_auth_to_pageserver( repo_dir=repo_dir, pg_bin=pg_bin, tenant_id=ps.initial_tenant, - port=55432, # FIXME port distribution is hardcoded in tests and in cli + port=next(port_distributor), ).create_start( branch, wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, diff --git a/test_runner/batch_others/test_createdropdb.py b/test_runner/batch_others/test_createdropdb.py index d670fa6d38..cbe89a77cb 100644 --- 
a/test_runner/batch_others/test_createdropdb.py +++ b/test_runner/batch_others/test_createdropdb.py @@ -99,4 +99,4 @@ def test_dropdb( assert os.path.isdir(dbpath) == False # Check that we restore the content of the datadir correctly - check_restored_datadir_content(zenith_cli, test_output_dir, pg) + check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver.service_port.pg) diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index 5eb813868c..aaa9e7f58d 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -66,4 +66,4 @@ def test_multixact(pageserver: ZenithPageserver, postgres: PostgresFactory, assert next_multixact_id_new == next_multixact_id # Check that we restore the content of the datadir correctly - check_restored_datadir_content(zenith_cli, test_output_dir, pg_new) + check_restored_datadir_content(zenith_cli, test_output_dir, pg_new, pageserver.service_port.pg) diff --git a/test_runner/batch_pg_regress/test_pg_regress.py b/test_runner/batch_pg_regress/test_pg_regress.py index cd66967fa1..6f61b77ebc 100644 --- a/test_runner/batch_pg_regress/test_pg_regress.py +++ b/test_runner/batch_pg_regress/test_pg_regress.py @@ -55,4 +55,4 @@ def test_pg_regress(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_ lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0] # Check that we restore the content of the datadir correctly - check_restored_datadir_content(zenith_cli, test_output_dir, pg) + check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver.service_port.pg) diff --git a/test_runner/batch_pg_regress/test_zenith_regress.py b/test_runner/batch_pg_regress/test_zenith_regress.py index 62f55f0e67..09f5f83933 100644 --- a/test_runner/batch_pg_regress/test_zenith_regress.py +++ b/test_runner/batch_pg_regress/test_zenith_regress.py @@ -1,13 +1,13 @@ import os from fixtures.utils import mkdir_if_needed -from 
fixtures.zenith_fixtures import PostgresFactory, check_restored_datadir_content +from fixtures.zenith_fixtures import PageserverPort, PostgresFactory, check_restored_datadir_content pytest_plugins = ("fixtures.zenith_fixtures") def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir, - base_dir, capsys): + base_dir, capsys, pageserver_port: PageserverPort): # Create a branch for us zenith_cli.run(["branch", "test_zenith_regress", "empty"]) @@ -56,4 +56,4 @@ def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_outp lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0] # Check that we restore the content of the datadir correctly - check_restored_datadir_content(zenith_cli, test_output_dir, pg) + check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver_port.pg) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 6214503af5..9e20ae723b 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -14,10 +14,11 @@ def mkdir_if_needed(path: str) -> None: Note this won't try to create intermediate directories. 
""" - if os.path.exists(path): - assert os.path.isdir(path) - return - os.mkdir(path) + try: + os.mkdir(path) + except FileExistsError: + pass + assert os.path.isdir(path) def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None: diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 39d7a6690d..a1ed11200a 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -50,7 +50,7 @@ DEFAULT_OUTPUT_DIR = 'test_output' DEFAULT_POSTGRES_DIR = 'tmp_install' DEFAULT_PAGESERVER_PG_PORT = 64000 -DEFAULT_PAGESERVER_HTTP_PORT = 9898 +BASE_PORT = 55000 def determine_scope(fixture_name: str, config: Any) -> str: @@ -75,20 +75,6 @@ def zenfixture(func: Fn) -> Fn: return pytest.fixture(func, scope=scope) -@pytest.fixture(autouse=True, scope='session') -def safety_check() -> None: - """ Ensure that no unwanted daemons are running before we start testing. """ - - # does not use -c as it is not supported on macOS - cmd = ['pgrep', 'pageserver|postgres|wal_acceptor'] - result = subprocess.run(cmd, stdout=subprocess.DEVNULL) - if result.returncode == 0: - # returncode of 0 means it found something. - # This is bad; we don't want any of those processes polluting the - # result of the test. 
- raise Exception('found interfering processes running') - - class PgProtocol: """ Reusable connection logic """ def __init__(self, host: str, port: int, username: Optional[str] = None): @@ -277,24 +263,48 @@ class AuthKeys: return token +@zenfixture +def worker_seq_no(worker_id: str): + if worker_id == 'master': + return 0 + assert worker_id.startswith('gw') + return int(worker_id[2:]) +@zenfixture +def worker_base_port(worker_seq_no: int): + # so we divide ports in ranges of 100 ports + # so workers have disjoint set of ports for services + return BASE_PORT + worker_seq_no * 100 + + +@zenfixture +def port_distributor(worker_base_port): + yield iter(range(worker_base_port, worker_base_port + 100)) + + +@dataclass +class PageserverPort: + pg: int + http: int + class ZenithPageserver(PgProtocol): """ An object representing a running pageserver. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str): - super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PG_PORT) + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, port: PageserverPort): + super().__init__(host='localhost', port=port.pg) self.zenith_cli = zenith_cli self.running = False self.initial_tenant = None self.repo_dir = repo_dir + self.service_port = port # do not shadow PgProtocol.port which is just int def init(self, enable_auth: bool = False) -> 'ZenithPageserver': """ Initialize the repository, i.e. run "zenith init". Returns self. 
""" - cmd = ['init'] + cmd = ['init', f'--pageserver-pg-port={self.service_port.pg}', f'--pageserver-http-port={self.service_port.http}'] if enable_auth: cmd.append('--enable-auth') self.zenith_cli.run(cmd) @@ -344,13 +354,23 @@ class ZenithPageserver(PgProtocol): def http_client(self, auth_token: Optional[str] = None): return ZenithPageserverHttpClient( - port=DEFAULT_PAGESERVER_HTTP_PORT, + port=self.service_port.http, auth_token=auth_token, ) + + @zenfixture -def pageserver(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[ZenithPageserver]: +def pageserver_port(port_distributor: Iterator[int]) -> PageserverPort: + pg = next(port_distributor) + http = next(port_distributor) + print(f"pageserver_port: pg={pg} http={http}") + return PageserverPort(pg=pg, http=http) + + +@zenfixture +def pageserver(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort) -> Iterator[ZenithPageserver]: """ The 'pageserver' fixture provides a Page Server that's up and running. @@ -362,8 +382,7 @@ def pageserver(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[ZenithPageserve By convention, the test branches are named after the tests. For example, test called 'test_foo' would create and use branches with the 'test_foo' prefix. """ - - ps = ZenithPageserver(zenith_cli, repo_dir).init().start() + ps = ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port).init().start() # For convenience in tests, create a branch from the freshly-initialized cluster. 
zenith_cli.run(["branch", "empty", "main"]) @@ -433,8 +452,8 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: return PgBin(test_output_dir, pg_distrib_dir) @pytest.fixture -def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str): - with ZenithPageserver(zenith_cli, repo_dir).init(enable_auth=True).start() as ps: +def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort): + with ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port).init(enable_auth=True).start() as ps: # For convenience in tests, create a branch from the freshly-initialized cluster. zenith_cli.run(["branch", "empty", "main"]) yield ps @@ -470,7 +489,7 @@ class Postgres(PgProtocol): if not config_lines: config_lines = [] - self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}']) + self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}', f'--port={self.port}']) self.branch = branch path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch self.pgdata_dir = os.path.join(self.repo_dir, path) @@ -493,7 +512,7 @@ class Postgres(PgProtocol): print(f"Starting postgres on branch {self.branch}") - run_result = self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}']) + run_result = self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}', f'--port={self.port}']) self.running = True print(f"stdout: {run_result.stdout}") @@ -607,13 +626,13 @@ class Postgres(PgProtocol): class PostgresFactory: """ An object representing multiple running postgres daemons. 
""" - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: Iterator[int]): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 self.instances: List[Postgres] = [] self.initial_tenant: str = initial_tenant - self.base_port = base_port + self.port_distributor = port_distributor self.pg_bin = pg_bin def create_start( @@ -623,15 +642,13 @@ class PostgresFactory: wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None ) -> Postgres: - pg = Postgres( zenith_cli=self.zenith_cli, repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=self.base_port + self.num_instances + 1, + port=next(self.port_distributor), ) - self.num_instances += 1 self.instances.append(pg) @@ -654,7 +671,7 @@ class PostgresFactory: repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=self.base_port + self.num_instances + 1, + port=next(self.port_distributor), ) self.num_instances += 1 @@ -679,7 +696,7 @@ class PostgresFactory: repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=self.base_port + self.num_instances + 1, + port=next(self.port_distributor), ) self.num_instances += 1 @@ -703,8 +720,14 @@ def initial_tenant(pageserver: ZenithPageserver): @zenfixture -def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]: - pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant) +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: Iterator[int]) -> Iterator[PostgresFactory]: + pgfactory = PostgresFactory( + zenith_cli=zenith_cli, + repo_dir=repo_dir, + pg_bin=pg_bin, + initial_tenant=initial_tenant, + 
port_distributor=port_distributor, + ) yield pgfactory @@ -720,10 +743,11 @@ def read_pid(path: Path): @dataclass class WalAcceptor: """ An object representing a running wal acceptor daemon. """ - bin_path: Path + wa_bin_path: Path data_dir: Path port: int num: int # identifier for logging + pageserver_port: int auth_token: Optional[str] = None def start(self) -> 'WalAcceptor': @@ -731,13 +755,13 @@ class WalAcceptor: self.data_dir.mkdir(parents=True, exist_ok=True) self.pidfile.unlink(missing_ok=True) - cmd = [str(self.bin_path)] + cmd = [str(self.wa_bin_path)] cmd.extend(["-D", str(self.data_dir)]) - cmd.extend(["-l", "localhost:{}".format(self.port)]) + cmd.extend(["-l", f"localhost:{self.port}"]) cmd.append("--daemonize") cmd.append("--no-sync") # Tell page server it can receive WAL from this WAL safekeeper - cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PG_PORT)]) + cmd.extend(["--pageserver", f"localhost:{self.pageserver_port}"]) cmd.extend(["--recall", "1 second"]) print('Running command "{}"'.format(' '.join(cmd))) env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None @@ -784,24 +808,25 @@ class WalAcceptor: class WalAcceptorFactory: """ An object representing multiple running wal acceptors. """ - def __init__(self, zenith_binpath: Path, data_dir: Path): - self.wa_binpath = zenith_binpath / 'wal_acceptor' + def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: Iterator[int]): + self.wa_bin_path = zenith_binpath / 'wal_acceptor' self.data_dir = data_dir self.instances: List[WalAcceptor] = [] - self.initial_port = 54321 + self.port_distributor = port_distributor + self.pageserver_port = pageserver_port def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor: """ Start new wal acceptor. 
""" - wa_num = len(self.instances) wa = WalAcceptor( - self.wa_binpath, - self.data_dir / "wal_acceptor_{}".format(wa_num), - self.initial_port + wa_num, - wa_num, - auth_token, + wa_bin_path=self.wa_bin_path, + data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num), + port=next(self.port_distributor), + num=wa_num, + pageserver_port=self.pageserver_port, + auth_token=auth_token, ) wa.start() self.instances.append(wa) @@ -826,9 +851,14 @@ class WalAcceptorFactory: @zenfixture -def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]: +def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: Iterator[int]) -> Iterator[WalAcceptorFactory]: """ Gives WalAcceptorFactory providing wal acceptors. """ - wafactory = WalAcceptorFactory(Path(zenith_binpath), Path(repo_dir) / "wal_acceptors") + wafactory = WalAcceptorFactory( + zenith_binpath=Path(zenith_binpath), + data_dir=Path(repo_dir) / "wal_acceptors", + pageserver_port=pageserver_port.pg, + port_distributor=port_distributor, + ) yield wafactory # After the yield comes any cleanup code we need. print('Starting wal acceptors cleanup') @@ -954,7 +984,7 @@ def list_files_to_compare(pgdata_dir: str): return pgdata_files # pg is the existing and running compute node, that we want to compare with a basebackup -def check_restored_datadir_content(zenith_cli, test_output_dir, pg): +def check_restored_datadir_content(zenith_cli: ZenithCli, test_output_dir: str, pg: Postgres, pageserver_pg_port: int): # Get the timeline ID of our branch. 
We need it for the 'basebackup' command with closing(pg.connect()) as conn: @@ -969,12 +999,11 @@ def check_restored_datadir_content(zenith_cli, test_output_dir, pg): restored_dir_path = os.path.join(test_output_dir, "{}_restored_datadir".format(pg.branch)) mkdir_if_needed(restored_dir_path) - cmd = "psql -h 127.0.0.1 -p {} -c 'basebackup {} {}' | tar -x -C {}".format( - DEFAULT_PAGESERVER_PG_PORT, pg.tenant_id, timeline, restored_dir_path) + cmd = f"psql -h localhost -p {pageserver_pg_port} -c 'basebackup {pg.tenant_id} {timeline}' | tar -x -C {restored_dir_path}" cmd = os.path.join(pg.pg_bin.pg_bin_path, cmd) - subprocess.run(cmd, shell=True) + subprocess.check_call(cmd, shell=True) # list files we're going to compare pgdata_files = list_files_to_compare(pg.pgdata_dir) diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 9f5343b5a2..1c04e803e6 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -4,6 +4,7 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use control_plane::compute::ComputeControlPlane; use control_plane::local_env; use control_plane::storage::PageServerNode; +use pageserver::defaults::{DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_PORT}; use std::collections::HashMap; use std::process::exit; use std::str::FromStr; @@ -43,11 +44,28 @@ fn main() -> Result<()> { .takes_value(true) .required(false); + let port_arg = Arg::with_name("port") + .long("port") + .required(false) + .value_name("port"); + let matches = App::new("Zenith CLI") .setting(AppSettings::ArgRequiredElseHelp) .subcommand( SubCommand::with_name("init") .about("Initialize a new Zenith repository") + .arg( + Arg::with_name("pageserver-pg-port") + .long("pageserver-pg-port") + .required(false) + .value_name("pageserver-pg-port"), + ) + .arg( + Arg::with_name("pageserver-http-port") + .long("pageserver-http-port") + .required(false) + .value_name("pageserver-http-port"), + ) .arg( Arg::with_name("enable-auth") .long("enable-auth") @@ -79,7 +97,7 @@ fn main() -> 
Result<()> { .subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone())) .subcommand(SubCommand::with_name("create") .about("Create a postgres compute node") - .arg(timeline_arg.clone()).arg(tenantid_arg.clone()) + .arg(timeline_arg.clone()).arg(tenantid_arg.clone()).arg(port_arg.clone()) .arg( Arg::with_name("config-only") .help("Don't do basebackup, create compute node with only config files") @@ -88,7 +106,11 @@ fn main() -> Result<()> { )) .subcommand(SubCommand::with_name("start") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") - .arg(timeline_arg.clone()).arg(tenantid_arg.clone())) + .arg( + timeline_arg.clone() + ).arg( + tenantid_arg.clone() + ).arg(port_arg.clone())) .subcommand( SubCommand::with_name("stop") .arg(timeline_arg.clone()) @@ -99,19 +121,36 @@ fn main() -> Result<()> { .long("destroy") .required(false) ) - ) + ) + ) .get_matches(); // Create config file if let ("init", Some(init_match)) = matches.subcommand() { let tenantid = ZTenantId::generate(); + let pageserver_pg_port = match init_match.value_of("pageserver-pg-port") { + Some(v) => v.parse()?, + None => DEFAULT_PG_LISTEN_PORT, + }; + let pageserver_http_port = match init_match.value_of("pageserver-http-port") { + Some(v) => v.parse()?, + None => DEFAULT_HTTP_LISTEN_PORT, + }; + let auth_type = if init_match.is_present("enable-auth") { AuthType::ZenithJWT } else { AuthType::Trust }; - local_env::init(tenantid, auth_type).with_context(|| "Failed to create config file")?; + + local_env::init( + pageserver_pg_port, + pageserver_http_port, + tenantid, + auth_type, + ) + .with_context(|| "Failed to create config file")?; } // all other commands would need config @@ -412,7 +451,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { .map_or(Ok(env.tenantid), |value| value.parse())?; let timeline_name = create_match.value_of("timeline").unwrap_or("main"); - 
cplane.new_node(tenantid, timeline_name)?; + let port: Option = match create_match.value_of("port") { + Some(p) => Some(p.parse()?), + None => None, + }; + cplane.new_node(tenantid, timeline_name, port)?; } ("start", Some(start_match)) => { let tenantid: ZTenantId = start_match @@ -420,6 +463,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { .map_or(Ok(env.tenantid), |value| value.parse())?; let timeline_name = start_match.value_of("timeline").unwrap_or("main"); + let port: Option = match start_match.value_of("port") { + Some(p) => Some(p.parse()?), + None => None, + }; + let node = cplane.nodes.get(&(tenantid, timeline_name.to_owned())); let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) { @@ -437,7 +485,12 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { if let Some(node) = node { node.start(&auth_token)?; } else { - let node = cplane.new_node(tenantid, timeline_name)?; + // when used with custom port this results in non obvious behaviour + // port is remembered from first start command, i e + // start --port X + // stop + // start <-- will also use port X even without explicit port argument + let node = cplane.new_node(tenantid, timeline_name, port)?; node.start(&auth_token)?; } } diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index f17c6c7448..03802f61ff 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -1,5 +1,3 @@ -use std::net::TcpListener; - use crate::auth::{self, Claims, JwtAuth}; use crate::http::error; use crate::zid::ZTenantId; @@ -10,6 +8,7 @@ use lazy_static::lazy_static; use routerify::ext::RequestExt; use routerify::RequestInfo; use routerify::{Middleware, Router, RouterBuilder, RouterService}; +use std::net::TcpListener; use zenith_metrics::{register_int_counter, IntCounter}; use zenith_metrics::{Encoder, TextEncoder};