WIP track extension DDL events

This commit is contained in:
Anastasia Lubennikova
2024-11-28 16:33:41 +00:00
parent bd722f24ae
commit b9c6cd28a4
3 changed files with 176 additions and 0 deletions

View File

@@ -310,6 +310,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// handle HEAD method of /installed_extensions route
// just log info
(&Method::HEAD, route) if route.starts_with("/installed_extensions") => {
info!("serving /installed_extensions HEAD request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for extensions request: {:?}",
status
);
error!(msg);
let mut resp = Response::new(Body::from(msg));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return resp;
}
let connstr = compute.connstr.clone();
// should I rewrite this to not wait for the result?
let _ = crate::installed_extensions::get_installed_extensions(connstr).await;
Response::new(Body::from("OK"))
}
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);

View File

@@ -82,6 +82,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InstalledExtensions"
head:
tags:
- Extension
summary: Report extension DDL to trigger metric recollection.
description: ""
operationId: ExtensionDDL
responses:
200:
description: Result
500:
description: Internal error
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/info:
get:
tags:

View File

@@ -12,7 +12,9 @@
#include <curl/curl.h>
#include "access/xact.h"
#include "utils/guc.h"
#include "tcop/utility.h"
#include "extension_server.h"
#include "neon_utils.h"
@@ -21,6 +23,10 @@ static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static bool extension_ddl_occured = false;
/*
* to download all SQL (and data) files for an extension:
* curl -X POST http://localhost:8080/extension_server/postgis
@@ -74,9 +80,142 @@ neon_download_extension_file_http(const char *filename, bool is_library)
return ret;
}
// Handle extension DDL: we need this for monitoring of installed extensions.
// General solution is hard, because extensions can be installed in many ways,
// i.e. sometimes as a cascade operations.
//
// Also, we don't have enough information in the statement itself,
// i.e. extension version is not always present and retrieved from the control file
// at a later stage.
//
// So, just remember the fact of the extension DDL and send it to compute_ctl
// on commit.
static void
NeonExtensionProcessUtility(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
switch (nodeTag(parseTree))
{
case T_CreateExtensionStmt:
case T_AlterExtensionStmt:
extension_ddl_occured = true;
break;
case T_DropStmt:
{
switch (((DropStmt *) parseTree)->removeType)
{
case OBJECT_EXTENSION:
extension_ddl_occured = true;
break;
default:
break;
}
}
break;
default:
break;
}
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
static bool
neon_send_extension_ddl_event_http()
{
static CURL *handle = NULL;
CURLcode res;
char *compute_ctl_url;
bool ret = false;
if (handle == NULL)
{
handle = alloc_curl_handle();
}
compute_ctl_url = psprintf("http://localhost:%d/installed_extensions",
extension_server_port);
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
// Use HEAD request without payload, because this is just a notification.
//
// This is probably not the best API design, but I didn't want to introduce
// new endpoint for this. Suggestions are welcome.
curl_easy_setopt(handle, CURLOPT_NOBODY, 1L);
/* Perform the request, res will get the return code */
res = curl_easy_perform(handle);
/* Check for errors */
if (res != CURLE_OK)
{
/*
* Don't error here because this is just a monitoring feature.
*/
elog(WARNING, "neon_send_extension_ddl_event_http failed: %s\n", curl_easy_strerror(res));
}
return ret;
}
static void
NeonExtensionXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
{
if (extension_ddl_occured)
neon_send_extension_ddl_event_http();
}
extension_ddl_occured = false;
}
void
pg_init_extension_server()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonExtensionProcessUtility;
RegisterXactCallback(NeonExtensionXactCallback, NULL);
/* Port to connect to compute_ctl on localhost */
/* to request extension files. */
DefineCustomIntVariable("neon.extension_server_port",