From 996abc95632848c788ad12f2e3c9240fdd1ed9f4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 24 Jan 2024 12:51:53 +0100 Subject: [PATCH] pagebench-based GetPage@LSN performance test (#6214) --- pageserver/src/tenant.rs | 5 +- scripts/ps_ec2_setup_instance_store | 3 + test_runner/fixtures/neon_fixtures.py | 232 ++++++++++++++++-- .../fixtures/pageserver/many_tenants.py | 85 +++++++ .../fixtures/pageserver/remote_storage.py | 116 +++++++++ test_runner/fixtures/pageserver/types.py | 6 +- test_runner/fixtures/pageserver/utils.py | 40 ++- test_runner/fixtures/utils.py | 33 +++ test_runner/performance/pageserver/README.md | 16 ++ .../performance/pageserver/__init__.py | 0 .../pageserver/interactive/__init__.py | 8 + .../interactive/test_many_small_tenants.py | 79 ++++++ .../pageserver/pagebench/__init__.py | 10 + ...er_max_throughput_getpage_at_latest_lsn.py | 210 ++++++++++++++++ test_runner/performance/pageserver/util.py | 29 +++ test_runner/regress/test_tenant_detach.py | 6 +- 16 files changed, 852 insertions(+), 26 deletions(-) create mode 100644 test_runner/fixtures/pageserver/many_tenants.py create mode 100644 test_runner/fixtures/pageserver/remote_storage.py create mode 100644 test_runner/performance/pageserver/README.md create mode 100644 test_runner/performance/pageserver/__init__.py create mode 100644 test_runner/performance/pageserver/interactive/__init__.py create mode 100644 test_runner/performance/pageserver/interactive/test_many_small_tenants.py create mode 100644 test_runner/performance/pageserver/pagebench/__init__.py create mode 100644 test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py create mode 100644 test_runner/performance/pageserver/util.py diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 1d9b91c9ce..d4d4208ac2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1017,7 +1017,10 @@ impl Tenant { // IndexPart is the source of truth. 
self.clean_up_timelines(&existent_timelines)?; - failpoint_support::sleep_millis_async!("attach-before-activate", &self.cancel); + fail::fail_point!("attach-before-activate", |_| { + anyhow::bail!("attach-before-activate"); + }); + failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel); info!("Done"); diff --git a/scripts/ps_ec2_setup_instance_store b/scripts/ps_ec2_setup_instance_store index 00d37e5f83..4cca3a9857 100755 --- a/scripts/ps_ec2_setup_instance_store +++ b/scripts/ps_ec2_setup_instance_store @@ -39,6 +39,9 @@ SETUP COMPLETE To run your local neon.git build on the instance store volume, run the following commands from the top of the neon.git checkout + # raise file descriptor limit of your shell and its child processes + sudo prlimit -p $$ --nofile=800000:800000 + # test suite run export TEST_OUTPUT="$TEST_OUTPUT" DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_latency.py diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index fb1925216a..eef8de876e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -10,16 +10,18 @@ import shutil import subprocess import tempfile import textwrap +import threading import time import uuid from contextlib import closing, contextmanager from dataclasses import dataclass, field from datetime import datetime +from fcntl import LOCK_EX, LOCK_UN, flock from functools import cached_property from itertools import chain, product from pathlib import Path from types import TracebackType -from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union, cast from urllib.parse import urlparse import asyncpg @@ -49,7 +51,10 @@ from fixtures.pageserver.allowed_errors import ( ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.types import IndexPartDump -from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload +from fixtures.pageserver.utils import ( + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import ( @@ -424,6 +429,7 @@ class NeonEnvBuilder: pg_distrib_dir: Path, pg_version: PgVersion, test_name: str, + top_output_dir: Path, test_output_dir: Path, test_overlay_dir: Optional[Path] = None, pageserver_remote_storage: Optional[RemoteStorage] = None, @@ -473,6 +479,7 @@ class NeonEnvBuilder: self.test_overlay_dir = test_overlay_dir self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] self.config_init_force: Optional[str] = None + self.top_output_dir = top_output_dir assert test_name.startswith( "test_" @@ -526,6 +533,64 @@ class NeonEnvBuilder: return env + def build_and_use_snapshot( + self, global_ident: str, create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv] + ) -> NeonEnv: + if os.getenv("CI", "false") == "true": + log.info("do not use snapshots in ephemeral CI environment") + env = create_env_for_snapshot(self) + env.stop(immediate=True, ps_assert_metric_no_errors=False) + return env + + with shared_snapshot_dir(self.top_output_dir, global_ident) as snapshot_dir: + if not snapshot_dir.is_initialized(): + self._build_and_use_snapshot_impl(snapshot_dir, create_env_for_snapshot) + assert snapshot_dir.is_initialized() + + return self.from_repo_dir(snapshot_dir.path) + + def _build_and_use_snapshot_impl( + 
self, + snapshot_dir: SnapshotDirLocked, + create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv], + ): + if snapshot_dir.path.exists(): + shutil.rmtree(snapshot_dir.path) + + if self.test_overlay_dir is not None: + # Make repo_dir an overlayfs mount with lowerdir being the empty snapshot_dir. + # When we're done filling up repo_dir, tear everything down, unmount the overlayfs, and use + # the upperdir as the snapshot. This is equivalent to docker `FROM scratch`. + assert not self.repo_dir.exists() + assert self.repo_dir.parent.exists() + snapshot_dir.path.mkdir() + self.overlay_mount("create-snapshot-repo-dir", snapshot_dir.path, self.repo_dir) + self.config_init_force = "empty-dir-ok" + + env = create_env_for_snapshot(self) + assert self.env is not None + assert self.env == env + + # shut down everything for snapshot + env.stop(immediate=True, ps_assert_metric_no_errors=True) + + # TODO: all kinds of assertions to ensure the env is unused + + if self.test_overlay_dir is None: + log.info("take snapshot by moving repo dir") + env.repo_dir.rename(snapshot_dir.path) + else: + log.info("take snapshot by using overlayfs upperdir") + self.overlay_unmount_and_move("create-snapshot-repo-dir", snapshot_dir.path) + log.info("remove empty repo_dir (previously mountpoint) for snapshot overlay_mount") + env.repo_dir.rmdir() + # TODO from here on, we should be able to reset / goto top where snapshot_dir.is_initialized() + log.info("make repo_dir an overlayfs mount of the snapshot we just created") + assert not env.repo_dir.exists(), "both branches above should remove it" + snapshot_dir.set_initialized() + + self.env = None # so that from_repo_dir works again + def from_repo_dir( self, repo_dir: Path, @@ -557,10 +622,15 @@ class NeonEnvBuilder: tenants_from_dir = ps_dir / "tenants" tenants_to_dir = self.repo_dir / ps_dir.name / "tenants" - log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}") if self.test_overlay_dir is None: + log.info( + f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}" + ) shutil.copytree(tenants_from_dir, tenants_to_dir) else: + log.info( + f"Creating overlayfs mount of pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}" + ) self.overlay_mount(f"{ps_dir.name}:tenants", tenants_from_dir, tenants_to_dir) for sk_from_dir in (repo_dir / "safekeepers").glob("sk*"): @@ -571,10 +641,12 @@ class NeonEnvBuilder: shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True) if self.test_overlay_dir is None: + log.info("Copying local_fs_remote_storage directory from snapshot") shutil.copytree( repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage" ) else: + log.info("Creating overlayfs mount of local_fs_remote_storage directory from snapshot") self.overlay_mount( "local_fs_remote_storage", repo_dir / "local_fs_remote_storage", @@ -631,6 +703,54 @@ class NeonEnvBuilder: ) self.overlay_mounts_created_by_us.append((ident, dstdir)) + def _overlay_umount(self, mountpoint: Path): + cmd = ["sudo", "umount", str(mountpoint)] + assert mountpoint.is_mount() + subprocess_capture( + self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + ) + + def overlay_unmount_and_move(self, ident: str, dst: Path): + """ + Unmount previously established overlayfs mount at `dstdir` and move the upperdir contents to `dst`. + If `dst` is an empty directory, it gets replaced. 
+ Caller is responsible for ensuring the unmount will succeed, i.e., that there aren't any nested mounts. + + Raises exception if self.test_overlay_dir is None + """ + assert self.test_overlay_dir is not None + # not mutating state yet, make checks + ident_state_dir = self.test_overlay_dir / ident + assert ident_state_dir.is_dir() + upper = ident_state_dir / "upper" + work = ident_state_dir / "work" + assert upper.is_dir() + assert work.is_dir() + assert ( + self.test_overlay_dir not in dst.parents + ), "otherwise workdir cleanup below wouldn't work" + # find index, still not mutating state + idxmap = { + existing_ident: idx + for idx, (existing_ident, _) in enumerate(self.overlay_mounts_created_by_us) + } + idx = idxmap.get(ident) + if idx is None: + raise RuntimeError(f"cannot find mount for ident {ident}") + + if dst.is_dir(): + dst.rmdir() # raises exception if not empty, which is what we want + + _, mountpoint = self.overlay_mounts_created_by_us.pop(idx) + self._overlay_umount(mountpoint) + upper.rename(dst) + # we moved the upperdir, clean up workdir and then its parent ident_state_dir + cmd = ["sudo", "rm", "-rf", str(work)] + subprocess_capture( + self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + ) + ident_state_dir.rmdir() # should be empty since we moved `upper` out + def overlay_cleanup_teardown(self): """ Unmount the overlayfs mounts created by `self.overlay_mount()`. @@ -641,13 +761,10 @@ class NeonEnvBuilder: while len(self.overlay_mounts_created_by_us) > 0: (ident, mountpoint) = self.overlay_mounts_created_by_us.pop() ident_state_dir = self.test_overlay_dir / ident - cmd = ["sudo", "umount", str(mountpoint)] log.info( - f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}: {cmd}" - ) - subprocess_capture( - self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}" ) + self._overlay_umount(mountpoint) log.info( f"Cleaning up overlayfs state dir (owned by root user) for ident {ident} at {ident_state_dir}" ) @@ -725,8 +842,15 @@ class NeonEnvBuilder: if self.preserve_database_files: return + overlayfs_mounts = {mountpoint for _, mountpoint in self.overlay_mounts_created_by_us} + directories_to_clean: List[Path] = [] for test_entry in Path(self.repo_dir).glob("**/*"): + if test_entry in overlayfs_mounts: + continue + for parent in test_entry.parents: + if parent in overlayfs_mounts: + continue if test_entry.is_file(): test_file = test_entry if ATTACHMENT_NAME_REGEX.fullmatch(test_file.name): @@ -775,13 +899,6 @@ class NeonEnvBuilder: log.error(f"Error during remote storage scrub: {e}") cleanup_error = e - try: - self.overlay_cleanup_teardown() - except Exception as e: - log.error(f"Error cleaning up overlay state: {e}") - if cleanup_error is not None: - cleanup_error = e - try: self.cleanup_remote_storage() except Exception as e: @@ -802,6 +919,13 @@ class NeonEnvBuilder: for pageserver in self.env.pageservers: pageserver.assert_no_errors() + try: + self.overlay_cleanup_teardown() + except Exception as e: + log.error(f"Error cleaning up overlay state: {e}") + if cleanup_error is not None: + cleanup_error = e + class NeonEnv: """ @@ -1082,6 +1206,7 @@ def _shared_simple_env( shutil.rmtree(repo_dir, ignore_errors=True) with NeonEnvBuilder( + top_output_dir=top_output_dir, repo_dir=repo_dir, port_distributor=port_distributor, broker=default_broker, @@ -1130,6 +1255,7 @@ def neon_env_builder( run_id: uuid.UUID, request: 
FixtureRequest, test_overlay_dir: Path, + top_output_dir: Path, ) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. @@ -1149,6 +1275,7 @@ def neon_env_builder( # Return the builder to the caller with NeonEnvBuilder( + top_output_dir=top_output_dir, repo_dir=Path(repo_dir), port_distributor=port_distributor, mock_s3_server=mock_s3_server, @@ -3487,6 +3614,10 @@ def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path: return _get_test_dir(request, top_output_dir, "overlay-") +def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path: + return top_output_dir / "shared-snapshots" / snapshot_name + + def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path: return get_test_output_dir(request, top_output_dir) / "repo" @@ -3533,6 +3664,75 @@ def test_output_dir( allure_attach_from_dir(test_dir) +class FileAndThreadLock: + def __init__(self, path: Path): + self.path = path + self.thread_lock = threading.Lock() + self.fd: Optional[int] = None + + def __enter__(self): + self.fd = os.open(self.path, os.O_CREAT | os.O_WRONLY) + # lock thread lock before file lock so that there's no race + # around flocking / funlocking the file lock + self.thread_lock.acquire() + flock(self.fd, LOCK_EX) + + def __exit__(self, exc_type, exc_value, exc_traceback): + assert self.fd is not None + assert self.thread_lock.locked() # ... by us + flock(self.fd, LOCK_UN) + self.thread_lock.release() + os.close(self.fd) + self.fd = None + + +class SnapshotDirLocked: + def __init__(self, parent: SnapshotDir): + self._parent = parent + + def is_initialized(self): + # TODO: in the future, take a `tag` as argument and store it in the marker in set_initialized. + # Then, in this function, compare marker file contents with the tag to invalidate the snapshot if the tag changed. + return self._parent._marker_file_path.exists() + + def set_initialized(self): + self._parent._marker_file_path.write_text("") + + @property + def path(self) -> Path: + return self._parent._path / "snapshot" + + +class SnapshotDir: + _path: Path + + def __init__(self, path: Path): + self._path = path + assert self._path.is_dir() + self._lock = FileAndThreadLock(self._lock_file_path) + + @property + def _lock_file_path(self) -> Path: + return self._path / "initializing.flock" + + @property + def _marker_file_path(self) -> Path: + return self._path / "initialized.marker" + + def __enter__(self) -> SnapshotDirLocked: + self._lock.__enter__() + return SnapshotDirLocked(self) + + def __exit__(self, exc_type, exc_value, exc_traceback): + self._lock.__exit__(exc_type, exc_value, exc_traceback) + + +def shared_snapshot_dir(top_output_dir, ident: str) -> SnapshotDir: + snapshot_dir_path = get_shared_snapshot_dir_path(top_output_dir, ident) + snapshot_dir_path.mkdir(exist_ok=True, parents=True) + return SnapshotDir(snapshot_dir_path) + + @pytest.fixture(scope="function") def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]: """ @@ -3542,7 +3742,7 @@ def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[ The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc). 
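+    Overlayfs mode is enabled by setting NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS in the environment.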
""" - if os.getenv("NEON_ENV_BUILDER_FROM_REPO_DIR_USE_OVERLAYFS") is None: + if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None: return None overlay_dir = get_test_overlay_dir(request, top_output_dir) diff --git a/test_runner/fixtures/pageserver/many_tenants.py b/test_runner/fixtures/pageserver/many_tenants.py new file mode 100644 index 0000000000..bbb4ccee5b --- /dev/null +++ b/test_runner/fixtures/pageserver/many_tenants.py @@ -0,0 +1,85 @@ +import concurrent.futures +import time +from typing import Any, Callable, Dict, Tuple + +import fixtures.pageserver.remote_storage +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, +) +from fixtures.pageserver.utils import ( + wait_until_tenant_state, +) +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind +from fixtures.types import TenantId, TimelineId + + +def single_timeline( + neon_env_builder: NeonEnvBuilder, + setup_template: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]], + ncopies: int, +) -> NeonEnv: + """ + Create `ncopies` duplicates of a template tenant that has a single timeline. + """ + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + env = neon_env_builder.init_start() + + remote_storage = env.pageserver_remote_storage + assert isinstance(remote_storage, LocalFsStorage) + + ps_http = env.pageserver.http_client() + # clean up the useless default tenant + ps_http.tenant_delete(env.initial_tenant) + + log.info("invoking callback to create template tenant") + template_tenant, template_timeline, template_config = setup_template(env) + log.info( + f"template tenant is template_tenant={template_tenant} template_timeline={template_timeline}" + ) + + log.info("detach template tenant form pageserver") + env.pageserver.tenant_detach(template_tenant) + env.pageserver.allowed_errors.append( + # tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely + ".*Dropped remote consistent LSN updates.*", + ) + + log.info(f"duplicating template tenant {ncopies} times in S3") + tenants = fixtures.pageserver.remote_storage.duplicate_tenant(env, template_tenant, ncopies) + + log.info("attach duplicated tenants to pageserver") + # In theory we could just attach all the tenants, force on-demand downloads via mgmt API, and be done. + # However, on-demand downloads are quite slow ATM. + # => do the on-demand downloads in Python. 
+    assert ps_http.tenant_list() == []
+    # make the attach fail after it created enough on-disk state to retry loading
+    # the tenant next startup, but before it can start background loops that would start downloads
+    ps_http.configure_failpoints(("attach-before-activate", "return"))
+    env.pageserver.allowed_errors.append(
+        ".*attach failed, setting tenant state to Broken: attach-before-activate.*"
+    )
+
+    def attach_broken(tenant):
+        env.pageserver.tenant_attach(
+            tenant,
+            config=template_config.copy(),
+        )
+        time.sleep(0.1)
+        wait_until_tenant_state(ps_http, tenant, "Broken", 10)
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=22) as executor:
+        executor.map(attach_broken, tenants)
+
+    env.pageserver.stop(
+        immediate=True
+    )  # clears the failpoint as a side-effect; immediate to avoid hitting neon_local's timeout
+    tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants))
+    log.info("python-side on-demand download of the layer files into the local tenant dirs")
+    fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir(
+        env, tenant_timelines
+    )
+
+    return env
diff --git a/test_runner/fixtures/pageserver/remote_storage.py b/test_runner/fixtures/pageserver/remote_storage.py
new file mode 100644
index 0000000000..e6cd9b4614
--- /dev/null
+++ b/test_runner/fixtures/pageserver/remote_storage.py
@@ -0,0 +1,116 @@
+import concurrent.futures
+import os
+import queue
+import shutil
+import threading
+from pathlib import Path
+from typing import Any, List, Tuple
+
+from fixtures.neon_fixtures import NeonEnv, Pagectl
+from fixtures.pageserver.types import (
+    InvalidFileName,
+    parse_layer_file_name,
+)
+from fixtures.remote_storage import LocalFsStorage
+from fixtures.types import TenantId, TimelineId
+
+
+def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
+    remote_storage = env.pageserver_remote_storage
+    assert isinstance(remote_storage, LocalFsStorage)
+
+    src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
+    assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
+
+    assert isinstance(remote_storage, LocalFsStorage)
+    dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
+    dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
+    dst_timelines_dir.mkdir(parents=False, exist_ok=False)
+
+    for tl in src_timelines_dir.iterdir():
+        src_tl_dir = src_timelines_dir / tl.name
+        assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
+        dst_tl_dir = dst_timelines_dir / tl.name
+        dst_tl_dir.mkdir(parents=False, exist_ok=False)
+        for file in tl.iterdir():
+            shutil.copy2(file, dst_tl_dir)
+            if "__" in file.name:
+                Pagectl(env).raw_cli(
+                    [
+                        "layer",
+                        "rewrite-summary",
+                        str(dst_tl_dir / file.name),
+                        "--new-tenant-id",
+                        str(new_tenant),
+                    ]
+                )
+            else:
+                # index_part etc need no patching
+                pass
+    return None
+
+
+def duplicate_tenant(env: NeonEnv, template_tenant: TenantId, ncopies: int) -> List[TenantId]:
+    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
+
+    def work(tenant_id):
+        duplicate_one_tenant(env, template_tenant, tenant_id)
+
+    new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, ncopies)]
+    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
+        executor.map(work, new_tenants)
+    return new_tenants
+
+
+def local_layer_name_from_remote_name(remote_name: str) -> str:
+    try:
+        return parse_layer_file_name(remote_name).to_str()
+    except InvalidFileName as e:
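+        # remote layer file names may carry a "-<generation>" suffix; strip it and re-parse the remainder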
comps = remote_name.rsplit("-", 1) + if len(comps) == 1: + raise InvalidFileName("no generation suffix found") from e + else: + assert len(comps) == 2 + layer_file_name, _generation = comps + try: + return parse_layer_file_name(layer_file_name).to_str() + except InvalidFileName: + raise + + +def copy_all_remote_layer_files_to_local_tenant_dir( + env: NeonEnv, tenant_timelines: List[Tuple[TenantId, TimelineId]] +): + remote_storage = env.pageserver_remote_storage + assert isinstance(remote_storage, LocalFsStorage) + work: queue.Queue[Any] = queue.Queue() + for tenant, timeline in tenant_timelines: + remote_timeline_path = remote_storage.timeline_path(tenant, timeline) + local_timeline_path = env.pageserver.timeline_dir(tenant, timeline) + local_timeline_path.mkdir(parents=True, exist_ok=True) + downloads = {} + for remote_layer in remote_timeline_path.glob("*__*"): + local_name = local_layer_name_from_remote_name(remote_layer.name) + assert local_name not in downloads, "remote storage must have had split brain" + downloads[local_name] = remote_layer + for local_name, remote_path in downloads.items(): + work.put((remote_path, local_timeline_path / local_name)) + + def copy_layer_worker(queue): + while True: + item = queue.get() + if item is None: + return + remote_path, local_path = item + # not copy2, so it looks like a recent download, in case that's relevant to e.g. eviction + shutil.copy(remote_path, local_path, follow_symlinks=False) + + workers = [] + n_threads = os.cpu_count() or 1 + for _ in range(0, n_threads): + w = threading.Thread(target=copy_layer_worker, args=[work]) + workers.append(w) + w.start() + work.put(None) + for w in workers: + w.join() diff --git a/test_runner/fixtures/pageserver/types.py b/test_runner/fixtures/pageserver/types.py index b3c1174b35..72fa30a2f2 100644 --- a/test_runner/fixtures/pageserver/types.py +++ b/test_runner/fixtures/pageserver/types.py @@ -31,10 +31,10 @@ class DeltaLayerFileName: key_start: Key key_end: Key - def is_l0(self): + def is_l0(self) -> bool: return self.key_start == KEY_MIN and self.key_end == KEY_MAX - def to_str(self): + def to_str(self) -> str: ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}" assert self == parse_layer_file_name(ret) return ret @@ -107,7 +107,7 @@ def parse_layer_file_name(file_name: str) -> LayerFileName: except InvalidFileName: pass - raise ValueError() + raise InvalidFileName("neither image nor delta layer") def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn): diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index a6c4b8e930..972e0b714d 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -63,6 +63,14 @@ def wait_for_upload( ) +def _tenant_in_expected_state(tenant_info: Dict[str, Any], expected_state: str): + if tenant_info["state"]["slug"] == expected_state: + return True + if tenant_info["state"]["slug"] == "Broken": + raise RuntimeError(f"tenant became Broken, not {expected_state}") + return False + + def wait_until_tenant_state( pageserver_http: PageserverHttpClient, tenant_id: TenantId, @@ -80,10 +88,8 @@ def wait_until_tenant_state( log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") else: log.debug(f"Tenant {tenant_id} data: {tenant}") - if tenant["state"]["slug"] == expected_state: + if _tenant_in_expected_state(tenant, expected_state): return tenant - if tenant["state"]["slug"] == "Broken": 
-            raise RuntimeError(f"tenant became Broken, not {expected_state}")
 
         time.sleep(period)
 
@@ -92,6 +98,34 @@
     )
 
+
+def wait_until_all_tenants_state(
+    pageserver_http: PageserverHttpClient,
+    expected_state: str,
+    iterations: int,
+    period: float = 1.0,
+    http_error_ok: bool = True,
+):
+    """
+    Like wait_until_tenant_state, but checks all tenants.
+    """
+    for _ in range(iterations):
+        try:
+            tenants = pageserver_http.tenant_list()
+        except Exception as e:
+            if http_error_ok:
+                log.debug(f"Failed to list tenants: {e}")
+            else:
+                raise
+        else:
+            if all(map(lambda tenant: _tenant_in_expected_state(tenant, expected_state), tenants)):
+                return
+        time.sleep(period)
+
+    raise Exception(
+        f"Not all tenants reached state {expected_state} within {iterations * period} seconds"
+    )
+
+
 def wait_until_timeline_state(
     pageserver_http: PageserverHttpClient,
     tenant_id: Union[TenantId, TenantShardId],
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index cda788b2a4..91f33e1196 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -397,3 +397,36 @@ def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
     }
     """
     pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
+
+
+def humantime_to_ms(humantime: str) -> float:
+    """
+    Converts Rust humantime's output string to milliseconds.
+
+    humantime_to_ms("1h 1ms 406us") -> 3600001.406
+    """
+
+    unit_multiplier_map = {
+        "ns": 1e-6,
+        "us": 1e-3,
+        "ms": 1,
+        "s": 1e3,
+        "m": 1e3 * 60,
+        "h": 1e3 * 60 * 60,
+    }
+    matcher = re.compile(rf"^(\d+)({'|'.join(unit_multiplier_map.keys())})$")
+    total_ms = 0.0
+
+    if humantime == "0":
+        return total_ms
+
+    for item in humantime.split():
+        if (match := matcher.search(item)) is not None:
+            n, unit = match.groups()
+            total_ms += int(n) * unit_multiplier_map[unit]
+        else:
+            raise ValueError(
+                f"can't parse '{item}' (from string '{humantime}'), known units are {', '.join(unit_multiplier_map.keys())}."
+            )
+
+    return round(total_ms, 3)
diff --git a/test_runner/performance/pageserver/README.md b/test_runner/performance/pageserver/README.md
new file mode 100644
index 0000000000..fdd09cd946
--- /dev/null
+++ b/test_runner/performance/pageserver/README.md
@@ -0,0 +1,16 @@
+How to reproduce benchmark results / run these benchmarks interactively.
+
+1. Get an EC2 instance with Instance Store. Use the same instance type as used for the benchmark run.
+2. Mount the Instance Store => `neon.git/scripts/ps_ec2_setup_instance_store`
+3. Use a pytest command line (see other READMEs further up in the pytest hierarchy).
+
+For tests that take a long time to set up / consume a lot of storage space,
+we use the test suite's repo_dir snapshotting functionality (`from_repo_dir`).
+It supports mounting snapshots using overlayfs, which improves iteration time.
+
+Here's a full command line:
+
+```
+RUST_BACKTRACE=1 NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release \
+  ./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
+```
diff --git a/test_runner/performance/pageserver/__init__.py b/test_runner/performance/pageserver/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/test_runner/performance/pageserver/interactive/__init__.py b/test_runner/performance/pageserver/interactive/__init__.py
new file mode 100644
index 0000000000..29644c240e
--- /dev/null
+++ b/test_runner/performance/pageserver/interactive/__init__.py
@@ -0,0 +1,8 @@
+"""
+Tests that aren't really tests or benchmarks.
+
+They're intended for the case where we want to standardize & automate setup,
+but then debug a performance problem interactively.
+It's kind of an abuse of the test framework, but it's our only tool right
+now to automate a complex test bench setup.
+"""
diff --git a/test_runner/performance/pageserver/interactive/test_many_small_tenants.py b/test_runner/performance/pageserver/interactive/test_many_small_tenants.py
new file mode 100644
index 0000000000..3fb28ace46
--- /dev/null
+++ b/test_runner/performance/pageserver/interactive/test_many_small_tenants.py
@@ -0,0 +1,79 @@
+import os
+import pdb
+
+import fixtures.pageserver.many_tenants as many_tenants
+import pytest
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    NeonEnvBuilder,
+    PgBin,
+    last_flush_lsn_upload,
+)
+
+from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
+
+"""
+Usage:
+DEFAULT_PG_VERSION=15 BUILD_TYPE=debug NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 INTERACTIVE=true \
+    ./scripts/pytest --timeout 0 test_runner/performance/pageserver/interactive/test_many_small_tenants.py
+"""
+
+
+@pytest.mark.skipif(
+    os.environ.get("INTERACTIVE", "false") != "true",
+    reason="test is for interactive use only",
+)
+def test_many_small_tenants(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+):
+    _env = setup_env(neon_env_builder, 2)  # vary this to the desired number of tenants
+    _pg_bin = pg_bin
+
+    # drop into pdb so that we can debug the pageserver interactively.
+    # For example, to examine pageserver startup behavior, call
+    #   _env.pageserver.stop(immediate=True)
+    #   _env.pageserver.start()
+    # from the pdb shell.
+ pdb.set_trace() + + +def setup_env( + neon_env_builder: NeonEnvBuilder, + n_tenants: int, +) -> NeonEnv: + def setup_template(env: NeonEnv): + # create our template tenant + config = { + "gc_period": "0s", + "checkpoint_timeout": "10 years", + "compaction_period": "20 s", + "compaction_threshold": 10, + "compaction_target_size": 134217728, + "checkpoint_distance": 268435456, + "image_creation_threshold": 3, + } + template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True) + env.pageserver.tenant_detach(template_tenant) + env.pageserver.allowed_errors.append( + # tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely + ".*Dropped remote consistent LSN updates.*", + ) + env.pageserver.tenant_attach(template_tenant, config) + ep = env.endpoints.create_start("main", tenant_id=template_tenant) + ep.safe_psql("create table foo(b text)") + for _ in range(0, 8): + ep.safe_psql("insert into foo(b) values ('some text')") + last_flush_lsn_upload(env, ep, template_tenant, template_timeline) + ep.stop_and_destroy() + return (template_tenant, template_timeline, config) + + def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv: + return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants) + + env = neon_env_builder.build_and_use_snapshot(f"many-small-tenants-{n_tenants}", doit) + + env.start() + ensure_pageserver_ready_for_benchmarking(env, n_tenants) + + return env diff --git a/test_runner/performance/pageserver/pagebench/__init__.py b/test_runner/performance/pageserver/pagebench/__init__.py new file mode 100644 index 0000000000..9f5e45c0a0 --- /dev/null +++ b/test_runner/performance/pageserver/pagebench/__init__.py @@ -0,0 +1,10 @@ +""" +Pagebench-based performance regression tests. + +The defining characteristic of tests in this sub-directory is that they +are component-level tests, i.e., they exercise pageserver directly using `pagebench` +instead of benchmarking the full stack. + +See https://github.com/neondatabase/neon/issues/5771 +for the context in which this was developed. 
+""" diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py new file mode 100644 index 0000000000..f1f5511453 --- /dev/null +++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py @@ -0,0 +1,210 @@ +import json +from pathlib import Path +from typing import Any, Dict, Tuple + +import fixtures.pageserver.many_tenants as many_tenants +import pytest +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, + PgBin, + wait_for_last_flush_lsn, +) +from fixtures.utils import get_scale_for_db, humantime_to_ms + +from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking + + +# For reference, the space usage of the snapshots: +# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots +# 137G /instance_store/test_output/shared-snapshots +# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots/* +# 1.8G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-13 +# 1.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1-6 +# 8.5G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-13 +# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6 +# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13 +# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6 +@pytest.mark.parametrize("duration", [30]) +@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]]) +@pytest.mark.parametrize("n_tenants", [1, 10, 100]) +@pytest.mark.timeout( + 10000 +) # TODO: this value is just "a really high number"; have this per instance type +def test_pageserver_max_throughput_getpage_at_latest_lsn( + neon_env_builder: NeonEnvBuilder, + zenbenchmark: NeonBenchmarker, + pg_bin: PgBin, + n_tenants: int, + pgbench_scale: int, + duration: int, +): + def record(metric, **kwargs): + zenbenchmark.record( + metric_name=f"pageserver_max_throughput_getpage_at_latest_lsn.{metric}", **kwargs + ) + + params: Dict[str, Tuple[Any, Dict[str, Any]]] = {} + + # params from fixtures + params.update( + { + "n_tenants": (n_tenants, {"unit": ""}), + "pgbench_scale": (pgbench_scale, {"unit": ""}), + "duration": (duration, {"unit": "s"}), + } + ) + + # configure cache sizes like in prod + page_cache_size = 16384 + max_file_descriptors = 500000 + neon_env_builder.pageserver_config_override = ( + f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}" + ) + params.update( + { + "pageserver_config_override.page_cache_size": ( + page_cache_size * 8192, + {"unit": "byte"}, + ), + "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}), + } + ) + + for param, (value, kwargs) in params.items(): + record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs) + env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale) + run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration) + + +def run_benchmark_max_throughput_latest_lsn( + env: NeonEnv, pg_bin: PgBin, record, duration_secs: int +): + """ + Benchmark `env.pageserver` for max throughput @ latest LSN 
and record results in `zenbenchmark`.
+    """
+
+    ps_http = env.pageserver.http_client()
+    cmd = [
+        str(env.neon_binpath / "pagebench"),
+        "get-page-latest-lsn",
+        "--mgmt-api-endpoint",
+        ps_http.base_url,
+        "--page-service-connstring",
+        env.pageserver.connstr(password=None),
+        "--runtime",
+        f"{duration_secs}s",
+        # don't specify the targets explicitly, let pagebench auto-discover them
+    ]
+    log.info(f"command: {' '.join(cmd)}")
+    basepath = pg_bin.run_capture(cmd, with_command_header=False)
+    results_path = Path(basepath + ".stdout")
+    log.info(f"Benchmark results at: {results_path}")
+
+    with open(results_path, "r") as f:
+        results = json.load(f)
+    log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
+
+    total = results["total"]
+
+    metric = "request_count"
+    record(
+        metric,
+        metric_value=total[metric],
+        unit="",
+        report=MetricReport.HIGHER_IS_BETTER,
+    )
+
+    metric = "latency_mean"
+    record(
+        metric,
+        metric_value=humantime_to_ms(total[metric]),
+        unit="ms",
+        report=MetricReport.LOWER_IS_BETTER,
+    )
+
+    metric = "latency_percentiles"
+    for k, v in total[metric].items():
+        record(
+            f"{metric}.{k}",
+            metric_value=humantime_to_ms(v),
+            unit="ms",
+            report=MetricReport.LOWER_IS_BETTER,
+        )
+
+
+def setup_pageserver_with_pgbench_tenants(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+    n_tenants: int,
+    scale: int,
+) -> NeonEnv:
+    """
+    Utility function to set up a pageserver with a given number of identical tenants.
+    Each tenant is a pgbench tenant, initialized to a certain scale, and afterwards treated
+    with repeated applications of (pgbench simple-update workload, checkpoint, compact).
+    """
+
+    def setup_template(env: NeonEnv):
+        # use a config that makes production of on-disk state timing-insensitive
+        # as we ingest data into the tenant.
+ config = { + "gc_period": "0s", # disable periodic gc + "checkpoint_timeout": "10 years", + "compaction_period": "0s", # disable periodic compaction + "compaction_threshold": 10, + "compaction_target_size": 134217728, + "checkpoint_distance": 268435456, + "image_creation_threshold": 3, + } + template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True) + env.pageserver.tenant_detach(template_tenant) + env.pageserver.allowed_errors.append( + # tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely + ".*Dropped remote consistent LSN updates.*", + ) + env.pageserver.tenant_attach(template_tenant, config) + ps_http = env.pageserver.http_client() + with env.endpoints.create_start("main", tenant_id=template_tenant) as ep: + pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()]) + wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline) + ps_http.timeline_checkpoint(template_tenant, template_timeline) + ps_http.timeline_compact(template_tenant, template_timeline) + for _ in range( + 0, 17 + ): # some prime number to avoid potential resonances with the "_threshold" variables from the config + # the L0s produced by this appear to have size ~5MiB + num_txns = 10_000 + pg_bin.run_capture( + ["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()] + ) + wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline) + ps_http.timeline_checkpoint(template_tenant, template_timeline) + ps_http.timeline_compact(template_tenant, template_timeline) + # for reference, the output at scale=6 looked like so (306M total) + # ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59 + # total 306M + # 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829 + # 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919 + # 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71 + # 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791 + # 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1 + # 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9 + # 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639 + # 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799 + # 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19 + # 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021 + + return (template_tenant, template_timeline, config) + + def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv: + return many_tenants.single_timeline(neon_env_builder, setup_template, n_tenants) + + env = neon_env_builder.build_and_use_snapshot( + f"max_throughput_latest_lsn-{n_tenants}-{scale}", doit + ) + env.start() + ensure_pageserver_ready_for_benchmarking(env, n_tenants) + return env diff --git a/test_runner/performance/pageserver/util.py b/test_runner/performance/pageserver/util.py new file mode 100644 index 
0000000000..45eb652362
--- /dev/null
+++ b/test_runner/performance/pageserver/util.py
@@ -0,0 +1,29 @@
+"""
+Utilities used by all code in this sub-directory.
+"""
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv
+from fixtures.pageserver.utils import wait_until_all_tenants_state
+
+
+def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):
+    """
+    Wait for all tenants to become Active, then assert that all their layers are resident locally.
+    """
+    ps_http = env.pageserver.http_client()
+
+    log.info("wait for all tenants to become active")
+    wait_until_all_tenants_state(
+        ps_http, "Active", iterations=n_tenants, period=1, http_error_ok=False
+    )
+
+    # ensure all layers are resident for predictable performance
+    tenants = [info["id"] for info in ps_http.tenant_list()]
+    for tenant in tenants:
+        for timeline in ps_http.tenant_status(tenant)["timelines"]:
+            info = ps_http.layer_map_info(tenant, timeline)
+            for layer in info.historic_layers:
+                assert not layer.remote
+
+    log.info("ready")
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index d548e63cc1..8d5ef4e3c4 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -482,7 +482,7 @@ def test_detach_while_attaching(
     pageserver_http.tenant_detach(tenant_id)
 
     # And re-attach
-    pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
+    pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(5000)")])
     env.pageserver.tenant_attach(tenant_id)
 
@@ -681,7 +681,7 @@ def test_detach_while_activating(
     pageserver_http.tenant_detach(tenant_id)
 
     # And re-attach, but stop attach task_mgr task from completing
-    pageserver_http.configure_failpoints([("attach-before-activate", "return(600000)")])
+    pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(600000)")])
     env.pageserver.tenant_attach(tenant_id)
 
     # The tenant is in the Activating state. This should not block us from
@@ -695,7 +695,7 @@ def test_detach_while_activating(
     ), "Only ignored tenant should be missing"
 
     # Subsequently attaching it again should still work
-    pageserver_http.configure_failpoints([("attach-before-activate", "off")])
+    pageserver_http.configure_failpoints([("attach-before-activate-sleep", "off")])
     env.pageserver.tenant_attach(tenant_id)
 
     wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)