From 7b6337db583861811c1ee187aa2249095e204da9 Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 8 Sep 2023 16:19:57 +0100 Subject: [PATCH] tests: enable multiple pageservers in `neon_local` and `neon_fixtures` (#5231) ## Problem Currently our testing environment only supports running a single pageserver at a time. This is insufficient for testing failover and migrations. - Dependency of writing tests for #5207 ## Summary of changes - `neon_local` and `neon_fixtures` now handle multiple pageservers - This is a breaking change to the `.neon/config` format: any local environments will need recreating - Existing tests continue to work unchanged: - The default number of pageservers is 1 - `NeonEnv.pageserver` is now a helper property that retrieves the first pageserver if there is only one, else throws. - Pageserver data directories are now at `.neon/pageserver_{n}` where n is 1,2,3... - Compatibility tests get some special casing to migrate neon_local configs: these are not meant to be backward/forward compatible, but they were treated that way by the test. 
--- control_plane/simple.conf | 3 +- control_plane/src/attachment_service.rs | 3 +- control_plane/src/bin/neon_local.rs | 154 ++++++++---- control_plane/src/endpoint.rs | 26 +- control_plane/src/local_env.rs | 38 ++- control_plane/src/pageserver.rs | 61 +++-- test_runner/fixtures/neon_fixtures.py | 223 +++++++++++++----- test_runner/regress/test_broken_timeline.py | 14 +- test_runner/regress/test_close_fds.py | 2 +- test_runner/regress/test_compatibility.py | 79 ++++--- test_runner/regress/test_import.py | 2 +- test_runner/regress/test_large_schema.py | 2 +- test_runner/regress/test_neon_cli.py | 38 ++- test_runner/regress/test_ondemand_download.py | 6 +- test_runner/regress/test_pageserver_api.py | 6 +- test_runner/regress/test_read_trace.py | 2 +- test_runner/regress/test_remote_storage.py | 6 +- test_runner/regress/test_tenant_conf.py | 2 +- test_runner/regress/test_tenant_delete.py | 3 +- test_runner/regress/test_tenant_detach.py | 14 +- test_runner/regress/test_tenant_relocation.py | 4 +- test_runner/regress/test_tenants.py | 11 +- .../test_tenants_with_remote_storage.py | 4 +- test_runner/regress/test_timeline_delete.py | 22 +- test_runner/regress/test_wal_restore.py | 2 +- .../test_walredo_not_left_behind_on_detach.py | 6 +- 26 files changed, 497 insertions(+), 236 deletions(-) diff --git a/control_plane/simple.conf b/control_plane/simple.conf index 243e13f3d3..0ad90a4618 100644 --- a/control_plane/simple.conf +++ b/control_plane/simple.conf @@ -1,6 +1,7 @@ # Minimal neon environment with one safekeeper. 
This is equivalent to the built-in # defaults that you get with no --config -[pageserver] +[[pageservers]] +id=1 listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' pg_auth_type = 'Trust' diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 2bd4260aa8..f0e649cfa8 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -32,7 +32,7 @@ impl AttachmentService { // Makes no sense to construct this if pageservers aren't going to use it: assume // pageservers have control plane API set - let listen_url = env.pageserver.control_plane_api.clone().unwrap(); + let listen_url = env.control_plane_api.clone().unwrap(); let listen = format!( "{}:{}", @@ -80,7 +80,6 @@ impl AttachmentService { let url = self .env - .pageserver .control_plane_api .clone() .unwrap() diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 6b49b92cfa..4cdb91bfd2 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -50,16 +50,17 @@ fn default_conf() -> String { format!( r#" # Default built-in configuration, defined in main.rs +control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}' + [broker] listen_addr = '{DEFAULT_BROKER_ADDR}' -[pageserver] +[[pageservers]] id = {DEFAULT_PAGESERVER_ID} listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}' listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}' pg_auth_type = '{trust_auth}' http_auth_type = '{trust_auth}' -control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}' [[safekeepers]] id = {DEFAULT_SAFEKEEPER_ID} @@ -258,7 +259,7 @@ fn get_timeline_infos( env: &local_env::LocalEnv, tenant_id: &TenantId, ) -> Result> { - Ok(PageServerNode::from_env(env) + Ok(get_default_pageserver(env) .timeline_list(tenant_id)? 
.into_iter() .map(|timeline_info| (timeline_info.timeline_id, timeline_info)) @@ -319,17 +320,30 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { .context("Failed to initialize neon repository")?; // Initialize pageserver, create initial tenant and timeline. - let pageserver = PageServerNode::from_env(&env); - pageserver - .initialize(&pageserver_config_overrides(init_match)) - .unwrap_or_else(|e| { - eprintln!("pageserver init failed: {e:?}"); - exit(1); - }); + for ps_conf in &env.pageservers { + PageServerNode::from_env(&env, ps_conf) + .initialize(&pageserver_config_overrides(init_match)) + .unwrap_or_else(|e| { + eprintln!("pageserver init failed: {e:?}"); + exit(1); + }); + } Ok(env) } +/// The default pageserver is the one where CLI tenant/timeline operations are sent by default. +/// For typical interactive use, one would just run with a single pageserver. Scenarios with +/// tenant/timeline placement across multiple pageservers are managed by python test code rather +/// than this CLI. +fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode { + let ps_conf = env + .pageservers + .first() + .expect("Config is validated to contain at least one pageserver"); + PageServerNode::from_env(env, ps_conf) +} + fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { init_match .get_many::("pageserver-config-override") @@ -340,7 +354,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> { } fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> { - let pageserver = PageServerNode::from_env(env); + let pageserver = get_default_pageserver(env); match tenant_match.subcommand() { Some(("list", _)) => { for t in pageserver.tenant_list()? 
{ @@ -356,11 +370,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an // If tenant ID was not specified, generate one let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate); - let generation = if env.pageserver.control_plane_api.is_some() { + let generation = if env.control_plane_api.is_some() { // We must register the tenant with the attachment service, so // that when the pageserver restarts, it will be re-attached. let attachment_service = AttachmentService::from_env(env); - attachment_service.attach_hook(tenant_id, env.pageserver.id)? + attachment_service.attach_hook(tenant_id, pageserver.conf.id)? } else { None }; @@ -425,7 +439,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an } fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> { - let pageserver = PageServerNode::from_env(env); + let pageserver = get_default_pageserver(env); match timeline_match.subcommand() { Some(("list", list_match)) => { @@ -502,6 +516,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - None, pg_version, ComputeMode::Primary, + DEFAULT_PAGESERVER_ID, )?; println!("Done"); } @@ -555,7 +570,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( Some(ep_subcommand_data) => ep_subcommand_data, None => bail!("no endpoint subcommand provided"), }; - let mut cplane = ComputeControlPlane::load(env.clone())?; // All subcommands take an optional --tenant-id option @@ -652,6 +666,13 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .copied() .unwrap_or(false); + let pageserver_id = + if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { + NodeId(id_str.parse().context("while parsing pageserver id")?) 
+ } else { + DEFAULT_PAGESERVER_ID + }; + let mode = match (lsn, hot_standby) { (Some(lsn), false) => ComputeMode::Static(lsn), (None, true) => ComputeMode::Replica, @@ -667,6 +688,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( http_port, pg_version, mode, + pageserver_id, )?; } "start" => { @@ -676,6 +698,13 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .get_one::("endpoint_id") .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?; + let pageserver_id = + if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { + NodeId(id_str.parse().context("while parsing pageserver id")?) + } else { + DEFAULT_PAGESERVER_ID + }; + let remote_ext_config = sub_args.get_one::("remote-ext-config"); // If --safekeepers argument is given, use only the listed safekeeper nodes. @@ -695,7 +724,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( let endpoint = cplane.endpoints.get(endpoint_id.as_str()); - let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) { + let ps_conf = env.get_pageserver_conf(pageserver_id)?; + let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) { let claims = Claims::new(Some(tenant_id), Scope::Tenant); Some(env.generate_auth_token(&claims)?) 
@@ -762,6 +792,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( http_port, pg_version, mode, + pageserver_id, )?; ep.start(&auth_token, safekeepers, remote_ext_config)?; } @@ -786,48 +817,64 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( } fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { - let pageserver = PageServerNode::from_env(env); + fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result { + let node_id = if let Some(id_str) = args.get_one::("pageserver-id") { + NodeId(id_str.parse().context("while parsing pageserver id")?) + } else { + DEFAULT_PAGESERVER_ID + }; + + Ok(PageServerNode::from_env( + env, + env.get_pageserver_conf(node_id)?, + )) + } match sub_match.subcommand() { - Some(("start", start_match)) => { - if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) { + Some(("start", subcommand_args)) => { + if let Err(e) = get_pageserver(env, subcommand_args)? + .start(&pageserver_config_overrides(subcommand_args)) + { eprintln!("pageserver start failed: {e}"); exit(1); } } - Some(("stop", stop_match)) => { - let immediate = stop_match + Some(("stop", subcommand_args)) => { + let immediate = subcommand_args .get_one::("stop-mode") .map(|s| s.as_str()) == Some("immediate"); - if let Err(e) = pageserver.stop(immediate) { + if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) { eprintln!("pageserver stop failed: {}", e); exit(1); } } - Some(("restart", restart_match)) => { + Some(("restart", subcommand_args)) => { + let pageserver = get_pageserver(env, subcommand_args)?; //TODO what shutdown strategy should we use here? 
if let Err(e) = pageserver.stop(false) { eprintln!("pageserver stop failed: {}", e); exit(1); } - if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) { + if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) { eprintln!("pageserver start failed: {e}"); exit(1); } } - Some(("status", _)) => match PageServerNode::from_env(env).check_status() { - Ok(_) => println!("Page server is up and running"), - Err(err) => { - eprintln!("Page server is not available: {}", err); - exit(1); + Some(("status", subcommand_args)) => { + match get_pageserver(env, subcommand_args)?.check_status() { + Ok(_) => println!("Page server is up and running"), + Err(err) => { + eprintln!("Page server is not available: {}", err); + exit(1); + } } - }, + } Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name), None => bail!("no pageserver subcommand provided"), @@ -943,7 +990,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow broker::start_broker_process(env)?; // Only start the attachment service if the pageserver is configured to need it - if env.pageserver.control_plane_api.is_some() { + if env.control_plane_api.is_some() { let attachment_service = AttachmentService::from_env(env); if let Err(e) = attachment_service.start() { eprintln!("attachment_service start failed: {:#}", e); @@ -952,11 +999,13 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow } } - let pageserver = PageServerNode::from_env(env); - if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { - eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e); - try_stop_all(env, true); - exit(1); + for ps_conf in &env.pageservers { + let pageserver = PageServerNode::from_env(env, ps_conf); + if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { + eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); + try_stop_all(env, true); + 
exit(1); + } } for node in env.safekeepers.iter() { @@ -980,8 +1029,6 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result< } fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { - let pageserver = PageServerNode::from_env(env); - // Stop all endpoints match ComputeControlPlane::load(env.clone()) { Ok(cplane) => { @@ -996,8 +1043,11 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } - if let Err(e) = pageserver.stop(immediate) { - eprintln!("pageserver {} stop failed: {:#}", env.pageserver.id, e); + for ps_conf in &env.pageservers { + let pageserver = PageServerNode::from_env(env, ps_conf); + if let Err(e) = pageserver.stop(immediate) { + eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e); + } } for node in env.safekeepers.iter() { @@ -1011,7 +1061,7 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { eprintln!("neon broker stop failed: {e:#}"); } - if env.pageserver.control_plane_api.is_some() { + if env.control_plane_api.is_some() { let attachment_service = AttachmentService::from_env(env); if let Err(e) = attachment_service.stop(immediate) { eprintln!("attachment service stop failed: {e:#}"); @@ -1031,6 +1081,16 @@ fn cli() -> Command { let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false); + // --id, when using a pageserver command + let pageserver_id_arg = Arg::new("pageserver-id") + .long("id") + .help("pageserver id") + .required(false); + // --pageserver-id when using a non-pageserver command + let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id") + .long("pageserver-id") + .required(false); + let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt") .short('e') .long("safekeeper-extra-opt") @@ -1195,10 +1255,16 @@ fn cli() -> Command { .arg_required_else_help(true) .about("Manage pageserver") .subcommand(Command::new("status")) - .subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone())) + 
.arg(pageserver_id_arg.clone()) + .subcommand(Command::new("start").about("Start local pageserver") + .arg(pageserver_id_arg.clone()) + .arg(pageserver_config_args.clone())) .subcommand(Command::new("stop").about("Stop local pageserver") + .arg(pageserver_id_arg.clone()) .arg(stop_mode_arg.clone())) - .subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone())) + .subcommand(Command::new("restart").about("Restart local pageserver") + .arg(pageserver_id_arg.clone()) + .arg(pageserver_config_args.clone())) ) .subcommand( Command::new("attachment_service") @@ -1242,6 +1308,7 @@ fn cli() -> Command { .arg(lsn_arg.clone()) .arg(pg_port_arg.clone()) .arg(http_port_arg.clone()) + .arg(endpoint_pageserver_id_arg.clone()) .arg( Arg::new("config-only") .help("Don't do basebackup, create endpoint directory with only config files") @@ -1259,6 +1326,7 @@ fn cli() -> Command { .arg(lsn_arg) .arg(pg_port_arg) .arg(http_port_arg) + .arg(endpoint_pageserver_id_arg.clone()) .arg(pg_version_arg) .arg(hot_standby_arg) .arg(safekeepers_arg) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4ed03c8771..cba364c049 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -70,6 +70,7 @@ pub struct EndpointConf { http_port: u16, pg_version: u32, skip_pg_catalog_updates: bool, + pageserver_id: NodeId, } // @@ -82,19 +83,16 @@ pub struct ComputeControlPlane { pub endpoints: BTreeMap>, env: LocalEnv, - pageserver: Arc, } impl ComputeControlPlane { // Load current endpoints from the endpoints/ subdirectories pub fn load(env: LocalEnv) -> Result { - let pageserver = Arc::new(PageServerNode::from_env(&env)); - let mut endpoints = BTreeMap::default(); for endpoint_dir in std::fs::read_dir(env.endpoints_path()) .with_context(|| format!("failed to list {}", env.endpoints_path().display()))? 
{ - let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?; + let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?; endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep)); } @@ -102,7 +100,6 @@ impl ComputeControlPlane { base_port: 55431, endpoints, env, - pageserver, }) } @@ -125,15 +122,18 @@ impl ComputeControlPlane { http_port: Option, pg_version: u32, mode: ComputeMode, + pageserver_id: NodeId, ) -> Result> { let pg_port = pg_port.unwrap_or_else(|| self.get_port()); let http_port = http_port.unwrap_or_else(|| self.get_port() + 1); + let pageserver = + PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?); let ep = Arc::new(Endpoint { endpoint_id: endpoint_id.to_owned(), pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port), http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port), env: self.env.clone(), - pageserver: Arc::clone(&self.pageserver), + pageserver, timeline_id, mode, tenant_id, @@ -159,6 +159,7 @@ impl ComputeControlPlane { pg_port, pg_version, skip_pg_catalog_updates: true, + pageserver_id, })?, )?; std::fs::write( @@ -193,18 +194,14 @@ pub struct Endpoint { // These are not part of the endpoint as such, but the environment // the endpoint runs in. 
pub env: LocalEnv, - pageserver: Arc, + pageserver: PageServerNode, // Optimizations skip_pg_catalog_updates: bool, } impl Endpoint { - fn from_dir_entry( - entry: std::fs::DirEntry, - env: &LocalEnv, - pageserver: &Arc, - ) -> Result { + fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result { if !entry.file_type()?.is_dir() { anyhow::bail!( "Endpoint::from_dir_entry failed: '{}' is not a directory", @@ -220,12 +217,15 @@ impl Endpoint { let conf: EndpointConf = serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; + let pageserver = + PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?); + Ok(Endpoint { pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port), http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port), endpoint_id, env: env.clone(), - pageserver: Arc::clone(pageserver), + pageserver, timeline_id: conf.timeline_id, mode: conf.mode, tenant_id: conf.tenant_id, diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 0215ab1bb5..b4d09b01ab 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -68,11 +68,17 @@ pub struct LocalEnv { pub broker: NeonBroker, - pub pageserver: PageServerConf, + /// This Vec must always contain at least one pageserver + pub pageservers: Vec, #[serde(default)] pub safekeepers: Vec, + // Control plane location: if None, we will not run attachment_service. If set, this will + // be propagated into each pageserver's configuration. + #[serde(default)] + pub control_plane_api: Option, + /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. 
#[serde(default)] // A `HashMap>` would be more appropriate here, @@ -118,9 +124,6 @@ pub struct PageServerConf { // auth type used for the PG and HTTP ports pub pg_auth_type: AuthType, pub http_auth_type: AuthType, - - // Control plane location - pub control_plane_api: Option, } impl Default for PageServerConf { @@ -131,7 +134,6 @@ impl Default for PageServerConf { listen_http_addr: String::new(), pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, - control_plane_api: None, } } } @@ -222,15 +224,23 @@ impl LocalEnv { self.base_data_dir.join("endpoints") } - // TODO: move pageserver files into ./pageserver - pub fn pageserver_data_dir(&self) -> PathBuf { - self.base_data_dir.clone() + pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf { + self.base_data_dir + .join(format!("pageserver_{pageserver_id}")) } pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf { self.base_data_dir.join("safekeepers").join(data_dir_name) } + pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> { + if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) { + Ok(conf) + } else { + bail!("could not find pageserver {id}") + } + } + pub fn register_branch_mapping( &mut self, branch_name: String, @@ -307,6 +317,10 @@ impl LocalEnv { env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); } + if env.pageservers.is_empty() { + anyhow::bail!("Configuration must contain at least one pageserver"); + } + env.base_data_dir = base_path(); Ok(env) @@ -339,7 +353,7 @@ impl LocalEnv { // We read that in, in `create_config`, and fill any missing defaults. Then it's saved // to .neon/config. TODO: We lose any formatting and comments along the way, which is // a bit sad. - let mut conf_content = r#"# This file describes a locale deployment of the page server + let mut conf_content = r#"# This file describes a local deployment of the page server # and safekeeeper node. 
It is read by the 'neon_local' command-line # utility. "# @@ -469,9 +483,9 @@ impl LocalEnv { } fn auth_keys_needed(&self) -> bool { - self.pageserver.pg_auth_type == AuthType::NeonJWT - || self.pageserver.http_auth_type == AuthType::NeonJWT - || self.safekeepers.iter().any(|sk| sk.auth_enabled) + self.pageservers.iter().any(|ps| { + ps.pg_auth_type == AuthType::NeonJWT || ps.http_auth_type == AuthType::NeonJWT + }) || self.safekeepers.iter().any(|sk| sk.auth_enabled) } } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index eecc2479ff..a6b675fdb5 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -27,6 +27,7 @@ use utils::{ lsn::Lsn, }; +use crate::local_env::PageServerConf; use crate::{background_process, local_env::LocalEnv}; #[derive(Error, Debug)] @@ -76,43 +77,40 @@ impl ResponseErrorMessageExt for Response { #[derive(Debug)] pub struct PageServerNode { pub pg_connection_config: PgConnectionConfig, + pub conf: PageServerConf, pub env: LocalEnv, pub http_client: Client, pub http_base_url: String, } impl PageServerNode { - pub fn from_env(env: &LocalEnv) -> PageServerNode { - let (host, port) = parse_host_port(&env.pageserver.listen_pg_addr) - .expect("Unable to parse listen_pg_addr"); + pub fn from_env(env: &LocalEnv, conf: &PageServerConf) -> PageServerNode { + let (host, port) = + parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); Self { pg_connection_config: PgConnectionConfig::new_host_port(host, port), + conf: conf.clone(), env: env.clone(), http_client: Client::new(), - http_base_url: format!("http://{}/v1", env.pageserver.listen_http_addr), + http_base_url: format!("http://{}/v1", conf.listen_http_addr), } } // pageserver conf overrides defined by neon_local configuration. 
fn neon_local_overrides(&self) -> Vec { - let id = format!("id={}", self.env.pageserver.id); + let id = format!("id={}", self.conf.id); // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. let pg_distrib_dir_param = format!( "pg_distrib_dir='{}'", self.env.pg_distrib_dir_raw().display() ); - let http_auth_type_param = - format!("http_auth_type='{}'", self.env.pageserver.http_auth_type); - let listen_http_addr_param = format!( - "listen_http_addr='{}'", - self.env.pageserver.listen_http_addr - ); + let http_auth_type_param = format!("http_auth_type='{}'", self.conf.http_auth_type); + let listen_http_addr_param = format!("listen_http_addr='{}'", self.conf.listen_http_addr); - let pg_auth_type_param = format!("pg_auth_type='{}'", self.env.pageserver.pg_auth_type); - let listen_pg_addr_param = - format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr); + let pg_auth_type_param = format!("pg_auth_type='{}'", self.conf.pg_auth_type); + let listen_pg_addr_param = format!("listen_pg_addr='{}'", self.conf.listen_pg_addr); let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url()); @@ -126,17 +124,18 @@ impl PageServerNode { broker_endpoint_param, ]; - if let Some(control_plane_api) = &self.env.pageserver.control_plane_api { + if let Some(control_plane_api) = &self.env.control_plane_api { overrides.push(format!( "control_plane_api='{}'", control_plane_api.as_str() )); } - if self.env.pageserver.http_auth_type != AuthType::Trust - || self.env.pageserver.pg_auth_type != AuthType::Trust + if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust { - overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned()); + // Keys are generated in the toplevel repo dir, pageservers' workdirs + // are one level below that, so refer to keys with ../ + overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned()); } overrides } @@ -144,16 
+143,12 @@ impl PageServerNode { /// Initializes a pageserver node by creating its config with the overrides provided. pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> { // First, run `pageserver --init` and wait for it to write a config into FS and exit. - self.pageserver_init(config_overrides).with_context(|| { - format!( - "Failed to run init for pageserver node {}", - self.env.pageserver.id, - ) - }) + self.pageserver_init(config_overrides) + .with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id,)) } pub fn repo_path(&self) -> PathBuf { - self.env.pageserver_data_dir() + self.env.pageserver_data_dir(self.conf.id) } /// The pid file is created by the pageserver process, with its pid stored inside. @@ -169,7 +164,7 @@ impl PageServerNode { fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> { let datadir = self.repo_path(); - let node_id = self.env.pageserver.id; + let node_id = self.conf.id; println!( "Initializing pageserver node {} at '{}' in {:?}", node_id, @@ -178,6 +173,10 @@ impl PageServerNode { ); io::stdout().flush()?; + if !datadir.exists() { + std::fs::create_dir(&datadir)?; + } + let datadir_path_str = datadir.to_str().with_context(|| { format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}") })?; @@ -208,7 +207,7 @@ impl PageServerNode { let datadir = self.repo_path(); print!( "Starting pageserver node {} at '{}' in {:?}", - self.env.pageserver.id, + self.conf.id, self.pg_connection_config.raw_address(), datadir ); @@ -217,7 +216,7 @@ impl PageServerNode { let datadir_path_str = datadir.to_str().with_context(|| { format!( "Cannot start pageserver node {} in path that has no string representation: {:?}", - self.env.pageserver.id, datadir, + self.conf.id, datadir, ) })?; let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str); @@ -261,7 +260,7 @@ impl PageServerNode { // FIXME: why is this tied to 
pageserver's auth type? Whether or not the safekeeper // needs a token, and how to generate that token, seems independent to whether // the pageserver requires a token in incoming requests. - Ok(if self.env.pageserver.http_auth_type != AuthType::Trust { + Ok(if self.conf.http_auth_type != AuthType::Trust { // Generate a token to connect from the pageserver to a safekeeper let token = self .env @@ -286,7 +285,7 @@ impl PageServerNode { pub fn page_server_psql_client(&self) -> anyhow::Result { let mut config = self.pg_connection_config.clone(); - if self.env.pageserver.pg_auth_type == AuthType::NeonJWT { + if self.conf.pg_auth_type == AuthType::NeonJWT { let token = self .env .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; @@ -297,7 +296,7 @@ impl PageServerNode { fn http_request(&self, method: Method, url: U) -> anyhow::Result { let mut builder = self.http_client.request(method, url); - if self.env.pageserver.http_auth_type == AuthType::NeonJWT { + if self.conf.http_auth_type == AuthType::NeonJWT { let token = self .env .generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 37524e7f5d..272c32fdcd 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -416,6 +416,7 @@ class NeonEnvBuilder: pageserver_remote_storage: Optional[RemoteStorage] = None, pageserver_config_override: Optional[str] = None, num_safekeepers: int = 1, + num_pageservers: int = 1, # Use non-standard SK ids to check for various parsing bugs safekeepers_id_start: int = 0, # fsync is disabled by default to make the tests go faster @@ -443,6 +444,7 @@ class NeonEnvBuilder: self.mock_s3_server: MockS3Server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers + self.num_pageservers = num_pageservers self.safekeepers_id_start = safekeepers_id_start self.safekeepers_enable_fsync 
= safekeepers_enable_fsync self.auth_enabled = auth_enabled @@ -613,7 +615,9 @@ class NeonEnvBuilder: self.env.endpoints.stop_all() for sk in self.env.safekeepers: sk.stop(immediate=True) - self.env.pageserver.stop(immediate=True) + + for pageserver in self.env.pageservers: + pageserver.stop(immediate=True) if self.env.attachment_service is not None: self.env.attachment_service.stop(immediate=True) @@ -644,7 +648,8 @@ class NeonEnvBuilder: if cleanup_error is not None: raise cleanup_error - self.env.pageserver.assert_no_errors() + for pageserver in self.env.pageservers: + pageserver.assert_no_errors() class NeonEnv: @@ -664,8 +669,7 @@ class NeonEnv: postgres - A factory object for creating postgres compute nodes. - pageserver - An object that contains functions for manipulating and - connecting to the pageserver + pageservers - An array containing objects representing the pageservers safekeepers - An array containing objects representing the safekeepers @@ -680,7 +684,7 @@ class NeonEnv: the tenant id """ - PAGESERVER_ID = 1 + BASE_PAGESERVER_ID = 1 def __init__(self, config: NeonEnvBuilder): self.repo_dir = config.repo_dir @@ -690,14 +694,20 @@ class NeonEnv: self.neon_cli = NeonCli(env=self) self.endpoints = EndpointFactory(self) self.safekeepers: List[Safekeeper] = [] + self.pageservers: List[NeonPageserver] = [] self.broker = config.broker self.pageserver_remote_storage = config.pageserver_remote_storage self.ext_remote_storage = config.ext_remote_storage self.safekeepers_remote_storage = config.sk_remote_storage self.pg_version = config.pg_version + # Binary path for pageserver, safekeeper, etc self.neon_binpath = config.neon_binpath + # Binary path for neon_local test-specific binaries: may be overridden + # after construction for compat testing + self.neon_local_binpath = config.neon_binpath self.pg_distrib_dir = config.pg_distrib_dir self.endpoint_counter = 0 + self.pageserver_config_override = config.pageserver_config_override # generate initial tenant ID 
here instead of letting 'neon init' generate it, # so that we don't need to dig it out of the config file afterwards. @@ -719,6 +729,13 @@ class NeonEnv: """ ) + if self.control_plane_api is not None: + toml += textwrap.dedent( + f""" + control_plane_api = '{self.control_plane_api}' + """ + ) + toml += textwrap.dedent( f""" [broker] @@ -727,36 +744,36 @@ class NeonEnv: ) # Create config for pageserver - pageserver_port = PageserverPort( - pg=self.port_distributor.get_port(), - http=self.port_distributor.get_port(), - ) http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" + for ps_id in range( + self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers + ): + pageserver_port = PageserverPort( + pg=self.port_distributor.get_port(), + http=self.port_distributor.get_port(), + ) - toml += textwrap.dedent( - f""" - [pageserver] - id={self.PAGESERVER_ID} - listen_pg_addr = 'localhost:{pageserver_port.pg}' - listen_http_addr = 'localhost:{pageserver_port.http}' - pg_auth_type = '{pg_auth_type}' - http_auth_type = '{http_auth_type}' - """ - ) - - if self.control_plane_api is not None: toml += textwrap.dedent( f""" - control_plane_api = '{self.control_plane_api}' + [[pageservers]] + id={ps_id} + listen_pg_addr = 'localhost:{pageserver_port.pg}' + listen_http_addr = 'localhost:{pageserver_port.http}' + pg_auth_type = '{pg_auth_type}' + http_auth_type = '{http_auth_type}' """ ) - # Create a corresponding NeonPageserver object - self.pageserver = NeonPageserver( - self, port=pageserver_port, config_override=config.pageserver_config_override - ) - + # Create a corresponding NeonPageserver object + self.pageservers.append( + NeonPageserver( + self, + ps_id, + port=pageserver_port, + config_override=config.pageserver_config_override, + ) + ) # Create config and a Safekeeper object for each safekeeper for i in range(1, config.num_safekeepers + 1): port = SafekeeperPort( @@ -798,25 +815,55 @@ 
class NeonEnv: if self.attachment_service is not None: self.attachment_service.start() - self.pageserver.start() + + for pageserver in self.pageservers: + pageserver.start() for safekeeper in self.safekeepers: safekeeper.start() + @property + def pageserver(self) -> NeonPageserver: + """ + For tests that are naive to multiple pageservers: give them the 1st in the list, and + assert that there is only one. Tests with multiple pageservers should always use + get_pageserver with an explicit ID. + """ + assert len(self.pageservers) == 1 + return self.pageservers[0] + + def get_pageserver(self, id: Optional[int]) -> NeonPageserver: + """ + Look up a pageserver by its node ID. + + As a convenience for tests that do not use multiple pageservers, passing None + will yield the same default pageserver as `self.pageserver`. + """ + + if id is None: + return self.pageserver + + for ps in self.pageservers: + if ps.id == id: + return ps + + raise RuntimeError(f"Pageserver with ID {id} not found") + def get_safekeeper_connstrs(self) -> str: """Get list of safekeeper endpoints suitable for safekeepers GUC""" return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers) - def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: - """Get a timeline directory's path based on the repo directory of the test environment""" - return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id) - - def tenant_dir( - self, - tenant_id: TenantId, + def timeline_dir( + self, tenant_id: TenantId, timeline_id: TimelineId, pageserver_id: Optional[int] = None ) -> Path: + """Get a timeline directory's path based on the repo directory of the test environment""" + return ( + self.tenant_dir(tenant_id, pageserver_id=pageserver_id) / "timelines" / str(timeline_id) + ) + + def tenant_dir(self, tenant_id: TenantId, pageserver_id: Optional[int] = None) -> Path: """Get a tenant directory's path based on the repo directory of the test environment""" - return self.repo_dir / 
"tenants" / str(tenant_id) + return self.get_pageserver(pageserver_id).workdir / "tenants" / str(tenant_id) def get_pageserver_version(self) -> str: bin_pageserver = str(self.neon_binpath / "pageserver") @@ -984,6 +1031,7 @@ class AbstractNeonCli(abc.ABC): extra_env_vars: Optional[Dict[str, str]] = None, check_return_code=True, timeout=None, + local_binpath=False, ) -> "subprocess.CompletedProcess[str]": """ Run the command with the specified arguments. @@ -997,12 +1045,19 @@ class AbstractNeonCli(abc.ABC): >>> log.info(result.stdout) If `check_return_code`, on non-zero exit code logs failure and raises. + + If `local_binpath` is true, then we are invoking a test utility """ assert type(arguments) == list assert type(self.COMMAND) == str - bin_neon = str(self.env.neon_binpath / self.COMMAND) + if local_binpath: + # Test utility + bin_neon = str(self.env.neon_local_binpath / self.COMMAND) + else: + # Normal binary + bin_neon = str(self.env.neon_binpath / self.COMMAND) args = [bin_neon] + arguments log.info('Running command "{}"'.format(" ".join(args))) @@ -1056,6 +1111,10 @@ class NeonCli(AbstractNeonCli): COMMAND = "neon_local" + def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]: + kwargs["local_binpath"] = True + return super().raw_cli(*args, **kwargs) + def create_tenant( self, tenant_id: Optional[TenantId] = None, @@ -1214,7 +1273,7 @@ class NeonCli(AbstractNeonCli): append_pageserver_param_overrides( params_to_update=cmd, remote_storage=storage, - pageserver_config_override=self.env.pageserver.config_override, + pageserver_config_override=self.env.pageserver_config_override, ) s3_env_vars = None @@ -1236,15 +1295,16 @@ class NeonCli(AbstractNeonCli): def pageserver_start( self, + id: int, overrides: Tuple[str, ...] 
= (), extra_env_vars: Optional[Dict[str, str]] = None, ) -> "subprocess.CompletedProcess[str]": - start_args = ["pageserver", "start", *overrides] + start_args = ["pageserver", "start", f"--id={id}", *overrides] storage = self.env.pageserver_remote_storage append_pageserver_param_overrides( params_to_update=start_args, remote_storage=storage, - pageserver_config_override=self.env.pageserver.config_override, + pageserver_config_override=self.env.pageserver_config_override, ) if isinstance(storage, S3Storage): @@ -1253,8 +1313,8 @@ class NeonCli(AbstractNeonCli): return self.raw_cli(start_args, extra_env_vars=extra_env_vars) - def pageserver_stop(self, immediate=False) -> "subprocess.CompletedProcess[str]": - cmd = ["pageserver", "stop"] + def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]": + cmd = ["pageserver", "stop", f"--id={id}"] if immediate: cmd.extend(["-m", "immediate"]) @@ -1295,6 +1355,7 @@ class NeonCli(AbstractNeonCli): tenant_id: Optional[TenantId] = None, hot_standby: bool = False, lsn: Optional[Lsn] = None, + pageserver_id: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", @@ -1316,6 +1377,8 @@ class NeonCli(AbstractNeonCli): args.append(endpoint_id) if hot_standby: args.extend(["--hot-standby", "true"]) + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) res = self.raw_cli(args) res.check_returncode() @@ -1331,6 +1394,7 @@ class NeonCli(AbstractNeonCli): lsn: Optional[Lsn] = None, branch_name: Optional[str] = None, remote_ext_config: Optional[str] = None, + pageserver_id: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", @@ -1353,6 +1417,8 @@ class NeonCli(AbstractNeonCli): args.extend(["--branch-name", branch_name]) if endpoint_id is not None: args.append(endpoint_id) + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) storage = self.env.ext_remote_storage s3_env_vars = None 
@@ -1452,9 +1518,12 @@ class NeonPageserver(PgProtocol): TEMP_FILE_SUFFIX = "___temp" - def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional[str] = None): + def __init__( + self, env: NeonEnv, id: int, port: PageserverPort, config_override: Optional[str] = None + ): super().__init__(host="localhost", port=port.pg, user="cloud_admin") self.env = env + self.id = id self.running = False self.service_port = port self.config_override = config_override @@ -1528,7 +1597,9 @@ class NeonPageserver(PgProtocol): """ assert self.running is False - self.env.neon_cli.pageserver_start(overrides=overrides, extra_env_vars=extra_env_vars) + self.env.neon_cli.pageserver_start( + self.id, overrides=overrides, extra_env_vars=extra_env_vars + ) self.running = True return self @@ -1538,7 +1609,7 @@ class NeonPageserver(PgProtocol): Returns self. """ if self.running: - self.env.neon_cli.pageserver_stop(immediate) + self.env.neon_cli.pageserver_stop(self.id, immediate) self.running = False return self @@ -1564,8 +1635,12 @@ class NeonPageserver(PgProtocol): is_testing_enabled_or_skip=self.is_testing_enabled_or_skip, ) + @property + def workdir(self) -> Path: + return Path(os.path.join(self.env.repo_dir, f"pageserver_{self.id}")) + def assert_no_errors(self): - logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r") + logfile = open(os.path.join(self.workdir, "pageserver.log"), "r") error_or_warn = re.compile(r"\s(ERROR|WARN)") errors = [] while True: @@ -1588,7 +1663,7 @@ class NeonPageserver(PgProtocol): def log_contains(self, pattern: str) -> Optional[str]: """Check that the pageserver log contains a line that matches the given regex""" - logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r") + logfile = open(os.path.join(self.workdir, "pageserver.log"), "r") contains_re = re.compile(pattern) @@ -1618,14 +1693,14 @@ class NeonPageserver(PgProtocol): if self.env.attachment_service is not None: response = requests.post( 
f"{self.env.control_plane_api}/attach_hook", - json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID}, + json={"tenant_id": str(tenant_id), "pageserver_id": self.id}, ) response.raise_for_status() generation = response.json()["gen"] else: generation = None - client = self.env.pageserver.http_client() + client = self.http_client() return client.tenant_attach(tenant_id, config, config_null, generation=generation) @@ -2236,6 +2311,7 @@ class Endpoint(PgProtocol): hot_standby: bool = False, lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, + pageserver_id: Optional[int] = None, ) -> "Endpoint": """ Create a new Postgres endpoint. @@ -2257,6 +2333,7 @@ class Endpoint(PgProtocol): hot_standby=hot_standby, pg_port=self.pg_port, http_port=self.http_port, + pageserver_id=pageserver_id, ) path = Path("endpoints") / self.endpoint_id / "pgdata" self.pgdata_dir = os.path.join(self.env.repo_dir, path) @@ -2270,7 +2347,9 @@ class Endpoint(PgProtocol): return self - def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint": + def start( + self, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None + ) -> "Endpoint": """ Start the Postgres instance. Returns self. @@ -2287,6 +2366,7 @@ class Endpoint(PgProtocol): tenant_id=self.tenant_id, safekeepers=self.active_safekeepers, remote_ext_config=remote_ext_config, + pageserver_id=pageserver_id, ) self.running = True @@ -2377,6 +2457,7 @@ class Endpoint(PgProtocol): lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, remote_ext_config: Optional[str] = None, + pageserver_id: Optional[int] = None, ) -> "Endpoint": """ Create an endpoint, apply config, and start Postgres. 
@@ -2391,6 +2472,7 @@ class Endpoint(PgProtocol): config_lines=config_lines, hot_standby=hot_standby, lsn=lsn, + pageserver_id=pageserver_id, ).start(remote_ext_config=remote_ext_config) log.info(f"Postgres startup took {time.time() - started_at} seconds") @@ -2426,6 +2508,7 @@ class EndpointFactory: hot_standby: bool = False, config_lines: Optional[List[str]] = None, remote_ext_config: Optional[str] = None, + pageserver_id: Optional[int] = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -2443,6 +2526,7 @@ class EndpointFactory: config_lines=config_lines, lsn=lsn, remote_ext_config=remote_ext_config, + pageserver_id=pageserver_id, ) def create( @@ -2877,9 +2961,7 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]: # pg is the existing and running compute node, that we want to compare with a basebackup def check_restored_datadir_content( - test_output_dir: Path, - env: NeonEnv, - endpoint: Endpoint, + test_output_dir: Path, env: NeonEnv, endpoint: Endpoint, pageserver_id: Optional[int] = None ): # Get the timeline ID. 
We need it for the 'basebackup' command timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0]) @@ -2904,7 +2986,7 @@ def check_restored_datadir_content( cmd = rf""" {psql_path} \ --no-psqlrc \ - postgres://localhost:{env.pageserver.service_port.pg} \ + postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \ -c 'basebackup {endpoint.tenant_id} {timeline_id}' \ | tar -x -C {restored_dir_path} """ @@ -2954,19 +3036,32 @@ def check_restored_datadir_content( def wait_for_last_flush_lsn( - env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId + env: NeonEnv, + endpoint: Endpoint, + tenant: TenantId, + timeline: TimelineId, + pageserver_id: Optional[int] = None, ) -> Lsn: """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn.""" + last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn) + return wait_for_last_record_lsn( + env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn + ) def wait_for_wal_insert_lsn( - env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId + env: NeonEnv, + endpoint: Endpoint, + tenant: TenantId, + timeline: TimelineId, + pageserver_id: Optional[int] = None, ) -> Lsn: """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn.""" last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0]) - return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn) + return wait_for_last_record_lsn( + env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn + ) def fork_at_current_lsn( @@ -2986,15 +3081,21 @@ def fork_at_current_lsn( def last_flush_lsn_upload( - env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId + env: NeonEnv, + endpoint: Endpoint, + tenant_id: 
TenantId, + timeline_id: TimelineId, + pageserver_id: Optional[int] = None, ) -> Lsn: """ Wait for pageserver to catch to the latest flush LSN of given endpoint, checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn reaching flush LSN). """ - last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id) - ps_http = env.pageserver.http_client() + last_flush_lsn = wait_for_last_flush_lsn( + env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id + ) + ps_http = env.get_pageserver(pageserver_id).http_client() wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn) # force a checkpoint to trigger upload ps_http.timeline_checkpoint(tenant_id, timeline_id) diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 57e9413aa3..d0f180c866 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -44,14 +44,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): log.info(f"Timeline {tenant0}/{timeline0} is left intact") (tenant1, timeline1, pg1) = tenant_timelines[1] - metadata_path = f"{env.repo_dir}/tenants/{tenant1}/timelines/{timeline1}/metadata" + metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata" f = open(metadata_path, "w") f.write("overwritten with garbage!") f.close() log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled") (tenant2, timeline2, pg2) = tenant_timelines[2] - timeline_path = f"{env.repo_dir}/tenants/{tenant2}/timelines/{timeline2}/" + timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/" for filename in os.listdir(timeline_path): if filename.startswith("00000"): # Looks like a layer file. 
Remove it @@ -61,7 +61,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): ) (tenant3, timeline3, pg3) = tenant_timelines[3] - timeline_path = f"{env.repo_dir}/tenants/{tenant3}/timelines/{timeline3}/" + timeline_path = f"{env.pageserver.workdir}/tenants/{tenant3}/timelines/{timeline3}/" for filename in os.listdir(timeline_path): if filename.startswith("00000"): # Looks like a layer file. Corrupt it @@ -135,7 +135,7 @@ def test_timeline_init_break_before_checkpoint(neon_simple_env: NeonEnv): tenant_id, _ = env.neon_cli.create_tenant() - timelines_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" + timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines" old_tenant_timelines = env.neon_cli.list_timelines(tenant_id) initial_timeline_dirs = [d for d in timelines_dir.iterdir()] @@ -145,8 +145,8 @@ def test_timeline_init_break_before_checkpoint(neon_simple_env: NeonEnv): _ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id) # Restart the page server - env.neon_cli.pageserver_stop(immediate=True) - env.neon_cli.pageserver_start() + env.pageserver.stop(immediate=True) + env.pageserver.start() # Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally. 
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id) @@ -166,7 +166,7 @@ def test_timeline_create_break_after_uninit_mark(neon_simple_env: NeonEnv): tenant_id, _ = env.neon_cli.create_tenant() - timelines_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" + timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines" old_tenant_timelines = env.neon_cli.list_timelines(tenant_id) initial_timeline_dirs = [d for d in timelines_dir.iterdir()] diff --git a/test_runner/regress/test_close_fds.py b/test_runner/regress/test_close_fds.py index 7059f3360e..ce9ecb3dc4 100644 --- a/test_runner/regress/test_close_fds.py +++ b/test_runner/regress/test_close_fds.py @@ -33,7 +33,7 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv): workload_thread = threading.Thread(target=start_workload, args=(), daemon=True) workload_thread.start() - path = os.path.join(env.repo_dir, "pageserver.pid") + path = os.path.join(env.pageserver.workdir, "pageserver.pid") lsof = lsof_path() while workload_thread.is_alive(): res = subprocess.run( diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 247b1d8bce..ad08a4b5ce 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -133,7 +133,6 @@ def test_backward_compatibility( prepare_snapshot( from_dir=compatibility_snapshot_dir, to_dir=test_output_dir / "compatibility_snapshot", - neon_binpath=neon_binpath, port_distributor=port_distributor, ) @@ -202,7 +201,6 @@ def test_forward_compatibility( from_dir=compatibility_snapshot_dir, to_dir=test_output_dir / "compatibility_snapshot", port_distributor=port_distributor, - neon_binpath=compatibility_neon_bin, pg_distrib_dir=compatibility_postgres_distrib_dir, ) @@ -234,7 +232,6 @@ def prepare_snapshot( from_dir: Path, to_dir: Path, port_distributor: PortDistributor, - neon_binpath: Path, pg_distrib_dir: Optional[Path] = None, ): assert from_dir.exists(), f"Snapshot 
'{from_dir}' doesn't exist" @@ -246,6 +243,9 @@ def prepare_snapshot( repo_dir = to_dir / "repo" + snapshot_config_toml = repo_dir / "config" + snapshot_config = toml.load(snapshot_config_toml) + # Remove old logs to avoid confusion in test artifacts for logfile in repo_dir.glob("**/*.log"): logfile.unlink() @@ -259,31 +259,53 @@ def prepare_snapshot( os.mkdir(repo_dir / "endpoints") # Update paths and ports in config files - pageserver_toml = repo_dir / "pageserver.toml" - pageserver_config = toml.load(pageserver_toml) - pageserver_config["remote_storage"]["local_path"] = LocalFsStorage.component_path( - repo_dir, RemoteStorageUser.PAGESERVER - ) - for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"): - pageserver_config[param] = port_distributor.replace_with_new_port(pageserver_config[param]) + legacy_pageserver_toml = repo_dir / "pageserver.toml" + legacy_bundle = os.path.exists(legacy_pageserver_toml) - # We don't use authentication in compatibility tests - # so just remove authentication related settings. 
- pageserver_config.pop("pg_auth_type", None) - pageserver_config.pop("http_auth_type", None) - - if pg_distrib_dir: - pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir) - - with pageserver_toml.open("w") as f: - toml.dump(pageserver_config, f) - - snapshot_config_toml = repo_dir / "config" - snapshot_config = toml.load(snapshot_config_toml) - for param in ("listen_http_addr", "listen_pg_addr"): - snapshot_config["pageserver"][param] = port_distributor.replace_with_new_port( - snapshot_config["pageserver"][param] + path_to_config: dict[Path, dict[Any, Any]] = {} + if legacy_bundle: + os.mkdir(repo_dir / "pageserver_1") + path_to_config[repo_dir / "pageserver_1" / "pageserver.toml"] = toml.load( + legacy_pageserver_toml ) + os.remove(legacy_pageserver_toml) + os.rename(repo_dir / "tenants", repo_dir / "pageserver_1" / "tenants") + else: + for ps_conf in snapshot_config["pageservers"]: + config_path = repo_dir / f"pageserver_{ps_conf['id']}" / "pageserver.toml" + path_to_config[config_path] = toml.load(config_path) + + # For each pageserver config, edit it and rewrite + for config_path, pageserver_config in path_to_config.items(): + pageserver_config["remote_storage"]["local_path"] = LocalFsStorage.component_path( + repo_dir, RemoteStorageUser.PAGESERVER + ) + + for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"): + pageserver_config[param] = port_distributor.replace_with_new_port( + pageserver_config[param] + ) + + # We don't use authentication in compatibility tests + # so just remove authentication related settings. + pageserver_config.pop("pg_auth_type", None) + pageserver_config.pop("http_auth_type", None) + + if pg_distrib_dir: + pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir) + + with config_path.open("w") as f: + toml.dump(pageserver_config, f) + + # neon_local config doesn't have to be backward compatible. If we're using a dump from before + # it supported multiple pageservers, fix it up. 
+ if "pageservers" not in snapshot_config: + snapshot_config["pageservers"] = [snapshot_config["pageserver"]] + del snapshot_config["pageserver"] + + for param in ("listen_http_addr", "listen_pg_addr"): + for pageserver in snapshot_config["pageservers"]: + pageserver[param] = port_distributor.replace_with_new_port(pageserver[param]) snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port( snapshot_config["broker"]["listen_addr"] ) @@ -347,6 +369,9 @@ def check_neon_works( # Use the "target" binaries to launch the storage nodes config_target = config config_target.neon_binpath = neon_target_binpath + # We are using maybe-old binaries for neon services, but want to use current + # binaries for test utilities like neon_local + config_target.neon_local_binpath = neon_current_binpath cli_target = NeonCli(config_target) # And the current binaries to launch computes @@ -379,7 +404,7 @@ def check_neon_works( # loosely based on https://github.com/neondatabase/cloud/wiki/Recovery-from-WAL tenant_id = snapshot_config["default_tenant_id"] timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] - pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1] + pageserver_port = snapshot_config["pageservers"][0]["listen_http_addr"].split(":")[-1] pageserver_http = PageserverHttpClient( port=pageserver_port, is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index ec1196eea6..1722d6bfe7 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -271,7 +271,7 @@ def _import( env.endpoints.stop_all() env.pageserver.stop() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) diff --git a/test_runner/regress/test_large_schema.py 
b/test_runner/regress/test_large_schema.py index 72bf32fcd3..b6ac1aa41f 100644 --- a/test_runner/regress/test_large_schema.py +++ b/test_runner/regress/test_large_schema.py @@ -75,7 +75,7 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder): # Check layer file sizes timeline_path = "{}/tenants/{}/timelines/{}/".format( - env.repo_dir, env.initial_tenant, env.initial_timeline + env.pageserver.workdir, env.initial_tenant, env.initial_timeline ) for filename in os.listdir(timeline_path): if filename.startswith("00000"): diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index 9d24594cb6..1b3984583a 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -124,10 +124,14 @@ def test_cli_ipv4_listeners(neon_env_builder: NeonEnvBuilder): def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): + """ + Basic start/stop with default single-instance config for + safekeeper and pageserver + """ env = neon_env_builder.init_start() # Stop default ps/sk - env.neon_cli.pageserver_stop() + env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() # Default start @@ -139,6 +143,38 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): res.check_returncode() +def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): + """ + Basic start/stop with explicitly configured counts of pageserver + and safekeeper + """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.num_safekeepers = 2 + env = neon_env_builder.init_start() + + env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID) + env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1) + + # Addressing a nonexistent ID throws + with pytest.raises(RuntimeError): + env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 100) + + # Using the single-pageserver shortcut property throws when there are multiple pageservers + with pytest.raises(AssertionError): + _drop = env.pageserver + + 
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1) + env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2) + + # Default start + res = env.neon_cli.raw_cli(["start"]) + res.check_returncode() + + # Default stop + res = env.neon_cli.raw_cli(["stop"]) + res.check_returncode() + + @skip_on_postgres(PgVersion.V14, reason="does not use postgres") @pytest.mark.skipif( os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works" diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index 4d5971cd11..259c6412ef 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -115,7 +115,7 @@ def test_ondemand_download_large_rel( env.pageserver.stop() # remove all the layer files - for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"): + for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"): log.info(f"unlinking layer {layer}") layer.unlink() @@ -237,7 +237,7 @@ def test_ondemand_download_timetravel( env.pageserver.stop() # remove all the layer files - for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"): + for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"): log.info(f"unlinking layer {layer}") layer.unlink() @@ -367,7 +367,7 @@ def test_download_remote_layers_api( # remove all the layer files # XXX only delete some of the layer files, to show that it really just downloads all the layers - for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"): + for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"): log.info(f"unlinking layer {layer.name}") layer.unlink() diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py index 28732872df..f5bcfd52f9 100644 --- a/test_runner/regress/test_pageserver_api.py +++ 
b/test_runner/regress/test_pageserver_api.py @@ -17,13 +17,13 @@ from fixtures.utils import wait_until def test_pageserver_init_node_id( neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path ): - repo_dir = neon_simple_env.repo_dir - pageserver_config = repo_dir / "pageserver.toml" + workdir = neon_simple_env.pageserver.workdir + pageserver_config = workdir / "pageserver.toml" pageserver_bin = neon_binpath / "pageserver" def run_pageserver(args): return subprocess.run( - [str(pageserver_bin), "-D", str(repo_dir), *args], + [str(pageserver_bin), "-D", str(workdir), *args], check=False, universal_newlines=True, stdout=subprocess.PIPE, diff --git a/test_runner/regress/test_read_trace.py b/test_runner/regress/test_read_trace.py index cae8ca3919..e6b3ccd7ec 100644 --- a/test_runner/regress/test_read_trace.py +++ b/test_runner/regress/test_read_trace.py @@ -35,5 +35,5 @@ def test_read_request_tracing(neon_env_builder: NeonEnvBuilder): # Stop postgres so we drop the connection and flush the traces endpoint.stop() - trace_path = env.repo_dir / "traces" / str(tenant_id) / str(timeline_id) + trace_path = env.pageserver.workdir / "traces" / str(tenant_id) / str(timeline_id) assert trace_path.exists() diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index b01c4cc332..d0bb52336d 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -137,7 +137,7 @@ def test_remote_storage_backup_and_restore( env.endpoints.stop_all() env.pageserver.stop() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) @@ -353,7 +353,7 @@ def test_remote_storage_upload_queue_retries( env.pageserver.stop(immediate=True) env.endpoints.stop_all() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) 
os.mkdir(dir_to_clear) @@ -488,7 +488,7 @@ def test_remote_timeline_client_calls_started_metric( env.pageserver.stop(immediate=True) env.endpoints.stop_all() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py index 4fe6909433..d8a9e0390a 100644 --- a/test_runner/regress/test_tenant_conf.py +++ b/test_runner/regress/test_tenant_conf.py @@ -299,7 +299,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder): # tenant is created with defaults, as in without config file (tenant_id, timeline_id) = env.neon_cli.create_tenant() - config_path = env.repo_dir / "tenants" / str(tenant_id) / "config" + config_path = env.pageserver.workdir / "tenants" / str(tenant_id) / "config" assert config_path.exists(), "config file is always initially created" http_client = env.pageserver.http_client() diff --git a/test_runner/regress/test_tenant_delete.py b/test_runner/regress/test_tenant_delete.py index 68188eb80a..e60b996a16 100644 --- a/test_runner/regress/test_tenant_delete.py +++ b/test_runner/regress/test_tenant_delete.py @@ -1,7 +1,6 @@ import enum import os import shutil -from pathlib import Path import pytest from fixtures.log_helper import log @@ -367,7 +366,7 @@ def test_tenant_delete_is_resumed_on_attach( env.endpoints.stop_all() env.pageserver.stop() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = env.pageserver.workdir / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 4e3c13123d..cdf72edc4d 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -289,7 +289,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): ) # assert tenant exists on disk - assert 
(env.repo_dir / "tenants" / str(tenant_id)).exists() + assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) # we rely upon autocommit after each statement @@ -332,7 +332,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): log.info("gc thread returned") # check that nothing is left on disk for deleted tenant - assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() with pytest.raises( expected_exception=PageserverApiException, match=f"NotFound: tenant {tenant_id}" @@ -357,7 +357,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv): ) # assert tenant exists on disk - assert (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) # we rely upon autocommit after each statement @@ -386,7 +386,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv): log.info("ignored tenant detached without error") # check that nothing is left on disk for deleted tenant - assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() # assert the tenant does not exists in the Pageserver tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()] @@ -413,7 +413,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): ) # assert tenant exists on disk - assert (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) # we rely upon autocommit after each statement @@ -430,7 +430,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv): log.info("regular tenant detached without error") # check that 
nothing is left on disk for deleted tenant - assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() # assert the tenant does not exists in the Pageserver tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()] @@ -531,7 +531,7 @@ def test_ignored_tenant_reattach( pageserver_http = env.pageserver.http_client() ignored_tenant_id, _ = env.neon_cli.create_tenant() - tenant_dir = env.repo_dir / "tenants" / str(ignored_tenant_id) + tenant_dir = env.pageserver.workdir / "tenants" / str(ignored_tenant_id) tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()] tenants_before_ignore.sort() timelines_before_ignore = [ diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py index 88f921c2cd..a2b922bb84 100644 --- a/test_runner/regress/test_tenant_relocation.py +++ b/test_runner/regress/test_tenant_relocation.py @@ -561,7 +561,7 @@ def test_emergency_relocate_with_branches_slow_replay( # simpler than initializing a new one from scratch, but the effect on the single tenant # is the same. env.pageserver.stop(immediate=True) - shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id)) + shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id)) env.pageserver.start() # This fail point will pause the WAL ingestion on the main branch, after the @@ -709,7 +709,7 @@ def test_emergency_relocate_with_branches_createdb( # Kill the pageserver, remove the tenant directory, and restart env.pageserver.stop(immediate=True) - shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id)) + shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id)) env.pageserver.start() # Wait before ingesting the WAL for CREATE DATABASE on the main branch. 
The original diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 88094b8766..28f0e658a1 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -27,7 +27,7 @@ from prometheus_client.samples import Sample def test_tenant_creation_fails(neon_simple_env: NeonEnv): - tenants_dir = Path(neon_simple_env.repo_dir) / "tenants" + tenants_dir = Path(neon_simple_env.pageserver.workdir) / "tenants" initial_tenants = sorted( map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines()) ) @@ -322,7 +322,10 @@ def test_pageserver_with_empty_tenants( files_in_timelines_dir = sum( 1 for _p in Path.iterdir( - Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines" + Path(env.pageserver.workdir) + / "tenants" + / str(tenant_with_empty_timelines) + / "timelines" ) ) assert ( @@ -334,7 +337,9 @@ def test_pageserver_with_empty_tenants( env.pageserver.stop() tenant_without_timelines_dir = env.initial_tenant - shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines") + shutil.rmtree( + Path(env.pageserver.workdir) / "tenants" / str(tenant_without_timelines_dir) / "timelines" + ) env.pageserver.start() diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py index bad090e83e..4aec4c6113 100644 --- a/test_runner/regress/test_tenants_with_remote_storage.py +++ b/test_runner/regress/test_tenants_with_remote_storage.py @@ -179,7 +179,9 @@ def test_tenants_attached_after_download( env.pageserver.stop() - timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) + timeline_dir = ( + Path(env.pageserver.workdir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id) + ) local_layer_deleted = False for path in Path.iterdir(timeline_dir): if path.name.startswith("00000"): diff --git 
a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index cc770e5aa8..1e38e0534c 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -73,7 +73,11 @@ def test_timeline_delete(neon_simple_env: NeonEnv): ) timeline_path = ( - env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id) + env.pageserver.workdir + / "tenants" + / str(env.initial_tenant) + / "timelines" + / str(parent_timeline_id) ) with pytest.raises( @@ -86,7 +90,11 @@ def test_timeline_delete(neon_simple_env: NeonEnv): assert exc.value.status_code == 412 timeline_path = ( - env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id) + env.pageserver.workdir + / "tenants" + / str(env.initial_tenant) + / "timelines" + / str(leaf_timeline_id) ) assert timeline_path.exists() @@ -406,7 +414,7 @@ def test_timeline_resurrection_on_attach( env.endpoints.stop_all() env.pageserver.stop() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) @@ -458,7 +466,11 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild ) leaf_timeline_path = ( - env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id) + env.pageserver.workdir + / "tenants" + / str(env.initial_tenant) + / "timelines" + / str(leaf_timeline_id) ) ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id) @@ -907,7 +919,7 @@ def test_timeline_delete_resumed_on_attach( env.endpoints.stop_all() env.pageserver.stop() - dir_to_clear = Path(env.repo_dir) / "tenants" + dir_to_clear = Path(env.pageserver.workdir) / "tenants" shutil.rmtree(dir_to_clear) os.mkdir(dir_to_clear) diff --git a/test_runner/regress/test_wal_restore.py b/test_runner/regress/test_wal_restore.py index c97c69db23..a4f03be7a0 100644 --- 
a/test_runner/regress/test_wal_restore.py +++ b/test_runner/regress/test_wal_restore.py @@ -29,7 +29,7 @@ def test_wal_restore( endpoint.safe_psql("create table t as select generate_series(1,300000)") tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) - env.neon_cli.pageserver_stop() + env.pageserver.stop() port = port_distributor.get_port() data_dir = test_output_dir / "pgsql.restored" with VanillaPostgres( diff --git a/test_runner/regress/test_walredo_not_left_behind_on_detach.py b/test_runner/regress/test_walredo_not_left_behind_on_detach.py index 4a47898935..155242ed15 100644 --- a/test_runner/regress/test_walredo_not_left_behind_on_detach.py +++ b/test_runner/regress/test_walredo_not_left_behind_on_detach.py @@ -27,7 +27,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder): env.pageserver.allowed_errors.append(".*NotFound: tenant.*") pageserver_http = env.pageserver.http_client() - pagserver_pid = int((env.repo_dir / "pageserver.pid").read_text()) + pagserver_pid = int((env.pageserver.workdir / "pageserver.pid").read_text()) assert_child_processes(pagserver_pid, wal_redo_present=False, defunct_present=False) @@ -43,7 +43,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder): tenant_id, _ = env.neon_cli.create_tenant() # assert tenant exists on disk - assert (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) @@ -101,7 +101,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder): pytest.fail(f"could not detach tenant: {last_error}") # check that nothing is left on disk for deleted tenant - assert not (env.repo_dir / "tenants" / str(tenant_id)).exists() + assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists() # Pageserver schedules 
kill+wait of the WAL redo process to the background runtime, # asynchronously to tenant detach. Cut it some slack to complete kill+wait before