mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-31 01:00:36 +00:00
Compare commits
15 Commits
v16-lr-wor
...
heikki/tes
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87f270ee7a | ||
|
|
bfae8ff23e | ||
|
|
a6e64fcc90 | ||
|
|
390235e095 | ||
|
|
7565939dce | ||
|
|
cde4ea2b39 | ||
|
|
60320e8e45 | ||
|
|
fe1fe213ba | ||
|
|
67d48ac867 | ||
|
|
4da40cbbfb | ||
|
|
8e7f336540 | ||
|
|
cb060548fb | ||
|
|
bae793ffcd | ||
|
|
26b5fcdc50 | ||
|
|
97582178cb |
@@ -90,8 +90,11 @@ fn main() -> Result<()> {
|
||||
handle_init(sub_args).map(Some)
|
||||
} else {
|
||||
// all other commands need an existing config
|
||||
let mut env =
|
||||
LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
|
||||
let base_path = local_env::base_path();
|
||||
let branch_mappings_path = local_env::branch_mappings_path();
|
||||
|
||||
let mut env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
|
||||
.context("Error loading config")?;
|
||||
let original_env = env.clone();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
@@ -264,7 +267,7 @@ async fn get_timeline_infos(
|
||||
fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<TenantId> {
|
||||
if let Some(tenant_id_from_arguments) = parse_tenant_id(sub_match).transpose() {
|
||||
tenant_id_from_arguments
|
||||
} else if let Some(default_id) = env.default_tenant_id {
|
||||
} else if let Some(default_id) = env.branch_mappings.default_tenant_id {
|
||||
Ok(default_id)
|
||||
} else {
|
||||
anyhow::bail!("No tenant id. Use --tenant-id, or set a default tenant");
|
||||
@@ -278,7 +281,7 @@ fn get_tenant_shard_id(
|
||||
) -> anyhow::Result<TenantShardId> {
|
||||
if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() {
|
||||
tenant_id_from_arguments
|
||||
} else if let Some(default_id) = env.default_tenant_id {
|
||||
} else if let Some(default_id) = env.branch_mappings.default_tenant_id {
|
||||
Ok(TenantShardId::unsharded(default_id))
|
||||
} else {
|
||||
anyhow::bail!("No tenant shard id. Use --tenant-id, or set a default tenant");
|
||||
@@ -360,7 +363,6 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
|
||||
.collect(),
|
||||
pg_distrib_dir: None,
|
||||
neon_distrib_dir: None,
|
||||
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
|
||||
storage_controller: None,
|
||||
control_plane_compute_hook_api: None,
|
||||
}
|
||||
@@ -368,8 +370,12 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
|
||||
|
||||
LocalEnv::init(init_conf, force)
|
||||
.context("materialize initial neon_local environment on disk")?;
|
||||
Ok(LocalEnv::load_config(&local_env::base_path())
|
||||
.expect("freshly written config should be loadable"))
|
||||
let base_path = local_env::base_path();
|
||||
let branch_mappings_path = local_env::branch_mappings_path();
|
||||
let env = LocalEnv::load_config(&base_path, Some(&branch_mappings_path))
|
||||
.expect("freshly written config should be loadable");
|
||||
|
||||
Ok(env)
|
||||
}
|
||||
|
||||
/// The default pageserver is the one where CLI tenant/timeline operations are sent by default.
|
||||
@@ -525,14 +531,14 @@ async fn handle_tenant(
|
||||
|
||||
if create_match.get_flag("set-default") {
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
env.default_tenant_id = Some(tenant_id);
|
||||
env.branch_mappings.default_tenant_id = Some(tenant_id);
|
||||
}
|
||||
}
|
||||
Some(("set-default", set_default_match)) => {
|
||||
let tenant_id =
|
||||
parse_tenant_id(set_default_match)?.context("No tenant id specified")?;
|
||||
println!("Setting tenant {tenant_id} as a default one");
|
||||
env.default_tenant_id = Some(tenant_id);
|
||||
env.branch_mappings.default_tenant_id = Some(tenant_id);
|
||||
}
|
||||
Some(("config", create_match)) => {
|
||||
let tenant_id = get_tenant_id(create_match, env)?;
|
||||
@@ -640,6 +646,8 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
}
|
||||
Some(("branch", branch_match)) => {
|
||||
let tenant_id = get_tenant_id(branch_match, env)?;
|
||||
let new_timeline_id =
|
||||
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
|
||||
let new_branch_name = branch_match
|
||||
.get_one::<String>("branch-name")
|
||||
.ok_or_else(|| anyhow!("No branch name provided"))?;
|
||||
@@ -658,7 +666,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
.map(|lsn_str| Lsn::from_str(lsn_str))
|
||||
.transpose()
|
||||
.context("Failed to parse ancestor start Lsn from the request")?;
|
||||
let new_timeline_id = TimelineId::generate();
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let create_req = TimelineCreateRequest {
|
||||
new_timeline_id,
|
||||
@@ -692,6 +699,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
Some(ep_subcommand_data) => ep_subcommand_data,
|
||||
None => bail!("no endpoint subcommand provided"),
|
||||
};
|
||||
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
|
||||
match sub_name {
|
||||
@@ -1359,6 +1367,7 @@ async fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> R
|
||||
|
||||
async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
||||
// Stop all endpoints
|
||||
// NOTE: This only knows about endpoints in the default endpoints dir
|
||||
match ComputeControlPlane::load(env.clone()) {
|
||||
Ok(cplane) => {
|
||||
for (_k, node) in cplane.endpoints {
|
||||
@@ -1419,6 +1428,18 @@ fn cli() -> Command {
|
||||
.default_value("10s")
|
||||
.required(false);
|
||||
|
||||
let branch_mappings_arg = Arg::new("branch-mappings")
|
||||
.long("branch-mappings")
|
||||
.help("File holding all branch names. Default is <repo dir>/branches.toml")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false);
|
||||
|
||||
let endpoints_dir_arg = Arg::new("endpoints-dir")
|
||||
.long("endpoints-dir")
|
||||
.help("Path to directory holding all endpoints. Default is <repo dir>/endpoints")
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.required(false);
|
||||
|
||||
let branch_name_arg = Arg::new("branch-name")
|
||||
.long("branch-name")
|
||||
.help("Name of the branch to be created or used as an alias for other services")
|
||||
@@ -1559,6 +1580,8 @@ fn cli() -> Command {
|
||||
Command::new("Neon CLI")
|
||||
.arg_required_else_help(true)
|
||||
.version(GIT_VERSION)
|
||||
.arg(branch_mappings_arg)
|
||||
.arg(endpoints_dir_arg)
|
||||
.subcommand(
|
||||
Command::new("init")
|
||||
.about("Initialize a new Neon repository, preparing configs for services to start with")
|
||||
@@ -1570,7 +1593,6 @@ fn cli() -> Command {
|
||||
.value_parser(value_parser!(PathBuf))
|
||||
.value_name("config")
|
||||
)
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(force_arg)
|
||||
)
|
||||
.subcommand(
|
||||
@@ -1583,6 +1605,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("branch")
|
||||
.about("Create a new timeline, using another timeline as a base, copying its data")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
|
||||
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
|
||||
|
||||
@@ -46,6 +46,11 @@ pub struct LocalEnv {
|
||||
// must be an absolute path. If the env var is not set, $PWD/.neon is used.
|
||||
pub base_data_dir: PathBuf,
|
||||
|
||||
// Similarly, path to branch mappings file. Not stored in the config file but
|
||||
// read from the NEON_BRANCH_MAPPINGS env variable. "None" means no mappings
|
||||
// are loaded, and they cannot be saved either.
|
||||
pub branch_name_mappings_path: Option<PathBuf>,
|
||||
|
||||
// Path to postgres distribution. It's expected that "bin", "include",
|
||||
// "lib", "share" from postgres distribution are there. If at some point
|
||||
// in time we will be able to run against vanilla postgres we may split that
|
||||
@@ -55,10 +60,6 @@ pub struct LocalEnv {
|
||||
// Path to pageserver binary.
|
||||
pub neon_distrib_dir: PathBuf,
|
||||
|
||||
// Default tenant ID to use with the 'neon_local' command line utility, when
|
||||
// --tenant_id is not explicitly specified.
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
|
||||
// used to issue tokens during e.g pg start
|
||||
pub private_key_path: PathBuf,
|
||||
|
||||
@@ -82,11 +83,7 @@ pub struct LocalEnv {
|
||||
// storage controller's configuration.
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
|
||||
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
|
||||
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
|
||||
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
|
||||
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
|
||||
pub branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
||||
pub branch_mappings: BranchMappings,
|
||||
}
|
||||
|
||||
/// On-disk state stored in `.neon/config`.
|
||||
@@ -95,7 +92,6 @@ pub struct LocalEnv {
|
||||
pub struct OnDiskConfig {
|
||||
pub pg_distrib_dir: PathBuf,
|
||||
pub neon_distrib_dir: PathBuf,
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
pub private_key_path: PathBuf,
|
||||
pub broker: NeonBroker,
|
||||
pub storage_controller: NeonStorageControllerConf,
|
||||
@@ -107,7 +103,20 @@ pub struct OnDiskConfig {
|
||||
pub safekeepers: Vec<SafekeeperConf>,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_compute_hook_api: Option<Url>,
|
||||
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Default, Serialize, Deserialize)]
|
||||
#[serde(default, deny_unknown_fields)]
|
||||
pub struct BranchMappings {
|
||||
// Default tenant ID to use with the 'neon_local' command line utility, when
|
||||
// --tenant_id is not explicitly specified. This comes from the branches.
|
||||
pub default_tenant_id: Option<TenantId>,
|
||||
|
||||
/// Keep human-readable aliases in memory (and persist them to config XXX), to hide ZId hex strings from the user.
|
||||
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
|
||||
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
|
||||
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
|
||||
pub mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
||||
}
|
||||
|
||||
fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
|
||||
@@ -128,7 +137,6 @@ pub struct NeonLocalInitConf {
|
||||
pub pg_distrib_dir: Option<PathBuf>,
|
||||
// TODO: do we need this? Seems unused
|
||||
pub neon_distrib_dir: Option<PathBuf>,
|
||||
pub default_tenant_id: TenantId,
|
||||
pub broker: NeonBroker,
|
||||
pub storage_controller: Option<NeonStorageControllerConf>,
|
||||
pub pageservers: Vec<NeonLocalInitPageserverConf>,
|
||||
@@ -443,7 +451,8 @@ impl LocalEnv {
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
let existing_values = self
|
||||
.branch_name_mappings
|
||||
.branch_mappings
|
||||
.mappings
|
||||
.entry(branch_name.clone())
|
||||
.or_default();
|
||||
|
||||
@@ -468,7 +477,8 @@ impl LocalEnv {
|
||||
branch_name: &str,
|
||||
tenant_id: TenantId,
|
||||
) -> Option<TimelineId> {
|
||||
self.branch_name_mappings
|
||||
self.branch_mappings
|
||||
.mappings
|
||||
.get(branch_name)?
|
||||
.iter()
|
||||
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
|
||||
@@ -477,7 +487,8 @@ impl LocalEnv {
|
||||
}
|
||||
|
||||
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
|
||||
self.branch_name_mappings
|
||||
self.branch_mappings
|
||||
.mappings
|
||||
.iter()
|
||||
.flat_map(|(name, tenant_timelines)| {
|
||||
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
|
||||
@@ -488,7 +499,10 @@ impl LocalEnv {
|
||||
}
|
||||
|
||||
/// Construct `Self` from on-disk state.
|
||||
pub fn load_config(repopath: &Path) -> anyhow::Result<Self> {
|
||||
pub fn load_config(
|
||||
repopath: &Path,
|
||||
branch_name_mappings_path: Option<&Path>,
|
||||
) -> anyhow::Result<Self> {
|
||||
if !repopath.exists() {
|
||||
bail!(
|
||||
"Neon config is not found in {}. You need to run 'neon_local init' first",
|
||||
@@ -498,37 +512,43 @@ impl LocalEnv {
|
||||
|
||||
// TODO: check that it looks like a neon repository
|
||||
|
||||
// load and parse file
|
||||
// load and parse config file
|
||||
let config_file_contents = fs::read_to_string(repopath.join("config"))?;
|
||||
let on_disk_config: OnDiskConfig = toml::from_str(config_file_contents.as_str())?;
|
||||
let mut env = {
|
||||
let OnDiskConfig {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
private_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
branch_name_mappings,
|
||||
} = on_disk_config;
|
||||
LocalEnv {
|
||||
base_data_dir: repopath.to_owned(),
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
private_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
branch_name_mappings,
|
||||
}
|
||||
let OnDiskConfig {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
private_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
} = on_disk_config;
|
||||
|
||||
// load and parse "branches.toml" file
|
||||
let branch_mappings = if let Some(path) = branch_name_mappings_path {
|
||||
let contents = fs::read_to_string(path)
|
||||
.context(format!("load branch mappings file {}", path.display()))?;
|
||||
toml::from_str::<BranchMappings>(contents.as_str())?
|
||||
} else {
|
||||
BranchMappings::default()
|
||||
};
|
||||
|
||||
let mut env = LocalEnv {
|
||||
base_data_dir: repopath.to_owned(),
|
||||
branch_name_mappings_path: branch_name_mappings_path.map(|p| p.to_owned()),
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
private_key_path,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
control_plane_compute_hook_api,
|
||||
branch_mappings,
|
||||
};
|
||||
|
||||
// The source of truth for pageserver configuration is the pageserver.toml.
|
||||
@@ -618,7 +638,6 @@ impl LocalEnv {
|
||||
&OnDiskConfig {
|
||||
pg_distrib_dir: self.pg_distrib_dir.clone(),
|
||||
neon_distrib_dir: self.neon_distrib_dir.clone(),
|
||||
default_tenant_id: self.default_tenant_id,
|
||||
private_key_path: self.private_key_path.clone(),
|
||||
broker: self.broker.clone(),
|
||||
storage_controller: self.storage_controller.clone(),
|
||||
@@ -626,9 +645,20 @@ impl LocalEnv {
|
||||
safekeepers: self.safekeepers.clone(),
|
||||
control_plane_api: self.control_plane_api.clone(),
|
||||
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
},
|
||||
)
|
||||
)?;
|
||||
|
||||
if let Some(path) = &self.branch_name_mappings_path {
|
||||
Self::persist_branches_impl(path, &self.branch_mappings)?;
|
||||
} else {
|
||||
if !self.branch_mappings.mappings.is_empty() {
|
||||
tracing::warn!("command created a branch mapping, but it was not saved because no mappings file was configured")
|
||||
} else if self.branch_mappings.default_tenant_id.is_some() {
|
||||
tracing::warn!("command created a tenant default, but it was not saved because no mappings file was configured")
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn persist_config_impl(base_path: &Path, config: &OnDiskConfig) -> anyhow::Result<()> {
|
||||
@@ -642,6 +672,19 @@ impl LocalEnv {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn persist_branches_impl(
|
||||
branch_name_mappings_path: &Path,
|
||||
branch_mappings: &BranchMappings,
|
||||
) -> anyhow::Result<()> {
|
||||
let content = &toml::to_string_pretty(branch_mappings)?;
|
||||
fs::write(branch_name_mappings_path, content).with_context(|| {
|
||||
format!(
|
||||
"Failed to write branch information into path '{}'",
|
||||
branch_name_mappings_path.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
// this function is used only for testing purposes in CLI e g generate tokens during init
|
||||
pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result<String> {
|
||||
let private_key_path = self.get_private_key_path();
|
||||
@@ -702,7 +745,6 @@ impl LocalEnv {
|
||||
let NeonLocalInitConf {
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id,
|
||||
broker,
|
||||
storage_controller,
|
||||
pageservers,
|
||||
@@ -746,9 +788,9 @@ impl LocalEnv {
|
||||
// TODO: refactor to avoid this, LocalEnv should only be constructed from on-disk state
|
||||
let env = LocalEnv {
|
||||
base_data_dir: base_path.clone(),
|
||||
branch_name_mappings_path: Some(base_path.join("branches.toml")),
|
||||
pg_distrib_dir,
|
||||
neon_distrib_dir,
|
||||
default_tenant_id: Some(default_tenant_id),
|
||||
private_key_path,
|
||||
broker,
|
||||
storage_controller: storage_controller.unwrap_or_default(),
|
||||
@@ -756,10 +798,10 @@ impl LocalEnv {
|
||||
safekeepers,
|
||||
control_plane_api: control_plane_api.unwrap_or_default(),
|
||||
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
|
||||
branch_name_mappings: Default::default(),
|
||||
branch_mappings: Default::default(),
|
||||
};
|
||||
|
||||
// create endpoints dir
|
||||
// create the default endpoints dir
|
||||
fs::create_dir_all(env.endpoints_path())?;
|
||||
|
||||
// create safekeeper dirs
|
||||
@@ -806,6 +848,24 @@ pub fn base_path() -> PathBuf {
|
||||
path
|
||||
}
|
||||
|
||||
pub fn branch_mappings_path() -> PathBuf {
|
||||
let path = match std::env::var_os("NEON_BRANCH_MAPPINGS") {
|
||||
Some(val) => {
|
||||
let path = PathBuf::from(val);
|
||||
|
||||
// a relative path is relative to repo dir
|
||||
if !path.is_absolute() {
|
||||
base_path().join(path)
|
||||
} else {
|
||||
path
|
||||
}
|
||||
}
|
||||
None => base_path().join("branches.toml"),
|
||||
};
|
||||
assert!(path.is_absolute());
|
||||
path
|
||||
}
|
||||
|
||||
/// Generate a public/private key pair for JWT authentication
|
||||
fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> {
|
||||
// Generate the key pair
|
||||
|
||||
@@ -64,6 +64,7 @@ pub struct ConfigToml {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wal_redo_timeout: Duration,
|
||||
pub superuser: String,
|
||||
pub initdb_cache_dir: Option<Utf8PathBuf>,
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
pub pg_distrib_dir: Option<Utf8PathBuf>,
|
||||
@@ -358,6 +359,7 @@ impl Default for ConfigToml {
|
||||
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: (DEFAULT_SUPERUSER.to_string()),
|
||||
initdb_cache_dir: None,
|
||||
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
|
||||
@@ -89,8 +89,19 @@ impl PageserverUtilization {
|
||||
|
||||
/// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
|
||||
/// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
|
||||
///
|
||||
/// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
|
||||
/// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
|
||||
pub fn is_overloaded(score: RawScore) -> bool {
|
||||
score >= Self::UTILIZATION_FULL
|
||||
// Why the factor of two? This is unscientific but reflects behavior of real systems:
|
||||
// - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
|
||||
// startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
|
||||
// until some more nodes are deployed.
|
||||
// - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
|
||||
// hold its biggest timeline fully on disk, which is tends to be an over estimate when
|
||||
// some tenants are very idle and have dropped layers from disk. In practice going up to
|
||||
// double is generally better than giving up and scheduling in a sub-optimal AZ.
|
||||
score >= 2 * Self::UTILIZATION_FULL
|
||||
}
|
||||
|
||||
pub fn adjust_shard_count_max(&mut self, shard_count: u32) {
|
||||
|
||||
@@ -81,17 +81,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
|
||||
)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Handler<IO> {
|
||||
/// Handle single query.
|
||||
/// postgres_backend will issue ReadyForQuery after calling this (this
|
||||
/// might be not what we want after CopyData streaming, but currently we don't
|
||||
/// care). It will also flush out the output buffer.
|
||||
async fn process_query(
|
||||
fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError>;
|
||||
) -> impl Future<Output = Result<(), QueryError>>;
|
||||
|
||||
/// Called on startup packet receival, allows to process params.
|
||||
///
|
||||
|
||||
@@ -23,7 +23,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
|
||||
|
||||
struct TestHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
|
||||
// return single col 'hey' for any query
|
||||
async fn process_query(
|
||||
|
||||
@@ -71,6 +71,8 @@ pub struct PageServerConf {
|
||||
|
||||
pub superuser: String,
|
||||
|
||||
pub initdb_cache_dir: Option<Utf8PathBuf>,
|
||||
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
|
||||
@@ -309,6 +311,7 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
initdb_cache_dir,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
pg_distrib_dir,
|
||||
@@ -356,6 +359,7 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
initdb_cache_dir,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
|
||||
@@ -1199,7 +1199,6 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
|
||||
@@ -1205,6 +1205,13 @@ impl<'a> DatadirModification<'a> {
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver at {}",
|
||||
key
|
||||
);
|
||||
}
|
||||
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
@@ -1216,14 +1223,34 @@ impl<'a> DatadirModification<'a> {
|
||||
blknum: BlockNumber,
|
||||
img: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver at {}",
|
||||
key
|
||||
);
|
||||
}
|
||||
self.put(key, Value::Image(img));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
|
||||
self.pending_zero_data_pages
|
||||
.insert(rel_block_to_key(rel, blknum).to_compact());
|
||||
pub(crate) fn put_rel_page_image_zero(
|
||||
&mut self,
|
||||
rel: RelTag,
|
||||
blknum: BlockNumber,
|
||||
) -> anyhow::Result<()> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
let key = rel_block_to_key(rel, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver: {} @ {}",
|
||||
key,
|
||||
self.lsn
|
||||
);
|
||||
}
|
||||
self.pending_zero_data_pages.insert(key.to_compact());
|
||||
self.pending_bytes += ZERO_PAGE.len();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn put_slru_page_image_zero(
|
||||
@@ -1231,10 +1258,18 @@ impl<'a> DatadirModification<'a> {
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
blknum: BlockNumber,
|
||||
) {
|
||||
self.pending_zero_data_pages
|
||||
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
|
||||
) -> anyhow::Result<()> {
|
||||
let key = slru_block_to_key(kind, segno, blknum);
|
||||
if !key.is_valid_key_on_write_path() {
|
||||
anyhow::bail!(
|
||||
"the request contains data not supported by pageserver: {} @ {}",
|
||||
key,
|
||||
self.lsn
|
||||
);
|
||||
}
|
||||
self.pending_zero_data_pages.insert(key.to_compact());
|
||||
self.pending_bytes += ZERO_PAGE.len();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call this at the end of each WAL record.
|
||||
|
||||
@@ -3491,7 +3491,7 @@ impl Tenant {
|
||||
.context("extract initdb tar")?;
|
||||
} else {
|
||||
// Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
run_initdb_with_cache(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if self.tenant_shard_id().is_shard_zero() {
|
||||
@@ -3534,6 +3534,7 @@ impl Tenant {
|
||||
// Flush loop needs to be spawned in order to be able to flush.
|
||||
unfinished_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
info!("starting to import from datadir");
|
||||
import_datadir::import_timeline_from_postgres_datadir(
|
||||
unfinished_timeline,
|
||||
&pgdata_path,
|
||||
@@ -3549,6 +3550,7 @@ impl Tenant {
|
||||
anyhow::bail!("failpoint before-checkpoint-new-timeline");
|
||||
});
|
||||
|
||||
info!("calling freeze_and_flush");
|
||||
unfinished_timeline
|
||||
.freeze_and_flush()
|
||||
.await
|
||||
@@ -3559,7 +3561,9 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
// All done!
|
||||
info!("calling finish_creation");
|
||||
let timeline = raw_timeline.finish_creation()?;
|
||||
info!("call to finish_creation done");
|
||||
|
||||
Ok(timeline)
|
||||
}
|
||||
@@ -3837,6 +3841,118 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
fn cached_initdb_dirname(initial_superuser_name: &str, pg_version: u32) -> String
|
||||
{
|
||||
use std::hash::Hash;
|
||||
use std::hash::Hasher;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
let mut hasher = DefaultHasher::new();
|
||||
initial_superuser_name.hash(&mut hasher);
|
||||
let hash = hasher.finish();
|
||||
|
||||
format!("cached_initial_pgdata_{pg_version}_{:016}", hash)
|
||||
}
|
||||
|
||||
fn copy_dir_all(src: impl AsRef<std::path::Path>, dst: impl AsRef<std::path::Path>) -> std::io::Result<()> {
|
||||
for entry in fs::read_dir(src.as_ref())? {
|
||||
let entry = entry?;
|
||||
let subsrc = entry.path();
|
||||
let subdst = dst.as_ref().join(&entry.file_name());
|
||||
|
||||
if entry.file_type()?.is_dir() {
|
||||
std::fs::create_dir(&subdst)?;
|
||||
copy_dir_all(&subsrc, &subdst)?;
|
||||
} else {
|
||||
std::fs::copy(&subsrc, &subdst)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn restore_cached_initdb_dir(
|
||||
cached_path: &Utf8Path,
|
||||
target_path: &Utf8Path,
|
||||
) -> anyhow::Result<bool> {
|
||||
if !cached_path.exists() {
|
||||
info!("cached initdb dir \"{cached_path}\" does not exist yet");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
std::fs::create_dir(target_path)?;
|
||||
copy_dir_all(cached_path, target_path)?;
|
||||
info!("restored initdb result from cache dir \"{cached_path}\"");
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn save_cached_initdb_dir(
|
||||
src_path: &Utf8Path,
|
||||
cache_path: &Utf8Path,
|
||||
) -> anyhow::Result<()> {
|
||||
match std::fs::create_dir(cache_path) {
|
||||
Ok(()) => {
|
||||
info!("saving initdb result to cache dir \"{cache_path}\"");
|
||||
},
|
||||
Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
|
||||
info!("cache initdb dir \"{cache_path}\" already exists, not saving");
|
||||
return Ok(())
|
||||
},
|
||||
Err(err) => { return Err(anyhow::Error::from(err))},
|
||||
};
|
||||
let cache_dir_guard = scopeguard::guard(cache_path, |cp| {
|
||||
if let Err(err) = std::fs::remove_dir_all(&cp) {
|
||||
error!("could not remove cached initdb directory {cp}: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
let cache_parent_path = cache_path.parent().ok_or(anyhow::Error::msg("no cache parent path"))?;
|
||||
|
||||
let tmp_dirpath = camino_tempfile::tempdir_in(cache_parent_path)?;
|
||||
copy_dir_all(src_path, &tmp_dirpath)?;
|
||||
std::fs::rename(tmp_dirpath, &*cache_dir_guard)?;
|
||||
|
||||
// disarm the guard
|
||||
scopeguard::ScopeGuard::into_inner(cache_dir_guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_initdb_with_cache(
|
||||
conf: &'static PageServerConf,
|
||||
initdb_target_dir: &Utf8Path,
|
||||
pg_version: u32,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), InitdbError> {
|
||||
|
||||
let cache_dir = conf.initdb_cache_dir.as_ref().map(|initdb_cache_dir| {
|
||||
initdb_cache_dir.join(cached_initdb_dirname(&conf.superuser, pg_version))
|
||||
});
|
||||
|
||||
if let Some(cache_dir) = &cache_dir {
|
||||
match restore_cached_initdb_dir(&cache_dir, initdb_target_dir) {
|
||||
Ok(true) => return Ok(()),
|
||||
Ok(false) => {},
|
||||
Err(err) => {
|
||||
warn!("Error restoring from cached initdb directory \"{cache_dir}\": {err}");
|
||||
if initdb_target_dir.exists() {
|
||||
if let Err(err) = std::fs::remove_dir_all(&initdb_target_dir) {
|
||||
error!("could not remove temporary initdb target directory {initdb_target_dir}: {err}");
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
run_initdb(conf, initdb_target_dir, pg_version, cancel).await?;
|
||||
|
||||
if let Some(cache_dir) = &cache_dir {
|
||||
if let Err(err) = save_cached_initdb_dir(initdb_target_dir, &cache_dir) {
|
||||
warn!("error saving initdb result to cache directory \"{cache_dir}\": {err}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
|
||||
/// to get bootstrap data for timeline initialization.
|
||||
async fn run_initdb(
|
||||
|
||||
@@ -1222,7 +1222,7 @@ impl WalIngest {
|
||||
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
|
||||
// Tail of last remaining FSM page has to be zeroed.
|
||||
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
@@ -1244,7 +1244,7 @@ impl WalIngest {
|
||||
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
|
||||
// Tail of last remaining vm page has to be zeroed.
|
||||
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
|
||||
modification.put_rel_page_image_zero(rel, vm_page_no);
|
||||
modification.put_rel_page_image_zero(rel, vm_page_no)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
@@ -1737,7 +1737,7 @@ impl WalIngest {
|
||||
continue;
|
||||
}
|
||||
|
||||
modification.put_rel_page_image_zero(rel, gap_blknum);
|
||||
modification.put_rel_page_image_zero(rel, gap_blknum)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -1803,7 +1803,7 @@ impl WalIngest {
|
||||
|
||||
// fill the gap with zeros
|
||||
for gap_blknum in old_nblocks..blknum {
|
||||
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
|
||||
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
2
proxy/src/cache/endpoints.rs
vendored
2
proxy/src/cache/endpoints.rs
vendored
@@ -242,6 +242,6 @@ mod tests {
|
||||
#[test]
|
||||
fn test() {
|
||||
let s = "{\"branch_created\":null,\"endpoint_created\":{\"endpoint_id\":\"ep-rapid-thunder-w0qqw2q9\"},\"project_created\":null,\"type\":\"endpoint_created\"}";
|
||||
let _: ControlPlaneEventKey = serde_json::from_str(s).unwrap();
|
||||
serde_json::from_str::<ControlPlaneEventKey>(s).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,7 +395,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
});
|
||||
let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
|
||||
serde_json::from_str::<KickSession<'_>>(&json.to_string())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -403,7 +403,7 @@ mod tests {
|
||||
#[test]
|
||||
fn parse_db_info() -> anyhow::Result<()> {
|
||||
// with password
|
||||
let _: DatabaseInfo = serde_json::from_value(json!({
|
||||
serde_json::from_value::<DatabaseInfo>(json!({
|
||||
"host": "localhost",
|
||||
"port": 5432,
|
||||
"dbname": "postgres",
|
||||
@@ -413,7 +413,7 @@ mod tests {
|
||||
}))?;
|
||||
|
||||
// without password
|
||||
let _: DatabaseInfo = serde_json::from_value(json!({
|
||||
serde_json::from_value::<DatabaseInfo>(json!({
|
||||
"host": "localhost",
|
||||
"port": 5432,
|
||||
"dbname": "postgres",
|
||||
@@ -422,7 +422,7 @@ mod tests {
|
||||
}))?;
|
||||
|
||||
// new field (forward compatibility)
|
||||
let _: DatabaseInfo = serde_json::from_value(json!({
|
||||
serde_json::from_value::<DatabaseInfo>(json!({
|
||||
"host": "localhost",
|
||||
"port": 5432,
|
||||
"dbname": "postgres",
|
||||
@@ -441,7 +441,7 @@ mod tests {
|
||||
"address": "0.0.0.0",
|
||||
"aux": dummy_aux(),
|
||||
});
|
||||
let _: WakeCompute = serde_json::from_str(&json.to_string())?;
|
||||
serde_json::from_str::<WakeCompute>(&json.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -451,18 +451,18 @@ mod tests {
|
||||
let json = json!({
|
||||
"role_secret": "secret",
|
||||
});
|
||||
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
|
||||
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
|
||||
let json = json!({
|
||||
"role_secret": "secret",
|
||||
"allowed_ips": ["8.8.8.8"],
|
||||
});
|
||||
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
|
||||
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
|
||||
let json = json!({
|
||||
"role_secret": "secret",
|
||||
"allowed_ips": ["8.8.8.8"],
|
||||
"project_id": "project",
|
||||
});
|
||||
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
|
||||
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;
|
||||
|
||||
// TODO: replace with an http-based protocol.
|
||||
struct MgmtHandler;
|
||||
#[async_trait::async_trait]
|
||||
|
||||
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
|
||||
@@ -6,7 +6,7 @@ use pq_proto::StartupMessageParams;
|
||||
use smol_str::SmolStr;
|
||||
use std::net::IpAddr;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{field::display, info, info_span, Span};
|
||||
use tracing::{debug, field::display, info, info_span, Span};
|
||||
use try_lock::TryLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -362,7 +362,9 @@ impl RequestMonitoringInner {
|
||||
});
|
||||
}
|
||||
if let Some(tx) = self.sender.take() {
|
||||
let _: Result<(), _> = tx.send(RequestData::from(&*self));
|
||||
tx.send(RequestData::from(&*self))
|
||||
.inspect_err(|e| debug!("tx send failed: {e}"))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,7 +373,9 @@ impl RequestMonitoringInner {
|
||||
// Here we log the length of the session.
|
||||
self.disconnect_timestamp = Some(Utc::now());
|
||||
if let Some(tx) = self.disconnect_sender.take() {
|
||||
let _: Result<(), _> = tx.send(RequestData::from(&*self));
|
||||
tx.send(RequestData::from(&*self))
|
||||
.inspect_err(|e| debug!("tx send failed: {e}"))
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,7 +290,7 @@ async fn worker_inner(
|
||||
}
|
||||
|
||||
if !w.flushed_row_groups().is_empty() {
|
||||
let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
|
||||
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#![deny(
|
||||
deprecated,
|
||||
future_incompatible,
|
||||
// TODO: consider let_underscore
|
||||
let_underscore,
|
||||
nonstandard_style,
|
||||
rust_2024_compatibility
|
||||
)]
|
||||
|
||||
@@ -268,7 +268,7 @@ async fn keepalive_is_inherited() -> anyhow::Result<()> {
|
||||
anyhow::Ok(keepalive)
|
||||
});
|
||||
|
||||
let _ = TcpStream::connect(("127.0.0.1", port)).await?;
|
||||
TcpStream::connect(("127.0.0.1", port)).await?;
|
||||
assert!(t.await??, "keepalive should be inherited");
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -6,7 +6,7 @@ use redis::{
|
||||
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use super::elasticache::CredentialsProvider;
|
||||
|
||||
@@ -109,7 +109,10 @@ impl ConnectionWithCredentialsProvider {
|
||||
let credentials_provider = credentials_provider.clone();
|
||||
let con2 = con.clone();
|
||||
let f = tokio::spawn(async move {
|
||||
let _ = Self::keep_connection(con2, credentials_provider).await;
|
||||
Self::keep_connection(con2, credentials_provider)
|
||||
.await
|
||||
.inspect_err(|e| debug!("keep_connection failed: {e}"))
|
||||
.ok();
|
||||
});
|
||||
self.refresh_token_task = Some(f);
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::{io, task};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use tracing::debug;
|
||||
|
||||
/// Stream wrapper which implements libpq's protocol.
|
||||
///
|
||||
@@ -138,9 +139,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
);
|
||||
|
||||
// already error case, ignore client IO error
|
||||
let _: Result<_, std::io::Error> = self
|
||||
.write_message(&BeMessage::ErrorResponse(msg, None))
|
||||
.await;
|
||||
self.write_message(&BeMessage::ErrorResponse(msg, None))
|
||||
.await
|
||||
.inspect_err(|e| debug!("write_message failed: {e}"))
|
||||
.ok();
|
||||
|
||||
Err(ReportedError {
|
||||
source: anyhow::anyhow!(msg),
|
||||
@@ -164,9 +166,10 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
);
|
||||
|
||||
// already error case, ignore client IO error
|
||||
let _: Result<_, std::io::Error> = self
|
||||
.write_message(&BeMessage::ErrorResponse(&msg, None))
|
||||
.await;
|
||||
self.write_message(&BeMessage::ErrorResponse(&msg, None))
|
||||
.await
|
||||
.inspect_err(|e| debug!("write_message failed: {e}"))
|
||||
.ok();
|
||||
|
||||
Err(ReportedError {
|
||||
source: anyhow::anyhow!(error),
|
||||
|
||||
@@ -57,7 +57,7 @@ mod tests {
|
||||
fn bad_url() {
|
||||
let url = "test:foobar";
|
||||
url.parse::<url::Url>().expect("unexpected parsing failure");
|
||||
let _ = url.parse::<ApiUrl>().expect_err("should not parse");
|
||||
url.parse::<ApiUrl>().expect_err("should not parse");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use std::future::Future;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -95,7 +96,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
for SafekeeperPostgresHandler
|
||||
{
|
||||
@@ -197,49 +197,51 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_query(
|
||||
fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError> {
|
||||
if query_string
|
||||
.to_ascii_lowercase()
|
||||
.starts_with("set datestyle to ")
|
||||
{
|
||||
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let cmd = parse_cmd(query_string)?;
|
||||
let cmd_str = cmd_to_string(&cmd);
|
||||
|
||||
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
|
||||
|
||||
info!("got query {:?}", query_string);
|
||||
|
||||
let tenant_id = self.tenant_id.context("tenantid is required")?;
|
||||
let timeline_id = self.timeline_id.context("timelineid is required")?;
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => {
|
||||
self.handle_start_wal_push(pgb)
|
||||
.instrument(info_span!("WAL receiver"))
|
||||
.await
|
||||
) -> impl Future<Output = Result<(), QueryError>> {
|
||||
Box::pin(async move {
|
||||
if query_string
|
||||
.to_ascii_lowercase()
|
||||
.starts_with("set datestyle to ")
|
||||
{
|
||||
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
return Ok(());
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
|
||||
self.handle_start_replication(pgb, start_lsn, term)
|
||||
.instrument(info_span!("WAL sender"))
|
||||
.await
|
||||
|
||||
let cmd = parse_cmd(query_string)?;
|
||||
let cmd_str = cmd_to_string(&cmd);
|
||||
|
||||
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
|
||||
|
||||
info!("got query {:?}", query_string);
|
||||
|
||||
let tenant_id = self.tenant_id.context("tenantid is required")?;
|
||||
let timeline_id = self.timeline_id.context("timelineid is required")?;
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => {
|
||||
self.handle_start_wal_push(pgb)
|
||||
.instrument(info_span!("WAL receiver"))
|
||||
.await
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
|
||||
self.handle_start_replication(pgb, start_lsn, term)
|
||||
.instrument(info_span!("WAL sender"))
|
||||
.await
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
|
||||
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
|
||||
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
|
||||
handle_json_ctrl(self, pgb, cmd).await
|
||||
}
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
|
||||
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
|
||||
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
|
||||
handle_json_ctrl(self, pgb, cmd).await
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -292,7 +292,7 @@ impl ComputeHook {
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
let env = match LocalEnv::load_config(repo_dir) {
|
||||
let env = match LocalEnv::load_config(repo_dir, None) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
|
||||
|
||||
@@ -50,7 +50,7 @@ class NeonBroker:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
|
||||
) from e
|
||||
time.sleep(0.5)
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
break # success
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -19,7 +19,7 @@ def pg_version() -> Optional[PgVersion]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def build_type() -> Optional[str]:
|
||||
return None
|
||||
|
||||
@@ -64,7 +64,7 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
metafunc.parametrize("build_type", build_types, scope="session")
|
||||
|
||||
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
|
||||
@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
|
||||
|
||||
def test_aux_v2_config_switch(neon_env_builder: NeonEnvBuilder, vanilla_pg):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_aux_v2_config_switch(neon_shared_env: NeonEnv, vanilla_pg):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
|
||||
@@ -46,11 +46,11 @@ from fixtures.utils import query_scalar
|
||||
# Because the delta layer D covering lsn1 is corrupted, creating a branch
|
||||
# starting from lsn1 should return an error as follows:
|
||||
# could not find data for key ... at LSN ..., for request at LSN ...
|
||||
def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
|
||||
def test_branch_and_gc(neon_shared_env: NeonEnv, build_type: str):
|
||||
if build_type == "debug":
|
||||
pytest.skip("times out in debug builds")
|
||||
|
||||
env = neon_simple_env
|
||||
env = neon_shared_env
|
||||
pageserver_http_client = env.pageserver.http_client()
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
@@ -116,8 +116,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
|
||||
# and prevent creating branches with invalid starting LSNs.
|
||||
#
|
||||
# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
|
||||
def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_branch_creation_before_gc(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
pageserver_http_client = env.pageserver.http_client()
|
||||
|
||||
error_regexes = [
|
||||
|
||||
@@ -33,7 +33,7 @@ from requests import RequestException
|
||||
@pytest.mark.parametrize("scale", get_scales_matrix(1))
|
||||
@pytest.mark.parametrize("ty", ["cascade", "flat"])
|
||||
def test_branching_with_pgbench(
|
||||
neon_simple_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
|
||||
neon_shared_env: NeonEnv, pg_bin: PgBin, n_branches: int, scale: int, ty: str
|
||||
):
|
||||
env = neon_simple_env
|
||||
|
||||
|
||||
@@ -2,9 +2,8 @@ from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonProxy
|
||||
|
||||
|
||||
def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonProxy):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
def test_build_info_metric(neon_shared_env: NeonEnv, link_proxy: NeonProxy):
|
||||
env = neon_shared_env
|
||||
|
||||
parsed_metrics = {}
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ from fixtures.utils import query_scalar
|
||||
#
|
||||
# Test compute node start after clog truncation
|
||||
#
|
||||
def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_clog_truncate(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
# set aggressive autovacuum to make sure that truncation will happen
|
||||
config = [
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
|
||||
|
||||
|
||||
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
|
||||
env = neon_env_builder.init_start()
|
||||
def do_combocid_op(neon_shared_env: NeonEnv, op):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
@@ -49,20 +49,20 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
|
||||
)
|
||||
|
||||
|
||||
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "delete from t")
|
||||
def test_combocid_delete(neon_shared_env: NeonEnv):
|
||||
do_combocid_op(neon_env, "delete from t")
|
||||
|
||||
|
||||
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "update t set val=val+1")
|
||||
def test_combocid_update(neon_shared_env: NeonEnv):
|
||||
do_combocid_op(neon_env, "update t set val=val+1")
|
||||
|
||||
|
||||
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "select * from t for update")
|
||||
def test_combocid_lock(neon_shared_env: NeonEnv):
|
||||
do_combocid_op(neon_env, "select * from t for update")
|
||||
|
||||
|
||||
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_combocid_multi_insert(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
@@ -112,8 +112,8 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def test_combocid(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_combocid(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
conn = endpoint.connect()
|
||||
|
||||
@@ -178,7 +178,7 @@ def test_backward_compatibility(
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
|
||||
env.pageserver.allowed_errors.append(ingest_lag_log_line)
|
||||
neon_env_builder.start()
|
||||
env.start()
|
||||
|
||||
check_neon_works(
|
||||
env,
|
||||
@@ -265,7 +265,7 @@ def test_forward_compatibility(
|
||||
# does not include logs from previous runs
|
||||
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
|
||||
|
||||
neon_env_builder.start()
|
||||
env.start()
|
||||
|
||||
# ensure the specified pageserver is running
|
||||
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)
|
||||
|
||||
@@ -2,8 +2,8 @@ import requests
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_compute_catalog(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
|
||||
client = endpoint.http_client()
|
||||
|
||||
@@ -29,6 +29,27 @@ def test_config(neon_simple_env: NeonEnv):
|
||||
# check that config change was applied
|
||||
assert cur.fetchone() == ("debug1",)
|
||||
|
||||
def test_shared_config(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
# change config
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT setting
|
||||
FROM pg_settings
|
||||
WHERE
|
||||
source != 'default'
|
||||
AND source != 'override'
|
||||
AND name = 'log_min_messages'
|
||||
"""
|
||||
)
|
||||
|
||||
# check that config change was applied
|
||||
assert cur.fetchone() == ("debug1",)
|
||||
|
||||
#
|
||||
# Test that reordering of safekeepers does not restart walproposer
|
||||
|
||||
@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
|
||||
# Test CREATE DATABASE when there have been relmapper changes
|
||||
#
|
||||
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
|
||||
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
env = neon_simple_env
|
||||
def test_createdb(neon_shared_env: NeonEnv, strategy: str):
|
||||
env = neon_shared_env
|
||||
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
|
||||
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
|
||||
|
||||
@@ -58,8 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
#
|
||||
# Test DROP DATABASE
|
||||
#
|
||||
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
def test_dropdb(neon_shared_env: NeonEnv, test_output_dir):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
|
||||
@@ -5,8 +5,8 @@ from fixtures.utils import query_scalar
|
||||
#
|
||||
# Test CREATE USER to check shared catalog restore
|
||||
#
|
||||
def test_createuser(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_createuser(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
|
||||
@@ -10,11 +10,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
"💣", # calls `trigger_segfault` internally
|
||||
],
|
||||
)
|
||||
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
|
||||
def test_endpoint_crash(neon_shared_env: NeonEnv, sql_func: str):
|
||||
"""
|
||||
Test that triggering crash from neon_test_utils crashes the endpoint
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_shared_env
|
||||
env.neon_cli.create_branch("test_endpoint_crash")
|
||||
endpoint = env.endpoints.create_start("test_endpoint_crash")
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_fsm_truncate(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
env.neon_cli.create_branch("test_fsm_truncate")
|
||||
endpoint = env.endpoints.create_start("test_fsm_truncate")
|
||||
endpoint.safe_psql(
|
||||
|
||||
@@ -16,12 +16,12 @@ num_rows = 1000
|
||||
|
||||
# Ensure that regular postgres can start from fullbackup
|
||||
def test_fullbackup(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
neon_shared_env: NeonEnv,
|
||||
pg_bin: PgBin,
|
||||
port_distributor: PortDistributor,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_shared_env
|
||||
|
||||
# endpoint needs to be alive until the fullbackup so that we have
|
||||
# prev_record_lsn for the vanilla_pg to start in read-write mode
|
||||
|
||||
@@ -6,8 +6,8 @@ from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
#
|
||||
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
|
||||
#
|
||||
def test_gin_redo(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_gin_redo(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
time.sleep(1)
|
||||
|
||||
@@ -87,8 +87,8 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))
|
||||
|
||||
|
||||
def test_2_replicas_start(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_2_replicas_start(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
@@ -286,8 +286,8 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
|
||||
# Test race condition between WAL replay and backends performing queries
|
||||
# https://github.com/neondatabase/neon/issues/7791
|
||||
def test_replica_query_race(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_replica_query_race(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
primary_ep = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
|
||||
@@ -12,8 +12,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
# to large (several gigabytes) layer files (both ephemeral and delta layers).
|
||||
# It may cause problems with uploading to S3 and also degrade performance because ephemeral file swapping.
|
||||
#
|
||||
def test_large_schema(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_large_schema(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
|
||||
@@ -14,8 +14,8 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
@pytest.mark.timeout(600)
|
||||
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
def test_lfc_resize(neon_shared_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
|
||||
@@ -9,8 +9,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_local_file_cache_unlink(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
cache_dir = os.path.join(env.repo_dir, "file_cache")
|
||||
os.mkdir(cache_dir)
|
||||
|
||||
@@ -24,13 +24,13 @@ def assert_lsn_lease_granted(result, with_lease: bool):
|
||||
|
||||
|
||||
@pytest.mark.parametrize("with_lease", [True, False])
|
||||
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder, with_lease: bool):
|
||||
def test_lsn_mapping(neon_shared_env: NeonEnv, with_lease: bool):
|
||||
"""
|
||||
Test pageserver get_lsn_by_timestamp API.
|
||||
|
||||
:param with_lease: Whether to get a lease associated with returned LSN.
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_shared_env
|
||||
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
|
||||
@@ -7,8 +7,8 @@ if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_migrations(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_migrations(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.respec(skip_pg_catalog_updates=False)
|
||||
|
||||
@@ -12,8 +12,8 @@ from fixtures.utils import query_scalar
|
||||
# is enough to verify that the WAL records are handled correctly
|
||||
# in the pageserver.
|
||||
#
|
||||
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
def test_multixact(neon_shared_env: NeonEnv, test_output_dir):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
|
||||
@@ -4,8 +4,8 @@ from fixtures.pg_version import PgVersion
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
env = neon_simple_env
|
||||
def test_neon_superuser(neon_shared_env: NeonEnv, pg_version: PgVersion):
|
||||
env = neon_shared_env
|
||||
env.neon_cli.create_branch("test_neon_superuser_publisher", "main")
|
||||
pub = env.endpoints.create("test_neon_superuser_publisher")
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_oid_overflow(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_oid_overflow(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
|
||||
@@ -56,8 +56,8 @@ def check_client(env: NeonEnv, client: PageserverHttpClient):
|
||||
assert TimelineId(timeline_details["timeline_id"]) == timeline_id
|
||||
|
||||
|
||||
def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_pageserver_http_get_wal_receiver_not_found(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
with env.pageserver.http_client() as client:
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
@@ -105,8 +105,8 @@ def expect_updated_msg_lsn(
|
||||
#
|
||||
# These fields used to be returned by a separate API call, but they're part of
|
||||
# `timeline_details` now.
|
||||
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_pageserver_http_get_wal_receiver_success(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
with env.pageserver.http_client() as client:
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
endpoint = env.endpoints.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
|
||||
|
||||
@@ -39,8 +39,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
|
||||
|
||||
|
||||
# Load data into one table with COPY TO from 5 parallel connections
|
||||
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
|
||||
env = neon_simple_env
|
||||
def test_parallel_copy(neon_shared_env: NeonEnv, n_parallel=5):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# Create test table
|
||||
|
||||
@@ -19,8 +19,8 @@ def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
|
||||
|
||||
|
||||
# Simple test to check that pg_waldump works with neon WAL files
|
||||
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
def test_pg_waldump(neon_shared_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
env = neon_shared_env
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -13,8 +13,8 @@ extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
|
||||
#
|
||||
# Validation of reading different page versions
|
||||
#
|
||||
def test_read_validation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_read_validation(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
with closing(endpoint.connect()) as con:
|
||||
@@ -125,8 +125,8 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info(f"Caught an expected failure: {e}")
|
||||
|
||||
|
||||
def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_read_validation_neg(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -215,8 +215,8 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
# Similar test, but with more data, and we force checkpoints
|
||||
def test_timetravel(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_timetravel(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
|
||||
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
|
||||
|
||||
env = neon_env_builder.init_configs(True)
|
||||
neon_env_builder.start()
|
||||
env.start()
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.neon_cli.create_tenant(
|
||||
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
neon_env_builder.start()
|
||||
env.start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
neon_env_builder.start()
|
||||
env.start()
|
||||
|
||||
tenants = []
|
||||
n_tenants = 8
|
||||
|
||||
@@ -7,8 +7,8 @@ from fixtures.utils import wait_until
|
||||
|
||||
# This test checks of logical replication subscriber is able to correctly restart replication without receiving duplicates.
|
||||
# It requires tracking information about replication origins at page server side
|
||||
def test_subscriber_restart(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_subscriber_restart(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
env.neon_cli.create_branch("publisher")
|
||||
pub = env.endpoints.create("publisher")
|
||||
pub.start()
|
||||
|
||||
@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
# maintained in the pageserver, so subtransactions are not very exciting for
|
||||
# Neon. They are included in the commit record though and updated in the
|
||||
# CLOG.
|
||||
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
def test_subxacts(neon_shared_env: NeonEnv, test_output_dir):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
|
||||
@@ -36,8 +36,8 @@ from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
|
||||
def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_timeline_delete(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
#
|
||||
# Test truncation of FSM and VM forks of a relation
|
||||
#
|
||||
def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
env = neon_env_builder.init_start()
|
||||
def test_truncate(neon_shared_env: NeonEnv, zenbenchmark):
|
||||
env = neon_shared_env
|
||||
n_records = 10000
|
||||
n_iter = 10
|
||||
|
||||
|
||||
@@ -7,8 +7,8 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
def test_twophase(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_twophase(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])
|
||||
|
||||
conn = endpoint.connect()
|
||||
|
||||
@@ -7,8 +7,8 @@ from fixtures.pg_version import PgVersion
|
||||
# fork to reset them during recovery. In Neon, pageserver directly sends init
|
||||
# fork contents as main fork during basebackup.
|
||||
#
|
||||
def test_unlogged(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_unlogged(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
conn = endpoint.connect()
|
||||
|
||||
@@ -10,8 +10,8 @@ from fixtures.utils import query_scalar
|
||||
# Test that the VM bit is cleared correctly at a HEAP_DELETE and
|
||||
# HEAP_UPDATE record.
|
||||
#
|
||||
def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_vm_bit_clear(neon_shared_env: NeonEnv):
|
||||
env = neon_shared_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -114,13 +114,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
assert cur_new.fetchall() == []
|
||||
|
||||
|
||||
def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
|
||||
def test_vm_bit_clear_on_heap_lock_whitebox(neon_shared_env: NeonEnv):
|
||||
"""
|
||||
Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.
|
||||
|
||||
This is a repro for the bug fixed in commit 66fa176cc8.
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_shared_env
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
|
||||
Reference in New Issue
Block a user