mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
Merge branch 'main' into skyzh/rm-file-if-fail
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import Any, List, MutableMapping, cast
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
@@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]):
|
||||
# Rerun 3 times = 1 original run + 2 reruns
|
||||
log.info(f"Marking {item.nodeid} as flaky. It will be rerun up to 3 times")
|
||||
item.add_marker(pytest.mark.flaky(reruns=2))
|
||||
|
||||
# pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns),
|
||||
# we can workaround it by setting `timeout_func_only` to True[1].
|
||||
# Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2],
|
||||
# but we still can do it using pytest marker.
|
||||
#
|
||||
# - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99
|
||||
# - [2] https://github.com/pytest-dev/pytest-timeout/issues/142
|
||||
timeout_marker = item.get_closest_marker("timeout")
|
||||
if timeout_marker is not None:
|
||||
kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs)
|
||||
kwargs["func_only"] = True
|
||||
|
||||
@@ -57,20 +57,28 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"libmetrics_launch_timestamp",
|
||||
"libmetrics_build_info",
|
||||
"libmetrics_tracing_event_count_total",
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"pageserver_materialized_cache_hits_direct_total",
|
||||
"pageserver_getpage_reconstruct_seconds_bucket",
|
||||
"pageserver_getpage_reconstruct_seconds_count",
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_current_logical_size",
|
||||
"pageserver_resident_physical_size",
|
||||
"pageserver_getpage_reconstruct_seconds_bucket",
|
||||
"pageserver_getpage_reconstruct_seconds_count",
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_bucket",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_count",
|
||||
"pageserver_getpage_get_reconstruct_data_seconds_sum",
|
||||
"pageserver_io_operations_bytes_total",
|
||||
"pageserver_io_operations_seconds_bucket",
|
||||
"pageserver_io_operations_seconds_count",
|
||||
"pageserver_io_operations_seconds_sum",
|
||||
"pageserver_last_record_lsn",
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"pageserver_read_num_fs_layers_bucket",
|
||||
"pageserver_read_num_fs_layers_count",
|
||||
"pageserver_read_num_fs_layers_sum",
|
||||
"pageserver_smgr_query_seconds_bucket",
|
||||
"pageserver_smgr_query_seconds_count",
|
||||
"pageserver_smgr_query_seconds_sum",
|
||||
|
||||
@@ -629,7 +629,7 @@ class NeonEnvBuilder:
|
||||
assert self.env is not None, "environment is not already initialized, call init() first"
|
||||
self.env.start()
|
||||
|
||||
def init_start(self) -> NeonEnv:
|
||||
def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> NeonEnv:
|
||||
env = self.init_configs()
|
||||
self.start()
|
||||
|
||||
@@ -638,7 +638,9 @@ class NeonEnvBuilder:
|
||||
log.info(
|
||||
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
|
||||
)
|
||||
initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant)
|
||||
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
|
||||
tenant_id=env.initial_tenant, conf=initial_tenant_conf
|
||||
)
|
||||
env.initial_timeline = initial_timeline
|
||||
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
|
||||
|
||||
@@ -661,6 +663,8 @@ class NeonEnvBuilder:
|
||||
else:
|
||||
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
|
||||
|
||||
self.remote_storage_kind = remote_storage_kind
|
||||
|
||||
def enable_local_fs_remote_storage(self, force_enable: bool = True):
|
||||
"""
|
||||
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
|
||||
@@ -1444,11 +1448,12 @@ class NeonCli(AbstractNeonCli):
|
||||
def endpoint_create(
|
||||
self,
|
||||
branch_name: str,
|
||||
pg_port: int,
|
||||
http_port: int,
|
||||
endpoint_id: Optional[str] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
port: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1462,8 +1467,10 @@ class NeonCli(AbstractNeonCli):
|
||||
]
|
||||
if lsn is not None:
|
||||
args.extend(["--lsn", str(lsn)])
|
||||
if port is not None:
|
||||
args.extend(["--port", str(port)])
|
||||
if pg_port is not None:
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
if http_port is not None:
|
||||
args.extend(["--http-port", str(http_port)])
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
if hot_standby:
|
||||
@@ -1476,9 +1483,11 @@ class NeonCli(AbstractNeonCli):
|
||||
def endpoint_start(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
pg_port: int,
|
||||
http_port: int,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
port: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1490,8 +1499,10 @@ class NeonCli(AbstractNeonCli):
|
||||
]
|
||||
if lsn is not None:
|
||||
args.append(f"--lsn={lsn}")
|
||||
if port is not None:
|
||||
args.append(f"--port={port}")
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
args.extend(["--http-port", str(http_port)])
|
||||
if safekeepers is not None:
|
||||
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
@@ -1583,13 +1594,11 @@ class NeonPageserver(PgProtocol):
|
||||
".*serving compute connection task.*exited with error: Postgres connection error.*",
|
||||
".*serving compute connection task.*exited with error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres query error.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Broken pipe.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Transport endpoint is not connected.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Connection reset by peer.*",
|
||||
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
|
||||
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
|
||||
".*Connection aborted: connection error: unexpected message from server*",
|
||||
".*Connection aborted: unexpected message from server*",
|
||||
".*kill_and_wait_impl.*: wait successful.*",
|
||||
".*Replication stream finished: db error:.*ending streaming to Some*",
|
||||
".*: db error:.*ending streaming to Some.*",
|
||||
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
|
||||
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
|
||||
# safekeeper connection can fail with this, in the window between timeline creation
|
||||
@@ -1606,13 +1615,12 @@ class NeonPageserver(PgProtocol):
|
||||
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
|
||||
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
|
||||
".*Removing intermediate uninit mark file.*",
|
||||
# FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885
|
||||
".*sender is dropped while join handle is still alive.*",
|
||||
# Tenant::delete_timeline() can cause any of the four following errors.
|
||||
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
|
||||
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
|
||||
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
|
||||
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
|
||||
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
|
||||
".*compaction_loop.*Compaction failed, retrying in.*timeline is Stopping", # When compaction checks timeline state after acquiring layer_removal_cs
|
||||
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
|
||||
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
|
||||
@@ -1621,6 +1629,10 @@ class NeonPageserver(PgProtocol):
|
||||
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
# these can happen anytime we do compactions from background task and shutdown pageserver
|
||||
r".*ERROR.*ancestor timeline \S+ is being stopped",
|
||||
# this is expected given our collaborative shutdown approach for the UploadQueue
|
||||
".*Compaction failed, retrying in .*: queue is in state Stopped.*",
|
||||
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
|
||||
".*Error processing HTTP request: NotFound: Timeline .* was not found",
|
||||
]
|
||||
|
||||
def start(
|
||||
@@ -1688,6 +1700,9 @@ class NeonPageserver(PgProtocol):
|
||||
else:
|
||||
errors.append(line)
|
||||
|
||||
for error in errors:
|
||||
log.info(f"not allowed error: {error.strip()}")
|
||||
|
||||
assert not errors
|
||||
|
||||
def log_contains(self, pattern: str) -> Optional[str]:
|
||||
@@ -2280,17 +2295,24 @@ class Endpoint(PgProtocol):
|
||||
"""An object representing a Postgres compute endpoint managed by the control plane."""
|
||||
|
||||
def __init__(
|
||||
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
|
||||
self,
|
||||
env: NeonEnv,
|
||||
tenant_id: TenantId,
|
||||
pg_port: int,
|
||||
http_port: int,
|
||||
check_stop_result: bool = True,
|
||||
):
|
||||
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
|
||||
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.branch_name: Optional[str] = None # dubious
|
||||
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
||||
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
||||
self.tenant_id = tenant_id
|
||||
self.port = port
|
||||
self.pg_port = pg_port
|
||||
self.http_port = http_port
|
||||
self.check_stop_result = check_stop_result
|
||||
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
|
||||
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
|
||||
|
||||
def create(
|
||||
@@ -2320,7 +2342,8 @@ class Endpoint(PgProtocol):
|
||||
tenant_id=self.tenant_id,
|
||||
lsn=lsn,
|
||||
hot_standby=hot_standby,
|
||||
port=self.port,
|
||||
pg_port=self.pg_port,
|
||||
http_port=self.http_port,
|
||||
)
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
@@ -2345,7 +2368,13 @@ class Endpoint(PgProtocol):
|
||||
|
||||
log.info(f"Starting postgres endpoint {self.endpoint_id}")
|
||||
|
||||
self.env.neon_cli.endpoint_start(self.endpoint_id, tenant_id=self.tenant_id, port=self.port)
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
pg_port=self.pg_port,
|
||||
http_port=self.http_port,
|
||||
tenant_id=self.tenant_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
)
|
||||
self.running = True
|
||||
|
||||
return self
|
||||
@@ -2369,32 +2398,8 @@ class Endpoint(PgProtocol):
|
||||
return os.path.join(self.pg_data_dir_path(), "pg_twophase")
|
||||
|
||||
def config_file_path(self) -> str:
|
||||
"""Path to postgresql.conf"""
|
||||
return os.path.join(self.pg_data_dir_path(), "postgresql.conf")
|
||||
|
||||
def adjust_for_safekeepers(self, safekeepers: str) -> "Endpoint":
|
||||
"""
|
||||
Adjust instance config for working with wal acceptors instead of
|
||||
pageserver (pre-configured by CLI) directly.
|
||||
"""
|
||||
|
||||
# TODO: reuse config()
|
||||
with open(self.config_file_path(), "r") as f:
|
||||
cfg_lines = f.readlines()
|
||||
with open(self.config_file_path(), "w") as f:
|
||||
for cfg_line in cfg_lines:
|
||||
# walproposer uses different application_name
|
||||
if (
|
||||
"synchronous_standby_names" in cfg_line
|
||||
or
|
||||
# don't repeat safekeepers/wal_acceptors multiple times
|
||||
"neon.safekeepers" in cfg_line
|
||||
):
|
||||
continue
|
||||
f.write(cfg_line)
|
||||
f.write("synchronous_standby_names = 'walproposer'\n")
|
||||
f.write("neon.safekeepers = '{}'\n".format(safekeepers))
|
||||
return self
|
||||
"""Path to the postgresql.conf in the endpoint directory (not the one in pgdata)"""
|
||||
return os.path.join(self.endpoint_path(), "postgresql.conf")
|
||||
|
||||
def config(self, lines: List[str]) -> "Endpoint":
|
||||
"""
|
||||
@@ -2410,6 +2415,17 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def respec(self, **kwargs):
|
||||
"""Update the endpoint.json file used by control_plane."""
|
||||
# Read config
|
||||
config_path = os.path.join(self.endpoint_path(), "endpoint.json")
|
||||
with open(config_path, "r") as f:
|
||||
data_dict = json.load(f)
|
||||
|
||||
# Write it back updated
|
||||
with open(config_path, "w") as file:
|
||||
json.dump(dict(data_dict, **kwargs), file, indent=4)
|
||||
|
||||
def stop(self) -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance if it's running.
|
||||
@@ -2499,7 +2515,8 @@ class EndpointFactory:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
tenant_id=tenant_id or self.env.initial_tenant,
|
||||
port=self.env.port_distributor.get_port(),
|
||||
pg_port=self.env.port_distributor.get_port(),
|
||||
http_port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
self.num_instances += 1
|
||||
self.endpoints.append(ep)
|
||||
@@ -2524,7 +2541,8 @@ class EndpointFactory:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
tenant_id=tenant_id or self.env.initial_tenant,
|
||||
port=self.env.port_distributor.get_port(),
|
||||
pg_port=self.env.port_distributor.get_port(),
|
||||
http_port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if endpoint_id is None:
|
||||
@@ -2907,6 +2925,7 @@ SKIP_FILES = frozenset(
|
||||
"pg_internal.init",
|
||||
"pg.log",
|
||||
"zenith.signal",
|
||||
"pg_hba.conf",
|
||||
"postgresql.conf",
|
||||
"postmaster.opts",
|
||||
"postmaster.pid",
|
||||
|
||||
@@ -342,6 +342,11 @@ class PageserverHttpClient(requests.Session):
|
||||
return res_json
|
||||
|
||||
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId, **kwargs):
|
||||
"""
|
||||
Note that deletion is not instant, it is scheduled and performed mostly in the background.
|
||||
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
|
||||
For longer description consult with pageserver openapi spec.
|
||||
"""
|
||||
res = self.delete(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}", **kwargs
|
||||
)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import time
|
||||
from typing import Optional
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ def wait_until_tenant_state(
|
||||
expected_state: str,
|
||||
iterations: int,
|
||||
period: float = 1.0,
|
||||
) -> bool:
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Does not use `wait_until` for debugging purposes
|
||||
"""
|
||||
@@ -81,7 +81,7 @@ def wait_until_tenant_state(
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
log.debug(f"Tenant {tenant_id} data: {tenant}")
|
||||
if tenant["state"]["slug"] == expected_state:
|
||||
return True
|
||||
return tenant
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
|
||||
|
||||
@@ -92,6 +92,41 @@ def wait_until_tenant_state(
|
||||
)
|
||||
|
||||
|
||||
def wait_until_timeline_state(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
expected_state: str,
|
||||
iterations: int,
|
||||
period: float = 1.0,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Does not use `wait_until` for debugging purposes
|
||||
"""
|
||||
for i in range(iterations):
|
||||
try:
|
||||
timeline = pageserver_http.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
|
||||
log.debug(f"Timeline {tenant_id}/{timeline_id} data: {timeline}")
|
||||
if isinstance(timeline["state"], str):
|
||||
if timeline["state"] == expected_state:
|
||||
return timeline
|
||||
elif isinstance(timeline, Dict):
|
||||
if timeline["state"].get(expected_state):
|
||||
return timeline
|
||||
|
||||
except Exception as e:
|
||||
log.debug(f"Timeline {tenant_id}/{timeline_id} state retrieval failure: {e}")
|
||||
|
||||
if i == iterations - 1:
|
||||
# do not sleep last time, we already know that we failed
|
||||
break
|
||||
time.sleep(period)
|
||||
|
||||
raise Exception(
|
||||
f"Timeline {tenant_id}/{timeline_id} did not become {expected_state} within {iterations * period} seconds"
|
||||
)
|
||||
|
||||
|
||||
def wait_until_tenant_active(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
@@ -156,3 +191,32 @@ def wait_for_upload_queue_empty(
|
||||
if all(m.value == 0 for m in tl):
|
||||
return
|
||||
time.sleep(0.2)
|
||||
|
||||
|
||||
def wait_timeline_detail_404(
|
||||
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
|
||||
):
|
||||
last_exc = None
|
||||
for _ in range(2):
|
||||
time.sleep(0.250)
|
||||
try:
|
||||
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
|
||||
log.error(f"detail {data}")
|
||||
except PageserverApiException as e:
|
||||
log.debug(e)
|
||||
if e.status_code == 404:
|
||||
return
|
||||
|
||||
last_exc = e
|
||||
|
||||
raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}")
|
||||
|
||||
|
||||
def timeline_delete_wait_completed(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
**delete_args,
|
||||
):
|
||||
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
|
||||
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id)
|
||||
|
||||
@@ -1,10 +1,71 @@
|
||||
from contextlib import closing
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
import requests
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
# Just start and measure duration.
|
||||
#
|
||||
# This test runs pretty quickly and can be informative when used in combination
|
||||
# with emulated network delay. Some useful delay commands:
|
||||
#
|
||||
# 1. Add 2msec delay to all localhost traffic
|
||||
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
|
||||
#
|
||||
# 2. Test that it works (you should see 4ms ping)
|
||||
# `ping localhost`
|
||||
#
|
||||
# 3. Revert back to normal
|
||||
# `sudo tc qdisc del dev lo root netem`
|
||||
#
|
||||
# NOTE this test might not represent the real startup time because the basebackup
|
||||
# for a large database might be larger if there's a lof of transaction metadata,
|
||||
# or safekeepers might need more syncing, or there might be more operations to
|
||||
# apply during config step, like more users, databases, or extensions. By default
|
||||
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
|
||||
# test we only load neon.
|
||||
def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
|
||||
endpoint = None
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{i}_start_and_select"):
|
||||
if endpoint:
|
||||
endpoint.start()
|
||||
else:
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{i}_sync_safekeepers",
|
||||
"basebackup_ms": f"{i}_basebackup",
|
||||
"config_ms": f"{i}_config",
|
||||
"total_startup_ms": f"{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
# Imitate optimizations that console would do for the second start
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
|
||||
|
||||
# This test sometimes runs for longer than the global 5 minute timeout.
|
||||
@pytest.mark.timeout(600)
|
||||
def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
pg8000==1.29.4
|
||||
pg8000==1.29.8
|
||||
scramp>=1.4.3
|
||||
|
||||
@@ -396,9 +396,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.52"
|
||||
version = "0.10.55"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
|
||||
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
@@ -428,9 +428,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.87"
|
||||
version = "0.9.90"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.69
|
||||
FROM rust:1.70
|
||||
WORKDIR /source
|
||||
|
||||
COPY . .
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/vapor/postgres-nio.git",
|
||||
"state" : {
|
||||
"revision" : "dbf9c2eb596df39cba8ff3f74d74b2e6a31bd937",
|
||||
"version" : "1.14.1"
|
||||
"revision" : "061a0836d7c1887e04a975d1d2eaa2ef5fd7dfab",
|
||||
"version" : "1.16.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -59,8 +59,8 @@
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio.git",
|
||||
"state" : {
|
||||
"revision" : "d1690f85419fdac8d54e350fb6d2ab9fd95afd75",
|
||||
"version" : "2.51.1"
|
||||
"revision" : "6213ba7a06febe8fef60563a4a7d26a4085783cf",
|
||||
"version" : "2.54.0"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@@ -4,7 +4,7 @@ import PackageDescription
|
||||
let package = Package(
|
||||
name: "PostgresNIOExample",
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.14.1")
|
||||
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.16.0")
|
||||
],
|
||||
targets: [
|
||||
.executableTarget(
|
||||
|
||||
@@ -5,23 +5,7 @@
|
||||
"packages": {
|
||||
"": {
|
||||
"dependencies": {
|
||||
"postgresql-client": "2.5.5"
|
||||
}
|
||||
},
|
||||
"node_modules/debug": {
|
||||
"version": "4.3.4",
|
||||
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
|
||||
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
|
||||
"dependencies": {
|
||||
"ms": "2.1.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=6.0"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"supports-color": {
|
||||
"optional": true
|
||||
}
|
||||
"postgresql-client": "2.5.9"
|
||||
}
|
||||
},
|
||||
"node_modules/doublylinked": {
|
||||
@@ -41,11 +25,6 @@
|
||||
"putil-promisify": "^1.8.6"
|
||||
}
|
||||
},
|
||||
"node_modules/ms": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
|
||||
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
|
||||
},
|
||||
"node_modules/obuf": {
|
||||
"version": "1.1.2",
|
||||
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
|
||||
@@ -63,30 +42,28 @@
|
||||
}
|
||||
},
|
||||
"node_modules/postgresql-client": {
|
||||
"version": "2.5.5",
|
||||
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.5.tgz",
|
||||
"integrity": "sha512-2Mu3i+6NQ9cnkoZNd0XeSZo9WoUpuWf4ZSiCCoDWSj82T93py2/SKXZ1aUaP8mVaU0oKpyyGe0IwLYZ1VHShnA==",
|
||||
"version": "2.5.9",
|
||||
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.9.tgz",
|
||||
"integrity": "sha512-s+kgTN6TfWLzehEyxw4Im4odnxVRCbZ0DEJzWS6SLowPAmB2m1/DOiOvZC0+ZVoi5AfbGE6SBqFxKguSyVAXZg==",
|
||||
"dependencies": {
|
||||
"debug": "^4.3.4",
|
||||
"doublylinked": "^2.5.2",
|
||||
"lightning-pool": "^4.2.1",
|
||||
"postgres-bytea": "^3.0.0",
|
||||
"power-tasks": "^1.6.4",
|
||||
"power-tasks": "^1.7.0",
|
||||
"putil-merge": "^3.10.3",
|
||||
"putil-promisify": "^1.10.0",
|
||||
"putil-varhelpers": "^1.6.5"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=14.0",
|
||||
"node": ">=16.0",
|
||||
"npm": ">=7.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/power-tasks": {
|
||||
"version": "1.6.4",
|
||||
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.6.4.tgz",
|
||||
"integrity": "sha512-LX8GGgEIP1N7jsZqlqZ275e6f1Ehq97APCEGj8uVO0NoEoB+77QUX12BFv3LmlNKfq4fIuNSPiHhyHFjqn2gfA==",
|
||||
"version": "1.7.0",
|
||||
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.7.0.tgz",
|
||||
"integrity": "sha512-rndZXCDxhuIDjPUJJvQwBDHaYagCkjvbPF/NA+omh/Ef4rAI9KtnvdA0k98dyiGpn1zXOpc6c2c0JWzg/xAhJg==",
|
||||
"dependencies": {
|
||||
"debug": "^4.3.4",
|
||||
"doublylinked": "^2.5.2",
|
||||
"strict-typed-events": "^2.3.1"
|
||||
},
|
||||
@@ -132,9 +109,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/ts-gems": {
|
||||
"version": "2.3.0",
|
||||
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.3.0.tgz",
|
||||
"integrity": "sha512-bUvrwrzlct7vfaNvtgMhynDf6lAki/kTtrNsIGhX6l7GJGK3s6b8Ro7dazOLXabV0m2jyShBzDQ8X1+h/C2Cug=="
|
||||
"version": "2.4.0",
|
||||
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.4.0.tgz",
|
||||
"integrity": "sha512-SdugYAXoWvbqrxLodIObzxhEKacDxh5LfAJIiIkiH7q5thvuuCzdmkdTVQYf7uEDrEpPhfx4tokDMamdO3be9A=="
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"postgresql-client": "2.5.5"
|
||||
"postgresql-client": "2.5.9"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM node:18
|
||||
FROM node:20
|
||||
WORKDIR /source
|
||||
|
||||
COPY . .
|
||||
|
||||
@@ -5,16 +5,16 @@
|
||||
"packages": {
|
||||
"": {
|
||||
"dependencies": {
|
||||
"@neondatabase/serverless": "0.4.3",
|
||||
"@neondatabase/serverless": "0.4.18",
|
||||
"ws": "8.13.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@neondatabase/serverless": {
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.3.tgz",
|
||||
"integrity": "sha512-U8tpuF5f0R5WRsciR7iaJ5S2h54DWa6Z6CEW+J4KgwyvRN3q3qDz0MibdfFXU0WqnRoi/9RSf/2XN4TfeaOCbQ==",
|
||||
"version": "0.4.18",
|
||||
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.18.tgz",
|
||||
"integrity": "sha512-2TZnIyRGC/+0fjZ8TKCzaSTPUD94PM7NBGuantGZbUrbWyqBwGnUoRtdZAQ95qBKVHqORLVfymlv2NE+HQMFeA==",
|
||||
"dependencies": {
|
||||
"@types/pg": "^8.6.6"
|
||||
"@types/pg": "8.6.6"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/node": {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"@neondatabase/serverless": "0.4.3",
|
||||
"@neondatabase/serverless": "0.4.18",
|
||||
"ws": "8.13.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,11 @@ def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
|
||||
test_name="test_attach_tenant_config",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# eviction might be the first one after an attach to access the layers
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*unexpectedly on-demand downloading remote layer remote.* for task kind Eviction"
|
||||
)
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
return env
|
||||
|
||||
@@ -158,6 +163,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"threshold": "23h",
|
||||
},
|
||||
"evictions_low_residence_duration_metric_threshold": "2days",
|
||||
"gc_feedback": True,
|
||||
"gc_horizon": 23 * (1024 * 1024),
|
||||
"gc_period": "2h 13m",
|
||||
"image_creation_threshold": 7,
|
||||
|
||||
@@ -20,7 +20,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*is not active. Current state: Broken.*",
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*could not load tenant.*load local timeline.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ import copy
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
@@ -15,7 +16,11 @@ from fixtures.neon_fixtures import (
|
||||
PortDistributor,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import Lsn
|
||||
from pytest import FixtureRequest
|
||||
@@ -383,6 +388,9 @@ def check_neon_works(
|
||||
cli_target = NeonCli(config_target)
|
||||
|
||||
# And the current binaries to launch computes
|
||||
snapshot_config["neon_distrib_dir"] = str(neon_current_binpath)
|
||||
with (snapshot_config_toml).open("w") as f:
|
||||
toml.dump(snapshot_config, f)
|
||||
config_current = copy.copy(config)
|
||||
config_current.neon_binpath = neon_current_binpath
|
||||
cli_current = NeonCli(config_current)
|
||||
@@ -391,7 +399,8 @@ def check_neon_works(
|
||||
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
cli_current.endpoint_start("main", port=pg_port)
|
||||
http_port = port_distributor.get_port()
|
||||
cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port)
|
||||
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
|
||||
|
||||
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
|
||||
@@ -413,7 +422,7 @@ def check_neon_works(
|
||||
)
|
||||
|
||||
shutil.rmtree(repo_dir / "local_fs_remote_storage")
|
||||
pageserver_http.timeline_delete(tenant_id, timeline_id)
|
||||
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
|
||||
pg_bin.run(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
|
||||
@@ -440,7 +449,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
"""
|
||||
|
||||
with output.open("w") as stdout:
|
||||
rv = subprocess.run(
|
||||
res = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
@@ -452,4 +461,53 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
stdout=stdout,
|
||||
)
|
||||
|
||||
return rv.returncode != 0
|
||||
differs = res.returncode != 0
|
||||
|
||||
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
|
||||
if differs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(PR4425_ALLOWED_DIFF)
|
||||
tmp.flush()
|
||||
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
|
||||
differs = allowed.returncode != 0
|
||||
|
||||
return differs
|
||||
|
||||
|
||||
PR4425_ALLOWED_DIFF = """
|
||||
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
|
||||
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
|
||||
@@ -13,12 +13,20 @@
|
||||
|
||||
CREATE ROLE cloud_admin;
|
||||
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
|
||||
+CREATE ROLE neon_superuser;
|
||||
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
|
||||
|
||||
--
|
||||
-- User Configurations
|
||||
--
|
||||
|
||||
|
||||
+--
|
||||
+-- Role memberships
|
||||
+--
|
||||
+
|
||||
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
"""
|
||||
|
||||
@@ -1,253 +0,0 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from subprocess import TimeoutExpired
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import ComputeCtl, NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
# Test that compute_ctl works and prints "--sync-safekeepers" logs.
|
||||
def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
ctl = ComputeCtl(env)
|
||||
|
||||
env.neon_cli.create_branch("test_compute_ctl", "main")
|
||||
endpoint = env.endpoints.create_start("test_compute_ctl")
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
|
||||
with open(endpoint.config_file_path(), "r") as f:
|
||||
cfg_lines = f.readlines()
|
||||
cfg_map = {}
|
||||
for line in cfg_lines:
|
||||
if "=" in line:
|
||||
k, v = line.split("=")
|
||||
cfg_map[k] = v.strip("\n '\"")
|
||||
log.info(f"postgres config: {cfg_map}")
|
||||
pgdata = endpoint.pg_data_dir_path()
|
||||
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
|
||||
# stop_and_destroy removes the whole endpoint directory. Recreate it.
|
||||
Path(pgdata).mkdir(parents=True)
|
||||
|
||||
spec = (
|
||||
"""
|
||||
{
|
||||
"format_version": 1.0,
|
||||
|
||||
"timestamp": "2021-05-23T18:25:43.511Z",
|
||||
"operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
|
||||
|
||||
"cluster": {
|
||||
"cluster_id": "test-cluster-42",
|
||||
"name": "Neon Test",
|
||||
"state": "restarted",
|
||||
"roles": [
|
||||
],
|
||||
"databases": [
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "fsync",
|
||||
"value": "off",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "wal_level",
|
||||
"value": "replica",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "neon.safekeepers",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.safekeepers"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_log_hints",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "log_connections",
|
||||
"value": "on",
|
||||
"vartype": "bool"
|
||||
},
|
||||
{
|
||||
"name": "shared_buffers",
|
||||
"value": "32768",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "port",
|
||||
"value": """
|
||||
+ f'"{cfg_map["port"]}"'
|
||||
+ """,
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_connections",
|
||||
"value": "100",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_wal_senders",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "listen_addresses",
|
||||
"value": "0.0.0.0",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "wal_sender_timeout",
|
||||
"value": "0",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "password_encryption",
|
||||
"value": "md5",
|
||||
"vartype": "enum"
|
||||
},
|
||||
{
|
||||
"name": "maintenance_work_mem",
|
||||
"value": "65536",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_parallel_workers",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "max_worker_processes",
|
||||
"value": "8",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.tenant_id",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.tenant_id"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "max_replication_slots",
|
||||
"value": "10",
|
||||
"vartype": "integer"
|
||||
},
|
||||
{
|
||||
"name": "neon.timeline_id",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.timeline_id"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "shared_preload_libraries",
|
||||
"value": "neon",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "synchronous_standby_names",
|
||||
"value": "walproposer",
|
||||
"vartype": "string"
|
||||
},
|
||||
{
|
||||
"name": "neon.pageserver_connstring",
|
||||
"value": """
|
||||
+ f'"{cfg_map["neon.pageserver_connstring"]}"'
|
||||
+ """,
|
||||
"vartype": "string"
|
||||
}
|
||||
]
|
||||
},
|
||||
"delta_operations": [
|
||||
]
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
ps_connstr = cfg_map["neon.pageserver_connstring"]
|
||||
log.info(f"ps_connstr: {ps_connstr}, pgdata: {pgdata}")
|
||||
|
||||
# run compute_ctl and wait for 10s
|
||||
try:
|
||||
ctl.raw_cli(
|
||||
[
|
||||
"--connstr",
|
||||
"postgres://invalid/",
|
||||
"--pgdata",
|
||||
pgdata,
|
||||
"--spec",
|
||||
spec,
|
||||
"--pgbin",
|
||||
pg_bin_path,
|
||||
],
|
||||
timeout=10,
|
||||
)
|
||||
except TimeoutExpired as exc:
|
||||
ctl_logs = (exc.stderr or b"").decode("utf-8")
|
||||
log.info(f"compute_ctl stderr:\n{ctl_logs}")
|
||||
|
||||
with ExternalProcessManager(Path(pgdata) / "postmaster.pid"):
|
||||
start = "starting safekeepers syncing"
|
||||
end = "safekeepers synced at LSN"
|
||||
start_pos = ctl_logs.index(start)
|
||||
assert start_pos != -1
|
||||
end_pos = ctl_logs.index(end, start_pos)
|
||||
assert end_pos != -1
|
||||
sync_safekeepers_logs = ctl_logs[start_pos : end_pos + len(end)]
|
||||
log.info("sync_safekeepers_logs:\n" + sync_safekeepers_logs)
|
||||
|
||||
# assert that --sync-safekeepers logs are present in the output
|
||||
assert "connecting with node" in sync_safekeepers_logs
|
||||
assert "connected with node" in sync_safekeepers_logs
|
||||
assert "proposer connected to quorum (2)" in sync_safekeepers_logs
|
||||
assert "got votes from majority (2)" in sync_safekeepers_logs
|
||||
assert "sending elected msg to node" in sync_safekeepers_logs
|
||||
|
||||
|
||||
class ExternalProcessManager:
|
||||
"""
|
||||
Context manager that kills a process with a pid file on exit.
|
||||
"""
|
||||
|
||||
def __init__(self, pid_file: Path):
|
||||
self.path = pid_file
|
||||
self.pid_file = open(pid_file, "r")
|
||||
self.pid = int(self.pid_file.readline().strip())
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def leave_alive(self):
|
||||
self.pid_file.close()
|
||||
|
||||
def __exit__(self, _type, _value, _traceback):
|
||||
import signal
|
||||
import time
|
||||
|
||||
if self.pid_file.closed:
|
||||
return
|
||||
|
||||
with self.pid_file:
|
||||
try:
|
||||
os.kill(self.pid, signal.SIGTERM)
|
||||
except OSError as e:
|
||||
if not self.path.is_file():
|
||||
return
|
||||
log.info(f"Failed to kill {self.pid}, but the pidfile remains: {e}")
|
||||
return
|
||||
|
||||
for _ in range(20):
|
||||
if not self.path.is_file():
|
||||
return
|
||||
time.sleep(0.2)
|
||||
|
||||
log.info("Process failed to stop after SIGTERM: {self.pid}")
|
||||
os.kill(self.pid, signal.SIGKILL)
|
||||
@@ -33,6 +33,7 @@ def handle_role(dbs, roles, operation):
|
||||
dbs[db] = operation["name"]
|
||||
if "password" in operation:
|
||||
roles[operation["name"]] = operation["password"]
|
||||
assert "encrypted_password" in operation
|
||||
elif operation["op"] == "del":
|
||||
if "old_name" in operation:
|
||||
roles.pop(operation["old_name"])
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import shutil
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import pytest
|
||||
@@ -110,6 +109,12 @@ class EvictionEnv:
|
||||
overrides=(
|
||||
"--pageserver-config-override=disk_usage_based_eviction="
|
||||
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
|
||||
# Disk usage based eviction runs as a background task.
|
||||
# But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup.
|
||||
# But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages.
|
||||
# But, we only have a 10-second-timeout in this test.
|
||||
# So, disable the delay for this test.
|
||||
"--pageserver-config-override=background_task_maximum_delay='0s'",
|
||||
),
|
||||
)
|
||||
|
||||
@@ -422,14 +427,14 @@ def poor_mans_du(
|
||||
largest_layer = 0
|
||||
smallest_layer = None
|
||||
for tenant_id, timeline_id in timelines:
|
||||
dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
assert dir.exists(), f"timeline dir does not exist: {dir}"
|
||||
sum = 0
|
||||
for file in dir.iterdir():
|
||||
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
|
||||
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
|
||||
total = 0
|
||||
for file in timeline_dir.iterdir():
|
||||
if "__" not in file.name:
|
||||
continue
|
||||
size = file.stat().st_size
|
||||
sum += size
|
||||
total += size
|
||||
largest_layer = max(largest_layer, size)
|
||||
if smallest_layer:
|
||||
smallest_layer = min(smallest_layer, size)
|
||||
@@ -437,8 +442,8 @@ def poor_mans_du(
|
||||
smallest_layer = size
|
||||
log.info(f"{tenant_id}/{timeline_id} => {file.name} {size}")
|
||||
|
||||
log.info(f"{tenant_id}/{timeline_id}: sum {sum}")
|
||||
total_on_disk += sum
|
||||
log.info(f"{tenant_id}/{timeline_id}: sum {total}")
|
||||
total_on_disk += total
|
||||
|
||||
assert smallest_layer is not None or total_on_disk == 0 and largest_layer == 0
|
||||
return (total_on_disk, largest_layer, smallest_layer or 0)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
@@ -10,9 +12,10 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
primary_lsn = None
|
||||
cought_up = False
|
||||
caught_up = False
|
||||
queries = [
|
||||
"SHOW neon.timeline_id",
|
||||
"SHOW neon.tenant_id",
|
||||
@@ -56,7 +59,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
res = s_cur.fetchone()
|
||||
assert res is not None
|
||||
|
||||
while not cought_up:
|
||||
while not caught_up:
|
||||
with s_con.cursor() as secondary_cursor:
|
||||
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
|
||||
res = secondary_cursor.fetchone()
|
||||
@@ -66,7 +69,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
# due to e.g. autovacuum, but that shouldn't impact the content
|
||||
# of the tables, so we check whether we've replayed up to at
|
||||
# least after the commit of the `test` table.
|
||||
cought_up = secondary_lsn >= primary_lsn
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
|
||||
# Explicit commit to flush any transient transaction-level state.
|
||||
s_con.commit()
|
||||
|
||||
@@ -14,7 +14,11 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import subprocess_capture
|
||||
|
||||
@@ -79,6 +83,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
# Set up pageserver for import
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
client.tenant_create(tenant)
|
||||
|
||||
@@ -145,7 +150,12 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
)
|
||||
|
||||
# NOTE: delete can easily come before upload operations are completed
|
||||
client.timeline_delete(tenant, timeline)
|
||||
# https://github.com/neondatabase/neon/issues/4326
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*files not bound to index_file.json, proceeding with their deletion.*"
|
||||
)
|
||||
|
||||
timeline_delete_wait_completed(client, tenant, timeline)
|
||||
|
||||
# Importing correct backup works
|
||||
import_tar(base_tar, wal_tar)
|
||||
|
||||
@@ -24,7 +24,13 @@ def test_basic_eviction(
|
||||
test_name="test_download_remote_layers_api",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# disable gc and compaction background loops because they perform on-demand downloads
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
)
|
||||
client = env.pageserver.http_client()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -47,7 +53,12 @@ def test_basic_eviction(
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
# disable compute & sks to avoid on-demand downloads by walreceiver / getpage
|
||||
endpoint.stop()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
timeline_path = env.timeline_dir(tenant_id, timeline_id)
|
||||
initial_local_layers = sorted(
|
||||
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
|
||||
)
|
||||
|
||||
@@ -9,11 +9,18 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
|
||||
try:
|
||||
env.neon_cli.start()
|
||||
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
|
||||
env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port())
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_start(
|
||||
endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port
|
||||
)
|
||||
|
||||
env.neon_cli.create_branch(new_branch_name="migration_check")
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_start(
|
||||
endpoint_id="ep-migration_check", port=port_distributor.get_port()
|
||||
endpoint_id="ep-migration_check", pg_port=pg_port, http_port=http_port
|
||||
)
|
||||
finally:
|
||||
env.neon_cli.stop()
|
||||
|
||||
@@ -58,11 +58,8 @@ def test_ondemand_download_large_rel(
|
||||
)
|
||||
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Override defaults, to create more layers
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# disable background GC
|
||||
"gc_period": "0s",
|
||||
"gc_horizon": f"{10 * 1024 ** 3}", # 10 GB
|
||||
@@ -75,7 +72,6 @@ def test_ondemand_download_large_rel(
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
)
|
||||
env.initial_tenant = tenant
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -717,9 +713,7 @@ def test_ondemand_download_failure_to_replace(
|
||||
# error message is not useful
|
||||
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2)
|
||||
|
||||
actual_message = (
|
||||
".* ERROR .*replacing downloaded layer into layermap failed because layer was not found"
|
||||
)
|
||||
actual_message = ".* ERROR .*layermap-replace-notfound"
|
||||
assert env.pageserver.log_contains(actual_message) is not None
|
||||
env.pageserver.allowed_errors.append(actual_message)
|
||||
|
||||
|
||||
@@ -17,12 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
n_restarts = 10
|
||||
scale = 10
|
||||
|
||||
# the background task may complete the init task delay after finding an
|
||||
# active tenant, but shutdown starts right before Tenant::gc_iteration
|
||||
env.pageserver.allowed_errors.append(
|
||||
r".*Gc failed, retrying in \S+: Cannot run GC iteration on inactive tenant"
|
||||
)
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
|
||||
@@ -163,7 +163,6 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
|
||||
assert conn.get_parameter_status(name) == value
|
||||
|
||||
|
||||
@pytest.mark.timeout(5)
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
|
||||
# until after connections close.
|
||||
@@ -225,3 +224,37 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
res = q("drop table t")
|
||||
assert res["command"] == "DROP"
|
||||
assert res["rowCount"] is None
|
||||
|
||||
|
||||
def test_sql_over_http_output_options(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http2 with login password 'http2' superuser")
|
||||
|
||||
def q(sql: str, raw_text: bool, array_mode: bool, params: List[Any] = []) -> Any:
|
||||
connstr = (
|
||||
f"postgresql://http2:http2@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
)
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps({"query": sql, "params": params}),
|
||||
headers={
|
||||
"Content-Type": "application/sql",
|
||||
"Neon-Connection-String": connstr,
|
||||
"Neon-Raw-Text-Output": "true" if raw_text else "false",
|
||||
"Neon-Array-Mode": "true" if array_mode else "false",
|
||||
},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
return response.json()
|
||||
|
||||
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, False)["rows"]
|
||||
assert rows == [{"arr": [1, 2, 3], "n": 1, "s": "a"}]
|
||||
|
||||
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", False, True)["rows"]
|
||||
assert rows == [[1, "a", [1, 2, 3]]]
|
||||
|
||||
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, False)["rows"]
|
||||
assert rows == [{"arr": "{1,2,3}", "n": "1", "s": "a"}]
|
||||
|
||||
rows = q("select 1 as n, 'a' as s, '{1,2,3}'::int4[] as arr", True, True)["rows"]
|
||||
assert rows == [["1", "a", "{1,2,3}"]]
|
||||
|
||||
@@ -20,6 +20,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_until_tenant_active,
|
||||
@@ -140,14 +141,19 @@ def test_remote_storage_backup_and_restore(
|
||||
# This is before the failures injected by test_remote_failures, so it's a permanent error.
|
||||
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*error attaching tenant: storage-sync-list-remote-timelines",
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
)
|
||||
# Attach it. This HTTP request will succeed and launch a
|
||||
# background task to load the tenant. In that background task,
|
||||
# listing the remote timelines will fail because of the failpoint,
|
||||
# and the tenant will be marked as Broken.
|
||||
client.tenant_attach(tenant_id)
|
||||
wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
|
||||
|
||||
tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
|
||||
assert tenant_info["attachment_status"] == {
|
||||
"slug": "failed",
|
||||
"data": {"reason": "storage-sync-list-remote-timelines"},
|
||||
}
|
||||
|
||||
# Ensure that even though the tenant is broken, we can't attach it again.
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
|
||||
@@ -177,7 +183,7 @@ def test_remote_storage_backup_and_restore(
|
||||
wait_until_tenant_active(
|
||||
pageserver_http=client,
|
||||
tenant_id=tenant_id,
|
||||
iterations=5,
|
||||
iterations=10, # make it longer for real_s3 tests when unreliable wrapper is involved
|
||||
)
|
||||
|
||||
detail = client.timeline_detail(tenant_id, timeline_id)
|
||||
@@ -529,7 +535,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
"pitr_interval": "0s",
|
||||
}
|
||||
)
|
||||
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
timeline_path = env.timeline_dir(tenant_id, timeline_id)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -591,10 +597,22 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
env.pageserver.allowed_errors.append(
|
||||
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
|
||||
)
|
||||
client.timeline_delete(tenant_id, timeline_id)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*files not bound to index_file.json, proceeding with their deletion.*"
|
||||
)
|
||||
timeline_delete_wait_completed(client, tenant_id, timeline_id)
|
||||
|
||||
assert not timeline_path.exists()
|
||||
|
||||
# to please mypy
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
remote_timeline_path = (
|
||||
env.remote_storage.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
)
|
||||
|
||||
assert not list(remote_timeline_path.iterdir())
|
||||
|
||||
# timeline deletion should kill ongoing uploads, so, the metric will be gone
|
||||
assert get_queued_count(file_kind="index", op_kind="upload") is None
|
||||
|
||||
@@ -693,15 +711,15 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
|
||||
)
|
||||
|
||||
# index upload is now hitting the failpoint, should not block the shutdown
|
||||
env.pageserver.stop()
|
||||
# index upload is now hitting the failpoint, it should block the shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
timeline_path = (
|
||||
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
|
||||
)
|
||||
|
||||
local_metadata = env.repo_dir / timeline_path / "metadata"
|
||||
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
|
||||
assert local_metadata.is_file()
|
||||
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
|
||||
|
||||
@@ -37,6 +37,7 @@ class PgSniRouter(PgProtocol):
|
||||
destination: str,
|
||||
tls_cert: Path,
|
||||
tls_key: Path,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
# Must use a hostname rather than IP here, for SNI to work
|
||||
host = "localhost"
|
||||
@@ -49,6 +50,7 @@ class PgSniRouter(PgProtocol):
|
||||
self.tls_cert = tls_cert
|
||||
self.tls_key = tls_key
|
||||
self._popen: Optional[subprocess.Popen[bytes]] = None
|
||||
self.test_output_dir = test_output_dir
|
||||
|
||||
def start(self) -> "PgSniRouter":
|
||||
assert self._popen is None
|
||||
@@ -60,8 +62,12 @@ class PgSniRouter(PgProtocol):
|
||||
*["--destination", self.destination],
|
||||
]
|
||||
|
||||
self._popen = subprocess.Popen(args)
|
||||
router_log_path = self.test_output_dir / "pg_sni_router.log"
|
||||
router_log = open(router_log_path, "w")
|
||||
|
||||
self._popen = subprocess.Popen(args, stderr=router_log)
|
||||
self._wait_until_ready()
|
||||
log.info(f"pg_sni_router started, log file: {router_log_path}")
|
||||
return self
|
||||
|
||||
@backoff.on_exception(backoff.expo, OSError, max_time=10)
|
||||
@@ -121,6 +127,7 @@ def test_pg_sni_router(
|
||||
destination="localtest.me",
|
||||
tls_cert=test_output_dir / "router.crt",
|
||||
tls_key=test_output_dir / "router.key",
|
||||
test_output_dir=test_output_dir,
|
||||
) as router:
|
||||
router.start()
|
||||
|
||||
|
||||
@@ -59,6 +59,13 @@ def test_tenant_reattach(
|
||||
# create new nenant
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
@@ -223,13 +230,6 @@ def test_tenant_reattach_while_busy(
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(".*Tenant .* not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Tenant .* will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# create new nenant
|
||||
@@ -238,6 +238,13 @@ def test_tenant_reattach_while_busy(
|
||||
conf={"checkpoint_distance": "100000"}
|
||||
)
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
@@ -275,6 +282,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
# create new nenant
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
# assert tenant exists on disk
|
||||
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
|
||||
|
||||
@@ -336,6 +350,13 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
|
||||
# create a new tenant
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
# assert tenant exists on disk
|
||||
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
|
||||
|
||||
@@ -385,6 +406,13 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
|
||||
# create a new tenant
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
# assert tenant exists on disk
|
||||
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
|
||||
|
||||
@@ -399,6 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
|
||||
|
||||
log.info("detaching regular tenant with detach ignored flag")
|
||||
client.tenant_detach(tenant_id, True)
|
||||
|
||||
log.info("regular tenant detached without error")
|
||||
|
||||
# check that nothing is left on disk for deleted tenant
|
||||
@@ -432,6 +461,13 @@ def test_detach_while_attaching(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
@@ -496,7 +532,7 @@ def test_ignored_tenant_reattach(
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_storage_backup_and_restore",
|
||||
test_name="test_ignored_tenant_reattach",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
@@ -577,6 +613,13 @@ def test_ignored_tenant_download_missing_layers(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
|
||||
@@ -589,14 +632,14 @@ def test_ignored_tenant_download_missing_layers(
|
||||
|
||||
# ignore the tenant and remove its layers
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
|
||||
layers_removed = False
|
||||
for dir_entry in tenant_timeline_dir.iterdir():
|
||||
for dir_entry in timeline_dir.iterdir():
|
||||
if dir_entry.name.startswith("00000"):
|
||||
# Looks like a layer file. Remove it
|
||||
dir_entry.unlink()
|
||||
layers_removed = True
|
||||
assert layers_removed, f"Found no layers for tenant {tenant_timeline_dir}"
|
||||
assert layers_removed, f"Found no layers for tenant {timeline_dir}"
|
||||
|
||||
# now, load it from the local files and expect it to work due to remote storage restoration
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
@@ -636,18 +679,27 @@ def test_ignored_tenant_stays_broken_without_metadata(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Broken.*"
|
||||
)
|
||||
|
||||
# ignore the tenant and remove its metadata
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
|
||||
metadata_removed = False
|
||||
for dir_entry in tenant_timeline_dir.iterdir():
|
||||
for dir_entry in timeline_dir.iterdir():
|
||||
if dir_entry.name == "metadata":
|
||||
# Looks like a layer file. Remove it
|
||||
dir_entry.unlink()
|
||||
metadata_removed = True
|
||||
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
|
||||
assert metadata_removed, f"Failed to find metadata file in {timeline_dir}"
|
||||
|
||||
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
|
||||
)
|
||||
|
||||
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
@@ -670,6 +722,13 @@ def test_load_attach_negatives(
|
||||
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
@@ -712,6 +771,13 @@ def test_ignore_while_attaching(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Attempts to connect from compute to pageserver while the tenant is
|
||||
# temporarily detached produces these errors in the pageserver log.
|
||||
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
|
||||
|
||||
@@ -214,9 +214,7 @@ def switch_pg_to_new_pageserver(
|
||||
|
||||
endpoint.start()
|
||||
|
||||
timeline_to_detach_local_path = (
|
||||
env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
)
|
||||
timeline_to_detach_local_path = env.timeline_dir(tenant_id, timeline_id)
|
||||
files_before_detach = os.listdir(timeline_to_detach_local_path)
|
||||
assert (
|
||||
"metadata" in files_before_detach
|
||||
@@ -419,8 +417,6 @@ def test_tenant_relocation(
|
||||
new_pageserver_http.tenant_attach(tenant_id)
|
||||
|
||||
# wait for tenant to finish attaching
|
||||
tenant_status = new_pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
assert tenant_status["state"]["slug"] in ["Attaching", "Active"]
|
||||
wait_until(
|
||||
number_of_iterations=10,
|
||||
interval=1,
|
||||
|
||||
@@ -11,10 +11,12 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_wal_insert_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import timeline_delete_wait_completed
|
||||
from fixtures.pg_version import PgVersion, xfail_on_postgres
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
env = neon_simple_env
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
@@ -43,12 +45,16 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
# we've disabled the autovacuum and checkpoint
|
||||
# so background processes should not change the size.
|
||||
# If this test will flake we should probably loosen the check
|
||||
assert size == initial_size, "starting idle compute should not change the tenant size"
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"
|
||||
|
||||
# the size should be the same, until we increase the size over the
|
||||
# gc_horizon
|
||||
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"
|
||||
|
||||
expected_inputs = {
|
||||
"segments": [
|
||||
@@ -317,8 +323,9 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_single_branch_get_tenant_size_grows(
|
||||
neon_env_builder: NeonEnvBuilder, test_output_dir: Path
|
||||
neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
|
||||
):
|
||||
"""
|
||||
Operate on single branch reading the tenants size after each transaction.
|
||||
@@ -332,7 +339,14 @@ def test_single_branch_get_tenant_size_grows(
|
||||
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
|
||||
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
|
||||
# obviously lead to issues when calculating the size.
|
||||
gc_horizon = 0x38000
|
||||
gc_horizon = 0x3BA00
|
||||
|
||||
# it's a bit of a hack, but different versions of postgres have different
|
||||
# amount of WAL generated for the same amount of data. so we need to
|
||||
# adjust the gc_horizon accordingly.
|
||||
if pg_version == PgVersion.V14:
|
||||
gc_horizon = 0x4A000
|
||||
|
||||
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -352,11 +366,11 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if current_lsn - initdb_lsn >= gc_horizon:
|
||||
assert (
|
||||
size >= prev_size
|
||||
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
|
||||
), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
|
||||
else:
|
||||
assert (
|
||||
size > prev_size
|
||||
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
|
||||
), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
|
||||
|
||||
def get_current_consistent_size(
|
||||
env: NeonEnv,
|
||||
@@ -621,12 +635,12 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
size_debug_file_before.write(size_debug)
|
||||
|
||||
# teardown, delete branches, and the size should be going down
|
||||
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
|
||||
timeline_delete_wait_completed(http_client, tenant_id, first_branch_timeline_id)
|
||||
|
||||
size_after_deleting_first = http_client.tenant_size(tenant_id)
|
||||
assert size_after_deleting_first < size_after_thinning_branch
|
||||
|
||||
http_client.timeline_delete(tenant_id, second_branch_timeline_id)
|
||||
timeline_delete_wait_completed(http_client, tenant_id, second_branch_timeline_id)
|
||||
size_after_deleting_second = http_client.tenant_size(tenant_id)
|
||||
assert size_after_deleting_second < size_after_deleting_first
|
||||
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.pageserver.utils import assert_tenant_state, wait_until_tenant_active
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
timeline_delete_wait_completed,
|
||||
wait_until_tenant_active,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
@@ -24,7 +28,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
|
||||
def delete_all_timelines(tenant: TenantId):
|
||||
timelines = [TimelineId(t["timeline_id"]) for t in client.timeline_list(tenant)]
|
||||
for t in timelines:
|
||||
client.timeline_delete(tenant, t)
|
||||
timeline_delete_wait_completed(client, tenant, t)
|
||||
|
||||
# Create tenant, start compute
|
||||
tenant, _ = env.neon_cli.create_tenant()
|
||||
|
||||
@@ -21,8 +21,9 @@ from fixtures.neon_fixtures import (
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import timeline_delete_wait_completed
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from prometheus_client.samples import Sample
|
||||
|
||||
|
||||
@@ -213,7 +214,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
# Test (a subset of) pageserver global metrics
|
||||
for metric in PAGESERVER_GLOBAL_METRICS:
|
||||
ps_samples = ps_metrics.query_all(metric, {})
|
||||
assert len(ps_samples) > 0
|
||||
assert len(ps_samples) > 0, f"expected at least one sample for {metric}"
|
||||
for sample in ps_samples:
|
||||
labels = ",".join([f'{key}="{value}"' for key, value in sample.labels.items()])
|
||||
log.info(f"{sample.name}{{{labels}}} {sample.value}")
|
||||
@@ -267,6 +268,7 @@ def test_pageserver_metrics_removed_after_detach(
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
endpoint.stop()
|
||||
|
||||
def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
|
||||
ps_metrics = env.pageserver.http_client().get_metrics()
|
||||
@@ -309,9 +311,7 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*could not load tenant.*Failed to list timelines directory.*"
|
||||
)
|
||||
env.pageserver.allowed_errors.append(".*load failed.*list timelines directory.*")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
@@ -319,9 +319,10 @@ def test_pageserver_with_empty_tenants(
|
||||
client.tenant_create(tenant_with_empty_timelines)
|
||||
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
|
||||
for temp_timeline in temp_timelines:
|
||||
client.timeline_delete(
|
||||
tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
|
||||
timeline_delete_wait_completed(
|
||||
client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
|
||||
)
|
||||
|
||||
files_in_timelines_dir = sum(
|
||||
1
|
||||
for _p in Path.iterdir(
|
||||
@@ -342,9 +343,15 @@ def test_pageserver_with_empty_tenants(
|
||||
env.pageserver.start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert len(tenants) == 2
|
||||
def not_loading():
|
||||
tenants = client.tenant_list()
|
||||
assert len(tenants) == 2
|
||||
assert all(t["state"]["slug"] != "Loading" for t in tenants)
|
||||
|
||||
wait_until(10, 0.2, not_loading)
|
||||
|
||||
tenants = client.tenant_list()
|
||||
|
||||
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
|
||||
assert (
|
||||
@@ -356,7 +363,7 @@ def test_pageserver_with_empty_tenants(
|
||||
broken_tenant_status["state"]["slug"] == "Broken"
|
||||
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
|
||||
|
||||
assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*")
|
||||
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
|
||||
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
|
||||
assert (
|
||||
|
||||
@@ -257,7 +257,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
env.endpoints.stop_all()
|
||||
env.pageserver.stop()
|
||||
|
||||
timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
|
||||
local_layer_truncated = None
|
||||
for path in Path.iterdir(timeline_dir):
|
||||
if path.name.startswith("00000"):
|
||||
|
||||
@@ -3,6 +3,7 @@ import queue
|
||||
import shutil
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -11,13 +12,17 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
S3Storage,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_timeline_detail_404,
|
||||
wait_until_tenant_active,
|
||||
wait_until_timeline_state,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import query_scalar, wait_until
|
||||
@@ -68,7 +73,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
assert exc.value.status_code == 412
|
||||
|
||||
timeline_path = (
|
||||
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
|
||||
@@ -79,7 +84,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
wait_until(
|
||||
number_of_iterations=3,
|
||||
interval=0.2,
|
||||
func=lambda: ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id),
|
||||
func=lambda: timeline_delete_wait_completed(ps_http, env.initial_tenant, leaf_timeline_id),
|
||||
)
|
||||
|
||||
assert not timeline_path.exists()
|
||||
@@ -90,16 +95,16 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
|
||||
) as exc:
|
||||
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
|
||||
|
||||
# FIXME leaves tenant without timelines, should we prevent deletion of root timeline?
|
||||
wait_until(
|
||||
number_of_iterations=3,
|
||||
interval=0.2,
|
||||
func=lambda: ps_http.timeline_delete(env.initial_tenant, parent_timeline_id),
|
||||
)
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
wait_until(
|
||||
number_of_iterations=3,
|
||||
interval=0.2,
|
||||
func=lambda: timeline_delete_wait_completed(
|
||||
ps_http, env.initial_tenant, parent_timeline_id
|
||||
),
|
||||
)
|
||||
|
||||
# Check that we didn't pick up the timeline again after restart.
|
||||
# See https://github.com/neondatabase/neon/issues/3560
|
||||
env.pageserver.stop(immediate=True)
|
||||
@@ -130,13 +135,24 @@ def test_delete_timeline_post_rm_failure(
|
||||
env = neon_env_builder.init_start()
|
||||
assert env.initial_timeline
|
||||
|
||||
env.pageserver.allowed_errors.append(".*Error: failpoint: timeline-delete-after-rm")
|
||||
env.pageserver.allowed_errors.append(".*Ignoring state update Stopping for broken timeline")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
failpoint_name = "timeline-delete-after-rm"
|
||||
ps_http.configure_failpoints((failpoint_name, "return"))
|
||||
|
||||
with pytest.raises(PageserverApiException, match=f"failpoint: {failpoint_name}"):
|
||||
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
|
||||
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=env.initial_timeline,
|
||||
expected_state="Broken",
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
|
||||
env.pageserver.allowed_errors.append(at_failpoint_log_message)
|
||||
@@ -148,11 +164,8 @@ def test_delete_timeline_post_rm_failure(
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
# this should succeed
|
||||
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline, timeout=2)
|
||||
# the second call will try to transition the timeline into Stopping state, but it's already in that state
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{env.initial_timeline}.*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
# this also checks that delete can be retried even when timeline is in Broken state
|
||||
timeline_delete_wait_completed(ps_http, env.initial_tenant, env.initial_timeline)
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{env.initial_timeline}.*timeline directory not found, proceeding anyway.*"
|
||||
)
|
||||
@@ -228,7 +241,7 @@ def test_timeline_resurrection_on_attach(
|
||||
pass
|
||||
|
||||
# delete new timeline
|
||||
ps_http.timeline_delete(tenant_id=tenant_id, timeline_id=branch_timeline_id)
|
||||
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=branch_timeline_id)
|
||||
|
||||
##### Stop the pageserver instance, erase all its data
|
||||
env.endpoints.stop_all()
|
||||
@@ -252,12 +265,31 @@ def test_timeline_resurrection_on_attach(
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
|
||||
def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
|
||||
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
|
||||
assert neon_env_builder.remote_storage_kind in (
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
RemoteStorageKind.REAL_S3,
|
||||
)
|
||||
# For mypy
|
||||
assert isinstance(neon_env_builder.remote_storage, S3Storage)
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
response = neon_env_builder.remote_storage_client.list_objects_v2(
|
||||
Bucket=neon_env_builder.remote_storage.bucket_name,
|
||||
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
|
||||
)
|
||||
objects = response.get("Contents")
|
||||
assert (
|
||||
response["KeyCount"] == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
When deleting a timeline, if we succeed in setting the deleted flag remotely
|
||||
but fail to delete the local state, restarting the pageserver should resume
|
||||
the deletion of the local state.
|
||||
(Deletion of the state in S3 is not implemented yet.)
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
@@ -271,8 +303,9 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Ignoring new state, equal to the existing one: Stopping"
|
||||
)
|
||||
# this happens, because the stuck timeline is visible to shutdown
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*during shutdown: cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
".*freeze_and_flush_on_shutdown.+: failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
@@ -292,11 +325,16 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
env.repo_dir / "tenants" / str(env.initial_tenant) / "timelines" / str(leaf_timeline_id)
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="failpoint: timeline-delete-before-rm",
|
||||
):
|
||||
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
|
||||
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
expected_state="Broken",
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
assert leaf_timeline_path.exists(), "the failpoint didn't work"
|
||||
|
||||
@@ -304,7 +342,17 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
env.pageserver.start()
|
||||
|
||||
# Wait for tenant to finish loading.
|
||||
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=0.5)
|
||||
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
|
||||
|
||||
try:
|
||||
data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
|
||||
log.debug(f"detail {data}")
|
||||
except PageserverApiException as e:
|
||||
log.debug(e)
|
||||
if e.status_code != 404:
|
||||
raise
|
||||
else:
|
||||
raise Exception("detail succeeded (it should return 404)")
|
||||
|
||||
assert (
|
||||
not leaf_timeline_path.exists()
|
||||
@@ -316,24 +364,67 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
}, "other timelines should not have been affected"
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(env.initial_tenant),
|
||||
"timelines",
|
||||
str(leaf_timeline_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
assert env.initial_timeline is not None
|
||||
|
||||
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
|
||||
timeline_delete_wait_completed(
|
||||
ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id
|
||||
)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(env.initial_tenant),
|
||||
"timelines",
|
||||
str(timeline_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server incosistency and check twice.
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"stuck_failpoint",
|
||||
["persist_deleted_index_part", "in_progress_delete"],
|
||||
)
|
||||
def test_concurrent_timeline_delete_stuck_on(
|
||||
neon_env_builder: NeonEnvBuilder, stuck_failpoint: str
|
||||
):
|
||||
"""
|
||||
If we're stuck uploading the index file with the is_delete flag,
|
||||
eventually console will hand up and retry.
|
||||
If we're still stuck at the retry time, ensure that the retry
|
||||
fails with status 500, signalling to console that it should retry
|
||||
later.
|
||||
Ideally, timeline_delete should return 202 Accepted and require
|
||||
console to poll for completion, but, that would require changing
|
||||
the API contract.
|
||||
If delete is stuck console will eventually retry deletion.
|
||||
So we need to be sure that these requests wont interleave with each other.
|
||||
In this tests we check two places where we can spend a lot of time.
|
||||
This is a regression test because there was a bug when DeletionGuard wasnt propagated
|
||||
to the background task.
|
||||
|
||||
Ensure that when retry comes if we're still stuck request will get an immediate error response,
|
||||
signalling to console that it should retry later.
|
||||
"""
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_concurrent_timeline_delete_if_first_stuck_at_index_upload",
|
||||
test_name=f"concurrent_timeline_delete_stuck_on_{stuck_failpoint}",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -343,13 +434,14 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# make the first call sleep practically forever
|
||||
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
ps_http.configure_failpoints((stuck_failpoint, "pause"))
|
||||
|
||||
def first_call(result_queue):
|
||||
try:
|
||||
log.info("first call start")
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
|
||||
timeline_delete_wait_completed(
|
||||
ps_http, env.initial_tenant, child_timeline_id, timeout=10
|
||||
)
|
||||
log.info("first call success")
|
||||
result_queue.put("success")
|
||||
except Exception:
|
||||
@@ -364,17 +456,17 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
|
||||
def first_call_hit_failpoint():
|
||||
assert env.pageserver.log_contains(
|
||||
f".*{child_timeline_id}.*at failpoint {failpoint_name}"
|
||||
f".*{child_timeline_id}.*at failpoint {stuck_failpoint}"
|
||||
)
|
||||
|
||||
wait_until(50, 0.1, first_call_hit_failpoint)
|
||||
|
||||
# make the second call and assert behavior
|
||||
log.info("second call start")
|
||||
error_msg_re = "another task is already setting the deleted_flag, started at"
|
||||
error_msg_re = "Timeline deletion is already in progress"
|
||||
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
|
||||
assert second_call_err.value.status_code == 500
|
||||
assert second_call_err.value.status_code == 409
|
||||
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
|
||||
# the second call will try to transition the timeline into Stopping state as well
|
||||
env.pageserver.allowed_errors.append(
|
||||
@@ -382,8 +474,12 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
)
|
||||
log.info("second call failed as expected")
|
||||
|
||||
# ensure it is not 404 and stopping
|
||||
detail = ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
|
||||
assert detail["state"] == "Stopping"
|
||||
|
||||
# by now we know that the second call failed, let's ensure the first call will finish
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
ps_http.configure_failpoints((stuck_failpoint, "off"))
|
||||
|
||||
result = first_call_result.get()
|
||||
assert result == "success"
|
||||
@@ -396,8 +492,10 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
|
||||
|
||||
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the client hangs up before we start the index part upload but after we mark it
|
||||
If the client hangs up before we start the index part upload but after deletion is scheduled
|
||||
we mark it
|
||||
deleted in local memory, a subsequent delete_timeline call should be able to do
|
||||
|
||||
another delete timeline operation.
|
||||
|
||||
This tests cancel safety up to the given failpoint.
|
||||
@@ -413,12 +511,18 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
|
||||
failpoint_name = "persist_deleted_index_part"
|
||||
ps_http.configure_failpoints((failpoint_name, "pause"))
|
||||
|
||||
with pytest.raises(requests.exceptions.Timeout):
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Timeline deletion is already in progress.*"
|
||||
)
|
||||
with pytest.raises(PageserverApiException, match="Timeline deletion is already in progress"):
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
|
||||
# make sure the timeout was due to the failpoint
|
||||
at_failpoint_log_message = f".*{child_timeline_id}.*at failpoint {failpoint_name}.*"
|
||||
|
||||
@@ -437,12 +541,96 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
wait_until(50, 0.1, got_hangup_log_message)
|
||||
|
||||
# ok, retry without failpoint, it should succeed
|
||||
# check that the timeline is still present
|
||||
ps_http.timeline_detail(env.initial_tenant, child_timeline_id)
|
||||
|
||||
# ok, disable the failpoint to let the deletion finish
|
||||
ps_http.configure_failpoints((failpoint_name, "off"))
|
||||
|
||||
# this should succeed
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
# the second call will try to transition the timeline into Stopping state, but it's already in that state
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
|
||||
def first_request_finished():
|
||||
message = f".*DELETE.*{child_timeline_id}.*Cancelled request finished"
|
||||
assert env.pageserver.log_contains(message)
|
||||
|
||||
wait_until(50, 0.1, first_request_finished)
|
||||
|
||||
# check that the timeline is gone
|
||||
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind",
|
||||
list(
|
||||
filter(
|
||||
lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
|
||||
available_remote_storages(),
|
||||
)
|
||||
),
|
||||
)
|
||||
def test_timeline_delete_works_for_remote_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_timeline_delete_works_for_remote_smoke",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
pg = env.endpoints.create_start("main")
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
assert tenant_id == env.initial_tenant
|
||||
assert main_timeline_id == env.initial_timeline
|
||||
|
||||
timeline_ids = [env.initial_timeline]
|
||||
for i in range(2):
|
||||
branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
|
||||
pg = env.endpoints.create_start(f"new{i}")
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE f (i integer);")
|
||||
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(ps_http, tenant_id, branch_timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
ps_http.timeline_checkpoint(tenant_id, branch_timeline_id)
|
||||
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
log.info("waiting for checkpoint upload")
|
||||
wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
|
||||
log.info("upload of checkpoint is done")
|
||||
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
timeline_ids.append(timeline_id)
|
||||
|
||||
for timeline_id in reversed(timeline_ids):
|
||||
# note that we need to finish previous deletion before scheduling next one
|
||||
# otherwise we can get an "HasChildren" error if deletion is not fast enough (real_s3)
|
||||
timeline_delete_wait_completed(ps_http, tenant_id=tenant_id, timeline_id=timeline_id)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(env.initial_tenant),
|
||||
"timelines",
|
||||
str(timeline_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server incosistency and check twice.
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder),
|
||||
)
|
||||
|
||||
@@ -24,6 +24,7 @@ from fixtures.neon_fixtures import (
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_upload_queue_empty,
|
||||
wait_until_tenant_active,
|
||||
)
|
||||
@@ -272,7 +273,7 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
if deletion_method == "tenant_detach":
|
||||
client.tenant_detach(tenant_id)
|
||||
elif deletion_method == "timeline_delete":
|
||||
client.timeline_delete(tenant_id, timeline_id)
|
||||
timeline_delete_wait_completed(client, tenant_id, timeline_id)
|
||||
delete_timeline_success.put(True)
|
||||
except PageserverApiException:
|
||||
delete_timeline_success.put(False)
|
||||
@@ -415,6 +416,7 @@ def test_timeline_physical_size_post_compaction(
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
|
||||
|
||||
# shutdown safekeepers to prevent new data from coming in
|
||||
endpoint.stop() # We can't gracefully stop after safekeepers die
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
|
||||
@@ -31,7 +31,11 @@ from fixtures.neon_fixtures import (
|
||||
SafekeeperPort,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.pageserver.utils import (
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||
@@ -548,15 +552,15 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
|
||||
f"sk_id={sk.id} to flush {last_lsn}",
|
||||
)
|
||||
|
||||
ps_cli = env.pageserver.http_client()
|
||||
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
ps_http = env.pageserver.http_client()
|
||||
pageserver_lsn = Lsn(ps_http.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
lag = last_lsn - pageserver_lsn
|
||||
log.info(
|
||||
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
|
||||
)
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
ps_cli.timeline_delete(tenant_id, timeline_id)
|
||||
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
|
||||
|
||||
# Also delete and manually create timeline on safekeepers -- this tests
|
||||
# scenario of manual recovery on different set of safekeepers.
|
||||
@@ -571,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
|
||||
|
||||
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
|
||||
|
||||
# Terminate first all safekeepers to prevent communication unexpectantly
|
||||
# advancing peer_horizon_lsn.
|
||||
for sk in env.safekeepers:
|
||||
cli = sk.http_client()
|
||||
cli.timeline_delete_force(tenant_id, timeline_id)
|
||||
# restart safekeeper to clear its in-memory state
|
||||
sk.stop().start()
|
||||
sk.stop()
|
||||
# wait all potenital in flight pushes to broker arrive before starting
|
||||
# safekeepers (even without sleep, it is very unlikely they are not
|
||||
# delivered yet).
|
||||
time.sleep(1)
|
||||
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
cli = sk.http_client()
|
||||
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
|
||||
f_partial_path = (
|
||||
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
|
||||
@@ -583,7 +597,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
|
||||
shutil.copy(f_partial_saved, f_partial_path)
|
||||
|
||||
# recreate timeline on pageserver from scratch
|
||||
ps_cli.timeline_create(
|
||||
ps_http.timeline_create(
|
||||
pg_version=PgVersion(pg_version),
|
||||
tenant_id=tenant_id,
|
||||
new_timeline_id=timeline_id,
|
||||
@@ -598,7 +612,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
|
||||
if elapsed > wait_lsn_timeout:
|
||||
raise RuntimeError("Timed out waiting for WAL redo")
|
||||
|
||||
tenant_status = ps_cli.tenant_status(tenant_id)
|
||||
tenant_status = ps_http.tenant_status(tenant_id)
|
||||
if tenant_status["state"]["slug"] == "Loading":
|
||||
log.debug(f"Tenant {tenant_id} is still loading, retrying")
|
||||
else:
|
||||
@@ -1001,9 +1015,6 @@ def test_safekeeper_without_pageserver(
|
||||
|
||||
|
||||
def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
|
||||
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
|
||||
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
|
||||
|
||||
def execute_payload(endpoint: Endpoint):
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -1032,9 +1043,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
active_safekeepers = [1, 2, 3]
|
||||
endpoint = env.endpoints.create("test_replace_safekeeper")
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.active_safekeepers = [1, 2, 3]
|
||||
endpoint.start()
|
||||
|
||||
# learn neon timeline from compute
|
||||
@@ -1072,9 +1082,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
log.info("Recreate postgres to replace failed sk1 with new sk4")
|
||||
endpoint.stop_and_destroy().create("test_replace_safekeeper")
|
||||
active_safekeepers = [2, 3, 4]
|
||||
env.safekeepers[3].start()
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.active_safekeepers = [2, 3, 4]
|
||||
endpoint.start()
|
||||
|
||||
execute_payload(endpoint)
|
||||
@@ -1201,6 +1210,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("INSERT INTO t (key) VALUES (1)")
|
||||
|
||||
# Stop all computes gracefully before safekeepers stop responding to them
|
||||
endpoint_1.stop_and_destroy()
|
||||
endpoint_3.stop_and_destroy()
|
||||
|
||||
# Remove initial tenant's br1 (active)
|
||||
assert sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"]
|
||||
assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
|
||||
@@ -1293,9 +1306,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
log.info("Use only first 3 safekeepers")
|
||||
env.safekeepers[3].stop()
|
||||
active_safekeepers = [1, 2, 3]
|
||||
endpoint = env.endpoints.create("test_pull_timeline")
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.active_safekeepers = [1, 2, 3]
|
||||
endpoint.start()
|
||||
|
||||
# learn neon timeline from compute
|
||||
@@ -1332,10 +1344,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
show_statuses(env.safekeepers, tenant_id, timeline_id)
|
||||
|
||||
log.info("Restarting compute with new config to verify that it works")
|
||||
active_safekeepers = [1, 3, 4]
|
||||
|
||||
endpoint.stop_and_destroy().create("test_pull_timeline")
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers))
|
||||
endpoint.active_safekeepers = [1, 3, 4]
|
||||
endpoint.start()
|
||||
|
||||
execute_payload(endpoint)
|
||||
|
||||
@@ -2,9 +2,11 @@ import asyncio
|
||||
import random
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import toml
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
@@ -251,7 +253,8 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
|
||||
endpoint = Endpoint(
|
||||
env,
|
||||
tenant_id=env.initial_tenant,
|
||||
port=env.port_distributor.get_port(),
|
||||
pg_port=env.port_distributor.get_port(),
|
||||
http_port=env.port_distributor.get_port(),
|
||||
# In these tests compute has high probability of terminating on its own
|
||||
# before our stop() due to lost consensus leadership.
|
||||
check_stop_result=False,
|
||||
@@ -536,15 +539,20 @@ def test_race_conditions(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Check that pageserver can select safekeeper with largest commit_lsn
|
||||
# and switch if LSN is not updated for some time (NoWalTimeout).
|
||||
async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
|
||||
def safekeepers_guc(env: NeonEnv, active_sk: List[bool]) -> str:
|
||||
# use ports 10, 11 and 12 to simulate unavailable safekeepers
|
||||
return ",".join(
|
||||
[
|
||||
f"localhost:{sk.port.pg if active else 10 + i}"
|
||||
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk))
|
||||
]
|
||||
)
|
||||
async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Path):
|
||||
def adjust_safekeepers(env: NeonEnv, active_sk: List[bool]):
|
||||
# Change the pg ports of the inactive safekeepers in the config file to be
|
||||
# invalid, to make them unavailable to the endpoint. We use
|
||||
# ports 10, 11 and 12 to simulate unavailable safekeepers.
|
||||
config = toml.load(test_output_dir / "repo" / "config")
|
||||
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)):
|
||||
if active:
|
||||
config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg
|
||||
else:
|
||||
config["safekeepers"][i]["pg_port"] = 10 + i
|
||||
|
||||
with open(test_output_dir / "repo" / "config", "w") as f:
|
||||
toml.dump(config, f)
|
||||
|
||||
conn = await endpoint.connect_async()
|
||||
await conn.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
@@ -565,7 +573,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
|
||||
it -= 1
|
||||
continue
|
||||
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_sk))
|
||||
adjust_safekeepers(env, active_sk)
|
||||
log.info(f"Iteration {it}: {active_sk}")
|
||||
|
||||
endpoint.start()
|
||||
@@ -579,7 +587,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
|
||||
await conn.close()
|
||||
endpoint.stop()
|
||||
|
||||
endpoint.adjust_for_safekeepers(safekeepers_guc(env, [True] * len(env.safekeepers)))
|
||||
adjust_safekeepers(env, [True] * len(env.safekeepers))
|
||||
endpoint.start()
|
||||
conn = await endpoint.connect_async()
|
||||
|
||||
@@ -590,11 +598,11 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint):
|
||||
|
||||
|
||||
# do inserts while restarting postgres and messing with safekeeper addresses
|
||||
def test_wal_lagging(neon_env_builder: NeonEnvBuilder):
|
||||
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_wal_lagging")
|
||||
endpoint = env.endpoints.create_start("test_wal_lagging")
|
||||
|
||||
asyncio.run(run_wal_lagging(env, endpoint))
|
||||
asyncio.run(run_wal_lagging(env, endpoint, test_output_dir))
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.types import Lsn, TenantId
|
||||
@@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder):
|
||||
# Kills one of the safekeepers and ensures that only the active ones are printed in the state.
|
||||
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder):
|
||||
# Trigger WAL wait timeout faster
|
||||
neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'"
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
wait_lsn_timeout = "1s"
|
||||
tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"}
|
||||
"""
|
||||
# Have notable SK ids to ensure we check logs for their presence, not some other random numbers
|
||||
neon_env_builder.safekeepers_id_start = 12345
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
@@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
|
||||
stopped_safekeeper_id = stopped_safekeeper.id
|
||||
log.info(f"Stopping safekeeper {stopped_safekeeper.id}")
|
||||
stopped_safekeeper.stop()
|
||||
# sleep until stopped safekeeper is removed from candidates
|
||||
time.sleep(2)
|
||||
|
||||
# Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats.
|
||||
insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert)
|
||||
@@ -77,7 +84,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil
|
||||
try:
|
||||
trigger_wait_lsn_timeout(env, tenant_id)
|
||||
except Exception as e:
|
||||
exception_string = str(e)
|
||||
# Strip out the part before stdout, as it contains full command with the list of all safekeepers
|
||||
exception_string = str(e).split("stdout", 1)[-1]
|
||||
assert expected_timeout_error in exception_string, "Should time out during waiting for WAL"
|
||||
|
||||
for safekeeper in env.safekeepers:
|
||||
|
||||
@@ -83,6 +83,9 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
|
||||
# XXX this is quite brittle as the lifecycle of the WAL redo process is an implementation detail
|
||||
assert_child_processes(pagserver_pid, wal_redo_present=True, defunct_present=False)
|
||||
|
||||
# Stop the compute before detaching, to avoid errors in the log.
|
||||
endpoint.stop()
|
||||
|
||||
last_error = None
|
||||
for i in range(3):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user