mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
tests: use parse_metrics everywhere (#3737)
- use parse_metrics() in all places where we parse Prometheus metrics - query_all: make `filter` argument optional - encourage using properly parsed, typed metrics by changing get_metrics() to return already-parsed metrics. The new get_metric_str() method, like in the Safekeeper type, returns the raw text response.
This commit is contained in:
committed by
GitHub
parent
1b780fa752
commit
d1a0a907ff
@@ -366,17 +366,9 @@ class NeonBenchmarker:
|
||||
|
||||
def get_int_counter_value(self, pageserver: NeonPageserver, metric_name: str) -> int:
|
||||
"""Fetch the value of given int counter from pageserver metrics."""
|
||||
# TODO: If we start to collect more of the prometheus metrics in the
|
||||
# performance test suite like this, we should refactor this to load and
|
||||
# parse all the metrics into a more convenient structure in one go.
|
||||
#
|
||||
# The metric should be an integer, as it's a number of bytes. But in general
|
||||
# all prometheus metrics are floats. So to be pedantic, read it as a float
|
||||
# and round to integer.
|
||||
all_metrics = pageserver.http_client().get_metrics()
|
||||
matches = re.search(rf"^{metric_name} (\S+)$", all_metrics, re.MULTILINE)
|
||||
assert matches, f"metric {metric_name} not found"
|
||||
return int(round(float(matches.group(1))))
|
||||
sample = all_metrics.query_one(metric_name)
|
||||
return int(round(sample.value))
|
||||
|
||||
def get_timeline_size(
|
||||
self, repo_dir: Path, tenant_id: TenantId, timeline_id: TimelineId
|
||||
|
||||
@@ -13,7 +13,8 @@ class Metrics:
|
||||
self.metrics = defaultdict(list)
|
||||
self.name = name
|
||||
|
||||
def query_all(self, name: str, filter: Dict[str, str]) -> List[Sample]:
|
||||
def query_all(self, name: str, filter: Optional[Dict[str, str]] = None) -> List[Sample]:
|
||||
filter = filter or {}
|
||||
res = []
|
||||
for sample in self.metrics[name]:
|
||||
try:
|
||||
|
||||
@@ -29,7 +29,6 @@ import asyncpg
|
||||
import backoff # type: ignore
|
||||
import boto3
|
||||
import jwt
|
||||
import prometheus_client
|
||||
import psycopg2
|
||||
import pytest
|
||||
import requests
|
||||
@@ -37,7 +36,7 @@ from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.metrics import Metrics, parse_metrics
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import (
|
||||
ATTACHMENT_NAME_REGEX,
|
||||
@@ -46,7 +45,6 @@ from fixtures.utils import (
|
||||
get_self_dir,
|
||||
subprocess_capture,
|
||||
)
|
||||
from prometheus_client.parser import text_string_to_metric_families
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -1437,22 +1435,27 @@ class PageserverHttpClient(requests.Session):
|
||||
assert completed["successful_download_count"] > 0
|
||||
return completed
|
||||
|
||||
def get_metrics(self) -> str:
|
||||
def get_metrics_str(self) -> str:
|
||||
"""You probably want to use get_metrics() instead."""
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
self.verbose_error(res)
|
||||
return res.text
|
||||
|
||||
def get_timeline_metric(self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str):
|
||||
raw = self.get_metrics()
|
||||
family: List[prometheus_client.Metric] = list(text_string_to_metric_families(raw))
|
||||
[metric] = [m for m in family if m.name == metric_name]
|
||||
[sample] = [
|
||||
s
|
||||
for s in metric.samples
|
||||
if s.labels["tenant_id"] == str(tenant_id)
|
||||
and s.labels["timeline_id"] == str(timeline_id)
|
||||
]
|
||||
return sample.value
|
||||
def get_metrics(self) -> Metrics:
|
||||
res = self.get_metrics_str()
|
||||
return parse_metrics(res)
|
||||
|
||||
def get_timeline_metric(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
|
||||
) -> float:
|
||||
metrics = self.get_metrics()
|
||||
return metrics.query_one(
|
||||
metric_name,
|
||||
filter={
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
},
|
||||
).value
|
||||
|
||||
def get_remote_timeline_client_metric(
|
||||
self,
|
||||
@@ -1462,7 +1465,7 @@ class PageserverHttpClient(requests.Session):
|
||||
file_kind: str,
|
||||
op_kind: str,
|
||||
) -> Optional[float]:
|
||||
metrics = parse_metrics(self.get_metrics(), "pageserver")
|
||||
metrics = self.get_metrics()
|
||||
matches = metrics.query_all(
|
||||
name=metric_name,
|
||||
filter={
|
||||
@@ -1481,14 +1484,16 @@ class PageserverHttpClient(requests.Session):
|
||||
assert len(matches) < 2, "above filter should uniquely identify metric"
|
||||
return value
|
||||
|
||||
def get_metric_value(self, name: str) -> Optional[str]:
|
||||
def get_metric_value(
|
||||
self, name: str, filter: Optional[Dict[str, str]] = None
|
||||
) -> Optional[float]:
|
||||
metrics = self.get_metrics()
|
||||
relevant = [line for line in metrics.splitlines() if line.startswith(name)]
|
||||
if len(relevant) == 0:
|
||||
results = metrics.query_all(name, filter=filter)
|
||||
if not results:
|
||||
log.info(f'could not find metric "{name}"')
|
||||
return None
|
||||
assert len(relevant) == 1
|
||||
return relevant[0].lstrip(name).strip()
|
||||
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
|
||||
return results[0].value
|
||||
|
||||
def layer_map_info(
|
||||
self,
|
||||
|
||||
@@ -8,7 +8,7 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro
|
||||
|
||||
parsed_metrics = {}
|
||||
|
||||
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics())
|
||||
parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics_str())
|
||||
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
|
||||
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import random
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
@@ -134,7 +133,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
|
||||
|
||||
# Helper function that gets the number of given kind of remote ops from the metrics
|
||||
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
|
||||
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
|
||||
ps_metrics = env.pageserver.http_client().get_metrics()
|
||||
total = 0.0
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Iterator
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import (
|
||||
PSQL,
|
||||
NeonEnvBuilder,
|
||||
@@ -143,7 +142,7 @@ def test_metric_collection(
|
||||
|
||||
# Helper function that gets the number of given kind of remote ops from the metrics
|
||||
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
|
||||
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
|
||||
ps_metrics = env.pageserver.http_client().get_metrics()
|
||||
total = 0.0
|
||||
for sample in ps_metrics.query_all(
|
||||
name="pageserver_remote_operation_seconds_count",
|
||||
|
||||
@@ -11,6 +11,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverApiException,
|
||||
PageserverHttpClient,
|
||||
RemoteStorageKind,
|
||||
assert_tenant_status,
|
||||
available_remote_storages,
|
||||
@@ -25,9 +26,16 @@ from fixtures.types import Lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
def get_num_downloaded_layers(client, tenant_id, timeline_id):
|
||||
def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
|
||||
value = client.get_metric_value(
|
||||
f'pageserver_remote_operation_seconds_count{{file_kind="layer",op_kind="download",status="success",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}}'
|
||||
"pageserver_remote_operation_seconds_count",
|
||||
{
|
||||
"file_kind": "layer",
|
||||
"op_kind": "download",
|
||||
"status": "success",
|
||||
"tenant_id": tenant_id,
|
||||
"timeline_id": timeline_id,
|
||||
},
|
||||
)
|
||||
if value is None:
|
||||
return 0
|
||||
|
||||
@@ -6,7 +6,6 @@ from threading import Thread
|
||||
import asyncpg
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
@@ -79,7 +78,7 @@ def test_tenant_reattach(
|
||||
".*failed to perform remote task UploadMetadata.*, will retry.*"
|
||||
)
|
||||
|
||||
ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
|
||||
ps_metrics = pageserver_http.get_metrics()
|
||||
tenant_metric_filter = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
@@ -93,7 +92,7 @@ def test_tenant_reattach(
|
||||
|
||||
time.sleep(1) # for metrics propagation
|
||||
|
||||
ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
|
||||
ps_metrics = pageserver_http.get_metrics()
|
||||
pageserver_last_record_lsn = int(
|
||||
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
@@ -50,16 +50,22 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(10, 0.2, lambda: assert_active(tenant_id))
|
||||
|
||||
# Assert that all tasks finish quickly after tenant is detached
|
||||
task_starts = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
|
||||
task_starts = client.get_metric_value("pageserver_tenant_task_events_total", {"event": "start"})
|
||||
assert task_starts is not None
|
||||
assert int(task_starts) > 0
|
||||
client.tenant_detach(tenant)
|
||||
client.tenant_detach(env.initial_tenant)
|
||||
|
||||
def assert_tasks_finish():
|
||||
tasks_started = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
|
||||
tasks_ended = client.get_metric_value('pageserver_tenant_task_events{event="stop"}')
|
||||
tasks_panicked = client.get_metric_value('pageserver_tenant_task_events{event="panic"}')
|
||||
tasks_started = client.get_metric_value(
|
||||
"pageserver_tenant_task_events_total", {"event": "start"}
|
||||
)
|
||||
tasks_ended = client.get_metric_value(
|
||||
"pageserver_tenant_task_events_total", {"event": "stop"}
|
||||
)
|
||||
tasks_panicked = client.get_metric_value(
|
||||
"pageserver_tenant_task_events_total", {"event": "panic"}
|
||||
)
|
||||
log.info(f"started {tasks_started}, ended {tasks_ended}, panicked {tasks_panicked}")
|
||||
assert tasks_started == tasks_ended
|
||||
assert tasks_panicked is None or int(tasks_panicked) == 0
|
||||
|
||||
@@ -107,7 +107,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
|
||||
collected_metrics = {
|
||||
"pageserver": env.pageserver.http_client().get_metrics(),
|
||||
"pageserver": env.pageserver.http_client().get_metrics_str(),
|
||||
}
|
||||
for sk in env.safekeepers:
|
||||
collected_metrics[f"safekeeper{sk.id}"] = sk.http_client().get_metrics_str()
|
||||
@@ -207,7 +207,7 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
|
||||
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
|
||||
ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
|
||||
ps_metrics = env.pageserver.http_client().get_metrics()
|
||||
samples = []
|
||||
for metric_name in ps_metrics.metrics:
|
||||
for sample in ps_metrics.query_all(
|
||||
@@ -307,7 +307,7 @@ def test_pageserver_with_empty_tenants(
|
||||
|
||||
time.sleep(1) # to allow metrics propagation
|
||||
|
||||
ps_metrics = parse_metrics(client.get_metrics(), "pageserver")
|
||||
ps_metrics = client.get_metrics()
|
||||
broken_tenants_metric_filter = {
|
||||
"tenant_id": str(tenant_without_timelines_dir),
|
||||
"state": "broken",
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import math
|
||||
import queue
|
||||
import random
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from contextlib import closing
|
||||
@@ -465,26 +464,26 @@ def test_timeline_size_metrics(
|
||||
|
||||
# get the metrics and parse the metric for the current timeline's physical size
|
||||
metrics = env.pageserver.http_client().get_metrics()
|
||||
matches = re.search(
|
||||
f'^pageserver_resident_physical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
|
||||
metrics,
|
||||
re.MULTILINE,
|
||||
)
|
||||
assert matches
|
||||
tl_physical_size_metric = int(matches.group(1))
|
||||
tl_physical_size_metric = metrics.query_one(
|
||||
name="pageserver_resident_physical_size",
|
||||
filter={
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"timeline_id": str(new_timeline_id),
|
||||
},
|
||||
).value
|
||||
|
||||
# assert that the physical size metric matches the actual physical size on disk
|
||||
timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
|
||||
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
|
||||
|
||||
# Check that the logical size metric is sane, and matches
|
||||
matches = re.search(
|
||||
f'^pageserver_current_logical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
|
||||
metrics,
|
||||
re.MULTILINE,
|
||||
)
|
||||
assert matches
|
||||
tl_logical_size_metric = int(matches.group(1))
|
||||
tl_logical_size_metric = metrics.query_one(
|
||||
name="pageserver_current_logical_size",
|
||||
filter={
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"timeline_id": str(new_timeline_id),
|
||||
},
|
||||
).value
|
||||
|
||||
pgdatadir = test_output_dir / "pgdata-vanilla"
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
@@ -575,8 +574,8 @@ def get_physical_size_values(
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
res.prometheus_resident_physical = client.get_timeline_metric(
|
||||
tenant_id, timeline_id, "pageserver_resident_physical_size"
|
||||
res.prometheus_resident_physical = int(
|
||||
client.get_timeline_metric(tenant_id, timeline_id, "pageserver_resident_physical_size")
|
||||
)
|
||||
|
||||
detail = client.timeline_detail(
|
||||
|
||||
Reference in New Issue
Block a user