mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
8 Commits
release-pr
...
tokio-unst
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac0b314b13 | ||
|
|
a8805d846b | ||
|
|
835dca6537 | ||
|
|
755bff9676 | ||
|
|
03e646cc96 | ||
|
|
52479df965 | ||
|
|
84b389ff92 | ||
|
|
ba16dfb1e8 |
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -184,9 +184,14 @@ jobs:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
build_type: [ debug, release ]
|
||||
# add tokio_unstable on debug
|
||||
include:
|
||||
- build_type: debug
|
||||
rustflags: "--cfg=tokio_unstable"
|
||||
env:
|
||||
BUILD_TYPE: ${{ matrix.build_type }}
|
||||
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
RUSTFLAGS: ${{ matrix.rustflags }}
|
||||
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
|
||||
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -2676,9 +2676,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
version = "0.2.149"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
|
||||
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
@@ -2790,6 +2790,7 @@ dependencies = [
|
||||
"libc",
|
||||
"once_cell",
|
||||
"prometheus",
|
||||
"tokio",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -5357,18 +5358,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.28.1"
|
||||
version = "1.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"backtrace",
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"num_cpus",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2 0.4.9",
|
||||
"socket2 0.5.3",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@@ -136,7 +136,7 @@ tar = "0.4"
|
||||
test-context = "0.1"
|
||||
thiserror = "1.0"
|
||||
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio = { version = "1.29", features = ["macros"] }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.10.0"
|
||||
tokio-rustls = "0.24"
|
||||
|
||||
@@ -9,5 +9,6 @@ prometheus.workspace = true
|
||||
libc.workspace = true
|
||||
once_cell.workspace = true
|
||||
chrono.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -21,6 +21,8 @@ pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
|
||||
pub use prometheus::{Encoder, TextEncoder};
|
||||
use prometheus::{Registry, Result};
|
||||
|
||||
pub mod tokio_metrics;
|
||||
|
||||
pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
|
||||
265
libs/metrics/src/tokio_metrics.rs
Normal file
265
libs/metrics/src/tokio_metrics.rs
Normal file
@@ -0,0 +1,265 @@
|
||||
use prometheus::{core::Desc, proto};
|
||||
|
||||
use crate::register_internal;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TokioCollector {
|
||||
handles: Vec<(tokio::runtime::Handle, String)>,
|
||||
}
|
||||
|
||||
impl TokioCollector {
|
||||
pub fn add_runtime(mut self, handle: tokio::runtime::Handle, name: String) -> Self {
|
||||
self.handles.push((handle, name));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn register(self) -> Result<(), prometheus::Error> {
|
||||
register_internal(Box::new(self))
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn gauge(
|
||||
&self,
|
||||
desc: &Desc,
|
||||
f: impl Fn(&tokio::runtime::RuntimeMetrics) -> f64,
|
||||
) -> proto::MetricFamily {
|
||||
self.metric(desc, proto::MetricType::GAUGE, |rt| {
|
||||
let mut g = proto::Gauge::default();
|
||||
g.set_value(f(rt));
|
||||
let mut m = proto::Metric::default();
|
||||
m.set_gauge(g);
|
||||
m
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn counter_per_worker(
|
||||
&self,
|
||||
desc: &Desc,
|
||||
f: impl Fn(&tokio::runtime::RuntimeMetrics, usize) -> f64,
|
||||
) -> proto::MetricFamily {
|
||||
self.metric_per_worker(desc, proto::MetricType::COUNTER, |m, i| {
|
||||
let mut g = proto::Counter::default();
|
||||
g.set_value(f(m, i));
|
||||
let mut m = proto::Metric::default();
|
||||
m.set_counter(g);
|
||||
m
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn metric(
|
||||
&self,
|
||||
desc: &Desc,
|
||||
typ: proto::MetricType,
|
||||
f: impl Fn(&tokio::runtime::RuntimeMetrics) -> proto::Metric,
|
||||
) -> proto::MetricFamily {
|
||||
self.metrics(desc, typ, |rt, labels, metrics| {
|
||||
let mut m = f(rt);
|
||||
m.set_label(labels.clone());
|
||||
metrics.push(m);
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn metric_per_worker(
|
||||
&self,
|
||||
desc: &Desc,
|
||||
typ: proto::MetricType,
|
||||
f: impl Fn(&tokio::runtime::RuntimeMetrics, usize) -> proto::Metric,
|
||||
) -> proto::MetricFamily {
|
||||
self.metrics(desc, typ, |rt, labels, metrics| {
|
||||
let workers = rt.num_workers();
|
||||
for i in 0..workers {
|
||||
let mut m = f(rt, i);
|
||||
|
||||
let mut label_worker = proto::LabelPair::default();
|
||||
label_worker.set_name("worker".to_owned());
|
||||
label_worker.set_value(i.to_string());
|
||||
let mut labels = labels.clone();
|
||||
labels.push(label_worker);
|
||||
|
||||
m.set_label(labels);
|
||||
metrics.push(m);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn metrics(
|
||||
&self,
|
||||
desc: &Desc,
|
||||
typ: proto::MetricType,
|
||||
f: impl Fn(&tokio::runtime::RuntimeMetrics, Vec<proto::LabelPair>, &mut Vec<proto::Metric>),
|
||||
) -> proto::MetricFamily {
|
||||
let mut m = proto::MetricFamily::default();
|
||||
m.set_name(desc.fq_name.clone());
|
||||
m.set_help(desc.help.clone());
|
||||
m.set_field_type(typ);
|
||||
let mut metrics = vec![];
|
||||
for (rt, name) in &self.handles {
|
||||
let rt_metrics = rt.metrics();
|
||||
|
||||
let mut label_name = proto::LabelPair::default();
|
||||
label_name.set_name("runtime".to_owned());
|
||||
label_name.set_value(name.to_owned());
|
||||
|
||||
let labels = vec![label_name];
|
||||
|
||||
f(&rt_metrics, labels, &mut metrics);
|
||||
}
|
||||
|
||||
m.set_metric(metrics);
|
||||
m
|
||||
}
|
||||
}
|
||||
|
||||
/// Descriptor of the active-task gauge; it carries a single `runtime` label.
// NOTE(review): the name ends in `_total` although the family is exposed as a
// gauge; kept as-is because the tests assert on this exact name.
#[cfg(tokio_unstable)]
static ACTIVE_TASK_COUNT: once_cell::sync::Lazy<Desc> = once_cell::sync::Lazy::new(|| {
    let fq_name = String::from("tokio_active_task_count_total");
    let help = String::from("the number of active tasks in the runtime");
    let variable_labels = vec![String::from("runtime")];
    let const_labels = std::collections::HashMap::new();

    Desc::new(fq_name, help, variable_labels, const_labels)
        .expect("should be a valid description")
});
|
||||
/// Descriptor of the per-worker steal counter; it carries `runtime` and
/// `worker` labels.
#[cfg(tokio_unstable)]
static WORKER_STEAL_COUNT: once_cell::sync::Lazy<Desc> = once_cell::sync::Lazy::new(|| {
    let fq_name = String::from("tokio_worker_steal_count_total");
    let help = String::from(
        "the number of tasks the given worker thread stole from another worker thread",
    );
    let variable_labels = vec![String::from("runtime"), String::from("worker")];
    let const_labels = std::collections::HashMap::new();

    Desc::new(fq_name, help, variable_labels, const_labels)
        .expect("should be a valid description")
});
|
||||
|
||||
#[allow(unused_mut, clippy::let_and_return)]
|
||||
impl prometheus::core::Collector for TokioCollector {
|
||||
fn desc(&self) -> Vec<&Desc> {
|
||||
let mut metrics = Vec::new();
|
||||
#[cfg(tokio_unstable)]
|
||||
metrics.extend([&*ACTIVE_TASK_COUNT, &*WORKER_STEAL_COUNT]);
|
||||
metrics
|
||||
}
|
||||
|
||||
fn collect(&self) -> Vec<proto::MetricFamily> {
|
||||
let mut metrics = Vec::new();
|
||||
#[cfg(tokio_unstable)]
|
||||
metrics.extend([
|
||||
self.gauge(&ACTIVE_TASK_COUNT, |m| m.active_tasks_count() as f64),
|
||||
self.counter_per_worker(&WORKER_STEAL_COUNT, |m, i| m.worker_steal_count(i) as f64),
|
||||
]);
|
||||
metrics
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
#[cfg(tokio_unstable)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use prometheus::{Registry, TextEncoder};
    use tokio::{runtime, sync::Mutex, task::JoinSet};

    use crate::tokio_metrics::TokioCollector;

    /// Number of tasks spawned on each runtime; all of them block on one mutex.
    const TASKS_PER_RUNTIME: usize = 5;

    /// Text exposition expected when `runtime1` / `runtime2` report the given
    /// active task counts. Steal counts are always 0 on current-thread runtimes.
    fn expected_exposition(active1: usize, active2: usize) -> String {
        format!(
            r#"# HELP tokio_active_task_count_total the number of active tasks in the runtime
# TYPE tokio_active_task_count_total gauge
tokio_active_task_count_total{{runtime="runtime1"}} {active1}
tokio_active_task_count_total{{runtime="runtime2"}} {active2}
# HELP tokio_worker_steal_count_total the number of tasks the given worker thread stole from another worker thread
# TYPE tokio_worker_steal_count_total counter
tokio_worker_steal_count_total{{runtime="runtime1",worker="0"}} 0
tokio_worker_steal_count_total{{runtime="runtime2",worker="0"}} 0
"#
        )
    }

    /// Gathers the registry and renders it in the prometheus text format.
    fn scrape(registry: &Registry) -> String {
        TextEncoder.encode_to_string(&registry.gather()).unwrap()
    }

    /// Drives `rt` on the calling thread: spawns `TASKS_PER_RUNTIME` tasks that
    /// each wait for `lock`, then waits for all of them to finish.
    fn run_blocked_tasks(rt: runtime::Runtime, lock: Arc<Mutex<i32>>) {
        rt.block_on(async {
            let mut joinset = JoinSet::new();
            for _ in 0..TASKS_PER_RUNTIME {
                let lock = lock.clone();
                joinset.spawn(async move {
                    let _guard = lock.lock().await;
                });
            }
            while joinset.join_next().await.is_some() {}
        })
    }

    #[test]
    fn gather_multiple_runtimes() {
        let registry = Registry::new();

        let runtime1 = runtime::Builder::new_current_thread().build().unwrap();
        let runtime2 = runtime::Builder::new_current_thread().build().unwrap();

        let collector = TokioCollector::default()
            .add_runtime(runtime1.handle().clone(), "runtime1".to_owned())
            .add_runtime(runtime2.handle().clone(), "runtime2".to_owned());

        registry.register(Box::new(collector)).unwrap();

        std::thread::scope(|s| {
            let lock1 = Arc::new(Mutex::new(0));
            let lock2 = Arc::new(Mutex::new(0));

            // Hold both locks so that every spawned task stays parked until
            // the matching guard is dropped below.
            let guard1 = lock1.clone().try_lock_owned().unwrap();
            let guard2 = lock2.clone().try_lock_owned().unwrap();

            s.spawn(move || run_blocked_tasks(runtime1, lock1));
            s.spawn(move || run_blocked_tasks(runtime2, lock2));

            // Give the runtimes time to spawn their tasks.
            std::thread::sleep(Duration::from_millis(10));
            assert_eq!(
                scrape(&registry),
                expected_exposition(TASKS_PER_RUNTIME, TASKS_PER_RUNTIME)
            );

            drop(guard1);
            std::thread::sleep(Duration::from_millis(10));
            assert_eq!(scrape(&registry), expected_exposition(0, TASKS_PER_RUNTIME));

            drop(guard2);
            std::thread::sleep(Duration::from_millis(10));
            assert_eq!(scrape(&registry), expected_exposition(0, 0));
        });
    }
}
|
||||
@@ -1639,6 +1639,9 @@ pub fn make_router(
|
||||
launch_ts: &'static LaunchTimestamp,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
// for the /metrics endpoint
|
||||
crate::task_mgr::runtime_collector().register()?;
|
||||
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
|
||||
if auth.is_some() {
|
||||
|
||||
@@ -42,6 +42,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::FutureExt;
|
||||
use metrics::tokio_metrics::TokioCollector;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::task_local;
|
||||
@@ -149,6 +150,20 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
|
||||
.unwrap_or_else(|_e| usize::max(1, num_cpus::get()))
|
||||
});
|
||||
|
||||
pub fn runtime_collector() -> TokioCollector {
|
||||
TokioCollector::default()
|
||||
.add_runtime(
|
||||
COMPUTE_REQUEST_RUNTIME.handle().clone(),
|
||||
"compute".to_owned(),
|
||||
)
|
||||
.add_runtime(MGMT_REQUEST_RUNTIME.handle().clone(), "mgmt".to_owned())
|
||||
.add_runtime(
|
||||
WALRECEIVER_RUNTIME.handle().clone(),
|
||||
"walreceiver".to_owned(),
|
||||
)
|
||||
.add_runtime(BACKGROUND_RUNTIME.handle().clone(), "background".to_owned())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use ::metrics::tokio_metrics::TokioCollector;
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::config::AuthenticationConfig;
|
||||
@@ -98,6 +99,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
TokioCollector::default()
|
||||
.add_runtime(tokio::runtime::Handle::current(), "main".to_owned())
|
||||
.register()?;
|
||||
|
||||
info!("Version: {GIT_VERSION}");
|
||||
::metrics::set_build_info_metric(GIT_VERSION);
|
||||
|
||||
|
||||
@@ -29,13 +29,13 @@ use safekeeper::defaults::{
|
||||
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
DEFAULT_PG_LISTEN_ADDR,
|
||||
};
|
||||
use safekeeper::wal_service;
|
||||
use safekeeper::GlobalTimelines;
|
||||
use safekeeper::SafeKeeperConf;
|
||||
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
|
||||
use safekeeper::{control_file, BROKER_RUNTIME};
|
||||
use safekeeper::{http, WAL_REMOVER_RUNTIME};
|
||||
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
|
||||
use safekeeper::{runtime_collector, wal_service};
|
||||
use safekeeper::{wal_backup, HTTP_RUNTIME};
|
||||
use storage_broker::DEFAULT_ENDPOINT;
|
||||
use utils::auth::{JwtAuth, Scope};
|
||||
@@ -340,6 +340,10 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
|
||||
metrics::register_internal(Box::new(timeline_collector))?;
|
||||
|
||||
runtime_collector()
|
||||
.add_runtime(Handle::current(), "main".to_owned())
|
||||
.register()?;
|
||||
|
||||
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use ::metrics::tokio_metrics::TokioCollector;
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::Lazy;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
@@ -165,3 +166,22 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
.build()
|
||||
.expect("Failed to create broker runtime")
|
||||
});
|
||||
|
||||
pub fn runtime_collector() -> TokioCollector {
|
||||
TokioCollector::default()
|
||||
.add_runtime(
|
||||
WAL_SERVICE_RUNTIME.handle().clone(),
|
||||
"wal service".to_owned(),
|
||||
)
|
||||
.add_runtime(HTTP_RUNTIME.handle().clone(), "http".to_owned())
|
||||
.add_runtime(BROKER_RUNTIME.handle().clone(), "broker".to_owned())
|
||||
.add_runtime(
|
||||
WAL_REMOVER_RUNTIME.handle().clone(),
|
||||
"wal remover".to_owned(),
|
||||
)
|
||||
.add_runtime(WAL_BACKUP_RUNTIME.handle().clone(), "wal backup".to_owned())
|
||||
.add_runtime(
|
||||
METRICS_SHIFTER_RUNTIME.handle().clone(),
|
||||
"metric shifter".to_owned(),
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user