Compare commits


4 Commits

Author SHA1 Message Date
Heikki Linnakangas
c0a71bc334 Don't collect histogram of load_layer_map operations per timeline.
The layer map is loaded only once, when the tenant is attached. We don't
need that level of detail anyway: if one tenant is particularly slow
at loading the layer map, we can probably pinpoint which one it is by
looking at the logs.
2023-04-24 11:26:01 +03:00
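
The metrics diff further down replaces the per-timeline "load layer map" histogram with a single process-global one. A minimal sketch of that pattern, assuming the same once_cell + prometheus machinery the pageserver already uses; the metric name matches the diff, but the bucket values below merely stand in for the real STORAGE_OP_BUCKETS:

use once_cell::sync::Lazy;
use prometheus::{register_histogram, Histogram};

// One global histogram for the whole process, instead of one per timeline.
static LOAD_LAYER_MAP_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "pageserver_load_layer_map_histogram",
        "Time spent on loading layer map",
        vec![0.001, 0.01, 0.1, 1.0, 10.0], // placeholder buckets
    )
    .expect("failed to define a metric")
});

fn load_layer_map() {
    // The timer records the elapsed time into the histogram when it is dropped.
    let _timer = LOAD_LAYER_MAP_HISTOGRAM.start_timer();
    // ... scan the timeline directory and rebuild the layer map ...
}

fn main() {
    load_layer_map();
}
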
Heikki Linnakangas
2cdb5503b0 Add perf test that creates tenants in parallel, and measures metrics size 2023-04-24 11:26:01 +03:00
Heikki Linnakangas
74d3cdeaf0 neon_local: Fix race conditions creating multiple endpoints concurrently
The list of endpoints was loaded by scanning the endpoints directory,
whenever you started 'neon_local'. If you ran two 'neon_local endpoint
create' commands concurrently, one of them might fail because of the
other half-finished endpoint creation. There might be a directory
without the endpoint.json file, for example, or a half-written
endpoint.json file. Most commands don't need the full list of
endpoints, however, so only load that when it's needed. In particular,
it is now possible to run two "neon_local endpoint create" commands in
parallel, if you explicitly assign a port to both. (If you let
neon_local assign the port, it needs to check all the other endpoints
to decide what ports are free.)

Another race condition was with the branch-name mappings. They were
previously stored in the 'config' file, which would also fail if two
'neon_local' commands tried to change it at the same time. To fix that
race condition, use advisory file lock to coordinate access to the
branch-name mappings, and only read/modify that file in commands that
actually need the mappings. While at it, move the mappings to a
separate 'branch_name_mappings.json' file.

The motivation for these changes is that I wanted to create a python
test that creates lots of tenants concurrently. Without these changes,
the concurrent tenant creations would often hit these 'neon_local'
race conditions and fail.

'neon_local' is still not concurrency-safe in general, but this is a
good start.
2023-04-24 10:34:01 +03:00
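
To illustrate the advisory-lock approach described in the commit message above, here is a minimal, self-contained sketch of coordinating concurrent writers of a small JSON state file with the file-lock crate, similar to what this commit does for branch_name_mappings.json. The path, the Mappings alias and the update_state helper are illustrative, not the actual neon_local code:

use std::collections::HashMap;
use std::io::Seek;

use anyhow::Context;
use file_lock::{FileLock, FileOptions};

type Mappings = HashMap<String, String>;

fn update_state(path: &str, key: &str, value: &str) -> anyhow::Result<()> {
    // Block until we hold an exclusive advisory lock on the file.
    let mut lock = FileLock::lock(
        path,
        true, // blocking
        FileOptions::new().create(true).read(true).write(true),
    )?;

    // Read the current contents; the file may be empty on first use.
    let mut mappings: Mappings = if lock.file.metadata()?.len() > 0 {
        serde_json::from_reader(&lock.file).context("failed to parse state file")?
    } else {
        Mappings::new()
    };

    mappings.insert(key.to_string(), value.to_string());

    // Truncate and rewrite in place while still holding the lock.
    lock.file.set_len(0)?;
    lock.file.rewind()?;
    serde_json::to_writer_pretty(&lock.file, &mappings)?;

    Ok(()) // the advisory lock is released when 'lock' is dropped
}

fn main() -> anyhow::Result<()> {
    update_state("branch_name_mappings.json", "main", "some-timeline-id")
}
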
Heikki Linnakangas
73b369531d Store basic endpoint info in endpoint.json file.
More convenient than parsing the postgresql.conf file.
2023-04-24 09:19:43 +03:00
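
A small, self-contained sketch of the endpoint.json round trip this commit introduces. The field names follow the EndpointConf struct added in the control_plane diff below; here the ID fields are plain strings and all values are made up, whereas the real code uses TenantId/TimelineId serialized via serde_with's DisplayFromStr:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct EndpointConfSketch {
    name: String,
    tenant_id: String,
    timeline_id: String,
    lsn: Option<String>,
    port: u16,
    pg_version: u32,
}

fn main() -> anyhow::Result<()> {
    let conf = EndpointConfSketch {
        name: "main".to_string(),
        tenant_id: "3d1f7595b468230304e0b73cecbcb081".to_string(), // made-up hex ID
        timeline_id: "de200bd42b49cc1814412c7e592dd6e9".to_string(), // made-up hex ID
        lsn: None,
        port: 55432,
        pg_version: 15,
    };

    // neon_local writes this next to the endpoint's pgdata as endpoint.json ...
    let json = serde_json::to_string_pretty(&conf)?;
    // ... and reads it back later, instead of re-parsing postgresql.conf.
    let roundtrip: EndpointConfSketch = serde_json::from_str(&json)?;
    assert_eq!(conf, roundtrip);
    Ok(())
}
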
11 changed files with 295 additions and 120 deletions

Cargo.lock (generated)

@@ -1105,6 +1105,7 @@ dependencies = [
"anyhow",
"clap 4.2.2",
"comfy-table",
"file-lock",
"git-version",
"nix",
"once_cell",
@@ -1550,6 +1551,16 @@ dependencies = [
"instant",
]
[[package]]
name = "file-lock"
version = "2.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "filetime"
version = "0.2.21"


@@ -42,6 +42,7 @@ either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
file-lock = "2.1.9"
fs2 = "0.4.3"
futures = "0.3"
futures-core = "0.3"


@@ -8,6 +8,7 @@ license.workspace = true
anyhow.workspace = true
clap.workspace = true
comfy-table.workspace = true
file-lock.workspace = true
git-version.workspace = true
nix.workspace = true
once_cell.workspace = true


@@ -365,11 +365,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
new_timeline_id,
)?;
env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
@@ -411,7 +407,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Some(("list", list_match)) => {
let tenant_id = get_tenant_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_id)?;
print_timelines_tree(timelines, env.timeline_name_mappings())?;
print_timelines_tree(timelines, env.timeline_name_mappings()?)?;
}
Some(("create", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
@@ -429,7 +425,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
@@ -468,10 +464,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.copied()
.context("Failed to parse postgres version from the argument string")?;
let mut cplane = ComputeControlPlane::load(env.clone())?;
let mut cplane = ComputeControlPlane::new(env.clone());
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
env.register_branch_mapping(name, tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
@@ -487,7 +483,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let ancestor_timeline_id = env
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
.get_branch_timeline_id(ancestor_branch_name, tenant_id)?
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
@@ -508,7 +504,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
@@ -528,7 +524,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
let mut cplane = ComputeControlPlane::new(env.clone());
// All subcommands take an optional --tenant-id option
let tenant_id = get_tenant_id(sub_args, env)?;
@@ -540,7 +536,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
HashMap::new()
});
let timeline_name_mappings = env.timeline_name_mappings();
let timeline_name_mappings = env.timeline_name_mappings()?;
let mut table = comfy_table::Table::new();
@@ -555,8 +551,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
"STATUS",
]);
for (endpoint_id, endpoint) in cplane
.endpoints
for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)?
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
@@ -609,7 +604,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.transpose()
.context("Failed to parse Lsn from the request")?;
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)
.get_branch_timeline_id(branch_name, tenant_id)?
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -627,7 +622,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?;
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -646,7 +641,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)
.get_branch_timeline_id(branch_name, tenant_id)?
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{branch_name}'")
})?;
@@ -683,9 +678,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
@@ -844,9 +837,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
match ComputeControlPlane::load_endpoints(env) {
Ok(endpoints) => {
for (_k, node) in endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}


@@ -11,56 +11,104 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::local_env::LocalEnv;
use crate::pageserver::PageServerNode;
use crate::postgresql_conf::PostgresConf;
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
name: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
lsn: Option<Lsn>,
port: u16,
pg_version: u32,
}
//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
base_port: u16,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
pub fn new(env: LocalEnv) -> Self {
let pageserver = Arc::new(PageServerNode::from_env(&env));
ComputeControlPlane {
base_port: 55431,
env,
pageserver,
}
}
// Load current endpoints from the endpoints/ subdirectories
//
// endpoint ID is the key in the returned BTreeMap.
//
// NOTE: This is not concurrency-safe, and can fail if another 'neon_local'
// invocation is creating or deleting an endpoint at the same time.
pub fn load_endpoints(env: &LocalEnv) -> Result<BTreeMap<String, Arc<Endpoint>>> {
let pageserver = Arc::new(PageServerNode::from_env(env));
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(ComputeControlPlane {
base_port: 55431,
endpoints,
env,
pageserver,
})
Ok(endpoints)
}
fn get_port(&mut self) -> u16 {
1 + self
.endpoints
// Load an endpoint from the endpoints/ subdirectories
pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result<Option<Endpoint>> {
let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json");
if !endpoint_json_path.exists() {
return Ok(None);
}
// Read the endpoint.json file
let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?;
// ok now
let pageserver = Arc::new(PageServerNode::from_env(env));
Ok(Some(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name: name.to_string(),
env: env.clone(),
pageserver,
timeline_id: conf.timeline_id,
lsn: conf.lsn,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
}))
}
fn get_port(&self) -> anyhow::Result<u16> {
let endpoints = ComputeControlPlane::load_endpoints(&self.env)?;
let next_port = 1 + endpoints
.values()
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port)
.unwrap_or(self.base_port);
Ok(next_port)
}
pub fn new_endpoint(
@@ -72,7 +120,13 @@ impl ComputeControlPlane {
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
// NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from
// two 'neon_local' invocations at the same time, IF the port is specified
// explicitly. (get_port() is racy)
let port = match port {
Some(port) => port,
None => self.get_port()?,
};
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -83,12 +137,20 @@ impl ComputeControlPlane {
tenant_id,
pg_version,
});
ep.create_pgdata()?;
std::fs::write(
ep.endpoint_path().join("endpoint.json"),
serde_json::to_string_pretty(&EndpointConf {
name: name.to_string(),
tenant_id,
timeline_id,
lsn,
port,
pg_version,
})?,
)?;
ep.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(ep)
}
}
@@ -131,42 +193,20 @@ impl Endpoint {
let fname = entry.file_name();
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
let conf = PostgresConf::read(&mut conf_file)
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
// Read a few options from the config file
let context = format!("in config file {}", cfg_path_str);
let port: u16 = conf.parse_field("port", &context)?;
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// Read the endpoint.json file
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
// ok now
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name,
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
lsn: recovery_target_lsn,
tenant_id,
pg_version,
timeline_id: conf.timeline_id,
lsn: conf.lsn,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
})
}


@@ -5,6 +5,7 @@
use anyhow::{bail, ensure, Context};
use file_lock::{FileLock, FileOptions};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -12,11 +13,14 @@ use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Seek;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -72,14 +76,84 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
}
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
// Keep human-readable aliases in memory (and persist them to
// 'branch_name_mappings.json'), to hide ZId hex strings from the user.
//
// BranchNameMappingsSerialized corresponds to the actual JSON format of
// 'branch_name_mappings.json' file. It's a bit more awkward to work with, so we convert
// it to/from BranchNameMappings when reading/writing the file.
type BranchNameMappings = HashMap<(TenantId, String), TimelineId>;
type BranchNameMappingsSerialized = HashMap<String, HashMap<String, String>>;
pub struct BranchNameMappingsLock {
mappings: BranchNameMappings,
lock: FileLock,
}
impl Deref for BranchNameMappingsLock {
type Target = HashMap<(TenantId, String), TimelineId>;
fn deref(&self) -> &Self::Target {
&self.mappings
}
}
impl DerefMut for BranchNameMappingsLock {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.mappings
}
}
impl BranchNameMappingsLock {
/// Write the modified branch-name mappings back to 'branch_name_mappings.json',
/// and release the lock.
fn write_to_file(mut self) -> anyhow::Result<()> {
let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new();
for ((tenant_id, branch_name), timeline_id) in self.iter() {
serialized_mappings
.entry(tenant_id.to_string())
.or_default()
.insert(branch_name.clone(), timeline_id.to_string());
}
self.lock.file.set_len(0)?;
self.lock.file.rewind()?;
serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?;
Ok(())
}
}
/// Get the branch-name mappings.
///
/// This returns a guard object that holds a lock on the branch_name_mappings.json
/// file. That makes it safe for two 'neon_local' invocations to read/manipulate
/// branch name mappings at the same time.
pub fn load_branch_name_mappings() -> anyhow::Result<BranchNameMappingsLock> {
let path = base_path().join("branch_name_mappings.json");
let lock = FileLock::lock(
path,
true,
FileOptions::new().create(true).read(true).write(true),
)?;
let mut mappings = BranchNameMappings::new();
if lock.file.metadata()?.len() > 0 {
let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file)
.context("Failed to read branch_name_mappings.json")?;
for (tenant_str, map) in serialized_mappings.iter() {
for (branch_name, timeline_str) in map.iter() {
mappings.insert(
(TenantId::from_str(tenant_str)?, branch_name.to_string()),
TimelineId::from_str(timeline_str)?,
);
}
}
}
Ok(BranchNameMappingsLock { mappings, lock })
}
/// Broker config for cluster internal communication.
@@ -215,27 +289,21 @@ impl LocalEnv {
pub fn register_branch_mapping(
&mut self,
branch_name: String,
branch_name: &str,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let existing_values = self
.branch_name_mappings
.entry(branch_name.clone())
.or_default();
let mut mappings = load_branch_name_mappings()?;
let existing_ids = existing_values
.iter()
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
if let Some((_, old_timeline_id)) = existing_ids {
if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));
mappings.insert((tenant_id, branch_name.to_string()), timeline_id);
mappings.write_to_file()?;
Ok(())
}
}
@@ -244,24 +312,22 @@ impl LocalEnv {
&self,
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
self.branch_name_mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(TimelineId::from)
) -> anyhow::Result<Option<TimelineId>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied())
}
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
self.branch_name_mappings
pub fn timeline_name_mappings(&self) -> anyhow::Result<HashMap<TenantTimelineId, String>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings
.iter()
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
(TenantTimelineId::new(tenant_id, timeline_id), name.clone())
})
.map(|((tenant_id, branch_name), timeline_id)| {
(
TenantTimelineId::new(*tenant_id, *timeline_id),
branch_name.clone(),
)
})
.collect()
.collect())
}
/// Create a LocalEnv from a config file.
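
For reference, a minimal sketch of the on-disk shape that the BranchNameMappingsSerialized alias above maps to: the outer key is a tenant ID and the inner map goes branch name -> timeline ID, all as strings. The IDs below are made-up placeholders:

use std::collections::HashMap;

fn main() -> anyhow::Result<()> {
    let mut serialized: HashMap<String, HashMap<String, String>> = HashMap::new();
    serialized
        .entry("3d1f7595b468230304e0b73cecbcb081".to_string())
        .or_default()
        .insert(
            "main".to_string(),
            "de200bd42b49cc1814412c7e592dd6e9".to_string(),
        );

    // Prints JSON along the lines of:
    // {
    //   "3d1f7595b468230304e0b73cecbcb081": {
    //     "main": "de200bd42b49cc1814412c7e592dd6e9"
    //   }
    // }
    println!("{}", serde_json::to_string_pretty(&serialized)?);
    Ok(())
}
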


@@ -139,6 +139,15 @@ pub static REMOTE_ONDEMAND_DOWNLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
pub static LOAD_LAYER_MAP_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_load_layer_map_histogram",
"Time spent on loadiing layer map",
STORAGE_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_current_logical_size",
@@ -552,7 +561,7 @@ impl StorageTimeMetricsTimer {
pub struct StorageTimeMetrics {
/// Sum of f64 seconds, per operation, tenant_id and timeline_id
timeline_sum: Counter,
/// Number of oeprations, per operation, tenant_id and timeline_id
/// Number of operations, per operation, tenant_id and timeline_id
timeline_count: IntCounter,
/// Global histogram having only the "operation" label.
global_histogram: Histogram,
@@ -595,7 +604,6 @@ pub struct TimelineMetrics {
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub wait_lsn_time_histo: Histogram,
@@ -627,8 +635,6 @@ impl TimelineMetrics {
let create_images_time_histo =
StorageTimeMetrics::new("create images", &tenant_id, &timeline_id);
let logical_size_histo = StorageTimeMetrics::new("logical size", &tenant_id, &timeline_id);
let load_layer_map_histo =
StorageTimeMetrics::new("load layer map", &tenant_id, &timeline_id);
let garbage_collect_histo = StorageTimeMetrics::new("gc", &tenant_id, &timeline_id);
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
@@ -664,7 +670,6 @@ impl TimelineMetrics {
create_images_time_histo,
logical_size_histo,
garbage_collect_histo,
load_layer_map_histo,
last_record_gauge,
wait_lsn_time_histo,
resident_physical_size_gauge,


@@ -48,7 +48,7 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::TimelineMetrics;
use crate::metrics::{TimelineMetrics, LOAD_LAYER_MAP_HISTOGRAM};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
@@ -1444,7 +1444,7 @@ impl Timeline {
let mut updates = layers.batch_update();
let mut num_layers = 0;
let timer = self.metrics.load_layer_map_histo.start_timer();
let timer = LOAD_LAYER_MAP_HISTOGRAM.start_timer();
// Scan timeline directory and create ImageFileName and DeltaFilename
// structs representing all files on disk


@@ -1,7 +1,10 @@
import threading
import timeit
from threading import BoundedSemaphore
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import NeonEnvBuilder
# Run bulk tenant creation test.
@@ -50,3 +53,57 @@ def test_bulk_tenant_create(
"s",
report=MetricReport.LOWER_IS_BETTER,
)
@pytest.mark.parametrize("tenants_count", [50])
def test_parallel_tenant_create(
neon_compare: NeonCompare,
tenants_count: int,
):
"""Create lots of tenants in parallel
One important thing that this measures is the amount of prometheus
metrics per tenant. The pageserver exposes a lot of metrics for
each timeline, and this test gives some visibility to how much
exactly. (We've had to raise the prometheus scraper's limit on
the max metrics size several times, because we expose so many.)
"""
env = neon_compare.env
zenbenchmark = neon_compare.zenbenchmark
max_concurrent = 5
pool_sema = BoundedSemaphore(value=max_concurrent)
def worker(i: int):
with pool_sema:
tenant, timeline_id = env.neon_cli.create_tenant()
endpoint_tenant = env.endpoints.create_start("main", tenant_id=tenant)
with endpoint_tenant.cursor() as cur:
cur.execute("select count(*) from pg_class")
endpoint_tenant.stop()
threads = [threading.Thread(target=worker, args=(i,)) for i in range(tenants_count)]
start = timeit.default_timer()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
end = timeit.default_timer()
zenbenchmark.record(
"tenant_creation_time",
end - start,
"s",
report=MetricReport.LOWER_IS_BETTER,
)
metrics = env.pageserver.http_client().get_metrics_str()
zenbenchmark.record(
"prometheus_metrics_size",
len(metrics) / tenants_count,
"bytes",
report=MetricReport.LOWER_IS_BETTER,
)


@@ -16,7 +16,8 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
from pytest import FixtureRequest
#
@@ -58,6 +59,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
@@ -69,10 +74,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])


@@ -51,7 +51,7 @@ metric_kinds_checked = set([])
#
# verify that metrics look minilally sane
# verify that metrics look minimally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None: