Merge remote-tracking branch 'origin/main' into problame/integrate-tokio-epoll-uring/wip

This commit is contained in:
Christian Schwarz
2024-01-11 17:24:10 +00:00
195 changed files with 12854 additions and 3918 deletions

View File

@@ -40,6 +40,7 @@ from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal
from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.pageserver.allowed_errors import (
@@ -347,7 +348,9 @@ class PgProtocol:
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
def safe_psql_many(
self, queries: List[str], log_query=True, **kwargs: Any
) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
@@ -356,7 +359,8 @@ class PgProtocol:
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
log.info(f"Executing query: {query}")
if log_query:
log.info(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
@@ -365,6 +369,12 @@ class PgProtocol:
result.append(cur.fetchall())
return result
def safe_psql_scalar(self, query, log_query=True) -> Any:
"""
Execute query returning single row with single column.
"""
return self.safe_psql(query, log_query=log_query)[0][0]
@dataclass
class AuthKeys:
@@ -415,6 +425,7 @@ class NeonEnvBuilder:
pg_version: PgVersion,
test_name: str,
test_output_dir: Path,
test_overlay_dir: Optional[Path] = None,
pageserver_remote_storage: Optional[RemoteStorage] = None,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
@@ -460,6 +471,8 @@ class NeonEnvBuilder:
self.initial_timeline = initial_timeline or TimelineId.generate()
self.scrub_on_exit = False
self.test_output_dir = test_output_dir
self.test_overlay_dir = test_overlay_dir
self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
@@ -541,7 +554,10 @@ class NeonEnvBuilder:
tenants_to_dir = self.repo_dir / ps_dir.name / "tenants"
log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}")
shutil.copytree(tenants_from_dir, tenants_to_dir)
if self.test_overlay_dir is None:
shutil.copytree(tenants_from_dir, tenants_to_dir)
else:
self.overlay_mount(f"{ps_dir.name}:tenants", tenants_from_dir, tenants_to_dir)
for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"):
sk_to_dir = self.repo_dir / "safekeepers" / sk_from_dir.name
@@ -550,9 +566,16 @@ class NeonEnvBuilder:
shutil.copytree(sk_from_dir, sk_to_dir, ignore=shutil.ignore_patterns("*.log", "*.pid"))
shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True)
shutil.copytree(
repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
)
if self.test_overlay_dir is None:
shutil.copytree(
repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
)
else:
self.overlay_mount(
"local_fs_remote_storage",
repo_dir / "local_fs_remote_storage",
self.repo_dir / "local_fs_remote_storage",
)
if (attachments_json := Path(repo_dir / "attachments.json")).exists():
shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name)
@@ -569,6 +592,69 @@ class NeonEnvBuilder:
return self.env
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
"""
Mount `srcdir` as an overlayfs mount at `dstdir`.
The overlayfs `upperdir` and `workdir` will be placed in test_overlay_dir.
"""
assert self.test_overlay_dir
assert (
self.test_output_dir in dstdir.parents
) # so that teardown & test_overlay_dir fixture work
assert srcdir.is_dir()
dstdir.mkdir(exist_ok=False, parents=False)
ident_state_dir = self.test_overlay_dir / ident
upper = ident_state_dir / "upper"
work = ident_state_dir / "work"
ident_state_dir.mkdir(
exist_ok=False, parents=False
) # exists_ok=False also checks uniqueness in self.overlay_mounts
upper.mkdir()
work.mkdir()
cmd = [
"sudo",
"mount",
"-t",
"overlay",
"overlay",
"-o",
f"lowerdir={srcdir},upperdir={upper},workdir={work}",
str(dstdir),
]
log.info(f"Mounting overlayfs srcdir={srcdir} dstdir={dstdir}: {cmd}")
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
)
self.overlay_mounts_created_by_us.append((ident, dstdir))
def overlay_cleanup_teardown(self):
"""
Unmount the overlayfs mounts created by `self.overlay_mount()`.
Supposed to be called during env teardown.
"""
if self.test_overlay_dir is None:
return
while len(self.overlay_mounts_created_by_us) > 0:
(ident, mountpoint) = self.overlay_mounts_created_by_us.pop()
ident_state_dir = self.test_overlay_dir / ident
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}: {cmd}"
)
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
)
log.info(
f"Cleaning up overlayfs state dir (owned by root user) for ident {ident} at {ident_state_dir}"
)
cmd = ["sudo", "rm", "-rf", str(ident_state_dir)]
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
)
# assert all overlayfs mounts in our test directory are gone
assert [] == list(overlayfs.iter_mounts_beneath(self.test_overlay_dir))
def enable_scrub_on_exit(self):
"""
Call this if you would like the fixture to automatically run
@@ -675,7 +761,10 @@ class NeonEnvBuilder:
sk.stop(immediate=True)
for pageserver in self.env.pageservers:
pageserver.assert_no_metric_errors()
# if the test threw an exception, don't check for errors
# as a failing assertion would cause the cleanup below to fail
if exc_type is not None:
pageserver.assert_no_metric_errors()
pageserver.stop(immediate=True)
@@ -690,6 +779,13 @@ class NeonEnvBuilder:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
try:
self.overlay_cleanup_teardown()
except Exception as e:
log.error(f"Error cleaning up overlay state: {e}")
if cleanup_error is not None:
cleanup_error = e
try:
self.cleanup_remote_storage()
except Exception as e:
@@ -892,8 +988,8 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
def get_binary_version(self, binary_name: str) -> str:
bin_pageserver = str(self.neon_binpath / binary_name)
res = subprocess.run(
[bin_pageserver, "--version"],
check=True,
@@ -1018,6 +1114,7 @@ def neon_env_builder(
default_broker: NeonBroker,
run_id: uuid.UUID,
request: FixtureRequest,
test_overlay_dir: Path,
pageserver_virtual_file_io_engine: str,
) -> Iterator[NeonEnvBuilder]:
"""
@@ -1050,6 +1147,7 @@ def neon_env_builder(
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
test_overlay_dir=test_overlay_dir,
) as builder:
yield builder
@@ -1104,8 +1202,8 @@ class AbstractNeonCli(abc.ABC):
If `local_binpath` is true, then we are invoking a test utility
"""
assert type(arguments) == list
assert type(self.COMMAND) == str
assert isinstance(arguments, list)
assert isinstance(self.COMMAND, str)
if local_binpath:
# Test utility
@@ -1662,7 +1760,7 @@ class NeonPageserver(PgProtocol):
self.running = False
self.service_port = port
self.config_override = config_override
self.version = env.get_pageserver_version()
self.version = env.get_binary_version("pageserver")
# After a test finishes, we will scrape the log to see if there are any
# unexpected error messages. If your test expects an error, add it to
@@ -1831,18 +1929,24 @@ class NeonPageserver(PgProtocol):
return None
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
self,
tenant_id: TenantId,
config: None | Dict[str, Any] = None,
config_null: bool = False,
generation: Optional[int] = None,
):
"""
Tenant attachment passes through here to acquire a generation number before proceeding
to call into the pageserver HTTP client.
"""
client = self.http_client()
if generation is None:
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
return client.tenant_attach(
tenant_id,
config,
config_null,
generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
generation=generation,
)
def tenant_detach(self, tenant_id: TenantId):
@@ -2745,6 +2849,13 @@ class Endpoint(PgProtocol):
):
self.stop()
# Checkpoints running endpoint and returns pg_wal size in MB.
def get_pg_wal_size(self):
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
self.safe_psql("checkpoint")
assert self.pgdata_dir is not None # please mypy
return get_dir_size(os.path.join(self.pgdata_dir, "pg_wal")) / 1024 / 1024
class EndpointFactory:
"""An object representing multiple compute endpoints."""
@@ -2923,7 +3034,10 @@ class Safekeeper:
return res
def http_client(self, auth_token: Optional[str] = None) -> SafekeeperHttpClient:
return SafekeeperHttpClient(port=self.port.http, auth_token=auth_token)
is_testing_enabled = '"testing"' in self.env.get_binary_version("safekeeper")
return SafekeeperHttpClient(
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
)
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
@@ -2943,6 +3057,13 @@ class Safekeeper:
return segments
# Walreceiver as returned by sk's timeline status endpoint.
@dataclass
class Walreceiver:
conn_id: int
state: str
@dataclass
class SafekeeperTimelineStatus:
acceptor_epoch: int
@@ -2953,6 +3074,7 @@ class SafekeeperTimelineStatus:
backup_lsn: Lsn
peer_horizon_lsn: Lsn
remote_consistent_lsn: Lsn
walreceivers: List[Walreceiver]
@dataclass
@@ -2966,10 +3088,11 @@ class SafekeeperMetrics:
class SafekeeperHttpClient(requests.Session):
HTTPError = requests.HTTPError
def __init__(self, port: int, auth_token: Optional[str] = None):
def __init__(self, port: int, auth_token: Optional[str] = None, is_testing_enabled=False):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled = is_testing_enabled
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@@ -2977,6 +3100,30 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def is_testing_enabled_or_skip(self):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
@@ -2992,6 +3139,28 @@ class SafekeeperHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: Dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
json=body,
)
res.raise_for_status()
def timeline_digest(
self, tenant_id: TenantId, timeline_id: TimelineId, from_lsn: Lsn, until_lsn: Lsn
) -> Dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/digest",
params={
"from_lsn": str(from_lsn),
"until_lsn": str(until_lsn),
},
)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
@@ -3014,6 +3183,7 @@ class SafekeeperHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
return SafekeeperTimelineStatus(
acceptor_epoch=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
@@ -3023,6 +3193,7 @@ class SafekeeperHttpClient(requests.Session):
backup_lsn=Lsn(resj["backup_lsn"]),
peer_horizon_lsn=Lsn(resj["peer_horizon_lsn"]),
remote_consistent_lsn=Lsn(resj["remote_consistent_lsn"]),
walreceivers=walreceivers,
)
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
@@ -3130,10 +3301,10 @@ class S3Scrubber:
raise
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""Compute the working directory for an individual test."""
def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path:
"""Compute the path to a working directory for an individual test."""
test_name = request.node.name
test_dir = top_output_dir / test_name.replace("/", "-")
test_dir = top_output_dir / f"{prefix}{test_name.replace('/', '-')}"
# We rerun flaky tests multiple times, use a separate directory for each run.
if (suffix := getattr(request.node, "execution_count", None)) is not None:
@@ -3145,6 +3316,21 @@ def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return test_dir
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
The working directory for a test.
"""
return _get_test_dir(request, top_output_dir, "")
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
"""
Directory that contains `upperdir` and `workdir` for overlayfs mounts
that a test creates. See `NeonEnvBuilder.overlay_mount`.
"""
return _get_test_dir(request, top_output_dir, "overlay-")
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
@@ -3172,8 +3358,12 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
# scope. So it uses the get_test_output_dir() function to get the path, and
# this fixture ensures that the directory exists. That works because
# 'autouse' fixtures are run before other fixtures.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[Path]:
def test_output_dir(
request: FixtureRequest, top_output_dir: Path, test_overlay_dir: Path
) -> Iterator[Path]:
"""Create the working directory for an individual test."""
# one directory per test
@@ -3187,6 +3377,43 @@ def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[P
allure_attach_from_dir(test_dir)
@pytest.fixture(scope="function")
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
Idempotently create a test's overlayfs mount state directory.
If the functionality isn't enabled via env var, returns None.
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
"""
if os.getenv("NEON_ENV_BUILDER_FROM_REPO_DIR_USE_OVERLAYFS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)
log.info(f"test_overlay_dir is {overlay_dir}")
overlay_dir.mkdir(exist_ok=True)
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
)
subprocess.run(cmd, capture_output=True, check=True)
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
subprocess.run(cmd, capture_output=True, check=True)
overlay_dir.mkdir()
return overlay_dir
# no need to clean up anything: on clean shutdown,
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
# and on unclean shutdown, this function will take care of it
# on the next test run
SKIP_DIRS = frozenset(
(
"pg_wal",

View File

@@ -0,0 +1,16 @@
from pathlib import Path
from typing import Iterator
import psutil
def iter_mounts_beneath(topdir: Path) -> Iterator[Path]:
"""
Iterate over the overlayfs mounts beneath the specififed `topdir`.
The `topdir` itself isn't considered.
"""
for part in psutil.disk_partitions(all=True):
if part.fstype == "overlay":
mountpoint = Path(part.mountpoint)
if topdir in mountpoint.parents:
yield mountpoint

View File

@@ -326,6 +326,10 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload")
self.verbose_error(res)
def tenant_secondary_download(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download")
self.verbose_error(res)
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
assert "tenant_id" not in config.keys()
res = self.put(
@@ -361,9 +365,9 @@ class PageserverHttpClient(requests.Session):
assert isinstance(res, dict)
assert TenantId(res["id"]) == tenant_id
size = res["size"]
assert type(size) == int
assert isinstance(size, int)
inputs = res["inputs"]
assert type(inputs) is dict
assert isinstance(inputs, dict)
return (size, inputs)
def tenant_size_debug(self, tenant_id: TenantId) -> str:
@@ -437,6 +441,7 @@ class PageserverHttpClient(requests.Session):
timeline_id: TimelineId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
force_await_initial_logical_size: bool = False,
**kwargs,
) -> Dict[Any, Any]:
params = {}
@@ -444,6 +449,8 @@ class PageserverHttpClient(requests.Session):
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
if force_await_initial_logical_size:
params["force-await-initial-logical-size"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
@@ -714,7 +721,7 @@ class PageserverHttpClient(requests.Session):
)
self.verbose_error(res)
assert res.status_code == 200
assert res.status_code in (200, 304)
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)

View File

@@ -42,9 +42,10 @@ def test_clickbench_create_pg_stat_statements(remote_compare: RemoteCompare):
# Please do not alter the label for the query, as it is used to identify it.
# Labels for ClickBench queries match the labels in ClickBench reports
# on https://benchmark.clickhouse.com/ (the DB size may differ).
#
# Disable auto formatting for the list of queries so that it's easier to read
# fmt: off
QUERIES: Tuple[LabelledQuery, ...] = (
# Disable `black` formatting for the list of queries so that it's easier to read
# fmt: off
### ClickBench queries:
LabelledQuery("Q0", r"SELECT COUNT(*) FROM hits;"),
LabelledQuery("Q1", r"SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0;"),
@@ -96,8 +97,8 @@ QUERIES: Tuple[LabelledQuery, ...] = (
# LabelledQuery("NQ0", r"..."),
# LabelledQuery("NQ1", r"..."),
# ...
# fmt: on
)
# fmt: on
EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"
@@ -151,7 +152,9 @@ def test_clickbench(query: LabelledQuery, remote_compare: RemoteCompare, scale:
An OLAP-style ClickHouse benchmark
Based on https://github.com/ClickHouse/ClickBench/tree/c00135ca5b6a0d86fedcdbf998fdaa8ed85c1c3b/aurora-postgresql
The DB prepared manually in advance
The DB prepared manually in advance.
Important: after intial data load, run `VACUUM (DISABLE_PAGE_SKIPPING, FREEZE, ANALYZE) hits;`
to ensure that Postgres optimizer chooses the same plans as RDS and Aurora.
"""
explain: bool = os.getenv("TEST_OLAP_COLLECT_EXPLAIN", "false").lower() == "true"

View File

@@ -32,8 +32,7 @@ def pg_compare(request) -> PgCompare:
else:
assert (
len(x) == 2
), f"request param ({request.param}) should have a format of \
`neon_{{safekeepers_enable_fsync}}`"
), f"request param ({request.param}) should have a format of `neon_{{safekeepers_enable_fsync}}`"
# `NeonCompare` interface
neon_env_builder = request.getfixturevalue("neon_env_builder")

View File

@@ -194,12 +194,13 @@ def test_fully_custom_config(positive_env: NeonEnv):
assert set(our_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
assert {
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
} == {
k: True for k in fully_custom_config.keys()
}, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
assert (
{
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
}
== {k: True for k in fully_custom_config.keys()}
), "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
ps_http.tenant_detach(tenant_id)
env.pageserver.tenant_attach(tenant_id, config=fully_custom_config)

View File

@@ -186,9 +186,7 @@ def test_backward_compatibility(
else:
raise
assert (
not breaking_changes_allowed
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@check_ondisk_data_compatibility_if_enabled
@@ -247,9 +245,7 @@ def test_forward_compatibility(
else:
raise
assert (
not breaking_changes_allowed
), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):

View File

@@ -2,7 +2,6 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, WalCraft
# Restart nodes with WAL end having specially crafted shape, like last record
# crossing segment boundary, to test decoding issues.

View File

@@ -1,6 +1,7 @@
import enum
import time
from dataclasses import dataclass
from typing import Dict, Tuple
from typing import Any, Dict, Tuple
import pytest
import toml
@@ -64,6 +65,23 @@ def test_min_resident_size_override_handling(
assert_config(tenant_id, None, config_level_override)
@enum.unique
class EvictionOrder(str, enum.Enum):
ABSOLUTE_ORDER = "absolute"
RELATIVE_ORDER_EQUAL = "relative_equal"
RELATIVE_ORDER_SPARE = "relative_spare"
def config(self) -> Dict[str, Any]:
if self == EvictionOrder.ABSOLUTE_ORDER:
return {"type": "AbsoluteAccessed"}
elif self == EvictionOrder.RELATIVE_ORDER_EQUAL:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": False}}
elif self == EvictionOrder.RELATIVE_ORDER_SPARE:
return {"type": "RelativeAccessed", "args": {"highest_layer_count_loses_first": True}}
else:
raise RuntimeError(f"not implemented: {self}")
@dataclass
class EvictionEnv:
timelines: list[Tuple[TenantId, TimelineId]]
@@ -108,13 +126,14 @@ class EvictionEnv:
_avg = cur.fetchone()
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
self, period, max_usage_pct, min_avail_bytes, mock_behavior, eviction_order: EvictionOrder
):
disk_usage_config = {
"period": period,
"max_usage_pct": max_usage_pct,
"min_avail_bytes": min_avail_bytes,
"mock_statvfs": mock_behavior,
"eviction_order": eviction_order.config(),
}
enc = toml.TomlEncoder()
@@ -270,7 +289,13 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)
def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv):
@pytest.mark.parametrize(
"order",
[EvictionOrder.ABSOLUTE_ORDER, EvictionOrder.RELATIVE_ORDER_EQUAL],
)
def test_pageserver_evicts_until_pressure_is_relieved(
eviction_env: EvictionEnv, order: EvictionOrder
):
"""
Basic test to ensure that we evict enough to relieve pressure.
"""
@@ -281,7 +306,9 @@ def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv)
target = total_on_disk // 2
response = pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
response = pageserver_http.disk_usage_eviction_run(
{"evict_bytes": target, "eviction_order": order.config()}
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
@@ -296,7 +323,13 @@ def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv)
assert response["Finished"]["assumed"]["failed"]["count"] == 0, "zero failures expected"
def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv):
@pytest.mark.parametrize(
"order",
[EvictionOrder.ABSOLUTE_ORDER, EvictionOrder.RELATIVE_ORDER_EQUAL],
)
def test_pageserver_respects_overridden_resident_size(
eviction_env: EvictionEnv, order: EvictionOrder
):
"""
Override tenant min resident and ensure that it will be respected by eviction.
"""
@@ -336,7 +369,9 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv)
env.warm_up_tenant(large_tenant[0])
# do one run
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
response = ps_http.disk_usage_eviction_run(
{"evict_bytes": target, "eviction_order": order.config()}
)
log.info(f"{response}")
time.sleep(1) # give log time to flush
@@ -365,7 +400,11 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv)
assert du_by_timeline[large_tenant] - later_du_by_timeline[large_tenant] >= target
def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
@pytest.mark.parametrize(
"order",
[EvictionOrder.ABSOLUTE_ORDER, EvictionOrder.RELATIVE_ORDER_EQUAL],
)
def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv, order: EvictionOrder):
"""
If we can't relieve pressure using tenant_min_resident_size-respecting eviction,
we should continue to evict layers following global LRU.
@@ -376,7 +415,9 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
(total_on_disk, _, _) = env.timelines_du()
target = total_on_disk
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
response = ps_http.disk_usage_eviction_run(
{"evict_bytes": target, "eviction_order": order.config()}
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
@@ -389,7 +430,15 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
env.neon_env.pageserver.allowed_errors.append(".*" + GLOBAL_LRU_LOG_LINE)
def test_partial_evict_tenant(eviction_env: EvictionEnv):
@pytest.mark.parametrize(
"order",
[
EvictionOrder.ABSOLUTE_ORDER,
EvictionOrder.RELATIVE_ORDER_EQUAL,
EvictionOrder.RELATIVE_ORDER_SPARE,
],
)
def test_partial_evict_tenant(eviction_env: EvictionEnv, order: EvictionOrder):
"""
Warm up a tenant, then build up pressure to cause in evictions in both.
We expect
@@ -402,7 +451,7 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
(total_on_disk, _, _) = env.timelines_du()
du_by_timeline = env.du_by_timeline()
# pick any tenant
# pick smaller or greater (iteration order is insertion order of scale=4 and scale=6)
[warm, cold] = list(du_by_timeline.keys())
(tenant_id, timeline_id) = warm
@@ -413,7 +462,9 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
# but not enough to fall into global LRU.
# So, set target to all occupied space, except 2*env.layer_size per tenant
target = du_by_timeline[cold] + (du_by_timeline[warm] // 2) - 2 * 2 * env.layer_size
response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
response = ps_http.disk_usage_eviction_run(
{"evict_bytes": target, "eviction_order": order.config()}
)
log.info(f"{response}")
(later_total_on_disk, _, _) = env.timelines_du()
@@ -428,28 +479,32 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
), "all tenants should have lost some layers"
warm_size = later_du_by_timeline[warm]
# bounds for warmed_size
warm_lower = 0.5 * du_by_timeline[warm]
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
# So, check for up to 3 here.
warm_upper = warm_lower + 3 * env.layer_size
cold_size = later_du_by_timeline[cold]
cold_upper = 2 * env.layer_size
log.info(
f"expecting for warm tenant: {human_bytes(warm_lower)} < {human_bytes(warm_size)} < {human_bytes(warm_upper)}"
)
log.info(f"expecting for cold tenant: {human_bytes(cold_size)} < {human_bytes(cold_upper)}")
if order == EvictionOrder.ABSOLUTE_ORDER:
# bounds for warmed_size
warm_lower = 0.5 * du_by_timeline[warm]
assert warm_size > warm_lower, "warmed up tenant should be at about half size (lower)"
assert warm_size < warm_upper, "warmed up tenant should be at about half size (upper)"
# We don't know exactly whether the cold tenant needs 2 or just 1 env.layer_size wiggle room.
# So, check for up to 3 here.
warm_upper = warm_lower + 3 * env.layer_size
assert (
cold_size < cold_upper
), "the cold tenant should be evicted to its min_resident_size, i.e., max layer file size"
cold_upper = 2 * env.layer_size
log.info(f"tenants: warm={warm[0]}, cold={cold[0]}")
log.info(
f"expecting for warm tenant: {human_bytes(warm_lower)} < {human_bytes(warm_size)} < {human_bytes(warm_upper)}"
)
log.info(f"expecting for cold tenant: {human_bytes(cold_size)} < {human_bytes(cold_upper)}")
assert warm_size > warm_lower, "warmed up tenant should be at about half size (lower)"
assert warm_size < warm_upper, "warmed up tenant should be at about half size (upper)"
assert (
cold_size < cold_upper
), "the cold tenant should be evicted to its min_resident_size, i.e., max layer file size"
else:
# just go with the space was freed, find proper limits later
pass
def poor_mans_du(
@@ -501,6 +556,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
"type": "Failure",
"mocked_error": "EIO",
},
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)
assert env.neon_env.pageserver.log_contains(".*statvfs failed.*EIO")
@@ -533,6 +589,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
# This avoids accounting for metadata files & tenant conf in the tests.
"name_filter": ".*__.*",
},
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)
def relieved_log_message():
@@ -573,6 +630,7 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
# This avoids accounting for metadata files & tenant conf in the tests.
"name_filter": ".*__.*",
},
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
)
def relieved_log_message():

View File

@@ -1,19 +1,59 @@
import os
import re
import time
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv
def wait_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = primary.safe_psql_scalar(
"SELECT pg_current_wal_insert_lsn()::text", log_query=False
)
while True:
secondary_lsn = secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
# Check for corrupted WAL messages which might otherwise go unnoticed if
# reconnection fixes this.
def scan_standby_log_for_errors(secondary):
log_path = secondary.endpoint_path() / "compute.log"
with log_path.open("r") as f:
markers = re.compile(
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
)
for line in f:
if markers.search(line):
log.info(f"bad error in standby log: {line}")
raise AssertionError()
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
# We've had a bug caused by WAL records split across multiple XLogData
# messages resulting in corrupted WAL complains on standby. It reproduced
# only when sending from safekeeper is slow enough to grab full
# MAX_SEND_SIZE messages. So insert sleep through failpoints, but only in
# one conf to decrease test time.
slow_down_send = "[debug-pg16]" in os.environ.get("PYTEST_CURRENT_TEST", "")
if slow_down_send:
sk_http = env.safekeepers[0].http_client()
sk_http.configure_failpoints([("sk-send-wal-replica-sleep", "return(100)")])
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
caught_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
@@ -26,23 +66,6 @@ def test_hot_standby(neon_simple_env: NeonEnv):
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
@@ -51,30 +74,28 @@ def test_hot_standby(neon_simple_env: NeonEnv):
response = res
responses[query] = response
# insert more data to make safekeeper send MAX_SEND_SIZE messages
if slow_down_send:
primary.safe_psql("create table t(key int, value text)")
primary.safe_psql("insert into t select generate_series(1, 100000), 'payload'")
wait_caughtup(primary, secondary)
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not caught_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
caught_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]
scan_standby_log_for_errors(secondary)
# clean up
if slow_down_send:
sk_http.configure_failpoints(("sk-send-wal-replica-sleep", "off"))

View File

@@ -102,9 +102,7 @@ def test_basic_eviction(
), f"Did not expect to find {local_layer} layer after evicting"
empty_layers = list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
assert (
not empty_layers
), f"After evicting all layers, timeline {tenant_id}/{timeline_id} should have no layers locally, but got: {empty_layers}"
assert not empty_layers, f"After evicting all layers, timeline {tenant_id}/{timeline_id} should have no layers locally, but got: {empty_layers}"
evicted_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
assert (

View File

@@ -38,6 +38,9 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
ps_http = env.pageserver.http_client()

View File

@@ -145,8 +145,7 @@ def expect_updated_msg_lsn(
last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"])
assert (
prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn
), f"the last received message's LSN {last_msg_lsn} hasn't been updated \
compared to the previous message's LSN {prev_msg_lsn}"
), f"the last received message's LSN {last_msg_lsn} hasn't been updated compared to the previous message's LSN {prev_msg_lsn}"
return last_msg_lsn

View File

@@ -254,7 +254,9 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
metadata_summary = S3Scrubber(
neon_env_builder.test_output_dir, neon_env_builder
).scan_metadata()
assert metadata_summary["count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]

View File

@@ -1,9 +1,11 @@
import random
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber
from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
@@ -251,6 +253,9 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
flush_ms=5000,
)
# Encourage the new location to download while still in secondary mode
pageserver_b.http_client().tenant_secondary_download(tenant_id)
migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id)
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
assert migrated_generation == initial_generation + 1
@@ -258,8 +263,6 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
# Writes and reads still work in AttachedStale.
workload.validate(pageserver_a.id)
# TODO: call into secondary mode API hooks to do an upload/download sync
# Generate some more dirty writes: we expect the origin to ingest WAL in
# in AttachedStale
workload.churn_rows(64, pageserver_a.id, upload=False)
@@ -369,3 +372,143 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
log.info(f"Read back heatmap: {heatmap_second}")
assert heatmap_second != heatmap_first
validate_heatmap(heatmap_second)
def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.
:return: list of relative paths to layers, from the timeline root.
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
def relative(p: Path) -> Path:
return p.relative_to(timeline_path)
return sorted(
list(
map(
relative,
filter(
lambda path: path.name != "metadata"
and "ephemeral" not in path.name
and "temp" not in path.name,
timeline_path.glob("*"),
),
)
)
)
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
- Heatmap uploads from the attached location
- Heatmap & layer downloads from the secondary location
- Eviction of layers on the attached location results in deletion
on the secondary location as well.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps_attached = env.pageservers[0]
ps_secondary = env.pageservers[1]
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, ps_attached.id)
# Configure a secondary location
log.info("Setting up secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
log.info(f"Read back conf: {readback_conf}")
# Explicit upload/download cycle
# ==============================
log.info("Synchronizing after initial write...")
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Make changes on attached pageserver, check secondary downloads them
# ===================================================================
log.info("Synchronizing after subsequent write...")
workload.churn_rows(128, ps_attached.id)
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# FIXME: this sleep is needed to avoid on-demand promotion of the layers we evict, while
# walreceiver is still doing something.
import time
time.sleep(5)
# Do evictions on attached pageserver, check secondary follows along
# ==================================================================
log.info("Evicting a layer...")
layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0]
ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name)
log.info("Synchronizing after eviction...")
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
S3Scrubber(neon_env_builder.test_output_dir, neon_env_builder).scan_metadata()
# Detach secondary and delete tenant
# ===================================
# This confirms that the heatmap gets cleaned up as well as other normal content.
log.info("Detaching secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
},
)
log.info("Deleting tenant...")
tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)

View File

@@ -144,8 +144,11 @@ def test_remote_storage_backup_and_restore(
# Introduce failpoint in list remote timelines code path to make tenant_attach fail.
# This is before the failures injected by test_remote_failures, so it's a permanent error.
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
env.pageserver.allowed_errors.append(
".*attach failed.*: storage-sync-list-remote-timelines",
env.pageserver.allowed_errors.extend(
[
".*attach failed.*: storage-sync-list-remote-timelines",
".*Tenant state is Broken: storage-sync-list-remote-timelines.*",
]
)
# Attach it. This HTTP request will succeed and launch a
# background task to load the tenant. In that background task,
@@ -159,9 +162,13 @@ def test_remote_storage_backup_and_restore(
"data": {"reason": "storage-sync-list-remote-timelines"},
}
# Ensure that even though the tenant is broken, we can't attach it again.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
env.pageserver.tenant_attach(tenant_id)
# Ensure that even though the tenant is broken, retrying the attachment fails
with pytest.raises(Exception, match="Tenant state is Broken"):
# Use same generation as in previous attempt
gen_state = env.attachment_service.inspect(tenant_id)
assert gen_state is not None
generation = gen_state[0]
env.pageserver.tenant_attach(tenant_id, generation=generation)
# Restart again, this implicitly clears the failpoint.
# test_remote_failures=1 remains active, though, as it's in the pageserver config.
@@ -176,10 +183,8 @@ def test_remote_storage_backup_and_restore(
), "we shouldn't have tried any layer downloads yet since list remote timelines has a failpoint"
env.pageserver.start()
# Ensure that the pageserver remembers that the tenant was attaching, by
# trying to attach it again. It should fail.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
env.pageserver.tenant_attach(tenant_id)
# The attach should have got far enough that it recovers on restart (i.e. tenant's
# config was written to local storage).
log.info("waiting for tenant to become active. this should be quick with on-demand download")
wait_until_tenant_active(

View File

@@ -391,8 +391,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
assert (
tenant_id not in tenants_after_detach
), f"Ignored and then detached tenant {tenant_id} \
should not be present in pageserver's memory"
), f"Ignored and then detached tenant {tenant_id} should not be present in pageserver's memory"
# Creates a tenant, and detaches it with extra paremeter that forces ignored tenant detach.
@@ -430,8 +429,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
assert (
tenant_id not in tenants_after_detach
), f"Ignored and then detached tenant {tenant_id} \
should not be present in pageserver's memory"
), f"Ignored and then detached tenant {tenant_id} should not be present in pageserver's memory"
def test_detach_while_attaching(
@@ -629,7 +627,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
# Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally
# Similarly, tests that it's not possible to schedule a `load` for tenat that's not ignored.
def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
def test_load_negatives(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
@@ -646,25 +644,16 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
):
env.pageserver.tenant_load(tenant_id)
with pytest.raises(
expected_exception=PageserverApiException,
match=f"tenant {tenant_id} already exists, state: Active",
):
env.pageserver.tenant_attach(tenant_id)
pageserver_http.tenant_ignore(tenant_id)
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
with pytest.raises(
expected_exception=PageserverApiException,
match="tenant directory already exists",
):
env.pageserver.tenant_attach(tenant_id)
def test_ignore_while_attaching(
def test_detach_while_activating(
neon_env_builder: NeonEnvBuilder,
):
"""
Test cancellation behavior for tenants that are stuck somewhere between
being attached and reaching Active state.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
@@ -686,39 +675,28 @@ def test_ignore_while_attaching(
data_secret = "very secret secret"
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
tenants_before_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
# Detach it
pageserver_http.tenant_detach(tenant_id)
# And re-attach, but stop attach task_mgr task from completing
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
pageserver_http.configure_failpoints([("attach-before-activate", "return(600000)")])
env.pageserver.tenant_attach(tenant_id)
# Run ignore on the task, thereby cancelling the attach.
# XXX This should take priority over attach, i.e., it should cancel the attach task.
# But neither the failpoint, nor the proper remote_timeline_client download functions,
# are sensitive to task_mgr::shutdown.
# This problem is tracked in https://github.com/neondatabase/neon/issues/2996 .
# So, for now, effectively, this ignore here will block until attach task completes.
pageserver_http.tenant_ignore(tenant_id)
# Cannot attach it due to some local files existing
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
with pytest.raises(
expected_exception=PageserverApiException,
match="tenant directory already exists",
):
env.pageserver.tenant_attach(tenant_id)
# The tenant is in the Activating state. This should not block us from
# shutting it down and detaching it.
pageserver_http.tenant_detach(tenant_id)
tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
assert tenant_id not in tenants_after_ignore, "Ignored tenant should be missing"
assert len(tenants_after_ignore) + 1 == len(
tenants_before_ignore
tenants_after_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
assert tenant_id not in tenants_after_detach, "Detached tenant should be missing"
assert len(tenants_after_detach) + 1 == len(
tenants_before_detach
), "Only ignored tenant should be missing"
# Calling load will bring the tenant back online
# Subsequently attaching it again should still work
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
env.pageserver.tenant_load(tenant_id)
env.pageserver.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
endpoint.stop()
@@ -817,9 +795,7 @@ def test_metrics_while_ignoring_broken_tenant_and_reloading(
if found_broken:
break
time.sleep(0.5)
assert (
found_broken
), f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}"
assert found_broken, f"broken should still be in set, but it is not in the tenant state count: broken={broken}, broken_set={broken_set}"
env.pageserver.tenant_load(env.initial_tenant)
@@ -837,6 +813,4 @@ def test_metrics_while_ignoring_broken_tenant_and_reloading(
break
time.sleep(0.5)
assert (
found_active
), f"reloaded tenant should be active, and broken tenant set item removed: active={active}, broken_set={broken_set}"
assert found_active, f"reloaded tenant should be active, and broken tenant set item removed: active={active}, broken_set={broken_set}"

View File

@@ -161,12 +161,10 @@ def switch_pg_to_new_pageserver(
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert (
"metadata" in files_before_detach
), f"Regular timeline {timeline_to_detach_local_path} should have the metadata file,\
but got: {files_before_detach}"
), f"Regular timeline {timeline_to_detach_local_path} should have the metadata file, but got: {files_before_detach}"
assert (
len(files_before_detach) >= 2
), f"Regular timeline {timeline_to_detach_local_path} should have at least one layer file,\
but got {files_before_detach}"
), f"Regular timeline {timeline_to_detach_local_path} should have at least one layer file, but got {files_before_detach}"
return timeline_to_detach_local_path

View File

@@ -29,18 +29,13 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
initial_tenants = sorted(
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
)
initial_tenant_dirs = [d for d in tenants_dir.iterdir()]
[d for d in tenants_dir.iterdir()]
neon_simple_env.pageserver.allowed_errors.extend(
[
".*Failed to create directory structure for tenant .*, cleaning tmp data.*",
".*Failed to fsync removed temporary tenant directory .*",
]
)
neon_simple_env.pageserver.allowed_errors.append(".*tenant-config-before-write.*")
pageserver_http = neon_simple_env.pageserver.http_client()
pageserver_http.configure_failpoints(("tenant-creation-before-tmp-rename", "return"))
with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"):
pageserver_http.configure_failpoints(("tenant-config-before-write", "return"))
with pytest.raises(Exception, match="tenant-config-before-write"):
_ = neon_simple_env.neon_cli.create_tenant()
new_tenants = sorted(
@@ -48,10 +43,10 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
)
assert initial_tenants == new_tenants, "should not create new tenants"
new_tenant_dirs = [d for d in tenants_dir.iterdir()]
assert (
new_tenant_dirs == initial_tenant_dirs
), "pageserver should clean its temp tenant dirs on tenant creation failure"
# Any files left behind on disk during failed creation do not prevent
# a retry from succeeding.
pageserver_http.configure_failpoints(("tenant-config-before-write", "off"))
neon_simple_env.neon_cli.create_tenant()
def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):

View File

@@ -201,8 +201,8 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder):
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
restored_timeline = restored_timelines[0]
assert restored_timeline["timeline_id"] == str(
timeline_id
assert (
restored_timeline["timeline_id"] == str(timeline_id)
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
# Check that we had to retry the downloads
@@ -280,8 +280,8 @@ def test_tenant_redownloads_truncated_file_on_startup(
len(restored_timelines) == 1
), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
retored_timeline = restored_timelines[0]
assert retored_timeline["timeline_id"] == str(
timeline_id
assert (
retored_timeline["timeline_id"] == str(timeline_id)
), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
# Request non-incremental logical size. Calculating it needs the layer file that

View File

@@ -1,3 +1,4 @@
import concurrent.futures
import math
import queue
import random
@@ -24,6 +25,7 @@ from fixtures.pageserver.utils import (
assert_tenant_state,
timeline_delete_wait_completed,
wait_for_upload_queue_empty,
wait_tenant_status_404,
wait_until_tenant_active,
)
from fixtures.pg_version import PgVersion
@@ -776,6 +778,7 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
def get_tenant_states():
states = {}
log.info(f"Tenant ids: {tenant_ids}")
for tenant_id in tenant_ids:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
states[tenant_id] = tenant["state"]["slug"]
@@ -872,3 +875,116 @@ def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
)
assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == n_tenants
# Check that tenant deletion proactively wakes tenants: this is done separately to the main
# body of the test because it will disrupt tenant counts
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"}
)
wait_until(10, 1, at_least_one_active)
delete_tenant_id = list(
[(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"]
)[0][0]
# Deleting a stuck tenant should prompt it to go active
with concurrent.futures.ThreadPoolExecutor() as executor:
log.info("Starting background delete")
def delete_tenant():
env.pageserver.http_client().tenant_delete(delete_tenant_id)
background_delete = executor.submit(delete_tenant)
# Deletion itself won't complete due to our failpoint: Tenant::shutdown can't complete while calculating
# logical size is paused in a failpoint. So instead we will use a log observation to check that
# on-demand activation was triggered by the tenant deletion
log_match = f".*attach{{tenant_id={delete_tenant_id} shard_id=0000}}: Activating tenant \\(on-demand\\).*"
def activated_on_demand():
assert env.pageserver.log_contains(log_match) is not None
log.info(f"Waiting for activation message '{log_match}'")
try:
wait_until(10, 1, activated_on_demand)
finally:
log.info("Clearing failpoint")
pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off"))
# Deletion should complete successfully now that failpoint is unblocked
log.info("Joining background delete")
background_delete.result(timeout=10)
# Poll for deletion to complete
wait_tenant_status_404(pageserver_http, tenant_id=delete_tenant_id, iterations=40)
tenant_ids.remove(delete_tenant_id)
# Check that all the stuck tenants proceed to active (apart from the one that deletes)
wait_until(10, 1, all_active)
assert len(get_tenant_states()) == n_tenants - 1
def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
"""
/v1/tenant/:tenant_shard_id/timeline and /v1/tenant/:tenant_shard_id
should not bump the priority of the initial logical size computation
background task, unless the force-await-initial-logical-size query param
is set to true.
This test verifies the invariant stated above. A couple of tricks are involved:
1. Detach the tenant and re-attach it after the page server is restarted. This circumvents
the warm-up which forces the initial logical size calculation.
2. A fail point (initial-size-calculation-permit-pause) is used to block the initial
computation of the logical size until forced.
3. A fail point (walreceiver-after-ingest) is used to pause the walreceiver since
otherwise it would force the logical size computation.
"""
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# load in some data
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql_many(
[
"CREATE TABLE foo (x INTEGER)",
"INSERT INTO foo SELECT g FROM generate_series(1, 10000) g",
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# restart with failpoint inside initial size calculation task
log.info(f"Detaching tenant {tenant_id} and stopping pageserver...")
endpoint.stop()
env.pageserver.tenant_detach(tenant_id)
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={
"FAILPOINTS": "initial-size-calculation-permit-pause=pause;walreceiver-after-ingest=pause"
}
)
log.info(f"Re-attaching tenant {tenant_id}...")
env.pageserver.tenant_attach(tenant_id)
# kick off initial size calculation task (the response we get here is the estimated size)
def assert_initial_logical_size_not_prioritised():
details = client.timeline_detail(tenant_id, timeline_id)
assert details["current_logical_size_is_accurate"] is False
assert_initial_logical_size_not_prioritised()
# ensure that's actually the case
time.sleep(2)
assert_initial_logical_size_not_prioritised()
details = client.timeline_detail(tenant_id, timeline_id, force_await_initial_logical_size=True)
assert details["current_logical_size_is_accurate"] is True
client.configure_failpoints(
[("initial-size-calculation-permit-pause", "off"), ("walreceiver-after-ingest", "off")]
)

View File

@@ -419,7 +419,8 @@ def wait(f, desc, timeout=30, wait_f=None):
try:
if f():
break
except Exception:
except Exception as e:
log.info(f"got exception while waiting for {desc}: {e}")
pass
elapsed = time.time() - started_at
if elapsed > timeout:
@@ -565,7 +566,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
)
endpoint.stop_and_destroy()
endpoint.stop()
timeline_delete_wait_completed(ps_http, tenant_id, timeline_id)
# Also delete and manually create timeline on safekeepers -- this tests
@@ -1001,8 +1002,40 @@ def test_restart_endpoint(neon_env_builder: NeonEnvBuilder):
endpoint.start()
# Context manager which logs passed time on exit.
class DurationLogger:
def __init__(self, desc):
self.desc = desc
def __enter__(self):
self.ts_before = time.time()
def __exit__(self, *exc):
log.info(f"{self.desc} finished in {time.time() - self.ts_before}s")
# Context manager which logs WAL position change on exit.
class WalChangeLogger:
def __init__(self, ep, desc_before):
self.ep = ep
self.desc_before = desc_before
def __enter__(self):
self.ts_before = time.time()
self.lsn_before = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
log.info(f"{self.desc_before}, lsn_before={self.lsn_before}")
def __exit__(self, *exc):
lsn_after = Lsn(self.ep.safe_psql_scalar("select pg_current_wal_lsn()"))
log.info(
f"inserted {((lsn_after - self.lsn_before) / 1024 / 1024):.3f} MB of WAL in {(time.time() - self.ts_before):.3f}s"
)
# Test that we can create timeline with one safekeeper down and initialize it
# later when some data already had been written.
# later when some data already had been written. It is strictly weaker than
# test_lagging_sk, but also is the simplest test to trigger WAL sk -> compute
# download (recovery) and as such useful for development/testing.
def test_late_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
@@ -1010,12 +1043,13 @@ def test_late_init(neon_env_builder: NeonEnvBuilder):
sk1 = env.safekeepers[0]
sk1.stop()
# create and insert smth while safekeeper is down...
env.neon_cli.create_branch("test_late_init")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_late_init")
endpoint = env.endpoints.create_start("test_late_init")
# create and insert smth while safekeeper is down...
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
log.info("insert with safekeeper down done")
with WalChangeLogger(endpoint, "doing insert with sk1 down"):
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
endpoint.stop() # stop compute
# stop another safekeeper, and start one which missed timeline creation
@@ -1024,28 +1058,213 @@ def test_late_init(neon_env_builder: NeonEnvBuilder):
sk1.start()
# insert some more
endpoint = env.endpoints.create_start("test_late_init")
with DurationLogger("recovery"):
endpoint = env.endpoints.create_start("test_late_init")
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")
wait_flush_lsn_align_by_ep(
env, "test_late_init", tenant_id, timeline_id, endpoint, [sk1, env.safekeepers[2]]
)
# Check that WALs are the same.
cmp_sk_wal([sk1, env.safekeepers[2]], tenant_id, timeline_id)
# is timeline flush_lsn equal on provided safekeepers?
def is_flush_lsn_aligned(sk1_http_cli, sk2_http_cli, tenant_id, timeline_id):
status1 = sk1_http_cli.timeline_status(tenant_id, timeline_id)
status2 = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
f"waiting for flush_lsn alignment, sk1.flush_lsn={status1.flush_lsn}, sk2.flush_lsn={status2.flush_lsn}"
def is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
flush_lsns = [
sk_http_cli.timeline_status(tenant_id, timeline_id).flush_lsn
for sk_http_cli in sk_http_clis
]
log.info(f"waiting for flush_lsn alignment, flush_lsns={flush_lsns}")
return all([flush_lsns[0] == flsn for flsn in flush_lsns])
def are_walreceivers_absent(sk_http_cli, tenant_id: TenantId, timeline_id: TimelineId):
status = sk_http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"waiting for walreceivers to be gone, currently {status.walreceivers}")
return len(status.walreceivers) == 0
# Assert by xxd that WAL on given safekeepers is identical. No compute must be
# running for this to be reliable.
def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId):
assert len(sks) >= 2, "cmp_sk_wal makes sense with >= 2 safekeepers passed"
sk_http_clis = [sk.http_client() for sk in sks]
# First check that term / flush_lsn are the same: it is easier to
# report/understand if WALs are different due to that.
statuses = [sk_http_cli.timeline_status(tenant_id, timeline_id) for sk_http_cli in sk_http_clis]
term_flush_lsns = [(s.acceptor_epoch, s.flush_lsn) for s in statuses]
for tfl, sk in zip(term_flush_lsns[1:], sks[1:]):
assert (
term_flush_lsns[0] == tfl
), f"(term, flush_lsn) are not equal on sks {sks[0].id} and {sk.id}: {term_flush_lsns[0]} != {tfl}"
# check that WALs are identic.
segs = [sk.list_segments(tenant_id, timeline_id) for sk in sks]
for cmp_segs, sk in zip(segs[1:], sks[1:]):
assert (
segs[0] == cmp_segs
), f"lists of segments on sks {sks[0].id} and {sk.id} are not identic: {segs[0]} and {cmp_segs}"
log.info(f"comparing segs {segs[0]}")
sk0 = sks[0]
for sk in sks[1:]:
(_, mismatch, not_regular) = filecmp.cmpfiles(
sk0.timeline_dir(tenant_id, timeline_id),
sk.timeline_dir(tenant_id, timeline_id),
segs[0],
shallow=False,
)
log.info(
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
)
for f in mismatch:
f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == (
[],
[],
), f"WAL segs {f1} and {f2} on sks {sks[0].id} and {sk.id} are not identic"
# Wait until flush_lsn on given sks becomes equal, assuming endpoint ep is
# running. ep is stopped by this function. This is used in tests which check
# binary equality of WAL segments on safekeepers; which is inherently racy as
# shutting down endpoint might always write some WAL which can get to only one
# safekeeper. So here we recheck flush_lsn again after ep shutdown and retry if
# it has changed.
def wait_flush_lsn_align_by_ep(env, branch, tenant_id, timeline_id, ep, sks):
sk_http_clis = [sk.http_client() for sk in sks]
# First wait for the alignment.
wait(
partial(is_flush_lsn_aligned, sk_http_clis, tenant_id, timeline_id),
"flush_lsn to get aligned",
)
return status1.flush_lsn == status2.flush_lsn
ep.stop() # then stop endpoint
# Even if there is no compute, there might be some in flight data; ensure
# all walreceivers die before rechecking.
for sk_http_cli in sk_http_clis:
wait(
partial(are_walreceivers_absent, sk_http_cli, tenant_id, timeline_id),
"walreceivers to be gone",
)
# Now recheck again flush_lsn and exit if it is good
if is_flush_lsn_aligned(sk_http_clis, tenant_id, timeline_id):
return
# Otherwise repeat.
log.info("flush_lsn changed during endpoint shutdown; retrying alignment")
ep = env.endpoints.create_start(branch)
# Test behaviour with one safekeeper down and missing a lot of WAL. Namely, that
# 1) walproposer can't recover node if it misses WAL written by previous computes, but
# still starts up and functions normally if two other sks are ok.
# 2) walproposer doesn't keep WAL after some threshold (pg_wal bloat is limited), but functions
# normally if two other sks are ok.
# 3) Lagged safekeeper can still recover by peer recovery.
def test_one_sk_down(neon_env_builder: NeonEnvBuilder):
pass
# Test behaviour with one safekeeper down and missing a lot of WAL, exercising
# neon_walreader and checking that pg_wal never bloats. Namely, ensures that
# compute doesn't keep many WAL for lagging sk, but still can recover it with
# neon_walreader, in two scenarious: a) WAL never existed on compute (it started
# on basebackup LSN later than lagging sk position) though segment file exists
# b) WAL had been recycled on it and segment file doesn't exist.
#
# Also checks along the way that whenever there are two sks alive, compute
# should be able to commit.
def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
# inserts ~20MB of WAL, a bit more than a segment.
def fill_segment(ep):
ep.safe_psql("insert into t select generate_series(1, 180000), 'payload'")
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
(sk1, sk2, sk3) = env.safekeepers
# create and insert smth while safekeeper is down...
sk1.stop()
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_lagging_sk")
ep = env.endpoints.create_start("test_lagging_sk")
ep.safe_psql("create table t(key int, value text)")
# make small insert to be on the same segment
ep.safe_psql("insert into t select generate_series(1, 1000), 'payload'")
log.info("insert with safekeeper down done")
ep.stop() # stop compute
# Stop another safekeeper, and start one which missed timeline creation.
sk2.stop()
sk1.start()
# Start new ep and insert some more. neon_walreader should download WAL for
# sk1 because it should be filled since the horizon (initial LSN) which is
# earlier than basebackup LSN.
ep = env.endpoints.create_start("test_lagging_sk")
ep.safe_psql("insert into t select generate_series(1,100), 'payload'")
# stop ep and ensure WAL is identical after recovery.
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)
# Now repeat insertion with sk1 down, but with inserting more data to check
# that WAL on compute is removed.
sk1.stop()
sk2.start()
# min_wal_size must be at least 2x segment size.
min_wal_config = [
"min_wal_size=32MB",
"max_wal_size=32MB",
"wal_keep_size=0",
"log_checkpoints=on",
]
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=min_wal_config,
)
with WalChangeLogger(ep, "doing large insert with sk1 down"):
for _ in range(0, 5):
fill_segment(ep)
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert ep.get_pg_wal_size() < 16 * 2.5
sk2.stop() # stop another sk to ensure sk1 and sk3 can work
sk1.start()
with DurationLogger("recovery"):
ep.safe_psql("insert into t select generate_series(1,100), 'payload'") # forces recovery
# stop ep and ensure WAL is identical after recovery.
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk1, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk3], tenant_id, timeline_id)
# Now do the same with different safekeeper sk2 down, and restarting ep
# before recovery (again scenario when recovery starts below basebackup_lsn,
# but multi segment now).
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=["min_wal_size=32MB", "max_wal_size=32MB", "log_checkpoints=on"],
)
with WalChangeLogger(ep, "doing large insert with sk2 down"):
for _ in range(0, 5):
fill_segment(ep)
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert ep.get_pg_wal_size() < 16 * 2.5
ep.stop()
ep = env.endpoints.create_start(
"test_lagging_sk",
config_lines=min_wal_config,
)
sk2.start()
with DurationLogger("recovery"):
wait_flush_lsn_align_by_ep(env, "test_lagging_sk", tenant_id, timeline_id, ep, [sk2, sk3])
# Check that WALs are the same.
cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)
# Smaller version of test_one_sk_down testing peer recovery in isolation: that
@@ -1065,7 +1284,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
sk2_http_cli = sk2.http_client()
# ensure tli gets created on sk1, peer recovery won't do that
wait(
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
"flush_lsn to get aligned",
)
@@ -1087,7 +1306,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
assert sk2_tli_status.flush_lsn - sk1_tli_status.flush_lsn >= 16 * 1024 * 1024
# wait a bit, lsns shouldn't change
# time.sleep(5)
time.sleep(2)
sk1_tli_status = sk1_http_cli.timeline_status(tenant_id, timeline_id)
sk2_tli_status = sk2_http_cli.timeline_status(tenant_id, timeline_id)
log.info(
@@ -1098,37 +1317,11 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
# now restart safekeeper with peer recovery enabled and wait for recovery
sk1.stop().start(extra_opts=["--peer-recovery=true"])
wait(
partial(is_flush_lsn_aligned, sk1_http_cli, sk2_http_cli, tenant_id, timeline_id),
partial(is_flush_lsn_aligned, [sk1_http_cli, sk2_http_cli], tenant_id, timeline_id),
"flush_lsn to get aligned",
)
# check that WALs are identic after recovery
segs = sk1.list_segments(tenant_id, timeline_id)
log.info(f"segs are {segs}")
(_, mismatch, not_regular) = filecmp.cmpfiles(
sk1.timeline_dir(tenant_id, timeline_id),
sk2.timeline_dir(tenant_id, timeline_id),
segs,
shallow=False,
)
log.info(
f"filecmp result mismatch and not regular files:\n\t mismatch={mismatch}\n\t not_regular={not_regular}"
)
for f in mismatch:
f1 = os.path.join(sk1.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk2.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = "{}.filediff".format(f2)
with open(stdout_filename, "w") as stdout_f:
subprocess.run("xxd {} > {}.hex ".format(f1, f1), shell=True)
subprocess.run("xxd {} > {}.hex ".format(f2, f2), shell=True)
cmd = "diff {}.hex {}.hex".format(f1, f2)
subprocess.run([cmd], stdout=stdout_f, shell=True)
assert (mismatch, not_regular) == ([], [])
cmp_sk_wal([sk1, sk2], tenant_id, timeline_id)
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
env.safekeepers[2].stop()
@@ -1364,60 +1557,6 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
# We have `wal_keep_size=0`, so postgres should trim WAL once it's broadcasted
# to all safekeepers. This test checks that compute WAL can fit into small number
# of WAL segments.
def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
# used to calculate delta in collect_stats
last_lsn = Lsn(0)
# returns pg_wal size in MB
def collect_stats(endpoint: Endpoint, cur, enable_logs=True):
nonlocal last_lsn
assert endpoint.pgdata_dir is not None
log.info("executing INSERT to generate WAL")
current_lsn = Lsn(query_scalar(cur, "select pg_current_wal_lsn()"))
pg_wal_size_mb = get_dir_size(os.path.join(endpoint.pgdata_dir, "pg_wal")) / 1024 / 1024
if enable_logs:
lsn_delta_mb = (current_lsn - last_lsn) / 1024 / 1024
log.info(f"LSN delta: {lsn_delta_mb} MB, current WAL size: {pg_wal_size_mb} MB")
last_lsn = current_lsn
return pg_wal_size_mb
# generates about ~20MB of WAL, to create at least one new segment
def generate_wal(cur):
cur.execute("INSERT INTO t SELECT generate_series(1,300000), 'payload'")
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_wal_deleted_after_broadcast")
# Adjust checkpoint config to prevent keeping old WAL segments
endpoint = env.endpoints.create_start(
"test_wal_deleted_after_broadcast",
config_lines=["min_wal_size=32MB", "max_wal_size=32MB", "log_checkpoints=on"],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE t(key int, value text)")
collect_stats(endpoint, cur)
# generate WAL to simulate normal workload
for _ in range(5):
generate_wal(cur)
collect_stats(endpoint, cur)
log.info("executing checkpoint")
cur.execute("CHECKPOINT")
wal_size_after_checkpoint = collect_stats(endpoint, cur)
# there shouldn't be more than 2 WAL segments (but dir may have archive_status files)
assert wal_size_after_checkpoint < 16 * 2.5
@pytest.mark.parametrize("auth_enabled", [False, True])
def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
@@ -1699,3 +1838,83 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
assert final_stats.get("START_REPLICATION", 0) >= 1
# walproposer should connect to each safekeeper at least once
assert final_stats.get("START_WAL_PUSH", 0) >= 3
@pytest.mark.parametrize("insert_rows", [0, 100, 100000, 500000])
def test_timeline_copy(neon_env_builder: NeonEnvBuilder, insert_rows: int):
target_percents = [10, 50, 90, 100]
neon_env_builder.num_safekeepers = 3
# we need remote storage that supports copy_object S3 API
neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
lsns = []
def remember_lsn():
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsns.append(lsn)
return lsn
# remember LSN right after timeline creation
lsn = remember_lsn()
log.info(f"LSN after timeline creation: {lsn}")
endpoint.safe_psql("create table t(key int, value text)")
timeline_status = env.safekeepers[0].http_client().timeline_status(tenant_id, timeline_id)
timeline_start_lsn = timeline_status.timeline_start_lsn
log.info(f"Timeline start LSN: {timeline_start_lsn}")
current_percent = 0.0
for new_percent in target_percents:
new_rows = insert_rows * (new_percent - current_percent) / 100
current_percent = new_percent
if new_rows == 0:
continue
endpoint.safe_psql(
f"insert into t select generate_series(1, {new_rows}), repeat('payload!', 10)"
)
# remember LSN right after reaching new_percent
lsn = remember_lsn()
log.info(f"LSN after inserting {new_rows} rows: {lsn}")
# TODO: would be also good to test cases where not all segments are uploaded to S3
for lsn in lsns:
new_timeline_id = TimelineId.generate()
log.info(f"Copying branch for LSN {lsn}, to timeline {new_timeline_id}")
orig_digest = (
env.safekeepers[0]
.http_client()
.timeline_digest(tenant_id, timeline_id, timeline_start_lsn, lsn)
)
log.info(f"Original digest: {orig_digest}")
for sk in env.safekeepers:
sk.http_client().copy_timeline(
tenant_id,
timeline_id,
{
"target_timeline_id": str(new_timeline_id),
"until_lsn": str(lsn),
},
)
new_digest = sk.http_client().timeline_digest(
tenant_id, new_timeline_id, timeline_start_lsn, lsn
)
log.info(f"Digest after timeline copy on safekeeper {sk.id}: {new_digest}")
assert orig_digest == new_digest
# TODO: test timelines can start after copy

View File

@@ -475,6 +475,46 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_unavailability(env, endpoint))
async def run_recovery_uncommitted(env: NeonEnv):
(sk1, sk2, _) = env.safekeepers
env.neon_cli.create_branch("test_recovery_uncommitted")
ep = env.endpoints.create_start("test_recovery_uncommitted")
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# insert with only one safekeeper up to create tail of flushed but not committed WAL
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
# Start one of sks to make quorum online plus compute and ensure they can
# sync.
sk2.start()
ep = env.endpoints.create_start(
"test_recovery_uncommitted",
)
ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
# Test pulling uncommitted WAL (up to flush_lsn) during recovery.
def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_recovery_uncommitted(env))
@dataclass
class RaceConditionTest:
iteration: int