Introduce built-in Prometheus exporter to the Postgres extension

Currently, the exporter exposes the same LFC metrics that are exposed
by the "autoscaling" sql_exporter in the docker image. With this, we
can remove the dedicated sql_exporter instance. But that's left as a
TODO until this is rolled out to production and we have changed
autoscaling-agent to fetch the metrics from this new endpoint.

The exporter runs as a Postgres background worker process. This is
extracted from the Rust communicator rewrite project, which will use
the same worker process for much more, to handle the communications
with the pageservers. For now, though, it merely handles the metrics
requests.

In the future, we will add more metrics, and perhaps even APIs to
control the running Postgres instance.

The exporter listens on a Unix Domain socket within the Postgres data
directory. A Unix Domain socket is a bit inconventional, but it has
some advantages:

- Permissions are taken care of. Only processes that can access the
  data directory, and therefore already have full access to the
  running Postgres instance, can connect to it.

- No need to allocate and manage a new port number for the listener

It has some downsides too: it's not immediately accessible from the
outside world, and the functions to work with Unix Domain sockets are
more low-level than TCP sockets (see the symlink hack in
`postgres_metrics_client.rs`, for example).

To expose the metrics from the local Unix Domain Socket to the
autoscaling agent, introduce a new '/autoscaling_metrics' endpoint in
the compute_ctl's HTTP server. Currently it merely forwards the
request to the Postgres instance, but we could add rate limiting and
access control there in the future.
This commit is contained in:
Heikki Linnakangas
2025-07-14 14:32:39 +03:00
parent ff526a1051
commit 70fdd75c89
27 changed files with 1079 additions and 21 deletions

12
Cargo.lock generated
View File

@@ -1290,8 +1290,14 @@ dependencies = [
name = "communicator"
version = "0.1.0"
dependencies = [
"axum",
"cbindgen",
"neon-shmem",
"http 1.1.0",
"metrics",
"tokio",
"tracing",
"tracing-subscriber",
"utils",
"workspace_hack",
]
@@ -1335,6 +1341,9 @@ dependencies = [
"futures",
"hostname-validator",
"http 1.1.0",
"http-body-util",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.10.5",
"jsonwebtoken",
@@ -1357,6 +1366,7 @@ dependencies = [
"ring",
"rlimit",
"rust-ini",
"scopeguard",
"serde",
"serde_json",
"serde_with",

View File

@@ -27,7 +27,10 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hostname-validator = "1.1"
hyper.workspace = true
hyper-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
@@ -44,6 +47,7 @@ postgres.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["json"] }
ring = "0.17"
scopeguard.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true

View File

@@ -1,12 +1,20 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use http_body_util::BodyExt;
use hyper::{Request, StatusCode};
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use crate::compute::ComputeNode;
use crate::http::JsonResponse;
use crate::metrics::collect;
use crate::postgres_metrics_client::connect_postgres_metrics_socket;
/// Expose Prometheus metrics.
pub(in crate::http) async fn get_metrics() -> Response {
@@ -31,3 +39,56 @@ pub(in crate::http) async fn get_metrics() -> Response {
.body(Body::from(buffer))
.unwrap()
}
/// Fetch and forward metrics from the Postgres neon extension's metrics
/// exporter that are used by autoscaling-agent.
///
/// The neon extension exposes these metrics over a Unix domain socket
/// in the data directory. That's not accessible directly from the outside
/// world, so we have this endpoint in compute_ctl to expose it
pub(in crate::http) async fn get_autoscaling_metrics(
State(compute): State<Arc<ComputeNode>>,
) -> Result<Response, Response> {
let pgdata = Path::new(&compute.params.pgdata);
// Connect to the communicator process's metrics socket
let mut metrics_client = connect_postgres_metrics_socket(pgdata)
.await
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Make a request for /autoscaling_metrics
let request = Request::builder()
.method("GET")
.uri("/autoscaling_metrics")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(Body::from(""))
.unwrap();
let resp = metrics_client
.send_request(request)
.await
.context("fetching metrics from Postgres metrics service")
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
// Check response status
let status = resp.status();
if status != StatusCode::OK {
return Err(JsonResponse::error(
status,
format!(
"Postgres metrics service returned error: {}",
status.to_string()
),
));
}
// Build an OK response, with the body forwarded from the response we got.
let mut response = Response::builder();
response = response.status(StatusCode::OK);
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
response = response.header(CONTENT_TYPE, content_type);
}
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
Ok(response.body(body).unwrap())
}

View File

@@ -81,8 +81,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Server::External {
config, compute_id, ..
} => {
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/autoscaling_metrics",
get(metrics::get_autoscaling_metrics),
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))

View File

@@ -24,6 +24,7 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub mod postgres_metrics_client;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

View File

@@ -0,0 +1,97 @@
//! Client for making request to a running Postgres server's metrics service
//!
//! The storage communicator process that runs inside Postgres exposes
//! an HTTP endpoint in a Unix Domain Socket in the Postgres data
//! directory. This provides access to it.
use std::path::Path;
use anyhow::Context;
use hyper::client::conn::http1::SendRequest;
use hyper_util::rt::TokioIo;
/// Name of the socket within the Postgres data directory. This better match that in
/// `pgxn/neon/communicator/src/worker_process/metrics_exporter.rs`.
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
/// Open a connection to the metrics exporter's socket, prepare to send requests to it
/// with hyper.
pub async fn connect_postgres_metrics_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
let socket_path_len = socket_path.display().to_string().len();
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
// Unix Domain socket. The limit is on the connect(2) function used to open the
// socket, not on the absolute path itself. Postgres changes the current directory to
// the data directory and uses a relative path to bind to the socket, and the relative
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
// open the socket, we need to use a full path, which can be arbitrarily long.
//
// There are a few ways we could work around this:
//
// 1. Change the current directory to the Postgres data directory and use a relative
// path in the connect(2) call. That's problematic because the current directory
// applies to the whole process. We could change the current directory early in
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
// it would be more robust if the data directory is moved around or unlinked for some
// reason, and you would be less likely to accidentally litter other parts of the
// filesystem with e.g. temporary files. However, that's a pretty invasive change.
//
// 2. On Linux, you could open() the data directory, and refer to the the socket inside it
// as "/proc/self/fd/<fd>/neon-communicator.socket". But that's Linux-only.
//
// 3. Create a symbolic link to the socket with a shorter path, and use that.
//
// We use the symbolic link approach here. Hopefully the paths we use in production
// are shorter, so that we can open the socket directly, so that this hack is needed
// only in development.
let connect_result = if socket_path_len < 100 {
// We can open the path directly with no hacks.
tokio::net::UnixStream::connect(socket_path).await
} else {
// The path to the socket is too long. Create a symlink to it with a shorter path.
let short_path = std::env::temp_dir().join(format!(
"compute_ctl.short-socket.{}.{}",
std::process::id(),
tokio::task::id()
));
std::os::unix::fs::symlink(&socket_path, &short_path)?;
// Delete the symlink as soon as we have connected to it. There's a small chance
// of leaking if the process dies before we remove it, so try to keep that window
// as small as possible.
scopeguard::defer! {
if let Err(err) = std::fs::remove_file(&short_path) {
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
short_path.display(), err);
}
}
tracing::info!(
"created symlink \"{}\" for socket \"{}\", opening it now",
short_path.display(),
socket_path.display()
);
tokio::net::UnixStream::connect(&short_path).await
};
let stream = connect_result.context("opening postgres metrics socket")?;
let io = TokioIo::new(stream);
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await.unwrap();
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Error in connection: {}", e);
}
});
Ok(request_sender)
}

View File

@@ -5,6 +5,7 @@ MODULE_big = neon
OBJS = \
$(WIN32RES) \
communicator.o \
communicator_process.o \
extension_server.o \
file_cache.o \
hll.o \

View File

@@ -13,7 +13,14 @@ crate-type = ["staticlib"]
testing = []
[dependencies]
neon-shmem.workspace = true
axum.workspace = true
http.workspace = true
tokio = { version = "1.43.1", features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
tracing.workspace = true
tracing-subscriber.workspace = true
metrics.workspace = true
utils.workspace = true
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }
[build-dependencies]

View File

@@ -1,7 +1,22 @@
This package will evolve into a "compute-pageserver communicator"
process and machinery. For now, it's just a dummy that doesn't do
anything interesting, but it allows us to test the compilation and
linking of Rust code into the Postgres extensions.
# Communicator
This package provides the so-called "compute-pageserver communicator",
or just "communicator" in short. The communicator is a separate
background worker process that runs in the PostgreSQL server. It's
part of the neon extension. Currently, it only provides an HTTP
endpoint for metrics, but in the future it will evolve to handle all
communications with the pageservers.
## Source code view
pgxn/neon/communicator_process.c
Contains code needed to start up the communicator process, and
the glue that interacts with PostgreSQL code and the Rust
code in the communicator process.
pgxn/neon/communicator/src/worker_process/
Worker process main loop and glue code
At compilation time, pgxn/neon/communicator/ produces a static
library, libcommunicator.a. It is linked to the neon.so extension

View File

@@ -1,6 +1 @@
/// dummy function, just to test linking Rust functions into the C
/// extension
#[unsafe(no_mangle)]
pub extern "C" fn communicator_dummy(arg: u32) -> u32 {
arg + 1
}
mod worker_process;

View File

@@ -0,0 +1,35 @@
//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These are
//! implemented in `neon/pgxn/communicator_process.c`. The function signatures better match!
//!
//! These are called from the communicator threads! Careful what you do, most Postgres functions are
//! not safe to call in that context.
unsafe extern "C" {
pub fn callback_set_my_latch_unsafe();
pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics;
}
// safe wrappers
pub(super) fn callback_set_my_latch() {
unsafe { callback_set_my_latch_unsafe() };
}
#[repr(C)]
pub struct LfcMetrics {
pub lfc_cache_size_limit: i64,
pub lfc_hits: i64,
pub lfc_misses: i64,
pub lfc_used: i64,
pub lfc_writes: i64,
// working set size looking back 1..60 minutes.
//
// Index 0 is size of working set accessed within last 1 minute,
// index 59 is size of working set accessed within last 60 minutes.
pub lfc_approximate_working_set_size_windows: [i64; 60],
}
pub extern "C" fn callback_get_lfc_metrics() -> LfcMetrics {
unsafe { callback_get_lfc_metrics_unsafe() }
}

View File

@@ -0,0 +1,91 @@
use metrics::{IntGauge, IntGaugeVec};
use super::callbacks::callback_get_lfc_metrics;
pub(crate) struct LfcMetricsCollector {
lfc_cache_size_limit: IntGauge,
lfc_hits: IntGauge,
lfc_misses: IntGauge,
lfc_used: IntGauge,
lfc_writes: IntGauge,
lfc_approximate_working_set_size_windows_vec: IntGaugeVec,
lfc_approximate_working_set_size_windows: [IntGauge; 60],
}
impl LfcMetricsCollector {
pub fn new() -> LfcMetricsCollector {
let lfc_approximate_working_set_size_windows_vec = IntGaugeVec::new(
metrics::opts!(
"lfc_approximate_working_set_size_windows",
"Approximate working set size in pages of 8192 bytes",
),
&[&"duration_seconds"],
)
.unwrap();
let lfc_approximate_working_set_size_windows: [IntGauge; 60] = (1..=60)
.map(|minutes| {
lfc_approximate_working_set_size_windows_vec
.with_label_values(&[&(minutes * 60).to_string()])
})
.collect::<Vec<_>>()
.try_into()
.unwrap();
LfcMetricsCollector {
lfc_cache_size_limit: IntGauge::new(
"lfc_cache_size_limit",
"LFC cache size limit in bytes",
)
.unwrap(),
lfc_hits: IntGauge::new("lfc_hits", "LFC cache hits").unwrap(),
lfc_misses: IntGauge::new("lfc_misses", "LFC cache misses").unwrap(),
lfc_used: IntGauge::new("lfc_used", "LFC chunks used (chunk = 1MB)").unwrap(),
lfc_writes: IntGauge::new("lfc_writes", "LFC cache writes").unwrap(),
lfc_approximate_working_set_size_windows_vec,
lfc_approximate_working_set_size_windows,
}
}
}
impl metrics::core::Collector for LfcMetricsCollector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.lfc_cache_size_limit.desc());
descs.append(&mut self.lfc_hits.desc());
descs.append(&mut self.lfc_misses.desc());
descs.append(&mut self.lfc_used.desc());
descs.append(&mut self.lfc_writes.desc());
descs.append(&mut self.lfc_approximate_working_set_size_windows_vec.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
// update the gauges
let lfc_metrics = callback_get_lfc_metrics();
self.lfc_cache_size_limit
.set(lfc_metrics.lfc_cache_size_limit);
self.lfc_hits.set(lfc_metrics.lfc_hits);
self.lfc_misses.set(lfc_metrics.lfc_misses);
self.lfc_used.set(lfc_metrics.lfc_used);
self.lfc_writes.set(lfc_metrics.lfc_writes);
for i in 0..60 {
let val = lfc_metrics.lfc_approximate_working_set_size_windows[i];
self.lfc_approximate_working_set_size_windows[i].set(val);
}
values.append(&mut self.lfc_cache_size_limit.collect());
values.append(&mut self.lfc_hits.collect());
values.append(&mut self.lfc_misses.collect());
values.append(&mut self.lfc_used.collect());
values.append(&mut self.lfc_writes.collect());
values.append(&mut self.lfc_approximate_working_set_size_windows_vec.collect());
values
}
}

View File

@@ -0,0 +1,228 @@
//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
//!
//! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
//! process latch is raised. That wakes up the loop in the main thread, see
//! `communicator_new_bgworker_main()`. It reads the message from the channel and
//! ereport()s it. This ensures that only one thread, the main thread, calls the
//! PostgreSQL logging routines at any time.
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::mpsc::{TryRecvError, TrySendError};
use tracing::info;
use tracing::{Event, Level, Metadata, Subscriber};
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
use tracing_subscriber::registry::LookupSpan;
use crate::worker_process::callbacks::callback_set_my_latch;
pub struct LoggingState {
receiver: Receiver<FormattedEventWithMeta>,
}
/// Called once, at worker process startup. The returned LoggingState is passed back
/// in the subsequent calls to `pump_logging`. It is opaque to the C code.
#[unsafe(no_mangle)]
pub extern "C" fn configure_logging() -> Box<LoggingState> {
let (sender, receiver) = sync_channel(1000);
let maker = Maker { channel: sender };
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with(
tracing_subscriber::fmt::layer()
.with_ansi(false)
.event_format(SimpleFormatter::new())
.with_writer(maker)
// TODO: derive this from log_min_messages?
.with_filter(LevelFilter::from_level(Level::INFO)),
);
r.init();
info!("communicator process logging started");
let state = LoggingState { receiver };
Box::new(state)
}
/// Read one message from the logging queue. This is essentially a wrapper to Receiver,
/// with a C-friendly signature.
///
/// The message is copied into *errbuf, which is a caller-supplied buffer of size
/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always
/// NULL-terminated.
///
/// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see
/// elog.h
#[unsafe(no_mangle)]
pub extern "C" fn pump_logging(
state: &mut LoggingState,
errbuf: *mut u8,
errbuf_len: u32,
elevel_p: &mut i32,
) -> i32 {
let msg = match state.receiver.try_recv() {
Err(TryRecvError::Empty) => return 0,
Err(TryRecvError::Disconnected) => return -1,
Ok(msg) => msg,
};
let src: &[u8] = &msg.message;
let dst = errbuf;
let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
unsafe {
std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
*(errbuf.add(len)) = b'\0'; // NULL terminator
}
// XXX: these levels are copied from PostgreSQL's elog.h. Introduce another enum to
// hide these?
*elevel_p = match msg.level {
Level::TRACE => 10, // DEBUG5
Level::DEBUG => 14, // DEBUG1
Level::INFO => 17, // INFO
Level::WARN => 19, // WARNING
Level::ERROR => 21, // ERROR
};
1
}
//---- The following functions can be called from any thread ----
#[derive(Clone)]
struct FormattedEventWithMeta {
message: Vec<u8>,
level: tracing::Level,
}
impl Default for FormattedEventWithMeta {
fn default() -> Self {
FormattedEventWithMeta {
message: Vec::new(),
level: tracing::Level::DEBUG,
}
}
}
struct EventBuilder<'a> {
event: FormattedEventWithMeta,
maker: &'a Maker,
}
impl std::io::Write for EventBuilder<'_> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.event.message.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.maker.send_event(self.event.clone());
Ok(())
}
}
impl Drop for EventBuilder<'_> {
fn drop(&mut self) {
let maker = self.maker;
let event = std::mem::take(&mut self.event);
maker.send_event(event);
}
}
struct Maker {
channel: SyncSender<FormattedEventWithMeta>,
}
impl<'a> MakeWriter<'a> for Maker {
type Writer = EventBuilder<'a>;
fn make_writer(&'a self) -> Self::Writer {
panic!("not expected to be called when make_writer_for is implemented");
}
fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
EventBuilder {
event: FormattedEventWithMeta {
message: Vec::new(),
level: *meta.level(),
},
maker: self,
}
}
}
impl Maker {
fn send_event(&self, e: FormattedEventWithMeta) {
match self.channel.try_send(e) {
Ok(()) => {
// notify the main thread
callback_set_my_latch();
}
Err(TrySendError::Disconnected(_)) => {}
Err(TrySendError::Full(_)) => {
// TODO: record that some messages were lost
}
}
}
}
/// Simple formatter implementation for tracing_subscriber, which prints the log spans and
/// message part like the default formatter, but no timestamp or error level. The error
/// level is captured separately by `FormattedEventWithMeta', and when the error is
/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that
/// point. (The timestamp printed will therefore lag behind the timestamp on the event
/// here, if the main thread doesn't process the log message promptly)
struct SimpleFormatter;
impl<S, N> FormatEvent<S, N> for SimpleFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result {
// Format all the spans in the event's span context.
if let Some(scope) = ctx.event_scope() {
for span in scope.from_root() {
write!(writer, "{}", span.name())?;
// `FormattedFields` is a formatted representation of the span's fields,
// which is stored in its extensions by the `fmt` layer's `new_span`
// method. The fields will have been formatted by the same field formatter
// that's provided to the event formatter in the `FmtContext`.
let ext = span.extensions();
let fields = &ext
.get::<FormattedFields<N>>()
.expect("will never be `None`");
// Skip formatting the fields if the span had no fields.
if !fields.is_empty() {
write!(writer, "{{{fields}}}")?;
}
write!(writer, ": ")?;
}
}
// Write fields on the event
ctx.field_format().format_fields(writer.by_ref(), event)?;
writeln!(writer)
}
}
impl SimpleFormatter {
fn new() -> Self {
SimpleFormatter {}
}
}

View File

@@ -0,0 +1,40 @@
use std::str::FromStr as _;
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
use utils::id::{TenantId, TimelineId};
pub struct CommunicatorWorkerProcessStruct {
/*** Metrics ***/
pub(crate) lfc_metrics: LfcMetricsCollector,
}
pub(super) async fn init(
tenant_id: String,
timeline_id: String,
) -> CommunicatorWorkerProcessStruct {
let _tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
let _timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
CommunicatorWorkerProcessStruct {
// metrics
lfc_metrics: LfcMetricsCollector::new(),
}
}
impl metrics::core::Collector for CommunicatorWorkerProcessStruct {
fn desc(&self) -> Vec<&metrics::core::Desc> {
let mut descs = Vec::new();
descs.append(&mut self.lfc_metrics.desc());
descs
}
fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut values = Vec::new();
values.append(&mut self.lfc_metrics.collect());
values
}
}

View File

@@ -0,0 +1,99 @@
//! Export information about Postgres, the communicator process, file cache etc. as
//! prometheus metrics.
//!
//! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres
//! data directory. For debugging, you can access it with curl:
//!
//! curl --unix-socket neon-communicator.socket http://localhost/metrics
//!
use axum::Router;
use axum::body::Body;
use axum::extract::State;
use axum::response::Response;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use metrics::proto::MetricFamily;
use metrics::{Encoder, TextEncoder};
use std::path::PathBuf;
use tokio::net::UnixListener;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
impl CommunicatorWorkerProcessStruct {
/// Launch the metrics exporter
pub(crate) async fn launch_metrics_exporter(&'static self) {
use axum::routing::get;
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
.route("/debug/panic", get(handle_debug_panic))
.with_state(self);
// Listen on unix domain socket, in the data directory. That should be unique.
let path = PathBuf::from(NEON_COMMUNICATOR_SOCKET_NAME);
let listener = UnixListener::bind(path.clone()).unwrap();
tokio::spawn(async {
tracing::info!("metrics listener spawned");
axum::serve(listener, app).await.unwrap()
});
}
}
/// Expose all Prometheus metrics.
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
use metrics::core::Collector;
let metrics = state.collect();
tracing::trace!("/metrics requested");
metrics_to_response(metrics).await
}
/// Expose Prometheus metrics, for use by the autoscaling agent.
///
/// This is a subset of all the metrics.
async fn get_autoscaling_metrics(
State(state): State<&CommunicatorWorkerProcessStruct>,
) -> Response {
use metrics::core::Collector;
let metrics = state.lfc_metrics.collect();
tracing::trace!("/autoscaling_metrics requested");
metrics_to_response(metrics).await
}
async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
panic!("test HTTP handler task panic");
}
/// Helper function to convert prometheus metrics to a text response
async fn metrics_to_response(metrics: Vec<MetricFamily>) -> Response {
// When we call TextEncoder::encode() below, it will immediately return an
// error if a metric family has no metrics, so we need to preemptively
// filter out metric families with no metrics.
let metrics = metrics
.into_iter()
.filter(|m| !m.get_metric().is_empty())
.collect::<Vec<MetricFamily>>();
let encoder = TextEncoder::new();
let mut buffer = vec![];
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(CONTENT_TYPE, "application/text")
.body(Body::from(e.to_string()))
.unwrap()
} else {
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap()
}
}

View File

@@ -0,0 +1,13 @@
//! This code runs in the communicator worker process. This provides
//! the glue code to:
//!
//! - launch the main loop,
//! - receive IO requests from backends and process them,
//! - write results back to backends.
mod callbacks;
mod lfc_metrics;
mod logging;
mod main_loop;
mod metrics_exporter;
mod worker_interface;

View File

@@ -0,0 +1,39 @@
//! Functions called from the C code in the worker process
use std::ffi::{CStr, c_char};
use crate::worker_process::main_loop;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
/// Launch the communicator's tokio tasks, which do most of the work.
///
/// The caller has initialized the process as a regular PostgreSQL background worker
/// process.
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_process_launch(
tenant_id: *const c_char,
timeline_id: *const c_char,
) -> &'static CommunicatorWorkerProcessStruct {
// Convert the arguments into more convenient Rust types
let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap();
let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap();
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("communicator thread")
.build()
.unwrap();
let worker_struct = runtime.block_on(main_loop::init(
tenant_id.to_string(),
timeline_id.to_string(),
));
let worker_struct = Box::leak(Box::new(worker_struct));
runtime.block_on(worker_struct.launch_metrics_exporter());
// keep the runtime running after we exit this function
Box::leak(Box::new(runtime));
worker_struct
}

View File

@@ -0,0 +1,178 @@
/*-------------------------------------------------------------------------
*
* communicator_process.c
* Functions for starting up the communicator background worker process.
*
* Currently, the communicator process only functions as a metrics
* exporter. It provides an HTTP endpoint for polling a limited set of
* metrics. TODO: In the future, it will do much more, i.e. handle all
* the communications with the pageservers.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "communicator_process.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_perf_counters.h"
/* the rust bindings, generated by cbindgen */
#include "communicator/communicator_bindings.h"
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
/**** Initialization functions. These run in postmaster ****/
void
register_communicator_bgworker(void)
{
BackgroundWorker bgw;
/* Initialize the background worker process */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
/**** Worker process functions. These run in the communicator worker process ****/
/* Entry point for the communicator bgworker process */
void
communicator_new_bgworker_main(Datum main_arg)
{
struct LoggingState *logging;
char errbuf[1000];
int elevel;
const struct CommunicatorWorkerProcessStruct *proc_handle;
/*
* Pretend that this process is a WAL sender. That affects the shutdown
* sequence: WAL senders are shut down last, after the final checkpoint
* has been written. That's what we want for the communicator process too.
*/
am_walsender = true;
MarkPostmasterChildWalSender();
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
/*
* Postmaster sends us SIGUSR2 when all regular backends and bgworkers
* have exited, and it's time for us to exit too
*/
pqsignal(SIGUSR2, die);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
logging = configure_logging();
proc_handle = communicator_worker_process_launch(
neon_tenant,
neon_timeline);
/* proc_handle is not currently used, but will be in the future */
(void) proc_handle;
/*
* The Rust tokio runtime has been launched, and it's running in the
* background now. This process is now multi-threaded! The Rust threads do
* not call into any Postgres functions.
*
* This loop in the main thread handles any interactions we need with the
* rest of PostgreSQL.
*/
elog(LOG, "communicator threads started");
for (;;)
{
int32 rc;
/*
* Check interrupts like system shutdown or config reload
*/
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Forward any log messages from the Rust threads into the normal Postgres
* logging facility.
*/
for (;;)
{
rc = pump_logging(logging, (uint8 *) errbuf, sizeof(errbuf), &elevel);
if (rc == 0)
{
/* nothing to do */
break;
}
else if (rc == 1)
{
/* Because we don't want to exit on error */
if (elevel == ERROR)
elevel = LOG;
if (elevel == INFO)
elevel = LOG;
elog(elevel, "[COMMUNICATOR] %s", errbuf);
}
else if (rc == -1)
{
elog(ERROR, "logging channel was closed unexpectedly");
}
}
/*
* Wait until we are woken up. The rust threads will set the latch if
* there's log message to forward.
*/
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
0,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
}
}
/****
* Callbacks from the rust code, in the communicator process.
*
* NOTE: These must be thread-safe! It's very limited which PostgreSQL
* functions you can use!!!
*
* The signatures of these need to match those in the Rust code.
*/
void
callback_set_my_latch_unsafe(void)
{
SetLatch(MyLatch);
}

View File

@@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
*
* communicator_process.h
* Communicator process
*
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef COMMUNICATOR_PROCESS_H
#define COMMUNICATOR_PROCESS_H
extern void register_communicator_bgworker(void);
#endif /* COMMUNICATOR_PROCESS_H */

View File

@@ -52,6 +52,8 @@
#include "pagestore_client.h"
#include "communicator.h"
#include "communicator/communicator_bindings.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
@@ -2179,6 +2181,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
return dc;
}
/*
* Get metrics, for the built-in metrics exporter that's part of the communicator
* process.
*
* NB: This is called from a Rust tokio task inside the communicator process.
* Acquiring lwlocks, elog(), allocating memory etc. or anything else
* non-trivial is strictly prohibited here!
*/
struct LfcMetrics
callback_get_lfc_metrics_unsafe(void)
{
struct LfcMetrics result = {
.lfc_cache_size_limit = lfc_size_limit,
.lfc_hits = lfc_ctl ? lfc_ctl->hits : 0,
.lfc_misses = lfc_ctl ? lfc_ctl->misses : 0,
.lfc_used = lfc_ctl ? lfc_ctl->used : 0,
.lfc_writes = lfc_ctl ? lfc_ctl->writes : 0,
};
if (lfc_ctl)
{
for (int minutes = 1; minutes <= 60; minutes++)
{
result.lfc_approximate_working_set_size_windows[minutes - 1] =
lfc_approximate_working_set_size_seconds(minutes * 60, false);
}
}
return result;
}
PG_FUNCTION_INFO_V1(get_local_cache_state);
Datum

View File

@@ -30,6 +30,7 @@
#include "utils/guc_tables.h"
#include "communicator.h"
#include "communicator_process.h"
#include "extension_server.h"
#include "file_cache.h"
#include "neon.h"
@@ -455,14 +456,13 @@ _PG_init(void)
shmem_startup_hook = neon_shmem_startup_hook;
#endif
/* dummy call to a Rust function in the communicator library, to check that it works */
(void) communicator_dummy(123);
pg_init_libpagestore();
lfc_init();
pg_init_walproposer();
init_lwlsncache();
register_communicator_bgworker();
pg_init_communicator();
Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;

19
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -3071,6 +3071,21 @@ urllib3 = ">=1.21.1,<3"
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "requests-unixsocket"
version = "0.4.1"
description = "Use requests to talk HTTP via a UNIX domain socket"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "requests_unixsocket-0.4.1-py3-none-any.whl", hash = "sha256:60c4942e9dbecc2f64d611039fb1dfc25da382083c6434ac0316dca3ff908f4d"},
{file = "requests_unixsocket-0.4.1.tar.gz", hash = "sha256:b2596158c356ecee68d27ba469a52211230ac6fb0cde8b66afb19f0ed47a1995"},
]
[package.dependencies]
requests = ">=1.1"
[[package]]
name = "responses"
version = "0.25.3"
@@ -3847,4 +3862,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "b741d0b6f7cd3a062dedb8896471b6e7ba20ab1caef82c060506562e19380ad5"

View File

@@ -50,6 +50,7 @@ types-pyyaml = "^6.0.12.20240917"
testcontainers = "^4.9.0"
# Install a release candidate of `jsonnet`, as it supports Python 3.13
jsonnet = "^0.21.0-rc2"
requests-unixsocket = "^0.4.1"
[tool.poetry.group.dev.dependencies]
mypy = "==1.13.0"

View File

@@ -66,6 +66,12 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def autoscaling_metrics(self):
res = self.get(f"http://localhost:{self.external_port}/autoscaling_metrics")
res.raise_for_status()
log.debug("raw compute metrics: %s", res.text)
return res.text
def prewarm_lfc_status(self) -> dict[str, str]:
res = self.get(self.prewarm_url)
res.raise_for_status()

View File

@@ -5417,6 +5417,7 @@ SKIP_FILES = frozenset(
"postmaster.pid",
"pg_control",
"pg_dynshmem",
"neon-communicator.socket",
)
)

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import os
from typing import TYPE_CHECKING
import pytest
import requests
import requests_unixsocket
from fixtures.metrics import parse_metrics
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
NEON_COMMUNICATOR_SOCKET_NAME = "neon-communicator.socket"
def test_communicator_metrics(neon_simple_env: NeonEnv):
"""
Test the communicator's built-in HTTP prometheus exporter
"""
env = neon_simple_env
endpoint = env.endpoints.create("main")
endpoint.start()
os.chdir(endpoint.pgdata_dir)
session = requests_unixsocket.Session()
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics")
assert r.status_code == 200
# quick test that the endpoint returned something expected. (We don't validate
# that the metrics returned are sensible.)
m = parse_metrics(r.text)
m.query_one("lfc_hits")
m.query_one("lfc_misses")
# Test panic handling. The /debug/panic endpoint raises a Rust panic. It's
# expected to unwind and drop the HTTP connection without response, but not
# kill the process or the server.
with pytest.raises(
requests.ConnectionError, match="Remote end closed connection without response"
):
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/debug/panic")
assert r.status_code == 500
# Test that subsequent requests after the panic still work.
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics")
assert r.status_code == 200
m = parse_metrics(r.text)
m.query_one("lfc_hits")
m.query_one("lfc_misses")

View File

@@ -1,11 +1,13 @@
from __future__ import annotations
import time
from logging import debug
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.utils import USE_LFC, query_scalar
if TYPE_CHECKING:
@@ -75,10 +77,22 @@ WITH (fillfactor='100');
cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 104242")
cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 204242")
# verify working set size after some index access of a few select pages only
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
blocks = query_scalar(cur, "select approximate_working_set_size(false)")
log.info(f"working set size after some index access of a few select pages only {blocks}")
assert blocks < 20
# Also test the metrics from the /autoscaling_metrics endpoint
autoscaling_metrics = endpoint.http_client().autoscaling_metrics()
log.debug(f"Raw metrics: {autoscaling_metrics}")
m = parse_metrics(autoscaling_metrics)
http_estimate = m.query_one("lfc_approximate_working_set_size_windows",
{
"duration_seconds": "60",
},
).value
log.info(f"http estimate: {http_estimate}, blocks: {blocks}")
assert http_estimate > 0 and http_estimate < 20
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):