Mirror of https://github.com/neondatabase/neon.git (synced 2025-12-22 21:59:59 +00:00)
Introduce built-in Prometheus exporter to the Postgres extension (#12591)
Currently, the exporter exposes the same LFC metrics that are exposed by the "autoscaling" sql_exporter in the docker image. With this, we can remove the dedicated sql_exporter instance. (Actually doing the removal is left as a TODO until this is rolled out to production and we have changed the autoscaling-agent to fetch the metrics from this new endpoint.)

The exporter runs as a Postgres background worker process. This is extracted from the Rust communicator rewrite project, which will use the same worker process for much more, to handle the communications with the pageservers. For now, though, it merely handles the metrics requests. In the future, we will add more metrics, and perhaps even APIs to control the running Postgres instance.

The exporter listens on a Unix Domain socket within the Postgres data directory. A Unix Domain socket is a bit unconventional, but it has some advantages:

- Permissions are taken care of. Only processes that can access the data directory, and therefore already have full access to the running Postgres instance, can connect to it.
- No need to allocate and manage a new port number for the listener.

It has some downsides too: it's not immediately accessible from the outside world, and the functions for working with Unix Domain sockets are more low-level than those for TCP sockets (see the symlink hack in `postgres_metrics_client.rs`, for example).

To expose the metrics from the local Unix Domain Socket to the autoscaling agent, introduce a new '/autoscaling_metrics' endpoint in compute_ctl's HTTP server. Currently it merely forwards the request to the Postgres instance, but we could add rate limiting and access control there in the future.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
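As a rough, hypothetical illustration (not part of this commit), an external consumer such as the autoscaling-agent could scrape the forwarded metrics from compute_ctl along these lines. The port number, and the use of reqwest and tokio here, are assumptions made for the sketch only:

```rust
// Hypothetical consumer-side sketch: fetch the Prometheus text exposition from
// compute_ctl's new /autoscaling_metrics endpoint. The port is a placeholder.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let text = reqwest::get("http://localhost:3080/autoscaling_metrics")
        .await?
        .error_for_status()?
        .text()
        .await?;
    // The body is Prometheus text format, e.g. lines like:
    //   lfc_hits 1234
    //   lfc_approximate_working_set_size_windows{duration_seconds="60"} 42
    println!("{text}");
    Ok(())
}
```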
Committed by GitHub. Parent: 88bc06f148, commit: 8bb45fd5da
12
Cargo.lock
generated
@@ -1296,8 +1296,14 @@ dependencies = [
|
||||
name = "communicator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"cbindgen",
|
||||
"neon-shmem",
|
||||
"http 1.3.1",
|
||||
"measured",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -1341,6 +1347,9 @@ dependencies = [
|
||||
"futures",
|
||||
"hostname-validator",
|
||||
"http 1.3.1",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"itertools 0.10.5",
|
||||
"jsonwebtoken",
|
||||
@@ -1363,6 +1372,7 @@ dependencies = [
|
||||
"ring",
|
||||
"rlimit",
|
||||
"rust-ini",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
|
||||
@@ -27,7 +27,10 @@ fail.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
http-body-util.workspace = true
|
||||
hostname-validator = "1.1"
|
||||
hyper.workspace = true
|
||||
hyper-util.workspace = true
|
||||
indexmap.workspace = true
|
||||
itertools.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
@@ -44,6 +47,7 @@ postgres.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
ring = "0.17"
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
98
compute_tools/src/communicator_socket_client.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
//! Client for making requests to a running Postgres server's communicator control socket.
|
||||
//!
|
||||
//! The storage communicator process that runs inside Postgres exposes an HTTP endpoint in
|
||||
//! a Unix Domain Socket in the Postgres data directory. This provides access to it.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use hyper::client::conn::http1::SendRequest;
|
||||
use hyper_util::rt::TokioIo;
|
||||
|
||||
/// Name of the socket within the Postgres data directory. This better match that in
|
||||
/// `pgxn/neon/communicator/src/lib.rs`.
|
||||
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
|
||||
|
||||
/// Open a connection to the communicator's control socket, prepare to send requests to it
|
||||
/// with hyper.
|
||||
pub async fn connect_communicator_socket<B>(pgdata: &Path) -> anyhow::Result<SendRequest<B>>
|
||||
where
|
||||
B: hyper::body::Body + 'static + Send,
|
||||
B::Data: Send,
|
||||
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
|
||||
{
|
||||
let socket_path = pgdata.join(NEON_COMMUNICATOR_SOCKET_NAME);
|
||||
let socket_path_len = socket_path.display().to_string().len();
|
||||
|
||||
// There is a limit of around 100 bytes (108 on Linux?) on the length of the path to a
|
||||
// Unix Domain socket. The limit is on the connect(2) function used to open the
|
||||
// socket, not on the absolute path itself. Postgres changes the current directory to
|
||||
// the data directory and uses a relative path to bind to the socket, and the relative
|
||||
// path "./neon-communicator.socket" is always short, but when compute_ctl needs to
|
||||
// open the socket, we need to use a full path, which can be arbitrarily long.
|
||||
//
|
||||
// There are a few ways we could work around this:
|
||||
//
|
||||
// 1. Change the current directory to the Postgres data directory and use a relative
|
||||
// path in the connect(2) call. That's problematic because the current directory
|
||||
// applies to the whole process. We could change the current directory early in
|
||||
// compute_ctl startup, and that might be a good idea anyway for other reasons too:
|
||||
// it would be more robust if the data directory is moved around or unlinked for
|
||||
// some reason, and you would be less likely to accidentally litter other parts of
|
||||
// the filesystem with e.g. temporary files. However, that's a pretty invasive
|
||||
// change.
|
||||
//
|
||||
    // 2. On Linux, you could open() the data directory, and refer to the socket
|
||||
// inside it as "/proc/self/fd/<fd>/neon-communicator.socket". But that's
|
||||
// Linux-only.
|
||||
//
|
||||
// 3. Create a symbolic link to the socket with a shorter path, and use that.
|
||||
//
|
||||
// We use the symbolic link approach here. Hopefully the paths we use in production
|
||||
// are shorter, so that we can open the socket directly, so that this hack is needed
|
||||
// only in development.
|
||||
let connect_result = if socket_path_len < 100 {
|
||||
// We can open the path directly with no hacks.
|
||||
tokio::net::UnixStream::connect(socket_path).await
|
||||
} else {
|
||||
// The path to the socket is too long. Create a symlink to it with a shorter path.
|
||||
let short_path = std::env::temp_dir().join(format!(
|
||||
"compute_ctl.short-socket.{}.{}",
|
||||
std::process::id(),
|
||||
tokio::task::id()
|
||||
));
|
||||
std::os::unix::fs::symlink(&socket_path, &short_path)?;
|
||||
|
||||
// Delete the symlink as soon as we have connected to it. There's a small chance
|
||||
// of leaking if the process dies before we remove it, so try to keep that window
|
||||
// as small as possible.
|
||||
scopeguard::defer! {
|
||||
if let Err(err) = std::fs::remove_file(&short_path) {
|
||||
tracing::warn!("could not remove symlink \"{}\" created for socket: {}",
|
||||
short_path.display(), err);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"created symlink \"{}\" for socket \"{}\", opening it now",
|
||||
short_path.display(),
|
||||
socket_path.display()
|
||||
);
|
||||
|
||||
tokio::net::UnixStream::connect(&short_path).await
|
||||
};
|
||||
|
||||
let stream = connect_result.context("connecting to communicator control socket")?;
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
let (request_sender, connection) = hyper::client::conn::http1::handshake(io).await?;
|
||||
|
||||
// spawn a task to poll the connection and drive the HTTP state
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = connection.await {
|
||||
eprintln!("Error in connection: {err}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(request_sender)
|
||||
}
|
||||
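Not part of the commit, but for orientation: a minimal sketch of how a caller inside compute_ctl could use `connect_communicator_socket` to fetch the metrics text over the socket. The helper name `fetch_metrics` and the use of the `bytes` and `http-body-util` crates are assumptions for illustration; the actual forwarding logic added by this commit is the `/autoscaling_metrics` handler further below.

```rust
// Hypothetical usage sketch of connect_communicator_socket().
use std::path::Path;

use bytes::Bytes;
use http_body_util::{BodyExt, Empty};
use hyper::Request;

async fn fetch_metrics(pgdata: &Path) -> anyhow::Result<String> {
    // Open an HTTP/1 connection over the Unix domain socket in the data directory.
    let mut sender = connect_communicator_socket::<Empty<Bytes>>(pgdata).await?;

    let request = Request::builder()
        .method("GET")
        .uri("/metrics")
        .header("Host", "localhost") // hyper requires a Host header
        .body(Empty::new())?;

    // Send the request and collect the Prometheus text response body.
    let response = sender.send_request(request).await?;
    let body = response.into_body().collect().await?.to_bytes();
    Ok(String::from_utf8_lossy(&body).into_owned())
}
```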
@@ -1,10 +1,18 @@
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::response::Response;
|
||||
use http::StatusCode;
|
||||
use http::header::CONTENT_TYPE;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Request, StatusCode};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{Encoder, TextEncoder};
|
||||
|
||||
use crate::communicator_socket_client::connect_communicator_socket;
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::http::JsonResponse;
|
||||
use crate::metrics::collect;
|
||||
|
||||
@@ -31,3 +39,42 @@ pub(in crate::http) async fn get_metrics() -> Response {
|
||||
.body(Body::from(buffer))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Fetch and forward metrics from the Postgres neon extension's metrics
|
||||
/// exporter that are used by autoscaling-agent.
|
||||
///
|
||||
/// The neon extension exposes these metrics over a Unix domain socket
|
||||
/// in the data directory. That's not accessible directly from the outside
|
||||
/// world, so we have this endpoint in compute_ctl to expose it
|
||||
pub(in crate::http) async fn get_autoscaling_metrics(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
) -> Result<Response, Response> {
|
||||
let pgdata = Path::new(&compute.params.pgdata);
|
||||
|
||||
// Connect to the communicator process's metrics socket
|
||||
let mut metrics_client = connect_communicator_socket(pgdata)
|
||||
.await
|
||||
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
|
||||
|
||||
// Make a request for /autoscaling_metrics
|
||||
let request = Request::builder()
|
||||
.method("GET")
|
||||
.uri("/autoscaling_metrics")
|
||||
.header("Host", "localhost") // hyper requires Host, even though the server won't care
|
||||
.body(Body::from(""))
|
||||
.unwrap();
|
||||
let resp = metrics_client
|
||||
.send_request(request)
|
||||
.await
|
||||
.context("fetching metrics from Postgres metrics service")
|
||||
.map_err(|e| JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#}")))?;
|
||||
|
||||
// Build a response that just forwards the response we got.
|
||||
let mut response = Response::builder();
|
||||
response = response.status(resp.status());
|
||||
if let Some(content_type) = resp.headers().get(CONTENT_TYPE) {
|
||||
response = response.header(CONTENT_TYPE, content_type);
|
||||
}
|
||||
let body = tonic::service::AxumBody::from_stream(resp.into_body().into_data_stream());
|
||||
Ok(response.body(body).unwrap())
|
||||
}
|
||||
|
||||
@@ -81,8 +81,12 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
Server::External {
|
||||
config, compute_id, ..
|
||||
} => {
|
||||
let unauthenticated_router =
|
||||
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
|
||||
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/metrics", get(metrics::get_metrics))
|
||||
.route(
|
||||
"/autoscaling_metrics",
|
||||
get(metrics::get_autoscaling_metrics),
|
||||
);
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
pub mod checker;
|
||||
pub mod communicator_socket_client;
|
||||
pub mod config;
|
||||
pub mod configurator;
|
||||
pub mod http;
|
||||
|
||||
@@ -5,6 +5,7 @@ MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
communicator.o \
|
||||
communicator_process.o \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
hll.o \
|
||||
@@ -29,6 +30,11 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
|
||||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
ifeq ($(UNAME_S), Darwin)
|
||||
SHLIB_LINK += -framework Security -framework CoreFoundation -framework SystemConfiguration
|
||||
endif
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
@@ -57,7 +63,8 @@ WALPROP_OBJS = \
|
||||
|
||||
# libcommunicator.a is built by cargo from the Rust sources under communicator/
|
||||
# subdirectory. `cargo build` also generates communicator_bindings.h.
|
||||
neon.o: communicator/communicator_bindings.h
|
||||
communicator_process.o: communicator/communicator_bindings.h
|
||||
file_cache.o: communicator/communicator_bindings.h
|
||||
|
||||
$(NEON_CARGO_ARTIFACT_TARGET_DIR)/libcommunicator.a communicator/communicator_bindings.h &:
|
||||
(cd $(srcdir)/communicator && cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE))
|
||||
|
||||
@@ -16,7 +16,14 @@ testing = []
|
||||
rest_broker = []
|
||||
|
||||
[dependencies]
|
||||
neon-shmem.workspace = true
|
||||
axum.workspace = true
|
||||
http.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "net", "io-util", "rt", "rt-multi-thread"] }
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
measured.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../../workspace_hack" }
|
||||
|
||||
[build-dependencies]
|
||||
|
||||
@@ -1,7 +1,22 @@
|
||||
This package will evolve into a "compute-pageserver communicator"
|
||||
process and machinery. For now, it's just a dummy that doesn't do
|
||||
anything interesting, but it allows us to test the compilation and
|
||||
linking of Rust code into the Postgres extensions.
|
||||
# Communicator
|
||||
|
||||
This package provides the so-called "compute-pageserver communicator",
|
||||
or just "communicator" in short. The communicator is a separate
|
||||
background worker process that runs in the PostgreSQL server. It's
|
||||
part of the neon extension. Currently, it only provides an HTTP
|
||||
endpoint for metrics, but in the future it will evolve to handle all
|
||||
communications with the pageservers.
|
||||
|
||||
## Source code view
|
||||
|
||||
pgxn/neon/communicator_process.c
|
||||
Contains code needed to start up the communicator process, and
|
||||
the glue that interacts with PostgreSQL code and the Rust
|
||||
code in the communicator process.
|
||||
|
||||
|
||||
pgxn/neon/communicator/src/worker_process/
|
||||
Worker process main loop and glue code
|
||||
|
||||
At compilation time, pgxn/neon/communicator/ produces a static
|
||||
library, libcommunicator.a. It is linked to the neon.so extension
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
/// dummy function, just to test linking Rust functions into the C
|
||||
/// extension
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_dummy(arg: u32) -> u32 {
|
||||
arg + 1
|
||||
}
|
||||
mod worker_process;
|
||||
|
||||
/// Name of the Unix Domain Socket that serves the metrics, and other APIs in the
|
||||
/// future. This is within the Postgres data directory.
|
||||
const NEON_COMMUNICATOR_SOCKET_NAME: &str = "neon-communicator.socket";
|
||||
|
||||
51
pgxn/neon/communicator/src/worker_process/callbacks.rs
Normal file
51
pgxn/neon/communicator/src/worker_process/callbacks.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
//! C callbacks to PostgreSQL facilities that the neon extension needs to provide. These
|
||||
//! are implemented in `pgxn/neon/communicator_process.c`. The function signatures must
//! match!
|
||||
//!
|
||||
//! These are called from the communicator threads! Careful what you do, most Postgres
|
||||
//! functions are not safe to call in that context.
|
||||
|
||||
#[cfg(not(test))]
|
||||
unsafe extern "C" {
|
||||
pub fn callback_set_my_latch_unsafe();
|
||||
pub fn callback_get_lfc_metrics_unsafe() -> LfcMetrics;
|
||||
}
|
||||
|
||||
// Compile unit tests with dummy versions of the functions. Unit tests cannot call back
|
||||
// into the C code. (As of this writing, no unit tests even exist in the communicator
|
||||
// package, but the code coverage build still builds these and tries to link with the
|
||||
// external C code.)
|
||||
#[cfg(test)]
|
||||
unsafe fn callback_set_my_latch_unsafe() {
|
||||
panic!("not usable in unit tests");
|
||||
}
|
||||
#[cfg(test)]
|
||||
unsafe fn callback_get_lfc_metrics_unsafe() -> LfcMetrics {
|
||||
panic!("not usable in unit tests");
|
||||
}
|
||||
|
||||
// safe wrappers
|
||||
|
||||
pub(super) fn callback_set_my_latch() {
|
||||
unsafe { callback_set_my_latch_unsafe() };
|
||||
}
|
||||
|
||||
pub(super) fn callback_get_lfc_metrics() -> LfcMetrics {
|
||||
unsafe { callback_get_lfc_metrics_unsafe() }
|
||||
}
|
||||
|
||||
/// Return type of the callback_get_lfc_metrics() function.
|
||||
#[repr(C)]
|
||||
pub struct LfcMetrics {
|
||||
pub lfc_cache_size_limit: i64,
|
||||
pub lfc_hits: i64,
|
||||
pub lfc_misses: i64,
|
||||
pub lfc_used: i64,
|
||||
pub lfc_writes: i64,
|
||||
|
||||
// working set size looking back 1..60 minutes.
|
||||
//
|
||||
// Index 0 is the size of the working set accessed within last 1 minute,
|
||||
// index 59 is the size of the working set accessed within last 60 minutes.
|
||||
pub lfc_approximate_working_set_size_windows: [i64; 60],
|
||||
}
|
||||
102
pgxn/neon/communicator/src/worker_process/control_socket.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
//! Communicator control socket.
|
||||
//!
|
||||
//! Currently, the control socket is used to provide information about the communicator
|
||||
//! process, file cache etc. as prometheus metrics. In the future, it can be used to
|
||||
//! expose more things.
|
||||
//!
|
||||
//! The exporter speaks HTTP, listens on a Unix Domain Socket under the Postgres
|
||||
//! data directory. For debugging, you can access it with curl:
|
||||
//!
|
||||
//! ```sh
|
||||
//! curl --unix-socket neon-communicator.socket http://localhost/metrics
|
||||
//! ```
|
||||
//!
|
||||
use axum::Router;
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::response::Response;
|
||||
use http::StatusCode;
|
||||
use http::header::CONTENT_TYPE;
|
||||
|
||||
use measured::MetricGroup;
|
||||
use measured::text::BufferedTextEncoder;
|
||||
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
use crate::NEON_COMMUNICATOR_SOCKET_NAME;
|
||||
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
|
||||
|
||||
impl CommunicatorWorkerProcessStruct {
|
||||
/// Launch the listener
|
||||
pub(crate) async fn launch_control_socket_listener(
|
||||
&'static self,
|
||||
) -> Result<(), std::io::Error> {
|
||||
use axum::routing::get;
|
||||
let app = Router::new()
|
||||
.route("/metrics", get(get_metrics))
|
||||
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
|
||||
.route("/debug/panic", get(handle_debug_panic))
|
||||
.with_state(self);
|
||||
|
||||
// If the server is restarted, there might be an old socket still
|
||||
// lying around. Remove it first.
|
||||
match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
|
||||
Ok(()) => {
|
||||
tracing::warn!("removed stale control socket");
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::error!("could not remove stale control socket: {e:#}");
|
||||
// Try to proceed anyway. It will likely fail below though.
|
||||
}
|
||||
};
|
||||
|
||||
// Create the unix domain socket and start listening on it
|
||||
let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
|
||||
|
||||
tokio::spawn(async {
|
||||
tracing::info!("control socket listener spawned");
|
||||
axum::serve(listener, app)
|
||||
.await
|
||||
.expect("axum::serve never returns")
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Expose all Prometheus metrics.
|
||||
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct>) -> Response {
|
||||
tracing::trace!("/metrics requested");
|
||||
metrics_to_response(&state).await
|
||||
}
|
||||
|
||||
/// Expose Prometheus metrics, for use by the autoscaling agent.
|
||||
///
|
||||
/// This is a subset of all the metrics.
|
||||
async fn get_autoscaling_metrics(
|
||||
State(state): State<&CommunicatorWorkerProcessStruct>,
|
||||
) -> Response {
|
||||
    tracing::trace!("/autoscaling_metrics requested");
|
||||
metrics_to_response(&state.lfc_metrics).await
|
||||
}
|
||||
|
||||
async fn handle_debug_panic(State(_state): State<&CommunicatorWorkerProcessStruct>) -> Response {
|
||||
panic!("test HTTP handler task panic");
|
||||
}
|
||||
|
||||
/// Helper function to convert prometheus metrics to a text response
|
||||
async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + Sync)) -> Response {
|
||||
let mut enc = BufferedTextEncoder::new();
|
||||
metrics
|
||||
.collect_group_into(&mut enc)
|
||||
.unwrap_or_else(|never| match never {});
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, "application/text")
|
||||
.body(Body::from(enc.finish()))
|
||||
.unwrap()
|
||||
}
|
||||
83
pgxn/neon/communicator/src/worker_process/lfc_metrics.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use measured::{
|
||||
FixedCardinalityLabel, Gauge, GaugeVec, LabelGroup, MetricGroup,
|
||||
label::{LabelName, LabelValue, StaticLabelSet},
|
||||
metric::{MetricEncoding, gauge::GaugeState, group::Encoding},
|
||||
};
|
||||
|
||||
use super::callbacks::callback_get_lfc_metrics;
|
||||
|
||||
pub(crate) struct LfcMetricsCollector;
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new())]
|
||||
struct LfcMetricsGroup {
|
||||
/// LFC cache size limit in bytes
|
||||
lfc_cache_size_limit: Gauge,
|
||||
/// LFC cache hits
|
||||
lfc_hits: Gauge,
|
||||
/// LFC cache misses
|
||||
lfc_misses: Gauge,
|
||||
/// LFC chunks used (chunk = 1MB)
|
||||
lfc_used: Gauge,
|
||||
/// LFC cache writes
|
||||
lfc_writes: Gauge,
|
||||
/// Approximate working set size in pages of 8192 bytes
|
||||
#[metric(init = GaugeVec::dense())]
|
||||
lfc_approximate_working_set_size_windows: GaugeVec<StaticLabelSet<MinuteAsSeconds>>,
|
||||
}
|
||||
|
||||
impl<T: Encoding> MetricGroup<T> for LfcMetricsCollector
|
||||
where
|
||||
GaugeState: MetricEncoding<T>,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut T) -> Result<(), <T as Encoding>::Err> {
|
||||
let g = LfcMetricsGroup::new();
|
||||
|
||||
let lfc_metrics = callback_get_lfc_metrics();
|
||||
|
||||
g.lfc_cache_size_limit.set(lfc_metrics.lfc_cache_size_limit);
|
||||
g.lfc_hits.set(lfc_metrics.lfc_hits);
|
||||
g.lfc_misses.set(lfc_metrics.lfc_misses);
|
||||
g.lfc_used.set(lfc_metrics.lfc_used);
|
||||
g.lfc_writes.set(lfc_metrics.lfc_writes);
|
||||
|
||||
for i in 0..60 {
|
||||
let val = lfc_metrics.lfc_approximate_working_set_size_windows[i];
|
||||
g.lfc_approximate_working_set_size_windows
|
||||
.set(MinuteAsSeconds(i), val);
|
||||
}
|
||||
|
||||
g.collect_group_into(enc)
|
||||
}
|
||||
}
|
||||
|
||||
/// This stores the values in range 0..60,
|
||||
/// and encodes them as seconds (60, 120, 180, ..., 3600)
|
||||
#[derive(Clone, Copy)]
|
||||
struct MinuteAsSeconds(usize);
|
||||
|
||||
impl FixedCardinalityLabel for MinuteAsSeconds {
|
||||
fn cardinality() -> usize {
|
||||
60
|
||||
}
|
||||
|
||||
fn encode(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
|
||||
fn decode(value: usize) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelValue for MinuteAsSeconds {
|
||||
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
|
||||
v.write_int((self.0 + 1) as i64 * 60)
|
||||
}
|
||||
}
|
||||
|
||||
impl LabelGroup for MinuteAsSeconds {
|
||||
fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
|
||||
v.write_value(LabelName::from_str("duration_seconds"), self);
|
||||
}
|
||||
}
|
||||
250
pgxn/neon/communicator/src/worker_process/logging.rs
Normal file
@@ -0,0 +1,250 @@
|
||||
//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
|
||||
//!
|
||||
//! In the Rust threads, the log messages are written to an mpsc channel, and the Postgres
|
||||
//! process latch is raised. That wakes up the loop in the main thread, see
|
||||
//! `communicator_new_bgworker_main()`. It reads the message from the channel and
|
||||
//! ereport()s it. This ensures that only one thread, the main thread, calls the
|
||||
//! PostgreSQL logging routines at any time.
|
||||
|
||||
use std::ffi::c_char;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::mpsc::{Receiver, SyncSender};
|
||||
use std::sync::mpsc::{TryRecvError, TrySendError};
|
||||
|
||||
use tracing::info;
|
||||
use tracing::{Event, Level, Metadata, Subscriber};
|
||||
use tracing_subscriber::filter::LevelFilter;
|
||||
use tracing_subscriber::fmt::format::Writer;
|
||||
use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields, FormattedFields, MakeWriter};
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
|
||||
use crate::worker_process::callbacks::callback_set_my_latch;
|
||||
|
||||
/// This handle is passed to the C code, and used by [`communicator_worker_poll_logging`]
|
||||
pub struct LoggingReceiver {
|
||||
receiver: Receiver<FormattedEventWithMeta>,
|
||||
}
|
||||
|
||||
/// This is passed to `tracing`
|
||||
struct LoggingSender {
|
||||
sender: SyncSender<FormattedEventWithMeta>,
|
||||
}
|
||||
|
||||
static DROPPED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// Called once, at worker process startup. The returned LoggingState is passed back
|
||||
/// in the subsequent calls to `pump_logging`. It is opaque to the C code.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_configure_logging() -> Box<LoggingReceiver> {
|
||||
let (sender, receiver) = sync_channel(1000);
|
||||
|
||||
let receiver = LoggingReceiver { receiver };
|
||||
let sender = LoggingSender { sender };
|
||||
|
||||
use tracing_subscriber::prelude::*;
|
||||
let r = tracing_subscriber::registry();
|
||||
|
||||
let r = r.with(
|
||||
tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.event_format(SimpleFormatter)
|
||||
.with_writer(sender)
|
||||
// TODO: derive this from log_min_messages? Currently the code in
|
||||
// communicator_process.c forces log_min_messages='INFO'.
|
||||
.with_filter(LevelFilter::from_level(Level::INFO)),
|
||||
);
|
||||
r.init();
|
||||
|
||||
info!("communicator process logging started");
|
||||
|
||||
Box::new(receiver)
|
||||
}
|
||||
|
||||
/// Read one message from the logging queue. This is essentially a wrapper to Receiver,
|
||||
/// with a C-friendly signature.
|
||||
///
|
||||
/// The message is copied into *errbuf, which is a caller-supplied buffer of size
|
||||
/// `errbuf_len`. If the message doesn't fit in the buffer, it is truncated. It is always
|
||||
/// NULL-terminated.
|
||||
///
|
||||
/// The error level is returned *elevel_p. It's one of the PostgreSQL error levels, see
|
||||
/// elog.h
|
||||
///
|
||||
/// If there was a message, *dropped_event_count_p is also updated with a counter of how
|
||||
/// many log messages in total have been dropped. By comparing that with the value from the
/// previous call, you can tell how many were dropped since the last call.
|
||||
///
|
||||
/// Returns:
|
||||
///
|
||||
/// 0 if there were no messages
|
||||
/// 1 if there was a message. The message and its level are returned in
|
||||
/// *errbuf and *elevel_p. *dropped_event_count_p is also updated.
|
||||
/// -1 on error, i.e. the other end of the queue was disconnected
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_poll_logging(
|
||||
state: &mut LoggingReceiver,
|
||||
errbuf: *mut c_char,
|
||||
errbuf_len: u32,
|
||||
elevel_p: &mut i32,
|
||||
dropped_event_count_p: &mut u64,
|
||||
) -> i32 {
|
||||
let msg = match state.receiver.try_recv() {
|
||||
Err(TryRecvError::Empty) => return 0,
|
||||
Err(TryRecvError::Disconnected) => return -1,
|
||||
Ok(msg) => msg,
|
||||
};
|
||||
|
||||
let src: &[u8] = &msg.message;
|
||||
let dst: *mut u8 = errbuf.cast();
|
||||
let len = std::cmp::min(src.len(), errbuf_len as usize - 1);
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
|
||||
*(dst.add(len)) = b'\0'; // NULL terminator
|
||||
}
|
||||
|
||||
// Map the tracing Level to PostgreSQL elevel.
|
||||
//
|
||||
// XXX: These levels are copied from PostgreSQL's elog.h. Introduce another enum to
|
||||
// hide these?
|
||||
*elevel_p = match msg.level {
|
||||
Level::TRACE => 10, // DEBUG5
|
||||
Level::DEBUG => 14, // DEBUG1
|
||||
Level::INFO => 17, // INFO
|
||||
Level::WARN => 19, // WARNING
|
||||
Level::ERROR => 21, // ERROR
|
||||
};
|
||||
|
||||
*dropped_event_count_p = DROPPED_EVENT_COUNT.load(Ordering::Relaxed);
|
||||
|
||||
1
|
||||
}
|
||||
|
||||
//---- The following functions can be called from any thread ----
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FormattedEventWithMeta {
|
||||
message: Vec<u8>,
|
||||
level: tracing::Level,
|
||||
}
|
||||
|
||||
impl Default for FormattedEventWithMeta {
|
||||
fn default() -> Self {
|
||||
FormattedEventWithMeta {
|
||||
message: Vec::new(),
|
||||
level: tracing::Level::DEBUG,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct EventBuilder<'a> {
|
||||
event: FormattedEventWithMeta,
|
||||
|
||||
sender: &'a LoggingSender,
|
||||
}
|
||||
|
||||
impl std::io::Write for EventBuilder<'_> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.event.message.write(buf)
|
||||
}
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.sender.send_event(self.event.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EventBuilder<'_> {
|
||||
fn drop(&mut self) {
|
||||
let sender = self.sender;
|
||||
let event = std::mem::take(&mut self.event);
|
||||
|
||||
sender.send_event(event);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> MakeWriter<'a> for LoggingSender {
|
||||
type Writer = EventBuilder<'a>;
|
||||
|
||||
fn make_writer(&'a self) -> Self::Writer {
|
||||
panic!("not expected to be called when make_writer_for is implemented");
|
||||
}
|
||||
|
||||
fn make_writer_for(&'a self, meta: &Metadata<'_>) -> Self::Writer {
|
||||
EventBuilder {
|
||||
event: FormattedEventWithMeta {
|
||||
message: Vec::new(),
|
||||
level: *meta.level(),
|
||||
},
|
||||
sender: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LoggingSender {
|
||||
fn send_event(&self, e: FormattedEventWithMeta) {
|
||||
match self.sender.try_send(e) {
|
||||
Ok(()) => {
|
||||
// notify the main thread
|
||||
callback_set_my_latch();
|
||||
}
|
||||
Err(TrySendError::Disconnected(_)) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
// The queue is full, cannot send any more. To avoid blocking the tokio
|
||||
// thread, simply drop the message. Better to lose some logs than get
|
||||
// stuck if there's a problem with the logging.
|
||||
//
|
||||
                // Record the fact that a message was dropped by incrementing the
|
||||
// counter.
|
||||
DROPPED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple formatter implementation for tracing_subscriber, which prints the log spans and
|
||||
/// message part like the default formatter, but no timestamp or error level. The error
|
||||
/// level is captured separately by `FormattedEventWithMeta`, and when the error is
|
||||
/// printed by the main thread, with PostgreSQL ereport(), it gets a timestamp at that
|
||||
/// point. (The timestamp printed will therefore lag behind the timestamp on the event
|
||||
/// here, if the main thread doesn't process the log message promptly)
|
||||
struct SimpleFormatter;
|
||||
|
||||
impl<S, N> FormatEvent<S, N> for SimpleFormatter
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
N: for<'a> FormatFields<'a> + 'static,
|
||||
{
|
||||
fn format_event(
|
||||
&self,
|
||||
ctx: &FmtContext<'_, S, N>,
|
||||
mut writer: Writer<'_>,
|
||||
event: &Event<'_>,
|
||||
) -> std::fmt::Result {
|
||||
// Format all the spans in the event's span context.
|
||||
if let Some(scope) = ctx.event_scope() {
|
||||
for span in scope.from_root() {
|
||||
write!(writer, "{}", span.name())?;
|
||||
|
||||
// `FormattedFields` is a formatted representation of the span's fields,
|
||||
// which is stored in its extensions by the `fmt` layer's `new_span`
|
||||
// method. The fields will have been formatted by the same field formatter
|
||||
// that's provided to the event formatter in the `FmtContext`.
|
||||
let ext = span.extensions();
|
||||
let fields = &ext
|
||||
.get::<FormattedFields<N>>()
|
||||
.expect("will never be `None`");
|
||||
|
||||
// Skip formatting the fields if the span had no fields.
|
||||
if !fields.is_empty() {
|
||||
write!(writer, "{{{fields}}}")?;
|
||||
}
|
||||
write!(writer, ": ")?;
|
||||
}
|
||||
}
|
||||
|
||||
// Write fields on the event
|
||||
ctx.field_format().format_fields(writer.by_ref(), event)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
66
pgxn/neon/communicator/src/worker_process/main_loop.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
use std::str::FromStr as _;
|
||||
|
||||
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
|
||||
|
||||
use measured::MetricGroup;
|
||||
use measured::metric::MetricEncoding;
|
||||
use measured::metric::gauge::GaugeState;
|
||||
use measured::metric::group::Encoding;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct CommunicatorWorkerProcessStruct {
|
||||
runtime: tokio::runtime::Runtime,
|
||||
|
||||
/*** Metrics ***/
|
||||
pub(crate) lfc_metrics: LfcMetricsCollector,
|
||||
}
|
||||
|
||||
/// Launch the communicator process's Rust subsystems
|
||||
pub(super) fn init(
|
||||
tenant_id: Option<&str>,
|
||||
timeline_id: Option<&str>,
|
||||
) -> Result<&'static CommunicatorWorkerProcessStruct, String> {
|
||||
// The caller validated these already
|
||||
let _tenant_id = tenant_id
|
||||
.map(TenantId::from_str)
|
||||
.transpose()
|
||||
.map_err(|e| format!("invalid tenant ID: {e}"))?;
|
||||
let _timeline_id = timeline_id
|
||||
.map(TimelineId::from_str)
|
||||
.transpose()
|
||||
.map_err(|e| format!("invalid timeline ID: {e}"))?;
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.thread_name("communicator thread")
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let worker_struct = CommunicatorWorkerProcessStruct {
|
||||
// Note: it's important to not drop the runtime, or all the tasks are dropped
|
||||
// too. Including it in the returned struct is one way to keep it around.
|
||||
runtime,
|
||||
|
||||
// metrics
|
||||
lfc_metrics: LfcMetricsCollector,
|
||||
};
|
||||
let worker_struct = Box::leak(Box::new(worker_struct));
|
||||
|
||||
// Start the listener on the control socket
|
||||
worker_struct
|
||||
.runtime
|
||||
.block_on(worker_struct.launch_control_socket_listener())
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(worker_struct)
|
||||
}
|
||||
|
||||
impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct
|
||||
where
|
||||
T: Encoding,
|
||||
GaugeState: MetricEncoding<T>,
|
||||
{
|
||||
fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
|
||||
self.lfc_metrics.collect_group_into(enc)
|
||||
}
|
||||
}
|
||||
13
pgxn/neon/communicator/src/worker_process/mod.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
//! This code runs in the communicator worker process. This provides
|
||||
//! the glue code to:
|
||||
//!
|
||||
//! - launch the main loop,
|
||||
//! - receive IO requests from backends and process them,
|
||||
//! - write results back to backends.
|
||||
|
||||
mod callbacks;
|
||||
mod control_socket;
|
||||
mod lfc_metrics;
|
||||
mod logging;
|
||||
mod main_loop;
|
||||
mod worker_interface;
|
||||
@@ -0,0 +1,60 @@
|
||||
//! Functions called from the C code in the worker process
|
||||
|
||||
use std::ffi::{CStr, CString, c_char};
|
||||
|
||||
use crate::worker_process::main_loop;
|
||||
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
|
||||
|
||||
/// Launch the communicator's tokio tasks, which do most of the work.
|
||||
///
|
||||
/// The caller has initialized the process as a regular PostgreSQL background worker
|
||||
/// process.
|
||||
///
|
||||
/// Inputs:
|
||||
/// `tenant_id` and `timeline_id` can be NULL, if we've been launched in "non-Neon" mode,
|
||||
/// where we use local storage instead of connecting to remote neon storage. That's
|
||||
/// currently only used in some unit tests.
|
||||
///
|
||||
/// Result:
|
||||
/// Returns a pointer to CommunicatorWorkerProcessStruct, which is a handle to the running
|
||||
/// Rust tasks. The C code can use it to interact with the Rust parts. On failure, returns
|
||||
/// None/NULL, and an error message is returned in *error_p
|
||||
///
|
||||
/// This is called only once in the process, so the returned struct, and error message in
|
||||
/// case of failure, are simply leaked.
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn communicator_worker_launch(
|
||||
tenant_id: *const c_char,
|
||||
timeline_id: *const c_char,
|
||||
error_p: *mut *const c_char,
|
||||
) -> Option<&'static CommunicatorWorkerProcessStruct> {
|
||||
// Convert the arguments into more convenient Rust types
|
||||
let tenant_id = if tenant_id.is_null() {
|
||||
None
|
||||
} else {
|
||||
let cstr = unsafe { CStr::from_ptr(tenant_id) };
|
||||
Some(cstr.to_str().expect("assume UTF-8"))
|
||||
};
|
||||
let timeline_id = if timeline_id.is_null() {
|
||||
None
|
||||
} else {
|
||||
let cstr = unsafe { CStr::from_ptr(timeline_id) };
|
||||
Some(cstr.to_str().expect("assume UTF-8"))
|
||||
};
|
||||
|
||||
// The `init` function does all the work.
|
||||
let result = main_loop::init(tenant_id, timeline_id);
|
||||
|
||||
// On failure, return the error message to the C caller in *error_p.
|
||||
match result {
|
||||
Ok(worker_struct) => Some(worker_struct),
|
||||
Err(errmsg) => {
|
||||
let errmsg = CString::new(errmsg).expect("no nuls within error message");
|
||||
let errmsg = Box::leak(errmsg.into_boxed_c_str());
|
||||
let p: *const c_char = errmsg.as_ptr();
|
||||
|
||||
unsafe { *error_p = p };
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
273
pgxn/neon/communicator_process.c
Normal file
@@ -0,0 +1,273 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* communicator_process.c
|
||||
* Functions for starting up the communicator background worker process.
|
||||
*
|
||||
* Currently, the communicator process only functions as a metrics
|
||||
* exporter. It provides an HTTP endpoint for polling a limited set of
|
||||
* metrics. TODO: In the future, it will do much more, i.e. handle all
|
||||
* the communications with the pageservers.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "communicator_process.h"
|
||||
#include "file_cache.h"
|
||||
#include "neon.h"
|
||||
#include "neon_perf_counters.h"
|
||||
|
||||
/* the rust bindings, generated by cbindgen */
|
||||
#include "communicator/communicator_bindings.h"
|
||||
|
||||
static void pump_logging(struct LoggingReceiver *logging);
|
||||
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
|
||||
|
||||
/**** Initialization functions. These run in postmaster ****/
|
||||
|
||||
void
|
||||
pg_init_communicator_process(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
/* Initialize the background worker process */
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
/**** Worker process functions. These run in the communicator worker process ****/
|
||||
|
||||
/*
|
||||
* Entry point for the communicator bgworker process
|
||||
*/
|
||||
void
|
||||
communicator_new_bgworker_main(Datum main_arg)
|
||||
{
|
||||
struct LoggingReceiver *logging;
|
||||
const char *errmsg = NULL;
|
||||
const struct CommunicatorWorkerProcessStruct *proc_handle;
|
||||
|
||||
/*
|
||||
* Pretend that this process is a WAL sender. That affects the shutdown
|
||||
* sequence: WAL senders are shut down last, after the final checkpoint
|
||||
* has been written. That's what we want for the communicator process too.
|
||||
*/
|
||||
am_walsender = true;
|
||||
MarkPostmasterChildWalSender();
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
/*
|
||||
* Postmaster sends us SIGUSR2 when all regular backends and bgworkers
|
||||
* have exited, and it's time for us to exit too
|
||||
*/
|
||||
pqsignal(SIGUSR2, die);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, die);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
/*
|
||||
* By default, INFO messages are not printed to the log. We want
|
||||
* `tracing::info!` messages emitted from the communicator to be printed,
|
||||
* however, so increase the log level.
|
||||
*
|
||||
* XXX: This overrides any user-set value from the config file. That's not
|
||||
* great, but on the other hand, there should be little reason for user to
|
||||
* control the verbosity of the communicator. It's not too verbose by
|
||||
* default.
|
||||
*/
|
||||
SetConfigOption("log_min_messages", "INFO", PGC_SUSET, PGC_S_OVERRIDE);
|
||||
|
||||
logging = communicator_worker_configure_logging();
|
||||
|
||||
proc_handle = communicator_worker_launch(
|
||||
neon_tenant[0] == '\0' ? NULL : neon_tenant,
|
||||
neon_timeline[0] == '\0' ? NULL : neon_timeline,
|
||||
&errmsg
|
||||
);
|
||||
if (proc_handle == NULL)
|
||||
{
|
||||
/*
|
||||
* Something went wrong. Before exiting, forward any log messages that
|
||||
* might've been generated during the failed launch.
|
||||
*/
|
||||
pump_logging(logging);
|
||||
|
||||
elog(PANIC, "%s", errmsg);
|
||||
}
|
||||
|
||||
/*
|
||||
* The Rust tokio runtime has been launched, and it's running in the
|
||||
* background now. This loop in the main thread handles any interactions
|
||||
* we need with the rest of PostgreSQL.
|
||||
*
|
||||
* NB: This process is now multi-threaded! The Rust threads do not call
|
||||
* into any Postgres functions, but it's not entirely clear which Postgres
|
||||
* functions are safe to call from this main thread either. Be very
|
||||
* careful about adding anything non-trivial here.
|
||||
*
|
||||
* Also note that we try to react quickly to any log messages arriving
|
||||
* from the Rust thread. Be careful to not do anything too expensive here
|
||||
* that might cause delays.
|
||||
*/
|
||||
elog(LOG, "communicator threads started");
|
||||
for (;;)
|
||||
{
|
||||
TimestampTz before;
|
||||
long duration;
|
||||
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/*
|
||||
* Forward any log messages from the Rust threads into the normal
|
||||
* Postgres logging facility.
|
||||
*/
|
||||
pump_logging(logging);
|
||||
|
||||
/*
|
||||
* Check interrupts like system shutdown or config reload
|
||||
*
|
||||
* We mustn't block for too long within this loop, or we risk the log
|
||||
* queue to fill up and messages to be lost. Also, even if we can keep
|
||||
* up, if there's a long delay between sending a message and printing
|
||||
* it to the log, the timestamps on the messages get skewed, which is
|
||||
* confusing.
|
||||
*
|
||||
* We expect processing interrupts to happen fast enough that it's OK,
|
||||
* but measure it just in case, and print a warning if it takes longer
|
||||
* than 100 ms.
|
||||
*/
|
||||
#define LOG_SKEW_WARNING_MS 100
|
||||
before = GetCurrentTimestamp();
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
if (ConfigReloadPending)
|
||||
{
|
||||
ConfigReloadPending = false;
|
||||
ProcessConfigFile(PGC_SIGHUP);
|
||||
}
|
||||
|
||||
duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());
|
||||
if (duration > LOG_SKEW_WARNING_MS)
|
||||
elog(WARNING, "handling interrupts took %ld ms, communicator log timestamps might be skewed", duration);
|
||||
|
||||
/*
|
||||
* Wait until we are woken up. The rust threads will set the latch
|
||||
* when there's a log message to forward.
|
||||
*/
|
||||
(void) WaitLatch(MyLatch,
|
||||
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
|
||||
0,
|
||||
PG_WAIT_EXTENSION);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
pump_logging(struct LoggingReceiver *logging)
|
||||
{
|
||||
char errbuf[1000];
|
||||
int elevel;
|
||||
int32 rc;
|
||||
static uint64_t last_dropped_event_count = 0;
|
||||
uint64_t dropped_event_count;
|
||||
uint64_t dropped_now;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
rc = communicator_worker_poll_logging(logging,
|
||||
errbuf,
|
||||
sizeof(errbuf),
|
||||
&elevel,
|
||||
&dropped_event_count);
|
||||
if (rc == 0)
|
||||
{
|
||||
/* nothing to do */
|
||||
break;
|
||||
}
|
||||
else if (rc == 1)
|
||||
{
|
||||
/* Because we don't want to exit on error */
|
||||
|
||||
if (message_level_is_interesting(elevel))
|
||||
{
|
||||
/*
|
||||
* Prevent interrupts while cleaning up.
|
||||
*
|
||||
* (Not sure if this is required, but all the error handlers
|
||||
* in Postgres that are installed as sigsetjmp() targets do
|
||||
* this, so let's follow the example)
|
||||
*/
|
||||
HOLD_INTERRUPTS();
|
||||
|
||||
errstart(elevel, TEXTDOMAIN);
|
||||
errmsg_internal("[COMMUNICATOR] %s", errbuf);
|
||||
EmitErrorReport();
|
||||
FlushErrorState();
|
||||
|
||||
/* Now we can allow interrupts again */
|
||||
RESUME_INTERRUPTS();
|
||||
}
|
||||
}
|
||||
else if (rc == -1)
|
||||
{
|
||||
elog(ERROR, "logging channel was closed unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the queue was full at any time since the last time we reported it,
|
||||
* report how many messages were lost. We do this outside the loop, so
|
||||
* that if the logging system is clogged, we don't exacerbate it by
|
||||
* printing lots of warnings about dropped messages.
|
||||
*/
|
||||
dropped_now = dropped_event_count - last_dropped_event_count;
|
||||
if (dropped_now != 0)
|
||||
{
|
||||
elog(WARNING, "%lu communicator log messages were dropped because the log buffer was full",
|
||||
(unsigned long) dropped_now);
|
||||
last_dropped_event_count = dropped_event_count;
|
||||
}
|
||||
}
|
||||
|
||||
/****
|
||||
* Callbacks from the rust code, in the communicator process.
|
||||
*
|
||||
* NOTE: These must be thread-safe! It's very limited which PostgreSQL
|
||||
* functions you can use!!!
|
||||
*
|
||||
* The signatures of these need to match those in the Rust code.
|
||||
*/
|
||||
|
||||
void
|
||||
callback_set_my_latch_unsafe(void)
|
||||
{
|
||||
SetLatch(MyLatch);
|
||||
}
|
||||
17
pgxn/neon/communicator_process.h
Normal file
@@ -0,0 +1,17 @@
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* communicator_process.h
|
||||
* Communicator process
|
||||
*
|
||||
*
|
||||
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef COMMUNICATOR_PROCESS_H
|
||||
#define COMMUNICATOR_PROCESS_H
|
||||
|
||||
extern void pg_init_communicator_process(void);
|
||||
|
||||
#endif /* COMMUNICATOR_PROCESS_H */
|
||||
@@ -52,6 +52,8 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "communicator.h"
|
||||
|
||||
#include "communicator/communicator_bindings.h"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
@@ -2156,6 +2158,38 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset)
|
||||
return dc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get metrics, for the built-in metrics exporter that's part of the communicator
|
||||
* process.
|
||||
*
|
||||
* NB: This is called from a Rust tokio task inside the communicator process.
|
||||
* Acquiring lwlocks, elog(), allocating memory or anything else non-trivial
|
||||
* is strictly prohibited here!
|
||||
*/
|
||||
struct LfcMetrics
|
||||
callback_get_lfc_metrics_unsafe(void)
|
||||
{
|
||||
struct LfcMetrics result = {
|
||||
.lfc_cache_size_limit = (int64) lfc_size_limit * 1024 * 1024,
|
||||
.lfc_hits = lfc_ctl ? lfc_ctl->hits : 0,
|
||||
.lfc_misses = lfc_ctl ? lfc_ctl->misses : 0,
|
||||
.lfc_used = lfc_ctl ? lfc_ctl->used : 0,
|
||||
.lfc_writes = lfc_ctl ? lfc_ctl->writes : 0,
|
||||
};
|
||||
|
||||
if (lfc_ctl)
|
||||
{
|
||||
for (int minutes = 1; minutes <= 60; minutes++)
|
||||
{
|
||||
result.lfc_approximate_working_set_size_windows[minutes - 1] =
|
||||
lfc_approximate_working_set_size_seconds(minutes * 60, false);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(get_local_cache_state);
|
||||
|
||||
Datum
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "utils/guc_tables.h"
|
||||
|
||||
#include "communicator.h"
|
||||
#include "communicator_process.h"
|
||||
#include "extension_server.h"
|
||||
#include "file_cache.h"
|
||||
#include "neon.h"
|
||||
@@ -44,9 +45,6 @@
|
||||
#include "storage/ipc.h"
|
||||
#endif
|
||||
|
||||
/* the rust bindings, generated by cbindgen */
|
||||
#include "communicator/communicator_bindings.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
void _PG_init(void);
|
||||
|
||||
@@ -457,9 +455,6 @@ _PG_init(void)
|
||||
load_file("$libdir/neon_rmgr", false);
|
||||
#endif
|
||||
|
||||
/* dummy call to a Rust function in the communicator library, to check that it works */
|
||||
(void) communicator_dummy(123);
|
||||
|
||||
/*
|
||||
* Initializing a pre-loaded Postgres extension happens in three stages:
|
||||
*
|
||||
@@ -497,6 +492,8 @@ _PG_init(void)
|
||||
pg_init_walproposer();
|
||||
init_lwlsncache();
|
||||
|
||||
pg_init_communicator_process();
|
||||
|
||||
pg_init_communicator();
|
||||
Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
|
||||
|
||||
|
||||
19
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -3068,6 +3068,21 @@ urllib3 = ">=1.21.1,<3"
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
||||
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
||||
|
||||
[[package]]
|
||||
name = "requests-unixsocket"
|
||||
version = "0.4.1"
|
||||
description = "Use requests to talk HTTP via a UNIX domain socket"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "requests_unixsocket-0.4.1-py3-none-any.whl", hash = "sha256:60c4942e9dbecc2f64d611039fb1dfc25da382083c6434ac0316dca3ff908f4d"},
|
||||
{file = "requests_unixsocket-0.4.1.tar.gz", hash = "sha256:b2596158c356ecee68d27ba469a52211230ac6fb0cde8b66afb19f0ed47a1995"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
requests = ">=1.1"
|
||||
|
||||
[[package]]
|
||||
name = "responses"
|
||||
version = "0.25.3"
|
||||
@@ -3844,4 +3859,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "6a1e8ba06b8194bf28d87fd5e184e2ddc2b4a19dffcbe3953b26da3d55c9212f"
|
||||
content-hash = "b08aba407631b0341d2ef8bf9acffd733bfc7d32b12d344717ab4c7fef697625"
|
||||
|
||||
@@ -50,6 +50,7 @@ types-pyyaml = "^6.0.12.20240917"
|
||||
testcontainers = "^4.9.0"
|
||||
# Install a release candidate of `jsonnet`, as it supports Python 3.13
|
||||
jsonnet = "^0.21.0-rc2"
|
||||
requests-unixsocket = "^0.4.1"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
mypy = "==1.13.0"
|
||||
|
||||
@@ -66,6 +66,12 @@ class EndpointHttpClient(requests.Session):
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def autoscaling_metrics(self):
|
||||
res = self.get(f"http://localhost:{self.external_port}/autoscaling_metrics")
|
||||
res.raise_for_status()
|
||||
log.debug("raw compute metrics: %s", res.text)
|
||||
return res.text
|
||||
|
||||
def prewarm_lfc_status(self) -> dict[str, str]:
|
||||
res = self.get(self.prewarm_url)
|
||||
res.raise_for_status()
|
||||
|
||||
@@ -5793,6 +5793,7 @@ SKIP_FILES = frozenset(
|
||||
"postmaster.pid",
|
||||
"pg_control",
|
||||
"pg_dynshmem",
|
||||
"neon-communicator.socket",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
54
test_runner/regress/test_communicator_metrics_exporter.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import requests_unixsocket # type: ignore [import-untyped]
|
||||
from fixtures.metrics import parse_metrics
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
NEON_COMMUNICATOR_SOCKET_NAME = "neon-communicator.socket"
|
||||
|
||||
|
||||
def test_communicator_metrics(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test the communicator's built-in HTTP prometheus exporter
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.start()
|
||||
|
||||
# Change current directory to the data directory, so that we can use
|
||||
# a short relative path to refer to the socket. (There's a 100 char
|
||||
# limitation on the path.)
|
||||
os.chdir(str(endpoint.pgdata_dir))
|
||||
session = requests_unixsocket.Session()
|
||||
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics")
|
||||
assert r.status_code == 200, f"got response {r.status_code}: {r.text}"
|
||||
|
||||
# quick test that the endpoint returned something expected. (We don't validate
|
||||
# that the metrics returned are sensible.)
|
||||
m = parse_metrics(r.text)
|
||||
m.query_one("lfc_hits")
|
||||
m.query_one("lfc_misses")
|
||||
|
||||
# Test panic handling. The /debug/panic endpoint raises a Rust panic. It's
|
||||
# expected to unwind and drop the HTTP connection without response, but not
|
||||
# kill the process or the server.
|
||||
with pytest.raises(
|
||||
requests.ConnectionError, match="Remote end closed connection without response"
|
||||
):
|
||||
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/debug/panic")
|
||||
assert r.status_code == 500
|
||||
|
||||
# Test that subsequent requests after the panic still work.
|
||||
r = session.get(f"http+unix://{NEON_COMMUNICATOR_SOCKET_NAME}/metrics")
|
||||
assert r.status_code == 200, f"got response {r.status_code}: {r.text}"
|
||||
m = parse_metrics(r.text)
|
||||
m.query_one("lfc_hits")
|
||||
m.query_one("lfc_misses")
|
||||
@@ -197,7 +197,7 @@ def test_create_snapshot(
|
||||
shutil.copytree(
|
||||
test_output_dir,
|
||||
new_compatibility_snapshot_dir,
|
||||
ignore=shutil.ignore_patterns("pg_dynshmem"),
|
||||
ignore=shutil.ignore_patterns("pg_dynshmem", "neon-communicator.socket"),
|
||||
)
|
||||
|
||||
log.info(f"Copied new compatibility snapshot dir to: {new_compatibility_snapshot_dir}")
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.utils import USE_LFC, query_scalar
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -75,10 +76,24 @@ WITH (fillfactor='100');
|
||||
cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 104242")
|
||||
cur.execute("SELECT abalance FROM pgbench_accounts WHERE aid = 204242")
|
||||
# verify working set size after some index access of a few select pages only
|
||||
blocks = query_scalar(cur, "select approximate_working_set_size(true)")
|
||||
blocks = query_scalar(cur, "select approximate_working_set_size(false)")
|
||||
log.info(f"working set size after some index access of a few select pages only {blocks}")
|
||||
assert blocks < 20
|
||||
|
||||
# Also test the metrics from the /autoscaling_metrics endpoint
|
||||
autoscaling_metrics = endpoint.http_client().autoscaling_metrics()
|
||||
log.debug(f"Raw metrics: {autoscaling_metrics}")
|
||||
m = parse_metrics(autoscaling_metrics)
|
||||
|
||||
http_estimate = m.query_one(
|
||||
"lfc_approximate_working_set_size_windows",
|
||||
{
|
||||
"duration_seconds": "60",
|
||||
},
|
||||
).value
|
||||
log.info(f"http estimate: {http_estimate}, blocks: {blocks}")
|
||||
assert http_estimate > 0 and http_estimate < 20
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
|
||||
|
||||