From 66c52a629a0f4a503e193045e0df4c77139e344b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 8 Jan 2024 16:00:01 +0100 Subject: [PATCH 1/8] RFC: vectored `Timeline::get` (#6250) --- docs/rfcs/030-vectored-timeline-get.md | 142 +++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 docs/rfcs/030-vectored-timeline-get.md diff --git a/docs/rfcs/030-vectored-timeline-get.md b/docs/rfcs/030-vectored-timeline-get.md new file mode 100644 index 0000000000..d4017471b7 --- /dev/null +++ b/docs/rfcs/030-vectored-timeline-get.md @@ -0,0 +1,142 @@ +# Vectored Timeline Get + +Created on: 2024-01-02 +Author: Christian Schwarz + +# Summary + +A brief RFC / GitHub Epic describing a vectored version of the `Timeline::get` method that is at the heart of Pageserver. + +# Motivation + +During basebackup, we issue many `Timeline::get` calls for SLRU pages that are *adjacent* in key space. +For an example, see +https://github.com/neondatabase/neon/blob/5c88213eaf1b1e29c610a078d0b380f69ed49a7e/pageserver/src/basebackup.rs#L281-L302. + +Each of these `Timeline::get` calls must traverse the layer map to gather reconstruct data (`Timeline::get_reconstruct_data`) for the requested page number (`blknum` in the example). +For each layer visited by layer map traversal, we do a `DiskBtree` point lookup. +If it's negative (no entry), we resume layer map traversal. +If it's positive, we collect the result in our reconstruct data bag. +If the reconstruct data bag contents suffice to reconstruct the page, we're done with `get_reconstruct_data` and move on to walredo. +Otherwise, we resume layer map traversal. + +Doing this many `Timeline::get` calls is quite inefficient because: + +1. We do the layer map traversal repeatedly, even if, e.g., all the data sits in the same image layer at the bottom of the stack. +2. We may visit many DiskBtree inner pages multiple times for point lookup of different keys. 
+ This is likely particularly bad for L0s which span the whole key space and hence must be visited by layer map traversal, but + may not contain the data we're looking for. +3. Anecdotally, keys adjacent in keyspace and written simultaneously also end up physically adjacent in the layer files [^1]. + So, to provide the reconstruct data for N adjacent keys, we would actually only _need_ to issue a single large read to the filesystem, instead of the N reads we currently do. + The filesystem, in turn, ideally stores the layer file physically contiguously, so our large read will turn into one IOP toward the disk. + +[^1]: https://www.notion.so/neondatabase/Christian-Investigation-Slow-Basebackups-Early-2023-12-34ea5c7dcdc1485d9ac3731da4d2a6fc?pvs=4#15ee4e143392461fa64590679c8f54c9 + +# Solution + +We should have a vectored aka batched aka scatter-gather style alternative API for `Timeline::get`. Having such an API unlocks: + +* more efficient basebackup +* batched IO during compaction (useful for strides of unchanged pages) +* page_service: expose vectored get_page_at_lsn for compute (=> good for seqscan / prefetch) + * if [on-demand SLRU downloads](https://github.com/neondatabase/neon/pull/6151) land before vectored Timeline::get, on-demand SLRU downloads will still benefit from this API + +# DoD + +There is a new variant of `Timeline::get`, called `Timeline::get_vectored`. +It takes as arguments an `lsn: Lsn` and a `src: &[KeyVec]` where `struct KeyVec { base: Key, count: usize }`. + +It is up to the implementor to figure out a suitable and efficient way to return the reconstructed page images. +It is sufficient to simply return a `Vec`, but, likely more efficient solutions can be found after studying all the callers of `Timeline::get`. 
+ +Functionally, the behavior of `Timeline::get_vectored` is equivalent to + +```rust +let mut keys_iter: impl Iterator + = src.map(|KeyVec{ base, count }| (base..base+count)).flatten(); +let mut out = Vec::new(); +for key in keys_iter { + let data = Timeline::get(key, lsn)?; + out.push(data); +} +return out; +``` + +However, unlike above, an ideal solution will + +* Visit each `struct Layer` at most once. +* For each visited layer, call `Layer::get_value_reconstruct_data` at most once. + * This means, read each `DiskBtree` page at most once. +* Facilitate merging of the reads we issue to the OS and eventually NVMe. + +Each of these items above represents a significant amount of work. + +## Performance + +Ideally, the **base performance** of a vectored get of a single page should be identical to the current `Timeline::get`. +A reasonable constant overhead over current `Timeline::get` is acceptable. + +The performance improvement for the vectored use case is demonstrated in some way, e.g., using the `pagebench` basebackup benchmark against a tenant with a lot of SLRU segments. + +# Implementation + +High-level set of tasks / changes to be made: + +- **Get clarity on API**: + - Define naive `Timeline::get_vectored` implementation & adopt it across pageserver. + - The tricky thing here will be the return type (e.g. `Vec` vs `impl Stream`). + - Start with something simple to explore the different usages of the API. + Then iterate with peers until we have something that is good enough. +- **Vectored Layer Map traversal** + - Vectored `LayerMap::search` (take 1 LSN and N `Key`s instead of just 1 LSN and 1 `Key`) + - Refactor `Timeline::get_reconstruct_data` to hold & return state for N `Key`s instead of 1 + - The slightly tricky part here is what to do about `cont_lsn` [after we've found some reconstruct data for some keys](https://github.com/neondatabase/neon/blob/d066dad84b076daf3781cdf9a692098889d3974e/pageserver/src/tenant/timeline.rs#L2378-L2385) + but need more. 
+ Likely we'll need to keep track of `cont_lsn` per key and continue next iteration at `max(cont_lsn)` of all keys that still need data. +- **Vectored `Layer::get_value_reconstruct_data` / `DiskBtree`** + - Current code calls it [here](https://github.com/neondatabase/neon/blob/d066dad84b076daf3781cdf9a692098889d3974e/pageserver/src/tenant/timeline.rs#L2378-L2384). + - Delta layers use `DiskBtreeReader::visit()` to collect the `(offset,len)` pairs for delta record blobs to load. + - Image layers use `DiskBtreeReader::get` to get the offset of the image blob to load. Underneath, that's just a `::visit()` call. + - What needs to happen to `DiskBtree::visit()`? + * Minimally + * take a single `KeyVec` instead of a single `Key` as argument, i.e., take a single contiguous key range to visit. + * Change the visit code to invoke the callback for all values in the `KeyVec`'s key range + * This should be good enough for what we've seen when investigating basebackup slowness, because there, the key ranges are contiguous. + * Ideally: + * Take a `&[KeyVec]`, sort it; + * during Btree traversal, peek at the next `KeyVec` range to determine whether we need to descend or back out. + * NB: this should be a straight-forward extension of the minimal solution above, as we'll already be checking for "is there more key range in the requested `KeyVec`". 
+- **Facilitate merging of the reads we issue to the OS and eventually NVMe.** + - The `DiskBtree::visit` produces a set of offsets which we then read from a `VirtualFile` [here](https://github.com/neondatabase/neon/blob/292281c9dfb24152b728b1a846cc45105dac7fe0/pageserver/src/tenant/storage_layer/delta_layer.rs#L772-L804) + - [Delta layer reads](https://github.com/neondatabase/neon/blob/292281c9dfb24152b728b1a846cc45105dac7fe0/pageserver/src/tenant/storage_layer/delta_layer.rs#L772-L804) + - We hit (and rely) on `PageCache` and `VirtualFile` here (not great under pressure) + - [Image layer reads](https://github.com/neondatabase/neon/blob/292281c9dfb24152b728b1a846cc45105dac7fe0/pageserver/src/tenant/storage_layer/image_layer.rs#L429-L435) + - What needs to happen is the **vectorization of the `blob_io` interface and then the `VirtualFile` API**. + - That is tricky because + - the `VirtualFile` API, which sits underneath `blob_io`, is being touched by ongoing [io_uring work](https://github.com/neondatabase/neon/pull/5824) + - there's the question how IO buffers will be managed; currently this area relies heavily on `PageCache`, but there's controversy around the future of `PageCache`. + - The guiding principle here should be to avoid coupling this work to the `PageCache`. + - I.e., treat `PageCache` as an extra hop in the I/O chain, rather than as an integral part of buffer management. + + +Let's see how we can improve by doing the first three items in the above list first, then revisit. + +## Rollout / Feature Flags + +No feature flags are required for this epic. + +At the end of this epic, `Timeline::get` forwards to `Timeline::get_vectored`, i.e., it's an all-or-nothing type of change. + +It is encouraged to deliver this feature incrementally, i.e., do many small PRs over multiple weeks. +That will help isolate performance regressions across weekly releases. 
+ +# Interaction With Sharding + +[Sharding](https://github.com/neondatabase/neon/pull/5432) splits up the key space, see functions `is_key_local` / `key_to_shard_number`. + +Just as with `Timeline::get`, callers of `Timeline::get_vectored` are responsible for ensuring that they only ask for blocks of the given `struct Timeline`'s shard. + +Given that this is already the case, there shouldn't be significant interaction/interference with sharding. + +However, let's have a safety check for this constraint (error or assertion) because there are currently few affordances at the higher layers of Pageserver for sharding<=>keyspace interaction. +For example, `KeySpace` is not broken up by shard stripe, so if someone naively converted the compaction code to issue a vectored get for a keyspace range it would violate this constraint. From d5e34343718c9b69f25716cc2f315b4436be072d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 9 Jan 2024 05:22:36 +0100 Subject: [PATCH 2/8] Also allow unnecessary_fallible_conversions lint (#6294) This fixes the clippy lint firing on macOS on the conversion which needed for portability. For some reason, the logic in https://github.com/rust-lang/rust-clippy/pull/11669 to avoid an overlap is not working. --- pageserver/src/statvfs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pageserver/src/statvfs.rs b/pageserver/src/statvfs.rs index 08b5264290..45a516566f 100644 --- a/pageserver/src/statvfs.rs +++ b/pageserver/src/statvfs.rs @@ -23,7 +23,7 @@ impl Statvfs { } // NB: allow() because the block count type is u32 on macOS. - #[allow(clippy::useless_conversion)] + #[allow(clippy::useless_conversion, clippy::unnecessary_fallible_conversions)] pub fn blocks(&self) -> u64 { match self { Statvfs::Real(stat) => u64::try_from(stat.blocks()).unwrap(), @@ -32,7 +32,7 @@ impl Statvfs { } // NB: allow() because the block count type is u32 on macOS. 
- #[allow(clippy::useless_conversion)] + #[allow(clippy::useless_conversion, clippy::unnecessary_fallible_conversions)] pub fn blocks_available(&self) -> u64 { match self { Statvfs::Real(stat) => u64::try_from(stat.blocks_available()).unwrap(), From 9bf7664049c63f7d8e087f11eebe45006beb75ba Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Mon, 8 Jan 2024 21:12:39 -0800 Subject: [PATCH 3/8] vm-monitor: Remove spammy log line (#6284) During a previous incident, we noticed that this particular line can be repeatedly logged every 100ms if the memory usage continues is persistently high enough to warrant upscaling. Per the added comment: Ideally we'd still like to include this log line, because it's useful information, but the simple way to include it produces far too many log lines, and the more complex ways to deduplicate the log lines while still including the information are probably not worth the effort right now. --- libs/vm_monitor/src/runner.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs index f162f53d24..ba37966476 100644 --- a/libs/vm_monitor/src/runner.rs +++ b/libs/vm_monitor/src/runner.rs @@ -446,12 +446,11 @@ impl Runner { if let Some(t) = self.last_upscale_request_at { let elapsed = t.elapsed(); if elapsed < Duration::from_secs(1) { - info!( - elapsed_millis = elapsed.as_millis(), - avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable), - threshold = bytes_to_mebibytes(cgroup.threshold), - "cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring", - ); + // *Ideally* we'd like to log here that we're ignoring the fact the + // memory stats are too high, but in practice this can result in + // spamming the logs with repetitive messages about ignoring the signal + // + // See https://github.com/neondatabase/neon/issues/5865 for more. 
continue; } } From 4b6004e8c9ca3cbef8e9efb6aed951160d0697a0 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 9 Jan 2024 10:22:43 +0100 Subject: [PATCH 4/8] fix(page_service client): correctly deserialize pagestream errors (#6302) Before this PR, we wouldn't advance the underlying `Bytes`'s cursor. fixes https://github.com/neondatabase/neon/issues/6298 --- libs/pageserver_api/src/models.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index dea925b468..316d79b634 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2,7 +2,7 @@ pub mod partitioning; use std::{ collections::HashMap, - io::Read, + io::{BufRead, Read}, num::{NonZeroU64, NonZeroUsize}, time::SystemTime, }; @@ -813,9 +813,10 @@ impl PagestreamBeMessage { PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) } Tag::Error => { - let buf = buf.get_ref(); - let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?; - let rust_str = cstr.to_str()?; + let mut msg = Vec::new(); + buf.read_until(0, &mut msg)?; + let cstring = std::ffi::CString::from_vec_with_nul(msg)?; + let rust_str = cstring.to_str()?; PagestreamBeMessage::Error(PagestreamErrorResponse { message: rust_str.to_owned(), }) From 90e0219b2938a95cf6ca36a29252db326a1cf98f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 9 Jan 2024 11:15:46 +0100 Subject: [PATCH 5/8] python tests: support overlayfs for NeonEnvBuilder.from_repo_dir (#6295) Part of #5771 Extracted from https://github.com/neondatabase/neon/pull/6214 This PR makes the test suite sensitive to the new env var `NEON_ENV_BUILDER_FROM_REPO_DIR_USE_OVERLAYFS`. If it is set, `NeonEnvBuilder.from_repo_dir` uses overlayfs to duplicate the the snapshot repo dir contents. Since mounting requires root privileges, we use sudo to perform the mounts. That, and macOS support, is also why copytree remains the default. 
If we ever run on a filesystem with copy reflink support, we should consider that as an alternative. This PR can be tried on a Linux machine on the `test_backward_compatiblity` test, which uses `from_repo_dir`. --- test_runner/fixtures/neon_fixtures.py | 163 ++++++++++++++++++++++++-- test_runner/fixtures/overlayfs.py | 16 +++ 2 files changed, 170 insertions(+), 9 deletions(-) create mode 100644 test_runner/fixtures/overlayfs.py diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 001d4e23a9..353416a2ed 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -40,6 +40,7 @@ from psycopg2.extensions import make_dsn, parse_dsn from typing_extensions import Literal from urllib3.util.retry import Retry +from fixtures import overlayfs from fixtures.broker import NeonBroker from fixtures.log_helper import log from fixtures.pageserver.allowed_errors import ( @@ -424,6 +425,7 @@ class NeonEnvBuilder: pg_version: PgVersion, test_name: str, test_output_dir: Path, + test_overlay_dir: Optional[Path] = None, pageserver_remote_storage: Optional[RemoteStorage] = None, pageserver_config_override: Optional[str] = None, num_safekeepers: int = 1, @@ -468,6 +470,8 @@ class NeonEnvBuilder: self.initial_timeline = initial_timeline or TimelineId.generate() self.scrub_on_exit = False self.test_output_dir = test_output_dir + self.test_overlay_dir = test_overlay_dir + self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] assert test_name.startswith( "test_" @@ -547,7 +551,10 @@ class NeonEnvBuilder: tenants_to_dir = self.repo_dir / ps_dir.name / "tenants" log.info(f"Copying pageserver tenants directory {tenants_from_dir} to {tenants_to_dir}") - shutil.copytree(tenants_from_dir, tenants_to_dir) + if self.test_overlay_dir is None: + shutil.copytree(tenants_from_dir, tenants_to_dir) + else: + self.overlay_mount(f"{ps_dir.name}:tenants", tenants_from_dir, tenants_to_dir) for sk_from_dir in (repo_dir / 
"safekeepers").glob("sk*"): sk_to_dir = self.repo_dir / "safekeepers" / sk_from_dir.name @@ -556,9 +563,16 @@ class NeonEnvBuilder: shutil.copytree(sk_from_dir, sk_to_dir, ignore=shutil.ignore_patterns("*.log", "*.pid")) shutil.rmtree(self.repo_dir / "local_fs_remote_storage", ignore_errors=True) - shutil.copytree( - repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage" - ) + if self.test_overlay_dir is None: + shutil.copytree( + repo_dir / "local_fs_remote_storage", self.repo_dir / "local_fs_remote_storage" + ) + else: + self.overlay_mount( + "local_fs_remote_storage", + repo_dir / "local_fs_remote_storage", + self.repo_dir / "local_fs_remote_storage", + ) if (attachments_json := Path(repo_dir / "attachments.json")).exists(): shutil.copyfile(attachments_json, self.repo_dir / attachments_json.name) @@ -575,6 +589,69 @@ class NeonEnvBuilder: return self.env + def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path): + """ + Mount `srcdir` as an overlayfs mount at `dstdir`. + The overlayfs `upperdir` and `workdir` will be placed in test_overlay_dir. 
+ """ + assert self.test_overlay_dir + assert ( + self.test_output_dir in dstdir.parents + ) # so that teardown & test_overlay_dir fixture work + assert srcdir.is_dir() + dstdir.mkdir(exist_ok=False, parents=False) + ident_state_dir = self.test_overlay_dir / ident + upper = ident_state_dir / "upper" + work = ident_state_dir / "work" + ident_state_dir.mkdir( + exist_ok=False, parents=False + ) # exists_ok=False also checks uniqueness in self.overlay_mounts + upper.mkdir() + work.mkdir() + cmd = [ + "sudo", + "mount", + "-t", + "overlay", + "overlay", + "-o", + f"lowerdir={srcdir},upperdir={upper},workdir={work}", + str(dstdir), + ] + log.info(f"Mounting overlayfs srcdir={srcdir} dstdir={dstdir}: {cmd}") + subprocess_capture( + self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + ) + self.overlay_mounts_created_by_us.append((ident, dstdir)) + + def overlay_cleanup_teardown(self): + """ + Unmount the overlayfs mounts created by `self.overlay_mount()`. + Supposed to be called during env teardown. 
+ """ + if self.test_overlay_dir is None: + return + while len(self.overlay_mounts_created_by_us) > 0: + (ident, mountpoint) = self.overlay_mounts_created_by_us.pop() + ident_state_dir = self.test_overlay_dir / ident + cmd = ["sudo", "umount", str(mountpoint)] + log.info( + f"Unmounting overlayfs mount created during setup for ident {ident} at {mountpoint}: {cmd}" + ) + subprocess_capture( + self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + ) + log.info( + f"Cleaning up overlayfs state dir (owned by root user) for ident {ident} at {ident_state_dir}" + ) + cmd = ["sudo", "rm", "-rf", str(ident_state_dir)] + subprocess_capture( + self.test_output_dir, cmd, check=True, echo_stderr=True, echo_stdout=True + ) + + # assert all overlayfs mounts in our test directory are gone + assert [] == list(overlayfs.iter_mounts_beneath(self.test_overlay_dir)) + def enable_scrub_on_exit(self): """ Call this if you would like the fixture to automatically run @@ -681,7 +758,10 @@ class NeonEnvBuilder: sk.stop(immediate=True) for pageserver in self.env.pageservers: - pageserver.assert_no_metric_errors() + # if the test threw an exception, don't check for errors + # as a failing assertion would cause the cleanup below to fail + if exc_type is not None: + pageserver.assert_no_metric_errors() pageserver.stop(immediate=True) @@ -696,6 +776,13 @@ class NeonEnvBuilder: log.error(f"Error during remote storage scrub: {e}") cleanup_error = e + try: + self.overlay_cleanup_teardown() + except Exception as e: + log.error(f"Error cleaning up overlay state: {e}") + if cleanup_error is not None: + cleanup_error = e + try: self.cleanup_remote_storage() except Exception as e: @@ -1017,6 +1104,7 @@ def neon_env_builder( default_broker: NeonBroker, run_id: uuid.UUID, request: FixtureRequest, + test_overlay_dir: Path, ) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. 
@@ -1047,6 +1135,7 @@ def neon_env_builder( preserve_database_files=pytestconfig.getoption("--preserve-database-files"), test_name=request.node.name, test_output_dir=test_output_dir, + test_overlay_dir=test_overlay_dir, ) as builder: yield builder @@ -3194,10 +3283,10 @@ class S3Scrubber: raise -def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path: - """Compute the working directory for an individual test.""" +def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) -> Path: + """Compute the path to a working directory for an individual test.""" test_name = request.node.name - test_dir = top_output_dir / test_name.replace("/", "-") + test_dir = top_output_dir / f"{prefix}{test_name.replace('/', '-')}" # We rerun flaky tests multiple times, use a separate directory for each run. if (suffix := getattr(request.node, "execution_count", None)) is not None: @@ -3209,6 +3298,21 @@ def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path: return test_dir +def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path: + """ + The working directory for a test. + """ + return _get_test_dir(request, top_output_dir, "") + + +def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path: + """ + Directory that contains `upperdir` and `workdir` for overlayfs mounts + that a test creates. See `NeonEnvBuilder.overlay_mount`. + """ + return _get_test_dir(request, top_output_dir, "overlay-") + + def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path: return get_test_output_dir(request, top_output_dir) / "repo" @@ -3236,8 +3340,12 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg] # scope. So it uses the get_test_output_dir() function to get the path, and # this fixture ensures that the directory exists. That works because # 'autouse' fixtures are run before other fixtures. 
+# +# NB: we request the overlay dir fixture so the fixture does its cleanups @pytest.fixture(scope="function", autouse=True) -def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[Path]: +def test_output_dir( + request: FixtureRequest, top_output_dir: Path, test_overlay_dir: Path +) -> Iterator[Path]: """Create the working directory for an individual test.""" # one directory per test @@ -3251,6 +3359,43 @@ def test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Iterator[P allure_attach_from_dir(test_dir) +@pytest.fixture(scope="function") +def test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]: + """ + Idempotently create a test's overlayfs mount state directory. + If the functionality isn't enabled via env var, returns None. + + The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc). + """ + + if os.getenv("NEON_ENV_BUILDER_FROM_REPO_DIR_USE_OVERLAYFS") is None: + return None + + overlay_dir = get_test_overlay_dir(request, top_output_dir) + log.info(f"test_overlay_dir is {overlay_dir}") + + overlay_dir.mkdir(exist_ok=True) + # unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir` + for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)): + cmd = ["sudo", "umount", str(mountpoint)] + log.info( + f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}" + ) + subprocess.run(cmd, capture_output=True, check=True) + # the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work. 
+ cmd = ["sudo", "rm", "-rf", str(overlay_dir)] + subprocess.run(cmd, capture_output=True, check=True) + + overlay_dir.mkdir() + + return overlay_dir + + # no need to clean up anything: on clean shutdown, + # NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup + # and on unclean shutdown, this function will take care of it + # on the next test run + + SKIP_DIRS = frozenset( ( "pg_wal", diff --git a/test_runner/fixtures/overlayfs.py b/test_runner/fixtures/overlayfs.py new file mode 100644 index 0000000000..3e2f661893 --- /dev/null +++ b/test_runner/fixtures/overlayfs.py @@ -0,0 +1,16 @@ +from pathlib import Path +from typing import Iterator + +import psutil + + +def iter_mounts_beneath(topdir: Path) -> Iterator[Path]: + """ + Iterate over the overlayfs mounts beneath the specififed `topdir`. + The `topdir` itself isn't considered. + """ + for part in psutil.disk_partitions(all=True): + if part.fstype == "overlay": + mountpoint = Path(part.mountpoint) + if topdir in mountpoint.parents: + yield mountpoint From 8186f6b6f9f9dbfb8dbb8bedf365eeeb7efc4aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 9 Jan 2024 11:20:08 +0100 Subject: [PATCH 6/8] Drop async_trait usage from three internal traits (#6305) This uses the [newly stable](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html) async trait feature for three internal traits. One requires `Send` bounds to be present so uses `impl Future<...> + Send` instead. 
Advantages: * less macro usage * no extra boxing Disadvantages: * impl syntax needed for `Send` bounds is a bit more verbose (but only required in one place) --- pageserver/client/src/mgmt_api.rs | 6 ++---- pageserver/src/control_plane_client.rs | 11 ++++++----- pageserver/src/deletion_queue.rs | 1 - pageserver/src/tenant/secondary/downloader.rs | 1 - pageserver/src/tenant/secondary/heatmap_uploader.rs | 1 - pageserver/src/tenant/secondary/scheduler.rs | 2 -- 6 files changed, 8 insertions(+), 14 deletions(-) diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 4c285293f7..bfe1520e68 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -28,14 +28,12 @@ pub enum Error { pub type Result = std::result::Result; -#[async_trait::async_trait] -pub trait ResponseErrorMessageExt: Sized { +pub(crate) trait ResponseErrorMessageExt: Sized { async fn error_from_body(self) -> Result; } -#[async_trait::async_trait] impl ResponseErrorMessageExt for reqwest::Response { - async fn error_from_body(mut self) -> Result { + async fn error_from_body(self) -> Result { let status = self.status(); if !(status.is_client_error() || status.is_server_error()) { return Ok(self); diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 25ae3d1b01..950791ea48 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use futures::Future; use pageserver_api::{ control_api::{ ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse, @@ -28,13 +29,14 @@ pub enum RetryForeverError { ShuttingDown, } -#[async_trait::async_trait] pub trait ControlPlaneGenerationsApi { - async fn re_attach(&self) -> Result, RetryForeverError>; - async fn validate( + fn re_attach( + &self, + ) -> impl Future, RetryForeverError>> + Send; + fn validate( &self, tenants: Vec<(TenantShardId, 
Generation)>, - ) -> Result, RetryForeverError>; + ) -> impl Future, RetryForeverError>> + Send; } impl ControlPlaneClient { @@ -123,7 +125,6 @@ impl ControlPlaneClient { } } -#[async_trait::async_trait] impl ControlPlaneGenerationsApi for ControlPlaneClient { /// Block until we get a successful response, or error out if we are shut down async fn re_attach(&self) -> Result, RetryForeverError> { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 7b05745483..6a820e1bdc 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -831,7 +831,6 @@ mod test { } } - #[async_trait::async_trait] impl ControlPlaneGenerationsApi for MockControlPlane { #[allow(clippy::diverging_sub_expression)] // False positive via async_trait async fn re_attach(&self) -> Result, RetryForeverError> { diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 6fdee08a4e..2a79c406cf 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -186,7 +186,6 @@ type Scheduler = TenantBackgroundJobs< DownloadCommand, >; -#[async_trait::async_trait] impl JobGenerator for SecondaryDownloader { diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index ef01c33e8e..df865658a4 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -134,7 +134,6 @@ type Scheduler = TenantBackgroundJobs< UploadCommand, >; -#[async_trait::async_trait] impl JobGenerator for HeatmapUploader { diff --git a/pageserver/src/tenant/secondary/scheduler.rs b/pageserver/src/tenant/secondary/scheduler.rs index cf01a100d9..58bdb54161 100644 --- a/pageserver/src/tenant/secondary/scheduler.rs +++ b/pageserver/src/tenant/secondary/scheduler.rs @@ -1,4 +1,3 @@ -use async_trait; use futures::Future; use std::{ collections::HashMap, @@ 
-65,7 +64,6 @@ where _phantom: PhantomData<(PJ, RJ, C, CMD)>, } -#[async_trait::async_trait] pub(crate) trait JobGenerator where C: Completion, From 4b9b4c2c365f304e91ceaa1d6a9a70a0913983db Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 9 Jan 2024 10:37:54 +0000 Subject: [PATCH 7/8] pageserver: cleanup redundant create/attach code, fix detach while attaching (#6277) ## Problem The code for tenant create and tenant attach was just a special case of what upsert_location does. ## Summary of changes - Use `upsert_location` for create and attach APIs - Clean up error handling in upsert_location so that it can generate appropriate HTTP response codes - Update tests that asserted the old non-idempotent behavior of attach - Rework the `test_ignore_while_attaching` test, and fix tenant shutdown during activation, which this test was supposed to cover, but it was actually just waiting for activation to complete. --- libs/utils/src/failpoint_support.rs | 38 +++++ pageserver/src/http/routes.rs | 113 +++++++++----- pageserver/src/tenant.rs | 150 ++---------------- pageserver/src/tenant/mgr.rs | 171 +++++++-------------- test_runner/fixtures/neon_fixtures.py | 10 +- test_runner/regress/test_remote_storage.py | 23 +-- test_runner/regress/test_tenant_detach.py | 56 +++---- test_runner/regress/test_tenants.py | 21 +-- 8 files changed, 234 insertions(+), 348 deletions(-) diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index 5ec532e2a6..8704b72921 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -15,6 +15,10 @@ use tracing::*; /// specified time (in milliseconds). The main difference is that we use async /// tokio sleep function. Another difference is that we print lines to the log, /// which can be useful in tests to check that the failpoint was hit. +/// +/// Optionally pass a cancellation token, and this failpoint will drop out of +/// its sleep when the cancellation token fires. 
This is useful for testing +/// cases where we would like to block something, but test its clean shutdown behavior. #[macro_export] macro_rules! __failpoint_sleep_millis_async { ($name:literal) => {{ @@ -30,6 +34,24 @@ macro_rules! __failpoint_sleep_millis_async { $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await } }}; + ($name:literal, $cancel:expr) => {{ + // If the failpoint is used with a "return" action, set should_sleep to the + // returned value (as string). Otherwise it's set to None. + let should_sleep = (|| { + ::fail::fail_point!($name, |x| x); + ::std::option::Option::None + })(); + + // Sleep if the action was a returned value + if let ::std::option::Option::Some(duration_str) = should_sleep { + $crate::failpoint_support::failpoint_sleep_cancellable_helper( + $name, + duration_str, + $cancel, + ) + .await + } + }}; } pub use __failpoint_sleep_millis_async as sleep_millis_async; @@ -45,6 +67,22 @@ pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) { tracing::info!("failpoint {:?}: sleep done", name); } +// Helper function used by the macro. (A function has nicer scoping so we +// don't need to decorate everything with "::") +#[doc(hidden)] +pub async fn failpoint_sleep_cancellable_helper( + name: &'static str, + duration_str: String, + cancel: &CancellationToken, +) { + let millis = duration_str.parse::().unwrap(); + let d = std::time::Duration::from_millis(millis); + + tracing::info!("failpoint {:?}: sleeping for {:?}", name, d); + tokio::time::timeout(d, cancel.cancelled()).await.ok(); + tracing::info!("failpoint {:?}: sleep done", name); +} + pub fn init() -> fail::FailScenario<'static> { // The failpoints lib provides support for parsing the `FAILPOINTS` env var. // We want non-default behavior for `exit`, though, so, we handle it separately. 
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 5c7747d353..feca08aeaf 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -15,6 +15,7 @@ use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; use pageserver_api::models::TenantDetails; +use pageserver_api::models::TenantState; use pageserver_api::models::{ DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest, TenantLoadRequest, TenantLocationConfigRequest, @@ -37,6 +38,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::{LocationConf, TenantConfOpt}; use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::mgr::UpsertLocationError; use crate::tenant::mgr::{ GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError, TenantSlotUpsertError, TenantStateError, @@ -46,7 +48,8 @@ use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::timeline::CompactFlags; use crate::tenant::timeline::Timeline; -use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources}; +use crate::tenant::SpawnMode; +use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; use crate::{config::PageServerConf, tenant::mgr}; use crate::{disk_usage_eviction_task, tenant}; use pageserver_api::models::{ @@ -112,14 +115,6 @@ impl State { secondary_controller, }) } - - fn tenant_resources(&self) -> TenantSharedResources { - TenantSharedResources { - broker_client: self.broker_client.clone(), - remote_storage: self.remote_storage.clone(), - deletion_queue_client: self.deletion_queue_client.clone(), - } - } } #[inline(always)] @@ -175,7 +170,7 @@ impl From for ApiError { NotFound(tenant_id) => { ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into()) } - e @ (AlreadyExists(_, 
_) | Conflict(_)) => ApiError::Conflict(format!("{e}")), + e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")), InProgress => { ApiError::ResourceUnavailable("Tenant is being modified concurrently".into()) } @@ -194,6 +189,18 @@ impl From for ApiError { } } +impl From<UpsertLocationError> for ApiError { + fn from(e: UpsertLocationError) -> ApiError { + use UpsertLocationError::*; + match e { + BadRequest(e) => ApiError::BadRequest(e), + Unavailable(_) => ApiError::ShuttingDown, + e @ InProgress => ApiError::Conflict(format!("{e}")), + Flush(e) | Other(e) => ApiError::InternalServerError(e), + } + } +} + impl From<TenantMapError> for ApiError { fn from(e: TenantMapError) -> ApiError { use TenantMapError::*; @@ -680,16 +687,37 @@ async fn tenant_attach_handler( ))); } - mgr::attach_tenant( - state.conf, - tenant_id, - generation, - tenant_conf, - state.tenant_resources(), - &ctx, - ) - .instrument(info_span!("tenant_attach", %tenant_id)) - .await?; + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let location_conf = LocationConf::attached_single(tenant_conf, generation); + let tenant = state + .tenant_manager + .upsert_location( + tenant_shard_id, + location_conf, + None, + SpawnMode::Normal, + &ctx, + ) + .await?; + + let Some(tenant) = tenant else { + // This should never happen: indicates a bug in upsert_location + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Upsert succeeded but didn't return tenant!" 
+ ))); + }; + + // We might have successfully constructed a Tenant, but it could still + // end up in a broken state: + if let TenantState::Broken { + reason, + backtrace: _, + } = tenant.current_state() + { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Tenant state is Broken: {reason}" + ))); + } json_response(StatusCode::ACCEPTED, ()) } @@ -1148,16 +1176,25 @@ async fn tenant_create_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); - let new_tenant = mgr::create_tenant( - state.conf, - tenant_conf, - target_tenant_id, - generation, - state.tenant_resources(), - &ctx, - ) - .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id)) - .await?; + let location_conf = LocationConf::attached_single(tenant_conf, generation); + + let new_tenant = state + .tenant_manager + .upsert_location( + target_tenant_id, + location_conf, + None, + SpawnMode::Create, + &ctx, + ) + .await?; + + let Some(new_tenant) = new_tenant else { + // This should never happen: indicates a bug in upsert_location + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Upsert succeeded but didn't return tenant!" + ))); + }; // We created the tenant. Existing API semantics are that the tenant // is Active when this function returns. @@ -1166,7 +1203,7 @@ async fn tenant_create_handler( .await { // This shouldn't happen because we just created the tenant directory - // in tenant::mgr::create_tenant, and there aren't any remote timelines + // in upsert_location, and there aren't any remote timelines // to load, so, nothing can really fail during load. // Don't do cleanup because we don't know how we got here. 
// The tenant will likely be in `Broken` state and subsequent @@ -1267,12 +1304,14 @@ async fn put_tenant_location_config_handler( state .tenant_manager - .upsert_location(tenant_shard_id, location_conf, flush, &ctx) - .await - // TODO: badrequest assumes the caller was asking for something unreasonable, but in - // principle we might have hit something like concurrent API calls to the same tenant, - // which is not a 400 but a 409. - .map_err(ApiError::BadRequest)?; + .upsert_location( + tenant_shard_id, + location_conf, + flush, + tenant::SpawnMode::Normal, + &ctx, + ) + .await?; if let Some(_flush_ms) = flush { match state diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7c609452e5..b3f888c393 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -12,7 +12,7 @@ //! use anyhow::{bail, Context}; -use camino::{Utf8Path, Utf8PathBuf}; +use camino::Utf8Path; use enumset::EnumSet; use futures::stream::FuturesUnordered; use futures::FutureExt; @@ -1003,7 +1003,7 @@ impl Tenant { // IndexPart is the source of truth. self.clean_up_timelines(&existent_timelines)?; - failpoint_support::sleep_millis_async!("attach-before-activate"); + failpoint_support::sleep_millis_async!("attach-before-activate", &self.cancel); info!("Done"); @@ -2036,6 +2036,13 @@ impl Tenant { // It's mesed up. // we just ignore the failure to stop + // If we're still attaching, fire the cancellation token early to drop out: this + // will prevent us flushing, but ensures timely shutdown if some I/O during attach + // is very slow. + if matches!(self.current_state(), TenantState::Attaching) { + self.cancel.cancel(); + } + match self.set_stopping(shutdown_progress, false, false).await { Ok(()) => {} Err(SetStoppingError::Broken) => { @@ -2734,6 +2741,10 @@ impl Tenant { "# .to_string(); + fail::fail_point!("tenant-config-before-write", |_| { + anyhow::bail!("tenant-config-before-write"); + }); + // Convert the config to a toml file. 
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?; @@ -3650,140 +3661,6 @@ fn remove_timeline_and_uninit_mark( Ok(()) } -pub(crate) async fn create_tenant_files( - conf: &'static PageServerConf, - location_conf: &LocationConf, - tenant_shard_id: &TenantShardId, -) -> anyhow::Result { - let target_tenant_directory = conf.tenant_path(tenant_shard_id); - anyhow::ensure!( - !target_tenant_directory - .try_exists() - .context("check existence of tenant directory")?, - "tenant directory already exists", - ); - - let temporary_tenant_dir = - path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX); - debug!("Creating temporary directory structure in {temporary_tenant_dir}"); - - // top-level dir may exist if we are creating it through CLI - crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| { - format!("could not create temporary tenant directory {temporary_tenant_dir}") - })?; - - let creation_result = try_create_target_tenant_dir( - conf, - location_conf, - tenant_shard_id, - &temporary_tenant_dir, - &target_tenant_directory, - ) - .await; - - if creation_result.is_err() { - error!( - "Failed to create directory structure for tenant {tenant_shard_id}, cleaning tmp data" - ); - if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) { - error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}") - } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) { - error!( - "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}" - ) - } - } - - creation_result?; - - Ok(target_tenant_directory) -} - -async fn try_create_target_tenant_dir( - conf: &'static PageServerConf, - location_conf: &LocationConf, - tenant_shard_id: &TenantShardId, - temporary_tenant_dir: &Utf8Path, - target_tenant_directory: &Utf8Path, -) -> Result<(), anyhow::Error> { - let temporary_tenant_timelines_dir = rebase_directory( - &conf.timelines_path(tenant_shard_id), - target_tenant_directory, - 
temporary_tenant_dir, - ) - .with_context(|| format!("resolve tenant {tenant_shard_id} temporary timelines dir"))?; - let temporary_legacy_tenant_config_path = rebase_directory( - &conf.tenant_config_path(tenant_shard_id), - target_tenant_directory, - temporary_tenant_dir, - ) - .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?; - let temporary_tenant_config_path = rebase_directory( - &conf.tenant_location_config_path(tenant_shard_id), - target_tenant_directory, - temporary_tenant_dir, - ) - .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?; - - Tenant::persist_tenant_config_at( - tenant_shard_id, - &temporary_tenant_config_path, - &temporary_legacy_tenant_config_path, - location_conf, - ) - .await?; - - crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| { - format!( - "create tenant {} temporary timelines directory {}", - tenant_shard_id, temporary_tenant_timelines_dir, - ) - })?; - fail::fail_point!("tenant-creation-before-tmp-rename", |_| { - anyhow::bail!("failpoint tenant-creation-before-tmp-rename"); - }); - - // Make sure the current tenant directory entries are durable before renaming. - // Without this, a crash may reorder any of the directory entry creations above. 
- crashsafe::fsync(temporary_tenant_dir) - .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?; - - fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| { - format!( - "move tenant {} temporary directory {} into the permanent one {}", - tenant_shard_id, temporary_tenant_dir, target_tenant_directory - ) - })?; - let target_dir_parent = target_tenant_directory.parent().with_context(|| { - format!( - "get tenant {} dir parent for {}", - tenant_shard_id, target_tenant_directory, - ) - })?; - crashsafe::fsync(target_dir_parent).with_context(|| { - format!( - "fsync renamed directory's parent {} for tenant {}", - target_dir_parent, tenant_shard_id, - ) - })?; - - Ok(()) -} - -fn rebase_directory( - original_path: &Utf8Path, - base: &Utf8Path, - new_base: &Utf8Path, -) -> anyhow::Result { - let relative_path = original_path.strip_prefix(base).with_context(|| { - format!( - "Failed to strip base prefix '{}' off path '{}'", - base, original_path - ) - })?; - Ok(new_base.join(relative_path)) -} - /// Create the cluster temporarily in 'initdbpath' directory inside the repository /// to get bootstrap data for timeline initialization. 
async fn run_initdb( @@ -3878,6 +3755,7 @@ pub async fn dump_layerfile_from_path( #[cfg(test)] pub(crate) mod harness { use bytes::{Bytes, BytesMut}; + use camino::Utf8PathBuf; use once_cell::sync::OnceCell; use pageserver_api::shard::ShardIndex; use std::fs; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 70b41b7b1f..5d2a87d5b7 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -35,7 +35,7 @@ use crate::tenant::config::{ }; use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::span::debug_assert_current_span_has_tenant_id; -use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState}; +use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState}; use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX}; use utils::crashsafe::path_with_suffix_extension; @@ -754,45 +754,6 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { // caller will log how long we took } -pub(crate) async fn create_tenant( - conf: &'static PageServerConf, - tenant_conf: TenantConfOpt, - tenant_shard_id: TenantShardId, - generation: Generation, - resources: TenantSharedResources, - ctx: &RequestContext, -) -> Result, TenantMapInsertError> { - let location_conf = LocationConf::attached_single(tenant_conf, generation); - info!("Creating tenant at location {location_conf:?}"); - - let slot_guard = - tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; - let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?; - - let shard_identity = location_conf.shard; - let created_tenant = tenant_spawn( - conf, - tenant_shard_id, - &tenant_path, - resources, - AttachedTenantConf::try_from(location_conf)?, - shard_identity, - None, - &TENANTS, - SpawnMode::Create, - ctx, - )?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. 
- // See https://github.com/neondatabase/neon/issues/4233 - - let created_tenant_id = created_tenant.tenant_id(); - debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id); - - slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; - - Ok(created_tenant) -} - #[derive(Debug, thiserror::Error)] pub(crate) enum SetNewTenantConfigError { #[error(transparent)] @@ -824,6 +785,24 @@ pub(crate) async fn set_new_tenant_config( Ok(()) } +#[derive(thiserror::Error, Debug)] +pub(crate) enum UpsertLocationError { + #[error("Bad config request: {0}")] + BadRequest(anyhow::Error), + + #[error("Cannot change config in this state: {0}")] + Unavailable(#[from] TenantMapError), + + #[error("Tenant is already being modified")] + InProgress, + + #[error("Failed to flush: {0}")] + Flush(anyhow::Error), + + #[error("Internal error: {0}")] + Other(#[from] anyhow::Error), +} + impl TenantManager { /// Convenience function so that anyone with a TenantManager can get at the global configuration, without /// having to pass it around everywhere as a separate object. @@ -888,8 +867,9 @@ impl TenantManager { tenant_shard_id: TenantShardId, new_location_config: LocationConf, flush: Option, + spawn_mode: SpawnMode, ctx: &RequestContext, - ) -> Result<(), anyhow::Error> { + ) -> Result>, UpsertLocationError> { debug_assert_current_span_has_tenant_id(); info!("configuring tenant location to state {new_location_config:?}"); @@ -911,9 +891,10 @@ impl TenantManager { // A transition from Attached to Attached in the same generation, we may // take our fast path and just provide the updated configuration // to the tenant. 
- tenant.set_new_location_config(AttachedTenantConf::try_from( - new_location_config.clone(), - )?); + tenant.set_new_location_config( + AttachedTenantConf::try_from(new_location_config.clone()) + .map_err(UpsertLocationError::BadRequest)?, + ); Some(FastPathModified::Attached(tenant.clone())) } else { @@ -940,8 +921,7 @@ impl TenantManager { match fast_path_taken { Some(FastPathModified::Attached(tenant)) => { Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + .await?; // Transition to AttachedStale means we may well hold a valid generation // still, and have been requested to go stale as part of a migration. If @@ -954,9 +934,9 @@ impl TenantManager { if let Some(flush_timeout) = flush { match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await { Ok(Err(e)) => { - return Err(e); + return Err(UpsertLocationError::Flush(e)); } - Ok(Ok(_)) => return Ok(()), + Ok(Ok(_)) => return Ok(Some(tenant)), Err(_) => { tracing::warn!( timeout_ms = flush_timeout.as_millis(), @@ -967,14 +947,13 @@ impl TenantManager { } } - return Ok(()); + return Ok(Some(tenant)); } Some(FastPathModified::Secondary(_secondary_tenant)) => { Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + .await?; - return Ok(()); + return Ok(None); } None => { // Proceed with the general case procedure, where we will shutdown & remove any existing @@ -987,7 +966,14 @@ impl TenantManager { // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: // the state is ill-defined while we're in transition. Transitions are async, but fast: we do // not do significant I/O, and shutdowns should be prompt via cancellation tokens. 
- let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; + let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any) + .map_err(|e| match e { + TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => { + unreachable!("Called with mode Any") + } + TenantSlotError::InProgress => UpsertLocationError::InProgress, + TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s), + })?; match slot_guard.get_old_value() { Some(TenantSlot::Attached(tenant)) => { @@ -1025,7 +1011,9 @@ impl TenantManager { Some(TenantSlot::InProgress(_)) => { // This should never happen: acquire_slot should error out // if the contents of a slot were InProgress. - anyhow::bail!("Acquired an InProgress slot, this is a bug.") + return Err(UpsertLocationError::Other(anyhow::anyhow!( + "Acquired an InProgress slot, this is a bug." + ))); } None => { // Slot was vacant, nothing needs shutting down. @@ -1047,9 +1035,7 @@ impl TenantManager { // Before activating either secondary or attached mode, persist the // configuration, so that on restart we will re-attach (or re-start // secondary) on the tenant. 
- Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?; let new_slot = match &new_location_config.mode { LocationMode::Secondary(secondary_config) => { @@ -1066,7 +1052,7 @@ impl TenantManager { shard_identity, None, self.tenants, - SpawnMode::Normal, + spawn_mode, ctx, )?; @@ -1074,9 +1060,20 @@ impl TenantManager { } }; - slot_guard.upsert(new_slot)?; + let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot { + Some(tenant.clone()) + } else { + None + }; - Ok(()) + slot_guard.upsert(new_slot).map_err(|e| match e { + TenantSlotUpsertError::InternalError(e) => { + UpsertLocationError::Other(anyhow::anyhow!(e)) + } + TenantSlotUpsertError::MapState(e) => UpsertLocationError::Unavailable(e), + })?; + + Ok(attached_tenant) } /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same @@ -1648,55 +1645,6 @@ pub(crate) async fn list_tenants() -> Result, .collect()) } -/// Execute Attach mgmt API command. -/// -/// Downloading all the tenant data is performed in the background, this merely -/// spawns the background task and returns quickly. -pub(crate) async fn attach_tenant( - conf: &'static PageServerConf, - tenant_id: TenantId, - generation: Generation, - tenant_conf: TenantConfOpt, - resources: TenantSharedResources, - ctx: &RequestContext, -) -> Result<(), TenantMapInsertError> { - // This is a legacy API (replaced by `/location_conf`). 
It does not support sharding - let tenant_shard_id = TenantShardId::unsharded(tenant_id); - - let slot_guard = - tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; - let location_conf = LocationConf::attached_single(tenant_conf, generation); - let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_shard_id).await?; - // TODO: tenant directory remains on disk if we bail out from here on. - // See https://github.com/neondatabase/neon/issues/4233 - - let shard_identity = location_conf.shard; - let attached_tenant = tenant_spawn( - conf, - tenant_shard_id, - &tenant_dir, - resources, - AttachedTenantConf::try_from(location_conf)?, - shard_identity, - None, - &TENANTS, - SpawnMode::Normal, - ctx, - )?; - // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. - // See https://github.com/neondatabase/neon/issues/4233 - - let attached_tenant_id = attached_tenant.tenant_id(); - if tenant_id != attached_tenant_id { - return Err(TenantMapInsertError::Other(anyhow::anyhow!( - "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", - ))); - } - - slot_guard.upsert(TenantSlot::Attached(attached_tenant))?; - Ok(()) -} - #[derive(Debug, thiserror::Error)] pub(crate) enum TenantMapInsertError { #[error(transparent)] @@ -1710,7 +1658,7 @@ pub(crate) enum TenantMapInsertError { /// Superset of TenantMapError: issues that can occur when acquiring a slot /// for a particular tenant ID. #[derive(Debug, thiserror::Error)] -pub enum TenantSlotError { +pub(crate) enum TenantSlotError { /// When acquiring a slot with the expectation that the tenant already exists. 
#[error("Tenant {0} not found")] NotFound(TenantShardId), @@ -1719,9 +1667,6 @@ pub enum TenantSlotError { #[error("tenant {0} already exists, state: {1:?}")] AlreadyExists(TenantShardId, TenantState), - #[error("tenant {0} already exists in but is not attached")] - Conflict(TenantShardId), - // Tried to read a slot that is currently being mutated by another administrative // operation. #[error("tenant has a state change in progress, try again later")] diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 353416a2ed..0fa4cfb18a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1917,18 +1917,24 @@ class NeonPageserver(PgProtocol): return None def tenant_attach( - self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False + self, + tenant_id: TenantId, + config: None | Dict[str, Any] = None, + config_null: bool = False, + generation: Optional[int] = None, ): """ Tenant attachment passes through here to acquire a generation number before proceeding to call into the pageserver HTTP client. """ client = self.http_client() + if generation is None: + generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id) return client.tenant_attach( tenant_id, config, config_null, - generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id), + generation=generation, ) def tenant_detach(self, tenant_id: TenantId): diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 2fda56d0f4..98b2e856ec 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -144,8 +144,11 @@ def test_remote_storage_backup_and_restore( # Introduce failpoint in list remote timelines code path to make tenant_attach fail. # This is before the failures injected by test_remote_failures, so it's a permanent error. 
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return")) - env.pageserver.allowed_errors.append( - ".*attach failed.*: storage-sync-list-remote-timelines", + env.pageserver.allowed_errors.extend( + [ + ".*attach failed.*: storage-sync-list-remote-timelines", + ".*Tenant state is Broken: storage-sync-list-remote-timelines.*", + ] ) # Attach it. This HTTP request will succeed and launch a # background task to load the tenant. In that background task, @@ -159,9 +162,13 @@ def test_remote_storage_backup_and_restore( "data": {"reason": "storage-sync-list-remote-timelines"}, } - # Ensure that even though the tenant is broken, we can't attach it again. - with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"): - env.pageserver.tenant_attach(tenant_id) + # Ensure that even though the tenant is broken, retrying the attachment fails + with pytest.raises(Exception, match="Tenant state is Broken"): + # Use same generation as in previous attempt + gen_state = env.attachment_service.inspect(tenant_id) + assert gen_state is not None + generation = gen_state[0] + env.pageserver.tenant_attach(tenant_id, generation=generation) # Restart again, this implicitly clears the failpoint. # test_remote_failures=1 remains active, though, as it's in the pageserver config. @@ -176,10 +183,8 @@ def test_remote_storage_backup_and_restore( ), "we shouldn't have tried any layer downloads yet since list remote timelines has a failpoint" env.pageserver.start() - # Ensure that the pageserver remembers that the tenant was attaching, by - # trying to attach it again. It should fail. - with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"): - env.pageserver.tenant_attach(tenant_id) + # The attach should have got far enough that it recovers on restart (i.e. tenant's + # config was written to local storage). log.info("waiting for tenant to become active. 
this should be quick with on-demand download") wait_until_tenant_active( diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index c6dbc77885..d548e63cc1 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -627,7 +627,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder # Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally # Similarly, tests that it's not possible to schedule a `load` for tenat that's not ignored. -def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): +def test_load_negatives(neon_env_builder: NeonEnvBuilder): neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client() @@ -644,25 +644,16 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): ): env.pageserver.tenant_load(tenant_id) - with pytest.raises( - expected_exception=PageserverApiException, - match=f"tenant {tenant_id} already exists, state: Active", - ): - env.pageserver.tenant_attach(tenant_id) - pageserver_http.tenant_ignore(tenant_id) - env.pageserver.allowed_errors.append(".*tenant directory already exists.*") - with pytest.raises( - expected_exception=PageserverApiException, - match="tenant directory already exists", - ): - env.pageserver.tenant_attach(tenant_id) - -def test_ignore_while_attaching( +def test_detach_while_activating( neon_env_builder: NeonEnvBuilder, ): + """ + Test cancellation behavior for tenants that are stuck somewhere between + being attached and reaching Active state. 
+ """ neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) env = neon_env_builder.init_start() @@ -684,39 +675,28 @@ def test_ignore_while_attaching( data_secret = "very secret secret" insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint) - tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()] + tenants_before_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()] # Detach it pageserver_http.tenant_detach(tenant_id) + # And re-attach, but stop attach task_mgr task from completing - pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")]) + pageserver_http.configure_failpoints([("attach-before-activate", "return(600000)")]) env.pageserver.tenant_attach(tenant_id) - # Run ignore on the task, thereby cancelling the attach. - # XXX This should take priority over attach, i.e., it should cancel the attach task. - # But neither the failpoint, nor the proper remote_timeline_client download functions, - # are sensitive to task_mgr::shutdown. - # This problem is tracked in https://github.com/neondatabase/neon/issues/2996 . - # So, for now, effectively, this ignore here will block until attach task completes. - pageserver_http.tenant_ignore(tenant_id) - # Cannot attach it due to some local files existing - env.pageserver.allowed_errors.append(".*tenant directory already exists.*") - with pytest.raises( - expected_exception=PageserverApiException, - match="tenant directory already exists", - ): - env.pageserver.tenant_attach(tenant_id) + # The tenant is in the Activating state. This should not block us from + # shutting it down and detaching it. 
+ pageserver_http.tenant_detach(tenant_id) - tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()] - assert tenant_id not in tenants_after_ignore, "Ignored tenant should be missing" - assert len(tenants_after_ignore) + 1 == len( - tenants_before_ignore + tenants_after_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()] + assert tenant_id not in tenants_after_detach, "Detached tenant should be missing" + assert len(tenants_after_detach) + 1 == len( + tenants_before_detach ), "Only ignored tenant should be missing" - # Calling load will bring the tenant back online + # Subsequently attaching it again should still work pageserver_http.configure_failpoints([("attach-before-activate", "off")]) - env.pageserver.tenant_load(tenant_id) - + env.pageserver.tenant_attach(tenant_id) wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5) endpoint.stop() diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 22036884ee..5f2c1500d8 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -29,18 +29,13 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv): initial_tenants = sorted( map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines()) ) - initial_tenant_dirs = [d for d in tenants_dir.iterdir()] + [d for d in tenants_dir.iterdir()] - neon_simple_env.pageserver.allowed_errors.extend( - [ - ".*Failed to create directory structure for tenant .*, cleaning tmp data.*", - ".*Failed to fsync removed temporary tenant directory .*", - ] - ) + neon_simple_env.pageserver.allowed_errors.append(".*tenant-config-before-write.*") pageserver_http = neon_simple_env.pageserver.http_client() - pageserver_http.configure_failpoints(("tenant-creation-before-tmp-rename", "return")) - with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"): + pageserver_http.configure_failpoints(("tenant-config-before-write", "return")) + 
with pytest.raises(Exception, match="tenant-config-before-write"): _ = neon_simple_env.neon_cli.create_tenant() new_tenants = sorted( @@ -48,10 +43,10 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv): ) assert initial_tenants == new_tenants, "should not create new tenants" - new_tenant_dirs = [d for d in tenants_dir.iterdir()] - assert ( - new_tenant_dirs == initial_tenant_dirs - ), "pageserver should clean its temp tenant dirs on tenant creation failure" + # Any files left behind on disk during failed creation do not prevent + # a retry from succeeding. + pageserver_http.configure_failpoints(("tenant-config-before-write", "off")) + neon_simple_env.neon_cli.create_tenant() def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder): From f94abbab952570ed97a19811bd96d1e8465fc2b1 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 9 Jan 2024 12:10:15 +0000 Subject: [PATCH 8/8] pageserver: clean up a redundant tenant_id attribute (#6280) This was a small TODO(sharding) thing in TenantHarness. 
--- pageserver/src/tenant.rs | 3 --- pageserver/src/tenant/remote_timeline_client.rs | 2 +- .../src/tenant/timeline/walreceiver/connection_manager.rs | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b3f888c393..8f7a5769a3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3823,8 +3823,6 @@ pub(crate) mod harness { pub struct TenantHarness { pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, - // TODO(sharding): remove duplicative `tenant_id` in favor of access to tenant_shard_id - pub(crate) tenant_id: TenantId, pub tenant_shard_id: TenantShardId, pub generation: Generation, pub shard: ShardIndex, @@ -3886,7 +3884,6 @@ pub(crate) mod harness { Ok(Self { conf, tenant_conf, - tenant_id, tenant_shard_id, generation: Generation::new(0xdeadbeef), shard: ShardIndex::unsharded(), diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2ea3ced008..ec2a6efef6 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1903,7 +1903,7 @@ mod tests { fn span(&self) -> tracing::Span { tracing::info_span!( "test", - tenant_id = %self.harness.tenant_id, + tenant_id = %self.harness.tenant_shard_id.tenant_id, timeline_id = %TIMELINE_ID ) } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7fa5bb7689..cf6dee114f 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1337,7 +1337,7 @@ mod tests { ConnectionManagerState { id: TenantTimelineId { - tenant_id: harness.tenant_id, + tenant_id: harness.tenant_shard_id.tenant_id, timeline_id: TIMELINE_ID, }, timeline,