Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-20 03:42:55 +00:00)

Compare commits: rustls...reduce-met (4 commits)
Commits:

- c0a71bc334
- 2cdb5503b0
- 74d3cdeaf0
- 73b369531d
Cargo.lock (generated, 11 changed lines)
@@ -1105,6 +1105,7 @@ dependencies = [
  "anyhow",
  "clap 4.2.2",
  "comfy-table",
+ "file-lock",
  "git-version",
  "nix",
  "once_cell",
@@ -1550,6 +1551,16 @@ dependencies = [
  "instant",
 ]
 
+[[package]]
+name = "file-lock"
+version = "2.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "filetime"
 version = "0.2.21"
@@ -42,6 +42,7 @@ either = "1.8"
 enum-map = "2.4.2"
 enumset = "1.0.12"
 fail = "0.5.0"
+file-lock = "2.1.9"
 fs2 = "0.4.3"
 futures = "0.3"
 futures-core = "0.3"
@@ -8,6 +8,7 @@ license.workspace = true
 anyhow.workspace = true
 clap.workspace = true
 comfy-table.workspace = true
+file-lock.workspace = true
 git-version.workspace = true
 nix.workspace = true
 once_cell.workspace = true
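The `file-lock` crate pulled in above is what the `local_env.rs` changes further down use to serialize concurrent `neon_local` invocations around `branch_name_mappings.json`. A minimal sketch of that call pattern, assuming only the `FileLock::lock` / `FileOptions` API that appears later in this diff (the path here is a made-up example, not anything from the repo):

```rust
use std::io::Write;

use file_lock::{FileLock, FileOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let options = FileOptions::new().create(true).read(true).write(true);

    // `true` = block until the exclusive lock can be acquired; the lock is
    // released again when `lock` is dropped.
    let mut lock = FileLock::lock("/tmp/neon-example.lock", true, options)?;

    // The locked file is exposed as a plain std::fs::File.
    lock.file.write_all(b"exclusive access\n")?;
    Ok(())
}
```

In `local_env.rs` below, the same three-argument call guards `branch_name_mappings.json`, and the `file` field is what `write_to_file()` truncates and rewrites.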
@@ -365,11 +365,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
             let new_timeline_id = timeline_info.timeline_id;
             let last_record_lsn = timeline_info.last_record_lsn;
 
-            env.register_branch_mapping(
-                DEFAULT_BRANCH_NAME.to_string(),
-                new_tenant_id,
-                new_timeline_id,
-            )?;
+            env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?;
 
             println!(
                 "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
@@ -411,7 +407,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
         Some(("list", list_match)) => {
             let tenant_id = get_tenant_id(list_match, env)?;
             let timelines = pageserver.timeline_list(&tenant_id)?;
-            print_timelines_tree(timelines, env.timeline_name_mappings())?;
+            print_timelines_tree(timelines, env.timeline_name_mappings()?)?;
         }
         Some(("create", create_match)) => {
             let tenant_id = get_tenant_id(create_match, env)?;
@@ -429,7 +425,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
             let new_timeline_id = timeline_info.timeline_id;
 
             let last_record_lsn = timeline_info.last_record_lsn;
-            env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
+            env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
 
             println!(
                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
@@ -468,10 +464,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                 .copied()
                 .context("Failed to parse postgres version from the argument string")?;
 
-            let mut cplane = ComputeControlPlane::load(env.clone())?;
+            let mut cplane = ComputeControlPlane::new(env.clone());
             println!("Importing timeline into pageserver ...");
             pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
-            env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
+            env.register_branch_mapping(name, tenant_id, timeline_id)?;
 
             println!("Creating endpoint for imported timeline ...");
             cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
@@ -487,7 +483,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                 .map(|s| s.as_str())
                 .unwrap_or(DEFAULT_BRANCH_NAME);
             let ancestor_timeline_id = env
-                .get_branch_timeline_id(ancestor_branch_name, tenant_id)
+                .get_branch_timeline_id(ancestor_branch_name, tenant_id)?
                 .ok_or_else(|| {
                     anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
                 })?;
@@ -508,7 +504,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
 
             let last_record_lsn = timeline_info.last_record_lsn;
 
-            env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
+            env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
 
             println!(
                 "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
@@ -528,7 +524,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
         None => bail!("no endpoint subcommand provided"),
     };
 
-    let mut cplane = ComputeControlPlane::load(env.clone())?;
+    let mut cplane = ComputeControlPlane::new(env.clone());
 
     // All subcommands take an optional --tenant-id option
    let tenant_id = get_tenant_id(sub_args, env)?;
@@ -540,7 +536,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 HashMap::new()
             });
 
-            let timeline_name_mappings = env.timeline_name_mappings();
+            let timeline_name_mappings = env.timeline_name_mappings()?;
 
             let mut table = comfy_table::Table::new();
 
@@ -555,8 +551,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 "STATUS",
             ]);
 
-            for (endpoint_id, endpoint) in cplane
-                .endpoints
+            for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)?
                 .iter()
                 .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
             {
@@ -609,7 +604,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .transpose()
                 .context("Failed to parse Lsn from the request")?;
             let timeline_id = env
-                .get_branch_timeline_id(branch_name, tenant_id)
+                .get_branch_timeline_id(branch_name, tenant_id)?
                 .ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
 
             let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -627,7 +622,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .get_one::<String>("endpoint_id")
                 .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
 
-            let endpoint = cplane.endpoints.get(endpoint_id.as_str());
+            let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?;
 
             let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
                 let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -646,7 +641,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .map(|s| s.as_str())
                 .unwrap_or(DEFAULT_BRANCH_NAME);
             let timeline_id = env
-                .get_branch_timeline_id(branch_name, tenant_id)
+                .get_branch_timeline_id(branch_name, tenant_id)?
                 .ok_or_else(|| {
                     anyhow!("Found no timeline id for branch name '{branch_name}'")
                 })?;
@@ -683,9 +678,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
             let destroy = sub_args.get_flag("destroy");
 
-            let endpoint = cplane
-                .endpoints
-                .get(endpoint_id.as_str())
+            let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?
                 .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
             endpoint.stop(destroy)?;
         }
@@ -844,9 +837,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
     let pageserver = PageServerNode::from_env(env);
 
     // Stop all endpoints
-    match ComputeControlPlane::load(env.clone()) {
-        Ok(cplane) => {
-            for (_k, node) in cplane.endpoints {
+    match ComputeControlPlane::load_endpoints(env) {
+        Ok(endpoints) => {
+            for (_k, node) in endpoints {
                 if let Err(e) = node.stop(false) {
                     eprintln!("postgres stop failed: {e:#}");
                 }
@@ -11,56 +11,104 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{Context, Result};
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use utils::{
     id::{TenantId, TimelineId},
     lsn::Lsn,
 };
 
-use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
+use crate::local_env::LocalEnv;
 use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
 
+// contents of a endpoint.json file
+#[serde_as]
+#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
+pub struct EndpointConf {
+    name: String,
+    #[serde_as(as = "DisplayFromStr")]
+    tenant_id: TenantId,
+    #[serde_as(as = "DisplayFromStr")]
+    timeline_id: TimelineId,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    lsn: Option<Lsn>,
+    port: u16,
+    pg_version: u32,
+}
+
 //
 // ComputeControlPlane
 //
 pub struct ComputeControlPlane {
     base_port: u16,
 
-    // endpoint ID is the key
-    pub endpoints: BTreeMap<String, Arc<Endpoint>>,
-
     env: LocalEnv,
     pageserver: Arc<PageServerNode>,
 }
 
 impl ComputeControlPlane {
-    // Load current endpoints from the endpoints/ subdirectories
-    pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
+    pub fn new(env: LocalEnv) -> Self {
         let pageserver = Arc::new(PageServerNode::from_env(&env));
+        ComputeControlPlane {
+            base_port: 55431,
+            env,
+            pageserver,
+        }
+    }
+
+    // Load current endpoints from the endpoints/ subdirectories
+    //
+    // endpoint ID is the key in the returned BTreeMap.
+    //
+    // NOTE: This is not concurrency-safe, and can fail if another 'neon_local'
+    // invocation is creating or deleting an endpoint at the same time.
+    pub fn load_endpoints(env: &LocalEnv) -> Result<BTreeMap<String, Arc<Endpoint>>> {
+        let pageserver = Arc::new(PageServerNode::from_env(env));
 
         let mut endpoints = BTreeMap::default();
         for endpoint_dir in fs::read_dir(env.endpoints_path())
             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
         {
-            let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
+            let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?;
             endpoints.insert(ep.name.clone(), Arc::new(ep));
         }
 
-        Ok(ComputeControlPlane {
-            base_port: 55431,
-            endpoints,
-            env,
-            pageserver,
-        })
+        Ok(endpoints)
     }
 
-    fn get_port(&mut self) -> u16 {
-        1 + self
-            .endpoints
+    // Load an endpoint from the endpoints/ subdirectories
+    pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result<Option<Endpoint>> {
+        let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json");
+        if !endpoint_json_path.exists() {
+            return Ok(None);
+        }
+
+        // Read the endpoint.json file
+        let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?;
+
+        // ok now
+        let pageserver = Arc::new(PageServerNode::from_env(env));
+        Ok(Some(Endpoint {
+            address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
+            name: name.to_string(),
+            env: env.clone(),
+            pageserver,
+            timeline_id: conf.timeline_id,
+            lsn: conf.lsn,
+            tenant_id: conf.tenant_id,
+            pg_version: conf.pg_version,
+        }))
+    }
+
+    fn get_port(&self) -> anyhow::Result<u16> {
+        let endpoints = ComputeControlPlane::load_endpoints(&self.env)?;
+        let next_port = 1 + endpoints
             .values()
             .map(|ep| ep.address.port())
             .max()
-            .unwrap_or(self.base_port)
+            .unwrap_or(self.base_port);
+        Ok(next_port)
     }
 
     pub fn new_endpoint(
@@ -72,7 +120,13 @@ impl ComputeControlPlane {
         port: Option<u16>,
         pg_version: u32,
     ) -> Result<Arc<Endpoint>> {
-        let port = port.unwrap_or_else(|| self.get_port());
+        // NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from
+        // two 'neon_local' invocations at the same time, IF the port is specified
+        // explicitly. (get_port() is racy)
+        let port = match port {
+            Some(port) => port,
+            None => self.get_port()?,
+        };
         let ep = Arc::new(Endpoint {
             name: name.to_owned(),
             address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -83,12 +137,20 @@
             tenant_id,
             pg_version,
         });
 
         ep.create_pgdata()?;
+        std::fs::write(
+            ep.endpoint_path().join("endpoint.json"),
+            serde_json::to_string_pretty(&EndpointConf {
+                name: name.to_string(),
+                tenant_id,
+                timeline_id,
+                lsn,
+                port,
+                pg_version,
+            })?,
+        )?;
         ep.setup_pg_conf()?;
 
-        self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
-
         Ok(ep)
     }
 }
@@ -131,42 +193,20 @@ impl Endpoint {
         let fname = entry.file_name();
         let name = fname.to_str().unwrap().to_string();
 
-        // Read config file into memory
-        let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
-        let cfg_path_str = cfg_path.to_string_lossy();
-        let mut conf_file = File::open(&cfg_path)
-            .with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
-        let conf = PostgresConf::read(&mut conf_file)
-            .with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
-
-        // Read a few options from the config file
-        let context = format!("in config file {}", cfg_path_str);
-        let port: u16 = conf.parse_field("port", &context)?;
-        let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
-        let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
-
-        // Read postgres version from PG_VERSION file to determine which postgres version binary to use.
-        // If it doesn't exist, assume broken data directory and use default pg version.
-        let pg_version_path = entry.path().join("PG_VERSION");
-
-        let pg_version_str =
-            fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
-        let pg_version = u32::from_str(&pg_version_str)?;
-
-        // parse recovery_target_lsn, if any
-        let recovery_target_lsn: Option<Lsn> =
-            conf.parse_field_optional("recovery_target_lsn", &context)?;
+        // Read the endpoint.json file
+        let conf: EndpointConf =
+            serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
 
         // ok now
         Ok(Endpoint {
-            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
+            address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
            name,
            env: env.clone(),
             pageserver: Arc::clone(pageserver),
-            timeline_id,
-            lsn: recovery_target_lsn,
-            tenant_id,
-            pg_version,
+            timeline_id: conf.timeline_id,
+            lsn: conf.lsn,
+            tenant_id: conf.tenant_id,
+            pg_version: conf.pg_version,
         })
     }
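Because `EndpointConf` derives `Serialize`/`Deserialize` and maps `TenantId`/`TimelineId`/`Lsn` through `DisplayFromStr`, each `endpoint.json` is a small flat JSON object with the IDs written as strings. A self-contained sketch of that round trip, using plain `String` fields as stand-ins for the ID types and invented values (this is an illustration, not the pageserver's code):

```rust
use serde::{Deserialize, Serialize};

// Stand-in for EndpointConf above: the real struct stores TenantId/TimelineId/Lsn
// and serializes them as display strings via serde_with, which the plain String
// fields model here.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct EndpointConfSketch {
    name: String,
    tenant_id: String,
    timeline_id: String,
    lsn: Option<String>,
    port: u16,
    pg_version: u32,
}

fn main() -> serde_json::Result<()> {
    let conf = EndpointConfSketch {
        name: "main".to_string(),
        // Hex IDs below are invented placeholders.
        tenant_id: "de200bd42b49cc1814412c7e592dd6e9".to_string(),
        timeline_id: "9e4b3f6ec3ad49e0a0a4c6b8a0e2f3c1".to_string(),
        lsn: None,
        port: 55432,
        pg_version: 15,
    };

    // new_endpoint() writes this file next to the endpoint's pgdata ...
    let json = serde_json::to_string_pretty(&conf)?;
    // ... and load_endpoint()/from_dir_entry() parse it back.
    let roundtrip: EndpointConfSketch = serde_json::from_str(&json)?;
    assert_eq!(conf, roundtrip);
    println!("{json}");
    Ok(())
}
```

This is also why `from_dir_entry` no longer needs to parse `postgresql.conf` or `PG_VERSION`: everything it needs is captured in `endpoint.json` at creation time.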
@@ -5,6 +5,7 @@
 
 use anyhow::{bail, ensure, Context};
 
+use file_lock::{FileLock, FileOptions};
 use postgres_backend::AuthType;
 use reqwest::Url;
 use serde::{Deserialize, Serialize};
@@ -12,11 +13,14 @@ use serde_with::{serde_as, DisplayFromStr};
 use std::collections::HashMap;
 use std::env;
 use std::fs;
+use std::io::Seek;
 use std::net::IpAddr;
 use std::net::Ipv4Addr;
+use std::net::SocketAddr;
+use std::ops::{Deref, DerefMut};
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::str::FromStr;
 use utils::{
     auth::{encode_from_key_file, Claims},
     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -72,14 +76,84 @@ pub struct LocalEnv {
 
     #[serde(default)]
     pub safekeepers: Vec<SafekeeperConf>,
 }
 
-/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
-#[serde(default)]
-// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
-// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
-// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
-#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
-branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
+// Keep human-readable aliases in memory (and persist them to
+// 'branch_name_mappings.json'), to hide ZId hex strings from the user.
+//
+// BranchNameMappingsSerialized corresponds to the actual JSON format of
+// 'branch_name_mappings.json' file. It's a bit more awkward to work with, so we convert
+// it to/from BranchNameMappings when reading/writing the file.
+type BranchNameMappings = HashMap<(TenantId, String), TimelineId>;
+
+type BranchNameMappingsSerialized = HashMap<String, HashMap<String, String>>;
+
+pub struct BranchNameMappingsLock {
+    mappings: BranchNameMappings,
+    lock: FileLock,
+}
+
+impl Deref for BranchNameMappingsLock {
+    type Target = HashMap<(TenantId, String), TimelineId>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.mappings
+    }
+}
+impl DerefMut for BranchNameMappingsLock {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.mappings
+    }
+}
+
+impl BranchNameMappingsLock {
+    /// Write the modified branch-name mapppings back to 'branch_name_mappings.json',
+    /// and release the lock.
+    fn write_to_file(mut self) -> anyhow::Result<()> {
+        let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new();
+        for ((tenant_id, branch_name), timeline_id) in self.iter() {
+            serialized_mappings
+                .entry(tenant_id.to_string())
+                .or_default()
+                .insert(branch_name.clone(), timeline_id.to_string());
+        }
+
+        self.lock.file.set_len(0)?;
+        self.lock.file.rewind()?;
+        serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?;
+        Ok(())
+    }
+}
+
+/// Get the branch-name mappings.
+///
+/// This returns a guard object that holds a lock on the branch_name_mappings.json
+/// file. That makes it safe for two 'neon_local' invocations to read/manipulate
+/// branch name mappings at the same time.
+pub fn load_branch_name_mappings() -> anyhow::Result<BranchNameMappingsLock> {
+    let path = base_path().join("branch_name_mappings.json");
+    let lock = FileLock::lock(
+        path,
+        true,
+        FileOptions::new().create(true).read(true).write(true),
+    )?;
+
+    let mut mappings = BranchNameMappings::new();
+    if lock.file.metadata()?.len() > 0 {
+        let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file)
+            .context("Failed to read branch_name_mappings.json")?;
+
+        for (tenant_str, map) in serialized_mappings.iter() {
+            for (branch_name, timeline_str) in map.iter() {
+                mappings.insert(
+                    (TenantId::from_str(tenant_str)?, branch_name.to_string()),
+                    TimelineId::from_str(timeline_str)?,
+                );
+            }
+        }
+    }
+
+    Ok(BranchNameMappingsLock { mappings, lock })
+}
 
 /// Broker config for cluster internal communication.
@@ -215,27 +289,21 @@ impl LocalEnv {
 
     pub fn register_branch_mapping(
         &mut self,
-        branch_name: String,
+        branch_name: &str,
         tenant_id: TenantId,
         timeline_id: TimelineId,
     ) -> anyhow::Result<()> {
-        let existing_values = self
-            .branch_name_mappings
-            .entry(branch_name.clone())
-            .or_default();
+        let mut mappings = load_branch_name_mappings()?;
 
-        let existing_ids = existing_values
-            .iter()
-            .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
 
-        if let Some((_, old_timeline_id)) = existing_ids {
+        if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) {
             if old_timeline_id == &timeline_id {
                 Ok(())
             } else {
                 bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
             }
         } else {
-            existing_values.push((tenant_id, timeline_id));
+            mappings.insert((tenant_id, branch_name.to_string()), timeline_id);
+            mappings.write_to_file()?;
             Ok(())
         }
     }
@@ -244,24 +312,22 @@
         &self,
         branch_name: &str,
         tenant_id: TenantId,
-    ) -> Option<TimelineId> {
-        self.branch_name_mappings
-            .get(branch_name)?
-            .iter()
-            .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
-            .map(|&(_, timeline_id)| timeline_id)
-            .map(TimelineId::from)
+    ) -> anyhow::Result<Option<TimelineId>> {
+        let mappings = load_branch_name_mappings()?;
+        Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied())
     }
 
-    pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
-        self.branch_name_mappings
+    pub fn timeline_name_mappings(&self) -> anyhow::Result<HashMap<TenantTimelineId, String>> {
+        let mappings = load_branch_name_mappings()?;
+        Ok(mappings
             .iter()
-            .flat_map(|(name, tenant_timelines)| {
-                tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
-                    (TenantTimelineId::new(tenant_id, timeline_id), name.clone())
-                })
+            .map(|((tenant_id, branch_name), timeline_id)| {
+                (
+                    TenantTimelineId::new(*tenant_id, *timeline_id),
+                    branch_name.clone(),
+                )
+            })
-            .collect()
+            .collect())
     }
 
     /// Create a LocalEnv from a config file.
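The net effect is a load → mutate → `write_to_file()` pattern under the file lock, with an on-disk layout of tenant ID → { branch name → timeline ID }, all as strings. A small sketch of that shape, assuming invented IDs and using `String` in place of the real `TenantId`/`TimelineId` types:

```rust
use std::collections::HashMap;

// Mirrors the two representations above: the in-memory map keyed by
// (tenant, branch), and the nested string map stored in branch_name_mappings.json.
type Mappings = HashMap<(String, String), String>; // (tenant_id, branch) -> timeline_id
type Serialized = HashMap<String, HashMap<String, String>>; // tenant_id -> branch -> timeline_id

fn to_serialized(mappings: &Mappings) -> Serialized {
    let mut out = Serialized::new();
    for ((tenant_id, branch_name), timeline_id) in mappings {
        out.entry(tenant_id.clone())
            .or_default()
            .insert(branch_name.clone(), timeline_id.clone());
    }
    out
}

fn main() -> serde_json::Result<()> {
    let mut mappings = Mappings::new();
    // Invented hex IDs, for illustration only.
    mappings.insert(
        ("de200bd42b49cc1814412c7e592dd6e9".into(), "main".into()),
        "9e4b3f6ec3ad49e0a0a4c6b8a0e2f3c1".into(),
    );

    // write_to_file() truncates the locked file and writes JSON like:
    // { "de200bd4...": { "main": "9e4b3f6e..." } }
    println!("{}", serde_json::to_string_pretty(&to_serialized(&mappings))?);
    Ok(())
}
```

Note that this also explains the `test_create_snapshot` change further down: the mapping no longer lives in the repo `config` file, so the test reads the IDs via `SHOW neon.tenant_id` / `SHOW neon.timeline_id` instead.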
@@ -139,6 +139,15 @@ pub static REMOTE_ONDEMAND_DOWNLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
     .unwrap()
 });
 
+pub static LOAD_LAYER_MAP_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "pageserver_load_layer_map_histogram",
+        "Time spent on loadiing layer map",
+        STORAGE_OP_BUCKETS.into(),
+    )
+    .expect("failed to define a metric")
+});
+
 static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     register_uint_gauge_vec!(
         "pageserver_current_logical_size",
@@ -552,7 +561,7 @@ impl StorageTimeMetricsTimer {
 pub struct StorageTimeMetrics {
     /// Sum of f64 seconds, per operation, tenant_id and timeline_id
     timeline_sum: Counter,
-    /// Number of oeprations, per operation, tenant_id and timeline_id
+    /// Number of operations, per operation, tenant_id and timeline_id
     timeline_count: IntCounter,
     /// Global histogram having only the "operation" label.
     global_histogram: Histogram,
@@ -595,7 +604,6 @@ pub struct TimelineMetrics {
     pub compact_time_histo: StorageTimeMetrics,
     pub create_images_time_histo: StorageTimeMetrics,
     pub logical_size_histo: StorageTimeMetrics,
-    pub load_layer_map_histo: StorageTimeMetrics,
     pub garbage_collect_histo: StorageTimeMetrics,
     pub last_record_gauge: IntGauge,
     pub wait_lsn_time_histo: Histogram,
@@ -627,8 +635,6 @@ impl TimelineMetrics {
         let create_images_time_histo =
             StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
         let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
-        let load_layer_map_histo =
-            StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
         let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
         let last_record_gauge = LAST_RECORD_LSN
             .get_metric_with_label_values(&[&tenant_id, &timeline_id])
@@ -664,7 +670,6 @@ impl TimelineMetrics {
             create_images_time_histo,
             logical_size_histo,
             garbage_collect_histo,
-            load_layer_map_histo,
             last_record_gauge,
             wait_lsn_time_histo,
             resident_physical_size_gauge,
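The reason for dropping the per-timeline `load_layer_map_histo` in favour of one global `LOAD_LAYER_MAP_HISTOGRAM` is metric cardinality: a labelled histogram family emits a full set of series per (operation, tenant_id, timeline_id) combination, while a global histogram stays constant no matter how many tenants a pageserver hosts. A rough sketch of the difference using the stock `prometheus` crate (the repo's own `metrics` wrapper macros are similar, but this is not the exact pageserver code and the metric names here are made up):

```rust
use prometheus::{register_histogram, register_histogram_vec, Histogram, HistogramVec};

fn main() {
    // Per-timeline style: every (operation, tenant_id, timeline_id) combination
    // becomes its own set of histogram series, so cardinality grows with tenants.
    let per_timeline: HistogramVec = register_histogram_vec!(
        "demo_storage_operations_seconds",
        "Time spent on storage operations",
        &["operation", "tenant_id", "timeline_id"]
    )
    .unwrap();
    per_timeline
        .with_label_values(&["load layer map", "tenant-a", "timeline-1"])
        .observe(0.01);

    // Global style (what the diff switches load-layer-map timing to):
    // one histogram regardless of tenant/timeline count.
    let global: Histogram = register_histogram!(
        "demo_load_layer_map_seconds",
        "Time spent on loading layer map"
    )
    .unwrap();
    let timer = global.start_timer();
    // ... do the work being measured ...
    timer.observe_duration();
}
```

The `timeline.rs` hunk below is the caller side of this change: the timer now comes from the global histogram instead of `self.metrics`.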
@@ -48,7 +48,7 @@ use crate::tenant::{
 
 use crate::config::PageServerConf;
 use crate::keyspace::{KeyPartitioning, KeySpace};
-use crate::metrics::TimelineMetrics;
+use crate::metrics::{TimelineMetrics, LOAD_LAYER_MAP_HISTOGRAM};
 use crate::pgdatadir_mapping::LsnForTimestamp;
 use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
 use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -1444,7 +1444,7 @@ impl Timeline {
         let mut updates = layers.batch_update();
         let mut num_layers = 0;
 
-        let timer = self.metrics.load_layer_map_histo.start_timer();
+        let timer = LOAD_LAYER_MAP_HISTOGRAM.start_timer();
 
         // Scan timeline directory and create ImageFileName and DeltaFilename
         // structs representing all files on disk
@@ -1,7 +1,10 @@
+import threading
+import timeit
+from threading import BoundedSemaphore
 
 import pytest
 from fixtures.benchmark_fixture import MetricReport
 from fixtures.compare_fixtures import NeonCompare
 from fixtures.neon_fixtures import NeonEnvBuilder
 
 # Run bulk tenant creation test.
@@ -50,3 +53,57 @@ def test_bulk_tenant_create(
         "s",
         report=MetricReport.LOWER_IS_BETTER,
     )
+
+
+@pytest.mark.parametrize("tenants_count", [50])
+def test_parallel_tenant_create(
+    neon_compare: NeonCompare,
+    tenants_count: int,
+):
+    """Create lots of tenants in parallel
+
+    One important thing that this measures is the amount of prometheus
+    metrics per tenant. The pageserver exposes a lot of metrics for
+    each timeline, and this test gives some visibility to how much
+    exactly. (We've had to raise the prometheus scraper's limit on
+    the max metrics size several times, because we expose so many.)
+    """
+    env = neon_compare.env
+    zenbenchmark = neon_compare.zenbenchmark
+
+    max_concurrent = 5
+    pool_sema = BoundedSemaphore(value=max_concurrent)
+
+    def worker(i: int):
+        with pool_sema:
+            tenant, timeline_id = env.neon_cli.create_tenant()
+
+            endpoint_tenant = env.endpoints.create_start("main", tenant_id=tenant)
+
+            with endpoint_tenant.cursor() as cur:
+                cur.execute("select count(*) from pg_class")
+            endpoint_tenant.stop()
+
+    threads = [threading.Thread(target=worker, args=(i,)) for i in range(tenants_count)]
+    start = timeit.default_timer()
+    for thread in threads:
+        thread.start()
+    for thread in threads:
+        thread.join()
+
+    end = timeit.default_timer()
+
+    zenbenchmark.record(
+        "tenant_creation_time",
+        end - start,
+        "s",
+        report=MetricReport.LOWER_IS_BETTER,
+    )
+
+    metrics = env.pageserver.http_client().get_metrics_str()
+    zenbenchmark.record(
+        "prometheus_metrics_size",
+        len(metrics) / tenants_count,
+        "bytes",
+        report=MetricReport.LOWER_IS_BETTER,
+    )
@@ -16,7 +16,8 @@ from fixtures.neon_fixtures import (
 )
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
-from fixtures.types import Lsn
+from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import query_scalar
 from pytest import FixtureRequest
 
 #
@@ -58,6 +59,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
     env = neon_env_builder.init_start()
     endpoint = env.endpoints.create_start("main")
 
+    with endpoint.cursor() as cur:
+        tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
+        timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
+
     # FIXME: Is this expected?
     env.pageserver.allowed_errors.append(
         ".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
@@ -69,10 +74,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
         ["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
     )
 
-    snapshot_config = toml.load(test_output_dir / "repo" / "config")
-    tenant_id = snapshot_config["default_tenant_id"]
-    timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
-
     pageserver_http = env.pageserver.http_client()
     lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
@@ -51,7 +51,7 @@ metric_kinds_checked = set([])
 
 
 #
-# verify that metrics look minilally sane
+# verify that metrics look minimally sane
 #
 def metrics_handler(request: Request) -> Response:
     if request.json is None: