mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Add a fixture for setting up neon_env and metrics server
Signed-off-by: Rahul Modpur <rmodpur2@gmail.com>
This commit is contained in:
committed by
Joonas Koivunen
parent
1ab0cfc8cb
commit
5cf2b8de01
@@ -18,6 +18,7 @@ from datetime import datetime
|
||||
from functools import cached_property
|
||||
from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, cast
|
||||
from urllib.parse import urlparse
|
||||
@@ -36,8 +37,11 @@ from _pytest.fixtures import FixtureRequest
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from psycopg2.extensions import cursor as PgCursor
|
||||
from psycopg2.extensions import make_dsn, parse_dsn
|
||||
from pytest_httpserver import HTTPServer
|
||||
from typing_extensions import Literal
|
||||
from urllib3.util.retry import Retry
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
@@ -1005,6 +1009,70 @@ def neon_env_builder(
|
||||
yield builder
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_and_metrics_server(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
) -> Iterator[Tuple[NeonEnv, SimpleQueue[Any]]]:
|
||||
"""
|
||||
Fixture to create a Neon environment and metrics server.
|
||||
"""
|
||||
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
yield (env, uploads)
|
||||
|
||||
httpserver.check()
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverPort:
|
||||
pg: int
|
||||
|
||||
@@ -3,75 +3,18 @@ import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, Set
|
||||
from typing import Any, Dict, Set, Tuple
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
NeonEnv,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
|
||||
|
||||
|
||||
def test_metric_collection(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
def test_metric_collection(neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]):
|
||||
(env, uploads) = neon_env_and_metrics_server
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
@@ -164,63 +107,13 @@ def test_metric_collection(
|
||||
(events, is_last) = events
|
||||
v.ingest(events, is_last)
|
||||
|
||||
httpserver.check()
|
||||
|
||||
|
||||
def test_metric_collection_cleans_up_tempfile(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
neon_env_and_metrics_server: Tuple[NeonEnv, SimpleQueue[Any]]
|
||||
):
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
|
||||
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
|
||||
uploads: SimpleQueue[Any] = SimpleQueue()
|
||||
|
||||
def metrics_handler(request: Request) -> Response:
|
||||
if request.json is None:
|
||||
return Response(status=400)
|
||||
|
||||
events = request.json["events"]
|
||||
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
|
||||
assert is_last in ["true", "false"]
|
||||
uploads.put((events, is_last == "true"))
|
||||
return Response(status=200)
|
||||
|
||||
# Require collecting metrics frequently, since we change
|
||||
# the timeline and want something to be logged about it.
|
||||
#
|
||||
# Disable time-based pitr, we will use the manual GC calls
|
||||
# to trigger remote storage operations in a controlled way
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
metric_collection_interval="1s"
|
||||
metric_collection_endpoint="{metric_collection_endpoint}"
|
||||
cached_metric_collection_interval="0s"
|
||||
synthetic_size_calculation_interval="3s"
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# mock http server that returns OK for the metrics
|
||||
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
|
||||
metrics_handler
|
||||
)
|
||||
|
||||
# spin up neon, after http server is ready
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
|
||||
(env, uploads) = neon_env_and_metrics_server
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# httpserver is shut down before pageserver during passing run
|
||||
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
|
||||
# we have a fast rate of calculation, these can happen at shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker:calculate_synthetic_size.*:gather_size_inputs.*: failed to calculate logical size at .*: cancelled.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*synthetic_size_worker: failed to calculate synthetic size for tenant .*: failed to calculate some logical_sizes"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
Reference in New Issue
Block a user