Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-26 23:00:37 +00:00)

Compare commits: readonly-n ... heikki/tes

11 Commits
- 87f270ee7a
- bfae8ff23e
- a6e64fcc90
- 390235e095
- 7565939dce
- cde4ea2b39
- 60320e8e45
- fe1fe213ba
- 67d48ac867
- 4da40cbbfb
- 8e7f336540
@@ -90,8 +90,11 @@ fn main() -> Result<()> {
         handle_init(sub_args).map(Some)
     } else {
         // all other commands need an existing config
-        let mut env =
-            LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
+        let base_path = local_env::base_path();
+        let branch_mappings_path = local_env::branch_mappings_path();
+
+        let mut env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
+            .context("Error loading config")?;
         let original_env = env.clone();

         let rt = tokio::runtime::Builder::new_current_thread()

@@ -264,7 +267,7 @@ async fn get_timeline_infos(
 fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
     if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
         tenant_id_from_arguments
-    } else if let Some(default_id) = env.default_tenant_id {
+    } else if let Some(default_id) = env.branch_mappings.default_tenant_id {
         Ok(default_id)
     } else {
         anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
@@ -278,7 +281,7 @@ fn get_tenant_shard_id(
 ) -> anyhow::Result<TenantShardId> {
     if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
         tenant_id_from_arguments
-    } else if let Some(default_id) = env.default_tenant_id {
+    } else if let Some(default_id) = env.branch_mappings.default_tenant_id {
         Ok(TenantShardId::unsharded(default_id))
     } else {
         anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");

@@ -360,7 +363,6 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
             .collect(),
         pg_distrib_dir: None,
         neon_distrib_dir: None,
-        default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
         storage_controller: None,
         control_plane_compute_hook_api: None,
     }

@@ -368,8 +370,12 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {

     LocalEnv::init(init_conf, force)
         .context("materialize initial neon_local environment on disk")?;
-    Ok(LocalEnv::load_config(&local_env::base_path())
-        .expect("freshly written config should be loadable"))
+    let base_path = local_env::base_path();
+    let branch_mappings_path = local_env::branch_mappings_path();
+    let env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
+        .expect("freshly written config should be loadable");
+
+    Ok(env)
 }

 /// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
@@ -525,14 +531,14 @@ async fn handle_tenant(

             if create_match.get_flag("set-default") {
                 println!("Setting tenant {tenant_id} as a default one");
-                env.default_tenant_id = Some(tenant_id);
+                env.branch_mappings.default_tenant_id = Some(tenant_id);
             }
         }
         Some(("set-default", set_default_match)) => {
             let tenant_id =
                 parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
             println!("Setting tenant {tenant_id} as a default one");
-            env.default_tenant_id = Some(tenant_id);
+            env.branch_mappings.default_tenant_id = Some(tenant_id);
         }
         Some(("config", create_match)) => {
             let tenant_id = get_tenant_id(create_match, env)?;

@@ -640,6 +646,8 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
         }
         Some(("branch", branch_match)) => {
             let tenant_id = get_tenant_id(branch_match, env)?;
+            let new_timeline_id =
+                parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
             let new_branch_name = branch_match
                 .get_one::<String>("branch-name")
                 .ok_or_else(|| anyhow!("No branch name provided"))?;

@@ -658,7 +666,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
                 .map(|lsn_str| Lsn::from_str(lsn_str))
                 .transpose()
                 .context("Failed to parse ancestor start Lsn from the request")?;
-            let new_timeline_id = TimelineId::generate();
             let storage_controller = StorageController::from_env(env);
             let create_req = TimelineCreateRequest {
                 new_timeline_id,
@@ -692,6 +699,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
         Some(ep_subcommand_data) => ep_subcommand_data,
         None => bail!("no endpoint subcommand provided"),
     };
+
     let mut cplane = ComputeControlPlane::load(env.clone())?;

     match sub_name {

@@ -1359,6 +1367,7 @@ async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> R

 async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
     // Stop all endpoints
+    // NOTE: This only knows about endpoints in the default endpoints dir
     match ComputeControlPlane::load(env.clone()) {
         Ok(cplane) => {
             for (_k, node) in cplane.endpoints {
@@ -1419,6 +1428,18 @@ fn cli() -> Command {
         .default_value("10s")
         .required(false);

+    let branch_mappings_arg = Arg::new("branch-mappings")
+        .long("branch-mappings")
+        .help("File holding all branch names. Default is <repo dir>/branches.toml")
+        .value_parser(value_parser!(PathBuf))
+        .required(false);
+
+    let endpoints_dir_arg = Arg::new("endpoints-dir")
+        .long("endpoints-dir")
+        .help("Path to directory holding all endpoints. Default is <repo dir>/endpoints")
+        .value_parser(value_parser!(PathBuf))
+        .required(false);
+
     let branch_name_arg = Arg::new("branch-name")
         .long("branch-name")
         .help("Name of the branch to be created or used as an alias for other services")

@@ -1559,6 +1580,8 @@ fn cli() -> Command {
     Command::new("Neon CLI")
         .arg_required_else_help(true)
         .version(GIT_VERSION)
+        .arg(branch_mappings_arg)
+        .arg(endpoints_dir_arg)
         .subcommand(
             Command::new("init")
                 .about("Initialize a new Neon repository, preparing configs for services to start with")
@@ -1570,7 +1593,6 @@ fn cli() -> Command {
                         .value_parser(value_parser!(PathBuf))
                         .value_name("config")
                 )
-                .arg(pg_version_arg.clone())
                 .arg(force_arg)
         )
         .subcommand(

@@ -1583,6 +1605,7 @@ fn cli() -> Command {
             .subcommand(Command::new("branch")
                 .about("Create a new timeline, using another timeline as a base, copying its data")
                 .arg(tenant_id_arg.clone())
+                .arg(timeline_id_arg.clone())
                 .arg(branch_name_arg.clone())
                 .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
                     .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
@@ -46,6 +46,11 @@ pub struct LocalEnv {
     // must be an absolute path. If the env var is not set, $PWD/.neon is used.
     pub base_data_dir: PathBuf,

+    // Similarly, path to branch mappings file. Not stored in the config file but
+    // read from the NEON_BRANCH_MAPPINGS env variable. "None" means no mappings
+    // are loaded, and they cannot be saved either.
+    pub branch_name_mappings_path: Option<PathBuf>,
+
     // Path to postgres distribution. It's expected that "bin", "include",
     // "lib", "share" from postgres distribution are there. If at some point
     // in time we will be able to run against vanilla postgres we may split that

@@ -55,10 +60,6 @@ pub struct LocalEnv {
     // Path to pageserver binary.
     pub neon_distrib_dir: PathBuf,

-    // Default tenant ID to use with the 'neon_local' command line utility, when
-    // --tenant_id is not explicitly specified.
-    pub default_tenant_id: Option<TenantId>,
-
     // used to issue tokens during e.g pg start
     pub private_key_path: PathBuf,
@@ -82,11 +83,7 @@ pub struct LocalEnv {
     // storage controller's configuration.
     pub control_plane_compute_hook_api: Option<Url>,

-    /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
-    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
-    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
-    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
-    pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
+    pub branch_mappings: BranchMappings,
 }

 /// On-disk state stored in `.neon/config`.

@@ -95,7 +92,6 @@ pub struct LocalEnv {
 pub struct OnDiskConfig {
     pub pg_distrib_dir: PathBuf,
     pub neon_distrib_dir: PathBuf,
-    pub default_tenant_id: Option<TenantId>,
     pub private_key_path: PathBuf,
     pub broker: NeonBroker,
     pub storage_controller: NeonStorageControllerConf,

@@ -107,7 +103,20 @@ pub struct OnDiskConfig {
     pub safekeepers: Vec<SafekeeperConf>,
     pub control_plane_api: Option<Url>,
     pub control_plane_compute_hook_api: Option<Url>,
-    branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
 }

+#[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct BranchMappings {
+    // Default tenant ID to use with the 'neon_local' command line utility, when
+    // --tenant_id is not explicitly specified. This comes from the branches.
+    pub default_tenant_id: Option<TenantId>,
+
+    /// Keep human-readable aliases in memory (and persist them to config XXX), to hide ZId hex strings from the user.
+    // A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
+    // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
+    // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
+    pub mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
+}
+
 fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
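For reference, a minimal, self-contained sketch of the on-disk shape `BranchMappings` serializes to. This is not code from the repository: plain `String`s stand in for `TenantId`/`TimelineId` (which serialize as hex strings), and the IDs below are made up.

```rust
// Stand-in for BranchMappings, serialized with the same serde/toml
// machinery as the struct above.
use std::collections::HashMap;

use serde::Serialize;

#[derive(Serialize, Default)]
struct BranchMappingsSketch {
    default_tenant_id: Option<String>,
    mappings: HashMap<String, Vec<(String, String)>>,
}

fn main() {
    let mut m = BranchMappingsSketch::default();
    m.default_tenant_id = Some("de200bd42b49cc1814412c7e592dd6e9".to_string());
    m.mappings.insert(
        "main".to_string(),
        vec![(
            "de200bd42b49cc1814412c7e592dd6e9".to_string(),
            "9e4b3a67b2667dcb3b9b66e0f9fa1994".to_string(),
        )],
    );
    // Output has roughly this shape (exact pretty-printing may differ):
    //
    //   default_tenant_id = "de200bd42b49cc1814412c7e592dd6e9"
    //
    //   [mappings]
    //   main = [["de200bd42b49cc1814412c7e592dd6e9", "9e4b3a67b2667dcb3b9b66e0f9fa1994"]]
    println!("{}", toml::to_string_pretty(&m).unwrap());
}
```

The array-of-pairs encoding is what the struct's comment is defending: each branch name maps to a plain TOML array of two-element arrays rather than a nested table.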
@@ -128,7 +137,6 @@ pub struct NeonLocalInitConf {
     pub pg_distrib_dir: Option<PathBuf>,
     // TODO: do we need this? Seems unused
     pub neon_distrib_dir: Option<PathBuf>,
-    pub default_tenant_id: TenantId,
     pub broker: NeonBroker,
     pub storage_controller: Option<NeonStorageControllerConf>,
     pub pageservers: Vec<NeonLocalInitPageserverConf>,

@@ -443,7 +451,8 @@ impl LocalEnv {
         timeline_id: TimelineId,
     ) -> anyhow::Result<()> {
         let existing_values = self
-            .branch_name_mappings
+            .branch_mappings
+            .mappings
             .entry(branch_name.clone())
             .or_default();

@@ -468,7 +477,8 @@ impl LocalEnv {
         branch_name: &str,
         tenant_id: TenantId,
     ) -> Option<TimelineId> {
-        self.branch_name_mappings
+        self.branch_mappings
+            .mappings
             .get(branch_name)?
             .iter()
             .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)

@@ -477,7 +487,8 @@ impl LocalEnv {
     }

     pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
-        self.branch_name_mappings
+        self.branch_mappings
+            .mappings
             .iter()
             .flat_map(|(name, tenant_timelines)| {
                 tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
@@ -488,7 +499,10 @@
     }

     /// Construct `Self` from on-disk state.
-    pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
+    pub fn load_config(
+        repopath: &Path,
+        branch_name_mappings_path: Option<&Path>,
+    ) -> anyhow::Result<Self> {
         if !repopath.exists() {
             bail!(
                 "Neon config is not found in {}. You need to run 'neon_local init' first",
@@ -498,37 +512,43 @@ impl LocalEnv {

         // TODO: check that it looks like a neon repository

-        // load and parse file
+        // load and parse config file
         let config_file_contents = fs::read_to_string(repopath.join("config"))?;
         let on_disk_config: OnDiskConfig = toml::from_str(config_file_contents.as_str())?;
-        let mut env = {
-            let OnDiskConfig {
-                pg_distrib_dir,
-                neon_distrib_dir,
-                default_tenant_id,
-                private_key_path,
-                broker,
-                storage_controller,
-                pageservers,
-                safekeepers,
-                control_plane_api,
-                control_plane_compute_hook_api,
-                branch_name_mappings,
-            } = on_disk_config;
-            LocalEnv {
-                base_data_dir: repopath.to_owned(),
-                pg_distrib_dir,
-                neon_distrib_dir,
-                default_tenant_id,
-                private_key_path,
-                broker,
-                storage_controller,
-                pageservers,
-                safekeepers,
-                control_plane_api,
-                control_plane_compute_hook_api,
-                branch_name_mappings,
-            }
+        let OnDiskConfig {
+            pg_distrib_dir,
+            neon_distrib_dir,
+            private_key_path,
+            broker,
+            storage_controller,
+            pageservers,
+            safekeepers,
+            control_plane_api,
+            control_plane_compute_hook_api,
+        } = on_disk_config;
+
+        // load and parse "branches.toml" file
+        let branch_mappings = if let Some(path) = branch_name_mappings_path {
+            let contents = fs::read_to_string(path)
+                .context(format!("load branch mappings file {}", path.display()))?;
+            toml::from_str::<BranchMappings>(contents.as_str())?
+        } else {
+            BranchMappings::default()
+        };
+
+        let mut env = LocalEnv {
+            base_data_dir: repopath.to_owned(),
+            branch_name_mappings_path: branch_name_mappings_path.map(|p| p.to_owned()),
+            pg_distrib_dir,
+            neon_distrib_dir,
+            private_key_path,
+            broker,
+            storage_controller,
+            pageservers,
+            safekeepers,
+            control_plane_api,
+            control_plane_compute_hook_api,
+            branch_mappings,
+        };

         // The source of truth for pageserver configuration is the pageserver.toml.
@@ -618,7 +638,6 @@ impl LocalEnv {
             &OnDiskConfig {
                 pg_distrib_dir: self.pg_distrib_dir.clone(),
                 neon_distrib_dir: self.neon_distrib_dir.clone(),
-                default_tenant_id: self.default_tenant_id,
                 private_key_path: self.private_key_path.clone(),
                 broker: self.broker.clone(),
                 storage_controller: self.storage_controller.clone(),
@@ -626,9 +645,20 @@
                 safekeepers: self.safekeepers.clone(),
                 control_plane_api: self.control_plane_api.clone(),
                 control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
-                branch_name_mappings: self.branch_name_mappings.clone(),
             },
-        )
+        )?;
+
+        if let Some(path) = &self.branch_name_mappings_path {
+            Self::persist_branches_impl(path, &self.branch_mappings)?;
+        } else {
+            if !self.branch_mappings.mappings.is_empty() {
+                tracing::warn!("command created a branch mapping, but it was not saved because no mappings file was configured")
+            } else if self.branch_mappings.default_tenant_id.is_some() {
+                tracing::warn!("command created a tenant default, but it was not saved because no mappings file was configured")
+            }
+        }
+
+        Ok(())
     }

     pub fn persist_config_impl(base_path: &Path, config: &OnDiskConfig) -> anyhow::Result<()> {
@@ -642,6 +672,19 @@ impl LocalEnv {
         })
     }

+    pub fn persist_branches_impl(
+        branch_name_mappings_path: &Path,
+        branch_mappings: &BranchMappings,
+    ) -> anyhow::Result<()> {
+        let content = &toml::to_string_pretty(branch_mappings)?;
+        fs::write(branch_name_mappings_path, content).with_context(|| {
+            format!(
+                "Failed to write branch information into path '{}'",
+                branch_name_mappings_path.display()
+            )
+        })
+    }
+
     // this function is used only for testing purposes in CLI e g generate tokens during init
     pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
         let private_key_path = self.get_private_key_path();
@@ -702,7 +745,6 @@ impl LocalEnv {
         let NeonLocalInitConf {
             pg_distrib_dir,
             neon_distrib_dir,
-            default_tenant_id,
             broker,
             storage_controller,
             pageservers,

@@ -746,9 +788,9 @@ impl LocalEnv {
         // TODO: refactor to avoid this, LocalEnv should only be constructed from on-disk state
         let env = LocalEnv {
             base_data_dir: base_path.clone(),
+            branch_name_mappings_path: Some(base_path.join("branches.toml")),
             pg_distrib_dir,
             neon_distrib_dir,
-            default_tenant_id: Some(default_tenant_id),
             private_key_path,
             broker,
             storage_controller: storage_controller.unwrap_or_default(),

@@ -756,10 +798,10 @@ impl LocalEnv {
             safekeepers,
             control_plane_api: control_plane_api.unwrap_or_default(),
             control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
-            branch_name_mappings: Default::default(),
+            branch_mappings: Default::default(),
         };

-        // create endpoints dir
+        // create the default endpoints dir
         fs::create_dir_all(env.endpoints_path())?;

         // create safekeeper dirs
@@ -806,6 +848,24 @@ pub fn base_path() -> PathBuf {
     path
 }

+pub fn branch_mappings_path() -> PathBuf {
+    let path = match std::env::var_os("NEON_BRANCH_MAPPINGS") {
+        Some(val) => {
+            let path = PathBuf::from(val);
+
+            // a relative path is relative to repo dir
+            if !path.is_absolute() {
+                base_path().join(path)
+            } else {
+                path
+            }
+        }
+        None => base_path().join("branches.toml"),
+    };
+    assert!(path.is_absolute());
+    path
+}
+
 /// Generate a public/private key pair for JWT authentication
 fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
     // Generate the key pair
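To make the resolution rule in `branch_mappings_path()` concrete, here is a small runnable sketch. It re-implements the same logic standalone; the repo dir value `/home/user/.neon` is a made-up example (the real function derives it from `base_path()`):

```rust
use std::path::{Path, PathBuf};

// Mirrors branch_mappings_path(): relative NEON_BRANCH_MAPPINGS values are
// joined onto the repo dir, absolute values are used as-is, and the default
// is <repo dir>/branches.toml.
fn resolve(base: &Path, env_val: Option<&str>) -> PathBuf {
    match env_val {
        Some(v) => {
            let p = PathBuf::from(v);
            if p.is_absolute() { p } else { base.join(p) }
        }
        None => base.join("branches.toml"),
    }
}

fn main() {
    let base = Path::new("/home/user/.neon");
    assert_eq!(resolve(base, None), Path::new("/home/user/.neon/branches.toml"));
    assert_eq!(
        resolve(base, Some("custom/branches.toml")),
        Path::new("/home/user/.neon/custom/branches.toml")
    );
    assert_eq!(resolve(base, Some("/tmp/b.toml")), Path::new("/tmp/b.toml"));
}
```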
@@ -64,6 +64,7 @@ pub struct ConfigToml {
     #[serde(with = "humantime_serde")]
     pub wal_redo_timeout: Duration,
     pub superuser: String,
+    pub initdb_cache_dir: Option<Utf8PathBuf>,
     pub page_cache_size: usize,
     pub max_file_descriptors: usize,
     pub pg_distrib_dir: Option<Utf8PathBuf>,

@@ -358,6 +359,7 @@ impl Default for ConfigToml {
             wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
                 .expect("cannot parse default wal redo timeout")),
             superuser: (DEFAULT_SUPERUSER.to_string()),
+            initdb_cache_dir: None,
             page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
             max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
             pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()

@@ -71,6 +71,8 @@ pub struct PageServerConf {

     pub superuser: String,

+    pub initdb_cache_dir: Option<Utf8PathBuf>,
+
     pub page_cache_size: usize,
     pub max_file_descriptors: usize,

@@ -309,6 +311,7 @@ impl PageServerConf {
             wait_lsn_timeout,
             wal_redo_timeout,
             superuser,
+            initdb_cache_dir,
             page_cache_size,
             max_file_descriptors,
             pg_distrib_dir,

@@ -356,6 +359,7 @@ impl PageServerConf {
             wait_lsn_timeout,
             wal_redo_timeout,
             superuser,
+            initdb_cache_dir,
             page_cache_size,
             max_file_descriptors,
             http_auth_type,
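The new knob is opt-in: with the default `initdb_cache_dir: None`, behavior is unchanged. A minimal sketch of how the field round-trips through the TOML config layer, assuming camino is built with its serde feature; the path value is a made-up example, not a documented default:

```rust
use camino::Utf8PathBuf;
use serde::Deserialize;

// Stand-in for the relevant slice of ConfigToml above.
#[derive(Deserialize, Default)]
#[serde(default)]
struct ConfigSketch {
    // None (the default) leaves the initdb cache disabled.
    initdb_cache_dir: Option<Utf8PathBuf>,
}

fn main() {
    let cfg: ConfigSketch =
        toml::from_str(r#"initdb_cache_dir = "/var/lib/pageserver/initdb_cache""#).unwrap();
    assert_eq!(
        cfg.initdb_cache_dir.as_deref(),
        Some(camino::Utf8Path::new("/var/lib/pageserver/initdb_cache"))
    );

    // An empty config deserializes to the disabled state.
    let cfg: ConfigSketch = toml::from_str("").unwrap();
    assert!(cfg.initdb_cache_dir.is_none());
}
```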
@@ -3491,7 +3491,7 @@ impl Tenant {
                 .context("extract initdb tar")?;
         } else {
             // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
-            run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
+            run_initdb_with_cache(self.conf, &pgdata_path, pg_version, &self.cancel).await?;

             // Upload the created data dir to S3
             if self.tenant_shard_id().is_shard_zero() {

@@ -3534,6 +3534,7 @@ impl Tenant {
         // Flush loop needs to be spawned in order to be able to flush.
         unfinished_timeline.maybe_spawn_flush_loop();

+        info!("starting to import from datadir");
         import_datadir::import_timeline_from_postgres_datadir(
             unfinished_timeline,
             &pgdata_path,

@@ -3549,6 +3550,7 @@ impl Tenant {
             anyhow::bail!("failpoint before-checkpoint-new-timeline");
         });

+        info!("calling freeze_and_flush");
         unfinished_timeline
             .freeze_and_flush()
             .await

@@ -3559,7 +3561,9 @@ impl Tenant {
         })?;

         // All done!
+        info!("calling finish_creation");
         let timeline = raw_timeline.finish_creation()?;
+        info!("call to finish_creation done");

         Ok(timeline)
     }
@@ -3837,6 +3841,118 @@ impl Tenant {
     }
 }

+fn cached_initdb_dirname(initial_superuser_name: &str, pg_version: u32) -> String {
+    use std::hash::Hash;
+    use std::hash::Hasher;
+    use std::collections::hash_map::DefaultHasher;
+    let mut hasher = DefaultHasher::new();
+    initial_superuser_name.hash(&mut hasher);
+    let hash = hasher.finish();
+
+    format!("cached_initial_pgdata_{pg_version}_{:016}", hash)
+}
+
+fn copy_dir_all(src: impl AsRef<std::path::Path>, dst: impl AsRef<std::path::Path>) -> std::io::Result<()> {
+    for entry in fs::read_dir(src.as_ref())? {
+        let entry = entry?;
+        let subsrc = entry.path();
+        let subdst = dst.as_ref().join(&entry.file_name());
+
+        if entry.file_type()?.is_dir() {
+            std::fs::create_dir(&subdst)?;
+            copy_dir_all(&subsrc, &subdst)?;
+        } else {
+            std::fs::copy(&subsrc, &subdst)?;
+        }
+    }
+    Ok(())
+}
+
+fn restore_cached_initdb_dir(
+    cached_path: &Utf8Path,
+    target_path: &Utf8Path,
+) -> anyhow::Result<bool> {
+    if !cached_path.exists() {
+        info!("cached initdb dir \"{cached_path}\" does not exist yet");
+        return Ok(false);
+    }
+
+    std::fs::create_dir(target_path)?;
+    copy_dir_all(cached_path, target_path)?;
+    info!("restored initdb result from cache dir \"{cached_path}\"");
+    Ok(true)
+}
+
+fn save_cached_initdb_dir(
+    src_path: &Utf8Path,
+    cache_path: &Utf8Path,
+) -> anyhow::Result<()> {
+    match std::fs::create_dir(cache_path) {
+        Ok(()) => {
+            info!("saving initdb result to cache dir \"{cache_path}\"");
+        }
+        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
+            info!("cache initdb dir \"{cache_path}\" already exists, not saving");
+            return Ok(());
+        }
+        Err(err) => return Err(anyhow::Error::from(err)),
+    };
+    let cache_dir_guard = scopeguard::guard(cache_path, |cp| {
+        if let Err(err) = std::fs::remove_dir_all(&cp) {
+            error!("could not remove cached initdb directory {cp}: {err}");
+        }
+    });
+
+    let cache_parent_path = cache_path.parent().ok_or(anyhow::Error::msg("no cache parent path"))?;
+
+    let tmp_dirpath = camino_tempfile::tempdir_in(cache_parent_path)?;
+    copy_dir_all(src_path, &tmp_dirpath)?;
+    std::fs::rename(tmp_dirpath, &*cache_dir_guard)?;
+
+    // disarm the guard
+    scopeguard::ScopeGuard::into_inner(cache_dir_guard);
+
+    Ok(())
+}
+
+async fn run_initdb_with_cache(
+    conf: &'static PageServerConf,
+    initdb_target_dir: &Utf8Path,
+    pg_version: u32,
+    cancel: &CancellationToken,
+) -> Result<(), InitdbError> {
+    let cache_dir = conf.initdb_cache_dir.as_ref().map(|initdb_cache_dir| {
+        initdb_cache_dir.join(cached_initdb_dirname(&conf.superuser, pg_version))
+    });
+
+    if let Some(cache_dir) = &cache_dir {
+        match restore_cached_initdb_dir(&cache_dir, initdb_target_dir) {
+            Ok(true) => return Ok(()),
+            Ok(false) => {}
+            Err(err) => {
+                warn!("Error restoring from cached initdb directory \"{cache_dir}\": {err}");
+                if initdb_target_dir.exists() {
+                    if let Err(err) = std::fs::remove_dir_all(&initdb_target_dir) {
+                        error!("could not remove temporary initdb target directory {initdb_target_dir}: {err}");
+                    }
+                }
+            }
+        }
+    }
+
+    run_initdb(conf, initdb_target_dir, pg_version, cancel).await?;
+
+    if let Some(cache_dir) = &cache_dir {
+        if let Err(err) = save_cached_initdb_dir(initdb_target_dir, &cache_dir) {
+            warn!("error saving initdb result to cache directory \"{cache_dir}\": {err}");
+        }
+    }
+
+    Ok(())
+}
+
 /// Create the cluster temporarily in 'initdbpath' directory inside the repository
 /// to get bootstrap data for timeline initialization.
 async fn run_initdb(
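Two properties of the cache naming above are worth spelling out. First, the cache key is only the superuser name (hashed) plus the Postgres version, so any two initdb runs with the same pair share one cached result. Second, `DefaultHasher` from std does not guarantee a stable algorithm across Rust releases, so the derived directory name is only stable within one toolchain; after a toolchain change the cache simply misses and repopulates under a new name. A runnable sketch of the derivation (the printed value is illustrative only):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Reproduces the dirname derivation from the diff above.
fn cached_initdb_dirname(initial_superuser_name: &str, pg_version: u32) -> String {
    let mut hasher = DefaultHasher::new();
    initial_superuser_name.hash(&mut hasher);
    let hash = hasher.finish();
    // Note: {:016} zero-pads the *decimal* value to 16 digits; u64 values
    // above 10^16 print wider, which is harmless for uniqueness.
    format!("cached_initial_pgdata_{pg_version}_{:016}", hash)
}

fn main() {
    // Prints something like "cached_initial_pgdata_16_..." (value varies
    // by toolchain, per the DefaultHasher caveat above).
    println!("{}", cached_initdb_dirname("cloud_admin", 16));
}
```

The save path also shows a deliberate design choice: the cache entry is first copied into a temp directory next to the final location and then `rename`d into place, so concurrent readers never observe a half-populated cache entry, and the `scopeguard` cleans up on any error path before being disarmed on success.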
@@ -292,7 +292,7 @@ impl ComputeHook {
         );
         return Ok(());
     };
-    let env = match LocalEnv::load_config(repo_dir) {
+    let env = match LocalEnv::load_config(repo_dir, None) {
         Ok(e) => e,
         Err(e) => {
             tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
@@ -50,7 +50,7 @@ class NeonBroker:
                 raise RuntimeError(
                     f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
                 ) from e
-            time.sleep(0.5)
+            time.sleep(0.1)
         else:
             break  # success
File diff suppressed because it is too large
@@ -19,7 +19,7 @@ def pg_version() -> Optional[PgVersion]:
     return None


-@pytest.fixture(scope="function", autouse=True)
+@pytest.fixture(scope="session", autouse=True)
 def build_type() -> Optional[str]:
     return None

@@ -64,7 +64,7 @@ def pytest_generate_tests(metafunc: Metafunc):
     else:
         build_types = [bt.lower()]

-    metafunc.parametrize("build_type", build_types)
+    metafunc.parametrize("build_type", build_types, scope="session")

     if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
         pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import (
 )


-def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
-    env = neon_env_builder.init_start()
+def test_aux_v2_config_switch(neon_shared_env: NeonEnv, vanilla_pg):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")
     client = env.pageserver.http_client()
@@ -46,11 +46,11 @@ from fixtures.utils import query_scalar
 # Because the delta layer D covering lsn1 is corrupted, creating a branch
 # starting from lsn1 should return an error as follows:
 # could not find data for key ... at LSN ..., for request at LSN ...
-def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
+def test_branch_and_gc(neon_shared_env: NeonEnv, build_type: str):
     if build_type == "debug":
         pytest.skip("times out in debug builds")

-    env = neon_simple_env
+    env = neon_shared_env
     pageserver_http_client = env.pageserver.http_client()

     tenant, _ = env.neon_cli.create_tenant(

@@ -116,8 +116,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
 # and prevent creating branches with invalid starting LSNs.
 #
 # For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
-def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_branch_creation_before_gc(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     pageserver_http_client = env.pageserver.http_client()

     error_regexes = [
@@ -33,7 +33,7 @@ from requests import RequestException
 @pytest.mark.parametrize("scale", get_scales_matrix(1))
 @pytest.mark.parametrize("ty", ["cascade", "flat"])
 def test_branching_with_pgbench(
-    neon_simple_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
+    neon_shared_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
 ):
     env = neon_simple_env
@@ -2,9 +2,8 @@ from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import NeonEnvBuilder, NeonProxy


-def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonProxy):
-    neon_env_builder.num_safekeepers = 1
-    env = neon_env_builder.init_start()
+def test_build_info_metric(neon_shared_env: NeonEnv, link_proxy: NeonProxy):
+    env = neon_shared_env

     parsed_metrics = {}
@@ -9,8 +9,8 @@ from fixtures.utils import query_scalar
 #
 # Test compute node start after clog truncation
 #
-def test_clog_truncate(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_clog_truncate(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     # set aggressive autovacuum to make sure that truncation will happen
     config = [
@@ -1,8 +1,8 @@
 from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver


-def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
-    env = neon_env_builder.init_start()
+def do_combocid_op(neon_shared_env: NeonEnv, op):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start(
         "main",
         config_lines=[

@@ -49,20 +49,20 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
     )


-def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
-    do_combocid_op(neon_env_builder, "delete from t")
+def test_combocid_delete(neon_shared_env: NeonEnv):
+    do_combocid_op(neon_shared_env, "delete from t")


-def test_combocid_update(neon_env_builder: NeonEnvBuilder):
-    do_combocid_op(neon_env_builder, "update t set val=val+1")
+def test_combocid_update(neon_shared_env: NeonEnv):
+    do_combocid_op(neon_shared_env, "update t set val=val+1")


-def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
-    do_combocid_op(neon_env_builder, "select * from t for update")
+def test_combocid_lock(neon_shared_env: NeonEnv):
+    do_combocid_op(neon_shared_env, "select * from t for update")


-def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_combocid_multi_insert(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start(
         "main",
         config_lines=[

@@ -112,8 +112,8 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
     )


-def test_combocid(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_combocid(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     conn = endpoint.connect()
@@ -178,7 +178,7 @@ def test_backward_compatibility(
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
     env.pageserver.allowed_errors.append(ingest_lag_log_line)
-    neon_env_builder.start()
+    env.start()

     check_neon_works(
         env,

@@ -265,7 +265,7 @@ def test_forward_compatibility(
     # does not include logs from previous runs
     assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)

-    neon_env_builder.start()
+    env.start()

     # ensure the specified pageserver is running
     assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)
@@ -2,8 +2,8 @@ import requests
 from fixtures.neon_fixtures import NeonEnv


-def test_compute_catalog(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_compute_catalog(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
     client = endpoint.http_client()
@@ -29,6 +29,27 @@ def test_config(neon_simple_env: NeonEnv):
     # check that config change was applied
     assert cur.fetchone() == ("debug1",)

+
+def test_shared_config(neon_shared_env: NeonEnv):
+    env = neon_shared_env
+
+    # change config
+    endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
+
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute(
+                """
+                SELECT setting
+                FROM pg_settings
+                WHERE
+                    source != 'default'
+                    AND source != 'override'
+                    AND name = 'log_min_messages'
+                """
+            )
+
+            # check that config change was applied
+            assert cur.fetchone() == ("debug1",)
+
 #
 # Test that reordering of safekeepers does not restart walproposer
@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
 # Test CREATE DATABASE when there have been relmapper changes
 #
 @pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
-def test_createdb(neon_simple_env: NeonEnv, strategy: str):
-    env = neon_simple_env
+def test_createdb(neon_shared_env: NeonEnv, strategy: str):
+    env = neon_shared_env
     if env.pg_version == PgVersion.V14 and strategy == "wal_log":
         pytest.skip("wal_log strategy not supported on PostgreSQL 14")

@@ -58,8 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
 #
 # Test DROP DATABASE
 #
-def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
-    env = neon_simple_env
+def test_dropdb(neon_shared_env: NeonEnv, test_output_dir):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     with endpoint.cursor() as cur:
@@ -5,8 +5,8 @@ from fixtures.utils import query_scalar
 #
 # Test CREATE USER to check shared catalog restore
 #
-def test_createuser(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_createuser(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     with endpoint.cursor() as cur:
@@ -10,11 +10,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
         "💣",  # calls `trigger_segfault` internally
     ],
 )
-def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
+def test_endpoint_crash(neon_shared_env: NeonEnv, sql_func: str):
     """
     Test that triggering crash from neon_test_utils crashes the endpoint
     """
-    env = neon_env_builder.init_start()
+    env = neon_shared_env
     env.neon_cli.create_branch("test_endpoint_crash")
     endpoint = env.endpoints.create_start("test_endpoint_crash")
@@ -1,8 +1,8 @@
-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_fixtures import NeonEnv


-def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_fsm_truncate(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     env.neon_cli.create_branch("test_fsm_truncate")
     endpoint = env.endpoints.create_start("test_fsm_truncate")
     endpoint.safe_psql(
@@ -16,12 +16,12 @@ num_rows = 1000

 # Ensure that regular postgres can start from fullbackup
 def test_fullbackup(
-    neon_env_builder: NeonEnvBuilder,
+    neon_shared_env: NeonEnv,
     pg_bin: PgBin,
     port_distributor: PortDistributor,
     test_output_dir: Path,
 ):
-    env = neon_env_builder.init_start()
+    env = neon_shared_env

     # endpoint needs to be alive until the fullbackup so that we have
     # prev_record_lsn for the vanilla_pg to start in read-write mode
@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
 #
 # Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
 #
-def test_gin_redo(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_gin_redo(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
     time.sleep(1)
@@ -87,8 +87,8 @@ def test_hot_standby(neon_simple_env: NeonEnv):
     sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))


-def test_2_replicas_start(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_2_replicas_start(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     with env.endpoints.create_start(
         branch_name="main",

@@ -286,8 +286,8 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):

 # Test race condition between WAL replay and backends performing queries
 # https://github.com/neondatabase/neon/issues/7791
-def test_replica_query_race(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_replica_query_race(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     primary_ep = env.endpoints.create_start(
         branch_name="main",
@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
 # to large (several gigabytes) layer files (both ephemeral and delta layers).
 # It may cause problems with uploading to S3 and also degrade performance because ephemeral file swapping.
 #
-def test_large_schema(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_large_schema(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     endpoint = env.endpoints.create_start("main")
@@ -14,8 +14,8 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
 # Test branching, when a transaction is in prepared state
 #
 @pytest.mark.timeout(600)
-def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
-    env = neon_simple_env
+def test_lfc_resize(neon_shared_env: NeonEnv, pg_bin: PgBin):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start(
         "main",
         config_lines=[
@@ -9,8 +9,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.utils import query_scalar


-def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_local_file_cache_unlink(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     cache_dir = os.path.join(env.repo_dir, "file_cache")
     os.mkdir(cache_dir)
@@ -24,13 +24,13 @@ def assert_lsn_lease_granted(result, with_lease: bool):


 @pytest.mark.parametrize("with_lease", [True, False])
-def test_lsn_mapping(neon_env_builder: NeonEnvBuilder, with_lease: bool):
+def test_lsn_mapping(neon_shared_env: NeonEnv, with_lease: bool):
     """
     Test pageserver get_lsn_by_timestamp API.

     :param with_lease: Whether to get a lease associated with returned LSN.
     """
-    env = neon_env_builder.init_start()
+    env = neon_shared_env

     tenant_id, _ = env.neon_cli.create_tenant(
         conf={
@@ -7,8 +7,8 @@ if TYPE_CHECKING:
     from fixtures.neon_fixtures import NeonEnv


-def test_migrations(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_migrations(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     endpoint = env.endpoints.create("main")
     endpoint.respec(skip_pg_catalog_updates=False)
@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
 # is enough to verify that the WAL records are handled correctly
 # in the pageserver.
 #
-def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
-    env = neon_simple_env
+def test_multixact(neon_shared_env: NeonEnv, test_output_dir):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     cur = endpoint.connect().cursor()
@@ -4,8 +4,8 @@ from fixtures.pg_version import PgVersion
 from fixtures.utils import wait_until


-def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
-    env = neon_simple_env
+def test_neon_superuser(neon_shared_env: NeonEnv, pg_version: PgVersion):
+    env = neon_shared_env
     env.neon_cli.create_branch("test_neon_superuser_publisher", "main")
     pub = env.endpoints.create("test_neon_superuser_publisher")
@@ -2,8 +2,8 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder


-def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
-    env = neon_env_builder.init_start()
+def test_oid_overflow(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     endpoint = env.endpoints.create_start("main")
@@ -56,8 +56,8 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
     assert TimelineId(timeline_details["timeline_id"]) == timeline_id


-def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_pageserver_http_get_wal_receiver_not_found(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     with env.pageserver.http_client() as client:
         tenant_id, timeline_id = env.neon_cli.create_tenant()

@@ -105,8 +105,8 @@ def expect_updated_msg_lsn(
 #
 # These fields used to be returned by a separate API call, but they're part of
 # `timeline_details` now.
-def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_pageserver_http_get_wal_receiver_success(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     with env.pageserver.http_client() as client:
         tenant_id, timeline_id = env.neon_cli.create_tenant()
         endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
@@ -39,8 +39,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):


 # Load data into one table with COPY TO from 5 parallel connections
-def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
-    env = neon_simple_env
+def test_parallel_copy(neon_shared_env: NeonEnv, n_parallel=5):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     # Create test table
@@ -19,7 +19,7 @@ def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):


 # Simple test to check that pg_waldump works with neon WAL files
-def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
-    env = neon_simple_env
+def test_pg_waldump(neon_shared_env: NeonEnv, test_output_dir, pg_bin: PgBin):
+    env = neon_shared_env
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
     endpoint = env.endpoints.create_start("main")
@@ -13,8 +13,8 @@ extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
|
||||
#
|
||||
# Validation of reading different page versions
|
||||
#
|
||||
def test_read_validation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_read_validation(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
with closing(endpoint.connect()) as con:
|
||||
@@ -125,8 +125,8 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info(f"Caught an expected failure: {e}")
|
||||
|
||||
|
||||
def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_read_validation_neg(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -215,8 +215,8 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):


 # Similar test, but with more data, and we force checkpoints
-def test_timetravel(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_timetravel(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
     client = env.pageserver.http_client()
@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
     non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}

     env = neon_env_builder.init_configs(True)
-    neon_env_builder.start()
+    env.start()
     tenant_id = TenantId.generate()
     timeline_id = TimelineId.generate()
     env.neon_cli.create_tenant(

@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):

     neon_env_builder.num_pageservers = 2
     env = neon_env_builder.init_configs()
-    neon_env_builder.start()
+    env.start()

     tenant_id = TenantId.generate()
     timeline_id = TimelineId.generate()

@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
     """

     env = neon_env_builder.init_configs()
-    neon_env_builder.start()
+    env.start()

     tenants = []
     n_tenants = 8
@@ -7,8 +7,8 @@ from fixtures.utils import wait_until

 # This test checks of logical replication subscriber is able to correctly restart replication without receiving duplicates.
 # It requires tracking information about replication origins at page server side
-def test_subscriber_restart(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_subscriber_restart(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     env.neon_cli.create_branch("publisher")
     pub = env.endpoints.create("publisher")
     pub.start()
@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
 # maintained in the pageserver, so subtransactions are not very exciting for
 # Neon. They are included in the commit record though and updated in the
 # CLOG.
-def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
-    env = neon_simple_env
+def test_subxacts(neon_shared_env: NeonEnv, test_output_dir):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     pg_conn = endpoint.connect()
@@ -36,8 +36,8 @@ from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
 from urllib3.util.retry import Retry


-def test_timeline_delete(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_timeline_delete(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     env.pageserver.allowed_errors.extend(
         [
@@ -1,13 +1,13 @@
 import time

-from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.neon_fixtures import NeonEnv


 #
 # Test truncation of FSM and VM forks of a relation
 #
-def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
-    env = neon_env_builder.init_start()
+def test_truncate(neon_shared_env: NeonEnv, zenbenchmark):
+    env = neon_shared_env
     n_records = 10000
     n_iter = 10
@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
 #
 # Test branching, when a transaction is in prepared state
 #
-def test_twophase(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_twophase(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])

     conn = endpoint.connect()
@@ -7,8 +7,8 @@ from fixtures.pg_version import PgVersion
 # fork to reset them during recovery. In Neon, pageserver directly sends init
 # fork contents as main fork during basebackup.
 #
-def test_unlogged(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_unlogged(neon_shared_env: NeonEnv):
+    env = neon_shared_env
     endpoint = env.endpoints.create_start("main")

     conn = endpoint.connect()
@@ -10,8 +10,8 @@ from fixtures.utils import query_scalar
 # Test that the VM bit is cleared correctly at a HEAP_DELETE and
 # HEAP_UPDATE record.
 #
-def test_vm_bit_clear(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+def test_vm_bit_clear(neon_shared_env: NeonEnv):
+    env = neon_shared_env

     endpoint = env.endpoints.create_start("main")

@@ -114,13 +114,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
     assert cur_new.fetchall() == []


-def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
+def test_vm_bit_clear_on_heap_lock_whitebox(neon_shared_env: NeonEnv):
     """
     Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.

     This is a repro for the bug fixed in commit 66fa176cc8.
     """
-    env = neon_env_builder.init_start()
+    env = neon_shared_env
     endpoint = env.endpoints.create_start(
         "main",
         config_lines=[