This commit is contained in:
Conrad Ludgate
2025-07-11 13:26:46 +01:00
parent e8ccb4a4d1
commit d5c17559ce
11 changed files with 48 additions and 50 deletions

28
Cargo.lock generated
View File

@@ -3715,9 +3715,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "measured"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
version = "0.0.23"
dependencies = [
"bytes",
"crossbeam-utils",
@@ -3726,6 +3724,7 @@ dependencies = [
"lasso",
"measured-derive",
"memchr",
"paracord",
"parking_lot 0.12.1",
"rustc-hash 1.1.0",
"ryu",
@@ -3733,9 +3732,7 @@ dependencies = [
[[package]]
name = "measured-derive"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
version = "0.0.23"
dependencies = [
"heck",
"proc-macro2",
@@ -3745,15 +3742,22 @@ dependencies = [
[[package]]
name = "measured-process"
version = "0.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
version = "0.0.23"
dependencies = [
"libc",
"measured",
"procfs",
]
[[package]]
name = "measured-tokio"
version = "0.0.23"
dependencies = [
"itoa",
"measured",
"tokio",
]
[[package]]
name = "memchr"
version = "2.6.4"
@@ -4614,9 +4618,9 @@ dependencies = [
[[package]]
name = "paracord"
version = "0.1.0-rc.6"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "965ee8b44c8a2556aa73279eab88d5aa63a2859336d8a61295104c8d8a4fb248"
checksum = "5e41e924113f9a05eecd561c6e695648f243a03b3ad8d9b3eff689342b95023b"
dependencies = [
"boxcar",
"clashmap",
@@ -5384,8 +5388,8 @@ dependencies = [
"itoa",
"jose-jwa",
"jose-jwk",
"lasso",
"measured",
"measured-tokio",
"metrics",
"once_cell",
"opentelemetry",

View File

@@ -131,8 +131,9 @@ jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
measured = { version = "0.0.23", features = ["paracord", "lasso"], path = "../../code/measured/core" }
measured-process = { version = "0.0.23", path = "../../code/measured/process" }
measured-tokio = { version = "0.0.23", path = "../../code/measured/tokio" }
memoffset = "0.9"
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
@@ -145,6 +146,7 @@ opentelemetry = "0.27"
opentelemetry_sdk = "0.27"
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.27"
paracord = { version = "0.1.0", features = ["serde"] }
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"

View File

@@ -48,12 +48,13 @@ indexmap = { workspace = true, features = ["serde"] }
ipnet.workspace = true
itertools.workspace = true
itoa.workspace = true
lasso = { workspace = true, features = ["multi-threaded"] }
measured = { workspace = true, features = ["lasso"] }
measured = { workspace = true, features = ["paracord"] }
measured-tokio.workspace = true
metrics.workspace = true
once_cell.workspace = true
opentelemetry = { workspace = true, features = ["trace"] }
papaya = "0.2.0"
paracord.workspace = true
parking_lot.workspace = true
parquet.workspace = true
parquet_derive.workspace = true
@@ -114,7 +115,6 @@ ed25519-dalek = { version = "2", default-features = false, features = ["rand_cor
rsa = "0.9"
workspace_hack.workspace = true
paracord = { version = "0.1.0-rc.6", features = ["serde"] }
[dev-dependencies]
assert-json-diff.workspace = true
@@ -128,4 +128,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"
tracing-test = "0.2"

View File

@@ -38,9 +38,9 @@ impl LocalBackend {
},
// TODO(conrad): make this better reflect compute info rather than endpoint info.
aux: MetricsAuxInfo {
endpoint_id: EndpointIdInt::from_str_or_intern("local"),
project_id: ProjectIdInt::from_str_or_intern("local"),
branch_id: BranchIdInt::from_str_or_intern("local"),
endpoint_id: EndpointIdInt::new("local"),
project_id: ProjectIdInt::new("local"),
branch_id: BranchIdInt::new("local"),
compute_id: "local".into(),
cold_start_info: ColdStartInfo::WarmCached,
},

View File

@@ -192,6 +192,7 @@ pub async fn run() -> anyhow::Result<()> {
jemalloc,
neon_metrics,
proxy: crate::metrics::Metrics::get(),
tokio: measured_tokio::RuntimeCollector::current(),
},
));

View File

@@ -513,6 +513,7 @@ pub async fn run() -> anyhow::Result<()> {
jemalloc,
neon_metrics,
proxy: crate::metrics::Metrics::get(),
tokio: measured_tokio::RuntimeCollector::current(),
},
));
maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));

View File

@@ -194,7 +194,7 @@ impl ProjectInfoCacheImpl {
&self,
endpoint_id: &EndpointId,
) -> Option<Ref<'_, EndpointIdInt, EndpointInfo>> {
let endpoint_id = EndpointIdInt::try_from_str(endpoint_id)?;
let endpoint_id = EndpointIdInt::try_new_existing(endpoint_id)?;
self.cache.get(&endpoint_id)
}
@@ -204,7 +204,7 @@ impl ProjectInfoCacheImpl {
role_name: &RoleName,
) -> Option<RoleAccessControl> {
let valid_since = self.get_cache_times();
let role_name = RoleNameInt::try_from_str(role_name)?;
let role_name = RoleNameInt::try_new_existing(role_name)?;
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
endpoint_info.get_role_secret(role_name, valid_since)
}
@@ -297,10 +297,10 @@ impl ProjectInfoCacheImpl {
}
pub fn maybe_invalidate_role_secret(&self, endpoint_id: &EndpointId, role_name: &RoleName) {
let Some(endpoint_id) = EndpointIdInt::try_from_str(endpoint_id) else {
let Some(endpoint_id) = EndpointIdInt::try_new_existing(endpoint_id) else {
return;
};
let Some(role_name) = RoleNameInt::try_from_str(role_name) else {
let Some(role_name) = RoleNameInt::try_new_existing(role_name) else {
return;
};

View File

@@ -71,6 +71,8 @@ pub struct AppMetrics {
pub neon_metrics: NeonMetrics,
#[metric(flatten)]
pub proxy: &'static crate::metrics::Metrics,
#[metric(namespace = "tokio")]
pub tokio: measured_tokio::RuntimeCollector,
}
async fn prometheus_metrics_handler(

View File

@@ -10,62 +10,50 @@ custom_key!(pub struct AccountIdInt);
impl From<&RoleName> for RoleNameInt {
fn from(value: &RoleName) -> Self {
RoleNameInt::from_str_or_intern(value)
RoleNameInt::new(value)
}
}
impl From<&EndpointId> for EndpointIdInt {
fn from(value: &EndpointId) -> Self {
EndpointIdInt::from_str_or_intern(value)
EndpointIdInt::new(value)
}
}
impl From<EndpointId> for EndpointIdInt {
fn from(value: EndpointId) -> Self {
EndpointIdInt::from_str_or_intern(&value)
EndpointIdInt::new(&value)
}
}
impl From<&BranchId> for BranchIdInt {
fn from(value: &BranchId) -> Self {
BranchIdInt::from_str_or_intern(value)
BranchIdInt::new(value)
}
}
impl From<BranchId> for BranchIdInt {
fn from(value: BranchId) -> Self {
BranchIdInt::from_str_or_intern(&value)
}
}
impl std::ops::Deref for BranchIdInt {
type Target = str;
fn deref(&self) -> &str {
self.as_str()
BranchIdInt::new(&value)
}
}
impl From<&ProjectId> for ProjectIdInt {
fn from(value: &ProjectId) -> Self {
ProjectIdInt::from_str_or_intern(value)
ProjectIdInt::new(value)
}
}
impl From<ProjectId> for ProjectIdInt {
fn from(value: ProjectId) -> Self {
ProjectIdInt::from_str_or_intern(&value)
}
}
impl std::ops::Deref for ProjectIdInt {
type Target = str;
fn deref(&self) -> &str {
self.as_str()
ProjectIdInt::new(&value)
}
}
impl From<&AccountId> for AccountIdInt {
fn from(value: &AccountId) -> Self {
AccountIdInt::from_str_or_intern(value)
AccountIdInt::new(value)
}
}
impl From<AccountId> for AccountIdInt {
fn from(value: AccountId) -> Self {
AccountIdInt::from_str_or_intern(&value)
AccountIdInt::new(&value)
}
}

View File

@@ -1,6 +1,5 @@
use std::sync::{Arc, OnceLock};
use lasso::ThreadedRodeo;
use measured::label::{
FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
};
@@ -11,6 +10,7 @@ use measured::{
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
use paracord::ParaCord;
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;
@@ -222,7 +222,7 @@ pub enum CacheOutcome {
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
#[label(dynamic_with = ThreadedRodeo, default)]
#[label(dynamic_with = ParaCord, default)]
pub request: &'a str,
}
@@ -345,7 +345,7 @@ pub struct ConnectionFailuresBreakdownGroup {
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
#[label(dynamic_with = ThreadedRodeo, default)]
#[label(dynamic_with = ParaCord, default)]
pub channel: &'a str,
}

View File

@@ -85,7 +85,7 @@ impl EndpointId {
#[must_use]
pub fn normalize_intern(&self) -> EndpointIdInt {
EndpointIdInt::from_str_or_intern(self.normalize_str())
EndpointIdInt::new(self.normalize_str())
}
}