Files
neon/test_runner/regress/test_compute_profiling.py
Victor Polevoy d8b0c0834e Implement HTTP endpoint for compute profiling.
Exposes an endpoint "/profile/cpu" for profiling the postgres
processes (currently spawned and the new ones) using "perf".

Adds the corresponding python test to test the added endpoint
and confirm the output expected is the profiling data in the
expected format.

Add "perf" binary to the sudo list.

Fix python poetry ruff

Address the clippy lints

Document the code

Format python code

Address code review

Prettify

Embed profile_pb2.py and small code/test fixes.

Make the code slightly better.

1. Makes optional the sampling_frequency parameter for profiling.
2. Avoids using unsafe code when killing a child.

Better code, better tests

More tests

Separate start and stop of profiling

Correctly check for the exceptions

Address clippy lint

Final fixes.

1. Allows the perf to be found in $PATH instead of having the path
hardcoded.
2. Changes the path to perf in the sudoers file so that the compute
can run it properly.
3. Changes the way perf is invoked, now it is with sudo and the path
from $PATH.
4. Removes the authentication requirement from the /profile/cpu/
endpoint.

hakari thing

Python fixes

Fix python formatting

More python fixes

Update poetry lock

Fix ruff

Address the review comments

Fix the tests

Try fixing the flaky test for pg17?

Try fixing the flaky test for pg17?

PYTHON

Fix the tests

Remove the PROGRESS parameter

Remove unused

Increase the timeout due to concurrency

Increase the timeout to 60

Increase the profiling window timeout

Try this

Lets see the error

Just log all the errors

Add perf into the build environment

uijdfghjdf

Update tempfile to 3.20

Snapshot

Use bbc-profile

Update tempfile to 3.20

Provide bpfcc-tools in debian

Properly respond with status

Python check

Fix build-tools dockerfile

Add path probation for the bcc profile

Try err printing

Refactor

Add bpfcc-tools to the final image

Add error context

sudo not found?

Print more errors for verbosity

Remove procfs and use libproc

Update hakari

Debug sudo in CI

Rebase and adjust hakari

remove leftover

Add archiving support

Correct the paths to the perf binary

Try hardcoded sudo path

Add sudo into build-tools dockerfile

Minor cleanup

Print out the sudoers file from github

Stop the tests earlier

Add the sudoers entry for nonroot, install kmod for modprobe for bcc-profile

Try hacking the kernel headers for bcc-profile

Redeclare the kernel version argument

Try using the kernel of the runner

Try another way

Check bpfcc-tools
2025-07-11 12:53:48 +02:00

315 lines
10 KiB
Python

from __future__ import annotations
import gzip
import io
import threading
import time
from typing import TYPE_CHECKING, Any
import pytest
from data.profile_pb2 import Profile # type: ignore
from fixtures.log_helper import log
from fixtures.utils import run_only_on_default_postgres
from google.protobuf.message import Message
from requests import HTTPError
if TYPE_CHECKING:
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.neon_fixtures import NeonEnv
REASON = "test doesn't use postgres"
def _start_profiling_cpu(
client: EndpointHttpClient, event: threading.Event | None, archive: bool = False
) -> Profile | None:
"""
Start CPU profiling for the compute node.
"""
log.info("Starting CPU profiling...")
if event is not None:
event.set()
status, response = client.start_profiling_cpu(100, 30, archive=archive)
match status:
case 200:
log.debug("CPU profiling finished")
profile: Any = Profile()
if archive:
log.debug("Unpacking gzipped profile data")
with gzip.GzipFile(fileobj=io.BytesIO(response)) as f:
response = f.read()
else:
log.debug("Unpacking non-gzipped profile data")
Message.ParseFromString(profile, response)
return profile
case 204:
log.debug("CPU profiling was stopped")
raise HTTPError("Failed to finish CPU profiling: was stopped.")
case 409:
log.debug("CPU profiling is already in progress, cannot start again")
raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.")
case 500:
response_str = response.decode("utf-8", errors="replace")
log.debug(
f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}"
)
raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}")
case _:
log.error(f"Failed to start CPU profiling: {status}")
raise HTTPError(f"Failed to start CPU profiling: {status}")
def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None):
"""
Stop CPU profiling for the compute node.
"""
log.info("Manually stopping CPU profiling...")
if event is not None:
event.set()
status = client.stop_profiling_cpu()
match status:
case 200:
log.debug("CPU profiling stopped successfully")
case 412:
log.debug("CPU profiling is not running, nothing to do")
case _:
log.error(f"Failed to stop CPU profiling: {status}")
raise HTTPError(f"Failed to stop CPU profiling: {status}")
return status
def _wait_till_profiling_starts(
http_client: EndpointHttpClient,
event: threading.Event | None,
repeat_delay_secs: float = 0.3,
timeout: int = 60,
) -> bool:
"""
Wait until CPU profiling starts.
"""
log.info("Waiting for CPU profiling to start...")
end_time = time.time() + timeout
while not http_client.get_profiling_cpu_status():
time.sleep(repeat_delay_secs)
if end_time <= time.time():
log.error("Timeout while waiting for CPU profiling to start")
return False
log.info("CPU profiling has started successfully and is in progress")
if event is not None:
event.set()
return True
def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None):
profile = _start_profiling_cpu(http_client, event)
if profile is None:
log.error("The received profiling data is malformed or empty.")
return
assert len(profile.sample) > 0, "No samples found in CPU profiling data"
assert len(profile.mapping) > 0, "No mappings found in CPU profiling data"
assert len(profile.location) > 0, "No locations found in CPU profiling data"
assert len(profile.function) > 0, "No functions found in CPU profiling data"
assert len(profile.string_table) > 0, "No string tables found in CPU profiling data"
strings = [
"PostgresMain",
"ServerLoop",
"BackgroundWorkerMain",
"pq_recvbuf",
"pq_getbyte",
]
assert any(s in profile.string_table for s in strings), (
f"Expected at least one of {strings} in string table, but none found"
)
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with timeout.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test2")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test2")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
assert _wait_till_profiling_starts(http_client, None)
time.sleep(4) # Give some time for the profiling to start
thread2.start()
thread.join(timeout=60)
inserting_should_stop_event.set() # Stop the insertion thread
thread2.join(timeout=60)
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with archiving the data.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
assert _start_profiling_cpu(http_client, None, archive=True) is not None, (
"Failed to start and finish CPU profiling with archiving enabled"
)
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started and stopped correctly.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling():
# Should raise as the profiling will be stopped.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
_stop_profiling_cpu(http_client, None)
# Should raise as the profiling is already stopped.
assert _stop_profiling_cpu(http_client, None) == 412
thread.join(timeout=60)
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started once and the second time
it will throw an error as it is already running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
thread2.start()
# Should raise as the profiling is already in progress.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
inserting_should_stop_event.set() # Stop the insertion thread
# The profiling should still be running and finish normally.
thread.join(timeout=600)
thread2.join(timeout=600)
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv):
"""
Test that CPU profiling throws the expected error when is attempted
to be stopped when it hasn't be running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for _ in range(3):
status = _stop_profiling_cpu(http_client, None)
assert status == 412
@run_only_on_default_postgres(reason=REASON)
def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv):
"""
Test that CPU profiling start request properly validated the
arguments and throws the expected error (bad request).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for sampling_frequency in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(sampling_frequency, 5)
assert status == 422
for timeout_seconds in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(5, timeout_seconds)
assert status == 422