Compare commits

..

2 Commits

Author SHA1 Message Date
MMeent
a24d0f6cd1 Update pagestore_smgr.c
Fix a potential self-race where WAL isn't flushed before the changes are requested from PS, resulting in stalls.
2023-09-27 17:47:35 +02:00
MMeent
84ea96a3a5 Use single WAL record for 0-extension of relations
Bulk materialization of empty pages at logging of new last relation block is handled in the Pageserver already, so no need for us to log those pages.
2023-09-27 15:15:41 +02:00
45 changed files with 885 additions and 1819 deletions

View File

@@ -834,7 +834,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.17.12
VM_BUILDER_VERSION: v0.17.11
steps:
- name: Checkout

View File

@@ -5,7 +5,7 @@
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/safekeepers
/libs/vm_monitor/ @neondatabase/autoscaling @neondatabase/compute
/pageserver/ @neondatabase/storage
/pageserver/ @neondatabase/compute @neondatabase/storage
/pgxn/ @neondatabase/compute
/proxy/ @neondatabase/proxy
/safekeeper/ @neondatabase/safekeepers

51
Cargo.lock generated
View File

@@ -158,17 +158,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.0"
@@ -1026,15 +1015,6 @@ dependencies = [
"zstd",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1455,12 +1435,6 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fail"
version = "0.5.1"
@@ -1806,9 +1780,18 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.3"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
@@ -2070,7 +2053,7 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"libc",
"windows-sys 0.48.0",
]
@@ -2087,7 +2070,7 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi",
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix 0.37.19",
"windows-sys 0.48.0",
@@ -2461,11 +2444,11 @@ dependencies = [
[[package]]
name = "num_cpus"
version = "1.16.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b"
dependencies = [
"hermit-abi",
"hermit-abi 0.2.6",
"libc",
]
@@ -2682,7 +2665,6 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-compression",
"async-stream",
"async-trait",
@@ -3264,7 +3246,6 @@ dependencies = [
"reqwest-tracing",
"routerify",
"rstest",
"rustc-hash",
"rustls",
"rustls-pemfile",
"scopeguard",

View File

@@ -107,7 +107,6 @@ reqwest-middleware = "0.2.0"
reqwest-retry = "0.2.2"
routerify = "3"
rpds = "0.13"
rustc-hash = "1.1.0"
rustls = "0.21"
rustls-pemfile = "1"
rustls-split = "0.3"

View File

@@ -614,11 +614,15 @@ RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgre
#########################################################################################
#
# Layer "rust extensions" for older extension which hasn't been updated to `pgrx` yet
# Layer "rust extensions"
# This layer is used to build `pgx` deps
#
# FIXME: This needs to be updated to latest version of 'pgrx' (it was renamed from
# 'pgx' to 'pgrx') for PostgreSQL 16. And that in turn requires bumping the pgx
# dependency on all the rust extension that depend on it, too.
#
#########################################################################################
FROM build-deps AS rust-extensions-build-pgx
FROM build-deps AS rust-extensions-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
@@ -650,34 +654,6 @@ RUN case "${PG_VERSION}" in \
USER root
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgrx` deps
#
#########################################################################################
FROM build-deps AS rust-extensions-build-pgrx
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
apt-get install -y curl libclang-dev cmake && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --locked --version 0.10.2 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "pg-jsonschema-pg-build"
@@ -685,7 +661,7 @@ USER root
#
#########################################################################################
FROM rust-extensions-build-pgx AS pg-jsonschema-pg-build
FROM rust-extensions-build AS pg-jsonschema-pg-build
ARG PG_VERSION
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
@@ -714,7 +690,7 @@ RUN case "${PG_VERSION}" in \
#
#########################################################################################
FROM rust-extensions-build-pgx AS pg-graphql-pg-build
FROM rust-extensions-build AS pg-graphql-pg-build
ARG PG_VERSION
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
@@ -748,14 +724,24 @@ RUN case "${PG_VERSION}" in \
#
#########################################################################################
FROM rust-extensions-build-pgrx AS pg-tiktoken-pg-build
FROM rust-extensions-build AS pg-tiktoken-pg-build
ARG PG_VERSION
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6878884c41a262318.tar.gz -O pg_tiktoken.tar.gz && \
echo "e64e55aaa38c259512d3e27c572da22c4637418cf124caba904cd50944e5004e pg_tiktoken.tar.gz" | sha256sum --check && \
# 801f84f08c6881c8aa30f405fafbf00eec386a72 made on 10/03/2023
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/kelvich/pg_tiktoken/archive/801f84f08c6881c8aa30f405fafbf00eec386a72.tar.gz -O pg_tiktoken.tar.gz && \
echo "52f60ac800993a49aa8c609961842b611b6b1949717b69ce2ec9117117e16e4a pg_tiktoken.tar.gz" | sha256sum --check && \
mkdir pg_tiktoken-src && cd pg_tiktoken-src && tar xvzf ../pg_tiktoken.tar.gz --strip-components=1 -C . && \
cargo pgrx install --release && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_tiktoken.control
#########################################################################################
@@ -765,18 +751,24 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6
#
#########################################################################################
FROM rust-extensions-build-pgrx AS pg-pgx-ulid-build
FROM rust-extensions-build AS pg-pgx-ulid-build
ARG PG_VERSION
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -O pgx_ulid.tar.gz && \
echo "ee5db82945d2d9f2d15597a80cf32de9dca67b897f605beb830561705f12683c pgx_ulid.tar.gz" | sha256sum --check && \
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz -O pgx_ulid.tar.gz && \
echo "908b7358e6f846e87db508ae5349fb56a88ee6305519074b12f3d5b0ff09f791 pgx_ulid.tar.gz" | sha256sum --check && \
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xvzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
echo "******************* Apply a patch for Postgres 16 support; delete in the next release ******************" && \
wget https://github.com/pksunkara/pgx_ulid/commit/f84954cf63fc8c80d964ac970d9eceed3c791196.patch && \
patch -p1 < f84954cf63fc8c80d964ac970d9eceed3c791196.patch && \
echo "********************************************************************************************************" && \
sed -i 's/pgrx = "=0.10.2"/pgrx = { version = "=0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
sed -i 's/pgx = "=0.7.3"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################

View File

@@ -23,7 +23,7 @@ vulnerability = "deny"
unmaintained = "warn"
yanked = "warn"
notice = "warn"
ignore = []
ignore = ["RUSTSEC-2023-0052"]
# This section is considered when running `cargo deny check licenses`
# More documentation for the licenses section can be found here:

View File

@@ -1,599 +0,0 @@
# Seamless tenant migration
- Author: john@neon.tech
- Created on 2023-08-11
- Implemented on ..
## Summary
The preceding [generation numbers RFC](025-generation-numbers.md) may be thought of as "making tenant
migration safe". Following that,
this RFC is about how those migrations are to be done:
1. Seamlessly (without interruption to client availability)
2. Quickly (enabling faster operations)
3. Efficiently (minimizing I/O and $ cost)
These points are in priority order: if we have to sacrifice
efficiency to make a migration seamless for clients, we will
do so, etc.
This is accomplished by introducing two high level changes:
- A dual-attached state for tenants, used in a control-plane-orchestrated
migration procedure that preserves availability during a migration.
- Warm secondary locations for tenants, where on-disk content is primed
for a fast migration of the tenant from its current attachment to this
secondary location.
## Motivation
Migrating tenants between pageservers is essential to operating a service
at scale, in several contexts:
1. Responding to a pageserver node failure by migrating tenants to other pageservers
2. Balancing load and capacity across pageservers, for example when a user expands their
database and they need to migrate to a pageserver with more capacity.
3. Restarting pageservers for upgrades and maintenance
The current situation steps for migration are:
- detach from old node; skip if old node is dead; (the [skip part is still WIP](https://github.com/neondatabase/cloud/issues/5426)).
- attach to new node
- re-configure endpoints to use the new node
Once [generation numbers](025-generation-numbers.md) are implemented,
the detach step is no longer critical for correctness. So, we can
- attach to a new node,
- re-configure endpoints to use the new node, and then
- detach from the old node.
However, this still does not meet our seamless/fast/efficient goals:
- Not fast: The new node will have to download potentially large amounts
of data from S3, which may take many minutes.
- Not seamless: If we attach to a new pageserver before detaching an old one,
the new one might delete some objects that interrupt availability of reads on the old one.
- Not efficient: the old pageserver will continue uploading
S3 content during the migration that will never be read.
The user expectations for availability are:
- For planned maintenance, there should be zero availability
gap. This expectation is fulfilled by this RFC.
- For unplanned changes (e.g. node failures), there should be
minimal availability gap. This RFC provides the _mechanism_
to fail over quickly, but does not provide the failure _detection_
nor failover _policy_.
## Non Goals
- Defining service tiers with different storage strategies: the same
level of HA & overhead will apply to all tenants. This doesn't rule out
adding such tiers in future.
- Enabling pageserver failover in the absence of a control plane: the control
plane will remain the source of truth for what should be attached where.
- Totally avoiding availability gaps on unplanned migrations during
a failure (we expect a small, bounded window of
read unavailability of very recent LSNs)
- Workload balancing: this RFC defines the mechanism for moving tenants
around, not the higher level logic for deciding who goes where.
- Defining all possible configuration flows for tenants: the migration process
defined in this RFC demonstrates the sufficiency of the pageserver API, but
is not the only kind of configuration change the control plane will ever do.
The APIs defined here should let the control plane move tenants around in
whatever way is needed while preserving data safety and read availability.
## Impacted components
Pageserver, control plane
## Terminology
- **Attachment**: a tenant is _attached_ to a pageserver if it has
been issued a generation number, and is running an instance of
the `Tenant` type, ingesting the WAL, and available to serve
page reads.
- **Location**: locations are a superset of attachments. A location
is a combination of a tenant and a pageserver. We may _attach_ at a _location_.
- **Secondary location**: a location which is not currently attached.
- **Warm secondary location**: a location which is not currently attached, but is endeavoring to maintain a warm local cache of layers. We avoid calling this a _warm standby_ to avoid confusion with similar postgres features.
## Implementation (high level)
### Warm secondary locations
To enable faster migrations, we will identify at least one _secondary location_
for each tenant. This secondary location will keep a warm cache of layers
for the tenant, so that if it is later attached, it can catch up with the
latest LSN quickly: rather than downloading everything, it only has to replay
the recent part of the WAL to advance from the remote_consistent_offset to the
most recent LSN in the WAL.
The control plane is responsible for selecting secondary locations, and
calling into pageservers to configure tenants into a secondary mode at this
new location, as well as attaching the tenant in its existing primary location.
The attached pageserver for a tenant will publish a [layer heatmap](#layer-heatmap)
to advise secondaries of which layers should be downloaded.
### Location modes
Currently, we consider a tenant to be in one of two states on a pageserver:
- Attached: active `Tenant` object, and layers on local disk
- Detached: no layers on local disk, no runtime state.
We will extend this with finer-grained modes, whose purpose will become
clear in later sections:
- **AttachedSingle**: equivalent the existing attached state.
- **AttachedMulti**: like AttachedSingle, holds an up to date generation, but
does not do deletions.
- **AttachedStale**: like AttachedSingle, holds a stale generation,
do not do any remote storage operations.
- **Secondary**: keep local state on disk, periodically update from S3.
- **Detached**: equivalent to existing detached state.
To control these finer grained states, a new pageserver API endpoint will be added.
### Cutover procedure
Define old location and new location as "Node A" and "Node B". Consider
the case where both nodes are available, and Node B was previously configured
as a secondary location for the tenant we are migrating.
The cutover procedure is orchestrated by the control plane, calling into
the pageservers' APIs:
1. Call to Node A requesting it to flush to S3 and enter AttachedStale state
2. Increment generation, and call to Node B requesting it to enter AttachedMulti
state with the new generation.
3. Call to Node B, requesting it to download the latest hot layers from remote storage,
according to the latest heatmap flushed by Node A.
4. Wait for Node B's WAL ingestion to catch up with node A's
5. Update endpoints to use node B instead of node A
6. Call to node B requesting it to enter state AttachedSingle.
7. Call to node A requesting it to enter state Secondary
The following table summarizes how the state of the system advances:
| Step | Node A | Node B | Node used by endpoints |
| :-----------: | :------------: | :------------: | :--------------------: |
| 1 (_initial_) | AttachedSingle | Secondary | A |
| 2 | AttachedStale | AttachedMulti | A |
| 3 | AttachedStale | AttachedMulti | A |
| 4 | AttachedStale | AttachedMulti | A |
| 5 (_cutover_) | AttachedStale | AttachedMulti | B |
| 6 | AttachedStale | AttachedSingle | B |
| 7 (_final_) | Secondary | AttachedSingle | B |
The procedure described for a clean handover from a live node to a secondary
is also used for failure cases and for migrations to a location that is not
configured as a secondary, by simply skipping irrelevant steps, as described in
the following sections.
#### Migration from an unresponsive node
If node A is unavailable, then all calls into
node A are skipped and we don't wait for B to catch up before
switching updating the endpoints to use B.
#### Migration to a location that is not a secondary
If node B is initially in Detached state, the procedure is identical. Since Node B
is coming from a Detached state rather than Secondary, the download of layers and
catch up with WAL will take much longer.
We might do this if:
- Attached and secondary locations are both critically low on disk, and we need
to migrate to a third node with more resources available.
- We are migrating a tenant which does not use secondary locations to save on cost.
#### Permanent migration away from a node
In the final step of the migration, we generally request the original node to enter a Secondary
state. This is typical if we are doing a planned migration during maintenance, or to
balance CPU/network load away from a node.
One might also want to permanently migrate away: this can be done by simply removing the secondary
location after the migration is complete, or as an optimization by substituting the Detached state
for the Secondary state in the final step.
#### Cutover diagram
```mermaid
sequenceDiagram
participant CP as Control plane
participant A as Node A
participant B as Node B
participant E as Endpoint
CP->>A: PUT Flush & go to AttachedStale
note right of A: A continues to ingest WAL
CP->>B: PUT AttachedMulti
CP->>B: PUT Download layers from latest heatmap
note right of B: B downloads from S3
loop Poll until download complete
CP->>B: GET download status
end
activate B
note right of B: B ingests WAL
loop Poll until catch up
CP->>B: GET visible WAL
CP->>A: GET visible WAL
end
deactivate B
CP->>E: Configure to use Node B
E->>B: Connect for reads
CP->>B: PUT AttachedSingle
CP->>A: PUT Secondary
```
#### Cutover from an unavailable pageserver
This case is far simpler: we may skip straight to our intended
end state.
```mermaid
sequenceDiagram
participant A as Node A
participant CP as Control plane
participant B as Node B
participant E as Endpoint
note right of A: Node A offline
activate A
CP->>B: PUT AttachedSingle
CP->>E: Configure to use Node B
E->>B: Connect for reads
deactivate A
```
## Implementation (detail)
### Purpose of AttachedMulti, AttachedStale
#### AttachedMulti
Ordinarily, an attached pageserver whose generation is the latest may delete
layers at will (e.g. during compaction). If a previous generation pageserver
is also still attached, and in use by endpoints, then this layer deletion could
lead to a loss of availability for the endpoint when reading from the previous
generation pageserver.
The _AttachedMulti_ state simply disables deletions. These will be enqueued
in `RemoteTimelineClient` until the control plane transitions the
node into AttachedSingle, which unblocks deletions. Other remote storage operations
such as uploads are not blocked.
AttachedMulti is not required for data safety, only to preserve availability
on pageservers running with stale generations.
A node enters AttachedMulti only when explicitly asked to by the control plane. It should
only remain in this state for the duration of a migration.
If a control plane bug leaves
the node in AttachedMulti for a long time, then we must avoid unbounded memory use from enqueued
deletions. This may be accomplished simply, by dropping enqueued deletions when some modest
threshold of delayed deletions (e.g. 10k layers per tenant) is reached. As with all deletions,
it is safe to skip them, and the leaked objects will be eventually cleaned up by scrub or
by timeline deletion.
During AttachedMulti, the Tenant is free to drop layers from local disk in response to
disk pressure: only the deletion of remote layers is blocked.
#### AttachedStale
Currently, a pageserver with a stale generation number will continue to
upload layers, but be prevented from completing deletions. This is safe, but inefficient: layers uploaded by this stale generation
will not be read back by future generations of pageservers.
The _AttachedStale_ state disables S3 uploads. The stale pageserver
will continue to ingest the WAL and write layers to local disk, but not to
do any uploads to S3.
A node may enter AttachedStale in two ways:
- Explicitly, when control plane calls into the node at the start of a migration.
- Implicitly, when the node tries to validate some deletions and discovers
that its generation is stale.
The AttachedStale state also disables sending consumption metrics from
that location: it is interpreted as an indication that some other pageserver
is already attached or is about to be attached, and that new pageserver will
be responsible for sending consumption metrics.
#### Disk Pressure & AttachedStale
Over long periods of time, a tenant location in AttachedStale will accumulate data
on local disk, as it cannot evict any layers written since it entered the
AttachStale state. We rely on the control plane to revert the location to
Secondary or Detached at the end of a migration.
This scenario is particularly noteworthy when evacuating all tenants on a pageserver:
since _all_ the attached tenants will go into AttachedStale, we will be doing no
uploads at all, therefore ingested data will cause disk usage to increase continuously.
Under nominal conditions, the available disk space on pageservers should be sufficient
to complete the evacuation before this becomes a problem, but we must also handle
the case where we hit a low disk situation while in this state.
The concept of disk pressure already exists in the pageserver: the `disk_usage_eviction_task`
touches each Tenant when it determines that a low-disk condition requires
some layer eviction. Having selected layers for eviction, the eviction
task calls `Timeline::evict_layers`.
**Safety**: If evict_layers is called while in AttachedStale state, and some of the to-be-evicted
layers are not yet uploaded to S3, then the block on uploads will be lifted. This
will result in leaking some objects once a migration is complete, but will enable
the node to manage its disk space properly: if a node is left with some tenants
in AttachedStale indefinitely due to a network partition or control plane bug,
these tenants will not cause a full disk condition.
### Warm secondary updates
#### Layer heatmap
The secondary location's job is to serve reads **with the same quality of service as the original location
was serving them around the time of a migration**. This does not mean the secondary
location needs the whole set of layers: inactive layers that might soon
be evicted on the attached pageserver need not be downloaded by the
secondary. A totally idle tenant only needs to maintain enough on-disk
state to enable a fast cold start (i.e. the most recent image layers are
typically sufficient).
To enable this, we introduce the concept of a _layer heatmap_, which
acts as an advisory input to secondary locations to decide which
layers to download from S3.
#### Attached pageserver
The attached pageserver, if in state AttachedSingle, periodically
uploads a serialized heat map to S3. It may skip this if there
is no change since the last time it uploaded (e.g. if the tenant
is totally idle).
Additionally, when the tenant is flushed to remote storage prior to a migration
(the first step in [cutover procedure](#cutover-procedure)),
the heatmap is written out. This enables a future attached pageserver
to get an up to date view when deciding which layers to download.
#### Secondary location behavior
Secondary warm locations run a simple loop, implemented separately from
the main `Tenant` type, which represents attached tenants:
- Download the layer heatmap
- Select any "hot enough" layers to download, if there is sufficient
free disk space.
- Download layers, if they were not previously evicted (see below)
- Download the latest index_part.json
- Check if any layers currently on disk are no longer referenced by
IndexPart & delete them
Note that the heatmap is only advisory: if a secondary location has plenty
of disk space, it may choose to retain layers that aren't referenced
by the heatmap, as long as they are still referenced by the IndexPart. Conversely,
if a node is very low on disk space, it might opt to raise the heat threshold required
to both downloading a layer, until more disk space is available.
#### Secondary locations & disk pressure
Secondary locations are subject to eviction on disk pressure, just as
attached locations are. For eviction purposes, the access time of a
layer in a secondary location will be the access time given in the heatmap,
rather than the literal time at which the local layer file was accessed.
The heatmap will indicate which layers are in local storage on the attached
location. The secondary will always attempt to get back to having that
set of layers on disk, but to avoid flapping, it will remember the access
time of the layer it was most recently asked to evict, and layers whose
access time is below that will not be re-downloaded.
The resulting behavior is that after a layer is evicted from a secondary
location, it is only re-downloaded once the attached pageserver accesses
the layer and uploads a heatmap reflecting that access time. On a pageserver
restart, the secondary location will attempt to download all layers in
the heatmap again, if they are not on local disk.
This behavior will be slightly different when secondary locations are
used for "low energy tenants", but that is beyond the scope of this RFC.
### Location configuration API
Currently, the `/tenant/<tenant_id>/config` API defines various
tunables like compaction settings, which apply to the tenant irrespective
of which pageserver it is running on.
A new "location config" structure will be introduced, which defines
configuration which is per-tenant, but local to a particular pageserver,
such as the attachment mode and whether it is a secondary.
The pageserver will expose a new per-tenant API for setting
the state: `/tenant/<tenant_id>/location/config`.
Body content:
```
{
state: 'enum{Detached, Secondary, AttachedSingle, AttachedMulti, AttachedStale}',
generation: Option<u32>,
configuration: `Option<TenantConfig>`
flush: bool
}
```
Existing `/attach` and `/detach` endpoint will have the same
behavior as calling `/location/config` with `AttachedSingle` and `Detached`
states respectively. These endpoints will be deprecated and later
removed.
The generation attribute is mandatory for entering `AttachedSingle` or
`AttachedMulti`.
The configuration attribute is mandatory when entering any state other
than `Detached`. This configuration is the same as the body for
the existing `/tenant/<tenant_id>/config` endpoint.
The `flush` argument indicates whether the pageservers should flush
to S3 before proceeding: this only has any effect if the node is
currently in AttachedSingle or AttachedMulti. This is used
during the first phase of migration, when transitioning the
old pageserver to AttachedSingle.
The `/re-attach` API response will be extended to include a `state` as
well as a `generation`, enabling the pageserver to enter the
correct state for each tenant on startup.
### Database schema for locations
A new table `ProjectLocation`:
- pageserver_id: int
- tenant_id: TenantId
- generation: Option<int>
- state: `enum(Secondary, AttachedSingle, AttachedMulti)`
Notes:
- It is legacy for a Project to have zero `ProjectLocation`s
- The `pageserver` column in `Project` now means "to which pageserver should
endpoints connect", rather than simply which pageserver is attached.
- The `generation` column in `Project` remains, and is incremented and used
to set the generation of `ProjectLocation` rows when they are set into
an attached state.
- The `Detached` state is implicitly represented as the absence of
a `ProjectLocation`.
### Executing migrations
Migrations will be implemented as Go functions, within the
existing `Operation` framework in the control plane. These
operations are persistent, such that they will always keep
trying until completion: this property is important to avoid
leaving garbage behind on pageservers, such as AttachedStale
locations.
### Recovery from failures during migration
During migration, the control plane may encounter failures of either
the original or new pageserver, or both:
- If the original fails, skip past waiting for the new pageserver
to catch up, and put it into AttachedSingle immediately.
- If the new node fails, put the old pageserver into Secondary
and then back into AttachedSingle (this has the effect of
retaining on-disk state and granting it a fresh generation number).
- If both nodes fail, keep trying until one of them is available
again.
### Control plane -> Pageserver reconciliation
A migration may be done while the old node is unavailable,
in which case the old node may still be running in an AttachedStale
state.
In this case, it is undesirable to have the migration `Operation`
stay alive until the old node eventually comes back online
and can be cleaned up. To handle this, the control plane
should run a background reconciliation process to compare
a pageserver's attachments with the database, and clean up
any that shouldn't be there any more.
Note that there will be no work to do if the old node was really
offline, as during startup it will call into `/re-attach` and
be updated that way. The reconciliation will only be needed
if the node was unavailable but still running.
## Alternatives considered
### Only enabling secondary locations for tenants on a higher service tier
This will make sense in future, especially for tiny databases that may be
downloaded from S3 in milliseconds when needed.
However, it is not wise to do it immediately, because pageservers contain
a mixture of higher and lower tier workloads. If we had 1 tenant with
a secondary location and 9 without, then those other 9 tenants will do
a lot of I/O as they try to recover from S3, which may degrade the
service of the tenant which had a secondary location.
Until we segregate tenant on different service tiers on different pageserver
nodes, or implement & test QoS to ensure that tenants with secondaries are
not harmed by tenants without, we should use the same failover approach
for all the tenants.
### Hot secondary locations (continuous WAL replay)
Instead of secondary locations populating their caches from S3, we could
have them consume the WAL from safekeepers. The downsides of this would be:
- Double load on safekeepers, which are a less scalable service than S3
- Secondary locations' on-disk state would end up subtly different to
the remote state, which would make synchronizing with S3 more complex/expensive
when going into attached state.
The downside of only updating secondary locations from S3 is that we will
have a delay during migration from replaying the LSN range between what's
in S3 and what's in the pageserver. This range will be very small on
planned migrations, as we have the old pageserver flush to S3 immediately
before attaching the new pageserver. On unplanned migrations (old pageserver
is unavailable), the range of LSNs to replay is bounded by the flush frequency
on the old pageserver. However, the migration doesn't have to wait for the
replay: it's just that not-yet-replayed LSNs will be unavailable for read
until the new pageserver catches up.
We expect that pageserver reads of the most recent LSNs will be relatively
rare, as for an active endpoint those pages will usually still be in the postgres
page cache: this leads us to prefer synchronizing from S3 on secondary
locations, rather than consuming the WAL from safekeepers.
### Cold secondary locations
It is not functionally necessary to keep warm caches on secondary locations at all. However, if we do not, then
we would experience a de-facto availability loss in unplanned migrations, as reads to the new node would take an extremely long time (many seconds, perhaps minutes).
Warm caches on secondary locations are necessary to meet
our availability goals.
### Pageserver-granularity failover
Instead of migrating tenants individually, we could have entire spare nodes,
and on a node death, move all its work to one of these spares.
This approach is avoided for several reasons:
- we would still need fine-grained tenant migration for other
purposes such as balancing load
- by sharing the spare capacity over many peers rather than one spare node,
these peers may use the capacity for other purposes, until it is needed
to handle migrated tenants. e.g. for keeping a deeper cache of their
attached tenants.
### Readonly during migration
We could simplify migrations by making both previous and new nodes go into a
readonly state, then flush remote content from the previous node, then activate
attachment on the secondary node.
The downside to this approach is a potentially large gap in readability of
recent LSNs while loading data onto the new node. To avoid this, it is worthwhile
to incur the extra cost of double-replaying the WAL onto old and new nodes' local
storage during a migration.
### Peer-to-peer pageserver communication
Rather than uploading the heatmap to S3, attached pageservers could make it
available to peers.
Currently, pageservers have no peer to peer communication, so adding this
for heatmaps would incur significant overhead in deployment and configuration
of the service, and ensuring that when a new pageserver is deployed, other
pageservers are updated to be aware of it.
As well as simplifying implementation, putting heatmaps in S3 will be useful
for future analytics purposes -- gathering aggregated statistics on activity
pattersn across many tenants may be done directly from data in S3.

View File

@@ -107,7 +107,7 @@ pub const CHUNK_SIZE: usize = 1000;
// Just a wrapper around a slice of events
// to serialize it as `{"events" : [ ] }
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(serde::Serialize)]
pub struct EventChunk<'a, T: Clone> {
pub events: std::borrow::Cow<'a, [T]>,
}

View File

@@ -431,14 +431,14 @@ impl CgroupWatcher {
.context("failed to request upscale")?;
let memory_high =
self.get_memory_high_bytes().context("failed to get memory.high")?;
self.get_high_bytes().context("failed to get memory.high")?;
let new_high = memory_high + self.config.memory_high_increase_by_bytes;
info!(
current_high_bytes = memory_high,
new_high_bytes = new_high,
"updating memory.high"
);
self.set_memory_high_bytes(new_high)
self.set_high_bytes(new_high)
.context("failed to set memory.high")?;
last_memory_high_increase_at = Some(Instant::now());
continue;
@@ -556,6 +556,14 @@ impl CgroupWatcher {
}
}
/// Represents a set of limits we apply to a cgroup to control memory usage.
///
/// Setting these values also affects the thresholds for receiving usage alerts.
#[derive(Debug)]
pub struct MemoryLimits {
pub high: u64,
}
// Methods for manipulating the actual cgroup
impl CgroupWatcher {
/// Get a handle on the freezer subsystem.
@@ -616,29 +624,50 @@ impl CgroupWatcher {
}
/// Set cgroup memory.high threshold.
pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
}
/// Set the cgroup's memory.high to 'max', disabling it.
pub fn unset_memory_high(&self) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Max)
}
fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
self.memory()
.context("failed to get memory subsystem")?
.set_mem(cgroups_rs::memory::SetMemory {
low: None,
high: Some(value),
high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
min: None,
max: None,
})
.map_err(anyhow::Error::from)
.context("failed to set memory.high")
}
/// Set cgroup memory.high and memory.max.
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
info!(limits.high, path = self.path(), "writing new memory limits",);
self.memory()
.context("failed to get memory subsystem while setting memory limits")?
.set_mem(cgroups_rs::memory::SetMemory {
min: None,
low: None,
high: Some(MaxValue::Value(
u64::min(limits.high, i64::MAX as u64) as i64
)),
max: None,
})
.context("failed to set memory limits")
}
/// Given some amount of available memory, set the desired cgroup memory limits
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
let new_high = self.config.calculate_memory_high_value(available_memory);
let limits = MemoryLimits { high: new_high };
info!(
path = self.path(),
memory = ?limits,
"setting cgroup memory",
);
self.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
Ok(())
}
/// Get memory.high threshold.
pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> {
pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
let high = self
.memory()
.context("failed to get memory subsystem while getting memory statistics")?

View File

@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::cgroup::{CgroupWatcher, Sequenced};
use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
@@ -106,51 +106,6 @@ impl Runner {
kill,
};
// If we have both the cgroup and file cache integrations enabled, it's possible for
// temporary failures to result in cgroup throttling (from memory.high), that in turn makes
// it near-impossible to connect to the file cache (because it times out). Unfortunately,
// we *do* still want to determine the file cache size before setting the cgroup's
// memory.high, so it's not as simple as just swapping the order.
//
// Instead, the resolution here is that on vm-monitor startup (note: happens on each
// connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
// temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
// of a hacky solution, but helps with reliability.
if let Some(name) = &args.cgroup {
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
// now, and then set limits later.
info!("initializing cgroup");
let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
.context("failed to create cgroup manager")?;
info!("temporarily unsetting memory.high");
// Temporarily un-set cgroup memory.high; see above.
cgroup
.unset_memory_high()
.context("failed to unset memory.high")?;
let cgroup = Arc::new(cgroup);
let cgroup_clone = Arc::clone(&cgroup);
spawn_with_cancel(
token.clone(),
|_| error!("cgroup watcher terminated"),
async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
);
state.cgroup = Some(cgroup);
} else {
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
// This allows us to poll it in `Monitor::run` regardless of whether we
// are managing a cgroup or not. If we don't forget it, all receives will
// immediately return an error because the sender is droped and it will
// claim all select! statements, effectively turning `Monitor::run` into
// `loop { fail to receive }`.
mem::forget(requesting_send);
}
let mut file_cache_reserved_bytes = 0;
let mem = get_total_system_memory();
@@ -164,7 +119,7 @@ impl Runner {
false => FileCacheConfig::default_in_memory(),
};
let mut file_cache = FileCacheState::new(connstr, config, token)
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await
.context("failed to create file cache")?;
@@ -197,15 +152,35 @@ impl Runner {
state.filecache = Some(file_cache);
}
if let Some(cgroup) = &state.cgroup {
let available = mem - file_cache_reserved_bytes;
let value = cgroup.config.calculate_memory_high_value(available);
if let Some(name) = &args.cgroup {
let (mut cgroup, cgroup_event_stream) =
CgroupWatcher::new(name.clone(), requesting_send)
.context("failed to create cgroup manager")?;
info!(value, "setting memory.high");
let available = mem - file_cache_reserved_bytes;
cgroup
.set_memory_high_bytes(value)
.context("failed to set cgroup memory.high")?;
.set_memory_limits(available)
.context("failed to set cgroup memory limits")?;
let cgroup = Arc::new(cgroup);
// Some might call this . . . cgroup v2
let cgroup_clone = Arc::clone(&cgroup);
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
cgroup_clone.watch(notified_recv, cgroup_event_stream).await
});
state.cgroup = Some(cgroup);
} else {
// *NOTE*: We need to forget the sender so that its drop impl does not get ran.
// This allows us to poll it in `Monitor::run` regardless of whether we
// are managing a cgroup or not. If we don't forget it, all receives will
// immediately return an error because the sender is droped and it will
// claim all select! statements, effectively turning `Monitor::run` into
// `loop { fail to receive }`.
mem::forget(requesting_send);
}
Ok(state)
@@ -282,11 +257,14 @@ impl Runner {
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
}
// new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
// since it is properly initialized in the previous cgroup if let block
let limits = MemoryLimits {
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
// since it is properly initialized in the previous cgroup if let block
high: new_cgroup_mem_high,
};
cgroup
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
let message = format!(
"set cgroup memory.high to {} MiB, of new max {} MiB",
@@ -349,9 +327,12 @@ impl Runner {
name = cgroup.path(),
"updating cgroup memory.high",
);
let limits = MemoryLimits {
high: new_cgroup_mem_high,
};
cgroup
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
.set_limits(&limits)
.context("failed to set file cache size")?;
}
Ok(())

View File

@@ -81,7 +81,6 @@ enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
async-channel = "1.9.0"
[dev-dependencies]
criterion.workspace = true

View File

@@ -605,31 +605,6 @@ fn start_pageserver(
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::BackgroundRuntimeTurnaroundMeasure,
None,
None,
"background runtime turnaround measure",
true,
async move {
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
let server = server
.serve(hyper::service::make_service_fn(|_| async move {
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
move |_: hyper::Request<hyper::Body>| async move {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
hyper::Body::from(format!("alive")),
))
},
))
}))
.with_graceful_shutdown(task_mgr::shutdown_watcher());
server.await?;
Ok(())
},
);
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.
@@ -669,7 +644,7 @@ fn create_remote_storage_client(
let config = if let Some(config) = &conf.remote_storage_config {
config
} else {
tracing::warn!("no remote storage configured, this is a deprecated configuration");
// No remote storage configured.
return Ok(None);
};

View File

@@ -264,46 +264,6 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
"Time spent acquiring a pinned slot in the page cache",
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
});
pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
)
.expect("failed to define a metric")
});
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"page_cache_errors_total",
"Number of timeouts while acquiring a pinned slot in the page cache",
&["error_kind"]
)
.expect("failed to define a metric")
});
#[derive(IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
PAGE_CACHE_ERRORS
.get_metric_with_label_values(&[error_kind.into()])
.unwrap()
.inc();
}
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wait_lsn_seconds",

View File

@@ -75,12 +75,7 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
task::Poll,
time::Duration,
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
};
use anyhow::Context;
@@ -170,8 +165,6 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
}
@@ -214,22 +207,6 @@ impl Slot {
}
}
impl SlotInner {
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
}
}
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
@@ -247,42 +224,30 @@ pub struct PageCache {
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
pinned_slots: Arc<tokio::sync::Semaphore>,
/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
find_victim_sender:
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
find_victim_waiters:
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
size_metrics: &'static PageCacheSizeMetrics,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
_permit: Arc<PinnedSlotsPermit>,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.slot_guard.buf
self.0.buf
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
self.0.buf
}
}
@@ -297,23 +262,16 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
state: PageWriteGuardState<'i>,
}
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
self.inner.buf
}
}
@@ -321,37 +279,25 @@ impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
self.inner.buf
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => todo!(),
}
self.inner.buf
}
}
impl<'a> PageWriteGuard<'a> {
impl PageWriteGuard<'_> {
/// Mark that the buffer contents are now valid.
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
}
}
@@ -362,13 +308,11 @@ impl Drop for PageWriteGuard<'_> {
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
}
}
}
@@ -381,7 +325,7 @@ pub enum ReadBufResult<'a> {
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageReadGuard<'a>),
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
@@ -404,10 +348,6 @@ impl PageCache {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
@@ -422,10 +362,7 @@ impl PageCache {
lsn,
};
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
@@ -455,7 +392,7 @@ impl PageCache {
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&'static self,
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
@@ -472,15 +409,15 @@ impl PageCache {
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(read_guard) => {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*read_guard == img);
assert!(*write_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
write_guard.mark_valid();
}
}
@@ -490,7 +427,7 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&'static self,
&self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
@@ -508,16 +445,6 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
let _timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
Ok(PinnedSlotsPermit(
Arc::clone(&self.pinned_slots)
.acquire_owned()
.await
.unwrap(),
))
}
/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
@@ -527,11 +454,7 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
@@ -541,10 +464,7 @@ impl PageCache {
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
return Some(PageReadGuard(inner));
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
@@ -583,12 +503,10 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&'static self,
&self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -605,21 +523,17 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
debug_assert!(permit.is_some());
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
@@ -641,41 +555,27 @@ impl PageCache {
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
inner,
valid: false,
}));
}
}
// FIXME: the name is wrong.
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.read().await;
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
return Some(PageWriteGuard { inner, valid: true });
}
}
None
@@ -685,21 +585,16 @@ impl PageCache {
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&'static self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
let (slot_idx, mut inner) =
self.find_victim().context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
@@ -721,19 +616,9 @@ impl PageCache {
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
inner,
valid: false,
}));
}
}
@@ -884,21 +769,8 @@ impl PageCache {
/// Find a slot to evict.
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
// Get in line.
let mut receiver = self.find_victim_waiters.recv();
// If we get cancelled at the receiver.await below, the victim slot
// remains in the channel. Consume these first before going into
// the loop below.
match futures::poll!(&mut receiver) {
Poll::Ready(Ok(res)) => return Ok(res),
Poll::Ready(Err(_closed)) => unreachable!("we never close the channel"),
Poll::Pending => {} // the regular case where we aren't cancelled below
};
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let mut iters = 0;
loop {
iters += 1;
@@ -910,8 +782,14 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in
// theory, if you have more threads holding buffers locked than
// there are buffers in the pool. In practice, with a reasonably
// large buffer pool it really shouldn't happen.
if iters > iter_limit {
anyhow::bail!("exceeded evict iter limit");
}
continue;
}
@@ -921,11 +799,7 @@ impl PageCache {
self.remove_mapping(old_key);
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
self.find_victim_sender
.try_send((slot_idx, inner))
.expect("we always get in line first");
return Ok(receiver.await.unwrap());
return Ok((slot_idx, inner));
}
}
}
@@ -952,26 +826,18 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf,
permit: std::sync::Mutex::new(Weak::new()),
}),
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})
.collect();
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_sender,
find_victim_waiters,
}
}
}

View File

@@ -293,8 +293,6 @@ pub enum TaskKind {
DebugTool,
BackgroundRuntimeTurnaroundMeasure,
#[cfg(test)]
UnitTest,
}

View File

@@ -186,22 +186,27 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
return Ok(write_guard.mark_valid().into());
}
};
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
}
}

View File

@@ -70,34 +70,38 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -167,7 +171,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
let _ = write_guard.mark_valid();
write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

View File

@@ -130,15 +130,10 @@ pub async fn init_tenant_mgr(
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(result.clone())
.await?;
}
resources
.deletion_queue_client
.recover(result.clone())
.await?;
Some(result)
} else {

View File

@@ -864,11 +864,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
}
}

View File

@@ -457,11 +457,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
}
}

View File

@@ -655,38 +655,38 @@ impl Timeline {
) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
// once_cell::sync::Lazy::new(|| {
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
// let permits = usize::max(
// 1,
// // while a lot of the work is done on spawn_blocking, we still do
// // repartitioning in the async context. this should give leave us some workers
// // unblocked to be blocked on other work, hopefully easing any outside visible
// // effects of restarts.
// //
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
// // spawn_blocking.
// (total_threads * 3).checked_div(4).unwrap_or(0),
// );
// assert_ne!(permits, 0, "we will not be adding in permits later");
// assert!(
// permits < total_threads,
// "need threads avail for shorter work"
// );
// tokio::sync::Semaphore::new(permits)
// });
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
// // this wait probably never needs any "long time spent" logging, because we already nag if
// // compaction task goes over it's period (20s) which is quite often in production.
// let _permit = tokio::select! {
// permit = CONCURRENT_COMPACTIONS.acquire() => {
// permit
// },
// _ = cancel.cancelled() => {
// return Ok(());
// }
// };
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production.
let _permit = tokio::select! {
permit = CONCURRENT_COMPACTIONS.acquire() => {
permit
},
_ = cancel.cancelled() => {
return Ok(());
}
};
let last_record_lsn = self.get_last_record_lsn();

View File

@@ -18,8 +18,7 @@ use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use std::sync::{RwLock, RwLockWriteGuard};
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -111,7 +110,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -143,7 +142,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().await;
slot_guard = slot.inner.write().unwrap();
index = next;
break;
}
@@ -154,7 +153,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -209,29 +208,6 @@ impl CrashsafeOverwriteError {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
@@ -268,9 +244,11 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -353,24 +331,22 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
let open_files = get_open_files();
let mut handle_guard = {
@@ -380,23 +356,27 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().await;
let mut handle = *self.handle.read().unwrap();
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().await;
let handle_guard = self.handle.write().unwrap();
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -410,10 +390,17 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
let (handle, mut slot_guard) = open_files.find_victim_slot();
// Open the physical file
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -421,9 +408,7 @@ impl VirtualFile {
*handle_guard = handle;
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
Ok(result)
}
pub fn remove(self) {
@@ -438,9 +423,11 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -528,9 +515,9 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -540,9 +527,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -552,18 +539,6 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -596,39 +571,20 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut();
let handle = self.handle.get_mut().unwrap();
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
}
}
}

View File

@@ -741,13 +741,6 @@ NeonProcessUtility(
break;
case T_DropdbStmt:
HandleDropDb(castNode(DropdbStmt, parseTree));
/*
* We do this here to hack around the fact that Postgres performs the drop
* INSIDE of standard_ProcessUtility, which means that if we try to
* abort the drop normally it'll be too late. DROP DATABASE can't be inside
* of a transaction block anyway, so this should be fine to do.
*/
NeonXactCallback(XACT_EVENT_PRE_COMMIT, NULL);
break;
case T_CreateRoleStmt:
HandleCreateRole(castNode(CreateRoleStmt, parseTree));

View File

@@ -14,6 +14,7 @@
*/
#include <sys/file.h>
#include <sys/statvfs.h>
#include <unistd.h>
#include <fcntl.h>
@@ -37,6 +38,9 @@
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "storage/procsignal.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -62,6 +66,9 @@
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
typedef struct FileCacheEntry
{
BufferTag key;
@@ -84,12 +91,14 @@ static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_free_space_watermark;
static char* lfc_path;
static FileCacheControl* lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
void FileCacheMonitorMain(Datum main_arg);
@@ -245,6 +254,80 @@ lfc_change_limit_hook(int newval, void *extra)
LWLockRelease(lfc_lock);
}
/*
* Local file system state monitor check available free space.
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
* but throwing away least recently accessed chunks.
* First time low space watermark is reached cache size is divided by two,
* second time by four,... Finally we remove all chunks from local cache.
*
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
* We only throw away cached chunks but do not prevent from filling cache by new chunks.
*
* Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
* Calling statvfs each second should not add any noticeable overhead.
*/
void
FileCacheMonitorMain(Datum main_arg)
{
/*
* Choose file system state monitor interval so that space can not be exosted
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
*/
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();
/* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending)
{
if (lfc_size_limit != 0)
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
{
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
}
else
{
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
{
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
}
}
pg_usleep(monitor_interval);
}
}
static void
lfc_register_free_space_monitor(void)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
void
lfc_init(void)
{
@@ -281,6 +364,19 @@ lfc_init(void)
lfc_change_limit_hook,
NULL);
DefineCustomIntVariable("neon.free_space_watermark",
"Minimal free space in local file system after reaching which local file cache will be truncated",
NULL,
&lfc_free_space_watermark,
1024, /* 1GB */
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
@@ -295,6 +391,9 @@ lfc_init(void)
if (lfc_max_size == 0)
return;
if (lfc_free_space_watermark != 0)
lfc_register_free_space_monitor();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000

View File

@@ -1790,39 +1790,23 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
if (!XLogInsertAllowed())
return;
/* ensure we have enough xlog buffers to log max-sized records */
XLogEnsureRecordSpace(Min(remblocks, (XLR_MAX_BLOCK_ID - 1)), 0);
/*
* Iterate over all the pages. They are collected into batches of
* XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
* batch.
* Pageserver auto-extends relations with 0s, so WAL-logging only the last
* page is enough here (else we'd log each page 2 times with 0-bytes.
*/
while (remblocks > 0)
{
int count = Min(remblocks, XLR_MAX_BLOCK_ID);
XLogBeginInsert();
for (int i = 0; i < count; i++)
XLogRegisterBlock(i, &InfoFromSMgrRel(reln), forkNum, blocknum + i,
(char *) buffer.data, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
lsn = XLogInsert(RM_XLOG_ID, XLOG_FPI);
for (int i = 0; i < count; i++)
{
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
SetLastWrittenLSNForBlock(lsn, InfoFromSMgrRel(reln), forkNum,
blocknum + i);
}
blocknum += count;
remblocks -= count;
}
XLogBeginInsert();
XLogRegisterBlock(0, &InfoFromSMgrRel(reln), forkNum, blocknum + nblocks, (char *) &buffer.data, REGBUF_FORCE_IMAGE);
lsn = XLogInsert(RM_XLOG_ID, XLOG_FPI);
Assert(lsn != 0);
for (uint32 i = blocknum; i < blocknum + nblocks; i++)
{
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
SetLastWrittenLSNForBlock(lsn, InfoFromSMgrRel(reln), forkNum,
blocknum + i);
}
SetLastWrittenLSNForRelation(lsn, InfoFromSMgrRel(reln), forkNum);
set_cached_relsize(InfoFromSMgrRel(reln), forkNum, blocknum);
}
@@ -1985,6 +1969,14 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsn);
/*
* It is possible that previous modifications in this backend haven't been
* flushed yet (e.g. in neon_zeroextend), so we do so right now to make sure
* future reads don't have to wait forever.
*/
if (!RecoveryInProgress())
XLogFlush(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/

View File

@@ -42,7 +42,6 @@ reqwest-middleware.workspace = true
reqwest-retry.workspace = true
reqwest-tracing.workspace = true
routerify.workspace = true
rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
scopeguard.workspace = true

View File

@@ -160,19 +160,6 @@ impl BackendType<'_, ClientCredentials<'_>> {
Test(_) => Some("test".to_owned()),
}
}
/// Get username from the credentials.
pub fn get_user(&self) -> &str {
use BackendType::*;
match self {
Console(_, creds) => creds.user,
Postgres(_, creds) => creds.user,
Link(_) => "link",
Test(_) => "test",
}
}
/// Authenticate the client via the requested backend, possibly using credentials.
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(

View File

@@ -17,12 +17,11 @@ use std::{
use tokio::time;
use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
};
use crate::{auth, console};
use crate::{compute, config};
use super::sql_over_http::MAX_RESPONSE_SIZE;
use crate::proxy::ConnectMechanism;
use tracing::{error, warn};
@@ -401,6 +400,7 @@ async fn connect_to_compute_once(
.user(&conn_info.username)
.password(&conn_info.password)
.dbname(&conn_info.dbname)
.max_backend_message_size(MAX_RESPONSE_SIZE)
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
@@ -412,10 +412,6 @@ async fn connect_to_compute_once(
span.in_scope(|| {
info!(%conn_info, %session, "new connection");
});
let ids = Ids {
endpoint_id: node_info.aux.endpoint_id.to_string(),
branch_id: node_info.aux.branch_id.to_string(),
};
tokio::spawn(
poll_fn(move |cx| {
@@ -454,18 +450,10 @@ async fn connect_to_compute_once(
Ok(Client {
inner: client,
session: tx,
ids,
})
}
pub struct Client {
pub inner: tokio_postgres::Client,
session: tokio::sync::watch::Sender<uuid::Uuid>,
ids: Ids,
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.ids.clone())
}
}

View File

@@ -3,12 +3,10 @@ use std::sync::Arc;
use anyhow::bail;
use futures::pin_mut;
use futures::StreamExt;
use hashbrown::HashMap;
use hyper::body::HttpBody;
use hyper::header;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde_json::json;
use serde_json::Map;
@@ -18,11 +16,7 @@ use tokio_postgres::types::Type;
use tokio_postgres::GenericClient;
use tokio_postgres::IsolationLevel;
use tokio_postgres::Row;
use tracing::error;
use tracing::instrument;
use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -45,6 +39,7 @@ enum Payload {
Batch(BatchQueryData),
}
pub const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
@@ -187,45 +182,7 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> Result<Response<Body>, ApiError> {
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let mut response = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
};
response.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
Ok(response)
}
#[instrument(name = "sql-over-http", skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
) -> anyhow::Result<(Value, HashMap<HeaderName, HeaderValue>)> {
//
// Determine the destination and connection params
//
@@ -276,18 +233,13 @@ async fn handle_inner(
let mut client = conn_pool.get(&conn_info, !allow_pool, session_id).await?;
let mut response = Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json");
//
// Now execute the query and return the result
//
let mut size = 0;
let result = match payload {
Payload::Single(query) => {
query_to_json(&client.inner, query, &mut size, raw_output, array_mode).await
}
Payload::Single(query) => query_to_json(&client.inner, query, raw_output, array_mode)
.await
.map(|x| (x, HashMap::default())),
Payload::Batch(batch_query) => {
let mut results = Vec::new();
let mut builder = client.inner.build_transaction();
@@ -302,8 +254,7 @@ async fn handle_inner(
}
let transaction = builder.start().await?;
for query in batch_query.queries {
let result =
query_to_json(&transaction, query, &mut size, raw_output, array_mode).await;
let result = query_to_json(&transaction, query, raw_output, array_mode).await;
match result {
Ok(r) => results.push(r),
Err(e) => {
@@ -313,27 +264,26 @@ async fn handle_inner(
}
}
transaction.commit().await?;
let mut headers = HashMap::default();
if txn_read_only {
response = response.header(
headers.insert(
TXN_READ_ONLY.clone(),
HeaderValue::try_from(txn_read_only.to_string())?,
);
}
if txn_deferrable {
response = response.header(
headers.insert(
TXN_DEFERRABLE.clone(),
HeaderValue::try_from(txn_deferrable.to_string())?,
);
}
if let Some(txn_isolation_level) = txn_isolation_level_raw {
response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
headers.insert(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
}
Ok(json!({ "results": results }))
Ok((json!({ "results": results }), headers))
}
};
let metrics = client.metrics();
if allow_pool {
let current_span = tracing::Span::current();
// return connection to the pool
@@ -343,30 +293,12 @@ async fn handle_inner(
});
}
match result {
Ok(value) => {
// how could this possibly fail
let body = serde_json::to_string(&value).expect("json serialization should not fail");
let len = body.len();
let response = response
.body(Body::from(body))
// only fails if invalid status code or invalid header/values are given.
// these are not user configurable so it cannot fail dynamically
.expect("building response payload should not fail");
// count the egress bytes - we miss the TLS and header overhead but oh well...
// moving this later in the stack is going to be a lot of effort and ehhhh
metrics.record_egress(len as u64);
Ok(response)
}
Err(e) => Err(e),
}
result
}
async fn query_to_json<T: GenericClient>(
client: &T,
data: QueryData,
current_size: &mut usize,
raw_output: bool,
array_mode: bool,
) -> anyhow::Result<Value> {
@@ -380,10 +312,16 @@ async fn query_to_json<T: GenericClient>(
// big.
pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
let mut current_size = 0;
while let Some(row) = row_stream.next().await {
let row = row?;
*current_size += row.body_len();
current_size += row.body_len();
rows.push(row);
if current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!(
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
));
}
}
// grab the command tag and number of rows affected

View File

@@ -7,6 +7,7 @@ use crate::{
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
use hashbrown::HashMap;
use hyper::{
server::{
accept,
@@ -17,6 +18,7 @@ use hyper::{
};
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
use pin_project_lite::pin_project;
use serde_json::{json, Value};
use std::{
convert::Infallible,
@@ -202,7 +204,44 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
let result = sql_over_http::handle(request, sni_hostname, conn_pool, session_id)
.instrument(info_span!("sql-over-http"))
.await;
let status_code = match result {
Ok(_) => StatusCode::OK,
Err(_) => StatusCode::BAD_REQUEST,
};
let (json, headers) = match result {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
(
json!({ "message": message, "code": code }),
HashMap::default(),
)
}
};
json_response(status_code, json).map(|mut r| {
r.headers_mut().insert(
"Access-Control-Allow-Origin",
hyper::http::HeaderValue::from_static("*"),
);
for (k, v) in headers {
r.headers_mut().insert(k, v);
}
r
})
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -214,7 +253,7 @@ async fn ws_handler(
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
.body(Body::empty())
.map_err(|e| ApiError::InternalServerError(e.into()))
.map_err(|e| ApiError::BadRequest(e.into()))
} else {
json_response(StatusCode::BAD_REQUEST, "query is not supported")
}

View File

@@ -3,18 +3,9 @@
use crate::{config::MetricCollectionConfig, http};
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::{
convert::Infallible,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tracing::{error, info, instrument, trace};
use serde::Serialize;
use std::{collections::HashMap, convert::Infallible, time::Duration};
use tracing::{error, info, instrument, trace, warn};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
@@ -27,95 +18,12 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
/// Both the proxy and the ingestion endpoint will live in the same region (or cell)
/// so while the project-id is unique across regions the whole pipeline will work correctly
/// because we enrich the event with project_id in the control-plane endpoint.
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
#[derive(Eq, Hash, PartialEq, Serialize, Debug, Clone)]
pub struct Ids {
pub endpoint_id: String,
pub branch_id: String,
}
#[derive(Debug)]
pub struct MetricCounter {
transmitted: AtomicU64,
opened_connections: AtomicUsize,
}
impl MetricCounter {
/// Record that some bytes were sent from the proxy to the client
pub fn record_egress(&self, bytes: u64) {
self.transmitted.fetch_add(bytes, Ordering::AcqRel);
}
/// extract the value that should be reported
fn should_report(self: &Arc<Self>) -> Option<u64> {
// heuristic to see if the branch is still open
// if a clone happens while we are observing, the heuristic will be incorrect.
//
// Worst case is that we won't report an event for this endpoint.
// However, for the strong count to be 1 it must have occured that at one instant
// all the endpoints were closed, so missing a report because the endpoints are closed is valid.
let is_open = Arc::strong_count(self) > 1;
let opened = self.opened_connections.swap(0, Ordering::AcqRel);
// update cached metrics eagerly, even if they can't get sent
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
let value = self.transmitted.swap(0, Ordering::AcqRel);
// Our only requirement is that we report in every interval if there was an open connection
// if there were no opened connections since, then we don't need to report
if value == 0 && !is_open && opened == 0 {
None
} else {
Some(value)
}
}
/// Determine whether the counter should be cleared from the global map.
fn should_clear(self: &mut Arc<Self>) -> bool {
// we can't clear this entry if it's acquired elsewhere
let Some(counter) = Arc::get_mut(self) else {
return false;
};
let opened = *counter.opened_connections.get_mut();
let value = *counter.transmitted.get_mut();
// clear if there's no data to report
value == 0 && opened == 0
}
}
// endpoint and branch IDs are not user generated so we don't run the risk of hash-dos
type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
#[derive(Default)]
pub struct Metrics {
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
}
impl Metrics {
/// Register a new byte metrics counter for this endpoint
pub fn register(&self, ids: Ids) -> Arc<MetricCounter> {
let entry = if let Some(entry) = self.endpoints.get(&ids) {
entry.clone()
} else {
self.endpoints
.entry(ids)
.or_insert_with(|| {
Arc::new(MetricCounter {
transmitted: AtomicU64::new(0),
opened_connections: AtomicUsize::new(0),
})
})
.clone()
};
entry.opened_connections.fetch_add(1, Ordering::AcqRel);
entry
}
}
pub static USAGE_METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infallible> {
info!("metrics collector config: {config:?}");
scopeguard::defer! {
@@ -123,83 +31,145 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
}
let http_client = http::new_client_with_timeout(DEFAULT_HTTP_REPORTING_TIMEOUT);
let mut cached_metrics: HashMap<Ids, (u64, DateTime<Utc>)> = HashMap::new();
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();
let mut prev = Utc::now();
let mut ticker = tokio::time::interval(config.interval);
loop {
ticker.tick().await;
let now = Utc::now();
collect_metrics_iteration(
&USAGE_METRICS,
let res = collect_metrics_iteration(
&http_client,
&mut cached_metrics,
&config.endpoint,
&hostname,
prev,
now,
)
.await;
prev = now;
match res {
Err(e) => error!("failed to send consumption metrics: {e} "),
Ok(_) => trace!("periodic metrics collection completed successfully"),
}
}
}
fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let mut current_metrics: Vec<(Ids, (u64, DateTime<Utc>))> = Vec::new();
let metrics = prometheus::default_registry().gather();
for m in metrics {
if m.get_name() == "proxy_io_bytes_per_client" {
for ms in m.get_metric() {
let direction = ms
.get_label()
.iter()
.find(|l| l.get_name() == "direction")
.unwrap()
.get_value();
// Only collect metric for outbound traffic
if direction == "tx" {
let endpoint_id = ms
.get_label()
.iter()
.find(|l| l.get_name() == "endpoint_id")
.unwrap()
.get_value();
let branch_id = ms
.get_label()
.iter()
.find(|l| l.get_name() == "branch_id")
.unwrap()
.get_value();
let value = ms.get_counter().get_value() as u64;
// Report if the metric value is suspiciously large
if value > (1u64 << 40) {
warn!(
"potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
}
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
branch_id: branch_id.to_string(),
},
(value, Utc::now()),
));
}
}
}
}
current_metrics
}
#[instrument(skip_all)]
async fn collect_metrics_iteration(
metrics: &Metrics,
client: &http::ClientWithMiddleware,
cached_metrics: &mut HashMap<Ids, (u64, DateTime<Utc>)>,
metric_collection_endpoint: &reqwest::Url,
hostname: &str,
prev: DateTime<Utc>,
now: DateTime<Utc>,
) {
) -> anyhow::Result<()> {
info!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
let mut metrics_to_clear = Vec::new();
let current_metrics = gather_proxy_io_bytes_per_client();
let metrics_to_send: Vec<(Ids, u64)> = metrics
.endpoints
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
.iter()
.filter_map(|counter| {
let key = counter.key().clone();
let Some(value) = counter.should_report() else {
metrics_to_clear.push(key);
return None;
.filter_map(|(curr_key, (curr_val, curr_time))| {
let mut start_time = *curr_time;
let mut value = *curr_val;
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
// Only send metrics updates if the metric has increased
if curr_val > prev_val {
value = curr_val - prev_val;
start_time = *prev_time;
} else {
if curr_val < prev_val {
error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
prev_val, curr_val, curr_key);
}
return None;
}
};
Some((key, value))
Some(Event {
kind: EventType::Incremental {
start_time,
stop_time: *curr_time,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value,
extra: Ids {
endpoint_id: curr_key.endpoint_id.clone(),
branch_id: curr_key.branch_id.clone(),
},
})
})
.collect();
if metrics_to_send.is_empty() {
trace!("no new metrics to send");
return Ok(());
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
for chunk in metrics_to_send.chunks(CHUNK_SIZE) {
let events = chunk
.iter()
.map(|(ids, value)| Event {
kind: EventType::Incremental {
start_time: prev,
stop_time: now,
},
metric: PROXY_IO_BYTES_PER_CLIENT,
idempotency_key: idempotency_key(hostname),
value: *value,
extra: Ids {
endpoint_id: ids.endpoint_id.clone(),
branch_id: ids.branch_id.clone(),
},
})
.collect();
let res = client
.post(metric_collection_endpoint.clone())
.json(&EventChunk { events })
.json(&EventChunk {
events: chunk.into(),
})
.send()
.await;
@@ -213,113 +183,34 @@ async fn collect_metrics_iteration(
if !res.status().is_success() {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk.iter().filter(|(_, value)| *value > (1u64 << 40)) {
for metric in chunk.iter().filter(|metric| metric.value > (1u64 << 40)) {
// Report if the metric value is suspiciously large
error!("potentially abnormal metric value: {:?}", metric);
}
}
}
// update cached metrics after they were sent
// (to avoid sending the same metrics twice)
// see the relevant discussion on why to do so even if the status is not success:
// https://github.com/neondatabase/neon/pull/4563#discussion_r1246710956
for send_metric in chunk {
let stop_time = match send_metric.kind {
EventType::Incremental { stop_time, .. } => stop_time,
_ => unreachable!(),
};
for metric in metrics_to_clear {
match metrics.endpoints.entry(metric) {
Entry::Occupied(mut counter) => {
if counter.get_mut().should_clear() {
counter.remove_entry();
}
}
Entry::Vacant(_) => {}
cached_metrics
.entry(Ids {
endpoint_id: send_metric.extra.endpoint_id.clone(),
branch_id: send_metric.extra.branch_id.clone(),
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 = e.0.saturating_add(send_metric.value);
e.1 = stop_time
})
// cache new metric
.or_insert((send_metric.value, stop_time));
}
}
}
#[cfg(test)]
mod tests {
use std::{
net::TcpListener,
sync::{Arc, Mutex},
};
use anyhow::Error;
use chrono::Utc;
use consumption_metrics::{Event, EventChunk};
use hyper::{
service::{make_service_fn, service_fn},
Body, Response,
};
use url::Url;
use super::{collect_metrics_iteration, Ids, Metrics};
use crate::http;
#[tokio::test]
async fn metrics() {
let listener = TcpListener::bind("0.0.0.0:0").unwrap();
let reports = Arc::new(Mutex::new(vec![]));
let reports2 = reports.clone();
let server = hyper::server::Server::from_tcp(listener)
.unwrap()
.serve(make_service_fn(move |_| {
let reports = reports.clone();
async move {
Ok::<_, Error>(service_fn(move |req| {
let reports = reports.clone();
async move {
let bytes = hyper::body::to_bytes(req.into_body()).await?;
let events: EventChunk<'static, Event<Ids, String>> =
serde_json::from_slice(&bytes)?;
reports.lock().unwrap().push(events);
Ok::<_, Error>(Response::new(Body::from(vec![])))
}
}))
}
}));
let addr = server.local_addr();
tokio::spawn(server);
let metrics = Metrics::default();
let client = http::new_client();
let endpoint = Url::parse(&format!("http://{addr}")).unwrap();
let now = Utc::now();
// no counters have been registered
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert!(r.is_empty());
// register a new counter
let counter = metrics.register(Ids {
endpoint_id: "e1".to_string(),
branch_id: "b1".to_string(),
});
// the counter should be observed despite 0 egress
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 0);
// record egress
counter.record_egress(1);
// egress should be observered
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert_eq!(r.len(), 1);
assert_eq!(r[0].events.len(), 1);
assert_eq!(r[0].events[0].value, 1);
// release counter
drop(counter);
// we do not observe the counter
collect_metrics_iteration(&metrics, &client, &endpoint, "foo", now, now).await;
let r = std::mem::take(&mut *reports2.lock().unwrap());
assert!(r.is_empty());
// counter is unregistered
assert!(metrics.endpoints.is_empty());
}
Ok(())
}

View File

@@ -7,7 +7,6 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
};
@@ -603,11 +602,6 @@ pub async fn proxy_pass(
compute: impl AsyncRead + AsyncWrite + Unpin,
aux: &MetricsAuxInfo,
) -> anyhow::Result<()> {
let usage = USAGE_METRICS.register(Ids {
endpoint_id: aux.endpoint_id.to_string(),
branch_id: aux.branch_id.to_string(),
});
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&aux.traffic_labels("tx"));
let mut client = MeasuredStream::new(
client,
@@ -615,7 +609,6 @@ pub async fn proxy_pass(
|cnt| {
// Number of bytes we sent to the client (outbound).
m_sent.inc_by(cnt as u64);
usage.record_egress(cnt as u64);
},
);
@@ -697,14 +690,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.await
{
Ok(auth_result) => auth_result,
Err(e) => {
let user = creds.get_user();
let db = params.get("database");
let app = params.get("application_name");
let params_span = tracing::info_span!("", ?user, ?db, ?app);
return stream.throw_error(e).instrument(params_span).await;
}
Err(e) => return stream.throw_error(e).await,
};
let AuthSuccess {

View File

@@ -105,8 +105,6 @@ class NeonCompare(PgCompare):
self._pg_bin = pg_bin
self.pageserver_http_client = self.env.pageserver.http_client()
# note that neon_simple_env now uses LOCAL_FS remote storage
# Create tenant
tenant_conf: Dict[str, str] = {}
if False: # TODO add pytest setting for this

View File

@@ -460,11 +460,9 @@ class NeonEnvBuilder:
), "Unexpectedly instantiated from outside a test function"
self.test_name = test_name
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
if default_remote_storage_if_missing and self.pageserver_remote_storage is None:
self.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
self.env = NeonEnv(self)
return self.env
@@ -472,19 +470,8 @@ class NeonEnvBuilder:
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, str]] = None,
default_remote_storage_if_missing: bool = True,
) -> NeonEnv:
"""
Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
To avoid creating initial_tenant, call init_configs to setup the environment.
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
"""
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
def init_start(self, initial_tenant_conf: Optional[Dict[str, str]] = None) -> NeonEnv:
env = self.init_configs()
self.start()
# Prepare the default branch to start the postgres on later.
@@ -559,7 +546,7 @@ class NeonEnvBuilder:
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> RemoteStorage:
) -> Optional[RemoteStorage]:
ret = kind.configure(
self.repo_dir,
self.mock_s3_server,
@@ -902,8 +889,6 @@ def _shared_simple_env(
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `neon_simple_env`.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
if os.environ.get("TEST_SHARED_FIXTURES") is None:

View File

@@ -202,6 +202,9 @@ class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
# Pass to tests that are generic to remote storage
# to ensure the test pass with or without the remote storage
NOOP = "noop"
def configure(
self,
@@ -212,7 +215,10 @@ class RemoteStorageKind(str, enum.Enum):
user: RemoteStorageUser,
bucket_name: Optional[str] = None,
bucket_region: Optional[str] = None,
) -> RemoteStorage:
) -> Optional[RemoteStorage]:
if self == RemoteStorageKind.NOOP:
return None
if self == RemoteStorageKind.LOCAL_FS:
return LocalFsStorage(LocalFsStorage.component_path(repo_dir, user))

View File

@@ -1,52 +0,0 @@
import queue
import threading
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.types import TenantId
"""
553 sudo mkfs.ext4 /dev/nvme1n1
555 mkdir test_output
556 sudo mount /dev/nvme1n1 test_output
557 htop
559 ./scripts/pysync
560 NEON_BIN=/home/admin/neon/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
561 sudo chown -R admin:admin test_output
cargo build_testing --release
562 NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
cd test_output/test_pageserver_startup_many_tenants/repo
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local start
# watch initial load complete, then background jobs start. That's the interesting part.
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local stop
# usually pageserver won't be responsive, kill with
sudo pkill -9 pageserver
"""
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
# below doesn't work because summaries contain tenant and timeline ids and we check for them
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
pshttp = env.pageserver.http_client()
ep = env.endpoints.create_start("main")
ep.safe_psql("create table foo(b text)")
for i in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
pshttp.timeline_checkpoint(tenant_id, timeline_id)
ep.stop_and_destroy()
env.pageserver.stop()
for sk in env.safekeepers:
sk.stop()
tenant_dir = env.repo_dir / "pageserver_1" / "tenants" / str(env.initial_tenant)
for i in range(0, 20_000):
import shutil
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))

View File

@@ -4,12 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.types import TenantId, TimelineId
@@ -31,18 +26,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
for _ in range(3):
for _ in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
# Stop the pageserver -- this has to be not immediate or we need to wait for uploads
# Stop the pageserver
env.pageserver.stop()
# Leave the first timeline alone, but corrupt the others in different ways
@@ -51,21 +45,30 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
with open(metadata_path, "w") as f:
f.write("overwritten with garbage!")
f = open(metadata_path, "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(f"{timeline_path}/{filename}")
log.info(
f"Timeline {tenant2}/{timeline2} got its layer files removed (no remote storage enabled)"
)
(tenant3, timeline3, pg3) = tenant_timelines[3]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant3}/timelines/{timeline3}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Corrupt it
p = f"{timeline_path}/{filename}"
size = os.path.getsize(p)
with open(p, "wb") as f:
f.truncate(0)
f.truncate(size)
log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled")
f = open(f"{timeline_path}/{filename}", "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant3}/{timeline3} got its layer files spoiled")
env.pageserver.start()
@@ -84,13 +87,22 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
)
# Second timeline will fail during basebackup, because the local layer file is corrupt.
# Second timeline has no ancestors, only the metadata file and no layer files locally,
# and we don't have the remote storage enabled. It is loaded into memory, but getting
# the basebackup from it will fail.
with pytest.raises(
Exception, match=f"Tenant {tenant2} will not become active. Current state: Broken"
) as err:
pg2.start()
log.info(f"As expected, compute startup failed for timeline with missing layers: {err}")
# Third timeline will also fail during basebackup, because the layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
with pytest.raises(Exception, match="Failed to load delta layer") as err:
pg2.start()
pg3.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
f"As expected, compute startup failed for timeline {tenant3}/{timeline3} with corrupt layers: {err}"
)

View File

@@ -211,12 +211,4 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
ddl.wait()
ddl.failures(False)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
with pytest.raises(psycopg2.InternalError):
ddl.failures(True)
cur.execute("DROP DATABASE failure")
ddl.wait()
ddl.pg.connect(dbname="failure") # Ensure we can connect after a failed drop
conn.close()

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -16,13 +17,18 @@ from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# TODO: collect all of the env setup *AFTER* removal of RemoteStorageKind.NOOP
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
@@ -52,7 +58,7 @@ def test_metric_collection(
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
@@ -103,14 +109,17 @@ def test_metric_collection(
total += sample[2]
return int(total)
# upload some data to remote storage
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = 0
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.

View File

@@ -30,7 +30,9 @@ from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
)
def test_tenant_delete_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
@@ -142,12 +144,18 @@ FAILPOINTS_BEFORE_BACKGROUND = [
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in FAILPOINTS:
if remote_storage_kind is RemoteStorageKind.NOOP and delete_failpoint in (
"timeline-delete-before-index-delete",
):
# the above failpoint are not relevant for config without remote storage
continue
# Simulate failures for only one type of remote storage
# to avoid log pollution and make tests run faster
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
@@ -207,18 +215,21 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
else:
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
ps_http.configure_failpoints((failpoint, "return"))
@@ -249,7 +260,12 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
env.pageserver.stop()
env.pageserver.start()
if failpoint in (
if (
remote_storage_kind is RemoteStorageKind.NOOP
and failpoint == "tenant-delete-before-create-local-mark"
):
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
elif failpoint in (
"tenant-delete-before-shutdown",
"tenant-delete-before-create-remote-mark",
):

View File

@@ -519,8 +519,11 @@ def test_detach_while_attaching(
# * restart the pageserver and verify that ignored tenant is still not loaded
# * `load` the same tenant
# * ensure that it's status is `Active` and it's present in pageserver's memory with all timelines
def test_ignored_tenant_reattach(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3])
def test_ignored_tenant_reattach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -15,7 +15,7 @@ from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_until_tenant_active,
)
from fixtures.pg_version import PgVersion
from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@@ -532,24 +532,7 @@ def test_single_branch_get_tenant_size_grows(
assert size_after == prev, "size after restarting pageserver should not have changed"
def assert_size_approx_equal(size_a, size_b):
"""
Tests that evaluate sizes are checking the pageserver space consumption
that sits many layers below the user input. The exact space needed
varies slightly depending on postgres behavior.
Rather than expecting postgres to be determinstic and occasionally
failing the test, we permit sizes for the same data to vary by a few pages.
"""
# Determined empirically from examples of equality failures: they differ
# by page multiples of 8272, and usually by 1-3 pages. Tolerate 4 to avoid
# failing on outliers from that observed range.
threshold = 4 * 8272
assert size_a == pytest.approx(size_b, abs=threshold)
@xfail_on_postgres(PgVersion.V15, reason="Test significantly more flaky on Postgres 15")
def test_get_tenant_size_with_multiple_branches(
neon_env_builder: NeonEnvBuilder, test_output_dir: Path
):
@@ -590,7 +573,7 @@ def test_get_tenant_size_with_multiple_branches(
)
size_after_first_branch = http_client.tenant_size(tenant_id)
assert_size_approx_equal(size_after_first_branch, size_at_branch)
assert size_after_first_branch == size_at_branch
first_branch_endpoint = env.endpoints.create_start("first-branch", tenant_id=tenant_id)
@@ -616,7 +599,7 @@ def test_get_tenant_size_with_multiple_branches(
"second-branch", main_branch_name, tenant_id
)
size_after_second_branch = http_client.tenant_size(tenant_id)
assert_size_approx_equal(size_after_second_branch, size_after_continuing_on_main)
assert size_after_second_branch == size_after_continuing_on_main
second_branch_endpoint = env.endpoints.create_start("second-branch", tenant_id=tenant_id)
@@ -652,7 +635,7 @@ def test_get_tenant_size_with_multiple_branches(
# tenant_size but so far this has been reliable, even though at least gc
# and tenant_size race for the same locks
size_after = http_client.tenant_size(tenant_id)
assert_size_approx_equal(size_after, size_after_thinning_branch)
assert size_after == size_after_thinning_branch
size_debug_file_before = open(test_output_dir / "size_debug_before.html", "w")
size_debug = http_client.tenant_size_debug(tenant_id)

View File

@@ -12,6 +12,7 @@ from fixtures.log_helper import log
from fixtures.metrics import (
PAGESERVER_GLOBAL_METRICS,
PAGESERVER_PER_TENANT_METRICS,
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
parse_metrics,
)
from fixtures.neon_fixtures import (
@@ -231,10 +232,17 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
assert value
def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"remote_storage_kind",
# exercise both the code paths where remote_storage=None and remote_storage=Some(...)
[RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3],
)
def test_pageserver_metrics_removed_after_detach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
"""Tests that when a tenant is detached, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_safekeepers = 3
@@ -274,6 +282,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
for tenant in [tenant_1, tenant_2]:
pre_detach_samples = set([x.name for x in get_ps_metric_samples_for_tenant(tenant)])
expected = set(PAGESERVER_PER_TENANT_METRICS)
if remote_storage_kind == RemoteStorageKind.NOOP:
# if there's no remote storage configured, we don't expose the remote timeline client metrics
expected -= set(PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS)
assert pre_detach_samples == expected
env.pageserver.http_client().tenant_detach(tenant)
@@ -283,7 +294,9 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"remote_storage_kind", available_remote_storages() + [RemoteStorageKind.NOOP]
)
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):

View File

@@ -12,6 +12,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
@@ -144,12 +145,19 @@ DELETE_FAILPOINTS = [
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in DELETE_FAILPOINTS:
if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
"timeline-delete-before-index-delete",
"timeline-delete-after-index-delete",
):
# the above failpoints are not relevant for config without remote storage
continue
result.append((remote_storage_kind, delete_failpoint))
return result
@@ -197,21 +205,23 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
with env.endpoints.create_start("delete") as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline_id)
else:
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
env.pageserver.allowed_errors.append(f".*{timeline_id}.*failpoint: {failpoint}")
# It appears when we stopped flush loop during deletion and then pageserver is stopped

View File

@@ -301,8 +301,12 @@ def test_timeline_initial_logical_size_calculation_cancellation(
# message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"
def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_init(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -333,12 +337,17 @@ def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_checkpoint(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -360,14 +369,19 @@ def test_timeline_physical_size_post_checkpoint(neon_env_builder: NeonEnvBuilder
def check():
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
wait_until(10, 1, check)
def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_compaction(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Disable background compaction as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
@@ -406,15 +420,21 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_timeline_physical_size_post_gc(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
# and before checking the expected size on disk, which makes the assertion failed
@@ -451,10 +471,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, new_timeline_id)
assert_physical_size_invariants(
get_physical_size_values(env, env.initial_tenant, new_timeline_id),
get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
remote_storage_kind,
)
@@ -538,10 +560,14 @@ def test_timeline_size_metrics(
assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)
def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
def test_tenant_physical_size(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
):
random.seed(100)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
if remote_storage_kind is not None:
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -549,10 +575,12 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
def get_timeline_resident_physical_size(timeline: TimelineId):
sizes = get_physical_size_values(env, tenant, timeline)
assert_physical_size_invariants(sizes)
sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
assert_physical_size_invariants(sizes, remote_storage_kind)
return sizes.prometheus_resident_physical
timeline_total_resident_physical_size = get_timeline_resident_physical_size(timeline)
@@ -572,7 +600,8 @@ def test_tenant_physical_size(neon_env_builder: NeonEnvBuilder):
wait_for_last_flush_lsn(env, endpoint, tenant, timeline)
pageserver_http.timeline_checkpoint(tenant, timeline)
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, tenant, timeline)
timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)
@@ -601,6 +630,7 @@ def get_physical_size_values(
env: NeonEnv,
tenant_id: TenantId,
timeline_id: TimelineId,
remote_storage_kind: Optional[RemoteStorageKind],
) -> TimelinePhysicalSizeValues:
res = TimelinePhysicalSizeValues()
@@ -616,9 +646,12 @@ def get_physical_size_values(
res.prometheus_resident_physical = metrics.query_one(
"pageserver_resident_physical_size", metrics_filter
).value
res.prometheus_remote_physical = metrics.query_one(
"pageserver_remote_physical_size", metrics_filter
).value
if remote_storage_kind is not None:
res.prometheus_remote_physical = metrics.query_one(
"pageserver_remote_physical_size", metrics_filter
).value
else:
res.prometheus_remote_physical = None
detail = client.timeline_detail(
tenant_id, timeline_id, include_timeline_dir_layer_file_size_sum=True
@@ -631,15 +664,20 @@ def get_physical_size_values(
return res
def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
def assert_physical_size_invariants(
sizes: TimelinePhysicalSizeValues, remote_storage_kind: Optional[RemoteStorageKind]
):
# resident phyiscal size is defined as
assert sizes.python_timelinedir_layerfiles_physical == sizes.prometheus_resident_physical
assert sizes.python_timelinedir_layerfiles_physical == sizes.layer_map_file_size_sum
# we don't do layer eviction, so, all layers are resident
assert sizes.api_current_physical == sizes.prometheus_resident_physical
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
if remote_storage_kind is not None:
assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
# XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
else:
assert sizes.prometheus_remote_physical is None
# Timeline logical size initialization is an asynchronous background task that runs once,