diff --git a/Cargo.lock b/Cargo.lock index 2f36790d30..53bce59652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,8 +1290,14 @@ dependencies = [ name = "communicator" version = "0.1.0" dependencies = [ + "axum", "cbindgen", - "neon-shmem", + "http 1.1.0", + "metrics", + "tokio", + "tracing", + "tracing-subscriber", + "utils", "workspace_hack", ] @@ -1335,6 +1341,9 @@ dependencies = [ "futures", "hostname-validator", "http 1.1.0", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "indexmap 2.9.0", "itertools 0.10.5", "jsonwebtoken", @@ -1357,6 +1366,7 @@ dependencies = [ "ring", "rlimit", "rust-ini", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 910bae3bda..496471acc7 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -27,7 +27,10 @@ fail.workspace = true flate2.workspace = true futures.workspace = true http.workspace = true +http-body-util.workspace = true hostname-validator = "1.1" +hyper.workspace = true +hyper-util.workspace = true indexmap.workspace = true itertools.workspace = true jsonwebtoken.workspace = true @@ -44,6 +47,7 @@ postgres.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["json"] } ring = "0.17" +scopeguard.workspace = true serde.workspace = true serde_with.workspace = true serde_json.workspace = true diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs index da8d8b20a5..32c8a294a7 100644 --- a/compute_tools/src/http/routes/metrics.rs +++ b/compute_tools/src/http/routes/metrics.rs @@ -1,12 +1,20 @@ +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; use axum::body::Body; +use axum::extract::State; use axum::response::Response; -use http::StatusCode; use http::header::CONTENT_TYPE; +use http_body_util::BodyExt; +use hyper::{Request, StatusCode}; use metrics::proto::MetricFamily; use metrics::{Encoder, TextEncoder}; +use crate::compute::ComputeNode; use crate::http::JsonResponse; use crate::metrics::collect; +use crate::postgres_metrics_client::connect_postgres_metrics_socket; /// Expose Prometheus metrics. pub(in crate::http) async fn get_metrics() -> Response { @@ -31,3 +39,56 @@ pub(in crate::http) async fn get_metrics() -> Response { .body(Body::from(buffer)) .unwrap() } + +/// Fetch and forward metrics from the Postgres neon extension's metrics +/// exporter that are used by autoscaling-agent. +/// +/// The neon extension exposes these metrics over a Unix domain socket +/// in the data directory. That's not accessible directly from the outside +/// world, so we have this endpoint in compute_ctl to expose it +pub(in crate::http) async fn get_autoscaling_metrics( + State(compute): State>, +) -> Result { + let pgdata = Path::new(&compute.params.pgdata); + + // Connect to the communicator process's metrics socket + let mut metrics_client = connect_postgres_metrics_socket(pgdata) + .await + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Make a request for /autoscaling_metrics + let request = Request::builder() + .method("GET") + .uri("/autoscaling_metrics") + .header("Host", "localhost") // hyper requires Host, even though the server won't care + .body(Body::from("")) + .unwrap(); + let resp = metrics_client + .send_request(request) + .await + .context("fetching metrics from Postgres metrics service") + .map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?; + + // Check response status + let status = resp.status(); + if status != StatusCode::OK { + return Err(JsonResponse::error( + status, + format!( + "Postgres metrics service returned error: {}", + status.to_string() + ), + )); + } + + // Build an OK response, with the body forwarded from the response we got. + let mut response = Response::builder(); + response = response.status(StatusCode::OK); + if let Some(content_type) = resp.headers().get(CONTENT_TYPE) { + response = response.header(CONTENT_TYPE, content_type); + } + + let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream()); + + Ok(response.body(body).unwrap()) +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 17939e39d4..f0fbca8263 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -81,8 +81,12 @@ impl From<&Server> for Router> { Server::External { config, compute_id, .. } => { - let unauthenticated_router = - Router::>::new().route("/metrics", get(metrics::get_metrics)); + let unauthenticated_router = Router::>::new() + .route("/metrics", get(metrics::get_metrics)) + .route( + "/autoscaling_metrics", + get(metrics::get_autoscaling_metrics), + ); let authenticated_router = Router::>::new() .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 2d5d4565b7..950a745201 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -24,6 +24,7 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod pgbouncer; +pub mod postgres_metrics_client; pub mod rsyslog; pub mod spec; mod spec_apply; diff --git a/compute_tools/src/postgres_metrics_client.rs b/compute_tools/src/postgres_metrics_client.rs new file mode 100644 index 0000000000..ef42a01444 --- /dev/null +++ b/compute_tools/src/postgres_metrics_client.rs @@ -0,0 +1,97 @@ +//! Client for making request to a running Postgres server's metrics service +//! +//! The storage communicator process that runs inside Postgres exposes +//! an HTTP endpoint in a Unix Domain Socket in the Postgres data +//! directory. This provides access to it. + +use std::path::Path; + +use anyhow::Context; +use hyper::client::conn::http1::SendRequest; +use hyper_util::rt::TokioIo; + +/// Name of the socket within the Postgres data directory. This better match that in +/// `pgxn/neon/communicator/src/worker_process/metrics_exporter.rs`. +const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; + +/// Open a connection to the metrics exporter's socket, prepare to send requests to it +/// with hyper. +pub async fn connect_postgres_metrics_socket(pgdata: &Path) -> anyhow::Result> +where + B: hyper::body::Body + 'static + Send, + B::Data: Send, + B::Error: Into>, +{ + let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME); + let socket_path_len = socket_path.display().to_string().len(); + + // There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a + // Unix Domain socket. The limit is on the connect(2) function used to open the + // socket, not on the absolute path itself. Postgres changes the current directory to + // the data directory and uses a relative path to bind to the socket, and the relative + // path "./neon-communicator.socket" is always short, but when compute_ctl needs to + // open the socket, we need to use a full path, which can be arbitrarily long. + // + // There are a few ways we could work around this: + // + // 1. Change the current directory to the Postgres data directory and use a relative + // path in the connect(2) call. That's problematic because the current directory + // applies to the whole process. We could change the current directory early in + // compute_ctl startup, and that might be a good idea anyway for other reasons too: + // it would be more robust if the data directory is moved around or unlinked for some + // reason, and you would be less likely to accidentally litter other parts of the + // filesystem with e.g. temporary files. However, that's a pretty invasive change. + // + // 2. On Linux, you could open() the data directory, and refer to the the socket inside it + // as "/proc/self/fd//neon-communicator.socket". But that's Linux-only. + // + // 3. Create a symbolic link to the socket with a shorter path, and use that. + // + // We use the symbolic link approach here. Hopefully the paths we use in production + // are shorter, so that we can open the socket directly, so that this hack is needed + // only in development. + let connect_result = if socket_path_len < 100 { + // We can open the path directly with no hacks. + tokio::net::UnixStream::connect(socket_path).await + } else { + // The path to the socket is too long. Create a symlink to it with a shorter path. + let short_path = std::env::temp_dir().join(format!( + "compute_ctl.short-socket.{}.{}", + std::process::id(), + tokio::task::id() + )); + std::os::unix::fs::symlink(&socket_path, &short_path)?; + + // Delete the symlink as soon as we have connected to it. There's a small chance + // of leaking if the process dies before we remove it, so try to keep that window + // as small as possible. + scopeguard::defer! { + if let Err(err) = std::fs::remove_file(&short_path) { + tracing::warn!("could not remove symlink \"{}\" created for socket: {}", + short_path.display(), err); + } + } + + tracing::info!( + "created symlink \"{}\" for socket \"{}\", opening it now", + short_path.display(), + socket_path.display() + ); + + tokio::net::UnixStream::connect(&short_path).await + }; + + let stream = connect_result.context("opening postgres metrics socket")?; + + let io = TokioIo::new(stream); + let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await.unwrap(); + + // spawn a task to poll the connection and drive the HTTP state + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("Error in connection: {}", e); + } + }); + + Ok(request_sender) +} diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index bf7aeb4108..df9a6f0e4b 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -5,6 +5,7 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ communicator.o \ + communicator_process.o \ extension_server.o \ file_cache.o \ hll.o \ diff --git a/pgxn/neon/communicator/Cargo.toml b/pgxn/neon/communicator/Cargo.toml index e95a269d90..c5b5804ebc 100644 --- a/pgxn/neon/communicator/Cargo.toml +++ b/pgxn/neon/communicator/Cargo.toml @@ -13,7 +13,14 @@ crate-type = ["staticlib"] testing = [] [dependencies] -neon-shmem.workspace = true +axum.workspace = true +http.workspace = true +tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] } +tracing.workspace = true +tracing-subscriber.workspace = true + +metrics.workspace = true +utils.workspace = true workspace_hack = { version = "0.1", path = "../../../workspace_hack" } [build-dependencies] diff --git a/pgxn/neon/communicator/README.md b/pgxn/neon/communicator/README.md index 8169ae72b5..6c5f24ce52 100644 --- a/pgxn/neon/communicator/README.md +++ b/pgxn/neon/communicator/README.md @@ -1,7 +1,22 @@ -This package will evolve into a "compute-pageserver communicator" -process and machinery. For now, it's just a dummy that doesn't do -anything interesting, but it allows us to test the compilation and -linking of Rust code into the Postgres extensions. +# Communicator + +This package provides the so-called "compute-pageserver communicator", +or just "communicator" in short. The communicator is a separate +background worker process that runs in the PostgreSQL server. It's +part of the neon extension. Currently, it only provides an HTTP +endpoint for metrics, but in the future it will evolve to handle all +communications with the pageservers. + +## Source code view + +pgxn/neon/communicator_process.c + Contains code needed to start up the communicator process, and + the glue that interacts with PostgreSQL code and the Rust + code in the communicator process. + + +pgxn/neon/communicator/src/worker_process/ + Worker process main loop and glue code At compilation time, pgxn/neon/communicator/ produces a static library, libcommunicator.a. It is linked to the neon.so extension diff --git a/pgxn/neon/communicator/src/lib.rs b/pgxn/neon/communicator/src/lib.rs index 24c180d37d..6fc169ad47 100644 --- a/pgxn/neon/communicator/src/lib.rs +++ b/pgxn/neon/communicator/src/lib.rs @@ -1,6 +1 @@ -/// dummy function, just to test linking Rust functions into the C -/// extension -#[unsafe(no_mangle)] -pub extern "C" fn communicator_dummy(arg: u32) -> u32 { - arg + 1 -} +mod worker_process; diff --git a/pgxn/neon/communicator/src/worker_process/callbacks.rs b/pgxn/neon/communicator/src/worker_process/callbacks.rs new file mode 100644 index 0000000000..e98ffea80b --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/callbacks.rs @@ -0,0 +1,35 @@ +//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These are +//! implemented in `neon/pgxn/communicator_process.c`. The function signatures better match! +//! +//! These are called from the communicator threads! Careful what you do, most Postgres functions are +//! not safe to call in that context. + +unsafe extern "C" { + pub fn callback_set_my_latch_unsafe(); + + pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics; +} + +// safe wrappers + +pub(super) fn callback_set_my_latch() { + unsafe { callback_set_my_latch_unsafe() }; +} + +#[repr(C)] +pub struct LfcMetrics { + pub lfc_cache_size_limit: i64, + pub lfc_hits: i64, + pub lfc_misses: i64, + pub lfc_used: i64, + pub lfc_writes: i64, + + // working set size looking back 1..60 minutes. + // + // Index 0 is size of working set accessed within last 1 minute, + // index 59 is size of working set accessed within last 60 minutes. + pub lfc_approximate_working_set_size_windows: [i64; 60], +} +pub extern "C" fn callback_get_lfc_metrics() -> LfcMetrics { + unsafe { callback_get_lfc_metrics_unsafe() } +} diff --git a/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs new file mode 100644 index 0000000000..cb402ddef0 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/lfc_metrics.rs @@ -0,0 +1,91 @@ +use metrics::{IntGauge, IntGaugeVec}; + +use super::callbacks::callback_get_lfc_metrics; + +pub(crate) struct LfcMetricsCollector { + lfc_cache_size_limit: IntGauge, + lfc_hits: IntGauge, + lfc_misses: IntGauge, + lfc_used: IntGauge, + lfc_writes: IntGauge, + lfc_approximate_working_set_size_windows_vec: IntGaugeVec, + lfc_approximate_working_set_size_windows: [IntGauge; 60], +} + +impl LfcMetricsCollector { + pub fn new() -> LfcMetricsCollector { + let lfc_approximate_working_set_size_windows_vec = IntGaugeVec::new( + metrics::opts!( + "lfc_approximate_working_set_size_windows", + "Approximate working set size in pages of 8192 bytes", + ), + &[&"duration_seconds"], + ) + .unwrap(); + + let lfc_approximate_working_set_size_windows: [IntGauge; 60] = (1..=60) + .map(|minutes| { + lfc_approximate_working_set_size_windows_vec + .with_label_values(&[&(minutes * 60).to_string()]) + }) + .collect::>() + .try_into() + .unwrap(); + + LfcMetricsCollector { + lfc_cache_size_limit: IntGauge::new( + "lfc_cache_size_limit", + "LFC cache size limit in bytes", + ) + .unwrap(), + lfc_hits: IntGauge::new("lfc_hits", "LFC cache hits").unwrap(), + lfc_misses: IntGauge::new("lfc_misses", "LFC cache misses").unwrap(), + lfc_used: IntGauge::new("lfc_used", "LFC chunks used (chunk = 1MB)").unwrap(), + lfc_writes: IntGauge::new("lfc_writes", "LFC cache writes").unwrap(), + + lfc_approximate_working_set_size_windows_vec, + lfc_approximate_working_set_size_windows, + } + } +} + +impl metrics::core::Collector for LfcMetricsCollector { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + + descs.append(&mut self.lfc_cache_size_limit.desc()); + descs.append(&mut self.lfc_hits.desc()); + descs.append(&mut self.lfc_misses.desc()); + descs.append(&mut self.lfc_used.desc()); + descs.append(&mut self.lfc_writes.desc()); + descs.append(&mut self.lfc_approximate_working_set_size_windows_vec.desc()); + + descs + } + + fn collect(&self) -> Vec { + let mut values = Vec::new(); + + // update the gauges + let lfc_metrics = callback_get_lfc_metrics(); + self.lfc_cache_size_limit + .set(lfc_metrics.lfc_cache_size_limit); + self.lfc_hits.set(lfc_metrics.lfc_hits); + self.lfc_misses.set(lfc_metrics.lfc_misses); + self.lfc_used.set(lfc_metrics.lfc_used); + self.lfc_writes.set(lfc_metrics.lfc_writes); + for i in 0..60 { + let val = lfc_metrics.lfc_approximate_working_set_size_windows[i]; + self.lfc_approximate_working_set_size_windows[i].set(val); + } + + values.append(&mut self.lfc_cache_size_limit.collect()); + values.append(&mut self.lfc_hits.collect()); + values.append(&mut self.lfc_misses.collect()); + values.append(&mut self.lfc_used.collect()); + values.append(&mut self.lfc_writes.collect()); + values.append(&mut self.lfc_approximate_working_set_size_windows_vec.collect()); + + values + } +} diff --git a/pgxn/neon/communicator/src/worker_process/logging.rs b/pgxn/neon/communicator/src/worker_process/logging.rs new file mode 100644 index 0000000000..f200eb0446 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/logging.rs @@ -0,0 +1,228 @@ +//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log +//! +//! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres +//! process latch is raised. That wakes up the loop in the main thread, see +//! `communicator_new_bgworker_main()`. It reads the message from the channel and +//! ereport()s it. This ensures that only one thread, the main thread, calls the +//! PostgreSQL logging routines at any time. + +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::{Receiver, SyncSender}; +use std::sync::mpsc::{TryRecvError, TrySendError}; + +use tracing::info; +use tracing::{Event, Level, Metadata, Subscriber}; +use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter}; +use tracing_subscriber::registry::LookupSpan; + +use crate::worker_process::callbacks::callback_set_my_latch; + +pub struct LoggingState { + receiver: Receiver, +} + +/// Called once, at worker process startup. The returned LoggingState is passed back +/// in the subsequent calls to `pump_logging`. It is opaque to the C code. +#[unsafe(no_mangle)] +pub extern "C" fn configure_logging() -> Box { + let (sender, receiver) = sync_channel(1000); + + let maker = Maker { channel: sender }; + + use tracing_subscriber::prelude::*; + let r = tracing_subscriber::registry(); + + let r = r.with( + tracing_subscriber::fmt::layer() + .with_ansi(false) + .event_format(SimpleFormatter::new()) + .with_writer(maker) + // TODO: derive this from log_min_messages? + .with_filter(LevelFilter::from_level(Level::INFO)), + ); + r.init(); + + info!("communicator process logging started"); + + let state = LoggingState { receiver }; + + Box::new(state) +} + +/// Read one message from the logging queue. This is essentially a wrapper to Receiver, +/// with a C-friendly signature. +/// +/// The message is copied into *errbuf, which is a caller-supplied buffer of size +/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always +/// NULL-terminated. +/// +/// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see +/// elog.h +#[unsafe(no_mangle)] +pub extern "C" fn pump_logging( + state: &mut LoggingState, + errbuf: *mut u8, + errbuf_len: u32, + elevel_p: &mut i32, +) -> i32 { + let msg = match state.receiver.try_recv() { + Err(TryRecvError::Empty) => return 0, + Err(TryRecvError::Disconnected) => return -1, + Ok(msg) => msg, + }; + + let src: &[u8] = &msg.message; + let dst = errbuf; + let len = std::cmp::min(src.len(), errbuf_len as usize - 1); + unsafe { + std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len); + *(errbuf.add(len)) = b'\0'; // NULL terminator + } + + // XXX: these levels are copied from PostgreSQL's elog.h. Introduce another enum to + // hide these? + *elevel_p = match msg.level { + Level::TRACE => 10, // DEBUG5 + Level::DEBUG => 14, // DEBUG1 + Level::INFO => 17, // INFO + Level::WARN => 19, // WARNING + Level::ERROR => 21, // ERROR + }; + + 1 +} + +//---- The following functions can be called from any thread ---- + +#[derive(Clone)] +struct FormattedEventWithMeta { + message: Vec, + level: tracing::Level, +} + +impl Default for FormattedEventWithMeta { + fn default() -> Self { + FormattedEventWithMeta { + message: Vec::new(), + level: tracing::Level::DEBUG, + } + } +} + +struct EventBuilder<'a> { + event: FormattedEventWithMeta, + + maker: &'a Maker, +} + +impl std::io::Write for EventBuilder<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.event.message.write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + self.maker.send_event(self.event.clone()); + Ok(()) + } +} + +impl Drop for EventBuilder<'_> { + fn drop(&mut self) { + let maker = self.maker; + let event = std::mem::take(&mut self.event); + + maker.send_event(event); + } +} + +struct Maker { + channel: SyncSender, +} + +impl<'a> MakeWriter<'a> for Maker { + type Writer = EventBuilder<'a>; + + fn make_writer(&'a self) -> Self::Writer { + panic!("not expected to be called when make_writer_for is implemented"); + } + + fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer { + EventBuilder { + event: FormattedEventWithMeta { + message: Vec::new(), + level: *meta.level(), + }, + maker: self, + } + } +} + +impl Maker { + fn send_event(&self, e: FormattedEventWithMeta) { + match self.channel.try_send(e) { + Ok(()) => { + // notify the main thread + callback_set_my_latch(); + } + Err(TrySendError::Disconnected(_)) => {} + Err(TrySendError::Full(_)) => { + // TODO: record that some messages were lost + } + } + } +} + +/// Simple formatter implementation for tracing_subscriber, which prints the log spans and +/// message part like the default formatter, but no timestamp or error level. The error +/// level is captured separately by `FormattedEventWithMeta', and when the error is +/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that +/// point. (The timestamp printed will therefore lag behind the timestamp on the event +/// here, if the main thread doesn't process the log message promptly) +struct SimpleFormatter; + +impl FormatEvent for SimpleFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &Event<'_>, + ) -> std::fmt::Result { + // Format all the spans in the event's span context. + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + write!(writer, "{}", span.name())?; + + // `FormattedFields` is a formatted representation of the span's fields, + // which is stored in its extensions by the `fmt` layer's `new_span` + // method. The fields will have been formatted by the same field formatter + // that's provided to the event formatter in the `FmtContext`. + let ext = span.extensions(); + let fields = &ext + .get::>() + .expect("will never be `None`"); + + // Skip formatting the fields if the span had no fields. + if !fields.is_empty() { + write!(writer, "{{{fields}}}")?; + } + write!(writer, ": ")?; + } + } + + // Write fields on the event + ctx.field_format().format_fields(writer.by_ref(), event)?; + + writeln!(writer) + } +} + +impl SimpleFormatter { + fn new() -> Self { + SimpleFormatter {} + } +} diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs new file mode 100644 index 0000000000..f3d7733904 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -0,0 +1,40 @@ +use std::str::FromStr as _; + +use crate::worker_process::lfc_metrics::LfcMetricsCollector; + +use utils::id::{TenantId, TimelineId}; + +pub struct CommunicatorWorkerProcessStruct { + /*** Metrics ***/ + pub(crate) lfc_metrics: LfcMetricsCollector, +} + +pub(super) async fn init( + tenant_id: String, + timeline_id: String, +) -> CommunicatorWorkerProcessStruct { + let _tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID"); + let _timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID"); + + CommunicatorWorkerProcessStruct { + // metrics + lfc_metrics: LfcMetricsCollector::new(), + } +} + +impl metrics::core::Collector for CommunicatorWorkerProcessStruct { + fn desc(&self) -> Vec<&metrics::core::Desc> { + let mut descs = Vec::new(); + + descs.append(&mut self.lfc_metrics.desc()); + + descs + } + fn collect(&self) -> Vec { + let mut values = Vec::new(); + + values.append(&mut self.lfc_metrics.collect()); + + values + } +} diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs new file mode 100644 index 0000000000..84cceb19df --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -0,0 +1,99 @@ +//! Export information about Postgres, the communicator process, file cache etc. as +//! prometheus metrics. +//! +//! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres +//! data directory. For debugging, you can access it with curl: +//! +//! curl --unix-socket neon-communicator.socket http://localhost/metrics +//! +use axum::Router; +use axum::body::Body; +use axum::extract::State; +use axum::response::Response; +use http::StatusCode; +use http::header::CONTENT_TYPE; + +use metrics::proto::MetricFamily; +use metrics::{Encoder, TextEncoder}; + +use std::path::PathBuf; + +use tokio::net::UnixListener; + +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket"; + +impl CommunicatorWorkerProcessStruct { + /// Launch the metrics exporter + pub(crate) async fn launch_metrics_exporter(&'static self) { + use axum::routing::get; + let app = Router::new() + .route("/metrics", get(get_metrics)) + .route("/autoscaling_metrics", get(get_autoscaling_metrics)) + .route("/debug/panic", get(handle_debug_panic)) + .with_state(self); + + // Listen on unix domain socket, in the data directory. That should be unique. + let path = PathBuf::from(NEON_COMMUNICATOR_SOCKET_NAME); + let listener = UnixListener::bind(path.clone()).unwrap(); + + tokio::spawn(async { + tracing::info!("metrics listener spawned"); + axum::serve(listener, app).await.unwrap() + }); + } +} + +/// Expose all Prometheus metrics. +async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response { + use metrics::core::Collector; + let metrics = state.collect(); + tracing::trace!("/metrics requested"); + metrics_to_response(metrics).await +} + +/// Expose Prometheus metrics, for use by the autoscaling agent. +/// +/// This is a subset of all the metrics. +async fn get_autoscaling_metrics( + State(state): State<&CommunicatorWorkerProcessStruct>, +) -> Response { + use metrics::core::Collector; + let metrics = state.lfc_metrics.collect(); + + tracing::trace!("/autoscaling_metrics requested"); + metrics_to_response(metrics).await +} + +async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response { + panic!("test HTTP handler task panic"); +} + +/// Helper function to convert prometheus metrics to a text response +async fn metrics_to_response(metrics: Vec) -> Response { + // When we call TextEncoder::encode() below, it will immediately return an + // error if a metric family has no metrics, so we need to preemptively + // filter out metric families with no metrics. + let metrics = metrics + .into_iter() + .filter(|m| !m.get_metric().is_empty()) + .collect::>(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + + if let Err(e) = encoder.encode(&metrics, &mut buffer) { + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(CONTENT_TYPE, "application/text") + .body(Body::from(e.to_string())) + .unwrap() + } else { + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() + } +} diff --git a/pgxn/neon/communicator/src/worker_process/mod.rs b/pgxn/neon/communicator/src/worker_process/mod.rs new file mode 100644 index 0000000000..c58556081b --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/mod.rs @@ -0,0 +1,13 @@ +//! This code runs in the communicator worker process. This provides +//! the glue code to: +//! +//! - launch the main loop, +//! - receive IO requests from backends and process them, +//! - write results back to backends. + +mod callbacks; +mod lfc_metrics; +mod logging; +mod main_loop; +mod metrics_exporter; +mod worker_interface; diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs new file mode 100644 index 0000000000..2ed30d20f6 --- /dev/null +++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs @@ -0,0 +1,39 @@ +//! Functions called from the C code in the worker process + +use std::ffi::{CStr, c_char}; + +use crate::worker_process::main_loop; +use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct; + +/// Launch the communicator's tokio tasks, which do most of the work. +/// +/// The caller has initialized the process as a regular PostgreSQL background worker +/// process. +#[unsafe(no_mangle)] +pub extern "C" fn communicator_worker_process_launch( + tenant_id: *const c_char, + timeline_id: *const c_char, +) -> &'static CommunicatorWorkerProcessStruct { + // Convert the arguments into more convenient Rust types + let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap(); + let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap(); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("communicator thread") + .build() + .unwrap(); + + let worker_struct = runtime.block_on(main_loop::init( + tenant_id.to_string(), + timeline_id.to_string(), + )); + let worker_struct = Box::leak(Box::new(worker_struct)); + + runtime.block_on(worker_struct.launch_metrics_exporter()); + + // keep the runtime running after we exit this function + Box::leak(Box::new(runtime)); + + worker_struct +} diff --git a/pgxn/neon/communicator_process.c b/pgxn/neon/communicator_process.c new file mode 100644 index 0000000000..f4beb0202a --- /dev/null +++ b/pgxn/neon/communicator_process.c @@ -0,0 +1,178 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.c + * Functions for starting up the communicator background worker process. + * + * Currently, the communicator process only functions as a metrics + * exporter. It provides an HTTP endpoint for polling a limited set of + * metrics. TODO: In the future, it will do much more, i.e. handle all + * the communications with the pageservers. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include + +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" +#include "replication/walsender.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/pmsignal.h" +#include "storage/procsignal.h" +#include "tcop/tcopprot.h" + +#include "communicator_process.h" +#include "file_cache.h" +#include "neon.h" +#include "neon_perf_counters.h" + +/* the rust bindings, generated by cbindgen */ +#include "communicator/communicator_bindings.h" + +PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg); + +/**** Initialization functions. These run in postmaster ****/ + +void +register_communicator_bgworker(void) +{ + BackgroundWorker bgw; + + /* Initialize the background worker process */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; + bgw.bgw_start_time = BgWorkerStart_PostmasterStart; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/**** Worker process functions. These run in the communicator worker process ****/ + +/* Entry point for the communicator bgworker process */ +void +communicator_new_bgworker_main(Datum main_arg) +{ + struct LoggingState *logging; + char errbuf[1000]; + int elevel; + const struct CommunicatorWorkerProcessStruct *proc_handle; + + /* + * Pretend that this process is a WAL sender. That affects the shutdown + * sequence: WAL senders are shut down last, after the final checkpoint + * has been written. That's what we want for the communicator process too. + */ + am_walsender = true; + MarkPostmasterChildWalSender(); + + /* Establish signal handlers. */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + /* + * Postmaster sends us SIGUSR2 when all regular backends and bgworkers + * have exited, and it's time for us to exit too + */ + pqsignal(SIGUSR2, die); + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGTERM, die); + + BackgroundWorkerUnblockSignals(); + + logging = configure_logging(); + + proc_handle = communicator_worker_process_launch( + neon_tenant, + neon_timeline); + + /* proc_handle is not currently used, but will be in the future */ + (void) proc_handle; + + /* + * The Rust tokio runtime has been launched, and it's running in the + * background now. This process is now multi-threaded! The Rust threads do + * not call into any Postgres functions. + * + * This loop in the main thread handles any interactions we need with the + * rest of PostgreSQL. + */ + elog(LOG, "communicator threads started"); + for (;;) + { + int32 rc; + + /* + * Check interrupts like system shutdown or config reload + */ + CHECK_FOR_INTERRUPTS(); + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * Forward any log messages from the Rust threads into the normal Postgres + * logging facility. + */ + for (;;) + { + rc = pump_logging(logging, (uint8 *) errbuf, sizeof(errbuf), &elevel); + if (rc == 0) + { + /* nothing to do */ + break; + } + else if (rc == 1) + { + /* Because we don't want to exit on error */ + if (elevel == ERROR) + elevel = LOG; + if (elevel == INFO) + elevel = LOG; + elog(elevel, "[COMMUNICATOR] %s", errbuf); + } + else if (rc == -1) + { + elog(ERROR, "logging channel was closed unexpectedly"); + } + } + + /* + * Wait until we are woken up. The rust threads will set the latch if + * there's log message to forward. + */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, + 0, + PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + } +} + +/**** + * Callbacks from the rust code, in the communicator process. + * + * NOTE: These must be thread-safe! It's very limited which PostgreSQL + * functions you can use!!! + * + * The signatures of these need to match those in the Rust code. + */ + +void +callback_set_my_latch_unsafe(void) +{ + SetLatch(MyLatch); +} diff --git a/pgxn/neon/communicator_process.h b/pgxn/neon/communicator_process.h new file mode 100644 index 0000000000..3b1d88b6ac --- /dev/null +++ b/pgxn/neon/communicator_process.h @@ -0,0 +1,17 @@ +/*------------------------------------------------------------------------- + * + * communicator_process.h + * Communicator process + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef COMMUNICATOR_PROCESS_H +#define COMMUNICATOR_PROCESS_H + +extern void register_communicator_bgworker(void); + +#endif /* COMMUNICATOR_PROCESS_H */ diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 2c87f139af..7800770c80 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -52,6 +52,8 @@ #include "pagestore_client.h" #include "communicator.h" +#include "communicator/communicator_bindings.h" + #define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) /* @@ -2179,6 +2181,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) return dc; } +/* + * Get metrics, for the built-in metrics exporter that's part of the communicator + * process. + * + * NB: This is called from a Rust tokio task inside the communicator process. + * Acquiring lwlocks, elog(), allocating memory etc. or anything else + * non-trivial is strictly prohibited here! + */ +struct LfcMetrics +callback_get_lfc_metrics_unsafe(void) +{ + struct LfcMetrics result = { + .lfc_cache_size_limit = lfc_size_limit, + .lfc_hits = lfc_ctl ? lfc_ctl->hits : 0, + .lfc_misses = lfc_ctl ? lfc_ctl->misses : 0, + .lfc_used = lfc_ctl ? lfc_ctl->used : 0, + .lfc_writes = lfc_ctl ? lfc_ctl->writes : 0, + }; + + if (lfc_ctl) + { + for (int minutes = 1; minutes <= 60; minutes++) + { + result.lfc_approximate_working_set_size_windows[minutes - 1] = + lfc_approximate_working_set_size_seconds(minutes * 60, false); + } + } + + return result; +} + + PG_FUNCTION_INFO_V1(get_local_cache_state); Datum diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 7b749f1080..d8ea20331d 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -30,6 +30,7 @@ #include "utils/guc_tables.h" #include "communicator.h" +#include "communicator_process.h" #include "extension_server.h" #include "file_cache.h" #include "neon.h" @@ -455,14 +456,13 @@ _PG_init(void) shmem_startup_hook = neon_shmem_startup_hook; #endif - /* dummy call to a Rust function in the communicator library, to check that it works */ - (void) communicator_dummy(123); - pg_init_libpagestore(); lfc_init(); pg_init_walproposer(); init_lwlsncache(); + register_communicator_bgworker(); + pg_init_communicator(); Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines; diff --git a/poetry.lock b/poetry.lock index 1bc5077eb7..bd93fea10a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -3071,6 +3071,21 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "requests-unixsocket" +version = "0.4.1" +description = "Use requests to talk HTTP via a UNIX domain socket" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "requests_unixsocket-0.4.1-py3-none-any.whl", hash = "sha256:60c4942e9dbecc2f64d611039fb1dfc25da382083c6434ac0316dca3ff908f4d"}, + {file = "requests_unixsocket-0.4.1.tar.gz", hash = "sha256:b2596158c356ecee68d27ba469a52211230ac6fb0cde8b66afb19f0ed47a1995"}, +] + +[package.dependencies] +requests = ">=1.1" + [[package]] name = "responses" version = "0.25.3" @@ -3847,4 +3862,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5" +content-hash = "b741d0b6f7cd3a062dedb8896471b6e7ba20ab1caef82c060506562e19380ad5" diff --git a/pyproject.toml b/pyproject.toml index e7e314d144..a1ab00ec35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,7 @@ types-pyyaml = "^6.0.12.20240917" testcontainers = "^4.9.0" # Install a release candidate of `jsonnet`, as it supports Python 3.13 jsonnet = "^0.21.0-rc2" +requests-unixsocket = "^0.4.1" [tool.poetry.group.dev.dependencies] mypy = "==1.13.0" diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..c43445e89d 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -66,6 +66,12 @@ class EndpointHttpClient(requests.Session): res.raise_for_status() return res.json() + def autoscaling_metrics(self): + res = self.get(f"http://localhost:{self.external_port}/autoscaling_metrics") + res.raise_for_status() + log.debug("raw compute metrics: %s", res.text) + return res.text + def prewarm_lfc_status(self) -> dict[str, str]: res = self.get(self.prewarm_url) res.raise_for_status() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b9fff05c6c..4428e727a9 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5417,6 +5417,7 @@ SKIP_FILES = frozenset( "postmaster.pid", "pg_control", "pg_dynshmem", + "neon-communicator.socket", ) ) diff --git a/test_runner/regress/test_communicator_metrics_exporter.py b/test_runner/regress/test_communicator_metrics_exporter.py new file mode 100644 index 0000000000..bfaf79b7cf --- /dev/null +++ b/test_runner/regress/test_communicator_metrics_exporter.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +from typing import TYPE_CHECKING + +import pytest +import requests +import requests_unixsocket +from fixtures.metrics import parse_metrics + +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnv + +NEON_COMMUNICATOR_SOCKET_NAME = "neon-communicator.socket" + + +def test_communicator_metrics(neon_simple_env: NeonEnv): + """ + Test the communicator's built-in HTTP prometheus exporter + """ + env = neon_simple_env + + endpoint = env.endpoints.create("main") + endpoint.start() + + os.chdir(endpoint.pgdata_dir) + + session = requests_unixsocket.Session() + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200 + + # quick test that the endpoint returned something expected. (We don't validate + # that the metrics returned are sensible.) + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") + + # Test panic handling. The /debug/panic endpoint raises a Rust panic. It's + # expected to unwind and drop the HTTP connection without response, but not + # kill the process or the server. + with pytest.raises( + requests.ConnectionError, match="Remote end closed connection without response" + ): + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/debug/panic") + assert r.status_code == 500 + + # Test that subsequent requests after the panic still work. + r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics") + assert r.status_code == 200 + m = parse_metrics(r.text) + m.query_one("lfc_hits") + m.query_one("lfc_misses") diff --git a/test_runner/regress/test_lfc_working_set_approximation.py b/test_runner/regress/test_lfc_working_set_approximation.py index a28bc3d047..9637a4b84b 100644 --- a/test_runner/regress/test_lfc_working_set_approximation.py +++ b/test_runner/regress/test_lfc_working_set_approximation.py @@ -1,11 +1,13 @@ from __future__ import annotations import time +from logging import debug from pathlib import Path from typing import TYPE_CHECKING import pytest from fixtures.log_helper import log +from fixtures.metrics import parse_metrics from fixtures.utils import USE_LFC, query_scalar if TYPE_CHECKING: @@ -75,10 +77,22 @@ WITH (fillfactor='100'); cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 104242") cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 204242") # verify working set size after some index access of a few select pages only - blocks = query_scalar(cur, "select approximate_working_set_size(true)") + blocks = query_scalar(cur, "select approximate_working_set_size(false)") log.info(f"working set size after some index access of a few select pages only {blocks}") assert blocks < 20 + # Also test the metrics from the /autoscaling_metrics endpoint + autoscaling_metrics = endpoint.http_client().autoscaling_metrics() + log.debug(f"Raw metrics: {autoscaling_metrics}") + m = parse_metrics(autoscaling_metrics) + + http_estimate = m.query_one("lfc_approximate_working_set_size_windows", + { + "duration_seconds": "60", + }, + ).value + log.info(f"http estimate: {http_estimate}, blocks: {blocks}") + assert http_estimate > 0 and http_estimate < 20 @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):