Compare commits

..

23 Commits

Author SHA1 Message Date
Alex Chi Z
acb171b8b3 fix(build): warnings in dockerfile
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-09 12:36:38 +08:00
John Spray
4431688dc6 tests: don't require kafka client for regular tests (#8662)
## Problem

We're adding more third party dependencies to support more diverse +
realistic test cases in `test_runner/logical_repl`. I ❤️ these
tests, they are a good thing.

The slight glitch is that python packaging is hard, and some third party
python packages have issues. For example the current kafka dependency
doesn't work on latest python. We can mitigate that by only importing
these more specialized dependencies in the tests that use them.

## Summary of changes

- Move the `kafka` import into a test body, so that folks running the
regular `test_runner/regress` tests don't have to have a working kafka
client package.
2024-08-08 19:24:21 +01:00
John Spray
953b7d4f7e pageserver: remove paranoia double-calculation of retain_lsns (#8617)
## Problem

This code was to mitigate risk in
https://github.com/neondatabase/neon/pull/8427

As expected, we did not hit this code path - the new continuous updates
of gc_info are working fine, we can remove this code now.

## Summary of changes

- Remove block that double-checks retain_lsns
2024-08-08 12:57:48 +01:00
Joonas Koivunen
8561b2c628 fix: stop leaking BackgroundPurges (#8650)
avoid "leaking" the completions of BackgroundPurges by:

1. switching it to TaskTracker for provided close+wait
2. stop using tokio::fs::remove_dir_all which will consume two units of
memory instead of one blocking task

Additionally, use more graceful shutdown in tests which do actually some
background cleanup.
2024-08-08 12:02:53 +01:00
Joonas Koivunen
21638ee96c fix(test): do not fail test for filesystem race (#8643)
evidence:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-8632/10287641784/index.html#suites/0e58fb04d9998963e98e45fe1880af7d/c7a46335515142b/
2024-08-08 10:34:47 +01:00
Konstantin Knizhnik
cbe8c77997 Use sycnhronous commit for logical replicaiton worker (#8645)
## Problem

See
https://neondb.slack.com/archives/C03QLRH7PPD/p1723038557449239?thread_ts=1722868375.476789&cid=C03QLRH7PPD


Logical replication subscription by default use `synchronous_commit=off`
which cause problems with safekeeper

## Summary of changes

Set `synchronous_commit=on` for logical replication subscription in
test_subscriber_restart.py

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2024-08-08 10:23:57 +03:00
John Spray
cf3eac785b pageserver: make bench_ingest build (but panic) on macOS (#8641)
## Problem

Some developers build on MacOS, which doesn't have  io_uring.

## Summary of changes

- Add `io_engine_for_bench`, which on linux will give io_uring or panic
if it's unavailable, and on MacOS will always panic.

We do not want to run such benchmarks with StdFs: the results aren't
interesting, and will actively waste the time of any developers who
start investigating performance before they realize they're using a
known-slow I/O backend.

Why not just conditionally compile this benchmark on linux only? Because
even on linux, I still want it to refuse to run if it can't get
io_uring.
2024-08-07 21:17:08 +01:00
Yuchen Liang
542385e364 feat(pageserver): add direct io pageserver config (#8622)
Part of #8130, [RFC: Direct IO For Pageserver](https://github.com/neondatabase/neon/blob/problame/direct-io-rfc/docs/rfcs/034-direct-io-for-pageserver.md)

## Description

Add pageserver config for evaluating/enabling direct I/O. 

- Disabled: current default, uses buffered io as is.
- Evaluate: still uses buffered io, but could do alignment checking and
perf simulation (pad latency by direct io RW to a fake file).
- Enabled: uses direct io, behavior on alignment error is configurable.


Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-07 21:04:19 +01:00
Joonas Koivunen
05dd1ae9e0 fix: drain completed page_service connections (#8632)
We've noticed increased memory usage with the latest release. Drain the
joinset of `page_service` connection handlers to avoid leaking them
until shutdown. An alternative would be to use a TaskTracker.
TaskTracker was not discussed in original PR #8339 review, so not hot
fixing it in here either.
2024-08-07 17:14:45 +00:00
Cihan Demirci
8468d51a14 cicd: push build-tools image to ACR as well (#8638)
https://github.com/neondatabase/cloud/issues/15899
2024-08-07 17:53:47 +01:00
Joonas Koivunen
a81fab4826 refactor(timeline_detach_ancestor): replace ordered reparented with a hashset (#8629)
Earlier I was thinking we'd need a (ancestor_lsn, timeline_id) ordered
list of reparented. Turns out we did not need it at all. Replace it with
an unordered hashset. Additionally refactor the reparented direct
children query out, it will later be used from more places.

Split off from #8430.

Cc: #6994
2024-08-07 18:19:00 +02:00
Alex Chi Z.
b3eea45277 fix(pageserver): dump the key when it's invalid (#8633)
We see an assertion error in staging. Dump the key to guess where it was
from, and then we can fix it.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-08-07 16:37:46 +01:00
Joonas Koivunen
fc78774f39 fix: EphemeralFiles can outlive their Timeline via enum LayerManager (#8229)
Ephemeral files cleanup on drop but did not delay shutdown, leading to
problems with restarting the tenant. The solution is as proposed:
- make ephemeral files carry the gate guard to delay `Timeline::gate`
closing
- flush in-memory layers and strong references to those on
`Timeline::shutdown`

The above are realized by making LayerManager an `enum` with `Open` and
`Closed` variants, and fail requests to modify `LayerMap`.

Additionally:

- fix too eager anyhow conversions in compaction
- unify how we freeze layers and handle errors
- optimize likely_resident_layers to read LayerFileManager hashmap
values instead of bouncing through LayerMap

Fixes: #7830
2024-08-07 17:50:09 +03:00
Conrad Ludgate
ad0988f278 proxy: random changes (#8602)
## Problem

1. Hard to correlate startup parameters with the endpoint that provided
them.
2. Some configurations are not needed in the `ProxyConfig` struct.

## Summary of changes

Because of some borrow checker fun, I needed to switch to an
interior-mutability implementation of our `RequestMonitoring` context
system. Using https://docs.rs/try-lock/latest/try_lock/ as a cheap lock
for such a use-case (needed to be thread safe).

Removed the lock of each startup message, instead just logging only the
startup params in a successful handshake.

Also removed from values from `ProxyConfig` and kept as arguments.
(needed for local-proxy config)
2024-08-07 14:37:03 +01:00
Arpad Müller
4d7c0dac93 Add missing colon to ArchivalConfigRequest specification (#8627)
Add a missing colon to the API specification of `ArchivalConfigRequest`.
The `state` field is required. Pointed out by Gleb.
2024-08-07 14:53:52 +02:00
Arpad Müller
00c981576a Lower level for timeline cancellations during gc (#8626)
Timeline cancellation running in parallel with gc yields error log lines
like:

```
Gc failed 1 times, retrying in 2s: TimelineCancelled
```

They are completely harmless though and normal to occur. Therefore, only
print those messages at an info level. Still print them at all so that
we know what is going on if we focus on a single timeline.
2024-08-07 09:29:52 +02:00
Arpad Müller
c3f2240fbd storage broker: only print one line for version and build tag in init (#8624)
This makes it more consistent with pageserver and safekeeper. Also, it
is easier to collect the two values into one data point.
2024-08-07 09:14:26 +02:00
Yuchen Liang
ed5724d79d scrubber: clean up scan_metadata before prod (#8565)
Part of #8128.

## Problem
Currently, scrubber `scan_metadata` command will return with an error
code if the metadata on remote storage is corrupted with fatal errors.
To safely deploy this command in a cronjob, we want to differentiate
between failures while running scrubber command and the erroneous
metadata. At the same time, we also want our regression tests to catch
corrupted metadata using the scrubber command.

## Summary of changes

- Return with error code only when the scrubber command fails
- Uses explicit checks on errors and warnings to determine metadata
health in regression tests.

**Resolve conflict with `tenant-snapshot` command (after shard split):**
[`test_scrubber_tenant_snapshot`](https://github.com/neondatabase/neon/blob/yuchen/scrubber-scan-cleanup-before-prod/test_runner/regress/test_storage_scrubber.py#L23)
failed before applying 422a8443dd
- When taking a snapshot, the old `index_part.json` in the unsharded
tenant directory is not kept.
- The current `list_timeline_blobs` implementation consider no
`index_part.json` as a parse error.
- During the scan, we are only analyzing shards with highest shard
count, so we will not get a parse error. but we do need to add the
layers to tenant object listing, otherwise we will get index is
referencing a layer that is not in remote storage error.
- **Action:** Add s3_layers from `list_timeline_blobs` regardless of
parsing error

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
2024-08-06 18:55:42 +01:00
John Spray
ca5390a89d pageserver: add bench_ingest (#7409)
## Problem

We lack a rust bench for the inmemory layer and delta layer write paths:
it is useful to benchmark these components independent of postgres & WAL
decoding.

Related: https://github.com/neondatabase/neon/issues/8452

## Summary of changes

- Refactor DeltaLayerWriter to avoid carrying a Timeline, so that it can
be cleanly tested + benched without a Tenant/Timeline test harness. It
only needed the Timeline for building `Layer`, so this can be done in a
separate step.
- Add `bench_ingest`, which exercises a variety of workload "shapes"
(big values, small values, sequential keys, random keys)
- Include a small uncontroversial optimization: in `freeze`, only
exhaustively walk values to assert ordering relative to end_lsn in debug
mode.

These benches are limited by drive performance on a lot of machines, but
still useful as a local tool for iterating on CPU/memory improvements
around this code path.

Anecdotal measurements on Hetzner AX102 (Ryzen 7950xd):

```

ingest-small-values/ingest 128MB/100b seq
                        time:   [1.1160 s 1.1230 s 1.1289 s]
                        thrpt:  [113.38 MiB/s 113.98 MiB/s 114.70 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild
Benchmarking ingest-small-values/ingest 128MB/100b rand: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 18.9s.
ingest-small-values/ingest 128MB/100b rand
                        time:   [1.9001 s 1.9056 s 1.9110 s]
                        thrpt:  [66.982 MiB/s 67.171 MiB/s 67.365 MiB/s]
Benchmarking ingest-small-values/ingest 128MB/100b rand-1024keys: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 10.0s. You may wish to increase target time to 11.0s.
ingest-small-values/ingest 128MB/100b rand-1024keys
                        time:   [1.0715 s 1.0828 s 1.0937 s]
                        thrpt:  [117.04 MiB/s 118.21 MiB/s 119.46 MiB/s]
ingest-small-values/ingest 128MB/100b seq, no delta
                        time:   [425.49 ms 429.07 ms 432.04 ms]
                        thrpt:  [296.27 MiB/s 298.32 MiB/s 300.83 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) low mild

ingest-big-values/ingest 128MB/8k seq
                        time:   [373.03 ms 375.84 ms 379.17 ms]
                        thrpt:  [337.58 MiB/s 340.57 MiB/s 343.13 MiB/s]
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
ingest-big-values/ingest 128MB/8k seq, no delta
                        time:   [81.534 ms 82.811 ms 83.364 ms]
                        thrpt:  [1.4994 GiB/s 1.5095 GiB/s 1.5331 GiB/s]
Found 1 outliers among 10 measurements (10.00%)


```
2024-08-06 16:39:40 +00:00
John Spray
3727c6fbbe pageserver: use layer visibility when composing heatmap (#8616)
## Problem

Sometimes, a layer is Covered by hasn't yet been evicted from local disk
(e.g. shortly after image layer generation). It is not good use of
resources to download these to a secondary location, as there's a good
chance they will never be read.

This follows the previous change that added layer visibility:
- #8511 

Part of epic:
- https://github.com/neondatabase/neon/issues/8398

## Summary of changes

- When generating heatmaps, only include Visible layers
- Update test_secondary_downloads to filter to visible layers when
listing layers from an attached location
2024-08-06 17:15:40 +01:00
John Spray
42229aacf6 pageserver: fixes for layer visibility metric (#8603)
## Problem

In staging, we could see that occasionally tenants were wrapping their
pageserver_visible_physical_size metric past zero to 2^64.

This is harmless right now, but will matter more later when we start
using visible size in things like the /utilization endpoint.

## Summary of changes

- Add debug asserts that detect this case. `test_gc_of_remote_layers`
works as a reproducer for this issue once the asserts are added.
- Tighten up the interface around access_stats so that only Layer can
mutate it.
- In Layer, wrap calls to `record_access` in code that will update the
visible size statistic if the access implicitly marks the layer visible
(this was what caused the bug)
- In LayerManager::rewrite_layers, use the proper set_visibility layer
function instead of directly using access_stats (this is an additional
path where metrics could go bad.)
- Removed unused instances of LayerAccessStats in DeltaLayer and
ImageLayer which I noticed while reviewing the code paths that call
record_access.
2024-08-06 14:47:01 +01:00
John Spray
b7beaa0fd7 tests: improve stability of test_storage_controller_many_tenants (#8607)
## Problem

The controller scale test does random migrations. These mutate secondary
locations, and therefore can cause secondary optimizations to happen in
the background, violating the test's expectation that consistency_check
will work as there are no reconciliations running.

Example:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/10247161379/index.html#suites/07874de07c4a1c9effe0d92da7755ebf/6316beacd3fb3060/

## Summary of changes

- Only migrate to existing secondary locations, not randomly picked
nodes, so that we can do a fast reconcile_until_idle (otherwise
reconcile_until_idle is takes a long time to create new secondary
locations).
- Do a reconcile_until_idle before consistency_check.
2024-08-06 12:58:33 +01:00
a-masterov
16c91ff5d3 enable rum test (#8380)
## Problem
We need to test the rum extension automatically as a path of the GitHub
workflow

## Summary of changes

rum test is enabled
2024-08-06 13:56:42 +02:00
91 changed files with 1727 additions and 1005 deletions

View File

@@ -66,8 +66,22 @@ jobs:
username: ${{ secrets.AWS_ACCESS_KEY_DEV }}
password: ${{ secrets.AWS_SECRET_KEY_DEV }}
- name: Tag build-tools with `${{ env.TO_TAG }}` in ECR
- name: Azure login
if: steps.check-manifests.outputs.skip == 'false'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_DEV_SUBSCRIPTION_ID }}
- name: Login to ACR
if: steps.check-manifests.outputs.skip == 'false'
run: |
az acr login --name=neoneastus2
- name: Tag build-tools with `${{ env.TO_TAG }}` in ECR and ACR
if: steps.check-manifests.outputs.skip == 'false'
run: |
docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${TO_TAG} \
-t neoneastus2.azurecr.io/neondatabase/build-tools:${TO_TAG} \
neondatabase/build-tools:${FROM_TAG}

5
Cargo.lock generated
View File

@@ -4324,6 +4324,7 @@ dependencies = [
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-utils",
"try-lock",
"typed-json",
"url",
"urlencoding",
@@ -6563,9 +6564,9 @@ dependencies = [
[[package]]
name = "try-lock"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"

View File

@@ -184,6 +184,7 @@ tracing = "0.1"
tracing-error = "0.2.0"
tracing-opentelemetry = "0.21.0"
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] }
try-lock = "0.2.5"
twox-hash = { version = "1.6.3", default-features = false }
typed-json = "0.1"
url = "2.2"

View File

@@ -33,8 +33,8 @@ ARG BUILD_TAG
# Set up cachepot to use an AWS S3 bucket for cache results, to reuse it between `docker build` invocations.
# cachepot falls back to local filesystem if S3 is misconfigured, not failing the build
ARG RUSTC_WRAPPER=cachepot
ENV AWS_REGION=eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX=cachepot
ENV AWS_REGION eu-central-1
ENV CACHEPOT_S3_KEY_PREFIX cachepot
ARG CACHEPOT_BUCKET=neon-github-dev
#ARG AWS_ACCESS_KEY_ID
#ARG AWS_SECRET_ACCESS_KEY

View File

@@ -66,13 +66,13 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
&& rm -rf protoc.zip protoc
# s5cmd
ENV S5CMD_VERSION=2.2.2
ENV S5CMD_VERSION 2.2.2
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
&& chmod +x s5cmd \
&& mv s5cmd /usr/local/bin/s5cmd
# LLVM
ENV LLVM_VERSION=18
ENV LLVM_VERSION 18
RUN curl -fsSL 'https://apt.llvm.org/llvm-snapshot.gpg.key' | apt-key add - \
&& echo "deb http://apt.llvm.org/${DEBIAN_VERSION_CODENAME}/ llvm-toolchain-${DEBIAN_VERSION_CODENAME}-${LLVM_VERSION} main" > /etc/apt/sources.list.d/llvm.stable.list \
&& apt update \
@@ -125,8 +125,8 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
&& rm -rf ../lcov.tar.gz
# Compile and install the static OpenSSL library
ENV OPENSSL_VERSION=1.1.1w
ENV OPENSSL_PREFIX=/usr/local/openssl
ENV OPENSSL_VERSION 1.1.1w
ENV OPENSSL_PREFIX /usr/local/openssl
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
echo "cf3098950cb4d853ad95c0841f1f9c6d3dc102dccfcacd521d93925208b76ac8 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
cd /tmp && \
@@ -145,8 +145,8 @@ RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/sourc
# TODO: at this time, Dockerfile.compute-node uses the debian bullseye libicu
# package, which is 67.1. We're duplicating that knowledge here, and also, technically,
# Debian has a few patches on top of 67.1 that we're not adding here.
ENV ICU_VERSION=67.1
ENV ICU_PREFIX=/usr/local/icu
ENV ICU_VERSION 67.1
ENV ICU_PREFIX /usr/local/icu
# Download and build static ICU
RUN wget -O /tmp/libicu-${ICU_VERSION}.tgz https://github.com/unicode-org/icu/releases/download/release-${ICU_VERSION//./-}/icu4c-${ICU_VERSION//./_}-src.tgz && \
@@ -168,9 +168,9 @@ USER nonroot:nonroot
WORKDIR /home/nonroot
# Python
ENV PYTHON_VERSION=3.9.18 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
ENV PYTHON_VERSION 3.9.18
ENV PYENV_ROOT /home/nonroot/.pyenv
ENV PATH /home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
&& cd $HOME \
&& curl -sSO https://raw.githubusercontent.com/pyenv/pyenv-installer/master/bin/pyenv-installer \
@@ -192,9 +192,9 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.80.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ENV RUSTC_VERSION 1.80.0
ENV RUSTUP_HOME "/home/nonroot/.rustup"
ENV PATH "/home/nonroot/.cargo/bin:${PATH}"
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \
@@ -211,7 +211,7 @@ RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux
cargo install cargo-nextest && \
rm -rf /home/nonroot/.cargo/registry && \
rm -rf /home/nonroot/.cargo/git
ENV RUSTC_WRAPPER=cachepot
ENV RUSTC_WRAPPER cachepot
# Show versions
RUN whoami \

View File

@@ -647,8 +647,8 @@ RUN apt-get update && \
apt-get install -y curl libclang-dev cmake && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
ENV HOME /home/nonroot
ENV PATH "/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
@@ -873,7 +873,7 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
ENV BUILD_TAG $BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
@@ -933,7 +933,8 @@ COPY --from=pgjwt-pg-build /pgjwt.tar.gz /ext-src
#COPY --from=pg-tiktoken-pg-build /home/nonroot/pg_tiktoken.tar.gz /ext-src
COPY --from=hypopg-pg-build /hypopg.tar.gz /ext-src
COPY --from=pg-hashids-pg-build /pg_hashids.tar.gz /ext-src
#COPY --from=rum-pg-build /rum.tar.gz /ext-src
COPY --from=rum-pg-build /rum.tar.gz /ext-src
COPY patches/rum.patch /ext-src
#COPY --from=pgtap-pg-build /pgtap.tar.gz /ext-src
COPY --from=ip4r-pg-build /ip4r.tar.gz /ext-src
COPY --from=prefix-pg-build /prefix.tar.gz /ext-src
@@ -945,7 +946,7 @@ COPY patches/pg_hintplan.patch /ext-src
COPY --from=pg-cron-pg-build /pg_cron.tar.gz /ext-src
COPY patches/pg_cron.patch /ext-src
#COPY --from=pg-pgx-ulid-build /home/nonroot/pgx_ulid.tar.gz /ext-src
COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
#COPY --from=rdkit-pg-build /rdkit.tar.gz /ext-src
COPY --from=pg-uuidv7-pg-build /pg_uuidv7.tar.gz /ext-src
COPY --from=pg-roaringbitmap-pg-build /pg_roaringbitmap.tar.gz /ext-src
COPY --from=pg-semver-pg-build /pg_semver.tar.gz /ext-src
@@ -960,17 +961,18 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|| exit 1; rm -f $f; done
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
RUN cd /ext-src/rum-src && patch -p1 <../rum.patch
# cmake is required for the h3 test
RUN apt-get update && apt-get install -y cmake
RUN patch -p1 < /ext-src/pg_hintplan.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN patch -p1 </ext-src/pg_anon.patch
RUN patch -p1 </ext-src/pg_cron.patch
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres
ENV PATH /usr/local/pgsql/bin:$PATH
ENV PGHOST compute
ENV PGPORT 55433
ENV PGUSER cloud_admin
ENV PGDATABASE postgres
#########################################################################################
#
# Final layer

View File

@@ -78,7 +78,7 @@ for pg_version in 14 15 16; do
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
rm -rf $TMPDIR
# We are running tests now
if docker exec -e SKIP=rum-src,timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
if docker exec -e SKIP=timescaledb-src,rdkit-src,postgis-src,pgx_ulid-src,pgtap-src,pg_tiktoken-src,pg_jsonschema-src,pg_graphql-src,kq_imcx-src,wal2json_2_5-src \
$TEST_CONTAINER_NAME /run-tests.sh | tee testout.txt
then
cleanup

View File

@@ -1,15 +1,15 @@
#!/bin/bash
set -x
cd /ext-src
cd /ext-src || exit 2
FAILED=
LIST=$((echo ${SKIP} | sed 's/,/\n/g'; ls -d *-src) | sort | uniq -u)
LIST=$( (echo "${SKIP//","/"\n"}"; ls -d -- *-src) | sort | uniq -u)
for d in ${LIST}
do
[ -d ${d} ] || continue
[ -d "${d}" ] || continue
psql -c "select 1" >/dev/null || break
make -C ${d} installcheck || FAILED="${d} ${FAILED}"
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
done
[ -z "${FAILED}" ] && exit 0
echo ${FAILED}
echo "${FAILED}"
exit 1

View File

@@ -107,7 +107,10 @@ impl Key {
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 <= 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
assert!(
self.field2 <= 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222,
"invalid key: {self}",
);
(((self.field1 & 0x7F) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)

View File

@@ -947,6 +947,8 @@ pub struct TopTenantShardsResponse {
}
pub mod virtual_file {
use std::path::PathBuf;
#[derive(
Copy,
Clone,
@@ -965,6 +967,53 @@ pub mod virtual_file {
#[cfg(target_os = "linux")]
TokioEpollUring,
}
/// Direct IO modes for a pageserver.
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum DirectIoMode {
/// Direct IO disabled (uses usual buffered IO).
#[default]
Disabled,
/// Direct IO disabled (performs checks and perf simulations).
Evaluate {
/// Alignment check level
alignment_check: DirectIoAlignmentCheckLevel,
/// Latency padded for performance simulation.
latency_padding: DirectIoLatencyPadding,
},
/// Direct IO enabled.
Enabled {
/// Actions to perform on alignment error.
on_alignment_error: DirectIoOnAlignmentErrorAction,
},
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoAlignmentCheckLevel {
#[default]
Error,
Log,
None,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum DirectIoOnAlignmentErrorAction {
Error,
#[default]
FallbackToBuffered,
}
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, Default)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum DirectIoLatencyPadding {
/// Pad virtual file operations with IO to a fake file.
FakeFileRW { path: PathBuf },
#[default]
None,
}
}
// Wrapped in libpq CopyData

View File

@@ -1,6 +1,8 @@
use std::collections::HashSet;
use utils::id::TimelineId;
#[derive(Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct AncestorDetached {
pub reparented_timelines: Vec<TimelineId>,
pub reparented_timelines: HashSet<TimelineId>,
}

View File

@@ -78,8 +78,9 @@ impl Drop for GateGuard {
}
}
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum GateError {
#[error("gate is closed")]
GateClosed,
}

View File

@@ -108,3 +108,7 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false
[[bench]]
name = "bench_ingest"
harness = false

View File

@@ -0,0 +1,239 @@
use std::{env, num::NonZeroUsize};
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::InMemoryLayer,
virtual_file,
};
use pageserver_api::{key::Key, shard::TenantShardId};
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
};
// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
enum KeyLayout {
/// Sequential unique keys
Sequential,
/// Random unique keys
Random,
/// Random keys, but only use the bits from the mask of them
RandomReuse(u32),
}
enum WriteDelta {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) -> anyhow::Result<()> {
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);
for i in 0..put_count {
lsn += put_size as u64;
// Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people
// usually care the most about write performance when they're blasting a huge batch of data into a huge table.
match key_layout {
KeyLayout::Sequential => {
// Use sequential order to illustrate the experience a user is likely to have
// when ingesting bulk data.
key.field6 = i as u32;
}
KeyLayout::Random => {
// Use random-order keys to avoid giving a false advantage to data structures that are
// faster when inserting on the end.
key.field6 = murmurhash32(i as u32);
}
KeyLayout::RandomReuse(mask) => {
// Use low bits only, to limit cardinality
key.field6 = murmurhash32(i as u32) & mask;
}
}
layer.put_value(key, lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;
if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;
}
Ok(())
}
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
});
}
/// Declare a series of benchmarks for the Pageserver's ingest write path.
///
/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either
/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set).
///
/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on
/// a fast disk, CPU is the bottleneck at time of writing.
fn criterion_benchmark(c: &mut Criterion) {
let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap();
eprintln!("Data directory: {}", temp_dir.path());
let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()),
));
virtual_file::init(16384, virtual_file::io_engine_for_bench());
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -123,6 +123,7 @@ fn main() -> anyhow::Result<()> {
// after setting up logging, log the effective IO engine choice and read path implementations
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_direct_io, "starting with virtual_file Direct IO settings");
info!(?conf.get_impl, "starting with get page implementation");
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");

View File

@@ -300,6 +300,9 @@ pub struct PageServerConf {
/// This flag is temporary and will be removed after gradual rollout.
/// See <https://github.com/neondatabase/neon/issues/8184>.
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
/// Direct IO settings
pub virtual_file_direct_io: virtual_file::DirectIoMode,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -408,6 +411,8 @@ struct PageServerConfigBuilder {
l0_flush: BuilderValue<L0FlushConfig>,
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
virtual_file_direct_io: BuilderValue<virtual_file::DirectIoMode>,
}
impl PageServerConfigBuilder {
@@ -498,6 +503,7 @@ impl PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
virtual_file_direct_io: Set(virtual_file::DirectIoMode::default()),
}
}
}
@@ -685,6 +691,10 @@ impl PageServerConfigBuilder {
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
}
pub fn virtual_file_direct_io(&mut self, value: virtual_file::DirectIoMode) {
self.virtual_file_direct_io = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -743,6 +753,7 @@ impl PageServerConfigBuilder {
ephemeral_bytes_per_memory_kb,
l0_flush,
compact_level0_phase1_value_access,
virtual_file_direct_io,
}
CUSTOM LOGIC
{
@@ -1018,6 +1029,9 @@ impl PageServerConf {
"compact_level0_phase1_value_access" => {
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
}
"virtual_file_direct_io" => {
builder.virtual_file_direct_io(utils::toml_edit_ext::deserialize_item(item).context("virtual_file_direct_io")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1103,6 +1117,7 @@ impl PageServerConf {
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
}
}
}
@@ -1345,6 +1360,7 @@ background_task_maximum_delay = '334 s'
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1420,6 +1436,7 @@ background_task_maximum_delay = '334 s'
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
virtual_file_direct_io: virtual_file::DirectIoMode::default(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -932,7 +932,7 @@ components:
description: Whether to poll remote storage for layers to download. If false, secondary locations don't download anything.
ArchivalConfigRequest:
type: object
required
required:
- state
properties:
state:

View File

@@ -1162,7 +1162,10 @@ async fn layer_map_info_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let layer_map_info = timeline.layer_map_info(reset).await;
let layer_map_info = timeline
.layer_map_info(reset)
.await
.map_err(|_shutdown| ApiError::ShuttingDown)?;
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -24,7 +24,7 @@ impl Default for L0FlushConfig {
#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);
pub(crate) enum Inner {
pub enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}
@@ -40,7 +40,7 @@ impl L0FlushGlobalState {
}
}
pub(crate) fn inner(&self) -> &Arc<Inner> {
pub fn inner(&self) -> &Arc<Inner> {
&self.0
}
}

View File

@@ -122,16 +122,19 @@ impl Listener {
}
}
impl Connections {
pub async fn shutdown(self) {
pub(crate) async fn shutdown(self) {
let Self { cancel, mut tasks } = self;
cancel.cancel();
while let Some(res) = tasks.join_next().await {
// the logging done here mimics what was formerly done by task_mgr
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
Err(e) => error!("page_service connection task panicked: {:?}", e),
}
Self::handle_connection_completion(res);
}
}
fn handle_connection_completion(res: Result<anyhow::Result<()>, tokio::task::JoinError>) {
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => error!("error in page_service connection task: {:?}", e),
Err(e) => error!("page_service connection task panicked: {:?}", e),
}
}
}
@@ -155,20 +158,19 @@ pub async fn libpq_listener_main(
let connections_cancel = CancellationToken::new();
let mut connection_handler_tasks = tokio::task::JoinSet::default();
// Wait for a new connection to arrive, or for server shutdown.
while let Some(res) = tokio::select! {
biased;
loop {
let accepted = tokio::select! {
biased;
_ = listener_cancel.cancelled() => break,
next = connection_handler_tasks.join_next(), if !connection_handler_tasks.is_empty() => {
let res = next.expect("we dont poll while empty");
Connections::handle_connection_completion(res);
continue;
}
accepted = listener.accept() => accepted,
};
_ = listener_cancel.cancelled() => {
// We were requested to shut down.
None
}
res = listener.accept() => {
Some(res)
}
} {
match res {
match accepted {
Ok((socket, peer_addr)) => {
// Connection established. Spawn a new task to handle it.
debug!("accepted connection from {}", peer_addr);

View File

@@ -56,7 +56,6 @@ impl Statvfs {
}
pub mod mock {
use anyhow::Context;
use camino::Utf8Path;
use regex::Regex;
use tracing::log::info;
@@ -135,14 +134,30 @@ pub mod mock {
{
continue;
}
total += entry
.metadata()
.with_context(|| format!("get metadata of {:?}", entry.path()))?
.len();
let m = match entry.metadata() {
Ok(m) => m,
Err(e) if is_not_found(&e) => {
// some temp file which got removed right as we are walking
continue;
}
Err(e) => {
return Err(anyhow::Error::new(e)
.context(format!("get metadata of {:?}", entry.path())))
}
};
total += m.len();
}
Ok(total)
}
fn is_not_found(e: &walkdir::Error) -> bool {
let Some(io_error) = e.io_error() else {
return false;
};
let kind = io_error.kind();
matches!(kind, std::io::ErrorKind::NotFound)
}
pub struct Statvfs {
pub blocks: u64,
pub blocks_available: u64,

View File

@@ -601,6 +601,12 @@ impl From<PageReconstructError> for GcError {
}
}
impl From<timeline::layer_manager::Shutdown> for GcError {
fn from(_: timeline::layer_manager::Shutdown) -> Self {
GcError::TimelineCancelled
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum LoadConfigError {
#[error("TOML deserialization error: '{0}'")]
@@ -710,6 +716,7 @@ impl Tenant {
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
@@ -3005,54 +3012,6 @@ impl Tenant {
// because that will stall branch creation.
let gc_cs = self.gc_cs.lock().await;
// Paranoia check: it is critical that GcInfo's list of child timelines is correct, to avoid incorrectly GC'ing data they
// depend on. So although GcInfo is updated continuously by Timeline::new and Timeline::drop, we also calculate it here
// and fail out if it's inaccurate.
// (this can be removed later, it's a risk mitigation for https://github.com/neondatabase/neon/pull/8427)
{
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> =
BTreeMap::new();
timelines.iter().for_each(|timeline| {
if let Some(ancestor_timeline_id) = &timeline.get_ancestor_timeline_id() {
let ancestor_children =
all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((timeline.get_ancestor_lsn(), timeline.timeline_id));
}
});
for timeline in &timelines {
let mut branchpoints: Vec<(Lsn, TimelineId)> = all_branchpoints
.remove(&timeline.timeline_id)
.unwrap_or_default();
branchpoints.sort_by_key(|b| b.0);
let target = timeline.gc_info.read().unwrap();
// We require that retain_lsns contains everything in `branchpoints`, but not that
// they are exactly equal: timeline deletions can race with us, so retain_lsns
// may contain some extra stuff. It is safe to have extra timelines in there, because it
// just means that we retain slightly more data than we otherwise might.
let have_branchpoints = target.retain_lsns.iter().copied().collect::<HashSet<_>>();
for b in &branchpoints {
if !have_branchpoints.contains(b) {
tracing::error!(
"Bug: `retain_lsns` is set incorrectly. Expected be {:?}, but found {:?}",
branchpoints,
target.retain_lsns
);
debug_assert!(false);
// Do not GC based on bad information!
// (ab-use an existing GcError type rather than adding a new one, since this is a
// "should never happen" check that will be removed soon).
return Err(GcError::Remote(anyhow::anyhow!(
"retain_lsns failed validation!"
)));
}
}
}
}
// Ok, we now know all the branch points.
// Update the GC information for each timeline.
let mut gc_timelines = Vec::with_capacity(timelines.len());
@@ -4674,10 +4633,10 @@ mod tests {
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map
.layer_map()
.get_level0_deltas()
.into_iter()
.map(|desc| layer_map.get_from_desc(&desc))
.layer_map()?
.level0_deltas()
.iter()
.map(|desc| layer_map.get_from_desc(desc))
.collect::<Vec<_>>();
assert!(!level0_deltas.is_empty());
@@ -4908,11 +4867,13 @@ mod tests {
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let lm = guard.layer_map()?;
lm.dump(true, &ctx).await?;
let mut reads = Vec::new();
let mut prev = None;
guard.layer_map().iter_historic_layers().for_each(|desc| {
lm.iter_historic_layers().for_each(|desc| {
if !desc.is_delta() {
prev = Some(desc.clone());
return;
@@ -5918,23 +5879,12 @@ mod tests {
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()
.len();
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()
.len();
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

View File

@@ -29,6 +29,7 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -51,10 +52,12 @@ impl EphemeralFile {
)
.await?;
let prewarm = conf.l0_flush.prewarm_on_write();
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
rw: page_caching::RW::new(file, prewarm, gate_guard),
})
}
@@ -161,7 +164,11 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(
@@ -215,4 +222,38 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn ephemeral_file_holds_gate_open() {
const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
let (conf, tenant_id, timeline_id, ctx) =
harness("ephemeral_file_holds_gate_open").unwrap();
let gate = utils::sync::gate::Gate::default();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
.await
.unwrap();
let mut closing = tokio::task::spawn(async move {
gate.close().await;
});
// gate is entered until the ephemeral file is dropped
// do not start paused tokio-epoll-uring has a sleep loop
tokio::time::pause();
tokio::time::timeout(FOREVER, &mut closing)
.await
.expect_err("closing cannot complete before dropping");
// this is a requirement of the reset_tenant functionality: we have to be able to restart a
// tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
drop(file);
tokio::time::timeout(FOREVER, &mut closing)
.await
.expect("closing completes right away")
.expect("closing does not panic");
}
}

View File

@@ -18,6 +18,8 @@ use super::zero_padded_read_write;
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
@@ -29,7 +31,11 @@ pub enum PrewarmOnWrite {
}
impl RW {
pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
pub fn new(
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
_gate_guard: utils::sync::gate::GateGuard,
) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
@@ -38,6 +44,7 @@ impl RW {
file,
prewarm_on_write,
)),
_gate_guard,
}
}
@@ -145,6 +152,7 @@ impl Drop for RW {
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
// we are clear to do this, because we have entered a gate
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {

View File

@@ -846,8 +846,8 @@ impl LayerMap {
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
self.l0_delta_layers.to_vec()
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
&self.l0_delta_layers
}
/// debugging function to print out the contents of the layer map

View File

@@ -13,7 +13,7 @@ use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
@@ -224,21 +224,8 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
}
/// See [`Self::spawn`].
#[derive(Clone)]
pub struct BackgroundPurges(Arc<std::sync::Mutex<BackgroundPurgesInner>>);
enum BackgroundPurgesInner {
Open(tokio::task::JoinSet<()>),
// we use the async mutex for coalescing
ShuttingDown(Arc<tokio::sync::Mutex<tokio::task::JoinSet<()>>>),
}
impl Default for BackgroundPurges {
fn default() -> Self {
Self(Arc::new(std::sync::Mutex::new(
BackgroundPurgesInner::Open(JoinSet::new()),
)))
}
}
#[derive(Clone, Default)]
pub struct BackgroundPurges(tokio_util::task::TaskTracker);
impl BackgroundPurges {
/// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
@@ -247,24 +234,32 @@ impl BackgroundPurges {
/// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
/// Thus the [`BackgroundPurges`] type to keep track of these tasks.
pub fn spawn(&self, tmp_path: Utf8PathBuf) {
let mut guard = self.0.lock().unwrap();
let jset = match &mut *guard {
BackgroundPurgesInner::Open(ref mut jset) => jset,
BackgroundPurgesInner::ShuttingDown(_) => {
warn!("trying to spawn background purge during shutdown, ignoring");
return;
// because on shutdown we close and wait, we are misusing TaskTracker a bit.
//
// so first acquire a token, then check if the tracker has been closed. the tracker might get closed
// right after, but at least the shutdown will wait for what we are spawning next.
let token = self.0.token();
if self.0.is_closed() {
warn!(
%tmp_path,
"trying to spawn background purge during shutdown, ignoring"
);
return;
}
let span = info_span!(parent: None, "background_purge", %tmp_path);
let task = move || {
let _token = token;
let _entered = span.entered();
if let Err(error) = std::fs::remove_dir_all(tmp_path.as_path()) {
// should we fatal_io_error here?
warn!(%error, "failed to purge tenant directory");
}
};
jset.spawn_on(
async move {
if let Err(error) = fs::remove_dir_all(tmp_path.as_path()).await {
// should we fatal_io_error here?
warn!(%error, path=%tmp_path, "failed to purge tenant directory");
}
}
.instrument(info_span!(parent: None, "background_purge")),
BACKGROUND_RUNTIME.handle(),
);
BACKGROUND_RUNTIME.spawn_blocking(task);
}
/// When this future completes, all background purges have completed.
@@ -278,42 +273,9 @@ impl BackgroundPurges {
/// instances of this future will continue to be correct.
#[instrument(skip_all)]
pub async fn shutdown(&self) {
let jset = {
let mut guard = self.0.lock().unwrap();
match &mut *guard {
BackgroundPurgesInner::Open(jset) => {
*guard = BackgroundPurgesInner::ShuttingDown(Arc::new(tokio::sync::Mutex::new(
std::mem::take(jset),
)))
}
BackgroundPurgesInner::ShuttingDown(_) => {
// calling shutdown multiple times is most likely a bug in pageserver shutdown code
warn!("already shutting down");
}
};
match &mut *guard {
BackgroundPurgesInner::ShuttingDown(ref mut jset) => jset.clone(),
BackgroundPurgesInner::Open(_) => {
unreachable!("above code transitions into shut down state");
}
}
};
let mut jset = jset.lock().await; // concurrent callers coalesce here
while let Some(res) = jset.join_next().await {
match res {
Ok(()) => {}
Err(e) if e.is_panic() => {
// If it panicked, the error is already logged by the panic hook.
}
Err(e) if e.is_cancelled() => {
unreachable!("we don't cancel the joinset or runtime")
}
Err(e) => {
// No idea when this can happen, but let's log it.
warn!(%e, "background purge task failed or panicked");
}
}
}
// forbid new tasks (can be called many times)
self.0.close();
self.0.wait().await;
}
}
@@ -1767,14 +1729,9 @@ impl TenantManager {
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
for timeline in timelines.values() {
tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
let timeline_layers = timeline
.layers
.read()
.await
.likely_resident_layers()
.collect::<Vec<_>>();
let layers = timeline.layers.read().await;
for layer in timeline_layers {
for layer in layers.likely_resident_layers() {
let relative_path = layer
.local_path()
.strip_prefix(&parent_path)
@@ -1971,7 +1928,8 @@ impl TenantManager {
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
) -> Result<HashSet<TimelineId>, anyhow::Error> {
// FIXME: this is unnecessary, slotguard already has these semantics
struct RevertOnDropSlot(Option<SlotGuard>);
impl Drop for RevertOnDropSlot {

View File

@@ -539,19 +539,25 @@ impl LayerAccessStats {
self.record_residence_event_at(SystemTime::now())
}
pub(crate) fn record_access_at(&self, now: SystemTime) {
fn record_access_at(&self, now: SystemTime) -> bool {
let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
// A layer which is accessed must be visible.
mask |= 0x1 << Self::VISIBILITY_SHIFT;
value |= 0x1 << Self::VISIBILITY_SHIFT;
self.write_bits(mask, value);
let old_bits = self.write_bits(mask, value);
!matches!(
self.decode_visibility(old_bits),
LayerVisibilityHint::Visible
)
}
pub(crate) fn record_access(&self, ctx: &RequestContext) {
/// Returns true if we modified the layer's visibility to set it to Visible implicitly
/// as a result of this access
pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
return;
return false;
}
self.record_access_at(SystemTime::now())

View File

@@ -36,13 +36,12 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::Layer;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::PageReconstructError;
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -72,10 +71,7 @@ use utils::{
lsn::Lsn,
};
use super::{
AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -200,7 +196,6 @@ impl DeltaKey {
pub struct DeltaLayer {
path: Utf8PathBuf,
pub desc: PersistentLayerDesc,
access_stats: LayerAccessStats,
inner: OnceCell<Arc<DeltaLayerInner>>,
}
@@ -299,7 +294,6 @@ impl DeltaLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
self.access_stats.record_access(ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
@@ -350,7 +344,6 @@ impl DeltaLayer {
summary.lsn_range,
metadata.len(),
),
access_stats: Default::default(),
inner: OnceCell::new(),
})
}
@@ -373,7 +366,6 @@ impl DeltaLayer {
/// 3. Call `finish`.
///
struct DeltaLayerWriterInner {
conf: &'static PageServerConf,
pub path: Utf8PathBuf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -420,7 +412,6 @@ impl DeltaLayerWriterInner {
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(Self {
conf,
path,
timeline_id,
tenant_shard_id,
@@ -495,11 +486,10 @@ impl DeltaLayerWriterInner {
async fn finish(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, timeline, ctx).await;
let result = self.finish0(key_end, ctx).await;
if result.is_err() {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
if let Err(e) = std::fs::remove_file(&temp_path) {
@@ -512,9 +502,8 @@ impl DeltaLayerWriterInner {
async fn finish0(
self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -579,11 +568,9 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all().await?;
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
trace!("created delta layer {}", self.path);
trace!("created delta layer {}", layer.local_path());
Ok(layer)
Ok((desc, self.path))
}
}
@@ -684,14 +671,9 @@ impl DeltaLayerWriter {
pub(crate) async fn finish(
mut self,
key_end: Key,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
self.inner
.take()
.unwrap()
.finish(key_end, timeline, ctx)
.await
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
self.inner.take().unwrap().finish(key_end, ctx).await
}
#[cfg(test)]
@@ -1598,8 +1580,9 @@ pub(crate) mod test {
use super::*;
use crate::repository::Value;
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::Tenant;
use crate::tenant::{Tenant, Timeline};
use crate::{
context::DownloadBehavior,
task_mgr::TaskKind,
@@ -1893,9 +1876,8 @@ pub(crate) mod test {
res?;
}
let resident = writer
.finish(entries_meta.key_range.end, &timeline, &ctx)
.await?;
let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
let inner = resident.get_as_delta(&ctx).await?;
@@ -1975,6 +1957,7 @@ pub(crate) mod test {
.await
.likely_resident_layers()
.next()
.cloned()
.unwrap();
{
@@ -2049,7 +2032,8 @@ pub(crate) mod test {
.read()
.await
.likely_resident_layers()
.find(|x| x != &initdb_layer)
.find(|&x| x != &initdb_layer)
.cloned()
.unwrap();
// create a copy for the timeline, so we don't overwrite the file
@@ -2084,7 +2068,8 @@ pub(crate) mod test {
.await
.unwrap();
let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
copied_layer.get_as_delta(ctx).await.unwrap();
@@ -2212,7 +2197,9 @@ pub(crate) mod test {
for (key, lsn, value) in deltas {
writer.put_value(key, lsn, value, ctx).await?;
}
let delta_layer = writer.finish(key_end, tline, ctx).await?;
let (desc, path) = writer.finish(key_end, ctx).await?;
let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
Ok::<_, anyhow::Error>(delta_layer)
}

View File

@@ -32,7 +32,6 @@ use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::LayerAccessStats;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
@@ -135,7 +134,6 @@ pub struct ImageLayer {
pub desc: PersistentLayerDesc,
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
pub lsn: Lsn,
access_stats: LayerAccessStats,
inner: OnceCell<ImageLayerInner>,
}
@@ -253,7 +251,6 @@ impl ImageLayer {
/// not loaded already.
///
async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
self.access_stats.record_access(ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.await
@@ -304,7 +301,6 @@ impl ImageLayer {
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
access_stats: Default::default(),
inner: OnceCell::new(),
})
}

View File

@@ -11,9 +11,10 @@ use crate::repository::{Key, Value};
use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::PageReconstructError;
use crate::{l0_flush, page_cache, walrecord};
use anyhow::{anyhow, Result};
use camino::Utf8PathBuf;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
@@ -32,7 +33,9 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValuesReconstructState};
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub(crate) struct InMemoryLayerFileId(page_cache::FileId);
@@ -382,11 +385,13 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {
@@ -410,8 +415,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub(crate) async fn put_value(
pub async fn put_value(
&self,
key: Key,
lsn: Lsn,
@@ -476,8 +480,6 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub async fn freeze(&self, end_lsn: Lsn) {
let inner = self.inner.write().await;
assert!(
self.start_lsn < end_lsn,
"{} >= {}",
@@ -495,9 +497,13 @@ impl InMemoryLayer {
})
.expect("frozen_local_path_str set only once");
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
}
}
}
@@ -507,12 +513,12 @@ impl InMemoryLayer {
/// if there are no matching keys.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(
pub async fn write_to_disk(
&self,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
) -> Result<Option<ResidentLayer>> {
l0_flush_global_state: &l0_flush::Inner,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -524,9 +530,8 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().await;
let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone();
use l0_flush::Inner;
let _concurrency_permit = match &*l0_flush_global_state {
let _concurrency_permit = match l0_flush_global_state {
Inner::PageCached => None,
Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await),
};
@@ -556,7 +561,7 @@ impl InMemoryLayer {
)
.await?;
match &*l0_flush_global_state {
match l0_flush_global_state {
l0_flush::Inner::PageCached => {
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
@@ -621,7 +626,7 @@ impl InMemoryLayer {
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//
@@ -633,6 +638,6 @@ impl InMemoryLayer {
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
drop(_concurrency_permit);
Ok(Some(delta_layer))
Ok(Some((desc, path)))
}
}

View File

@@ -316,7 +316,7 @@ impl Layer {
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
self.0.access_stats.record_access(ctx);
self.record_access(ctx);
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
@@ -396,8 +396,12 @@ impl Layer {
self.0.info(reset)
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.0.access_stats
pub(crate) fn latest_activity(&self) -> SystemTime {
self.0.access_stats.latest_activity()
}
pub(crate) fn visibility(&self) -> LayerVisibilityHint {
self.0.access_stats.visibility()
}
pub(crate) fn local_path(&self) -> &Utf8Path {
@@ -447,13 +451,31 @@ impl Layer {
}
}
fn record_access(&self, ctx: &RequestContext) {
if self.0.access_stats.record_access(ctx) {
// Visibility was modified to Visible
tracing::info!(
"Layer {} became visible as a result of access",
self.0.desc.key()
);
if let Some(tl) = self.0.timeline.upgrade() {
tl.metrics
.visible_physical_size_gauge
.add(self.0.desc.file_size)
}
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) {
let old_visibility = self.access_stats().set_visibility(visibility.clone());
let old_visibility = self.0.access_stats.set_visibility(visibility.clone());
use LayerVisibilityHint::*;
match (old_visibility, visibility) {
(Visible, Covered) => {
// Subtract this layer's contribution to the visible size metric
if let Some(tl) = self.0.timeline.upgrade() {
debug_assert!(
tl.metrics.visible_physical_size_gauge.get() >= self.0.desc.file_size
);
tl.metrics
.visible_physical_size_gauge
.sub(self.0.desc.file_size)
@@ -671,6 +693,9 @@ impl Drop for LayerInner {
}
if matches!(self.access_stats.visibility(), LayerVisibilityHint::Visible) {
debug_assert!(
timeline.metrics.visible_physical_size_gauge.get() >= self.desc.file_size
);
timeline
.metrics
.visible_physical_size_gauge
@@ -1810,7 +1835,7 @@ impl ResidentLayer {
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
owner.access_stats.record_access(ctx);
self.owner.record_access(ctx);
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await

View File

@@ -39,7 +39,7 @@ async fn smoke_test() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -176,7 +176,7 @@ async fn smoke_test() {
{
let layers = &[layer];
let mut g = timeline.layers.write().await;
g.finish_gc_timeline(layers);
g.open_mut().unwrap().finish_gc_timeline(layers);
// this just updates the remote_physical_size for demonstration purposes
rtc.schedule_gc_update(layers).unwrap();
}
@@ -216,7 +216,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -260,7 +260,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// the deletion of the layer in remote_storage happens.
{
let mut layers = timeline.layers.write().await;
layers.finish_gc_timeline(&[layer]);
layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
}
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
@@ -301,7 +301,7 @@ fn read_wins_pending_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -433,7 +433,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -602,7 +602,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -682,7 +682,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -801,9 +801,9 @@ async fn eviction_cancellation_on_drop() {
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
// remove the layers from layermap
guard.finish_gc_timeline(&layers);
guard.open_mut().unwrap().finish_gc_timeline(&layers);
layers
};

View File

@@ -4,6 +4,7 @@ use bytes::Bytes;
use pageserver_api::key::{Key, KEY_SIZE};
use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
use crate::tenant::storage_layer::Layer;
use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer};
@@ -173,8 +174,9 @@ impl SplitDeltaLayerWriter {
)
.await?;
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
self.generated_layers
.push(prev_delta_writer.finish(key, tline, ctx).await?);
let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
self.generated_layers.push(delta_layer);
}
self.inner.put_value(key, lsn, val, ctx).await
}
@@ -190,7 +192,10 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
generated_layers.push(inner.finish(end_key, tline, ctx).await?);
let (desc, path) = inner.finish(end_key, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
generated_layers.push(delta_layer);
Ok(generated_layers)
}

View File

@@ -407,9 +407,16 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
wait_duration
}
}

View File

@@ -137,7 +137,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{config::TenantConf, upload_queue::NotInitialized};
use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
@@ -527,6 +527,12 @@ pub(crate) enum PageReconstructError {
MissingKey(MissingKeyError),
}
impl From<layer_manager::Shutdown> for PageReconstructError {
fn from(_: layer_manager::Shutdown) -> Self {
PageReconstructError::Cancelled
}
}
impl GetVectoredError {
#[cfg(test)]
pub(crate) fn is_missing_key_error(&self) -> bool {
@@ -534,6 +540,12 @@ impl GetVectoredError {
}
}
impl From<layer_manager::Shutdown> for GetVectoredError {
fn from(_: layer_manager::Shutdown) -> Self {
GetVectoredError::Cancelled
}
}
pub struct MissingKeyError {
key: Key,
shard: ShardNumber,
@@ -597,6 +609,12 @@ pub(crate) enum CreateImageLayersError {
Other(#[from] anyhow::Error),
}
impl From<layer_manager::Shutdown> for CreateImageLayersError {
fn from(_: layer_manager::Shutdown) -> Self {
CreateImageLayersError::Cancelled
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -634,6 +652,12 @@ impl FlushLayerError {
}
}
impl From<layer_manager::Shutdown> for FlushLayerError {
fn from(_: layer_manager::Shutdown) -> Self {
FlushLayerError::Cancelled
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetVectoredError {
#[error("timeline shutting down")]
@@ -1198,12 +1222,7 @@ impl Timeline {
/// Hence, the result **does not represent local filesystem usage**.
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size;
}
size
guard.layer_size_sum()
}
pub(crate) fn resident_physical_size(&self) -> u64 {
@@ -1370,16 +1389,15 @@ impl Timeline {
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let to_lsn = {
let token = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await;
to_lsn
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
};
self.flush_frozen_layers_and_wait(to_lsn).await
self.wait_flush_completion(token).await
}
// Check if an open ephemeral layer should be closed: this provides
@@ -1393,12 +1411,20 @@ impl Timeline {
return;
};
// FIXME: why not early exit? because before #7927 the state would had been cleared every
// time, and this was missed.
// if write_guard.is_none() { return; }
let Ok(layers_guard) = self.layers.try_read() else {
// Don't block if the layer lock is busy
return;
};
let Some(open_layer) = &layers_guard.layer_map().open_layer else {
let Ok(lm) = layers_guard.layer_map() else {
return;
};
let Some(open_layer) = &lm.open_layer else {
// If there is no open layer, we have no layer freezing to do. However, we might need to generate
// some updates to disk_consistent_lsn and remote_consistent_lsn, in case we ingested some WAL regions
// that didn't result in writes to this shard.
@@ -1424,9 +1450,16 @@ impl Timeline {
);
// The flush loop will update remote consistent LSN as well as disk consistent LSN.
self.flush_frozen_layers_and_wait(last_record_lsn)
.await
.ok();
// We know there is no open layer, so we can request freezing without actually
// freezing anything. This is true even if we have dropped the layers_guard, we
// still hold the write_guard.
let _ = async {
let token = self
.freeze_inmem_layer_at(last_record_lsn, &mut write_guard)
.await?;
self.wait_flush_completion(token).await
}
.await;
}
}
@@ -1464,33 +1497,26 @@ impl Timeline {
self.last_freeze_at.load(),
open_layer.get_opened_at(),
) {
let at_lsn = match open_layer.info() {
match open_layer.info() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
// We may reach this point if the layer was already frozen by not yet flushed: flushing
// happens asynchronously in the background.
tracing::debug!(
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
);
None
}
InMemoryLayerInfo::Open { .. } => {
// Upgrade to a write lock and freeze the layer
drop(layers_guard);
let mut layers_guard = self.layers.write().await;
let froze = layers_guard
.try_freeze_in_memory_layer(
current_lsn,
&self.last_freeze_at,
&mut write_guard,
)
let res = self
.freeze_inmem_layer_at(current_lsn, &mut write_guard)
.await;
Some(current_lsn).filter(|_| froze)
}
};
if let Some(lsn) = at_lsn {
let res: Result<u64, _> = self.flush_frozen_layers(lsn);
if let Err(e) = res {
tracing::info!("failed to flush frozen layer after background freeze: {e:#}");
if let Err(e) = res {
tracing::info!(
"failed to flush frozen layer after background freeze: {e:#}"
);
}
}
}
}
@@ -1644,6 +1670,11 @@ impl Timeline {
// about corner cases like s3 suddenly hanging up?
self.remote_client.shutdown().await;
}
Err(FlushLayerError::Cancelled) => {
// this is likely the second shutdown, ignore silently.
// TODO: this can be removed once https://github.com/neondatabase/neon/issues/5080
debug_assert!(self.cancel.is_cancelled());
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
@@ -1662,6 +1693,7 @@ impl Timeline {
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
self.remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
@@ -1672,10 +1704,17 @@ impl Timeline {
)
.await;
// TODO: work toward making this a no-op. See this funciton's doc comment for more context.
// TODO: work toward making this a no-op. See this function's doc comment for more context.
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
{
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
let mut write_guard = self.write_lock.lock().await;
self.layers.write().await.shutdown(&mut write_guard);
}
// Finally wait until any gate-holders are complete.
//
// TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
@@ -1769,9 +1808,12 @@ impl Timeline {
}
}
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
pub(crate) async fn layer_map_info(
&self,
reset: LayerAccessStatsReset,
) -> Result<LayerMapInfo, layer_manager::Shutdown> {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let layer_map = guard.layer_map()?;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1780,16 +1822,15 @@ impl Timeline {
in_memory_layers.push(frozen_layer.info());
}
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = guard.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset));
}
let historic_layers = layer_map
.iter_historic_layers()
.map(|desc| guard.get_from_desc(&desc).info(reset))
.collect();
LayerMapInfo {
Ok(LayerMapInfo {
in_memory_layers,
historic_layers,
}
})
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
@@ -1797,7 +1838,7 @@ impl Timeline {
&self,
layer_file_name: &LayerName,
) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
let Some(layer) = self.find_layer(layer_file_name).await? else {
return Ok(None);
};
@@ -1818,7 +1859,7 @@ impl Timeline {
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let Some(local_layer) = self.find_layer(layer_file_name).await else {
let Some(local_layer) = self.find_layer(layer_file_name).await? else {
return Ok(None);
};
@@ -2304,7 +2345,10 @@ impl Timeline {
let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object",
);
layers.initialize_empty(Lsn(start_lsn.0));
layers
.open_mut()
.expect("in this context the LayerManager must still be open")
.initialize_empty(Lsn(start_lsn.0));
}
/// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
@@ -2436,7 +2480,10 @@ impl Timeline {
let num_layers = loaded_layers.len();
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
guard
.open_mut()
.expect("layermanager must be open during init")
.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
self.remote_client
.schedule_layer_file_deletion(&needs_cleanup)?;
@@ -2471,7 +2518,7 @@ impl Timeline {
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await;
self.update_layer_visibility().await?;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
@@ -2893,16 +2940,17 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_name: &LayerName) -> Option<Layer> {
async fn find_layer(
&self,
layer_name: &LayerName,
) -> Result<Option<Layer>, layer_manager::Shutdown> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {
let historic_layer_name = historic_layer.layer_name();
if layer_name == &historic_layer_name {
return Some(guard.get_from_desc(&historic_layer));
}
}
None
let layer = guard
.layer_map()?
.iter_historic_layers()
.find(|l| &l.layer_name() == layer_name)
.map(|found| guard.get_from_desc(&found));
Ok(layer)
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
@@ -2919,14 +2967,22 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity();
HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
)
let resident = guard.likely_resident_layers().filter_map(|layer| {
match layer.visibility() {
LayerVisibilityHint::Visible => {
// Layer is visible to one or more read LSNs: elegible for inclusion in layer map
let last_activity_ts = layer.latest_activity();
Some(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
))
}
LayerVisibilityHint::Covered => {
// Layer is resident but unlikely to be read: not elegible for inclusion in heatmap.
None
}
}
});
let layers = resident.collect();
@@ -2945,6 +3001,7 @@ impl Timeline {
}
impl Timeline {
#[allow(unknown_lints)] // doc_lazy_continuation is still a new lint
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///
@@ -3096,7 +3153,7 @@ impl Timeline {
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
@@ -3229,10 +3286,6 @@ impl Timeline {
Ok(ancestor.clone())
}
pub(crate) fn get_ancestor_timeline(&self) -> Option<Arc<Timeline>> {
self.ancestor_timeline.clone()
}
pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
&self.shard_identity
}
@@ -3248,22 +3301,35 @@ impl Timeline {
}
}
/// Returns a non-frozen open in-memory layer for ingestion.
///
/// Get a handle to the latest layer for appending.
///
/// Takes a witness of timeline writer state lock being held, because it makes no sense to call
/// this function without holding the mutex.
async fn get_layer_for_write(
&self,
lsn: Lsn,
_guard: &tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let gate_guard = self.gate.enter().context("enter gate for inmem layer")?;
let last_record_lsn = self.get_last_record_lsn();
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn,
);
let layer = guard
.open_mut()?
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_shard_id,
gate_guard,
ctx,
)
.await?;
@@ -3277,21 +3343,48 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
/// Freeze any existing open in-memory layer and unconditionally notify the flush loop.
///
/// Unconditional flush loop notification is given because in sharded cases we will want to
/// leave an Lsn gap. Unsharded tenants do not have Lsn gaps.
async fn freeze_inmem_layer_at(
&self,
at: Lsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) {
) -> Result<u64, FlushLayerError> {
let frozen = {
let mut guard = self.layers.write().await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock)
.await
};
if frozen {
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
}
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
return Err(FlushLayerError::NotRunning(flush_loop_state));
}
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(at, *lsn);
});
assert_ne!(my_flush_request, 0);
Ok(my_flush_request)
}
/// Layer flusher task's main loop.
@@ -3328,7 +3421,11 @@ impl Timeline {
let layer_to_flush = {
let guard = self.layers.read().await;
guard.layer_map().frozen_layers.front().cloned()
let Ok(lm) = guard.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
};
lm.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
let Some(layer_to_flush) = layer_to_flush else {
@@ -3385,34 +3482,7 @@ impl Timeline {
}
}
/// Request the flush loop to write out all frozen layers up to `at_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer_at`].
///
/// `at_lsn` may be higher than the highest LSN of a frozen layer: if this is the
/// case, it means no data will be written between the top of the highest frozen layer and
/// to_lsn, e.g. because this tenant shard has ingested up to to_lsn and not written any data
/// locally for that part of the WAL.
fn flush_frozen_layers(&self, at_lsn: Lsn) -> Result<u64, FlushLayerError> {
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
return Err(FlushLayerError::NotRunning(flush_loop_state));
}
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(at_lsn, *lsn);
});
Ok(my_flush_request)
}
/// Waits any flush request created by [`Self::freeze_inmem_layer_at`] to complete.
async fn wait_flush_completion(&self, request: u64) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();
loop {
@@ -3445,11 +3515,6 @@ impl Timeline {
}
}
async fn flush_frozen_layers_and_wait(&self, at_lsn: Lsn) -> Result<(), FlushLayerError> {
let token = self.flush_frozen_layers(at_lsn)?;
self.wait_flush_completion(token).await
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
///
/// Return value is the last lsn (inclusive) of the layer that was frozen.
@@ -3586,11 +3651,11 @@ impl Timeline {
{
let mut guard = self.layers.write().await;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
guard.open_mut()?.finish_flush_l0_layer(
delta_layer_to_add.as_ref(),
&frozen_layer,
&self.metrics,
);
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn
@@ -3701,12 +3766,14 @@ impl Timeline {
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let Some(new_delta) = frozen_layer
.write_to_disk(&self_clone, &ctx, key_range)
let Some((desc, path)) = frozen_layer
.write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
.await?
else {
return Ok(None);
};
let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
@@ -3796,7 +3863,9 @@ impl Timeline {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
let layers = guard.layer_map();
let Ok(layers) = guard.layer_map() else {
return false;
};
let mut max_deltas = 0;
for part_range in &partition.ranges {
@@ -4204,13 +4273,16 @@ impl Timeline {
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
// now they are being scheduled outside of write lock
guard.track_new_image_layers(&image_layers, &self.metrics);
// now they are being scheduled outside of write lock; current way is inconsistent with
// compaction lock order.
guard
.open_mut()?
.track_new_image_layers(&image_layers, &self.metrics);
drop_wlock(guard);
timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
self.update_layer_visibility().await;
self.update_layer_visibility().await?;
Ok(image_layers)
}
@@ -4290,7 +4362,7 @@ impl Timeline {
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
) -> Result<HashSet<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}
@@ -4369,6 +4441,12 @@ impl CompactionError {
}
}
impl From<layer_manager::Shutdown> for CompactionError {
fn from(_: layer_manager::Shutdown) -> Self {
CompactionError::ShuttingDown
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
@@ -4474,11 +4552,14 @@ impl Timeline {
.collect();
if !new_images.is_empty() {
guard.track_new_image_layers(new_images, &self.metrics);
guard
.open_mut()?
.track_new_image_layers(new_images, &self.metrics);
}
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
guard
.open_mut()?
.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
@@ -4492,7 +4573,7 @@ impl Timeline {
self: &Arc<Self>,
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> Result<(), super::upload_queue::NotInitialized> {
) -> Result<(), CompactionError> {
let mut guard = self.layers.write().await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
@@ -4500,7 +4581,9 @@ impl Timeline {
replace_layers.retain(|(l, _)| guard.contains(l));
drop_layers.retain(|l| guard.contains(l));
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
guard
.open_mut()?
.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
@@ -4789,7 +4872,7 @@ impl Timeline {
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self.layers.write().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4917,7 +5000,7 @@ impl Timeline {
}
})?;
guard.finish_gc_timeline(&gc_layers);
guard.open_mut()?.finish_gc_timeline(&gc_layers);
#[cfg(feature = "testing")]
{
@@ -5073,9 +5156,13 @@ impl Timeline {
let remaining = {
let guard = self.layers.read().await;
guard
.layer_map()
.iter_historic_layers()
let Ok(lm) = guard.layer_map() else {
// technically here we could look into iterating accessible layers, but downloading
// all layers of a shutdown timeline makes no sense regardless.
tracing::info!("attempted to download all layers of shutdown timeline");
return;
};
lm.iter_historic_layers()
.map(|desc| guard.get_from_desc(&desc))
.collect::<Vec<_>>()
};
@@ -5182,10 +5269,10 @@ impl Timeline {
let file_size = layer.layer_desc().file_size;
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let last_activity_ts = layer.access_stats().latest_activity();
let last_activity_ts = layer.latest_activity();
EvictionCandidate {
layer: layer.into(),
layer: layer.to_owned().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
@@ -5270,7 +5357,7 @@ impl Timeline {
{
let mut guard = self.layers.write().await;
guard.force_insert_layer(image_layer);
guard.open_mut().unwrap().force_insert_layer(image_layer);
}
Ok(())
@@ -5314,7 +5401,7 @@ impl Timeline {
}
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
if layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
@@ -5339,13 +5426,12 @@ impl Timeline {
for (key, lsn, val) in deltas.data {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
let delta_layer = delta_layer_writer
.finish(deltas.key_range.end, self, ctx)
.await?;
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
{
let mut guard = self.layers.write().await;
guard.force_insert_layer(delta_layer);
guard.open_mut().unwrap().force_insert_layer(delta_layer);
}
Ok(())
@@ -5360,7 +5446,7 @@ impl Timeline {
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
@@ -5388,7 +5474,7 @@ impl Timeline {
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
layers.push(layer.key());
}
Ok(layers)
@@ -5405,7 +5491,7 @@ impl Timeline {
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.
struct TimelineWriterState {
pub(crate) struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,
current_size: u64,
// Previous Lsn which passed through
@@ -5513,7 +5599,10 @@ impl<'a> TimelineWriter<'a> {
}
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
let layer = self
.tl
.get_layer_for_write(at, &self.write_guard, ctx)
.await?;
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
@@ -5526,15 +5615,15 @@ impl<'a> TimelineWriter<'a> {
Ok(())
}
async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
async fn roll_layer(&mut self, freeze_at: Lsn) -> Result<(), FlushLayerError> {
let current_size = self.write_guard.as_ref().unwrap().current_size;
// self.write_guard will be taken by the freezing
self.tl
.freeze_inmem_layer_at(freeze_at, &mut self.write_guard)
.await;
.await?;
self.tl.flush_frozen_layers(freeze_at)?;
assert!(self.write_guard.is_none());
if current_size >= self.get_checkpoint_distance() * 2 {
warn!("Flushed oversized open layer with size {}", current_size)
@@ -5699,6 +5788,7 @@ mod tests {
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()
.unwrap()
.iter_historic_layers()
.next()
.expect("must find one layer to evict");

View File

@@ -371,7 +371,7 @@ impl Timeline {
);
let layers = self.layers.read().await;
for layer_desc in layers.layer_map().iter_historic_layers() {
for layer_desc in layers.layer_map()?.iter_historic_layers() {
let layer = layers.get_from_desc(&layer_desc);
if layer.metadata().shard.shard_count == self.shard_identity.count {
// This layer does not belong to a historic ancestor, no need to re-image it.
@@ -549,7 +549,9 @@ impl Timeline {
///
/// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(&self) {
pub(super) async fn update_layer_visibility(
&self,
) -> Result<(), super::layer_manager::Shutdown> {
let head_lsn = self.get_last_record_lsn();
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
@@ -557,7 +559,7 @@ impl Timeline {
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let layer_map = layer_manager.layer_map()?;
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
@@ -580,6 +582,7 @@ impl Timeline {
// TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
// avoid assuming that everything at a branch point is visible.
drop(covered);
Ok(())
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
@@ -633,12 +636,8 @@ impl Timeline {
) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let layers = guard.layer_map();
let level0_deltas = layers.get_level0_deltas();
let mut level0_deltas = level0_deltas
.into_iter()
.map(|x| guard.get_from_desc(&x))
.collect_vec();
let layers = guard.layer_map()?;
let level0_deltas = layers.level0_deltas();
stats.level0_deltas_count = Some(level0_deltas.len());
// Only compact if enough layers have accumulated.
@@ -651,6 +650,11 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}
let mut level0_deltas = level0_deltas
.iter()
.map(|x| guard.get_from_desc(x))
.collect::<Vec<_>>();
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
@@ -1104,14 +1108,16 @@ impl Timeline {
|| contains_hole
{
// ... if so, flush previous layer and prepare to write new one
new_layers.push(
writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
let (desc, path) = writer
.take()
.unwrap()
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
writer = None;
if contains_hole {
@@ -1174,12 +1180,13 @@ impl Timeline {
prev_key = Some(key);
}
if let Some(writer) = writer {
new_layers.push(
writer
.finish(prev_key.unwrap().next(), self, ctx)
.await
.map_err(CompactionError::Other)?,
);
let (desc, path) = writer
.finish(prev_key.unwrap().next(), ctx)
.await
.map_err(CompactionError::Other)?;
let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
.map_err(CompactionError::Other)?;
new_layers.push(new_delta);
}
// Sync layers
@@ -1404,10 +1411,9 @@ impl Timeline {
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let l0_deltas = layers.get_level0_deltas();
drop(guard);
let l0_deltas = layers.level0_deltas();
// As an optimization, if we find that there are too few L0 layers,
// bail out early. We know that the compaction algorithm would do
@@ -1779,7 +1785,7 @@ impl Timeline {
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
@@ -1966,13 +1972,16 @@ impl Timeline {
for (key, lsn, val) in deltas {
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
}
stats.produce_delta_layer(delta_layer_writer.size());
if dry_run {
return Ok(None);
}
let delta_layer = delta_layer_writer
.finish(delta_key.key_range.end, tline, ctx)
let (desc, path) = delta_layer_writer
.finish(delta_key.key_range.end, ctx)
.await?;
let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?;
Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer)))
}
@@ -2210,7 +2219,9 @@ impl Timeline {
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
@@ -2290,7 +2301,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.flush_updates().await?;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map();
let layer_map = guard.layer_map()?;
let result = layer_map
.iter_historic_layers()
@@ -2413,9 +2424,9 @@ impl CompactionJobExecutor for TimelineAdaptor {
))
});
let new_delta_layer = writer
.finish(prev.unwrap().0.next(), &self.timeline, ctx)
.await?;
let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?;
let new_delta_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
self.new_deltas.push(new_delta_layer);
Ok(())

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
use crate::{
@@ -74,6 +74,11 @@ impl From<crate::tenant::upload_queue::NotInitialized> for Error {
Error::ShuttingDown
}
}
impl From<super::layer_manager::Shutdown> for Error {
fn from(_: super::layer_manager::Shutdown) -> Self {
Error::ShuttingDown
}
}
impl From<FlushLayerError> for Error {
fn from(value: FlushLayerError) -> Self {
@@ -141,50 +146,9 @@ pub(super) async fn prepare(
}
}
// detached has previously been detached; let's inspect each of the current timelines and
// report back the timelines which have been reparented by our detach
let mut all_direct_children = tenant
.timelines
.lock()
.unwrap()
.values()
.filter(|tl| matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached)))
.map(|tl| (tl.ancestor_lsn, tl.clone()))
.collect::<Vec<_>>();
let mut any_shutdown = false;
all_direct_children.retain(
|(_, tl)| match tl.remote_client.initialized_upload_queue() {
Ok(accessor) => accessor
.latest_uploaded_index_part()
.lineage
.is_reparented(),
Err(_shutdownalike) => {
// not 100% a shutdown, but let's bail early not to give inconsistent results in
// sharded enviroment.
any_shutdown = true;
true
}
},
);
if any_shutdown {
// it could be one or many being deleted; have client retry
return Err(Error::ShuttingDown);
}
let mut reparented = all_direct_children;
// why this instead of hashset? there is a reason, but I've forgotten it many times.
//
// maybe if this was a hashset we would not be able to distinguish some race condition.
reparented.sort_unstable_by_key(|(lsn, tl)| (*lsn, tl.timeline_id));
let reparented_timelines = reparented_direct_children(detached, tenant)?;
return Ok(Progress::Done(AncestorDetached {
reparented_timelines: reparented
.into_iter()
.map(|(_, tl)| tl.timeline_id)
.collect(),
reparented_timelines,
}));
};
@@ -277,7 +241,7 @@ pub(super) async fn prepare(
// between retries, these can change if compaction or gc ran in between. this will mean
// we have to redo work.
partition_work(ancestor_lsn, &layers)
partition_work(ancestor_lsn, &layers)?
};
// TODO: layers are already sorted by something: use that to determine how much of remote
@@ -381,16 +345,67 @@ pub(super) async fn prepare(
Ok(Progress::Prepared(guard, prepared))
}
fn reparented_direct_children(
detached: &Arc<Timeline>,
tenant: &Tenant,
) -> Result<HashSet<TimelineId>, Error> {
let mut all_direct_children = tenant
.timelines
.lock()
.unwrap()
.values()
.filter_map(|tl| {
let is_direct_child = matches!(tl.ancestor_timeline.as_ref(), Some(ancestor) if Arc::ptr_eq(ancestor, detached));
if is_direct_child {
Some(tl.clone())
} else {
if let Some(timeline) = tl.ancestor_timeline.as_ref() {
assert_ne!(timeline.timeline_id, detached.timeline_id, "we cannot have two timelines with the same timeline_id live");
}
None
}
})
// Collect to avoid lock taking order problem with Tenant::timelines and
// Timeline::remote_client
.collect::<Vec<_>>();
let mut any_shutdown = false;
all_direct_children.retain(|tl| match tl.remote_client.initialized_upload_queue() {
Ok(accessor) => accessor
.latest_uploaded_index_part()
.lineage
.is_reparented(),
Err(_shutdownalike) => {
// not 100% a shutdown, but let's bail early not to give inconsistent results in
// sharded enviroment.
any_shutdown = true;
true
}
});
if any_shutdown {
// it could be one or many being deleted; have client retry
return Err(Error::ShuttingDown);
}
Ok(all_direct_children
.into_iter()
.map(|tl| tl.timeline_id)
.collect())
}
fn partition_work(
ancestor_lsn: Lsn,
source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
source: &LayerManager,
) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
let mut straddling_branchpoint = vec![];
let mut rest_of_historic = vec![];
let mut later_by_lsn = 0;
for desc in source_layermap.layer_map().iter_historic_layers() {
for desc in source.layer_map()?.iter_historic_layers() {
// off by one chances here:
// - start is inclusive
// - end is exclusive
@@ -409,10 +424,10 @@ fn partition_work(
&mut rest_of_historic
};
target.push(source_layermap.get_from_desc(&desc));
target.push(source.get_from_desc(&desc));
}
(later_by_lsn, straddling_branchpoint, rest_of_historic)
Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
}
async fn upload_rewritten_layer(
@@ -488,10 +503,12 @@ async fn copy_lsn_prefix(
// reuse the key instead of adding more holes between layers by using the real
// highest key in the layer.
let reused_highest_key = layer.layer_desc().key_range.end;
let copied = writer
.finish(reused_highest_key, target_timeline, ctx)
let (desc, path) = writer
.finish(reused_highest_key, ctx)
.await
.map_err(CopyDeltaPrefix)?;
let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)
.map_err(CopyDeltaPrefix)?;
tracing::debug!(%layer, %copied, "new layer produced");
@@ -537,11 +554,12 @@ pub(super) async fn complete(
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
) -> Result<HashSet<TimelineId>, anyhow::Error> {
let PreparedTimelineDetach { layers } = prepared;
let ancestor = detached
.get_ancestor_timeline()
.ancestor_timeline
.as_ref()
.expect("must still have a ancestor");
let ancestor_lsn = detached.get_ancestor_lsn();
@@ -581,7 +599,7 @@ pub(super) async fn complete(
}
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
let is_same = Arc::ptr_eq(ancestor, tl_ancestor);
let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
let is_deleting = tl
@@ -622,13 +640,18 @@ pub(super) async fn complete(
});
let reparenting_candidates = tasks.len();
let mut reparented = Vec::with_capacity(tasks.len());
let mut reparented = HashSet::with_capacity(tasks.len());
while let Some(res) = tasks.join_next().await {
match res {
Ok(Some(timeline)) => {
tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
reparented.push((timeline.ancestor_lsn, timeline.timeline_id));
assert!(
reparented.insert(timeline.timeline_id),
"duplicate reparenting? timeline_id={}",
timeline.timeline_id
);
}
Ok(None) => {
// lets just ignore this for now. one or all reparented timelines could had
@@ -650,12 +673,5 @@ pub(super) async fn complete(
tracing::info!("failed to reparent some candidates");
}
reparented.sort_unstable();
let reparented = reparented
.into_iter()
.map(|(_, timeline_id)| timeline_id)
.collect();
Ok(reparented)
}

View File

@@ -213,51 +213,45 @@ impl Timeline {
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;
let layers = guard.layer_map();
for layer in layers.iter_historic_layers() {
let layer = guard.get_from_desc(&layer);
// guard against eviction while we inspect it; it might be that eviction_task and
// disk_usage_eviction_task both select the same layers to be evicted, and
// seemingly free up double the space. both succeeding is of no consequence.
guard
.likely_resident_layers()
.filter(|layer| {
let last_activity_ts = layer.latest_activity();
if !layer.is_likely_resident() {
continue;
}
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
// We reach here if `now` < `last_activity_ts`, which can legitimately
// happen if there is an access between us getting `now`, and us getting
// the access stats from the layer.
//
// The other reason why it can happen is system clock skew because
// SystemTime::now() is not monotonic, so, even if there is no access
// to the layer after we get `now` at the beginning of this function,
// it could be that `now` < `last_activity_ts`.
//
// To distinguish the cases, we would need to record `Instant`s in the
// access stats (i.e., monotonic timestamps), but then, the timestamps
// values in the access stats would need to be `Instant`'s, and hence
// they would be meaningless outside of the pageserver process.
// At the time of writing, the trade-off is that access stats are more
// valuable than detecting clock skew.
return false;
}
};
let last_activity_ts = layer.access_stats().latest_activity();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
// We reach here if `now` < `last_activity_ts`, which can legitimately
// happen if there is an access between us getting `now`, and us getting
// the access stats from the layer.
//
// The other reason why it can happen is system clock skew because
// SystemTime::now() is not monotonic, so, even if there is no access
// to the layer after we get `now` at the beginning of this function,
// it could be that `now` < `last_activity_ts`.
//
// To distinguish the cases, we would need to record `Instant`s in the
// access stats (i.e., monotonic timestamps), but then, the timestamps
// values in the access stats would need to be `Instant`'s, and hence
// they would be meaningless outside of the pageserver process.
// At the time of writing, the trade-off is that access stats are more
// valuable than detecting clock skew.
continue;
}
};
if no_activity_for > p.threshold {
no_activity_for > p.threshold
})
.cloned()
.for_each(|layer| {
js.spawn(async move {
layer
.evict_and_wait(std::time::Duration::from_secs(5))
.await
});
stats.candidates += 1;
}
}
});
};
let join_all = async move {

View File

@@ -1,4 +1,4 @@
use anyhow::{bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
@@ -24,39 +24,142 @@ use crate::{
use super::TimelineWriterState;
/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
pub(crate) enum LayerManager {
/// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate
/// the layers.
Open(OpenLayerManager),
/// Shutdown layer manager where there are no more in-memory layers and persistent layers are
/// read-only.
Closed {
layers: HashMap<PersistentLayerKey, Layer>,
},
}
impl Default for LayerManager {
fn default() -> Self {
LayerManager::Open(OpenLayerManager::default())
}
}
impl LayerManager {
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.layers()
.get(key)
.with_context(|| format!("get layer from key: {key}"))
.expect("not found")
.clone()
}
pub(crate) fn get_from_key(&self, desc: &PersistentLayerKey) -> Layer {
self.layer_fmgr.get_from_key(desc)
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.get_from_key(&desc.key())
}
/// Get an immutable reference to the layer map.
///
/// We expect users only to be able to get an immutable layer map. If users want to make modifications,
/// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
pub(crate) fn layer_map(&self) -> &LayerMap {
&self.layer_map
pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
use LayerManager::*;
match self {
Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
Closed { .. } => Err(Shutdown),
}
}
pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
use LayerManager::*;
match self {
Open(open) => Ok(open),
Closed { .. } => Err(Shutdown),
}
}
/// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
/// order to allow shutdown to complete.
///
/// If there was a want to flush in-memory layers, it must have happened earlier.
pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
use LayerManager::*;
match self {
Open(OpenLayerManager {
layer_map,
layer_fmgr: LayerFileManager(hashmap),
}) => {
let open = layer_map.open_layer.take();
let frozen = layer_map.frozen_layers.len();
let taken_writer_state = writer_state.take();
tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
let layers = std::mem::take(hashmap);
*self = Closed { layers };
assert_eq!(open.is_some(), taken_writer_state.is_some());
}
Closed { .. } => {
tracing::debug!("ignoring multiple shutdowns on layer manager")
}
}
}
/// Sum up the historic layer sizes
pub(crate) fn layer_size_sum(&self) -> u64 {
self.layers()
.values()
.map(|l| l.layer_desc().file_size)
.sum()
}
pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
self.layers().values().filter(|l| l.is_likely_resident())
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.contains_key(&layer.layer_desc().key())
}
pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.layers().contains_key(key)
}
pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
self.layers().keys().cloned().collect_vec()
}
fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
use LayerManager::*;
match self {
Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
Closed { layers } => layers,
}
}
}
#[derive(Default)]
pub(crate) struct OpenLayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
}
impl std::fmt::Debug for OpenLayerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenLayerManager")
.field("layer_count", &self.layer_fmgr.0.len())
.finish()
}
}
#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;
impl OpenLayerManager {
/// Called from `load_layer_map`. Initialize the layer manager with:
/// 1. all on-disk layers
/// 2. next open layer (with disk disk_consistent_lsn LSN)
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Layer>,
next_open_layer_at: Lsn,
) {
pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
let mut updates = self.layer_map.batch_update();
for layer in on_disk_layers {
for layer in layers {
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
@@ -68,26 +171,19 @@ impl LayerManager {
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
}
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
/// Open a new writable layer to append data if there is no open layer, otherwise return the
/// current open layer, called within `get_layer_for_write`.
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<Arc<InMemoryLayer>> {
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn,
);
// Do we have a layer open for writing already?
let layer = if let Some(open_layer) = &self.layer_map.open_layer {
if open_layer.get_lsn_range().start > lsn {
@@ -113,8 +209,15 @@ impl LayerManager {
lsn
);
let new_layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
let new_layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
start_lsn,
gate_guard,
ctx,
)
.await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());
@@ -168,7 +271,7 @@ impl LayerManager {
froze
}
/// Add image layers to the layer map, called from `create_image_layers`.
/// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
pub(crate) fn track_new_image_layers(
&mut self,
image_layers: &[ResidentLayer],
@@ -241,7 +344,7 @@ impl LayerManager {
self.finish_compact_l0(compact_from, compact_to, metrics)
}
/// Called when compaction is completed.
/// Called post-compaction when some previous generation image layers were trimmed.
pub(crate) fn rewrite_layers(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
@@ -259,13 +362,10 @@ impl LayerManager {
new_layer.layer_desc().lsn_range
);
// Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
// always marking rewritten layers as visible.
new_layer
.as_ref()
.access_stats()
.set_visibility(old_layer.access_stats().visibility());
new_layer.as_ref().set_visibility(old_layer.visibility());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,
@@ -333,31 +433,6 @@ impl LayerManager {
mapping.remove(layer);
layer.delete_on_drop();
}
pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
// for small layer maps, we most likely have all resident, but for larger more are likely
// to be evicted assuming lots of layers correlated with longer lifespan.
self.layer_map().iter_historic_layers().filter_map(|desc| {
self.layer_fmgr
.0
.get(&desc.key())
.filter(|l| l.is_likely_resident())
.cloned()
})
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.layer_fmgr.contains(layer)
}
pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.layer_fmgr.contains_key(key)
}
pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
self.layer_fmgr.0.keys().cloned().collect_vec()
}
}
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
@@ -369,24 +444,6 @@ impl<T> Default for LayerFileManager<T> {
}
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
fn get_from_key(&self, key: &PersistentLayerKey) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
.get(key)
.with_context(|| format!("get layer from key: {}", key))
.expect("not found")
.clone()
}
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
self.get_from_key(&desc.key())
}
fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.0.contains_key(key)
}
pub(crate) fn insert(&mut self, layer: T) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
@@ -394,10 +451,6 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
}
}
pub(crate) fn contains(&self, layer: &T) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
pub(crate) fn remove(&mut self, layer: &T) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {

View File

@@ -30,10 +30,12 @@ use tokio::time::Instant;
pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
pub use io_engine::feature_test as io_engine_feature_test;
pub use io_engine::io_engine_for_bench;
pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
mod metadata;
mod open_options;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) use api::DirectIoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;

View File

@@ -328,3 +328,29 @@ pub fn feature_test() -> anyhow::Result<FeatureTestResult> {
.join()
.unwrap()
}
/// For use in benchmark binaries only.
///
/// Benchmarks which initialize `virtual_file` need to know what engine to use, but we also
/// don't want to silently fall back to slower I/O engines in a benchmark: this could waste
/// developer time trying to figure out why it's slow.
///
/// In practice, this method will either return IoEngineKind::TokioEpollUring, or panic.
pub fn io_engine_for_bench() -> IoEngineKind {
#[cfg(not(target_os = "linux"))]
{
panic!("This benchmark does I/O and can only give a representative result on Linux");
}
#[cfg(target_os = "linux")]
{
match feature_test().unwrap() {
FeatureTestResult::PlatformPreferred(engine) => engine,
FeatureTestResult::Worse {
engine: _engine,
remark,
} => {
panic!("This benchmark does I/O can requires the preferred I/O engine: {remark}");
}
}
}
}

View File

@@ -92,6 +92,7 @@ tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
try-lock.workspace = true
typed-json.workspace = true
url.workspace = true
urlencoding.workspace = true

View File

@@ -218,7 +218,7 @@ impl RateBucketInfo {
impl AuthenticationConfig {
pub fn check_rate_limit(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
secret: AuthSecret,
endpoint: &EndpointId,
@@ -243,7 +243,7 @@ impl AuthenticationConfig {
let limit_not_exceeded = self.rate_limiter.check(
(
endpoint_int,
MaskedIp::new(ctx.peer_addr, config.rate_limit_ip_subnet),
MaskedIp::new(ctx.peer_addr(), config.rate_limit_ip_subnet),
),
password_weight,
);
@@ -274,7 +274,7 @@ impl AuthenticationConfig {
///
/// All authentication flows will emit an AuthenticationOk message if successful.
async fn auth_quirks(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
api: &impl console::Api,
user_info: ComputeUserInfoMaybeEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
@@ -303,8 +303,8 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr));
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !endpoint_rate_limiter.check(info.endpoint.clone().into(), 1) {
@@ -356,7 +356,7 @@ async fn auth_quirks(
}
async fn authenticate_with_secret(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
secret: AuthSecret,
info: ComputeUserInfo,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
@@ -421,7 +421,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(
self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
@@ -467,7 +467,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
impl BackendType<'_, ComputeUserInfo, &()> {
pub async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
use BackendType::*;
match self {
@@ -478,7 +478,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
pub async fn get_allowed_ips_and_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
use BackendType::*;
match self {
@@ -492,7 +492,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, NodeInfo> {
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
use BackendType::*;
@@ -514,7 +514,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, NodeInfo> {
impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
use BackendType::*;
@@ -571,7 +571,7 @@ mod tests {
impl console::Api for Auth {
async fn get_role_secret(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<CachedRoleSecret, console::errors::GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone())))
@@ -579,7 +579,7 @@ mod tests {
async fn get_allowed_ips_and_secret(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>
{
@@ -591,7 +591,7 @@ mod tests {
async fn wake_compute(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
_user_info: &super::ComputeUserInfo,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
unimplemented!()
@@ -665,7 +665,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
@@ -723,7 +723,7 @@ mod tests {
));
let _creds = auth_quirks(
&mut ctx,
&ctx,
&api,
user_info,
&mut stream,
@@ -742,7 +742,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
@@ -775,7 +775,7 @@ mod tests {
));
let _creds = auth_quirks(
&mut ctx,
&ctx,
&api,
user_info,
&mut stream,
@@ -794,7 +794,7 @@ mod tests {
let (mut client, server) = tokio::io::duplex(1024);
let mut stream = PqStream::new(Stream::from_raw(server));
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let api = Auth {
ips: vec![],
secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()),
@@ -828,7 +828,7 @@ mod tests {
));
let creds = auth_quirks(
&mut ctx,
&ctx,
&api,
user_info,
&mut stream,

View File

@@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
pub(super) async fn authenticate(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
creds: ComputeUserInfo,
client: &mut PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
config: &'static AuthenticationConfig,
@@ -27,7 +27,7 @@ pub(super) async fn authenticate(
}
AuthSecret::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret, &mut *ctx);
let scram = auth::Scram(&secret, ctx);
let auth_outcome = tokio::time::timeout(
config.scram_protocol_timeout,

View File

@@ -18,7 +18,7 @@ use tracing::{info, warn};
/// These properties are benefical for serverless JS workers, so we
/// use this mechanism for websocket connections.
pub async fn authenticate_cleartext(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
info: ComputeUserInfo,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
secret: AuthSecret,
@@ -28,7 +28,7 @@ pub async fn authenticate_cleartext(
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client
let paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let ep = EndpointIdInt::from(&info.endpoint);
@@ -60,7 +60,7 @@ pub async fn authenticate_cleartext(
/// Similar to [`authenticate_cleartext`], but there's a specific password format,
/// and passwords are not yet validated (we don't know how to validate them!)
pub async fn password_hack_no_authentication(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<ComputeCredentials> {
@@ -68,7 +68,7 @@ pub async fn password_hack_no_authentication(
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client
let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)

View File

@@ -57,7 +57,7 @@ pub fn new_psql_session_id() -> String {
}
pub(super) async fn authenticate(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<NodeInfo> {

View File

@@ -84,7 +84,7 @@ pub fn endpoint_sni(
impl ComputeUserInfoMaybeEndpoint {
pub fn parse(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
params: &StartupMessageParams,
sni: Option<&str>,
common_names: Option<&HashSet<String>>,
@@ -249,8 +249,8 @@ mod tests {
fn parse_bare_minimum() -> anyhow::Result<()> {
// According to postgresql, only `user` should be required.
let options = StartupMessageParams::new([("user", "john_doe")]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id, None);
@@ -264,8 +264,8 @@ mod tests {
("database", "world"), // should be ignored
("foo", "bar"), // should be ignored
]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id, None);
@@ -279,9 +279,9 @@ mod tests {
let sni = Some("foo.localhost");
let common_names = Some(["localhost".into()].into());
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("foo"));
assert_eq!(user_info.options.get_cache_key("foo"), "foo");
@@ -296,8 +296,8 @@ mod tests {
("options", "-ckey=1 project=bar -c geqo=off"),
]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("bar"));
@@ -311,8 +311,8 @@ mod tests {
("options", "-ckey=1 endpoint=bar -c geqo=off"),
]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("bar"));
@@ -329,8 +329,8 @@ mod tests {
),
]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert!(user_info.endpoint_id.is_none());
@@ -344,8 +344,8 @@ mod tests {
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]);
let mut ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, None, None)?;
let ctx = RequestMonitoring::test();
let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?;
assert_eq!(user_info.user, "john_doe");
assert!(user_info.endpoint_id.is_none());
@@ -359,9 +359,9 @@ mod tests {
let sni = Some("baz.localhost");
let common_names = Some(["localhost".into()].into());
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.user, "john_doe");
assert_eq!(user_info.endpoint_id.as_deref(), Some("baz"));
@@ -374,16 +374,16 @@ mod tests {
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com");
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("p1"));
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com");
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("p1"));
Ok(())
@@ -397,10 +397,9 @@ mod tests {
let sni = Some("second.localhost");
let common_names = Some(["localhost".into()].into());
let mut ctx = RequestMonitoring::test();
let err =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
let ctx = RequestMonitoring::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
@@ -417,10 +416,9 @@ mod tests {
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let mut ctx = RequestMonitoring::test();
let err =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
let ctx = RequestMonitoring::test();
let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())
.expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
@@ -438,9 +436,9 @@ mod tests {
let sni = Some("project.localhost");
let common_names = Some(["localhost".into()].into());
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let user_info =
ComputeUserInfoMaybeEndpoint::parse(&mut ctx, &options, sni, common_names.as_ref())?;
ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?;
assert_eq!(user_info.endpoint_id.as_deref(), Some("project"));
assert_eq!(
user_info.options.get_cache_key("project"),

View File

@@ -27,7 +27,7 @@ pub trait AuthMethod {
pub struct Begin;
/// Use [SCRAM](crate::scram)-based auth in [`AuthFlow`].
pub struct Scram<'a>(pub &'a scram::ServerSecret, pub &'a mut RequestMonitoring);
pub struct Scram<'a>(pub &'a scram::ServerSecret, pub &'a RequestMonitoring);
impl AuthMethod for Scram<'_> {
#[inline(always)]
@@ -155,7 +155,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
let Scram(secret, ctx) = self.state;
// pause the timer while we communicate with the client
let _paused = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
// Initial client message contains the chosen auth method's name.
let msg = self.stream.read_password_message().await?;
@@ -168,10 +168,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
}
match sasl.method {
SCRAM_SHA_256 => ctx.auth_method = Some(crate::context::AuthMethod::ScramSha256),
SCRAM_SHA_256_PLUS => {
ctx.auth_method = Some(crate::context::AuthMethod::ScramSha256Plus)
}
SCRAM_SHA_256 => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256),
SCRAM_SHA_256_PLUS => ctx.set_auth_method(crate::context::AuthMethod::ScramSha256Plus),
_ => {}
}
info!("client chooses {}", sasl.method);

View File

@@ -205,7 +205,7 @@ async fn task_main(
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
raw_stream: S,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
@@ -256,13 +256,13 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
async fn handle_client(
mut ctx: RequestMonitoring,
ctx: RequestMonitoring,
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
tls_server_end_point: TlsServerEndPoint,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let mut tls_stream = ssl_handshake(&mut ctx, stream, tls_config, tls_server_end_point).await?;
let mut tls_stream = ssl_handshake(&ctx, stream, tls_config, tls_server_end_point).await?;
// Cut off first part of the SNI domain
// We receive required destination details in the format of

View File

@@ -5,6 +5,7 @@ use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_config::Region;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::AuthRateLimiter;
@@ -290,9 +291,10 @@ async fn main() -> anyhow::Result<()> {
let config = build_config(&args)?;
info!("Authentication backend: {}", config.auth_backend);
info!("Using region: {}", config.aws_region);
info!("Using region: {}", args.aws_region);
let region_provider = RegionProviderChain::default_provider().or_else(&*config.aws_region); // Replace with your Redis region if needed
let region_provider =
RegionProviderChain::default_provider().or_else(Region::new(args.aws_region.clone()));
let provider_conf =
ProviderConfig::without_region().with_region(region_provider.region().await);
let aws_credentials_provider = {
@@ -318,7 +320,7 @@ async fn main() -> anyhow::Result<()> {
};
let elasticache_credentials_provider = Arc::new(elasticache::CredentialsProvider::new(
elasticache::AWSIRSAConfig::new(
config.aws_region.clone(),
args.aws_region.clone(),
args.redis_cluster_name,
args.redis_user_id,
),
@@ -376,11 +378,14 @@ async fn main() -> anyhow::Result<()> {
let cancel_map = CancelMap::default();
let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
RateBucketInfo::validate(redis_rps_limit)?;
let redis_publisher = match &regional_redis_client {
Some(redis_publisher) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
redis_publisher.clone(),
args.region.clone(),
&config.redis_rps_limit,
redis_rps_limit,
)?))),
None => None,
};
@@ -656,7 +661,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
)?;
let http_config = HttpConfig {
request_timeout: args.sql_over_http.sql_over_http_timeout,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
@@ -676,9 +680,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
};
let mut redis_rps_limit = args.redis_rps_limit.clone();
RateBucketInfo::validate(&mut redis_rps_limit)?;
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
@@ -687,11 +688,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
http_config,
authentication_config,
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
redis_rps_limit,
handshake_timeout: args.handshake_timeout,
region: args.region.clone(),
aws_region: args.aws_region.clone(),
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute_retry_config: config::RetryConfig::parse(

View File

@@ -68,7 +68,7 @@ impl EndpointsCache {
ready: AtomicBool::new(false),
}
}
pub async fn is_valid(&self, ctx: &mut RequestMonitoring, endpoint: &EndpointId) -> bool {
pub async fn is_valid(&self, ctx: &RequestMonitoring, endpoint: &EndpointId) -> bool {
if !self.ready.load(Ordering::Acquire) {
return true;
}

View File

@@ -288,12 +288,12 @@ impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
allow_self_signed_compute: bool,
aux: MetricsAuxInfo,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
drop(pause);
@@ -316,14 +316,14 @@ impl ConnCfg {
)?;
// connect_raw() will not use TLS if sslmode is "disable"
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = self.0.connect_raw(stream, tls).await?;
drop(pause);
tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(
cold_start_info = ctx.cold_start_info.as_str(),
cold_start_info = ctx.cold_start_info().as_str(),
"connected to compute node at {host} ({socket_addr}) sslmode={:?}",
self.0.get_ssl_mode()
);
@@ -342,7 +342,7 @@ impl ConnCfg {
params,
cancel_closure,
aux,
_guage: Metrics::get().proxy.db_connections.guard(ctx.protocol),
_guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
};
Ok(connection)

View File

@@ -31,11 +31,8 @@ pub struct ProxyConfig {
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub redis_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,
pub aws_region: String,
pub wake_compute_retry_config: RetryConfig,
pub connect_compute_locks: ApiLocks<Host>,
pub connect_to_compute_retry_config: RetryConfig,
@@ -55,7 +52,6 @@ pub struct TlsConfig {
}
pub struct HttpConfig {
pub request_timeout: tokio::time::Duration,
pub pool_options: GlobalConnPoolOptions,
pub cancel_set: CancelSet,
pub client_conn_threshold: u64,

View File

@@ -292,7 +292,7 @@ pub struct NodeInfo {
impl NodeInfo {
pub async fn connect(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
timeout: Duration,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config
@@ -330,20 +330,20 @@ pub(crate) trait Api {
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
async fn get_allowed_ips_and_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
@@ -363,7 +363,7 @@ pub enum ConsoleBackend {
impl Api for ConsoleBackend {
async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
use ConsoleBackend::*;
@@ -378,7 +378,7 @@ impl Api for ConsoleBackend {
async fn get_allowed_ips_and_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
use ConsoleBackend::*;
@@ -393,7 +393,7 @@ impl Api for ConsoleBackend {
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
use ConsoleBackend::*;

View File

@@ -158,7 +158,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_role_secret(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
Ok(CachedRoleSecret::new_uncached(
@@ -168,7 +168,7 @@ impl super::Api for Api {
async fn get_allowed_ips_and_secret(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
Ok((
@@ -182,7 +182,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
_user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute().map_ok(Cached::new_uncached).await

View File

@@ -57,7 +57,7 @@ impl Api {
async fn do_get_auth_info(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
if !self
@@ -69,7 +69,7 @@ impl Api {
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
let request_id = ctx.session_id.to_string();
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let request = self
@@ -77,7 +77,7 @@ impl Api {
.get("proxy_get_role_secret")
.header("X-Request-ID", &request_id)
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id)])
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("project", user_info.endpoint.as_str()),
@@ -87,7 +87,7 @@ impl Api {
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
@@ -130,10 +130,10 @@ impl Api {
async fn do_wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
let request_id = ctx.session_id.to_string();
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let mut request_builder = self
@@ -141,7 +141,7 @@ impl Api {
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id)])
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("project", user_info.endpoint.as_str()),
@@ -156,7 +156,7 @@ impl Api {
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
@@ -192,7 +192,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
@@ -226,7 +226,7 @@ impl super::Api for Api {
async fn get_allowed_ips_and_secret(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
@@ -268,7 +268,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key();

View File

@@ -7,13 +7,14 @@ use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use tracing::{field::display, info, info_span, Span};
use try_lock::TryLock;
use uuid::Uuid;
use crate::{
console::messages::{ColdStartInfo, MetricsAuxInfo},
error::ErrorKind,
intern::{BranchIdInt, ProjectIdInt},
metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol},
metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
DbName, EndpointId, RoleName,
};
@@ -28,7 +29,15 @@ pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>>
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options.
pub struct RequestMonitoring {
pub struct RequestMonitoring(
/// To allow easier use of the ctx object, we have interior mutability.
/// I would typically use a RefCell but that would break the `Send` requirements
/// so we need something with thread-safety. `TryLock` is a cheap alternative
/// that offers similar semantics to a `RefCell` but with synchronisation.
TryLock<RequestMonitoringInner>,
);
struct RequestMonitoringInner {
pub peer_addr: IpAddr,
pub session_id: Uuid,
pub protocol: Protocol,
@@ -85,7 +94,7 @@ impl RequestMonitoring {
role = tracing::field::Empty,
);
Self {
let inner = RequestMonitoringInner {
peer_addr,
session_id,
protocol,
@@ -110,7 +119,9 @@ impl RequestMonitoring {
disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
latency_timer: LatencyTimer::new(protocol),
disconnect_timestamp: None,
}
};
Self(TryLock::new(inner))
}
#[cfg(test)]
@@ -119,48 +130,177 @@ impl RequestMonitoring {
}
pub fn console_application_name(&self) -> String {
let this = self.0.try_lock().expect("should not deadlock");
format!(
"{}/{}",
self.application.as_deref().unwrap_or_default(),
self.protocol
this.application.as_deref().unwrap_or_default(),
this.protocol
)
}
pub fn set_rejected(&mut self, rejected: bool) {
self.rejected = Some(rejected);
pub fn set_rejected(&self, rejected: bool) {
let mut this = self.0.try_lock().expect("should not deadlock");
this.rejected = Some(rejected);
}
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
pub fn set_cold_start_info(&self, info: ColdStartInfo) {
self.0
.try_lock()
.expect("should not deadlock")
.set_cold_start_info(info);
}
pub fn set_db_options(&self, options: StartupMessageParams) {
let mut this = self.0.try_lock().expect("should not deadlock");
this.set_application(options.get("application_name").map(SmolStr::from));
if let Some(user) = options.get("user") {
this.set_user(user.into());
}
if let Some(dbname) = options.get("database") {
this.set_dbname(dbname.into());
}
this.pg_options = Some(options);
}
pub fn set_project(&self, x: MetricsAuxInfo) {
let mut this = self.0.try_lock().expect("should not deadlock");
if this.endpoint_id.is_none() {
this.set_endpoint_id(x.endpoint_id.as_str().into())
}
this.branch = Some(x.branch_id);
this.project = Some(x.project_id);
this.set_cold_start_info(x.cold_start_info);
}
pub fn set_project_id(&self, project_id: ProjectIdInt) {
let mut this = self.0.try_lock().expect("should not deadlock");
this.project = Some(project_id);
}
pub fn set_endpoint_id(&self, endpoint_id: EndpointId) {
self.0
.try_lock()
.expect("should not deadlock")
.set_endpoint_id(endpoint_id);
}
pub fn set_dbname(&self, dbname: DbName) {
self.0
.try_lock()
.expect("should not deadlock")
.set_dbname(dbname);
}
pub fn set_user(&self, user: RoleName) {
self.0
.try_lock()
.expect("should not deadlock")
.set_user(user);
}
pub fn set_auth_method(&self, auth_method: AuthMethod) {
let mut this = self.0.try_lock().expect("should not deadlock");
this.auth_method = Some(auth_method);
}
pub fn has_private_peer_addr(&self) -> bool {
self.0
.try_lock()
.expect("should not deadlock")
.has_private_peer_addr()
}
pub fn set_error_kind(&self, kind: ErrorKind) {
let mut this = self.0.try_lock().expect("should not deadlock");
// Do not record errors from the private address to metrics.
if !this.has_private_peer_addr() {
Metrics::get().proxy.errors_total.inc(kind);
}
if let Some(ep) = &this.endpoint_id {
let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
let label = metric.with_labels(kind);
metric.get_metric(label).measure(ep);
}
this.error_kind = Some(kind);
}
pub fn set_success(&self) {
let mut this = self.0.try_lock().expect("should not deadlock");
this.success = true;
}
pub fn log_connect(&self) {
self.0
.try_lock()
.expect("should not deadlock")
.log_connect();
}
pub fn protocol(&self) -> Protocol {
self.0.try_lock().expect("should not deadlock").protocol
}
pub fn span(&self) -> Span {
self.0.try_lock().expect("should not deadlock").span.clone()
}
pub fn session_id(&self) -> Uuid {
self.0.try_lock().expect("should not deadlock").session_id
}
pub fn peer_addr(&self) -> IpAddr {
self.0.try_lock().expect("should not deadlock").peer_addr
}
pub fn cold_start_info(&self) -> ColdStartInfo {
self.0
.try_lock()
.expect("should not deadlock")
.cold_start_info
}
pub fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause {
LatencyTimerPause {
ctx: self,
start: tokio::time::Instant::now(),
waiting_for,
}
}
pub fn success(&self) {
self.0
.try_lock()
.expect("should not deadlock")
.latency_timer
.success()
}
}
pub struct LatencyTimerPause<'a> {
ctx: &'a RequestMonitoring,
start: tokio::time::Instant,
waiting_for: Waiting,
}
impl Drop for LatencyTimerPause<'_> {
fn drop(&mut self) {
self.ctx
.0
.try_lock()
.expect("should not deadlock")
.latency_timer
.unpause(self.start, self.waiting_for);
}
}
impl RequestMonitoringInner {
fn set_cold_start_info(&mut self, info: ColdStartInfo) {
self.cold_start_info = info;
self.latency_timer.cold_start_info(info);
}
pub fn set_db_options(&mut self, options: StartupMessageParams) {
self.set_application(options.get("application_name").map(SmolStr::from));
if let Some(user) = options.get("user") {
self.set_user(user.into());
}
if let Some(dbname) = options.get("database") {
self.set_dbname(dbname.into());
}
self.pg_options = Some(options);
}
pub fn set_project(&mut self, x: MetricsAuxInfo) {
if self.endpoint_id.is_none() {
self.set_endpoint_id(x.endpoint_id.as_str().into())
}
self.branch = Some(x.branch_id);
self.project = Some(x.project_id);
self.set_cold_start_info(x.cold_start_info);
}
pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
self.project = Some(project_id);
}
pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
if self.endpoint_id.is_none() {
self.span.record("ep", display(&endpoint_id));
let metric = &Metrics::get().proxy.connecting_endpoints;
@@ -176,44 +316,23 @@ impl RequestMonitoring {
}
}
pub fn set_dbname(&mut self, dbname: DbName) {
fn set_dbname(&mut self, dbname: DbName) {
self.dbname = Some(dbname);
}
pub fn set_user(&mut self, user: RoleName) {
fn set_user(&mut self, user: RoleName) {
self.span.record("role", display(&user));
self.user = Some(user);
}
pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
self.auth_method = Some(auth_method);
}
pub fn has_private_peer_addr(&self) -> bool {
fn has_private_peer_addr(&self) -> bool {
match self.peer_addr {
IpAddr::V4(ip) => ip.is_private(),
_ => false,
}
}
pub fn set_error_kind(&mut self, kind: ErrorKind) {
// Do not record errors from the private address to metrics.
if !self.has_private_peer_addr() {
Metrics::get().proxy.errors_total.inc(kind);
}
if let Some(ep) = &self.endpoint_id {
let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
let label = metric.with_labels(kind);
metric.get_metric(label).measure(ep);
}
self.error_kind = Some(kind);
}
pub fn set_success(&mut self) {
self.success = true;
}
pub fn log_connect(&mut self) {
fn log_connect(&mut self) {
let outcome = if self.success {
ConnectOutcome::Success
} else {
@@ -256,7 +375,7 @@ impl RequestMonitoring {
}
}
impl Drop for RequestMonitoring {
impl Drop for RequestMonitoringInner {
fn drop(&mut self) {
if self.sender.is_some() {
self.log_connect();

View File

@@ -23,7 +23,7 @@ use utils::backoff;
use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};
use super::{RequestMonitoring, LOG_CHAN};
use super::{RequestMonitoringInner, LOG_CHAN};
#[derive(clap::Args, Clone, Debug)]
pub struct ParquetUploadArgs {
@@ -118,8 +118,8 @@ impl<'a> serde::Serialize for Options<'a> {
}
}
impl From<&RequestMonitoring> for RequestData {
fn from(value: &RequestMonitoring) -> Self {
impl From<&RequestMonitoringInner> for RequestData {
fn from(value: &RequestMonitoringInner) -> Self {
Self {
session_id: value.session_id,
peer_addr: value.peer_addr.to_string(),

View File

@@ -370,6 +370,7 @@ pub struct CancellationRequest {
pub kind: CancellationOutcome,
}
#[derive(Clone, Copy)]
pub enum Waiting {
Cplane,
Client,
@@ -398,12 +399,6 @@ pub struct LatencyTimer {
outcome: ConnectOutcome,
}
pub struct LatencyTimerPause<'a> {
timer: &'a mut LatencyTimer,
start: time::Instant,
waiting_for: Waiting,
}
impl LatencyTimer {
pub fn new(protocol: Protocol) -> Self {
Self {
@@ -417,11 +412,13 @@ impl LatencyTimer {
}
}
pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
LatencyTimerPause {
timer: self,
start: Instant::now(),
waiting_for,
pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
let dur = start.elapsed();
match waiting_for {
Waiting::Cplane => self.accumulated.cplane += dur,
Waiting::Client => self.accumulated.client += dur,
Waiting::Compute => self.accumulated.compute += dur,
Waiting::RetryTimeout => self.accumulated.retry += dur,
}
}
@@ -438,18 +435,6 @@ impl LatencyTimer {
}
}
impl Drop for LatencyTimerPause<'_> {
fn drop(&mut self) {
let dur = self.start.elapsed();
match self.waiting_for {
Waiting::Cplane => self.timer.accumulated.cplane += dur,
Waiting::Client => self.timer.accumulated.client += dur,
Waiting::Compute => self.timer.accumulated.compute += dur,
Waiting::RetryTimeout => self.timer.accumulated.retry += dur,
}
}
}
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
Success,

View File

@@ -113,18 +113,18 @@ pub async fn task_main(
}
};
let mut ctx = RequestMonitoring::new(
let ctx = RequestMonitoring::new(
session_id,
peer_addr,
crate::metrics::Protocol::Tcp,
&config.region,
);
let span = ctx.span.clone();
let span = ctx.span();
let startup = Box::pin(
handle_client(
config,
&mut ctx,
&ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
@@ -240,7 +240,7 @@ impl ReportableError for ClientRequestError {
pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
cancellation_handler: Arc<CancellationHandlerMain>,
stream: S,
mode: ClientMode,
@@ -248,25 +248,25 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
protocol = %ctx.protocol,
protocol = %ctx.protocol(),
"handling interactive connection from client"
);
let metrics = &Metrics::get().proxy;
let proto = ctx.protocol;
let proto = ctx.protocol();
let _request_gauge = metrics.connection_requests.guard(proto);
let tls = config.tls_config.as_ref();
let record_handshake_error = !ctx.has_private_peer_addr();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(stream, mode.handshake_tls(tls), record_handshake_error);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
let (mut stream, params) =
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(cancel_key_data, ctx.session_id)
.cancel_session(cancel_key_data, ctx.session_id())
.await
.map(|()| None)?)
}

View File

@@ -46,7 +46,7 @@ pub trait ConnectMechanism {
type Error: From<Self::ConnectError>;
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<Self::Connection, Self::ConnectError>;
@@ -58,7 +58,7 @@ pub trait ConnectMechanism {
pub trait ComputeConnectBackend {
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
fn get_keys(&self) -> Option<&ComputeCredentialKeys>;
@@ -81,7 +81,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
node_info: &console::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
@@ -98,7 +98,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
/// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)]
pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
mechanism: &M,
user_info: &B,
allow_self_signed_compute: bool,
@@ -126,7 +126,7 @@ where
.await
{
Ok(res) => {
ctx.latency_timer.success();
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
@@ -178,7 +178,7 @@ where
.await
{
Ok(res) => {
ctx.latency_timer.success();
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
@@ -209,9 +209,7 @@ where
let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
num_retries += 1;
let pause = ctx
.latency_timer
.pause(crate::metrics::Waiting::RetryTimeout);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
time::sleep(wait_duration).await;
drop(pause);
}

View File

@@ -10,6 +10,7 @@ use tracing::{info, warn};
use crate::{
auth::endpoint_sni,
config::{TlsConfig, PG_ALPN_PROTOCOL},
context::RequestMonitoring,
error::ReportableError,
metrics::Metrics,
proxy::ERR_INSECURE_CONNECTION,
@@ -67,6 +68,7 @@ pub enum HandshakeData<S> {
/// we also take an extra care of propagating only the select handshake errors to client.
#[tracing::instrument(skip_all)]
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
ctx: &RequestMonitoring,
stream: S,
mut tls: Option<&TlsConfig>,
record_handshake_error: bool,
@@ -80,8 +82,6 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
info!("received {msg:?}");
use FeStartupPacket::*;
match msg {
SslRequest { direct } => match stream.get_ref() {
@@ -145,16 +145,20 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let conn_info = tls_stream.get_ref().1;
// try parse endpoint
let ep = conn_info
.server_name()
.and_then(|sni| endpoint_sni(sni, &tls.common_names).ok().flatten());
if let Some(ep) = ep {
ctx.set_endpoint_id(ep);
}
// check the ALPN, if exists, as required.
match conn_info.alpn_protocol() {
None | Some(PG_ALPN_PROTOCOL) => {}
Some(other) => {
// try parse ep for better error
let ep = conn_info.server_name().and_then(|sni| {
endpoint_sni(sni, &tls.common_names).ok().flatten()
});
let alpn = String::from_utf8_lossy(other);
warn!(?ep, %alpn, "unexpected ALPN");
warn!(%alpn, "unexpected ALPN");
return Err(HandshakeError::ProtocolViolation);
}
}
@@ -198,7 +202,12 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
.await?;
}
info!(?version, session_type = "normal", "successful handshake");
info!(
?version,
?params,
session_type = "normal",
"successful handshake"
);
break Ok(HandshakeData::Startup(stream, params));
}
// downgrade protocol version

View File

@@ -155,7 +155,7 @@ impl TestAuth for Scram {
stream: &mut PqStream<Stream<S>>,
) -> anyhow::Result<()> {
let outcome = auth::AuthFlow::new(stream)
.begin(auth::Scram(&self.0, &mut RequestMonitoring::test()))
.begin(auth::Scram(&self.0, &RequestMonitoring::test()))
.await?
.authenticate()
.await?;
@@ -175,10 +175,11 @@ async fn dummy_proxy(
auth: impl TestAuth + Send,
) -> anyhow::Result<()> {
let (client, _) = read_proxy_protocol(client).await?;
let mut stream = match handshake(client, tls.as_ref(), false).await? {
HandshakeData::Startup(stream, _) => stream,
HandshakeData::Cancel(_) => bail!("cancellation not supported"),
};
let mut stream =
match handshake(&RequestMonitoring::test(), client, tls.as_ref(), false).await? {
HandshakeData::Startup(stream, _) => stream,
HandshakeData::Cancel(_) => bail!("cancellation not supported"),
};
auth.authenticate(&mut stream).await?;
@@ -457,7 +458,7 @@ impl ConnectMechanism for TestConnectMechanism {
async fn connect_once(
&self,
_ctx: &mut RequestMonitoring,
_ctx: &RequestMonitoring,
_node_info: &console::CachedNodeInfo,
_timeout: std::time::Duration,
) -> Result<Self::Connection, Self::ConnectError> {
@@ -565,7 +566,7 @@ fn helper_create_connect_info(
async fn connect_to_compute_success() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -573,7 +574,7 @@ async fn connect_to_compute_success() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -583,7 +584,7 @@ async fn connect_to_compute_success() {
async fn connect_to_compute_retry() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -591,7 +592,7 @@ async fn connect_to_compute_retry() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -602,7 +603,7 @@ async fn connect_to_compute_retry() {
async fn connect_to_compute_non_retry_1() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -610,7 +611,7 @@ async fn connect_to_compute_non_retry_1() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap_err();
mechanism.verify();
@@ -621,7 +622,7 @@ async fn connect_to_compute_non_retry_1() {
async fn connect_to_compute_non_retry_2() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -629,7 +630,7 @@ async fn connect_to_compute_non_retry_2() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -641,7 +642,7 @@ async fn connect_to_compute_non_retry_3() {
let _ = env_logger::try_init();
tokio::time::pause();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism =
TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]);
let user_info = helper_create_connect_info(&mechanism);
@@ -656,7 +657,7 @@ async fn connect_to_compute_non_retry_3() {
backoff_factor: 2.0,
};
connect_to_compute(
&mut ctx,
&ctx,
&mechanism,
&user_info,
false,
@@ -673,7 +674,7 @@ async fn connect_to_compute_non_retry_3() {
async fn wake_retry() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -681,7 +682,7 @@ async fn wake_retry() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap();
mechanism.verify();
@@ -692,7 +693,7 @@ async fn wake_retry() {
async fn wake_non_retry() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
let config = RetryConfig {
@@ -700,7 +701,7 @@ async fn wake_non_retry() {
max_retries: 5,
backoff_factor: 2.0,
};
connect_to_compute(&mut ctx, &mechanism, &user_info, false, config, config)
connect_to_compute(&ctx, &mechanism, &user_info, false, config, config)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -34,9 +34,14 @@ async fn proxy_mitm(
tokio::spawn(async move {
// begin handshake with end_server
let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await;
let (end_client, startup) = match handshake(client1, Some(&server_config1), false)
.await
.unwrap()
let (end_client, startup) = match handshake(
&RequestMonitoring::test(),
client1,
Some(&server_config1),
false,
)
.await
.unwrap()
{
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(_) => panic!("cancellation not supported"),

View File

@@ -14,7 +14,7 @@ use super::connect_compute::ComputeConnectBackend;
pub async fn wake_compute<B: ComputeConnectBackend>(
num_retries: &mut u32,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
api: &B,
config: RetryConfig,
) -> Result<CachedNodeInfo, WakeComputeError> {
@@ -52,9 +52,7 @@ pub async fn wake_compute<B: ComputeConnectBackend>(
let wait_duration = retry_after(*num_retries, config);
*num_retries += 1;
let pause = ctx
.latency_timer
.pause(crate::metrics::Waiting::RetryTimeout);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
tokio::time::sleep(wait_duration).await;
drop(pause);
}

View File

@@ -334,7 +334,7 @@ async fn request_handler(
&config.region,
);
let span = ctx.span.clone();
let span = ctx.span();
info!(parent: &span, "performing websocket upgrade");
let (response, websocket) = framed_websockets::upgrade::upgrade(&mut request)
@@ -367,7 +367,7 @@ async fn request_handler(
crate::metrics::Protocol::Http,
&config.region,
);
let span = ctx.span.clone();
let span = ctx.span();
sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
.instrument(span)

View File

@@ -35,15 +35,15 @@ pub struct PoolingBackend {
impl PoolingBackend {
pub async fn authenticate(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
config: &AuthenticationConfig,
conn_info: &ConnInfo,
) -> Result<ComputeCredentials, AuthError> {
let user_info = conn_info.user_info.clone();
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !self
.endpoint_rate_limiter
@@ -100,7 +100,7 @@ impl PoolingBackend {
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
pub async fn connect_to_compute(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
keys: ComputeCredentials,
force_new: bool,
@@ -222,7 +222,7 @@ impl ConnectMechanism for TokioMechanism {
async fn connect_once(
&self,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
node_info: &CachedNodeInfo,
timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> {
@@ -240,7 +240,7 @@ impl ConnectMechanism for TokioMechanism {
.param("client_encoding", "UTF8")
.expect("client encoding UTF8 is always valid");
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;

View File

@@ -377,7 +377,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
pub fn get(
self: &Arc<Self>,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInner<C>> = None;
@@ -409,9 +409,9 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
client.session.send(ctx.session_id)?;
client.session.send(ctx.session_id())?;
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.latency_timer.success();
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
}
@@ -465,19 +465,19 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
pub fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
conn_info: ConnInfo,
client: C,
mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<C> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol);
let mut session_id = ctx.session_id;
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let cold_start_info = ctx.cold_start_info;
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
});
@@ -766,7 +766,6 @@ mod tests {
opt_in: false,
max_total_conns: 3,
},
request_timeout: Duration::from_secs(1),
cancel_set: CancelSet::new(0),
client_conn_threshold: u64::MAX,
}));

View File

@@ -144,7 +144,7 @@ impl UserFacingError for ConnInfoError {
}
fn get_conn_info(
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
headers: &HeaderMap,
tls: &TlsConfig,
) -> Result<ConnInfo, ConnInfoError> {
@@ -224,12 +224,12 @@ fn get_conn_info(
// TODO: return different http error codes
pub async fn handle(
config: &'static ProxyConfig,
mut ctx: RequestMonitoring,
ctx: RequestMonitoring,
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
cancel: CancellationToken,
) -> Result<Response<Full<Bytes>>, ApiError> {
let result = handle_inner(cancel, config, &mut ctx, request, backend).await;
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let mut response = match result {
Ok(r) => {
@@ -482,13 +482,16 @@ fn map_isolation_level_to_headers(level: IsolationLevel) -> Option<HeaderValue>
async fn handle_inner(
cancel: CancellationToken,
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
ctx: &RequestMonitoring,
request: Request<Incoming>,
backend: Arc<PoolingBackend>,
) -> Result<Response<Full<Bytes>>, SqlOverHttpError> {
let _requeset_gauge = Metrics::get().proxy.connection_requests.guard(ctx.protocol);
let _requeset_gauge = Metrics::get()
.proxy
.connection_requests
.guard(ctx.protocol());
info!(
protocol = %ctx.protocol,
protocol = %ctx.protocol(),
"handling interactive connection from client"
);
@@ -544,7 +547,7 @@ async fn handle_inner(
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else
ctx.latency_timer.success();
ctx.success();
Ok::<_, HttpConnError>(client)
}
.map_err(SqlOverHttpError::from),

View File

@@ -129,7 +129,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
pub async fn serve_websocket(
config: &'static ProxyConfig,
mut ctx: RequestMonitoring,
ctx: RequestMonitoring,
websocket: OnUpgrade,
cancellation_handler: Arc<CancellationHandlerMain>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -145,7 +145,7 @@ pub async fn serve_websocket(
let res = Box::pin(handle_client(
config,
&mut ctx,
&ctx,
cancellation_handler,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },

View File

@@ -7,7 +7,6 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use remote_storage::RemoteStorageConfig;
use sd_notify::NotifyState;
use tokio::runtime::Handle;
@@ -205,7 +204,6 @@ fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
// We want to allow multiple occurences of the same arg (taking the last) so
// that neon_local could generate command with defaults + overrides without
// getting 'argument cannot be used multiple times' error. This seems to be
@@ -358,14 +356,14 @@ async fn main() -> anyhow::Result<()> {
Some(GIT_VERSION.into()),
&[("node_id", &conf.my_id.to_string())],
);
start_safekeeper(launch_ts, conf).await
start_safekeeper(conf).await
}
/// Result of joining any of main tasks: upper error means task failed to
/// complete, e.g. panicked, inner is error produced by task itself.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperConf) -> Result<()> {
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
// Prevent running multiple safekeepers on the same directory
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
let lock_file =
@@ -493,7 +491,6 @@ async fn start_safekeeper(launch_ts: &'static LaunchTimestamp, conf: SafeKeeperC
tasks_handles.push(Box::pin(broker_task_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts);
// TODO: update tokio-stream, convert to real async Stream with
// SignalStream, map it to obtain missing signal name, combine streams into

View File

@@ -642,8 +642,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
info!("version: {GIT_VERSION}");
info!("build_tag: {BUILD_TAG}");
info!("version: {GIT_VERSION} build_tag: {BUILD_TAG}");
metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
// On any shutdown signal, log receival and exit.

View File

@@ -2954,7 +2954,6 @@ impl Service {
}
// no shard needs to go first/last; the operation should be idempotent
// TODO: it would be great to ensure that all shards return the same error
let mut results = self
.tenant_for_shards(targets, |tenant_shard_id, node| {
futures::FutureExt::boxed(detach_one(
@@ -2973,6 +2972,7 @@ impl Service {
.filter(|(_, res)| res != &any.1)
.collect::<Vec<_>>();
if !mismatching.is_empty() {
// this can be hit by races which should not happen because operation lock on cplane
let matching = results.len() - mismatching.len();
tracing::error!(
matching,

View File

@@ -172,8 +172,11 @@ pub(crate) async fn branch_cleanup_and_check_errors(
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => result.errors.extend(
errors
.into_iter()
.map(|error| format!("parse error: {error}")),
),
@@ -300,7 +303,10 @@ pub(crate) enum BlobDataParseResult {
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect(Vec<String>),
Incorrect {
errors: Vec<String>,
s3_layers: HashSet<(LayerName, Generation)>,
},
}
pub(crate) fn parse_layer_object_name(name: &str) -> Result<(LayerName, Generation), String> {
@@ -443,7 +449,7 @@ pub(crate) async fn list_timeline_blobs(
}
Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Incorrect(errors),
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,
})

View File

@@ -208,21 +208,21 @@ async fn main() -> anyhow::Result<()> {
}
if summary.is_fatal() {
Err(anyhow::anyhow!("Fatal scrub errors detected"))
tracing::error!("Fatal scrub errors detected");
} else if summary.is_empty() {
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
// scrubber they were likely expecting to scan something, and if we see no timelines
// at all then it's likely due to some configuration issues like a bad prefix
Err(anyhow::anyhow!(
tracing::error!(
"No timelines found in bucket {} prefix {}",
bucket_config.bucket,
bucket_config
.prefix_in_bucket
.unwrap_or("<none>".to_string())
))
} else {
Ok(())
);
}
Ok(())
}
}
}

View File

@@ -389,10 +389,13 @@ async fn gc_ancestor(
// Post-deletion tenant location: don't try and GC it.
continue;
}
BlobDataParseResult::Incorrect(reasons) => {
BlobDataParseResult::Incorrect {
errors,
s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!(
"Skipping ancestor GC for timeline {ttid}, bad metadata: {reasons:?}"
"Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
);
continue;
}
@@ -518,9 +521,12 @@ pub async fn pageserver_physical_gc(
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect(reasons) => {
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
return Ok(summary);
}
};

View File

@@ -290,13 +290,21 @@ pub async fn scan_metadata(
}
}
if let BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} = &data.blob_data
{
tenant_objects.push(ttid, s3_layers.clone());
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part: _index_part,
index_part_generation: _index_part_generation,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
BlobDataParseResult::Relic => (),
BlobDataParseResult::Incorrect {
errors: _,
s3_layers,
} => {
tenant_objects.push(ttid, s3_layers.clone());
}
}
tenant_timeline_results.push((ttid, data));
}

View File

@@ -269,7 +269,7 @@ impl SnapshotDownloader {
.context("Downloading timeline")?;
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(_) => {
BlobDataParseResult::Incorrect { .. } => {
tracing::error!("Bad metadata in timeline {ttid}");
}
};

View File

@@ -978,7 +978,10 @@ class NeonEnvBuilder:
and self.enable_scrub_on_exit
):
try:
self.env.storage_scrubber.scan_metadata()
healthy, _ = self.env.storage_scrubber.scan_metadata()
if not healthy:
e = Exception("Remote storage metadata corrupted")
cleanup_error = e
except Exception as e:
log.error(f"Error during remote storage scrub: {e}")
cleanup_error = e
@@ -4411,14 +4414,19 @@ class StorageScrubber:
assert stdout is not None
return stdout
def scan_metadata(self, post_to_storage_controller: bool = False) -> Any:
def scan_metadata(self, post_to_storage_controller: bool = False) -> Tuple[bool, Any]:
"""
Returns the health status and the metadata summary.
"""
args = ["scan-metadata", "--node-kind", "pageserver", "--json"]
if post_to_storage_controller:
args.append("--post")
stdout = self.scrubber_cli(args, timeout=30)
try:
return json.loads(stdout)
summary = json.loads(stdout)
healthy = not summary["with_errors"] and not summary["with_warnings"]
return healthy, summary
except:
log.error("Failed to decode JSON output from `scan-metadata`. Dumping stdout:")
log.error(stdout)

View File

@@ -61,6 +61,7 @@ class HistoricLayerInfo:
remote: bool
# None for image layers, true if pageserver thinks this is an L0 delta layer
l0: Optional[bool]
visible: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -79,6 +80,7 @@ class HistoricLayerInfo:
lsn_end=d.get("lsn_end"),
remote=d["remote"],
l0=l0_ness,
visible=d["access_stats"]["visible"],
)
@@ -855,7 +857,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
timeline_id: TimelineId,
batch_size: int | None = None,
**kwargs,
) -> List[TimelineId]:
) -> Set[TimelineId]:
params = {}
if batch_size is not None:
params["batch_size"] = batch_size
@@ -866,7 +868,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
json = res.json()
return list(map(TimelineId, json["reparented_timelines"]))
return set(map(TimelineId, json["reparented_timelines"]))
def evict_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str

View File

@@ -12,7 +12,6 @@ import requests
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -95,6 +94,8 @@ def debezium(remote_pg: RemotePostgres):
log.debug("%s %s %s", resp.status_code, resp.ok, resp.text)
assert resp.status_code == 201
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=["kafka:9092"],

View File

@@ -217,7 +217,11 @@ def test_storage_controller_many_tenants(
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
# Migrate it to its secondary location
desc = env.storage_controller.tenant_describe(tenant_id)
dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
@@ -231,7 +235,11 @@ def test_storage_controller_many_tenants(
for f in futs:
f.result()
# Consistency check is safe here: all the previous operations waited for reconcile before completing
# Some of the operations above (notably migrations) might leave the controller in a state where it has
# some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
# to reach a quiescent state before doing following checks.
env.storage_controller.reconcile_until_idle()
env.storage_controller.consistency_check()
check_memory()

View File

@@ -10,7 +10,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(600)
def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(".*simulated connection error.*")
env.pageserver.allowed_errors.append(".*simulated connection error.*") # this is never hit
# the real reason (Simulated Connection Error) is on the next line, and we cannot filter this out.
env.pageserver.allowed_errors.append(
".*ERROR error in page_service connection task: Postgres query error"
)
# Enable failpoint before starting everything else up so that we exercise the retry
# on fetching basebackup
@@ -69,3 +74,7 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
cur.fetchall()
times_executed += 1
log.info(f"Workload executed {times_executed} times")
# do a graceful shutdown which would had caught the allowed_errors before
# https://github.com/neondatabase/neon/pull/8632
env.pageserver.stop()

View File

@@ -496,11 +496,10 @@ def test_historic_storage_formats(
# Check the scrubber handles this old data correctly (can read it and doesn't consider it corrupt)
#
# Do this _before_ importing to the pageserver, as that import may start writing immediately
metadata_summary = env.storage_scrubber.scan_metadata()
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert healthy
assert metadata_summary["tenant_count"] >= 1
assert metadata_summary["timeline_count"] >= 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
env.neon_cli.import_tenant(dataset.tenant_id)

View File

@@ -214,12 +214,11 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# Having written a mixture of generation-aware and legacy index_part.json,
# ensure the scrubber handles the situation as expected.
metadata_summary = env.storage_scrubber.scan_metadata()
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
assert metadata_summary["timeline_count"] == 1
assert metadata_summary["timeline_shard_count"] == 1
assert not metadata_summary["with_errors"]
assert not metadata_summary["with_warnings"]
assert healthy
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):

View File

@@ -2,10 +2,11 @@ import json
import os
import random
import time
from typing import Any, Dict, Optional
from pathlib import Path
from typing import Any, Dict, Optional, Union
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.common_types import TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.common_types import parse_layer_file_name
@@ -437,6 +438,35 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
validate_heatmap(heatmap_second)
def list_elegible_layers(
pageserver, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> list[Path]:
"""
The subset of layer filenames that are elegible for secondary download: at time of writing this
is all resident layers which are also visible.
"""
candidates = pageserver.list_layers(tenant_id, timeline_id)
layer_map = pageserver.http_client().layer_map_info(tenant_id, timeline_id)
# Map of layer filenames to their visibility the "layer name" is not the same as the filename: add suffix to resolve one to the other
visible_map = dict(
(f"{layer.layer_file_name}-v1-00000001", layer.visible)
for layer in layer_map.historic_layers
)
def is_visible(layer_file_name):
try:
return visible_map[str(layer_file_name)]
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
@@ -491,7 +521,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
assert list_elegible_layers(ps_attached, tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
@@ -509,9 +539,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
try:
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
except:
# Do a full listing of the secondary location on errors, to help debug of
# https://github.com/neondatabase/neon/issues/6966
@@ -532,8 +562,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# ==================================================================
try:
log.info("Evicting a layer...")
layer_to_evict = ps_attached.list_layers(tenant_id, timeline_id)[0]
some_other_layer = ps_attached.list_layers(tenant_id, timeline_id)[1]
layer_to_evict = list_elegible_layers(ps_attached, tenant_id, timeline_id)[0]
some_other_layer = list_elegible_layers(ps_attached, tenant_id, timeline_id)[1]
log.info(f"Victim layer: {layer_to_evict.name}")
ps_attached.http_client().evict_layer(
tenant_id, timeline_id, layer_name=layer_to_evict.name
@@ -551,9 +581,9 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert layer_to_evict not in ps_attached.list_layers(tenant_id, timeline_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
assert list_elegible_layers(
ps_attached, tenant_id, timeline_id
) == ps_secondary.list_layers(tenant_id, timeline_id)
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
@@ -563,7 +593,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
env.storage_scrubber.scan_metadata()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
# Detach secondary and delete tenant
# ===================================

View File

@@ -124,7 +124,8 @@ def test_sharding_smoke(
# Check the scrubber isn't confused by sharded content, then disable
# it during teardown because we'll have deleted by then
env.storage_scrubber.scan_metadata()
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
assert_prefix_empty(

View File

@@ -516,9 +516,8 @@ def test_scrubber_scan_pageserver_metadata(
assert len(index.layer_metadata) > 0
it = iter(index.layer_metadata.items())
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert not scan_summary["with_warnings"]
assert not scan_summary["with_errors"]
healthy, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
assert healthy
assert env.storage_controller.metadata_health_is_healthy()
@@ -532,16 +531,18 @@ def test_scrubber_scan_pageserver_metadata(
log.info(f"delete response: {delete_response}")
# Check scan summary without posting to storage controller. Expect it to be a L0 layer so only emit warnings.
scan_summary = env.storage_scrubber.scan_metadata()
_, scan_summary = env.storage_scrubber.scan_metadata()
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
assert env.storage_controller.metadata_health_is_healthy()
# Now post to storage controller, expect seeing one unhealthy health record
scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
_, scan_summary = env.storage_scrubber.scan_metadata(post_to_storage_controller=True)
log.info(f"{pprint.pformat(scan_summary)}")
assert len(scan_summary["with_warnings"]) > 0
unhealthy = env.storage_controller.metadata_health_list_unhealthy()["unhealthy_tenant_shards"]
assert len(unhealthy) == 1 and unhealthy[0] == str(tenant_shard_id)
neon_env_builder.disable_scrub_on_exit()

View File

@@ -37,7 +37,9 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
# scur.execute("CREATE INDEX on t(sk)") # slowdown applying WAL at replica
pub_conn = f"host=localhost port={pub.pg_port} dbname=postgres user=cloud_admin"
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
# synchronous_commit=on to test a hypothesis for why this test has been flaky.
# XXX: Add link to the issue
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub with (synchronous_commit=on)"
scur.execute(query)
time.sleep(2) # let initial table sync complete

View File

@@ -128,6 +128,8 @@ def test_tenant_delete_smoke(
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "inprogress"}) == 0
env.pageserver.stop()
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
"""Reproduction of 2023-11-23 stuck tenants investigation"""
@@ -200,11 +202,10 @@ def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonE
if deletion is not None:
deletion.join()
env.pageserver.stop()
def test_tenant_delete_races_timeline_creation(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
def test_tenant_delete_races_timeline_creation(neon_env_builder: NeonEnvBuilder):
"""
Validate that timeline creation executed in parallel with deletion works correctly.
@@ -318,6 +319,8 @@ def test_tenant_delete_races_timeline_creation(
# We deleted our only tenant, and the scrubber fails if it detects nothing
neon_env_builder.disable_scrub_on_exit()
env.pageserver.stop()
def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
"""
@@ -341,13 +344,13 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
env.stop()
result = env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy
env.start()
ps_http = env.pageserver.http_client()
ps_http.tenant_delete(tenant_id)
env.stop()
env.storage_scrubber.scan_metadata()
assert result["with_warnings"] == []
healthy, _ = env.storage_scrubber.scan_metadata()
assert healthy

View File

@@ -165,7 +165,7 @@ def test_ancestor_detach_branched_from(
)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == []
assert all_reparented == set()
if restart_after:
env.pageserver.stop()
@@ -534,7 +534,7 @@ def test_compaction_induced_by_detaches_in_history(
for _, timeline_id in skip_main:
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert reparented == [], "we have no earlier branches at any level"
assert reparented == set(), "we have no earlier branches at any level"
post_detach_l0s = list(filter(lambda x: x.l0, delta_layers(branch_timeline_id)))
assert len(post_detach_l0s) == 5, "should had inherited 4 L0s, have 5 in total"
@@ -774,7 +774,7 @@ def test_sharded_timeline_detach_ancestor(neon_env_builder: NeonEnvBuilder):
else:
break
assert reparented == [], "too many retries (None) or unexpected reparentings"
assert reparented == set(), "too many retries (None) or unexpected reparentings"
for shard_info in shards:
node_id = int(shard_info["node_id"])