mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
4 Commits
skyzh/repa
...
rel_size_c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9788d7cbd | ||
|
|
374495c041 | ||
|
|
ca4d758504 | ||
|
|
d5a7ecade5 |
@@ -5,4 +5,3 @@ listen_http_addr='0.0.0.0:9898'
|
||||
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
|
||||
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
|
||||
control_plane_emergency_mode=true
|
||||
virtual_file_io_mode="buffered" # the CI runners where we run the docker compose tests have slow disks
|
||||
|
||||
@@ -7,8 +7,6 @@ Author: Christian Schwarz
|
||||
|
||||
A brief RFC / GitHub Epic describing a vectored version of the `Timeline::get` method that is at the heart of Pageserver.
|
||||
|
||||
**EDIT**: the implementation of this feature is described in [Vlad's (internal) tech talk](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link).
|
||||
|
||||
# Motivation
|
||||
|
||||
During basebackup, we issue many `Timeline::get` calls for SLRU pages that are *adjacent* in key space.
|
||||
|
||||
@@ -1,362 +0,0 @@
|
||||
# Direct IO For Pageserver
|
||||
|
||||
Date: Apr 30, 2025
|
||||
|
||||
## Summary
|
||||
|
||||
This document is a retroactive RFC. It
|
||||
- provides some background on what direct IO is,
|
||||
- motivates why Pageserver should be using it for its IO, and
|
||||
- describes how we changed Pageserver to use it.
|
||||
|
||||
The [initial proposal](https://github.com/neondatabase/neon/pull/8240) that kicked off the work can be found in this closed GitHub PR.
|
||||
|
||||
People primarily involved in this project were:
|
||||
- Yuchen Liang <yuchen@neon.tech>
|
||||
- Vlad Lazar <vlad@neon.tech>
|
||||
- Christian Schwarz <christian@neon.tech>
|
||||
|
||||
## Timeline
|
||||
|
||||
For posterity, here is the rough timeline of the development work that got us to where we are today.
|
||||
|
||||
- Jan 2024: [integrate `tokio-epoll-uring`](https://github.com/neondatabase/neon/pull/5824) along with owned buffers API
|
||||
- March 2024: `tokio-epoll-uring` enabled in all regions in buffered IO mode
|
||||
- Feb 2024 to June 2024: PS PageCache Bypass For Data Blocks
|
||||
- Feb 2024: [Vectored Get Implementation](https://github.com/neondatabase/neon/pull/6576) bypasses delta & image layer blocks for page requests
|
||||
- Apr to June 2024: [Epic: bypass PageCache for use data blocks](https://github.com/neondatabase/neon/issues/7386) addresses remaining users
|
||||
- Aug to Nov 2024: direct IO: first code; preliminaries; read path coding; BufferedWriter; benchmarks show perf regressions too high, no-go.
|
||||
- Nov 2024 to Jan 2025: address perf regressions by developing page_service pipelining (aka batching) and concurrent IO ([Epic](https://github.com/neondatabase/neon/issues/9376))
|
||||
- Feb to March 2024: rollout batching, then concurrent+direct IO => read path and InMemoryLayer is now direct IO
|
||||
- Apr 2025: develop & roll out direct IO for the write path
|
||||
|
||||
## Background: Terminology & Glossary
|
||||
|
||||
**kernel page cache**: the Linux kernel's page cache is a write-back cache for filesystem contents.
|
||||
The cached unit is memory-page-sized & aligned chunks of the files that are being cached (typically 4k).
|
||||
The cache lives in kernel memory and is not directly accessible through userspace.
|
||||
|
||||
**Buffered IO**: an application's read/write system calls go through the kernel page cache.
|
||||
For example, a 10 byte sized read or write to offset 5000 in a file will load the file contents
|
||||
at offset `[4096,8192)` into a free page in the kernel page cache. If necessary, it will evict
|
||||
a page to make room (cf eviction). Then, the kernel performs a memory-to-memory copy of 10 bytes
|
||||
from/to the offset `4` (`5000 = 4096 + 4`) within the cached page. If it's a write, the kernel keeps
|
||||
track of the fact that the page is now "dirty" in some ancillary structure.
|
||||
|
||||
**Writeback**: a buffered read/write syscall returns after the memory-to-memory copy. The modifications
|
||||
made by e.g. write system calls are not even *issued* to disk, let alone durable. Instead, the kernel
|
||||
asynchronously writes back dirtied pages based on a variety of conditions. For us, the most relevant
|
||||
ones are a) explicit request by userspace (`fsync`) and b) memory pressure.
|
||||
|
||||
**Memory pressure**: the kernel page cache is a best effort service and a user of spare memory capacity.
|
||||
If there is no free memory, the kernel page allocator will take pages used by page cache to satisfy allocations.
|
||||
Before reusing a page like that, the page has to be written back (writeback, see above).
|
||||
The far-reaching consequence of this is that **any allocation of anonymous memory can do IO** if the only
|
||||
way to get that memory is by eviction & re-using a dirty page cache page.
|
||||
Notably, this includes a simple `malloc` in userspace, because eventually that boils down to `mmap(..., MAP_ANON, ...)`.
|
||||
I refer to this effect as the "malloc latency backscatter" caused by buffered IO.
|
||||
|
||||
**Direct IO** allows application's read/write system calls to bypass the kernel page cache. The filesystem
|
||||
is still involved because it is ultimately in charge of mapping the concept of files & offsets within them
|
||||
to sectors on block devices. Typically, the filesystem poses size and alignment requirements for memory buffers
|
||||
and file offsets (statx `Dio_mem_align` / `Dio_offset_align`), see [this gist](https://gist.github.com/problame/1c35cac41b7cd617779f8aae50f97155).
|
||||
The IO operations will fail at runtime with EINVAL if the alignment requirements are not met.
|
||||
|
||||
**"buffered" vs "direct"**: the central distinction between buffered and direct IO is about who allocates and
|
||||
fills the IO buffers, and who controls when exactly the IOs are issued. In buffered IO, it's the syscall handlers,
|
||||
kernel page cache, and memory management subsystems (cf "writeback"). In direct IO, all of it is done by
|
||||
the application.
|
||||
It takes more effort by the application to program with direct instead of buffered IO.
|
||||
The return is precise control over and a clear distinction between consumption/modification of memory vs disk.
|
||||
|
||||
**Pageserver PageCache**: Pageserver has an additional `PageCache` (referred to as PS PageCache from here on, as opposed to "kernel page cache").
|
||||
Its caching unit is 8KiB blocks of the layer files written by Pageserver.
|
||||
A miss in PageCache is filled by reading from the filesystem, through the `VirtualFile` abstraction layer.
|
||||
The default size is tiny (64MiB), very much like Postgres's `shared_buffers`.
|
||||
We ran production at 128MiB for a long time but gradually moved it up to 2GiB over the past ~year.
|
||||
|
||||
**VirtualFile** is Pageserver's abstraction for file IO, very similar to the facility in Postgres that bears the same name.
|
||||
Its historical purpose appears to be working around open file descriptor limitations, which is practically irrelevant on Linux.
|
||||
However, the facility in Pageserver is useful as an intermediary layer for metrics and abstracts over the different kinds of
|
||||
IO engines that Pageserver supports (`std-fs` vs `tokio-epoll-uring`).
|
||||
|
||||
## Background: History Of Caching In Pageserver
|
||||
|
||||
For multiple years, Pageserver's `PageCache` was on the path of all read _and write_ IO.
|
||||
It performed write-back to the kernel using buffered IO.
|
||||
|
||||
We converted it into a read-only cache of immutable data in [PR 4994](https://github.com/neondatabase/neon/pull/4994).
|
||||
|
||||
The introduction of `tokio-epoll-uring` required converting the code base to used owned IO buffers.
|
||||
The `PageCache` pages are usable as owned IO buffers.
|
||||
|
||||
We then started bypassing PageCache for user data blocks.
|
||||
Data blocks are the 8k blocks of data in layer files that hold the multiple `Value`s, as opposed to the disk btree index blocks that tell us which values exist in a file at what offsets.
|
||||
The disk btree embedded in delta & image layers remains `PageCache`'d.
|
||||
Epics for that work were:
|
||||
- Vectored `Timeline::get` (cf RFC 30) skipped delta and image layer data block `PageCache`ing outright.
|
||||
- Epic https://github.com/neondatabase/neon/issues/7386 took care of the remaining users for data blocks:
|
||||
- Materialized page cache (cached materialized pages; shown to be ~0% hit rate in practice)
|
||||
- InMemoryLayer
|
||||
- Compaction
|
||||
|
||||
The outcome of the above:
|
||||
1. All data blocks are always read through the `VirtualFile` APIs, hitting the kernel buffered read path (=> kernel page cache).
|
||||
2. Indirect blocks (=disk btree blocks) would be cached in the PS `PageCache`.
|
||||
|
||||
In production we size the PS `PageCache` to be 2GiB.
|
||||
Thus drives hit rate up to ~99.95% and the eviction rate / replacement rates down to less than 200/second on a 1-minute average, on the busiest machines.
|
||||
High baseline replacement rates are treated as a signal of resource exhaustion (page cache insufficient to host working set of the PS).
|
||||
The response to this is to migrate tenants away, or increase PS `PageCache` size.
|
||||
It is currently manual but could be automated, e.g., in Storage Controller.
|
||||
|
||||
In the future, we may eliminate the `PageCache` even for indirect blocks.
|
||||
For example with an LRU cache that has as unit the entire disk btree content
|
||||
instead of individual blocks.
|
||||
|
||||
## High-Level Design
|
||||
|
||||
So, before work on this project started, all data block reads and the entire write path of Pageserver were using kernel-buffered IO, i.e., the kernel page cache.
|
||||
We now want to get the kernel page cache out of the picture by using direct IO for all interaction with the filesystem.
|
||||
This achieves the following system properties:
|
||||
|
||||
**Predictable VirtualFile latencies**
|
||||
* With buffered IO, reads are sometimes fast, sometimes slow, depending on kernel page cache hit/miss.
|
||||
* With buffered IO, appends when writing out new layer files during ingest or compaction are sometimes fast, sometimes slow because of write-back backpressure.
|
||||
* With buffered IO, the "malloc backscatter" phenomenon pointed out in the Glossary section is not something we actively observe.
|
||||
But we do have occasional spikes in Dirty memory amount and Memory PSI graphs, so it may already be affecting to some degree.
|
||||
* By switching to direct IO, above operations will have the (predictable) device latency -- always.
|
||||
Reads and appends always go to disk.
|
||||
And malloc will not have to write back dirty data.
|
||||
|
||||
**Explicitness & Tangibility of resource usage**
|
||||
* In a multi-tenant system, it is generally desirable and valuable to be *explicit* about the main resources we use for each tenant.
|
||||
* By using direct IO, we become explicit about the resources *disk IOPs* and *memory capacity* in a way that was previously being conflated through the kernel page cache, outside our immediate control.
|
||||
* We will be able to build per-tenant observability of resource usage ("what tenant is causing the actual IOs that are sent to the disk?").
|
||||
* We will be able to build accounting & QoS by implementing an IO scheduler that is tenant aware. The kernel is not tenant-aware and can't do that.
|
||||
|
||||
**CPU Efficiency**
|
||||
* The involvement of the kernel page cache means one additional memory-to-memory copy on read and write path.
|
||||
* Direct IO will eliminate that memory-to-memory copy, if we can make the userspace buffers used for the IO calls satisfy direct IO alignment requirements.
|
||||
|
||||
The **trade-off** is that we no longer get the theoretical benefits of the kernel page cache. These are:
|
||||
- read latency improvements for repeat reads of the same data ("locality of reference")
|
||||
- asterisk: only if that state is still cache-resident by time of next access
|
||||
- write throughput by having kernel page cache batch small VFS writes into bigger disk writes
|
||||
- asterisk: only if memory pressure is low enough that the kernel can afford to delay writeback
|
||||
|
||||
We are **happy to make this trade-off**:
|
||||
- Because of the advantages listed above.
|
||||
- Because we empirically have enough DRAM on Pageservers to serve metadata (=index blocks) from PS PageCache.
|
||||
(At just 2GiB PS PageCache size, we average a 99.95% hit rate).
|
||||
So, the latency of going to disk is only for data block reads, not the index traversal.
|
||||
- Because **the kernel page cache is ineffective** at high tenant density anyway (#tenants/pageserver instance).
|
||||
And because dense packing of tenants will always be desirable to drive COGS down, we should design the system for it.
|
||||
(See the appendix for a more detailed explanation why this is).
|
||||
- So, we accept that some reads that used to be fast by circumstance will have higher but **predictable** latency than before.
|
||||
|
||||
### Desired End State
|
||||
|
||||
The desired end state of the project is as follows, and with some asterisks, we have achieved it.
|
||||
|
||||
All IOs of the Pageserver data path use direct IO, thereby bypassing the kernel page cache.
|
||||
|
||||
In particular, the "data path" includes
|
||||
- the wal ingest path
|
||||
- compaction
|
||||
- anything on the `Timeline::get` / `Timeline::get_vectored` path.
|
||||
|
||||
The production Pageserver config is tuned such that virtually all non-data blocks are cached in the PS PageCache.
|
||||
Hit rate target is 99.95%.
|
||||
|
||||
There are no regressions to ingest latency.
|
||||
|
||||
The total "wait-for-disk time" contribution to random getpage request latency is `O(1 read IOP latency)`.
|
||||
We accomplish that by having a near 100% PS PageCache hit rate so that layer index traversal effectively never needs not wait for IO.
|
||||
Thereby, it can issue all the data blocks as it traverses the index, and only wait at the end of it (concurrent IO).
|
||||
|
||||
The amortized "wait-for-disk time" contribution of this direct IO proposal to a series of sequential getpage requests is `1/32 * read IOP latency` for each getpage request.
|
||||
We accomplish this by server-side batching of up to 32 reads into a single `Timeline::get_vectored` call.
|
||||
(This is an ideal world where our batches are full - that's not the case in prod today because of lack of queue depth).
|
||||
|
||||
## Design & Implementation
|
||||
|
||||
### Prerequisites
|
||||
|
||||
A lot of prerequisite work had to happen to enable use of direct IO.
|
||||
|
||||
To meet the "wait-for-disk time" requirements from the DoD, we implement for the read path:
|
||||
- page_service level server-side batching (config field `page_service_pipelining`)
|
||||
- concurrent IO (config field `get_vectored_concurrent_io`)
|
||||
The work for both of these these was tracked [in the epic](https://github.com/neondatabase/neon/issues/9376).
|
||||
Server-side batching will likely be obsoleted by the [#proj-compute-communicator](https://github.com/neondatabase/neon/pull/10799).
|
||||
The Concurrent IO work is described in retroactive RFC `2025-04-30-pageserver-concurrent-io-on-read-path.md`.
|
||||
The implementation is relatively brittle and needs further investment, see the `Future Work` section in that RFC.
|
||||
|
||||
For the write path, and especially WAL ingest, we need to hide write latency.
|
||||
We accomplish this by implementing a (`BufferedWriter`) type that does double-buffering: flushes of the filled
|
||||
buffer happen in a sidecar tokio task while new writes fill a new buffer.
|
||||
We refactor InMemoryLayer as well as BlobWriter (=> delta and image layer writers) to use this new `BufferedWriter`.
|
||||
The most comprehensive write-up of this work is in [the PR description](https://github.com/neondatabase/neon/pull/11558).
|
||||
|
||||
### Ensuring Adherence to Alignment Requirements
|
||||
|
||||
Direct IO puts requirements on
|
||||
- memory buffer alignment
|
||||
- io size (=memory buffer size)
|
||||
- file offset alignment
|
||||
|
||||
The requirements are specific to a combination of filesystem/block-device/architecture(hardware page size!).
|
||||
|
||||
In Neon production environments we currently use ext4 with Linux 6.1.X on AWS and Azure storage-optimized instances (locally attached NVMe).
|
||||
Instead of dynamic discovery using `statx`, we statically hard-code 512 bytes as the buffer/offset alignment and size-multiple.
|
||||
We made this decision because:
|
||||
- a) it is compatible with all the environments we need to run in
|
||||
- b) our primary workload can be small-random-read-heavy (we do merge adjacent reads if possible, but the worst case is that all `Value`s that needs to be read are far apart)
|
||||
- c) 512-byte tail latency on the production instance types is much better than 4k (p99.9: 3x lower, p99.99 5x lower).
|
||||
- d) hard-coding at compile-time allows us to use the Rust type system to enforce the use of only aligned IO buffers, eliminating a source of runtime errors typically associated with direct IO.
|
||||
|
||||
This was [discussed here](https://neondb.slack.com/archives/C07BZ38E6SD/p1725036790965549?thread_ts=1725026845.455259&cid=C07BZ38E6SD).
|
||||
|
||||
The new `IoBufAligned` / `IoBufAlignedMut` marker traits indicate that a given buffer meets memory alignment requirements.
|
||||
All `VirtualFile` APIs and several software layers built on top of them only accept buffers that implement those traits.
|
||||
Implementors of the marker traits are:
|
||||
- `IoBuffer` / `IoBufferMut`: used for most reads and writes
|
||||
- `PageWriteGuardBuf`: for filling PS PageCache pages (index blocks!)
|
||||
|
||||
The alignment requirement is infectious; it permeates bottom-up throughout the code base.
|
||||
We stop the infection at roughly the same layers in the code base where we stopped permeating the
|
||||
use of owned-buffers-style API for tokio-epoll-uring. The way the stopping works is by introducing
|
||||
a memory-to-memory copy from/to some unaligned memory location on the stack/current/heap.
|
||||
The places where we currently stop permeating are sort of arbitrary. For example, it would probably
|
||||
make sense to replace more usage of `Bytes` that we know holds 8k pages with 8k-sized `IoBuffer`s.
|
||||
|
||||
The `IoBufAligned` / `IoBufAlignedMut` types do not protect us from the following types of runtime errors:
|
||||
- non-adherence to file offset alignment requirements
|
||||
- non-adherence to io size requirements
|
||||
|
||||
The following higher-level constructs ensure we meet the requirements:
|
||||
- read path: the `ChunkedVectoredReadBuilder` and `mod vectored_dio_read` ensure reads happen at aligned offsets and in appropriate size multiples.
|
||||
- write path: `BufferedWriter` only writes in multiples of the capacity, at offsets that are `start_offset+N*capacity`; see its doc comment.
|
||||
|
||||
Note that these types are used always, regardless of whether direct IO is enabled or not.
|
||||
There are some cases where this adds unnecessary overhead to buffered IO (e.g. all memcpy's inflated to multiples of 512).
|
||||
But we could not identify meaningful impact in practice when we shipped these changes while we were still using buffered IO.
|
||||
|
||||
### Configuration / Feature Flagging
|
||||
|
||||
In the previous section we described how all users of VirtualFile were changed to always adhere to direct IO alignment and size-multiple requirements.
|
||||
To actually enable direct IO, all we need to do is set the `O_DIRECT` flag in `open` syscalls / io_uring operations.
|
||||
|
||||
We set `O_DIRECT` based on:
|
||||
- the VirtualFile API used to create/open the VirtualFile instance
|
||||
- the `virtual_file_io_mode` configuration flag
|
||||
- the OpenOptions `read` and/or `write` flags.
|
||||
|
||||
The VirtualFile APIs suffixed with `_v2` are the only ones that _may_ open with `O_DIRECT` depending on the other two factors in above list.
|
||||
Other APIs never use `O_DIRECT`.
|
||||
(The name is bad and should really be `_maybe_direct_io`.)
|
||||
|
||||
The reason for having new APIs is because all code used VirtualFile but implementation and rollout happened in consecutive phases (read path, InMemoryLayer, write path).
|
||||
At the VirtualFile level, context on whether an instance of VirtualFile is on read path, InMemoryLayer, or write path is not available.
|
||||
|
||||
The `_v2` APIs then check make the decision to set `O_DIRECT` based on the `virtual_file_io_mode` flag and the OpenOptions `read`/`write` flags.
|
||||
The result is the following runtime behavior:
|
||||
|
||||
|what|OpenOptions|`v_f_io_mode`<br/>=`buffered`|`v_f_io_mode`<br/>=`direct`|`v_f_io_mode`<br/>=`direct-rw`|
|
||||
|-|-|-|-|-|
|
||||
|`DeltaLayerInner`|read|()|O_DIRECT|O_DIRECT|
|
||||
|`ImageLayerInner`|read|()|O_DIRECT|O_DIRECT|
|
||||
|`InMemoryLayer`|read + write|()|()*|O_DIRECT|
|
||||
|`DeltaLayerWriter`| write | () | () | O_DIRECT |
|
||||
|`ImageLayerWriter`| write | () | () | O_DIRECT |
|
||||
|`download_layer_file`|write |()|()|O_DIRECT|
|
||||
|
||||
The `InMemoryLayer` is marked with `*` because there was a period when it *did* use O_DIRECT under `=direct`.
|
||||
That period was when we implemented and shipped the first version of `BufferedWriter`.
|
||||
We used it in `InMemoryLayer` and `download_layer_file` but it was only sensitive to `v_f_io_mode` in `InMemoryLayer`.
|
||||
The introduction of `=direct-rw`, and the switch of the remaining write path to `BufferedWriter`, happened later,
|
||||
in https://github.com/neondatabase/neon/pull/11558.
|
||||
|
||||
Note that this way of feature flagging inside VirtualFile makes it less and less a general purpose POSIX file access abstraction.
|
||||
For example, with `=direct-rw` enabled, it is no longer possible to open a `VirtualFile` without `O_DIRECT`. It'll always be set.
|
||||
|
||||
## Correctness Validation
|
||||
|
||||
The correctness risks with this project were:
|
||||
- Memory safety issues in the `IoBuffer` / `IoBufferMut` implementation.
|
||||
These types expose an API that is largely identical to that of the `bytes` crate and/or Vec.
|
||||
- Runtime errors (=> downtime / unavailability) because of non-adherence to alignment/size-multiple requirements, resulting in EINVAL on the read path.
|
||||
|
||||
We sadly do not have infrastructure to run pageserver under `cargo miri`.
|
||||
So for memory safety issues, we relied on careful peer review.
|
||||
|
||||
We do assert the production-like alignment requirements in testing builds.
|
||||
However, these asserts were added retroactively.
|
||||
The actual validation before rollout happened in staging and pre-prod.
|
||||
We eventually enabled `=direct`/`=direct-rw` for Rust unit tests and the regression test suite.
|
||||
I cannot recall a single instance of staging/pre-prod/production errors caused by non-adherence to alignment/size-multiple requirements.
|
||||
Evidently developer testing was good enough.
|
||||
|
||||
## Performance Validation
|
||||
|
||||
The read path went through a lot of iterations of benchmarking in staging and pre-prod.
|
||||
The benchmarks in those environments demonstrated performance regressions early in the implementation.
|
||||
It was actually this performance testing that made us implement batching and concurrent IO to avoid unacceptable regressions.
|
||||
|
||||
The write path was much quicker to validate because `bench_ingest` covered all of the (less numerous) access patterns.
|
||||
|
||||
## Future Work
|
||||
|
||||
There is minor and major follow-up work that can be considered in the future.
|
||||
Check the (soon-to-be-closed) Epic https://github.com/neondatabase/neon/issues/8130's "Follow-Ups" section for a current list.
|
||||
|
||||
Read Path:
|
||||
- PS PageCache hit rate is crucial to unlock concurrent IO and reasonable latency for random reads generally.
|
||||
Instead of reactively sizing PS PageCache, we should estimate the required PS PageCache size
|
||||
and potentially also use that to drive placement decisions of shards from StorageController
|
||||
https://github.com/neondatabase/neon/issues/9288
|
||||
- ... unless we get rid of PS PageCache entirely and cache the index block in a more specialized cache.
|
||||
But even then, an estimation of the working set would be helpful to figure out caching strategy.
|
||||
|
||||
Write Path:
|
||||
- BlobWriter and its users could switch back to a borrowed API https://github.com/neondatabase/neon/issues/10129
|
||||
- ... unless we want to implement bypass mode for large writes https://github.com/neondatabase/neon/issues/10101
|
||||
- The `TempVirtualFile` introduced as part of this project could internalize more of the common usage pattern: https://github.com/neondatabase/neon/issues/11692
|
||||
- Reduce conditional compilation around `virtual_file_io_mode`: https://github.com/neondatabase/neon/issues/11676
|
||||
|
||||
Both:
|
||||
- A performance simulation mode that pads VirtualFile op latencies to typical NVMe latencies, even if the underlying storage is faster.
|
||||
This would avoid misleadingly good performance on developer systems and in benchmarks on systems that are less busy than production hosts.
|
||||
However, padding latencies at microsecond scale is non-trivial.
|
||||
|
||||
Misc:
|
||||
- We should finish trimming VirtualFile's scope to be truly limited to core data path read & write.
|
||||
Abstractions for reading & writing pageserver config, location config, heatmaps, etc, should use
|
||||
APIs in a different package (`VirtualFile::crashsafe_overwrite` and `VirtualFile::read_to_string`
|
||||
are good entrypoints for cleanup.) https://github.com/neondatabase/neon/issues/11809
|
||||
|
||||
# Appendix
|
||||
|
||||
## Why Kernel Page Cache Is Ineffective At Tenant High Density
|
||||
|
||||
In the Motivation section, we stated:
|
||||
|
||||
> - **The kernel page cache ineffective** at high tenant density anyways (#tenants/pageserver instance).
|
||||
|
||||
The reason is that the Pageserver workload sent from Computes is whatever is a Compute cache(s) miss.
|
||||
That's either sequential scans or random reads.
|
||||
A random read workload simply causes cache thrashing because a packed Pageserver NVMe drive (`im4gn.2xlarge`) has ~100x more capacity than DRAM available.
|
||||
It is complete waste to have the kernel page cache cache data blocks in this case.
|
||||
Sequential read workloads *can* benefit iff those pages have been updated recently (=no image layer yet) and together in time/LSN space.
|
||||
In such cases, the WAL records of those updates likely sit on the same delta layer block.
|
||||
When Compute does a sequential scan, it sends a series of single-page requests for these individual pages.
|
||||
When Pageserver processes the second request in such a series, it goes to the same delta layer block and have a kernel page cache hit.
|
||||
This dependence on kernel page cache for sequential scan performance is significant, but the solution is at a higher level than generic data block caching.
|
||||
We can either add a small per-connection LRU cache for such delta layer blocks.
|
||||
Or we can merge those sequential requests into a larger vectored get request, which is designed to never read a block twice.
|
||||
This amortizes the read latency for our delta layer block across the vectored get batch size (which currently is up to 32).
|
||||
|
||||
There are Pageserver-internal workloads that do sequential access (compaction, image layer generation), but these
|
||||
1. are not latency-critical and can do batched access outside of the `page_service` protocol constraints (image layer generation)
|
||||
2. don't actually need to reconstruct images and therefore can use totally different access methods (=> compaction can use k-way merge iterators with their own internal buffering / prefetching).
|
||||
@@ -1,251 +0,0 @@
|
||||
# Concurrent IO for Pageserver Read Path
|
||||
|
||||
Date: May 6, 2025
|
||||
|
||||
## Summary
|
||||
|
||||
This document is a retroactive RFC on the Pageserver Concurrent IO work that happened in late 2024 / early 2025.
|
||||
|
||||
The gist of it is that Pageserver's `Timeline::get_vectored` now _issues_ the data block read operations against layer files
|
||||
_as it traverses the layer map_ and only _wait_ once, for all of them, after traversal is complete.
|
||||
|
||||
Assuming a good PS PageCache hits on the index blocks during traversal, this drives down the "wait-for-disk" time
|
||||
contribution down from `random_read_io_latency * O(number_of_values)` to `random_read_io_latency * O(1 + traversal)`.
|
||||
|
||||
The motivation for why this work had to happen when it happened was the switch of Pageserver to
|
||||
- not cache user data blocks in PS PageCache and
|
||||
- switch to use direct IO.
|
||||
More context on this are given in complimentary RFC `./rfcs/2025-04-30-direct-io-for-pageserver.md`.
|
||||
|
||||
### Refs
|
||||
|
||||
- Epic: https://github.com/neondatabase/neon/issues/9378
|
||||
- Prototyping happened during the Lisbon 2024 Offsite hackathon: https://github.com/neondatabase/neon/pull/9002
|
||||
- Main implementation PR with good description: https://github.com/neondatabase/neon/issues/9378
|
||||
|
||||
Design and implementation by:
|
||||
- Vlad Lazar <vlad@neon.tech>
|
||||
- Christian Schwarz <christian@neon.tech>
|
||||
|
||||
## Background & Motivation
|
||||
|
||||
The Pageserver read path (`Timeline::get_vectored`) consists of two high-level steps:
|
||||
- Retrieve the delta and image `Value`s required to reconstruct the requested Page@LSN (`Timeline::get_values_reconstruct_data`).
|
||||
- Pass these values to walredo to reconstruct the page images.
|
||||
|
||||
The read path used to be single-key but has been made multi-key some time ago.
|
||||
([Internal tech talk by Vlad](https://drive.google.com/file/d/1vfY24S869UP8lEUUDHRWKF1AJn8fpWoJ/view?usp=drive_link))
|
||||
However, for simplicity, most of this doc will explain things in terms of a single key being requested.
|
||||
|
||||
The `Value` retrieval step above can be broken down into the following functions:
|
||||
- **Traversal** of the layer map to figure out which `Value`s from which layer files are required for the page reconstruction.
|
||||
- **Read IO Planning**: planning of the read IOs that need to be issued to the layer files / filesystem / disk.
|
||||
The main job here is to coalesce the small value reads into larger filesystem-level read operations.
|
||||
This layer also takes care of direct IO alignment and size-multiple requirements (cf the RFC for details.)
|
||||
Check `struct VectoredReadPlanner` and `mod vectored_dio_read` for how it's done.
|
||||
- **Perform the read IO** using `tokio-epoll-uring`.
|
||||
|
||||
Before this project, above functions were sequentially interleaved, meaning:
|
||||
1. we would advance traversal, ...
|
||||
2. discover, that we need to read a value, ...
|
||||
3. read it from disk using `tokio-epoll-uring`, ...
|
||||
4. goto 1 unless we're done.
|
||||
|
||||
This meant that if N `Value`s need to be read to reconstruct a page,
|
||||
the time we spend waiting for disk will be we `random_read_io_latency * O(number_of_values)`.
|
||||
|
||||
## Design
|
||||
|
||||
The **traversal** and **read IO Planning** jobs still happen sequentially, layer by layer, as before.
|
||||
But instead of performing the read IOs inline, we submit the IOs to a concurrent tokio task for execution.
|
||||
After the last read from the last layer is submitted, we wait for the IOs to complete.
|
||||
|
||||
Assuming the filesystem / disk is able to actually process the submitted IOs without queuing,
|
||||
we arrive at _time spent waiting for disk_ ~ `random_read_io_latency * O(1 + traversal)`.
|
||||
|
||||
Note this whole RFC is concerned with the steady state where all layer files required for reconstruction are resident on local NVMe.
|
||||
Traversal will stall on on-demand layer download if a layer is not yet resident.
|
||||
It cannot proceed without the layer being resident beccause its next step depends on the contents of the layer index.
|
||||
|
||||
### Avoiding Waiting For IO During Traversal
|
||||
|
||||
The `traversal` component in above time-spent-waiting-for-disk estimation is dominant and needs to be minimized.
|
||||
|
||||
Before this project, traversal needed to perform IOs for the following:
|
||||
1. The time we are waiting on PS PageCache to page in the visited layers' disk btree index blocks.
|
||||
2. When visiting a delta layer, reading the data block that contains a `Value` for a requested key,
|
||||
to determine whether the `Value::will_init` the page and therefore traversal can stop for this key.
|
||||
|
||||
The solution for (1) is to raise the PS PageCache size such that the hit rate is practically 100%.
|
||||
(Check out the `Background: History Of Caching In Pageserver` section in the RFC on Direct IO for more details.)
|
||||
|
||||
The solution for (2) is source `will_init` from the disk btree index keys, which fortunately
|
||||
already encode this bit of information since the introduction of the current storage/layer format.
|
||||
|
||||
### Concurrent IOs, Submission & Completion
|
||||
|
||||
To separate IO submission from waiting for its completion,
|
||||
we introduce the notion of an `IoConcurrency` struct through which IOs are issued.
|
||||
|
||||
An IO is an opaque future that
|
||||
- captures the `tx` side of a `oneshot` channel
|
||||
- performs the read IO by calling `VirtualFile::read_exact_at().await`
|
||||
- sending the result into the `tx`
|
||||
|
||||
Issuing an IO means `Box`ing the future above and handing that `Box` over to the `IoConcurrency` struct.
|
||||
|
||||
The traversal code that submits the IO stores the the corresponding `oneshot::Receiver`
|
||||
in the `VectoredValueReconstructState`, in the the place where we previously stored
|
||||
the sequentially read `img` and `records` fields.
|
||||
|
||||
When we're done with traversal, we wait for all submitted IOs:
|
||||
for each key, there is a future that awaits all the `oneshot::Receiver`s
|
||||
for that key, and then calls into walredo to reconstruct the page image.
|
||||
Walredo is now invoked concurrently for each value instead of sequentially.
|
||||
Walredo itself remains unchanged.
|
||||
|
||||
The spawned IO futures are driven to completion by a sidecar tokio task that
|
||||
is separate from the task that performs all the layer visiting and spawning of IOs.
|
||||
That tasks receives the IO futures via an unbounded mpsc channel and
|
||||
drives them to completion inside a `FuturedUnordered`.
|
||||
|
||||
### Error handling, Panics, Cancellation-Safety
|
||||
|
||||
There are two error classes during reconstruct data retrieval:
|
||||
* traversal errors: index lookup, move to next layer, and the like
|
||||
* value read IO errors
|
||||
|
||||
A traversal error fails the entire `get_vectored` request, as before this PR.
|
||||
A value read error only fails reconstruction of that value.
|
||||
|
||||
Panics and dropping of the `get_vectored` future before it completes
|
||||
leaves the sidecar task running and does not cancel submitted IOs
|
||||
(see next section for details on sidecar task lifecycle).
|
||||
All of this is safe, but, today's preference in the team is to close out
|
||||
all resource usage explicitly if possible, rather than cancelling + forgetting
|
||||
about it on drop. So, there is warning if we drop a
|
||||
`VectoredValueReconstructState`/`ValuesReconstructState` that still has uncompleted IOs.
|
||||
|
||||
### Sidecar Task Lifecycle
|
||||
|
||||
The sidecar tokio task is spawned as part of the `IoConcurrency::spawn_from_conf` struct.
|
||||
The `IoConcurrency` object acts as a handle through which IO futures are submitted.
|
||||
|
||||
The spawned tokio task holds the `Timeline::gate` open.
|
||||
It is _not_ sensitive to `Timeline::cancel`, but instead to the `IoConcurrency` object being dropped.
|
||||
|
||||
Once the `IoConcurrency` struct is dropped, no new IO futures can come in
|
||||
but already submitted IO futures will be driven to completion regardless.
|
||||
We _could_ safely stop polling these futures because `tokio-epoll-uring` op futures are cancel-safe.
|
||||
But the underlying kernel and hardware resources are not magically freed up by that.
|
||||
So, again, in the interest of closing out all outstanding resource usage, we make timeline shutdown wait for sidecar tasks and their IOs to complete.
|
||||
Under normal conditions, this should be in the low hundreds of microseconds.
|
||||
|
||||
It is advisable to make the `IoConcurrency` as long-lived as possible to minimize the amount of
|
||||
tokio task churn (=> lower pressure on tokio). Generally this means creating it "high up" in the call stack.
|
||||
The pain with this is that the `IoConcurrency` reference needs to be propagated "down" to
|
||||
the (short-lived) functions/scope where we issue the IOs.
|
||||
We would like to use `RequestContext` for this propagation in the future (issue [here](https://github.com/neondatabase/neon/issues/10460)).
|
||||
For now, we just add another argument to the relevant code paths.
|
||||
|
||||
### Feature Gating
|
||||
|
||||
The `IoConcurrency` is an `enum` with two variants: `Sequential` and `SidecarTask`.
|
||||
|
||||
The behavior from before this project is available through `IoConcurrency::Sequential`,
|
||||
which awaits the IO futures in place, without "spawning" or "submitting" them anywhere.
|
||||
|
||||
The `get_vectored_concurrent_io` pageserver config variable determines the runtime value,
|
||||
**except** for the places that use `IoConcurrency::sequential` to get an `IoConcurrency` object.
|
||||
|
||||
### Alternatives Explored & Caveats Encountered
|
||||
|
||||
A few words on the rationale behind having a sidecar *task* and what
|
||||
alternatives were considered but abandoned.
|
||||
|
||||
#### Why We Need A Sidecar *Task* / Why Just `FuturesUnordered` Doesn't Work
|
||||
|
||||
We explored to not have a sidecar task, and instead have a `FuturesUnordered` per
|
||||
`Timeline::get_vectored`. We would queue all IO futures in it and poll it for the
|
||||
first time after traversal is complete (i.e., at `collect_pending_ios`).
|
||||
|
||||
The obvious disadvantage, but not showstopper, is that we wouldn't be submitting
|
||||
IOs until traversal is complete.
|
||||
|
||||
The showstopper however, is that deadlocks happen if we don't drive the
|
||||
IO futures to completion independently of the traversal task.
|
||||
The reason is that both the IO futures and the traversal task may hold _some_,
|
||||
_and_ try to acquire _more_, shared limited resources.
|
||||
For example, both the travseral task and IO future may try to acquire
|
||||
* a `VirtualFile` file descriptor cache slot async mutex (observed during impl)
|
||||
* a `tokio-epoll-uring` submission slot (observed during impl)
|
||||
* a `PageCache` slot (currently this is not the case but we may move more code into the IO futures in the future)
|
||||
|
||||
#### Why We Don't Do `tokio::task`-per-IO-future
|
||||
|
||||
Another option is to spawn a short-lived `tokio::task` for each IO future.
|
||||
We implemented and benchmarked it during development, but found little
|
||||
throughput improvement and moderate mean & tail latency degradation.
|
||||
Concerns about pressure on the tokio scheduler led us to abandon this variant.
|
||||
|
||||
## Future Work
|
||||
|
||||
In addition to what is listed here, also check the "Punted" list in the epic:
|
||||
https://github.com/neondatabase/neon/issues/9378
|
||||
|
||||
### Enable `Timeline::get`
|
||||
|
||||
The only major code path that still uses `IoConcurrency::sequential` is `Timeline::get`.
|
||||
The impact is that roughly the following parts of pageserver do not benefit yet:
|
||||
- parts of basebackup
|
||||
- reads performed by the ingest path
|
||||
- most internal operations that read metadata keys (e.g. `collect_keyspace`!)
|
||||
|
||||
The solution is to propagate `IoConcurrency` via `RequestContext`:https://github.com/neondatabase/neon/issues/10460
|
||||
|
||||
The tricky part is to figure out at which level of the code the `IoConcurrency` is spawned (and added to the RequestContext).
|
||||
|
||||
Also, propagation via `RequestContext` makes makes it harder to tell during development whether a given
|
||||
piece of code uses concurrent vs sequential mode: one has to recurisvely walk up the call tree to find the
|
||||
place that puts the `IoConcurrency` into the `RequestContext`.
|
||||
We'd have to use `::Sequential` as the conservative default value in a fresh `RequestContext`, and add some
|
||||
observability to weed out places that fail to enrich with a properly spanwed `IoConcurrency::spawn_from_conf`.
|
||||
|
||||
### Concurrent On-Demand Downloads enabled by Detached Indices
|
||||
|
||||
As stated earlier, traversal stalls on on-demand download because its next step depends on the contents of the layer index.
|
||||
Once we have separated indices from data blocks (=> https://github.com/neondatabase/neon/issues/11695)
|
||||
we will only need to stall if the index is not resident. The download of the data blocks can happen concurrently or in the background. For example:
|
||||
- Move the `Layer::get_or_maybe_download().await` inside the IO futures.
|
||||
This goes in the opposite direction of the next "future work" item below, but it's easy to do.
|
||||
- Serve the IO future directly from object storage and dispatch the layer download
|
||||
to some other actor, e.g., an actor that is responsible for both downloads & eviction.
|
||||
|
||||
### New `tokio-epoll-uring` API That Separates Submission & Wait-For-Completion
|
||||
|
||||
Instead of `$op().await` style API, it would be useful to have a different `tokio-epoll-uring` API
|
||||
that separates enqueuing (without necessarily `io_uring_enter`ing the kernel each time), submission,
|
||||
and then wait for completion.
|
||||
|
||||
The `$op().await` API is too opaque, so we _have_ to stuff it into a `FuturesUnordered`.
|
||||
|
||||
A split API as sketched above would allow traversal to ensure an IO operation is enqueued to the kernel/disk (and get back-pressure iff the io_uring squeue is full).
|
||||
While avoiding spending of CPU cycles on processing of completions while we're still traversing.
|
||||
|
||||
The idea gets muddied by the fact that we may self-deadlock if we submit too much without completing.
|
||||
So, the submission part of the split API needs to process completions if squeue is full.
|
||||
|
||||
In any way, this split API is precondition for the bigger issue with the design presented here,
|
||||
which we dicsuss in the next section.
|
||||
|
||||
### Opaque Futures Are Brittle
|
||||
|
||||
The use of opaque futures to represent submitted IOs is a clever hack to minimize changes & allow for near-perfect feature-gating.
|
||||
However, we take on **brittleness** because callers must guarantee that the submitted futures are independent.
|
||||
By our experience, it is non-trivial to identify or rule out the interdependencies.
|
||||
See the lengthy doc comment on the `IoConcurrency::spawn_io` method for more details.
|
||||
|
||||
The better interface and proper subsystem boundary is a _descriptive_ struct of what needs to be done ("read this range from this VirtualFile into this buffer")
|
||||
and get back a means to wait for completion.
|
||||
The subsystem can thereby reason by its own how operations may be related;
|
||||
unlike today, where the submitted opaque future can do just about anything.
|
||||
@@ -561,6 +561,21 @@ pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn key_to_rel_tag(key: Key) -> RelTag {
|
||||
RelTag {
|
||||
spcnode: key.field2,
|
||||
dbnode: key.field3,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn key_to_blknum(key: Key) -> BlockNumber {
|
||||
key.field6
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_size_to_key(rel: RelTag) -> Key {
|
||||
Key {
|
||||
|
||||
@@ -1832,7 +1832,6 @@ pub mod virtual_file {
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
@@ -1844,8 +1843,10 @@ pub mod virtual_file {
|
||||
/// Uses buffered IO.
|
||||
Buffered,
|
||||
/// Uses direct IO for reads only.
|
||||
#[cfg(target_os = "linux")]
|
||||
Direct,
|
||||
/// Use direct IO for reads and writes.
|
||||
#[cfg(target_os = "linux")]
|
||||
DirectRw,
|
||||
}
|
||||
|
||||
@@ -1853,13 +1854,26 @@ pub mod virtual_file {
|
||||
pub fn preferred() -> Self {
|
||||
// The default behavior when running Rust unit tests without any further
|
||||
// flags is to use the newest behavior (DirectRw).
|
||||
// The CI uses the environment variable to unit tests for all different modes.
|
||||
// The CI uses the following environment variable to unit tests for all
|
||||
// different modes.
|
||||
// NB: the Python regression & perf tests have their own defaults management
|
||||
// that writes pageserver.toml; they do not use this variable.
|
||||
static ENV_OVERRIDE: LazyLock<Option<IoMode>> = LazyLock::new(|| {
|
||||
utils::env::var_serde_json_string("NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE")
|
||||
});
|
||||
ENV_OVERRIDE.unwrap_or(IoMode::DirectRw)
|
||||
if cfg!(test) {
|
||||
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
|
||||
utils::env::var_serde_json_string(
|
||||
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
|
||||
)
|
||||
.unwrap_or(
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::DirectRw,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
IoMode::Buffered,
|
||||
)
|
||||
});
|
||||
*CACHED
|
||||
} else {
|
||||
IoMode::Buffered
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1869,7 +1883,9 @@ pub mod virtual_file {
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::Direct as u8) => IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
|
||||
x => return Err(x),
|
||||
})
|
||||
|
||||
@@ -14,7 +14,6 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -245,7 +244,13 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
];
|
||||
let exploded_parameters = {
|
||||
let mut out = Vec::new();
|
||||
for io_mode in IoMode::iter() {
|
||||
for io_mode in [
|
||||
IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::DirectRw,
|
||||
] {
|
||||
for param in expect.clone() {
|
||||
let HandPickedParameters {
|
||||
volume_mib,
|
||||
|
||||
@@ -16,9 +16,9 @@ use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
|
||||
rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range, relmap_file_key,
|
||||
repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
TWOPHASEDIR_KEY, dbdir_key_range, key_to_blknum, key_to_rel_tag, rel_block_to_key,
|
||||
rel_dir_to_key, rel_key_range, rel_size_to_key, rel_tag_sparse_key, rel_tag_sparse_key_range,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
};
|
||||
use pageserver_api::keyspace::{KeySpaceRandomAccum, SparseKeySpace};
|
||||
@@ -259,7 +259,7 @@ impl Timeline {
|
||||
let mut result = Vec::with_capacity(pages.len());
|
||||
let result_slots = result.spare_capacity_mut();
|
||||
|
||||
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, RequestContext); 1]>> =
|
||||
let mut keys_slots: HashMap<Key, smallvec::SmallVec<[(usize, Lsn, RequestContext); 1]>> =
|
||||
HashMap::with_capacity(pages.len());
|
||||
|
||||
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
|
||||
@@ -275,43 +275,6 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let nblocks = {
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
match self
|
||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
{
|
||||
Ok(nblocks) => nblocks,
|
||||
Err(err) => {
|
||||
result_slots[response_slot_idx].write(Err(err));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if *blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag, blknum, lsn, nblocks
|
||||
);
|
||||
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = rel_block_to_key(*tag, *blknum);
|
||||
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
@@ -326,7 +289,7 @@ impl Timeline {
|
||||
.attached_child();
|
||||
|
||||
let key_slots = keys_slots.entry(key).or_default();
|
||||
key_slots.push((response_slot_idx, ctx));
|
||||
key_slots.push((response_slot_idx, lsn, ctx));
|
||||
|
||||
let acc = req_keyspaces.entry(lsn).or_default();
|
||||
acc.add_key(key);
|
||||
@@ -347,42 +310,95 @@ impl Timeline {
|
||||
Ok(results) => {
|
||||
for (key, res) in results {
|
||||
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
|
||||
let (first_slot, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for (slot, req_ctx) in key_slots {
|
||||
let clone = match &res {
|
||||
Ok(buf) => Ok(buf.clone()),
|
||||
Err(err) => Err(match err {
|
||||
PageReconstructError::Cancelled => PageReconstructError::Cancelled,
|
||||
// Try to check if error is caused by access beyond end of relation
|
||||
match &res {
|
||||
Err(err) => {
|
||||
let tag = key_to_rel_tag(key);
|
||||
let blknum = key_to_blknum(key);
|
||||
let mut first_error_slot: Option<usize> = None;
|
||||
for (slot, lsn, req_ctx) in key_slots {
|
||||
// Check relation size only in case of error
|
||||
let relsize_ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
info_span!(
|
||||
target: PERF_TRACE_TARGET,
|
||||
parent: crnt_perf_span,
|
||||
"GET_REL_SIZE",
|
||||
reltag=%tag,
|
||||
lsn=%lsn,
|
||||
)
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
x @ PageReconstructError::Other(_)
|
||||
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
||||
| x @ PageReconstructError::WalRedo(_)
|
||||
| x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
||||
))
|
||||
if let Ok(nblocks) = self
|
||||
.get_rel_size(tag, Version::Lsn(lsn), &relsize_ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
|
||||
crnt_perf_span.clone()
|
||||
})
|
||||
.await
|
||||
{
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag, blknum, lsn, nblocks
|
||||
);
|
||||
result_slots[slot].write(Ok(ZERO_PAGE.clone()));
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}),
|
||||
};
|
||||
if first_error_slot.is_none() {
|
||||
first_error_slot = Some(slot);
|
||||
} else {
|
||||
let err = match err {
|
||||
PageReconstructError::Cancelled => {
|
||||
PageReconstructError::Cancelled
|
||||
}
|
||||
|
||||
result_slots[slot].write(clone);
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
x @ PageReconstructError::Other(_)
|
||||
| x @ PageReconstructError::AncestorLsnTimeout(_)
|
||||
| x @ PageReconstructError::WalRedo(_)
|
||||
| x @ PageReconstructError::MissingKey(_) => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(
|
||||
"there was more than one request for this key in the batch, error logged once: {x:?}"
|
||||
))
|
||||
}
|
||||
};
|
||||
result_slots[slot].write(Err(err));
|
||||
};
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
if let Some(slot) = first_error_slot {
|
||||
result_slots[slot].write(res);
|
||||
}
|
||||
}
|
||||
Ok(buf) => {
|
||||
let (first_slot, _first_lsn, first_req_ctx) = key_slots.next().unwrap();
|
||||
|
||||
for (slot, _lsn, req_ctx) in key_slots {
|
||||
result_slots[slot].write(Ok(buf.clone()));
|
||||
// There is no standardized way to express that the batched span followed from N request spans.
|
||||
// So, abuse the system and mark the request contexts as follows_from the batch span, so we get
|
||||
// some linkage in our trace viewer. It allows us to answer: which GET_VECTORED did this GET_PAGE wait for.
|
||||
req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
|
||||
result_slots[first_slot].write(res);
|
||||
first_req_ctx.perf_follows_from(ctx);
|
||||
slots_filled += 1;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
|
||||
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
|
||||
for (slot, req_ctx) in keys_slots.values().flatten() {
|
||||
for (slot, _lsn, req_ctx) in keys_slots.values().flatten() {
|
||||
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
|
||||
// but without taking ownership of the GetVectoredError
|
||||
let err = match &err {
|
||||
@@ -488,8 +504,6 @@ impl Timeline {
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
@@ -1343,32 +1357,6 @@ impl Timeline {
|
||||
None
|
||||
}
|
||||
|
||||
/// Update cached relation size if there is no more recent update
|
||||
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
|
||||
if lsn < rel_size_cache.complete_as_of {
|
||||
// Do not cache old values. It's safe to cache the size on read, as long as
|
||||
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
|
||||
// never evict values from the cache, so if the relation size changed after
|
||||
// 'lsn', the new value is already in the cache.
|
||||
return;
|
||||
}
|
||||
|
||||
match rel_size_cache.map.entry(tag) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
let cached_lsn = entry.get_mut();
|
||||
if lsn >= cached_lsn.0 {
|
||||
*cached_lsn = (lsn, nblocks);
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
|
||||
@@ -165,7 +165,6 @@ pub enum LastImageLayerCreationStatus {
|
||||
/// attempt.
|
||||
last_key: Key,
|
||||
},
|
||||
NeedRepartition,
|
||||
Complete,
|
||||
#[default]
|
||||
Initial,
|
||||
@@ -200,11 +199,8 @@ pub struct TimelineResources {
|
||||
|
||||
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
||||
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
||||
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
|
||||
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
|
||||
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
|
||||
/// implicitly extends the relation.
|
||||
pub(crate) struct RelSizeCache {
|
||||
pub(crate) complete_as_of: Lsn,
|
||||
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
|
||||
}
|
||||
|
||||
@@ -2971,7 +2967,6 @@ impl Timeline {
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(RelSizeCache {
|
||||
complete_as_of: disk_consistent_lsn,
|
||||
map: HashMap::new(),
|
||||
}),
|
||||
|
||||
|
||||
@@ -1278,24 +1278,18 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
|
||||
let mut flags = options.flags;
|
||||
if let LastImageLayerCreationStatus::NeedRepartition =
|
||||
self.last_image_layer_creation_status.load().as_ref()
|
||||
{
|
||||
flags.insert(CompactFlags::ForceRepartition);
|
||||
}
|
||||
|
||||
// 2. Repartition and create image layers if necessary
|
||||
match self
|
||||
.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
self.get_compaction_target_size(),
|
||||
flags,
|
||||
options.flags,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) => {
|
||||
Ok(((dense_partitioning, sparse_partitioning), lsn)) if lsn >= gc_cutoff => {
|
||||
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
||||
let image_ctx = RequestContextBuilder::from(ctx)
|
||||
.access_stats_behavior(AccessStatsBehavior::Skip)
|
||||
@@ -1307,7 +1301,7 @@ impl Timeline {
|
||||
.extend(sparse_partitioning.into_dense().parts);
|
||||
|
||||
// 3. Create new image layers for partitions that have been modified "enough".
|
||||
let res = self
|
||||
let (image_layers, outcome) = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
@@ -1334,41 +1328,23 @@ impl Timeline {
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
}
|
||||
});
|
||||
})?;
|
||||
|
||||
match res {
|
||||
Ok((image_layers, outcome)) => {
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(outcome.clone()));
|
||||
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!(
|
||||
"skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
|
||||
);
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
// Fall through to shard ancestor compaction
|
||||
}
|
||||
Err(err) if lsn <= gc_cutoff => {
|
||||
if let CreateImageLayersError::GetVectoredError(_) = err {
|
||||
warn!(
|
||||
"could not create image layers due to {}; this is not critical because the requested image LSN is below the GC curoff",
|
||||
err
|
||||
);
|
||||
self.last_image_layer_creation_status
|
||||
.store(Arc::new(LastImageLayerCreationStatus::NeedRepartition));
|
||||
self.upload_new_image_layers(image_layers)?;
|
||||
if let LastImageLayerCreationStatus::Incomplete { .. } = outcome {
|
||||
// Yield and do not do any other kind of compaction.
|
||||
info!(
|
||||
"skipping shard ancestor compaction due to pending image layer generation tasks (preempted by L0 compaction)."
|
||||
);
|
||||
return Ok(CompactionOutcome::YieldForL0);
|
||||
}
|
||||
}
|
||||
|
||||
// Fall through to shard ancestor compaction
|
||||
} else {
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
Ok(_) => {
|
||||
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
|
||||
@@ -74,8 +74,6 @@ pub struct VirtualFile {
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
///
|
||||
/// Insensitive to `virtual_file_io_mode` setting.
|
||||
pub async fn open<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
@@ -97,20 +95,31 @@ impl VirtualFile {
|
||||
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// `O_DIRECT` will be enabled base on `virtual_file_io_mode`.
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
mut open_options: OpenOptions,
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut open_options: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let mode = get_io_mode();
|
||||
let direct = match (mode, open_options.is_write()) {
|
||||
let set_o_direct = match (mode, open_options.is_write()) {
|
||||
(IoMode::Buffered, _) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::Direct, false) => true,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::Direct, true) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
open_options = open_options.direct(direct);
|
||||
if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
open_options = open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
}
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
@@ -782,12 +791,6 @@ impl VirtualFileInner {
|
||||
where
|
||||
Buf: tokio_epoll_uring::IoBufMut + Send,
|
||||
{
|
||||
self.validate_direct_io(
|
||||
Slice::stable_ptr(&buf).addr(),
|
||||
Slice::bytes_total(&buf),
|
||||
offset,
|
||||
);
|
||||
|
||||
let file_guard = match self
|
||||
.lock_file()
|
||||
.await
|
||||
@@ -813,8 +816,6 @@ impl VirtualFileInner {
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
self.validate_direct_io(buf.as_ptr().addr(), buf.len(), offset);
|
||||
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
@@ -829,64 +830,6 @@ impl VirtualFileInner {
|
||||
(buf, result)
|
||||
})
|
||||
}
|
||||
|
||||
/// Validate all reads and writes to adhere to the O_DIRECT requirements of our production systems.
|
||||
///
|
||||
/// Validating it iin userspace sets a consistent bar, independent of what actual OS/filesystem/block device is in use.
|
||||
fn validate_direct_io(&self, addr: usize, size: usize, offset: u64) {
|
||||
// TODO: eventually enable validation in the builds we use in real environments like staging, preprod, and prod.
|
||||
if !(cfg!(feature = "testing") || cfg!(test)) {
|
||||
return;
|
||||
}
|
||||
if !self.open_options.is_direct() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate buffer memory alignment.
|
||||
//
|
||||
// What practically matters as of Linux 6.1 is bdev_dma_alignment()
|
||||
// which is practically between 512 and 4096.
|
||||
// On our production systems, the value is 512.
|
||||
// The IoBuffer/IoBufferMut hard-code that value.
|
||||
//
|
||||
// Because the alloctor might return _more_ aligned addresses than requested,
|
||||
// there is a chance that testing would not catch violations of a runtime requirement stricter than 512.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = addr % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O buffer must be aligned: buffer_addr=0x{addr:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
|
||||
// Validate offset alignment.
|
||||
//
|
||||
// We hard-code 512 throughout the code base.
|
||||
// So enforce just that and not anything more restrictive.
|
||||
// Even the shallowest testing will expose more restrictive requirements if those ever arise.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = offset % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O offset must be aligned: offset=0x{offset:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
|
||||
// Validate buffer size multiple requirement.
|
||||
//
|
||||
// The requirement in Linux 6.1 is bdev_logical_block_size().
|
||||
// On our production systems, that is 512.
|
||||
{
|
||||
let requirement = 512;
|
||||
let remainder = size % requirement;
|
||||
assert!(
|
||||
remainder == 0,
|
||||
"Direct I/O buffer size must be a multiple of {requirement}: size=0x{size:x} % 0x{requirement:x} = 0x{remainder:x}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
@@ -1275,6 +1218,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use owned_buffers_io::slice::SliceMutExt;
|
||||
use rand::seq::SliceRandom;
|
||||
use rand::{Rng, thread_rng};
|
||||
|
||||
@@ -1282,38 +1226,162 @@ mod tests {
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
enum MaybeVirtualFile {
|
||||
VirtualFile(VirtualFile),
|
||||
File(File),
|
||||
}
|
||||
|
||||
impl From<VirtualFile> for MaybeVirtualFile {
|
||||
fn from(vf: VirtualFile) -> Self {
|
||||
MaybeVirtualFile::VirtualFile(vf)
|
||||
}
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(
|
||||
&self,
|
||||
mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
|
||||
file.read_exact_at(rust_slice, offset).map(|()| slice)
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
|
||||
res
|
||||
}
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(
|
||||
&mut self,
|
||||
pos: u64,
|
||||
len: usize,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, Error> {
|
||||
let slice = IoBufferMut::with_capacity(len).slice_full();
|
||||
assert_eq!(slice.bytes_total(), len);
|
||||
let slice = self.read_exact_at(slice, pos, ctx).await?;
|
||||
let buf = slice.into_inner();
|
||||
assert_eq!(buf.len(), len);
|
||||
|
||||
Ok(String::from_utf8(buf.to_vec()).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
// but this allows us to verify that the operations return the same
|
||||
// results with VirtualFiles as with native Files. (Except that with
|
||||
// native files, you will run out of file descriptors if the ulimit
|
||||
// is low enough.)
|
||||
struct A;
|
||||
|
||||
impl Adapter for A {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options_v2(&path, opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
test_files::<A>("virtual_files").await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
struct B;
|
||||
|
||||
impl Adapter for B {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = opts.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
test_files::<B>("physical_files").await
|
||||
}
|
||||
|
||||
/// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
|
||||
/// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
|
||||
/// in trait which benefits from the new lifetime capture rules already.
|
||||
trait Adapter {
|
||||
async fn open(
|
||||
path: Utf8PathBuf,
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error>;
|
||||
}
|
||||
|
||||
async fn test_files<A>(testname: &str) -> anyhow::Result<()>
|
||||
where
|
||||
A: Adapter,
|
||||
{
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_virtual_files");
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
|
||||
std::fs::create_dir_all(&testdir)?;
|
||||
|
||||
let zeropad512 = |content: &[u8]| {
|
||||
let mut buf = IoBufferMut::with_capacity_zeroed(512);
|
||||
buf[..content.len()].copy_from_slice(content);
|
||||
buf.freeze().slice_len()
|
||||
};
|
||||
|
||||
let path_a = testdir.join("file_a");
|
||||
let file_a = VirtualFile::open_with_options_v2(
|
||||
let mut file_a = A::open(
|
||||
path_a.clone(),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
// set create & truncate flags to ensure when we trigger a reopen later in this test,
|
||||
// the reopen_options must have masked out those flags; if they don't, then
|
||||
// the after reopen we will fail to read the `content_a` that we write here.
|
||||
.create(true)
|
||||
.truncate(true),
|
||||
.truncate(true)
|
||||
.to_owned(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let (_, res) = file_a.write_all_at(zeropad512(b"content_a"), 0, &ctx).await;
|
||||
res?;
|
||||
|
||||
file_a
|
||||
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
|
||||
// Create another test file, and try FileExt functions on it.
|
||||
let path_b = testdir.join("file_b");
|
||||
let file_b = VirtualFile::open_with_options_v2(
|
||||
let mut file_b = A::open(
|
||||
path_b.clone(),
|
||||
OpenOptions::new()
|
||||
.read(true)
|
||||
@@ -1323,44 +1391,37 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
let (_, res) = file_b.write_all_at(zeropad512(b"content_b"), 0, &ctx).await;
|
||||
res?;
|
||||
|
||||
let assert_first_512_eq = async |vfile: &VirtualFile, expect: &[u8]| {
|
||||
let buf = vfile
|
||||
.read_exact_at(IoBufferMut::with_capacity_zeroed(512).slice_full(), 0, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&buf[..], &zeropad512(expect)[..]);
|
||||
};
|
||||
|
||||
// Open a lot of file descriptors / VirtualFile instances.
|
||||
// Enough to cause some evictions in the fd cache.
|
||||
|
||||
let mut file_b_dupes = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let vfile = VirtualFile::open_with_options_v2(
|
||||
path_b.clone(),
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
)
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
|
||||
.await?;
|
||||
assert_first_512_eq(&vfile, b"content_b").await;
|
||||
file_b_dupes.push(vfile);
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
|
||||
|
||||
// Open a lot of files, enough to cause some evictions. (Or to be precise,
|
||||
// open the same file many times. The effect is the same.)
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
let mut vfile = A::open(path_b.clone(), OpenOptions::new().read(true), &ctx).await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
|
||||
// make sure we opened enough files to definitely cause evictions.
|
||||
assert!(file_b_dupes.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
|
||||
// The underlying file descriptor for 'file_a' should be closed now. Try to read
|
||||
// from it again. The VirtualFile reopens the file internally.
|
||||
assert_first_512_eq(&file_a, b"content_a").await;
|
||||
// from it again.
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
file_b_dupes.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in file_b_dupes.iter_mut() {
|
||||
assert_first_512_eq(vfile, b"content_b").await;
|
||||
vfiles.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for vfile in vfiles.iter_mut() {
|
||||
assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -1391,7 +1452,7 @@ mod tests {
|
||||
// Open the file many times.
|
||||
let mut files = Vec::new();
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFile::open_with_options_v2(
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
&ctx,
|
||||
@@ -1436,6 +1497,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_atomic_overwrite_basic() {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
|
||||
@@ -1445,22 +1508,26 @@ mod tests {
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "bar");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_atomic_overwrite_preexisting_tmp() {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let testdir =
|
||||
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
|
||||
std::fs::create_dir_all(&testdir).unwrap();
|
||||
@@ -1475,8 +1542,10 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let post = std::fs::read_to_string(&path).unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,13 +8,7 @@ use super::io_engine::IoEngine;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OpenOptions {
|
||||
/// We keep a copy of the write() flag we pass to the `inner`` `OptionOptions`
|
||||
/// to support [`Self::is_write`].
|
||||
write: bool,
|
||||
/// We don't expose + pass through a raw `custom_flags()` style API.
|
||||
/// The only custom flag we support is `O_DIRECT`, which we track here
|
||||
/// and map to `custom_flags()` in the [`Self::open`] method.
|
||||
direct: bool,
|
||||
inner: Inner,
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -36,7 +30,6 @@ impl Default for OpenOptions {
|
||||
};
|
||||
Self {
|
||||
write: false,
|
||||
direct: false,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
@@ -51,10 +44,6 @@ impl OpenOptions {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub(super) fn is_direct(&self) -> bool {
|
||||
self.direct
|
||||
}
|
||||
|
||||
pub fn read(mut self, read: bool) -> Self {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
@@ -127,38 +116,13 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
|
||||
let mut custom_flags = 0;
|
||||
if self.direct {
|
||||
match &self.inner {
|
||||
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
custom_flags |= nix::libc::O_DIRECT;
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
// Other platforms may be used for development but don't necessarily have a 1:1 equivalent to Linux's O_DIRECT (macOS!).
|
||||
// Just don't set the flag; to catch alignment bugs typical for O_DIRECT,
|
||||
// we have a runtime validation layer inside `VirtualFile::write_at` and `VirtualFile::read_at`.
|
||||
static WARNING: std::sync::Once = std::sync::Once::new();
|
||||
WARNING.call_once(|| {
|
||||
let span = tracing::info_span!(parent: None, "open_options");
|
||||
let _enter = span.enter();
|
||||
tracing::warn!("your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs; this warning is logged once per process");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
match self.inner.clone() {
|
||||
Inner::StdFs(mut x) => x
|
||||
.custom_flags(custom_flags)
|
||||
.open(path)
|
||||
.map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(mut x) => {
|
||||
x.custom_flags(custom_flags);
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
|
||||
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
|
||||
let res = system.open(path, &x).await;
|
||||
let res = system.open(path, x).await;
|
||||
((), res)
|
||||
})
|
||||
.await;
|
||||
@@ -180,8 +144,19 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn direct(mut self, direct: bool) -> Self {
|
||||
self.direct = direct;
|
||||
pub fn custom_flags(mut self, flags: i32) -> Self {
|
||||
if flags & nix::libc::O_APPEND != 0 {
|
||||
super::io_engine::panic_operation_must_be_idempotent();
|
||||
}
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,6 +433,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* Make sure we don't do exponential backoff with a constant multiplier
|
||||
@@ -446,23 +447,14 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
/*
|
||||
* If we did other tasks between reconnect attempts, then we won't
|
||||
* need to wait as long as a full delay.
|
||||
*
|
||||
* This is a loop to protect against interrupted sleeps.
|
||||
*/
|
||||
while (us_since_last_attempt < shard->delay_us)
|
||||
if (us_since_last_attempt < shard->delay_us)
|
||||
{
|
||||
pg_usleep(shard->delay_us - us_since_last_attempt);
|
||||
|
||||
/* At least we should handle cancellations here */
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
now = GetCurrentTimestamp();
|
||||
us_since_last_attempt = (int64) (now - shard->last_reconnect_time);
|
||||
}
|
||||
|
||||
/* update the delay metric */
|
||||
shard->delay_us = Min(shard->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
|
||||
shard->last_reconnect_time = now;
|
||||
|
||||
/*
|
||||
* Connect using the connection string we got from the
|
||||
|
||||
@@ -1,9 +1,4 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
str::FromStr,
|
||||
sync::{Arc, atomic::AtomicU64},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use clashmap::{ClashMap, Entry};
|
||||
use safekeeper_api::models::PullTimelineRequest;
|
||||
@@ -174,17 +169,10 @@ pub(crate) struct ScheduleRequest {
|
||||
pub(crate) kind: SafekeeperTimelineOpKind,
|
||||
}
|
||||
|
||||
/// A way to keep ongoing/queued reconcile requests apart
|
||||
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||
struct TokenId(u64);
|
||||
|
||||
type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
|
||||
|
||||
/// Handle to per safekeeper reconciler.
|
||||
struct ReconcilerHandle {
|
||||
tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
|
||||
ongoing_tokens: Arc<OngoingTokens>,
|
||||
token_id_counter: AtomicU64,
|
||||
tx: UnboundedSender<(ScheduleRequest, CancellationToken)>,
|
||||
ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), CancellationToken>>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -197,28 +185,24 @@ impl ReconcilerHandle {
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: Option<TimelineId>,
|
||||
) -> (CancellationToken, TokenId) {
|
||||
let token_id = self
|
||||
.token_id_counter
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let token_id = TokenId(token_id);
|
||||
) -> CancellationToken {
|
||||
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
|
||||
if let Entry::Occupied(entry) = &entry {
|
||||
let (cancel, _) = entry.get();
|
||||
let cancel: &CancellationToken = entry.get();
|
||||
cancel.cancel();
|
||||
}
|
||||
entry.insert((self.cancel.child_token(), token_id)).clone()
|
||||
entry.insert(self.cancel.child_token()).clone()
|
||||
}
|
||||
/// Cancel an ongoing reconciliation
|
||||
fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
|
||||
if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
|
||||
if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
fn schedule_reconcile(&self, req: ScheduleRequest) {
|
||||
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let hostname = req.safekeeper.skp.host.clone();
|
||||
if let Err(err) = self.tx.send((req, cancel, token_id)) {
|
||||
if let Err(err) = self.tx.send((req, cancel)) {
|
||||
tracing::info!("scheduling request onto {hostname} returned error: {err}");
|
||||
}
|
||||
}
|
||||
@@ -227,14 +211,13 @@ impl ReconcilerHandle {
|
||||
pub(crate) struct SafekeeperReconciler {
|
||||
inner: SafekeeperReconcilerInner,
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
|
||||
rx: UnboundedReceiver<(ScheduleRequest, CancellationToken)>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
/// Thin wrapper over `Service` to not clutter its inherent functions
|
||||
#[derive(Clone)]
|
||||
struct SafekeeperReconcilerInner {
|
||||
ongoing_tokens: Arc<OngoingTokens>,
|
||||
service: Arc<Service>,
|
||||
}
|
||||
|
||||
@@ -243,20 +226,15 @@ impl SafekeeperReconciler {
|
||||
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let concurrency = service.config.safekeeper_reconciler_concurrency;
|
||||
let ongoing_tokens = Arc::new(ClashMap::new());
|
||||
let mut reconciler = SafekeeperReconciler {
|
||||
inner: SafekeeperReconcilerInner {
|
||||
service,
|
||||
ongoing_tokens: ongoing_tokens.clone(),
|
||||
},
|
||||
inner: SafekeeperReconcilerInner { service },
|
||||
rx,
|
||||
concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
|
||||
cancel: cancel.clone(),
|
||||
};
|
||||
let handle = ReconcilerHandle {
|
||||
tx,
|
||||
ongoing_tokens,
|
||||
token_id_counter: AtomicU64::new(0),
|
||||
ongoing_tokens: Arc::new(ClashMap::new()),
|
||||
cancel,
|
||||
};
|
||||
tokio::spawn(async move { reconciler.run().await });
|
||||
@@ -268,9 +246,7 @@ impl SafekeeperReconciler {
|
||||
req = self.rx.recv() => req,
|
||||
_ = self.cancel.cancelled() => break,
|
||||
};
|
||||
let Some((req, req_cancel, req_token_id)) = req else {
|
||||
break;
|
||||
};
|
||||
let Some((req, req_cancel)) = req else { break };
|
||||
|
||||
let permit_res = tokio::select! {
|
||||
req = self.concurrency_limiter.clone().acquire_owned() => req,
|
||||
@@ -289,7 +265,7 @@ impl SafekeeperReconciler {
|
||||
let timeline_id = req.timeline_id;
|
||||
let node_id = req.safekeeper.skp.id;
|
||||
inner
|
||||
.reconcile_one(req, req_cancel, req_token_id)
|
||||
.reconcile_one(req, req_cancel)
|
||||
.instrument(tracing::info_span!(
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
@@ -304,14 +280,8 @@ impl SafekeeperReconciler {
|
||||
}
|
||||
|
||||
impl SafekeeperReconcilerInner {
|
||||
async fn reconcile_one(
|
||||
&self,
|
||||
req: ScheduleRequest,
|
||||
req_cancel: CancellationToken,
|
||||
req_token_id: TokenId,
|
||||
) {
|
||||
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: CancellationToken) {
|
||||
let req_host = req.safekeeper.skp.host.clone();
|
||||
let success;
|
||||
match req.kind {
|
||||
SafekeeperTimelineOpKind::Pull => {
|
||||
let Some(timeline_id) = req.timeline_id else {
|
||||
@@ -332,22 +302,19 @@ impl SafekeeperReconcilerInner {
|
||||
tenant_id: req.tenant_id,
|
||||
timeline_id,
|
||||
};
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
async |client| client.pull_timeline(&pull_req).await,
|
||||
|resp| {
|
||||
if let Some(host) = resp.safekeeper_host {
|
||||
tracing::info!("pulled timeline from {host} onto {req_host}");
|
||||
} else {
|
||||
tracing::info!(
|
||||
"timeline already present on safekeeper on {req_host}"
|
||||
);
|
||||
}
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.pull_timeline(&pull_req).await,
|
||||
|resp| {
|
||||
if let Some(host) = resp.safekeeper_host {
|
||||
tracing::info!("pulled timeline from {host} onto {req_host}");
|
||||
} else {
|
||||
tracing::info!("timeline already present on safekeeper on {req_host}");
|
||||
}
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
SafekeeperTimelineOpKind::Exclude => {
|
||||
// TODO actually exclude instead of delete here
|
||||
@@ -358,23 +325,22 @@ impl SafekeeperReconcilerInner {
|
||||
);
|
||||
return;
|
||||
};
|
||||
success = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
self.reconcile_inner(
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
},
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
SafekeeperTimelineOpKind::Delete => {
|
||||
let tenant_id = req.tenant_id;
|
||||
if let Some(timeline_id) = req.timeline_id {
|
||||
success = self
|
||||
let deleted = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
req,
|
||||
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|
||||
|_resp| {
|
||||
tracing::info!("deleted timeline from {req_host}");
|
||||
@@ -382,13 +348,13 @@ impl SafekeeperReconcilerInner {
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if success {
|
||||
if deleted {
|
||||
self.delete_timeline_from_db(tenant_id, timeline_id).await;
|
||||
}
|
||||
} else {
|
||||
success = self
|
||||
let deleted = self
|
||||
.reconcile_inner(
|
||||
&req,
|
||||
req,
|
||||
async |client| client.delete_tenant(tenant_id).await,
|
||||
|_resp| {
|
||||
tracing::info!(%tenant_id, "deleted tenant from {req_host}");
|
||||
@@ -396,21 +362,12 @@ impl SafekeeperReconcilerInner {
|
||||
req_cancel,
|
||||
)
|
||||
.await;
|
||||
if success {
|
||||
if deleted {
|
||||
self.delete_tenant_timelines_from_db(tenant_id).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if success {
|
||||
self.ongoing_tokens.remove_if(
|
||||
&(req.tenant_id, req.timeline_id),
|
||||
|_ttid, (_cancel, token_id)| {
|
||||
// Ensure that this request is indeed the request we just finished and not a new one
|
||||
req_token_id == *token_id
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
|
||||
match self
|
||||
@@ -464,10 +421,10 @@ impl SafekeeperReconcilerInner {
|
||||
self.delete_timeline_from_db(tenant_id, timeline_id).await;
|
||||
}
|
||||
}
|
||||
/// Returns whether the reconciliation happened successfully (or we got cancelled)
|
||||
/// Returns whether the reconciliation happened successfully
|
||||
async fn reconcile_inner<T, F, U>(
|
||||
&self,
|
||||
req: &ScheduleRequest,
|
||||
req: ScheduleRequest,
|
||||
closure: impl Fn(SafekeeperClient) -> F,
|
||||
log_success: impl FnOnce(T) -> U,
|
||||
req_cancel: CancellationToken,
|
||||
|
||||
@@ -1274,8 +1274,6 @@ class NeonEnv:
|
||||
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
|
||||
tenant_config = ps_cfg.setdefault("tenant_config", {})
|
||||
tenant_config["compaction_algorithm"] = (
|
||||
|
||||
@@ -111,13 +111,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*stalling layer flushes for compaction backpressure.*",
|
||||
".*layer roll waiting for flush due to compaction backpressure.*",
|
||||
".*BatchSpanProcessor.*",
|
||||
*(
|
||||
[
|
||||
r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*"
|
||||
]
|
||||
if sys.platform != "linux"
|
||||
else []
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user