Compare commits

...

8 Commits

Author SHA1 Message Date
Conrad Ludgate
ac0b314b13 immutable runtimes 2023-10-26 22:34:26 +01:00
Conrad Ludgate
a8805d846b disable test 2023-10-25 20:17:07 +01:00
Conrad Ludgate
835dca6537 fix warnings 2023-10-25 17:01:27 +01:00
Conrad Ludgate
755bff9676 update ci and register runtimes 2023-10-25 15:42:38 +01:00
Conrad Ludgate
03e646cc96 stable runtime name 2023-10-25 14:55:24 +01:00
Conrad Ludgate
52479df965 less duplication, more sharing 2023-10-24 17:16:42 +01:00
Conrad Ludgate
84b389ff92 remove a troublesome test 2023-10-24 17:08:13 +01:00
Conrad Ludgate
ba16dfb1e8 add tokio metrics collector 2023-10-24 16:58:49 +01:00
11 changed files with 329 additions and 8 deletions

View File

@@ -184,9 +184,14 @@ jobs:
fail-fast: false
matrix:
build_type: [ debug, release ]
# debug builds additionally pass --cfg=tokio_unstable (exported to the job as RUSTFLAGS below)
include:
- build_type: debug
rustflags: "--cfg=tokio_unstable"
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
RUSTFLAGS: ${{ matrix.rustflags }}
steps:
- name: Fix git ownership

13
Cargo.lock generated
View File

@@ -2676,9 +2676,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
[[package]]
name = "libloading"
@@ -2790,6 +2790,7 @@ dependencies = [
"libc",
"once_cell",
"prometheus",
"tokio",
"workspace_hack",
]
@@ -5357,18 +5358,18 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.28.1"
version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.4.9",
"socket2 0.5.3",
"tokio-macros",
"windows-sys 0.48.0",
]

View File

@@ -136,7 +136,7 @@ tar = "0.4"
test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }
tokio = { version = "1.29", features = ["macros"] }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.10.0"
tokio-rustls = "0.24"

View File

@@ -9,5 +9,6 @@ prometheus.workspace = true
libc.workspace = true
once_cell.workspace = true
chrono.workspace = true
tokio.workspace = true
workspace_hack.workspace = true

View File

@@ -21,6 +21,8 @@ pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
use prometheus::{Registry, Result};
pub mod tokio_metrics;
pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};

View File

@@ -0,0 +1,265 @@
use prometheus::{core::Desc, proto};
use crate::register_internal;
/// Prometheus collector that reports metrics for a set of named tokio runtimes.
///
/// Runtimes are attached with `add_runtime`, and the collector is then handed
/// over with `register`. Metrics are only produced when the crate is compiled
/// with `--cfg tokio_unstable`; otherwise the collector reports nothing.
#[derive(Default)]
pub struct TokioCollector {
// Each runtime handle is paired with the value emitted for its `runtime` label.
handles: Vec<(tokio::runtime::Handle, String)>,
}
impl TokioCollector {
    /// Attaches a runtime to the collector.
    ///
    /// `name` is emitted as the value of the `runtime` label on every sample
    /// produced for `handle`. The collector is consumed and returned so calls
    /// can be chained before [`register`](Self::register).
    #[must_use = "the collector reports nothing until it is registered"]
    pub fn add_runtime(mut self, handle: tokio::runtime::Handle, name: String) -> Self {
        self.handles.push((handle, name));
        self
    }

    /// Hands the boxed collector to `crate::register_internal`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `register_internal` returns.
    pub fn register(self) -> Result<(), prometheus::Error> {
        register_internal(Box::new(self))
    }

    /// Builds a gauge family with one sample per runtime, valued by `f`.
    #[cfg(tokio_unstable)]
    fn gauge(
        &self,
        desc: &Desc,
        f: impl Fn(&tokio::runtime::RuntimeMetrics) -> f64,
    ) -> proto::MetricFamily {
        self.metric(desc, proto::MetricType::GAUGE, |rt| {
            let mut g = proto::Gauge::default();
            g.set_value(f(rt));
            let mut m = proto::Metric::default();
            m.set_gauge(g);
            m
        })
    }

    /// Builds a counter family with one sample per worker of each runtime.
    ///
    /// `f` receives the runtime metrics and the worker index.
    #[cfg(tokio_unstable)]
    fn counter_per_worker(
        &self,
        desc: &Desc,
        f: impl Fn(&tokio::runtime::RuntimeMetrics, usize) -> f64,
    ) -> proto::MetricFamily {
        self.metric_per_worker(desc, proto::MetricType::COUNTER, |rt, worker| {
            let mut c = proto::Counter::default();
            c.set_value(f(rt, worker));
            let mut m = proto::Metric::default();
            m.set_counter(c);
            m
        })
    }

    /// Builds a family of type `typ` with one sample per runtime.
    ///
    /// `f` produces the unlabelled sample; the `runtime` label is attached here.
    #[cfg(tokio_unstable)]
    fn metric(
        &self,
        desc: &Desc,
        typ: proto::MetricType,
        f: impl Fn(&tokio::runtime::RuntimeMetrics) -> proto::Metric,
    ) -> proto::MetricFamily {
        self.metrics(desc, typ, |rt, labels, metrics| {
            let mut m = f(rt);
            // `labels` is already owned by this call, so move it instead of cloning.
            m.set_label(labels);
            metrics.push(m);
        })
    }

    /// Builds a family of type `typ` with one sample per worker of each runtime.
    ///
    /// Every sample carries the `runtime` label plus a `worker` label holding
    /// the worker index.
    #[cfg(tokio_unstable)]
    fn metric_per_worker(
        &self,
        desc: &Desc,
        typ: proto::MetricType,
        f: impl Fn(&tokio::runtime::RuntimeMetrics, usize) -> proto::Metric,
    ) -> proto::MetricFamily {
        self.metrics(desc, typ, |rt, labels, metrics| {
            let workers = rt.num_workers();
            for i in 0..workers {
                let mut m = f(rt, i);

                let mut label_worker = proto::LabelPair::default();
                label_worker.set_name("worker".to_owned());
                label_worker.set_value(i.to_string());

                // Each worker sample needs its own copy of the shared runtime labels.
                let mut labels = labels.clone();
                labels.push(label_worker);

                m.set_label(labels);
                metrics.push(m);
            }
        })
    }

    /// Shared driver for all families.
    ///
    /// Creates a family named and described by `desc` with type `typ`, then
    /// calls `f` once per attached runtime with that runtime's metrics, a label
    /// vector holding only the `runtime` label, and the output sample list.
    #[cfg(tokio_unstable)]
    fn metrics(
        &self,
        desc: &Desc,
        typ: proto::MetricType,
        f: impl Fn(&tokio::runtime::RuntimeMetrics, Vec<proto::LabelPair>, &mut Vec<proto::Metric>),
    ) -> proto::MetricFamily {
        let mut m = proto::MetricFamily::default();
        m.set_name(desc.fq_name.clone());
        m.set_help(desc.help.clone());
        m.set_field_type(typ);

        // At least one sample per runtime is the common case; per-worker families grow past this.
        let mut metrics = Vec::with_capacity(self.handles.len());
        for (rt, name) in &self.handles {
            let rt_metrics = rt.metrics();

            let mut label_name = proto::LabelPair::default();
            label_name.set_name("runtime".to_owned());
            label_name.set_value(name.to_owned());
            let labels = vec![label_name];

            f(&rt_metrics, labels, &mut metrics);
        }
        m.set_metric(metrics);
        m
    }
}
/// Descriptor for the active-task metric, labelled by `runtime`.
///
/// Only compiled with `--cfg tokio_unstable`.
// NOTE(review): the name ends in `_total` although the collector exports this as a gauge;
// Prometheus naming conventions reserve that suffix for counters. The test in this file
// asserts the exact name, so confirm before renaming.
#[cfg(tokio_unstable)]
static ACTIVE_TASK_COUNT: once_cell::sync::Lazy<Desc> = once_cell::sync::Lazy::new(|| {
    let fq_name = String::from("tokio_active_task_count_total");
    let help = String::from("the number of active tasks in the runtime");
    let variable_labels = vec![String::from("runtime")];

    Desc::new(fq_name, help, variable_labels, Default::default())
        .expect("should be a valid description")
});
/// Descriptor for the per-worker steal counter, labelled by `runtime` and `worker`.
///
/// Only compiled with `--cfg tokio_unstable`.
#[cfg(tokio_unstable)]
static WORKER_STEAL_COUNT: once_cell::sync::Lazy<Desc> = once_cell::sync::Lazy::new(|| {
    let fq_name = String::from("tokio_worker_steal_count_total");
    let help = String::from(
        "the number of tasks the given worker thread stole from another worker thread",
    );
    let variable_labels = vec![String::from("runtime"), String::from("worker")];

    Desc::new(fq_name, help, variable_labels, Default::default())
        .expect("should be a valid description")
});
// Without `--cfg tokio_unstable` the gated blocks below are compiled out, leaving only
// `let mut v = Vec::new(); v`, which would otherwise trip `unused_mut` and `let_and_return`.
#[allow(unused_mut, clippy::let_and_return)]
impl prometheus::core::Collector for TokioCollector {
    /// Lists the descriptors of every metric this collector can emit.
    ///
    /// The list is empty unless the crate is built with `--cfg tokio_unstable`.
    fn desc(&self) -> Vec<&Desc> {
        let mut descs: Vec<&Desc> = Vec::new();
        #[cfg(tokio_unstable)]
        {
            descs.push(&*ACTIVE_TASK_COUNT);
            descs.push(&*WORKER_STEAL_COUNT);
        }
        descs
    }

    /// Samples every attached runtime and returns the resulting metric families.
    ///
    /// The list is empty unless the crate is built with `--cfg tokio_unstable`.
    fn collect(&self) -> Vec<proto::MetricFamily> {
        let mut families: Vec<proto::MetricFamily> = Vec::new();
        #[cfg(tokio_unstable)]
        {
            families.push(self.gauge(&ACTIVE_TASK_COUNT, |rt| rt.active_tasks_count() as f64));
            families.push(self.counter_per_worker(&WORKER_STEAL_COUNT, |rt, worker| {
                rt.worker_steal_count(worker) as f64
            }));
        }
        families
    }
}
// The collector only emits metrics under `--cfg tokio_unstable`, so the test is gated on it too.
#[cfg(test)]
#[cfg(tokio_unstable)]
mod tests {
use std::{sync::Arc, time::Duration};
use prometheus::{Registry, TextEncoder};
use tokio::{runtime, sync::Mutex, task::JoinSet};
use crate::tokio_metrics::TokioCollector;
/// Registers two current-thread runtimes under one collector and checks the
/// text-encoded output as the tasks on each runtime are released in turn.
// NOTE(review): the assertions rely on 10ms sleeps being long enough for the
// runtime threads to make progress, so the test is timing-sensitive.
#[test]
fn gather_multiple_runtimes() {
// A private registry keeps the expected output limited to this collector's metrics.
let registry = Registry::new();
let runtime1 = runtime::Builder::new_current_thread().build().unwrap();
let runtime2 = runtime::Builder::new_current_thread().build().unwrap();
let collector = TokioCollector::default()
.add_runtime(runtime1.handle().clone(), "runtime1".to_owned())
.add_runtime(runtime2.handle().clone(), "runtime2".to_owned());
registry.register(Box::new(collector)).unwrap();
std::thread::scope(|s| {
let lock1 = Arc::new(Mutex::new(0));
let lock2 = Arc::new(Mutex::new(0));
// Hold both locks up front so every spawned task parks on `lock().await`.
let guard1 = lock1.clone().try_lock_owned().unwrap();
let guard2 = lock2.clone().try_lock_owned().unwrap();
// Drive runtime1 on its own thread: five tasks that each wait for lock1.
s.spawn(move || {
runtime1.block_on(async {
let mut joinset = JoinSet::new();
for _ in 0..5 {
let lock = lock1.clone();
joinset.spawn(async move {
let mut _guard = lock.lock().await;
});
}
while let Some(_x) = joinset.join_next().await {}
})
});
// Same setup for runtime2, waiting on lock2.
s.spawn(move || {
runtime2.block_on(async {
let mut joinset = JoinSet::new();
for _ in 0..5 {
let lock = lock2.clone();
joinset.spawn(async move {
let mut _guard = lock.lock().await;
});
}
while let Some(_x) = joinset.join_next().await {}
})
});
// Give both threads time to spawn their tasks; both locks are still held,
// so each runtime is expected to report 5 active tasks.
std::thread::sleep(Duration::from_millis(10));
let text = TextEncoder.encode_to_string(&registry.gather()).unwrap();
assert_eq!(
text,
r#"# HELP tokio_active_task_count_total the number of active tasks in the runtime
# TYPE tokio_active_task_count_total gauge
tokio_active_task_count_total{runtime="runtime1"} 5
tokio_active_task_count_total{runtime="runtime2"} 5
# HELP tokio_worker_steal_count_total the number of tasks the given worker thread stole from another worker thread
# TYPE tokio_worker_steal_count_total counter
tokio_worker_steal_count_total{runtime="runtime1",worker="0"} 0
tokio_worker_steal_count_total{runtime="runtime2",worker="0"} 0
"#
);
// Releasing lock1 lets runtime1's tasks finish; runtime2 is still blocked.
drop(guard1);
std::thread::sleep(Duration::from_millis(10));
let text = TextEncoder.encode_to_string(&registry.gather()).unwrap();
assert_eq!(
text,
r#"# HELP tokio_active_task_count_total the number of active tasks in the runtime
# TYPE tokio_active_task_count_total gauge
tokio_active_task_count_total{runtime="runtime1"} 0
tokio_active_task_count_total{runtime="runtime2"} 5
# HELP tokio_worker_steal_count_total the number of tasks the given worker thread stole from another worker thread
# TYPE tokio_worker_steal_count_total counter
tokio_worker_steal_count_total{runtime="runtime1",worker="0"} 0
tokio_worker_steal_count_total{runtime="runtime2",worker="0"} 0
"#
);
// Releasing lock2 lets runtime2's tasks finish as well.
drop(guard2);
std::thread::sleep(Duration::from_millis(10));
let text = TextEncoder.encode_to_string(&registry.gather()).unwrap();
assert_eq!(
text,
r#"# HELP tokio_active_task_count_total the number of active tasks in the runtime
# TYPE tokio_active_task_count_total gauge
tokio_active_task_count_total{runtime="runtime1"} 0
tokio_active_task_count_total{runtime="runtime2"} 0
# HELP tokio_worker_steal_count_total the number of tasks the given worker thread stole from another worker thread
# TYPE tokio_worker_steal_count_total counter
tokio_worker_steal_count_total{runtime="runtime1",worker="0"} 0
tokio_worker_steal_count_total{runtime="runtime2",worker="0"} 0
"#
);
});
}
}

View File

@@ -1639,6 +1639,9 @@ pub fn make_router(
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
// for the /metrics endpoint
crate::task_mgr::runtime_collector().register()?;
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
if auth.is_some() {

View File

@@ -42,6 +42,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use futures::FutureExt;
use metrics::tokio_metrics::TokioCollector;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::task_local;
@@ -149,6 +150,20 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
});
/// Builds a [`TokioCollector`] covering this module's four runtimes.
///
/// The runtimes are labelled `compute`, `mgmt`, `walreceiver` and `background`.
/// The collector is returned unregistered; the caller is expected to call
/// `register()` on it.
pub fn runtime_collector() -> TokioCollector {
    let runtimes = [
        (COMPUTE_REQUEST_RUNTIME.handle().clone(), "compute"),
        (MGMT_REQUEST_RUNTIME.handle().clone(), "mgmt"),
        (WALRECEIVER_RUNTIME.handle().clone(), "walreceiver"),
        (BACKGROUND_RUNTIME.handle().clone(), "background"),
    ];

    runtimes
        .into_iter()
        .fold(TokioCollector::default(), |collector, (handle, label)| {
            collector.add_runtime(handle, label.to_owned())
        })
}
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);

View File

@@ -1,3 +1,4 @@
use ::metrics::tokio_metrics::TokioCollector;
use futures::future::Either;
use proxy::auth;
use proxy::config::AuthenticationConfig;
@@ -98,6 +99,10 @@ async fn main() -> anyhow::Result<()> {
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
TokioCollector::default()
.add_runtime(tokio::runtime::Handle::current(), "main".to_owned())
.register()?;
info!("Version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);

View File

@@ -29,13 +29,13 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
use safekeeper::{runtime_collector, wal_service};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope};
@@ -340,6 +340,10 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
runtime_collector()
.add_runtime(Handle::current(), "main".to_owned())
.register()?;
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
// Keep handles to main tasks to die if any of them disappears.

View File

@@ -1,3 +1,4 @@
use ::metrics::tokio_metrics::TokioCollector;
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
@@ -165,3 +166,22 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create broker runtime")
});
/// Builds a [`TokioCollector`] covering the six runtimes referenced here.
///
/// The runtimes are labelled `wal service`, `http`, `broker`, `wal remover`,
/// `wal backup` and `metric shifter`. The collector is returned unregistered so
/// the caller can attach further runtimes before calling `register()`.
pub fn runtime_collector() -> TokioCollector {
    let runtimes = [
        (WAL_SERVICE_RUNTIME.handle().clone(), "wal service"),
        (HTTP_RUNTIME.handle().clone(), "http"),
        (BROKER_RUNTIME.handle().clone(), "broker"),
        (WAL_REMOVER_RUNTIME.handle().clone(), "wal remover"),
        (WAL_BACKUP_RUNTIME.handle().clone(), "wal backup"),
        (METRICS_SHIFTER_RUNTIME.handle().clone(), "metric shifter"),
    ];

    runtimes
        .into_iter()
        .fold(TokioCollector::default(), |collector, (handle, label)| {
            collector.add_runtime(handle, label.to_owned())
        })
}