mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 22:42:57 +00:00
Exposes an endpoint "/profile/cpu" for profiling the postgres processes (currently spawned and the new ones) using "perf". Adds the corresponding python test to test the added endpoint and confirm the output expected is the profiling data in the expected format. Add "perf" binary to the sudo list. Fix python poetry ruff Address the clippy lints Document the code Format python code Address code review Prettify Embed profile_pb2.py and small code/test fixes. Make the code slightly better. 1. Makes optional the sampling_frequency parameter for profiling. 2. Avoids using unsafe code when killing a child. Better code, better tests More tests Separate start and stop of profiling Correctly check for the exceptions Address clippy lint Final fixes. 1. Allows the perf to be found in $PATH instead of having the path hardcoded. 2. Changes the path to perf in the sudoers file so that the compute can run it properly. 3. Changes the way perf is invoked, now it is with sudo and the path from $PATH. 4. Removes the authentication requirement from the /profile/cpu/ endpoint. hakari thing Python fixes Fix python formatting More python fixes Update poetry lock Fix ruff Address the review comments Fix the tests Try fixing the flaky test for pg17? Try fixing the flaky test for pg17? PYTHON Fix the tests Remove the PROGRESS parameter Remove unused Increase the timeout due to concurrency Increase the timeout to 60 Increase the profiling window timeout Try this Lets see the error Just log all the errors Add perf into the build environment uijdfghjdf Update tempfile to 3.20 Snapshot Use bbc-profile Update tempfile to 3.20 Provide bpfcc-tools in debian Properly respond with status Python check Fix build-tools dockerfile Add path probation for the bcc profile Try err printing Refactor Add bpfcc-tools to the final image Add error context sudo not found? 
Print more errors for verbosity Remove procfs and use libproc Update hakari Debug sudo in CI Rebase and adjust hakari remove leftover Add archiving support Correct the paths to the perf binary Try hardcoded sudo path Add sudo into build-tools dockerfile Minor cleanup Print out the sudoers file from github Stop the tests earlier Add the sudoers entry for nonroot, install kmod for modprobe for bcc-profile Try hacking the kernel headers for bcc-profile Redeclare the kernel version argument Try using the kernel of the runner Try another way Check bpfcc-tools
208 lines
6.5 KiB
Python
208 lines
6.5 KiB
Python
from __future__ import annotations
|
|
|
|
import urllib.parse
|
|
from enum import StrEnum
|
|
from typing import TYPE_CHECKING, Any, final
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.auth import AuthBase
|
|
from requests.exceptions import ReadTimeout
|
|
from typing_extensions import override
|
|
|
|
from fixtures.log_helper import log
|
|
from fixtures.utils import wait_until
|
|
|
|
if TYPE_CHECKING:
|
|
from requests import PreparedRequest
|
|
|
|
|
|
COMPUTE_AUDIENCE: str = "compute"
"""
Audience string: the value to put into the `aud` claim.
"""
|
|
|
|
|
|
@final
|
|
class ComputeClaimsScope(StrEnum):
|
|
ADMIN = "compute_ctl:admin"
|
|
|
|
|
|
@final
class BearerAuth(AuthBase):
    """
    Authentication hook for the ``requests`` library that adds a bearer token.

    Each request routed through an instance gets an
    ``Authorization: Bearer <jwt>`` header, overwriting any existing value.
    """

    def __init__(self, jwt: str):
        # Double-underscore attribute: Python mangles it to `_BearerAuth__jwt`.
        self.__jwt = jwt

    @override
    def __call__(self, request: PreparedRequest) -> PreparedRequest:
        header_value = "Bearer " + self.__jwt
        request.headers["Authorization"] = header_value
        # requests expects the (mutated) request object back.
        return request
|
|
|
|
|
|
@final
class EndpointHttpClient(requests.Session):
    """
    HTTP client for a compute endpoint's API.

    Requests go to ``localhost`` on either ``external_port`` or
    ``internal_port`` depending on the route. ``self.auth`` is set on the
    session, so ``requests`` attaches the bearer token to every request made
    through this client; the explicit ``auth=self.auth`` arguments below are
    therefore redundant but harmless.
    """

    def __init__(
        self,
        external_port: int,
        internal_port: int,
        jwt: str,
    ):
        super().__init__()
        self.external_port: int = external_port
        self.internal_port: int = internal_port
        self.auth = BearerAuth(jwt)

        self.mount("http://", HTTPAdapter())
        self.prewarm_url = f"http://localhost:{external_port}/lfc/prewarm"
        self.offload_url = f"http://localhost:{external_port}/lfc/offload"

    def dbs_and_roles(self):
        """GET ``/dbs_and_roles`` (external port) and return the decoded JSON."""
        res = self.get(f"http://localhost:{self.external_port}/dbs_and_roles", auth=self.auth)
        res.raise_for_status()
        return res.json()

    def prewarm_lfc_status(self) -> dict[str, str]:
        """Return the JSON body of ``GET /lfc/prewarm``; raises on HTTP error."""
        res = self.get(self.prewarm_url)
        res.raise_for_status()
        json: dict[str, str] = res.json()
        return json

    def prewarm_lfc(self, from_endpoint_id: str | None = None) -> None:
        """
        Start an LFC prewarm and block until it reports ``completed``.

        :param from_endpoint_id: sent as the ``from_endpoint`` query parameter
            when truthy; omitted otherwise (so ``""`` behaves like ``None``).
        """
        params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else {}
        self.post(self.prewarm_url, params=params).raise_for_status()
        self.prewarm_lfc_wait()

    def prewarm_lfc_wait(self) -> None:
        """
        Poll the prewarm status via ``wait_until`` (timeout=60, presumably
        seconds) until ``status`` is ``"completed"``.
        """

        def prewarmed():
            json = self.prewarm_lfc_status()
            status, err = json["status"], json.get("error")
            assert status == "completed", f"{status}, {err=}"

        wait_until(prewarmed, timeout=60)

    def offload_lfc_status(self) -> dict[str, str]:
        """Return the JSON body of ``GET /lfc/offload``; raises on HTTP error."""
        res = self.get(self.offload_url)
        res.raise_for_status()
        json: dict[str, str] = res.json()
        return json

    def offload_lfc(self) -> None:
        """Start an LFC offload and block until it reports ``completed``."""
        self.post(self.offload_url).raise_for_status()
        self.offload_lfc_wait()

    def offload_lfc_wait(self) -> None:
        """
        Poll the offload status via ``wait_until`` (its default timeout) until
        ``status`` is ``"completed"``.
        """

        def offloaded():
            json = self.offload_lfc_status()
            status, err = json["status"], json.get("error")
            assert status == "completed", f"{status}, {err=}"

        wait_until(offloaded)

    def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False) -> dict[str, str]:
        """
        POST ``/promote`` and return the decoded JSON response.

        :param safekeepers_lsn: request payload, passed via ``data=`` (requests
            form-encodes a dict). NOTE(review): other JSON endpoints in this
            client use ``json=``; confirm ``/promote`` really expects a form body.
        :param disconnect: if True, first send a request and abandon it almost
            immediately, then send a second request that waits for the result.
        :raises requests.HTTPError: if the final request returns an error status.
        """
        url = f"http://localhost:{self.external_port}/promote"
        if disconnect:
            try:  # send first request to start promote and disconnect
                # Only the *read* timeout is tiny. A single float would also give
                # the TCP connect just 1 ms, which on a busy host can raise
                # ConnectTimeout (not caught below) before the request is sent.
                self.post(url, data=safekeepers_lsn, timeout=(None, 0.001))
            except ReadTimeout:
                pass  # wait on second request which returns on promotion finish
        res = self.post(url, data=safekeepers_lsn)
        res.raise_for_status()
        json: dict[str, str] = res.json()
        return json

    def start_profiling_cpu(
        self, sampling_frequency: int, timeout_seconds: int, archive: bool = False
    ) -> tuple[int, bytes]:
        """
        POST ``/profile/cpu`` requesting a ``BccProfile`` run.

        Does NOT raise on HTTP errors: returns ``(status_code, raw body)`` so
        callers can assert on either. The body presumably carries the collected
        profile on success — confirm against the server.
        """
        url = f"http://localhost:{self.external_port}/profile/cpu"
        params = {
            "profiler": {"BccProfile": None},
            "sampling_frequency": sampling_frequency,
            "timeout_seconds": timeout_seconds,
            "archive": archive,
        }

        res = self.post(
            url,
            json=params,
            auth=self.auth,
        )

        return res.status_code, res.content

    def stop_profiling_cpu(self) -> int:
        """DELETE ``/profile/cpu``; returns the status code without raising."""
        url = f"http://localhost:{self.external_port}/profile/cpu"
        res = self.delete(url, auth=self.auth)
        return res.status_code

    def get_profiling_cpu_status(self) -> bool:
        """
        Returns True if CPU profiling is currently running, False otherwise.

        Concretely: True iff ``GET /profile/cpu`` answers 200; any other
        non-error status yields False, and 4xx/5xx raise ``HTTPError``.
        """
        url = f"http://localhost:{self.external_port}/profile/cpu"
        res = self.get(url, auth=self.auth)
        res.raise_for_status()
        return res.status_code == 200

    def database_schema(self, database: str) -> str:
        """Return the text of ``GET /database_schema`` for ``database``."""
        # safe='' percent-encodes every reserved character, including '/'.
        res = self.get(
            f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
            auth=self.auth,
        )
        res.raise_for_status()
        return res.text

    def extensions(self, extension: str, version: str, database: str):
        """POST ``/extensions`` on the internal port; return the decoded JSON."""
        body = {
            "extension": extension,
            "version": version,
            "database": database,
        }
        res = self.post(f"http://localhost:{self.internal_port}/extensions", json=body)
        res.raise_for_status()
        return res.json()

    def set_role_grants(self, database: str, role: str, schema: str, privileges: list[str]):
        """POST ``/grants`` on the internal port; return the decoded JSON."""
        res = self.post(
            f"http://localhost:{self.internal_port}/grants",
            json={"database": database, "schema": schema, "role": role, "privileges": privileges},
        )
        res.raise_for_status()
        return res.json()

    def metrics(self) -> str:
        """Return the raw text of ``GET /metrics`` (also logged at debug level)."""
        res = self.get(f"http://localhost:{self.external_port}/metrics")
        res.raise_for_status()
        log.debug("raw compute metrics: %s", res.text)
        return res.text

    # Current compute status.
    def status(self):
        """Return the decoded JSON of ``GET /status``."""
        res = self.get(f"http://localhost:{self.external_port}/status", auth=self.auth)
        res.raise_for_status()
        return res.json()

    # Compute startup-related metrics.
    def metrics_json(self):
        """Return the decoded JSON of ``GET /metrics.json``."""
        res = self.get(f"http://localhost:{self.external_port}/metrics.json", auth=self.auth)
        res.raise_for_status()
        return res.json()

    def configure_failpoints(self, *args: tuple[str, str]) -> None:
        """
        POST ``/failpoints`` on the internal port.

        Each positional argument is a ``(name, action)`` pair; they are sent
        as a JSON list of ``{"name": ..., "action": ...}`` objects.
        """
        body: list[dict[str, str]] = [{"name": fp[0], "action": fp[1]} for fp in args]

        res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
        res.raise_for_status()
|