Add Safekeeper metrics tests (#746)

* zenith_fixtures.py: add SafekeeperHttpClient.get_metrics()
* Ensure that the reported `commit_lsn` and `flush_lsn` values look reasonable in `test_many_timelines`
Egor Suvorov, 2021-11-09 22:18:59 +03:00 (committed by GitHub)
parent 07dddfed28, commit 587935ebed
2 changed files with 139 additions and 8 deletions
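For context, a minimal usage sketch of the new accessor (the names are the ones this commit introduces; the `env` fixture and a running safekeeper are assumed):

# Sketch: read a safekeeper's LSN metrics and check the basic invariant.
metrics = env.safekeepers[0].http_client().get_metrics()
for timeline_id, flush_lsn in metrics.flush_lsn_inexact.items():
    assert metrics.commit_lsn_inexact[timeline_id] <= flush_lsn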


@@ -3,13 +3,17 @@ import random
import time
import os
import subprocess
import sys
import threading
import uuid
from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder
from fixtures.utils import lsn_to_hex, mkdir_if_needed
from fixtures.log_helper import log
from typing import List, Optional
pytest_plugins = ("fixtures.zenith_fixtures", )
@@ -34,13 +38,22 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
assert cur.fetchone() == (5000050000, )
@dataclass
class BranchMetrics:
name: str
latest_valid_lsn: int
# One entry per safekeeper; same order as env.safekeepers
flush_lsns: List[int] = field(default_factory=list)
commit_lsns: List[int] = field(default_factory=list)
# Run page server and multiple acceptors, and multiple compute nodes running
# against different timelines.
def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
env = zenith_env_builder.init()
n_timelines = 3
branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)]
@@ -50,21 +63,114 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
env.zenith_cli(["branch", branch, "main"])
pgs.append(env.postgres.create_start(branch))
tenant_id = uuid.UUID(env.initial_tenant)
def collect_metrics(message: str) -> List[BranchMetrics]:
with env.pageserver.http_client() as pageserver_http:
branch_details = [
pageserver_http.branch_detail(tenant_id=tenant_id, name=branch)
for branch in branches
]
# Any change visible to the pageserver (latest_valid_lsn) must have been
# confirmed by the safekeepers first. Since we cannot snapshot the state
# of the pageserver and the safekeepers atomically, we query the
# pageserver first: a slightly stale pageserver view is fine. Querying
# the safekeepers first is not, because new commits may land on both
# the safekeepers and the pageserver after we've captured the
# safekeepers' state, making the two snapshots look contradictory.
sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
branch_metrics = []
with env.pageserver.http_client() as pageserver_http:
for branch_detail in branch_details:
timeline_id: str = branch_detail["timeline_id"]
m = BranchMetrics(
name=branch_detail["name"],
latest_valid_lsn=branch_detail["latest_valid_lsn"],
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[timeline_id])
m.commit_lsns.append(sk_m.commit_lsn_inexact[timeline_id])
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
# Invariant: commit_lsn <= flush_lsn; strictly less while a transaction is in progress.
assert commit_lsn <= flush_lsn
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
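# E.g. with num_safekeepers == 3, each check below passes only if at
# least 2 of the 3 safekeepers have caught up to latest_valid_lsn.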
assert (2 * sum(m.latest_valid_lsn <= lsn
for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
assert (2 * sum(m.latest_valid_lsn <= lsn
for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
branch_metrics.append(m)
log.info(f"{message}: {branch_metrics}")
return branch_metrics
# TODO: https://github.com/zenithdb/zenith/issues/809
# collect_metrics("before CREATE TABLE")
# Do everything in different loops to have actions on different timelines
# interleaved.
# create schema
for pg in pgs:
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
init_m = collect_metrics("after CREATE TABLE")
# Populate data for 2/3 branches
class MetricsChecker(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
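# daemon=True: if the test dies without calling stop(), this thread
# won't keep the interpreter alive.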
self.should_stop = threading.Event()
self.exception: Optional[BaseException] = None
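# Set by run() on failure; stop() re-raises it in the main thread.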
def run(self) -> None:
try:
while not self.should_stop.is_set():
collect_metrics("during INSERT INTO")
time.sleep(1)
except BaseException:
log.error("MetricsChecker thread failed; the test will fail on the .stop() call",
exc_info=True)
# We want to preserve traceback as well as the exception
exc_type, exc_value, exc_tb = sys.exc_info()
assert exc_type
e = exc_type(exc_value)
e.__traceback__ = exc_tb
self.exception = e
def stop(self) -> None:
self.should_stop.set()
self.join()
if self.exception:
raise self.exception
metrics_checker = MetricsChecker()
metrics_checker.start()
for pg in pgs[:-1]:
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
metrics_checker.stop()
collect_metrics("after INSERT INTO")
# Check data for 2/3 branches
for pg in pgs[:-1]:
res = pg.safe_psql("SELECT sum(key) FROM t")
assert res[0] == (5000050000, )
final_m = collect_metrics("after SELECT")
# Assume that (a) LSNs advance similarly on all branches; and (b) INSERT INTO
# advances the LSN significantly. Also assume that the safekeepers do not get
# significantly out of sync in this test.
middle_lsn = (init_m[0].latest_valid_lsn + final_m[0].latest_valid_lsn) // 2
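# middle_lsn separates "before INSERT" from "after INSERT" LSNs; computed on
# branch 0 and applied to all branches per the assumption above.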
assert max(init_m[0].flush_lsns) < middle_lsn < min(final_m[0].flush_lsns)
assert max(init_m[0].commit_lsns) < middle_lsn < min(final_m[0].commit_lsns)
assert max(init_m[1].flush_lsns) < middle_lsn < min(final_m[1].flush_lsns)
assert max(init_m[1].commit_lsns) < middle_lsn < min(final_m[1].commit_lsns)
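# The last branch received no INSERTs, so its LSNs must stay below middle_lsn.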
assert max(init_m[2].flush_lsns) <= min(final_m[2].flush_lsns) < middle_lsn
assert max(init_m[2].commit_lsns) <= min(final_m[2].commit_lsns) < middle_lsn
# Check that a dead minority doesn't prevent commits: run the insert n_inserts
# times, with a fault_probability chance of taking a wal acceptor down or up

zenith_fixtures.py

@@ -1,6 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass, field
from cached_property import cached_property
import asyncpg
import os
@@ -11,6 +11,7 @@ import jwt
import json
import psycopg2
import pytest
import re
import shutil
import socket
import subprocess
@@ -715,7 +716,7 @@ class ZenithPageserver(PgProtocol):
def __exit__(self, exc_type, exc, tb):
self.stop(True)
def http_client(self, auth_token: Optional[str] = None) -> ZenithPageserverHttpClient:
return ZenithPageserverHttpClient(
port=self.service_port.http,
auth_token=auth_token,
@@ -1084,7 +1085,7 @@ class Safekeeper:
assert isinstance(res, dict)
return res
def http_client(self) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http)
@@ -1094,6 +1095,14 @@ class SafekeeperTimelineStatus:
flush_lsn: str
@dataclass
class SafekeeperMetrics:
# These values come from the Prometheus text endpoint, which stores them
# as float64 internally, so they may differ from the original int64s.
flush_lsn_inexact: Dict[str, int] = field(default_factory=dict)
commit_lsn_inexact: Dict[str, int] = field(default_factory=dict)
class SafekeeperHttpClient(requests.Session):
def __init__(self, port: int) -> None:
super().__init__()
@@ -1109,6 +1118,22 @@ class SafekeeperHttpClient(requests.Session):
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'])
def get_metrics(self) -> SafekeeperMetrics:
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
all_metrics_text = request_result.text
metrics = SafekeeperMetrics()
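# The scrape body contains lines like these (timeline id is illustrative):
#   safekeeper_flush_lsn{ztli="0123456789abcdef0123456789abcdef"} 23456789
#   safekeeper_commit_lsn{ztli="0123456789abcdef0123456789abcdef"} 23450000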
for match in re.finditer(r'^safekeeper_flush_lsn{ztli="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE):
metrics.flush_lsn_inexact[match.group(1)] = int(match.group(2))
for match in re.finditer(r'^safekeeper_commit_lsn{ztli="([0-9a-f]+)"} (\S+)$',
all_metrics_text,
re.MULTILINE):
metrics.commit_lsn_inexact[match.group(1)] = int(match.group(2))
return metrics
def get_test_output_dir(request: Any) -> str:
""" Compute the working directory for an individual test. """