Files
neon/test_runner/fixtures/endpoint/http.py
Alexey Kondratov 557127550c feat(compute): Add compute_ctl_up metric (#11376)
## Problem

For computes running inside NeonVM, the actual compute image tag is
buried inside the NeonVM spec, and we cannot get it as part of standard
k8s container metrics (it's always an image and a tag of the NeonVM
runner container). The workaround we currently use is to extract the
running computes info from the control plane database with SQL. It has
several drawbacks: i) it's complicated, separate DB per region; ii) it's
slow; iii) it's still an indirect source of info, i.e. k8s state could
be different from what the control plane expects.

## Summary of changes

Add a new `compute_ctl_up` gauge metric with `build_tag` and `status`
labels. It will help us to both overview what are the tags/versions of
all running computes; and to break them down by current status (`empty`,
`running`, `failed`, etc.)

Later, we could introduce low cardinality (no endpoint or compute ids)
streaming aggregates for such metrics, so they will be blazingly fast
and usable for monitoring the fleet-wide state.
2025-04-01 08:51:17 +00:00

84 lines
2.5 KiB
Python

from __future__ import annotations
import urllib.parse
import requests
from requests.adapters import HTTPAdapter
from fixtures.log_helper import log
class EndpointHttpClient(requests.Session):
def __init__(
self,
external_port: int,
internal_port: int,
):
super().__init__()
self.external_port: int = external_port
self.internal_port: int = internal_port
self.mount("http://", HTTPAdapter())
def dbs_and_roles(self):
res = self.get(f"http://localhost:{self.external_port}/dbs_and_roles")
res.raise_for_status()
return res.json()
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}"
)
res.raise_for_status()
return res.text
def extensions(self, extension: str, version: str, database: str):
body = {
"extension": extension,
"version": version,
"database": database,
}
res = self.post(f"http://localhost:{self.internal_port}/extensions", json=body)
res.raise_for_status()
return res.json()
def set_role_grants(self, database: str, role: str, schema: str, privileges: list[str]):
res = self.post(
f"http://localhost:{self.internal_port}/grants",
json={"database": database, "schema": schema, "role": role, "privileges": privileges},
)
res.raise_for_status()
return res.json()
def metrics(self) -> str:
res = self.get(f"http://localhost:{self.external_port}/metrics")
res.raise_for_status()
log.debug("raw compute metrics: %s", res.text)
return res.text
# Current compute status.
def status(self):
res = self.get(f"http://localhost:{self.external_port}/status")
res.raise_for_status()
return res.json()
# Compute startup-related metrics.
def metrics_json(self):
res = self.get(f"http://localhost:{self.external_port}/metrics.json")
res.raise_for_status()
return res.json()
def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []
for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
}
)
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
res.raise_for_status()