Support parallel test running for Python tests

Support is implemented via the pytest-xdist plugin.
To use the feature, add -n<concurrency> to the pytest invocation,
e.g. pytest -n8 to run 8 tests in parallel.

The code changes are mostly about port assignment. Previously the
pageserver port was hardcoded with no way to override it through the
zenith CLI, and the ports for started compute nodes were calculated
twice, once in the zenith CLI and once in the test code. Now the
zenith CLI accepts explicit port arguments for the pageserver and for
compute nodes.
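For illustration only, a minimal sketch of the new flags as they appear in
this diff (--pageserver-pg-port and --pageserver-http-port on init, --port
on pg create/start); the concrete port numbers and the use of subprocess
here are assumptions, not part of the change:

    import subprocess

    # Pageserver ports are now fixed when the repository is initialized.
    subprocess.run(['zenith', 'init',
                    '--pageserver-pg-port=55010',
                    '--pageserver-http-port=55011'], check=True)

    # Each compute node gets its port passed explicitly.
    subprocess.run(['zenith', 'pg', 'create', 'main', '--port=55432'], check=True)
    subprocess.run(['zenith', 'pg', 'start', 'main', '--port=55432'], check=True)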

Tests are modified so that each xdist worker gets a non-overlapping
port range; the range size is configurable and currently contains 100
ports. These ports are handed out to the test services (pageserver,
WAL acceptors, compute nodes) so they can run independently.
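As a reader's aid, here is a condensed, self-contained sketch of that
scheme; BASE_PORT = 55000 and the range size of 100 are taken from the
fixtures.zenith_fixtures changes below, while the helper names in this
sketch are illustrative:

    BASE_PORT = 55000       # same base as in fixtures.zenith_fixtures
    PORTS_PER_WORKER = 100  # each worker owns 100 consecutive ports

    def worker_seq_no(worker_id: str) -> int:
        # pytest-xdist names its workers 'gw0', 'gw1', ...; 'master' means no parallelism
        if worker_id == 'master':
            return 0
        assert worker_id.startswith('gw')
        return int(worker_id[2:])

    def port_range_for(worker_id: str) -> range:
        base = BASE_PORT + worker_seq_no(worker_id) * PORTS_PER_WORKER
        return range(base, base + PORTS_PER_WORKER)

    # Every test service (pageserver pg/http, wal acceptors, compute nodes)
    # takes the next port from this iterator, so parallel workers never collide.
    port_distributor = iter(port_range_for('gw3'))  # worker gw3 gets 55300..55399
    print(next(port_distributor), next(port_distributor))  # -> 55300 55301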
Author: Dmitry Rodionov, 2021-08-11 12:39:03 +03:00 (committed by Dmitry)
parent dc897fb864
commit 4ebe643d0c
17 changed files with 294 additions and 148 deletions

View File

@@ -88,15 +88,17 @@ impl ComputeControlPlane {
&mut self,
tenantid: ZTenantId,
branch_name: &str,
port: Option<u16>,
) -> Result<Arc<PostgresNode>> {
let timeline_id = self
.pageserver
.branch_get_by_name(&tenantid, branch_name)?
.timeline_id;
let port = port.unwrap_or(self.get_port());
let node = Arc::new(PostgresNode {
name: branch_name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
is_test: false,

View File

@@ -20,8 +20,9 @@ use zenith_utils::zid::ZTenantId;
//
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LocalEnv {
// Pageserver connection strings
pub pageserver_connstring: String,
// Pageserver connection settings
pub pageserver_pg_port: u16,
pub pageserver_http_port: u16,
// Base directory for both pageserver and compute nodes
pub base_data_dir: PathBuf,
@@ -88,7 +89,12 @@ fn base_path() -> PathBuf {
//
// Initialize a new Zenith repository
//
pub fn init(tenantid: ZTenantId, auth_type: AuthType) -> Result<()> {
pub fn init(
pageserver_pg_port: u16,
pageserver_http_port: u16,
tenantid: ZTenantId,
auth_type: AuthType,
) -> Result<()> {
// check if config already exists
let base_path = base_path();
if base_path.exists() {
@@ -159,7 +165,8 @@ pub fn init(tenantid: ZTenantId, auth_type: AuthType) -> Result<()> {
}
let conf = LocalEnv {
pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(),
pageserver_pg_port,
pageserver_http_port,
pg_distrib_dir,
zenith_distrib_dir,
base_data_dir: base_path,

View File

@@ -22,7 +22,6 @@ use crate::read_pidfile;
use pageserver::branches::BranchInfo;
use zenith_utils::connstring::connection_address;
const HTTP_BASE_URL: &str = "http://127.0.0.1:9898/v1";
#[derive(Error, Debug)]
pub enum PageserverHttpError {
@@ -81,27 +80,36 @@ impl PageServerNode {
PageServerNode {
kill_on_exit: false,
pg_connection_config: Self::default_config(password), // default
pg_connection_config: Self::pageserver_connection_config(
password,
env.pageserver_pg_port,
),
env: env.clone(),
http_client: Client::new(),
http_base_url: HTTP_BASE_URL.to_owned(),
http_base_url: format!("http://localhost:{}/v1", env.pageserver_http_port),
}
}
fn default_config(password: &str) -> Config {
format!("postgresql://no_user:{}@localhost:64000/no_db", password)
fn pageserver_connection_config(password: &str, port: u16) -> Config {
format!("postgresql://no_user:{}@localhost:{}/no_db", password, port)
.parse()
.unwrap()
}
pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let listen_pg = format!("localhost:{}", self.env.pageserver_pg_port);
let listen_http = format!("localhost:{}", self.env.pageserver_http_port);
let mut args = vec![
"--init",
"-D",
self.env.base_data_dir.to_str().unwrap(),
"--postgres-distrib",
self.env.pg_distrib_dir.to_str().unwrap(),
"--listen-pg",
&listen_pg,
"--listen-http",
&listen_http,
];
if enable_auth {

View File

@@ -3,6 +3,7 @@
//
use log::*;
use pageserver::defaults::*;
use serde::{Deserialize, Serialize};
use std::{
env,
@@ -11,7 +12,6 @@ use std::{
process::exit,
str::FromStr,
thread,
time::Duration,
};
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType};
@@ -22,19 +22,11 @@ use daemonize::Daemonize;
use pageserver::{branches, http, page_service, tenant_mgr, PageServerConf, LOG_FILE_NAME};
use zenith_utils::http::endpoint;
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898";
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
const DEFAULT_SUPERUSER: &str = "zenith_admin";
/// String arguments that can be declared via CLI or config file
#[derive(Serialize, Deserialize)]
struct CfgFileParams {
listen_addr: Option<String>,
http_endpoint_addr: Option<String>,
listen_pg_addr: Option<String>,
listen_http_addr: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
pg_distrib_dir: Option<String>,
@@ -50,8 +42,8 @@ impl CfgFileParams {
};
Self {
listen_addr: get_arg("listen"),
http_endpoint_addr: get_arg("http_endpoint"),
listen_pg_addr: get_arg("listen-pg"),
listen_http_addr: get_arg("listen-http"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
pg_distrib_dir: get_arg("postgres-distrib"),
@@ -64,8 +56,8 @@ impl CfgFileParams {
fn or(self, other: CfgFileParams) -> Self {
// TODO cleaner way to do this
Self {
listen_addr: self.listen_addr.or(other.listen_addr),
http_endpoint_addr: self.http_endpoint_addr.or(other.http_endpoint_addr),
listen_pg_addr: self.listen_pg_addr.or(other.listen_pg_addr),
listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
@@ -80,14 +72,14 @@ impl CfgFileParams {
fn try_into_config(&self) -> Result<PageServerConf> {
let workdir = PathBuf::from(".");
let listen_addr = match self.listen_addr.as_ref() {
let listen_pg_addr = match self.listen_pg_addr.as_ref() {
Some(addr) => addr.clone(),
None => DEFAULT_LISTEN_ADDR.to_owned(),
None => DEFAULT_PG_LISTEN_ADDR.to_owned(),
};
let http_endpoint_addr = match self.http_endpoint_addr.as_ref() {
let listen_http_addr = match self.listen_http_addr.as_ref() {
Some(addr) => addr.clone(),
None => DEFAULT_HTTP_ENDPOINT_ADDR.to_owned(),
None => DEFAULT_HTTP_LISTEN_ADDR.to_owned(),
};
let gc_horizon: u64 = match self.gc_horizon.as_ref() {
@@ -135,8 +127,8 @@ impl CfgFileParams {
Ok(PageServerConf {
daemonize: false,
listen_addr,
http_endpoint_addr,
listen_pg_addr,
listen_http_addr,
gc_horizon,
gc_period,
@@ -156,12 +148,20 @@ fn main() -> Result<()> {
let arg_matches = App::new("Zenith page server")
.about("Materializes WAL stream to pages and serves them to the postgres")
.arg(
Arg::with_name("listen")
Arg::with_name("listen-pg")
.short("l")
.long("listen")
.long("listen-pg")
.alias("listen") // keep some compatibility
.takes_value(true)
.help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"),
)
.arg(
Arg::with_name("listen-http")
.long("listen-http")
.alias("http_endpoint") // keep some compatibility
.takes_value(true)
.help("http endpoint address for for metrics and management API calls ip:port (default: 127.0.0.1:5430)"),
)
.arg(
Arg::with_name("daemonize")
.short("d")
@@ -280,15 +280,15 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// bind sockets before daemonizing so we report errors early and do not return until we are listening
info!(
"Starting pageserver http handler on {}",
conf.http_endpoint_addr
conf.listen_http_addr
);
let http_listener = TcpListener::bind(conf.http_endpoint_addr.clone())?;
let http_listener = TcpListener::bind(conf.listen_http_addr.clone())?;
info!(
"Starting pageserver pg protocol handler on {}",
conf.listen_addr
conf.listen_pg_addr
);
let pageserver_listener = TcpListener::bind(conf.listen_addr.clone())?;
let pageserver_listener = TcpListener::bind(conf.listen_pg_addr.clone())?;
if conf.daemonize {
info!("daemonizing...");

View File

@@ -20,6 +20,20 @@ pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
pub mod defaults {
use std::time::Duration;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = "127.0.0.1:64000"; // can't format! const yet...
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = "127.0.0.1:9898";
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
}
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
"pageserver_live_connections_count",
@@ -34,8 +48,8 @@ pub const LOG_FILE_NAME: &str = "pageserver.log";
#[derive(Debug, Clone)]
pub struct PageServerConf {
pub daemonize: bool,
pub listen_addr: String,
pub http_endpoint_addr: String,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub gc_horizon: u64,
pub gc_period: Duration,
pub superuser: String,
@@ -123,8 +137,8 @@ impl PageServerConf {
daemonize: false,
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
listen_addr: "127.0.0.1:5430".to_string(),
http_endpoint_addr: "127.0.0.1:9898".to_string(),
listen_pg_addr: "127.0.0.1:5430".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
superuser: "zenith_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),

View File

@@ -263,8 +263,8 @@ mod tests {
daemonize: false,
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
listen_addr: "127.0.0.1:5430".to_string(),
http_endpoint_addr: "127.0.0.1:9898".to_string(),
listen_pg_addr: "127.0.0.1:5430".to_string(),
listen_http_addr: "127.0.0.1:9898".to_string(),
superuser: "zenith_admin".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),

View File

@@ -9,6 +9,7 @@ psycopg2 = "*"
typing-extensions = "*"
pyjwt = {extras = ["crypto"], version = "*"}
requests = "*"
pytest-xdist = "*"
[dev-packages]
yapf = "*"

View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "b666740289d9c82797e5c39b2a7f0074c865c9183ee878ce4fa5cda7928506ea"
"sha256": "480afaf71a214984dac55d128a4f67ec2d9749136e570c64df562c79900a9d83"
},
"pipfile-spec": 6,
"requires": {
@@ -91,22 +91,33 @@
},
"cryptography": {
"hashes": [
"sha256:0f1212a66329c80d68aeeb39b8a16d54ef57071bf22ff4e521657b27372e327d",
"sha256:1e056c28420c072c5e3cb36e2b23ee55e260cb04eee08f702e0edfec3fb51959",
"sha256:240f5c21aef0b73f40bb9f78d2caff73186700bf1bc6b94285699aff98cc16c6",
"sha256:26965837447f9c82f1855e0bc8bc4fb910240b6e0d16a664bb722df3b5b06873",
"sha256:37340614f8a5d2fb9aeea67fd159bfe4f5f4ed535b1090ce8ec428b2f15a11f2",
"sha256:3d10de8116d25649631977cb37da6cbdd2d6fa0e0281d014a5b7d337255ca713",
"sha256:3d8427734c781ea5f1b41d6589c293089704d4759e34597dce91014ac125aad1",
"sha256:7ec5d3b029f5fa2b179325908b9cd93db28ab7b85bb6c1db56b10e0b54235177",
"sha256:8e56e16617872b0957d1c9742a3f94b43533447fd78321514abbe7db216aa250",
"sha256:b01fd6f2737816cb1e08ed4807ae194404790eac7ad030b34f2ce72b332f5586",
"sha256:bf40af59ca2465b24e54f671b2de2c59257ddc4f7e5706dbd6930e26823668d3",
"sha256:de4e5f7f68220d92b7637fc99847475b59154b7a1b3868fb7385337af54ac9ca",
"sha256:eb8cc2afe8b05acbd84a43905832ec78e7b3873fb124ca190f574dca7389a87d",
"sha256:ee77aa129f481be46f8d92a1a7db57269a2f23052d5f2433b4621bb457081cc9"
"sha256:0a7dcbcd3f1913f664aca35d47c1331fce738d44ec34b7be8b9d332151b0b01e",
"sha256:1eb7bb0df6f6f583dd8e054689def236255161ebbcf62b226454ab9ec663746b",
"sha256:21ca464b3a4b8d8e86ba0ee5045e103a1fcfac3b39319727bc0fc58c09c6aff7",
"sha256:34dae04a0dce5730d8eb7894eab617d8a70d0c97da76b905de9efb7128ad7085",
"sha256:3520667fda779eb788ea00080124875be18f2d8f0848ec00733c0ec3bb8219fc",
"sha256:3fa3a7ccf96e826affdf1a0a9432be74dc73423125c8f96a909e3835a5ef194a",
"sha256:5b0fbfae7ff7febdb74b574055c7466da334a5371f253732d7e2e7525d570498",
"sha256:8695456444f277af73a4877db9fc979849cd3ee74c198d04fc0776ebc3db52b9",
"sha256:94cc5ed4ceaefcbe5bf38c8fba6a21fc1d365bb8fb826ea1688e3370b2e24a1c",
"sha256:94fff993ee9bc1b2440d3b7243d488c6a3d9724cc2b09cdb297f6a886d040ef7",
"sha256:9965c46c674ba8cc572bc09a03f4c649292ee73e1b683adb1ce81e82e9a6a0fb",
"sha256:a00cf305f07b26c351d8d4e1af84ad7501eca8a342dedf24a7acb0e7b7406e14",
"sha256:a305600e7a6b7b855cd798e00278161b681ad6e9b7eca94c721d5f588ab212af",
"sha256:cd65b60cfe004790c795cc35f272e41a3df4631e2fb6b35aa7ac6ef2859d554e",
"sha256:d2a6e5ef66503da51d2110edf6c403dc6b494cc0082f85db12f54e9c5d4c3ec5",
"sha256:d9ec0e67a14f9d1d48dd87a2531009a9b251c02ea42851c060b25c782516ff06",
"sha256:f44d141b8c4ea5eb4dbc9b3ad992d45580c1d22bf5e24363f2fbf50c2d7ae8a7"
],
"version": "==3.4.7"
"version": "==3.4.8"
},
"execnet": {
"hashes": [
"sha256:8f694f3ba9cc92cab508b152dcfe322153975c29bda272e2fd7f3f00f36e47c5",
"sha256:a295f7cc774947aac58dde7fdc85f4aa00c42adf5d8f5468fc630c1acf30a142"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==1.9.0"
},
"idna": {
"hashes": [
@@ -133,11 +144,11 @@
},
"pluggy": {
"hashes": [
"sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0",
"sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d"
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159",
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.13.1"
"markers": "python_version >= '3.6'",
"version": "==1.0.0"
},
"psycopg2": {
"hashes": [
@@ -191,11 +202,27 @@
},
"pytest": {
"hashes": [
"sha256:50bcad0a0b9c5a72c8e4e7c9855a3ad496ca6a881a3641b4260605450772c54b",
"sha256:91ef2131a9bd6be8f76f1f08eac5c5317221d6ad1e143ae03894b862e8976890"
"sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89",
"sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"
],
"index": "pypi",
"version": "==6.2.4"
"version": "==6.2.5"
},
"pytest-forked": {
"hashes": [
"sha256:6aa9ac7e00ad1a539c41bec6d21011332de671e938c7637378ec9710204e37ca",
"sha256:dc4147784048e70ef5d437951728825a131b81714b398d5d52f17c7c144d8815"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==1.3.0"
},
"pytest-xdist": {
"hashes": [
"sha256:e8ecde2f85d88fbcadb7d28cb33da0fa29bca5cf7d5967fa89fc0e97e5299ea5",
"sha256:ed3d7da961070fce2a01818b51f6888327fb88df4379edeb6b9d990e789d9c8d"
],
"index": "pypi",
"version": "==2.3.0"
},
"requests": {
"hashes": [
@@ -215,12 +242,12 @@
},
"typing-extensions": {
"hashes": [
"sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497",
"sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342",
"sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84"
"sha256:49f75d16ff11f1cd258e1b988ccff82a3ca5570217d7ad8c5f48205dd99a677e",
"sha256:d8226d10bc02a29bcc81df19a26e56a9647f8b0a6d4a83924139f4a8b01f17b7",
"sha256:f1d25edafde516b146ecd0613dabcc61409817af4766fbbcfb8d1ad4ec441a34"
],
"index": "pypi",
"version": "==3.10.0.0"
"version": "==3.10.0.2"
},
"urllib3": {
"hashes": [
@@ -309,12 +336,12 @@
},
"typing-extensions": {
"hashes": [
"sha256:0ac0f89795dd19de6b97debb0c6af1c70987fd80a2d62d1958f7e56fcc31b497",
"sha256:50b6f157849174217d0656f99dc82fe932884fb250826c18350e159ec6cdf342",
"sha256:779383f6086d90c99ae41cf0ff39aac8a7937a9283ce0a414e5dd782f4c94a84"
"sha256:49f75d16ff11f1cd258e1b988ccff82a3ca5570217d7ad8c5f48205dd99a677e",
"sha256:d8226d10bc02a29bcc81df19a26e56a9647f8b0a6d4a83924139f4a8b01f17b7",
"sha256:f1d25edafde516b146ecd0613dabcc61409817af4766fbbcfb8d1ad4ec441a34"
],
"index": "pypi",
"version": "==3.10.0.0"
"version": "==3.10.0.2"
},
"yapf": {
"hashes": [

View File

@@ -1,11 +1,15 @@
from contextlib import closing
from typing import Iterator
from uuid import uuid4
import psycopg2
from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver, PgBin
import pytest
pytest_plugins = ("fixtures.zenith_fixtures")
def test_pageserver_auth(pageserver_auth_enabled: ZenithPageserver):
ps = pageserver_auth_enabled
@@ -42,7 +46,8 @@ def test_compute_auth_to_pageserver(
pageserver_auth_enabled: ZenithPageserver,
repo_dir: str,
with_wal_acceptors: bool,
pg_bin: PgBin
pg_bin: PgBin,
port_distributor: Iterator[int],
):
ps = pageserver_auth_enabled
# since we are in progress of refactoring protocols between compute safekeeper and page server
@@ -59,7 +64,7 @@ def test_compute_auth_to_pageserver(
repo_dir=repo_dir,
pg_bin=pg_bin,
tenant_id=ps.initial_tenant,
port=55432, # FIXME port distribution is hardcoded in tests and in cli
port=next(port_distributor),
).create_start(
branch,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,

View File

@@ -99,4 +99,4 @@ def test_dropdb(
assert os.path.isdir(dbpath) == False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(zenith_cli, test_output_dir, pg)
check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver.service_port.pg)

View File

@@ -66,4 +66,4 @@ def test_multixact(pageserver: ZenithPageserver, postgres: PostgresFactory,
assert next_multixact_id_new == next_multixact_id
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(zenith_cli, test_output_dir, pg_new)
check_restored_datadir_content(zenith_cli, test_output_dir, pg_new, pageserver.service_port.pg)

View File

@@ -55,4 +55,4 @@ def test_pg_regress(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_
lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(zenith_cli, test_output_dir, pg)
check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver.service_port.pg)

View File

@@ -1,13 +1,13 @@
import os
from fixtures.utils import mkdir_if_needed
from fixtures.zenith_fixtures import PostgresFactory, check_restored_datadir_content
from fixtures.zenith_fixtures import PageserverPort, PostgresFactory, check_restored_datadir_content
pytest_plugins = ("fixtures.zenith_fixtures")
def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_output_dir, pg_distrib_dir,
base_dir, capsys):
base_dir, capsys, pageserver_port: PageserverPort):
# Create a branch for us
zenith_cli.run(["branch", "test_zenith_regress", "empty"])
@@ -56,4 +56,4 @@ def test_zenith_regress(postgres: PostgresFactory, pg_bin, zenith_cli, test_outp
lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(zenith_cli, test_output_dir, pg)
check_restored_datadir_content(zenith_cli, test_output_dir, pg, pageserver_port.pg)

View File

@@ -14,10 +14,11 @@ def mkdir_if_needed(path: str) -> None:
Note this won't try to create intermediate directories.
"""
if os.path.exists(path):
assert os.path.isdir(path)
return
os.mkdir(path)
try:
os.mkdir(path)
except FileExistsError:
pass
assert os.path.isdir(path)
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None:

View File

@@ -50,7 +50,7 @@ DEFAULT_OUTPUT_DIR = 'test_output'
DEFAULT_POSTGRES_DIR = 'tmp_install'
DEFAULT_PAGESERVER_PG_PORT = 64000
DEFAULT_PAGESERVER_HTTP_PORT = 9898
BASE_PORT = 55000
def determine_scope(fixture_name: str, config: Any) -> str:
@@ -75,20 +75,6 @@ def zenfixture(func: Fn) -> Fn:
return pytest.fixture(func, scope=scope)
@pytest.fixture(autouse=True, scope='session')
def safety_check() -> None:
""" Ensure that no unwanted daemons are running before we start testing. """
# does not use -c as it is not supported on macOS
cmd = ['pgrep', 'pageserver|postgres|wal_acceptor']
result = subprocess.run(cmd, stdout=subprocess.DEVNULL)
if result.returncode == 0:
# returncode of 0 means it found something.
# This is bad; we don't want any of those processes polluting the
# result of the test.
raise Exception('found interfering processes running')
class PgProtocol:
""" Reusable connection logic """
def __init__(self, host: str, port: int, username: Optional[str] = None):
@@ -277,24 +263,48 @@ class AuthKeys:
return token
@zenfixture
def worker_seq_no(worker_id: str):
if worker_id == 'master':
return 0
assert worker_id.startswith('gw')
return int(worker_id[2:])
@zenfixture
def worker_base_port(worker_seq_no: int):
# Divide the port space into ranges of 100 ports each,
# so that workers get disjoint sets of ports for their services.
return BASE_PORT + worker_seq_no * 100
@zenfixture
def port_distributor(worker_base_port):
yield iter(range(worker_base_port, worker_base_port + 100))
@dataclass
class PageserverPort:
pg: int
http: int
class ZenithPageserver(PgProtocol):
""" An object representing a running pageserver. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str):
super().__init__(host='localhost', port=DEFAULT_PAGESERVER_PG_PORT)
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, port: PageserverPort):
super().__init__(host='localhost', port=port.pg)
self.zenith_cli = zenith_cli
self.running = False
self.initial_tenant = None
self.repo_dir = repo_dir
self.service_port = port # do not shadow PgProtocol.port which is just int
def init(self, enable_auth: bool = False) -> 'ZenithPageserver':
"""
Initialize the repository, i.e. run "zenith init".
Returns self.
"""
cmd = ['init']
cmd = ['init', f'--pageserver-pg-port={self.service_port.pg}', f'--pageserver-http-port={self.service_port.http}']
if enable_auth:
cmd.append('--enable-auth')
self.zenith_cli.run(cmd)
@@ -344,13 +354,23 @@ class ZenithPageserver(PgProtocol):
def http_client(self, auth_token: Optional[str] = None):
return ZenithPageserverHttpClient(
port=DEFAULT_PAGESERVER_HTTP_PORT,
port=self.service_port.http,
auth_token=auth_token,
)
@zenfixture
def pageserver(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[ZenithPageserver]:
def pageserver_port(port_distributor: Iterator[int]) -> PageserverPort:
pg = next(port_distributor)
http = next(port_distributor)
print(f"pageserver_port: pg={pg} http={http}")
return PageserverPort(pg=pg, http=http)
@zenfixture
def pageserver(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort) -> Iterator[ZenithPageserver]:
"""
The 'pageserver' fixture provides a Page Server that's up and running.
@@ -362,8 +382,7 @@ def pageserver(zenith_cli: ZenithCli, repo_dir: str) -> Iterator[ZenithPageserve
By convention, the test branches are named after the tests. For example,
test called 'test_foo' would create and use branches with the 'test_foo' prefix.
"""
ps = ZenithPageserver(zenith_cli, repo_dir).init().start()
ps = ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port).init().start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
zenith_cli.run(["branch", "empty", "main"])
@@ -433,8 +452,8 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir)
@pytest.fixture
def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str):
with ZenithPageserver(zenith_cli, repo_dir).init(enable_auth=True).start() as ps:
def pageserver_auth_enabled(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: PageserverPort):
with ZenithPageserver(zenith_cli=zenith_cli, repo_dir=repo_dir, port=pageserver_port).init(enable_auth=True).start() as ps:
# For convenience in tests, create a branch from the freshly-initialized cluster.
zenith_cli.run(["branch", "empty", "main"])
yield ps
@@ -470,7 +489,7 @@ class Postgres(PgProtocol):
if not config_lines:
config_lines = []
self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}'])
self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}', f'--port={self.port}'])
self.branch = branch
path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
self.pgdata_dir = os.path.join(self.repo_dir, path)
@@ -493,7 +512,7 @@ class Postgres(PgProtocol):
print(f"Starting postgres on branch {self.branch}")
run_result = self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
run_result = self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}', f'--port={self.port}'])
self.running = True
print(f"stdout: {run_result.stdout}")
@@ -607,13 +626,13 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, base_port: int = 55431):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: Iterator[int]):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
self.instances: List[Postgres] = []
self.initial_tenant: str = initial_tenant
self.base_port = base_port
self.port_distributor = port_distributor
self.pg_bin = pg_bin
def create_start(
@@ -623,15 +642,13 @@ class PostgresFactory:
wal_acceptors: Optional[str] = None,
config_lines: Optional[List[str]] = None
) -> Postgres:
pg = Postgres(
zenith_cli=self.zenith_cli,
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
port=next(self.port_distributor),
)
self.num_instances += 1
self.instances.append(pg)
@@ -654,7 +671,7 @@ class PostgresFactory:
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
port=next(self.port_distributor),
)
self.num_instances += 1
@@ -679,7 +696,7 @@ class PostgresFactory:
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=self.base_port + self.num_instances + 1,
port=next(self.port_distributor),
)
self.num_instances += 1
@@ -703,8 +720,14 @@ def initial_tenant(pageserver: ZenithPageserver):
@zenfixture
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(zenith_cli, repo_dir, pg_bin, initial_tenant=initial_tenant)
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: Iterator[int]) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(
zenith_cli=zenith_cli,
repo_dir=repo_dir,
pg_bin=pg_bin,
initial_tenant=initial_tenant,
port_distributor=port_distributor,
)
yield pgfactory
@@ -720,10 +743,11 @@ def read_pid(path: Path):
@dataclass
class WalAcceptor:
""" An object representing a running wal acceptor daemon. """
bin_path: Path
wa_bin_path: Path
data_dir: Path
port: int
num: int # identifier for logging
pageserver_port: int
auth_token: Optional[str] = None
def start(self) -> 'WalAcceptor':
@@ -731,13 +755,13 @@ class WalAcceptor:
self.data_dir.mkdir(parents=True, exist_ok=True)
self.pidfile.unlink(missing_ok=True)
cmd = [str(self.bin_path)]
cmd = [str(self.wa_bin_path)]
cmd.extend(["-D", str(self.data_dir)])
cmd.extend(["-l", "localhost:{}".format(self.port)])
cmd.extend(["-l", f"localhost:{self.port}"])
cmd.append("--daemonize")
cmd.append("--no-sync")
# Tell page server it can receive WAL from this WAL safekeeper
cmd.extend(["--pageserver", "localhost:{}".format(DEFAULT_PAGESERVER_PG_PORT)])
cmd.extend(["--pageserver", f"localhost:{self.pageserver_port}"])
cmd.extend(["--recall", "1 second"])
print('Running command "{}"'.format(' '.join(cmd)))
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
@@ -784,24 +808,25 @@ class WalAcceptor:
class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, zenith_binpath: Path, data_dir: Path):
self.wa_binpath = zenith_binpath / 'wal_acceptor'
def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: Iterator[int]):
self.wa_bin_path = zenith_binpath / 'wal_acceptor'
self.data_dir = data_dir
self.instances: List[WalAcceptor] = []
self.initial_port = 54321
self.port_distributor = port_distributor
self.pageserver_port = pageserver_port
def start_new(self, auth_token: Optional[str] = None) -> WalAcceptor:
"""
Start new wal acceptor.
"""
wa_num = len(self.instances)
wa = WalAcceptor(
self.wa_binpath,
self.data_dir / "wal_acceptor_{}".format(wa_num),
self.initial_port + wa_num,
wa_num,
auth_token,
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=next(self.port_distributor),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
)
wa.start()
self.instances.append(wa)
@@ -826,9 +851,14 @@ class WalAcceptorFactory:
@zenfixture
def wa_factory(zenith_binpath: str, repo_dir: str) -> Iterator[WalAcceptorFactory]:
def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: Iterator[int]) -> Iterator[WalAcceptorFactory]:
""" Gives WalAcceptorFactory providing wal acceptors. """
wafactory = WalAcceptorFactory(Path(zenith_binpath), Path(repo_dir) / "wal_acceptors")
wafactory = WalAcceptorFactory(
zenith_binpath=Path(zenith_binpath),
data_dir=Path(repo_dir) / "wal_acceptors",
pageserver_port=pageserver_port.pg,
port_distributor=port_distributor,
)
yield wafactory
# After the yield comes any cleanup code we need.
print('Starting wal acceptors cleanup')
@@ -954,7 +984,7 @@ def list_files_to_compare(pgdata_dir: str):
return pgdata_files
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(zenith_cli, test_output_dir, pg):
def check_restored_datadir_content(zenith_cli: ZenithCli, test_output_dir: str, pg: Postgres, pageserver_pg_port: int):
# Get the timeline ID of our branch. We need it for the 'basebackup' command
with closing(pg.connect()) as conn:
@@ -969,12 +999,11 @@ def check_restored_datadir_content(zenith_cli, test_output_dir, pg):
restored_dir_path = os.path.join(test_output_dir, "{}_restored_datadir".format(pg.branch))
mkdir_if_needed(restored_dir_path)
cmd = "psql -h 127.0.0.1 -p {} -c 'basebackup {} {}' | tar -x -C {}".format(
DEFAULT_PAGESERVER_PG_PORT, pg.tenant_id, timeline, restored_dir_path)
cmd = f"psql -h localhost -p {pageserver_pg_port} -c 'basebackup {pg.tenant_id} {timeline}' | tar -x -C {restored_dir_path}"
cmd = os.path.join(pg.pg_bin.pg_bin_path, cmd)
subprocess.run(cmd, shell=True)
subprocess.check_call(cmd, shell=True)
# list files we're going to compare
pgdata_files = list_files_to_compare(pg.pgdata_dir)

View File

@@ -4,6 +4,7 @@ use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::storage::PageServerNode;
use pageserver::defaults::{DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_PORT};
use std::collections::HashMap;
use std::process::exit;
use std::str::FromStr;
@@ -43,11 +44,28 @@ fn main() -> Result<()> {
.takes_value(true)
.required(false);
let port_arg = Arg::with_name("port")
.long("port")
.required(false)
.value_name("port");
let matches = App::new("Zenith CLI")
.setting(AppSettings::ArgRequiredElseHelp)
.subcommand(
SubCommand::with_name("init")
.about("Initialize a new Zenith repository")
.arg(
Arg::with_name("pageserver-pg-port")
.long("pageserver-pg-port")
.required(false)
.value_name("pageserver-pg-port"),
)
.arg(
Arg::with_name("pageserver-http-port")
.long("pageserver-http-port")
.required(false)
.value_name("pageserver-http-port"),
)
.arg(
Arg::with_name("enable-auth")
.long("enable-auth")
@@ -79,7 +97,7 @@ fn main() -> Result<()> {
.subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone()))
.subcommand(SubCommand::with_name("create")
.about("Create a postgres compute node")
.arg(timeline_arg.clone()).arg(tenantid_arg.clone())
.arg(timeline_arg.clone()).arg(tenantid_arg.clone()).arg(port_arg.clone())
.arg(
Arg::with_name("config-only")
.help("Don't do basebackup, create compute node with only config files")
@@ -88,7 +106,11 @@ fn main() -> Result<()> {
))
.subcommand(SubCommand::with_name("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(timeline_arg.clone()).arg(tenantid_arg.clone()))
.arg(
timeline_arg.clone()
).arg(
tenantid_arg.clone()
).arg(port_arg.clone()))
.subcommand(
SubCommand::with_name("stop")
.arg(timeline_arg.clone())
@@ -99,19 +121,36 @@ fn main() -> Result<()> {
.long("destroy")
.required(false)
)
)
)
)
.get_matches();
// Create config file
if let ("init", Some(init_match)) = matches.subcommand() {
let tenantid = ZTenantId::generate();
let pageserver_pg_port = match init_match.value_of("pageserver-pg-port") {
Some(v) => v.parse()?,
None => DEFAULT_PG_LISTEN_PORT,
};
let pageserver_http_port = match init_match.value_of("pageserver-http-port") {
Some(v) => v.parse()?,
None => DEFAULT_HTTP_LISTEN_PORT,
};
let auth_type = if init_match.is_present("enable-auth") {
AuthType::ZenithJWT
} else {
AuthType::Trust
};
local_env::init(tenantid, auth_type).with_context(|| "Failed to create config file")?;
local_env::init(
pageserver_pg_port,
pageserver_http_port,
tenantid,
auth_type,
)
.with_context(|| "Failed to create config file")?;
}
// all other commands would need config
@@ -412,7 +451,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.map_or(Ok(env.tenantid), |value| value.parse())?;
let timeline_name = create_match.value_of("timeline").unwrap_or("main");
cplane.new_node(tenantid, timeline_name)?;
let port: Option<u16> = match create_match.value_of("port") {
Some(p) => Some(p.parse()?),
None => None,
};
cplane.new_node(tenantid, timeline_name, port)?;
}
("start", Some(start_match)) => {
let tenantid: ZTenantId = start_match
@@ -420,6 +463,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.map_or(Ok(env.tenantid), |value| value.parse())?;
let timeline_name = start_match.value_of("timeline").unwrap_or("main");
let port: Option<u16> = match start_match.value_of("port") {
Some(p) => Some(p.parse()?),
None => None,
};
let node = cplane.nodes.get(&(tenantid, timeline_name.to_owned()));
let auth_token = if matches!(env.auth_type, AuthType::ZenithJWT) {
@@ -437,7 +485,12 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
if let Some(node) = node {
node.start(&auth_token)?;
} else {
let node = cplane.new_node(tenantid, timeline_name)?;
// When used with a custom port this results in non-obvious behaviour:
// the port is remembered from the first start command, i.e.
//   start --port X
//   stop
//   start  <-- will also use port X even without an explicit port argument
let node = cplane.new_node(tenantid, timeline_name, port)?;
node.start(&auth_token)?;
}
}

View File

@@ -1,5 +1,3 @@
use std::net::TcpListener;
use crate::auth::{self, Claims, JwtAuth};
use crate::http::error;
use crate::zid::ZTenantId;
@@ -10,6 +8,7 @@ use lazy_static::lazy_static;
use routerify::ext::RequestExt;
use routerify::RequestInfo;
use routerify::{Middleware, Router, RouterBuilder, RouterService};
use std::net::TcpListener;
use zenith_metrics::{register_int_counter, IntCounter};
use zenith_metrics::{Encoder, TextEncoder};