mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-07 18:30:37 +00:00
Compare commits
4 Commits
conrad/pro
...
reduce-met
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0a71bc334 | ||
|
|
2cdb5503b0 | ||
|
|
74d3cdeaf0 | ||
|
|
73b369531d |
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -1105,6 +1105,7 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"clap 4.2.2",
|
"clap 4.2.2",
|
||||||
"comfy-table",
|
"comfy-table",
|
||||||
|
"file-lock",
|
||||||
"git-version",
|
"git-version",
|
||||||
"nix",
|
"nix",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
@@ -1550,6 +1551,16 @@ dependencies = [
|
|||||||
"instant",
|
"instant",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "file-lock"
|
||||||
|
version = "2.1.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "filetime"
|
name = "filetime"
|
||||||
version = "0.2.21"
|
version = "0.2.21"
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ either = "1.8"
|
|||||||
enum-map = "2.4.2"
|
enum-map = "2.4.2"
|
||||||
enumset = "1.0.12"
|
enumset = "1.0.12"
|
||||||
fail = "0.5.0"
|
fail = "0.5.0"
|
||||||
|
file-lock = "2.1.9"
|
||||||
fs2 = "0.4.3"
|
fs2 = "0.4.3"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-core = "0.3"
|
futures-core = "0.3"
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ license.workspace = true
|
|||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
clap.workspace = true
|
clap.workspace = true
|
||||||
comfy-table.workspace = true
|
comfy-table.workspace = true
|
||||||
|
file-lock.workspace = true
|
||||||
git-version.workspace = true
|
git-version.workspace = true
|
||||||
nix.workspace = true
|
nix.workspace = true
|
||||||
once_cell.workspace = true
|
once_cell.workspace = true
|
||||||
|
|||||||
@@ -365,11 +365,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
|
|||||||
let new_timeline_id = timeline_info.timeline_id;
|
let new_timeline_id = timeline_info.timeline_id;
|
||||||
let last_record_lsn = timeline_info.last_record_lsn;
|
let last_record_lsn = timeline_info.last_record_lsn;
|
||||||
|
|
||||||
env.register_branch_mapping(
|
env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?;
|
||||||
DEFAULT_BRANCH_NAME.to_string(),
|
|
||||||
new_tenant_id,
|
|
||||||
new_timeline_id,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
|
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
|
||||||
@@ -411,7 +407,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
Some(("list", list_match)) => {
|
Some(("list", list_match)) => {
|
||||||
let tenant_id = get_tenant_id(list_match, env)?;
|
let tenant_id = get_tenant_id(list_match, env)?;
|
||||||
let timelines = pageserver.timeline_list(&tenant_id)?;
|
let timelines = pageserver.timeline_list(&tenant_id)?;
|
||||||
print_timelines_tree(timelines, env.timeline_name_mappings())?;
|
print_timelines_tree(timelines, env.timeline_name_mappings()?)?;
|
||||||
}
|
}
|
||||||
Some(("create", create_match)) => {
|
Some(("create", create_match)) => {
|
||||||
let tenant_id = get_tenant_id(create_match, env)?;
|
let tenant_id = get_tenant_id(create_match, env)?;
|
||||||
@@ -429,7 +425,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
let new_timeline_id = timeline_info.timeline_id;
|
let new_timeline_id = timeline_info.timeline_id;
|
||||||
|
|
||||||
let last_record_lsn = timeline_info.last_record_lsn;
|
let last_record_lsn = timeline_info.last_record_lsn;
|
||||||
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
|
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
|
||||||
@@ -468,10 +464,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
.copied()
|
.copied()
|
||||||
.context("Failed to parse postgres version from the argument string")?;
|
.context("Failed to parse postgres version from the argument string")?;
|
||||||
|
|
||||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
let mut cplane = ComputeControlPlane::new(env.clone());
|
||||||
println!("Importing timeline into pageserver ...");
|
println!("Importing timeline into pageserver ...");
|
||||||
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
|
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
|
||||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
env.register_branch_mapping(name, tenant_id, timeline_id)?;
|
||||||
|
|
||||||
println!("Creating endpoint for imported timeline ...");
|
println!("Creating endpoint for imported timeline ...");
|
||||||
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
|
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
|
||||||
@@ -487,7 +483,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
.map(|s| s.as_str())
|
.map(|s| s.as_str())
|
||||||
.unwrap_or(DEFAULT_BRANCH_NAME);
|
.unwrap_or(DEFAULT_BRANCH_NAME);
|
||||||
let ancestor_timeline_id = env
|
let ancestor_timeline_id = env
|
||||||
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
|
.get_branch_timeline_id(ancestor_branch_name, tenant_id)?
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
|
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
|
||||||
})?;
|
})?;
|
||||||
@@ -508,7 +504,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
|
|
||||||
let last_record_lsn = timeline_info.last_record_lsn;
|
let last_record_lsn = timeline_info.last_record_lsn;
|
||||||
|
|
||||||
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
|
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
|
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
|
||||||
@@ -528,7 +524,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
None => bail!("no endpoint subcommand provided"),
|
None => bail!("no endpoint subcommand provided"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
let mut cplane = ComputeControlPlane::new(env.clone());
|
||||||
|
|
||||||
// All subcommands take an optional --tenant-id option
|
// All subcommands take an optional --tenant-id option
|
||||||
let tenant_id = get_tenant_id(sub_args, env)?;
|
let tenant_id = get_tenant_id(sub_args, env)?;
|
||||||
@@ -540,7 +536,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
HashMap::new()
|
HashMap::new()
|
||||||
});
|
});
|
||||||
|
|
||||||
let timeline_name_mappings = env.timeline_name_mappings();
|
let timeline_name_mappings = env.timeline_name_mappings()?;
|
||||||
|
|
||||||
let mut table = comfy_table::Table::new();
|
let mut table = comfy_table::Table::new();
|
||||||
|
|
||||||
@@ -555,8 +551,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
"STATUS",
|
"STATUS",
|
||||||
]);
|
]);
|
||||||
|
|
||||||
for (endpoint_id, endpoint) in cplane
|
for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)?
|
||||||
.endpoints
|
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
||||||
{
|
{
|
||||||
@@ -609,7 +604,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.transpose()
|
.transpose()
|
||||||
.context("Failed to parse Lsn from the request")?;
|
.context("Failed to parse Lsn from the request")?;
|
||||||
let timeline_id = env
|
let timeline_id = env
|
||||||
.get_branch_timeline_id(branch_name, tenant_id)
|
.get_branch_timeline_id(branch_name, tenant_id)?
|
||||||
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
|
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
|
||||||
|
|
||||||
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
||||||
@@ -627,7 +622,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.get_one::<String>("endpoint_id")
|
.get_one::<String>("endpoint_id")
|
||||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||||
|
|
||||||
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
|
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?;
|
||||||
|
|
||||||
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
|
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
|
||||||
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
||||||
@@ -646,7 +641,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.map(|s| s.as_str())
|
.map(|s| s.as_str())
|
||||||
.unwrap_or(DEFAULT_BRANCH_NAME);
|
.unwrap_or(DEFAULT_BRANCH_NAME);
|
||||||
let timeline_id = env
|
let timeline_id = env
|
||||||
.get_branch_timeline_id(branch_name, tenant_id)
|
.get_branch_timeline_id(branch_name, tenant_id)?
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
anyhow!("Found no timeline id for branch name '{branch_name}'")
|
anyhow!("Found no timeline id for branch name '{branch_name}'")
|
||||||
})?;
|
})?;
|
||||||
@@ -683,9 +678,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
|
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
|
||||||
let destroy = sub_args.get_flag("destroy");
|
let destroy = sub_args.get_flag("destroy");
|
||||||
|
|
||||||
let endpoint = cplane
|
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?
|
||||||
.endpoints
|
|
||||||
.get(endpoint_id.as_str())
|
|
||||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||||
endpoint.stop(destroy)?;
|
endpoint.stop(destroy)?;
|
||||||
}
|
}
|
||||||
@@ -844,9 +837,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
|
|||||||
let pageserver = PageServerNode::from_env(env);
|
let pageserver = PageServerNode::from_env(env);
|
||||||
|
|
||||||
// Stop all endpoints
|
// Stop all endpoints
|
||||||
match ComputeControlPlane::load(env.clone()) {
|
match ComputeControlPlane::load_endpoints(env) {
|
||||||
Ok(cplane) => {
|
Ok(endpoints) => {
|
||||||
for (_k, node) in cplane.endpoints {
|
for (_k, node) in endpoints {
|
||||||
if let Err(e) = node.stop(false) {
|
if let Err(e) = node.stop(false) {
|
||||||
eprintln!("postgres stop failed: {e:#}");
|
eprintln!("postgres stop failed: {e:#}");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,56 +11,104 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_with::{serde_as, DisplayFromStr};
|
||||||
use utils::{
|
use utils::{
|
||||||
id::{TenantId, TimelineId},
|
id::{TenantId, TimelineId},
|
||||||
lsn::Lsn,
|
lsn::Lsn,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
|
use crate::local_env::LocalEnv;
|
||||||
use crate::pageserver::PageServerNode;
|
use crate::pageserver::PageServerNode;
|
||||||
use crate::postgresql_conf::PostgresConf;
|
use crate::postgresql_conf::PostgresConf;
|
||||||
|
|
||||||
|
// contents of a endpoint.json file
|
||||||
|
#[serde_as]
|
||||||
|
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||||
|
pub struct EndpointConf {
|
||||||
|
name: String,
|
||||||
|
#[serde_as(as = "DisplayFromStr")]
|
||||||
|
tenant_id: TenantId,
|
||||||
|
#[serde_as(as = "DisplayFromStr")]
|
||||||
|
timeline_id: TimelineId,
|
||||||
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
lsn: Option<Lsn>,
|
||||||
|
port: u16,
|
||||||
|
pg_version: u32,
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// ComputeControlPlane
|
// ComputeControlPlane
|
||||||
//
|
//
|
||||||
pub struct ComputeControlPlane {
|
pub struct ComputeControlPlane {
|
||||||
base_port: u16,
|
base_port: u16,
|
||||||
|
|
||||||
// endpoint ID is the key
|
|
||||||
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
|
|
||||||
|
|
||||||
env: LocalEnv,
|
env: LocalEnv,
|
||||||
pageserver: Arc<PageServerNode>,
|
pageserver: Arc<PageServerNode>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ComputeControlPlane {
|
impl ComputeControlPlane {
|
||||||
// Load current endpoints from the endpoints/ subdirectories
|
pub fn new(env: LocalEnv) -> Self {
|
||||||
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
|
|
||||||
let pageserver = Arc::new(PageServerNode::from_env(&env));
|
let pageserver = Arc::new(PageServerNode::from_env(&env));
|
||||||
|
ComputeControlPlane {
|
||||||
|
base_port: 55431,
|
||||||
|
env,
|
||||||
|
pageserver,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load current endpoints from the endpoints/ subdirectories
|
||||||
|
//
|
||||||
|
// endpoint ID is the key in the returned BTreeMap.
|
||||||
|
//
|
||||||
|
// NOTE: This is not concurrency-safe, and can fail if another 'neon_local'
|
||||||
|
// invocation is creating or deleting an endpoint at the same time.
|
||||||
|
pub fn load_endpoints(env: &LocalEnv) -> Result<BTreeMap<String, Arc<Endpoint>>> {
|
||||||
|
let pageserver = Arc::new(PageServerNode::from_env(env));
|
||||||
|
|
||||||
let mut endpoints = BTreeMap::default();
|
let mut endpoints = BTreeMap::default();
|
||||||
for endpoint_dir in fs::read_dir(env.endpoints_path())
|
for endpoint_dir in fs::read_dir(env.endpoints_path())
|
||||||
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
|
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
|
||||||
{
|
{
|
||||||
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
|
let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?;
|
||||||
endpoints.insert(ep.name.clone(), Arc::new(ep));
|
endpoints.insert(ep.name.clone(), Arc::new(ep));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(ComputeControlPlane {
|
Ok(endpoints)
|
||||||
base_port: 55431,
|
|
||||||
endpoints,
|
|
||||||
env,
|
|
||||||
pageserver,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_port(&mut self) -> u16 {
|
// Load an endpoint from the endpoints/ subdirectories
|
||||||
1 + self
|
pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result<Option<Endpoint>> {
|
||||||
.endpoints
|
let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json");
|
||||||
|
if !endpoint_json_path.exists() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the endpoint.json file
|
||||||
|
let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?;
|
||||||
|
|
||||||
|
// ok now
|
||||||
|
let pageserver = Arc::new(PageServerNode::from_env(env));
|
||||||
|
Ok(Some(Endpoint {
|
||||||
|
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
|
||||||
|
name: name.to_string(),
|
||||||
|
env: env.clone(),
|
||||||
|
pageserver,
|
||||||
|
timeline_id: conf.timeline_id,
|
||||||
|
lsn: conf.lsn,
|
||||||
|
tenant_id: conf.tenant_id,
|
||||||
|
pg_version: conf.pg_version,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_port(&self) -> anyhow::Result<u16> {
|
||||||
|
let endpoints = ComputeControlPlane::load_endpoints(&self.env)?;
|
||||||
|
let next_port = 1 + endpoints
|
||||||
.values()
|
.values()
|
||||||
.map(|ep| ep.address.port())
|
.map(|ep| ep.address.port())
|
||||||
.max()
|
.max()
|
||||||
.unwrap_or(self.base_port)
|
.unwrap_or(self.base_port);
|
||||||
|
Ok(next_port)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_endpoint(
|
pub fn new_endpoint(
|
||||||
@@ -72,7 +120,13 @@ impl ComputeControlPlane {
|
|||||||
port: Option<u16>,
|
port: Option<u16>,
|
||||||
pg_version: u32,
|
pg_version: u32,
|
||||||
) -> Result<Arc<Endpoint>> {
|
) -> Result<Arc<Endpoint>> {
|
||||||
let port = port.unwrap_or_else(|| self.get_port());
|
// NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from
|
||||||
|
// two 'neon_local' invocations at the same time, IF the port is specified
|
||||||
|
// explicitly. (get_port() is racy)
|
||||||
|
let port = match port {
|
||||||
|
Some(port) => port,
|
||||||
|
None => self.get_port()?,
|
||||||
|
};
|
||||||
let ep = Arc::new(Endpoint {
|
let ep = Arc::new(Endpoint {
|
||||||
name: name.to_owned(),
|
name: name.to_owned(),
|
||||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||||
@@ -83,12 +137,20 @@ impl ComputeControlPlane {
|
|||||||
tenant_id,
|
tenant_id,
|
||||||
pg_version,
|
pg_version,
|
||||||
});
|
});
|
||||||
|
|
||||||
ep.create_pgdata()?;
|
ep.create_pgdata()?;
|
||||||
|
std::fs::write(
|
||||||
|
ep.endpoint_path().join("endpoint.json"),
|
||||||
|
serde_json::to_string_pretty(&EndpointConf {
|
||||||
|
name: name.to_string(),
|
||||||
|
tenant_id,
|
||||||
|
timeline_id,
|
||||||
|
lsn,
|
||||||
|
port,
|
||||||
|
pg_version,
|
||||||
|
})?,
|
||||||
|
)?;
|
||||||
ep.setup_pg_conf()?;
|
ep.setup_pg_conf()?;
|
||||||
|
|
||||||
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
|
|
||||||
|
|
||||||
Ok(ep)
|
Ok(ep)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -131,42 +193,20 @@ impl Endpoint {
|
|||||||
let fname = entry.file_name();
|
let fname = entry.file_name();
|
||||||
let name = fname.to_str().unwrap().to_string();
|
let name = fname.to_str().unwrap().to_string();
|
||||||
|
|
||||||
// Read config file into memory
|
// Read the endpoint.json file
|
||||||
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
|
let conf: EndpointConf =
|
||||||
let cfg_path_str = cfg_path.to_string_lossy();
|
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
|
||||||
let mut conf_file = File::open(&cfg_path)
|
|
||||||
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
|
|
||||||
let conf = PostgresConf::read(&mut conf_file)
|
|
||||||
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
|
|
||||||
|
|
||||||
// Read a few options from the config file
|
|
||||||
let context = format!("in config file {}", cfg_path_str);
|
|
||||||
let port: u16 = conf.parse_field("port", &context)?;
|
|
||||||
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
|
|
||||||
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
|
|
||||||
|
|
||||||
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
|
|
||||||
// If it doesn't exist, assume broken data directory and use default pg version.
|
|
||||||
let pg_version_path = entry.path().join("PG_VERSION");
|
|
||||||
|
|
||||||
let pg_version_str =
|
|
||||||
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
|
|
||||||
let pg_version = u32::from_str(&pg_version_str)?;
|
|
||||||
|
|
||||||
// parse recovery_target_lsn, if any
|
|
||||||
let recovery_target_lsn: Option<Lsn> =
|
|
||||||
conf.parse_field_optional("recovery_target_lsn", &context)?;
|
|
||||||
|
|
||||||
// ok now
|
// ok now
|
||||||
Ok(Endpoint {
|
Ok(Endpoint {
|
||||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
|
||||||
name,
|
name,
|
||||||
env: env.clone(),
|
env: env.clone(),
|
||||||
pageserver: Arc::clone(pageserver),
|
pageserver: Arc::clone(pageserver),
|
||||||
timeline_id,
|
timeline_id: conf.timeline_id,
|
||||||
lsn: recovery_target_lsn,
|
lsn: conf.lsn,
|
||||||
tenant_id,
|
tenant_id: conf.tenant_id,
|
||||||
pg_version,
|
pg_version: conf.pg_version,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
use anyhow::{bail, ensure, Context};
|
use anyhow::{bail, ensure, Context};
|
||||||
|
|
||||||
|
use file_lock::{FileLock, FileOptions};
|
||||||
use postgres_backend::AuthType;
|
use postgres_backend::AuthType;
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -12,11 +13,14 @@ use serde_with::{serde_as, DisplayFromStr};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
use std::io::Seek;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::process::{Command, Stdio};
|
use std::process::{Command, Stdio};
|
||||||
|
use std::str::FromStr;
|
||||||
use utils::{
|
use utils::{
|
||||||
auth::{encode_from_key_file, Claims},
|
auth::{encode_from_key_file, Claims},
|
||||||
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
|
||||||
@@ -72,14 +76,84 @@ pub struct LocalEnv {
|
|||||||
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub safekeepers: Vec<SafekeeperConf>,
|
pub safekeepers: Vec<SafekeeperConf>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
|
// Keep human-readable aliases in memory (and persist them to
|
||||||
#[serde(default)]
|
// 'branch_name_mappings.json'), to hide ZId hex strings from the user.
|
||||||
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
|
//
|
||||||
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
|
// BranchNameMappingsSerialized corresponds to the actual JSON format of
|
||||||
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
|
// 'branch_name_mappings.json' file. It's a bit more awkward to work with, so we convert
|
||||||
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
|
// it to/from BranchNameMappings when reading/writing the file.
|
||||||
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
|
type BranchNameMappings = HashMap<(TenantId, String), TimelineId>;
|
||||||
|
|
||||||
|
type BranchNameMappingsSerialized = HashMap<String, HashMap<String, String>>;
|
||||||
|
|
||||||
|
pub struct BranchNameMappingsLock {
|
||||||
|
mappings: BranchNameMappings,
|
||||||
|
lock: FileLock,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for BranchNameMappingsLock {
|
||||||
|
type Target = HashMap<(TenantId, String), TimelineId>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.mappings
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl DerefMut for BranchNameMappingsLock {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.mappings
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BranchNameMappingsLock {
|
||||||
|
/// Write the modified branch-name mapppings back to 'branch_name_mappings.json',
|
||||||
|
/// and release the lock.
|
||||||
|
fn write_to_file(mut self) -> anyhow::Result<()> {
|
||||||
|
let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new();
|
||||||
|
for ((tenant_id, branch_name), timeline_id) in self.iter() {
|
||||||
|
serialized_mappings
|
||||||
|
.entry(tenant_id.to_string())
|
||||||
|
.or_default()
|
||||||
|
.insert(branch_name.clone(), timeline_id.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.lock.file.set_len(0)?;
|
||||||
|
self.lock.file.rewind()?;
|
||||||
|
serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the branch-name mappings.
|
||||||
|
///
|
||||||
|
/// This returns a guard object that holds a lock on the branch_name_mappings.json
|
||||||
|
/// file. That makes it safe for two 'neon_local' invocations to read/manipulate
|
||||||
|
/// branch name mappings at the same time.
|
||||||
|
pub fn load_branch_name_mappings() -> anyhow::Result<BranchNameMappingsLock> {
|
||||||
|
let path = base_path().join("branch_name_mappings.json");
|
||||||
|
let lock = FileLock::lock(
|
||||||
|
path,
|
||||||
|
true,
|
||||||
|
FileOptions::new().create(true).read(true).write(true),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut mappings = BranchNameMappings::new();
|
||||||
|
if lock.file.metadata()?.len() > 0 {
|
||||||
|
let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file)
|
||||||
|
.context("Failed to read branch_name_mappings.json")?;
|
||||||
|
|
||||||
|
for (tenant_str, map) in serialized_mappings.iter() {
|
||||||
|
for (branch_name, timeline_str) in map.iter() {
|
||||||
|
mappings.insert(
|
||||||
|
(TenantId::from_str(tenant_str)?, branch_name.to_string()),
|
||||||
|
TimelineId::from_str(timeline_str)?,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(BranchNameMappingsLock { mappings, lock })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Broker config for cluster internal communication.
|
/// Broker config for cluster internal communication.
|
||||||
@@ -215,27 +289,21 @@ impl LocalEnv {
|
|||||||
|
|
||||||
pub fn register_branch_mapping(
|
pub fn register_branch_mapping(
|
||||||
&mut self,
|
&mut self,
|
||||||
branch_name: String,
|
branch_name: &str,
|
||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let existing_values = self
|
let mut mappings = load_branch_name_mappings()?;
|
||||||
.branch_name_mappings
|
|
||||||
.entry(branch_name.clone())
|
|
||||||
.or_default();
|
|
||||||
|
|
||||||
let existing_ids = existing_values
|
if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) {
|
||||||
.iter()
|
|
||||||
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
|
|
||||||
|
|
||||||
if let Some((_, old_timeline_id)) = existing_ids {
|
|
||||||
if old_timeline_id == &timeline_id {
|
if old_timeline_id == &timeline_id {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
|
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
existing_values.push((tenant_id, timeline_id));
|
mappings.insert((tenant_id, branch_name.to_string()), timeline_id);
|
||||||
|
mappings.write_to_file()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -244,24 +312,22 @@ impl LocalEnv {
|
|||||||
&self,
|
&self,
|
||||||
branch_name: &str,
|
branch_name: &str,
|
||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
) -> Option<TimelineId> {
|
) -> anyhow::Result<Option<TimelineId>> {
|
||||||
self.branch_name_mappings
|
let mappings = load_branch_name_mappings()?;
|
||||||
.get(branch_name)?
|
Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied())
|
||||||
.iter()
|
|
||||||
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
|
|
||||||
.map(|&(_, timeline_id)| timeline_id)
|
|
||||||
.map(TimelineId::from)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
|
pub fn timeline_name_mappings(&self) -> anyhow::Result<HashMap<TenantTimelineId, String>> {
|
||||||
self.branch_name_mappings
|
let mappings = load_branch_name_mappings()?;
|
||||||
|
Ok(mappings
|
||||||
.iter()
|
.iter()
|
||||||
.flat_map(|(name, tenant_timelines)| {
|
.map(|((tenant_id, branch_name), timeline_id)| {
|
||||||
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
|
(
|
||||||
(TenantTimelineId::new(tenant_id, timeline_id), name.clone())
|
TenantTimelineId::new(*tenant_id, *timeline_id),
|
||||||
})
|
branch_name.clone(),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a LocalEnv from a config file.
|
/// Create a LocalEnv from a config file.
|
||||||
|
|||||||
@@ -139,6 +139,15 @@ pub static REMOTE_ONDEMAND_DOWNLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub static LOAD_LAYER_MAP_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
|
||||||
|
register_histogram!(
|
||||||
|
"pageserver_load_layer_map_histogram",
|
||||||
|
"Time spent on loadiing layer map",
|
||||||
|
STORAGE_OP_BUCKETS.into(),
|
||||||
|
)
|
||||||
|
.expect("failed to define a metric")
|
||||||
|
});
|
||||||
|
|
||||||
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||||
register_uint_gauge_vec!(
|
register_uint_gauge_vec!(
|
||||||
"pageserver_current_logical_size",
|
"pageserver_current_logical_size",
|
||||||
@@ -552,7 +561,7 @@ impl StorageTimeMetricsTimer {
|
|||||||
pub struct StorageTimeMetrics {
|
pub struct StorageTimeMetrics {
|
||||||
/// Sum of f64 seconds, per operation, tenant_id and timeline_id
|
/// Sum of f64 seconds, per operation, tenant_id and timeline_id
|
||||||
timeline_sum: Counter,
|
timeline_sum: Counter,
|
||||||
/// Number of oeprations, per operation, tenant_id and timeline_id
|
/// Number of operations, per operation, tenant_id and timeline_id
|
||||||
timeline_count: IntCounter,
|
timeline_count: IntCounter,
|
||||||
/// Global histogram having only the "operation" label.
|
/// Global histogram having only the "operation" label.
|
||||||
global_histogram: Histogram,
|
global_histogram: Histogram,
|
||||||
@@ -595,7 +604,6 @@ pub struct TimelineMetrics {
|
|||||||
pub compact_time_histo: StorageTimeMetrics,
|
pub compact_time_histo: StorageTimeMetrics,
|
||||||
pub create_images_time_histo: StorageTimeMetrics,
|
pub create_images_time_histo: StorageTimeMetrics,
|
||||||
pub logical_size_histo: StorageTimeMetrics,
|
pub logical_size_histo: StorageTimeMetrics,
|
||||||
pub load_layer_map_histo: StorageTimeMetrics,
|
|
||||||
pub garbage_collect_histo: StorageTimeMetrics,
|
pub garbage_collect_histo: StorageTimeMetrics,
|
||||||
pub last_record_gauge: IntGauge,
|
pub last_record_gauge: IntGauge,
|
||||||
pub wait_lsn_time_histo: Histogram,
|
pub wait_lsn_time_histo: Histogram,
|
||||||
@@ -627,8 +635,6 @@ impl TimelineMetrics {
|
|||||||
let create_images_time_histo =
|
let create_images_time_histo =
|
||||||
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
|
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
|
||||||
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
|
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
|
||||||
let load_layer_map_histo =
|
|
||||||
StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
|
|
||||||
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
|
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
|
||||||
let last_record_gauge = LAST_RECORD_LSN
|
let last_record_gauge = LAST_RECORD_LSN
|
||||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||||
@@ -664,7 +670,6 @@ impl TimelineMetrics {
|
|||||||
create_images_time_histo,
|
create_images_time_histo,
|
||||||
logical_size_histo,
|
logical_size_histo,
|
||||||
garbage_collect_histo,
|
garbage_collect_histo,
|
||||||
load_layer_map_histo,
|
|
||||||
last_record_gauge,
|
last_record_gauge,
|
||||||
wait_lsn_time_histo,
|
wait_lsn_time_histo,
|
||||||
resident_physical_size_gauge,
|
resident_physical_size_gauge,
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ use crate::tenant::{
|
|||||||
|
|
||||||
use crate::config::PageServerConf;
|
use crate::config::PageServerConf;
|
||||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||||
use crate::metrics::TimelineMetrics;
|
use crate::metrics::{TimelineMetrics, LOAD_LAYER_MAP_HISTOGRAM};
|
||||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||||
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||||
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
|
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
|
||||||
@@ -1444,7 +1444,7 @@ impl Timeline {
|
|||||||
let mut updates = layers.batch_update();
|
let mut updates = layers.batch_update();
|
||||||
let mut num_layers = 0;
|
let mut num_layers = 0;
|
||||||
|
|
||||||
let timer = self.metrics.load_layer_map_histo.start_timer();
|
let timer = LOAD_LAYER_MAP_HISTOGRAM.start_timer();
|
||||||
|
|
||||||
// Scan timeline directory and create ImageFileName and DeltaFilename
|
// Scan timeline directory and create ImageFileName and DeltaFilename
|
||||||
// structs representing all files on disk
|
// structs representing all files on disk
|
||||||
|
|||||||
@@ -1,7 +1,10 @@
|
|||||||
|
import threading
|
||||||
import timeit
|
import timeit
|
||||||
|
from threading import BoundedSemaphore
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from fixtures.benchmark_fixture import MetricReport
|
from fixtures.benchmark_fixture import MetricReport
|
||||||
|
from fixtures.compare_fixtures import NeonCompare
|
||||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
|
|
||||||
# Run bulk tenant creation test.
|
# Run bulk tenant creation test.
|
||||||
@@ -50,3 +53,57 @@ def test_bulk_tenant_create(
|
|||||||
"s",
|
"s",
|
||||||
report=MetricReport.LOWER_IS_BETTER,
|
report=MetricReport.LOWER_IS_BETTER,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("tenants_count", [50])
|
||||||
|
def test_parallel_tenant_create(
|
||||||
|
neon_compare: NeonCompare,
|
||||||
|
tenants_count: int,
|
||||||
|
):
|
||||||
|
"""Create lots of tenants in parallel
|
||||||
|
|
||||||
|
One important thing that this measures is the amount of prometheus
|
||||||
|
metrics per tenant. The pageserver exposes a lot of metrics for
|
||||||
|
each timeline, and this test gives some visibility to how much
|
||||||
|
exactly. (We've had to raise the prometheus scraper's limit on
|
||||||
|
the max metrics size several times, because we expose so many.)
|
||||||
|
"""
|
||||||
|
env = neon_compare.env
|
||||||
|
zenbenchmark = neon_compare.zenbenchmark
|
||||||
|
|
||||||
|
max_concurrent = 5
|
||||||
|
pool_sema = BoundedSemaphore(value=max_concurrent)
|
||||||
|
|
||||||
|
def worker(i: int):
|
||||||
|
with pool_sema:
|
||||||
|
tenant, timeline_id = env.neon_cli.create_tenant()
|
||||||
|
|
||||||
|
endpoint_tenant = env.endpoints.create_start("main", tenant_id=tenant)
|
||||||
|
|
||||||
|
with endpoint_tenant.cursor() as cur:
|
||||||
|
cur.execute("select count(*) from pg_class")
|
||||||
|
endpoint_tenant.stop()
|
||||||
|
|
||||||
|
threads = [threading.Thread(target=worker, args=(i,)) for i in range(tenants_count)]
|
||||||
|
start = timeit.default_timer()
|
||||||
|
for thread in threads:
|
||||||
|
thread.start()
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
end = timeit.default_timer()
|
||||||
|
|
||||||
|
zenbenchmark.record(
|
||||||
|
"tenant_creation_time",
|
||||||
|
end - start,
|
||||||
|
"s",
|
||||||
|
report=MetricReport.LOWER_IS_BETTER,
|
||||||
|
)
|
||||||
|
|
||||||
|
metrics = env.pageserver.http_client().get_metrics_str()
|
||||||
|
zenbenchmark.record(
|
||||||
|
"prometheus_metrics_size",
|
||||||
|
len(metrics) / tenants_count,
|
||||||
|
"bytes",
|
||||||
|
report=MetricReport.LOWER_IS_BETTER,
|
||||||
|
)
|
||||||
|
|||||||
@@ -16,7 +16,8 @@ from fixtures.neon_fixtures import (
|
|||||||
)
|
)
|
||||||
from fixtures.pageserver.http import PageserverHttpClient
|
from fixtures.pageserver.http import PageserverHttpClient
|
||||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||||
from fixtures.types import Lsn
|
from fixtures.types import Lsn, TenantId, TimelineId
|
||||||
|
from fixtures.utils import query_scalar
|
||||||
from pytest import FixtureRequest
|
from pytest import FixtureRequest
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -58,6 +59,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
|
|||||||
env = neon_env_builder.init_start()
|
env = neon_env_builder.init_start()
|
||||||
endpoint = env.endpoints.create_start("main")
|
endpoint = env.endpoints.create_start("main")
|
||||||
|
|
||||||
|
with endpoint.cursor() as cur:
|
||||||
|
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
|
||||||
|
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
|
||||||
|
|
||||||
# FIXME: Is this expected?
|
# FIXME: Is this expected?
|
||||||
env.pageserver.allowed_errors.append(
|
env.pageserver.allowed_errors.append(
|
||||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||||
@@ -69,10 +74,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
|
|||||||
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
|
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||||
)
|
)
|
||||||
|
|
||||||
snapshot_config = toml.load(test_output_dir / "repo" / "config")
|
|
||||||
tenant_id = snapshot_config["default_tenant_id"]
|
|
||||||
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
|
|
||||||
|
|
||||||
pageserver_http = env.pageserver.http_client()
|
pageserver_http = env.pageserver.http_client()
|
||||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||||
|
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ metric_kinds_checked = set([])
|
|||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# verify that metrics look minilally sane
|
# verify that metrics look minimally sane
|
||||||
#
|
#
|
||||||
def metrics_handler(request: Request) -> Response:
|
def metrics_handler(request: Request) -> Response:
|
||||||
if request.json is None:
|
if request.json is None:
|
||||||
|
|||||||
Reference in New Issue
Block a user