Compare commits

..

14 Commits

Author SHA1 Message Date
Conrad Ludgate
0b001a0001 proxy: remove connections on shutdown (#5051)
## Problem

On shutdown, proxy connections are staying open.

## Summary of changes

Remove the connections on shutdown
2023-08-21 19:20:58 +01:00
Felix Prasanna
4a8bd866f6 bump vm-builder version to v0.16.3 (#5055)
This change to autoscaling allows agents to connect directly to the
monitor, completely removing the informant.
2023-08-21 13:29:16 -04:00
John Spray
615a490239 pageserver: refactor Tenant/Timeline args into structs (#5053)
## Problem

There are some common types that we pass into tenants and timelines as
we construct them, such as remote storage and the broker client.
Currently the list is small, but this is likely to grow -- the deletion
queue PR (#4960) pushed some methods to the point of clippy complaining
they had too many args, because of the extra deletion queue client being
passed around.

There are some shared objects that currently aren't passed around
explicitly because they use a static `once_cell` (e.g.
CONCURRENT_COMPACTIONS), but as we add more resource management and
concurreny control over time, it will be more readable & testable to
pass a type around in the respective Resources object, rather than to
coordinate via static objects. The `Resources` structures in this PR
will make it easier to add references to central coordination functions,
without having to rely on statics.

## Summary of changes

- For `Tenant`, the `broker_client` and `remote_storage` are bundled
into `TenantSharedResources`
- For `Timeline`, the `remote_client` is wrapped into
`TimelineResources`.

Both of these structures will get an additional deletion queue member in
#4960.
2023-08-21 17:30:28 +01:00
John Spray
b95addddd5 pageserver: do not read redundant timeline_layers from IndexPart, so that we can remove it later (#4972)
## Problem

IndexPart contains two redundant lists of layer names: a set of the
names, and then a map of name to metadata.

We already required that all the layers in `timeline_layers` are also in
`layers_metadata`, in `initialize_with_current_remote_index_part`, so if
there were any index_part.json files in the field that relied on these
sets being different, they would already be broken.

## Summary of changes

`timeline_layers` is made private and no longer read at runtime. It is
still serialized, but not deserialized.

`disk_consistent_lsn` is also made private, as this field only exists
for convenience of humans reading the serialized JSON.

This prepares us to entirely remove `timeline_layers` in a future
release, once this change is fully deployed, and therefore no
pageservers are trying to read the field.
2023-08-21 14:29:36 +03:00
Joonas Koivunen
130ccb4b67 Remove initial timeline id troubles (#5044)
I made a mistake when I adding `env.initial_timeline:
Optional[TimelineId]` in the #3839, should had just generated it and
used it to create a specific timeline. This PR fixes those mistakes, and
some extra calling into psql which must be slower than python field
access.
2023-08-20 12:33:19 +03:00
Dmitry Rodionov
9140a950f4 Resume tenant deletion on attach (#5039)
I'm still a bit nervous about attach -> crash case. But it should work.
(unlike case with timeline). Ideally would be cool to cover this with
test.

This continues tradition of adding bool flags for Tenant::set_stopping.
Probably lifecycle project will help with fixing it.
2023-08-20 12:28:50 +03:00
Arpad Müller
a23b0773f1 Fix DeltaLayer dumping (#5045)
## Problem

Before, DeltaLayer dumping (via `cargo run --release -p pagectl --
print-layer-file` ) would crash as one can't call `Handle::block_on` in
an async executor thread.

## Summary of changes

Avoid the problem by using `DeltaLayerInner::load_keys` to load the keys
into RAM (which we already do during compaction), and then load the
values one by one during dumping.
2023-08-19 00:56:03 +02:00
Joonas Koivunen
368ee6c8ca refactor: failpoint support (#5033)
- move them to pageserver which is the only dependant on the crate fail
- "move" the exported macro to the new module
- support at init time the same failpoints as runtime

Found while debugging test failures and making tests more repeatable by
allowing "exit" from pageserver start via environment variables. Made
those changes to `test_gc_cutoff.py`.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-08-19 01:01:44 +03:00
Felix Prasanna
5c6a692cf1 bump VM_BUILDER_VERSION to v0.16.2 (#5031)
A very slight change that allows us to configure the UID of the
neon-postgres cgroup owner. We start postgres in this cgroup so we can
scale it with the cgroups v2 api. Currently, the control plane
overwrites the entrypoint set by `vm-builder`, so `compute_ctl` (and
thus postgres), is not started in the neon-postgres cgroup. Having
`compute_ctl` start postgres in the cgroup should fix this. However, at
the moment appears like it does not have the correct permissions.
Configuring the neon-postgres UID to `postgres` (which is the UID
`compute_ctl` runs under) should hopefully fix this.


See #4920 - the PR to modify `compute_ctl` to start postgres in the
cgorup.
See: neondatabase/autoscaling#480, neondatabase/autoscaling#477. Both
these PR's are part of an effort to increase `vm-builder`'s
configurability and allow us to adjust it as we integrate in the
monitor.
2023-08-18 14:29:20 -04:00
Dmitry Rodionov
30888a24d9 Avoid flakiness in test_timeline_delete_fail_before_local_delete (#5032)
The problem was that timeline detail can return timelines in not only
active state. And by the time request comes timeline deletion can still
be in progress if we're unlucky (test execution happened to be slower
for some reason)

Reference for failed test run

https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5022/5891420105/index.html#suites/f588e0a787c49e67b29490359c589fae/dab036e9bd673274

The error was `Exception: detail succeeded (it should return 404)`

reported by @koivunej
2023-08-18 20:49:11 +03:00
Dmitry Rodionov
f6c671c140 resume timeline deletions on attach (#5030)
closes [#5036](https://github.com/neondatabase/neon/issues/5036)
2023-08-18 20:48:33 +03:00
Christian Schwarz
ed5bce7cba rfcs: archive my MVCC S3 Notion Proposal (#5040)
This is a copy from the [original Notion
page](https://www.notion.so/neondatabase/Proposal-Pageserver-MVCC-S3-Storage-8a424c0c7ec5459e89d3e3f00e87657c?pvs=4),
taken on 2023-08-16.

This is for archival mostly.
The RFC that we're likely to go with is
https://github.com/neondatabase/neon/pull/4919.
2023-08-18 19:34:29 +02:00
Christian Schwarz
7a63685cde simplify page-caching of EphemeralFile (#4994)
(This PR is the successor of https://github.com/neondatabase/neon/pull/4984 )

## Summary

The current way in which `EphemeralFile` uses `PageCache` complicates
the Pageserver code base to a degree that isn't worth it.
This PR refactors how we cache `EphemeralFile` contents, by exploiting
the append-only nature of `EphemeralFile`.

The result is that `PageCache` only holds `ImmutableFilePage` and
`MaterializedPage`.
These types of pages are read-only and evictable without write-back.
This allows us to remove the writeback code from `PageCache`, also
eliminating an entire failure mode.

Futher, many great open-source libraries exist to solve the problem of a
read-only cache,
much better than our `page_cache.rs` (e.g., better replacement policy,
less global locking).
With this PR, we can now explore using them.

## Problem & Analysis

Before this PR, `PageCache` had three types of pages:

* `ImmutableFilePage`: caches Delta / Image layer file contents
* `MaterializedPage`: caches results of Timeline::get (page
materialization)
* `EphemeralPage`: caches `EphemeralFile` contents

`EphemeralPage` is quite different from `ImmutableFilePage` and
`MaterializedPage`:

* Immutable and materialized pages are for the acceleration of (future)
reads of the same data using `PAGE_CACHE_SIZE * PAGE_SIZE` bytes of
DRAM.
* Ephemeral pages are a write-back cache of `EphemeralFile` contents,
i.e., if there is pressure in the page cache, we spill `EphemeralFile`
contents to disk.

`EphemeralFile` is only used by `InMemoryLayer`, for the following
purposes:
* **write**: when filling up the `InMemoryLayer`, via `impl BlobWriter
for EphemeralFile`
* **read**: when doing **page reconstruction** for a page@lsn that isn't
written to disk
* **read**: when writing L0 layer files, we re-read the `InMemoryLayer`
and put the contents into the L0 delta writer
(**`create_delta_layer`**). This happens every 10min or when
InMemoryLayer reaches 256MB in size.

The access patterns of the `InMemoryLayer` use case are as follows:

* **write**: via `BlobWriter`, strictly append-only
* **read for page reconstruction**: via `BlobReader`, random
* **read for `create_delta_layer`**: via `BlobReader`, dependent on
data, but generally random. Why?
* in classical LSM terms, this function is what writes the
memory-resident `C0` tree into the disk-resident `C1` tree
* in our system, though, the values of InMemoryLayer are stored in an
EphemeralFile, and hence they are not guaranteed to be memory-resident
* the function reads `Value`s in `Key, LSN` order, which is `!=` insert
order

What do these `EphemeralFile`-level access patterns mean for the page
cache?

* **write**:
* the common case is that `Value` is a WAL record, and if it isn't a
full-page-image WAL record, then it's smaller than `PAGE_SIZE`
* So, the `EphemeralPage` pages act as a buffer for these `< PAGE_CACHE`
sized writes.
* If there's no page cache eviction between subsequent
`InMemoryLayer::put_value` calls, the `EphemeralPage` is still resident,
so the page cache avoids doing a `write` system call.
* In practice, a busy page server will have page cache evictions because
we only configure 64MB of page cache size.
* **reads for page reconstruction**: read acceleration, just as for the
other page types.
* **reads for `create_delta_layer`**:
* The `Value` reads happen through a `BlockCursor`, which optimizes the
case of repeated reads from the same page.
* So, the best case is that subsequent values are located on the same
page; hence `BlockCursor`s buffer is maximally effective.
* The worst case is that each `Value` is on a different page; hence the
`BlockCursor`'s 1-page-sized buffer is ineffective.
* The best case translates into `256MB/PAGE_SIZE` page cache accesses,
one per page.
    * the worst case translates into `#Values` page cache accesses
* again, the page cache accesses must be assumed to be random because
the `Value`s aren't accessed in insertion order but `Key, LSN` order.

## Summary of changes

Preliminaries for this PR were:

- #5003
- #5004 
- #5005 
  - uncommitted microbenchmark in #5011 

Based on the observations outlined above, this PR makes the following
changes:

* Rip out `EphemeralPage` from `page_cache.rs`
* Move the `block_io::FileId` to `page_cache::FileId`
* Add a `PAGE_SIZE`d buffer to the `EphemeralPage` struct.
  It's called `mutable_tail`.
* Change `write_blob` to use `mutable_tail` for the write buffering
instead of a page cache page.
* if `mutable_tail` is full, it writes it out to disk, zeroes it out,
and re-uses it.
* There is explicitly no double-buffering, so that memory allocation per
`EphemeralFile` instance is fixed.
* Change `read_blob` to return different `BlockLease` variants depending
on `blknum`
* for the `blknum` that corresponds to the `mutable_tail`, return a ref
to it
* Rust borrowing rules prevent `write_blob` calls while refs are
outstanding.
  * for all non-tail blocks, return a page-cached `ImmutablePage`
* It is safe to page-cache these as ImmutablePage because EphemeralFile
is append-only.

## Performance 

How doe the changes above affect performance?
M claim is: not significantly.

* **write path**:
* before this PR, the `EphemeralFile::write_blob` didn't issue its own
`write` system calls.
* If there were enough free pages, it didn't issue *any* `write` system
calls.
* If it had to evict other `EphemeralPage`s to get pages a page for its
writes (`get_buf_for_write`), the page cache code would implicitly issue
the writeback of victim pages as needed.
* With this PR, `EphemeralFile::write_blob` *always* issues *all* of its
*own* `write` system calls.
* Also, the writes are explicit instead of implicit through page cache
write back, which will help #4743
* The perf impact of always doing the writes is the CPU overhead and
syscall latency.
* Before this PR, we might have never issued them if there were enough
free pages.
* We don't issue `fsync` and can expect the writes to only hit the
kernel page cache.
* There is also an advantage in issuing the writes directly: the perf
impact is paid by the tenant that caused the writes, instead of whatever
tenant evicts the `EphemeralPage`.
* **reads for page reconstruction**: no impact.
* The `write_blob` function pre-warms the page cache when it writes the
`mutable_tail` to disk.
* So, the behavior is the same as with the EphemeralPages before this
PR.
* **reads for `create_delta_layer`**: no impact.
  * Same argument as for page reconstruction.
  * Note for the future:
* going through the page cache likely causes read amplification here.
Why?
* Due to the `Key,Lsn`-ordered access pattern, we don't read all the
values in the page before moving to the next page. In the worst case, we
might read the same page multiple times to read different `Values` from
it.
    * So, it might be better to bypass the page cache here.
    * Idea drafts:
      * bypass PS page cache + prefetch pipeline + iovec-based IO
* bypass PS page cache + use `copy_file_range` to copy from ephemeral
file into the L0 delta file, without going through user space
2023-08-18 20:31:03 +03:00
Joonas Koivunen
0a082aee77 test: allow race with flush and stopped queue (#5027)
A lucky race can happen with the shutdown order I guess right now. Seen
in [test_tenant_delete_smoke].

The message is not the greatest to match against.

[test_tenant_delete_smoke]:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/5892262320/index.html#suites/3556ed71f2d69272a7014df6dcb02317/189a0d1245fb5a8c
2023-08-18 19:36:25 +03:00
42 changed files with 1191 additions and 1314 deletions

View File

@@ -780,7 +780,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.15.4
VM_BUILDER_VERSION: v0.16.3
steps:
- name: Checkout
@@ -801,7 +801,11 @@ jobs:
- name: Build vm image
run: |
./vm-builder -enable-file-cache -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
./vm-builder \
-enable-file-cache \
-enable-monitor \
-src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \
-dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
- name: Pushing vm-compute-node image
run: |

View File

@@ -0,0 +1,316 @@
This is a copy from the [original Notion page](https://www.notion.so/neondatabase/Proposal-Pageserver-MVCC-S3-Storage-8a424c0c7ec5459e89d3e3f00e87657c?pvs=4), taken on 2023-08-16.
This is for archival mostly.
The RFC that we're likely to go with is https://github.com/neondatabase/neon/pull/4919.
---
# Proposal: Pageserver MVCC S3 Storage
tl;dr: this proposal enables Control Plane to attach a tenant to a new pageserver without being 100% certain that it has been detached from the old pageserver. This enables us to automate failover if a pageserver dies (no human in the loop).
# Problem Statement
The current Neon architecture requires the Control Plane to guarantee that a tenant is only attached to one pageserver at a time. If a tenant is attached to multiple pageservers simultaneously, the pageservers will overwrite each others changes in S3 for that tenant, resulting in data loss for that tenant.
The above imposes limitations on tenant relocation and future designs for high availability. For instance, Control Plane cannot relocate a tenant to another pageserver before it is 100% certain that the tenant is detached from the source pageserver. If the source pageserver is unresponsive, the tenant detach procedure cannot proceed, and Control Plane has no choice but to wait for either the source to become responsive again, or rely on a node failure detection mechanism to detect that the source pageserver is dead, and give permission to skip the detachment step. Either way, the tenant is unavailable for an extended period, and we have no means to improve it in the current architecture.
Note that there is no 100% correct node failure detection mechanism, and even techniques to accelerate failure detection, such as ********************************shoot-the-other-node-in-the-head,******************************** have their limits. So, we currently rely on humans as node failure detectors: they get alerted via PagerDuty, assess the situation under high stress, and make the decision. If they make the wrong call, or the apparent dead pageserver somehow resurrects later, well have data loss.
Also, by relying on humans, were [incurring needless unscalable toil](https://sre.google/sre-book/eliminating-toil/): as Neon grows, pageserver failures will become more and more frequent because our fleet grows. Each instance will need quick response time to minimize downtime for the affected tenants, which implies higher toil, higher resulting attrition, and/or higher personnel cost.
Lastly, there are foreseeable needs by operation and product such as zero-downtime relocation and automatic failover/HA. For such features, the ability to have a tenant purposefully or accidentally attached to more than one pageserver will greatly reduce risk of data loss, and improve availability.
# High-Level Idea
The core idea is to evolve the per-Tenant S3 state to an MVCC-like scheme, allowing multiple pageservers to operate on the same tenant S3 state without interference. To make changes to S3, pageservers acquire long-running transactions from Control Plane. After opening a transaction, Pageservers make PUTs directly against S3, but they keys include the transaction ID, so overwrites never happen. Periodically, pageservers talk back to Control Plane to commit their transaction. This is where Control Plane enforces strict linearizability, favoring availability over work-conservation: commit is only granted if no transaction started after the one thats requesting commit. Garbage collection is done through deadlists, and its simplified tremendously by above commit grant/reject policy.
Minimal changes are required for safekeepers to allow WAL for a single timeline be consumed by more than one pageserver without premature truncation.
**Above scheme makes it safe to attach tenants without a 100% correct node failure detection mechanism. Further, it makes it safe to interleave tenant-attachment to pageservers, unlocking new capabilities for (internal) product features:**
- **Fast, Zero-Toil Failover on Network Partitions or Instance Failure**: if a pageserver is not reachable (network partition, hardware failure, overload) we want to spread its attached tenants to new pageservers to restore availability, within the range of *seconds*. We cannot afford gracious timeouts to maximize the probability that the unreachable pageserver has ceased writing to S3. This proposal enables us to attach the tenants to the replacement pageservers, and redirect their computes, without having to wait for confirmation that the unreachable pageserver has ceased writing to S3.
- **************************************Zero-Downtime Relocation:************************************** we want to be able to relocate tenants to different pageservers with minimized availability or a latency impact. This proposal enables us to attach the relocating Tenant to the destination Pageserver before detaching it from the source Pageserver. This can help minimize downtime because we can wait for the destination to catch up on WAL processing before redirecting Computes.
# Design
The core idea is to evolve the per-Tenant S3 state to a per-tenant MVCC-like scheme.
To make S3 changes for a given tenant, Pageserver requests a transaction ID from control plane for that tenant. Without a transaction ID, Pageserver does not write to S3.
Once Pageserver received a transaction ID it is allowed to produce new objects and overwrite objects created in this transaction. Pageserver is not allowed to delete any objects; instead, it marks the object as deleted by appending the key to the transactions deadlist for later deletion. Commits of transactions are serialized through Control Plane: when Pageserver wants to commit a transaction, it sends an RPC to Control Plane. Control Plane responds with a commit grant or commit reject message. Commit grant means that the transactions changes are now visible to subsequent transactions. Commit reject means that the transactions changes are not and never will be visible to another Pageserver instance, and the rejected Pageserver is to cease further activity on that tenant.
## ****************************************************Commit grant/reject policy****************************************************
For the purposes of Pageserver, we want **linearizability** of a tenants S3 state. Since our transactions are scoped per tenant, it is sufficient for linearizability to grant commit if and only if no other transaction has been started since the commit-requesting transaction started.
For example, consider the case of a single tenant, attached to Pageserver A. Pageserver A has an open transaction but becomes unresponsive. Control Plane decides to relocate the tenant to another Pageserver B. It need *not* wait for A to be 100%-certainly down before B can start uploading to S3 for that tenant. Instead, B can start a new transaction right away, make progress, and get commit grants; What about A? The transaction is RejectPending in Control Plane until A eventually becomes responsive again, tries to commit, gets a rejection, acknowledges it, and thus its transaction becomes RejectAcknowledge. If A is definitively dead, operator can also force-transition from state RejectPending to RejectAcknowledged. But critically, Control Plane doesnt have for As transaction to become RejectAcknowledge before attaching the tenant to B.
```mermaid
sequenceDiagram
participant CP
participant A
participant S3
participant B
CP -->> A: attach tenant
activate A
A -->> CP: start txn
CP -->> A: txn=23, last_committed_txn=22
Note over CP,A: network partition
CP --x A: heartbeat
CP --x A: heartbeat
Note over CP: relocate tenant to avoid downtime
CP -->> B: attach tenant
activate B
B -->> CP: start txn
Note over CP: mark A's txn 23 as RejectPending
CP -->> B: txn=24, last-committed txn is 22
B -->> S3: PUT X.layer.24<br>PUT index_part.json.24 referencing X.layer.24
B -->> CP: request commit
CP -->> B: granted
B -->> CP: start txn
CP -->> B: txn=25, last_committed_txn=22
A -->> S3: PUT Y.layer.23 <br> PUT index_part.json.23 referencing Y.layer.23
A --x CP: request commit
A --x CP: request commit
Note over CP,A: partition is over
A -->> CP: request commit
Note over CP: most recently started txn is 25, not 23, reject
CP -->> A: reject
A -->> CP: acknowledge reject
Note over CP: mark A's txn 23 as RejectAcknowledged
deactivate A
B -->> S3: PUT 000-FFF_X-Y.layer.**************25**************<br>...
deactivate B
```
If a Pageserver gets a rejection to a commit request, it acknowledges rejection and cedes further S3 uploads for the tenant, until it receives a `/detach` request for the tenant (control plane has most likely attached the tenant to another pageserver in the meantime).
In practice, Control Plane will probably extend the commit grant/reject schema above, taking into account the pageserver to which it last attached the tenant. In the above example, Control Plane could remember that the pageserver that is supposed to host the tenant is pageserver B, and reject start-txn and commit requests from pageserver A. It would also use such requests from A as a signal that A is reachable again, and retry the `/detach` .
<aside>
💡 A commit failure causes the tenant to become effectively `Broken`. Pageserver should persist this locally so it doesnt bother ControlPlane for a new txn when Pageserver is restarted.
</aside>
## ********************Visibility********************
We mentioned earlier that once a transaction commits, its changes are visible to subsequent transactions. But how does a given transaction know where to look for the data? There is no longer a single `index_part.json` per timeline, or a single `timelines/:timeline_id` prefix to look for; theyre all multi-versioned, suffixed by the txn number.
The solution is: at transaction start, Pageserver receives the last-committed transaction ID from Control Plane (`last_committed_txn` in the diagram). last_commited_txn is the upper bound for what is visible for the current transaction. Control Plane keeps track of each open transactions last_committed_txn for purposes of garbage collection (see later paragraph).
Equipped with last_committed_txn, Pageserver then discovers
- the current index part of a timeline at `tenants/:tenant_id/timelines/:timeline_id/index_part.json.$last_committed_txn`. The `index_part.json.$last_committed_txn` has the exact same contents as the current architectures index_part.json, i.e. full list of layers.
- the list of existent timelines as part of the `attach` RPC from CP;
There is no other S3 state per tenant, so, thats all the visibility required.
An alternative to receiving the list of existent timelines from CP is to introduce a proper **********SetOfTimelines********** object in S3, and multi-version it just like above. For example, we could have a `tenants/:tenant_id/timelines.json.$txn` file that references `index_part.json.$last_committed_txn` . It can be added later if more separation between CP and PS is desired.
So, the only MVCCed object types in this proposal are LayerFile and IndexPart (=individual timeline), but not the SetOfTimelines in a given tenant. Is this a problem? For example, the Pageservers garbage collection code needs to know the full set of timelines of a tenant. Otherwise itll make incorrect decisions. What if Pageserver A knows about timelines {R,S}, but another Pageserver B created an additional branch T, so, its set of timelines is {R,S,T}. Both pageservers will run GC code, and so, PS A may decide to delete a layer thats still needed for branch T. Not a problem with this propsoal, because the effect of GC (i.e., layer deletion) is properly MVCCed.
## Longevity Of Transactions & Availability
Pageserver depends on Control Plane to start a new transaction. If ControlPlane is down, no new transactions can be started.
Pageservers commit transactions based on a maximum amount of uncommitted changes that have accumulated in S3. A lower maximum increases dependence and load on ControlPlane which decreases availability. A higher maximum risks losing more work in the event of failover; the work will have to be re-done in a new transaction on the new node.
Pageservers are persist the open txn id in local storage, so that they can resume the transaction after restart, without dependence on Control Plane.
## **Operations**
********PUTs:********
- **layer files**
- current architecture: layer files are supposed to be write-once, but actually, there are edge-cases where we PUT the same layer file name twice; namely if we PUT the file to S3 but crash before uploading the index part that references it; then detach + attach, and re-run compaction, which is non-deterministic.
- this proposal: with transactions, we can now upload layers and index_part.json concurrently, just need to make sure layer file upload is done before we request txn commit.
- **index part** upload: `index_part.json.$txn` may be created and subsequently overwritten multiple times in a transaction; it is an availability/work-loss trade-off how often to request a commit from CP.
**************DELETEs**************: for deletion, we maintain a deadlist per transaction. It is located at `tenants/:tenant_id/deadlist/deadlist.json.$txn`. It is PUT once before the pageserver requests requests commit, and not changed after sending request to commit. An object created in the current txn need not (but can) be on the deadlist — it can be DELETEd immediately because its not visible to other transactions. An example use case would be an L0 layer that gets compacted within one transaction; or, if we ever start MVCCing the set of timelines of a tenant, a short-lived branch that is created & destroyed within one transaction.
<aside>
**Deadlist Invariant:** if a an object is on a deadlist of transaction T, it is not referenced from anywhere else in the full state visible to T or any later started transaction > T.
</aside>
### Rationale For Deadlist.json
Given that this proposal only MVCCs layers and indexparts, one may ask why the deadlist isnt part of indexpart. The reason is to not lose generality: the deadlist is just a list of keys; it is not necessary to understand the data format of the versioned object to process the deadlist. This is important for garbage collection / vacuuming, which well come to in the next section.
## Garbage Collection / Vacuuming
After a transaction has reached reject-acknowledged state, Control Plane initiates a garbage collection procedure for the aborted transaction.
Control Plane is in the unique position about transaction states. Here is a sketch of the exact transaction states and what Control Plane keeps track of.
```
struct Tenant {
...
txns: HashMap<TxnId, Transaction>,
// the most recently started txn's id; only most recently sarted can win
next_winner_txn: Option<TxnId>,
}
struct Transaction {
id: TxnId, // immutable
last_committed_txn: TxnId, // immutable; the most recent txn in state `Committed`
// when self was started
pageserver_id: PageserverId,
state: enum {
Open,
Committed,
RejectPending,
RejectAcknowledged, // invariant: we know all S3 activity has ceded
GarbageCollected,
}
}
```
Object creations & deletions by a rejected transaction have never been visible to other transactions. That is true for both RejectPending and RejectAcknowledged states. The difference is that, in RejectPending, the pageserver may still be uploading to S3, whereas in RejectAcknowledged, Control Plane can be certain that all S3 activity in the name of that transaction has ceded. So, once a transaction reaches state RejectAcknowledged state, it is safe to DELETE all objects created by that transaction, and discard the transactions deadlists.
A transaction T in state Committed has subsequent transactions that may or may not reference the objects it created. None of the subsequent transaction can reference the objects on Ts deadlist, though, as per the Deadlist Invariant (see previous section).
So, for garbage collection, we need to assess transactions in state Committed and RejectAcknowledged:
- Commited: delete objects on the deadlist.
- We dont need a LIST request here, the deadlist is sufficient. So, its really cheap.
- This is **not true MVCC garbage collection**; by deleting the objects on Committed transaction T s deadlist, we might delete data referenced by other transactions that were concurrent with T, i.e., they started while T was still open. However, the fact that T is committed means that the other transactions are RejectPending or RejectAcknowledged, so, they dont matter. Pageservers executing these doomed RejectPending transactions must handle 404 for GETs gracefully, e.g., by trying to commit txn so they observe the rejection theyre destined to get anyways. 404s for RejectAcknowledged is handled below.
- RejectAcknowledged: delete all objects created in that txn, and discard deadlists.
- 404s / object-already-deleted type messages must be expected because of Committed garbage collection (see above)
- How to get this list of objects created in a txn? Open but solvable design question; Ideas:
- **Brute force**: within tenant prefix, search for all keys ending in `.$txn` and delete them.
- **WAL for PUTs**: before a txn PUTs an object, it logs to S3, or some other equivalently durable storage, that its going to do it. If we log to S3, this means we have to do an additional WAL PUT per “readl” PUT.
- ******************************LIST with reorged S3 layout (preferred one right now):****************************** layout S3 key space such that `$txn` comes first, i.e., `tenants/:tenant_id/$txn/timelines/:timeline_id/*.json.$txn` . That way, when we need to GC a RejectAcknowledged txn, we just LIST the entire `tenants/:tenant_id/$txn` prefix and delete it. The cost of GC for RejectAcknowledged transactions is thus proportional to the number of objects created in that transaction.
## Branches
This proposal only MVCCs layer files and and index_part.json, but leaves the tenant object not-MVCCed. We argued earlier that its fine to ignore this for now, because
1. Control Plane can act as source-of-truth for the set of timelines, and
2. The only operation that makes decision based on “set of timelines” is GC, which in turn only does layer deletions, and layer deletions ***are*** properly MVCCed.
Now that weve introduced garbage collection, lets elaborate a little more on (2). Recall our example from earlier: Pageserver A knows about timelines {R,S}, but another Pageserver B created an additional branch T, so, its set of timelines is {R,S,T}. Both pageservers will run GC code, and so, PS A may decide to delete a layer thats still needed for branch T.
How does the MVCCing of layer files protect us here? If A decides to delete that layer, its just on As transactions deadlist, but still present in S3 and usable by B. If A commits first, B wont be able to commit and the layers in timeline T will be vacuumed. If B commits first, As deadlist is discarded and the layer continues to exist.
## Safekeeper Changes
We need to teach the safekeepers that there can be multiple pageservers requesting WAL for the same timeline, in order to prevent premature WAL truncation.
In the current architecture, the Safekeeper service currently assumes only one Pageserver and is allowed to prune WAL older than that Pageservers `remote_consistent_lsn`. Safekeeper currently learns the `remote_consistent_lsn` through the walreceiver protocol.
So, if we have a tenant attached to two pageservers at the same time, they will both try to stream WAL and the Safekeeper will get confused about which connections `remote_consistent_lsn` to use as a basis for WAL pruning.
What do we need to change to make it work? We need to make sure that the Safekeepers only prune WAL up to the `remote_consistent_lsn` of the last-committed transaction.
The straight-forward way to get it is to re-design WAL pruning as follows:
1. Pageserver reports remote_consistent_lsn as part of transaction commit to Control Plane.
2. Control Plane makes sure transaction state update is persisted.
3. Control Plane (asynchronous to transaction commit) reconciles with Safekeepers to ensure WAL pruning happens.
The above requires non-trivial changes, but, in the light of other planned projects such as restore-tenant-from-safekeeper-wal-backups, I think Control Plane will need to get involved in WAL pruning anyways.
# How This Proposal Unlocks Future Features
Let us revisit the example from the introduction where we were thinking about handling network partitions. Network partitions need to be solved first, because theyre unavoidable in distributed systems. We did that. Now lets see how we can solve actual product problems:
## **Fast, Zero-Toil Failover on Network Partitions or Instance Failure**
The “Problem Statement” section outlined the current architectures problems with regards to network partitions or instance failure: it requires a 100% correct node-dead detector to make decisions, which doesnt exist in reality. We rely instead on human toil: an oncall engineer has to inspect the situation and make a decision, which may be incorrect and in any case take time in the order of minutes, which means equivalent downtime for users.
With this proposal, automatic failover for pageservers is trivial:
If a pageserver is unresponsive from Control Planes / Computes perspective, Control Plane does the following:
- attach all tenants of the unresponsive pageserver to new pageservers
- switch over these tenants computes immediately;
At this point, availability is restored and user pain relieved.
Whats left is to somehow close the doomed transaction of the unresponsive pageserver, so that it beomes RejectAcknowledged, and GC can make progress. Since S3 is cheap, we can afford to wait a really long time here, especially if we put a soft bound on the amount of data a transaction may produce before it must commit. Procedure:
1. Ensure the unresponsive pageserver is taken out of rotation for new attachments. That probably should happen as part of the routine above.
2. Make a human operator investigate decide what to do (next morning, NO ONCALL ALERT):
1. Inspect the instance, investigate logs, understand root cause.
2. Try to re-establish connectivity between pageserver and Control Plane so that pageserver can retry commits, get rejected, ack rejection ⇒ enable GC.
3. Use below procedure to decomission pageserver.
### Decomissioning A Pageserver (Dead or Alive-but-Unrespsonive)
The solution, enabled by this proposal:
1. Ensure that pageservers S3 credentials are revoked so that it cannot make new uploads, which wouldnt be tracked anywhere.
2. Let enough time pass for the S3 credential revocation to propagate. Amazon doesnt give a guarantee here. As stated earlier, we can easily afford to wait here.
3. Mark all Open and RejectPending transactions of that pageserver as RejectAcknowledge.
Revocation of the S3 credentials is required so that, once we transition all the transactions of that pageserver to RejectAcknowledge, once garbage-collection pass is guaranteed to delete all objects that will ever exist for that pageserver. That way, we need not check *****GarbageCollected***** transactions every again.
## Workflow: Zero-Downtime Relocation
With zero-downtime relocation, the goal is to have the target pageserver warmed up, i.e., at the same `last_record_lsn` as the source pageserver, before switching over Computes from source to target pageserver.
With this proposal, it works like so:
1. Grant source pageserver its last open transaction. This one is doomed to be rejected later, unless the relocation fails.
2. Grant target pageserver its first open transaction.
3. Have target pageserver catch up on WAL, streaming from last-committed-txns remote_consistent_lsn onwards.
4. Once target pageserver reports `last_record_lsn` close enough to source pageserver, target pageserver requests commit.
5. Drain compute traffic from source to target pageserver. (Source can still answer requests until it tries to commit and gets reject, so, this will be quite smooth).
Note that as soon as we complete step (4), the source pageservers transaction is doomed to be rejected later. Conversely, if the target cant catch up fast enough, the source will make a transaction commit earlier. This will generally happen if there is a lot of write traffic coming in. The design space to make thing smooth here is large, but well explored in other areas of computing, e.g., VM live migration. We have all the important policy levers at hand, e.g.,
- delaying source commits if we see target making progress
- slowing down source consumption (need some signalling mechanism for it)
- slowing down compute wal generation
-
It doesnt really matter, whats important is that two pageservers can overlap.
# Additional Trade-Offs / Remarks Brought Up During Peer Review
This proposal was read by and discussed @Stas and @Dmitry Rodionov prior to publishing it with the broader team. (This does not mean they endorse this proposal!).
Issues that we discussed:
1. **Frequency of transactions:** If even idle tenants commit every 10min or so, thats quite a lot of load on Control Plane. Can we minimize it by Equating Transaction Commit Period to Attachment Period? I.e. start txn on attach, commit on detach?
1. Would be nice, but, if a tenant is attached for 1 month, then PS dies, we lose 1 month of work.
2. ⇒ my solution to this problem: Adjusted this proposal to make transaction commit frequency proportional to amount of uncommitted data.
1. Its ok to spend resources on active users, they pay us money to do it!
2. The amount of work per transaction is minimal.
1. In current Control Plane, its a small database transaction that is super unlikely to conflict with other transactions.
2. I have very little concerns about scalability of the commit workload on CP side because it's trivially horizontally scalable by sharding by tenant.
3. There's no super stringent availability requirement on control plane; if a txn can't commit because it can't reach the CP, PS can continue & retry in the background, speculating that it's CP downtime and not PS-partitioned-off scenario.
4. Without stringent availability requirement, there's flexibility for future changes to CP-side-implementation.
2. ************************************************Does this proposal address mirroring / no-performance-degradation failover ?************************************************
1. No it doesnt. It only provides the building block for attaching a tenant to a new pageserver without having to worry that the tenant is detached on the old pageserver.
2. A simple scheme to build no-performance-degradation failover on top of this proposal is to have an asynchronous read-only replica of a tenant on another pageserver in the same region.
3. Another more ambitious scheme to get no-performance-degradation would be [One-Pager: Layer File Spreading (Christian)](https://www.notion.so/One-Pager-Layer-File-Spreading-Christian-eb6b64182a214e11b3fceceee688d843?pvs=21); this proposal would be used in layer file spreading for risk-free automation of TenantLeader failover, which hasnt been addressed Ithere.
4. In any way, failover would restart from an older S3 state, and need to re-ingest WAL before being able to server recently written pages.
1. Is that a show-stopper? I think not.
2. Is it suboptimal? Absolutely: if a pageserver instance fails, all its tenants will be distributed among the remaining pageservers (OK), and all these tenants will ask the safekeepers for WAL at the same time (BAD). So, pageserver instance failure will cause a load spike in safekeepers.
1. Personally I think thats an OK trade-off to make.
2. There are countless options to avoid / mitigate the load spike. E.g., pro-actively streaming WAL to the standby read-only replica.
3. ********************************************Does this proposal allow multiple writers for a tenant?********************************************
1. In abstract terms, this proposal provides a linearized history for a given S3 prefix.
2. In concrete terms, this proposal provides a linearized history per tenant.
3. There can be multiple writers at a given time, but only one of them will win to become part of the linearized history.
4. ************************************************************************************Alternative ideas mentioned during meetings that should be turned into a written prospoal like this one:************************************************************************************
1. @Dmitry Rodionov : having linearized storage of index_part.json in some database that allows serializable transactions / atomic compare-and-swap PUT
2. @Dmitry Rodionov :
3. @Stas : something like this scheme, but somehow find a way to equate attachment duration with transaction duration, without losing work if pageserver dies months after attachment.

View File

@@ -68,44 +68,6 @@ pub mod completion;
/// Reporting utilities
pub mod error;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
/// tokio sleep function. Another difference is that we print lines to the log,
/// which can be useful in tests to check that the failpoint was hit.
#[macro_export]
macro_rules! failpoint_sleep_millis_async {
($name:literal) => {{
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
// Sleep if the action was a returned value
if let ::std::option::Option::Some(duration_str) = should_sleep {
$crate::failpoint_sleep_helper($name, duration_str).await
}
}};
}
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
tokio::time::sleep(d).await;
tracing::info!("failpoint {:?}: sleep done", name);
}
}
pub use failpoint_macro_helpers::failpoint_sleep_helper;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

View File

@@ -6,11 +6,12 @@ use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::TenantSharedResources;
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
@@ -121,7 +122,7 @@ fn main() -> anyhow::Result<()> {
}
// Initialize up failpoints support
let scenario = FailScenario::setup();
let scenario = pageserver::failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
@@ -382,8 +383,10 @@ fn start_pageserver(
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
TenantSharedResources {
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
},
order,
))?;

View File

@@ -0,0 +1,86 @@
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
/// tokio sleep function. Another difference is that we print lines to the log,
/// which can be useful in tests to check that the failpoint was hit.
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
($name:literal) => {{
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
// Sleep if the action was a returned value
if let ::std::option::Option::Some(duration_str) = should_sleep {
$crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
}
}};
}
pub use __failpoint_sleep_millis_async as sleep_millis_async;
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
#[doc(hidden)]
pub(crate) async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
tokio::time::sleep(d).await;
tracing::info!("failpoint {:?}: sleep done", name);
}
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
//
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
std::env::remove_var("FAILPOINTS");
} else {
// let the library handle non-utf8, or nothing for not present
}
let scenario = fail::FailScenario::setup();
if let Ok(val) = actions {
val.split(';')
.enumerate()
.map(|(i, s)| s.split_once('=').ok_or((i, s)))
.for_each(|res| {
let (name, actions) = match res {
Ok(t) => t,
Err((i, s)) => {
panic!(
"startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
i + 1,
);
}
};
if let Err(e) = apply_failpoint(name, actions) {
panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
}
});
}
scenario
}
pub(crate) fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
fail::cfg(name, actions)
}
}
#[inline(never)]
fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}

View File

@@ -517,7 +517,6 @@ async fn timeline_delete_handler(
.instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
.await?;
// FIXME: needs to be an error for console to retry it. Ideally Accepted should be used and retried until 404.
json_response(StatusCode::ACCEPTED, ())
}
@@ -980,14 +979,7 @@ async fn failpoints_handler(
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = if fp.actions == "exit" {
fail::cfg_callback(fp.name, || {
info!("Exit requested by failpoint");
std::process::exit(1);
})
} else {
fail::cfg(fp.name, &fp.actions)
};
let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow!(

View File

@@ -21,6 +21,8 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod failpoint_support;
use std::path::Path;
use crate::task_mgr::TaskKind;

View File

@@ -10,6 +10,42 @@
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
//! information about what's stored in the buffer.
//!
//! # Types Of Pages
//!
//! [`PageCache`] only supports immutable pages.
//! Hence there is no need to worry about coherency.
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
//! It uses the page cache only for the blocks that are already fully written and immutable.
//!
//! # Filling The Page Cache
//!
//! Page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
//! * Get a [`FileId`] using [`next_file_id`].
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//! a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//! a write guard for the page. Fill the page with the contents of the on-disk file.
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//! Then try again to [`PageCache::read_immutable_buf`].
//! Unless there's high cache pressure, the page should now be cached.
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
//!
//! # Locking
//!
//! There are two levels of locking involved: There's one lock for the "mapping"
@@ -40,20 +76,18 @@ use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU8, AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
};
use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::error;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::tenant::{block_io, ephemeral_file, writeback_ephemeral_file};
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -87,6 +121,17 @@ pub fn get() -> &'static PageCache {
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;
/// See module-level comment.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
/// See module-level comment.
pub fn next_file_id() -> FileId {
FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
@@ -97,12 +142,8 @@ enum CacheKey {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
EphemeralPage {
file_id: ephemeral_file::FileId,
blkno: u32,
},
ImmutableFilePage {
file_id: block_io::FileId,
file_id: FileId,
blkno: u32,
},
}
@@ -128,7 +169,6 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
buf: &'static mut [u8; PAGE_SZ],
dirty: bool,
}
impl Slot {
@@ -177,9 +217,7 @@ pub struct PageCache {
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
ephemeral_page_map: RwLock<HashMap<(ephemeral_file::FileId, u32), usize>>,
immutable_page_map: RwLock<HashMap<(block_io::FileId, u32), usize>>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -258,14 +296,6 @@ impl PageWriteGuard<'_> {
);
self.valid = true;
}
pub fn mark_dirty(&mut self) {
// only ephemeral pages can be dirty ATM.
assert!(matches!(
self.inner.key,
Some(CacheKey::EphemeralPage { .. })
));
self.inner.dirty = true;
}
}
impl Drop for PageWriteGuard<'_> {
@@ -280,7 +310,6 @@ impl Drop for PageWriteGuard<'_> {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
self.inner.dirty = false;
}
}
}
@@ -388,62 +417,16 @@ impl PageCache {
Ok(())
}
// Section 1.2: Public interface functions for working with Ephemeral pages.
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_ephemeral_buf(
&self,
file_id: ephemeral_file::FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_read(&mut cache_key)
}
pub fn write_ephemeral_buf(
&self,
file_id: ephemeral_file::FileId,
blkno: u32,
) -> anyhow::Result<WriteBufResult> {
let cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_write(&cache_key)
}
/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_ephemeral(&self, drop_file_id: ephemeral_file::FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
if let Some(key) = &inner.key {
match key {
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
}
}
}
// Section 1.3: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf(
&self,
file_id: block_io::FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key)
}
/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_immutable(&self, drop_file_id: block_io::FileId) {
/// Immediately drop all buffers belonging to given file
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
@@ -456,7 +439,6 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
@@ -534,10 +516,6 @@ impl PageCache {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
@@ -578,7 +556,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
@@ -640,7 +617,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
@@ -679,10 +655,6 @@ impl PageCache {
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -706,10 +678,6 @@ impl PageCache {
None
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -743,12 +711,6 @@ impl PageCache {
panic!("could not find old key in mapping")
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -788,17 +750,7 @@ impl PageCache {
}
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -849,25 +801,8 @@ impl PageCache {
}
};
if let Some(old_key) = &inner.key {
if inner.dirty {
if let Err(err) = Self::writeback(old_key, inner.buf) {
// Writing the page to disk failed.
//
// FIXME: What to do here, when? We could propagate the error to the
// caller, but victim buffer is generally unrelated to the original
// call. It can even belong to a different tenant. Currently, we
// report the error to the log and continue the clock sweep to find
// a different victim. But if the problem persists, the page cache
// could fill up with dirty pages that we cannot evict, and we will
// loop retrying the writebacks indefinitely.
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
}
}
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.dirty = false;
inner.key = None;
}
return Ok((slot_idx, inner));
@@ -875,28 +810,6 @@ impl PageCache {
}
}
fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> {
match cache_key {
CacheKey::MaterializedPage {
hash_key: _,
lsn: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty materialized page",
)),
CacheKey::EphemeralPage { file_id, blkno } => {
writeback_ephemeral_file(*file_id, *blkno, buf)
}
CacheKey::ImmutableFilePage {
file_id: _,
blkno: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty immutable page",
)),
}
}
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
@@ -907,7 +820,6 @@ impl PageCache {
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_ephemeral.set_page_sz(0);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
@@ -917,11 +829,7 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: RwLock::new(SlotInner {
key: None,
buf,
dirty: false,
}),
inner: RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})
@@ -929,7 +837,6 @@ impl PageCache {
Self {
materialized_page_map: Default::default(),
ephemeral_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),

View File

@@ -29,6 +29,7 @@ use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
@@ -55,6 +56,7 @@ use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir;
@@ -136,9 +138,6 @@ pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;
@@ -152,6 +151,14 @@ pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
/// References to shared objects that are passed into each tenant, such
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -391,7 +398,7 @@ impl Tenant {
async fn timeline_init_and_sync(
&self,
timeline_id: TimelineId,
remote_client: Option<RemoteTimelineClient>,
resources: TimelineResources,
remote_startup_data: Option<RemoteStartupData>,
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
@@ -412,7 +419,7 @@ impl Tenant {
timeline_id,
up_to_date_metadata,
ancestor.clone(),
remote_client,
resources,
init_order,
CreateTimelineCause::Load,
)?;
@@ -502,6 +509,7 @@ impl Tenant {
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -516,7 +524,7 @@ impl Tenant {
tenant_conf,
wal_redo_manager,
tenant_id,
Some(remote_storage),
Some(remote_storage.clone()),
));
// Do all the hard work in the background
@@ -531,17 +539,61 @@ impl Tenant {
"attach tenant",
false,
async move {
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
let make_broken = |t: &Tenant, err: anyhow::Error| {
error!("attach failed, setting tenant state to Broken: {err:?}");
t.state.send_modify(|state| {
assert_eq!(
*state,
TenantState::Attaching,
"the attach task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
};
let pending_deletion = {
match DeleteTenantFlow::should_resume_deletion(
conf,
Some(&remote_storage),
&tenant_clone,
)
.await
{
Ok(should_resume_deletion) => should_resume_deletion,
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err));
return Ok(());
}
}
};
info!("pending_deletion {}", pending_deletion.is_some());
if let Some(deletion) = pending_deletion {
match DeleteTenantFlow::resume_from_attach(
deletion,
&tenant_clone,
tenants,
&ctx,
)
.await
{
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err));
return Ok(());
}
Ok(()) => return Ok(()),
}
}
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(e.to_string());
});
make_broken(&tenant_clone, anyhow::anyhow!(e));
}
}
Ok(())
@@ -619,6 +671,9 @@ impl Tenant {
.instrument(info_span!("download_index_part", %timeline_id)),
);
}
let mut timelines_to_resume_deletions = vec![];
// Wait for all the download tasks to complete & collect results.
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
@@ -635,9 +690,12 @@ impl Tenant {
);
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted(_) => {
info!("timeline {} is deleted, skipping", timeline_id);
continue;
MaybeDeletedIndexPart::Deleted(index_part) => {
info!(
"timeline {} is deleted, picking to resume deletion",
timeline_id
);
timelines_to_resume_deletions.push((timeline_id, index_part, client));
}
}
}
@@ -652,14 +710,41 @@ impl Tenant {
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
.init_upload_queue_stopped_to_continue_deletion(&index_part)
.context("init queue stopped")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
DeleteTimelineFlow::resume_deletion(
Arc::clone(self),
timeline_id,
&index_part.parse_metadata().context("parse_metadata")?,
Some(remote_timeline_client),
None,
)
.await
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
std::fs::remove_file(&marker_file)
@@ -667,7 +752,7 @@ impl Tenant {
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
.context("fsync tenant directory after unlinking attach marker file")?;
utils::failpoint_sleep_millis_async!("attach-before-activate");
crate::failpoint_support::sleep_millis_async!("attach-before-activate");
info!("Done");
@@ -695,7 +780,7 @@ impl Tenant {
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
remote_client: RemoteTimelineClient,
resources: TimelineResources,
ctx: &RequestContext,
) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
@@ -725,7 +810,7 @@ impl Tenant {
self.timeline_init_and_sync(
timeline_id,
Some(remote_client),
resources,
Some(RemoteStartupData {
index_part,
remote_metadata,
@@ -772,8 +857,7 @@ impl Tenant {
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
@@ -788,6 +872,9 @@ impl Tenant {
}
};
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Tenant::new(
TenantState::Loading,
@@ -811,6 +898,7 @@ impl Tenant {
"initial tenant load",
false,
async move {
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
let make_broken = |t: &Tenant, err: anyhow::Error| {
error!("load failed, setting tenant state to Broken: {err:?}");
t.state.send_modify(|state| {
@@ -858,7 +946,7 @@ impl Tenant {
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
match DeleteTenantFlow::resume(
match DeleteTenantFlow::resume_from_load(
deletion,
&tenant_clone,
init_order.as_ref(),
@@ -880,7 +968,7 @@ impl Tenant {
match tenant_clone.load(init_order.as_ref(), &ctx).await {
Ok(()) => {
debug!("load finished",);
debug!("load finished");
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
}
@@ -1076,7 +1164,7 @@ impl Tenant {
debug!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
crate::failpoint_support::sleep_millis_async!("before-loading-tenant");
// Load in-memory state to reflect the local files on disk
//
@@ -1172,16 +1260,9 @@ impl Tenant {
) -> Result<(), LoadLocalTimelineError> {
span::debug_assert_current_span_has_tenant_id();
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
RemoteTimelineClient::new(
remote_storage.clone(),
self.conf,
self.tenant_id,
timeline_id,
)
});
let mut resources = self.build_timeline_resources(timeline_id);
let (remote_startup_data, remote_client) = match remote_client {
let (remote_startup_data, remote_client) = match resources.remote_client {
Some(remote_client) => match remote_client.download_index_file().await {
Ok(index_part) => {
let index_part = match index_part {
@@ -1269,9 +1350,10 @@ impl Tenant {
return Ok(());
}
(None, remote_client)
(None, resources.remote_client)
}
};
resources.remote_client = remote_client;
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
@@ -1284,7 +1366,7 @@ impl Tenant {
self.timeline_init_and_sync(
timeline_id,
remote_client,
resources,
remote_startup_data,
Some(local_metadata),
ancestor,
@@ -1773,7 +1855,7 @@ impl Tenant {
// It's mesed up.
// we just ignore the failure to stop
match self.set_stopping(shutdown_progress, false).await {
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
@@ -1815,15 +1897,18 @@ impl Tenant {
/// This function is not cancel-safe!
///
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
/// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
async fn set_stopping(
&self,
progress: completion::Barrier,
allow_transition_from_loading: bool,
allow_transition_from_attaching: bool,
) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Attaching if allow_transition_from_attaching => true,
TenantState::Activating(_) | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
@@ -1840,12 +1925,19 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
TenantState::Activating(_) => {
unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Attaching => {
if !allow_transition_from_attaching {
unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
}
TenantState::Loading => {
if !allow_transition_from_loading {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
@@ -1921,7 +2013,8 @@ impl Tenant {
self.set_broken_no_wait(reason)
}
pub(crate) fn set_broken_no_wait(&self, reason: String) {
pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
let reason = reason.to_string();
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
@@ -2145,7 +2238,7 @@ impl Tenant {
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
resources: TimelineResources,
init_order: Option<&InitializationOrder>,
cause: CreateTimelineCause,
) -> anyhow::Result<Arc<Timeline>> {
@@ -2174,7 +2267,7 @@ impl Tenant {
new_timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
remote_client,
resources,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
@@ -2416,7 +2509,9 @@ impl Tenant {
.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
.await?;
utils::failpoint_sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
crate::failpoint_support::sleep_millis_async!(
"gc_iteration_internal_after_getting_gc_timelines"
);
// If there is nothing to GC, we don't want any messages in the INFO log.
if !gc_timelines.is_empty() {
@@ -2820,6 +2915,23 @@ impl Tenant {
Ok(timeline)
}
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
self.conf,
self.tenant_id,
timeline_id,
);
Some(remote_client)
} else {
None
};
TimelineResources { remote_client }
}
/// Creates intermediate timeline structure and its files.
///
/// An empty layer map is initialized, and new data and WAL can be imported starting
@@ -2836,25 +2948,17 @@ impl Tenant {
) -> anyhow::Result<UninitializedTimeline> {
let tenant_id = self.tenant_id;
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
self.conf,
tenant_id,
new_timeline_id,
);
let resources = self.build_timeline_resources(new_timeline_id);
if let Some(remote_client) = &resources.remote_client {
remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
Some(remote_client)
} else {
None
};
}
let timeline_struct = self
.create_timeline_struct(
new_timeline_id,
new_metadata,
ancestor,
remote_client,
resources,
None,
CreateTimelineCause::Load,
)
@@ -3927,6 +4031,31 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
assert!(!level0_deltas.is_empty());
for delta in level0_deltas {
let delta = layer_map.get_from_desc(&delta);
// Ensure we are dumping a delta layer here
let delta = delta.downcast_delta_layer().unwrap();
delta.dump(false, &ctx).await.unwrap();
delta.dump(true, &ctx).await.unwrap();
}
Ok(())
}
#[tokio::test]
async fn corrupt_metadata() -> anyhow::Result<()> {
const TEST_NAME: &str = "corrupt_metadata";

View File

@@ -6,7 +6,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
use std::sync::atomic::AtomicU64;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -43,37 +42,34 @@ where
}
}
/// A block accessible for reading
///
/// During builds with `#[cfg(test)]`, this is a proper enum
/// with two variants to support testing code. During normal
/// builds, it just has one variant and is thus a cheap newtype
/// wrapper of [`PageReadGuard`]
pub enum BlockLease {
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease {
fn from(value: PageReadGuard<'static>) -> Self {
impl From<PageReadGuard<'static>> for BlockLease<'static> {
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
BlockLease::PageReadGuard(value)
}
}
#[cfg(test)]
impl From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease {
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Rc(value)
}
}
impl Deref for BlockLease {
impl<'a> Deref for BlockLease<'a> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Rc(v) => v.deref(),
}
@@ -116,13 +112,6 @@ where
self.reader.read_blk(blknum)
}
}
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);
fn next_file_id() -> FileId {
FileId(NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
/// An adapter for reading a (virtual) file using the page cache.
///
@@ -132,7 +121,7 @@ pub struct FileBlockReader<F> {
pub file: F,
/// Unique ID of this file, used as key in the page cache.
file_id: FileId,
file_id: page_cache::FileId,
}
impl<F> FileBlockReader<F>
@@ -140,7 +129,7 @@ where
F: FileExt,
{
pub fn new(file: F) -> Self {
let file_id = next_file_id();
let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file }
}
@@ -157,7 +146,6 @@ where
F: FileExt,
{
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
// Look up the right page
let cache = page_cache::get();
loop {
match cache

View File

@@ -275,8 +275,9 @@ pub(crate) async fn remote_delete_mark_exists(
/// It is resumable from any step in case a crash/restart occurs.
/// There are three entrypoints to the process:
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still there.
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there.
/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
#[derive(Default)]
pub enum DeleteTenantFlow {
#[default]
@@ -403,7 +404,7 @@ impl DeleteTenantFlow {
}
}
pub(crate) async fn resume(
pub(crate) async fn resume_from_load(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
@@ -413,7 +414,7 @@ impl DeleteTenantFlow {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, true)
.set_stopping(progress, true, false)
.await
.expect("cant be stopping or broken");
@@ -441,6 +442,31 @@ impl DeleteTenantFlow {
.await
}
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, false, true)
.await
.expect("cant be stopping or broken");
tenant.attach(ctx).await.context("attach")?;
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
async fn prepare(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,

View File

@@ -2,54 +2,31 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::page_cache::{self, ReadBufResult, WriteBufResult, PAGE_SZ};
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::atomic::AtomicU64;
use tracing::*;
use utils::id::{TenantId, TimelineId};
///
/// This is the global cache of file descriptors (File objects).
///
static EPHEMERAL_FILES: Lazy<RwLock<EphemeralFiles>> = Lazy::new(|| {
RwLock::new(EphemeralFiles {
next_file_id: FileId(1),
files: HashMap::new(),
})
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FileId(u64);
impl std::fmt::Display for FileId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct EphemeralFiles {
next_file_id: FileId,
files: HashMap<FileId, Arc<VirtualFile>>,
}
pub struct EphemeralFile {
file_id: FileId,
page_cache_file_id: page_cache::FileId,
_tenant_id: TenantId,
_timeline_id: TimelineId,
file: Arc<VirtualFile>,
pub size: u64,
file: VirtualFile,
size: u64,
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
mutable_tail: [u8; PAGE_SZ],
}
impl EphemeralFile {
@@ -58,74 +35,31 @@ impl EphemeralFile {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<EphemeralFile, io::Error> {
let mut l = EPHEMERAL_FILES.write().unwrap();
let file_id = l.next_file_id;
l.next_file_id = FileId(l.next_file_id.0 + 1);
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_id, &timeline_id)
.join(PathBuf::from(format!("ephemeral-{}", file_id)));
.join(PathBuf::from(format!("ephemeral-{filename_disambiguator}")));
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
let file_rc = Arc::new(file);
l.files.insert(file_id, file_rc.clone());
Ok(EphemeralFile {
file_id,
page_cache_file_id: page_cache::next_file_id(),
_tenant_id: tenant_id,
_timeline_id: timeline_id,
file: file_rc,
file,
size: 0,
mutable_tail: [0u8; PAGE_SZ],
})
}
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> {
let mut off = 0;
while off < PAGE_SZ {
let n = self
.file
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
if n == 0 {
// Reached EOF. Fill the rest of the buffer with zeros.
const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ];
buf[off..].copy_from_slice(&ZERO_BUF[off..]);
break;
}
off += n;
}
Ok(())
}
fn get_buf_for_write(
&self,
blkno: u32,
) -> Result<page_cache::PageWriteGuard<'static>, io::Error> {
// Look up the right page
let cache = page_cache::get();
let mut write_guard = match cache
.write_ephemeral_buf(self.file_id, blkno)
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
{
WriteBufResult::Found(guard) => guard,
WriteBufResult::NotFound(mut guard) => {
// Read the page from disk into the buffer
// TODO: if we're overwriting the whole page, no need to read it in first
self.fill_buffer(guard.deref_mut(), blkno)?;
guard.mark_valid();
// And then fall through to modify it.
guard
}
};
write_guard.mark_dirty();
Ok(write_guard)
pub(crate) fn size(&self) -> u64 {
self.size
}
}
@@ -146,49 +80,74 @@ impl BlobWriter for EphemeralFile {
blknum: u32,
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
off: usize,
/// Used by [`push_bytes`] to memoize the page cache write guard across calls to it.
memo_page_guard: MemoizedPageWriteGuard,
}
struct MemoizedPageWriteGuard {
guard: page_cache::PageWriteGuard<'static>,
/// The block number of the page in `guard`.
blknum: u32,
}
impl<'a> Writer<'a> {
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32;
Ok(Writer {
blknum,
blknum: (ephemeral_file.size / PAGE_SZ as u64) as u32,
off: (ephemeral_file.size % PAGE_SZ as u64) as usize,
memo_page_guard: MemoizedPageWriteGuard {
guard: ephemeral_file.get_buf_for_write(blknum)?,
blknum,
},
ephemeral_file,
})
}
#[inline(always)]
fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
// `src_remaining` is the remaining bytes to be written
let mut src_remaining = src;
while !src_remaining.is_empty() {
let page = if self.memo_page_guard.blknum == self.blknum {
&mut self.memo_page_guard.guard
} else {
self.memo_page_guard.guard =
self.ephemeral_file.get_buf_for_write(self.blknum)?;
self.memo_page_guard.blknum = self.blknum;
&mut self.memo_page_guard.guard
};
let dst_remaining = &mut page[self.off..];
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
match self.ephemeral_file.file.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
) {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
) {
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
}
// Zero the buffer for re-use.
// Zeroing is critical for correcntess because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file.mutable_tail.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
}
Err(e) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
self.blknum,
e,
self.ephemeral_file.file.path.display(),
),
));
}
}
}
}
Ok(())
@@ -227,10 +186,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// drop all pages from page cache
let cache = page_cache::get();
cache.drop_buffers_for_ephemeral(self.file_id);
// remove entry from the hash map
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
cache.drop_buffers_for_immutable(self.page_cache_file_id);
// unlink the file
let res = std::fs::remove_file(&self.file.path);
@@ -250,54 +206,48 @@ impl Drop for EphemeralFile {
}
}
pub fn writeback(file_id: FileId, blkno: u32, buf: &[u8]) -> Result<(), io::Error> {
if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) {
match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) {
Ok(_) => Ok(()),
Err(e) => Err(io::Error::new(
ErrorKind::Other,
format!(
"failed to write back to ephemeral file at {} error: {}",
file.path.display(),
e
),
)),
}
} else {
Err(io::Error::new(
ErrorKind::Other,
"could not write back page, not found in ephemeral files hash",
))
}
}
impl BlockReader for EphemeralFile {
fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
// Look up the right page
let cache = page_cache::get();
loop {
match cache
.read_ephemeral_buf(self.file_id, blknum)
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
{
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();
let flushed_blknums = 0..self.size / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
// Swap for read lock
continue;
}
};
}
} else {
debug_assert_eq!(blknum as u64, self.size / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
}
}
}
fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
io::Error::new(ErrorKind::Other, format!("{context}: {e:#}"))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -27,8 +27,9 @@ use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
use super::delete::{remote_delete_mark_exists, DeleteTenantError};
use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
@@ -66,8 +67,7 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
resources: TenantSharedResources,
init_order: InitializationOrder,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -125,8 +125,7 @@ pub async fn init_tenant_mgr(
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
@@ -162,8 +161,7 @@ pub async fn init_tenant_mgr(
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
@@ -200,8 +198,15 @@ pub(crate) fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
if let Some(remote_storage) = resources.remote_storage {
match Tenant::spawn_attach(
conf,
tenant_id,
resources.broker_client,
tenants,
remote_storage,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -219,15 +224,7 @@ pub(crate) fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
broker_client,
remote_storage,
init_order,
tenants,
ctx,
)
Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx)
};
Ok(tenant)
}
@@ -362,8 +359,12 @@ pub async fn create_tenant(
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let tenant_resources = TenantSharedResources {
broker_client,
remote_storage,
};
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -522,7 +523,11 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
let resources = TenantSharedResources {
broker_client,
remote_storage,
};
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -591,12 +596,6 @@ pub async fn attach_tenant(
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
// Temporary solution, proper one would be to resume deletion, but that needs more plumbing around Tenant::load/Tenant::attach
// Corresponding issue https://github.com/neondatabase/neon/issues/5006
if remote_delete_mark_exists(conf, &tenant_id, &remote_storage).await? {
return Err(anyhow::anyhow!("Tenant is marked as deleted on remote storage").into());
}
tenant_map_insert(tenant_id, || {
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
// TODO: tenant directory remains on disk if we bail out from here on.
@@ -609,7 +608,11 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
let resources = TenantSharedResources {
broker_client,
remote_storage: Some(remote_storage),
};
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -1578,7 +1578,11 @@ mod tests {
};
assert_file_list(
&index_part.timeline_layers,
&index_part
.layer_metadata
.keys()
.map(|f| f.to_owned())
.collect(),
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),

View File

@@ -62,10 +62,9 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<NaiveDateTime>,
/// Layer names, which are stored on the remote storage.
///
/// Additional metadata can might exist in `layer_metadata`.
pub timeline_layers: HashSet<LayerFileName>,
/// Legacy field: equal to the keys of `layer_metadata`, only written out for forward compat
#[serde(default, skip_deserializing)]
timeline_layers: HashSet<LayerFileName>,
/// Per layer file name metadata, which can be present for a present or missing layer file.
///
@@ -74,9 +73,10 @@ pub struct IndexPart {
pub layer_metadata: HashMap<LayerFileName, IndexLayerMetadata>,
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated here for convenience.
// It's duplicated for convenience when reading the serialized structure, but is
// private because internally we would read from metadata instead.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
}
@@ -85,7 +85,11 @@ impl IndexPart {
/// used to understand later versions.
///
/// Version is currently informative only.
const LATEST_VERSION: usize = 2;
/// Version history
/// - 2: added `deleted_at`
/// - 3: no longer deserialize `timeline_layers` (serialized format is the same, but timeline_layers
/// is always generated from the keys of `layer_metadata`)
const LATEST_VERSION: usize = 3;
pub const FILE_NAME: &'static str = "index_part.json";
pub fn new(
@@ -166,7 +170,7 @@ mod tests {
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
timeline_layers: HashSet::new(),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
@@ -203,7 +207,7 @@ mod tests {
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
timeline_layers: HashSet::new(),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,
@@ -241,7 +245,7 @@ mod tests {
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 2,
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
timeline_layers: HashSet::new(),
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
file_size: 25600000,

View File

@@ -51,7 +51,6 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::OnceCell;
use tracing::*;
@@ -177,10 +176,6 @@ impl DeltaKey {
Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
}
fn extract_key_from_buf(buf: &[u8]) -> Key {
Key::from_slice(&buf[..KEY_SIZE])
}
fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
let mut lsn_buf = [0u8; 8];
lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
@@ -277,48 +272,42 @@ impl Layer for DeltaLayer {
tree_reader.dump().await?;
let cursor = file.block_cursor();
let keys = DeltaLayerInner::load_keys(&Ref(&**inner)).await?;
// A subroutine to dump a single blob
let dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
// TODO this is not ideal, but on the other hand we are in dumping code...
let buf = Handle::current().block_on(cursor.read_blob(blob_ref.pos()))?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
let dump_blob = |val: ValueRef<_>| -> _ {
async move {
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
format!(" img {} bytes", img.len())
}
Value::WalRecord(rec) => {
let wal_desc = walrecord::describe_wal_record(&rec)?;
format!(
" rec {} bytes will_init: {} {}",
buf.len(),
rec.will_init(),
wal_desc
)
}
};
Ok(desc)
}
};
tree_reader
.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|delta_key, val| {
let blob_ref = BlobRef(val);
let key = DeltaKey::extract_key_from_buf(delta_key);
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
let desc = match dump_blob(blob_ref) {
Ok(desc) => desc,
Err(err) => format!("ERROR: {}", err),
};
println!(" key {} at {}: {}", key, lsn, desc);
true
},
)
.await?;
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val).await {
Ok(desc) => desc,
Err(err) => {
let err: anyhow::Error = err;
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
}
Ok(())
}

View File

@@ -230,11 +230,11 @@ impl std::fmt::Display for InMemoryLayer {
impl InMemoryLayer {
///
/// Get layer size on the disk
/// Get layer size.
///
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.size)
Ok(inner.file.size())
}
///

View File

@@ -140,6 +140,12 @@ fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -1374,7 +1380,7 @@ impl Timeline {
timeline_id: TimelineId,
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
resources: TimelineResources,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
@@ -1409,7 +1415,7 @@ impl Timeline {
walredo_mgr,
walreceiver: Mutex::new(None),
remote_client: remote_client.map(Arc::new),
remote_client: resources.remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -1730,7 +1736,7 @@ impl Timeline {
let mut corrupted_local_layers = Vec::new();
let mut added_remote_layers = Vec::new();
for remote_layer_name in &index_part.timeline_layers {
for remote_layer_name in index_part.layer_metadata.keys() {
let local_layer = local_only_layers.remove(remote_layer_name);
let remote_layer_metadata = index_part
@@ -1890,7 +1896,7 @@ impl Timeline {
Some(index_part) => {
info!(
"initializing upload queue from remote index with {} layer files",
index_part.timeline_layers.len()
index_part.layer_metadata.len()
);
remote_client.init_upload_queue(index_part)?;
self.create_remote_layers(index_part, local_layers, disk_consistent_lsn)

View File

@@ -25,7 +25,7 @@ use crate::{
InitializationOrder,
};
use super::Timeline;
use super::{Timeline, TimelineResources};
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
@@ -416,7 +416,7 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
remote_client,
TimelineResources { remote_client },
init_order,
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.

View File

@@ -140,23 +140,12 @@ impl UploadQueue {
}
}
let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
for layer_name in &index_part.timeline_layers {
match index_part
.layer_metadata
.get(layer_name)
.map(LayerFileMetadata::from)
{
Some(layer_metadata) => {
files.insert(layer_name.to_owned(), layer_metadata);
}
None => {
anyhow::bail!(
"No remote layer metadata found for layer {}",
layer_name.file_name()
);
}
}
let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
for (layer_name, layer_metadata) in &index_part.layer_metadata {
files.insert(
layer_name.to_owned(),
LayerFileMetadata::from(layer_metadata),
);
}
let index_part_metadata = index_part.parse_metadata()?;

View File

@@ -312,7 +312,7 @@ impl<'a> WalIngest<'a> {
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
utils::failpoint_sleep_millis_async!("wal-ingest-logical-message-sleep");
crate::failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
}
}

View File

@@ -64,13 +64,13 @@ pub struct EndpointConnPool {
total_conns: usize,
}
/// This is cheap and not hugely secure.
/// But probably good enough for in memory only hashes.
/// 4096 is the number of rounds that SCRAM-SHA-256 recommends.
/// It's not the 600,000 that OWASP recommends... but our passwords are high entropy anyway.
///
/// Still takes 3.5ms to hash on my hardware.
/// Still takes 1.4ms to hash on my hardware.
/// We don't want to ruin the latency improvements of using the pool by making password verification take too long
const PARAMS: Params = Params {
rounds: 10_000,
rounds: 4096,
output_length: 32,
};
@@ -99,6 +99,10 @@ pub struct GlobalConnPool {
max_conns_per_endpoint: usize,
proxy_config: &'static crate::config::ProxyConfig,
// Using a lock to remove any race conditions.
// Eg cleaning up connections while a new connection is returned
closed: RwLock<bool>,
}
impl GlobalConnPool {
@@ -108,9 +112,24 @@ impl GlobalConnPool {
global_pool_size: AtomicUsize::new(0),
max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT,
proxy_config: config,
closed: RwLock::new(false),
})
}
pub fn shutdown(&self) {
*self.closed.write() = true;
self.global_pool.retain(|_, endpoint_pool| {
let mut pool = endpoint_pool.write();
// by clearing this hashmap, we remove the slots that a connection can be returned to.
// when returning, it drops the connection if the slot doesn't exist
pool.pools.clear();
pool.total_conns = 0;
false
});
}
pub async fn get(
&self,
conn_info: &ConnInfo,
@@ -208,7 +227,20 @@ impl GlobalConnPool {
new_client
}
pub async fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
pub fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
// We want to hold this open while we return. This ensures that the pool can't close
// while we are in the middle of returning the connection.
let closed = self.closed.read();
if *closed {
info!("pool: throwing away connection '{conn_info}' because pool is closed");
return Ok(());
}
if client.inner.is_closed() {
info!("pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
}
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
// return connection to the pool

View File

@@ -16,7 +16,6 @@ use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use tracing::Instrument;
use url::Url;
use super::conn_pool::ConnInfo;
@@ -286,13 +285,12 @@ pub async fn handle(
};
if allow_pool {
let current_span = tracing::Span::current();
// return connection to the pool
tokio::task::spawn(
async move {
let _ = conn_pool.put(&conn_info, client).await;
}
.in_current_span(),
);
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client);
});
}
result

View File

@@ -269,6 +269,18 @@ pub async fn task_main(
let conn_pool: Arc<GlobalConnPool> = GlobalConnPool::new(config);
// shutdown the connection pool
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
});
let tls_config = config.tls_config.as_ref().map(|cfg| cfg.to_server_config());
let tls_acceptor: tokio_rustls::TlsAcceptor = match tls_config {
Some(config) => config.into(),

View File

@@ -1,581 +0,0 @@
import argparse
import asyncio
import enum
import json
import os
import pprint
import tempfile
from asyncio import subprocess
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
"""
This is the automation tool that was mostly helpful during our big aws account migration,
but may be helpful in other day to day tasks and concentrate knowledge about operations
that can help during on-call.
This script executes commands on remote using ssh multiplexing. See referenes:
https://blog.scottlowe.org/2015/12/11/using-ssh-multiplexing/
https://github.com/openssh-rust/openssh/blob/master/src/builder.rs
https://github.com/openssh-rust/openssh/blob/master/src/process_impl/session.rs
https://en.wikibooks.org/wiki/OpenSSH/Cookbook/Multiplexing
https://docs.rs/openssh/0.9.8/openssh/
For use with teleport you'll need to setup nsh script mentioned here:
https://github.com/neondatabase/cloud/wiki/Cloud%3A-access#3-access-the-nodes-with-ssm
"""
def show_line(output_label: Optional[str], line: str):
if output_label is not None:
print(f"({output_label})", line, end="")
else:
print(" ", line, end="")
if not line:
print()
async def exec_checked(
program: str,
args: List[str],
err_msg: Optional[str] = None,
output_label: Optional[str] = None,
show_output: bool = True,
expected_exit_codes=frozenset((0,)),
) -> List[str]:
if show_output:
print("+", program, *args)
proc = await subprocess.create_subprocess_exec(
program,
*args,
stdout=asyncio.subprocess.PIPE,
limit=10 << 20,
)
assert proc.stdout is not None
out = []
line = (await proc.stdout.readline()).decode()
if show_output:
show_line(output_label, line)
out.append(line)
while line:
line = (await proc.stdout.readline()).decode()
# empty line means eof, actual empty line from the program is represented by "\n"
if not line:
continue
if show_output:
show_line(output_label, line)
out.append(line)
exit_code = await proc.wait()
assert exit_code in expected_exit_codes, err_msg or f"{program} failed with {exit_code}"
return out
class Connection:
def __init__(
self,
tempdir: tempfile.TemporaryDirectory, # type: ignore
target: str,
):
self.tempdir = tempdir
self.target = target
def get_args(self, extra_args: List[str]):
ctl_path = os.path.join(self.tempdir.name, "master")
return ["-S", ctl_path, "-o", "BatchMode=yes", *extra_args, "none"]
async def check(self):
args = self.get_args(["-O", "check"])
await exec_checked("ssh", args, err_msg="master check operation failed")
async def spawn(self, cmd: str):
# https://github.com/openssh-rust/openssh/blob/cd8f174fafc530d8e55c2aa63add14a24cb2b94c/src/process_impl/session.rs#L72
local_args = self.get_args(["-T", "-p", "9"])
local_args.extend(["--", f"bash -c '{cmd}'"])
return await exec_checked(
"ssh", local_args, err_msg="spawn failed", output_label=self.target
)
async def close(self):
args = self.get_args(["-O", "exit"])
await exec_checked("ssh", args, err_msg="master exit operation failed")
async def connect(target: str) -> Connection:
"""
target is directly passed to ssh command
"""
# NOTE: it is mentioned that this setup is not secure
# For better security it should be placed somewhere in ~/.ssh
# or in other directory with proper permissions
# openssh-rust does it the same way
# https://github.com/openssh-rust/openssh/blob/master/src/builder.rs
connection_dir = tempfile.TemporaryDirectory(suffix=".ssh-multiplexed")
# "-E logfile"
await exec_checked(
"ssh",
[
"-S",
os.path.join(connection_dir.name, "master"),
"-M", # Places the ssh client into “master” mode for connection sharing.
"-f", # Requests ssh to go to background just before command execution.
"-N", # Do not execute a remote command. This is useful for just forwarding ports.
"-o",
"BatchMode=yes",
target,
],
err_msg="starting master process failed",
)
return Connection(tempdir=connection_dir, target=target)
class Timer:
def __init__(self, msg: str) -> None:
self.t0 = datetime.now()
self.msg = msg
def __enter__(self):
return None
def __exit__(self, *_):
print(self.msg, datetime.now() - self.t0)
def parse_date(s: str) -> date:
return datetime.strptime(s, "%Y-%m-%d").date()
def write_line(f, line: str):
f.write(line)
f.write("\n")
async def pageserver_tenant_sizes(
pageserver_target: str, tenants_of_interest: Optional[List[str]] = None
) -> Dict[str, int]:
"""
With ondemand it should rather look at physical size api
For old projects since we dont have eviction yet,
we can look at local fs state.
"""
if tenants_of_interest is not None:
tenants_of_interest = set(tenants_of_interest) # type: ignore
ps_connection = await connect(pageserver_target)
out = await ps_connection.spawn("du -sb /storage/pageserver/data/tenants/* | sort -rh")
tenants = {}
for line in out:
if line.startswith("du: cannot read directory"):
continue
size, tenant_path = map(str.strip, line.split())
tenant = Path(tenant_path).stem
if tenants_of_interest is not None:
if tenant not in tenants_of_interest:
continue
tenants[tenant] = int(size)
return tenants
async def fetch_ps_size(args):
if args.input is not None:
tenants = Path(args.input).read_text().splitlines()
else:
tenants = None
sizes = await pageserver_tenant_sizes(args.target, tenants_of_interest=tenants)
total = 0
for tenant, size in sorted(sizes.items(), key=lambda x: x[1], reverse=True):
total += size
print(tenant, size)
print("total", total)
@enum.unique
class Env(enum.Enum):
STAGING = "staging"
PRODUCTION = "production"
class ConsoleAdminShortcuts:
def __init__(self, env: Env, verbose: bool = False):
if env is Env.STAGING:
self.admin_base_url = "https://console.neon.tech/api/v1"
self.management_base_url = "http://console-staging.local:3440/management/api/v2"
elif env is Env.PRODUCTION:
self.admin_base_url = "https://console.neon.tech"
self.management_base_url = "http://console-release.local:3441/management/api/v2"
self.api_token = os.getenv("CONSOLE_ADMIN_API_TOKEN")
assert self.api_token, '"CONSOLE_ADMIN_API_TOKEN" is missing in env'
self.verbose = verbose
async def check_availability(self, project_id: str):
url = f"{self.admin_base_url}/admin/projects/{project_id}/check_availability"
output = await exec_checked(
"curl",
[
"--silent",
"--fail",
"-XPOST",
url,
"-H",
f"Authorization: Bearer {self.api_token}",
"-H",
"Accept: application/json",
],
show_output=self.verbose,
)
assert len(output) == 1 # output should be one line of json
return json.loads(output.pop())
async def get_operation(self, operation_id: str):
url = f"{self.admin_base_url}/admin/operations/{operation_id}"
output = await exec_checked(
"curl",
[
"--silent",
"--fail",
url,
"-H",
f"Authorization: Bearer {self.api_token}",
"-H",
"Accept: application/json",
],
show_output=self.verbose,
)
assert len(output) == 1 # output should be one line of json
return json.loads(output.pop())
async def get_pageservers(self):
url = f"{self.admin_base_url}/admin/pageservers"
output = await exec_checked(
"curl",
[
"--silent",
"--fail",
url,
"-H",
f"Authorization: Bearer {self.api_token}",
"-H",
"Accept: application/json",
],
show_output=self.verbose,
)
assert len(output) == 1 # output should be one line of json
return json.loads(output.pop())
async def set_maintenance(self, project_id: str, maintenance: bool) -> Dict[str, Any]:
"""
Example response:
{
"project": {
"id": "tight-wood-864662",
"maintenance_set_at": "2023-01-31T13:36:45.90346Z"
},
"operations": [
{
"id": "216142e0-fbb7-4f41-a470-e63408d4d6b4"
}
]
}
"""
url = f"{self.management_base_url}/projects/{project_id}/maintenance"
data = json.dumps({"maintenance": maintenance})
if not self.verbose:
args = ["--silent"]
else:
args = []
args.extend(
[
"--fail",
"-XPUT",
url,
"-H",
f"Authorization: Bearer {self.api_token}",
"-H",
"Accept: application/json",
"-d",
data,
]
)
output = await exec_checked(
"curl",
[],
show_output=self.verbose,
)
assert len(output) == 1 # output should be one line of json
ret = json.loads(output.pop())
assert isinstance(ret, Dict)
return ret
async def fetch_branches(self, project_id: str):
url = f"{self.admin_base_url}/admin/branches?project_id={project_id}"
output = await exec_checked(
"curl",
[
"--silent",
"--fail",
url,
"-H",
f"Authorization: Bearer {self.api_token}",
"-H",
"Accept: application/json",
],
show_output=self.verbose,
)
assert len(output) == 1 # output should be one line of json
return json.loads(output.pop())
async def poll_pending_ops(console: ConsoleAdminShortcuts, pending_ops: Set[str]):
finished = set() # needed because sets cannot be changed during iteration
for pending_op in pending_ops:
data = await console.get_operation(pending_op)
operation = data["operation"]
status = operation["status"]
if status == "failed":
print(f"ERROR: operation {pending_op} failed")
continue
if operation["failures_count"] != 0:
print(f"WARN: operation {pending_op} has failures != 0")
continue
if status == "finished":
print(f"operation {pending_op} finished")
finished.add(pending_op)
else:
print(f"operation {pending_op} is still pending: {status}")
pending_ops.difference_update(finished)
async def check_availability(args):
console = ConsoleAdminShortcuts(env=Env(args.env))
max_concurrent_checks = args.max_concurrent_checks
# reverse to keep the order because we will be popping from the end
projects: List[str] = list(reversed(Path(args.input).read_text().splitlines()))
print("n_projects", len(projects))
pending_ops: Set[str] = set()
while projects:
# walk through pending ops
if pending_ops:
print("pending", len(pending_ops), pending_ops)
await poll_pending_ops(console, pending_ops)
# schedule new ops if limit allows
while len(pending_ops) < max_concurrent_checks and len(projects) > 0:
project = projects.pop()
print("starting:", project, len(projects))
# there can be many operations, one for each endpoint
data = await console.check_availability(project)
for operation in data["operations"]:
pending_ops.add(operation["ID"])
# wait a bit before starting next one
await asyncio.sleep(2)
if projects:
# sleep a little bit to give operations time to finish
await asyncio.sleep(5)
print("all scheduled, poll pending", len(pending_ops), pending_ops, projects)
while pending_ops:
await poll_pending_ops(console, pending_ops)
await asyncio.sleep(5)
async def maintain(args):
console = ConsoleAdminShortcuts(env=Env(args.env))
finish_flag = args.finish
projects: List[str] = Path(args.input).read_text().splitlines()
print("n_projects", len(projects))
pending_ops: Set[str] = set()
for project in projects:
data = await console.set_maintenance(project, maintenance=not finish_flag)
print(project, len(data["operations"]))
for operation in data["operations"]:
pending_ops.add(operation["id"])
if finish_flag:
assert len(pending_ops) == 0
return
print("all scheduled, poll pending", len(pending_ops), pending_ops)
while pending_ops:
await poll_pending_ops(console, pending_ops)
print("n pending ops:", len(pending_ops))
if pending_ops:
await asyncio.sleep(5)
SOURCE_BUCKET = "zenith-storage-oregon"
AWS_REGION = "us-west-2"
SAFEKEEPER_SOURCE_PREFIX_IN_BUCKET = "prod-1/wal"
async def fetch_sk_s3_size(args):
tenants: List[str] = Path(args.input).read_text().splitlines()
total_objects = 0
total_size = 0
for tenant in tenants:
wal_prefix = f"s3://{SOURCE_BUCKET}/{SAFEKEEPER_SOURCE_PREFIX_IN_BUCKET}/{tenant}"
result = await exec_checked(
"aws",
[
"--profile",
"neon_main",
"s3",
"ls",
"--recursive",
"--summarize",
wal_prefix,
],
expected_exit_codes={0, 1},
show_output=False,
)
objects = int(result[-2].rsplit(maxsplit=1).pop())
total_objects += objects
size = int(result[-1].rsplit(maxsplit=1).pop())
total_size += size
print(tenant, "objects", objects, "size", size)
print("total_objects", total_objects, "total_size", total_size)
async def fetch_branches(args):
console = ConsoleAdminShortcuts(env=Env(args.env))
project_id = args.project_id
pprint.pprint(await console.fetch_branches(project_id=project_id))
async def get_pageservers(args):
console = ConsoleAdminShortcuts(env=Env(args.env))
pprint.pprint(await console.get_pageservers())
async def main():
parser = argparse.ArgumentParser("migrator")
sub = parser.add_subparsers(title="commands", dest="subparser_name")
split_parser = sub.add_parser(
"split",
)
split_parser.add_argument(
"--input",
help="CSV file with results from snowflake query mentioned in README.",
required=True,
)
split_parser.add_argument(
"--out",
help="Directory to store groups of projects. Directory name is pageserver id.",
required=True,
)
split_parser.add_argument(
"--last-usage-cutoff",
dest="last_usage_cutoff",
help="Projects which do not have compute time starting from passed date (e g 2022-12-01) wil be considered not used recently",
required=True,
)
split_parser.add_argument(
"--select-pageserver-id",
help="Filter input for this pageserver id",
required=True,
)
fetch_ps_size_parser = sub.add_parser("fetch-ps-size")
fetch_ps_size_parser.add_argument(
"--target",
help="Target pageserver host as resolvable by ssh",
required=True,
)
fetch_ps_size_parser.add_argument(
"--input",
help="File containing list of tenants to include",
)
check_availability_parser = sub.add_parser("check-availability")
check_availability_parser.add_argument(
"--input",
help="File containing list of projects to run availability checks for",
)
check_availability_parser.add_argument(
"--env", choices=["staging", "production"], default="staging"
)
check_availability_parser.add_argument(
"--max-concurrent-checks",
help="Max number of simultaneously active availability checks",
type=int,
default=50,
)
maintain_parser = sub.add_parser("maintain")
maintain_parser.add_argument(
"--input",
help="File containing list of projects",
)
maintain_parser.add_argument("--env", choices=["staging", "production"], default="staging")
maintain_parser.add_argument(
"--finish",
action="store_true",
)
fetch_sk_s3_size_parser = sub.add_parser("fetch-sk-s3-size")
fetch_sk_s3_size_parser.add_argument(
"--input",
help="File containing list of tenants",
)
fetch_branches_parser = sub.add_parser("fetch-branches")
fetch_branches_parser.add_argument("--project-id")
fetch_branches_parser.add_argument(
"--env", choices=["staging", "production"], default="staging"
)
get_pageservers_parser = sub.add_parser("get-pageservers")
get_pageservers_parser.add_argument(
"--env", choices=["staging", "production"], default="staging"
)
args = parser.parse_args()
handlers = {
"fetch-ps-size": fetch_ps_size,
"check-availability": check_availability,
"maintain": maintain,
"fetch-sk-s3-size": fetch_sk_s3_size,
"fetch-branches": fetch_branches,
"get-pageservers": get_pageservers,
}
handler = handlers.get(args.subparser_name)
if handler:
await handler(args)
else:
parser.print_help()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -427,6 +427,7 @@ class NeonEnvBuilder:
default_branch_name: str = DEFAULT_BRANCH_NAME,
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
initial_timeline: Optional[TimelineId] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -452,6 +453,7 @@ class NeonEnvBuilder:
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -473,9 +475,10 @@ class NeonEnvBuilder:
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(
tenant_id=env.initial_tenant, conf=initial_tenant_conf
tenant_id=env.initial_tenant, conf=initial_tenant_conf, timeline_id=env.initial_timeline
)
env.initial_timeline = initial_timeline
assert env.initial_tenant == initial_tenant
assert env.initial_timeline == initial_timeline
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
return env
@@ -784,7 +787,7 @@ class NeonEnv:
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline: Optional[TimelineId] = None
self.initial_timeline = config.initial_timeline
# Create a config file corresponding to the options
toml = textwrap.dedent(

View File

@@ -315,4 +315,4 @@ MANY_SMALL_LAYERS_TENANT_CONFIG = {
def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 10
return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 15

View File

@@ -1,4 +1,3 @@
import shutil
import time
from dataclasses import dataclass
from typing import Dict, Tuple
@@ -14,7 +13,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
@@ -138,22 +137,14 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, f"{request.node.name}")
env = neon_env_builder.init_start()
# initial tenant will not be present on this pageserver
env = neon_env_builder.init_configs()
env.start()
pageserver_http = env.pageserver.http_client()
# allow because we are invoking this manually; we always warn on executing disk based eviction
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# remove the initial tenant
assert env.initial_timeline
pageserver_http.tenant_detach(env.initial_tenant)
assert isinstance(env.remote_storage, LocalFsStorage)
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)
assert tenant_remote_storage.is_dir()
shutil.rmtree(tenant_remote_storage)
env.initial_tenant = TenantId("0" * 32)
env.initial_timeline = None
# Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
# Large count of layers and small layer size is good for testing because it makes evictions predictable.
# Predictable in the sense that many layer evictions will be required to reach the eviction target, because

View File

@@ -11,8 +11,7 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import query_scalar
from fixtures.types import TimelineId
# Test configuration
#
@@ -71,13 +70,11 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_aggressive", "main")
timeline = env.neon_cli.create_branch("test_gc_aggressive", "main")
endpoint = env.endpoints.create_start("test_gc_aggressive")
log.info("postgres is running on test_gc_aggressive branch")
with endpoint.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# Create table, and insert the first 100 rows
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
@@ -109,7 +106,8 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
)
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_index_upload", "main")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_gc_index_upload", "main")
endpoint = env.endpoints.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
@@ -117,9 +115,6 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""

View File

@@ -12,13 +12,8 @@ from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# test anyway, so it doesn't need any special attention here.
@pytest.mark.timeout(600)
def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
tenant_id, _ = env.neon_cli.create_tenant(
conf={
env = neon_env_builder.init_start(
initial_tenant_conf={
"gc_period": "10 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
@@ -29,6 +24,11 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"image_creation_threshold": "2",
}
)
pageserver_http = env.pageserver.http_client()
# Use aggressive GC and checkpoint settings, so that we also exercise GC during the test
tenant_id = env.initial_tenant
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
@@ -39,5 +39,4 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
with pytest.raises(subprocess.SubprocessError):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
env.pageserver.stop()
env.pageserver.start()
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
env.pageserver.start(extra_env_vars={"FAILPOINTS": "after-timeline-gc-removed-layers=exit"})

View File

@@ -74,9 +74,9 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
# Check layer file sizes
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant_id, timeline_id)
timeline_path = "{}/tenants/{}/timelines/{}/".format(
env.repo_dir, env.initial_tenant, env.initial_timeline
)
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")

View File

@@ -8,7 +8,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn
from fixtures.utils import query_scalar
@@ -34,8 +34,8 @@ def test_basic_eviction(
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Create a number of layers in the tenant
with endpoint.cursor() as cur:

View File

@@ -18,8 +18,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import query_scalar
from fixtures.types import TenantId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -115,15 +114,13 @@ def test_metric_collection(
# Order of fixtures shutdown is not specified, and if http server gets down
# before pageserver, pageserver log might contain such errors in the end.
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
env.neon_cli.create_branch("test_metric_collection")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""

View File

@@ -78,8 +78,8 @@ def test_ondemand_download_large_rel(
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We want to make sure that the data is large enough that the keyspace is partitioned.
num_rows = 1000000
@@ -183,8 +183,8 @@ def test_ondemand_download_timetravel(
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
lsns = []
@@ -342,8 +342,8 @@ def test_download_remote_layers_api(
client = env.pageserver.http_client()
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
table_len = 10000
with endpoint.cursor() as cur:
@@ -516,7 +516,6 @@ def test_compaction_downloads_on_demand_without_image_creation(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
with env.endpoints.create_start("main") as endpoint:
# no particular reason to create the layers like this, but we are sure
@@ -590,7 +589,6 @@ def test_compaction_downloads_on_demand_with_image_creation(
env = neon_env_builder.init_start(initial_tenant_conf=stringify(conf))
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert timeline_id is not None
pageserver_http = env.pageserver.http_client()

View File

@@ -2,7 +2,7 @@ from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn
from fixtures.utils import query_scalar
@@ -12,24 +12,21 @@ from fixtures.utils import query_scalar
# Additionally, tests that pageserver is able to create tenants with custom configs.
def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
env = neon_env_builder.init_start(
initial_tenant_conf={
"trace_read_requests": "true",
}
)
timeline = env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
endpoint = env.endpoints.create_start("test_trace_replay", "main", tenant)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
pageserver_http = env.pageserver.http_client()
@@ -38,5 +35,5 @@ def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
# Stop postgres so we drop the connection and flush the traces
endpoint.stop()
trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline)
trace_path = env.repo_dir / "traces" / str(tenant_id) / str(timeline_id)
assert trace_path.exists()

View File

@@ -95,12 +95,12 @@ def test_remote_storage_backup_and_restore(
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Thats because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
f".*failed to fetch tenant deletion mark at tenants/{tenant_id}/deleted attempt 1.*"
)
checkpoint_numbers = range(1, 3)
@@ -403,8 +403,7 @@ def test_remote_timeline_client_calls_started_metric(
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
timeline_id = env.initial_timeline
client = env.pageserver.http_client()
@@ -542,8 +541,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
}
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
timeline_id = env.initial_timeline
timeline_path = env.timeline_dir(tenant_id, timeline_id)
@@ -808,8 +806,7 @@ def test_compaction_delete_before_upload(
)
tenant_id = env.initial_tenant
assert env.initial_timeline is not None
timeline_id: TimelineId = env.initial_timeline
timeline_id = env.initial_timeline
client = env.pageserver.http_client()

View File

@@ -48,6 +48,11 @@ def test_tenant_delete_smoke(
env = neon_env_builder.init_start()
# lucky race with stopping from flushing a layer we fail to schedule any uploads
env.pageserver.allowed_errors.append(
".*layer flush task.+: could not flush frozen layer: update_metadata_file"
)
ps_http = env.pageserver.http_client()
# first try to delete non existing tenant
@@ -287,9 +292,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
)
# TODO resume deletion (https://github.com/neondatabase/neon/issues/5006)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_deleted_tenant_ignored_on_attach(
def test_tenant_delete_is_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
@@ -331,6 +335,8 @@ def test_deleted_tenant_ignored_on_attach(
(
# allow errors caused by failpoints
f".*failpoint: {failpoint}",
# From deletion polling
f".*NotFound: tenant {env.initial_tenant}.*",
# It appears when we stopped flush loop during deletion (attempt) and then pageserver is stopped
".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
# error from http response is also logged
@@ -376,20 +382,17 @@ def test_deleted_tenant_ignored_on_attach(
env.pageserver.start()
# now we call attach
with pytest.raises(
PageserverApiException, match="Tenant is marked as deleted on remote storage"
):
ps_http.tenant_attach(tenant_id=tenant_id)
ps_http.tenant_attach(tenant_id=tenant_id)
# delete should be resumed (not yet)
# wait_tenant_status_404(ps_http, tenant_id, iterations)
# delete should be resumed
wait_tenant_status_404(ps_http, tenant_id, iterations)
# we shouldn've created tenant dir on disk
tenant_path = env.tenant_dir(tenant_id=tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(

View File

@@ -463,8 +463,8 @@ def test_detach_while_attaching(
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
@@ -615,8 +615,8 @@ def test_ignored_tenant_download_missing_layers(
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
@@ -679,10 +679,10 @@ def test_ignored_tenant_stays_broken_without_metadata(
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
env.endpoints.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
@@ -723,9 +723,9 @@ def test_load_attach_negatives(
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
env.endpoints.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
tenant_id = env.initial_tenant
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
@@ -773,8 +773,8 @@ def test_ignore_while_attaching(
pageserver_http = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.

View File

@@ -142,8 +142,8 @@ def test_tenants_attached_after_download(
client = env.pageserver.http_client()
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Thats because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
@@ -252,8 +252,8 @@ def test_tenant_redownloads_truncated_file_on_startup(
pageserver_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t1 AS VALUES (123, 'foobar');")

View File

@@ -10,7 +10,6 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import LayerMapInfo
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TimelineId
from pytest_httpserver import HTTPServer
# NB: basic config change tests are in test_tenant_conf.py
@@ -45,7 +44,6 @@ def test_threshold_based_eviction(
)
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
assert isinstance(timeline_id, TimelineId)
ps_http = env.pageserver.http_client()
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {

View File

@@ -17,6 +17,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
MANY_SMALL_LAYERS_TENANT_CONFIG,
assert_prefix_empty,
assert_prefix_not_empty,
poll_for_remote_storage_iterations,
@@ -34,7 +35,7 @@ from fixtures.remote_storage import (
available_s3_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
def test_timeline_delete(neon_simple_env: NeonEnv):
@@ -208,7 +209,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", endpoint.connstr()])
run_pg_bench_small(pg_bin, endpoint.connstr())
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline_id)
else:
@@ -358,8 +359,8 @@ def test_timeline_resurrection_on_attach(
ps_http = env.pageserver.http_client()
pg = env.endpoints.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
main_timeline_id = env.initial_timeline
with pg.cursor() as cur:
cur.execute("CREATE TABLE f (i integer);")
@@ -487,15 +488,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
try:
data = ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
log.debug(f"detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code != 404:
raise
else:
raise Exception("detail succeeded (it should return 404)")
wait_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id, iterations=4)
assert (
not leaf_timeline_path.exists()
@@ -519,8 +512,6 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
),
)
assert env.initial_timeline is not None
for timeline_id in (intermediate_timeline_id, env.initial_timeline):
timeline_delete_wait_completed(
ps_http, tenant_id=env.initial_tenant, timeline_id=timeline_id
@@ -723,13 +714,9 @@ def test_timeline_delete_works_for_remote_smoke(
ps_http = env.pageserver.http_client()
pg = env.endpoints.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
main_timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
assert tenant_id == env.initial_tenant
assert main_timeline_id == env.initial_timeline
assert env.initial_timeline is not None
timeline_ids = [env.initial_timeline]
for i in range(2):
branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
@@ -750,9 +737,8 @@ def test_timeline_delete_works_for_remote_smoke(
log.info("waiting for checkpoint upload")
wait_for_upload(ps_http, tenant_id, branch_timeline_id, current_lsn)
log.info("upload of checkpoint is done")
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
timeline_ids.append(timeline_id)
timeline_ids.append(branch_timeline_id)
for timeline_id in timeline_ids:
assert_prefix_not_empty(
@@ -812,7 +798,7 @@ def test_delete_orphaned_objects(
timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", endpoint.connstr()])
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
# write orphaned file that is missing from the index
@@ -848,3 +834,121 @@ def test_delete_orphaned_objects(
)
assert env.remote_storage.index_path(env.initial_tenant, timeline_id).exists()
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_timeline_delete_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_deleted_tenant_ignored_on_attach",
)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
tenant_id = env.initial_tenant
ps_http = env.pageserver.http_client()
timeline_id = env.neon_cli.create_timeline("delete")
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-during-rm"
ps_http.configure_failpoints((failpoint, "return"))
env.pageserver.allowed_errors.extend(
(
# allow errors caused by failpoints
f".*failpoint: {failpoint}",
# It appears when we stopped flush loop during deletion (attempt) and then pageserver is stopped
".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
# error from http response is also logged
".*InternalServerError\\(Tenant is marked as deleted on remote storage.*",
# Polling after attach may fail with this
f".*InternalServerError\\(Tenant {tenant_id} is not active.*",
'.*shutdown_pageserver{exit_code=0}: stopping left-over name="remote upload".*',
)
)
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
ps_http.timeline_delete(tenant_id, timeline_id)
timeline_info = wait_until_timeline_state(
pageserver_http=ps_http,
tenant_id=env.initial_tenant,
timeline_id=timeline_id,
expected_state="Broken",
iterations=iterations,
)
reason = timeline_info["state"]["Broken"]["reason"]
log.info(f"timeline broken: {reason}")
# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
# now we stop pageserver and remove local tenant state
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
env.pageserver.start()
# now we call attach
ps_http.tenant_attach(tenant_id=tenant_id)
# delete should be resumed
wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id, iterations=iterations)
tenant_path = env.timeline_dir(tenant_id=tenant_id, timeline_id=timeline_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(timeline_id),
"timelines",
str(timeline_id),
)
),
)

View File

@@ -270,7 +270,8 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_broker", "main")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_broker", "main")
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
@@ -280,10 +281,6 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
endpoint = env.endpoints.create_start("test_broker")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# wait until remote_consistent_lsn gets advanced on all safekeepers
clients = [sk.http_client() for sk in env.safekeepers]
stat_before = [cli.timeline_status(tenant_id, timeline_id) for cli in clients]
@@ -325,7 +322,8 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
env.neon_cli.create_branch("test_safekeepers_wal_removal")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_safekeepers_wal_removal")
endpoint = env.endpoints.create_start("test_safekeepers_wal_removal")
# Note: it is important to insert at least two segments, as currently
@@ -338,9 +336,6 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
]
)
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# force checkpoint to advance remote_consistent_lsn
pageserver_conn_options = {}
if auth_enabled:
@@ -451,13 +446,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_safekeepers_wal_backup")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_safekeepers_wal_backup")
endpoint = env.endpoints.create_start("test_safekeepers_wal_backup")
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("create table t(key int, value text)")
@@ -505,14 +497,11 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_s3_wal_replay")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_s3_wal_replay")
endpoint = env.endpoints.create_start("test_s3_wal_replay")
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
expected_sum = 0
with closing(endpoint.connect()) as conn:
@@ -796,15 +785,12 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_timeline_status")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_timeline_status")
endpoint = env.endpoints.create_start("test_timeline_status")
wa = env.safekeepers[0]
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
if not auth_enabled:
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
@@ -887,15 +873,12 @@ def test_start_replication_term(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_start_replication_term")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_start_replication_term")
endpoint = env.endpoints.create_start("test_start_replication_term")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
sk = env.safekeepers[0]
sk_http_cli = sk.http_client()
tli_status = sk_http_cli.timeline_status(tenant_id, timeline_id)
@@ -922,15 +905,12 @@ def test_sk_auth(neon_env_builder: NeonEnvBuilder):
neon_env_builder.auth_enabled = True
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_sk_auth")
endpoint = env.endpoints.create_start("test_sk_auth")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_sk_auth")
env.endpoints.create_start("test_sk_auth")
sk = env.safekeepers[0]
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
tenant_token = env.auth_keys.generate_tenant_token(tenant_id)
full_token = env.auth_keys.generate_safekeeper_token()
@@ -1185,7 +1165,8 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_replace_safekeeper")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_replace_safekeeper")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
@@ -1193,10 +1174,6 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
@@ -1448,7 +1425,8 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_pull_timeline")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pull_timeline")
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
@@ -1456,10 +1434,6 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
# learn neon timeline from compute
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)