refactor: globals in tests (#5298)

Refactor tests to have less globals.

This will allow to hopefully write more complex tests for our new metric
collection requirements in #5297. Includes reverted work from #4761
related to test globals.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: MMeent <matthias@neon.tech>
This commit is contained in:
Joonas Koivunen
2023-09-13 22:05:30 +03:00
committed by GitHub
parent 1697e7b319
commit ffd146c3e5
9 changed files with 228 additions and 176 deletions

View File

@@ -1,6 +1,7 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -0,0 +1,45 @@
from typing import Tuple
import pytest
from pytest_httpserver import HTTPServer
# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor
# compared to the fixtures from pytest_httpserver with same names, these are
# always function scoped, so you can check and stop the server in tests.
@pytest.fixture(scope="function")
def httpserver_ssl_context():
return None
@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
host, port = httpserver_listen_address
if not host:
host = HTTPServer.DEFAULT_LISTEN_HOST
if not port:
port = HTTPServer.DEFAULT_LISTEN_PORT
server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
server.start()
yield server
server.clear()
if server.is_running():
server.stop()
@pytest.fixture(scope="function")
def httpserver(make_httpserver):
server = make_httpserver
yield server
server.clear()
@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
port = port_distributor.get_port()
return ("localhost", port)

View File

@@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,

View File

@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
def ddl_forward_handler(
request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
if ddl.fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
@@ -72,6 +71,7 @@ class DdlForwardingContext:
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
self.fail = False
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
@@ -82,7 +82,7 @@ class DdlForwardingContext:
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
)
def __enter__(self):
@@ -103,6 +103,9 @@ class DdlForwardingContext:
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def failures(self, bool):
self.fail = bool
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
@@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
ddl.failures(True)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
ddl.failures(False)
conn.close()

View File

@@ -15,45 +15,45 @@ from fixtures.types import TimelineId
# Test configuration
#
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
# UPDATEs on it, using {num_connections} separate connections.
num_connections = 10
num_rows = 100000
updates_to_perform = 10000
updates_performed = 0
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await endpoint.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
id = random.randrange(1, num_rows)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(pool, do_gc)
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for _ in range(num_connections):
updates_performed = 0
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
nonlocal updates_performed
global UPDATES_TO_PERFORM
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < UPDATES_TO_PERFORM:
await loop.run_in_executor(pool, do_gc)
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
pg_conn = await endpoint.connect_async()
nonlocal updates_performed
while updates_performed < UPDATES_TO_PERFORM:
updates_performed += 1
id = random.randrange(1, NUM_ROWS)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
for _ in range(NUM_CONNECTIONS):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
f"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
FROM generate_series(1, {NUM_ROWS}) g
"""
)
cur.execute("CREATE INDEX ON foo(id)")
@@ -91,7 +91,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)
assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
#
@@ -99,6 +99,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
num_index_uploads = 0
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -160,5 +161,5 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
after = num_index_uploads
log.info(f"{after-before} new index uploads during test")
log.info(f"{after - before} new index uploads during test")
assert after - before < 5

View File

@@ -3,9 +3,9 @@
# Use mock HTTP server to receive metrics and verify that they look sane.
#
import time
from pathlib import Path
from typing import Iterator
from queue import SimpleQueue
from typing import Any, Iterator, Set
import pytest
from fixtures.log_helper import log
@@ -18,56 +18,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# ==============================================================================
# Storage metrics tests
# ==============================================================================
initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
# >= 0 check here is to avoid race condition when we receive metrics before
# remote_uploaded is updated
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
}
metric_kinds_checked = set([])
#
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
for event in events:
assert event["tenant_id"] == str(
initial_tenant
), "Expecting metrics only from the initial tenant"
metric_name = event["metric"]
check = checks.get(metric_name)
# calm down mypy
if check is not None:
assert check(event["value"]), f"{metric_name} isn't valid"
global metric_kinds_checked
metric_kinds_checked.add(metric_name)
return Response(status=200)
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
@@ -81,6 +35,18 @@ def test_metric_collection(
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_kinds_checked: Set[str] = set([])
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
uploads.put(events)
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
@@ -90,6 +56,7 @@ def test_metric_collection(
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
@@ -98,9 +65,6 @@ def test_metric_collection(
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# Set initial tenant of the test, that we expect the logs from
global initial_tenant
initial_tenant = neon_env_builder.initial_tenant
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
@@ -108,8 +72,7 @@ def test_metric_collection(
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
# Order of fixtures shutdown is not specified, and if http server gets down
# before pageserver, pageserver log might contain such errors in the end.
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
@@ -141,29 +104,74 @@ def test_metric_collection(
total += sample[2]
return int(total)
remote_uploaded = 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
global remote_uploaded
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# wait longer than collecting interval and check that all requests are served
time.sleep(3)
httpserver.check()
global metric_kinds_checked, checks
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
# later tests will be added to ensure that the timeseries are sane.
timeout = 5
uploads.put("ready")
while True:
# discard earlier than "ready"
log.info("waiting for upload")
events = uploads.get(timeout=timeout)
import json
if events == "ready":
events = uploads.get(timeout=timeout)
httpserver.check()
httpserver.stop()
# if anything comes after this, we'll just ignore it
stringified = json.dumps(events, indent=2)
log.info(f"inspecting: {stringified}")
break
else:
stringified = json.dumps(events, indent=2)
log.info(f"discarding: {stringified}")
# verify that metrics look minimally sane
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
# this can also be zero, depending on when we get the value
"written_data_bytes_delta": lambda value: value >= 0,
}
metric_kinds_checked = set()
metric_kinds_seen = set()
for event in events:
assert event["tenant_id"] == str(tenant_id)
metric_name = event["metric"]
metric_kinds_seen.add(metric_name)
check = checks.get(metric_name)
# calm down mypy
if check is not None:
value = event["value"]
log.info(f"checking {metric_name} value {value}")
assert check(value), f"{metric_name} isn't valid"
metric_kinds_checked.add(metric_name)
expected_checks = set(checks.keys())
assert len(metric_kinds_checked) == len(
checks
assert (
metric_kinds_checked == checks.keys()
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
# ==============================================================================
# Proxy metrics tests
# ==============================================================================
assert metric_kinds_seen == metric_kinds_checked
def proxy_metrics_handler(request: Request) -> Response:

View File

@@ -301,6 +301,7 @@ def test_ondemand_download_timetravel(
# they are present only in the remote storage, only locally, or both.
# It should not change.
assert filled_current_physical == get_api_current_physical_size()
endpoint_old.stop()
#

View File

@@ -119,65 +119,6 @@ def test_tenant_reattach(
num_connections = 10
num_rows = 100000
updates_to_perform = 0
updates_started = 0
updates_finished = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
global updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
global updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
# Detach and re-attach tenant, while compute is busy running queries.
#
@@ -226,6 +167,62 @@ def test_tenant_reattach_while_busy(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
updates_started = 0
updates_finished = 0
updates_to_perform = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
nonlocal updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
nonlocal updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
nonlocal updates_to_perform, updates_finished
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()

View File

@@ -14,6 +14,8 @@ from pathlib import Path
from typing import Any, List, Optional
import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -260,7 +262,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
else:
failed_node.start()
failed_node = None
assert query_scalar(cur, "SELECT sum(key) FROM t") == 500500
assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2
# Test that safekeepers push their info to the broker and learn peer status from it