diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index 8e33264a20..57cf379a96 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -3,13 +3,17 @@ import random
 import time
 import os
 import subprocess
+import sys
+import threading
 import uuid
 
 from contextlib import closing
+from dataclasses import dataclass, field
 from multiprocessing import Process, Value
 from fixtures.zenith_fixtures import PgBin, ZenithEnv, ZenithEnvBuilder
 from fixtures.utils import lsn_to_hex, mkdir_if_needed
 from fixtures.log_helper import log
+from typing import List, Optional
 
 pytest_plugins = ("fixtures.zenith_fixtures")
 
@@ -34,13 +38,22 @@ def test_normal_work(zenith_env_builder: ZenithEnvBuilder):
             assert cur.fetchone() == (5000050000, )
 
 
+@dataclass
+class BranchMetrics:
+    name: str
+    latest_valid_lsn: int
+    # One entry per safekeeper, in the same order as env.safekeepers
+    flush_lsns: List[int] = field(default_factory=list)
+    commit_lsns: List[int] = field(default_factory=list)
+
+
 # Run page server and multiple acceptors, and multiple compute nodes running
 # against different timelines.
 def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
     zenith_env_builder.num_safekeepers = 3
     env = zenith_env_builder.init()
 
-    n_timelines = 2
+    n_timelines = 3
 
     branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)]
 
@@ -50,21 +63,114 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
         env.zenith_cli(["branch", branch, "main"])
         pgs.append(env.postgres.create_start(branch))
 
+    tenant_id = uuid.UUID(env.initial_tenant)
+
+    def collect_metrics(message: str) -> List[BranchMetrics]:
+        with env.pageserver.http_client() as pageserver_http:
+            branch_details = [
+                pageserver_http.branch_detail(tenant_id=tenant_id, name=branch)
+                for branch in branches
+            ]
+        # All changes visible to pageserver (latest_valid_lsn) should be
+        # confirmed by safekeepers first. As we cannot atomically get
+        # state of both pageserver and safekeepers, we should start with
+        # pageserver. Looking at outdated data from pageserver is ok.
+        # Asking safekeepers first is not ok because new commits may arrive
+        # to both safekeepers and pageserver after we've already obtained
+        # safekeepers' state, and the two views would look contradictory.
+        sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
+
+        branch_metrics = []
+        with env.pageserver.http_client() as pageserver_http:
+            for branch_detail in branch_details:
+                timeline_id: str = branch_detail["timeline_id"]
+
+                m = BranchMetrics(
+                    name=branch_detail["name"],
+                    latest_valid_lsn=branch_detail["latest_valid_lsn"],
+                )
+                for sk_m in sk_metrics:
+                    m.flush_lsns.append(sk_m.flush_lsn_inexact[timeline_id])
+                    m.commit_lsns.append(sk_m.commit_lsn_inexact[timeline_id])
+
+                for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
+                    # Invariant. May be < when a transaction is in progress.
+                    assert commit_lsn <= flush_lsn
+                # We only call collect_metrics() after a transaction is confirmed by
+                # the compute node, which only happens after a consensus of safekeepers
+                # has confirmed the transaction. We assume majority consensus here.
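+                # "2 * count > num_safekeepers" is the integer-arithmetic form of
+                # "count > num_safekeepers / 2", i.e. a strict majority.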
+                assert (2 * sum(m.latest_valid_lsn <= lsn
+                                for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
+                assert (2 * sum(m.latest_valid_lsn <= lsn
+                                for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
+                branch_metrics.append(m)
+        log.info(f"{message}: {branch_metrics}")
+        return branch_metrics
+
+    # TODO: https://github.com/zenithdb/zenith/issues/809
+    # collect_metrics("before CREATE TABLE")
+
     # Do everything in different loops to have actions on different timelines
     # interleaved.
     # create schema
     for pg in pgs:
         pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
+    init_m = collect_metrics("after CREATE TABLE")
 
-    # Populate data
-    for pg in pgs:
+    # Populate data for 2/3 branches
+    class MetricsChecker(threading.Thread):
+        def __init__(self) -> None:
+            super().__init__(daemon=True)
+            self.should_stop = threading.Event()
+            self.exception: Optional[BaseException] = None
+
+        def run(self) -> None:
+            try:
+                while not self.should_stop.is_set():
+                    collect_metrics("during INSERT INTO")
+                    time.sleep(1)
+            except:
+                log.error("MetricsChecker's thread failed, the test will fail on the .stop() call",
+                          exc_info=True)
+                # We want to preserve the traceback as well as the exception
+                exc_type, exc_value, exc_tb = sys.exc_info()
+                assert exc_type
+                e = exc_type(exc_value)
+                e.__traceback__ = exc_tb
+                self.exception = e
+
+        def stop(self) -> None:
+            self.should_stop.set()
+            self.join()
+            if self.exception:
+                raise self.exception
+
+    metrics_checker = MetricsChecker()
+    metrics_checker.start()
+
+    for pg in pgs[:-1]:
         pg.safe_psql("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
 
-    # Check data
-    for pg in pgs:
+    metrics_checker.stop()
+
+    collect_metrics("after INSERT INTO")
+
+    # Check data for 2/3 branches
+    for pg in pgs[:-1]:
         res = pg.safe_psql("SELECT sum(key) FROM t")
         assert res[0] == (5000050000, )
 
+    final_m = collect_metrics("after SELECT")
+    # Assume that (a) LSNs behave similarly in all branches and (b) INSERT INTO advances the LSN significantly.
+    # Also assume that safekeepers will not be significantly out of sync in this test.
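+    # Branches 0 and 1 received the INSERTs, so their LSNs must cross the midpoint;
+    # branch 2 received none, so its LSNs must stay below it.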
+    middle_lsn = (init_m[0].latest_valid_lsn + final_m[0].latest_valid_lsn) // 2
+    assert max(init_m[0].flush_lsns) < middle_lsn < min(final_m[0].flush_lsns)
+    assert max(init_m[0].commit_lsns) < middle_lsn < min(final_m[0].commit_lsns)
+    assert max(init_m[1].flush_lsns) < middle_lsn < min(final_m[1].flush_lsns)
+    assert max(init_m[1].commit_lsns) < middle_lsn < min(final_m[1].commit_lsns)
+    assert max(init_m[2].flush_lsns) <= min(final_m[2].flush_lsns) < middle_lsn
+    assert max(init_m[2].commit_lsns) <= min(final_m[2].commit_lsns) < middle_lsn
+
 
 # Check that dead minority doesn't prevent the commits: execute insert n_inserts
 # times, with fault_probability chance of getting a wal acceptor down or up
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 0a32c5bffb..6330b28094 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from cached_property import cached_property
 import asyncpg
 import os
@@ -11,6 +11,7 @@ import jwt
 import json
 import psycopg2
 import pytest
+import re
 import shutil
 import socket
 import subprocess
@@ -715,7 +716,7 @@ class ZenithPageserver(PgProtocol):
     def __exit__(self, exc_type, exc, tb):
         self.stop(True)
 
-    def http_client(self, auth_token: Optional[str] = None):
+    def http_client(self, auth_token: Optional[str] = None) -> ZenithPageserverHttpClient:
         return ZenithPageserverHttpClient(
             port=self.service_port.http,
             auth_token=auth_token,
@@ -1084,7 +1085,7 @@ class Safekeeper:
         assert isinstance(res, dict)
         return res
 
-    def http_client(self):
+    def http_client(self) -> SafekeeperHttpClient:
         return SafekeeperHttpClient(port=self.port.http)
 
 
@@ -1094,6 +1095,14 @@ class SafekeeperTimelineStatus:
     flush_lsn: str
 
 
+@dataclass
+class SafekeeperMetrics:
+    # These are metrics from Prometheus which uses float64 internally.
+    # As a consequence, values may differ from real original int64s.
+    flush_lsn_inexact: Dict[str, int] = field(default_factory=dict)
+    commit_lsn_inexact: Dict[str, int] = field(default_factory=dict)
+
+
 class SafekeeperHttpClient(requests.Session):
     def __init__(self, port: int) -> None:
         super().__init__()
@@ -1109,6 +1118,22 @@ class SafekeeperHttpClient(requests.Session):
         return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
                                         flush_lsn=resj['flush_lsn'])
 
+    def get_metrics(self) -> SafekeeperMetrics:
+        request_result = self.get(f"http://localhost:{self.port}/metrics")
+        request_result.raise_for_status()
+        all_metrics_text = request_result.text
+
+        metrics = SafekeeperMetrics()
+        for match in re.finditer(r'^safekeeper_flush_lsn{ztli="([0-9a-f]+)"} (\S+)$',
+                                 all_metrics_text,
+                                 re.MULTILINE):
+            metrics.flush_lsn_inexact[match.group(1)] = int(match.group(2))
+        for match in re.finditer(r'^safekeeper_commit_lsn{ztli="([0-9a-f]+)"} (\S+)$',
+                                 all_metrics_text,
+                                 re.MULTILINE):
+            metrics.commit_lsn_inexact[match.group(1)] = int(match.group(2))
+        return metrics
+
 
 def get_test_output_dir(request: Any) -> str:
     """ Compute the working directory for an individual test. """