tests: enable multiple pageservers in neon_local and neon_fixture (#5231)

## Problem

Currently our testing environment only supports running a single
pageserver at a time. This is insufficient for testing failover and
migrations.
- Dependency of writing tests for #5207 

## Summary of changes

- `neon_local` and `neon_fixture` now handle multiple pageservers
- This is a breaking change to the `.neon/config` format: any local
environments will need recreating
- Existing tests continue to work unchanged:
  - The default number of pageservers is 1
- `NeonEnv.pageserver` is now a helper property that retrieves the first
pageserver if there is only one, else throws.
- Pageserver data directories are now at `.neon/pageserver_{n}` where n
is 1,2,3...
- Compatibility tests get some special casing to migrate neon_local
configs: these are not meant to be backward/forward compatible, but they
were treated that way by the test.
This commit is contained in:
John Spray
2023-09-08 16:19:57 +01:00
committed by GitHub
parent 499d0707d2
commit 7b6337db58
26 changed files with 497 additions and 236 deletions

View File

@@ -1,6 +1,7 @@
# Minimal neon environment with one safekeeper. This is equivalent to the built-in
# defaults that you get with no --config
[pageserver]
[[pageservers]]
id=1
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
pg_auth_type = 'Trust'

View File

@@ -32,7 +32,7 @@ impl AttachmentService {
// Makes no sense to construct this if pageservers aren't going to use it: assume
// pageservers have control plane API set
let listen_url = env.pageserver.control_plane_api.clone().unwrap();
let listen_url = env.control_plane_api.clone().unwrap();
let listen = format!(
"{}:{}",
@@ -80,7 +80,6 @@ impl AttachmentService {
let url = self
.env
.pageserver
.control_plane_api
.clone()
.unwrap()

View File

@@ -50,16 +50,17 @@ fn default_conf() -> String {
format!(
r#"
# Default built-in configuration, defined in main.rs
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
[broker]
listen_addr = '{DEFAULT_BROKER_ADDR}'
[pageserver]
[[pageservers]]
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
pg_auth_type = '{trust_auth}'
http_auth_type = '{trust_auth}'
control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
[[safekeepers]]
id = {DEFAULT_SAFEKEEPER_ID}
@@ -258,7 +259,7 @@ fn get_timeline_infos(
env: &local_env::LocalEnv,
tenant_id: &TenantId,
) -> Result<HashMap<TimelineId, TimelineInfo>> {
Ok(PageServerNode::from_env(env)
Ok(get_default_pageserver(env)
.timeline_list(tenant_id)?
.into_iter()
.map(|timeline_info| (timeline_info.timeline_id, timeline_info))
@@ -319,17 +320,30 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
.context("Failed to initialize neon repository")?;
// Initialize pageserver, create initial tenant and timeline.
let pageserver = PageServerNode::from_env(&env);
pageserver
.initialize(&pageserver_config_overrides(init_match))
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e:?}");
exit(1);
});
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
.initialize(&pageserver_config_overrides(init_match))
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e:?}");
exit(1);
});
}
Ok(env)
}
/// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
/// For typical interactive use, one would just run with a single pageserver. Scenarios with
/// tenant/timeline placement across multiple pageservers are managed by python test code rather
/// than this CLI.
fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
let ps_conf = env
.pageservers
.first()
.expect("Config is validated to contain at least one pageserver");
PageServerNode::from_env(env, ps_conf)
}
fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
init_match
.get_many::<String>("pageserver-config-override")
@@ -340,7 +354,7 @@ fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
}
fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> anyhow::Result<()> {
let pageserver = PageServerNode::from_env(env);
let pageserver = get_default_pageserver(env);
match tenant_match.subcommand() {
Some(("list", _)) => {
for t in pageserver.tenant_list()? {
@@ -356,11 +370,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
// If tenant ID was not specified, generate one
let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate);
let generation = if env.pageserver.control_plane_api.is_some() {
let generation = if env.control_plane_api.is_some() {
// We must register the tenant with the attachment service, so
// that when the pageserver restarts, it will be re-attached.
let attachment_service = AttachmentService::from_env(env);
attachment_service.attach_hook(tenant_id, env.pageserver.id)?
attachment_service.attach_hook(tenant_id, pageserver.conf.id)?
} else {
None
};
@@ -425,7 +439,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
}
fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
let pageserver = get_default_pageserver(env);
match timeline_match.subcommand() {
Some(("list", list_match)) => {
@@ -502,6 +516,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
pg_version,
ComputeMode::Primary,
DEFAULT_PAGESERVER_ID,
)?;
println!("Done");
}
@@ -555,7 +570,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
// All subcommands take an optional --tenant-id option
@@ -652,6 +666,13 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.unwrap_or(false);
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
@@ -667,6 +688,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
http_port,
pg_version,
mode,
pageserver_id,
)?;
}
"start" => {
@@ -676,6 +698,13 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let pageserver_id =
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
@@ -695,7 +724,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
Some(env.generate_auth_token(&claims)?)
@@ -762,6 +792,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
http_port,
pg_version,
mode,
pageserver_id,
)?;
ep.start(&auth_token, safekeepers, remote_ext_config)?;
}
@@ -786,48 +817,64 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let pageserver = PageServerNode::from_env(env);
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
NodeId(id_str.parse().context("while parsing pageserver id")?)
} else {
DEFAULT_PAGESERVER_ID
};
Ok(PageServerNode::from_env(
env,
env.get_pageserver_conf(node_id)?,
))
}
match sub_match.subcommand() {
Some(("start", start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args))
{
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("stop", stop_match)) => {
let immediate = stop_match
Some(("stop", subcommand_args)) => {
let immediate = subcommand_args
.get_one::<String>("stop-mode")
.map(|s| s.as_str())
== Some("immediate");
if let Err(e) = pageserver.stop(immediate) {
if let Err(e) = get_pageserver(env, subcommand_args)?.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
}
Some(("restart", restart_match)) => {
Some(("restart", subcommand_args)) => {
let pageserver = get_pageserver(env, subcommand_args)?;
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("status", _)) => match PageServerNode::from_env(env).check_status() {
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
exit(1);
Some(("status", subcommand_args)) => {
match get_pageserver(env, subcommand_args)?.check_status() {
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
exit(1);
}
}
},
}
Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
None => bail!("no pageserver subcommand provided"),
@@ -943,7 +990,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
broker::start_broker_process(env)?;
// Only start the attachment service if the pageserver is configured to need it
if env.pageserver.control_plane_api.is_some() {
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.start() {
eprintln!("attachment_service start failed: {:#}", e);
@@ -952,11 +999,13 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
}
}
let pageserver = PageServerNode::from_env(env);
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e);
try_stop_all(env, true);
exit(1);
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
try_stop_all(env, true);
exit(1);
}
}
for node in env.safekeepers.iter() {
@@ -980,8 +1029,6 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
}
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
@@ -996,8 +1043,11 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
if let Err(e) = pageserver.stop(immediate) {
eprintln!("pageserver {} stop failed: {:#}", env.pageserver.id, e);
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.stop(immediate) {
eprintln!("pageserver {} stop failed: {:#}", ps_conf.id, e);
}
}
for node in env.safekeepers.iter() {
@@ -1011,7 +1061,7 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
eprintln!("neon broker stop failed: {e:#}");
}
if env.pageserver.control_plane_api.is_some() {
if env.control_plane_api.is_some() {
let attachment_service = AttachmentService::from_env(env);
if let Err(e) = attachment_service.stop(immediate) {
eprintln!("attachment service stop failed: {e:#}");
@@ -1031,6 +1081,16 @@ fn cli() -> Command {
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
// --id, when using a pageserver command
let pageserver_id_arg = Arg::new("pageserver-id")
.long("id")
.help("pageserver id")
.required(false);
// --pageserver-id when using a non-pageserver command
let endpoint_pageserver_id_arg = Arg::new("endpoint-pageserver-id")
.long("pageserver-id")
.required(false);
let safekeeper_extra_opt_arg = Arg::new("safekeeper-extra-opt")
.short('e')
.long("safekeeper-extra-opt")
@@ -1195,10 +1255,16 @@ fn cli() -> Command {
.arg_required_else_help(true)
.about("Manage pageserver")
.subcommand(Command::new("status"))
.subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone()))
.arg(pageserver_id_arg.clone())
.subcommand(Command::new("start").about("Start local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(pageserver_id_arg.clone())
.arg(stop_mode_arg.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
)
.subcommand(
Command::new("attachment_service")
@@ -1242,6 +1308,7 @@ fn cli() -> Command {
.arg(lsn_arg.clone())
.arg(pg_port_arg.clone())
.arg(http_port_arg.clone())
.arg(endpoint_pageserver_id_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create endpoint directory with only config files")
@@ -1259,6 +1326,7 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(pg_port_arg)
.arg(http_port_arg)
.arg(endpoint_pageserver_id_arg.clone())
.arg(pg_version_arg)
.arg(hot_standby_arg)
.arg(safekeepers_arg)

View File

@@ -70,6 +70,7 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
pageserver_id: NodeId,
}
//
@@ -82,19 +83,16 @@ pub struct ComputeControlPlane {
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut endpoints = BTreeMap::default();
for endpoint_dir in std::fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
}
@@ -102,7 +100,6 @@ impl ComputeControlPlane {
base_port: 55431,
endpoints,
env,
pageserver,
})
}
@@ -125,15 +122,18 @@ impl ComputeControlPlane {
http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
pageserver_id: NodeId,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
let pageserver =
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
pageserver,
timeline_id,
mode,
tenant_id,
@@ -159,6 +159,7 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates: true,
pageserver_id,
})?,
)?;
std::fs::write(
@@ -193,18 +194,14 @@ pub struct Endpoint {
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
pageserver: PageServerNode,
// Optimizations
skip_pg_catalog_updates: bool,
}
impl Endpoint {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<Endpoint> {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
anyhow::bail!(
"Endpoint::from_dir_entry failed: '{}' is not a directory",
@@ -220,12 +217,15 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
let pageserver =
PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
Ok(Endpoint {
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
endpoint_id,
env: env.clone(),
pageserver: Arc::clone(pageserver),
pageserver,
timeline_id: conf.timeline_id,
mode: conf.mode,
tenant_id: conf.tenant_id,

View File

@@ -68,11 +68,17 @@ pub struct LocalEnv {
pub broker: NeonBroker,
pub pageserver: PageServerConf,
/// This Vec must always contain at least one pageserver
pub pageservers: Vec<PageServerConf>,
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
// Control plane location: if None, we will not run attachment_service. If set, this will
// be propagated into each pageserver's configuration.
#[serde(default)]
pub control_plane_api: Option<Url>,
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
@@ -118,9 +124,6 @@ pub struct PageServerConf {
// auth type used for the PG and HTTP ports
pub pg_auth_type: AuthType,
pub http_auth_type: AuthType,
// Control plane location
pub control_plane_api: Option<Url>,
}
impl Default for PageServerConf {
@@ -131,7 +134,6 @@ impl Default for PageServerConf {
listen_http_addr: String::new(),
pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust,
control_plane_api: None,
}
}
}
@@ -222,15 +224,23 @@ impl LocalEnv {
self.base_data_dir.join("endpoints")
}
// TODO: move pageserver files into ./pageserver
pub fn pageserver_data_dir(&self) -> PathBuf {
self.base_data_dir.clone()
pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf {
self.base_data_dir
.join(format!("pageserver_{pageserver_id}"))
}
pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) {
Ok(conf)
} else {
bail!("could not find pageserver {id}")
}
}
pub fn register_branch_mapping(
&mut self,
branch_name: String,
@@ -307,6 +317,10 @@ impl LocalEnv {
env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
}
if env.pageservers.is_empty() {
anyhow::bail!("Configuration must contain at least one pageserver");
}
env.base_data_dir = base_path();
Ok(env)
@@ -339,7 +353,7 @@ impl LocalEnv {
// We read that in, in `create_config`, and fill any missing defaults. Then it's saved
// to .neon/config. TODO: We lose any formatting and comments along the way, which is
// a bit sad.
let mut conf_content = r#"# This file describes a locale deployment of the page server
let mut conf_content = r#"# This file describes a local deployment of the page server
# and safekeeeper node. It is read by the 'neon_local' command-line
# utility.
"#
@@ -469,9 +483,9 @@ impl LocalEnv {
}
fn auth_keys_needed(&self) -> bool {
self.pageserver.pg_auth_type == AuthType::NeonJWT
|| self.pageserver.http_auth_type == AuthType::NeonJWT
|| self.safekeepers.iter().any(|sk| sk.auth_enabled)
self.pageservers.iter().any(|ps| {
ps.pg_auth_type == AuthType::NeonJWT || ps.http_auth_type == AuthType::NeonJWT
}) || self.safekeepers.iter().any(|sk| sk.auth_enabled)
}
}

View File

@@ -27,6 +27,7 @@ use utils::{
lsn::Lsn,
};
use crate::local_env::PageServerConf;
use crate::{background_process, local_env::LocalEnv};
#[derive(Error, Debug)]
@@ -76,43 +77,40 @@ impl ResponseErrorMessageExt for Response {
#[derive(Debug)]
pub struct PageServerNode {
pub pg_connection_config: PgConnectionConfig,
pub conf: PageServerConf,
pub env: LocalEnv,
pub http_client: Client,
pub http_base_url: String,
}
impl PageServerNode {
pub fn from_env(env: &LocalEnv) -> PageServerNode {
let (host, port) = parse_host_port(&env.pageserver.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
pub fn from_env(env: &LocalEnv, conf: &PageServerConf) -> PageServerNode {
let (host, port) =
parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
Self {
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
conf: conf.clone(),
env: env.clone(),
http_client: Client::new(),
http_base_url: format!("http://{}/v1", env.pageserver.listen_http_addr),
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
}
}
// pageserver conf overrides defined by neon_local configuration.
fn neon_local_overrides(&self) -> Vec<String> {
let id = format!("id={}", self.env.pageserver.id);
let id = format!("id={}", self.conf.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param = format!(
"pg_distrib_dir='{}'",
self.env.pg_distrib_dir_raw().display()
);
let http_auth_type_param =
format!("http_auth_type='{}'", self.env.pageserver.http_auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
self.env.pageserver.listen_http_addr
);
let http_auth_type_param = format!("http_auth_type='{}'", self.conf.http_auth_type);
let listen_http_addr_param = format!("listen_http_addr='{}'", self.conf.listen_http_addr);
let pg_auth_type_param = format!("pg_auth_type='{}'", self.env.pageserver.pg_auth_type);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let pg_auth_type_param = format!("pg_auth_type='{}'", self.conf.pg_auth_type);
let listen_pg_addr_param = format!("listen_pg_addr='{}'", self.conf.listen_pg_addr);
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
@@ -126,17 +124,18 @@ impl PageServerNode {
broker_endpoint_param,
];
if let Some(control_plane_api) = &self.env.pageserver.control_plane_api {
if let Some(control_plane_api) = &self.env.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));
}
if self.env.pageserver.http_auth_type != AuthType::Trust
|| self.env.pageserver.pg_auth_type != AuthType::Trust
if self.conf.http_auth_type != AuthType::Trust || self.conf.pg_auth_type != AuthType::Trust
{
overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned());
// Keys are generated in the toplevel repo dir, pageservers' workdirs
// are one level below that, so refer to keys with ../
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
}
overrides
}
@@ -144,16 +143,12 @@ impl PageServerNode {
/// Initializes a pageserver node by creating its config with the overrides provided.
pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
// First, run `pageserver --init` and wait for it to write a config into FS and exit.
self.pageserver_init(config_overrides).with_context(|| {
format!(
"Failed to run init for pageserver node {}",
self.env.pageserver.id,
)
})
self.pageserver_init(config_overrides)
.with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id,))
}
pub fn repo_path(&self) -> PathBuf {
self.env.pageserver_data_dir()
self.env.pageserver_data_dir(self.conf.id)
}
/// The pid file is created by the pageserver process, with its pid stored inside.
@@ -169,7 +164,7 @@ impl PageServerNode {
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
let datadir = self.repo_path();
let node_id = self.env.pageserver.id;
let node_id = self.conf.id;
println!(
"Initializing pageserver node {} at '{}' in {:?}",
node_id,
@@ -178,6 +173,10 @@ impl PageServerNode {
);
io::stdout().flush()?;
if !datadir.exists() {
std::fs::create_dir(&datadir)?;
}
let datadir_path_str = datadir.to_str().with_context(|| {
format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
})?;
@@ -208,7 +207,7 @@ impl PageServerNode {
let datadir = self.repo_path();
print!(
"Starting pageserver node {} at '{}' in {:?}",
self.env.pageserver.id,
self.conf.id,
self.pg_connection_config.raw_address(),
datadir
);
@@ -217,7 +216,7 @@ impl PageServerNode {
let datadir_path_str = datadir.to_str().with_context(|| {
format!(
"Cannot start pageserver node {} in path that has no string representation: {:?}",
self.env.pageserver.id, datadir,
self.conf.id, datadir,
)
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
@@ -261,7 +260,7 @@ impl PageServerNode {
// FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
// needs a token, and how to generate that token, seems independent to whether
// the pageserver requires a token in incoming requests.
Ok(if self.env.pageserver.http_auth_type != AuthType::Trust {
Ok(if self.conf.http_auth_type != AuthType::Trust {
// Generate a token to connect from the pageserver to a safekeeper
let token = self
.env
@@ -286,7 +285,7 @@ impl PageServerNode {
pub fn page_server_psql_client(&self) -> anyhow::Result<postgres::Client> {
let mut config = self.pg_connection_config.clone();
if self.env.pageserver.pg_auth_type == AuthType::NeonJWT {
if self.conf.pg_auth_type == AuthType::NeonJWT {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
@@ -297,7 +296,7 @@ impl PageServerNode {
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> anyhow::Result<RequestBuilder> {
let mut builder = self.http_client.request(method, url);
if self.env.pageserver.http_auth_type == AuthType::NeonJWT {
if self.conf.http_auth_type == AuthType::NeonJWT {
let token = self
.env
.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;

View File

@@ -416,6 +416,7 @@ class NeonEnvBuilder:
pageserver_remote_storage: Optional[RemoteStorage] = None,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
num_pageservers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
@@ -443,6 +444,7 @@ class NeonEnvBuilder:
self.mock_s3_server: MockS3Server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.num_pageservers = num_pageservers
self.safekeepers_id_start = safekeepers_id_start
self.safekeepers_enable_fsync = safekeepers_enable_fsync
self.auth_enabled = auth_enabled
@@ -613,7 +615,9 @@ class NeonEnvBuilder:
self.env.endpoints.stop_all()
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
for pageserver in self.env.pageservers:
pageserver.stop(immediate=True)
if self.env.attachment_service is not None:
self.env.attachment_service.stop(immediate=True)
@@ -644,7 +648,8 @@ class NeonEnvBuilder:
if cleanup_error is not None:
raise cleanup_error
self.env.pageserver.assert_no_errors()
for pageserver in self.env.pageservers:
pageserver.assert_no_errors()
class NeonEnv:
@@ -664,8 +669,7 @@ class NeonEnv:
postgres - A factory object for creating postgres compute nodes.
pageserver - An object that contains functions for manipulating and
connecting to the pageserver
pageservers - An array containing objects representing the pageservers
safekeepers - An array containing objects representing the safekeepers
@@ -680,7 +684,7 @@ class NeonEnv:
the tenant id
"""
PAGESERVER_ID = 1
BASE_PAGESERVER_ID = 1
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
@@ -690,14 +694,20 @@ class NeonEnv:
self.neon_cli = NeonCli(env=self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.pageservers: List[NeonPageserver] = []
self.broker = config.broker
self.pageserver_remote_storage = config.pageserver_remote_storage
self.ext_remote_storage = config.ext_remote_storage
self.safekeepers_remote_storage = config.sk_remote_storage
self.pg_version = config.pg_version
# Binary path for pageserver, safekeeper, etc
self.neon_binpath = config.neon_binpath
# Binary path for neon_local test-specific binaries: may be overridden
# after construction for compat testing
self.neon_local_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.pageserver_config_override = config.pageserver_config_override
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -719,6 +729,13 @@ class NeonEnv:
"""
)
if self.control_plane_api is not None:
toml += textwrap.dedent(
f"""
control_plane_api = '{self.control_plane_api}'
"""
)
toml += textwrap.dedent(
f"""
[broker]
@@ -727,36 +744,36 @@ class NeonEnv:
)
# Create config for pageserver
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
for ps_id in range(
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
):
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
toml += textwrap.dedent(
f"""
[pageserver]
id={self.PAGESERVER_ID}
listen_pg_addr = 'localhost:{pageserver_port.pg}'
listen_http_addr = 'localhost:{pageserver_port.http}'
pg_auth_type = '{pg_auth_type}'
http_auth_type = '{http_auth_type}'
"""
)
if self.control_plane_api is not None:
toml += textwrap.dedent(
f"""
control_plane_api = '{self.control_plane_api}'
[[pageservers]]
id={ps_id}
listen_pg_addr = 'localhost:{pageserver_port.pg}'
listen_http_addr = 'localhost:{pageserver_port.http}'
pg_auth_type = '{pg_auth_type}'
http_auth_type = '{http_auth_type}'
"""
)
# Create a corresponding NeonPageserver object
self.pageserver = NeonPageserver(
self, port=pageserver_port, config_override=config.pageserver_config_override
)
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(
self,
ps_id,
port=pageserver_port,
config_override=config.pageserver_config_override,
)
)
# Create config and a Safekeeper object for each safekeeper
for i in range(1, config.num_safekeepers + 1):
port = SafekeeperPort(
@@ -798,25 +815,55 @@ class NeonEnv:
if self.attachment_service is not None:
self.attachment_service.start()
self.pageserver.start()
for pageserver in self.pageservers:
pageserver.start()
for safekeeper in self.safekeepers:
safekeeper.start()
@property
def pageserver(self) -> NeonPageserver:
"""
For tests that are naive to multiple pageservers: give them the 1st in the list, and
assert that there is only one. Tests with multiple pageservers should always use
get_pageserver with an explicit ID.
"""
assert len(self.pageservers) == 1
return self.pageservers[0]
def get_pageserver(self, id: Optional[int]) -> NeonPageserver:
"""
Look up a pageserver by its node ID.
As a convenience for tests that do not use multiple pageservers, passing None
will yield the same default pageserver as `self.pageserver`.
"""
if id is None:
return self.pageserver
for ps in self.pageservers:
if ps.id == id:
return ps
raise RuntimeError(f"Pageserver with ID {id} not found")
def get_safekeeper_connstrs(self) -> str:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
def tenant_dir(
self,
tenant_id: TenantId,
def timeline_dir(
self, tenant_id: TenantId, timeline_id: TimelineId, pageserver_id: Optional[int] = None
) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return (
self.tenant_dir(tenant_id, pageserver_id=pageserver_id) / "timelines" / str(timeline_id)
)
def tenant_dir(self, tenant_id: TenantId, pageserver_id: Optional[int] = None) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
return self.repo_dir / "tenants" / str(tenant_id)
return self.get_pageserver(pageserver_id).workdir / "tenants" / str(tenant_id)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
@@ -984,6 +1031,7 @@ class AbstractNeonCli(abc.ABC):
extra_env_vars: Optional[Dict[str, str]] = None,
check_return_code=True,
timeout=None,
local_binpath=False,
) -> "subprocess.CompletedProcess[str]":
"""
Run the command with the specified arguments.
@@ -997,12 +1045,19 @@ class AbstractNeonCli(abc.ABC):
>>> log.info(result.stdout)
If `check_return_code`, on non-zero exit code logs failure and raises.
If `local_binpath` is true, then we are invoking a test utility
"""
assert type(arguments) == list
assert type(self.COMMAND) == str
bin_neon = str(self.env.neon_binpath / self.COMMAND)
if local_binpath:
# Test utility
bin_neon = str(self.env.neon_local_binpath / self.COMMAND)
else:
# Normal binary
bin_neon = str(self.env.neon_binpath / self.COMMAND)
args = [bin_neon] + arguments
log.info('Running command "{}"'.format(" ".join(args)))
@@ -1056,6 +1111,10 @@ class NeonCli(AbstractNeonCli):
COMMAND = "neon_local"
def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]:
kwargs["local_binpath"] = True
return super().raw_cli(*args, **kwargs)
def create_tenant(
self,
tenant_id: Optional[TenantId] = None,
@@ -1214,7 +1273,7 @@ class NeonCli(AbstractNeonCli):
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=storage,
pageserver_config_override=self.env.pageserver.config_override,
pageserver_config_override=self.env.pageserver_config_override,
)
s3_env_vars = None
@@ -1236,15 +1295,16 @@ class NeonCli(AbstractNeonCli):
def pageserver_start(
self,
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", *overrides]
start_args = ["pageserver", "start", f"--id={id}", *overrides]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
remote_storage=storage,
pageserver_config_override=self.env.pageserver.config_override,
pageserver_config_override=self.env.pageserver_config_override,
)
if isinstance(storage, S3Storage):
@@ -1253,8 +1313,8 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(start_args, extra_env_vars=extra_env_vars)
def pageserver_stop(self, immediate=False) -> "subprocess.CompletedProcess[str]":
cmd = ["pageserver", "stop"]
def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]":
cmd = ["pageserver", "stop", f"--id={id}"]
if immediate:
cmd.extend(["-m", "immediate"])
@@ -1295,6 +1355,7 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
pageserver_id: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1316,6 +1377,8 @@ class NeonCli(AbstractNeonCli):
args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
res = self.raw_cli(args)
res.check_returncode()
@@ -1331,6 +1394,7 @@ class NeonCli(AbstractNeonCli):
lsn: Optional[Lsn] = None,
branch_name: Optional[str] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1353,6 +1417,8 @@ class NeonCli(AbstractNeonCli):
args.extend(["--branch-name", branch_name])
if endpoint_id is not None:
args.append(endpoint_id)
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
storage = self.env.ext_remote_storage
s3_env_vars = None
@@ -1452,9 +1518,12 @@ class NeonPageserver(PgProtocol):
TEMP_FILE_SUFFIX = "___temp"
def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional[str] = None):
def __init__(
self, env: NeonEnv, id: int, port: PageserverPort, config_override: Optional[str] = None
):
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
self.env = env
self.id = id
self.running = False
self.service_port = port
self.config_override = config_override
@@ -1528,7 +1597,9 @@ class NeonPageserver(PgProtocol):
"""
assert self.running is False
self.env.neon_cli.pageserver_start(overrides=overrides, extra_env_vars=extra_env_vars)
self.env.neon_cli.pageserver_start(
self.id, overrides=overrides, extra_env_vars=extra_env_vars
)
self.running = True
return self
@@ -1538,7 +1609,7 @@ class NeonPageserver(PgProtocol):
Returns self.
"""
if self.running:
self.env.neon_cli.pageserver_stop(immediate)
self.env.neon_cli.pageserver_stop(self.id, immediate)
self.running = False
return self
@@ -1564,8 +1635,12 @@ class NeonPageserver(PgProtocol):
is_testing_enabled_or_skip=self.is_testing_enabled_or_skip,
)
@property
def workdir(self) -> Path:
return Path(os.path.join(self.env.repo_dir, f"pageserver_{self.id}"))
def assert_no_errors(self):
logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r")
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
while True:
@@ -1588,7 +1663,7 @@ class NeonPageserver(PgProtocol):
def log_contains(self, pattern: str) -> Optional[str]:
"""Check that the pageserver log contains a line that matches the given regex"""
logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r")
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
contains_re = re.compile(pattern)
@@ -1618,14 +1693,14 @@ class NeonPageserver(PgProtocol):
if self.env.attachment_service is not None:
response = requests.post(
f"{self.env.control_plane_api}/attach_hook",
json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID},
json={"tenant_id": str(tenant_id), "pageserver_id": self.id},
)
response.raise_for_status()
generation = response.json()["gen"]
else:
generation = None
client = self.env.pageserver.http_client()
client = self.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
@@ -2236,6 +2311,7 @@ class Endpoint(PgProtocol):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
) -> "Endpoint":
"""
Create a new Postgres endpoint.
@@ -2257,6 +2333,7 @@ class Endpoint(PgProtocol):
hot_standby=hot_standby,
pg_port=self.pg_port,
http_port=self.http_port,
pageserver_id=pageserver_id,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -2270,7 +2347,9 @@ class Endpoint(PgProtocol):
return self
def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
def start(
self, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None
) -> "Endpoint":
"""
Start the Postgres instance.
Returns self.
@@ -2287,6 +2366,7 @@ class Endpoint(PgProtocol):
tenant_id=self.tenant_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
)
self.running = True
@@ -2377,6 +2457,7 @@ class Endpoint(PgProtocol):
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -2391,6 +2472,7 @@ class Endpoint(PgProtocol):
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
pageserver_id=pageserver_id,
).start(remote_ext_config=remote_ext_config)
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -2426,6 +2508,7 @@ class EndpointFactory:
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2443,6 +2526,7 @@ class EndpointFactory:
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
)
def create(
@@ -2877,9 +2961,7 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
endpoint: Endpoint,
test_output_dir: Path, env: NeonEnv, endpoint: Endpoint, pageserver_id: Optional[int] = None
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
@@ -2904,7 +2986,7 @@ def check_restored_datadir_content(
cmd = rf"""
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
| tar -x -C {restored_dir_path}
"""
@@ -2954,19 +3036,32 @@ def check_restored_datadir_content(
def wait_for_last_flush_lsn(
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
env: NeonEnv,
endpoint: Endpoint,
tenant: TenantId,
timeline: TimelineId,
pageserver_id: Optional[int] = None,
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
return wait_for_last_record_lsn(
env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
)
def wait_for_wal_insert_lsn(
env: NeonEnv, endpoint: Endpoint, tenant: TenantId, timeline: TimelineId
env: NeonEnv,
endpoint: Endpoint,
tenant: TenantId,
timeline: TimelineId,
pageserver_id: Optional[int] = None,
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
return wait_for_last_record_lsn(
env.get_pageserver(pageserver_id).http_client(), tenant, timeline, last_flush_lsn
)
def fork_at_current_lsn(
@@ -2986,15 +3081,21 @@ def fork_at_current_lsn(
def last_flush_lsn_upload(
env: NeonEnv, endpoint: Endpoint, tenant_id: TenantId, timeline_id: TimelineId
env: NeonEnv,
endpoint: Endpoint,
tenant_id: TenantId,
timeline_id: TimelineId,
pageserver_id: Optional[int] = None,
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
checkpoint pageserver, and wait for it to be uploaded (remote_consistent_lsn
reaching flush LSN).
"""
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http = env.pageserver.http_client()
last_flush_lsn = wait_for_last_flush_lsn(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver_id
)
ps_http = env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_flush_lsn)
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(tenant_id, timeline_id)

View File

@@ -44,14 +44,14 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
log.info(f"Timeline {tenant0}/{timeline0} is left intact")
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = f"{env.repo_dir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
f = open(metadata_path, "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = f"{env.repo_dir}/tenants/{tenant2}/timelines/{timeline2}/"
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Remove it
@@ -61,7 +61,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
)
(tenant3, timeline3, pg3) = tenant_timelines[3]
timeline_path = f"{env.repo_dir}/tenants/{tenant3}/timelines/{timeline3}/"
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant3}/timelines/{timeline3}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Corrupt it
@@ -135,7 +135,7 @@ def test_timeline_init_break_before_checkpoint(neon_simple_env: NeonEnv):
tenant_id, _ = env.neon_cli.create_tenant()
timelines_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]
@@ -145,8 +145,8 @@ def test_timeline_init_break_before_checkpoint(neon_simple_env: NeonEnv):
_ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id)
# Restart the page server
env.neon_cli.pageserver_stop(immediate=True)
env.neon_cli.pageserver_start()
env.pageserver.stop(immediate=True)
env.pageserver.start()
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
@@ -166,7 +166,7 @@ def test_timeline_create_break_after_uninit_mark(neon_simple_env: NeonEnv):
tenant_id, _ = env.neon_cli.create_tenant()
timelines_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]

View File

@@ -33,7 +33,7 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
workload_thread = threading.Thread(target=start_workload, args=(), daemon=True)
workload_thread.start()
path = os.path.join(env.repo_dir, "pageserver.pid")
path = os.path.join(env.pageserver.workdir, "pageserver.pid")
lsof = lsof_path()
while workload_thread.is_alive():
res = subprocess.run(

View File

@@ -133,7 +133,6 @@ def test_backward_compatibility(
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
neon_binpath=neon_binpath,
port_distributor=port_distributor,
)
@@ -202,7 +201,6 @@ def test_forward_compatibility(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
neon_binpath=compatibility_neon_bin,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
@@ -234,7 +232,6 @@ def prepare_snapshot(
from_dir: Path,
to_dir: Path,
port_distributor: PortDistributor,
neon_binpath: Path,
pg_distrib_dir: Optional[Path] = None,
):
assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist"
@@ -246,6 +243,9 @@ def prepare_snapshot(
repo_dir = to_dir / "repo"
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
# Remove old logs to avoid confusion in test artifacts
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
@@ -259,31 +259,53 @@ def prepare_snapshot(
os.mkdir(repo_dir / "endpoints")
# Update paths and ports in config files
pageserver_toml = repo_dir / "pageserver.toml"
pageserver_config = toml.load(pageserver_toml)
pageserver_config["remote_storage"]["local_path"] = LocalFsStorage.component_path(
repo_dir, RemoteStorageUser.PAGESERVER
)
for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
pageserver_config[param] = port_distributor.replace_with_new_port(pageserver_config[param])
legacy_pageserver_toml = repo_dir / "pageserver.toml"
legacy_bundle = os.path.exists(legacy_pageserver_toml)
# We don't use authentication in compatibility tests
# so just remove authentication related settings.
pageserver_config.pop("pg_auth_type", None)
pageserver_config.pop("http_auth_type", None)
if pg_distrib_dir:
pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
with pageserver_toml.open("w") as f:
toml.dump(pageserver_config, f)
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
for param in ("listen_http_addr", "listen_pg_addr"):
snapshot_config["pageserver"][param] = port_distributor.replace_with_new_port(
snapshot_config["pageserver"][param]
path_to_config: dict[Path, dict[Any, Any]] = {}
if legacy_bundle:
os.mkdir(repo_dir / "pageserver_1")
path_to_config[repo_dir / "pageserver_1" / "pageserver.toml"] = toml.load(
legacy_pageserver_toml
)
os.remove(legacy_pageserver_toml)
os.rename(repo_dir / "tenants", repo_dir / "pageserver_1" / "tenants")
else:
for ps_conf in snapshot_config["pageservers"]:
config_path = repo_dir / f"pageserver_{ps_conf['id']}" / "pageserver.toml"
path_to_config[config_path] = toml.load(config_path)
# For each pageserver config, edit it and rewrite
for config_path, pageserver_config in path_to_config.items():
pageserver_config["remote_storage"]["local_path"] = LocalFsStorage.component_path(
repo_dir, RemoteStorageUser.PAGESERVER
)
for param in ("listen_http_addr", "listen_pg_addr", "broker_endpoint"):
pageserver_config[param] = port_distributor.replace_with_new_port(
pageserver_config[param]
)
# We don't use authentication in compatibility tests
# so just remove authentication related settings.
pageserver_config.pop("pg_auth_type", None)
pageserver_config.pop("http_auth_type", None)
if pg_distrib_dir:
pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
with config_path.open("w") as f:
toml.dump(pageserver_config, f)
# neon_local config doesn't have to be backward compatible. If we're using a dump from before
# it supported multiple pageservers, fix it up.
if "pageservers" not in snapshot_config:
snapshot_config["pageservers"] = [snapshot_config["pageserver"]]
del snapshot_config["pageserver"]
for param in ("listen_http_addr", "listen_pg_addr"):
for pageserver in snapshot_config["pageservers"]:
pageserver[param] = port_distributor.replace_with_new_port(pageserver[param])
snapshot_config["broker"]["listen_addr"] = port_distributor.replace_with_new_port(
snapshot_config["broker"]["listen_addr"]
)
@@ -347,6 +369,9 @@ def check_neon_works(
# Use the "target" binaries to launch the storage nodes
config_target = config
config_target.neon_binpath = neon_target_binpath
# We are using maybe-old binaries for neon services, but want to use current
# binaries for test utilities like neon_local
config_target.neon_local_binpath = neon_current_binpath
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
@@ -379,7 +404,7 @@ def check_neon_works(
# loosely based on https://github.com/neondatabase/cloud/wiki/Recovery-from-WAL
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1]
pageserver_port = snapshot_config["pageservers"][0]["listen_http_addr"].split(":")[-1]
pageserver_http = PageserverHttpClient(
port=pageserver_port,
is_testing_enabled_or_skip=lambda: True, # TODO: check if testing really enabled

View File

@@ -271,7 +271,7 @@ def _import(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -75,7 +75,7 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
# Check layer file sizes
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.repo_dir, env.initial_tenant, env.initial_timeline
env.pageserver.workdir, env.initial_tenant, env.initial_timeline
)
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):

View File

@@ -124,10 +124,14 @@ def test_cli_ipv4_listeners(neon_env_builder: NeonEnvBuilder):
def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
"""
Basic start/stop with default single-instance config for
safekeeper and pageserver
"""
env = neon_env_builder.init_start()
# Stop default ps/sk
env.neon_cli.pageserver_stop()
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
# Default start
@@ -139,6 +143,38 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
res.check_returncode()
def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
"""
Basic start/stop with explicitly configured counts of pageserver
and safekeeper
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.num_safekeepers = 2
env = neon_env_builder.init_start()
env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID)
env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 1)
# Addressing a nonexistent ID throws
with pytest.raises(RuntimeError):
env.neon_cli.pageserver_stop(env.BASE_PAGESERVER_ID + 100)
# Using the single-pageserver shortcut property throws when there are multiple pageservers
with pytest.raises(AssertionError):
_drop = env.pageserver
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
# Default start
res = env.neon_cli.raw_cli(["start"])
res.check_returncode()
# Default stop
res = env.neon_cli.raw_cli(["stop"])
res.check_returncode()
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"

View File

@@ -115,7 +115,7 @@ def test_ondemand_download_large_rel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -237,7 +237,7 @@ def test_ondemand_download_timetravel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -367,7 +367,7 @@ def test_download_remote_layers_api(
# remove all the layer files
# XXX only delete some of the layer files, to show that it really just downloads all the layers
for layer in (Path(env.repo_dir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer.name}")
layer.unlink()

View File

@@ -17,13 +17,13 @@ from fixtures.utils import wait_until
def test_pageserver_init_node_id(
neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path
):
repo_dir = neon_simple_env.repo_dir
pageserver_config = repo_dir / "pageserver.toml"
workdir = neon_simple_env.pageserver.workdir
pageserver_config = workdir / "pageserver.toml"
pageserver_bin = neon_binpath / "pageserver"
def run_pageserver(args):
return subprocess.run(
[str(pageserver_bin), "-D", str(repo_dir), *args],
[str(pageserver_bin), "-D", str(workdir), *args],
check=False,
universal_newlines=True,
stdout=subprocess.PIPE,

View File

@@ -35,5 +35,5 @@ def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
# Stop postgres so we drop the connection and flush the traces
endpoint.stop()
trace_path = env.repo_dir / "traces" / str(tenant_id) / str(timeline_id)
trace_path = env.pageserver.workdir / "traces" / str(tenant_id) / str(timeline_id)
assert trace_path.exists()

View File

@@ -137,7 +137,7 @@ def test_remote_storage_backup_and_restore(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -353,7 +353,7 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -488,7 +488,7 @@ def test_remote_timeline_client_calls_started_metric(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -299,7 +299,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# tenant is created with defaults, as in without config file
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
config_path = env.repo_dir / "tenants" / str(tenant_id) / "config"
config_path = env.pageserver.workdir / "tenants" / str(tenant_id) / "config"
assert config_path.exists(), "config file is always initially created"
http_client = env.pageserver.http_client()

View File

@@ -1,7 +1,6 @@
import enum
import os
import shutil
from pathlib import Path
import pytest
from fixtures.log_helper import log
@@ -367,7 +366,7 @@ def test_tenant_delete_is_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = env.pageserver.workdir / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -289,7 +289,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -332,7 +332,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
log.info("gc thread returned")
# check that nothing is left on disk for deleted tenant
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
with pytest.raises(
expected_exception=PageserverApiException, match=f"NotFound: tenant {tenant_id}"
@@ -357,7 +357,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -386,7 +386,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
log.info("ignored tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -413,7 +413,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -430,7 +430,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
log.info("regular tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -531,7 +531,7 @@ def test_ignored_tenant_reattach(
pageserver_http = env.pageserver.http_client()
ignored_tenant_id, _ = env.neon_cli.create_tenant()
tenant_dir = env.repo_dir / "tenants" / str(ignored_tenant_id)
tenant_dir = env.pageserver.workdir / "tenants" / str(ignored_tenant_id)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
tenants_before_ignore.sort()
timelines_before_ignore = [

View File

@@ -561,7 +561,7 @@ def test_emergency_relocate_with_branches_slow_replay(
# simpler than initializing a new one from scratch, but the effect on the single tenant
# is the same.
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
env.pageserver.start()
# This fail point will pause the WAL ingestion on the main branch, after the
@@ -709,7 +709,7 @@ def test_emergency_relocate_with_branches_createdb(
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
env.pageserver.start()
# Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original

View File

@@ -27,7 +27,7 @@ from prometheus_client.samples import Sample
def test_tenant_creation_fails(neon_simple_env: NeonEnv):
tenants_dir = Path(neon_simple_env.repo_dir) / "tenants"
tenants_dir = Path(neon_simple_env.pageserver.workdir) / "tenants"
initial_tenants = sorted(
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
)
@@ -322,7 +322,10 @@ def test_pageserver_with_empty_tenants(
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines"
Path(env.pageserver.workdir)
/ "tenants"
/ str(tenant_with_empty_timelines)
/ "timelines"
)
)
assert (
@@ -334,7 +337,9 @@ def test_pageserver_with_empty_tenants(
env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_without_timelines_dir) / "timelines")
shutil.rmtree(
Path(env.pageserver.workdir) / "tenants" / str(tenant_without_timelines_dir) / "timelines"
)
env.pageserver.start()

View File

@@ -179,7 +179,9 @@ def test_tenants_attached_after_download(
env.pageserver.stop()
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
timeline_dir = (
Path(env.pageserver.workdir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):

View File

@@ -73,7 +73,11 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
)
timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(parent_timeline_id)
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(parent_timeline_id)
)
with pytest.raises(
@@ -86,7 +90,11 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
assert exc.value.status_code == 412
timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
assert timeline_path.exists()
@@ -406,7 +414,7 @@ def test_timeline_resurrection_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -458,7 +466,11 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
)
leaf_timeline_path = (
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
@@ -907,7 +919,7 @@ def test_timeline_delete_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -29,7 +29,7 @@ def test_wal_restore(
endpoint.safe_psql("create table t as select generate_series(1,300000)")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
env.neon_cli.pageserver_stop()
env.pageserver.stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
with VanillaPostgres(

View File

@@ -27,7 +27,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
env.pageserver.allowed_errors.append(".*NotFound: tenant.*")
pageserver_http = env.pageserver.http_client()
pagserver_pid = int((env.repo_dir / "pageserver.pid").read_text())
pagserver_pid = int((env.pageserver.workdir / "pageserver.pid").read_text())
assert_child_processes(pagserver_pid, wal_redo_present=False, defunct_present=False)
@@ -43,7 +43,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
tenant_id, _ = env.neon_cli.create_tenant()
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -101,7 +101,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
pytest.fail(f"could not detach tenant: {last_error}")
# check that nothing is left on disk for deleted tenant
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
# Pageserver schedules kill+wait of the WAL redo process to the background runtime,
# asynchronously to tenant detach. Cut it some slack to complete kill+wait before