Files
neon/pgxn/neon/communicator_process.c
Heikki Linnakangas 8bb45fd5da Introduce built-in Prometheus exporter to the Postgres extension (#12591)
Currently, the exporter exposes the same LFC metrics that are exposed by
the "autoscaling" sql_exporter in the docker image. With this, we can
remove the dedicated sql_exporter instance. (Actually doing the removal
is left as a TODO until this is rolled out to production and we have
changed autoscaling-agent to fetch the metrics from this new endpoint.)

The exporter runs as a Postgres background worker process. This is
extracted from the Rust communicator rewrite project, which will use the
same worker process for much more, to handle the communications with the
pageservers. For now, though, it merely handles the metrics requests.

In the future, we will add more metrics, and perhaps even APIs to
control the running Postgres instance.

The exporter listens on a Unix Domain socket within the Postgres data
directory. A Unix Domain socket is a bit unconventional, but it has some
advantages:

- Permissions are taken care of. Only processes that can access the data
directory, and therefore already have full access to the running
Postgres instance, can connect to it.

- No need to allocate and manage a new port number for the listener

It has some downsides too: it's not immediately accessible from the
outside world, and the functions to work with Unix Domain sockets are
more low-level than TCP sockets (see the symlink hack in
`postgres_metrics_client.rs`, for example).

To expose the metrics from the local Unix Domain Socket to the
autoscaling agent, introduce a new '/autoscaling_metrics' endpoint in
the compute_ctl's HTTP server. Currently it merely forwards the request
to the Postgres instance, but we could add rate limiting and access
control there in the future.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-07-22 12:00:20 +00:00

274 lines
7.9 KiB
C

/*-------------------------------------------------------------------------
*
* communicator_process.c
* Functions for starting up the communicator background worker process.
*
* Currently, the communicator process only functions as a metrics
* exporter. It provides an HTTP endpoint for polling a limited set of
* metrics. TODO: In the future, it will do much more, i.e. handle all
* the communications with the pageservers.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <unistd.h>
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/timestamp.h"
#include "communicator_process.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_perf_counters.h"
/* the rust bindings, generated by cbindgen */
#include "communicator/communicator_bindings.h"
static void pump_logging(struct LoggingReceiver *logging);
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
/**** Initialization functions. These run in postmaster ****/
void
pg_init_communicator_process(void)
{
BackgroundWorker bgw;
/* Initialize the background worker process */
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_PostmasterStart;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "communicator_new_bgworker_main");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Storage communicator process");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Storage communicator process");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
/**** Worker process functions. These run in the communicator worker process ****/
/*
* Entry point for the communicator bgworker process
*/
void
communicator_new_bgworker_main(Datum main_arg)
{
struct LoggingReceiver *logging;
const char *errmsg = NULL;
const struct CommunicatorWorkerProcessStruct *proc_handle;
/*
* Pretend that this process is a WAL sender. That affects the shutdown
* sequence: WAL senders are shut down last, after the final checkpoint
* has been written. That's what we want for the communicator process too.
*/
am_walsender = true;
MarkPostmasterChildWalSender();
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
/*
* Postmaster sends us SIGUSR2 when all regular backends and bgworkers
* have exited, and it's time for us to exit too
*/
pqsignal(SIGUSR2, die);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
/*
* By default, INFO messages are not printed to the log. We want
* `tracing::info!` messages emitted from the communicator to be printed,
* however, so increase the log level.
*
* XXX: This overrides any user-set value from the config file. That's not
* great, but on the other hand, there should be little reason for user to
* control the verbosity of the communicator. It's not too verbose by
* default.
*/
SetConfigOption("log_min_messages", "INFO", PGC_SUSET, PGC_S_OVERRIDE);
logging = communicator_worker_configure_logging();
proc_handle = communicator_worker_launch(
neon_tenant[0] == '\0' ? NULL : neon_tenant,
neon_timeline[0] == '\0' ? NULL : neon_timeline,
&errmsg
);
if (proc_handle == NULL)
{
/*
* Something went wrong. Before exiting, forward any log messages that
* might've been generated during the failed launch.
*/
pump_logging(logging);
elog(PANIC, "%s", errmsg);
}
/*
* The Rust tokio runtime has been launched, and it's running in the
* background now. This loop in the main thread handles any interactions
* we need with the rest of PostgreSQL.
*
* NB: This process is now multi-threaded! The Rust threads do not call
* into any Postgres functions, but it's not entirely clear which Postgres
* functions are safe to call from this main thread either. Be very
* careful about adding anything non-trivial here.
*
* Also note that we try to react quickly to any log messages arriving
* from the Rust thread. Be careful to not do anything too expensive here
* that might cause delays.
*/
elog(LOG, "communicator threads started");
for (;;)
{
TimestampTz before;
long duration;
ResetLatch(MyLatch);
/*
* Forward any log messages from the Rust threads into the normal
* Postgres logging facility.
*/
pump_logging(logging);
/*
* Check interrupts like system shutdown or config reload
*
* We mustn't block for too long within this loop, or we risk the log
* queue to fill up and messages to be lost. Also, even if we can keep
* up, if there's a long delay between sending a message and printing
* it to the log, the timestamps on the messages get skewed, which is
* confusing.
*
* We expect processing interrupts to happen fast enough that it's OK,
* but measure it just in case, and print a warning if it takes longer
* than 100 ms.
*/
#define LOG_SKEW_WARNING_MS 100
before = GetCurrentTimestamp();
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());
if (duration > LOG_SKEW_WARNING_MS)
elog(WARNING, "handling interrupts took %ld ms, communicator log timestamps might be skewed", duration);
/*
* Wait until we are woken up. The rust threads will set the latch
* when there's a log message to forward.
*/
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
0,
PG_WAIT_EXTENSION);
}
}
static void
pump_logging(struct LoggingReceiver *logging)
{
char errbuf[1000];
int elevel;
int32 rc;
static uint64_t last_dropped_event_count = 0;
uint64_t dropped_event_count;
uint64_t dropped_now;
for (;;)
{
rc = communicator_worker_poll_logging(logging,
errbuf,
sizeof(errbuf),
&elevel,
&dropped_event_count);
if (rc == 0)
{
/* nothing to do */
break;
}
else if (rc == 1)
{
/* Because we don't want to exit on error */
if (message_level_is_interesting(elevel))
{
/*
* Prevent interrupts while cleaning up.
*
* (Not sure if this is required, but all the error handlers
* in Postgres that are installed as sigsetjmp() targets do
* this, so let's follow the example)
*/
HOLD_INTERRUPTS();
errstart(elevel, TEXTDOMAIN);
errmsg_internal("[COMMUNICATOR] %s", errbuf);
EmitErrorReport();
FlushErrorState();
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
}
else if (rc == -1)
{
elog(ERROR, "logging channel was closed unexpectedly");
}
}
/*
* If the queue was full at any time since the last time we reported it,
* report how many messages were lost. We do this outside the loop, so
* that if the logging system is clogged, we don't exacerbate it by
* printing lots of warnings about dropped messages.
*/
dropped_now = dropped_event_count - last_dropped_event_count;
if (dropped_now != 0)
{
elog(WARNING, "%lu communicator log messages were dropped because the log buffer was full",
(unsigned long) dropped_now);
last_dropped_event_count = dropped_event_count;
}
}
/****
* Callbacks from the rust code, in the communicator process.
*
* NOTE: These must be thread-safe! It's very limited which PostgreSQL
* functions you can use!!!
*
* The signatures of these need to match those in the Rust code.
*/
void
callback_set_my_latch_unsafe(void)
{
SetLatch(MyLatch);
}