pagebench-based GetPage@LSN performance test (#6214)

This commit is contained in:
Christian Schwarz
2024-01-24 12:51:53 +01:00
committed by GitHub
parent a72af29d12
commit 996abc9563
16 changed files with 852 additions and 26 deletions

View File

@@ -1017,7 +1017,10 @@ impl Tenant {
// IndexPart is the source of truth.
self.clean_up_timelines(&existent_timelines)?;
failpoint_support::sleep_millis_async!("attach-before-activate", &self.cancel);
fail::fail_point!("attach-before-activate", |_| {
anyhow::bail!("attach-before-activate");
});
failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
info!("Done");

View File

@@ -39,6 +39,9 @@ SETUP COMPLETE
To run your local neon.git build on the instance store volume,
run the following commands from the top of the neon.git checkout
# raise file descriptor limit of your shell and its child processes
sudo prlimit -p $$ --nofile=800000:800000
# test suite run
export TEST_OUTPUT="$TEST_OUTPUT"
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py

View File

@@ -10,16 +10,18 @@ import shutil
import subprocess
import tempfile
import textwrap
import threading
import time
import uuid
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from fcntl import LOCK_EX, LOCK_UN, flock
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
@@ -49,7 +51,10 @@ from fixtures.pageserver.allowed_errors import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
@@ -424,6 +429,7 @@ class NeonEnvBuilder:
pg_distrib_dir: Path,
pg_version: PgVersion,
test_name: str,
top_output_dir: Path,
test_output_dir: Path,
test_overlay_dir: Optional[Path] = None,
pageserver_remote_storage: Optional[RemoteStorage] = None,
@@ -473,6 +479,7 @@ class NeonEnvBuilder:
self.test_overlay_dir = test_overlay_dir
self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = []
self.config_init_force: Optional[str] = None
self.top_output_dir = top_output_dir
assert test_name.startswith(
"test_"
@@ -526,6 +533,64 @@ class NeonEnvBuilder:
return env
def build_and_use_snapshot(
self, global_ident: str, create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv]
) -> NeonEnv:
if os.getenv("CI", "false") == "true":
log.info("do not use snapshots in ephemeral CI environment")
env = create_env_for_snapshot(self)
env.stop(immediate=True, ps_assert_metric_no_errors=False)
return env
with shared_snapshot_dir(self.top_output_dir, global_ident) as snapshot_dir:
if not snapshot_dir.is_initialized():
self._build_and_use_snapshot_impl(snapshot_dir, create_env_for_snapshot)
assert snapshot_dir.is_initialized()
return self.from_repo_dir(snapshot_dir.path)
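Put differently, a test hands `build_and_use_snapshot` a globally unique identifier plus a callback that builds the environment from scratch; subsequent runs reuse the snapshot instead of repeating the expensive setup. A condensed sketch of the calling convention, mirroring the benchmark tests further down in this diff (`"my-expensive-setup"` is a hypothetical identifier):
```
def build_from_scratch(builder: NeonEnvBuilder) -> NeonEnv:
    env = builder.init_start()
    # ... expensive setup: create tenants, ingest data, upload to remote storage ...
    return env

env = neon_env_builder.build_and_use_snapshot("my-expensive-setup", build_from_scratch)
env.start()
```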
def _build_and_use_snapshot_impl(
self,
snapshot_dir: SnapshotDirLocked,
create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv],
):
if snapshot_dir.path.exists():
shutil.rmtree(snapshot_dir.path)
if self.test_overlay_dir is not None:
# Make repo_dir an overlayfs mount with lowerdir being the empty snapshot_dir.
# When we're done filling up repo_dir, tear everything down, unmount the overlayfs, and use
# the upperdir as the snapshot. This is equivalent to docker `FROM scratch`.
assert not self.repo_dir.exists()
assert self.repo_dir.parent.exists()
snapshot_dir.path.mkdir()
self.overlay_mount("create-snapshot-repo-dir", snapshot_dir.path, self.repo_dir)
self.config_init_force = "empty-dir-ok"
env = create_env_for_snapshot(self)
assert self.env is not None
assert self.env == env
# shut down everything for snapshot
env.stop(immediate=True, ps_assert_metric_no_errors=True)
# TODO: all kinds of assertions to ensure the env is unused
if self.test_overlay_dir is None:
log.info("take snapshot by moving repo dir")
env.repo_dir.rename(snapshot_dir.path)
else:
log.info("take snapshot by using overlayfs upperdir")
self.overlay_unmount_and_move("create-snapshot-repo-dir", snapshot_dir.path)
log.info("remove empty repo_dir (previously mountpoint) for snapshot overlay_mount")
env.repo_dir.rmdir()
# TODO from here on, we should be able to reset / goto top where snapshot_dir.is_initialized()
log.info("make repo_dir an overlayfs mount of the snapshot we just created")
assert not env.repo_dir.exists(), "both branches above should remove it"
snapshot_dir.set_initialized()
self.env = None # so that from_repo_dir works again
def from_repo_dir(
self,
repo_dir: Path,
@@ -557,10 +622,15 @@ class NeonEnvBuilder:
tenants_from_dir = ps_dir / "tenants"
tenants_to_dir = self.repo_dir / ps_dir.name / "tenants"
log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}")
if self.test_overlay_dir is None:
log.info(
f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}"
)
shutil.copytree(tenants_from_dir, tenants_to_dir)
else:
log.info(
f"Creating overlayfs mount of pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}"
)
self.overlay_mount(f"{ps_dir.name}:tenants", tenants_from_dir, tenants_to_dir)
for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"):
@@ -571,10 +641,12 @@ class NeonEnvBuilder:
shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True)
if self.test_overlay_dir is None:
log.info("Copying local_fs_remote_storage directory from snapshot")
shutil.copytree(
repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage"
)
else:
log.info("Creating overlayfs mount of local_fs_remote_storage directory from snapshot")
self.overlay_mount(
"local_fs_remote_storage",
repo_dir / "local_fs_remote_storage",
@@ -631,6 +703,54 @@ class NeonEnvBuilder:
)
self.overlay_mounts_created_by_us.append((ident, dstdir))
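For orientation, the overlayfs mounts managed here boil down to a `mount -t overlay` invocation with `lowerdir`, `upperdir`, and `workdir` options; the `upper`/`work` layout matches what `overlay_unmount_and_move` below expects. A hypothetical sketch only — the exact command built by `overlay_mount` is not shown in this hunk:
```
import subprocess
from pathlib import Path

def overlay_mount_sketch(ident_state_dir: Path, lowerdir: Path, mountpoint: Path):
    # upper/ is the writable layer, work/ is overlayfs scratch space
    upper = ident_state_dir / "upper"
    work = ident_state_dir / "work"
    for d in (upper, work, mountpoint):
        d.mkdir(parents=True, exist_ok=True)
    subprocess.run(
        [
            "sudo", "mount", "-t", "overlay", "overlay",
            "-o", f"lowerdir={lowerdir},upperdir={upper},workdir={work}",
            str(mountpoint),
        ],
        check=True,
    )
```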
def _overlay_umount(self, mountpoint: Path):
cmd = ["sudo", "umount", str(mountpoint)]
assert mountpoint.is_mount()
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
)
def overlay_unmount_and_move(self, ident: str, dst: Path):
"""
Unmount the overlayfs mount previously established for `ident` via `overlay_mount()` and move the upperdir contents to `dst`.
If `dst` is an empty directory, it gets replaced.
Caller is responsible for ensuring the unmount will succeed, i.e., that there aren't any nested mounts.
Raises exception if self.test_overlay_dir is None
"""
assert self.test_overlay_dir is not None
# not mutating state yet, make checks
ident_state_dir = self.test_overlay_dir / ident
assert ident_state_dir.is_dir()
upper = ident_state_dir / "upper"
work = ident_state_dir / "work"
assert upper.is_dir()
assert work.is_dir()
assert (
self.test_overlay_dir not in dst.parents
), "otherwise workdir cleanup below wouldn't work"
# find index, still not mutating state
idxmap = {
existing_ident: idx
for idx, (existing_ident, _) in enumerate(self.overlay_mounts_created_by_us)
}
idx = idxmap.get(ident)
if idx is None:
raise RuntimeError(f"cannot find mount for ident {ident}")
if dst.is_dir():
dst.rmdir() # raises exception if not empty, which is what we want
_, mountpoint = self.overlay_mounts_created_by_us.pop(idx)
self._overlay_umount(mountpoint)
upper.rename(dst)
# we moved the upperdir, clean up workdir and then its parent ident_state_dir
cmd = ["sudo", "rm", "-rf", str(work)]
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
)
ident_state_dir.rmdir() # should be empty since we moved `upper` out
def overlay_cleanup_teardown(self):
"""
Unmount the overlayfs mounts created by `self.overlay_mount()`.
@@ -641,13 +761,10 @@ class NeonEnvBuilder:
while len(self.overlay_mounts_created_by_us) > 0:
(ident, mountpoint) = self.overlay_mounts_created_by_us.pop()
ident_state_dir = self.test_overlay_dir / ident
cmd = ["sudo", "umount", str(mountpoint)]
log.info(
f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}: {cmd}"
)
subprocess_capture(
self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True
f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}"
)
self._overlay_umount(mountpoint)
log.info(
f"Cleaning up overlayfs state dir (owned by root user) for ident {ident} at {ident_state_dir}"
)
@@ -725,8 +842,15 @@ class NeonEnvBuilder:
if self.preserve_database_files:
return
overlayfs_mounts = {mountpoint for _, mountpoint in self.overlay_mounts_created_by_us}
directories_to_clean: List[Path] = []
for test_entry in Path(self.repo_dir).glob("**/*"):
if test_entry in overlayfs_mounts:
continue
for parent in test_entry.parents:
if parent in overlayfs_mounts:
continue
if test_entry.is_file():
test_file = test_entry
if ATTACHMENT_NAME_REGEX.fullmatch(test_file.name):
@@ -775,13 +899,6 @@ class NeonEnvBuilder:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
try:
self.overlay_cleanup_teardown()
except Exception as e:
log.error(f"Error cleaning up overlay state: {e}")
if cleanup_error is not None:
cleanup_error = e
try:
self.cleanup_remote_storage()
except Exception as e:
@@ -802,6 +919,13 @@ class NeonEnvBuilder:
for pageserver in self.env.pageservers:
pageserver.assert_no_errors()
try:
self.overlay_cleanup_teardown()
except Exception as e:
log.error(f"Error cleaning up overlay state: {e}")
if cleanup_error is not None:
cleanup_error = e
class NeonEnv:
"""
@@ -1082,6 +1206,7 @@ def _shared_simple_env(
shutil.rmtree(repo_dir, ignore_errors=True)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
repo_dir=repo_dir,
port_distributor=port_distributor,
broker=default_broker,
@@ -1130,6 +1255,7 @@ def neon_env_builder(
run_id: uuid.UUID,
request: FixtureRequest,
test_overlay_dir: Path,
top_output_dir: Path,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1149,6 +1275,7 @@ def neon_env_builder(
# Return the builder to the caller
with NeonEnvBuilder(
top_output_dir=top_output_dir,
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
@@ -3487,6 +3614,10 @@ def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return _get_test_dir(request, top_output_dir, "overlay-")
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
return top_output_dir / "shared-snapshots" / snapshot_name
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
@@ -3533,6 +3664,75 @@ def test_output_dir(
allure_attach_from_dir(test_dir)
class FileAndThreadLock:
def __init__(self, path: Path):
self.path = path
self.thread_lock = threading.Lock()
self.fd: Optional[int] = None
def __enter__(self):
self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY)
# lock thread lock before file lock so that there's no race
# around flocking / funlocking the file lock
self.thread_lock.acquire()
flock(self.fd, LOCK_EX)
def __exit__(self, exc_type, exc_value, exc_traceback):
assert self.fd is not None
assert self.thread_lock.locked() # ... by us
flock(self.fd, LOCK_UN)
self.thread_lock.release()
os.close(self.fd)
self.fd = None
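`FileAndThreadLock` pairs an in-process `threading.Lock` with an advisory `flock` on a file, so a critical section is serialized both across threads within one pytest process and across concurrent pytest processes. A minimal usage sketch (the lock file path is hypothetical):
```
from pathlib import Path

lock = FileAndThreadLock(Path("/tmp/shared-setup.flock"))  # hypothetical lock file
with lock:
    # only one thread of one process at a time runs this block
    ...
```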
class SnapshotDirLocked:
def __init__(self, parent: SnapshotDir):
self._parent = parent
def is_initialized(self):
# TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized.
# Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed.
return self._parent._marker_file_path.exists()
def set_initialized(self):
self._parent._marker_file_path.write_text("")
@property
def path(self) -> Path:
return self._parent._path / "snapshot"
class SnapshotDir:
_path: Path
def __init__(self, path: Path):
self._path = path
assert self._path.is_dir()
self._lock = FileAndThreadLock(self._lock_file_path)
@property
def _lock_file_path(self) -> Path:
return self._path / "initializing.flock"
@property
def _marker_file_path(self) -> Path:
return self._path / "initialized.marker"
def __enter__(self) -> SnapshotDirLocked:
self._lock.__enter__()
return SnapshotDirLocked(self)
def __exit__(self, exc_type, exc_value, exc_traceback):
self._lock.__exit__(exc_type, exc_value, exc_traceback)
def shared_snapshot_dir(top_output_dir, ident: str) -> SnapshotDir:
snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident)
snapshot_dir_path.mkdir(exist_ok=True, parents=True)
return SnapshotDir(snapshot_dir_path)
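`shared_snapshot_dir` returns a `SnapshotDir` whose context manager takes that lock; `build_and_use_snapshot` above is the real call site. A condensed sketch of the protocol (the snapshot name is hypothetical):
```
with shared_snapshot_dir(top_output_dir, "my-benchmark-setup") as snapshot_dir:
    if not snapshot_dir.is_initialized():
        # ... populate snapshot_dir.path with the expensive-to-build repo state ...
        snapshot_dir.set_initialized()
    # snapshot_dir.path now holds the shared snapshot, usable e.g. via
    # NeonEnvBuilder.from_repo_dir(snapshot_dir.path)
```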
@pytest.fixture(scope="function")
def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
"""
@@ -3542,7 +3742,7 @@ def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc.).
"""
if os.getenv("NEON_ENV_BUILDER_FROM_REPO_DIR_USE_OVERLAYFS") is None:
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
return None
overlay_dir = get_test_overlay_dir(request, top_output_dir)

View File

@@ -0,0 +1,85 @@
import concurrent.futures
import time
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.remote_storage
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import (
wait_until_tenant_state,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
def single_timeline(
neon_env_builder: NeonEnvBuilder,
setup_template: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
ncopies: int,
) -> NeonEnv:
"""
Create `ncopies` duplicates of a template tenant that has a single timeline.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
# clean up the useless default tenant
ps_http.tenant_delete(env.initial_tenant)
log.info("invoking callback to create template tenant")
template_tenant, template_timeline, template_config = setup_template(env)
log.info(
f"template tenant is template_tenant={template_tenant} template_timeline={template_timeline}"
)
log.info("detach template tenant form pageserver")
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
log.info(f"duplicating template tenant {ncopies} times in S3")
tenants = fixtures.pageserver.remote_storage.duplicate_tenant(env, template_tenant, ncopies)
log.info("attach duplicated tenants to pageserver")
# In theory we could just attach all the tenants, force on-demand downloads via mgmt API, and be done.
# However, on-demand downloads are quite slow ATM.
# => do the on-demand downloads in Python.
assert ps_http.tenant_list() == []
# make the attach fail after it has created enough on-disk state to retry loading
# the tenant on the next startup, but before it can start background loops that would start downloads
ps_http.configure_failpoints(("attach-before-activate", "return"))
env.pageserver.allowed_errors.append(
".*attach failed, setting tenant state to Broken: attach-before-activate.*"
)
def attach_broken(tenant):
env.pageserver.tenant_attach(
tenant,
config=template_config.copy(),
)
time.sleep(0.1)
wait_until_tenant_state(ps_http, tenant, "Broken", 10)
with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:
executor.map(attach_broken, tenants)
env.pageserver.stop(
immediate=True
) # clears the failpoint as a side-effect; immediate to avoid hitting neon_local's timeout
tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants))
log.info("python-side on-demand download the layer files into local tenant dir")
fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir(
env, tenant_timelines
)
return env
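The `setup_template` callback is expected to build one template tenant and return its tenant id, timeline id, and the tenant config it was attached with. A condensed sketch of that contract; the concrete callbacks live in the test files further down in this diff, and the config shown here is only an illustrative subset:
```
def setup_template(env: NeonEnv) -> Tuple[TenantId, TimelineId, Dict[str, Any]]:
    config = {"gc_period": "0s"}  # illustrative subset of a real tenant config
    template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
    # ... detach, re-attach with `config`, ingest data, flush and upload ...
    return (template_tenant, template_timeline, config)

env = single_timeline(neon_env_builder, setup_template, ncopies=100)
```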

View File

@@ -0,0 +1,116 @@
import concurrent.futures
import os
import queue
import shutil
import threading
from pathlib import Path
from typing import Any, List, Tuple
from fixtures.neon_fixtures import NeonEnv, Pagectl
from fixtures.pageserver.types import (
InvalidFileName,
parse_layer_file_name,
)
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import TenantId, TimelineId
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
assert isinstance(remote_storage, LocalFsStorage)
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
Pagectl(env).raw_cli(
[
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
)
else:
# index_part etc need no patching
pass
return None
def duplicate_tenant(env: NeonEnv, template_tenant: TenantId, ncopies: int) -> List[TenantId]:
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
def work(tenant_id):
duplicate_one_tenant(env, template_tenant, tenant_id)
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, ncopies)]
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
executor.map(work, new_tenants)
return new_tenants
def local_layer_name_from_remote_name(remote_name: str) -> str:
try:
return parse_layer_file_name(remote_name).to_str()
except InvalidFileName as e:
comps = remote_name.rsplit("-", 1)
if len(comps) == 1:
raise InvalidFileName("no generation suffix found") from e
else:
assert len(comps) == 2
layer_file_name, _generation = comps
try:
return parse_layer_file_name(layer_file_name).to_str()
except InvalidFileName:
raise
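In remote storage, layer files may carry a trailing generation suffix that local layer file names do not have; the helper above strips it by retrying the parse after splitting off the last `-`-separated component. An illustrative call — the layer name is taken from a listing later in this diff, and the `-00000001` generation suffix is hypothetical:
```
remote_name = (
    "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
    "__0000000009E75829-000000000A21E919-00000001"
)
local_name = local_layer_name_from_remote_name(remote_name)
# local_name == remote_name without the "-00000001" suffix
```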
def copy_all_remote_layer_files_to_local_tenant_dir(
env: NeonEnv, tenant_timelines: List[Tuple[TenantId, TimelineId]]
):
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
work: queue.Queue[Any] = queue.Queue()
for tenant, timeline in tenant_timelines:
remote_timeline_path = remote_storage.timeline_path(tenant, timeline)
local_timeline_path = env.pageserver.timeline_dir(tenant, timeline)
local_timeline_path.mkdir(parents=True, exist_ok=True)
downloads = {}
for remote_layer in remote_timeline_path.glob("*__*"):
local_name = local_layer_name_from_remote_name(remote_layer.name)
assert local_name not in downloads, "remote storage must have had split brain"
downloads[local_name] = remote_layer
for local_name, remote_path in downloads.items():
work.put((remote_path, local_timeline_path / local_name))
def copy_layer_worker(queue):
while True:
item = queue.get()
if item is None:
return
remote_path, local_path = item
# not copy2, so it looks like a recent download, in case that's relevant to e.g. eviction
shutil.copy(remote_path, local_path, follow_symlinks=False)
workers = []
n_threads = os.cpu_count() or 1
for _ in range(0, n_threads):
w = threading.Thread(target=copy_layer_worker, args=[work])
workers.append(w)
w.start()
# enqueue one sentinel per worker so that every worker thread exits
for _ in range(0, n_threads):
work.put(None)
for w in workers:
w.join()

View File

@@ -31,10 +31,10 @@ class DeltaLayerFileName:
key_start: Key
key_end: Key
def is_l0(self):
def is_l0(self) -> bool:
return self.key_start == KEY_MIN and self.key_end == KEY_MAX
def to_str(self):
def to_str(self) -> str:
ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}"
assert self == parse_layer_file_name(ret)
return ret
@@ -107,7 +107,7 @@ def parse_layer_file_name(file_name: str) -> LayerFileName:
except InvalidFileName:
pass
raise ValueError()
raise InvalidFileName("neither image nor delta layer")
def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn):

View File

@@ -63,6 +63,14 @@ def wait_for_upload(
)
def _tenant_in_expected_state(tenant_info: Dict[str, Any], expected_state: str):
if tenant_info["state"]["slug"] == expected_state:
return True
if tenant_info["state"]["slug"] == "Broken":
raise RuntimeError(f"tenant became Broken, not {expected_state}")
return False
def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
@@ -80,10 +88,8 @@ def wait_until_tenant_state(
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
else:
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state:
if _tenant_in_expected_state(tenant, expected_state):
return tenant
if tenant["state"]["slug"] == "Broken":
raise RuntimeError(f"tenant became Broken, not {expected_state}")
time.sleep(period)
@@ -92,6 +98,34 @@ def wait_until_tenant_state(
)
def wait_until_all_tenants_state(
pageserver_http: PageserverHttpClient,
expected_state: str,
iterations: int,
period: float = 1.0,
http_error_ok: bool = True,
):
"""
Like wait_until_tenant_state, but checks all tenants.
"""
for _ in range(iterations):
try:
tenants = pageserver_http.tenant_list()
except Exception as e:
if http_error_ok:
log.debug(f"Failed to list tenants: {e}")
else:
raise
else:
if all(map(lambda tenant: _tenant_in_expected_state(tenant, expected_state), tenants)):
return
time.sleep(period)
raise Exception(
f"Not all tenants became active {expected_state} within {iterations * period} seconds"
)
def wait_until_timeline_state(
pageserver_http: PageserverHttpClient,
tenant_id: Union[TenantId, TenantShardId],

View File

@@ -397,3 +397,36 @@ def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
}
"""
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
def humantime_to_ms(humantime: str) -> float:
"""
Converts Rust humantime's output string to milliseconds.
humantime_to_ms("1h 1ms 406us") -> 3600001.406
"""
unit_multiplier_map = {
"ns": 1e-6,
"us": 1e-3,
"ms": 1,
"s": 1e3,
"m": 1e3 * 60,
"h": 1e3 * 60 * 60,
}
matcher = re.compile(rf"^(\d+)({'|'.join(unit_multiplier_map.keys())})$")
total_ms = 0.0
if humantime == "0":
return total_ms
for item in humantime.split():
if (match := matcher.search(item)) is not None:
n, unit = match.groups()
total_ms += int(n) * unit_multiplier_map[unit]
else:
raise ValueError(
f"can't parse '{item}' (from string '{humantime}'), known units are {', '.join(unit_multiplier_map.keys())}."
)
return round(total_ms, 3)
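A few concrete checks implied by the implementation above:
```
assert humantime_to_ms("1h 1ms 406us") == 3600001.406
assert humantime_to_ms("150ms") == 150.0
assert humantime_to_ms("0") == 0.0
```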

View File

@@ -0,0 +1,16 @@
How to reproduce benchmark results / run these benchmarks interactively.
1. Get an EC2 instance with Instance Store. Use the same instance type as used for the benchmark run.
2. Mount the Instance Store => `neon.git/scripts/ps_ec2_setup_instance_store`
3. Use a pytest command line (see other READMEs further up in the pytest hierarchy).
For tests that take a long time to set up / consume a lot of storage space,
we use the test suite's repo_dir snapshotting functionality (`from_repo_dir`).
It supports mounting snapshots using overlayfs, which improves iteration time.
Here's a full command line.
```
RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release \
./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
```

View File

@@ -0,0 +1,8 @@
"""
Tests that aren't really tests or benchmarks.
They're intended for the case where we want to standardize & automate setup,
but then debug a performance problem interactively.
It's kind of an abuse of the test framework, but it's our only tool right
now to automate a complex test bench setup.
"""

View File

@@ -0,0 +1,79 @@
import os
import pdb
import fixtures.pageserver.many_tenants as many_tenants
import pytest
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
)
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
"""
Usage:
DEFAULT_PG_VERSION=15 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
./scripts/pytest --timeout 0 test_runner/performance/pageserver/interactive/test_many_small_tenants.py
"""
@pytest.mark.skipif(
os.environ.get("INTERACTIVE", "false") != "true",
reason="test is for interactive use only",
)
def test_many_small_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
_env = setup_env(neon_env_builder, 2) # vary this to the desired number of tenants
_pg_bin = pg_bin
# drop into pdb so that we can debug the pageserver interactively.
# For example, to interactively examine pageserver startup behavior, call
# _env.pageserver.stop(immediate=True)
# _env.pageserver.start()
# from the pdb shell.
pdb.set_trace()
def setup_env(
neon_env_builder: NeonEnvBuilder,
n_tenants: int,
) -> NeonEnv:
def setup_template(env: NeonEnv):
# create our template tenant
config = {
"gc_period": "0s",
"checkpoint_timeout": "10 years",
"compaction_period": "20 s",
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")
for _ in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ep.stop_and_destroy()
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
env = neon_env_builder.build_and_use_snapshot(f"many-small-tenants-{n_tenants}", doit)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -0,0 +1,10 @@
"""
Pagebench-based performance regression tests.
The defining characteristic of tests in this sub-directory is that they
are component-level tests, i.e., they exercise pageserver directly using `pagebench`
instead of benchmarking the full stack.
See https://github.com/neondatabase/neon/issues/5771
for the context in which this was developed.
"""

View File

@@ -0,0 +1,210 @@
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
wait_for_last_flush_lsn,
)
from fixtures.utils import get_scale_for_db, humantime_to_ms
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
# For reference, the space usage of the snapshots:
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
# 137G /instance_store/test_output/shared-snapshots
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots/*
# 1.8G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-13
# 1.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-6
# 8.5G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-13
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
@pytest.mark.parametrize("n_tenants", [1, 10, 100])
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type
def test_pageserver_max_throughput_getpage_at_latest_lsn(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
n_tenants: int,
pgbench_scale: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(
metric_name=f"pageserver_max_throughput_getpage_at_latest_lsn.{metric}", **kwargs
)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
"n_tenants": (n_tenants, {"unit": ""}),
"pgbench_scale": (pgbench_scale, {"unit": ""}),
"duration": (duration, {"unit": "s"}),
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
def run_benchmark_max_throughput_latest_lsn(
env: NeonEnv, pg_bin: PgBin, record, duration_secs: int
):
"""
Benchmark `env.pageserver` for max throughput @ latest LSN and record results in `zenbenchmark`.
"""
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"get-page-latest-lsn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "request_count"
record(
metric,
metric_value=total[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
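The parsing above assumes pagebench's JSON report contains a `total` object with `request_count`, `latency_mean`, and `latency_percentiles` (the latter mapping percentile names to humantime strings). A hypothetical illustration of that shape; only the key names are taken from the code above, the values and percentile labels are made up:
```
results = {
    "total": {
        "request_count": 1_000_000,   # recorded as HIGHER_IS_BETTER
        "latency_mean": "1ms 406us",  # converted with humantime_to_ms
        "latency_percentiles": {      # percentile labels here are illustrative
            "p90": "2ms",
            "p99": "5ms 100us",
        },
    }
}
```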
def setup_pageserver_with_pgbench_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
n_tenants: int,
scale: int,
) -> NeonEnv:
"""
Utility function to set up a pageserver with a given number of identical tenants.
Each tenant is a pgbench tenant, initialized to a certain scale, and afterwards treated
with repeated applications of (pgbench simple-update workload, checkpoint, compact).
"""
def setup_template(env: NeonEnv):
# use a config that makes production of on-disk state timing-insensitive
# as we ingest data into the tenant.
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "0s", # disable periodic compaction
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
env.pageserver.tenant_detach(template_tenant)
env.pageserver.allowed_errors.append(
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants)
env = neon_env_builder.build_and_use_snapshot(
f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit
)
env.start()
ensure_pageserver_ready_for_benchmarking(env, n_tenants)
return env

View File

@@ -0,0 +1,29 @@
"""
Utilities used by all code in this sub-directory
"""
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.pageserver.utils import wait_until_all_tenants_state
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
"""
Wait for all tenants to become active and ensure all their layers are resident, so the benchmark starts from a predictable state.
"""
ps_http = env.pageserver.http_client()
log.info("wait for all tenants to become active")
wait_until_all_tenants_state(
ps_http, "Active", iterations=n_tenants, period=1, http_error_ok=False
)
# ensure all layers are resident for predictable performance
tenants = [info["id"] for info in ps_http.tenant_list()]
for tenant in tenants:
for timeline in ps_http.tenant_status(tenant)["timelines"]:
info = ps_http.layer_map_info(tenant, timeline)
for layer in info.historic_layers:
assert not layer.remote
log.info("ready")

View File

@@ -482,7 +482,7 @@ def test_detach_while_attaching(
pageserver_http.tenant_detach(tenant_id)
# And re-attach
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(5000)")])
env.pageserver.tenant_attach(tenant_id)
@@ -681,7 +681,7 @@ def test_detach_while_activating(
pageserver_http.tenant_detach(tenant_id)
# And re-attach, but stop attach task_mgr task from completing
pageserver_http.configure_failpoints([("attach-before-activate", "return(600000)")])
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(600000)")])
env.pageserver.tenant_attach(tenant_id)
# The tenant is in the Activating state. This should not block us from
@@ -695,7 +695,7 @@ def test_detach_while_activating(
), "Only ignored tenant should be missing"
# Subsequently attaching it again should still work
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "off")])
env.pageserver.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)