Compare commits

...

4 Commits

Author SHA1 Message Date
Anastasia Lubennikova
8d7fa802f4 code cleanup 2024-12-05 10:31:43 +00:00
Anastasia Lubennikova
811e5675a3 Track extension updates live 2024-12-04 19:20:55 +00:00
Anastasia Lubennikova
b9c6cd28a4 WIP track extension DDL events 2024-12-03 14:50:14 +00:00
Anastasia Lubennikova
bd722f24ae Update compute_installed_extensions metric:
add owned_by_superuser field to filter out system extensions.

While on it, also correct related code:
- fix the metric setting: use set() instead of inc() in a loop.
inc() is not idempotent and can lead to incorrect results
if the function is called multiple times. Currently it is only called at
compute start, but this will change soon.
- fix the return type of the installed_extensions endpoint
to match the metric. Currently it is only used in the test.
2024-12-03 12:16:00 +00:00
6 changed files with 361 additions and 38 deletions

View File

@@ -310,6 +310,41 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// handle HEAD method of /installed_extensions route
// This is the signal from postgres that extension DDL occurred and we need to update the metric.
//
// Don't wait for the result, because the caller doesn't need it
// just spawn a task and the metric will be updated eventually.
//
// In theory there could be multiple HEAD requests in a row, so we should protect
// from spawning multiple tasks, but in practice it's not a problem.
// TODO: add some mutex or queue?
// In practice, extensions are not installed very often, so it's not a problem
(&Method::HEAD, route) if route.starts_with("/installed_extensions") => {
info!("serving /installed_extensions HEAD request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for extensions request: {:?}",
status
);
error!(msg);
let mut resp = Response::new(Body::from(msg));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return resp;
}
let conf = compute.get_conn_conf(None);
task::spawn(async move {
let _ = task::spawn_blocking(move || {
installed_extensions::get_installed_extensions(conf)
})
.await;
});
Response::new(Body::from("OK"))
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);

View File

@@ -82,6 +82,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InstalledExtensions"
head:
tags:
- Extension
summary: Report extension DDL to trigger metric recollection.
description: ""
operationId: ExtensionDDL
responses:
200:
description: Result
500:
description: Internal error
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/info:
get:
tags:
@@ -537,12 +552,14 @@ components:
properties:
extname:
type: string
versions:
type: array
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
  # The compute serializes this flag as a string, not a number:
  # "1" when the extension owner is the superuser, "0" otherwise.
  type: string
SetRoleGrantsRequest:
type: object

View File

@@ -1,7 +1,6 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;
use std::collections::HashSet;
use anyhow::Result;
use postgres::{Client, NoTls};
@@ -38,65 +37,94 @@ fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
/// Connect to every database (see list_dbs above) and get the list of installed extensions.
///
/// Same extension can be installed in multiple databases with different versions,
/// we only keep the highest and lowest version across all databases.
/// so we report a separate metric (number of databases where it is installed)
/// for each extension version.
pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<InstalledExtensions> {
conf.application_name("compute_ctl:get_installed_extensions");
let mut client = conf.connect(NoTls)?;
let databases: Vec<String> = list_dbs(&mut client)?;
let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
for db in databases.iter() {
conf.dbname(db);
let mut db_client = conf.connect(NoTls)?;
let extensions: Vec<(String, String)> = db_client
let extensions: Vec<(String, String, i32)> = db_client
.query(
"SELECT extname, extversion FROM pg_catalog.pg_extension;",
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
&[],
)?
.iter()
.map(|row| (row.get("extname"), row.get("extversion")))
.map(|row| {
(
row.get("extname"),
row.get("extversion"),
row.get("extowner"),
)
})
.collect();
for (extname, v) in extensions.iter() {
for (extname, v, extowner) in extensions.iter() {
let version = v.to_string();
// increment the number of databases where the version of extension is installed
INSTALLED_EXTENSIONS
.with_label_values(&[extname, &version])
.inc();
// check if the extension is owned by superuser
// 10 is the oid of superuser
let owned_by_superuser = if *extowner == 10 { "1" } else { "0" };
extensions_map
.entry(extname.to_string())
.entry((
extname.to_string(),
version.clone(),
owned_by_superuser.to_string(),
))
.and_modify(|e| {
e.versions.insert(version.clone());
// count the number of databases where the extension is installed
e.n_databases += 1;
})
.or_insert(InstalledExtension {
extname: extname.to_string(),
versions: HashSet::from([version.clone()]),
version: version.clone(),
n_databases: 1,
owned_by_superuser: owned_by_superuser.to_string(),
});
}
}
let res = InstalledExtensions {
extensions: extensions_map.into_values().collect(),
};
// reset the metric to handle dropped extensions and extension version changes -
// we need to remove them from the metric.
// It creates a race condition - if collector is called before we set the new values
// we will have a gap in the metrics.
//
// TODO: Add a mutex to lock the metric update and collection
// so that the collector doesn't see intermediate state.
// Or is it ok for this metric to be eventually consistent?
INSTALLED_EXTENSIONS.reset();
Ok(res)
for (key, ext) in extensions_map.iter() {
let (extname, version, owned_by_superuser) = key;
let n_databases = ext.n_databases as u64;
INSTALLED_EXTENSIONS
.with_label_values(&[extname, version, owned_by_superuser])
.set(n_databases);
}
tracing::debug!("Installed extensions: {:?}", extensions_map);
Ok(InstalledExtensions {
extensions: extensions_map.into_values().collect(),
})
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"compute_installed_extensions",
"Number of databases where the version of extension is installed",
&["extension_name", "version"]
&["extension_name", "version", "owned_by_superuser"]
)
.expect("failed to define a metric")
});
/// Returns the current contents of the `compute_installed_extensions` gauge
/// as metric families.
///
/// This only reads whatever is stored in `INSTALLED_EXTENSIONS` at the moment
/// of the call; it does not query Postgres or refresh the values itself.
pub fn collect() -> Vec<MetricFamily> {
// TODO Add a mutex here to ensure that the metric is not updated while we are collecting it
INSTALLED_EXTENSIONS.collect()
}

View File

@@ -1,6 +1,5 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::collections::HashSet;
use std::fmt::Display;
use chrono::{DateTime, Utc};
@@ -163,8 +162,9 @@ pub enum ControlPlaneComputeStatus {
#[derive(Clone, Debug, Default, Serialize)]
pub struct InstalledExtension {
pub extname: String,
pub versions: HashSet<String>,
pub version: String,
pub n_databases: u32, // Number of databases using this extension
pub owned_by_superuser: String,
}
#[derive(Clone, Debug, Default, Serialize)]

View File

@@ -12,15 +12,21 @@
#include <curl/curl.h>
#include "access/xact.h"
#include "utils/guc.h"
#include "tcop/utility.h"
#include "extension_server.h"
#include "neon_utils.h"
static int extension_server_port = 0;
static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static bool extension_ddl_occured = false;
/*
* to download all SQL (and data) files for an extension:
* curl -X POST http://localhost:8080/extension_server/postgis
@@ -74,9 +80,154 @@ neon_download_extension_file_http(const char *filename, bool is_library)
return ret;
}
/*
 * ProcessUtility hook that notices extension DDL.
 *
 * compute_ctl monitors installed extensions, so it has to learn when the set
 * of extensions changes. Decoding the statement here is not enough: extensions
 * can be installed in many ways (for example as part of a cascade), and the
 * statement does not always carry the version, which is resolved later from
 * the control file.
 *
 * Therefore this hook only remembers that CREATE EXTENSION, ALTER EXTENSION
 * or DROP EXTENSION was seen; the notification itself is sent to compute_ctl
 * at commit. The statement is then passed on to the previously installed
 * hook, or to standard_ProcessUtility when there is none.
 */
static void
NeonExtensionProcessUtility(
	PlannedStmt *pstmt,
	const char *queryString,
	bool readOnlyTree,
	ProcessUtilityContext context,
	ParamListInfo params,
	QueryEnvironment *queryEnv,
	DestReceiver *dest,
	QueryCompletion *qc)
{
	Node	   *parseTree = pstmt->utilityStmt;
	NodeTag		tag = nodeTag(parseTree);
	ProcessUtility_hook_type next_hook;
	bool		is_extension_ddl;

	if (tag == T_CreateExtensionStmt || tag == T_AlterExtensionStmt)
		is_extension_ddl = true;
	else if (tag == T_DropStmt)
		is_extension_ddl = ((DropStmt *) parseTree)->removeType == OBJECT_EXTENSION;
	else
		is_extension_ddl = false;

	if (is_extension_ddl)
	{
		extension_ddl_occured = true;
		elog(LOG, "Extension DDL occured");
	}

	/* Chain to whoever was installed before us, falling back to core. */
	next_hook = PreviousProcessUtilityHook ? PreviousProcessUtilityHook : standard_ProcessUtility;
	next_hook(pstmt,
			  queryString,
			  readOnlyTree,
			  context,
			  params,
			  queryEnv,
			  dest,
			  qc);
}
/*
 * Tell compute_ctl that extension DDL happened so it can recollect the
 * installed-extensions metric.
 *
 * Sends a HEAD request (no payload, this is just a notification) to
 * /installed_extensions on localhost at extension_server_port. The curl
 * handle is allocated on first use and reused by later calls.
 *
 * This is a monitoring feature only, so a failed request is reported as a
 * WARNING and never raised as an error.
 *
 * Returns true when curl performed the request successfully, false otherwise.
 * NOTE(review): CURLE_OK only means the transfer completed; an HTTP error
 * status still counts as success unless alloc_curl_handle() enables
 * CURLOPT_FAILONERROR - confirm if callers start relying on the result.
 */
static bool
neon_send_extension_ddl_event_http(void)
{
	static CURL *handle = NULL;
	CURLcode	res;
	char	   *compute_ctl_url;

	if (handle == NULL)
		handle = alloc_curl_handle();

	compute_ctl_url = psprintf("http://localhost:%d/installed_extensions",
							   extension_server_port);

	elog(LOG, "Sending ddl event to compute_ctl: %s", compute_ctl_url);

	curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
	/* HEAD request: we only signal the event, there is nothing to upload. */
	curl_easy_setopt(handle, CURLOPT_NOBODY, 1L);

	res = curl_easy_perform(handle);

	/* libcurl keeps its own copy of the URL, so ours can be released now. */
	pfree(compute_ctl_url);

	if (res != CURLE_OK)
	{
		/* Don't error here because this is just a monitoring feature. */
		elog(WARNING, "neon_send_extension_ddl_event_http failed: %s",
			 curl_easy_strerror(res));
		return false;
	}

	return true;
}
/*
 * Transaction callback that forwards a pending "extension DDL happened"
 * notification to compute_ctl.
 *
 * On commit, if extension DDL was seen in this transaction, the notification
 * is sent and the flag is cleared. On abort the flag is cleared without
 * sending anything, so a rolled-back CREATE/ALTER/DROP EXTENSION does not
 * trigger a notification at the next, unrelated commit.
 *
 * The handler on the compute_ctl side must be non-blocking, otherwise
 * compute_ctl won't see the data that is not yet committed. There is still a
 * chance of a race, because the data becomes visible only after the
 * XACT_EVENT_COMMIT callback has been called. This is assumed to be
 * acceptable for a monitoring feature: compute_ctl will eventually see the
 * data.
 */
static void
NeonExtensionXactCallback(XactEvent event, void *arg)
{
	/* Runs for every transaction event, so keep the tracing out of the default log. */
	elog(DEBUG2, "NeonExtensionXactCallback: %d", event);

	switch (event)
	{
		case XACT_EVENT_COMMIT:
		case XACT_EVENT_PARALLEL_COMMIT:
			if (extension_ddl_occured)
			{
				/* Clear first so the flag never outlives this transaction. */
				extension_ddl_occured = false;
				elog(LOG, "Sending extension DDL event to compute_ctl");
				(void) neon_send_extension_ddl_event_http();
			}
			else
				elog(DEBUG2, "no extension DDL");
			break;

		case XACT_EVENT_ABORT:
		case XACT_EVENT_PARALLEL_ABORT:
			/* The DDL was rolled back: nothing changed, nothing to report. */
			extension_ddl_occured = false;
			break;

		default:
			break;
	}
}
void
pg_init_extension_server()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonExtensionProcessUtility;
RegisterXactCallback(NeonExtensionXactCallback, NULL);
/* Port to connect to compute_ctl on localhost */
/* to request extension files. */
DefineCustomIntVariable("neon.extension_server_port",

View File

@@ -30,7 +30,7 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
info("Extensions: %s", res["extensions"])
# 'plpgsql' is a default extension that is always installed.
assert any(
ext["extname"] == "plpgsql" and ext["versions"] == ["1.0"] for ext in res["extensions"]
ext["extname"] == "plpgsql" and ext["version"] == "1.0" for ext in res["extensions"]
), "The 'plpgsql' extension is missing"
# check that the neon_test_utils extension is not installed
@@ -63,7 +63,7 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
# and has the expected version
assert any(
ext["extname"] == "neon_test_utils"
and ext["versions"] == [neon_test_utils_version]
and ext["version"] == neon_test_utils_version
and ext["n_databases"] == 1
for ext in res["extensions"]
)
@@ -75,9 +75,8 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
# check that the neon extension is installed and has expected versions
for ext in res["extensions"]:
if ext["extname"] == "neon":
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.1", "1.2"]
assert ext["version"] in ["1.1", "1.2"]
assert ext["n_databases"] == 1
with pg_conn.cursor() as cur:
cur.execute("ALTER EXTENSION neon UPDATE TO '1.3'")
@@ -90,9 +89,8 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
# check that the neon_test_utils extension is updated
for ext in res["extensions"]:
if ext["extname"] == "neon":
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.2", "1.3"]
assert ext["version"] in ["1.2", "1.3"]
assert ext["n_databases"] == 1
# check that /metrics endpoint is available
# ensure that we see the metric before and after restart
@@ -100,13 +98,15 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
info("Metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
"compute_installed_extensions",
{"extension_name": "neon", "version": "1.2", "owned_by_superuser": "1"},
)
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2
assert sample.value == 1
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
"compute_installed_extensions",
{"extension_name": "neon", "version": "1.3", "owned_by_superuser": "1"},
)
assert len(neon_m) == 1
for sample in neon_m:
@@ -138,15 +138,107 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
info("After restart metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.2"}
"compute_installed_extensions",
{"extension_name": "neon", "version": "1.2", "owned_by_superuser": "1"},
)
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1
neon_m = m.query_all(
"compute_installed_extensions", {"extension_name": "neon", "version": "1.3"}
"compute_installed_extensions",
{"extension_name": "neon", "version": "1.3", "owned_by_superuser": "1"},
)
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1
# Test that the compute_installed_extensions metric is updated live, i.e.
# without restarting the endpoint, after extension DDL.
#
# There is a gap between the extension DDL and the moment the refreshed metric
# becomes visible. This is fine for the real world, as the metric does not need
# to be 100% real time, so the test polls /metrics until the expected state
# shows up instead of relying on fixed sleeps.
def test_installed_extensions_metric_live_update(neon_simple_env: NeonEnv):
    """Check that CREATE / DROP EXTENSION is reflected in compute_installed_extensions."""
    env = neon_simple_env

    env.create_branch("test_installed_extensions_metric_live_update")

    endpoint = env.endpoints.create_start("test_installed_extensions_metric_live_update")

    endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
    endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")

    client = endpoint.http_client()

    metric_name = "compute_installed_extensions"

    def wait_for_samples(labels, is_expected, what, attempts=10):
        """
        Poll /metrics about once per second until is_expected(samples) is true.

        labels is the label filter for the metric (None means "any labels").
        Returns the matching samples, or fails the test after `attempts` tries.
        """
        for _ in range(attempts):
            try:
                res = client.metrics()
                m = parse_metrics(res)
                if labels is None:
                    samples = m.query_all(metric_name)
                else:
                    samples = m.query_all(metric_name, labels)
                if is_expected(samples):
                    info("%s metrics: %s", what, res)
                    return samples
            except Exception:
                log.exception("failed to get metrics, assume they are not collected yet")
            time.sleep(1)
        raise AssertionError(f"timed out waiting for {metric_name}: {what}")

    # Wait until at least one sample of the metric is exported after start.
    wait_for_samples(None, lambda samples: len(samples) >= 1, "After start")

    # create extension neon_test_utils
    pg_conn = endpoint.connect(dbname="test_installed_extensions")
    with pg_conn.cursor() as cur:
        cur.execute("CREATE EXTENSION neon_test_utils")
        cur.execute(
            "SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
        )
        res = cur.fetchone()
        neon_test_utils_version = res[0]

    neon_test_utils_labels = {
        "extension_name": "neon_test_utils",
        "version": neon_test_utils_version,
        "owned_by_superuser": "1",
    }

    # The new extension must show up as installed in exactly one database.
    neon_m = wait_for_samples(
        neon_test_utils_labels,
        lambda samples: len(samples) == 1 and all(s.value == 1 for s in samples),
        "After creating neon_test_utils",
    )
    assert len(neon_m) == 1
    for sample in neon_m:
        assert sample.value == 1

    # drop extension neon_test_utils
    with pg_conn.cursor() as cur:
        cur.execute("DROP EXTENSION neon_test_utils")

    # The dropped extension must disappear from the metric.
    neon_m = wait_for_samples(
        neon_test_utils_labels,
        lambda samples: len(samples) == 0,
        "After dropping neon_test_utils",
    )
    assert len(neon_m) == 0