Compare commits

...

11 Commits

Author SHA1 Message Date
Heikki Linnakangas
87f270ee7a wip 2024-09-12 01:36:11 +03:00
Heikki Linnakangas
bfae8ff23e WIP 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
a6e64fcc90 prefix endpoint ids with test name 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
390235e095 wip add NeonEnv 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
7565939dce Store branch name mappings in separate branches.toml file 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
cde4ea2b39 Add --timeline-id option to "neon_local timeline branch" command
Makes it consistent with the "timeline create" and "timeline import"
commands, which allowed you to pass the timeline id as argument. This
also makes it unnecessary to parse the timeline ID from the output in
the python function that calls it.
2024-09-12 01:20:59 +03:00
Heikki Linnakangas
60320e8e45 Rename test_name to env_name 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
fe1fe213ba Let NeonEnv handle starting and stopping NeonBroker 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
67d48ac867 Remove silly NeonEnvBuilder.start function 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
4da40cbbfb initdb cache 2024-09-12 01:20:59 +03:00
Heikki Linnakangas
8e7f336540 Cache initdb output to speed up tenant creation in tests
initdb takes about 1 s. Our tests create and destroy a lot of tenants,
so that adds up. Cache the initdb result to speed it up.

This is currently only enabled in tests. Out of caution, mostly. But
also because when you reuse the initdb result, all the postgres
clusters end up having the same system_identifier, which is supposed
to be unique. It's not necessary for it to be unique for correctness,
nothing critical relies on it and you can easily end up with duplicate
system_identifiers in standalone PostgreSQL too, if you e.g. create a
backup and restore it on a different system. But it is used in various
checks to reduce the chance that you e.g. accidentally apply WAL
belonging to a different cluster.

Because this is aimed at tests, there are a few things that might be
surprising:

- The initdb cache directory is configurable, and can be outside the
  pageserver's repo directory. This allows reuse across different
  pageservers running on the same host. In production use, that'd be
  pointless, but our tests create a lot of pageservers.

- The cache is not automatically purged at start / shutdown. For
  production use, we'd probably want that, so that we'd pick up any
  changes in what an empty cluster looks like after a Postgres minor
  version upgrade, for example. But again tests create and destroy a
  lot of pageservers, so it's important to retain the cache.

- The locking on the cache directory relies purely on filesystem
  operations and atomic rename(). Using e.g. a rust Mutex() would be
  more straightforward, but that's not enough because the cache needs
  to be shared between different pageservers running on the same
  system.
2024-09-12 01:20:54 +03:00
46 changed files with 995 additions and 509 deletions

View File

@@ -90,8 +90,11 @@ fn main() -> Result<()> {
handle_init(sub_args).map(Some)
} else {
// all other commands need an existing config
let mut env =
LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let base_path = local_env::base_path();
let branch_mappings_path = local_env::branch_mappings_path();
let mut env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
.context("Error loading config")?;
let original_env = env.clone();
let rt = tokio::runtime::Builder::new_current_thread()
@@ -264,7 +267,7 @@ async fn get_timeline_infos(
fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
tenant_id_from_arguments
} else if let Some(default_id) = env.default_tenant_id {
} else if let Some(default_id) = env.branch_mappings.default_tenant_id {
Ok(default_id)
} else {
anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
@@ -278,7 +281,7 @@ fn get_tenant_shard_id(
) -> anyhow::Result<TenantShardId> {
if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
tenant_id_from_arguments
} else if let Some(default_id) = env.default_tenant_id {
} else if let Some(default_id) = env.branch_mappings.default_tenant_id {
Ok(TenantShardId::unsharded(default_id))
} else {
anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
@@ -360,7 +363,6 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
.collect(),
pg_distrib_dir: None,
neon_distrib_dir: None,
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
storage_controller: None,
control_plane_compute_hook_api: None,
}
@@ -368,8 +370,12 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
LocalEnv::init(init_conf, force)
.context("materialize initial neon_local environment on disk")?;
Ok(LocalEnv::load_config(&local_env::base_path())
.expect("freshly written config should be loadable"))
let base_path = local_env::base_path();
let branch_mappings_path = local_env::branch_mappings_path();
let env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
.expect("freshly written config should be loadable");
Ok(env)
}
/// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
@@ -525,14 +531,14 @@ async fn handle_tenant(
if create_match.get_flag("set-default") {
println!("Setting tenant {tenant_id} as a default one");
env.default_tenant_id = Some(tenant_id);
env.branch_mappings.default_tenant_id = Some(tenant_id);
}
}
Some(("set-default", set_default_match)) => {
let tenant_id =
parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
println!("Setting tenant {tenant_id} as a default one");
env.default_tenant_id = Some(tenant_id);
env.branch_mappings.default_tenant_id = Some(tenant_id);
}
Some(("config", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
@@ -640,6 +646,8 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_timeline_id =
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
let new_branch_name = branch_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -658,7 +666,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
@@ -692,6 +699,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
match sub_name {
@@ -1359,6 +1367,7 @@ async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> R
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
// Stop all endpoints
// NOTE: This only knows about endpoints in the default endpoints dir
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
@@ -1419,6 +1428,18 @@ fn cli() -> Command {
.default_value("10s")
.required(false);
let branch_mappings_arg = Arg::new("branch-mappings")
.long("branch-mappings")
.help("File holding all branch names. Default is <repo dir>/branches.toml")
.value_parser(value_parser!(PathBuf))
.required(false);
let endpoints_dir_arg = Arg::new("endpoints-dir")
.long("endpoints-dir")
.help("Path to directory holding all endpoints. Default is <repo dir>/endpoints")
.value_parser(value_parser!(PathBuf))
.required(false);
let branch_name_arg = Arg::new("branch-name")
.long("branch-name")
.help("Name of the branch to be created or used as an alias for other services")
@@ -1559,6 +1580,8 @@ fn cli() -> Command {
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
.arg(branch_mappings_arg)
.arg(endpoints_dir_arg)
.subcommand(
Command::new("init")
.about("Initialize a new Neon repository, preparing configs for services to start with")
@@ -1570,7 +1593,6 @@ fn cli() -> Command {
.value_parser(value_parser!(PathBuf))
.value_name("config")
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
@@ -1583,6 +1605,7 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

View File

@@ -46,6 +46,11 @@ pub struct LocalEnv {
// must be an absolute path. If the env var is not set, $PWD/.neon is used.
pub base_data_dir: PathBuf,
// Similarly, path to branch mappings file. Not stored in the config file but
// read from the NEON_BRANCH_MAPPINGS env variable. "None" means no mappings
// are loaded, and they cannot be saved either.
pub branch_name_mappings_path: Option<PathBuf>,
// Path to postgres distribution. It's expected that "bin", "include",
// "lib", "share" from postgres distribution are there. If at some point
// in time we will be able to run against vanilla postgres we may split that
@@ -55,10 +60,6 @@ pub struct LocalEnv {
// Path to pageserver binary.
pub neon_distrib_dir: PathBuf,
// Default tenant ID to use with the 'neon_local' command line utility, when
// --tenant_id is not explicitly specified.
pub default_tenant_id: Option<TenantId>,
// used to issue tokens during e.g pg start
pub private_key_path: PathBuf,
@@ -82,11 +83,7 @@ pub struct LocalEnv {
// storage controller's configuration.
pub control_plane_compute_hook_api: Option<Url>,
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
pub branch_mappings: BranchMappings,
}
/// On-disk state stored in `.neon/config`.
@@ -95,7 +92,6 @@ pub struct LocalEnv {
pub struct OnDiskConfig {
pub pg_distrib_dir: PathBuf,
pub neon_distrib_dir: PathBuf,
pub default_tenant_id: Option<TenantId>,
pub private_key_path: PathBuf,
pub broker: NeonBroker,
pub storage_controller: NeonStorageControllerConf,
@@ -107,7 +103,20 @@ pub struct OnDiskConfig {
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
#[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct BranchMappings {
// Default tenant ID to use with the 'neon_local' command line utility, when
// --tenant_id is not explicitly specified. This comes from the branches.
pub default_tenant_id: Option<TenantId>,
/// Keep human-readable aliases in memory (and persist them to config XXX), to hide ZId hex strings from the user.
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
pub mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
@@ -128,7 +137,6 @@ pub struct NeonLocalInitConf {
pub pg_distrib_dir: Option<PathBuf>,
// TODO: do we need this? Seems unused
pub neon_distrib_dir: Option<PathBuf>,
pub default_tenant_id: TenantId,
pub broker: NeonBroker,
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
@@ -443,7 +451,8 @@ impl LocalEnv {
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let existing_values = self
.branch_name_mappings
.branch_mappings
.mappings
.entry(branch_name.clone())
.or_default();
@@ -468,7 +477,8 @@ impl LocalEnv {
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
self.branch_name_mappings
self.branch_mappings
.mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
@@ -477,7 +487,8 @@ impl LocalEnv {
}
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
self.branch_name_mappings
self.branch_mappings
.mappings
.iter()
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
@@ -488,7 +499,10 @@ impl LocalEnv {
}
/// Construct `Self` from on-disk state.
pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
pub fn load_config(
repopath: &Path,
branch_name_mappings_path: Option<&Path>,
) -> anyhow::Result<Self> {
if !repopath.exists() {
bail!(
"Neon config is not found in {}. You need to run 'neon_local init' first",
@@ -498,37 +512,43 @@ impl LocalEnv {
// TODO: check that it looks like a neon repository
// load and parse file
// load and parse config file
let config_file_contents = fs::read_to_string(repopath.join("config"))?;
let on_disk_config: OnDiskConfig = toml::from_str(config_file_contents.as_str())?;
let mut env = {
let OnDiskConfig {
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id,
private_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
branch_name_mappings,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id,
private_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
branch_name_mappings,
}
let OnDiskConfig {
pg_distrib_dir,
neon_distrib_dir,
private_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
} = on_disk_config;
// load and parse "branches.toml" file
let branch_mappings = if let Some(path) = branch_name_mappings_path {
let contents = fs::read_to_string(path)
.context(format!("load branch mappings file {}", path.display()))?;
toml::from_str::<BranchMappings>(contents.as_str())?
} else {
BranchMappings::default()
};
let mut env = LocalEnv {
base_data_dir: repopath.to_owned(),
branch_name_mappings_path: branch_name_mappings_path.map(|p| p.to_owned()),
pg_distrib_dir,
neon_distrib_dir,
private_key_path,
broker,
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_compute_hook_api,
branch_mappings,
};
// The source of truth for pageserver configuration is the pageserver.toml.
@@ -618,7 +638,6 @@ impl LocalEnv {
&OnDiskConfig {
pg_distrib_dir: self.pg_distrib_dir.clone(),
neon_distrib_dir: self.neon_distrib_dir.clone(),
default_tenant_id: self.default_tenant_id,
private_key_path: self.private_key_path.clone(),
broker: self.broker.clone(),
storage_controller: self.storage_controller.clone(),
@@ -626,9 +645,20 @@ impl LocalEnv {
safekeepers: self.safekeepers.clone(),
control_plane_api: self.control_plane_api.clone(),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
branch_name_mappings: self.branch_name_mappings.clone(),
},
)
)?;
if let Some(path) = &self.branch_name_mappings_path {
Self::persist_branches_impl(path, &self.branch_mappings)?;
} else {
if !self.branch_mappings.mappings.is_empty() {
tracing::warn!("command created a branch mapping, but it was not saved because no mappings file was configured")
} else if self.branch_mappings.default_tenant_id.is_some() {
tracing::warn!("command created a tenant default, but it was not saved because no mappings file was configured")
}
}
Ok(())
}
pub fn persist_config_impl(base_path: &Path, config: &OnDiskConfig) -> anyhow::Result<()> {
@@ -642,6 +672,19 @@ impl LocalEnv {
})
}
pub fn persist_branches_impl(
branch_name_mappings_path: &Path,
branch_mappings: &BranchMappings,
) -> anyhow::Result<()> {
let content = &toml::to_string_pretty(branch_mappings)?;
fs::write(branch_name_mappings_path, content).with_context(|| {
format!(
"Failed to write branch information into path '{}'",
branch_name_mappings_path.display()
)
})
}
// this function is used only for testing purposes in CLI e g generate tokens during init
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
let private_key_path = self.get_private_key_path();
@@ -702,7 +745,6 @@ impl LocalEnv {
let NeonLocalInitConf {
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id,
broker,
storage_controller,
pageservers,
@@ -746,9 +788,9 @@ impl LocalEnv {
// TODO: refactor to avoid this, LocalEnv should only be constructed from on-disk state
let env = LocalEnv {
base_data_dir: base_path.clone(),
branch_name_mappings_path: Some(base_path.join("branches.toml")),
pg_distrib_dir,
neon_distrib_dir,
default_tenant_id: Some(default_tenant_id),
private_key_path,
broker,
storage_controller: storage_controller.unwrap_or_default(),
@@ -756,10 +798,10 @@ impl LocalEnv {
safekeepers,
control_plane_api: control_plane_api.unwrap_or_default(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
branch_name_mappings: Default::default(),
branch_mappings: Default::default(),
};
// create endpoints dir
// create the default endpoints dir
fs::create_dir_all(env.endpoints_path())?;
// create safekeeper dirs
@@ -806,6 +848,24 @@ pub fn base_path() -> PathBuf {
path
}
pub fn branch_mappings_path() -> PathBuf {
let path = match std::env::var_os("NEON_BRANCH_MAPPINGS") {
Some(val) => {
let path = PathBuf::from(val);
// a relative path is relative to repo dir
if !path.is_absolute() {
base_path().join(path)
} else {
path
}
}
None => base_path().join("branches.toml"),
};
assert!(path.is_absolute());
path
}
/// Generate a public/private key pair for JWT authentication
fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
// Generate the key pair

View File

@@ -64,6 +64,7 @@ pub struct ConfigToml {
#[serde(with = "humantime_serde")]
pub wal_redo_timeout: Duration,
pub superuser: String,
pub initdb_cache_dir: Option<Utf8PathBuf>,
pub page_cache_size: usize,
pub max_file_descriptors: usize,
pub pg_distrib_dir: Option<Utf8PathBuf>,
@@ -358,6 +359,7 @@ impl Default for ConfigToml {
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
.expect("cannot parse default wal redo timeout")),
superuser: (DEFAULT_SUPERUSER.to_string()),
initdb_cache_dir: None,
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()

View File

@@ -71,6 +71,8 @@ pub struct PageServerConf {
pub superuser: String,
pub initdb_cache_dir: Option<Utf8PathBuf>,
pub page_cache_size: usize,
pub max_file_descriptors: usize,
@@ -309,6 +311,7 @@ impl PageServerConf {
wait_lsn_timeout,
wal_redo_timeout,
superuser,
initdb_cache_dir,
page_cache_size,
max_file_descriptors,
pg_distrib_dir,
@@ -356,6 +359,7 @@ impl PageServerConf {
wait_lsn_timeout,
wal_redo_timeout,
superuser,
initdb_cache_dir,
page_cache_size,
max_file_descriptors,
http_auth_type,

View File

@@ -3491,7 +3491,7 @@ impl Tenant {
.context("extract initdb tar")?;
} else {
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
run_initdb_with_cache(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// Upload the created data dir to S3
if self.tenant_shard_id().is_shard_zero() {
@@ -3534,6 +3534,7 @@ impl Tenant {
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();
info!("starting to import from datadir");
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
@@ -3549,6 +3550,7 @@ impl Tenant {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
info!("calling freeze_and_flush");
unfinished_timeline
.freeze_and_flush()
.await
@@ -3559,7 +3561,9 @@ impl Tenant {
})?;
// All done!
info!("calling finish_creation");
let timeline = raw_timeline.finish_creation()?;
info!("call to finish_creation done");
Ok(timeline)
}
@@ -3837,6 +3841,118 @@ impl Tenant {
}
}
fn cached_initdb_dirname(initial_superuser_name: &str, pg_version: u32) -> String
{
use std::hash::Hash;
use std::hash::Hasher;
use std::collections::hash_map::DefaultHasher;
let mut hasher = DefaultHasher::new();
initial_superuser_name.hash(&mut hasher);
let hash = hasher.finish();
format!("cached_initial_pgdata_{pg_version}_{:016}", hash)
}
fn copy_dir_all(src: impl AsRef<std::path::Path>, dst: impl AsRef<std::path::Path>) -> std::io::Result<()> {
for entry in fs::read_dir(src.as_ref())? {
let entry = entry?;
let subsrc = entry.path();
let subdst = dst.as_ref().join(&entry.file_name());
if entry.file_type()?.is_dir() {
std::fs::create_dir(&subdst)?;
copy_dir_all(&subsrc, &subdst)?;
} else {
std::fs::copy(&subsrc, &subdst)?;
}
}
Ok(())
}
fn restore_cached_initdb_dir(
cached_path: &Utf8Path,
target_path: &Utf8Path,
) -> anyhow::Result<bool> {
if !cached_path.exists() {
info!("cached initdb dir \"{cached_path}\" does not exist yet");
return Ok(false);
}
std::fs::create_dir(target_path)?;
copy_dir_all(cached_path, target_path)?;
info!("restored initdb result from cache dir \"{cached_path}\"");
Ok(true)
}
fn save_cached_initdb_dir(
src_path: &Utf8Path,
cache_path: &Utf8Path,
) -> anyhow::Result<()> {
match std::fs::create_dir(cache_path) {
Ok(()) => {
info!("saving initdb result to cache dir \"{cache_path}\"");
},
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
info!("cache initdb dir \"{cache_path}\" already exists, not saving");
return Ok(())
},
Err(err) => { return Err(anyhow::Error::from(err))},
};
let cache_dir_guard = scopeguard::guard(cache_path, |cp| {
if let Err(err) = std::fs::remove_dir_all(&cp) {
error!("could not remove cached initdb directory {cp}: {err}");
}
});
let cache_parent_path = cache_path.parent().ok_or(anyhow::Error::msg("no cache parent path"))?;
let tmp_dirpath = camino_tempfile::tempdir_in(cache_parent_path)?;
copy_dir_all(src_path, &tmp_dirpath)?;
std::fs::rename(tmp_dirpath, &*cache_dir_guard)?;
// disarm the guard
scopeguard::ScopeGuard::into_inner(cache_dir_guard);
Ok(())
}
async fn run_initdb_with_cache(
conf: &'static PageServerConf,
initdb_target_dir: &Utf8Path,
pg_version: u32,
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
let cache_dir = conf.initdb_cache_dir.as_ref().map(|initdb_cache_dir| {
initdb_cache_dir.join(cached_initdb_dirname(&conf.superuser, pg_version))
});
if let Some(cache_dir) = &cache_dir {
match restore_cached_initdb_dir(&cache_dir, initdb_target_dir) {
Ok(true) => return Ok(()),
Ok(false) => {},
Err(err) => {
warn!("Error restoring from cached initdb directory \"{cache_dir}\": {err}");
if initdb_target_dir.exists() {
if let Err(err) = std::fs::remove_dir_all(&initdb_target_dir) {
error!("could not remove temporary initdb target directory {initdb_target_dir}: {err}");
}
}
},
}
}
run_initdb(conf, initdb_target_dir, pg_version, cancel).await?;
if let Some(cache_dir) = &cache_dir {
if let Err(err) = save_cached_initdb_dir(initdb_target_dir, &cache_dir) {
warn!("error saving initdb result to cache directory \"{cache_dir}\": {err}");
}
}
Ok(())
}
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
async fn run_initdb(

View File

@@ -292,7 +292,7 @@ impl ComputeHook {
);
return Ok(());
};
let env = match LocalEnv::load_config(repo_dir) {
let env = match LocalEnv::load_config(repo_dir, None) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");

View File

@@ -50,7 +50,7 @@ class NeonBroker:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
) from e
time.sleep(0.5)
time.sleep(0.1)
else:
break # success

File diff suppressed because it is too large Load Diff

View File

@@ -19,7 +19,7 @@ def pg_version() -> Optional[PgVersion]:
return None
@pytest.fixture(scope="function", autouse=True)
@pytest.fixture(scope="session", autouse=True)
def build_type() -> Optional[str]:
return None
@@ -64,7 +64,7 @@ def pytest_generate_tests(metafunc: Metafunc):
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
metafunc.parametrize("build_type", build_types, scope="session")
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]

View File

@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import (
)
def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
env = neon_env_builder.init_start()
def test_aux_v2_config_switch(neon_shared_env: NeonEnv, vanilla_pg):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
client = env.pageserver.http_client()

View File

@@ -46,11 +46,11 @@ from fixtures.utils import query_scalar
# Because the delta layer D covering lsn1 is corrupted, creating a branch
# starting from lsn1 should return an error as follows:
# could not find data for key ... at LSN ..., for request at LSN ...
def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
def test_branch_and_gc(neon_shared_env: NeonEnv, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
env = neon_simple_env
env = neon_shared_env
pageserver_http_client = env.pageserver.http_client()
tenant, _ = env.neon_cli.create_tenant(
@@ -116,8 +116,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
# and prevent creating branches with invalid starting LSNs.
#
# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_branch_creation_before_gc(neon_shared_env: NeonEnv):
env = neon_shared_env
pageserver_http_client = env.pageserver.http_client()
error_regexes = [

View File

@@ -33,7 +33,7 @@ from requests import RequestException
@pytest.mark.parametrize("scale", get_scales_matrix(1))
@pytest.mark.parametrize("ty", ["cascade", "flat"])
def test_branching_with_pgbench(
neon_simple_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
neon_shared_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
):
env = neon_simple_env

View File

@@ -2,9 +2,8 @@ from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import NeonEnvBuilder, NeonProxy
def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonProxy):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
def test_build_info_metric(neon_shared_env: NeonEnv, link_proxy: NeonProxy):
env = neon_shared_env
parsed_metrics = {}

View File

@@ -9,8 +9,8 @@ from fixtures.utils import query_scalar
#
# Test compute node start after clog truncation
#
def test_clog_truncate(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_clog_truncate(neon_shared_env: NeonEnv):
env = neon_shared_env
# set aggressive autovacuum to make sure that truncation will happen
config = [

View File

@@ -1,8 +1,8 @@
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
env = neon_env_builder.init_start()
def do_combocid_op(neon_shared_env: NeonEnv, op):
env = neon_shared_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
@@ -49,20 +49,20 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
)
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "delete from t")
def test_combocid_delete(neon_shared_env: NeonEnv):
do_combocid_op(neon_env, "delete from t")
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "update t set val=val+1")
def test_combocid_update(neon_shared_env: NeonEnv):
do_combocid_op(neon_env, "update t set val=val+1")
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
do_combocid_op(neon_env_builder, "select * from t for update")
def test_combocid_lock(neon_shared_env: NeonEnv):
do_combocid_op(neon_env, "select * from t for update")
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_combocid_multi_insert(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
@@ -112,8 +112,8 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
)
def test_combocid(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_combocid(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()

View File

@@ -178,7 +178,7 @@ def test_backward_compatibility(
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
env.pageserver.allowed_errors.append(ingest_lag_log_line)
neon_env_builder.start()
env.start()
check_neon_works(
env,
@@ -265,7 +265,7 @@ def test_forward_compatibility(
# does not include logs from previous runs
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
neon_env_builder.start()
env.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)

View File

@@ -2,8 +2,8 @@ import requests
from fixtures.neon_fixtures import NeonEnv
def test_compute_catalog(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_compute_catalog(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
client = endpoint.http_client()

View File

@@ -29,6 +29,27 @@ def test_config(neon_simple_env: NeonEnv):
# check that config change was applied
assert cur.fetchone() == ("debug1",)
def test_shared_config(neon_shared_env: NeonEnv):
env = neon_shared_env
# change config
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT setting
FROM pg_settings
WHERE
source != 'default'
AND source != 'override'
AND name = 'log_min_messages'
"""
)
# check that config change was applied
assert cur.fetchone() == ("debug1",)
#
# Test that reordering of safekeepers does not restart walproposer

View File

@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
# Test CREATE DATABASE when there have been relmapper changes
#
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
def test_createdb(neon_shared_env: NeonEnv, strategy: str):
env = neon_shared_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
@@ -58,8 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
#
# Test DROP DATABASE
#
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
def test_dropdb(neon_shared_env: NeonEnv, test_output_dir):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:

View File

@@ -5,8 +5,8 @@ from fixtures.utils import query_scalar
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_createuser(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:

View File

@@ -10,11 +10,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"💣", # calls `trigger_segfault` internally
],
)
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
def test_endpoint_crash(neon_shared_env: NeonEnv, sql_func: str):
"""
Test that triggering crash from neon_test_utils crashes the endpoint
"""
env = neon_env_builder.init_start()
env = neon_shared_env
env.neon_cli.create_branch("test_endpoint_crash")
endpoint = env.endpoints.create_start("test_endpoint_crash")

View File

@@ -1,8 +1,8 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_fsm_truncate(neon_shared_env: NeonEnv):
env = neon_shared_env
env.neon_cli.create_branch("test_fsm_truncate")
endpoint = env.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(

View File

@@ -16,12 +16,12 @@ num_rows = 1000
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(
neon_env_builder: NeonEnvBuilder,
neon_shared_env: NeonEnv,
pg_bin: PgBin,
port_distributor: PortDistributor,
test_output_dir: Path,
):
env = neon_env_builder.init_start()
env = neon_shared_env
# endpoint needs to be alive until the fullbackup so that we have
# prev_record_lsn for the vanilla_pg to start in read-write mode

View File

@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
#
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_gin_redo(neon_shared_env: NeonEnv):
env = neon_shared_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)

View File

@@ -87,8 +87,8 @@ def test_hot_standby(neon_simple_env: NeonEnv):
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
def test_2_replicas_start(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_2_replicas_start(neon_shared_env: NeonEnv):
env = neon_shared_env
with env.endpoints.create_start(
branch_name="main",
@@ -286,8 +286,8 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Test race condition between WAL replay and backends performing queries
# https://github.com/neondatabase/neon/issues/7791
def test_replica_query_race(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_replica_query_race(neon_shared_env: NeonEnv):
env = neon_shared_env
primary_ep = env.endpoints.create_start(
branch_name="main",

View File

@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
# to large (several gigabytes) layer files (both ephemeral and delta layers).
# It may cause problems with uploading to S3 and also degrade performance because ephemeral file swapping.
#
def test_large_schema(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_large_schema(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")

View File

@@ -14,8 +14,8 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
# Test branching, when a transaction is in prepared state
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
def test_lfc_resize(neon_shared_env: NeonEnv, pg_bin: PgBin):
env = neon_shared_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[

View File

@@ -9,8 +9,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_local_file_cache_unlink(neon_shared_env: NeonEnv):
env = neon_shared_env
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)

View File

@@ -24,13 +24,13 @@ def assert_lsn_lease_granted(result, with_lease: bool):
@pytest.mark.parametrize("with_lease", [True, False])
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder, with_lease: bool):
def test_lsn_mapping(neon_shared_env: NeonEnv, with_lease: bool):
"""
Test pageserver get_lsn_by_timestamp API.
:param with_lease: Whether to get a lease associated with returned LSN.
"""
env = neon_env_builder.init_start()
env = neon_shared_env
tenant_id, _ = env.neon_cli.create_tenant(
conf={

View File

@@ -7,8 +7,8 @@ if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
def test_migrations(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_migrations(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create("main")
endpoint.respec(skip_pg_catalog_updates=False)

View File

@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
# is enough to verify that the WAL records are handled correctly
# in the pageserver.
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
def test_multixact(neon_shared_env: NeonEnv, test_output_dir):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
cur = endpoint.connect().cursor()

View File

@@ -4,8 +4,8 @@ from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
env = neon_simple_env
def test_neon_superuser(neon_shared_env: NeonEnv, pg_version: PgVersion):
env = neon_shared_env
env.neon_cli.create_branch("test_neon_superuser_publisher", "main")
pub = env.endpoints.create("test_neon_superuser_publisher")

View File

@@ -2,8 +2,8 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
def test_oid_overflow(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")

View File

@@ -56,8 +56,8 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
assert TimelineId(timeline_details["timeline_id"]) == timeline_id
def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_pageserver_http_get_wal_receiver_not_found(neon_shared_env: NeonEnv):
env = neon_shared_env
with env.pageserver.http_client() as client:
tenant_id, timeline_id = env.neon_cli.create_tenant()
@@ -105,8 +105,8 @@ def expect_updated_msg_lsn(
#
# These fields used to be returned by a separate API call, but they're part of
# `timeline_details` now.
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_pageserver_http_get_wal_receiver_success(neon_shared_env: NeonEnv):
env = neon_shared_env
with env.pageserver.http_client() as client:
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)

View File

@@ -39,8 +39,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
env = neon_simple_env
def test_parallel_copy(neon_shared_env: NeonEnv, n_parallel=5):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
# Create test table

View File

@@ -19,8 +19,8 @@ def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
# Simple test to check that pg_waldump works with neon WAL files
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
env = neon_simple_env
def test_pg_waldump(neon_shared_env: NeonEnv, test_output_dir, pg_bin: PgBin):
env = neon_shared_env
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")

View File

@@ -13,8 +13,8 @@ extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
#
# Validation of reading different page versions
#
def test_read_validation(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_read_validation(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
with closing(endpoint.connect()) as con:
@@ -125,8 +125,8 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info(f"Caught an expected failure: {e}")
def test_read_validation_neg(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_read_validation_neg(neon_shared_env: NeonEnv):
env = neon_shared_env
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
endpoint = env.endpoints.create_start("main")

View File

@@ -215,8 +215,8 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_timetravel(neon_shared_env: NeonEnv):
env = neon_shared_env
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client = env.pageserver.http_client()

View File

@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
env = neon_env_builder.init_configs(True)
neon_env_builder.start()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
neon_env_builder.start()
env.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_configs()
neon_env_builder.start()
env.start()
tenants = []
n_tenants = 8

View File

@@ -7,8 +7,8 @@ from fixtures.utils import wait_until
# This test checks of logical replication subscriber is able to correctly restart replication without receiving duplicates.
# It requires tracking information about replication origins at page server side
def test_subscriber_restart(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_subscriber_restart(neon_shared_env: NeonEnv):
env = neon_shared_env
env.neon_cli.create_branch("publisher")
pub = env.endpoints.create("publisher")
pub.start()

View File

@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
# maintained in the pageserver, so subtransactions are not very exciting for
# Neon. They are included in the commit record though and updated in the
# CLOG.
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
def test_subxacts(neon_shared_env: NeonEnv, test_output_dir):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()

View File

@@ -36,8 +36,8 @@ from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
from urllib3.util.retry import Retry
def test_timeline_delete(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_timeline_delete(neon_shared_env: NeonEnv):
env = neon_shared_env
env.pageserver.allowed_errors.extend(
[

View File

@@ -1,13 +1,13 @@
import time
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv
#
# Test truncation of FSM and VM forks of a relation
#
def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
env = neon_env_builder.init_start()
def test_truncate(neon_shared_env: NeonEnv, zenbenchmark):
env = neon_shared_env
n_records = 10000
n_iter = 10

View File

@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
#
# Test branching, when a transaction is in prepared state
#
def test_twophase(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_twophase(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])
conn = endpoint.connect()

View File

@@ -7,8 +7,8 @@ from fixtures.pg_version import PgVersion
# fork to reset them during recovery. In Neon, pageserver directly sends init
# fork contents as main fork during basebackup.
#
def test_unlogged(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_unlogged(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()

View File

@@ -10,8 +10,8 @@ from fixtures.utils import query_scalar
# Test that the VM bit is cleared correctly at a HEAP_DELETE and
# HEAP_UPDATE record.
#
def test_vm_bit_clear(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_vm_bit_clear(neon_shared_env: NeonEnv):
env = neon_shared_env
endpoint = env.endpoints.create_start("main")
@@ -114,13 +114,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
assert cur_new.fetchall() == []
def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
def test_vm_bit_clear_on_heap_lock_whitebox(neon_shared_env: NeonEnv):
"""
Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.
This is a repro for the bug fixed in commit 66fa176cc8.
"""
env = neon_env_builder.init_start()
env = neon_shared_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[