diff --git a/test_runner/conftest.py b/test_runner/conftest.py
index 1c36c1ed02..200c9c3740 100644
--- a/test_runner/conftest.py
+++ b/test_runner/conftest.py
@@ -1,6 +1,7 @@
 pytest_plugins = (
     "fixtures.pg_version",
     "fixtures.parametrize",
+    "fixtures.httpserver",
     "fixtures.neon_fixtures",
     "fixtures.benchmark_fixture",
     "fixtures.pg_stats",
diff --git a/test_runner/fixtures/httpserver.py b/test_runner/fixtures/httpserver.py
new file mode 100644
index 0000000000..a321d59266
--- /dev/null
+++ b/test_runner/fixtures/httpserver.py
@@ -0,0 +1,49 @@
+from typing import Tuple
+
+import pytest
+from pytest_httpserver import HTTPServer
+
+# TODO: mypy fails with:
+# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
+# from fixtures.neon_fixtures import PortDistributor
+
+# compared to the fixtures from pytest_httpserver with same names, these are
+# always function scoped, so you can check and stop the server in tests.
+
+
+@pytest.fixture(scope="function")
+def httpserver_ssl_context():
+    """Return None so the mock server is created without an SSL context."""
+    return None
+
+
+@pytest.fixture(scope="function")
+def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
+    """Start an HTTPServer on the requested address; clear and stop it on teardown."""
+    host, port = httpserver_listen_address
+    if not host:
+        host = HTTPServer.DEFAULT_LISTEN_HOST
+    if not port:
+        port = HTTPServer.DEFAULT_LISTEN_PORT
+
+    server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
+    server.start()
+    yield server
+    server.clear()
+    if server.is_running():
+        server.stop()
+
+
+@pytest.fixture(scope="function")
+def httpserver(make_httpserver):
+    """Yield the server started by make_httpserver and clear it after the test."""
+    server = make_httpserver
+    yield server
+    server.clear()
+
+
+@pytest.fixture(scope="function")
+def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
+    """Return ("localhost", port) with a port taken from port_distributor."""
+    port = port_distributor.get_port()
+    return ("localhost", port)
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 5a1acec9da..c7759d9b44 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
     return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
 
 
-@pytest.fixture(scope="session")
-def httpserver_listen_address(port_distributor: PortDistributor):
-    port = port_distributor.get_port()
-    return ("localhost", port)
-
-
 @pytest.fixture(scope="function")
 def default_broker(
     port_distributor: PortDistributor,
diff --git a/test_runner/regress/test_ddl_forwarding.py b/test_runner/regress/test_ddl_forwarding.py
index ebd836ecbc..740e489759 100644
--- a/test_runner/regress/test_ddl_forwarding.py
+++ b/test_runner/regress/test_ddl_forwarding.py
@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
         raise ValueError("Invalid op")
 
 
-fail = False
-
-
-def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
+def ddl_forward_handler(
+    request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
+) -> Response:
     log.info(f"Received request with data {request.get_data(as_text=True)}")
-    if fail:
+    if ddl.fail:
         log.info("FAILING")
         return Response(status=500, response="Failed just cuz")
     if request.json is None:
@@ -72,6 +71,7 @@ class DdlForwardingContext:
         self.port = port
         self.dbs: Dict[str, str] = {}
         self.roles: Dict[str, str] = {}
+        self.fail = False
         endpoint = "/management/api/v2/roles_and_databases"
         ddl_url = f"http://{host}:{port}{endpoint}"
         self.pg.configure(
@@ -82,7 +82,7 @@ class DdlForwardingContext:
         )
         log.info(f"Listening on {ddl_url}")
         self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
-            lambda request: ddl_forward_handler(request, self.dbs, self.roles)
+            lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
         )
 
     def __enter__(self):
@@ -103,6 +103,10 @@ class DdlForwardingContext:
     def wait(self, timeout=3):
         self.server.wait(timeout=timeout)
 
+    def failures(self, enabled: bool) -> None:
+        """Make the mock DDL handler answer with HTTP 500 while enabled is true."""
+        self.fail = enabled
+
     def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
         res = self.send(query)
         self.wait(timeout=timeout)
@@ -203,9 +207,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
     assert ddl.dbs == {"stork": "cork"}
 
     with pytest.raises(psycopg2.InternalError):
-        global fail
-        fail = True
+        ddl.failures(True)
         cur.execute("CREATE DATABASE failure WITH OWNER=cork")
         ddl.wait()
 
+    ddl.failures(False)
     conn.close()
diff --git a/test_runner/regress/test_gc_aggressive.py b/test_runner/regress/test_gc_aggressive.py
index db0020a8a4..017a38f85c 100644
--- a/test_runner/regress/test_gc_aggressive.py
+++ b/test_runner/regress/test_gc_aggressive.py
@@ -15,45 +15,45 @@ from fixtures.types import TimelineId
 
 # Test configuration
 #
-# Create a table with {num_rows} rows, and perform {updates_to_perform} random
-# UPDATEs on it, using {num_connections} separate connections.
-num_connections = 10
-num_rows = 100000
-updates_to_perform = 10000
-
-updates_performed = 0
-
-
-# Run random UPDATEs on test table
-async def update_table(endpoint: Endpoint):
-    global updates_performed
-    pg_conn = await endpoint.connect_async()
-
-    while updates_performed < updates_to_perform:
-        updates_performed += 1
-        id = random.randrange(1, num_rows)
-        await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
-
-
-# Perform aggressive GC with 0 horizon
-async def gc(env: NeonEnv, timeline: TimelineId):
-    pageserver_http = env.pageserver.http_client()
-
-    loop = asyncio.get_running_loop()
-
-    def do_gc():
-        pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
-        pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
-
-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        while updates_performed < updates_to_perform:
-            await loop.run_in_executor(pool, do_gc)
+# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
+# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
+NUM_CONNECTIONS = 10
+NUM_ROWS = 100000
+UPDATES_TO_PERFORM = 10000
 
 
 # At the same time, run UPDATEs and GC
 async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
     workers = []
-    for _ in range(num_connections):
+    updates_performed = 0
+
+    # Perform aggressive GC with 0 horizon
+    async def gc(env: NeonEnv, timeline: TimelineId):
+        pageserver_http = env.pageserver.http_client()
+        # updates_performed and UPDATES_TO_PERFORM are only read here, so no
+        # nonlocal/global declarations are needed
+
+        loop = asyncio.get_running_loop()
+
+        def do_gc():
+            pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
+            pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
+
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            while updates_performed < UPDATES_TO_PERFORM:
+                await loop.run_in_executor(pool, do_gc)
+
+    # Run random UPDATEs on test table
+    async def update_table(endpoint: Endpoint):
+        pg_conn = await endpoint.connect_async()
+        nonlocal updates_performed
+
+        while updates_performed < UPDATES_TO_PERFORM:
+            updates_performed += 1
+            row_id = random.randrange(1, NUM_ROWS)
+            await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {row_id}")
+
+    for _ in range(NUM_CONNECTIONS):
         workers.append(asyncio.create_task(update_table(endpoint)))
     workers.append(asyncio.create_task(gc(env, timeline)))
 
@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
             f"""
             INSERT INTO foo
                 SELECT g, 0, 'long string to consume some space' || g
-                FROM generate_series(1, {num_rows}) g
+                FROM generate_series(1, {NUM_ROWS}) g
         """
         )
         cur.execute("CREATE INDEX ON foo(id)")
@@ -91,7 +91,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
         cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
         r = cur.fetchone()
         assert r is not None
-        assert r == (num_rows, updates_to_perform)
+        assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
 
 
 #
@@ -99,6 +99,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
 def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
     # Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
     neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
+    num_index_uploads = 0
 
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
 
@@ -160,5 +161,5 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
         log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
 
     after = num_index_uploads
-    log.info(f"{after-before} new index uploads during test")
+    log.info(f"{after - before} new index uploads during test")
     assert after - before < 5
diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py
index 9ec649ee24..821c2ae683 100644
--- a/test_runner/regress/test_metric_collection.py
+++ b/test_runner/regress/test_metric_collection.py
@@ -3,9 +3,10 @@
 # Use mock HTTP server to receive metrics and verify that they look sane.
 #
 
-import time
+import json
 from pathlib import Path
-from typing import Iterator
+from queue import SimpleQueue
+from typing import Any, Iterator, Set
 
 import pytest
 from fixtures.log_helper import log
@@ -18,56 +19,10 @@ from fixtures.neon_fixtures import (
 )
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import RemoteStorageKind
-from fixtures.types import TenantId
 from pytest_httpserver import HTTPServer
 from werkzeug.wrappers.request import Request
 from werkzeug.wrappers.response import Response
 
-# ==============================================================================
-# Storage metrics tests
-# ==============================================================================
-
-initial_tenant = TenantId.generate()
-remote_uploaded = 0
-checks = {
-    "written_size": lambda value: value > 0,
-    "resident_size": lambda value: value >= 0,
-    # >= 0 check here is to avoid race condition when we receive metrics before
-    # remote_uploaded is updated
-    "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
-    # logical size may lag behind the actual size, so allow 0 here
-    "timeline_logical_size": lambda value: value >= 0,
-}
-
-metric_kinds_checked = set([])
-
-
-#
-# verify that metrics look minilally sane
-#
-def metrics_handler(request: Request) -> Response:
-    if request.json is None:
-        return Response(status=400)
-
-    events = request.json["events"]
-    log.info("received events:")
-    log.info(events)
-
-    for event in events:
-        assert event["tenant_id"] == str(
-            initial_tenant
-        ), "Expecting metrics only from the initial tenant"
-        metric_name = event["metric"]
-
-        check = checks.get(metric_name)
-        # calm down mypy
-        if check is not None:
-            assert check(event["value"]), f"{metric_name} isn't valid"
-            global metric_kinds_checked
-            metric_kinds_checked.add(metric_name)
-
-    return Response(status=200)
-
 
 @pytest.mark.parametrize(
     "remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
@@ -81,6 +36,18 @@ def test_metric_collection(
     (host, port) = httpserver_listen_address
     metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
 
+    metric_kinds_checked: Set[str] = set()
+
+    uploads: SimpleQueue[Any] = SimpleQueue()
+
+    def metrics_handler(request: Request) -> Response:
+        if request.json is None:
+            return Response(status=400)
+
+        events = request.json["events"]
+        uploads.put(events)
+        return Response(status=200)
+
     # Require collecting metrics frequently, since we change
     # the timeline and want something to be logged about it.
     #
@@ -90,6 +57,7 @@ def test_metric_collection(
         f"""
         metric_collection_interval="1s"
         metric_collection_endpoint="{metric_collection_endpoint}"
+        cached_metric_collection_interval="0s"
     """
         + "tenant_config={pitr_interval = '0 sec'}"
     )
@@ -98,9 +66,6 @@ def test_metric_collection(
 
     log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
 
-    # Set initial tenant of the test, that we expect the logs from
-    global initial_tenant
-    initial_tenant = neon_env_builder.initial_tenant
     # mock http server that returns OK for the metrics
     httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
         metrics_handler
@@ -108,8 +73,7 @@ def test_metric_collection(
 
     # spin up neon, after http server is ready
     env = neon_env_builder.init_start()
-    # Order of fixtures shutdown is not specified, and if http server gets down
-    # before pageserver, pageserver log might contain such errors in the end.
+    # httpserver is shut down before pageserver during passing run
     env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
     tenant_id = env.initial_tenant
     timeline_id = env.neon_cli.create_branch("test_metric_collection")
@@ -141,29 +105,73 @@ def test_metric_collection(
             total += sample[2]
         return int(total)
 
+    remote_uploaded = 0
+
     # upload some data to remote storage
     if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
         wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
         pageserver_http = env.pageserver.http_client()
         pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
         pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
-        global remote_uploaded
+
         remote_uploaded = get_num_remote_ops("index", "upload")
         assert remote_uploaded > 0
 
-    # wait longer than collecting interval and check that all requests are served
-    time.sleep(3)
-    httpserver.check()
-    global metric_kinds_checked, checks
+    # we expect uploads at 1Hz, but on busy runners that could be too optimistic,
+    # so wait up to 5s for each one. we only inspect the first upload after "ready".
+    # later tests will be added to ensure that the timeseries are sane.
+    timeout = 5
+    uploads.put("ready")
+
+    while True:
+        # discard earlier than "ready"
+        log.info("waiting for upload")
+        events = uploads.get(timeout=timeout)
+
+        if events == "ready":
+            events = uploads.get(timeout=timeout)
+            httpserver.check()
+            httpserver.stop()
+            # if anything comes after this, we'll just ignore it
+            stringified = json.dumps(events, indent=2)
+            log.info(f"inspecting: {stringified}")
+            break
+        else:
+            stringified = json.dumps(events, indent=2)
+            log.info(f"discarding: {stringified}")
+
+    # verify that metrics look minimally sane
+    checks = {
+        "written_size": lambda value: value > 0,
+        "resident_size": lambda value: value >= 0,
+        "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
+        # logical size may lag behind the actual size, so allow 0 here
+        "timeline_logical_size": lambda value: value >= 0,
+        # this can also be zero, depending on when we get the value
+        "written_data_bytes_delta": lambda value: value >= 0,
+    }
+
+    metric_kinds_checked = set()
+    metric_kinds_seen = set()
+
+    for event in events:
+        assert event["tenant_id"] == str(tenant_id)
+        metric_name = event["metric"]
+        metric_kinds_seen.add(metric_name)
+
+        check = checks.get(metric_name)
+        # calm down mypy
+        if check is not None:
+            value = event["value"]
+            log.info(f"checking {metric_name} value {value}")
+            assert check(value), f"{metric_name} isn't valid"
+            metric_kinds_checked.add(metric_name)
+
     expected_checks = set(checks.keys())
-    assert len(metric_kinds_checked) == len(
-        checks
+    assert (
+        metric_kinds_checked == checks.keys()
     ), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
-
-
-# ==============================================================================
-# Proxy metrics tests
-# ==============================================================================
+    assert metric_kinds_seen == metric_kinds_checked
 
 
 def proxy_metrics_handler(request: Request) -> Response:
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index fd9f9d631d..49439c5fab 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -301,6 +301,7 @@ def test_ondemand_download_timetravel(
         # they are present only in the remote storage, only locally, or both.
         # It should not change.
         assert filled_current_physical == get_api_current_physical_size()
+        endpoint_old.stop()
 
 
 #
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index cdf72edc4d..b34b2f95b5 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -119,65 +119,6 @@ def test_tenant_reattach(
 
 num_connections = 10
 num_rows = 100000
-updates_to_perform = 0
-
-updates_started = 0
-updates_finished = 0
-
-
-# Run random UPDATEs on test table. On failure, try again.
-async def update_table(pg_conn: asyncpg.Connection):
-    global updates_started, updates_finished, updates_to_perform
-
-    while updates_started < updates_to_perform or updates_to_perform == 0:
-        updates_started += 1
-        id = random.randrange(1, num_rows)
-
-        # Loop to retry until the UPDATE succeeds
-        while True:
-            try:
-                await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
-                updates_finished += 1
-                if updates_finished % 1000 == 0:
-                    log.info(f"update {updates_finished} / {updates_to_perform}")
-                break
-            except asyncpg.PostgresError as e:
-                # Received error from Postgres. Log it, sleep a little, and continue
-                log.info(f"UPDATE error: {e}")
-                await asyncio.sleep(0.1)
-
-
-async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
-    global updates_started, updates_finished, updates_to_perform
-
-    # Wait until we have performed some updates
-    wait_until(20, 0.5, lambda: updates_finished > 500)
-
-    log.info("Detaching tenant")
-    pageserver_http.tenant_detach(tenant_id)
-    await asyncio.sleep(1)
-    log.info("Re-attaching tenant")
-    pageserver_http.tenant_attach(tenant_id)
-    log.info("Re-attach finished")
-
-    # Continue with 5000 more updates
-    updates_to_perform = updates_started + 5000
-
-
-# async guts of test_tenant_reattach_while_bysy test
-async def reattach_while_busy(
-    env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
-):
-    workers = []
-    for _ in range(num_connections):
-        pg_conn = await endpoint.connect_async()
-        workers.append(asyncio.create_task(update_table(pg_conn)))
-
-    workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
-    await asyncio.gather(*workers)
-
-    assert updates_finished == updates_to_perform
-
 
 # Detach and re-attach tenant, while compute is busy running queries.
 #
@@ -226,6 +167,62 @@ def test_tenant_reattach_while_busy(
     neon_env_builder: NeonEnvBuilder,
     remote_storage_kind: RemoteStorageKind,
 ):
+    updates_started = 0
+    updates_finished = 0
+    updates_to_perform = 0
+
+    # Run random UPDATEs on test table. On failure, try again.
+    async def update_table(pg_conn: asyncpg.Connection):
+        nonlocal updates_started, updates_finished, updates_to_perform
+
+        while updates_started < updates_to_perform or updates_to_perform == 0:
+            updates_started += 1
+            row_id = random.randrange(1, num_rows)
+
+            # Loop to retry until the UPDATE succeeds
+            while True:
+                try:
+                    await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {row_id}")
+                    updates_finished += 1
+                    if updates_finished % 1000 == 0:
+                        log.info(f"update {updates_finished} / {updates_to_perform}")
+                    break
+                except asyncpg.PostgresError as e:
+                    # Received error from Postgres. Log it, sleep a little, and continue
+                    log.info(f"UPDATE error: {e}")
+                    await asyncio.sleep(0.1)
+
+    async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
+        nonlocal updates_started, updates_finished, updates_to_perform
+
+        # Wait until we have performed some updates
+        wait_until(20, 0.5, lambda: updates_finished > 500)
+
+        log.info("Detaching tenant")
+        pageserver_http.tenant_detach(tenant_id)
+        await asyncio.sleep(1)
+        log.info("Re-attaching tenant")
+        pageserver_http.tenant_attach(tenant_id)
+        log.info("Re-attach finished")
+
+        # Continue with 5000 more updates
+        updates_to_perform = updates_started + 5000
+
+    # async guts of test_tenant_reattach_while_busy test
+    async def reattach_while_busy(
+        env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
+    ):
+        nonlocal updates_to_perform, updates_finished
+        workers = []
+        for _ in range(num_connections):
+            pg_conn = await endpoint.connect_async()
+            workers.append(asyncio.create_task(update_table(pg_conn)))
+
+        workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
+        await asyncio.gather(*workers)
+
+        assert updates_finished == updates_to_perform
+
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
 
     env = neon_env_builder.init_start()
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index b080caca33..8199f5777b 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -14,6 +14,8 @@ from pathlib import Path
 from typing import Any, List, Optional
 
 import psycopg2
+import psycopg2.errors
+import psycopg2.extras
 import pytest
 from fixtures.broker import NeonBroker
 from fixtures.log_helper import log
@@ -260,7 +262,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
             else:
                 failed_node.start()
                 failed_node = None
-    assert query_scalar(cur, "SELECT sum(key) FROM t") == 500500
+    assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2
 
 
 # Test that safekeepers push their info to the broker and learn peer status from it