Track extension updates live

This commit is contained in:
Anastasia Lubennikova
2024-12-04 19:20:55 +00:00
parent b9c6cd28a4
commit 811e5675a3
4 changed files with 124 additions and 7 deletions

View File

@@ -326,9 +326,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
return resp;
}
let connstr = compute.connstr.clone();
// should I rewrite this to not wait for the result?
let _ = crate::installed_extensions::get_installed_extensions(connstr).await;
//let _ = crate::installed_extensions::get_installed_extensions(connstr).await;
// Spawn a task to get installed extensions, but don't wait for the result.
// Since get_installed_extensions is a blocking call, call it with spawn_blocking
// within an async task.
let conf = compute.get_conn_conf(None);
task::spawn(async move {
let _ = task::spawn_blocking(move || {
installed_extensions::get_installed_extensions(conf)
})
.await;
});
Response::new(Body::from("OK"))
}

View File

@@ -89,6 +89,11 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
}
}
// Reset the metric to handle dropped extensions and extension version changes.
// This creates a race condition: if the collector is called before we set the new values,
// there will be a gap in the metrics. It is not worth optimizing this further.
INSTALLED_EXTENSIONS.reset();
for (key, ext) in extensions_map.iter() {
let (extname, version, owned_by_superuser) = key;
let n_databases = ext.n_databases as u64;
@@ -98,6 +103,9 @@ pub fn get_installed_extensions(mut conf: postgres::config::Config) -> Result<In
.set(n_databases);
}
// log the installed extensions
tracing::info!("Installed extensions: {:?}", extensions_map);
Ok(InstalledExtensions {
extensions: extensions_map.into_values().collect(),
})

View File

@@ -19,7 +19,7 @@
#include "extension_server.h"
#include "neon_utils.h"
static int extension_server_port = 0;
static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
@@ -109,6 +109,7 @@ NeonExtensionProcessUtility(
case T_CreateExtensionStmt:
case T_AlterExtensionStmt:
extension_ddl_occured = true;
elog(LOG, "Extension DDL occured");
break;
case T_DropStmt:
{
@@ -116,6 +117,7 @@ NeonExtensionProcessUtility(
{
case OBJECT_EXTENSION:
extension_ddl_occured = true;
elog(LOG, "Extension DDL occured");
break;
default:
break;
@@ -171,7 +173,7 @@ neon_send_extension_ddl_event_http()
compute_ctl_url = psprintf("http://localhost:%d/installed_extensions",
extension_server_port);
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
elog(LOG, "Sending ddl event to compute_ctl: %s", compute_ctl_url);
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
@@ -199,13 +201,23 @@ neon_send_extension_ddl_event_http()
static void
NeonExtensionXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
elog(LOG, "NeonExtensionXactCallback: %d", event);
/* The handler on the compute_ctl side must be non-blocking;
 * otherwise compute_ctl won't see the data, because it is not yet committed.
 * There is still a chance of a race, because the data becomes visible only after the
 * XACT_EVENT_COMMIT callback has been called. We assume that this is not critical for the
 * monitoring feature and expect that compute_ctl will eventually see the data.
 */
if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PARALLEL_COMMIT)
{
if (extension_ddl_occured)
{
elog(LOG, "Sending extension DDL event to compute_ctl");
neon_send_extension_ddl_event_http();
extension_ddl_occured = false;
}
elog(LOG, "no extension DDL");
}
extension_ddl_occured = false;
}
void

View File

@@ -152,3 +152,89 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 1
def test_installed_extensions_metric_live_update(neon_simple_env: NeonEnv):
    """
    Check that the `compute_installed_extensions` metric is updated live.

    The test creates and then drops the `neon_test_utils` extension and checks
    that a sample for it appears in, and then disappears from, the metrics
    served by the endpoint's HTTP client.

    The metric is refreshed asynchronously with respect to the DDL statement,
    so every check polls the metrics for a bounded time instead of relying on
    a single fixed sleep.
    """
    env = neon_simple_env

    env.create_branch("test_installed_extensions_metric_live_update")
    endpoint = env.endpoints.create_start("test_installed_extensions_metric_live_update")

    endpoint.safe_psql("CREATE DATABASE test_installed_extensions")
    endpoint.safe_psql("CREATE DATABASE test_installed_extensions_2")

    client = endpoint.http_client()

    # Upper bound for every polling loop below: one attempt per second.
    max_attempts = 10

    # Wait until the metrics endpoint responds and exposes at least one
    # compute_installed_extensions sample.
    res = None
    for _ in range(max_attempts):
        try:
            res = client.metrics()
            if len(parse_metrics(res).query_all("compute_installed_extensions")) >= 1:
                break
            # Assume that not all metrics are collected yet; retry.
        except Exception:
            log.exception("failed to get metrics, assume they are not collected yet")
        time.sleep(1)

    # Without this check a permanently failing endpoint would surface as an
    # unrelated error further down.
    assert res is not None, "failed to fetch metrics from the endpoint"

    log.info("After start metrics: %s", res)
    m = parse_metrics(res)
    log.info("parsed metrics: %s", m)

    # create extension neon_test_utils
    pg_conn = endpoint.connect(dbname="test_installed_extensions")
    with pg_conn.cursor() as cur:
        cur.execute("CREATE EXTENSION neon_test_utils")
        cur.execute(
            "SELECT default_version FROM pg_available_extensions WHERE name = 'neon_test_utils'"
        )
        row = cur.fetchone()
        neon_test_utils_version = row[0]

    def poll_neon_test_utils_samples(expected_count: int, log_message: str):
        """
        Poll the metrics until the number of neon_test_utils samples equals
        `expected_count` or the attempts run out; return the last samples seen.
        """
        samples = []
        for _ in range(max_attempts):
            metrics_text = client.metrics()
            log.info(log_message, metrics_text)
            samples = parse_metrics(metrics_text).query_all(
                "compute_installed_extensions",
                {
                    "extension_name": "neon_test_utils",
                    "version": neon_test_utils_version,
                    "owned_by_superuser": "1",
                },
            )
            if len(samples) == expected_count:
                break
            time.sleep(1)
        return samples

    # check the metric again: the new extension must show up in one database
    neon_m = poll_neon_test_utils_samples(1, "After creating neon_test_utils metrics: %s")
    assert len(neon_m) == 1
    for sample in neon_m:
        assert sample.value == 1

    # drop extension neon_test_utils
    with pg_conn.cursor() as cur:
        cur.execute("DROP EXTENSION neon_test_utils")

    # check the metric again: the sample for the dropped extension must vanish
    neon_m = poll_neon_test_utils_samples(0, "After dropping neon_test_utils metrics: %s")
    assert len(neon_m) == 0