From 70fdd75c89375f80404a4485a451f14c2e44f256 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 14 Jul 2025 14:32:39 +0300 Subject: [PATCH] Introduce built-in Prometheus exporter to the Postgres extension Currently, the exporter exposes the same LFC metrics that are exposed by the "autoscaling" sql_exporter in the docker image. With this, we can remove the dedicated sql_exporter instance. But that's left as a TODO until this is rolled out to production and we have changed autoscaling-agent to fetch the metrics from this new endpoint. The exporter runs as a Postgres background worker process. This is extracted from the Rust communicator rewrite project, which will use the same worker process for much more, to handle the communications with the pageservers. For now, though, it merely handles the metrics requests. In the future, we will add more metrics, and perhaps even APIs to control the running Postgres instance. The exporter listens on a Unix Domain socket within the Postgres data directory. A Unix Domain socket is a bit inconventional, but it has some advantages: - Permissions are taken care of. Only processes that can access the data directory, and therefore already have full access to the running Postgres instance, can connect to it. - No need to allocate and manage a new port number for the listener It has some downsides too: it's not immediately accessible from the outside world, and the functions to work with Unix Domain sockets are more low-level than TCP sockets (see the symlink hack in `postgres_metrics_client.rs`, for example). To expose the metrics from the local Unix Domain Socket to the autoscaling agent, introduce a new '/autoscaling_metrics' endpoint in the compute_ctl's HTTP server. Currently it merely forwards the request to the Postgres instance, but we could add rate limiting and access control there in the future. --- Cargo.lock | 12 +- compute_tools/Cargo.toml | 4 + compute_tools/src/http/routes/metrics.rs | 63 ++++- compute_tools/src/http/server.rs | 8 +- compute_tools/src/lib.rs | 1 + compute_tools/src/postgres_metrics_client.rs | 97 ++++++++ pgxn/neon/Makefile | 1 + pgxn/neon/communicator/Cargo.toml | 9 +- pgxn/neon/communicator/README.md | 23 +- pgxn/neon/communicator/src/lib.rs | 7 +- .../src/worker_process/callbacks.rs | 35 +++ .../src/worker_process/lfc_metrics.rs | 91 +++++++ .../src/worker_process/logging.rs | 228 ++++++++++++++++++ .../src/worker_process/main_loop.rs | 40 +++ .../src/worker_process/metrics_exporter.rs | 99 ++++++++ .../communicator/src/worker_process/mod.rs | 13 + .../src/worker_process/worker_interface.rs | 39 +++ pgxn/neon/communicator_process.c | 178 ++++++++++++++ pgxn/neon/communicator_process.h | 17 ++ pgxn/neon/file_cache.c | 34 +++ pgxn/neon/neon.c | 6 +- poetry.lock | 19 +- pyproject.toml | 1 + test_runner/fixtures/endpoint/http.py | 6 + test_runner/fixtures/neon_fixtures.py | 1 + .../test_communicator_metrics_exporter.py | 52 ++++ .../test_lfc_working_set_approximation.py | 16 +- 27 files changed, 1079 insertions(+), 21 deletions(-) create mode 100644 compute_tools/src/postgres_metrics_client.rs create mode 100644 pgxn/neon/communicator/src/worker_process/callbacks.rs create mode 100644 pgxn/neon/communicator/src/worker_process/lfc_metrics.rs create mode 100644 pgxn/neon/communicator/src/worker_process/logging.rs create mode 100644 pgxn/neon/communicator/src/worker_process/main_loop.rs create mode 100644 pgxn/neon/communicator/src/worker_process/metrics_exporter.rs create mode 100644 pgxn/neon/communicator/src/worker_process/mod.rs create mode 100644 pgxn/neon/communicator/src/worker_process/worker_interface.rs create mode 100644 pgxn/neon/communicator_process.c create mode 100644 pgxn/neon/communicator_process.h create mode 100644 test_runner/regress/test_communicator_metrics_exporter.py diff --git a/Cargo.lock b/Cargo.lock index 2f36790d30..53bce59652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,8 +1290,14 @@ dependencies = [ name = "communicator" version = "0.1.0" dependencies = [ + "axum", "cbindgen", - "neon-shmem", + "http 1.1.0", + "metrics", + "tokio", + "tracing", + "tracing-subscriber", + "utils", "workspace_hack", ] @@ -1335,6 +1341,9 @@ dependencies = [ "futures", "hostname-validator", "http 1.1.0", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "indexmap 2.9.0", "itertools 0.10.5", "jsonwebtoken", @@ -1357,6 +1366,7 @@ dependencies = [ "ring", "rlimit", "rust-ini", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 910bae3bda..496471acc7 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -27,7 +27,10 @@ fail.workspace = true flate2.workspace = true futures.workspace = true http.workspace = true +http-body-util.workspace = true hostname-validator = "1.1" +hyper.workspace = true +hyper-util.workspace = true indexmap.workspace = true itertools.workspace = true jsonwebtoken.workspace = true @@ -44,6 +47,7 @@ postgres.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["json"] } ring = "0.17" +scopeguard.workspace = true serde.workspace = true serde_with.workspace = true serde_json.workspace = true diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index da8d8b20a5..32c8a294a7 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -1,12 +1,20 @@ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; use axum::body::Body; +use axum::extract::State; use axum::response::Response; -use http::StatusCode; use http::header::CONTENT_TYPE; +use http_body_util::BodyExt; +use hyper::{Request, StatusCode}; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder}; +use crate::compute::ComputeNode; use crate::http::JsonResponse; use crate::metrics::collect; +use crate::postgres_metrics_client::connect_postgres_metrics_socket; /// Expose Prometheus metrics. pub(in crate::http) async fn get_metrics() -> Response { @@ -31,3 +39,56 @@ pub(in crate::http) async fn get_metrics() -> Response { .body(Body::from(buffer)) .unwrap() } + +/// Fetch and forward metrics from the Postgres neon extension's metrics +/// exporter that are used by autoscaling-agent. +/// +/// The neon extension exposes these metrics over a Unix domain socket +/// in the data directory. That's not accessible directly from the outside +/// world, so we have this endpoint in compute_ctl to expose it +pub(in crate::http) async fn get_autoscaling_metrics( + State(compute): State>, +) -> Result { + let pgdata = Path::new(&compute.params.pgdata); + + // Connect to the communicator process's metrics socket + let mut metrics_client = connect_postgres_metrics_socket(pgdata) + .await + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Make a request for /autoscaling_metrics + let request = Request::builder() + .method("GET") + .uri("/autoscaling_metrics") + .header("Host", "localhost") // hyper requires Host, even though the server won't care + .body(Body::from("")) + .unwrap(); + let resp = metrics_client + .send_request(request) + .await + .context("fetching metrics from Postgres metrics service") + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Check response status + let status = resp.status(); + if status != StatusCode::OK { + return Err(JsonResponse::error( + status, + format!( + "Postgres metrics service returned error: {}", + status.to_string() + ), + )); + } + + // Build an OK response, with the body forwarded from the response we got. + let mut response = Response::builder(); + response = response.status(StatusCode::OK); + if let Some(content_type) = resp.headers().get(CONTENT_TYPE) { + response = response.header(CONTENT_TYPE, content_type); + } + + let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream()); + + Ok(response.body(body).unwrap()) +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 17939e39d4..f0fbca8263 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -81,8 +81,12 @@ impl From<&Server> for Router> { Server::External { config, compute_id, .. } => { - let unauthenticated_router = - Router::>::new().route("/metrics", get(metrics::get_metrics)); + let unauthenticated_router = Router::>::new() + .route("/metrics", get(metrics::get_metrics)) + .route( + "/autoscaling_metrics", + get(metrics::get_autoscaling_metrics), + ); let authenticated_router = Router::>::new() .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 2d5d4565b7..950a745201 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -24,6 +24,7 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod pgbouncer; +pub mod postgres_metrics_client; pub mod rsyslog; pub mod spec; mod spec_apply; diff --git a/compute_tools/src/postgres_metrics_client.rs b/compute_tools/src/postgres_metrics_client.rs new file mode 100644 index 0000000000..ef42a01444 --- /dev/null +++ b/compute_tools/src/postgres_metrics_client.rs @@ -0,0 +1,97 @@ +//! Client for making request to a running Postgres server's metrics service +//! +//! The storage communicator process that runs inside Postgres exposes +//! an HTTP endpoint in a Unix Domain Socket in the Postgres data +//! directory. This provides access to it. + +use std::path::Path; + +use anyhow::Context; +use hyper::client::conn::http1::SendRequest; +use hyper_util::rt::TokioIo; + +/// Name of the socket within the Postgres data directory. This better match that in +/// `pgxn/neon/communicator/src/worker_process/metrics_exporter.rs`. +const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; + +/// Open a connection to the metrics exporter's socket, prepare to send requests to it +/// with hyper. +pub async fn connect_postgres_metrics_socket(pgdata: &Path) -> anyhow::Result> +where + B: hyper::body::Body + 'static + Send, + B::Data: Send, + B::Error: Into>, +{ + let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME); + let socket_path_len = socket_path.display().to_string().len(); + + // There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a + // Unix Domain socket. The limit is on the connect(2) function used to open the + // socket, not on the absolute path itself. Postgres changes the current directory to + // the data directory and uses a relative path to bind to the socket, and the relative + // path "./neon-communicator.socket" is always short, but when compute_ctl needs to + // open the socket, we need to use a full path, which can be arbitrarily long. + // + // There are a few ways we could work around this: + // + // 1. Change the current directory to the Postgres data directory and use a relative + // path in the connect(2) call. That's problematic because the current directory + // applies to the whole process. We could change the current directory early in + // compute_ctl startup, and that might be a good idea anyway for other reasons too: + // it would be more robust if the data directory is moved around or unlinked for some + // reason, and you would be less likely to accidentally litter other parts of the + // filesystem with e.g. temporary files. However, that's a pretty invasive change. + // + // 2. On Linux, you could open() the data directory, and refer to the the socket inside it + // as "/proc/self/fd//neon-communicator.socket". But that's Linux-only. + // + // 3. Create a symbolic link to the socket with a shorter path, and use that. + // + // We use the symbolic link approach here. Hopefully the paths we use in production + // are shorter, so that we can open the socket directly, so that this hack is needed + // only in development. + let connect_result = if socket_path_len < 100 { + // We can open the path directly with no hacks. + tokio::net::UnixStream::connect(socket_path).await + } else { + // The path to the socket is too long. Create a symlink to it with a shorter path. + let short_path = std::env::temp_dir().join(format!( + "compute_ctl.short-socket.{}.{}", + std::process::id(), + tokio::task::id() + )); + std::os::unix::fs::symlink(&socket_path, &short_path)?; + + // Delete the symlink as soon as we have connected to it. There's a small chance + // of leaking if the process dies before we remove it, so try to keep that window + // as small as possible. + scopeguard::defer! { + if let Err(err) = std::fs::remove_file(&short_path) { + tracing::warn!("could not remove symlink \"{}\" created for socket: {}", + short_path.display(), err); + } + } + + tracing::info!( + "created symlink \"{}\" for socket \"{}\", opening it now", + short_path.display(), + socket_path.display() + ); + + tokio::net::UnixStream::connect(&short_path).await + }; + + let stream = connect_result.context("opening postgres metrics socket")?; + + let io = TokioIo::new(stream); + let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await.unwrap(); + + // spawn a task to poll the connection and drive the HTTP state + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("Error in connection: {}", e); + } + }); + + Ok(request_sender) +} diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index bf7aeb4108..df9a6f0e4b 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -5,6 +5,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ communicator.o \ + communicator_process.o \ extension_server.o \ file_cache.o \ hll.o \ diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index e95a269d90..c5b5804ebc 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -13,7 +13,14 @@ crate-type = ["staticlib"] testing = [] [dependencies] -neon-shmem.workspace = true +axum.workspace = true +http.workspace = true +tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] } +tracing.workspace = true +tracing-subscriber.workspace = true + +metrics.workspace = true +utils.workspace = true workspace_hack = { version = "0.1", path = "../../../workspace_hack" } [build-dependencies] diff --git a/pgxn/neon/communicator/README.md b/pgxn/neon/communicator/README.md index 8169ae72b5..6c5f24ce52 100644 --- a/pgxn/neon/communicator/README.md +++ b/pgxn/neon/communicator/README.md @@ -1,7 +1,22 @@ -This package will evolve into a "compute-pageserver communicator" -process and machinery. For now, it's just a dummy that doesn't do -anything interesting, but it allows us to test the compilation and -linking of Rust code into the Postgres extensions. +# Communicator + +This package provides the so-called "compute-pageserver communicator", +or just "communicator" in short. The communicator is a separate +background worker process that runs in the PostgreSQL server. It's +part of the neon extension. Currently, it only provides an HTTP +endpoint for metrics, but in the future it will evolve to handle all +communications with the pageservers. + +## Source code view + +pgxn/neon/communicator_process.c + Contains code needed to start up the communicator process, and + the glue that interacts with PostgreSQL code and the Rust + code in the communicator process. + + +pgxn/neon/communicator/src/worker_process/ + Worker process main loop and glue code At compilation time, pgxn/neon/communicator/ produces a static library, libcommunicator.a. It is linked to the neon.so extension diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs index 24c180d37d..6fc169ad47 100644 --- a/pgxn/neon/communicator/src/lib.rs +++ b/pgxn/neon/communicator/src/lib.rs @@ -1,6 +1 @@ -/// dummy function, just to test linking Rust functions into the C -/// extension -#[unsafe(no_mangle)] -pub extern "C" fn communicator_dummy(arg: u32) -> u32 { - arg + 1 -} +mod worker_process; diff --git a/pgxn/neon/communicator/src/worker_process/callbacks.rs b/pgxn/neon/communicator/src/worker_process/callbacks.rs new file mode 100644 index 0000000000..e98ffea80b --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/callbacks.rs @@ -0,0 +1,35 @@ +//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These are +//! implemented in `neon/pgxn/communicator_process.c`. The function signatures better match! +//! +//! These are called from the communicator threads! Careful what you do, most Postgres functions are +//! not safe to call in that context. + +unsafe extern "C" { + pub fn callback_set_my_latch_unsafe(); + + pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics; +} + +// safe wrappers + +pub(super) fn callback_set_my_latch() { + unsafe { callback_set_my_latch_unsafe() }; +} + +#[repr(C)] +pub struct LfcMetrics { + pub lfc_cache_size_limit: i64, + pub lfc_hits: i64, + pub lfc_misses: i64, + pub lfc_used: i64, + pub lfc_writes: i64, + + // working set size looking back 1..60 minutes. + // + // Index 0 is size of working set accessed within last 1 minute, + // index 59 is size of working set accessed within last 60 minutes. + pub lfc_approximate_working_set_size_windows: [i64; 60], +} +pub extern "C" fn callback_get_lfc_metrics() -> LfcMetrics { + unsafe { callback_get_lfc_metrics_unsafe() } +} diff --git a/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs new file mode 100644 index 0000000000..cb402ddef0 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs @@ -0,0 +1,91 @@ +use metrics::{IntGauge, IntGaugeVec}; + +use super::callbacks::callback_get_lfc_metrics; + +pub(crate) struct LfcMetricsCollector { + lfc_cache_size_limit: IntGauge, + lfc_hits: IntGauge, + lfc_misses: IntGauge, + lfc_used: IntGauge, + lfc_writes: IntGauge, + lfc_approximate_working_set_size_windows_vec: IntGaugeVec, + lfc_approximate_working_set_size_windows: [IntGauge; 60], +} + +impl LfcMetricsCollector { + pub fn new() -> LfcMetricsCollector { + let lfc_approximate_working_set_size_windows_vec = IntGaugeVec::new( + metrics::opts!( + "lfc_approximate_working_set_size_windows", + "Approximate working set size in pages of 8192 bytes", + ), + &[&"duration_seconds"], + ) + .unwrap(); + + let lfc_approximate_working_set_size_windows: [IntGauge; 60] = (1..=60) + .map(|minutes| { + lfc_approximate_working_set_size_windows_vec + .with_label_values(&[&(minutes * 60).to_string()]) + }) + .collect::>() + .try_into() + .unwrap(); + + LfcMetricsCollector { + lfc_cache_size_limit: IntGauge::new( + "lfc_cache_size_limit", + "LFC cache size limit in bytes", + ) + .unwrap(), + lfc_hits: IntGauge::new("lfc_hits", "LFC cache hits").unwrap(), + lfc_misses: IntGauge::new("lfc_misses", "LFC cache misses").unwrap(), + lfc_used: IntGauge::new("lfc_used", "LFC chunks used (chunk = 1MB)").unwrap(), + lfc_writes: IntGauge::new("lfc_writes", "LFC cache writes").unwrap(), + + lfc_approximate_working_set_size_windows_vec, + lfc_approximate_working_set_size_windows, + } + } +} + +impl metrics::core::Collector for LfcMetricsCollector { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + + descs.append(&mut self.lfc_cache_size_limit.desc()); + descs.append(&mut self.lfc_hits.desc()); + descs.append(&mut self.lfc_misses.desc()); + descs.append(&mut self.lfc_used.desc()); + descs.append(&mut self.lfc_writes.desc()); + descs.append(&mut self.lfc_approximate_working_set_size_windows_vec.desc()); + + descs + } + + fn collect(&self) -> Vec { + let mut values = Vec::new(); + + // update the gauges + let lfc_metrics = callback_get_lfc_metrics(); + self.lfc_cache_size_limit + .set(lfc_metrics.lfc_cache_size_limit); + self.lfc_hits.set(lfc_metrics.lfc_hits); + self.lfc_misses.set(lfc_metrics.lfc_misses); + self.lfc_used.set(lfc_metrics.lfc_used); + self.lfc_writes.set(lfc_metrics.lfc_writes); + for i in 0..60 { + let val = lfc_metrics.lfc_approximate_working_set_size_windows[i]; + self.lfc_approximate_working_set_size_windows[i].set(val); + } + + values.append(&mut self.lfc_cache_size_limit.collect()); + values.append(&mut self.lfc_hits.collect()); + values.append(&mut self.lfc_misses.collect()); + values.append(&mut self.lfc_used.collect()); + values.append(&mut self.lfc_writes.collect()); + values.append(&mut self.lfc_approximate_working_set_size_windows_vec.collect()); + + values + } +} diff --git a/pgxn/neon/communicator/src/worker_process/logging.rs b/pgxn/neon/communicator/src/worker_process/logging.rs new file mode 100644 index 0000000000..f200eb0446 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/logging.rs @@ -0,0 +1,228 @@ +//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log +//! +//! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres +//! process latch is raised. That wakes up the loop in the main thread, see +//! `communicator_new_bgworker_main()`. It reads the message from the channel and +//! ereport()s it. This ensures that only one thread, the main thread, calls the +//! PostgreSQL logging routines at any time. + +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::mpsc::{TryRecvError, TrySendError}; + +use tracing::info; +use tracing::{Event, Level, Metadata, Subscriber}; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter}; +use tracing_subscriber::registry::LookupSpan; + +use crate::worker_process::callbacks::callback_set_my_latch; + +pub struct LoggingState { + receiver: Receiver, +} + +/// Called once, at worker process startup. The returned LoggingState is passed back +/// in the subsequent calls to `pump_logging`. It is opaque to the C code. +#[unsafe(no_mangle)] +pub extern "C" fn configure_logging() -> Box { + let (sender, receiver) = sync_channel(1000); + + let maker = Maker { channel: sender }; + + use tracing_subscriber::prelude::*; + let r = tracing_subscriber::registry(); + + let r = r.with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .event_format(SimpleFormatter::new()) + .with_writer(maker) + // TODO: derive this from log_min_messages? + .with_filter(LevelFilter::from_level(Level::INFO)), + ); + r.init(); + + info!("communicator process logging started"); + + let state = LoggingState { receiver }; + + Box::new(state) +} + +/// Read one message from the logging queue. This is essentially a wrapper to Receiver, +/// with a C-friendly signature. +/// +/// The message is copied into *errbuf, which is a caller-supplied buffer of size +/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always +/// NULL-terminated. +/// +/// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see +/// elog.h +#[unsafe(no_mangle)] +pub extern "C" fn pump_logging( + state: &mut LoggingState, + errbuf: *mut u8, + errbuf_len: u32, + elevel_p: &mut i32, +) -> i32 { + let msg = match state.receiver.try_recv() { + Err(TryRecvError::Empty) => return 0, + Err(TryRecvError::Disconnected) => return -1, + Ok(msg) => msg, + }; + + let src: &[u8] = &msg.message; + let dst = errbuf; + let len = std::cmp::min(src.len(), errbuf_len as usize - 1); + unsafe { + std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len); + *(errbuf.add(len)) = b'\0'; // NULL terminator + } + + // XXX: these levels are copied from PostgreSQL's elog.h. Introduce another enum to + // hide these? + *elevel_p = match msg.level { + Level::TRACE => 10, // DEBUG5 + Level::DEBUG => 14, // DEBUG1 + Level::INFO => 17, // INFO + Level::WARN => 19, // WARNING + Level::ERROR => 21, // ERROR + }; + + 1 +} + +//---- The following functions can be called from any thread ---- + +#[derive(Clone)] +struct FormattedEventWithMeta { + message: Vec, + level: tracing::Level, +} + +impl Default for FormattedEventWithMeta { + fn default() -> Self { + FormattedEventWithMeta { + message: Vec::new(), + level: tracing::Level::DEBUG, + } + } +} + +struct EventBuilder<'a> { + event: FormattedEventWithMeta, + + maker: &'a Maker, +} + +impl std::io::Write for EventBuilder<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.event.message.write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + self.maker.send_event(self.event.clone()); + Ok(()) + } +} + +impl Drop for EventBuilder<'_> { + fn drop(&mut self) { + let maker = self.maker; + let event = std::mem::take(&mut self.event); + + maker.send_event(event); + } +} + +struct Maker { + channel: SyncSender, +} + +impl<'a> MakeWriter<'a> for Maker { + type Writer = EventBuilder<'a>; + + fn make_writer(&'a self) -> Self::Writer { + panic!("not expected to be called when make_writer_for is implemented"); + } + + fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer { + EventBuilder { + event: FormattedEventWithMeta { + message: Vec::new(), + level: *meta.level(), + }, + maker: self, + } + } +} + +impl Maker { + fn send_event(&self, e: FormattedEventWithMeta) { + match self.channel.try_send(e) { + Ok(()) => { + // notify the main thread + callback_set_my_latch(); + } + Err(TrySendError::Disconnected(_)) => {} + Err(TrySendError::Full(_)) => { + // TODO: record that some messages were lost + } + } + } +} + +/// Simple formatter implementation for tracing_subscriber, which prints the log spans and +/// message part like the default formatter, but no timestamp or error level. The error +/// level is captured separately by `FormattedEventWithMeta', and when the error is +/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that +/// point. (The timestamp printed will therefore lag behind the timestamp on the event +/// here, if the main thread doesn't process the log message promptly) +struct SimpleFormatter; + +impl FormatEvent for SimpleFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + // Format all the spans in the event's span context. + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + write!(writer, "{}", span.name())?; + + // `FormattedFields` is a formatted representation of the span's fields, + // which is stored in its extensions by the `fmt` layer's `new_span` + // method. The fields will have been formatted by the same field formatter + // that's provided to the event formatter in the `FmtContext`. + let ext = span.extensions(); + let fields = &ext + .get::>() + .expect("will never be `None`"); + + // Skip formatting the fields if the span had no fields. + if !fields.is_empty() { + write!(writer, "{{{fields}}}")?; + } + write!(writer, ": ")?; + } + } + + // Write fields on the event + ctx.field_format().format_fields(writer.by_ref(), event)?; + + writeln!(writer) + } +} + +impl SimpleFormatter { + fn new() -> Self { + SimpleFormatter {} + } +} diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs new file mode 100644 index 0000000000..f3d7733904 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -0,0 +1,40 @@ +use std::str::FromStr as _; + +use crate::worker_process::lfc_metrics::LfcMetricsCollector; + +use utils::id::{TenantId, TimelineId}; + +pub struct CommunicatorWorkerProcessStruct { + /*** Metrics ***/ + pub(crate) lfc_metrics: LfcMetricsCollector, +} + +pub(super) async fn init( + tenant_id: String, + timeline_id: String, +) -> CommunicatorWorkerProcessStruct { + let _tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID"); + let _timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID"); + + CommunicatorWorkerProcessStruct { + // metrics + lfc_metrics: LfcMetricsCollector::new(), + } +} + +impl metrics::core::Collector for CommunicatorWorkerProcessStruct { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + + descs.append(&mut self.lfc_metrics.desc()); + + descs + } + fn collect(&self) -> Vec { + let mut values = Vec::new(); + + values.append(&mut self.lfc_metrics.collect()); + + values + } +} diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs new file mode 100644 index 0000000000..84cceb19df --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -0,0 +1,99 @@ +//! Export information about Postgres, the communicator process, file cache etc. as +//! prometheus metrics. +//! +//! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres +//! data directory. For debugging, you can access it with curl: +//! +//! curl --unix-socket neon-communicator.socket http://localhost/metrics +//! +use axum::Router; +use axum::body::Body; +use axum::extract::State; +use axum::response::Response; +use http::StatusCode; +use http::header::CONTENT_TYPE; + +use metrics::proto::MetricFamily; +use metrics::{Encoder, TextEncoder}; + +use std::path::PathBuf; + +use tokio::net::UnixListener; + +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; + +impl CommunicatorWorkerProcessStruct { + /// Launch the metrics exporter + pub(crate) async fn launch_metrics_exporter(&'static self) { + use axum::routing::get; + let app = Router::new() + .route("/metrics", get(get_metrics)) + .route("/autoscaling_metrics", get(get_autoscaling_metrics)) + .route("/debug/panic", get(handle_debug_panic)) + .with_state(self); + + // Listen on unix domain socket, in the data directory. That should be unique. + let path = PathBuf::from(NEON_COMMUNICATOR_SOCKET_NAME); + let listener = UnixListener::bind(path.clone()).unwrap(); + + tokio::spawn(async { + tracing::info!("metrics listener spawned"); + axum::serve(listener, app).await.unwrap() + }); + } +} + +/// Expose all Prometheus metrics. +async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response { + use metrics::core::Collector; + let metrics = state.collect(); + tracing::trace!("/metrics requested"); + metrics_to_response(metrics).await +} + +/// Expose Prometheus metrics, for use by the autoscaling agent. +/// +/// This is a subset of all the metrics. +async fn get_autoscaling_metrics( + State(state): State<&CommunicatorWorkerProcessStruct>, +) -> Response { + use metrics::core::Collector; + let metrics = state.lfc_metrics.collect(); + + tracing::trace!("/autoscaling_metrics requested"); + metrics_to_response(metrics).await +} + +async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response { + panic!("test HTTP handler task panic"); +} + +/// Helper function to convert prometheus metrics to a text response +async fn metrics_to_response(metrics: Vec) -> Response { + // When we call TextEncoder::encode() below, it will immediately return an + // error if a metric family has no metrics, so we need to preemptively + // filter out metric families with no metrics. + let metrics = metrics + .into_iter() + .filter(|m| !m.get_metric().is_empty()) + .collect::>(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + + if let Err(e) = encoder.encode(&metrics, &mut buffer) { + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(e.to_string())) + .unwrap() + } else { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } +} diff --git a/pgxn/neon/communicator/src/worker_process/mod.rs b/pgxn/neon/communicator/src/worker_process/mod.rs new file mode 100644 index 0000000000..c58556081b --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/mod.rs @@ -0,0 +1,13 @@ +//! This code runs in the communicator worker process. This provides +//! the glue code to: +//! +//! - launch the main loop, +//! - receive IO requests from backends and process them, +//! - write results back to backends. + +mod callbacks; +mod lfc_metrics; +mod logging; +mod main_loop; +mod metrics_exporter; +mod worker_interface; diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs new file mode 100644 index 0000000000..2ed30d20f6 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -0,0 +1,39 @@ +//! Functions called from the C code in the worker process + +use std::ffi::{CStr, c_char}; + +use crate::worker_process::main_loop; +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +/// Launch the communicator's tokio tasks, which do most of the work. +/// +/// The caller has initialized the process as a regular PostgreSQL background worker +/// process. +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_process_launch( + tenant_id: *const c_char, + timeline_id: *const c_char, +) -> &'static CommunicatorWorkerProcessStruct { + // Convert the arguments into more convenient Rust types + let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap(); + let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap(); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("communicator thread") + .build() + .unwrap(); + + let worker_struct = runtime.block_on(main_loop::init( + tenant_id.to_string(), + timeline_id.to_string(), + )); + let worker_struct = Box::leak(Box::new(worker_struct)); + + runtime.block_on(worker_struct.launch_metrics_exporter()); + + // keep the runtime running after we exit this function + Box::leak(Box::new(runtime)); + + worker_struct +} diff --git a/pgxn/neon/communicator_process.c b/pgxn/neon/communicator_process.c new file mode 100644 index 0000000000..f4beb0202a --- /dev/null +++ b/pgxn/neon/communicator_process.c @@ -0,0 +1,178 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.c + * Functions for starting up the communicator background worker process. + * + * Currently, the communicator process only functions as a metrics + * exporter. It provides an HTTP endpoint for polling a limited set of + * metrics. TODO: In the future, it will do much more, i.e. handle all + * the communications with the pageservers. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "replication/walsender.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/pmsignal.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" + +#include "communicator_process.h" +#include "file_cache.h" +#include "neon.h" +#include "neon_perf_counters.h" + +/* the rust bindings, generated by cbindgen */ +#include "communicator/communicator_bindings.h" + +PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg); + +/**** Initialization functions. These run in postmaster ****/ + +void +register_communicator_bgworker(void) +{ + BackgroundWorker bgw; + + /* Initialize the background worker process */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_PostmasterStart; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/**** Worker process functions. These run in the communicator worker process ****/ + +/* Entry point for the communicator bgworker process */ +void +communicator_new_bgworker_main(Datum main_arg) +{ + struct LoggingState *logging; + char errbuf[1000]; + int elevel; + const struct CommunicatorWorkerProcessStruct *proc_handle; + + /* + * Pretend that this process is a WAL sender. That affects the shutdown + * sequence: WAL senders are shut down last, after the final checkpoint + * has been written. That's what we want for the communicator process too. + */ + am_walsender = true; + MarkPostmasterChildWalSender(); + + /* Establish signal handlers. */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + /* + * Postmaster sends us SIGUSR2 when all regular backends and bgworkers + * have exited, and it's time for us to exit too + */ + pqsignal(SIGUSR2, die); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + + BackgroundWorkerUnblockSignals(); + + logging = configure_logging(); + + proc_handle = communicator_worker_process_launch( + neon_tenant, + neon_timeline); + + /* proc_handle is not currently used, but will be in the future */ + (void) proc_handle; + + /* + * The Rust tokio runtime has been launched, and it's running in the + * background now. This process is now multi-threaded! The Rust threads do + * not call into any Postgres functions. + * + * This loop in the main thread handles any interactions we need with the + * rest of PostgreSQL. + */ + elog(LOG, "communicator threads started"); + for (;;) + { + int32 rc; + + /* + * Check interrupts like system shutdown or config reload + */ + CHECK_FOR_INTERRUPTS(); + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * Forward any log messages from the Rust threads into the normal Postgres + * logging facility. + */ + for (;;) + { + rc = pump_logging(logging, (uint8 *) errbuf, sizeof(errbuf), &elevel); + if (rc == 0) + { + /* nothing to do */ + break; + } + else if (rc == 1) + { + /* Because we don't want to exit on error */ + if (elevel == ERROR) + elevel = LOG; + if (elevel == INFO) + elevel = LOG; + elog(elevel, "[COMMUNICATOR] %s", errbuf); + } + else if (rc == -1) + { + elog(ERROR, "logging channel was closed unexpectedly"); + } + } + + /* + * Wait until we are woken up. The rust threads will set the latch if + * there's log message to forward. + */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, + 0, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + } +} + +/**** + * Callbacks from the rust code, in the communicator process. + * + * NOTE: These must be thread-safe! It's very limited which PostgreSQL + * functions you can use!!! + * + * The signatures of these need to match those in the Rust code. + */ + +void +callback_set_my_latch_unsafe(void) +{ + SetLatch(MyLatch); +} diff --git a/pgxn/neon/communicator_process.h b/pgxn/neon/communicator_process.h new file mode 100644 index 0000000000..3b1d88b6ac --- /dev/null +++ b/pgxn/neon/communicator_process.h @@ -0,0 +1,17 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.h + * Communicator process + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef COMMUNICATOR_PROCESS_H +#define COMMUNICATOR_PROCESS_H + +extern void register_communicator_bgworker(void); + +#endif /* COMMUNICATOR_PROCESS_H */ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 2c87f139af..7800770c80 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -52,6 +52,8 @@ #include "pagestore_client.h" #include "communicator.h" +#include "communicator/communicator_bindings.h" + #define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* @@ -2179,6 +2181,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) return dc; } +/* + * Get metrics, for the built-in metrics exporter that's part of the communicator + * process. + * + * NB: This is called from a Rust tokio task inside the communicator process. + * Acquiring lwlocks, elog(), allocating memory etc. or anything else + * non-trivial is strictly prohibited here! + */ +struct LfcMetrics +callback_get_lfc_metrics_unsafe(void) +{ + struct LfcMetrics result = { + .lfc_cache_size_limit = lfc_size_limit, + .lfc_hits = lfc_ctl ? lfc_ctl->hits : 0, + .lfc_misses = lfc_ctl ? lfc_ctl->misses : 0, + .lfc_used = lfc_ctl ? lfc_ctl->used : 0, + .lfc_writes = lfc_ctl ? lfc_ctl->writes : 0, + }; + + if (lfc_ctl) + { + for (int minutes = 1; minutes <= 60; minutes++) + { + result.lfc_approximate_working_set_size_windows[minutes - 1] = + lfc_approximate_working_set_size_seconds(minutes * 60, false); + } + } + + return result; +} + + PG_FUNCTION_INFO_V1(get_local_cache_state); Datum diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 7b749f1080..d8ea20331d 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -30,6 +30,7 @@ #include "utils/guc_tables.h" #include "communicator.h" +#include "communicator_process.h" #include "extension_server.h" #include "file_cache.h" #include "neon.h" @@ -455,14 +456,13 @@ _PG_init(void) shmem_startup_hook = neon_shmem_startup_hook; #endif - /* dummy call to a Rust function in the communicator library, to check that it works */ - (void) communicator_dummy(123); - pg_init_libpagestore(); lfc_init(); pg_init_walproposer(); init_lwlsncache(); + register_communicator_bgworker(); + pg_init_communicator(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; diff --git a/poetry.lock b/poetry.lock index 1bc5077eb7..bd93fea10a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -3071,6 +3071,21 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-unixsocket" +version = "0.4.1" +description = "Use requests to talk HTTP via a UNIX domain socket" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "requests_unixsocket-0.4.1-py3-none-any.whl", hash = "sha256:60c4942e9dbecc2f64d611039fb1dfc25da382083c6434ac0316dca3ff908f4d"}, + {file = "requests_unixsocket-0.4.1.tar.gz", hash = "sha256:b2596158c356ecee68d27ba469a52211230ac6fb0cde8b66afb19f0ed47a1995"}, +] + +[package.dependencies] +requests = ">=1.1" + [[package]] name = "responses" version = "0.25.3" @@ -3847,4 +3862,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5" +content-hash = "b741d0b6f7cd3a062dedb8896471b6e7ba20ab1caef82c060506562e19380ad5" diff --git a/pyproject.toml b/pyproject.toml index e7e314d144..a1ab00ec35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ types-pyyaml = "^6.0.12.20240917" testcontainers = "^4.9.0" # Install a release candidate of `jsonnet`, as it supports Python 3.13 jsonnet = "^0.21.0-rc2" +requests-unixsocket = "^0.4.1" [tool.poetry.group.dev.dependencies] mypy = "==1.13.0" diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..c43445e89d 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -66,6 +66,12 @@ class EndpointHttpClient(requests.Session): res.raise_for_status() return res.json() + def autoscaling_metrics(self): + res = self.get(f"http://localhost:{self.external_port}/autoscaling_metrics") + res.raise_for_status() + log.debug("raw compute metrics: %s", res.text) + return res.text + def prewarm_lfc_status(self) -> dict[str, str]: res = self.get(self.prewarm_url) res.raise_for_status() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b9fff05c6c..4428e727a9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5417,6 +5417,7 @@ SKIP_FILES = frozenset( "postmaster.pid", "pg_control", "pg_dynshmem", + "neon-communicator.socket", ) ) diff --git a/test_runner/regress/test_communicator_metrics_exporter.py b/test_runner/regress/test_communicator_metrics_exporter.py new file mode 100644 index 0000000000..bfaf79b7cf --- /dev/null +++ b/test_runner/regress/test_communicator_metrics_exporter.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +import pytest +import requests +import requests_unixsocket +from fixtures.metrics import parse_metrics + +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnv + +NEON_COMMUNICATOR_SOCKET_NAME = "neon-communicator.socket" + + +def test_communicator_metrics(neon_simple_env: NeonEnv): + """ + Test the communicator's built-in HTTP prometheus exporter + """ + env = neon_simple_env + + endpoint = env.endpoints.create("main") + endpoint.start() + + os.chdir(endpoint.pgdata_dir) + + session = requests_unixsocket.Session() + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200 + + # quick test that the endpoint returned something expected. (We don't validate + # that the metrics returned are sensible.) + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") + + # Test panic handling. The /debug/panic endpoint raises a Rust panic. It's + # expected to unwind and drop the HTTP connection without response, but not + # kill the process or the server. + with pytest.raises( + requests.ConnectionError, match="Remote end closed connection without response" + ): + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/debug/panic") + assert r.status_code == 500 + + # Test that subsequent requests after the panic still work. + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200 + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") diff --git a/test_runner/regress/test_lfc_working_set_approximation.py b/test_runner/regress/test_lfc_working_set_approximation.py index a28bc3d047..9637a4b84b 100644 --- a/test_runner/regress/test_lfc_working_set_approximation.py +++ b/test_runner/regress/test_lfc_working_set_approximation.py @@ -1,11 +1,13 @@ from __future__ import annotations import time +from logging import debug from pathlib import Path from typing import TYPE_CHECKING import pytest from fixtures.log_helper import log +from fixtures.metrics import parse_metrics from fixtures.utils import USE_LFC, query_scalar if TYPE_CHECKING: @@ -75,10 +77,22 @@ WITH (fillfactor='100'); cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 104242") cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 204242") # verify working set size after some index access of a few select pages only - blocks = query_scalar(cur, "select approximate_working_set_size(true)") + blocks = query_scalar(cur, "select approximate_working_set_size(false)") log.info(f"working set size after some index access of a few select pages only {blocks}") assert blocks < 20 + # Also test the metrics from the /autoscaling_metrics endpoint + autoscaling_metrics = endpoint.http_client().autoscaling_metrics() + log.debug(f"Raw metrics: {autoscaling_metrics}") + m = parse_metrics(autoscaling_metrics) + + http_estimate = m.query_one("lfc_approximate_working_set_size_windows", + { + "duration_seconds": "60", + }, + ).value + log.info(f"http estimate: {http_estimate}, blocks: {blocks}") + assert http_estimate > 0 and http_estimate < 20 @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):