mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 06:00:38 +00:00
Compare commits
4 Commits
devin/1747
...
installed_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d7fa802f4 | ||
|
|
811e5675a3 | ||
|
|
b9c6cd28a4 | ||
|
|
bd722f24ae |
@@ -310,6 +310,41 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// handle HEAD method of /installed_extensions route
|
||||
// This is the signal from postgres that extension DDL occured and we need to update the metric.
|
||||
//
|
||||
// Don't wait for the result, because the caller doesn't need it
|
||||
// just spawn a task and the metric will be updated eventually.
|
||||
//
|
||||
// In theory there could be multiple HEAD requests in a row, so we should protect
|
||||
// from spawning multiple tasks, but in practice it's not a problem.
|
||||
// TODO: add some mutex or quere?
|
||||
// In practice, extensions are not installed very often, so it's not a problem
|
||||
(&Method::HEAD, route) if route.starts_with("/installed_extensions") => {
|
||||
info!("serving /installed_extensions HEAD request");
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for extensions request: {:?}",
|
||||
status
|
||||
);
|
||||
error!(msg);
|
||||
let mut resp = Response::new(Body::from(msg));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
|
||||
let conf = compute.get_conn_conf(None);
|
||||
task::spawn(async move {
|
||||
let _ = task::spawn_blocking(move || {
|
||||
installed_extensions::get_installed_extensions(conf)
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
||||
Response::new(Body::from("OK"))
|
||||
}
|
||||
|
||||
// download extension files from remote extension storage on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
|
||||
@@ -82,6 +82,21 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/InstalledExtensions"
|
||||
head:
|
||||
tags:
|
||||
- Extension
|
||||
summary: Report extension DDL to trigger metric recollection.
|
||||
description: ""
|
||||
operationId: ExtensionDDL
|
||||
responses:
|
||||
200:
|
||||
description: Result
|
||||
500:
|
||||
description: Internal error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/info:
|
||||
get:
|
||||
tags:
|
||||
@@ -537,12 +552,14 @@ components:
|
||||
properties:
|
||||
extname:
|
||||
type: string
|
||||
versions:
|
||||
type: array
|
||||
version:
|
||||
type: string
|
||||
items:
|
||||
type: string
|
||||
n_databases:
|
||||
type: integer
|
||||
owned_by_superuser:
|
||||
type: integer
|
||||
|
||||
SetRoleGrantsRequest:
|
||||
type: object
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use compute_api::responses::{InstalledExtension, InstalledExtensions};
|
||||
use metrics::proto::MetricFamily;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use anyhow::Result;
|
||||
use postgres::{Client, NoTls};
|
||||
@@ -38,65 +37,94 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
|
||||
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
|
||||
///
|
||||
/// Same extension can be installed in multiple databases with different versions,
|
||||
/// we only keep the highest and lowest version across all databases.
|
||||
/// so we report a separate metric (number of databases where it is installed)
|
||||
/// for each extension version.
|
||||
pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> {
|
||||
conf.application_name("compute_ctl:get_installed_extensions");
|
||||
let mut client = conf.connect(NoTls)?;
|
||||
|
||||
let databases: Vec<String> = list_dbs(&mut client)?;
|
||||
|
||||
let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
|
||||
let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
|
||||
for db in databases.iter() {
|
||||
conf.dbname(db);
|
||||
let mut db_client = conf.connect(NoTls)?;
|
||||
let extensions: Vec<(String, String)> = db_client
|
||||
let extensions: Vec<(String, String, i32)> = db_client
|
||||
.query(
|
||||
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
|
||||
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
|
||||
&[],
|
||||
)?
|
||||
.iter()
|
||||
.map(|row| (row.get("extname"), row.get("extversion")))
|
||||
.map(|row| {
|
||||
(
|
||||
row.get("extname"),
|
||||
row.get("extversion"),
|
||||
row.get("extowner"),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
for (extname, v) in extensions.iter() {
|
||||
for (extname, v, extowner) in extensions.iter() {
|
||||
let version = v.to_string();
|
||||
|
||||
// increment the number of databases where the version of extension is installed
|
||||
INSTALLED_EXTENSIONS
|
||||
.with_label_values(&[extname, &version])
|
||||
.inc();
|
||||
// check if the extension is owned by superuser
|
||||
// 10 is the oid of superuser
|
||||
let owned_by_superuser = if *extowner == 10 { "1" } else { "0" };
|
||||
|
||||
extensions_map
|
||||
.entry(extname.to_string())
|
||||
.entry((
|
||||
extname.to_string(),
|
||||
version.clone(),
|
||||
owned_by_superuser.to_string(),
|
||||
))
|
||||
.and_modify(|e| {
|
||||
e.versions.insert(version.clone());
|
||||
// count the number of databases where the extension is installed
|
||||
e.n_databases += 1;
|
||||
})
|
||||
.or_insert(InstalledExtension {
|
||||
extname: extname.to_string(),
|
||||
versions: HashSet::from([version.clone()]),
|
||||
version: version.clone(),
|
||||
n_databases: 1,
|
||||
owned_by_superuser: owned_by_superuser.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let res = InstalledExtensions {
|
||||
extensions: extensions_map.into_values().collect(),
|
||||
};
|
||||
// reset the metric to handle dropped extensions and extension version changes -
|
||||
// we need to remove them from the metric.
|
||||
// It creates a race condition - if collector is called before we set the new values
|
||||
// we will have a gap in the metrics.
|
||||
//
|
||||
// TODO: Add a mutex to lock the metric update and collection
|
||||
// so that the collector doesn't see intermediate state.
|
||||
// Or is it ok for this metric to be eventually consistent?
|
||||
INSTALLED_EXTENSIONS.reset();
|
||||
|
||||
Ok(res)
|
||||
for (key, ext) in extensions_map.iter() {
|
||||
let (extname, version, owned_by_superuser) = key;
|
||||
let n_databases = ext.n_databases as u64;
|
||||
|
||||
INSTALLED_EXTENSIONS
|
||||
.with_label_values(&[extname, version, owned_by_superuser])
|
||||
.set(n_databases);
|
||||
}
|
||||
|
||||
tracing::debug!("Installed extensions: {:?}", extensions_map);
|
||||
|
||||
Ok(InstalledExtensions {
|
||||
extensions: extensions_map.into_values().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"compute_installed_extensions",
|
||||
"Number of databases where the version of extension is installed",
|
||||
&["extension_name", "version"]
|
||||
&["extension_name", "version", "owned_by_superuser"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
// TODO Add a mutex here to ensure that the metric is not updated while we are collecting it
|
||||
INSTALLED_EXTENSIONS.collect()
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
@@ -163,8 +162,9 @@ pub enum ControlPlaneComputeStatus {
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct InstalledExtension {
|
||||
pub extname: String,
|
||||
pub versions: HashSet<String>,
|
||||
pub version: String,
|
||||
pub n_databases: u32, // Number of databases using this extension
|
||||
pub owned_by_superuser: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
|
||||
@@ -12,15 +12,21 @@
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "utils/guc.h"
|
||||
#include "tcop/utility.h"
|
||||
|
||||
#include "extension_server.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
static int extension_server_port = 0;
|
||||
static int extension_server_port = 0;
|
||||
|
||||
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
|
||||
|
||||
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
|
||||
|
||||
static bool extension_ddl_occured = false;
|
||||
|
||||
/*
|
||||
* to download all SQL (and data) files for an extension:
|
||||
* curl -X POST http://localhost:8080/extension_server/postgis
|
||||
@@ -74,9 +80,154 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// Handle extension DDL: we need this for monitoring of installed extensions.
|
||||
// General solution is hard, because extensions can be installed in many ways,
|
||||
// i.e. sometimes as a cascade operations.
|
||||
//
|
||||
// Also, we don't have enough information in the statement itself,
|
||||
// i.e. extension version is not always present and retrieved from the control file
|
||||
// at a later stage.
|
||||
//
|
||||
// So, just remember the fact of the extension DDL and send it to compute_ctl
|
||||
// on commit.
|
||||
static void
|
||||
NeonExtensionProcessUtility(
|
||||
PlannedStmt *pstmt,
|
||||
const char *queryString,
|
||||
bool readOnlyTree,
|
||||
ProcessUtilityContext context,
|
||||
ParamListInfo params,
|
||||
QueryEnvironment *queryEnv,
|
||||
DestReceiver *dest,
|
||||
QueryCompletion *qc)
|
||||
{
|
||||
Node *parseTree = pstmt->utilityStmt;
|
||||
|
||||
switch (nodeTag(parseTree))
|
||||
{
|
||||
case T_CreateExtensionStmt:
|
||||
case T_AlterExtensionStmt:
|
||||
extension_ddl_occured = true;
|
||||
elog(LOG, "Extension DDL occured");
|
||||
break;
|
||||
case T_DropStmt:
|
||||
{
|
||||
switch (((DropStmt *) parseTree)->removeType)
|
||||
{
|
||||
case OBJECT_EXTENSION:
|
||||
extension_ddl_occured = true;
|
||||
elog(LOG, "Extension DDL occured");
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (PreviousProcessUtilityHook)
|
||||
{
|
||||
PreviousProcessUtilityHook(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
else
|
||||
{
|
||||
standard_ProcessUtility(
|
||||
pstmt,
|
||||
queryString,
|
||||
readOnlyTree,
|
||||
context,
|
||||
params,
|
||||
queryEnv,
|
||||
dest,
|
||||
qc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
static bool
|
||||
neon_send_extension_ddl_event_http()
|
||||
{
|
||||
static CURL *handle = NULL;
|
||||
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
bool ret = false;
|
||||
|
||||
if (handle == NULL)
|
||||
{
|
||||
handle = alloc_curl_handle();
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/installed_extensions",
|
||||
extension_server_port);
|
||||
|
||||
elog(LOG, "Sending ddl event to compute_ctl: %s", compute_ctl_url);
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
|
||||
|
||||
// Use HEAD request without payload, because this is just a notification.
|
||||
//
|
||||
// This is probably not the best API design, but I didn't want to introduce
|
||||
// new endpoint for this. Suggestions are welcome.
|
||||
curl_easy_setopt(handle, CURLOPT_NOBODY, 1L);
|
||||
|
||||
/* Perform the request, res will get the return code */
|
||||
res = curl_easy_perform(handle);
|
||||
|
||||
/* Check for errors */
|
||||
if (res != CURLE_OK)
|
||||
{
|
||||
/*
|
||||
* Don't error here because this is just a monitoring feature.
|
||||
*/
|
||||
elog(WARNING, "neon_send_extension_ddl_event_http failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
NeonExtensionXactCallback(XactEvent event, void *arg)
|
||||
{
|
||||
elog(LOG, "NeonExtensionXactCallback: %d", event);
|
||||
/* the handler on the compute_ctl side must be non-blocking
|
||||
* otherwise, the compute_ctl won't see the data that is not yet committed.
|
||||
* There is still a chance of the race, because the data becomes visible only after XACT_EVENT_COMMIT
|
||||
* callback is called. We assume that this is not critical for the monitoring feature and expect that
|
||||
* compute_ctl will eventually see the data.
|
||||
*/
|
||||
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PARALLEL_COMMIT)
|
||||
{
|
||||
if (extension_ddl_occured)
|
||||
{
|
||||
elog(LOG, "Sending extension DDL event to compute_ctl");
|
||||
neon_send_extension_ddl_event_http();
|
||||
extension_ddl_occured = false;
|
||||
}
|
||||
elog(LOG, "no extension DDL");
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
pg_init_extension_server()
|
||||
{
|
||||
|
||||
PreviousProcessUtilityHook = ProcessUtility_hook;
|
||||
ProcessUtility_hook = NeonExtensionProcessUtility;
|
||||
RegisterXactCallback(NeonExtensionXactCallback, NULL);
|
||||
|
||||
/* Port to connect to compute_ctl on localhost */
|
||||
/* to request extension files. */
|
||||
DefineCustomIntVariable("neon.extension_server_port",
|
||||
|
||||
@@ -30,7 +30,7 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
info("Extensions: %s", res["extensions"])
|
||||
# 'plpgsql' is a default extension that is always installed.
|
||||
assert any(
|
||||
ext["extname"] == "plpgsql" and ext["versions"] == ["1.0"] for ext in res["extensions"]
|
||||
ext["extname"] == "plpgsql" and ext["version"] == "1.0" for ext in res["extensions"]
|
||||
), "The 'plpgsql' extension is missing"
|
||||
|
||||
# check that the neon_test_utils extension is not installed
|
||||
@@ -63,7 +63,7 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
# and has the expected version
|
||||
assert any(
|
||||
ext["extname"] == "neon_test_utils"
|
||||
and ext["versions"] == [neon_test_utils_version]
|
||||
and ext["version"] == neon_test_utils_version
|
||||
and ext["n_databases"] == 1
|
||||
for ext in res["extensions"]
|
||||
)
|
||||
@@ -75,9 +75,8 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
# check that the neon extension is installed and has expected versions
|
||||
for ext in res["extensions"]:
|
||||
if ext["extname"] == "neon":
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.1", "1.2"]
|
||||
assert ext["version"] in ["1.1", "1.2"]
|
||||
assert ext["n_databases"] == 1
|
||||
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("ALTER EXTENSION neon UPDATE TO '1.3'")
|
||||
@@ -90,9 +89,8 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
# check that the neon_test_utils extension is updated
|
||||
for ext in res["extensions"]:
|
||||
if ext["extname"] == "neon":
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.2", "1.3"]
|
||||
assert ext["version"] in ["1.2", "1.3"]
|
||||
assert ext["n_databases"] == 1
|
||||
|
||||
# check that /metrics endpoint is available
|
||||
# ensure that we see the metric before and after restart
|
||||
@@ -100,13 +98,15 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
info("Metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
|
||||
"compute_installed_extensions",
|
||||
{"extension_name": "neon", "version": "1.2", "owned_by_superuser": "1"},
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 2
|
||||
assert sample.value == 1
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
|
||||
"compute_installed_extensions",
|
||||
{"extension_name": "neon", "version": "1.3", "owned_by_superuser": "1"},
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
@@ -138,15 +138,107 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
info("After restart metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
|
||||
"compute_installed_extensions",
|
||||
{"extension_name": "neon", "version": "1.2", "owned_by_superuser": "1"},
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
|
||||
"compute_installed_extensions",
|
||||
{"extension_name": "neon", "version": "1.3", "owned_by_superuser": "1"},
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
|
||||
# WIP Test metric live update
|
||||
# TODO: cleamup the test and stabilize it.
|
||||
# Now there is a race, because there is a gap between the extension creation and the metric collection.
|
||||
# This is fine for the real world, as we don't need to be 100% real time, but not convenient for the test.
|
||||
def test_installed_extensions_metric_live_update(neon_simple_env: NeonEnv):
|
||||
"""basic test for the endpoint that returns the list of installed extensions"""
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.create_branch("test_installed_extensions_metric_live_update")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_installed_extensions_metric_live_update")
|
||||
|
||||
endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
|
||||
endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")
|
||||
client = endpoint.http_client()
|
||||
|
||||
timeout = 10
|
||||
while timeout > 0:
|
||||
try:
|
||||
res = client.metrics()
|
||||
timeout = -1
|
||||
if len(parse_metrics(res).query_all("compute_installed_extensions")) < 1:
|
||||
# Assume that not all metrics that are collected yet
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
continue
|
||||
except Exception:
|
||||
log.exception("failed to get metrics, assume they are not collected yet")
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
continue
|
||||
|
||||
info("After start metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
info("parsed metrics: %s", m)
|
||||
|
||||
# create extension neon_test_utils
|
||||
pg_conn = endpoint.connect(dbname="test_installed_extensions")
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
cur.execute(
|
||||
"SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
|
||||
)
|
||||
res = cur.fetchone()
|
||||
neon_test_utils_version = res[0]
|
||||
|
||||
# check the metric again
|
||||
res = client.metrics()
|
||||
info("After creating neon_test_utils metrics: %s", res)
|
||||
time.sleep(1)
|
||||
res = client.metrics()
|
||||
info("After creating neon_test_utils metrics: %s", res)
|
||||
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions",
|
||||
{
|
||||
"extension_name": "neon_test_utils",
|
||||
"version": neon_test_utils_version,
|
||||
"owned_by_superuser": "1",
|
||||
},
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 1
|
||||
|
||||
# drop extension neon_test_utils
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("DROP EXTENSION neon_test_utils")
|
||||
|
||||
# check the metric again
|
||||
res = client.metrics()
|
||||
info("After dropping neon_test_utils metrics: %s", res)
|
||||
time.sleep(5)
|
||||
res = client.metrics()
|
||||
info("After dropping neon_test_utils metrics: %s", res)
|
||||
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all(
|
||||
"compute_installed_extensions",
|
||||
{
|
||||
"extension_name": "neon_test_utils",
|
||||
"version": neon_test_utils_version,
|
||||
"owned_by_superuser": "1",
|
||||
},
|
||||
)
|
||||
assert len(neon_m) == 0
|
||||
|
||||
Reference in New Issue
Block a user